summaryrefslogtreecommitdiff
path: root/deps/npm/node_modules/readable-stream/lib/_stream_readable.js
diff options
context:
space:
mode:
Diffstat (limited to 'deps/npm/node_modules/readable-stream/lib/_stream_readable.js')
-rw-r--r--deps/npm/node_modules/readable-stream/lib/_stream_readable.js710
1 files changed, 374 insertions, 336 deletions
diff --git a/deps/npm/node_modules/readable-stream/lib/_stream_readable.js b/deps/npm/node_modules/readable-stream/lib/_stream_readable.js
index bf34ac65e1..b9b1b742cc 100644
--- a/deps/npm/node_modules/readable-stream/lib/_stream_readable.js
+++ b/deps/npm/node_modules/readable-stream/lib/_stream_readable.js
@@ -18,118 +18,111 @@
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
-
'use strict';
-/*<replacement>*/
-
-var pna = require('process-nextick-args');
-/*</replacement>*/
-
module.exports = Readable;
-
/*<replacement>*/
-var isArray = require('isarray');
-/*</replacement>*/
-/*<replacement>*/
var Duplex;
/*</replacement>*/
Readable.ReadableState = ReadableState;
-
/*<replacement>*/
+
var EE = require('events').EventEmitter;
-var EElistenerCount = function (emitter, type) {
+var EElistenerCount = function EElistenerCount(emitter, type) {
return emitter.listeners(type).length;
};
/*</replacement>*/
/*<replacement>*/
+
+
var Stream = require('./internal/streams/stream');
/*</replacement>*/
-/*<replacement>*/
-var Buffer = require('safe-buffer').Buffer;
+var Buffer = require('buffer').Buffer;
+
var OurUint8Array = global.Uint8Array || function () {};
+
function _uint8ArrayToBuffer(chunk) {
return Buffer.from(chunk);
}
+
function _isUint8Array(obj) {
return Buffer.isBuffer(obj) || obj instanceof OurUint8Array;
}
-
-/*</replacement>*/
-
/*<replacement>*/
-var util = require('core-util-is');
-util.inherits = require('inherits');
-/*</replacement>*/
-/*<replacement>*/
+
var debugUtil = require('util');
-var debug = void 0;
+
+var debug;
+
if (debugUtil && debugUtil.debuglog) {
debug = debugUtil.debuglog('stream');
} else {
- debug = function () {};
+ debug = function debug() {};
}
/*</replacement>*/
-var BufferList = require('./internal/streams/BufferList');
+
+var BufferList = require('./internal/streams/buffer_list');
+
var destroyImpl = require('./internal/streams/destroy');
+
+var _require = require('./internal/streams/state'),
+ getHighWaterMark = _require.getHighWaterMark;
+
+var _require$codes = require('../errors').codes,
+ ERR_INVALID_ARG_TYPE = _require$codes.ERR_INVALID_ARG_TYPE,
+ ERR_STREAM_PUSH_AFTER_EOF = _require$codes.ERR_STREAM_PUSH_AFTER_EOF,
+ ERR_METHOD_NOT_IMPLEMENTED = _require$codes.ERR_METHOD_NOT_IMPLEMENTED,
+ ERR_STREAM_UNSHIFT_AFTER_END_EVENT = _require$codes.ERR_STREAM_UNSHIFT_AFTER_END_EVENT;
+
+var _require2 = require('../experimentalWarning'),
+ emitExperimentalWarning = _require2.emitExperimentalWarning; // Lazy loaded to improve the startup performance.
+
+
var StringDecoder;
+var createReadableStreamAsyncIterator;
-util.inherits(Readable, Stream);
+require('inherits')(Readable, Stream);
var kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume'];
function prependListener(emitter, event, fn) {
// Sadly this is not cacheable as some libraries bundle their own
// event emitter implementation with them.
- if (typeof emitter.prependListener === 'function') return emitter.prependListener(event, fn);
-
- // This is a hack to make sure that our error handler is attached before any
+ if (typeof emitter.prependListener === 'function') return emitter.prependListener(event, fn); // This is a hack to make sure that our error handler is attached before any
// userland ones. NEVER DO THIS. This is here only because this code needs
// to continue to work with older versions of Node.js that do not include
// the prependListener() method. The goal is to eventually remove this hack.
- if (!emitter._events || !emitter._events[event]) emitter.on(event, fn);else if (isArray(emitter._events[event])) emitter._events[event].unshift(fn);else emitter._events[event] = [fn, emitter._events[event]];
+
+ if (!emitter._events || !emitter._events[event]) emitter.on(event, fn);else if (Array.isArray(emitter._events[event])) emitter._events[event].unshift(fn);else emitter._events[event] = [fn, emitter._events[event]];
}
-function ReadableState(options, stream) {
+function ReadableState(options, stream, isDuplex) {
Duplex = Duplex || require('./_stream_duplex');
-
- options = options || {};
-
- // Duplex streams are both readable and writable, but share
+ options = options || {}; // Duplex streams are both readable and writable, but share
// the same options object.
// However, some cases require setting options to different
// values for the readable and the writable sides of the duplex stream.
// These options can be provided separately as readableXXX and writableXXX.
- var isDuplex = stream instanceof Duplex;
- // object stream flag. Used to make read(n) ignore n and to
+ if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof Duplex; // object stream flag. Used to make read(n) ignore n and to
// make all the buffer merging and length checks go away
- this.objectMode = !!options.objectMode;
-
- if (isDuplex) this.objectMode = this.objectMode || !!options.readableObjectMode;
- // the point at which it stops calling _read() to fill the buffer
+ this.objectMode = !!options.objectMode;
+ if (isDuplex) this.objectMode = this.objectMode || !!options.readableObjectMode; // the point at which it stops calling _read() to fill the buffer
// Note: 0 is a valid value, means "don't call _read preemptively ever"
- var hwm = options.highWaterMark;
- var readableHwm = options.readableHighWaterMark;
- var defaultHwm = this.objectMode ? 16 : 16 * 1024;
-
- if (hwm || hwm === 0) this.highWaterMark = hwm;else if (isDuplex && (readableHwm || readableHwm === 0)) this.highWaterMark = readableHwm;else this.highWaterMark = defaultHwm;
-
- // cast to ints.
- this.highWaterMark = Math.floor(this.highWaterMark);
- // A linked list is used to store data chunks instead of an array because the
+ this.highWaterMark = getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex); // A linked list is used to store data chunks instead of an array because the
// linked list can remove elements from the beginning faster than
// array.shift()
+
this.buffer = new BufferList();
this.length = 0;
this.pipes = null;
@@ -137,37 +130,33 @@ function ReadableState(options, stream) {
this.flowing = null;
this.ended = false;
this.endEmitted = false;
- this.reading = false;
-
- // a flag to be able to tell if the event 'readable'/'data' is emitted
+ this.reading = false; // a flag to be able to tell if the event 'readable'/'data' is emitted
// immediately, or on a later tick. We set this to true at first, because
// any actions that shouldn't happen until "later" should generally also
// not happen before the first read call.
- this.sync = true;
- // whenever we return null, then we set a flag to say
+ this.sync = true; // whenever we return null, then we set a flag to say
// that we're awaiting a 'readable' event emission.
+
this.needReadable = false;
this.emittedReadable = false;
this.readableListening = false;
- this.resumeScheduled = false;
+ this.resumeScheduled = false; // Should close be emitted on destroy. Defaults to true.
- // has it been destroyed
- this.destroyed = false;
+ this.emitClose = options.emitClose !== false; // has it been destroyed
- // Crypto is kind of old and crusty. Historically, its default string
+ this.destroyed = false; // Crypto is kind of old and crusty. Historically, its default string
// encoding is 'binary' so we have to make this configurable.
// Everything else in the universe uses 'utf8', though.
- this.defaultEncoding = options.defaultEncoding || 'utf8';
- // the number of writers that are awaiting a drain event in .pipe()s
- this.awaitDrain = 0;
+ this.defaultEncoding = options.defaultEncoding || 'utf8'; // the number of writers that are awaiting a drain event in .pipe()s
- // if true, a maybeReadMore has been scheduled
- this.readingMore = false;
+ this.awaitDrain = 0; // if true, a maybeReadMore has been scheduled
+ this.readingMore = false;
this.decoder = null;
this.encoding = null;
+
if (options.encoding) {
if (!StringDecoder) StringDecoder = require('string_decoder/').StringDecoder;
this.decoder = new StringDecoder(options.encoding);
@@ -177,17 +166,16 @@ function ReadableState(options, stream) {
function Readable(options) {
Duplex = Duplex || require('./_stream_duplex');
+ if (!(this instanceof Readable)) return new Readable(options); // Checking for a Stream.Duplex instance is faster here instead of inside
+ // the ReadableState constructor, at least with V8 6.5
- if (!(this instanceof Readable)) return new Readable(options);
+ var isDuplex = this instanceof Duplex;
+ this._readableState = new ReadableState(options, this, isDuplex); // legacy
- this._readableState = new ReadableState(options, this);
-
- // legacy
this.readable = true;
if (options) {
if (typeof options.read === 'function') this._read = options.read;
-
if (typeof options.destroy === 'function') this._destroy = options.destroy;
}
@@ -195,36 +183,40 @@ function Readable(options) {
}
Object.defineProperty(Readable.prototype, 'destroyed', {
- get: function () {
+ // making it explicit this property is not enumerable
+ // because otherwise some prototype manipulation in
+ // userland will fail
+ enumerable: false,
+ get: function get() {
if (this._readableState === undefined) {
return false;
}
+
return this._readableState.destroyed;
},
- set: function (value) {
+ set: function set(value) {
// we ignore the value if the stream
// has not been initialized yet
if (!this._readableState) {
return;
- }
-
- // backward compatibility, the user is explicitly
+ } // backward compatibility, the user is explicitly
// managing destroyed
+
+
this._readableState.destroyed = value;
}
});
-
Readable.prototype.destroy = destroyImpl.destroy;
Readable.prototype._undestroy = destroyImpl.undestroy;
+
Readable.prototype._destroy = function (err, cb) {
- this.push(null);
cb(err);
-};
-
-// Manually shove something into the read() buffer.
+}; // Manually shove something into the read() buffer.
// This returns true if the highWaterMark has not been hit yet,
// similar to how Writable.write() returns true if you should
// write() some more.
+
+
Readable.prototype.push = function (chunk, encoding) {
var state = this._readableState;
var skipChunkCheck;
@@ -232,10 +224,12 @@ Readable.prototype.push = function (chunk, encoding) {
if (!state.objectMode) {
if (typeof chunk === 'string') {
encoding = encoding || state.defaultEncoding;
+
if (encoding !== state.encoding) {
chunk = Buffer.from(chunk, encoding);
encoding = '';
}
+
skipChunkCheck = true;
}
} else {
@@ -243,21 +237,24 @@ Readable.prototype.push = function (chunk, encoding) {
}
return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);
-};
+}; // Unshift should *always* be something directly out of read()
+
-// Unshift should *always* be something directly out of read()
Readable.prototype.unshift = function (chunk) {
return readableAddChunk(this, chunk, null, true, false);
};
function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
+ debug('readableAddChunk', chunk);
var state = stream._readableState;
+
if (chunk === null) {
state.reading = false;
onEofChunk(stream, state);
} else {
var er;
if (!skipChunkCheck) er = chunkInvalid(state, chunk);
+
if (er) {
stream.emit('error', er);
} else if (state.objectMode || chunk && chunk.length > 0) {
@@ -266,11 +263,14 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
}
if (addToFront) {
- if (state.endEmitted) stream.emit('error', new Error('stream.unshift() after end event'));else addChunk(stream, state, chunk, true);
+ if (state.endEmitted) stream.emit('error', new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());else addChunk(stream, state, chunk, true);
} else if (state.ended) {
- stream.emit('error', new Error('stream.push() after EOF'));
+ stream.emit('error', new ERR_STREAM_PUSH_AFTER_EOF());
+ } else if (state.destroyed) {
+ return false;
} else {
state.reading = false;
+
if (state.decoder && !encoding) {
chunk = state.decoder.write(chunk);
if (state.objectMode || chunk.length !== 0) addChunk(stream, state, chunk, false);else maybeReadMore(stream, state);
@@ -280,59 +280,56 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
}
} else if (!addToFront) {
state.reading = false;
+ maybeReadMore(stream, state);
}
- }
+ } // We can push more data if we are below the highWaterMark.
+ // Also, if we have no data yet, we can stand some more bytes.
+ // This is to work around cases where hwm=0, such as the repl.
- return needMoreData(state);
+
+ return !state.ended && (state.length < state.highWaterMark || state.length === 0);
}
function addChunk(stream, state, chunk, addToFront) {
if (state.flowing && state.length === 0 && !state.sync) {
+ state.awaitDrain = 0;
stream.emit('data', chunk);
- stream.read(0);
} else {
// update the buffer info.
state.length += state.objectMode ? 1 : chunk.length;
if (addToFront) state.buffer.unshift(chunk);else state.buffer.push(chunk);
-
if (state.needReadable) emitReadable(stream);
}
+
maybeReadMore(stream, state);
}
function chunkInvalid(state, chunk) {
var er;
+
if (!_isUint8Array(chunk) && typeof chunk !== 'string' && chunk !== undefined && !state.objectMode) {
- er = new TypeError('Invalid non-string/buffer chunk');
+ er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
}
- return er;
-}
-// if it's past the high water mark, we can push in some more.
-// Also, if we have no data yet, we can stand some
-// more bytes. This is to work around cases where hwm=0,
-// such as the repl. Also, if the push() triggered a
-// readable event, and the user called read(largeNumber) such that
-// needReadable was set, then we ought to push more, so that another
-// 'readable' event will be triggered.
-function needMoreData(state) {
- return !state.ended && (state.needReadable || state.length < state.highWaterMark || state.length === 0);
+ return er;
}
Readable.prototype.isPaused = function () {
return this._readableState.flowing === false;
-};
+}; // backwards compatibility.
+
-// backwards compatibility.
Readable.prototype.setEncoding = function (enc) {
if (!StringDecoder) StringDecoder = require('string_decoder/').StringDecoder;
- this._readableState.decoder = new StringDecoder(enc);
- this._readableState.encoding = enc;
+ this._readableState.decoder = new StringDecoder(enc); // if setEncoding(null), decoder.encoding equals utf8
+
+ this._readableState.encoding = this._readableState.decoder.encoding;
return this;
-};
+}; // Don't raise the hwm > 8MB
+
-// Don't raise the hwm > 8MB
var MAX_HWM = 0x800000;
+
function computeNewHighWaterMark(n) {
if (n >= MAX_HWM) {
n = MAX_HWM;
@@ -347,56 +344,55 @@ function computeNewHighWaterMark(n) {
n |= n >>> 16;
n++;
}
- return n;
-}
-// This function is designed to be inlinable, so please take care when making
+ return n;
+} // This function is designed to be inlinable, so please take care when making
// changes to the function body.
+
+
function howMuchToRead(n, state) {
if (n <= 0 || state.length === 0 && state.ended) return 0;
if (state.objectMode) return 1;
+
if (n !== n) {
// Only flow one buffer at a time
if (state.flowing && state.length) return state.buffer.head.data.length;else return state.length;
- }
- // If we're asking for more than the current hwm, then raise the hwm.
+ } // If we're asking for more than the current hwm, then raise the hwm.
+
+
if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n);
- if (n <= state.length) return n;
- // Don't have enough
+ if (n <= state.length) return n; // Don't have enough
+
if (!state.ended) {
state.needReadable = true;
return 0;
}
+
return state.length;
-}
+} // you can override either this method, or the async _read(n) below.
+
-// you can override either this method, or the async _read(n) below.
Readable.prototype.read = function (n) {
debug('read', n);
n = parseInt(n, 10);
var state = this._readableState;
var nOrig = n;
-
- if (n !== 0) state.emittedReadable = false;
-
- // if we're doing read(0) to trigger a readable event, but we
+ if (n !== 0) state.emittedReadable = false; // if we're doing read(0) to trigger a readable event, but we
// already have a bunch of data in the buffer, then just trigger
// the 'readable' event and move on.
- if (n === 0 && state.needReadable && (state.length >= state.highWaterMark || state.ended)) {
+
+ if (n === 0 && state.needReadable && ((state.highWaterMark !== 0 ? state.length >= state.highWaterMark : state.length > 0) || state.ended)) {
debug('read: emitReadable', state.length, state.ended);
if (state.length === 0 && state.ended) endReadable(this);else emitReadable(this);
return null;
}
- n = howMuchToRead(n, state);
+ n = howMuchToRead(n, state); // if we've ended, and we're now clear, then finish it up.
- // if we've ended, and we're now clear, then finish it up.
if (n === 0 && state.ended) {
if (state.length === 0) endReadable(this);
return null;
- }
-
- // All the actual chunk generation logic needs to be
+ } // All the actual chunk generation logic needs to be
// *below* the call to _read. The reason is that in certain
// synthetic stream cases, such as passthrough streams, _read
// may be a completely synchronous operation which may change
@@ -417,33 +413,34 @@ Readable.prototype.read = function (n) {
// 'readable' etc.
//
// 3. Actually pull the requested chunks out of the buffer and return.
-
// if we need a readable event, then we need to do some reading.
+
+
var doRead = state.needReadable;
- debug('need readable', doRead);
+ debug('need readable', doRead); // if we currently have less than the highWaterMark, then also read some
- // if we currently have less than the highWaterMark, then also read some
if (state.length === 0 || state.length - n < state.highWaterMark) {
doRead = true;
debug('length less than watermark', doRead);
- }
-
- // however, if we've ended, then there's no point, and if we're already
+ } // however, if we've ended, then there's no point, and if we're already
// reading, then it's unnecessary.
+
+
if (state.ended || state.reading) {
doRead = false;
debug('reading or ended', doRead);
} else if (doRead) {
debug('do read');
state.reading = true;
- state.sync = true;
- // if the length is currently zero, then we *need* a readable event.
- if (state.length === 0) state.needReadable = true;
- // call internal read method
+ state.sync = true; // if the length is currently zero, then we *need* a readable event.
+
+ if (state.length === 0) state.needReadable = true; // call internal read method
+
this._read(state.highWaterMark);
- state.sync = false;
- // If _read pushed data synchronously, then `reading` will be false,
+
+ state.sync = false; // If _read pushed data synchronously, then `reading` will be false,
// and we need to re-evaluate how much data we can return to the user.
+
if (!state.reading) n = howMuchToRead(nOrig, state);
}
@@ -455,87 +452,115 @@ Readable.prototype.read = function (n) {
n = 0;
} else {
state.length -= n;
+ state.awaitDrain = 0;
}
if (state.length === 0) {
// If we have nothing in the buffer, then we want to know
// as soon as we *do* get something into the buffer.
- if (!state.ended) state.needReadable = true;
+ if (!state.ended) state.needReadable = true; // If we tried to read() past the EOF, then emit end on the next tick.
- // If we tried to read() past the EOF, then emit end on the next tick.
if (nOrig !== n && state.ended) endReadable(this);
}
if (ret !== null) this.emit('data', ret);
-
return ret;
};
function onEofChunk(stream, state) {
if (state.ended) return;
+
if (state.decoder) {
var chunk = state.decoder.end();
+
if (chunk && chunk.length) {
state.buffer.push(chunk);
state.length += state.objectMode ? 1 : chunk.length;
}
}
+
state.ended = true;
- // emit 'readable' now to make sure it gets picked up.
- emitReadable(stream);
-}
+ if (state.sync) {
+ // if we are sync, wait until next tick to emit the data.
+ // Otherwise we risk emitting data in the flow()
+ // the readable code triggers during a read() call
+ emitReadable(stream);
+ } else {
+ // emit 'readable' now to make sure it gets picked up.
+ state.needReadable = false;
-// Don't emit readable right away in sync mode, because this can trigger
+ if (!state.emittedReadable) {
+ state.emittedReadable = true;
+ emitReadable_(stream);
+ }
+ }
+} // Don't emit readable right away in sync mode, because this can trigger
// another read() call => stack overflow. This way, it might trigger
// a nextTick recursion warning, but that's not so bad.
+
+
function emitReadable(stream) {
var state = stream._readableState;
state.needReadable = false;
+
if (!state.emittedReadable) {
debug('emitReadable', state.flowing);
state.emittedReadable = true;
- if (state.sync) pna.nextTick(emitReadable_, stream);else emitReadable_(stream);
+ process.nextTick(emitReadable_, stream);
}
}
function emitReadable_(stream) {
- debug('emit readable');
- stream.emit('readable');
- flow(stream);
-}
+ var state = stream._readableState;
+ debug('emitReadable_', state.destroyed, state.length, state.ended);
+
+ if (!state.destroyed && (state.length || state.ended)) {
+ stream.emit('readable');
+ } // The stream needs another readable event if
+ // 1. It is not flowing, as the flow mechanism will take
+ // care of it.
+ // 2. It is not ended.
+ // 3. It is below the highWaterMark, so we can schedule
+ // another readable later.
+
-// at this point, the user has presumably seen the 'readable' event,
+ state.needReadable = !state.flowing && !state.ended && state.length <= state.highWaterMark;
+ flow(stream);
+} // at this point, the user has presumably seen the 'readable' event,
// and called read() to consume some data. that may have triggered
// in turn another _read(n) call, in which case reading = true if
// it's in progress.
// However, if we're not ended, or reading, and the length < hwm,
// then go ahead and try to read some more preemptively.
+
+
function maybeReadMore(stream, state) {
if (!state.readingMore) {
state.readingMore = true;
- pna.nextTick(maybeReadMore_, stream, state);
+ process.nextTick(maybeReadMore_, stream, state);
}
}
function maybeReadMore_(stream, state) {
var len = state.length;
- while (!state.reading && !state.flowing && !state.ended && state.length < state.highWaterMark) {
+
+ while (!state.reading && !state.ended && state.length < state.highWaterMark) {
debug('maybeReadMore read 0');
stream.read(0);
- if (len === state.length)
- // didn't get any data, stop spinning.
+ if (len === state.length) // didn't get any data, stop spinning.
break;else len = state.length;
}
- state.readingMore = false;
-}
-// abstract method. to be overridden in specific implementation classes.
+ state.readingMore = false;
+} // abstract method. to be overridden in specific implementation classes.
// call cb(er, data) where data is <= n in length.
// for virtual (non-string, non-buffer) streams, "length" is somewhat
// arbitrary, and perhaps not very meaningful.
+
+
Readable.prototype._read = function (n) {
- this.emit('error', new Error('_read() is not implemented'));
+ this.emit('error', new ERR_METHOD_NOT_IMPLEMENTED('_read()'));
};
Readable.prototype.pipe = function (dest, pipeOpts) {
@@ -546,24 +571,26 @@ Readable.prototype.pipe = function (dest, pipeOpts) {
case 0:
state.pipes = dest;
break;
+
case 1:
state.pipes = [state.pipes, dest];
break;
+
default:
state.pipes.push(dest);
break;
}
+
state.pipesCount += 1;
debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);
-
var doEnd = (!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout && dest !== process.stderr;
-
var endFn = doEnd ? onend : unpipe;
- if (state.endEmitted) pna.nextTick(endFn);else src.once('end', endFn);
-
+ if (state.endEmitted) process.nextTick(endFn);else src.once('end', endFn);
dest.on('unpipe', onunpipe);
+
function onunpipe(readable, unpipeInfo) {
debug('onunpipe');
+
if (readable === src) {
if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
unpipeInfo.hasUnpiped = true;
@@ -575,19 +602,19 @@ Readable.prototype.pipe = function (dest, pipeOpts) {
function onend() {
debug('onend');
dest.end();
- }
-
- // when the dest drains, it reduces the awaitDrain counter
+ } // when the dest drains, it reduces the awaitDrain counter
// on the source. This would be more elegant with a .once()
// handler in flow(), but adding and removing repeatedly is
// too slow.
+
+
var ondrain = pipeOnDrain(src);
dest.on('drain', ondrain);
-
var cleanedUp = false;
+
function cleanup() {
- debug('cleanup');
- // cleanup event handlers once the pipe is broken
+ debug('cleanup'); // cleanup event handlers once the pipe is broken
+
dest.removeListener('close', onclose);
dest.removeListener('finish', onfinish);
dest.removeListener('drain', ondrain);
@@ -596,75 +623,71 @@ Readable.prototype.pipe = function (dest, pipeOpts) {
src.removeListener('end', onend);
src.removeListener('end', unpipe);
src.removeListener('data', ondata);
-
- cleanedUp = true;
-
- // if the reader is waiting for a drain event from this
+ cleanedUp = true; // if the reader is waiting for a drain event from this
// specific writer, then it would cause it to never start
// flowing again.
// So, if this is awaiting a drain, then we just call it now.
// If we don't know, then assume that we are waiting for one.
+
if (state.awaitDrain && (!dest._writableState || dest._writableState.needDrain)) ondrain();
}
- // If the user pushes more data while we're writing to dest then we'll end up
- // in ondata again. However, we only want to increase awaitDrain once because
- // dest will only emit one 'drain' event for the multiple writes.
- // => Introduce a guard on increasing awaitDrain.
- var increasedAwaitDrain = false;
src.on('data', ondata);
+
function ondata(chunk) {
debug('ondata');
- increasedAwaitDrain = false;
var ret = dest.write(chunk);
- if (false === ret && !increasedAwaitDrain) {
+ debug('dest.write', ret);
+
+ if (ret === false) {
// If the user unpiped during `dest.write()`, it is possible
// to get stuck in a permanently paused state if that write
// also returned false.
// => Check whether `dest` is still a piping destination.
if ((state.pipesCount === 1 && state.pipes === dest || state.pipesCount > 1 && indexOf(state.pipes, dest) !== -1) && !cleanedUp) {
- debug('false write response, pause', src._readableState.awaitDrain);
- src._readableState.awaitDrain++;
- increasedAwaitDrain = true;
+ debug('false write response, pause', state.awaitDrain);
+ state.awaitDrain++;
}
+
src.pause();
}
- }
-
- // if the dest has an error, then stop piping into it.
+ } // if the dest has an error, then stop piping into it.
// however, don't suppress the throwing behavior for this.
+
+
function onerror(er) {
debug('onerror', er);
unpipe();
dest.removeListener('error', onerror);
if (EElistenerCount(dest, 'error') === 0) dest.emit('error', er);
- }
+ } // Make sure our error handler is attached before userland ones.
+
- // Make sure our error handler is attached before userland ones.
- prependListener(dest, 'error', onerror);
+ prependListener(dest, 'error', onerror); // Both close and finish should trigger unpipe, but only once.
- // Both close and finish should trigger unpipe, but only once.
function onclose() {
dest.removeListener('finish', onfinish);
unpipe();
}
+
dest.once('close', onclose);
+
function onfinish() {
debug('onfinish');
dest.removeListener('close', onclose);
unpipe();
}
+
dest.once('finish', onfinish);
function unpipe() {
debug('unpipe');
src.unpipe(dest);
- }
+ } // tell the dest that it's being piped to
+
- // tell the dest that it's being piped to
- dest.emit('pipe', src);
+ dest.emit('pipe', src); // start the flow if it hasn't been started already.
- // start the flow if it hasn't been started already.
if (!state.flowing) {
debug('pipe resume');
src.resume();
@@ -674,10 +697,11 @@ Readable.prototype.pipe = function (dest, pipeOpts) {
};
function pipeOnDrain(src) {
- return function () {
+ return function pipeOnDrainFunctionResult() {
var state = src._readableState;
debug('pipeOnDrain', state.awaitDrain);
if (state.awaitDrain) state.awaitDrain--;
+
if (state.awaitDrain === 0 && EElistenerCount(src, 'data')) {
state.flowing = true;
flow(src);
@@ -687,27 +711,24 @@ function pipeOnDrain(src) {
Readable.prototype.unpipe = function (dest) {
var state = this._readableState;
- var unpipeInfo = { hasUnpiped: false };
+ var unpipeInfo = {
+ hasUnpiped: false
+ }; // if we're not piping anywhere, then do nothing.
- // if we're not piping anywhere, then do nothing.
- if (state.pipesCount === 0) return this;
+ if (state.pipesCount === 0) return this; // just one destination. most common case.
- // just one destination. most common case.
if (state.pipesCount === 1) {
// passed in one, but it's not the right one.
if (dest && dest !== state.pipes) return this;
+ if (!dest) dest = state.pipes; // got a match.
- if (!dest) dest = state.pipes;
-
- // got a match.
state.pipes = null;
state.pipesCount = 0;
state.flowing = false;
if (dest) dest.emit('unpipe', this, unpipeInfo);
return this;
- }
+ } // slow case. multiple pipe destinations.
- // slow case. multiple pipe destinations.
if (!dest) {
// remove all.
@@ -718,80 +739,133 @@ Readable.prototype.unpipe = function (dest) {
state.flowing = false;
for (var i = 0; i < len; i++) {
- dests[i].emit('unpipe', this, unpipeInfo);
- }return this;
- }
+ dests[i].emit('unpipe', this, {
+ hasUnpiped: false
+ });
+ }
+
+ return this;
+ } // try to find the right one.
+
- // try to find the right one.
var index = indexOf(state.pipes, dest);
if (index === -1) return this;
-
state.pipes.splice(index, 1);
state.pipesCount -= 1;
if (state.pipesCount === 1) state.pipes = state.pipes[0];
-
dest.emit('unpipe', this, unpipeInfo);
-
return this;
-};
-
-// set up data events if they are asked for
+}; // set up data events if they are asked for
// Ensure readable listeners eventually get something
+
+
Readable.prototype.on = function (ev, fn) {
var res = Stream.prototype.on.call(this, ev, fn);
+ var state = this._readableState;
if (ev === 'data') {
- // Start flowing on next tick if stream isn't explicitly paused
- if (this._readableState.flowing !== false) this.resume();
+ // update readableListening so that resume() may be a no-op
+ // a few lines down. This is needed to support once('readable').
+ state.readableListening = this.listenerCount('readable') > 0; // Try start flowing on next tick if stream isn't explicitly paused
+
+ if (state.flowing !== false) this.resume();
} else if (ev === 'readable') {
- var state = this._readableState;
if (!state.endEmitted && !state.readableListening) {
state.readableListening = state.needReadable = true;
+ state.flowing = false;
state.emittedReadable = false;
- if (!state.reading) {
- pna.nextTick(nReadingNextTick, this);
- } else if (state.length) {
+ debug('on readable', state.length, state.reading);
+
+ if (state.length) {
emitReadable(this);
+ } else if (!state.reading) {
+ process.nextTick(nReadingNextTick, this);
}
}
}
return res;
};
+
Readable.prototype.addListener = Readable.prototype.on;
+Readable.prototype.removeListener = function (ev, fn) {
+ var res = Stream.prototype.removeListener.call(this, ev, fn);
+
+ if (ev === 'readable') {
+ // We need to check if there is someone still listening to
+ // readable and reset the state. However this needs to happen
+ // after readable has been emitted but before I/O (nextTick) to
+ // support once('readable', fn) cycles. This means that calling
+ // resume within the same tick will have no
+ // effect.
+ process.nextTick(updateReadableListening, this);
+ }
+
+ return res;
+};
+
+Readable.prototype.removeAllListeners = function (ev) {
+ var res = Stream.prototype.removeAllListeners.apply(this, arguments);
+
+ if (ev === 'readable' || ev === undefined) {
+ // We need to check if there is someone still listening to
+ // readable and reset the state. However this needs to happen
+ // after readable has been emitted but before I/O (nextTick) to
+ // support once('readable', fn) cycles. This means that calling
+ // resume within the same tick will have no
+ // effect.
+ process.nextTick(updateReadableListening, this);
+ }
+
+ return res;
+};
+
+function updateReadableListening(self) {
+ self._readableState.readableListening = self.listenerCount('readable') > 0; // crude way to check if we should resume
+
+ if (self.listenerCount('data') > 0) {
+ self.resume();
+ }
+}
+
function nReadingNextTick(self) {
debug('readable nexttick read 0');
self.read(0);
-}
-
-// pause() and resume() are remnants of the legacy readable stream API
+} // pause() and resume() are remnants of the legacy readable stream API
// If the user uses them, then switch into old mode.
+
+
Readable.prototype.resume = function () {
var state = this._readableState;
+
if (!state.flowing) {
- debug('resume');
- state.flowing = true;
+ debug('resume'); // we flow only if there is no one listening
+ // for readable, but we still have to call
+ // resume()
+
+ state.flowing = !state.readableListening;
resume(this, state);
}
+
return this;
};
function resume(stream, state) {
if (!state.resumeScheduled) {
state.resumeScheduled = true;
- pna.nextTick(resume_, stream, state);
+ process.nextTick(resume_, stream, state);
}
}
function resume_(stream, state) {
+ debug('resume', state.reading);
+
if (!state.reading) {
- debug('resume read 0');
stream.read(0);
}
state.resumeScheduled = false;
- state.awaitDrain = 0;
stream.emit('resume');
flow(stream);
if (state.flowing && !state.reading) stream.read(0);
@@ -799,31 +873,36 @@ function resume_(stream, state) {
Readable.prototype.pause = function () {
debug('call pause flowing=%j', this._readableState.flowing);
- if (false !== this._readableState.flowing) {
+
+ if (this._readableState.flowing !== false) {
debug('pause');
this._readableState.flowing = false;
this.emit('pause');
}
+
return this;
};
function flow(stream) {
var state = stream._readableState;
debug('flow', state.flowing);
- while (state.flowing && stream.read() !== null) {}
-}
-// wrap an old-style stream as the async data source.
+ while (state.flowing && stream.read() !== null) {
+ ;
+ }
+} // wrap an old-style stream as the async data source.
// This is *not* part of the readable stream interface.
// It is an ugly unfortunate mess of history.
+
+
Readable.prototype.wrap = function (stream) {
var _this = this;
var state = this._readableState;
var paused = false;
-
stream.on('end', function () {
debug('wrapped end');
+
if (state.decoder && !state.ended) {
var chunk = state.decoder.end();
if (chunk && chunk.length) _this.push(chunk);
@@ -831,42 +910,41 @@ Readable.prototype.wrap = function (stream) {
_this.push(null);
});
-
stream.on('data', function (chunk) {
debug('wrapped data');
- if (state.decoder) chunk = state.decoder.write(chunk);
+ if (state.decoder) chunk = state.decoder.write(chunk); // don't skip over falsy values in objectMode
- // don't skip over falsy values in objectMode
if (state.objectMode && (chunk === null || chunk === undefined)) return;else if (!state.objectMode && (!chunk || !chunk.length)) return;
var ret = _this.push(chunk);
+
if (!ret) {
paused = true;
stream.pause();
}
- });
-
- // proxy all the other methods.
+ }); // proxy all the other methods.
// important when wrapping filters and duplexes.
+
for (var i in stream) {
if (this[i] === undefined && typeof stream[i] === 'function') {
- this[i] = function (method) {
- return function () {
+ this[i] = function methodWrap(method) {
+ return function methodWrapReturnFunction() {
return stream[method].apply(stream, arguments);
};
}(i);
}
- }
+ } // proxy certain important events.
+
- // proxy certain important events.
for (var n = 0; n < kProxyEvents.length; n++) {
stream.on(kProxyEvents[n], this.emit.bind(this, kProxyEvents[n]));
- }
-
- // when we try to consume some more bytes, simply unpause the
+ } // when we try to consume some more bytes, simply unpause the
// underlying stream.
+
+
this._read = function (n) {
debug('wrapped _read', n);
+
if (paused) {
paused = false;
stream.resume();
@@ -876,134 +954,93 @@ Readable.prototype.wrap = function (stream) {
return this;
};
+if (typeof Symbol === 'function') {
+ Readable.prototype[Symbol.asyncIterator] = function () {
+ emitExperimentalWarning('Readable[Symbol.asyncIterator]');
+
+ if (createReadableStreamAsyncIterator === undefined) {
+ createReadableStreamAsyncIterator = require('./internal/streams/async_iterator');
+ }
+
+ return createReadableStreamAsyncIterator(this);
+ };
+}
+
Object.defineProperty(Readable.prototype, 'readableHighWaterMark', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
- get: function () {
+ get: function get() {
return this._readableState.highWaterMark;
}
});
+Object.defineProperty(Readable.prototype, 'readableBuffer', {
+ // making it explicit this property is not enumerable
+ // because otherwise some prototype manipulation in
+ // userland will fail
+ enumerable: false,
+ get: function get() {
+ return this._readableState && this._readableState.buffer;
+ }
+});
+Object.defineProperty(Readable.prototype, 'readableFlowing', {
+ // making it explicit this property is not enumerable
+ // because otherwise some prototype manipulation in
+ // userland will fail
+ enumerable: false,
+ get: function get() {
+ return this._readableState.flowing;
+ },
+ set: function set(state) {
+ if (this._readableState) {
+ this._readableState.flowing = state;
+ }
+ }
+}); // exposed for testing purposes only.
-// exposed for testing purposes only.
Readable._fromList = fromList;
-
-// Pluck off n bytes from an array of buffers.
+Object.defineProperty(Readable.prototype, 'readableLength', {
+ // making it explicit this property is not enumerable
+ // because otherwise some prototype manipulation in
+ // userland will fail
+ enumerable: false,
+ get: function get() {
+ return this._readableState.length;
+ }
+}); // Pluck off n bytes from an array of buffers.
// Length is the combined lengths of all the buffers in the list.
// This function is designed to be inlinable, so please take care when making
// changes to the function body.
+
function fromList(n, state) {
// nothing buffered
if (state.length === 0) return null;
-
var ret;
if (state.objectMode) ret = state.buffer.shift();else if (!n || n >= state.length) {
// read it all, truncate the list
- if (state.decoder) ret = state.buffer.join('');else if (state.buffer.length === 1) ret = state.buffer.head.data;else ret = state.buffer.concat(state.length);
+ if (state.decoder) ret = state.buffer.join('');else if (state.buffer.length === 1) ret = state.buffer.first();else ret = state.buffer.concat(state.length);
state.buffer.clear();
} else {
// read part of list
- ret = fromListPartial(n, state.buffer, state.decoder);
- }
-
- return ret;
-}
-
-// Extracts only enough buffered data to satisfy the amount requested.
-// This function is designed to be inlinable, so please take care when making
-// changes to the function body.
-function fromListPartial(n, list, hasStrings) {
- var ret;
- if (n < list.head.data.length) {
- // slice is the same for buffers and strings
- ret = list.head.data.slice(0, n);
- list.head.data = list.head.data.slice(n);
- } else if (n === list.head.data.length) {
- // first chunk is a perfect match
- ret = list.shift();
- } else {
- // result spans more than one buffer
- ret = hasStrings ? copyFromBufferString(n, list) : copyFromBuffer(n, list);
- }
- return ret;
-}
-
-// Copies a specified amount of characters from the list of buffered data
-// chunks.
-// This function is designed to be inlinable, so please take care when making
-// changes to the function body.
-function copyFromBufferString(n, list) {
- var p = list.head;
- var c = 1;
- var ret = p.data;
- n -= ret.length;
- while (p = p.next) {
- var str = p.data;
- var nb = n > str.length ? str.length : n;
- if (nb === str.length) ret += str;else ret += str.slice(0, n);
- n -= nb;
- if (n === 0) {
- if (nb === str.length) {
- ++c;
- if (p.next) list.head = p.next;else list.head = list.tail = null;
- } else {
- list.head = p;
- p.data = str.slice(nb);
- }
- break;
- }
- ++c;
- }
- list.length -= c;
- return ret;
-}
-
-// Copies a specified amount of bytes from the list of buffered data chunks.
-// This function is designed to be inlinable, so please take care when making
-// changes to the function body.
-function copyFromBuffer(n, list) {
- var ret = Buffer.allocUnsafe(n);
- var p = list.head;
- var c = 1;
- p.data.copy(ret);
- n -= p.data.length;
- while (p = p.next) {
- var buf = p.data;
- var nb = n > buf.length ? buf.length : n;
- buf.copy(ret, ret.length - n, 0, nb);
- n -= nb;
- if (n === 0) {
- if (nb === buf.length) {
- ++c;
- if (p.next) list.head = p.next;else list.head = list.tail = null;
- } else {
- list.head = p;
- p.data = buf.slice(nb);
- }
- break;
- }
- ++c;
+ ret = state.buffer.consume(n, state.decoder);
}
- list.length -= c;
return ret;
}
function endReadable(stream) {
var state = stream._readableState;
-
- // If we get here before consuming all the bytes, then that is a
- // bug in node. Should never happen.
- if (state.length > 0) throw new Error('"endReadable()" called on non-empty stream');
+ debug('endReadable', state.endEmitted);
if (!state.endEmitted) {
state.ended = true;
- pna.nextTick(endReadableNT, state, stream);
+ process.nextTick(endReadableNT, state, stream);
}
}
function endReadableNT(state, stream) {
- // Check that we didn't get one last unshift.
+ debug('endReadableNT', state.endEmitted, state.length); // Check that we didn't get one last unshift.
+
if (!state.endEmitted && state.length === 0) {
state.endEmitted = true;
stream.readable = false;
@@ -1015,5 +1052,6 @@ function indexOf(xs, x) {
for (var i = 0, l = xs.length; i < l; i++) {
if (xs[i] === x) return i;
}
+
return -1;
} \ No newline at end of file