summaryrefslogtreecommitdiff
path: root/lib/_stream_readable.js
diff options
context:
space:
mode:
authorMatteo Collina <hello@matteocollina.com>2018-02-26 09:24:30 +0100
committerMatteo Collina <hello@matteocollina.com>2018-04-06 13:50:15 +0200
commitcf5f9867ff3e700dfd72519e7bdeb701e254317f (patch)
tree83472945c722622dae2d64cc97bad86e5c4547f9 /lib/_stream_readable.js
parent1e07acd476309e7ddc4981160b89731b61a31179 (diff)
downloadandroid-node-v8-cf5f9867ff3e700dfd72519e7bdeb701e254317f.tar.gz
android-node-v8-cf5f9867ff3e700dfd72519e7bdeb701e254317f.tar.bz2
android-node-v8-cf5f9867ff3e700dfd72519e7bdeb701e254317f.zip
stream: 'readable' have precedence over flowing
In Streams3 the 'readable' event/.read() method had a lower precedence than the `'data'` event that made them impossible to use them together. This make `.resume()` a no-op if there is a listener for the `'readable'` event, making the stream non-flowing if there is a `'data'`  listener. Fixes: https://github.com/nodejs/node/issues/18058 PR-URL: https://github.com/nodejs/node/pull/18994 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net>
Diffstat (limited to 'lib/_stream_readable.js')
-rw-r--r--lib/_stream_readable.js57
1 files changed, 50 insertions, 7 deletions
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index 7c6671fcd0..5b044e79c1 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -223,6 +223,7 @@ Readable.prototype.unshift = function(chunk) {
};
function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
+ debug('readableAddChunk', chunk);
var state = stream._readableState;
if (chunk === null) {
state.reading = false;
@@ -799,20 +800,24 @@ Readable.prototype.unpipe = function(dest) {
// Ensure readable listeners eventually get something
Readable.prototype.on = function(ev, fn) {
const res = Stream.prototype.on.call(this, ev, fn);
+ const state = this._readableState;
if (ev === 'data') {
- // Start flowing on next tick if stream isn't explicitly paused
- if (this._readableState.flowing !== false)
+ // update readableListening so that resume() may be a no-op
+ // a few lines down. This is needed to support once('readable').
+ state.readableListening = this.listenerCount('readable') > 0;
+
+ // Try start flowing on next tick if stream isn't explicitly paused
+ if (state.flowing !== false)
this.resume();
} else if (ev === 'readable') {
- const state = this._readableState;
if (!state.endEmitted && !state.readableListening) {
state.readableListening = state.needReadable = true;
state.emittedReadable = false;
- if (!state.reading) {
- process.nextTick(nReadingNextTick, this);
- } else if (state.length) {
+ if (state.length) {
emitReadable(this);
+ } else if (!state.reading) {
+ process.nextTick(nReadingNextTick, this);
}
}
}
@@ -821,6 +826,42 @@ Readable.prototype.on = function(ev, fn) {
};
Readable.prototype.addListener = Readable.prototype.on;
+Readable.prototype.removeListener = function(ev, fn) {
+ const res = Stream.prototype.removeListener.call(this, ev, fn);
+
+ if (ev === 'readable') {
+ // We need to check if there is someone still listening to
+ // to readable and reset the state. However this needs to happen
+ // after readable has been emitted but before I/O (nextTick) to
+ // support once('readable', fn) cycles. This means that calling
+ // resume within the same tick will have no
+ // effect.
+ process.nextTick(updateReadableListening, this);
+ }
+
+ return res;
+};
+
+Readable.prototype.removeAllListeners = function(ev) {
+ const res = Stream.prototype.removeAllListeners.call(this, ev);
+
+ if (ev === 'readable' || ev === undefined) {
+ // We need to check if there is someone still listening to
+ // to readable and reset the state. However this needs to happen
+ // after readable has been emitted but before I/O (nextTick) to
+ // support once('readable', fn) cycles. This means that calling
+ // resume within the same tick will have no
+ // effect.
+ process.nextTick(updateReadableListening, this);
+ }
+
+ return res;
+};
+
+function updateReadableListening(self) {
+ self._readableState.readableListening = self.listenerCount('readable') > 0;
+}
+
function nReadingNextTick(self) {
debug('readable nexttick read 0');
self.read(0);
@@ -832,7 +873,9 @@ Readable.prototype.resume = function() {
var state = this._readableState;
if (!state.flowing) {
debug('resume');
- state.flowing = true;
+ // we flow only if there is no one listening
+ // for readable
+ state.flowing = !state.readableListening;
resume(this, state);
}
return this;