summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/_stream_passthrough.js3
-rw-r--r--lib/_stream_readable.js134
-rw-r--r--lib/_stream_transform.js7
3 files changed, 96 insertions, 48 deletions
diff --git a/lib/_stream_passthrough.js b/lib/_stream_passthrough.js
index dd6390fc6e..5acd27b260 100644
--- a/lib/_stream_passthrough.js
+++ b/lib/_stream_passthrough.js
@@ -34,6 +34,5 @@ function PassThrough(options) {
}
PassThrough.prototype._transform = function(chunk, output, cb) {
- output(chunk);
- cb();
+ cb(null, chunk);
};
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index ea944fcb44..916ebc2056 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -28,7 +28,7 @@ var StringDecoder;
util.inherits(Readable, Stream);
-function ReadableState(options, stream) {
+function ReadableState(options) {
options = options || {};
this.bufferSize = options.bufferSize || 16 * 1024;
@@ -44,7 +44,6 @@ function ReadableState(options, stream) {
this.flowing = false;
this.ended = false;
this.endEmitted = false;
- this.stream = stream;
this.reading = false;
// whenever we return null, then we set a flag to say
@@ -71,52 +70,76 @@ Readable.prototype.setEncoding = function(enc) {
this._readableState.decoder = new StringDecoder(enc);
};
-// you can override either this method, or _read(n, cb) below.
-Readable.prototype.read = function(n) {
- var state = this._readableState;
- if (state.length === 0 && state.ended) {
- endReadable(this);
- return null;
- }
+function howMuchToRead(n, state) {
+ if (state.length === 0 && state.ended)
+ return 0;
+
+ if (isNaN(n))
+ return state.length;
- if (isNaN(n) || n <= 0)
- n = state.length
+ if (n <= 0)
+ return 0;
- // XXX: controversial.
// don't have that much. return null, unless we've ended.
- // However, if the low water mark is lower than the number of bytes,
- // then we still need to return what we have, or else it won't kick
- // off another _read() call. For example,
- // lwm=5
- // len=9
- // read(10)
- // We don't have that many bytes, so it'd be tempting to return null,
- // but then it won't ever cause _read to be called, so in that case,
- // we just return what we have, and let the programmer deal with it.
if (n > state.length) {
- if (!state.ended && state.length <= state.lowWaterMark) {
+ if (!state.ended) {
state.needReadable = true;
- n = 0;
+ return 0;
} else
- n = state.length;
+ return state.length;
}
+ return n;
+}
- var ret;
- if (n > 0)
- ret = fromList(n, state.buffer, state.length, !!state.decoder);
- else
- ret = null;
+// you can override either this method, or _read(n, cb) below.
+Readable.prototype.read = function(n) {
+ var state = this._readableState;
+ var nOrig = n;
- if (ret === null || ret.length === 0)
- state.needReadable = true;
+ n = howMuchToRead(n, state);
- state.length -= n;
+ // if we've ended, and we're now clear, then finish it up.
+ if (n === 0 && state.ended) {
+ endReadable(this);
+ return null;
+ }
- if (!state.ended &&
- state.length <= state.lowWaterMark &&
- !state.reading) {
+ // All the actual chunk generation logic needs to be
+ // *below* the call to _read. The reason is that in certain
+ // synthetic stream cases, such as passthrough streams, _read
+ // may be a completely synchronous operation which may change
+ // the state of the read buffer, providing enough data when
+ // before there was *not* enough.
+ //
+ // So, the steps are:
+ // 1. Figure out what the state of things will be after we do
+ // a read from the buffer.
+ //
+ // 2. If that resulting state will trigger a _read, then call _read.
+ // Note that this may be asynchronous, or synchronous. Yes, it is
+ // deeply ugly to write APIs this way, but that still doesn't mean
+ // that the Readable class should behave improperly, as streams are
+ // designed to be sync/async agnostic.
+ // Take note if the _read call is sync or async (ie, if the read call
+ // has returned yet), so that we know whether or not it's safe to emit
+ // 'readable' etc.
+ //
+ // 3. Actually pull the requested chunks out of the buffer and return.
+
+ // if we need a readable event, then we need to do some reading.
+ var doRead = state.needReadable;
+ // if we currently have less than the lowWaterMark, then also read some
+ if (state.length - n <= state.lowWaterMark)
+ doRead = true;
+ // however, if we've ended, then there's no point, and if we're already
+ // reading, then it's unnecessary.
+ if (state.ended || state.reading)
+ doRead = false;
+
+ if (doRead) {
+ var sync = true;
state.reading = true;
// call internal read method
this._read(state.bufferSize, function onread(er, chunk) {
@@ -125,21 +148,27 @@ Readable.prototype.read = function(n) {
return this.emit('error', er);
if (!chunk || !chunk.length) {
+ // eof
state.ended = true;
// if we've ended and we have some data left, then emit
// 'readable' now to make sure it gets picked up.
- if (state.length > 0)
- this.emit('readable');
- else
- endReadable(this);
+ if (!sync) {
+ if (state.length > 0)
+ this.emit('readable');
+ else
+ endReadable(this);
+ }
return;
}
if (state.decoder)
chunk = state.decoder.write(chunk);
- state.length += chunk.length;
- state.buffer.push(chunk);
+ // update the buffer info.
+ if (chunk) {
+ state.length += chunk.length;
+ state.buffer.push(chunk);
+ }
// if we haven't gotten enough to pass the lowWaterMark,
// and we haven't ended, then don't bother telling the user
@@ -152,14 +181,33 @@ Readable.prototype.read = function(n) {
return;
}
- // now we have something to call this.read() to get.
- if (state.needReadable) {
+ if (state.needReadable && !sync) {
state.needReadable = false;
this.emit('readable');
}
}.bind(this));
+ sync = false;
}
+ // If _read called its callback synchronously, then `reading`
+ // will be false, and we need to re-evaluate how much data we
+ // can return to the user.
+ if (doRead && !state.reading)
+ n = howMuchToRead(nOrig, state);
+
+ var ret;
+ if (n > 0)
+ ret = fromList(n, state.buffer, state.length, !!state.decoder);
+ else
+ ret = null;
+
+ if (ret === null || ret.length === 0) {
+ state.needReadable = true;
+ n = 0;
+ }
+
+ state.length -= n;
+
return ret;
};
diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js
index 40917de7ab..16f2cacb93 100644
--- a/lib/_stream_transform.js
+++ b/lib/_stream_transform.js
@@ -122,13 +122,14 @@ Transform.prototype._write = function(chunk, cb) {
if (ts.pendingReadCb) {
var readcb = ts.pendingReadCb;
ts.pendingReadCb = null;
- this._read(-1, readcb);
+ this._read(0, readcb);
}
// if we weren't waiting for it, but nothing is queued up, then
// still kick off a transform, just so it's there when the user asks.
- if (rs.length === 0) {
- var ret = this.read();
+ var doRead = rs.needReadable || rs.length <= rs.lowWaterMark;
+ if (doRead && !rs.reading) {
+ var ret = this.read(0);
if (ret !== null)
return cb(new Error('invalid stream transform state'));
}