summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorMathias Buus <mathiasbuus@gmail.com>2018-08-21 20:05:12 +0200
committerMathias Buus <mathiasbuus@gmail.com>2018-10-30 15:17:40 +0100
commitf24b070cb7fb04df6249fab5264df2146e2b6cac (patch)
tree46438985fb9676a4e5b951a00f8a30693f25c280 /lib
parentcd1193d9ed83c37a431a19ae33bbf5e25ec15d65 (diff)
downloadandroid-node-v8-f24b070cb7fb04df6249fab5264df2146e2b6cac.tar.gz
android-node-v8-f24b070cb7fb04df6249fab5264df2146e2b6cac.tar.bz2
android-node-v8-f24b070cb7fb04df6249fab5264df2146e2b6cac.zip
stream: add auto-destroy mode
PR-URL: https://github.com/nodejs/node/pull/22795 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Jeremiah Senkpiel <fishrock123@rocketmail.com>
Diffstat (limited to 'lib')
-rw-r--r--lib/_stream_readable.js23
-rw-r--r--lib/_stream_writable.js26
-rw-r--r--lib/internal/streams/destroy.js20
3 files changed, 57 insertions, 12 deletions
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index 488d10a10b..2a2122e0e5 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -46,6 +46,7 @@ let createReadableStreamAsyncIterator;
util.inherits(Readable, Stream);
+const { errorOrDestroy } = destroyImpl;
const kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume'];
function prependListener(emitter, event, fn) {
@@ -117,6 +118,9 @@ function ReadableState(options, stream, isDuplex) {
// Should close be emitted on destroy. Defaults to true.
this.emitClose = options.emitClose !== false;
+ // Should .destroy() be called after 'end' (and potentially 'finish')
+ this.autoDestroy = !!options.autoDestroy;
+
// has it been destroyed
this.destroyed = false;
@@ -235,7 +239,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
if (!skipChunkCheck)
er = chunkInvalid(state, chunk);
if (er) {
- stream.emit('error', er);
+ errorOrDestroy(stream, er);
} else if (state.objectMode || chunk && chunk.length > 0) {
if (typeof chunk !== 'string' &&
!state.objectMode &&
@@ -245,11 +249,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
if (addToFront) {
if (state.endEmitted)
- stream.emit('error', new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
+ errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
else
addChunk(stream, state, chunk, true);
} else if (state.ended) {
- stream.emit('error', new ERR_STREAM_PUSH_AFTER_EOF());
+ errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
} else if (state.destroyed) {
return false;
} else {
@@ -581,7 +585,7 @@ function maybeReadMore_(stream, state) {
// for virtual (non-string, non-buffer) streams, "length" is somewhat
// arbitrary, and perhaps not very meaningful.
Readable.prototype._read = function(n) {
- this.emit('error', new ERR_METHOD_NOT_IMPLEMENTED('_read()'));
+ errorOrDestroy(this, new ERR_METHOD_NOT_IMPLEMENTED('_read()'));
};
Readable.prototype.pipe = function(dest, pipeOpts) {
@@ -687,7 +691,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
unpipe();
dest.removeListener('error', onerror);
if (EE.listenerCount(dest, 'error') === 0)
- dest.emit('error', er);
+ errorOrDestroy(dest, er);
}
// Make sure our error handler is attached before userland ones.
@@ -1092,5 +1096,14 @@ function endReadableNT(state, stream) {
state.endEmitted = true;
stream.readable = false;
stream.emit('end');
+
+ if (state.autoDestroy) {
+ // In case of duplex streams we need a way to detect
+ // if the writable side is ready for autoDestroy as well
+ const wState = stream._writableState;
+ if (!wState || (wState.autoDestroy && wState.finished)) {
+ stream.destroy();
+ }
+ }
}
}
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js
index 3bad957912..160179cd0e 100644
--- a/lib/_stream_writable.js
+++ b/lib/_stream_writable.js
@@ -45,6 +45,8 @@ const {
ERR_UNKNOWN_ENCODING
} = require('internal/errors').codes;
+const { errorOrDestroy } = destroyImpl;
+
util.inherits(Writable, Stream);
function nop() {}
@@ -147,6 +149,9 @@ function WritableState(options, stream, isDuplex) {
// Should close be emitted on destroy. Defaults to true.
this.emitClose = options.emitClose !== false;
+ // Should .destroy() be called after 'finish' (and potentially 'end')
+ this.autoDestroy = !!options.autoDestroy;
+
// count buffered requests
this.bufferedRequestCount = 0;
@@ -235,14 +240,14 @@ function Writable(options) {
// Otherwise people can pipe Writable streams, which is just wrong.
Writable.prototype.pipe = function() {
- this.emit('error', new ERR_STREAM_CANNOT_PIPE());
+ errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
};
function writeAfterEnd(stream, cb) {
var er = new ERR_STREAM_WRITE_AFTER_END();
// TODO: defer error events consistently everywhere, not just the cb
- stream.emit('error', er);
+ errorOrDestroy(stream, er);
process.nextTick(cb, er);
}
@@ -258,7 +263,7 @@ function validChunk(stream, state, chunk, cb) {
er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
}
if (er) {
- stream.emit('error', er);
+ errorOrDestroy(stream, er);
process.nextTick(cb, er);
return false;
}
@@ -422,13 +427,13 @@ function onwriteError(stream, state, sync, er, cb) {
// after error
process.nextTick(finishMaybe, stream, state);
stream._writableState.errorEmitted = true;
- stream.emit('error', er);
+ errorOrDestroy(stream, er);
} else {
// the caller expect this to happen before if
// it is async
cb(er);
stream._writableState.errorEmitted = true;
- stream.emit('error', er);
+ errorOrDestroy(stream, er);
// this can emit finish, but finish must
// always follow error
finishMaybe(stream, state);
@@ -612,7 +617,7 @@ function callFinal(stream, state) {
stream._final((err) => {
state.pendingcb--;
if (err) {
- stream.emit('error', err);
+ errorOrDestroy(stream, err);
}
state.prefinished = true;
stream.emit('prefinish');
@@ -639,6 +644,15 @@ function finishMaybe(stream, state) {
if (state.pendingcb === 0) {
state.finished = true;
stream.emit('finish');
+
+ if (state.autoDestroy) {
+ // In case of duplex streams we need a way to detect
+ // if the readable side is ready for autoDestroy as well
+ const rState = stream._readableState;
+ if (!rState || (rState.autoDestroy && rState.endEmitted)) {
+ stream.destroy();
+ }
+ }
}
}
return need;
diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js
index 3a0383cc3c..ce9d2545e4 100644
--- a/lib/internal/streams/destroy.js
+++ b/lib/internal/streams/destroy.js
@@ -82,7 +82,25 @@ function emitErrorNT(self, err) {
self.emit('error', err);
}
+function errorOrDestroy(stream, err) {
+ // We have tests that rely on errors being emitted
+ // in the same tick, so changing this is semver major.
+ // For now when you opt-in to autoDestroy we allow
+ // the error to be emitted nextTick. In a future
+ // semver major update we should change the default to this.
+
+ const rState = stream._readableState;
+ const wState = stream._writableState;
+
+ if ((rState && rState.autoDestroy) || (wState && wState.autoDestroy))
+ stream.destroy(err);
+ else
+ stream.emit('error', err);
+}
+
+
module.exports = {
destroy,
- undestroy
+ undestroy,
+ errorOrDestroy
};