summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/_stream_readable.js21
-rw-r--r--test/parallel/test-stream-pipe-flow.js23
-rw-r--r--test/parallel/test-stream2-readable-legacy-drain.js7
3 files changed, 36 insertions, 15 deletions
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index 1830eea66d..01f95104b8 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -676,12 +676,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest.end();
}
- // When the dest drains, it reduces the awaitDrain counter
- // on the source. This would be more elegant with a .once()
- // handler in flow(), but adding and removing repeatedly is
- // too slow.
- const ondrain = pipeOnDrain(src);
- dest.on('drain', ondrain);
+ let ondrain;
var cleanedUp = false;
function cleanup() {
@@ -689,7 +684,9 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
// Cleanup event handlers once the pipe is broken
dest.removeListener('close', onclose);
dest.removeListener('finish', onfinish);
- dest.removeListener('drain', ondrain);
+ if (ondrain) {
+ dest.removeListener('drain', ondrain);
+ }
dest.removeListener('error', onerror);
dest.removeListener('unpipe', onunpipe);
src.removeListener('end', onend);
@@ -703,7 +700,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
// flowing again.
// So, if this is awaiting a drain, then we just call it now.
// If we don't know, then assume that we are waiting for one.
- if (state.awaitDrain &&
+ if (ondrain && state.awaitDrain &&
(!dest._writableState || dest._writableState.needDrain))
ondrain();
}
@@ -722,6 +719,14 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
debug('false write response, pause', state.awaitDrain);
state.awaitDrain++;
}
+ if (!ondrain) {
+ // When the dest drains, it reduces the awaitDrain counter
+ // on the source. This would be more elegant with a .once()
+ // handler in flow(), but adding and removing repeatedly is
+ // too slow.
+ ondrain = pipeOnDrain(src);
+ dest.on('drain', ondrain);
+ }
src.pause();
}
}
diff --git a/test/parallel/test-stream-pipe-flow.js b/test/parallel/test-stream-pipe-flow.js
index b696821c0d..1f2e8f54ce 100644
--- a/test/parallel/test-stream-pipe-flow.js
+++ b/test/parallel/test-stream-pipe-flow.js
@@ -1,5 +1,6 @@
'use strict';
const common = require('../common');
+const assert = require('assert');
const { Readable, Writable, PassThrough } = require('stream');
{
@@ -65,3 +66,25 @@ const { Readable, Writable, PassThrough } = require('stream');
wrapper.resume();
wrapper.on('end', common.mustCall());
}
+
+{
+ // Only register drain if there is backpressure.
+ const rs = new Readable({ read() {} });
+
+ const pt = rs
+ .pipe(new PassThrough({ objectMode: true, highWaterMark: 2 }));
+ assert.strictEqual(pt.listenerCount('drain'), 0);
+ pt.on('finish', () => {
+ assert.strictEqual(pt.listenerCount('drain'), 0);
+ });
+
+ rs.push('asd');
+ assert.strictEqual(pt.listenerCount('drain'), 0);
+
+ process.nextTick(() => {
+ rs.push('asd');
+ assert.strictEqual(pt.listenerCount('drain'), 0);
+ rs.push(null);
+ assert.strictEqual(pt.listenerCount('drain'), 0);
+ });
+}
diff --git a/test/parallel/test-stream2-readable-legacy-drain.js b/test/parallel/test-stream2-readable-legacy-drain.js
index 69780a078d..beb3657776 100644
--- a/test/parallel/test-stream2-readable-legacy-drain.js
+++ b/test/parallel/test-stream2-readable-legacy-drain.js
@@ -52,11 +52,4 @@ function drain() {
w.end = common.mustCall();
-// Just for kicks, let's mess with the drain count.
-// This verifies that even if it gets negative in the
-// pipe() cleanup function, we'll still function properly.
-r.on('readable', function() {
- w.emit('drain');
-});
-
r.pipe(w);