summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorFedor Indutny <fedor.indutny@gmail.com>2013-04-14 08:15:15 -0400
committerFedor Indutny <fedor.indutny@gmail.com>2013-04-27 15:59:13 +0400
commit21ed8df696833a12d8d12dc4f19b3e99434a50b4 (patch)
treea726a7c27c3ecde3d21472e92e1e46273050ded2 /lib
parent5db936d2aecaddb5d539855a150813e36df45b66 (diff)
downloadandroid-node-v8-21ed8df696833a12d8d12dc4f19b3e99434a50b4.tar.gz
android-node-v8-21ed8df696833a12d8d12dc4f19b3e99434a50b4.tar.bz2
android-node-v8-21ed8df696833a12d8d12dc4f19b3e99434a50b4.zip
streams: introduce .cork/.uncork/._writev
Diffstat (limited to 'lib')
-rw-r--r--lib/_stream_writable.js103
1 files changed, 78 insertions, 25 deletions
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js
index c060e015a0..9042df721b 100644
--- a/lib/_stream_writable.js
+++ b/lib/_stream_writable.js
@@ -76,6 +76,9 @@ function WritableState(options, stream) {
// a flag to see when we're in the middle of a write.
this.writing = false;
+ // when true all writes will be buffered until .uncork() call
+ this.corked = false;
+
// a flag to be able to tell if the onwrite cb is called immediately,
// or on a later tick. We set this to true at first, becuase any
// actions that shouldn't happen until "later" should generally also
@@ -174,6 +177,26 @@ Writable.prototype.write = function(chunk, encoding, cb) {
return ret;
};
+Writable.prototype.cork = function() {
+ var state = this._writableState;
+
+ state.corked = true;
+};
+
+Writable.prototype.uncork = function() {
+ var state = this._writableState;
+
+ if (state.corked) {
+ state.corked = false;
+
+ if (!state.writing &&
+ !state.finished &&
+ !state.bufferProcessing &&
+ state.buffer.length)
+ clearBuffer(this, state);
+ }
+};
+
function decodeChunk(state, chunk, encoding) {
if (!state.objectMode &&
state.decodeStrings !== false &&
@@ -195,20 +218,23 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {
var ret = state.length < state.highWaterMark;
state.needDrain = !ret;
- if (state.writing)
+ if (state.writing || state.corked)
state.buffer.push(new WriteReq(chunk, encoding, cb));
else
- doWrite(stream, state, len, chunk, encoding, cb);
+ doWrite(stream, state, false, len, chunk, encoding, cb);
return ret;
}
-function doWrite(stream, state, len, chunk, encoding, cb) {
+function doWrite(stream, state, writev, len, chunk, encoding, cb) {
state.writelen = len;
state.writecb = cb;
state.writing = true;
state.sync = true;
- stream._write(chunk, encoding, state.onwrite);
+ if (writev)
+ stream._writev(chunk, state.onwrite);
+ else
+ stream._write(chunk, encoding, state.onwrite);
state.sync = false;
}
@@ -243,8 +269,12 @@ function onwrite(stream, er) {
// Check if we're actually ready to finish, but don't emit yet
var finished = needFinish(stream, state);
- if (!finished && !state.bufferProcessing && state.buffer.length)
+ if (!finished &&
+ !state.corked &&
+ !state.bufferProcessing &&
+ state.buffer.length) {
clearBuffer(stream, state);
+ }
if (sync) {
process.nextTick(function() {
@@ -279,36 +309,56 @@ function onwriteDrain(stream, state) {
function clearBuffer(stream, state) {
state.bufferProcessing = true;
- for (var c = 0; c < state.buffer.length; c++) {
- var entry = state.buffer[c];
- var chunk = entry.chunk;
- var encoding = entry.encoding;
- var cb = entry.callback;
- var len = state.objectMode ? 1 : chunk.length;
-
- doWrite(stream, state, len, chunk, encoding, cb);
-
- // if we didn't call the onwrite immediately, then
- // it means that we need to wait until it does.
- // also, that means that the chunk and cb are currently
- // being processed, so move the buffer counter past them.
- if (state.writing) {
- c++;
- break;
+ if (stream._writev && state.buffer.length > 1) {
+ // Fast case, write everything using _writev()
+ var cbs = [];
+ for (var c = 0; c < state.buffer.length; c++)
+ cbs.push(state.buffer[c].callback);
+
+ doWrite(stream, state, true, state.length, state.buffer, '', function(err) {
+ for (var i = 0; i < cbs.length; i++)
+ cbs[i](err);
+ });
+
+ // Clear buffer
+ state.buffer = [];
+ } else {
+ // Slow case, write chunks one-by-one
+ for (var c = 0; c < state.buffer.length; c++) {
+ var entry = state.buffer[c];
+ var chunk = entry.chunk;
+ var encoding = entry.encoding;
+ var cb = entry.callback;
+ var len = state.objectMode ? 1 : chunk.length;
+
+ doWrite(stream, state, false, len, chunk, encoding, cb);
+
+ // if we didn't call the onwrite immediately, then
+ // it means that we need to wait until it does.
+ // also, that means that the chunk and cb are currently
+ // being processed, so move the buffer counter past them.
+ if (state.writing) {
+ c++;
+ break;
+ }
}
+
+ if (c < state.buffer.length)
+ state.buffer = state.buffer.slice(c);
+ else
+ state.buffer.length = 0;
}
state.bufferProcessing = false;
- if (c < state.buffer.length)
- state.buffer = state.buffer.slice(c);
- else
- state.buffer.length = 0;
}
Writable.prototype._write = function(chunk, encoding, cb) {
cb(new Error('not implemented'));
+
};
+Writable.prototype._writev = null;
+
Writable.prototype.end = function(chunk, encoding, cb) {
var state = this._writableState;
@@ -324,6 +374,9 @@ Writable.prototype.end = function(chunk, encoding, cb) {
if (typeof chunk !== 'undefined' && chunk !== null)
this.write(chunk, encoding);
+ // .end() should .uncork()
+ this.uncork();
+
// ignore unnecessary end() calls.
if (!state.ending && !state.finished)
endWritable(this, state, cb);