summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorMatteo Collina <hello@matteocollina.com>2017-05-06 14:20:52 +0200
committerMatteo Collina <hello@matteocollina.com>2017-05-22 08:34:14 +0200
commit330c8d743e33a83f85389ea8a64e3d3854ea0048 (patch)
treee3f72136564bc2637e1043133b9f99713d4753b2 /lib
parentd54ec726cc9d0d293e61dc0dba61a09429e4cbaf (diff)
downloadandroid-node-v8-330c8d743e33a83f85389ea8a64e3d3854ea0048.tar.gz
android-node-v8-330c8d743e33a83f85389ea8a64e3d3854ea0048.tar.bz2
android-node-v8-330c8d743e33a83f85389ea8a64e3d3854ea0048.zip
stream: add destroy and _destroy methods.
Adds destroy() and _destroy() methods to Readable, Writable, Duplex and Transform. It also standardizes the behavior and the implementation of destroy(), which has been inconsistent in userland and core. This PR also updates all the subsystems of core to use the new destroy(). PR-URL: https://github.com/nodejs/node/pull/12925 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Calvin Metcalf <calvin.metcalf@gmail.com> Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
Diffstat (limited to 'lib')
-rw-r--r--lib/_stream_duplex.js30
-rw-r--r--lib/_stream_readable.js40
-rw-r--r--lib/_stream_transform.js8
-rw-r--r--lib/_stream_writable.js34
-rw-r--r--lib/fs.js11
-rw-r--r--lib/internal/process/stdio.js12
-rw-r--r--lib/internal/streams/destroy.js65
-rw-r--r--lib/net.js62
8 files changed, 203 insertions, 59 deletions
diff --git a/lib/_stream_duplex.js b/lib/_stream_duplex.js
index 4422b62aac..7440cd0872 100644
--- a/lib/_stream_duplex.js
+++ b/lib/_stream_duplex.js
@@ -76,3 +76,33 @@ function onend() {
function onEndNT(self) {
self.end();
}
+
+Object.defineProperty(Duplex.prototype, 'destroyed', {
+ get() {
+ if (this._readableState === undefined ||
+ this._writableState === undefined) {
+ return false;
+ }
+ return this._readableState.destroyed && this._writableState.destroyed;
+ },
+ set(value) {
+ // we ignore the value if the stream
+ // has not been initialized yet
+ if (this._readableState === undefined ||
+ this._writableState === undefined) {
+ return;
+ }
+
+ // backward compatibility, the user is explicitly
+ // managing destroyed
+ this._readableState.destroyed = value;
+ this._writableState.destroyed = value;
+ }
+});
+
+Duplex.prototype._destroy = function(err, cb) {
+ this.push(null);
+ this.end();
+
+ process.nextTick(cb, err);
+};
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index 702d87b549..8b0d45cc86 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -30,6 +30,7 @@ const Buffer = require('buffer').Buffer;
const util = require('util');
const debug = util.debuglog('stream');
const BufferList = require('internal/streams/BufferList');
+const destroyImpl = require('internal/streams/destroy');
var StringDecoder;
util.inherits(Readable, Stream);
@@ -99,6 +100,9 @@ function ReadableState(options, stream) {
this.readableListening = false;
this.resumeScheduled = false;
+ // has it been destroyed
+ this.destroyed = false;
+
// Crypto is kind of old and crusty. Historically, its default string
// encoding is 'binary' so we have to make this configurable.
// Everything else in the universe uses 'utf8', though.
@@ -129,12 +133,44 @@ function Readable(options) {
// legacy
this.readable = true;
- if (options && typeof options.read === 'function')
- this._read = options.read;
+ if (options) {
+ if (typeof options.read === 'function')
+ this._read = options.read;
+
+ if (typeof options.destroy === 'function')
+ this._destroy = options.destroy;
+ }
Stream.call(this);
}
+Object.defineProperty(Readable.prototype, 'destroyed', {
+ get() {
+ if (this._readableState === undefined) {
+ return false;
+ }
+ return this._readableState.destroyed;
+ },
+ set(value) {
+ // we ignore the value if the stream
+ // has not been initialized yet
+ if (!this._readableState) {
+ return;
+ }
+
+ // backward compatibility, the user is explicitly
+ // managing destroyed
+ this._readableState.destroyed = value;
+ }
+});
+
+Readable.prototype.destroy = destroyImpl.destroy;
+Readable.prototype._undestroy = destroyImpl.undestroy;
+Readable.prototype._destroy = function(err, cb) {
+ this.push(null);
+ cb(err);
+};
+
// Manually shove something into the read() buffer.
// This returns true if the highWaterMark has not been hit yet,
// similar to how Writable.write() returns true if you should
diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js
index 8adf3ed12d..63f65f34ce 100644
--- a/lib/_stream_transform.js
+++ b/lib/_stream_transform.js
@@ -194,6 +194,14 @@ Transform.prototype._read = function(n) {
};
+Transform.prototype._destroy = function(err, cb) {
+ Duplex.prototype._destroy.call(this, err, (err2) => {
+ cb(err2);
+ this.emit('close');
+ });
+};
+
+
function done(stream, er, data) {
if (er)
return stream.emit('error', er);
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js
index c3c2ce0710..4e2a19f12c 100644
--- a/lib/_stream_writable.js
+++ b/lib/_stream_writable.js
@@ -32,6 +32,7 @@ const util = require('util');
const internalUtil = require('internal/util');
const Stream = require('stream');
const Buffer = require('buffer').Buffer;
+const destroyImpl = require('internal/streams/destroy');
util.inherits(Writable, Stream);
@@ -66,6 +67,9 @@ function WritableState(options, stream) {
// when 'finish' is emitted
this.finished = false;
+ // has it been destroyed
+ this.destroyed = false;
+
// should we decode strings into buffers before passing to _write?
// this is here so that some node-core streams can optimize string
// handling at a lower level.
@@ -192,6 +196,9 @@ function Writable(options) {
if (typeof options.writev === 'function')
this._writev = options.writev;
+
+ if (typeof options.destroy === 'function')
+ this._destroy = options.destroy;
}
Stream.call(this);
@@ -563,3 +570,30 @@ function onCorkedFinish(corkReq, state, err) {
state.corkedRequestsFree = corkReq;
}
}
+
+Object.defineProperty(Writable.prototype, 'destroyed', {
+ get() {
+ if (this._writableState === undefined) {
+ return false;
+ }
+ return this._writableState.destroyed;
+ },
+ set(value) {
+ // we ignore the value if the stream
+ // has not been initialized yet
+ if (!this._writableState) {
+ return;
+ }
+
+ // backward compatibility, the user is explicitly
+ // managing destroyed
+ this._writableState.destroyed = value;
+ }
+});
+
+Writable.prototype.destroy = destroyImpl.destroy;
+Writable.prototype._undestroy = destroyImpl.undestroy;
+Writable.prototype._destroy = function(err, cb) {
+ this.end();
+ cb(err);
+};
diff --git a/lib/fs.js b/lib/fs.js
index 0f24ee2bb8..de5d6dcfb0 100644
--- a/lib/fs.js
+++ b/lib/fs.js
@@ -1986,11 +1986,10 @@ ReadStream.prototype._read = function(n) {
};
-ReadStream.prototype.destroy = function() {
- if (this.destroyed)
- return;
- this.destroyed = true;
- this.close();
+ReadStream.prototype._destroy = function(err, cb) {
+ this.close(function(err2) {
+ cb(err || err2);
+ });
};
@@ -2157,7 +2156,7 @@ WriteStream.prototype._writev = function(data, cb) {
};
-WriteStream.prototype.destroy = ReadStream.prototype.destroy;
+WriteStream.prototype._destroy = ReadStream.prototype._destroy;
WriteStream.prototype.close = ReadStream.prototype.close;
// There is no shutdown() for files.
diff --git a/lib/internal/process/stdio.js b/lib/internal/process/stdio.js
index adfe1e8e0b..db544b1533 100644
--- a/lib/internal/process/stdio.js
+++ b/lib/internal/process/stdio.js
@@ -18,10 +18,12 @@ function setupStdio() {
function getStdout() {
if (stdout) return stdout;
stdout = createWritableStdioStream(1);
- stdout.destroy = stdout.destroySoon = function(er) {
+ stdout.destroySoon = stdout.destroy;
+ stdout._destroy = function(er, cb) {
+ // avoid errors if we already emitted
const errors = lazyErrors();
er = er || new errors.Error('ERR_STDOUT_CLOSE');
- stdout.emit('error', er);
+ cb(er);
};
if (stdout.isTTY) {
process.on('SIGWINCH', () => stdout._refreshSize());
@@ -32,10 +34,12 @@ function setupStdio() {
function getStderr() {
if (stderr) return stderr;
stderr = createWritableStdioStream(2);
- stderr.destroy = stderr.destroySoon = function(er) {
+ stderr.destroySoon = stderr.destroy;
+ stderr._destroy = function(er, cb) {
+ // avoid errors if we already emitted
const errors = lazyErrors();
er = er || new errors.Error('ERR_STDERR_CLOSE');
- stderr.emit('error', er);
+ cb(er);
};
if (stderr.isTTY) {
process.on('SIGWINCH', () => stderr._refreshSize());
diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js
new file mode 100644
index 0000000000..18a2a83984
--- /dev/null
+++ b/lib/internal/streams/destroy.js
@@ -0,0 +1,65 @@
+'use strict';
+
+// undocumented cb() API, needed for core, not for public API
+function destroy(err, cb) {
+ const readableDestroyed = this._readableState &&
+ this._readableState.destroyed;
+ const writableDestroyed = this._writableState &&
+ this._writableState.destroyed;
+
+ if (readableDestroyed || writableDestroyed) {
+ if (err && (!this._writableState || !this._writableState.errorEmitted)) {
+ process.nextTick(emitErrorNT, this, err);
+ }
+ return;
+ }
+
+ // we set destroyed to true before firing error callbacks in order
+ // to make it re-entrance safe in case destroy() is called within callbacks
+
+ if (this._readableState) {
+ this._readableState.destroyed = true;
+ }
+
+ // if this is a duplex stream mark the writable part as destroyed as well
+ if (this._writableState) {
+ this._writableState.destroyed = true;
+ }
+
+ this._destroy(err || null, (err) => {
+ if (!cb && err) {
+ process.nextTick(emitErrorNT, this, err);
+ if (this._writableState) {
+ this._writableState.errorEmitted = true;
+ }
+ } else if (cb) {
+ cb(err);
+ }
+ });
+}
+
+function undestroy() {
+ if (this._readableState) {
+ this._readableState.destroyed = false;
+ this._readableState.reading = false;
+ this._readableState.ended = false;
+ this._readableState.endEmitted = false;
+ }
+
+ if (this._writableState) {
+ this._writableState.destroyed = false;
+ this._writableState.ended = false;
+ this._writableState.ending = false;
+ this._writableState.finished = false;
+ this._writableState.errorEmitted = false;
+ }
+}
+
+function emitErrorNT(self, err) {
+ self.emit('error', err);
+}
+
+module.exports = {
+ destroy,
+ undestroy
+};
diff --git a/lib/net.js b/lib/net.js
index 0c2c2dceda..a3d778a448 100644
--- a/lib/net.js
+++ b/lib/net.js
@@ -156,7 +156,7 @@ function normalizeArgs(args) {
// called when creating new Socket, or when re-using a closed Socket
function initSocketHandle(self) {
- self.destroyed = false;
+ self._undestroy();
self._bytesDispatched = 0;
self._sockname = null;
@@ -295,7 +295,7 @@ function onSocketFinish() {
var err = this._handle.shutdown(req);
if (err)
- return this._destroy(errnoException(err, 'shutdown'));
+ return this.destroy(errnoException(err, 'shutdown'));
}
@@ -481,7 +481,7 @@ Socket.prototype._read = function(n) {
this._handle.reading = true;
var err = this._handle.readStart();
if (err)
- this._destroy(errnoException(err, 'read'));
+ this.destroy(errnoException(err, 'read'));
}
};
@@ -526,20 +526,6 @@ Socket.prototype.destroySoon = function() {
Socket.prototype._destroy = function(exception, cb) {
debug('destroy');
- function fireErrorCallbacks(self, exception, cb) {
- if (cb) cb(exception);
- if (exception && !self._writableState.errorEmitted) {
- process.nextTick(emitErrorNT, self, exception);
- self._writableState.errorEmitted = true;
- }
- }
-
- if (this.destroyed) {
- debug('already destroyed, fire error callbacks');
- fireErrorCallbacks(this, exception, cb);
- return;
- }
-
this.connecting = false;
this.readable = this.writable = false;
@@ -564,11 +550,7 @@ Socket.prototype._destroy = function(exception, cb) {
this._sockname = null;
}
- // we set destroyed to true before firing error callbacks in order
- // to make it re-entrance safe in case Socket.prototype.destroy()
- // is called within callbacks
- this.destroyed = true;
- fireErrorCallbacks(this, exception, cb);
+ cb(exception);
if (this._server) {
COUNTER_NET_SERVER_CONNECTION_CLOSE(this);
@@ -581,12 +563,6 @@ Socket.prototype._destroy = function(exception, cb) {
};
-Socket.prototype.destroy = function(exception) {
- debug('destroy', exception);
- this._destroy(exception);
-};
-
-
// This function is called whenever the handle gets a
// buffer, or when there's an error reading.
function onread(nread, buffer) {
@@ -614,7 +590,7 @@ function onread(nread, buffer) {
debug('readStop');
var err = handle.readStop();
if (err)
- self._destroy(errnoException(err, 'read'));
+ self.destroy(errnoException(err, 'read'));
}
return;
}
@@ -628,7 +604,7 @@ function onread(nread, buffer) {
// Error, possibly EOF.
if (nread !== uv.UV_EOF) {
- return self._destroy(errnoException(nread, 'read'));
+ return self.destroy(errnoException(nread, 'read'));
}
debug('EOF');
@@ -739,7 +715,7 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
this._unrefTimer();
if (!this._handle) {
- this._destroy(new Error('This socket is closed'), cb);
+ this.destroy(new Error('This socket is closed'), cb);
return false;
}
@@ -771,7 +747,7 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
}
if (err)
- return this._destroy(errnoException(err, 'write', req.error), cb);
+ return this.destroy(errnoException(err, 'write', req.error), cb);
this._bytesDispatched += req.bytes;
@@ -862,7 +838,7 @@ function afterWrite(status, handle, req, err) {
if (status < 0) {
var ex = errnoException(status, 'write', req.error);
debug('write failure', ex);
- self._destroy(ex, req.cb);
+ self.destroy(ex, req.cb);
return;
}
@@ -896,13 +872,13 @@ function internalConnect(
localAddress = localAddress || '::';
err = self._handle.bind6(localAddress, localPort);
} else {
- self._destroy(new TypeError('Invalid addressType: ' + addressType));
+ self.destroy(new TypeError('Invalid addressType: ' + addressType));
return;
}
if (err) {
const ex = exceptionWithHostPort(err, 'bind', localAddress, localPort);
- self._destroy(ex);
+ self.destroy(ex);
return;
}
}
@@ -944,7 +920,7 @@ function internalConnect(
}
const ex = exceptionWithHostPort(err, 'connect', address, port, details);
- self._destroy(ex);
+ self.destroy(ex);
}
}
@@ -971,14 +947,7 @@ Socket.prototype.connect = function() {
this.write = Socket.prototype.write;
if (this.destroyed) {
- this._readableState.reading = false;
- this._readableState.ended = false;
- this._readableState.endEmitted = false;
- this._writableState.ended = false;
- this._writableState.ending = false;
- this._writableState.finished = false;
- this._writableState.errorEmitted = false;
- this.destroyed = false;
+ this._undestroy();
this._handle = null;
this._peername = null;
this._sockname = null;
@@ -1088,8 +1057,7 @@ function lookupAndConnect(self, options) {
function connectErrorNT(self, err) {
- self.emit('error', err);
- self._destroy();
+ self.destroy(err);
}
@@ -1162,7 +1130,7 @@ function afterConnect(status, handle, req, readable, writable) {
ex.localAddress = req.localAddress;
ex.localPort = req.localPort;
}
- self._destroy(ex);
+ self.destroy(ex);
}
}