summaryrefslogtreecommitdiff
path: root/lib/_stream_readable.js
diff options
context:
space:
mode:
authorMatteo Collina <hello@matteocollina.com>2018-01-04 18:06:56 +0100
committerMatteo Collina <hello@matteocollina.com>2018-01-10 10:48:03 +0100
commit1e0f3315c77033ef0e01bb37c3d41c8e1d65e686 (patch)
treeb529e81c0e3fda479f2ba69996f484490fb098ca /lib/_stream_readable.js
parent800caac2362e602d80b5c61fe1cb288bbcdb316a (diff)
downloadandroid-node-v8-1e0f3315c77033ef0e01bb37c3d41c8e1d65e686.tar.gz
android-node-v8-1e0f3315c77033ef0e01bb37c3d41c8e1d65e686.tar.bz2
android-node-v8-1e0f3315c77033ef0e01bb37c3d41c8e1d65e686.zip
stream: always defer 'readable' with nextTick
Emit 'readable' always in the next tick, resulting in a single call to _read() per microtick. This removes the need for the user to implement buffering if they wanted to call this.push() multiple times in an asynchronous fashion, as this.push() triggers this._read() call. PR-URL: https://github.com/nodejs/node/pull/17979 Fixes: https://github.com/nodejs/node/issues/3203 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Diffstat (limited to 'lib/_stream_readable.js')
-rw-r--r--lib/_stream_readable.js22
1 files changed, 14 insertions, 8 deletions
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index 21598efa65..eb90c28b64 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -267,7 +267,6 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
function addChunk(stream, state, chunk, addToFront) {
if (state.flowing && state.length === 0 && !state.sync) {
stream.emit('data', chunk);
- stream.read(0);
} else {
// update the buffer info.
state.length += state.objectMode ? 1 : chunk.length;
@@ -496,7 +495,11 @@ function onEofChunk(stream, state) {
state.ended = true;
// emit 'readable' now to make sure it gets picked up.
- emitReadable(stream);
+ state.needReadable = false;
+ if (!state.emittedReadable) {
+ state.emittedReadable = true;
+ emitReadable_(stream);
+ }
}
// Don't emit readable right away in sync mode, because this can trigger
@@ -508,16 +511,15 @@ function emitReadable(stream) {
if (!state.emittedReadable) {
debug('emitReadable', state.flowing);
state.emittedReadable = true;
- if (state.sync)
- process.nextTick(emitReadable_, stream);
- else
- emitReadable_(stream);
+ process.nextTick(emitReadable_, stream);
}
}
function emitReadable_(stream) {
+ var state = stream._readableState;
debug('emit readable');
stream.emit('readable');
+ state.needReadable = !state.flowing && !state.ended;
flow(stream);
}
@@ -537,7 +539,7 @@ function maybeReadMore(stream, state) {
function maybeReadMore_(stream, state) {
var len = state.length;
- while (!state.reading && !state.flowing && !state.ended &&
+ while (!state.reading && !state.ended &&
state.length < state.highWaterMark) {
debug('maybeReadMore read 0');
stream.read(0);
@@ -644,6 +646,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
debug('ondata');
increasedAwaitDrain = false;
var ret = dest.write(chunk);
+ debug('dest.write', ret);
if (false === ret && !increasedAwaitDrain) {
// If the user unpiped during `dest.write()`, it is possible
// to get stuck in a permanently paused state if that write
@@ -824,8 +827,8 @@ function resume(stream, state) {
}
function resume_(stream, state) {
+ debug('resume', state.reading);
if (!state.reading) {
- debug('resume read 0');
stream.read(0);
}
@@ -1087,6 +1090,7 @@ function copyFromBuffer(n, list) {
function endReadable(stream) {
var state = stream._readableState;
+ debug('endReadable', state.endEmitted);
if (!state.endEmitted) {
state.ended = true;
process.nextTick(endReadableNT, state, stream);
@@ -1094,6 +1098,8 @@ function endReadable(stream) {
}
function endReadableNT(state, stream) {
+ debug('endReadableNT', state.endEmitted, state.length);
+
// Check that we didn't get one last unshift.
if (!state.endEmitted && state.length === 0) {
state.endEmitted = true;