diff options
Diffstat (limited to 'deps/npm/node_modules/readable-stream/lib/_stream_transform.js')
-rw-r--r-- | deps/npm/node_modules/readable-stream/lib/_stream_transform.js | 71 |
1 files changed, 29 insertions, 42 deletions
diff --git a/deps/npm/node_modules/readable-stream/lib/_stream_transform.js b/deps/npm/node_modules/readable-stream/lib/_stream_transform.js index 5d1f8b876d..41a738c4e9 100644 --- a/deps/npm/node_modules/readable-stream/lib/_stream_transform.js +++ b/deps/npm/node_modules/readable-stream/lib/_stream_transform.js @@ -18,7 +18,6 @@ // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE // USE OR OTHER DEALINGS IN THE SOFTWARE. - // a transform stream is a readable/writable stream where you do // something with the data. Sometimes it's called a "filter", // but that's not a great name for it, since that implies a thing where @@ -60,40 +59,37 @@ // However, even in such a pathological case, only a single written chunk // would be consumed, and then the rest would wait (un-transformed) until // the results of the previous transformed chunk were consumed. - 'use strict'; module.exports = Transform; -var Duplex = require('./_stream_duplex'); +var _require$codes = require('../errors').codes, + ERR_METHOD_NOT_IMPLEMENTED = _require$codes.ERR_METHOD_NOT_IMPLEMENTED, + ERR_MULTIPLE_CALLBACK = _require$codes.ERR_MULTIPLE_CALLBACK, + ERR_TRANSFORM_ALREADY_TRANSFORMING = _require$codes.ERR_TRANSFORM_ALREADY_TRANSFORMING, + ERR_TRANSFORM_WITH_LENGTH_0 = _require$codes.ERR_TRANSFORM_WITH_LENGTH_0; -/*<replacement>*/ -var util = require('core-util-is'); -util.inherits = require('inherits'); -/*</replacement>*/ +var Duplex = require('./_stream_duplex'); -util.inherits(Transform, Duplex); +require('inherits')(Transform, Duplex); function afterTransform(er, data) { var ts = this._transformState; ts.transforming = false; - var cb = ts.writecb; - if (!cb) { - return this.emit('error', new Error('write callback called multiple times')); + if (cb === null) { + return this.emit('error', new ERR_MULTIPLE_CALLBACK()); } ts.writechunk = null; ts.writecb = null; - if (data != null) // single equals check for both `null` and `undefined` this.push(data); - cb(er); - var rs = this._readableState; rs.reading = false; + if (rs.needReadable || rs.length < rs.highWaterMark) { this._read(rs.highWaterMark); } @@ -101,9 +97,7 @@ function afterTransform(er, data) { function Transform(options) { if (!(this instanceof Transform)) return new Transform(options); - Duplex.call(this, options); - this._transformState = { afterTransform: afterTransform.bind(this), needTransform: false, @@ -111,30 +105,27 @@ function Transform(options) { writecb: null, writechunk: null, writeencoding: null - }; - - // start out asking for a readable event once data is transformed. - this._readableState.needReadable = true; + }; // start out asking for a readable event once data is transformed. - // we have implemented the _read method, and done the other things + this._readableState.needReadable = true; // we have implemented the _read method, and done the other things // that Readable wants before the first _read call, so unset the // sync guard flag. + this._readableState.sync = false; if (options) { if (typeof options.transform === 'function') this._transform = options.transform; - if (typeof options.flush === 'function') this._flush = options.flush; - } + } // When the writable side finishes, then flush out anything remaining. + - // When the writable side finishes, then flush out anything remaining. this.on('prefinish', prefinish); } function prefinish() { var _this = this; - if (typeof this._flush === 'function') { + if (typeof this._flush === 'function' && !this._readableState.destroyed) { this._flush(function (er, data) { done(_this, er, data); }); @@ -146,9 +137,7 @@ function prefinish() { Transform.prototype.push = function (chunk, encoding) { this._transformState.needTransform = false; return Duplex.prototype.push.call(this, chunk, encoding); -}; - -// This is the part where you do stuff! +}; // This is the part where you do stuff! // override this function in implementation classes. // 'chunk' is an input chunk. // @@ -158,8 +147,10 @@ Transform.prototype.push = function (chunk, encoding) { // Call `cb(err)` when you are done with this chunk. If you pass // an error, then that'll put the hurt on the whole operation. If you // never call cb(), then you'll never get another chunk. + + Transform.prototype._transform = function (chunk, encoding, cb) { - throw new Error('_transform() is not implemented'); + cb(new ERR_METHOD_NOT_IMPLEMENTED('_transform()')); }; Transform.prototype._write = function (chunk, encoding, cb) { @@ -167,20 +158,22 @@ Transform.prototype._write = function (chunk, encoding, cb) { ts.writecb = cb; ts.writechunk = chunk; ts.writeencoding = encoding; + if (!ts.transforming) { var rs = this._readableState; if (ts.needTransform || rs.needReadable || rs.length < rs.highWaterMark) this._read(rs.highWaterMark); } -}; - -// Doesn't matter what the args are here. +}; // Doesn't matter what the args are here. // _transform does all the work. // That we got here means that the readable side wants more data. + + Transform.prototype._read = function (n) { var ts = this._transformState; - if (ts.writechunk !== null && ts.writecb && !ts.transforming) { + if (ts.writechunk !== null && !ts.transforming) { ts.transforming = true; + this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform); } else { // mark that we need a transform, so that any data that comes in @@ -190,25 +183,19 @@ Transform.prototype._read = function (n) { }; Transform.prototype._destroy = function (err, cb) { - var _this2 = this; - Duplex.prototype._destroy.call(this, err, function (err2) { cb(err2); - _this2.emit('close'); }); }; function done(stream, er, data) { if (er) return stream.emit('error', er); - if (data != null) // single equals check for both `null` and `undefined` - stream.push(data); - + stream.push(data); // TODO(BridgeAR): Write a test for these two error cases // if there's nothing in the write buffer, then that means // that nothing more will ever be provided - if (stream._writableState.length) throw new Error('Calling transform done when ws.length != 0'); - - if (stream._transformState.transforming) throw new Error('Calling transform done when still transforming'); + if (stream._writableState.length) throw new ERR_TRANSFORM_WITH_LENGTH_0(); + if (stream._transformState.transforming) throw new ERR_TRANSFORM_ALREADY_TRANSFORMING(); return stream.push(null); }
\ No newline at end of file |