diff options
Diffstat (limited to 'deps/npm/node_modules/readable-stream/lib/_stream_readable.js')
-rw-r--r-- | deps/npm/node_modules/readable-stream/lib/_stream_readable.js | 248 |
1 files changed, 146 insertions, 102 deletions
diff --git a/deps/npm/node_modules/readable-stream/lib/_stream_readable.js b/deps/npm/node_modules/readable-stream/lib/_stream_readable.js index 79914fa684..208cc65f1c 100644 --- a/deps/npm/node_modules/readable-stream/lib/_stream_readable.js +++ b/deps/npm/node_modules/readable-stream/lib/_stream_readable.js @@ -51,21 +51,21 @@ if (debugUtil && debugUtil.debuglog) { } /*</replacement>*/ +var BufferList = require('./internal/streams/BufferList'); var StringDecoder; util.inherits(Readable, Stream); -var hasPrependListener = typeof EE.prototype.prependListener === 'function'; - function prependListener(emitter, event, fn) { - if (hasPrependListener) return emitter.prependListener(event, fn); - - // This is a brutally ugly hack to make sure that our error handler - // is attached before any userland ones. NEVER DO THIS. This is here - // only because this code needs to continue to work with older versions - // of Node.js that do not include the prependListener() method. The goal - // is to eventually remove this hack. - if (!emitter._events || !emitter._events[event]) emitter.on(event, fn);else if (isArray(emitter._events[event])) emitter._events[event].unshift(fn);else emitter._events[event] = [fn, emitter._events[event]]; + if (typeof emitter.prependListener === 'function') { + return emitter.prependListener(event, fn); + } else { + // This is a hack to make sure that our error handler is attached before any + // userland ones. NEVER DO THIS. This is here only because this code needs + // to continue to work with older versions of Node.js that do not include + // the prependListener() method. The goal is to eventually remove this hack. + if (!emitter._events || !emitter._events[event]) emitter.on(event, fn);else if (isArray(emitter._events[event])) emitter._events[event].unshift(fn);else emitter._events[event] = [fn, emitter._events[event]]; + } } var Duplex; @@ -89,7 +89,10 @@ function ReadableState(options, stream) { // cast to ints. this.highWaterMark = ~ ~this.highWaterMark; - this.buffer = []; + // A linked list is used to store data chunks instead of an array because the + // linked list can remove elements from the beginning faster than + // array.shift() + this.buffer = new BufferList(); this.length = 0; this.pipes = null; this.pipesCount = 0; @@ -252,7 +255,8 @@ function computeNewHighWaterMark(n) { if (n >= MAX_HWM) { n = MAX_HWM; } else { - // Get the next highest power of 2 + // Get the next highest power of 2 to prevent increasing hwm excessively in + // tiny amounts n--; n |= n >>> 1; n |= n >>> 2; @@ -264,44 +268,34 @@ function computeNewHighWaterMark(n) { return n; } +// This function is designed to be inlinable, so please take care when making +// changes to the function body. function howMuchToRead(n, state) { - if (state.length === 0 && state.ended) return 0; - - if (state.objectMode) return n === 0 ? 0 : 1; - - if (n === null || isNaN(n)) { - // only flow one buffer at a time - if (state.flowing && state.buffer.length) return state.buffer[0].length;else return state.length; + if (n <= 0 || state.length === 0 && state.ended) return 0; + if (state.objectMode) return 1; + if (n !== n) { + // Only flow one buffer at a time + if (state.flowing && state.length) return state.buffer.head.data.length;else return state.length; } - - if (n <= 0) return 0; - - // If we're asking for more than the target buffer level, - // then raise the water mark. Bump up to the next highest - // power of 2, to prevent increasing it excessively in tiny - // amounts. + // If we're asking for more than the current hwm, then raise the hwm. if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n); - - // don't have that much. return null, unless we've ended. - if (n > state.length) { - if (!state.ended) { - state.needReadable = true; - return 0; - } else { - return state.length; - } + if (n <= state.length) return n; + // Don't have enough + if (!state.ended) { + state.needReadable = true; + return 0; } - - return n; + return state.length; } // you can override either this method, or the async _read(n) below. Readable.prototype.read = function (n) { debug('read', n); + n = parseInt(n, 10); var state = this._readableState; var nOrig = n; - if (typeof n !== 'number' || n > 0) state.emittedReadable = false; + if (n !== 0) state.emittedReadable = false; // if we're doing read(0) to trigger a readable event, but we // already have a bunch of data in the buffer, then just trigger @@ -357,9 +351,7 @@ Readable.prototype.read = function (n) { if (state.ended || state.reading) { doRead = false; debug('reading or ended', doRead); - } - - if (doRead) { + } else if (doRead) { debug('do read'); state.reading = true; state.sync = true; @@ -368,28 +360,29 @@ Readable.prototype.read = function (n) { // call internal read method this._read(state.highWaterMark); state.sync = false; + // If _read pushed data synchronously, then `reading` will be false, + // and we need to re-evaluate how much data we can return to the user. + if (!state.reading) n = howMuchToRead(nOrig, state); } - // If _read pushed data synchronously, then `reading` will be false, - // and we need to re-evaluate how much data we can return to the user. - if (doRead && !state.reading) n = howMuchToRead(nOrig, state); - var ret; if (n > 0) ret = fromList(n, state);else ret = null; if (ret === null) { state.needReadable = true; n = 0; + } else { + state.length -= n; } - state.length -= n; - - // If we have nothing in the buffer, then we want to know - // as soon as we *do* get something into the buffer. - if (state.length === 0 && !state.ended) state.needReadable = true; + if (state.length === 0) { + // If we have nothing in the buffer, then we want to know + // as soon as we *do* get something into the buffer. + if (!state.ended) state.needReadable = true; - // If we tried to read() past the EOF, then emit end on the next tick. - if (nOrig !== n && state.ended && state.length === 0) endReadable(this); + // If we tried to read() past the EOF, then emit end on the next tick. + if (nOrig !== n && state.ended) endReadable(this); + } if (ret !== null) this.emit('data', ret); @@ -537,11 +530,17 @@ Readable.prototype.pipe = function (dest, pipeOpts) { if (state.awaitDrain && (!dest._writableState || dest._writableState.needDrain)) ondrain(); } + // If the user pushes more data while we're writing to dest then we'll end up + // in ondata again. However, we only want to increase awaitDrain once because + // dest will only emit one 'drain' event for the multiple writes. + // => Introduce a guard on increasing awaitDrain. + var increasedAwaitDrain = false; src.on('data', ondata); function ondata(chunk) { debug('ondata'); + increasedAwaitDrain = false; var ret = dest.write(chunk); - if (false === ret) { + if (false === ret && !increasedAwaitDrain) { // If the user unpiped during `dest.write()`, it is possible // to get stuck in a permanently paused state if that write // also returned false. @@ -549,6 +548,7 @@ Readable.prototype.pipe = function (dest, pipeOpts) { if ((state.pipesCount === 1 && state.pipes === dest || state.pipesCount > 1 && indexOf(state.pipes, dest) !== -1) && !cleanedUp) { debug('false write response, pause', src._readableState.awaitDrain); src._readableState.awaitDrain++; + increasedAwaitDrain = true; } src.pause(); } @@ -662,18 +662,14 @@ Readable.prototype.unpipe = function (dest) { Readable.prototype.on = function (ev, fn) { var res = Stream.prototype.on.call(this, ev, fn); - // If listening to data, and it has not explicitly been paused, - // then call resume to start the flow of data on the next tick. - if (ev === 'data' && false !== this._readableState.flowing) { - this.resume(); - } - - if (ev === 'readable' && !this._readableState.endEmitted) { + if (ev === 'data') { + // Start flowing on next tick if stream isn't explicitly paused + if (this._readableState.flowing !== false) this.resume(); + } else if (ev === 'readable') { var state = this._readableState; - if (!state.readableListening) { - state.readableListening = true; + if (!state.endEmitted && !state.readableListening) { + state.readableListening = state.needReadable = true; state.emittedReadable = false; - state.needReadable = true; if (!state.reading) { processNextTick(nReadingNextTick, this); } else if (state.length) { @@ -717,6 +713,7 @@ function resume_(stream, state) { } state.resumeScheduled = false; + state.awaitDrain = 0; stream.emit('resume'); flow(stream); if (state.flowing && !state.reading) stream.read(0); @@ -735,11 +732,7 @@ Readable.prototype.pause = function () { function flow(stream) { var state = stream._readableState; debug('flow', state.flowing); - if (state.flowing) { - do { - var chunk = stream.read(); - } while (null !== chunk && state.flowing); - } + while (state.flowing && stream.read() !== null) {} } // wrap an old-style stream as the async data source. @@ -810,50 +803,101 @@ Readable._fromList = fromList; // Pluck off n bytes from an array of buffers. // Length is the combined lengths of all the buffers in the list. +// This function is designed to be inlinable, so please take care when making +// changes to the function body. function fromList(n, state) { - var list = state.buffer; - var length = state.length; - var stringMode = !!state.decoder; - var objectMode = !!state.objectMode; - var ret; - - // nothing in the list, definitely empty. - if (list.length === 0) return null; + // nothing buffered + if (state.length === 0) return null; - if (length === 0) ret = null;else if (objectMode) ret = list.shift();else if (!n || n >= length) { - // read it all, truncate the array. - if (stringMode) ret = list.join('');else if (list.length === 1) ret = list[0];else ret = Buffer.concat(list, length); - list.length = 0; + var ret; + if (state.objectMode) ret = state.buffer.shift();else if (!n || n >= state.length) { + // read it all, truncate the list + if (state.decoder) ret = state.buffer.join('');else if (state.buffer.length === 1) ret = state.buffer.head.data;else ret = state.buffer.concat(state.length); + state.buffer.clear(); } else { - // read just some of it. - if (n < list[0].length) { - // just take a part of the first list item. - // slice is the same for buffers and strings. - var buf = list[0]; - ret = buf.slice(0, n); - list[0] = buf.slice(n); - } else if (n === list[0].length) { - // first list is a perfect match - ret = list.shift(); - } else { - // complex case. - // we have enough to cover it, but it spans past the first buffer. - if (stringMode) ret = '';else ret = bufferShim.allocUnsafe(n); - - var c = 0; - for (var i = 0, l = list.length; i < l && c < n; i++) { - var _buf = list[0]; - var cpy = Math.min(n - c, _buf.length); + // read part of list + ret = fromListPartial(n, state.buffer, state.decoder); + } - if (stringMode) ret += _buf.slice(0, cpy);else _buf.copy(ret, c, 0, cpy); + return ret; +} - if (cpy < _buf.length) list[0] = _buf.slice(cpy);else list.shift(); +// Extracts only enough buffered data to satisfy the amount requested. +// This function is designed to be inlinable, so please take care when making +// changes to the function body. +function fromListPartial(n, list, hasStrings) { + var ret; + if (n < list.head.data.length) { + // slice is the same for buffers and strings + ret = list.head.data.slice(0, n); + list.head.data = list.head.data.slice(n); + } else if (n === list.head.data.length) { + // first chunk is a perfect match + ret = list.shift(); + } else { + // result spans more than one buffer + ret = hasStrings ? copyFromBufferString(n, list) : copyFromBuffer(n, list); + } + return ret; +} - c += cpy; +// Copies a specified amount of characters from the list of buffered data +// chunks. +// This function is designed to be inlinable, so please take care when making +// changes to the function body. +function copyFromBufferString(n, list) { + var p = list.head; + var c = 1; + var ret = p.data; + n -= ret.length; + while (p = p.next) { + var str = p.data; + var nb = n > str.length ? str.length : n; + if (nb === str.length) ret += str;else ret += str.slice(0, n); + n -= nb; + if (n === 0) { + if (nb === str.length) { + ++c; + if (p.next) list.head = p.next;else list.head = list.tail = null; + } else { + list.head = p; + p.data = str.slice(nb); } + break; } + ++c; } + list.length -= c; + return ret; +} +// Copies a specified amount of bytes from the list of buffered data chunks. +// This function is designed to be inlinable, so please take care when making +// changes to the function body. +function copyFromBuffer(n, list) { + var ret = bufferShim.allocUnsafe(n); + var p = list.head; + var c = 1; + p.data.copy(ret); + n -= p.data.length; + while (p = p.next) { + var buf = p.data; + var nb = n > buf.length ? buf.length : n; + buf.copy(ret, ret.length - n, 0, nb); + n -= nb; + if (n === 0) { + if (nb === buf.length) { + ++c; + if (p.next) list.head = p.next;else list.head = list.tail = null; + } else { + list.head = p; + p.data = buf.slice(nb); + } + break; + } + ++c; + } + list.length -= c; return ret; } |