diff options
author | Brian White <mscdex@mscdex.net> | 2017-05-19 04:07:53 -0400 |
---|---|---|
committer | Anna Henningsen <anna@addaleax.net> | 2017-05-23 20:50:22 +0200 |
commit | 359ea2a0bd772beb25267478d4f8e3ed59021e19 (patch) | |
tree | 5e19b0a5f377aca01ea51c22e5a00808942f16d0 /lib | |
parent | 594b5d7b8970a68d9cd66f629cf1862cfca68412 (diff) | |
download | android-node-v8-359ea2a0bd772beb25267478d4f8e3ed59021e19.tar.gz android-node-v8-359ea2a0bd772beb25267478d4f8e3ed59021e19.tar.bz2 android-node-v8-359ea2a0bd772beb25267478d4f8e3ed59021e19.zip |
stream: improve readable push performance
PR-URL: https://github.com/nodejs/node/pull/13113
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Diffstat (limited to 'lib')
-rw-r--r-- | lib/_stream_readable.js | 141 |
1 files changed, 74 insertions, 67 deletions
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 8b0d45cc86..9666c7d6fb 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -177,81 +177,97 @@ Readable.prototype._destroy = function(err, cb) { // write() some more. Readable.prototype.push = function(chunk, encoding) { var state = this._readableState; - - if (!state.objectMode && typeof chunk === 'string') { - encoding = encoding || state.defaultEncoding; - if (encoding !== state.encoding) { - chunk = Buffer.from(chunk, encoding); - encoding = ''; + var skipChunkCheck; + + if (!state.objectMode) { + if (typeof chunk === 'string') { + encoding = encoding || state.defaultEncoding; + if (encoding !== state.encoding) { + chunk = Buffer.from(chunk, encoding); + encoding = ''; + } + skipChunkCheck = true; } + } else { + skipChunkCheck = true; } - return readableAddChunk(this, state, chunk, encoding, false); + return readableAddChunk(this, chunk, encoding, false, skipChunkCheck); }; // Unshift should *always* be something directly out of read() Readable.prototype.unshift = function(chunk) { - var state = this._readableState; - return readableAddChunk(this, state, chunk, '', true); -}; - -Readable.prototype.isPaused = function() { - return this._readableState.flowing === false; + return readableAddChunk(this, chunk, null, true, false); }; -function readableAddChunk(stream, state, chunk, encoding, addToFront) { - var er = chunkInvalid(state, chunk); - if (er) { - stream.emit('error', er); - } else if (chunk === null) { +function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { + var state = stream._readableState; + if (chunk === null) { state.reading = false; onEofChunk(stream, state); - } else if (state.objectMode || chunk && chunk.length > 0) { - if (state.ended && !addToFront) { - const e = new Error('stream.push() after EOF'); - stream.emit('error', e); - } else if (state.endEmitted && addToFront) { - const e = new Error('stream.unshift() after end event'); - stream.emit('error', e); - } else { - var skipAdd; - if (state.decoder && !addToFront && !encoding) { - chunk = state.decoder.write(chunk); - skipAdd = (!state.objectMode && chunk.length === 0); - } - - if (!addToFront) + } else { + var er; + if (!skipChunkCheck) + er = chunkInvalid(state, chunk); + if (er) { + stream.emit('error', er); + } else if (state.objectMode || chunk && chunk.length > 0) { + if (addToFront) { + if (state.endEmitted) + stream.emit('error', new Error('stream.unshift() after end event')); + else + addChunk(stream, state, chunk, true); + } else if (state.ended) { + stream.emit('error', new Error('stream.push() after EOF')); + } else { state.reading = false; - - // Don't add to the buffer if we've decoded to an empty string chunk and - // we're not in object mode - if (!skipAdd) { - // if we want the data now, just emit it. - if (state.flowing && state.length === 0 && !state.sync) { - stream.emit('data', chunk); - stream.read(0); - } else { - // update the buffer info. - state.length += state.objectMode ? 1 : chunk.length; - if (addToFront) - state.buffer.unshift(chunk); + if (state.decoder && !encoding) { + chunk = state.decoder.write(chunk); + if (state.objectMode || chunk.length !== 0) + addChunk(stream, state, chunk, false); else - state.buffer.push(chunk); - - if (state.needReadable) - emitReadable(stream); + maybeReadMore(stream, state); + } else { + addChunk(stream, state, chunk, false); } } - - maybeReadMore(stream, state); + } else if (!addToFront) { + state.reading = false; } - } else if (!addToFront) { - state.reading = false; } return needMoreData(state); } +function addChunk(stream, state, chunk, addToFront) { + if (state.flowing && state.length === 0 && !state.sync) { + stream.emit('data', chunk); + stream.read(0); + } else { + // update the buffer info. + state.length += state.objectMode ? 1 : chunk.length; + if (addToFront) + state.buffer.unshift(chunk); + else + state.buffer.push(chunk); + + if (state.needReadable) + emitReadable(stream); + } + maybeReadMore(stream, state); +} + +function chunkInvalid(state, chunk) { + var er; + if (!(chunk instanceof Buffer) && + typeof chunk !== 'string' && + chunk !== undefined && + !state.objectMode) { + er = new TypeError('Invalid non-string/buffer chunk'); + } + return er; +} + // if it's past the high water mark, we can push in some more. // Also, if we have no data yet, we can stand some @@ -267,6 +283,10 @@ function needMoreData(state) { state.length === 0); } +Readable.prototype.isPaused = function() { + return this._readableState.flowing === false; +}; + // backwards compatibility. Readable.prototype.setEncoding = function(enc) { if (!StringDecoder) @@ -438,19 +458,6 @@ Readable.prototype.read = function(n) { return ret; }; -function chunkInvalid(state, chunk) { - var er = null; - if (!(chunk instanceof Buffer) && - typeof chunk !== 'string' && - chunk !== null && - chunk !== undefined && - !state.objectMode) { - er = new TypeError('Invalid non-string/buffer chunk'); - } - return er; -} - - function onEofChunk(stream, state) { if (state.ended) return; if (state.decoder) { |