aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/_stream_transform.js2
-rw-r--r--lib/_stream_writable.js43
-rw-r--r--test/simple/test-stream2-writable.js16
3 files changed, 53 insertions, 8 deletions
diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js
index 8a00d343b6..958bfa1148 100644
--- a/lib/_stream_transform.js
+++ b/lib/_stream_transform.js
@@ -125,7 +125,7 @@ function Transform(options) {
// sync guard flag.
this._readableState.sync = false;
- this.once('finish', function() {
+ this.once('prefinish', function() {
if ('function' === typeof this._flush)
this._flush(function(er) {
done(stream, er);
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js
index 9042df721b..f0034e3ba5 100644
--- a/lib/_stream_writable.js
+++ b/lib/_stream_writable.js
@@ -102,6 +102,14 @@ function WritableState(options, stream) {
this.writelen = 0;
this.buffer = [];
+
+ // number of pending user-supplied write callbacks
+ // this must be 0 before 'finish' can be emitted
+ this.pendingcb = 0;
+
+ // emit prefinish if the only thing we're waiting for is _write cbs
+ // This is relevant for synchronous Transform streams
+ this.prefinished = false;
}
function Writable(options) {
@@ -171,8 +179,10 @@ Writable.prototype.write = function(chunk, encoding, cb) {
if (state.ended)
writeAfterEnd(this, state, cb);
- else if (validChunk(this, state, chunk, cb))
+ else if (validChunk(this, state, chunk, cb)) {
+ state.pendingcb++;
ret = writeOrBuffer(this, state, chunk, encoding, cb);
+ }
return ret;
};
@@ -241,10 +251,13 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) {
function onwriteError(stream, state, sync, er, cb) {
if (sync)
process.nextTick(function() {
+ state.pendingcb--;
cb(er);
});
- else
+ else {
+ state.pendingcb--;
cb(er);
+ }
stream.emit('error', er);
}
@@ -289,9 +302,9 @@ function onwrite(stream, er) {
function afterWrite(stream, state, finished, cb) {
if (!finished)
onwriteDrain(stream, state);
+ state.pendingcb--;
cb();
- if (finished)
- finishMaybe(stream, state);
+ finishMaybe(stream, state);
}
// Must force callback to be called on nextTick, so that we don't
@@ -315,9 +328,14 @@ function clearBuffer(stream, state) {
for (var c = 0; c < state.buffer.length; c++)
cbs.push(state.buffer[c].callback);
+ // count the one we are adding, as well.
+ // TODO(isaacs) clean this up
+ state.pendingcb++;
doWrite(stream, state, true, state.length, state.buffer, '', function(err) {
- for (var i = 0; i < cbs.length; i++)
+ for (var i = 0; i < cbs.length; i++) {
+ state.pendingcb--;
cbs[i](err);
+ }
});
// Clear buffer
@@ -390,11 +408,22 @@ function needFinish(stream, state) {
!state.writing);
}
+function prefinish(stream, state) {
+ if (!state.prefinished) {
+ state.prefinished = true;
+ stream.emit('prefinish');
+ }
+}
+
function finishMaybe(stream, state) {
var need = needFinish(stream, state);
if (need) {
- state.finished = true;
- stream.emit('finish');
+ if (state.pendingcb === 0) {
+ prefinish(stream, state);
+ state.finished = true;
+ stream.emit('finish');
+ } else
+ prefinish(stream, state);
}
return need;
}
diff --git a/test/simple/test-stream2-writable.js b/test/simple/test-stream2-writable.js
index e0f384cb2a..704100c0da 100644
--- a/test/simple/test-stream2-writable.js
+++ b/test/simple/test-stream2-writable.js
@@ -375,3 +375,19 @@ test('finish does not come before write cb', function(t) {
w.write(Buffer(0));
w.end();
});
+
+test('finish does not come before sync _write cb', function(t) {
+ var w = new W();
+ var writeCb = false;
+ w._write = function(chunk, e, cb) {
+ cb();
+ };
+ w.on('finish', function() {
+ assert(writeCb);
+ t.end();
+ });
+ w.write(Buffer(0), function(er) {
+ writeCb = true;
+ });
+ w.end();
+});