summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorran <abbshrsoufii@gmail.com>2019-08-26 17:00:06 +0800
committerAnna Henningsen <anna@addaleax.net>2019-08-26 15:20:50 +0200
commit698a29420f92844478101ec1fccdc81b46954e2e (patch)
tree47e4f9c728981a0e9377d622a957632c39ccf61c /lib
parent627bf59e8ddd9826720c45f430c2a2e489df6e66 (diff)
downloadandroid-node-v8-698a29420f92844478101ec1fccdc81b46954e2e.tar.gz
android-node-v8-698a29420f92844478101ec1fccdc81b46954e2e.tar.bz2
android-node-v8-698a29420f92844478101ec1fccdc81b46954e2e.zip
stream: fix readable state `awaitDrain` increase in recursion
PR-URL: https://github.com/nodejs/node/pull/27572 Reviewed-By: Anna Henningsen <anna@addaleax.net>
Diffstat (limited to 'lib')
-rw-r--r--lib/_stream_readable.js65
1 files changed, 51 insertions, 14 deletions
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index ac37930d12..cfa36731e3 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -134,8 +134,10 @@ function ReadableState(options, stream, isDuplex) {
// Everything else in the universe uses 'utf8', though.
this.defaultEncoding = options.defaultEncoding || 'utf8';
- // The number of writers that are awaiting a drain event in .pipe()s
- this.awaitDrain = 0;
+ // Ref the piped dest which we need a drain event on it
+ // type: null | Writable | Set<Writable>
+ this.awaitDrainWriters = null;
+ this.multiAwaitDrain = false;
// If true, a maybeReadMore has been scheduled
this.readingMore = false;
@@ -310,7 +312,13 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
function addChunk(stream, state, chunk, addToFront) {
if (state.flowing && state.length === 0 && !state.sync) {
- state.awaitDrain = 0;
+ // Use the guard to avoid creating `Set()` repeatedly
+ // when we have multiple pipes.
+ if (state.multiAwaitDrain) {
+ state.awaitDrainWriters.clear();
+ } else {
+ state.awaitDrainWriters = null;
+ }
stream.emit('data', chunk);
} else {
// Update the buffer info.
@@ -511,7 +519,11 @@ Readable.prototype.read = function(n) {
n = 0;
} else {
state.length -= n;
- state.awaitDrain = 0;
+ if (state.multiAwaitDrain) {
+ state.awaitDrainWriters.clear();
+ } else {
+ state.awaitDrainWriters = null;
+ }
}
if (state.length === 0) {
@@ -656,6 +668,15 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
const src = this;
const state = this._readableState;
+ if (state.pipes.length === 1) {
+ if (!state.multiAwaitDrain) {
+ state.multiAwaitDrain = true;
+ state.awaitDrainWriters = new Set(
+ state.awaitDrainWriters ? [state.awaitDrainWriters] : []
+ );
+ }
+ }
+
state.pipes.push(dest);
debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts);
@@ -709,7 +730,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
// flowing again.
// So, if this is awaiting a drain, then we just call it now.
// If we don't know, then assume that we are waiting for one.
- if (ondrain && state.awaitDrain &&
+ if (ondrain && state.awaitDrainWriters &&
(!dest._writableState || dest._writableState.needDrain))
ondrain();
}
@@ -724,16 +745,22 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
// to get stuck in a permanently paused state if that write
// also returned false.
// => Check whether `dest` is still a piping destination.
- if (state.pipes.length > 0 && state.pipes.includes(dest) && !cleanedUp) {
- debug('false write response, pause', state.awaitDrain);
- state.awaitDrain++;
+ if (!cleanedUp) {
+ if (state.pipes.length === 1 && state.pipes[0] === dest) {
+ debug('false write response, pause', 0);
+ state.awaitDrainWriters = dest;
+ state.multiAwaitDrain = false;
+ } else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
+ debug('false write response, pause', state.awaitDrainWriters.size);
+ state.awaitDrainWriters.add(dest);
+ }
}
if (!ondrain) {
// When the dest drains, it reduces the awaitDrain counter
// on the source. This would be more elegant with a .once()
// handler in flow(), but adding and removing repeatedly is
// too slow.
- ondrain = pipeOnDrain(src);
+ ondrain = pipeOnDrain(src, dest);
dest.on('drain', ondrain);
}
src.pause();
@@ -783,13 +810,23 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
return dest;
};
-function pipeOnDrain(src) {
+function pipeOnDrain(src, dest) {
return function pipeOnDrainFunctionResult() {
const state = src._readableState;
- debug('pipeOnDrain', state.awaitDrain);
- if (state.awaitDrain)
- state.awaitDrain--;
- if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
+
+ // `ondrain` will call directly,
+ // `this` maybe not a reference to dest,
+ // so we use the real dest here.
+ if (state.awaitDrainWriters === dest) {
+ debug('pipeOnDrain', 1);
+ state.awaitDrainWriters = null;
+ } else if (state.multiAwaitDrain) {
+ debug('pipeOnDrain', state.awaitDrainWriters.size);
+ state.awaitDrainWriters.delete(dest);
+ }
+
+ if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) &&
+ EE.listenerCount(src, 'data')) {
state.flowing = true;
flow(src);
}