diff options
author | Robert Nagy <ronagy@icloud.com> | 2019-08-24 16:33:46 +0200 |
---|---|---|
committer | Rich Trott <rtrott@gmail.com> | 2019-09-30 10:56:29 -0700 |
commit | f663b31cc2aecd585e73430504f3d7f5252851ca (patch) | |
tree | b727bf953711f9f83fa9b7dca13e30f07d17d694 /lib | |
parent | 634a9a97f4b380390352543452aed6c7c9defcb4 (diff) | |
download | android-node-v8-f663b31cc2aecd585e73430504f3d7f5252851ca.tar.gz android-node-v8-f663b31cc2aecd585e73430504f3d7f5252851ca.tar.bz2 android-node-v8-f663b31cc2aecd585e73430504f3d7f5252851ca.zip |
stream: always invoke callback before emitting error
Ensure the callback is always invoked before emitting
the error in both sync and async case.
PR-URL: https://github.com/nodejs/node/pull/29293
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Diffstat (limited to 'lib')
-rw-r--r-- | lib/_stream_writable.js | 37 | ||||
-rw-r--r-- | lib/internal/streams/destroy.js | 13 |
2 files changed, 33 insertions, 17 deletions
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index c6d895ff5d..9b75b672cb 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -158,6 +158,11 @@ function WritableState(options, stream, isDuplex) { // Should .destroy() be called after 'finish' (and potentially 'end') this.autoDestroy = !!(options && options.autoDestroy); + // Indicates whether the stream has errored. When true all write() calls + // should return false. This is needed since when autoDestroy + // is disabled we need a way to tell whether the stream has failed. + this.errored = false; + // Count buffered requests this.bufferedRequestCount = 0; @@ -401,7 +406,7 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) { if (!ret) state.needDrain = true; - if (state.writing || state.corked) { + if (state.writing || state.corked || state.errored) { var last = state.lastBufferedRequest; state.lastBufferedRequest = { chunk, @@ -420,7 +425,9 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) { doWrite(stream, state, false, len, chunk, encoding, cb); } - return ret; + // Return false if errored or destroyed in order to break + // any synchronous while(stream.write(data)) loops. + return ret && !state.errored && !state.destroyed; } function doWrite(stream, state, writev, len, chunk, encoding, cb) { @@ -437,18 +444,11 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) { state.sync = false; } -function onwriteError(stream, state, sync, er, cb) { +function onwriteError(stream, state, er, cb) { --state.pendingcb; - if (sync) { - // Defer the callback if we are being called synchronously - // to avoid piling up things on the stack - process.nextTick(cb, er); - } else { - // The caller expect this to happen before if - // it is async - cb(er); - } + cb(er); + // This can emit error, but error must always follow cb. errorOrDestroy(stream, er); } @@ -465,9 +465,14 @@ function onwrite(stream, er) { state.length -= state.writelen; state.writelen = 0; - if (er) - onwriteError(stream, state, sync, er, cb); - else { + if (er) { + state.errored = true; + if (sync) { + process.nextTick(onwriteError, stream, state, er, cb); + } else { + onwriteError(stream, state, er, cb); + } + } else { // Check if we're actually ready to finish, but don't emit yet var finished = needFinish(state) || stream.destroyed; @@ -622,7 +627,7 @@ Object.defineProperty(Writable.prototype, 'writableLength', { function needFinish(state) { return (state.ending && state.length === 0 && - !state.errorEmitted && + !state.errored && state.bufferedRequest === null && !state.finished && !state.writing); diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 27985482ce..8708ca022c 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -27,6 +27,10 @@ function destroy(err, cb) { const r = this._readableState; const w = this._writableState; + if (w && err) { + w.errored = true; + } + if ((w && w.destroyed) || (r && r.destroyed)) { if (cb) { cb(err); @@ -50,10 +54,12 @@ function destroy(err, cb) { this._destroy(err || null, (err) => { const emitClose = (w && w.emitClose) || (r && r.emitClose); if (cb) { + // Invoke callback before scheduling emitClose so that callback + // can schedule before. + cb(err); if (emitClose) { process.nextTick(emitCloseNT, this); } - cb(err); } else if (needError(this, err)) { process.nextTick(emitClose ? emitErrorCloseNT : emitErrorNT, this, err); } else if (emitClose) { @@ -91,6 +97,7 @@ function undestroy() { if (w) { w.destroyed = false; + w.errored = false; w.ended = false; w.ending = false; w.finalCalled = false; @@ -110,6 +117,10 @@ function errorOrDestroy(stream, err) { const r = stream._readableState; const w = stream._writableState; + if (w & err) { + w.errored = true; + } + if ((r && r.autoDestroy) || (w && w.autoDestroy)) stream.destroy(err); else if (needError(stream, err)) |