diff options
author | Rebecca Turner <me@re-becca.org> | 2015-06-12 04:04:17 -0400 |
---|---|---|
committer | Jeremiah Senkpiel <fishrock123@rocketmail.com> | 2015-06-15 10:15:55 -0700 |
commit | 3e12561b55de721faf7bcccad1dc0ccef7c28a47 (patch) | |
tree | dfe70fe3d6d00d47981b5df180c5b0d9aff32e1e /deps/npm/node_modules/readable-stream/lib/_stream_readable.js | |
parent | 5c2707c1b2078e0d4ede7fadb1adfa5eebf29210 (diff) | |
download | android-node-v8-3e12561b55de721faf7bcccad1dc0ccef7c28a47.tar.gz android-node-v8-3e12561b55de721faf7bcccad1dc0ccef7c28a47.tar.bz2 android-node-v8-3e12561b55de721faf7bcccad1dc0ccef7c28a47.zip |
deps: upgrade to npm 2.11.2
PR-URL: https://github.com/nodejs/io.js/pull/1956
Reviewed-By: Jeremiah Senkpiel <fishrock123@rocketmail.com>
Diffstat (limited to 'deps/npm/node_modules/readable-stream/lib/_stream_readable.js')
-rw-r--r-- | deps/npm/node_modules/readable-stream/lib/_stream_readable.js | 377 |
1 files changed, 173 insertions, 204 deletions
diff --git a/deps/npm/node_modules/readable-stream/lib/_stream_readable.js b/deps/npm/node_modules/readable-stream/lib/_stream_readable.js index 630722099e..19ab358898 100644 --- a/deps/npm/node_modules/readable-stream/lib/_stream_readable.js +++ b/deps/npm/node_modules/readable-stream/lib/_stream_readable.js @@ -49,15 +49,29 @@ util.inherits = require('inherits'); var StringDecoder; + +/*<replacement>*/ +var debug = require('util'); +if (debug && debug.debuglog) { + debug = debug.debuglog('stream'); +} else { + debug = function () {}; +} +/*</replacement>*/ + + util.inherits(Readable, Stream); function ReadableState(options, stream) { + var Duplex = require('./_stream_duplex'); + options = options || {}; // the point at which it stops calling _read() to fill the buffer // Note: 0 is a valid value, means "don't call _read preemptively ever" var hwm = options.highWaterMark; - this.highWaterMark = (hwm || hwm === 0) ? hwm : 16 * 1024; + var defaultHwm = options.objectMode ? 16 : 16 * 1024; + this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm; // cast to ints. this.highWaterMark = ~~this.highWaterMark; @@ -66,19 +80,13 @@ function ReadableState(options, stream) { this.length = 0; this.pipes = null; this.pipesCount = 0; - this.flowing = false; + this.flowing = null; this.ended = false; this.endEmitted = false; this.reading = false; - // In streams that never have any data, and do push(null) right away, - // the consumer can miss the 'end' event if they do some I/O before - // consuming the stream. So, we don't emit('end') until some reading - // happens. - this.calledRead = false; - // a flag to be able to tell if the onwrite cb is called immediately, - // or on a later tick. We set this to true at first, becuase any + // or on a later tick. We set this to true at first, because any // actions that shouldn't happen until "later" should generally also // not happen before the first write call. this.sync = true; @@ -94,6 +102,9 @@ function ReadableState(options, stream) { // make all the buffer merging and length checks go away this.objectMode = !!options.objectMode; + if (stream instanceof Duplex) + this.objectMode = this.objectMode || !!options.readableObjectMode; + // Crypto is kind of old and crusty. Historically, its default string // encoding is 'binary' so we have to make this configurable. // Everything else in the universe uses 'utf8', though. @@ -120,6 +131,8 @@ function ReadableState(options, stream) { } function Readable(options) { + var Duplex = require('./_stream_duplex'); + if (!(this instanceof Readable)) return new Readable(options); @@ -138,7 +151,7 @@ function Readable(options) { Readable.prototype.push = function(chunk, encoding) { var state = this._readableState; - if (typeof chunk === 'string' && !state.objectMode) { + if (util.isString(chunk) && !state.objectMode) { encoding = encoding || state.defaultEncoding; if (encoding !== state.encoding) { chunk = new Buffer(chunk, encoding); @@ -159,7 +172,7 @@ function readableAddChunk(stream, state, chunk, encoding, addToFront) { var er = chunkInvalid(state, chunk); if (er) { stream.emit('error', er); - } else if (chunk === null || chunk === undefined) { + } else if (util.isNullOrUndefined(chunk)) { state.reading = false; if (!state.ended) onEofChunk(stream, state); @@ -174,17 +187,24 @@ function readableAddChunk(stream, state, chunk, encoding, addToFront) { if (state.decoder && !addToFront && !encoding) chunk = state.decoder.write(chunk); - // update the buffer info. - state.length += state.objectMode ? 1 : chunk.length; - if (addToFront) { - state.buffer.unshift(chunk); - } else { + if (!addToFront) state.reading = false; - state.buffer.push(chunk); - } - if (state.needReadable) - emitReadable(stream); + // if we want the data now, just emit it. + if (state.flowing && state.length === 0 && !state.sync) { + stream.emit('data', chunk); + stream.read(0); + } else { + // update the buffer info. + state.length += state.objectMode ? 1 : chunk.length; + if (addToFront) + state.buffer.unshift(chunk); + else + state.buffer.push(chunk); + + if (state.needReadable) + emitReadable(stream); + } maybeReadMore(stream, state); } @@ -217,6 +237,7 @@ Readable.prototype.setEncoding = function(enc) { StringDecoder = require('string_decoder/').StringDecoder; this._readableState.decoder = new StringDecoder(enc); this._readableState.encoding = enc; + return this; }; // Don't raise the hwm > 128MB @@ -240,7 +261,7 @@ function howMuchToRead(n, state) { if (state.objectMode) return n === 0 ? 0 : 1; - if (n === null || isNaN(n)) { + if (isNaN(n) || util.isNull(n)) { // only flow one buffer at a time if (state.flowing && state.buffer.length) return state.buffer[0].length; @@ -272,12 +293,11 @@ function howMuchToRead(n, state) { // you can override either this method, or the async _read(n) below. Readable.prototype.read = function(n) { + debug('read', n); var state = this._readableState; - state.calledRead = true; var nOrig = n; - var ret; - if (typeof n !== 'number' || n > 0) + if (!util.isNumber(n) || n > 0) state.emittedReadable = false; // if we're doing read(0) to trigger a readable event, but we @@ -286,7 +306,11 @@ Readable.prototype.read = function(n) { if (n === 0 && state.needReadable && (state.length >= state.highWaterMark || state.ended)) { - emitReadable(this); + debug('read: emitReadable', state.length, state.ended); + if (state.length === 0 && state.ended) + endReadable(this); + else + emitReadable(this); return null; } @@ -294,28 +318,9 @@ Readable.prototype.read = function(n) { // if we've ended, and we're now clear, then finish it up. if (n === 0 && state.ended) { - ret = null; - - // In cases where the decoder did not receive enough data - // to produce a full chunk, then immediately received an - // EOF, state.buffer will contain [<Buffer >, <Buffer 00 ...>]. - // howMuchToRead will see this and coerce the amount to - // read to zero (because it's looking at the length of the - // first <Buffer > in state.buffer), and we'll end up here. - // - // This can only happen via state.decoder -- no other venue - // exists for pushing a zero-length chunk into state.buffer - // and triggering this behavior. In this case, we return our - // remaining data and end the stream, if appropriate. - if (state.length > 0 && state.decoder) { - ret = fromList(n, state); - state.length -= ret.length; - } - if (state.length === 0) endReadable(this); - - return ret; + return null; } // All the actual chunk generation logic needs to be @@ -342,17 +347,23 @@ Readable.prototype.read = function(n) { // if we need a readable event, then we need to do some reading. var doRead = state.needReadable; + debug('need readable', doRead); // if we currently have less than the highWaterMark, then also read some - if (state.length - n <= state.highWaterMark) + if (state.length === 0 || state.length - n < state.highWaterMark) { doRead = true; + debug('length less than watermark', doRead); + } // however, if we've ended, then there's no point, and if we're already // reading, then it's unnecessary. - if (state.ended || state.reading) + if (state.ended || state.reading) { doRead = false; + debug('reading or ended', doRead); + } if (doRead) { + debug('do read'); state.reading = true; state.sync = true; // if the length is currently zero, then we *need* a readable event. @@ -363,18 +374,18 @@ Readable.prototype.read = function(n) { state.sync = false; } - // If _read called its callback synchronously, then `reading` - // will be false, and we need to re-evaluate how much data we - // can return to the user. + // If _read pushed data synchronously, then `reading` will be false, + // and we need to re-evaluate how much data we can return to the user. if (doRead && !state.reading) n = howMuchToRead(nOrig, state); + var ret; if (n > 0) ret = fromList(n, state); else ret = null; - if (ret === null) { + if (util.isNull(ret)) { state.needReadable = true; n = 0; } @@ -386,21 +397,21 @@ Readable.prototype.read = function(n) { if (state.length === 0 && !state.ended) state.needReadable = true; - // If we happened to read() exactly the remaining amount in the - // buffer, and the EOF has been seen at this point, then make sure - // that we emit 'end' on the very next tick. - if (state.ended && !state.endEmitted && state.length === 0) + // If we tried to read() past the EOF, then emit end on the next tick. + if (nOrig !== n && state.ended && state.length === 0) endReadable(this); + if (!util.isNull(ret)) + this.emit('data', ret); + return ret; }; function chunkInvalid(state, chunk) { var er = null; - if (!Buffer.isBuffer(chunk) && - 'string' !== typeof chunk && - chunk !== null && - chunk !== undefined && + if (!util.isBuffer(chunk) && + !util.isString(chunk) && + !util.isNullOrUndefined(chunk) && !state.objectMode) { er = new TypeError('Invalid non-string/buffer chunk'); } @@ -418,12 +429,8 @@ function onEofChunk(stream, state) { } state.ended = true; - // if we've ended and we have some data left, then emit - // 'readable' now to make sure it gets picked up. - if (state.length > 0) - emitReadable(stream); - else - endReadable(stream); + // emit 'readable' now to make sure it gets picked up. + emitReadable(stream); } // Don't emit readable right away in sync mode, because this can trigger @@ -432,20 +439,22 @@ function onEofChunk(stream, state) { function emitReadable(stream) { var state = stream._readableState; state.needReadable = false; - if (state.emittedReadable) - return; - - state.emittedReadable = true; - if (state.sync) - process.nextTick(function() { + if (!state.emittedReadable) { + debug('emitReadable', state.flowing); + state.emittedReadable = true; + if (state.sync) + process.nextTick(function() { + emitReadable_(stream); + }); + else emitReadable_(stream); - }); - else - emitReadable_(stream); + } } function emitReadable_(stream) { + debug('emit readable'); stream.emit('readable'); + flow(stream); } @@ -468,6 +477,7 @@ function maybeReadMore_(stream, state) { var len = state.length; while (!state.reading && !state.flowing && !state.ended && state.length < state.highWaterMark) { + debug('maybeReadMore read 0'); stream.read(0); if (len === state.length) // didn't get any data, stop spinning. @@ -502,6 +512,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { break; } state.pipesCount += 1; + debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts); var doEnd = (!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout && @@ -515,11 +526,14 @@ Readable.prototype.pipe = function(dest, pipeOpts) { dest.on('unpipe', onunpipe); function onunpipe(readable) { - if (readable !== src) return; - cleanup(); + debug('onunpipe'); + if (readable === src) { + cleanup(); + } } function onend() { + debug('onend'); dest.end(); } @@ -531,6 +545,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { dest.on('drain', ondrain); function cleanup() { + debug('cleanup'); // cleanup event handlers once the pipe is broken dest.removeListener('close', onclose); dest.removeListener('finish', onfinish); @@ -539,19 +554,34 @@ Readable.prototype.pipe = function(dest, pipeOpts) { dest.removeListener('unpipe', onunpipe); src.removeListener('end', onend); src.removeListener('end', cleanup); + src.removeListener('data', ondata); // if the reader is waiting for a drain event from this // specific writer, then it would cause it to never start // flowing again. // So, if this is awaiting a drain, then we just call it now. // If we don't know, then assume that we are waiting for one. - if (!dest._writableState || dest._writableState.needDrain) + if (state.awaitDrain && + (!dest._writableState || dest._writableState.needDrain)) ondrain(); } + src.on('data', ondata); + function ondata(chunk) { + debug('ondata'); + var ret = dest.write(chunk); + if (false === ret) { + debug('false write response, pause', + src._readableState.awaitDrain); + src._readableState.awaitDrain++; + src.pause(); + } + } + // if the dest has an error, then stop piping into it. // however, don't suppress the throwing behavior for this. function onerror(er) { + debug('onerror', er); unpipe(); dest.removeListener('error', onerror); if (EE.listenerCount(dest, 'error') === 0) @@ -575,12 +605,14 @@ Readable.prototype.pipe = function(dest, pipeOpts) { } dest.once('close', onclose); function onfinish() { + debug('onfinish'); dest.removeListener('close', onclose); unpipe(); } dest.once('finish', onfinish); function unpipe() { + debug('unpipe'); src.unpipe(dest); } @@ -589,16 +621,8 @@ Readable.prototype.pipe = function(dest, pipeOpts) { // start the flow if it hasn't been started already. if (!state.flowing) { - // the handler that waits for readable events after all - // the data gets sucked out in flow. - // This would be easier to follow with a .once() handler - // in flow(), but that is too slow. - this.on('readable', pipeOnReadable); - - state.flowing = true; - process.nextTick(function() { - flow(src); - }); + debug('pipe resume'); + src.resume(); } return dest; @@ -606,63 +630,15 @@ Readable.prototype.pipe = function(dest, pipeOpts) { function pipeOnDrain(src) { return function() { - var dest = this; var state = src._readableState; - state.awaitDrain--; - if (state.awaitDrain === 0) + debug('pipeOnDrain', state.awaitDrain); + if (state.awaitDrain) + state.awaitDrain--; + if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) { + state.flowing = true; flow(src); - }; -} - -function flow(src) { - var state = src._readableState; - var chunk; - state.awaitDrain = 0; - - function write(dest, i, list) { - var written = dest.write(chunk); - if (false === written) { - state.awaitDrain++; } - } - - while (state.pipesCount && null !== (chunk = src.read())) { - - if (state.pipesCount === 1) - write(state.pipes, 0, null); - else - forEach(state.pipes, write); - - src.emit('data', chunk); - - // if anyone needs a drain, then we have to wait for that. - if (state.awaitDrain > 0) - return; - } - - // if every destination was unpiped, either before entering this - // function, or in the while loop, then stop flowing. - // - // NB: This is a pretty rare edge case. - if (state.pipesCount === 0) { - state.flowing = false; - - // if there were data event listeners added, then switch to old mode. - if (EE.listenerCount(src, 'data') > 0) - emitDataEvents(src); - return; - } - - // at this point, no one needed a drain, so we just ran out of data - // on the next readable event, start it over again. - state.ranOut = true; -} - -function pipeOnReadable() { - if (this._readableState.ranOut) { - this._readableState.ranOut = false; - flow(this); - } + }; } @@ -685,7 +661,6 @@ Readable.prototype.unpipe = function(dest) { // got a match. state.pipes = null; state.pipesCount = 0; - this.removeListener('readable', pipeOnReadable); state.flowing = false; if (dest) dest.emit('unpipe', this); @@ -700,7 +675,6 @@ Readable.prototype.unpipe = function(dest) { var len = state.pipesCount; state.pipes = null; state.pipesCount = 0; - this.removeListener('readable', pipeOnReadable); state.flowing = false; for (var i = 0; i < len; i++) @@ -728,8 +702,11 @@ Readable.prototype.unpipe = function(dest) { Readable.prototype.on = function(ev, fn) { var res = Stream.prototype.on.call(this, ev, fn); - if (ev === 'data' && !this._readableState.flowing) - emitDataEvents(this); + // If listening to data, and it has not explicitly been paused, + // then call resume to start the flow of data on the next tick. + if (ev === 'data' && false !== this._readableState.flowing) { + this.resume(); + } if (ev === 'readable' && this.readable) { var state = this._readableState; @@ -738,7 +715,11 @@ Readable.prototype.on = function(ev, fn) { state.emittedReadable = false; state.needReadable = true; if (!state.reading) { - this.read(0); + var self = this; + process.nextTick(function() { + debug('readable nexttick read 0'); + self.read(0); + }); } else if (state.length) { emitReadable(this, state); } @@ -752,63 +733,54 @@ Readable.prototype.addListener = Readable.prototype.on; // pause() and resume() are remnants of the legacy readable stream API // If the user uses them, then switch into old mode. Readable.prototype.resume = function() { - emitDataEvents(this); - this.read(0); - this.emit('resume'); + var state = this._readableState; + if (!state.flowing) { + debug('resume'); + state.flowing = true; + if (!state.reading) { + debug('resume read 0'); + this.read(0); + } + resume(this, state); + } + return this; }; +function resume(stream, state) { + if (!state.resumeScheduled) { + state.resumeScheduled = true; + process.nextTick(function() { + resume_(stream, state); + }); + } +} + +function resume_(stream, state) { + state.resumeScheduled = false; + stream.emit('resume'); + flow(stream); + if (state.flowing && !state.reading) + stream.read(0); +} + Readable.prototype.pause = function() { - emitDataEvents(this, true); - this.emit('pause'); + debug('call pause flowing=%j', this._readableState.flowing); + if (false !== this._readableState.flowing) { + debug('pause'); + this._readableState.flowing = false; + this.emit('pause'); + } + return this; }; -function emitDataEvents(stream, startPaused) { +function flow(stream) { var state = stream._readableState; - + debug('flow', state.flowing); if (state.flowing) { - // https://github.com/isaacs/readable-stream/issues/16 - throw new Error('Cannot switch to old mode now.'); + do { + var chunk = stream.read(); + } while (null !== chunk && state.flowing); } - - var paused = startPaused || false; - var readable = false; - - // convert to an old-style stream. - stream.readable = true; - stream.pipe = Stream.prototype.pipe; - stream.on = stream.addListener = Stream.prototype.on; - - stream.on('readable', function() { - readable = true; - - var c; - while (!paused && (null !== (c = stream.read()))) - stream.emit('data', c); - - if (c === null) { - readable = false; - stream._readableState.needReadable = true; - } - }); - - stream.pause = function() { - paused = true; - this.emit('pause'); - }; - - stream.resume = function() { - paused = false; - if (readable) - process.nextTick(function() { - stream.emit('readable'); - }); - else - this.read(0); - this.emit('resume'); - }; - - // now make it start, just in case it hadn't already. - stream.emit('readable'); } // wrap an old-style stream as the async data source. @@ -820,6 +792,7 @@ Readable.prototype.wrap = function(stream) { var self = this; stream.on('end', function() { + debug('wrapped end'); if (state.decoder && !state.ended) { var chunk = state.decoder.end(); if (chunk && chunk.length) @@ -830,14 +803,10 @@ Readable.prototype.wrap = function(stream) { }); stream.on('data', function(chunk) { + debug('wrapped data'); if (state.decoder) chunk = state.decoder.write(chunk); - - // don't skip over falsy values in objectMode - //if (state.objectMode && util.isNullOrUndefined(chunk)) - if (state.objectMode && (chunk === null || chunk === undefined)) - return; - else if (!state.objectMode && (!chunk || !chunk.length)) + if (!chunk || !state.objectMode && !chunk.length) return; var ret = self.push(chunk); @@ -850,8 +819,7 @@ Readable.prototype.wrap = function(stream) { // proxy all the other methods. // important when wrapping filters and duplexes. for (var i in stream) { - if (typeof stream[i] === 'function' && - typeof this[i] === 'undefined') { + if (util.isFunction(stream[i]) && util.isUndefined(this[i])) { this[i] = function(method) { return function() { return stream[method].apply(stream, arguments); }}(i); @@ -867,6 +835,7 @@ Readable.prototype.wrap = function(stream) { // when we try to consume some more bytes, simply unpause the // underlying stream. self._read = function(n) { + debug('wrapped _read', n); if (paused) { paused = false; stream.resume(); @@ -955,7 +924,7 @@ function endReadable(stream) { if (state.length > 0) throw new Error('endReadable called on non-empty stream'); - if (!state.endEmitted && state.calledRead) { + if (!state.endEmitted) { state.ended = true; process.nextTick(function() { // Check that we didn't get one last unshift. |