aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFelix Geisendörfer <felix@debuggable.com>2011-04-14 20:33:54 +0200
committerRyan Dahl <ry@tinyclouds.org>2011-04-14 14:12:01 -0700
commit6c5b31bd80d2d38600d84a5be0ae9c5bbad94c01 (patch)
tree9e2a6d70414412470cdbfd4ffd3a198e40425f14
parent8417870f513e6f1ab98e07fe95d3430dbec48b87 (diff)
downloadandroid-node-v8-6c5b31bd80d2d38600d84a5be0ae9c5bbad94c01.tar.gz
android-node-v8-6c5b31bd80d2d38600d84a5be0ae9c5bbad94c01.tar.bz2
android-node-v8-6c5b31bd80d2d38600d84a5be0ae9c5bbad94c01.zip
Fix: Multiple pipes to the same stream were broken
When creating multiple .pipe()s to the same destination stream, the first source to end would close the destination, breaking all remaining pipes. This patch fixes the problem by keeping track of all open pipes, so that we only call end on destinations that have no more sources piping to them. closes #929
-rw-r--r--lib/stream.js29
-rw-r--r--test/simple/test-stream-pipe-cleanup.js21
2 files changed, 41 insertions, 9 deletions
diff --git a/lib/stream.js b/lib/stream.js
index d31a9fe239..632c87d2e2 100644
--- a/lib/stream.js
+++ b/lib/stream.js
@@ -28,9 +28,13 @@ function Stream() {
util.inherits(Stream, events.EventEmitter);
exports.Stream = Stream;
+var pipes = [];
+
Stream.prototype.pipe = function(dest, options) {
var source = this;
+ pipes.push(dest);
+
function ondata(chunk) {
if (dest.writable) {
if (false === dest.write(chunk)) source.pause();
@@ -52,10 +56,18 @@ Stream.prototype.pipe = function(dest, options) {
if (!options || options.end !== false) {
function onend() {
+ var index = pipes.indexOf(dest);
+ pipes.splice(index, 1);
+
+ if (pipes.indexOf(dest) > -1) {
+ return;
+ }
+
dest.end();
}
source.on('end', onend);
+ source.on('close', onend);
}
/*
@@ -73,34 +85,35 @@ Stream.prototype.pipe = function(dest, options) {
source.emit('resume');
};
}
-
+
var onpause = function() {
source.pause();
}
dest.on('pause', onpause);
-
+
var onresume = function() {
if (source.readable) source.resume();
};
-
+
dest.on('resume', onresume);
-
+
var cleanup = function () {
source.removeListener('data', ondata);
dest.removeListener('drain', ondrain);
source.removeListener('end', onend);
-
+ source.removeListener('close', onend);
+
dest.removeListener('pause', onpause);
dest.removeListener('resume', onresume);
-
+
source.removeListener('end', cleanup);
source.removeListener('close', cleanup);
-
+
dest.removeListener('end', cleanup);
dest.removeListener('close', cleanup);
}
-
+
source.on('end', cleanup);
source.on('close', cleanup);
diff --git a/test/simple/test-stream-pipe-cleanup.js b/test/simple/test-stream-pipe-cleanup.js
index fce4ac82a7..32ecd153dc 100644
--- a/test/simple/test-stream-pipe-cleanup.js
+++ b/test/simple/test-stream-pipe-cleanup.js
@@ -28,10 +28,13 @@ var util = require('util');
function Writable () {
this.writable = true;
+ this.endCalls = 0;
stream.Stream.call(this);
}
util.inherits(Writable, stream.Stream);
-Writable.prototype.end = function () {}
+Writable.prototype.end = function () {
+ this.endCalls++;
+}
function Readable () {
this.readable = true;
@@ -56,6 +59,9 @@ for (i = 0; i < limit; i++) {
r.emit('end')
}
assert.equal(0, r.listeners('end').length);
+assert.equal(limit, w.endCalls);
+
+w.endCalls = 0;
for (i = 0; i < limit; i++) {
r = new Readable()
@@ -63,6 +69,19 @@ for (i = 0; i < limit; i++) {
r.emit('close')
}
assert.equal(0, r.listeners('close').length);
+assert.equal(limit, w.endCalls);
+
+w.endCalls = 0;
+
+var r2;
+r = new Readable()
+r2 = new Readable();
+
+r.pipe(w)
+r2.pipe(w)
+r.emit('close')
+r2.emit('close')
+assert.equal(1, w.endCalls);
r = new Readable();