summaryrefslogtreecommitdiff
path: root/deps/npm/node_modules/readable-stream/lib/_stream_readable.js
diff options
context:
space:
mode:
Diffstat (limited to 'deps/npm/node_modules/readable-stream/lib/_stream_readable.js')
-rw-r--r--deps/npm/node_modules/readable-stream/lib/_stream_readable.js46
1 files changed, 38 insertions, 8 deletions
diff --git a/deps/npm/node_modules/readable-stream/lib/_stream_readable.js b/deps/npm/node_modules/readable-stream/lib/_stream_readable.js
index b9b1b742cc..33f478d7e8 100644
--- a/deps/npm/node_modules/readable-stream/lib/_stream_readable.js
+++ b/deps/npm/node_modules/readable-stream/lib/_stream_readable.js
@@ -141,7 +141,8 @@ function ReadableState(options, stream, isDuplex) {
this.needReadable = false;
this.emittedReadable = false;
this.readableListening = false;
- this.resumeScheduled = false; // Should close be emitted on destroy. Defaults to true.
+ this.resumeScheduled = false;
+ this.paused = true; // Should close be emitted on destroy. Defaults to true.
this.emitClose = options.emitClose !== false; // has it been destroyed
@@ -543,13 +544,35 @@ function maybeReadMore(stream, state) {
}
function maybeReadMore_(stream, state) {
- var len = state.length;
-
- while (!state.reading && !state.ended && state.length < state.highWaterMark) {
+ // Attempt to read more data if we should.
+ //
+ // The conditions for reading more data are (one of):
+ // - Not enough data buffered (state.length < state.highWaterMark). The loop
+ // is responsible for filling the buffer with enough data if such data
+ // is available. If highWaterMark is 0 and we are not in the flowing mode
+ // we should _not_ attempt to buffer any extra data. We'll get more data
+ // when the stream consumer calls read() instead.
+ // - No data in the buffer, and the stream is in flowing mode. In this mode
+ // the loop below is responsible for ensuring read() is called. Failing to
+ // call read here would abort the flow and there's no other mechanism for
+ // continuing the flow if the stream consumer has just subscribed to the
+ // 'data' event.
+ //
+ // In addition to the above conditions to keep reading data, the following
+ // conditions prevent the data from being read:
+ // - The stream has ended (state.ended).
+ // - There is already a pending 'read' operation (state.reading). This is a
+ // case where the the stream has called the implementation defined _read()
+ // method, but they are processing the call asynchronously and have _not_
+ // called push() with new data. In this case we skip performing more
+ // read()s. The execution ends in this method again after the _read() ends
+ // up calling push() with more data.
+ while (!state.reading && !state.ended && (state.length < state.highWaterMark || state.flowing && state.length === 0)) {
+ var len = state.length;
debug('maybeReadMore read 0');
stream.read(0);
if (len === state.length) // didn't get any data, stop spinning.
- break;else len = state.length;
+ break;
}
state.readingMore = false;
@@ -822,9 +845,14 @@ Readable.prototype.removeAllListeners = function (ev) {
};
function updateReadableListening(self) {
- self._readableState.readableListening = self.listenerCount('readable') > 0; // crude way to check if we should resume
-
- if (self.listenerCount('data') > 0) {
+ var state = self._readableState;
+ state.readableListening = self.listenerCount('readable') > 0;
+
+ if (state.resumeScheduled && !state.paused) {
+ // flowing needs to be set to true now, otherwise
+ // the upcoming resume will not flow.
+ state.flowing = true; // crude way to check if we should resume
+ } else if (self.listenerCount('data') > 0) {
self.resume();
}
}
@@ -848,6 +876,7 @@ Readable.prototype.resume = function () {
resume(this, state);
}
+ state.paused = false;
return this;
};
@@ -880,6 +909,7 @@ Readable.prototype.pause = function () {
this.emit('pause');
}
+ this._readableState.paused = true;
return this;
};