summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorRobert Nagy <ronagy@icloud.com>2019-08-24 16:33:46 +0200
committerRich Trott <rtrott@gmail.com>2019-09-30 10:56:29 -0700
commitf663b31cc2aecd585e73430504f3d7f5252851ca (patch)
treeb727bf953711f9f83fa9b7dca13e30f07d17d694 /lib
parent634a9a97f4b380390352543452aed6c7c9defcb4 (diff)
downloadandroid-node-v8-f663b31cc2aecd585e73430504f3d7f5252851ca.tar.gz
android-node-v8-f663b31cc2aecd585e73430504f3d7f5252851ca.tar.bz2
android-node-v8-f663b31cc2aecd585e73430504f3d7f5252851ca.zip
stream: always invoke callback before emitting error
Ensure the callback is always invoked before emitting the error in both sync and async case. PR-URL: https://github.com/nodejs/node/pull/29293 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
Diffstat (limited to 'lib')
-rw-r--r--lib/_stream_writable.js37
-rw-r--r--lib/internal/streams/destroy.js13
2 files changed, 33 insertions, 17 deletions
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js
index c6d895ff5d..9b75b672cb 100644
--- a/lib/_stream_writable.js
+++ b/lib/_stream_writable.js
@@ -158,6 +158,11 @@ function WritableState(options, stream, isDuplex) {
// Should .destroy() be called after 'finish' (and potentially 'end')
this.autoDestroy = !!(options && options.autoDestroy);
+ // Indicates whether the stream has errored. When true all write() calls
+ // should return false. This is needed since when autoDestroy
+ // is disabled we need a way to tell whether the stream has failed.
+ this.errored = false;
+
// Count buffered requests
this.bufferedRequestCount = 0;
@@ -401,7 +406,7 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
if (!ret)
state.needDrain = true;
- if (state.writing || state.corked) {
+ if (state.writing || state.corked || state.errored) {
var last = state.lastBufferedRequest;
state.lastBufferedRequest = {
chunk,
@@ -420,7 +425,9 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
doWrite(stream, state, false, len, chunk, encoding, cb);
}
- return ret;
+ // Return false if errored or destroyed in order to break
+ // any synchronous while(stream.write(data)) loops.
+ return ret && !state.errored && !state.destroyed;
}
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
@@ -437,18 +444,11 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) {
state.sync = false;
}
-function onwriteError(stream, state, sync, er, cb) {
+function onwriteError(stream, state, er, cb) {
--state.pendingcb;
- if (sync) {
- // Defer the callback if we are being called synchronously
- // to avoid piling up things on the stack
- process.nextTick(cb, er);
- } else {
- // The caller expect this to happen before if
- // it is async
- cb(er);
- }
+ cb(er);
+ // This can emit error, but error must always follow cb.
errorOrDestroy(stream, er);
}
@@ -465,9 +465,14 @@ function onwrite(stream, er) {
state.length -= state.writelen;
state.writelen = 0;
- if (er)
- onwriteError(stream, state, sync, er, cb);
- else {
+ if (er) {
+ state.errored = true;
+ if (sync) {
+ process.nextTick(onwriteError, stream, state, er, cb);
+ } else {
+ onwriteError(stream, state, er, cb);
+ }
+ } else {
// Check if we're actually ready to finish, but don't emit yet
var finished = needFinish(state) || stream.destroyed;
@@ -622,7 +627,7 @@ Object.defineProperty(Writable.prototype, 'writableLength', {
function needFinish(state) {
return (state.ending &&
state.length === 0 &&
- !state.errorEmitted &&
+ !state.errored &&
state.bufferedRequest === null &&
!state.finished &&
!state.writing);
diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js
index 27985482ce..8708ca022c 100644
--- a/lib/internal/streams/destroy.js
+++ b/lib/internal/streams/destroy.js
@@ -27,6 +27,10 @@ function destroy(err, cb) {
const r = this._readableState;
const w = this._writableState;
+ if (w && err) {
+ w.errored = true;
+ }
+
if ((w && w.destroyed) || (r && r.destroyed)) {
if (cb) {
cb(err);
@@ -50,10 +54,12 @@ function destroy(err, cb) {
this._destroy(err || null, (err) => {
const emitClose = (w && w.emitClose) || (r && r.emitClose);
if (cb) {
+ // Invoke callback before scheduling emitClose so that callback
+ // can schedule before.
+ cb(err);
if (emitClose) {
process.nextTick(emitCloseNT, this);
}
- cb(err);
} else if (needError(this, err)) {
process.nextTick(emitClose ? emitErrorCloseNT : emitErrorNT, this, err);
} else if (emitClose) {
@@ -91,6 +97,7 @@ function undestroy() {
if (w) {
w.destroyed = false;
+ w.errored = false;
w.ended = false;
w.ending = false;
w.finalCalled = false;
@@ -110,6 +117,10 @@ function errorOrDestroy(stream, err) {
const r = stream._readableState;
const w = stream._writableState;
+ if (w & err) {
+ w.errored = true;
+ }
+
if ((r && r.autoDestroy) || (w && w.autoDestroy))
stream.destroy(err);
else if (needError(stream, err))