diff options
author | Matteo Collina <hello@matteocollina.com> | 2018-01-04 18:06:56 +0100 |
---|---|---|
committer | Matteo Collina <hello@matteocollina.com> | 2018-01-10 10:48:03 +0100 |
commit | 1e0f3315c77033ef0e01bb37c3d41c8e1d65e686 (patch) | |
tree | b529e81c0e3fda479f2ba69996f484490fb098ca /lib/_stream_readable.js | |
parent | 800caac2362e602d80b5c61fe1cb288bbcdb316a (diff) | |
download | android-node-v8-1e0f3315c77033ef0e01bb37c3d41c8e1d65e686.tar.gz android-node-v8-1e0f3315c77033ef0e01bb37c3d41c8e1d65e686.tar.bz2 android-node-v8-1e0f3315c77033ef0e01bb37c3d41c8e1d65e686.zip |
stream: always defer 'readable' with nextTick
Emit 'readable' always in the next tick, resulting in a single
call to _read() per microtick. This removes the need for the
user to implement buffering if they wanted to call this.push()
multiple times in an asynchronous fashion, as this.push() triggers
this._read() call.
PR-URL: https://github.com/nodejs/node/pull/17979
Fixes: https://github.com/nodejs/node/issues/3203
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Diffstat (limited to 'lib/_stream_readable.js')
-rw-r--r-- | lib/_stream_readable.js | 22 |
1 files changed, 14 insertions, 8 deletions
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 21598efa65..eb90c28b64 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -267,7 +267,6 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { function addChunk(stream, state, chunk, addToFront) { if (state.flowing && state.length === 0 && !state.sync) { stream.emit('data', chunk); - stream.read(0); } else { // update the buffer info. state.length += state.objectMode ? 1 : chunk.length; @@ -496,7 +495,11 @@ function onEofChunk(stream, state) { state.ended = true; // emit 'readable' now to make sure it gets picked up. - emitReadable(stream); + state.needReadable = false; + if (!state.emittedReadable) { + state.emittedReadable = true; + emitReadable_(stream); + } } // Don't emit readable right away in sync mode, because this can trigger @@ -508,16 +511,15 @@ function emitReadable(stream) { if (!state.emittedReadable) { debug('emitReadable', state.flowing); state.emittedReadable = true; - if (state.sync) - process.nextTick(emitReadable_, stream); - else - emitReadable_(stream); + process.nextTick(emitReadable_, stream); } } function emitReadable_(stream) { + var state = stream._readableState; debug('emit readable'); stream.emit('readable'); + state.needReadable = !state.flowing && !state.ended; flow(stream); } @@ -537,7 +539,7 @@ function maybeReadMore(stream, state) { function maybeReadMore_(stream, state) { var len = state.length; - while (!state.reading && !state.flowing && !state.ended && + while (!state.reading && !state.ended && state.length < state.highWaterMark) { debug('maybeReadMore read 0'); stream.read(0); @@ -644,6 +646,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { debug('ondata'); increasedAwaitDrain = false; var ret = dest.write(chunk); + debug('dest.write', ret); if (false === ret && !increasedAwaitDrain) { // If the user unpiped during `dest.write()`, it is possible // to get stuck in a permanently paused state if that write @@ -824,8 +827,8 @@ function resume(stream, state) { } function resume_(stream, state) { + debug('resume', state.reading); if (!state.reading) { - debug('resume read 0'); stream.read(0); } @@ -1087,6 +1090,7 @@ function copyFromBuffer(n, list) { function endReadable(stream) { var state = stream._readableState; + debug('endReadable', state.endEmitted); if (!state.endEmitted) { state.ended = true; process.nextTick(endReadableNT, state, stream); @@ -1094,6 +1098,8 @@ function endReadable(stream) { } function endReadableNT(state, stream) { + debug('endReadableNT', state.endEmitted, state.length); + // Check that we didn't get one last unshift. if (!state.endEmitted && state.length === 0) { state.endEmitted = true; |