summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/api/stream.markdown28
-rw-r--r--lib/_stream_passthrough.js2
-rw-r--r--lib/_stream_transform.js40
-rw-r--r--lib/crypto.js14
-rw-r--r--lib/zlib.js16
-rw-r--r--test/simple/test-stream2-transform.js34
-rw-r--r--test/simple/test-stream2-unpipe-drain.js2
-rw-r--r--test/simple/test-stream2-unpipe-leak.js2
8 files changed, 69 insertions, 69 deletions
diff --git a/doc/api/stream.markdown b/doc/api/stream.markdown
index a6a833779d..2b192d6b4a 100644
--- a/doc/api/stream.markdown
+++ b/doc/api/stream.markdown
@@ -589,11 +589,9 @@ In classes that extend the Transform class, make sure to call the
constructor so that the buffering settings can be properly
initialized.
-### transform.\_transform(chunk, outputFn, callback)
+### transform.\_transform(chunk, callback)
* `chunk` {Buffer} The chunk to be transformed.
-* `outputFn` {Function} Call this function with any output data to be
- passed to the readable interface.
* `callback` {Function} Call this function (optionally with an error
argument) when you are done processing the supplied chunk.
@@ -609,20 +607,21 @@ Transform class, to handle the bytes being written, and pass them off
to the readable portion of the interface. Do asynchronous I/O,
process things, and so on.
+Call `transform.push(outputChunk)` 0 or more times to generate output
+from this input chunk, depending on how much data you want to output
+as a result of this chunk.
+
Call the callback function only when the current chunk is completely
-consumed. Note that this may mean that you call the `outputFn` zero
-or more times, depending on how much data you want to output as a
-result of this chunk.
+consumed. Note that there may or may not be output as a result of any
+particular input chunk.
This method is prefixed with an underscore because it is internal to
the class that defines it, and should not be called directly by user
programs. However, you **are** expected to override this method in
your own extension classes.
-### transform.\_flush(outputFn, callback)
+### transform.\_flush(callback)
-* `outputFn` {Function} Call this function with any output data to be
- passed to the readable interface.
* `callback` {Function} Call this function (optionally with an error
argument) when you are done flushing any remaining data.
@@ -639,8 +638,9 @@ can with what is left, so that the data will be complete.
In those cases, you can implement a `_flush` method, which will be
called at the very end, after all the written data is consumed, but
before emitting `end` to signal the end of the readable side. Just
-like with `_transform`, call `outputFn` zero or more times, as
-appropriate, and call `callback` when the flush operation is complete.
+like with `_transform`, call `transform.push(chunk)` zero or more
+times, as appropriate, and call `callback` when the flush operation is
+complete.
This method is prefixed with an underscore because it is internal to
the class that defines it, and should not be called directly by user
@@ -671,7 +671,7 @@ function SimpleProtocol(options) {
SimpleProtocol.prototype = Object.create(
Transform.prototype, { constructor: { value: SimpleProtocol }});
-SimpleProtocol.prototype._transform = function(chunk, output, done) {
+SimpleProtocol.prototype._transform = function(chunk, done) {
if (!this._inBody) {
// check if the chunk has a \n\n
var split = -1;
@@ -707,11 +707,11 @@ SimpleProtocol.prototype._transform = function(chunk, output, done) {
this.emit('header', this.header);
// now, because we got some extra data, emit this first.
- output(b);
+ this.push(b);
}
} else {
// from there on, just provide the data to our consumer as-is.
- output(b);
+ this.push(b);
}
done();
};
diff --git a/lib/_stream_passthrough.js b/lib/_stream_passthrough.js
index 0f2fe14c78..557d6de99a 100644
--- a/lib/_stream_passthrough.js
+++ b/lib/_stream_passthrough.js
@@ -36,6 +36,6 @@ function PassThrough(options) {
Transform.call(this, options);
}
-PassThrough.prototype._transform = function(chunk, output, cb) {
+PassThrough.prototype._transform = function(chunk, cb) {
cb(null, chunk);
};
diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js
index d8f6e60532..222b1390f8 100644
--- a/lib/_stream_transform.js
+++ b/lib/_stream_transform.js
@@ -71,10 +71,6 @@ util.inherits(Transform, Duplex);
function TransformState(options, stream) {
var ts = this;
- this.output = function(chunk) {
- ts.needTransform = false;
- stream.push(chunk);
- };
this.afterTransform = function(er, data) {
return afterTransform(stream, er, data);
@@ -99,7 +95,7 @@ function afterTransform(stream, er, data) {
ts.writecb = null;
if (data !== null && data !== undefined)
- ts.output(data);
+ stream.push(data);
if (cb)
cb(er);
@@ -132,7 +128,7 @@ function Transform(options) {
this.once('finish', function() {
if ('function' === typeof this._flush)
- this._flush(ts.output, function(er) {
+ this._flush(function(er) {
done(stream, er);
});
else
@@ -140,12 +136,17 @@ function Transform(options) {
});
}
+Transform.prototype.push = function(chunk) {
+ this._transformState.needTransform = false;
+ return Duplex.prototype.push.call(this, chunk);
+};
+
// This is the part where you do stuff!
// override this function in implementation classes.
// 'chunk' is an input chunk.
//
-// Call `output(newChunk)` to pass along transformed output
-// to the readable side. You may call 'output' zero or more times.
+// Call `push(newChunk)` to pass along transformed output
+// to the readable side. You may call 'push' zero or more times.
//
// Call `cb(err)` when you are done with this chunk. If you pass
// an error, then that'll put the hurt on the whole operation. If you
@@ -158,11 +159,13 @@ Transform.prototype._write = function(chunk, cb) {
var ts = this._transformState;
ts.writecb = cb;
ts.writechunk = chunk;
- if (ts.transforming)
- return;
- var rs = this._readableState;
- if (ts.needTransform || rs.needReadable || rs.length < rs.highWaterMark)
- this._read(rs.bufferSize);
+ if (!ts.transforming) {
+ var rs = this._readableState;
+ if (ts.needTransform ||
+ rs.needReadable ||
+ rs.length < rs.highWaterMark)
+ this._read(rs.bufferSize);
+ }
};
// Doesn't matter what the args are here.
@@ -173,13 +176,12 @@ Transform.prototype._read = function(n) {
if (ts.writechunk && ts.writecb && !ts.transforming) {
ts.transforming = true;
- this._transform(ts.writechunk, ts.output, ts.afterTransform);
- return;
+ this._transform(ts.writechunk, ts.afterTransform);
+ } else {
+ // mark that we need a transform, so that any data that comes in
+ // will get processed, now that we've asked for it.
+ ts.needTransform = true;
}
-
- // mark that we need a transform, so that any data that comes in
- // will get processed, now that we've asked for it.
- ts.needTransform = true;
};
diff --git a/lib/crypto.js b/lib/crypto.js
index 224c9da5db..500e14d2f8 100644
--- a/lib/crypto.js
+++ b/lib/crypto.js
@@ -160,13 +160,13 @@ function Hash(algorithm, options) {
util.inherits(Hash, stream.Transform);
-Hash.prototype._transform = function(chunk, output, callback) {
+Hash.prototype._transform = function(chunk, callback) {
this._binding.update(chunk);
callback();
};
-Hash.prototype._flush = function(output, callback) {
- output(this._binding.digest());
+Hash.prototype._flush = function(callback) {
+ this.push(this._binding.digest());
callback();
};
@@ -226,13 +226,13 @@ function Cipher(cipher, password, options) {
util.inherits(Cipher, stream.Transform);
-Cipher.prototype._transform = function(chunk, output, callback) {
- output(this._binding.update(chunk));
+Cipher.prototype._transform = function(chunk, callback) {
+ this.push(this._binding.update(chunk));
callback();
};
-Cipher.prototype._flush = function(output, callback) {
- output(this._binding.final());
+Cipher.prototype._flush = function(callback) {
+ this.push(this._binding.final());
callback();
};
diff --git a/lib/zlib.js b/lib/zlib.js
index 409286bc14..d3aa858fba 100644
--- a/lib/zlib.js
+++ b/lib/zlib.js
@@ -308,8 +308,8 @@ Zlib.prototype.reset = function reset() {
return this._binding.reset();
};
-Zlib.prototype._flush = function(output, callback) {
- this._transform(null, output, callback);
+Zlib.prototype._flush = function(callback) {
+ this._transform(null, callback);
};
Zlib.prototype.flush = function(callback) {
@@ -320,12 +320,10 @@ Zlib.prototype.flush = function(callback) {
ws.needDrain = true;
var self = this;
this.once('drain', function() {
- self._flush(ts.output, callback);
+ self._flush(callback);
});
- return;
- }
-
- this._flush(ts.output, callback || function() {});
+ } else
+ this._flush(callback || function() {});
};
Zlib.prototype.close = function(callback) {
@@ -345,7 +343,7 @@ Zlib.prototype.close = function(callback) {
});
};
-Zlib.prototype._transform = function(chunk, output, cb) {
+Zlib.prototype._transform = function(chunk, cb) {
var flushFlag;
var ws = this._writableState;
var ending = ws.ending || ws.ended;
@@ -392,7 +390,7 @@ Zlib.prototype._transform = function(chunk, output, cb) {
var out = self._buffer.slice(self._offset, self._offset + have);
self._offset += have;
// serve some output to the consumer.
- output(out);
+ self.push(out);
}
// exhausted the output buffer, or used all the input create a new one.
diff --git a/test/simple/test-stream2-transform.js b/test/simple/test-stream2-transform.js
index 998e1f8e51..baef18d0c7 100644
--- a/test/simple/test-stream2-transform.js
+++ b/test/simple/test-stream2-transform.js
@@ -67,9 +67,9 @@ test('writable side consumption', function(t) {
});
var transformed = 0;
- tx._transform = function(chunk, output, cb) {
+ tx._transform = function(chunk, cb) {
transformed += chunk.length;
- output(chunk);
+ tx.push(chunk);
cb();
};
@@ -106,10 +106,10 @@ test('passthrough', function(t) {
test('simple transform', function(t) {
var pt = new Transform;
- pt._transform = function(c, output, cb) {
+ pt._transform = function(c, cb) {
var ret = new Buffer(c.length);
ret.fill('x');
- output(ret);
+ pt.push(ret);
cb();
};
@@ -128,9 +128,9 @@ test('simple transform', function(t) {
test('async passthrough', function(t) {
var pt = new Transform;
- pt._transform = function(chunk, output, cb) {
+ pt._transform = function(chunk, cb) {
setTimeout(function() {
- output(chunk);
+ pt.push(chunk);
cb();
}, 10);
};
@@ -154,11 +154,11 @@ test('assymetric transform (expand)', function(t) {
var pt = new Transform;
// emit each chunk 2 times.
- pt._transform = function(chunk, output, cb) {
+ pt._transform = function(chunk, cb) {
setTimeout(function() {
- output(chunk);
+ pt.push(chunk);
setTimeout(function() {
- output(chunk);
+ pt.push(chunk);
cb();
}, 10)
}, 10);
@@ -189,24 +189,24 @@ test('assymetric transform (compress)', function(t) {
// or whatever's left.
pt.state = '';
- pt._transform = function(chunk, output, cb) {
+ pt._transform = function(chunk, cb) {
if (!chunk)
chunk = '';
var s = chunk.toString();
setTimeout(function() {
this.state += s.charAt(0);
if (this.state.length === 3) {
- output(new Buffer(this.state));
+ pt.push(new Buffer(this.state));
this.state = '';
}
cb();
}.bind(this), 10);
};
- pt._flush = function(output, cb) {
+ pt._flush = function(cb) {
// just output whatever we have.
setTimeout(function() {
- output(new Buffer(this.state));
+ pt.push(new Buffer(this.state));
this.state = '';
cb();
}.bind(this), 10);
@@ -359,9 +359,9 @@ test('passthrough facaded', function(t) {
test('object transform (json parse)', function(t) {
console.error('json parse stream');
var jp = new Transform({ objectMode: true });
- jp._transform = function(data, output, cb) {
+ jp._transform = function(data, cb) {
try {
- output(JSON.parse(data));
+ jp.push(JSON.parse(data));
cb();
} catch (er) {
cb(er);
@@ -399,9 +399,9 @@ test('object transform (json parse)', function(t) {
test('object transform (json stringify)', function(t) {
console.error('json parse stream');
var js = new Transform({ objectMode: true });
- js._transform = function(data, output, cb) {
+ js._transform = function(data, cb) {
try {
- output(JSON.stringify(data));
+ js.push(JSON.stringify(data));
cb();
} catch (er) {
cb(er);
diff --git a/test/simple/test-stream2-unpipe-drain.js b/test/simple/test-stream2-unpipe-drain.js
index 4f5b3b7532..d66dc3cbec 100644
--- a/test/simple/test-stream2-unpipe-drain.js
+++ b/test/simple/test-stream2-unpipe-drain.js
@@ -32,7 +32,7 @@ function TestWriter() {
}
util.inherits(TestWriter, stream.Writable);
-TestWriter.prototype._write = function (buffer, callback) {
+TestWriter.prototype._write = function (buffer, encoding, callback) {
console.log('write called');
// super slow write stream (callback never called)
};
diff --git a/test/simple/test-stream2-unpipe-leak.js b/test/simple/test-stream2-unpipe-leak.js
index 3ab02e9ac1..c2cf9077f3 100644
--- a/test/simple/test-stream2-unpipe-leak.js
+++ b/test/simple/test-stream2-unpipe-leak.js
@@ -31,7 +31,7 @@ function TestWriter() {
}
util.inherits(TestWriter, stream.Writable);
-TestWriter.prototype._write = function(buffer, callback) {
+TestWriter.prototype._write = function(buffer, encoding, callback) {
callback(null);
};