summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorMathias Buus <mathiasbuus@gmail.com>2018-01-29 19:32:34 +0100
committerMatteo Collina <hello@matteocollina.com>2018-03-06 13:31:56 +0100
commit5e3f51648ed5de36b01d53bde13fb6fb7b965667 (patch)
treea134fb92b8b846e195ea3f960131fcc9147533e6 /lib
parentacac0f852a02c2b129adbc51e0bd8bd482d791af (diff)
downloadandroid-node-v8-5e3f51648ed5de36b01d53bde13fb6fb7b965667.tar.gz
android-node-v8-5e3f51648ed5de36b01d53bde13fb6fb7b965667.tar.bz2
android-node-v8-5e3f51648ed5de36b01d53bde13fb6fb7b965667.zip
stream: updated streams error handling
This improves error handling for streams in a few ways. 1. It ensures that no user defined methods (_read, _write, ...) are run after .destroy has been called. 2. It introduces an explicit error to tell the user if they are write to write, etc to the stream after it has been destroyed. 3. It makes streams always emit close as the last thing after they have been destroyed 4. Changes the default _destroy to not gracefully end streams. It also updates net, http2, zlib and fs to the new error handling. PR-URL: https://github.com/nodejs/node/pull/18438 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de> Reviewed-By: Anna Henningsen <anna@addaleax.net>
Diffstat (limited to 'lib')
-rw-r--r--lib/_stream_duplex.js7
-rw-r--r--lib/_stream_readable.js6
-rw-r--r--lib/_stream_transform.js3
-rw-r--r--lib/_stream_writable.js10
-rw-r--r--lib/fs.js6
-rw-r--r--lib/internal/errors.js2
-rw-r--r--lib/internal/http2/core.js1
-rw-r--r--lib/internal/streams/destroy.js9
-rw-r--r--lib/net.js5
-rw-r--r--lib/zlib.js9
10 files changed, 37 insertions, 21 deletions
diff --git a/lib/_stream_duplex.js b/lib/_stream_duplex.js
index 59ce832927..1ccb931260 100644
--- a/lib/_stream_duplex.js
+++ b/lib/_stream_duplex.js
@@ -135,10 +135,3 @@ Object.defineProperty(Duplex.prototype, 'destroyed', {
this._writableState.destroyed = value;
}
});
-
-Duplex.prototype._destroy = function(err, cb) {
- this.push(null);
- this.end();
-
- process.nextTick(cb, err);
-};
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index ba231ccda9..5781dfd471 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -106,6 +106,9 @@ function ReadableState(options, stream) {
this.readableListening = false;
this.resumeScheduled = false;
+ // Should close be emitted on destroy. Defaults to true.
+ this.emitClose = options.emitClose !== false;
+
// has it been destroyed
this.destroyed = false;
@@ -177,7 +180,6 @@ Object.defineProperty(Readable.prototype, 'destroyed', {
Readable.prototype.destroy = destroyImpl.destroy;
Readable.prototype._undestroy = destroyImpl.undestroy;
Readable.prototype._destroy = function(err, cb) {
- this.push(null);
cb(err);
};
@@ -236,6 +238,8 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
addChunk(stream, state, chunk, true);
} else if (state.ended) {
stream.emit('error', new errors.Error('ERR_STREAM_PUSH_AFTER_EOF'));
+ } else if (state.destroyed) {
+ return false;
} else {
state.reading = false;
if (state.decoder && !encoding) {
diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js
index a9fcddda2d..b82114ecae 100644
--- a/lib/_stream_transform.js
+++ b/lib/_stream_transform.js
@@ -132,7 +132,7 @@ function Transform(options) {
}
function prefinish() {
- if (typeof this._flush === 'function') {
+ if (typeof this._flush === 'function' && !this._readableState.destroyed) {
this._flush((er, data) => {
done(this, er, data);
});
@@ -194,7 +194,6 @@ Transform.prototype._read = function(n) {
Transform.prototype._destroy = function(err, cb) {
Duplex.prototype._destroy.call(this, err, (err2) => {
cb(err2);
- this.emit('close');
});
};
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js
index 2b76588135..d5cfe07f17 100644
--- a/lib/_stream_writable.js
+++ b/lib/_stream_writable.js
@@ -134,6 +134,9 @@ function WritableState(options, stream) {
// True if the error was already emitted and should not be thrown again
this.errorEmitted = false;
+ // Should close be emitted on destroy. Defaults to true.
+ this.emitClose = options.emitClose !== false;
+
// count buffered requests
this.bufferedRequestCount = 0;
@@ -390,7 +393,9 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) {
state.writecb = cb;
state.writing = true;
state.sync = true;
- if (writev)
+ if (state.destroyed)
+ state.onwrite(new errors.Error('ERR_STREAM_DESTROYED', 'write'));
+ else if (writev)
stream._writev(chunk, state.onwrite);
else
stream._write(chunk, encoding, state.onwrite);
@@ -604,7 +609,7 @@ function callFinal(stream, state) {
}
function prefinish(stream, state) {
if (!state.prefinished && !state.finalCalled) {
- if (typeof stream._final === 'function') {
+ if (typeof stream._final === 'function' && !state.destroyed) {
state.pendingcb++;
state.finalCalled = true;
process.nextTick(callFinal, stream, state);
@@ -681,6 +686,5 @@ Object.defineProperty(Writable.prototype, 'destroyed', {
Writable.prototype.destroy = destroyImpl.destroy;
Writable.prototype._undestroy = destroyImpl.undestroy;
Writable.prototype._destroy = function(err, cb) {
- this.end();
cb(err);
};
diff --git a/lib/fs.js b/lib/fs.js
index 3771efad10..917c3eb3a9 100644
--- a/lib/fs.js
+++ b/lib/fs.js
@@ -1929,6 +1929,9 @@ function ReadStream(path, options) {
if (options.highWaterMark === undefined)
options.highWaterMark = 64 * 1024;
+ // for backwards compat do not emit close on destroy.
+ options.emitClose = false;
+
Readable.call(this, options);
// path will be ignored when fd is specified, so it can be falsy
@@ -2084,6 +2087,9 @@ function WriteStream(path, options) {
options = copyObject(getOptions(options, {}));
+ // for backwards compat do not emit close on destroy.
+ options.emitClose = false;
+
Writable.call(this, options);
// path will be ignored when fd is specified, so it can be falsy
diff --git a/lib/internal/errors.js b/lib/internal/errors.js
index a4a79d671e..11f32ccdc1 100644
--- a/lib/internal/errors.js
+++ b/lib/internal/errors.js
@@ -843,6 +843,7 @@ E('ERR_SOCKET_DGRAM_NOT_RUNNING', 'Not running', Error);
E('ERR_STDERR_CLOSE', 'process.stderr cannot be closed', Error);
E('ERR_STDOUT_CLOSE', 'process.stdout cannot be closed', Error);
E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error);
+E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed');
E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);
E('ERR_STREAM_READ_NOT_IMPLEMENTED', '_read() is not implemented', Error);
@@ -908,7 +909,6 @@ E('ERR_VM_MODULE_NOT_LINKED',
E('ERR_VM_MODULE_NOT_MODULE',
'Provided module is not an instance of Module', Error);
E('ERR_VM_MODULE_STATUS', 'Module status %s', Error);
-E('ERR_ZLIB_BINDING_CLOSED', 'zlib binding closed', Error);
E('ERR_ZLIB_INITIALIZATION_FAILED', 'Initialization failed', Error);
function sysError(code, syscall, path, dest,
diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js
index 71bb55ee23..f60c6388af 100644
--- a/lib/internal/http2/core.js
+++ b/lib/internal/http2/core.js
@@ -1475,6 +1475,7 @@ class Http2Stream extends Duplex {
constructor(session, options) {
options.allowHalfOpen = true;
options.decodeStrings = false;
+ options.emitClose = false;
super(options);
this[async_id_symbol] = -1;
diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js
index 985332ac46..5d29e18204 100644
--- a/lib/internal/streams/destroy.js
+++ b/lib/internal/streams/destroy.js
@@ -30,6 +30,7 @@ function destroy(err, cb) {
}
this._destroy(err || null, (err) => {
+ process.nextTick(emitCloseNT, this);
if (!cb && err) {
process.nextTick(emitErrorNT, this, err);
if (this._writableState) {
@@ -43,6 +44,14 @@ function destroy(err, cb) {
return this;
}
+function emitCloseNT(self) {
+ if (self._writableState && !self._writableState.emitClose)
+ return;
+ if (self._readableState && !self._readableState.emitClose)
+ return;
+ self.emit('close');
+}
+
function undestroy() {
if (this._readableState) {
this._readableState.destroyed = false;
diff --git a/lib/net.js b/lib/net.js
index 7583fcb27d..f2cb423f30 100644
--- a/lib/net.js
+++ b/lib/net.js
@@ -232,6 +232,11 @@ function Socket(options) {
options = { fd: options }; // Legacy interface.
else if (options === undefined)
options = {};
+ else
+ options = util._extend({}, options);
+
+ // For backwards compat do not emit close on destroy.
+ options.emitClose = false;
stream.Duplex.call(this, options);
diff --git a/lib/zlib.js b/lib/zlib.js
index 93f878712a..4adfd1ffa2 100644
--- a/lib/zlib.js
+++ b/lib/zlib.js
@@ -25,7 +25,6 @@ const {
ERR_BUFFER_TOO_LARGE,
ERR_INVALID_ARG_TYPE,
ERR_OUT_OF_RANGE,
- ERR_ZLIB_BINDING_CLOSED,
ERR_ZLIB_INITIALIZATION_FAILED
} = require('internal/errors').codes;
const Transform = require('_stream_transform');
@@ -392,7 +391,7 @@ Zlib.prototype.flush = function flush(kind, callback) {
Zlib.prototype.close = function close(callback) {
_close(this, callback);
- process.nextTick(emitCloseNT, this);
+ this.destroy();
};
Zlib.prototype._transform = function _transform(chunk, encoding, cb) {
@@ -510,7 +509,7 @@ function processChunkSync(self, chunk, flushFlag) {
function processChunk(self, chunk, flushFlag, cb) {
var handle = self._handle;
if (!handle)
- return cb(new ERR_ZLIB_BINDING_CLOSED());
+ assert(false, 'zlib binding closed');
handle.buffer = chunk;
handle.cb = cb;
@@ -603,10 +602,6 @@ function _close(engine, callback) {
engine._handle = null;
}
-function emitCloseNT(self) {
- self.emit('close');
-}
-
// generic zlib
// minimal 2-byte header
function Deflate(opts) {