From 426b4c625802c7b6913fa09237aa9745bf3ae84a Mon Sep 17 00:00:00 2001 From: isaacs Date: Sun, 3 Mar 2013 19:14:06 -0800 Subject: stream: _write takes an encoding argument This vastly reduces the overhead of decodeStrings:false streams, such as net and http. --- lib/_stream_passthrough.js | 2 +- lib/_stream_transform.js | 5 +++-- lib/_stream_writable.js | 52 ++++++++++++++++------------------------------ lib/crypto.js | 12 +++++------ lib/fs.js | 6 ++++-- lib/net.js | 24 ++++++++++----------- lib/tls.js | 8 +++++-- lib/zlib.js | 4 ++-- 8 files changed, 52 insertions(+), 61 deletions(-) (limited to 'lib') diff --git a/lib/_stream_passthrough.js b/lib/_stream_passthrough.js index 557d6de99a..a5e986430d 100644 --- a/lib/_stream_passthrough.js +++ b/lib/_stream_passthrough.js @@ -36,6 +36,6 @@ function PassThrough(options) { Transform.call(this, options); } -PassThrough.prototype._transform = function(chunk, cb) { +PassThrough.prototype._transform = function(chunk, encoding, cb) { cb(null, chunk); }; diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 222b1390f8..013bebde2a 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -155,10 +155,11 @@ Transform.prototype._transform = function(chunk, output, cb) { throw new Error('not implemented'); }; -Transform.prototype._write = function(chunk, cb) { +Transform.prototype._write = function(chunk, encoding, cb) { var ts = this._transformState; ts.writecb = cb; ts.writechunk = chunk; + ts.writeencoding = encoding; if (!ts.transforming) { var rs = this._readableState; if (ts.needTransform || @@ -176,7 +177,7 @@ Transform.prototype._read = function(n) { if (ts.writechunk && ts.writecb && !ts.transforming) { ts.transforming = true; - this._transform(ts.writechunk, ts.afterTransform); + this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform); } else { // mark that we need a transform, so that any data that comes in // will get processed, now that we've asked for it. diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 2dff2d8c75..57926ad57b 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -146,15 +146,6 @@ function validChunk(stream, state, chunk, cb) { return valid; } -function decodeChunk(state, chunk, encoding) { - if (!state.objectMode && - state.decodeStrings !== false && - typeof chunk === 'string') { - chunk = new Buffer(chunk, encoding); - } - return chunk; -} - Writable.prototype.write = function(chunk, encoding, cb) { var state = this._writableState; var ret = false; @@ -177,6 +168,15 @@ Writable.prototype.write = function(chunk, encoding, cb) { return ret; }; +function decodeChunk(state, chunk, encoding) { + if (!state.objectMode && + state.decodeStrings !== false && + typeof chunk === 'string') { + chunk = new Buffer(chunk, encoding); + } + return chunk; +} + // if we're already writing something, then just put this // in the queue, and wait our turn. Otherwise, call _write // If we return false, then we need a drain event, so set that flag. @@ -184,17 +184,13 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) { chunk = decodeChunk(state, chunk, encoding); var len = state.objectMode ? 1 : chunk.length; - // XXX Remove. _write() should take an encoding. - if (state.decodeStrings === false) - chunk = [chunk, encoding]; - state.length += len; var ret = state.length < state.highWaterMark; state.needDrain = !ret; if (state.writing) - state.buffer.push([chunk, cb]); // XXX [chunk,encoding,cb] + state.buffer.push([chunk, encoding, cb]); else doWrite(stream, state, len, chunk, encoding, cb); @@ -206,8 +202,7 @@ function doWrite(stream, state, len, chunk, encoding, cb) { state.writecb = cb; state.writing = true; state.sync = true; - // XXX stream._write(chunk, encoding, state.onwrite) - stream._write(chunk, state.onwrite); + stream._write(chunk, encoding, state.onwrite); state.sync = false; } @@ -271,21 +266,12 @@ function onwriteDrain(stream, state) { function clearBuffer(stream, state) { state.bufferProcessing = true; - // XXX buffer entry should be [chunk, encoding, cb] for (var c = 0; c < state.buffer.length; c++) { - var chunkCb = state.buffer[c]; - var chunk = chunkCb[0]; - var cb = chunkCb[1]; - var encoding = ''; - var len; - - if (state.objectMode) - len = 1; - else if (false === state.decodeStrings) { - len = chunk[0].length; - encoding = chunk[1]; - } else - len = chunk.length; + var entry = state.buffer[c]; + var chunk = entry[0]; + var encoding = entry[1]; + var cb = entry[2]; + var len = state.objectMode ? 1 : chunk.length; doWrite(stream, state, len, chunk, encoding, cb); @@ -306,10 +292,8 @@ function clearBuffer(stream, state) { state.buffer.length = 0; } -Writable.prototype._write = function(chunk, cb) { - process.nextTick(function() { - cb(new Error('not implemented')); - }); +Writable.prototype._write = function(chunk, encoding, cb) { + cb(new Error('not implemented')); }; Writable.prototype.end = function(chunk, encoding, cb) { diff --git a/lib/crypto.js b/lib/crypto.js index 500e14d2f8..01d4b7125b 100644 --- a/lib/crypto.js +++ b/lib/crypto.js @@ -160,8 +160,8 @@ function Hash(algorithm, options) { util.inherits(Hash, stream.Transform); -Hash.prototype._transform = function(chunk, callback) { - this._binding.update(chunk); +Hash.prototype._transform = function(chunk, encoding, callback) { + this._binding.update(chunk, encoding); callback(); }; @@ -226,8 +226,8 @@ function Cipher(cipher, password, options) { util.inherits(Cipher, stream.Transform); -Cipher.prototype._transform = function(chunk, callback) { - this.push(this._binding.update(chunk)); +Cipher.prototype._transform = function(chunk, encoding, callback) { + this.push(this._binding.update(chunk, encoding)); callback(); }; @@ -351,8 +351,8 @@ function Sign(algorithm, options) { util.inherits(Sign, stream.Writable); -Sign.prototype._write = function(chunk, callback) { - this._binding.update(chunk); +Sign.prototype._write = function(chunk, encoding, callback) { + this._binding.update(chunk, encoding); callback(); }; diff --git a/lib/fs.js b/lib/fs.js index d467c5e0cc..39a34abc96 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -1650,12 +1650,14 @@ WriteStream.prototype.open = function() { }; -WriteStream.prototype._write = function(data, cb) { +WriteStream.prototype._write = function(data, encoding, cb) { if (!Buffer.isBuffer(data)) return this.emit('error', new Error('Invalid data')); if (typeof this.fd !== 'number') - return this.once('open', this._write.bind(this, data, cb)); + return this.once('open', function() { + this._write(data, encoding, cb); + }); var self = this; fs.write(this.fd, data, 0, data.length, this.pos, function(er, bytes) { diff --git a/lib/net.js b/lib/net.js index 35223c3396..4821665218 100644 --- a/lib/net.js +++ b/lib/net.js @@ -161,7 +161,8 @@ function Socket(options) { initSocketHandle(this); - this._pendingWrite = null; + this._pendingData = null; + this._pendingEncoding = ''; // handle strings directly this._writableState.decodeStrings = false; @@ -583,22 +584,20 @@ Socket.prototype.write = function(chunk, encoding, cb) { }; -Socket.prototype._write = function(dataEncoding, cb) { - // assert(Array.isArray(dataEncoding)); - var data = dataEncoding[0]; - var encoding = dataEncoding[1] || 'utf8'; - +Socket.prototype._write = function(data, encoding, cb) { // If we are still connecting, then buffer this for later. // The Writable logic will buffer up any more writes while // waiting for this one to be done. if (this._connecting) { - this._pendingWrite = dataEncoding; + this._pendingData = data; + this._pendingEncoding = encoding; this.once('connect', function() { - this._write(dataEncoding, cb); + this._write(data, encoding, cb); }); return; } - this._pendingWrite = null; + this._pendingData = null; + this._pendingEncoding = ''; timers.active(this); @@ -651,15 +650,16 @@ function createWriteReq(handle, data, encoding) { Socket.prototype.__defineGetter__('bytesWritten', function() { var bytes = this._bytesDispatched, state = this._writableState, - pending = this._pendingWrite; + data = this._pendingData, + encoding = this._pendingEncoding; state.buffer.forEach(function(el) { el = el[0]; bytes += Buffer.byteLength(el[0], el[1]); }); - if (pending) - bytes += Buffer.byteLength(pending[0], pending[1]); + if (data) + bytes += Buffer.byteLength(data, encoding); return bytes; }); diff --git a/lib/tls.js b/lib/tls.js index 86ace15b16..515761410b 100644 --- a/lib/tls.js +++ b/lib/tls.js @@ -239,6 +239,7 @@ function CryptoStream(pair, options) { this.pair = pair; this._pending = null; + this._pendingEncoding = ''; this._pendingCallback = null; this._doneFlag = false; this._resumingSession = false; @@ -300,7 +301,7 @@ function onCryptoStreamEnd() { } -CryptoStream.prototype._write = function write(data, cb) { +CryptoStream.prototype._write = function write(data, encoding, cb) { assert(this._pending === null); // Black-hole data @@ -361,6 +362,7 @@ CryptoStream.prototype._write = function write(data, cb) { // No write has happened this._pending = data; + this._pendingEncoding = encoding; this._pendingCallback = cb; if (this === this.pair.cleartext) { @@ -373,11 +375,13 @@ CryptoStream.prototype._write = function write(data, cb) { CryptoStream.prototype._writePending = function writePending() { var data = this._pending, + encoding = this._pendingEncoding, cb = this._pendingCallback; this._pending = null; + this._pendingEncoding = ''; this._pendingCallback = null; - this._write(data, cb); + this._write(data, encoding, cb); }; diff --git a/lib/zlib.js b/lib/zlib.js index d3aa858fba..dc0aeca304 100644 --- a/lib/zlib.js +++ b/lib/zlib.js @@ -309,7 +309,7 @@ Zlib.prototype.reset = function reset() { }; Zlib.prototype._flush = function(callback) { - this._transform(null, callback); + this._transform(null, '', callback); }; Zlib.prototype.flush = function(callback) { @@ -343,7 +343,7 @@ Zlib.prototype.close = function(callback) { }); }; -Zlib.prototype._transform = function(chunk, cb) { +Zlib.prototype._transform = function(chunk, encoding, cb) { var flushFlag; var ws = this._writableState; var ending = ws.ending || ws.ended; -- cgit v1.2.3