From 8acb416ad0532d249a0342f39103dd09fbbeeb2e Mon Sep 17 00:00:00 2001 From: isaacs Date: Sun, 7 Oct 2012 13:12:21 -0700 Subject: streams2: Handle immediate synthetic transforms properly --- lib/_stream_passthrough.js | 3 +- lib/_stream_readable.js | 134 ++++++++++++++++++++++++++++++--------------- lib/_stream_transform.js | 7 ++- 3 files changed, 96 insertions(+), 48 deletions(-) (limited to 'lib') diff --git a/lib/_stream_passthrough.js b/lib/_stream_passthrough.js index dd6390fc6e..5acd27b260 100644 --- a/lib/_stream_passthrough.js +++ b/lib/_stream_passthrough.js @@ -34,6 +34,5 @@ function PassThrough(options) { } PassThrough.prototype._transform = function(chunk, output, cb) { - output(chunk); - cb(); + cb(null, chunk); }; diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index ea944fcb44..916ebc2056 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -28,7 +28,7 @@ var StringDecoder; util.inherits(Readable, Stream); -function ReadableState(options, stream) { +function ReadableState(options) { options = options || {}; this.bufferSize = options.bufferSize || 16 * 1024; @@ -44,7 +44,6 @@ function ReadableState(options, stream) { this.flowing = false; this.ended = false; this.endEmitted = false; - this.stream = stream; this.reading = false; // whenever we return null, then we set a flag to say @@ -71,52 +70,76 @@ Readable.prototype.setEncoding = function(enc) { this._readableState.decoder = new StringDecoder(enc); }; -// you can override either this method, or _read(n, cb) below. -Readable.prototype.read = function(n) { - var state = this._readableState; - if (state.length === 0 && state.ended) { - endReadable(this); - return null; - } +function howMuchToRead(n, state) { + if (state.length === 0 && state.ended) + return 0; + + if (isNaN(n)) + return state.length; - if (isNaN(n) || n <= 0) - n = state.length + if (n <= 0) + return 0; - // XXX: controversial. // don't have that much. return null, unless we've ended. - // However, if the low water mark is lower than the number of bytes, - // then we still need to return what we have, or else it won't kick - // off another _read() call. For example, - // lwm=5 - // len=9 - // read(10) - // We don't have that many bytes, so it'd be tempting to return null, - // but then it won't ever cause _read to be called, so in that case, - // we just return what we have, and let the programmer deal with it. if (n > state.length) { - if (!state.ended && state.length <= state.lowWaterMark) { + if (!state.ended) { state.needReadable = true; - n = 0; + return 0; } else - n = state.length; + return state.length; } + return n; +} - var ret; - if (n > 0) - ret = fromList(n, state.buffer, state.length, !!state.decoder); - else - ret = null; +// you can override either this method, or _read(n, cb) below. +Readable.prototype.read = function(n) { + var state = this._readableState; + var nOrig = n; - if (ret === null || ret.length === 0) - state.needReadable = true; + n = howMuchToRead(n, state); - state.length -= n; + // if we've ended, and we're now clear, then finish it up. + if (n === 0 && state.ended) { + endReadable(this); + return null; + } - if (!state.ended && - state.length <= state.lowWaterMark && - !state.reading) { + // All the actual chunk generation logic needs to be + // *below* the call to _read. The reason is that in certain + // synthetic stream cases, such as passthrough streams, _read + // may be a completely synchronous operation which may change + // the state of the read buffer, providing enough data when + // before there was *not* enough. + // + // So, the steps are: + // 1. Figure out what the state of things will be after we do + // a read from the buffer. + // + // 2. If that resulting state will trigger a _read, then call _read. + // Note that this may be asynchronous, or synchronous. Yes, it is + // deeply ugly to write APIs this way, but that still doesn't mean + // that the Readable class should behave improperly, as streams are + // designed to be sync/async agnostic. + // Take note if the _read call is sync or async (ie, if the read call + // has returned yet), so that we know whether or not it's safe to emit + // 'readable' etc. + // + // 3. Actually pull the requested chunks out of the buffer and return. + + // if we need a readable event, then we need to do some reading. + var doRead = state.needReadable; + // if we currently have less than the lowWaterMark, then also read some + if (state.length - n <= state.lowWaterMark) + doRead = true; + // however, if we've ended, then there's no point, and if we're already + // reading, then it's unnecessary. + if (state.ended || state.reading) + doRead = false; + + if (doRead) { + var sync = true; state.reading = true; // call internal read method this._read(state.bufferSize, function onread(er, chunk) { @@ -125,21 +148,27 @@ Readable.prototype.read = function(n) { return this.emit('error', er); if (!chunk || !chunk.length) { + // eof state.ended = true; // if we've ended and we have some data left, then emit // 'readable' now to make sure it gets picked up. - if (state.length > 0) - this.emit('readable'); - else - endReadable(this); + if (!sync) { + if (state.length > 0) + this.emit('readable'); + else + endReadable(this); + } return; } if (state.decoder) chunk = state.decoder.write(chunk); - state.length += chunk.length; - state.buffer.push(chunk); + // update the buffer info. + if (chunk) { + state.length += chunk.length; + state.buffer.push(chunk); + } // if we haven't gotten enough to pass the lowWaterMark, // and we haven't ended, then don't bother telling the user @@ -152,14 +181,33 @@ Readable.prototype.read = function(n) { return; } - // now we have something to call this.read() to get. - if (state.needReadable) { + if (state.needReadable && !sync) { state.needReadable = false; this.emit('readable'); } }.bind(this)); + sync = false; } + // If _read called its callback synchronously, then `reading` + // will be false, and we need to re-evaluate how much data we + // can return to the user. + if (doRead && !state.reading) + n = howMuchToRead(nOrig, state); + + var ret; + if (n > 0) + ret = fromList(n, state.buffer, state.length, !!state.decoder); + else + ret = null; + + if (ret === null || ret.length === 0) { + state.needReadable = true; + n = 0; + } + + state.length -= n; + return ret; }; diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 40917de7ab..16f2cacb93 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -122,13 +122,14 @@ Transform.prototype._write = function(chunk, cb) { if (ts.pendingReadCb) { var readcb = ts.pendingReadCb; ts.pendingReadCb = null; - this._read(-1, readcb); + this._read(0, readcb); } // if we weren't waiting for it, but nothing is queued up, then // still kick off a transform, just so it's there when the user asks. - if (rs.length === 0) { - var ret = this.read(); + var doRead = rs.needReadable || rs.length <= rs.lowWaterMark; + if (doRead && !rs.reading) { + var ret = this.read(0); if (ret !== null) return cb(new Error('invalid stream transform state')); } -- cgit v1.2.3