summaryrefslogtreecommitdiff
path: root/deps/npm/node_modules/readable-stream/lib/_stream_readable.js
diff options
context:
space:
mode:
authorRebecca Turner <me@re-becca.org>2015-06-12 04:04:17 -0400
committerJeremiah Senkpiel <fishrock123@rocketmail.com>2015-06-15 10:15:55 -0700
commit3e12561b55de721faf7bcccad1dc0ccef7c28a47 (patch)
treedfe70fe3d6d00d47981b5df180c5b0d9aff32e1e /deps/npm/node_modules/readable-stream/lib/_stream_readable.js
parent5c2707c1b2078e0d4ede7fadb1adfa5eebf29210 (diff)
downloadandroid-node-v8-3e12561b55de721faf7bcccad1dc0ccef7c28a47.tar.gz
android-node-v8-3e12561b55de721faf7bcccad1dc0ccef7c28a47.tar.bz2
android-node-v8-3e12561b55de721faf7bcccad1dc0ccef7c28a47.zip
deps: upgrade to npm 2.11.2
PR-URL: https://github.com/nodejs/io.js/pull/1956 Reviewed-By: Jeremiah Senkpiel <fishrock123@rocketmail.com>
Diffstat (limited to 'deps/npm/node_modules/readable-stream/lib/_stream_readable.js')
-rw-r--r--deps/npm/node_modules/readable-stream/lib/_stream_readable.js377
1 files changed, 173 insertions, 204 deletions
diff --git a/deps/npm/node_modules/readable-stream/lib/_stream_readable.js b/deps/npm/node_modules/readable-stream/lib/_stream_readable.js
index 630722099e..19ab358898 100644
--- a/deps/npm/node_modules/readable-stream/lib/_stream_readable.js
+++ b/deps/npm/node_modules/readable-stream/lib/_stream_readable.js
@@ -49,15 +49,29 @@ util.inherits = require('inherits');
var StringDecoder;
+
+/*<replacement>*/
+var debug = require('util');
+if (debug && debug.debuglog) {
+ debug = debug.debuglog('stream');
+} else {
+ debug = function () {};
+}
+/*</replacement>*/
+
+
util.inherits(Readable, Stream);
function ReadableState(options, stream) {
+ var Duplex = require('./_stream_duplex');
+
options = options || {};
// the point at which it stops calling _read() to fill the buffer
// Note: 0 is a valid value, means "don't call _read preemptively ever"
var hwm = options.highWaterMark;
- this.highWaterMark = (hwm || hwm === 0) ? hwm : 16 * 1024;
+ var defaultHwm = options.objectMode ? 16 : 16 * 1024;
+ this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
// cast to ints.
this.highWaterMark = ~~this.highWaterMark;
@@ -66,19 +80,13 @@ function ReadableState(options, stream) {
this.length = 0;
this.pipes = null;
this.pipesCount = 0;
- this.flowing = false;
+ this.flowing = null;
this.ended = false;
this.endEmitted = false;
this.reading = false;
- // In streams that never have any data, and do push(null) right away,
- // the consumer can miss the 'end' event if they do some I/O before
- // consuming the stream. So, we don't emit('end') until some reading
- // happens.
- this.calledRead = false;
-
// a flag to be able to tell if the onwrite cb is called immediately,
- // or on a later tick. We set this to true at first, becuase any
+ // or on a later tick. We set this to true at first, because any
// actions that shouldn't happen until "later" should generally also
// not happen before the first write call.
this.sync = true;
@@ -94,6 +102,9 @@ function ReadableState(options, stream) {
// make all the buffer merging and length checks go away
this.objectMode = !!options.objectMode;
+ if (stream instanceof Duplex)
+ this.objectMode = this.objectMode || !!options.readableObjectMode;
+
// Crypto is kind of old and crusty. Historically, its default string
// encoding is 'binary' so we have to make this configurable.
// Everything else in the universe uses 'utf8', though.
@@ -120,6 +131,8 @@ function ReadableState(options, stream) {
}
function Readable(options) {
+ var Duplex = require('./_stream_duplex');
+
if (!(this instanceof Readable))
return new Readable(options);
@@ -138,7 +151,7 @@ function Readable(options) {
Readable.prototype.push = function(chunk, encoding) {
var state = this._readableState;
- if (typeof chunk === 'string' && !state.objectMode) {
+ if (util.isString(chunk) && !state.objectMode) {
encoding = encoding || state.defaultEncoding;
if (encoding !== state.encoding) {
chunk = new Buffer(chunk, encoding);
@@ -159,7 +172,7 @@ function readableAddChunk(stream, state, chunk, encoding, addToFront) {
var er = chunkInvalid(state, chunk);
if (er) {
stream.emit('error', er);
- } else if (chunk === null || chunk === undefined) {
+ } else if (util.isNullOrUndefined(chunk)) {
state.reading = false;
if (!state.ended)
onEofChunk(stream, state);
@@ -174,17 +187,24 @@ function readableAddChunk(stream, state, chunk, encoding, addToFront) {
if (state.decoder && !addToFront && !encoding)
chunk = state.decoder.write(chunk);
- // update the buffer info.
- state.length += state.objectMode ? 1 : chunk.length;
- if (addToFront) {
- state.buffer.unshift(chunk);
- } else {
+ if (!addToFront)
state.reading = false;
- state.buffer.push(chunk);
- }
- if (state.needReadable)
- emitReadable(stream);
+ // if we want the data now, just emit it.
+ if (state.flowing && state.length === 0 && !state.sync) {
+ stream.emit('data', chunk);
+ stream.read(0);
+ } else {
+ // update the buffer info.
+ state.length += state.objectMode ? 1 : chunk.length;
+ if (addToFront)
+ state.buffer.unshift(chunk);
+ else
+ state.buffer.push(chunk);
+
+ if (state.needReadable)
+ emitReadable(stream);
+ }
maybeReadMore(stream, state);
}
@@ -217,6 +237,7 @@ Readable.prototype.setEncoding = function(enc) {
StringDecoder = require('string_decoder/').StringDecoder;
this._readableState.decoder = new StringDecoder(enc);
this._readableState.encoding = enc;
+ return this;
};
// Don't raise the hwm > 128MB
@@ -240,7 +261,7 @@ function howMuchToRead(n, state) {
if (state.objectMode)
return n === 0 ? 0 : 1;
- if (n === null || isNaN(n)) {
+ if (isNaN(n) || util.isNull(n)) {
// only flow one buffer at a time
if (state.flowing && state.buffer.length)
return state.buffer[0].length;
@@ -272,12 +293,11 @@ function howMuchToRead(n, state) {
// you can override either this method, or the async _read(n) below.
Readable.prototype.read = function(n) {
+ debug('read', n);
var state = this._readableState;
- state.calledRead = true;
var nOrig = n;
- var ret;
- if (typeof n !== 'number' || n > 0)
+ if (!util.isNumber(n) || n > 0)
state.emittedReadable = false;
// if we're doing read(0) to trigger a readable event, but we
@@ -286,7 +306,11 @@ Readable.prototype.read = function(n) {
if (n === 0 &&
state.needReadable &&
(state.length >= state.highWaterMark || state.ended)) {
- emitReadable(this);
+ debug('read: emitReadable', state.length, state.ended);
+ if (state.length === 0 && state.ended)
+ endReadable(this);
+ else
+ emitReadable(this);
return null;
}
@@ -294,28 +318,9 @@ Readable.prototype.read = function(n) {
// if we've ended, and we're now clear, then finish it up.
if (n === 0 && state.ended) {
- ret = null;
-
- // In cases where the decoder did not receive enough data
- // to produce a full chunk, then immediately received an
- // EOF, state.buffer will contain [<Buffer >, <Buffer 00 ...>].
- // howMuchToRead will see this and coerce the amount to
- // read to zero (because it's looking at the length of the
- // first <Buffer > in state.buffer), and we'll end up here.
- //
- // This can only happen via state.decoder -- no other venue
- // exists for pushing a zero-length chunk into state.buffer
- // and triggering this behavior. In this case, we return our
- // remaining data and end the stream, if appropriate.
- if (state.length > 0 && state.decoder) {
- ret = fromList(n, state);
- state.length -= ret.length;
- }
-
if (state.length === 0)
endReadable(this);
-
- return ret;
+ return null;
}
// All the actual chunk generation logic needs to be
@@ -342,17 +347,23 @@ Readable.prototype.read = function(n) {
// if we need a readable event, then we need to do some reading.
var doRead = state.needReadable;
+ debug('need readable', doRead);
// if we currently have less than the highWaterMark, then also read some
- if (state.length - n <= state.highWaterMark)
+ if (state.length === 0 || state.length - n < state.highWaterMark) {
doRead = true;
+ debug('length less than watermark', doRead);
+ }
// however, if we've ended, then there's no point, and if we're already
// reading, then it's unnecessary.
- if (state.ended || state.reading)
+ if (state.ended || state.reading) {
doRead = false;
+ debug('reading or ended', doRead);
+ }
if (doRead) {
+ debug('do read');
state.reading = true;
state.sync = true;
// if the length is currently zero, then we *need* a readable event.
@@ -363,18 +374,18 @@ Readable.prototype.read = function(n) {
state.sync = false;
}
- // If _read called its callback synchronously, then `reading`
- // will be false, and we need to re-evaluate how much data we
- // can return to the user.
+ // If _read pushed data synchronously, then `reading` will be false,
+ // and we need to re-evaluate how much data we can return to the user.
if (doRead && !state.reading)
n = howMuchToRead(nOrig, state);
+ var ret;
if (n > 0)
ret = fromList(n, state);
else
ret = null;
- if (ret === null) {
+ if (util.isNull(ret)) {
state.needReadable = true;
n = 0;
}
@@ -386,21 +397,21 @@ Readable.prototype.read = function(n) {
if (state.length === 0 && !state.ended)
state.needReadable = true;
- // If we happened to read() exactly the remaining amount in the
- // buffer, and the EOF has been seen at this point, then make sure
- // that we emit 'end' on the very next tick.
- if (state.ended && !state.endEmitted && state.length === 0)
+ // If we tried to read() past the EOF, then emit end on the next tick.
+ if (nOrig !== n && state.ended && state.length === 0)
endReadable(this);
+ if (!util.isNull(ret))
+ this.emit('data', ret);
+
return ret;
};
function chunkInvalid(state, chunk) {
var er = null;
- if (!Buffer.isBuffer(chunk) &&
- 'string' !== typeof chunk &&
- chunk !== null &&
- chunk !== undefined &&
+ if (!util.isBuffer(chunk) &&
+ !util.isString(chunk) &&
+ !util.isNullOrUndefined(chunk) &&
!state.objectMode) {
er = new TypeError('Invalid non-string/buffer chunk');
}
@@ -418,12 +429,8 @@ function onEofChunk(stream, state) {
}
state.ended = true;
- // if we've ended and we have some data left, then emit
- // 'readable' now to make sure it gets picked up.
- if (state.length > 0)
- emitReadable(stream);
- else
- endReadable(stream);
+ // emit 'readable' now to make sure it gets picked up.
+ emitReadable(stream);
}
// Don't emit readable right away in sync mode, because this can trigger
@@ -432,20 +439,22 @@ function onEofChunk(stream, state) {
function emitReadable(stream) {
var state = stream._readableState;
state.needReadable = false;
- if (state.emittedReadable)
- return;
-
- state.emittedReadable = true;
- if (state.sync)
- process.nextTick(function() {
+ if (!state.emittedReadable) {
+ debug('emitReadable', state.flowing);
+ state.emittedReadable = true;
+ if (state.sync)
+ process.nextTick(function() {
+ emitReadable_(stream);
+ });
+ else
emitReadable_(stream);
- });
- else
- emitReadable_(stream);
+ }
}
function emitReadable_(stream) {
+ debug('emit readable');
stream.emit('readable');
+ flow(stream);
}
@@ -468,6 +477,7 @@ function maybeReadMore_(stream, state) {
var len = state.length;
while (!state.reading && !state.flowing && !state.ended &&
state.length < state.highWaterMark) {
+ debug('maybeReadMore read 0');
stream.read(0);
if (len === state.length)
// didn't get any data, stop spinning.
@@ -502,6 +512,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
break;
}
state.pipesCount += 1;
+ debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);
var doEnd = (!pipeOpts || pipeOpts.end !== false) &&
dest !== process.stdout &&
@@ -515,11 +526,14 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest.on('unpipe', onunpipe);
function onunpipe(readable) {
- if (readable !== src) return;
- cleanup();
+ debug('onunpipe');
+ if (readable === src) {
+ cleanup();
+ }
}
function onend() {
+ debug('onend');
dest.end();
}
@@ -531,6 +545,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest.on('drain', ondrain);
function cleanup() {
+ debug('cleanup');
// cleanup event handlers once the pipe is broken
dest.removeListener('close', onclose);
dest.removeListener('finish', onfinish);
@@ -539,19 +554,34 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest.removeListener('unpipe', onunpipe);
src.removeListener('end', onend);
src.removeListener('end', cleanup);
+ src.removeListener('data', ondata);
// if the reader is waiting for a drain event from this
// specific writer, then it would cause it to never start
// flowing again.
// So, if this is awaiting a drain, then we just call it now.
// If we don't know, then assume that we are waiting for one.
- if (!dest._writableState || dest._writableState.needDrain)
+ if (state.awaitDrain &&
+ (!dest._writableState || dest._writableState.needDrain))
ondrain();
}
+ src.on('data', ondata);
+ function ondata(chunk) {
+ debug('ondata');
+ var ret = dest.write(chunk);
+ if (false === ret) {
+ debug('false write response, pause',
+ src._readableState.awaitDrain);
+ src._readableState.awaitDrain++;
+ src.pause();
+ }
+ }
+
// if the dest has an error, then stop piping into it.
// however, don't suppress the throwing behavior for this.
function onerror(er) {
+ debug('onerror', er);
unpipe();
dest.removeListener('error', onerror);
if (EE.listenerCount(dest, 'error') === 0)
@@ -575,12 +605,14 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
}
dest.once('close', onclose);
function onfinish() {
+ debug('onfinish');
dest.removeListener('close', onclose);
unpipe();
}
dest.once('finish', onfinish);
function unpipe() {
+ debug('unpipe');
src.unpipe(dest);
}
@@ -589,16 +621,8 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
// start the flow if it hasn't been started already.
if (!state.flowing) {
- // the handler that waits for readable events after all
- // the data gets sucked out in flow.
- // This would be easier to follow with a .once() handler
- // in flow(), but that is too slow.
- this.on('readable', pipeOnReadable);
-
- state.flowing = true;
- process.nextTick(function() {
- flow(src);
- });
+ debug('pipe resume');
+ src.resume();
}
return dest;
@@ -606,63 +630,15 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
function pipeOnDrain(src) {
return function() {
- var dest = this;
var state = src._readableState;
- state.awaitDrain--;
- if (state.awaitDrain === 0)
+ debug('pipeOnDrain', state.awaitDrain);
+ if (state.awaitDrain)
+ state.awaitDrain--;
+ if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
+ state.flowing = true;
flow(src);
- };
-}
-
-function flow(src) {
- var state = src._readableState;
- var chunk;
- state.awaitDrain = 0;
-
- function write(dest, i, list) {
- var written = dest.write(chunk);
- if (false === written) {
- state.awaitDrain++;
}
- }
-
- while (state.pipesCount && null !== (chunk = src.read())) {
-
- if (state.pipesCount === 1)
- write(state.pipes, 0, null);
- else
- forEach(state.pipes, write);
-
- src.emit('data', chunk);
-
- // if anyone needs a drain, then we have to wait for that.
- if (state.awaitDrain > 0)
- return;
- }
-
- // if every destination was unpiped, either before entering this
- // function, or in the while loop, then stop flowing.
- //
- // NB: This is a pretty rare edge case.
- if (state.pipesCount === 0) {
- state.flowing = false;
-
- // if there were data event listeners added, then switch to old mode.
- if (EE.listenerCount(src, 'data') > 0)
- emitDataEvents(src);
- return;
- }
-
- // at this point, no one needed a drain, so we just ran out of data
- // on the next readable event, start it over again.
- state.ranOut = true;
-}
-
-function pipeOnReadable() {
- if (this._readableState.ranOut) {
- this._readableState.ranOut = false;
- flow(this);
- }
+ };
}
@@ -685,7 +661,6 @@ Readable.prototype.unpipe = function(dest) {
// got a match.
state.pipes = null;
state.pipesCount = 0;
- this.removeListener('readable', pipeOnReadable);
state.flowing = false;
if (dest)
dest.emit('unpipe', this);
@@ -700,7 +675,6 @@ Readable.prototype.unpipe = function(dest) {
var len = state.pipesCount;
state.pipes = null;
state.pipesCount = 0;
- this.removeListener('readable', pipeOnReadable);
state.flowing = false;
for (var i = 0; i < len; i++)
@@ -728,8 +702,11 @@ Readable.prototype.unpipe = function(dest) {
Readable.prototype.on = function(ev, fn) {
var res = Stream.prototype.on.call(this, ev, fn);
- if (ev === 'data' && !this._readableState.flowing)
- emitDataEvents(this);
+ // If listening to data, and it has not explicitly been paused,
+ // then call resume to start the flow of data on the next tick.
+ if (ev === 'data' && false !== this._readableState.flowing) {
+ this.resume();
+ }
if (ev === 'readable' && this.readable) {
var state = this._readableState;
@@ -738,7 +715,11 @@ Readable.prototype.on = function(ev, fn) {
state.emittedReadable = false;
state.needReadable = true;
if (!state.reading) {
- this.read(0);
+ var self = this;
+ process.nextTick(function() {
+ debug('readable nexttick read 0');
+ self.read(0);
+ });
} else if (state.length) {
emitReadable(this, state);
}
@@ -752,63 +733,54 @@ Readable.prototype.addListener = Readable.prototype.on;
// pause() and resume() are remnants of the legacy readable stream API
// If the user uses them, then switch into old mode.
Readable.prototype.resume = function() {
- emitDataEvents(this);
- this.read(0);
- this.emit('resume');
+ var state = this._readableState;
+ if (!state.flowing) {
+ debug('resume');
+ state.flowing = true;
+ if (!state.reading) {
+ debug('resume read 0');
+ this.read(0);
+ }
+ resume(this, state);
+ }
+ return this;
};
+function resume(stream, state) {
+ if (!state.resumeScheduled) {
+ state.resumeScheduled = true;
+ process.nextTick(function() {
+ resume_(stream, state);
+ });
+ }
+}
+
+function resume_(stream, state) {
+ state.resumeScheduled = false;
+ stream.emit('resume');
+ flow(stream);
+ if (state.flowing && !state.reading)
+ stream.read(0);
+}
+
Readable.prototype.pause = function() {
- emitDataEvents(this, true);
- this.emit('pause');
+ debug('call pause flowing=%j', this._readableState.flowing);
+ if (false !== this._readableState.flowing) {
+ debug('pause');
+ this._readableState.flowing = false;
+ this.emit('pause');
+ }
+ return this;
};
-function emitDataEvents(stream, startPaused) {
+function flow(stream) {
var state = stream._readableState;
-
+ debug('flow', state.flowing);
if (state.flowing) {
- // https://github.com/isaacs/readable-stream/issues/16
- throw new Error('Cannot switch to old mode now.');
+ do {
+ var chunk = stream.read();
+ } while (null !== chunk && state.flowing);
}
-
- var paused = startPaused || false;
- var readable = false;
-
- // convert to an old-style stream.
- stream.readable = true;
- stream.pipe = Stream.prototype.pipe;
- stream.on = stream.addListener = Stream.prototype.on;
-
- stream.on('readable', function() {
- readable = true;
-
- var c;
- while (!paused && (null !== (c = stream.read())))
- stream.emit('data', c);
-
- if (c === null) {
- readable = false;
- stream._readableState.needReadable = true;
- }
- });
-
- stream.pause = function() {
- paused = true;
- this.emit('pause');
- };
-
- stream.resume = function() {
- paused = false;
- if (readable)
- process.nextTick(function() {
- stream.emit('readable');
- });
- else
- this.read(0);
- this.emit('resume');
- };
-
- // now make it start, just in case it hadn't already.
- stream.emit('readable');
}
// wrap an old-style stream as the async data source.
@@ -820,6 +792,7 @@ Readable.prototype.wrap = function(stream) {
var self = this;
stream.on('end', function() {
+ debug('wrapped end');
if (state.decoder && !state.ended) {
var chunk = state.decoder.end();
if (chunk && chunk.length)
@@ -830,14 +803,10 @@ Readable.prototype.wrap = function(stream) {
});
stream.on('data', function(chunk) {
+ debug('wrapped data');
if (state.decoder)
chunk = state.decoder.write(chunk);
-
- // don't skip over falsy values in objectMode
- //if (state.objectMode && util.isNullOrUndefined(chunk))
- if (state.objectMode && (chunk === null || chunk === undefined))
- return;
- else if (!state.objectMode && (!chunk || !chunk.length))
+ if (!chunk || !state.objectMode && !chunk.length)
return;
var ret = self.push(chunk);
@@ -850,8 +819,7 @@ Readable.prototype.wrap = function(stream) {
// proxy all the other methods.
// important when wrapping filters and duplexes.
for (var i in stream) {
- if (typeof stream[i] === 'function' &&
- typeof this[i] === 'undefined') {
+ if (util.isFunction(stream[i]) && util.isUndefined(this[i])) {
this[i] = function(method) { return function() {
return stream[method].apply(stream, arguments);
}}(i);
@@ -867,6 +835,7 @@ Readable.prototype.wrap = function(stream) {
// when we try to consume some more bytes, simply unpause the
// underlying stream.
self._read = function(n) {
+ debug('wrapped _read', n);
if (paused) {
paused = false;
stream.resume();
@@ -955,7 +924,7 @@ function endReadable(stream) {
if (state.length > 0)
throw new Error('endReadable called on non-empty stream');
- if (!state.endEmitted && state.calledRead) {
+ if (!state.endEmitted) {
state.ended = true;
process.nextTick(function() {
// Check that we didn't get one last unshift.