diff options
author | ran <abbshrsoufii@gmail.com> | 2019-08-26 17:00:06 +0800 |
---|---|---|
committer | Anna Henningsen <anna@addaleax.net> | 2019-08-26 15:20:50 +0200 |
commit | 698a29420f92844478101ec1fccdc81b46954e2e (patch) | |
tree | 47e4f9c728981a0e9377d622a957632c39ccf61c /lib | |
parent | 627bf59e8ddd9826720c45f430c2a2e489df6e66 (diff) | |
download | android-node-v8-698a29420f92844478101ec1fccdc81b46954e2e.tar.gz android-node-v8-698a29420f92844478101ec1fccdc81b46954e2e.tar.bz2 android-node-v8-698a29420f92844478101ec1fccdc81b46954e2e.zip |
stream: fix readable state `awaitDrain` increase in recursion
PR-URL: https://github.com/nodejs/node/pull/27572
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Diffstat (limited to 'lib')
-rw-r--r-- | lib/_stream_readable.js | 65 |
1 files changed, 51 insertions, 14 deletions
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index ac37930d12..cfa36731e3 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -134,8 +134,10 @@ function ReadableState(options, stream, isDuplex) { // Everything else in the universe uses 'utf8', though. this.defaultEncoding = options.defaultEncoding || 'utf8'; - // The number of writers that are awaiting a drain event in .pipe()s - this.awaitDrain = 0; + // Ref the piped dest which we need a drain event on it + // type: null | Writable | Set<Writable> + this.awaitDrainWriters = null; + this.multiAwaitDrain = false; // If true, a maybeReadMore has been scheduled this.readingMore = false; @@ -310,7 +312,13 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { function addChunk(stream, state, chunk, addToFront) { if (state.flowing && state.length === 0 && !state.sync) { - state.awaitDrain = 0; + // Use the guard to avoid creating `Set()` repeatedly + // when we have multiple pipes. + if (state.multiAwaitDrain) { + state.awaitDrainWriters.clear(); + } else { + state.awaitDrainWriters = null; + } stream.emit('data', chunk); } else { // Update the buffer info. @@ -511,7 +519,11 @@ Readable.prototype.read = function(n) { n = 0; } else { state.length -= n; - state.awaitDrain = 0; + if (state.multiAwaitDrain) { + state.awaitDrainWriters.clear(); + } else { + state.awaitDrainWriters = null; + } } if (state.length === 0) { @@ -656,6 +668,15 @@ Readable.prototype.pipe = function(dest, pipeOpts) { const src = this; const state = this._readableState; + if (state.pipes.length === 1) { + if (!state.multiAwaitDrain) { + state.multiAwaitDrain = true; + state.awaitDrainWriters = new Set( + state.awaitDrainWriters ? [state.awaitDrainWriters] : [] + ); + } + } + state.pipes.push(dest); debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts); @@ -709,7 +730,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { // flowing again. // So, if this is awaiting a drain, then we just call it now. // If we don't know, then assume that we are waiting for one. - if (ondrain && state.awaitDrain && + if (ondrain && state.awaitDrainWriters && (!dest._writableState || dest._writableState.needDrain)) ondrain(); } @@ -724,16 +745,22 @@ Readable.prototype.pipe = function(dest, pipeOpts) { // to get stuck in a permanently paused state if that write // also returned false. // => Check whether `dest` is still a piping destination. - if (state.pipes.length > 0 && state.pipes.includes(dest) && !cleanedUp) { - debug('false write response, pause', state.awaitDrain); - state.awaitDrain++; + if (!cleanedUp) { + if (state.pipes.length === 1 && state.pipes[0] === dest) { + debug('false write response, pause', 0); + state.awaitDrainWriters = dest; + state.multiAwaitDrain = false; + } else if (state.pipes.length > 1 && state.pipes.includes(dest)) { + debug('false write response, pause', state.awaitDrainWriters.size); + state.awaitDrainWriters.add(dest); + } } if (!ondrain) { // When the dest drains, it reduces the awaitDrain counter // on the source. This would be more elegant with a .once() // handler in flow(), but adding and removing repeatedly is // too slow. - ondrain = pipeOnDrain(src); + ondrain = pipeOnDrain(src, dest); dest.on('drain', ondrain); } src.pause(); @@ -783,13 +810,23 @@ Readable.prototype.pipe = function(dest, pipeOpts) { return dest; }; -function pipeOnDrain(src) { +function pipeOnDrain(src, dest) { return function pipeOnDrainFunctionResult() { const state = src._readableState; - debug('pipeOnDrain', state.awaitDrain); - if (state.awaitDrain) - state.awaitDrain--; - if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) { + + // `ondrain` will call directly, + // `this` maybe not a reference to dest, + // so we use the real dest here. + if (state.awaitDrainWriters === dest) { + debug('pipeOnDrain', 1); + state.awaitDrainWriters = null; + } else if (state.multiAwaitDrain) { + debug('pipeOnDrain', state.awaitDrainWriters.size); + state.awaitDrainWriters.delete(dest); + } + + if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) && + EE.listenerCount(src, 'data')) { state.flowing = true; flow(src); } |