aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorBrian White <mscdex@mscdex.net>2017-05-19 04:07:53 -0400
committerAnna Henningsen <anna@addaleax.net>2017-05-23 20:50:22 +0200
commit359ea2a0bd772beb25267478d4f8e3ed59021e19 (patch)
tree5e19b0a5f377aca01ea51c22e5a00808942f16d0 /lib
parent594b5d7b8970a68d9cd66f629cf1862cfca68412 (diff)
downloadandroid-node-v8-359ea2a0bd772beb25267478d4f8e3ed59021e19.tar.gz
android-node-v8-359ea2a0bd772beb25267478d4f8e3ed59021e19.tar.bz2
android-node-v8-359ea2a0bd772beb25267478d4f8e3ed59021e19.zip
stream: improve readable push performance
PR-URL: https://github.com/nodejs/node/pull/13113 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Diffstat (limited to 'lib')
-rw-r--r--lib/_stream_readable.js141
1 files changed, 74 insertions, 67 deletions
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index 8b0d45cc86..9666c7d6fb 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -177,81 +177,97 @@ Readable.prototype._destroy = function(err, cb) {
// write() some more.
Readable.prototype.push = function(chunk, encoding) {
var state = this._readableState;
-
- if (!state.objectMode && typeof chunk === 'string') {
- encoding = encoding || state.defaultEncoding;
- if (encoding !== state.encoding) {
- chunk = Buffer.from(chunk, encoding);
- encoding = '';
+ var skipChunkCheck;
+
+ if (!state.objectMode) {
+ if (typeof chunk === 'string') {
+ encoding = encoding || state.defaultEncoding;
+ if (encoding !== state.encoding) {
+ chunk = Buffer.from(chunk, encoding);
+ encoding = '';
+ }
+ skipChunkCheck = true;
}
+ } else {
+ skipChunkCheck = true;
}
- return readableAddChunk(this, state, chunk, encoding, false);
+ return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);
};
// Unshift should *always* be something directly out of read()
Readable.prototype.unshift = function(chunk) {
- var state = this._readableState;
- return readableAddChunk(this, state, chunk, '', true);
-};
-
-Readable.prototype.isPaused = function() {
- return this._readableState.flowing === false;
+ return readableAddChunk(this, chunk, null, true, false);
};
-function readableAddChunk(stream, state, chunk, encoding, addToFront) {
- var er = chunkInvalid(state, chunk);
- if (er) {
- stream.emit('error', er);
- } else if (chunk === null) {
+function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
+ var state = stream._readableState;
+ if (chunk === null) {
state.reading = false;
onEofChunk(stream, state);
- } else if (state.objectMode || chunk && chunk.length > 0) {
- if (state.ended && !addToFront) {
- const e = new Error('stream.push() after EOF');
- stream.emit('error', e);
- } else if (state.endEmitted && addToFront) {
- const e = new Error('stream.unshift() after end event');
- stream.emit('error', e);
- } else {
- var skipAdd;
- if (state.decoder && !addToFront && !encoding) {
- chunk = state.decoder.write(chunk);
- skipAdd = (!state.objectMode && chunk.length === 0);
- }
-
- if (!addToFront)
+ } else {
+ var er;
+ if (!skipChunkCheck)
+ er = chunkInvalid(state, chunk);
+ if (er) {
+ stream.emit('error', er);
+ } else if (state.objectMode || chunk && chunk.length > 0) {
+ if (addToFront) {
+ if (state.endEmitted)
+ stream.emit('error', new Error('stream.unshift() after end event'));
+ else
+ addChunk(stream, state, chunk, true);
+ } else if (state.ended) {
+ stream.emit('error', new Error('stream.push() after EOF'));
+ } else {
state.reading = false;
-
- // Don't add to the buffer if we've decoded to an empty string chunk and
- // we're not in object mode
- if (!skipAdd) {
- // if we want the data now, just emit it.
- if (state.flowing && state.length === 0 && !state.sync) {
- stream.emit('data', chunk);
- stream.read(0);
- } else {
- // update the buffer info.
- state.length += state.objectMode ? 1 : chunk.length;
- if (addToFront)
- state.buffer.unshift(chunk);
+ if (state.decoder && !encoding) {
+ chunk = state.decoder.write(chunk);
+ if (state.objectMode || chunk.length !== 0)
+ addChunk(stream, state, chunk, false);
else
- state.buffer.push(chunk);
-
- if (state.needReadable)
- emitReadable(stream);
+ maybeReadMore(stream, state);
+ } else {
+ addChunk(stream, state, chunk, false);
}
}
-
- maybeReadMore(stream, state);
+ } else if (!addToFront) {
+ state.reading = false;
}
- } else if (!addToFront) {
- state.reading = false;
}
return needMoreData(state);
}
+function addChunk(stream, state, chunk, addToFront) {
+ if (state.flowing && state.length === 0 && !state.sync) {
+ stream.emit('data', chunk);
+ stream.read(0);
+ } else {
+ // update the buffer info.
+ state.length += state.objectMode ? 1 : chunk.length;
+ if (addToFront)
+ state.buffer.unshift(chunk);
+ else
+ state.buffer.push(chunk);
+
+ if (state.needReadable)
+ emitReadable(stream);
+ }
+ maybeReadMore(stream, state);
+}
+
+function chunkInvalid(state, chunk) {
+ var er;
+ if (!(chunk instanceof Buffer) &&
+ typeof chunk !== 'string' &&
+ chunk !== undefined &&
+ !state.objectMode) {
+ er = new TypeError('Invalid non-string/buffer chunk');
+ }
+ return er;
+}
+
// if it's past the high water mark, we can push in some more.
// Also, if we have no data yet, we can stand some
@@ -267,6 +283,10 @@ function needMoreData(state) {
state.length === 0);
}
+Readable.prototype.isPaused = function() {
+ return this._readableState.flowing === false;
+};
+
// backwards compatibility.
Readable.prototype.setEncoding = function(enc) {
if (!StringDecoder)
@@ -438,19 +458,6 @@ Readable.prototype.read = function(n) {
return ret;
};
-function chunkInvalid(state, chunk) {
- var er = null;
- if (!(chunk instanceof Buffer) &&
- typeof chunk !== 'string' &&
- chunk !== null &&
- chunk !== undefined &&
- !state.objectMode) {
- er = new TypeError('Invalid non-string/buffer chunk');
- }
- return er;
-}
-
-
function onEofChunk(stream, state) {
if (state.ended) return;
if (state.decoder) {