diff options
Diffstat (limited to 'deps/npm/node_modules/readable-stream/lib/_stream_readable.js')
-rw-r--r-- | deps/npm/node_modules/readable-stream/lib/_stream_readable.js | 46 |
1 files changed, 38 insertions, 8 deletions
diff --git a/deps/npm/node_modules/readable-stream/lib/_stream_readable.js b/deps/npm/node_modules/readable-stream/lib/_stream_readable.js index b9b1b742cc..33f478d7e8 100644 --- a/deps/npm/node_modules/readable-stream/lib/_stream_readable.js +++ b/deps/npm/node_modules/readable-stream/lib/_stream_readable.js @@ -141,7 +141,8 @@ function ReadableState(options, stream, isDuplex) { this.needReadable = false; this.emittedReadable = false; this.readableListening = false; - this.resumeScheduled = false; // Should close be emitted on destroy. Defaults to true. + this.resumeScheduled = false; + this.paused = true; // Should close be emitted on destroy. Defaults to true. this.emitClose = options.emitClose !== false; // has it been destroyed @@ -543,13 +544,35 @@ function maybeReadMore(stream, state) { } function maybeReadMore_(stream, state) { - var len = state.length; - - while (!state.reading && !state.ended && state.length < state.highWaterMark) { + // Attempt to read more data if we should. + // + // The conditions for reading more data are (one of): + // - Not enough data buffered (state.length < state.highWaterMark). The loop + // is responsible for filling the buffer with enough data if such data + // is available. If highWaterMark is 0 and we are not in the flowing mode + // we should _not_ attempt to buffer any extra data. We'll get more data + // when the stream consumer calls read() instead. + // - No data in the buffer, and the stream is in flowing mode. In this mode + // the loop below is responsible for ensuring read() is called. Failing to + // call read here would abort the flow and there's no other mechanism for + // continuing the flow if the stream consumer has just subscribed to the + // 'data' event. + // + // In addition to the above conditions to keep reading data, the following + // conditions prevent the data from being read: + // - The stream has ended (state.ended). + // - There is already a pending 'read' operation (state.reading). This is a + // case where the the stream has called the implementation defined _read() + // method, but they are processing the call asynchronously and have _not_ + // called push() with new data. In this case we skip performing more + // read()s. The execution ends in this method again after the _read() ends + // up calling push() with more data. + while (!state.reading && !state.ended && (state.length < state.highWaterMark || state.flowing && state.length === 0)) { + var len = state.length; debug('maybeReadMore read 0'); stream.read(0); if (len === state.length) // didn't get any data, stop spinning. - break;else len = state.length; + break; } state.readingMore = false; @@ -822,9 +845,14 @@ Readable.prototype.removeAllListeners = function (ev) { }; function updateReadableListening(self) { - self._readableState.readableListening = self.listenerCount('readable') > 0; // crude way to check if we should resume - - if (self.listenerCount('data') > 0) { + var state = self._readableState; + state.readableListening = self.listenerCount('readable') > 0; + + if (state.resumeScheduled && !state.paused) { + // flowing needs to be set to true now, otherwise + // the upcoming resume will not flow. + state.flowing = true; // crude way to check if we should resume + } else if (self.listenerCount('data') > 0) { self.resume(); } } @@ -848,6 +876,7 @@ Readable.prototype.resume = function () { resume(this, state); } + state.paused = false; return this; }; @@ -880,6 +909,7 @@ Readable.prototype.pause = function () { this.emit('pause'); } + this._readableState.paused = true; return this; }; |