diff options
author | Fedor Indutny <fedor.indutny@gmail.com> | 2013-04-14 08:15:15 -0400 |
---|---|---|
committer | Fedor Indutny <fedor.indutny@gmail.com> | 2013-04-27 15:59:13 +0400 |
commit | 21ed8df696833a12d8d12dc4f19b3e99434a50b4 (patch) | |
tree | a726a7c27c3ecde3d21472e92e1e46273050ded2 /lib | |
parent | 5db936d2aecaddb5d539855a150813e36df45b66 (diff) | |
download | android-node-v8-21ed8df696833a12d8d12dc4f19b3e99434a50b4.tar.gz android-node-v8-21ed8df696833a12d8d12dc4f19b3e99434a50b4.tar.bz2 android-node-v8-21ed8df696833a12d8d12dc4f19b3e99434a50b4.zip |
streams: introduce .cork/.uncork/._writev
Diffstat (limited to 'lib')
-rw-r--r-- | lib/_stream_writable.js | 103 |
1 files changed, 78 insertions, 25 deletions
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index c060e015a0..9042df721b 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -76,6 +76,9 @@ function WritableState(options, stream) { // a flag to see when we're in the middle of a write. this.writing = false; + // when true all writes will be buffered until .uncork() call + this.corked = false; + // a flag to be able to tell if the onwrite cb is called immediately, // or on a later tick. We set this to true at first, becuase any // actions that shouldn't happen until "later" should generally also @@ -174,6 +177,26 @@ Writable.prototype.write = function(chunk, encoding, cb) { return ret; }; +Writable.prototype.cork = function() { + var state = this._writableState; + + state.corked = true; +}; + +Writable.prototype.uncork = function() { + var state = this._writableState; + + if (state.corked) { + state.corked = false; + + if (!state.writing && + !state.finished && + !state.bufferProcessing && + state.buffer.length) + clearBuffer(this, state); + } +}; + function decodeChunk(state, chunk, encoding) { if (!state.objectMode && state.decodeStrings !== false && @@ -195,20 +218,23 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) { var ret = state.length < state.highWaterMark; state.needDrain = !ret; - if (state.writing) + if (state.writing || state.corked) state.buffer.push(new WriteReq(chunk, encoding, cb)); else - doWrite(stream, state, len, chunk, encoding, cb); + doWrite(stream, state, false, len, chunk, encoding, cb); return ret; } -function doWrite(stream, state, len, chunk, encoding, cb) { +function doWrite(stream, state, writev, len, chunk, encoding, cb) { state.writelen = len; state.writecb = cb; state.writing = true; state.sync = true; - stream._write(chunk, encoding, state.onwrite); + if (writev) + stream._writev(chunk, state.onwrite); + else + stream._write(chunk, encoding, state.onwrite); state.sync = false; } @@ -243,8 +269,12 @@ function onwrite(stream, er) { // Check if we're actually ready to finish, but don't emit yet var finished = needFinish(stream, state); - if (!finished && !state.bufferProcessing && state.buffer.length) + if (!finished && + !state.corked && + !state.bufferProcessing && + state.buffer.length) { clearBuffer(stream, state); + } if (sync) { process.nextTick(function() { @@ -279,36 +309,56 @@ function onwriteDrain(stream, state) { function clearBuffer(stream, state) { state.bufferProcessing = true; - for (var c = 0; c < state.buffer.length; c++) { - var entry = state.buffer[c]; - var chunk = entry.chunk; - var encoding = entry.encoding; - var cb = entry.callback; - var len = state.objectMode ? 1 : chunk.length; - - doWrite(stream, state, len, chunk, encoding, cb); - - // if we didn't call the onwrite immediately, then - // it means that we need to wait until it does. - // also, that means that the chunk and cb are currently - // being processed, so move the buffer counter past them. - if (state.writing) { - c++; - break; + if (stream._writev && state.buffer.length > 1) { + // Fast case, write everything using _writev() + var cbs = []; + for (var c = 0; c < state.buffer.length; c++) + cbs.push(state.buffer[c].callback); + + doWrite(stream, state, true, state.length, state.buffer, '', function(err) { + for (var i = 0; i < cbs.length; i++) + cbs[i](err); + }); + + // Clear buffer + state.buffer = []; + } else { + // Slow case, write chunks one-by-one + for (var c = 0; c < state.buffer.length; c++) { + var entry = state.buffer[c]; + var chunk = entry.chunk; + var encoding = entry.encoding; + var cb = entry.callback; + var len = state.objectMode ? 1 : chunk.length; + + doWrite(stream, state, false, len, chunk, encoding, cb); + + // if we didn't call the onwrite immediately, then + // it means that we need to wait until it does. + // also, that means that the chunk and cb are currently + // being processed, so move the buffer counter past them. + if (state.writing) { + c++; + break; + } } + + if (c < state.buffer.length) + state.buffer = state.buffer.slice(c); + else + state.buffer.length = 0; } state.bufferProcessing = false; - if (c < state.buffer.length) - state.buffer = state.buffer.slice(c); - else - state.buffer.length = 0; } Writable.prototype._write = function(chunk, encoding, cb) { cb(new Error('not implemented')); + }; +Writable.prototype._writev = null; + Writable.prototype.end = function(chunk, encoding, cb) { var state = this._writableState; @@ -324,6 +374,9 @@ Writable.prototype.end = function(chunk, encoding, cb) { if (typeof chunk !== 'undefined' && chunk !== null) this.write(chunk, encoding); + // .end() should .uncork() + this.uncork(); + // ignore unnecessary end() calls. if (!state.ending && !state.finished) endWritable(this, state, cb); |