From cf5f9867ff3e700dfd72519e7bdeb701e254317f Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 26 Feb 2018 09:24:30 +0100 Subject: stream: 'readable' have precedence over flowing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In Streams3 the 'readable' event/.read() method had a lower precedence than the `'data'` event that made them impossible to use them together. This make `.resume()` a no-op if there is a listener for the `'readable'` event, making the stream non-flowing if there is a `'data'`  listener. Fixes: https://github.com/nodejs/node/issues/18058 PR-URL: https://github.com/nodejs/node/pull/18994 Reviewed-By: James M Snell Reviewed-By: Anna Henningsen --- lib/_stream_readable.js | 57 +++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 50 insertions(+), 7 deletions(-) (limited to 'lib/_stream_readable.js') diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 7c6671fcd0..5b044e79c1 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -223,6 +223,7 @@ Readable.prototype.unshift = function(chunk) { }; function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { + debug('readableAddChunk', chunk); var state = stream._readableState; if (chunk === null) { state.reading = false; @@ -799,20 +800,24 @@ Readable.prototype.unpipe = function(dest) { // Ensure readable listeners eventually get something Readable.prototype.on = function(ev, fn) { const res = Stream.prototype.on.call(this, ev, fn); + const state = this._readableState; if (ev === 'data') { - // Start flowing on next tick if stream isn't explicitly paused - if (this._readableState.flowing !== false) + // update readableListening so that resume() may be a no-op + // a few lines down. This is needed to support once('readable'). + state.readableListening = this.listenerCount('readable') > 0; + + // Try start flowing on next tick if stream isn't explicitly paused + if (state.flowing !== false) this.resume(); } else if (ev === 'readable') { - const state = this._readableState; if (!state.endEmitted && !state.readableListening) { state.readableListening = state.needReadable = true; state.emittedReadable = false; - if (!state.reading) { - process.nextTick(nReadingNextTick, this); - } else if (state.length) { + if (state.length) { emitReadable(this); + } else if (!state.reading) { + process.nextTick(nReadingNextTick, this); } } } @@ -821,6 +826,42 @@ Readable.prototype.on = function(ev, fn) { }; Readable.prototype.addListener = Readable.prototype.on; +Readable.prototype.removeListener = function(ev, fn) { + const res = Stream.prototype.removeListener.call(this, ev, fn); + + if (ev === 'readable') { + // We need to check if there is someone still listening to + // to readable and reset the state. However this needs to happen + // after readable has been emitted but before I/O (nextTick) to + // support once('readable', fn) cycles. This means that calling + // resume within the same tick will have no + // effect. + process.nextTick(updateReadableListening, this); + } + + return res; +}; + +Readable.prototype.removeAllListeners = function(ev) { + const res = Stream.prototype.removeAllListeners.call(this, ev); + + if (ev === 'readable' || ev === undefined) { + // We need to check if there is someone still listening to + // to readable and reset the state. However this needs to happen + // after readable has been emitted but before I/O (nextTick) to + // support once('readable', fn) cycles. This means that calling + // resume within the same tick will have no + // effect. + process.nextTick(updateReadableListening, this); + } + + return res; +}; + +function updateReadableListening(self) { + self._readableState.readableListening = self.listenerCount('readable') > 0; +} + function nReadingNextTick(self) { debug('readable nexttick read 0'); self.read(0); @@ -832,7 +873,9 @@ Readable.prototype.resume = function() { var state = this._readableState; if (!state.flowing) { debug('resume'); - state.flowing = true; + // we flow only if there is no one listening + // for readable + state.flowing = !state.readableListening; resume(this, state); } return this; -- cgit v1.2.3