summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorTrevor Norris <trev.norris@gmail.com>2017-03-10 06:17:42 -0700
committerAnna Henningsen <anna@addaleax.net>2017-05-10 22:22:29 +0200
commit4a7233c1788334c171d2280026333242df7d37af (patch)
tree649cf5aaa02e391dd90f460c0c4922ba631387dd /lib
parent7e3a3c962f09233c53cee7ebe381341d7c8b7162 (diff)
downloadandroid-node-v8-4a7233c1788334c171d2280026333242df7d37af.tar.gz
android-node-v8-4a7233c1788334c171d2280026333242df7d37af.tar.bz2
android-node-v8-4a7233c1788334c171d2280026333242df7d37af.zip
lib: implement async_hooks API in core
Implement async_hooks support in the following: * fatalException handler * process.nextTick * Timers * net/dgram/http PR-URL: https://github.com/nodejs/node/pull/12892 Ref: https://github.com/nodejs/node/pull/11883 Ref: https://github.com/nodejs/node/pull/8531 Reviewed-By: Andreas Madsen <amwebdk@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Sam Roberts <vieuxtech@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Refael Ackermann <refack@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Jeremiah Senkpiel <fishrock123@rocketmail.com>
Diffstat (limited to 'lib')
-rw-r--r--lib/_http_agent.js9
-rw-r--r--lib/_http_client.js6
-rw-r--r--lib/_http_common.js8
-rw-r--r--lib/_http_outgoing.js12
-rw-r--r--lib/async_hooks.js6
-rw-r--r--lib/dgram.js12
-rw-r--r--lib/internal/bootstrap_node.js24
-rw-r--r--lib/internal/process/next_tick.js153
-rw-r--r--lib/net.js45
-rw-r--r--lib/timers.js107
10 files changed, 343 insertions, 39 deletions
diff --git a/lib/_http_agent.js b/lib/_http_agent.js
index 88f4df402a..d791a961c7 100644
--- a/lib/_http_agent.js
+++ b/lib/_http_agent.js
@@ -25,6 +25,8 @@ const net = require('net');
const util = require('util');
const EventEmitter = require('events');
const debug = util.debuglog('http');
+const async_id_symbol = process.binding('async_wrap').async_id_symbol;
+const nextTick = require('internal/process/next_tick').nextTick;
// New Agent code.
@@ -93,6 +95,7 @@ function Agent(options) {
self.freeSockets[name] = freeSockets;
socket.setKeepAlive(true, self.keepAliveMsecs);
socket.unref();
+ socket[async_id_symbol] = -1;
socket._httpMessage = null;
self.removeSocket(socket, options);
freeSockets.push(socket);
@@ -163,6 +166,8 @@ Agent.prototype.addRequest = function addRequest(req, options, port/*legacy*/,
if (freeLen) {
// we have a free socket, so use that.
var socket = this.freeSockets[name].shift();
+ // Assign the handle a new asyncId and run any init() hooks.
+ socket._handle.asyncReset();
debug('have free socket');
// don't leak
@@ -177,7 +182,7 @@ Agent.prototype.addRequest = function addRequest(req, options, port/*legacy*/,
// If we are under maxSockets create a new one.
this.createSocket(req, options, function(err, newSocket) {
if (err) {
- process.nextTick(function() {
+ nextTick(newSocket._handle.getAsyncId(), function() {
req.emit('error', err);
});
return;
@@ -290,7 +295,7 @@ Agent.prototype.removeSocket = function removeSocket(s, options) {
// If we have pending requests and a socket gets closed make a new one
this.createSocket(req, options, function(err, newSocket) {
if (err) {
- process.nextTick(function() {
+ nextTick(newSocket._handle.getAsyncId(), function() {
req.emit('error', err);
});
return;
diff --git a/lib/_http_client.js b/lib/_http_client.js
index e1f064e832..a238a4f14c 100644
--- a/lib/_http_client.js
+++ b/lib/_http_client.js
@@ -36,6 +36,7 @@ const Agent = require('_http_agent');
const Buffer = require('buffer').Buffer;
const urlToOptions = require('internal/url').urlToOptions;
const outHeadersKey = require('internal/http').outHeadersKey;
+const nextTick = require('internal/process/next_tick').nextTick;
// The actual list of disallowed characters in regexp form is more like:
// /[^A-Za-z0-9\-._~!$&'()*+,;=/:@]/
@@ -587,9 +588,12 @@ function responseKeepAlive(res, req) {
socket.removeListener('close', socketCloseListener);
socket.removeListener('error', socketErrorListener);
socket.once('error', freeSocketErrorListener);
+ // There are cases where _handle === null. Avoid those. Passing null to
+ // nextTick() will call initTriggerId() to retrieve the id.
+ const asyncId = socket._handle ? socket._handle.getAsyncId() : null;
// Mark this socket as available, AFTER user-added end
// handlers have a chance to run.
- process.nextTick(emitFreeNT, socket);
+ nextTick(asyncId, emitFreeNT, socket);
}
}
diff --git a/lib/_http_common.js b/lib/_http_common.js
index b33e38e08d..59eaec5811 100644
--- a/lib/_http_common.js
+++ b/lib/_http_common.js
@@ -28,6 +28,7 @@ const HTTPParser = binding.HTTPParser;
const FreeList = require('internal/freelist');
const ondrain = require('internal/http').ondrain;
const incoming = require('_http_incoming');
+const emitDestroy = require('async_hooks').emitDestroy;
const IncomingMessage = incoming.IncomingMessage;
const readStart = incoming.readStart;
const readStop = incoming.readStop;
@@ -211,8 +212,13 @@ function freeParser(parser, req, socket) {
parser.incoming = null;
parser.outgoing = null;
parser[kOnExecute] = null;
- if (parsers.free(parser) === false)
+ if (parsers.free(parser) === false) {
parser.close();
+ } else {
+ // Since the Parser destructor isn't going to run the destroy() callbacks
+ // it needs to be triggered manually.
+ emitDestroy(parser.getAsyncId());
+ }
}
if (req) {
req.parser = null;
diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js
index 4ff84aea7d..9b492df85a 100644
--- a/lib/_http_outgoing.js
+++ b/lib/_http_outgoing.js
@@ -31,6 +31,8 @@ const common = require('_http_common');
const checkIsHttpToken = common._checkIsHttpToken;
const checkInvalidHeaderChar = common._checkInvalidHeaderChar;
const outHeadersKey = require('internal/http').outHeadersKey;
+const async_id_symbol = process.binding('async_wrap').async_id_symbol;
+const nextTick = require('internal/process/next_tick').nextTick;
const CRLF = common.CRLF;
const debug = common.debug;
@@ -264,8 +266,9 @@ function _writeRaw(data, encoding, callback) {
if (this.output.length) {
this._flushOutput(conn);
} else if (!data.length) {
- if (typeof callback === 'function')
- process.nextTick(callback);
+ if (typeof callback === 'function') {
+ nextTick(this.socket[async_id_symbol], callback);
+ }
return true;
}
// Directly write to socket.
@@ -623,7 +626,10 @@ const crlf_buf = Buffer.from('\r\n');
OutgoingMessage.prototype.write = function write(chunk, encoding, callback) {
if (this.finished) {
var err = new Error('write after end');
- process.nextTick(writeAfterEndNT.bind(this), err, callback);
+ nextTick(this.socket[async_id_symbol],
+ writeAfterEndNT.bind(this),
+ err,
+ callback);
return true;
}
diff --git a/lib/async_hooks.js b/lib/async_hooks.js
index 736b189097..867b5eb52d 100644
--- a/lib/async_hooks.js
+++ b/lib/async_hooks.js
@@ -32,7 +32,7 @@ var processing_hook = false;
// Use to temporarily store and updated active_hooks_array if the user enables
// or disables a hook while hooks are being processed.
var tmp_active_hooks_array = null;
-// Keep track of the field counds held in tmp_active_hooks_array.
+// Keep track of the field counts held in tmp_active_hooks_array.
var tmp_async_hook_fields = null;
// Each constant tracks how many callbacks there are for any given step of
@@ -41,9 +41,9 @@ var tmp_async_hook_fields = null;
const { kInit, kBefore, kAfter, kDestroy, kCurrentAsyncId, kCurrentTriggerId,
kAsyncUidCntr, kInitTriggerId } = async_wrap.constants;
+const { async_id_symbol, trigger_id_symbol } = async_wrap;
+
// Used in AsyncHook and AsyncEvent.
-const async_id_symbol = Symbol('_asyncId');
-const trigger_id_symbol = Symbol('_triggerId');
const init_symbol = Symbol('init');
const before_symbol = Symbol('before');
const after_symbol = Symbol('after');
diff --git a/lib/dgram.js b/lib/dgram.js
index f2da1da1cc..4d9f7eb29f 100644
--- a/lib/dgram.js
+++ b/lib/dgram.js
@@ -25,7 +25,10 @@ const assert = require('assert');
const Buffer = require('buffer').Buffer;
const util = require('util');
const EventEmitter = require('events');
+const setInitTriggerId = require('async_hooks').setInitTriggerId;
const UV_UDP_REUSEADDR = process.binding('constants').os.UV_UDP_REUSEADDR;
+const async_id_symbol = process.binding('async_wrap').async_id_symbol;
+const nextTick = require('internal/process/next_tick').nextTick;
const UDP = process.binding('udp_wrap').UDP;
const SendWrap = process.binding('udp_wrap').SendWrap;
@@ -111,6 +114,7 @@ function Socket(type, listener) {
this._handle = handle;
this._receiving = false;
this._bindState = BIND_STATE_UNBOUND;
+ this[async_id_symbol] = this._handle.getAsyncId();
this.type = type;
this.fd = null; // compatibility hack
@@ -432,6 +436,10 @@ function doSend(ex, self, ip, list, address, port, callback) {
req.callback = callback;
req.oncomplete = afterSend;
}
+ // node::SendWrap isn't instantiated and attached to the JS instance of
+ // SendWrap above until send() is called. So don't set the init trigger id
+ // until now.
+ setInitTriggerId(self[async_id_symbol]);
var err = self._handle.send(req,
list,
list.length,
@@ -441,7 +449,7 @@ function doSend(ex, self, ip, list, address, port, callback) {
if (err && callback) {
// don't emit as error, dgram_legacy.js compatibility
const ex = exceptionWithHostPort(err, 'send', address, port);
- process.nextTick(callback, ex);
+ nextTick(self[async_id_symbol], callback, ex);
}
}
@@ -468,7 +476,7 @@ Socket.prototype.close = function(callback) {
this._stopReceiving();
this._handle.close();
this._handle = null;
- process.nextTick(socketCloseNT, this);
+ nextTick(this[async_id_symbol], socketCloseNT, this);
return this;
};
diff --git a/lib/internal/bootstrap_node.js b/lib/internal/bootstrap_node.js
index 67a7c10919..b93b817aba 100644
--- a/lib/internal/bootstrap_node.js
+++ b/lib/internal/bootstrap_node.js
@@ -292,10 +292,20 @@
}
function setupProcessFatal() {
+ const async_wrap = process.binding('async_wrap');
+ // Arrays containing hook flags and ids for async_hook calls.
+ const { async_hook_fields, async_uid_fields } = async_wrap;
+ // Internal functions needed to manipulate the stack.
+ const { clearIdStack, popAsyncIds } = async_wrap;
+ const { kAfter, kCurrentAsyncId, kInitTriggerId } = async_wrap.constants;
process._fatalException = function(er) {
var caught;
+ // It's possible that kInitTriggerId was set for a constructor call that
+ // threw and was never cleared. So clear it now.
+ async_uid_fields[kInitTriggerId] = 0;
+
if (process.domain && process.domain._errorHandler)
caught = process.domain._errorHandler(er);
@@ -314,9 +324,21 @@
// nothing to be done about it at this point.
}
- // if we handled an error, then make sure any ticks get processed
} else {
+ // If we handled an error, then make sure any ticks get processed
NativeModule.require('timers').setImmediate(process._tickCallback);
+
+ // Emit the after() hooks now that the exception has been handled.
+ if (async_hook_fields[kAfter] > 0) {
+ do {
+ NativeModule.require('async_hooks').emitAfter(
+ async_uid_fields[kCurrentAsyncId]);
+ // popAsyncIds() returns true if there are more ids on the stack.
+ } while (popAsyncIds(async_uid_fields[kCurrentAsyncId]));
+ // Or completely empty the id stack.
+ } else {
+ clearIdStack();
+ }
}
return caught;
diff --git a/lib/internal/process/next_tick.js b/lib/internal/process/next_tick.js
index c834ffc2e3..0ba26ce033 100644
--- a/lib/internal/process/next_tick.js
+++ b/lib/internal/process/next_tick.js
@@ -7,11 +7,26 @@
const kMaxCallbacksPerLoop = 1e4;
exports.setup = setupNextTick;
+// Will be overwritten when setupNextTick() is called.
+exports.nextTick = null;
function setupNextTick() {
+ const async_wrap = process.binding('async_wrap');
+ const async_hooks = require('async_hooks');
const promises = require('internal/process/promises');
const errors = require('internal/errors');
const emitPendingUnhandledRejections = promises.setup(scheduleMicrotasks);
+ const initTriggerId = async_hooks.initTriggerId;
+ // Two arrays that share state between C++ and JS.
+ const { async_hook_fields, async_uid_fields } = async_wrap;
+ // Used to change the state of the async id stack.
+ const { pushAsyncIds, popAsyncIds } = async_wrap;
+ // The needed emit*() functions.
+ const { emitInit, emitBefore, emitAfter, emitDestroy } = async_hooks;
+ // Grab the constants necessary for working with internal arrays.
+ const { kInit, kBefore, kAfter, kDestroy, kAsyncUidCntr, kInitTriggerId } =
+ async_wrap.constants;
+ const { async_id_symbol, trigger_id_symbol } = async_wrap;
var nextTickQueue = [];
var microtasksScheduled = false;
@@ -27,6 +42,9 @@ function setupNextTick() {
process._tickCallback = _tickCallback;
process._tickDomainCallback = _tickDomainCallback;
+ // Set the nextTick() function for internal usage.
+ exports.nextTick = internalNextTick;
+
// This tickInfo thing is used so that the C++ code in src/node.cc
// can have easy access to our nextTick state, and avoid unnecessary
// calls into JS land.
@@ -51,10 +69,13 @@ function setupNextTick() {
if (microtasksScheduled)
return;
- nextTickQueue.push({
- callback: runMicrotasksCallback,
- domain: null
- });
+ const tickObject =
+ new TickObject(runMicrotasksCallback, undefined, null);
+ // For the moment all microtasks come from the void until the PromiseHook
+ // API is implemented.
+ tickObject[async_id_symbol] = 0;
+ tickObject[trigger_id_symbol] = 0;
+ nextTickQueue.push(tickObject);
tickInfo[kLength]++;
microtasksScheduled = true;
@@ -89,20 +110,58 @@ function setupNextTick() {
}
}
+ // TODO(trevnorris): Using std::stack of Environment::AsyncHooks::ids_stack_
+ // is much slower here than was the Float64Array stack used in a previous
+ // implementation. Problem is the Float64Array stack was a bit brittle.
+ // Investigate how to harden that implementation and possibly reintroduce it.
+ function nextTickEmitBefore(asyncId, triggerId) {
+ if (async_hook_fields[kBefore] > 0)
+ emitBefore(asyncId, triggerId);
+ else
+ pushAsyncIds(asyncId, triggerId);
+ }
+
+ function nextTickEmitAfter(asyncId) {
+ if (async_hook_fields[kAfter] > 0)
+ emitAfter(asyncId);
+ else
+ popAsyncIds(asyncId);
+ }
+
// Run callbacks that have no domain.
// Using domains will cause this to be overridden.
function _tickCallback() {
- var callback, args, tock;
-
do {
while (tickInfo[kIndex] < tickInfo[kLength]) {
- tock = nextTickQueue[tickInfo[kIndex]++];
- callback = tock.callback;
- args = tock.args;
+ const tock = nextTickQueue[tickInfo[kIndex]++];
+ const callback = tock.callback;
+ const args = tock.args;
+
+ // CHECK(Number.isSafeInteger(tock[async_id_symbol]))
+ // CHECK(tock[async_id_symbol] > 0)
+ // CHECK(Number.isSafeInteger(tock[trigger_id_symbol]))
+ // CHECK(tock[trigger_id_symbol] > 0)
+
+ nextTickEmitBefore(tock[async_id_symbol], tock[trigger_id_symbol]);
+ // emitDestroy() places the async_id_symbol into an asynchronous queue
+ // that calls the destroy callback in the future. It's called before
+ // calling tock.callback so destroy will be called even if the callback
+ // throws an exception that is handles by 'uncaughtException' or a
+ // domain.
+ // TODO(trevnorris): This is a bit of a hack. It relies on the fact
+ // that nextTick() doesn't allow the event loop to proceed, but if
+ // any async hooks are enabled during the callback's execution then
+ // this tock's after hook will be called, but not its destroy hook.
+ if (async_hook_fields[kDestroy] > 0)
+ emitDestroy(tock[async_id_symbol]);
+
// Using separate callback execution functions allows direct
// callback invocation with small numbers of arguments to avoid the
// performance hit associated with using `fn.apply()`
_combinedTickCallback(args, callback);
+
+ nextTickEmitAfter(tock[async_id_symbol]);
+
if (kMaxCallbacksPerLoop < tickInfo[kIndex])
tickDone();
}
@@ -113,20 +172,33 @@ function setupNextTick() {
}
function _tickDomainCallback() {
- var callback, domain, args, tock;
-
do {
while (tickInfo[kIndex] < tickInfo[kLength]) {
- tock = nextTickQueue[tickInfo[kIndex]++];
- callback = tock.callback;
- domain = tock.domain;
- args = tock.args;
+ const tock = nextTickQueue[tickInfo[kIndex]++];
+ const callback = tock.callback;
+ const domain = tock.domain;
+ const args = tock.args;
if (domain)
domain.enter();
+
+ // CHECK(Number.isSafeInteger(tock[async_id_symbol]))
+ // CHECK(tock[async_id_symbol] > 0)
+ // CHECK(Number.isSafeInteger(tock[trigger_id_symbol]))
+ // CHECK(tock[trigger_id_symbol] > 0)
+
+ nextTickEmitBefore(tock[async_id_symbol], tock[trigger_id_symbol]);
+ // TODO(trevnorris): See comment in _tickCallback() as to why this
+ // isn't a good solution.
+ if (async_hook_fields[kDestroy] > 0)
+ emitDestroy(tock[async_id_symbol]);
+
// Using separate callback execution functions allows direct
// callback invocation with small numbers of arguments to avoid the
// performance hit associated with using `fn.apply()`
_combinedTickCallback(args, callback);
+
+ nextTickEmitAfter(tock[async_id_symbol]);
+
if (kMaxCallbacksPerLoop < tickInfo[kIndex])
tickDone();
if (domain)
@@ -138,6 +210,25 @@ function setupNextTick() {
} while (tickInfo[kLength] !== 0);
}
+ function TickObject(callback, args, domain) {
+ this.callback = callback;
+ this.domain = domain;
+ this.args = args;
+ this[async_id_symbol] = -1;
+ this[trigger_id_symbol] = -1;
+ }
+
+ function setupInit(tickObject, triggerId) {
+ tickObject[async_id_symbol] = ++async_uid_fields[kAsyncUidCntr];
+ tickObject[trigger_id_symbol] = triggerId || initTriggerId();
+ if (async_hook_fields[kInit] > 0) {
+ emitInit(tickObject[async_id_symbol],
+ 'TickObject',
+ tickObject[trigger_id_symbol],
+ tickObject);
+ }
+ }
+
function nextTick(callback) {
if (typeof callback !== 'function')
throw new errors.TypeError('ERR_INVALID_CALLBACK');
@@ -152,11 +243,33 @@ function setupNextTick() {
args[i - 1] = arguments[i];
}
- nextTickQueue.push({
- callback,
- domain: process.domain || null,
- args
- });
+ var obj = new TickObject(callback, args, process.domain || null);
+ setupInit(obj, null);
+ nextTickQueue.push(obj);
+ tickInfo[kLength]++;
+ }
+
+ function internalNextTick(triggerId, callback) {
+ if (typeof callback !== 'function')
+ throw new TypeError('callback is not a function');
+ // CHECK(Number.isSafeInteger(triggerId) || triggerId === null)
+ // CHECK(triggerId > 0 || triggerId === null)
+
+ if (process._exiting)
+ return;
+
+ var args;
+ if (arguments.length > 2) {
+ args = new Array(arguments.length - 2);
+ for (var i = 2; i < arguments.length; i++)
+ args[i - 2] = arguments[i];
+ }
+
+ var obj = new TickObject(callback, args, process.domain || null);
+ setupInit(obj, triggerId);
+ // The call to initTriggerId() was skipped, so clear kInitTriggerId.
+ async_uid_fields[kInitTriggerId] = 0;
+ nextTickQueue.push(obj);
tickInfo[kLength]++;
}
}
diff --git a/lib/net.js b/lib/net.js
index 2da278a32d..2431820783 100644
--- a/lib/net.js
+++ b/lib/net.js
@@ -39,6 +39,9 @@ const TCPConnectWrap = process.binding('tcp_wrap').TCPConnectWrap;
const PipeConnectWrap = process.binding('pipe_wrap').PipeConnectWrap;
const ShutdownWrap = process.binding('stream_wrap').ShutdownWrap;
const WriteWrap = process.binding('stream_wrap').WriteWrap;
+const async_id_symbol = process.binding('async_wrap').async_id_symbol;
+const { newUid, setInitTriggerId } = require('async_hooks');
+const nextTick = require('internal/process/next_tick').nextTick;
var cluster;
var dns;
@@ -57,6 +60,12 @@ function createHandle(fd) {
}
+function getNewAsyncId(handle) {
+ return (!handle || typeof handle.getAsyncId !== 'function') ?
+ newUid() : handle.getAsyncId();
+}
+
+
const debug = util.debuglog('net');
function isPipeName(s) {
@@ -147,6 +156,7 @@ function initSocketHandle(self) {
if (self._handle) {
self._handle.owner = self;
self._handle.onread = onread;
+ self[async_id_symbol] = getNewAsyncId(self._handle);
// If handle doesn't support writev - neither do we
if (!self._handle.writev)
@@ -162,6 +172,10 @@ function Socket(options) {
if (!(this instanceof Socket)) return new Socket(options);
this.connecting = false;
+ // Problem with this is that users can supply their own handle, that may not
+ // have _handle.getAsyncId(). In this case an[async_id_symbol] should
+ // probably be supplied by async_hooks.
+ this[async_id_symbol] = -1;
this._hadError = false;
this._handle = null;
this._parent = null;
@@ -176,9 +190,11 @@ function Socket(options) {
if (options.handle) {
this._handle = options.handle; // private
+ this[async_id_symbol] = getNewAsyncId(this._handle);
} else if (options.fd !== undefined) {
this._handle = createHandle(options.fd);
this._handle.open(options.fd);
+ this[async_id_symbol] = this._handle.getAsyncId();
// options.fd can be string (since it is user-defined),
// so changing this to === would be semver-major
// See: https://github.com/nodejs/node/pull/11513
@@ -264,6 +280,10 @@ function onSocketFinish() {
var req = new ShutdownWrap();
req.oncomplete = afterShutdown;
req.handle = this._handle;
+ // node::ShutdownWrap isn't instantiated and attached to the JS instance of
+ // ShutdownWrap above until shutdown() is called. So don't set the init
+ // trigger id until now.
+ setInitTriggerId(this[async_id_symbol]);
var err = this._handle.shutdown(req);
if (err)
@@ -329,7 +349,7 @@ function writeAfterFIN(chunk, encoding, cb) {
// TODO: defer error events consistently everywhere, not just the cb
this.emit('error', er);
if (typeof cb === 'function') {
- process.nextTick(cb, er);
+ nextTick(this[async_id_symbol], cb, er);
}
}
@@ -887,6 +907,10 @@ function internalConnect(
req.localAddress = localAddress;
req.localPort = localPort;
+ // node::TCPConnectWrap isn't instantiated and attached to the JS instance
+ // of TCPConnectWrap above until connect() is called. So don't set the init
+ // trigger id until now.
+ setInitTriggerId(self[async_id_symbol]);
if (addressType === 4)
err = self._handle.connect(req, address, port);
else
@@ -896,6 +920,10 @@ function internalConnect(
const req = new PipeConnectWrap();
req.address = address;
req.oncomplete = afterConnect;
+ // node::PipeConnectWrap isn't instantiated and attached to the JS instance
+ // of PipeConnectWrap above until connect() is called. So don't set the
+ // init trigger id until now.
+ setInitTriggerId(self[async_id_symbol]);
err = self._handle.connect(req, address, afterConnect);
}
@@ -1020,6 +1048,7 @@ function lookupAndConnect(self, options) {
debug('connect: dns options', dnsopts);
self._host = host;
var lookup = options.lookup || dns.lookup;
+ setInitTriggerId(self[async_id_symbol]);
lookup(host, dnsopts, function emitLookup(err, ip, addressType) {
self.emit('lookup', err, ip, addressType, host);
@@ -1167,6 +1196,7 @@ function Server(options, connectionListener) {
configurable: true, enumerable: false
});
+ this[async_id_symbol] = -1;
this._handle = null;
this._usingSlaves = false;
this._slaves = [];
@@ -1274,6 +1304,7 @@ function setupListenHandle(address, port, addressType, backlog, fd) {
this._handle = rval;
}
+ this[async_id_symbol] = getNewAsyncId(this._handle);
this._handle.onconnection = onconnection;
this._handle.owner = this;
@@ -1286,7 +1317,7 @@ function setupListenHandle(address, port, addressType, backlog, fd) {
var ex = exceptionWithHostPort(err, 'listen', address, port);
this._handle.close();
this._handle = null;
- process.nextTick(emitErrorNT, this, ex);
+ nextTick(this[async_id_symbol], emitErrorNT, this, ex);
return;
}
@@ -1297,7 +1328,7 @@ function setupListenHandle(address, port, addressType, backlog, fd) {
if (this._unref)
this.unref();
- process.nextTick(emitListeningNT, this);
+ nextTick(this[async_id_symbol], emitListeningNT, this);
}
Server.prototype._listen2 = setupListenHandle; // legacy alias
@@ -1398,6 +1429,7 @@ Server.prototype.listen = function() {
// (handle[, backlog][, cb]) where handle is an object with a handle
if (options instanceof TCP) {
this._handle = options;
+ this[async_id_symbol] = this._handle.getAsyncId();
listenInCluster(this, null, -1, -1, backlogFromArgs);
return this;
}
@@ -1521,8 +1553,10 @@ function onconnection(err, clientHandle) {
Server.prototype.getConnections = function(cb) {
+ const self = this;
+
function end(err, connections) {
- process.nextTick(cb, err, connections);
+ nextTick(self[async_id_symbol], cb, err, connections);
}
if (!this._usingSlaves) {
@@ -1597,7 +1631,8 @@ Server.prototype._emitCloseIfDrained = function() {
return;
}
- process.nextTick(emitCloseNT, this);
+ const asyncId = this._handle ? this[async_id_symbol] : null;
+ nextTick(asyncId, emitCloseNT, this);
};
diff --git a/lib/timers.js b/lib/timers.js
index 5d21227b7b..fb9984abf4 100644
--- a/lib/timers.js
+++ b/lib/timers.js
@@ -21,14 +21,29 @@
'use strict';
+const async_wrap = process.binding('async_wrap');
const TimerWrap = process.binding('timer_wrap').Timer;
const L = require('internal/linkedlist');
const internalUtil = require('internal/util');
const { createPromise, promiseResolve } = process.binding('util');
+const async_hooks = require('async_hooks');
const assert = require('assert');
const util = require('util');
const debug = util.debuglog('timer');
const kOnTimeout = TimerWrap.kOnTimeout | 0;
+const initTriggerId = async_hooks.initTriggerId;
+// Two arrays that share state between C++ and JS.
+const { async_hook_fields, async_uid_fields } = async_wrap;
+// Used to change the state of the async id stack.
+const { pushAsyncIds, popAsyncIds } = async_wrap;
+// The needed emit*() functions.
+const { emitInit, emitBefore, emitAfter, emitDestroy } = async_hooks;
+// Grab the constants necessary for working with internal arrays.
+const { kInit, kBefore, kAfter, kDestroy, kAsyncUidCntr } =
+ async_wrap.constants;
+// Symbols for storing async id state.
+const async_id_symbol = Symbol('asyncId');
+const trigger_id_symbol = Symbol('triggerId');
// Timeout values > TIMEOUT_MAX are set to 1.
const TIMEOUT_MAX = 2147483647; // 2^31-1
@@ -134,6 +149,22 @@ exports._unrefActive = function(item) {
};
+function timerEmitBefore(asyncId, triggerId) {
+ if (async_hook_fields[kBefore] > 0)
+ emitBefore(asyncId, triggerId);
+ else
+ pushAsyncIds(asyncId, triggerId);
+}
+
+
+function timerEmitAfter(asyncId) {
+ if (async_hook_fields[kAfter] > 0)
+ emitAfter(asyncId);
+ else
+ popAsyncIds(asyncId);
+}
+
+
// The underlying logic for scheduling or re-scheduling a timer.
//
// Appends a timer onto the end of an existing timers list, or creates a new
@@ -154,6 +185,14 @@ function insert(item, unrefed) {
lists[msecs] = list = createTimersList(msecs, unrefed);
}
+ if (!item[async_id_symbol] || item._destroyed) {
+ item._destroyed = false;
+ item[async_id_symbol] = ++async_uid_fields[kAsyncUidCntr];
+ item[trigger_id_symbol] = initTriggerId();
+ if (async_hook_fields[kInit] > 0)
+ emitInit(item[async_id_symbol], 'Timeout', item[trigger_id_symbol], item);
+ }
+
L.append(list, item);
assert(!L.isEmpty(list)); // list is not empty
}
@@ -218,7 +257,14 @@ function listOnTimeout() {
L.remove(timer);
assert(timer !== L.peek(list));
- if (!timer._onTimeout) continue;
+ if (!timer._onTimeout) {
+ if (async_hook_fields[kDestroy] > 0 && !timer._destroyed &&
+ typeof timer[async_id_symbol] === 'number') {
+ emitDestroy(timer[async_id_symbol]);
+ timer._destroyed = true;
+ }
+ continue;
+ }
var domain = timer.domain;
if (domain) {
@@ -268,11 +314,25 @@ function listOnTimeout() {
// 4.7) what is in this smaller function.
function tryOnTimeout(timer, list) {
timer._called = true;
+ const timerAsyncId = (typeof timer[async_id_symbol] === 'number') ?
+ timer[async_id_symbol] : null;
var threw = true;
+ if (timerAsyncId !== null)
+ timerEmitBefore(timerAsyncId, timer[trigger_id_symbol]);
try {
ontimeout(timer);
threw = false;
} finally {
+ if (timerAsyncId !== null) {
+ if (!threw)
+ timerEmitAfter(timerAsyncId);
+ if (!timer._repeat && async_hook_fields[kDestroy] > 0 &&
+ !timer._destroyed) {
+ emitDestroy(timerAsyncId);
+ timer._destroyed = true;
+ }
+ }
+
if (!threw) return;
// Postpone all later list events to next tick. We need to do this
@@ -324,6 +384,15 @@ function reuse(item) {
// Remove a timer. Cancels the timeout and resets the relevant timer properties.
const unenroll = exports.unenroll = function(item) {
+ // Fewer checks may be possible, but these cover everything.
+ if (async_hook_fields[kDestroy] > 0 &&
+ item &&
+ typeof item[async_id_symbol] === 'number' &&
+ !item._destroyed) {
+ emitDestroy(item[async_id_symbol]);
+ item._destroyed = true;
+ }
+
var handle = reuse(item);
if (handle) {
debug('unenroll: list empty');
@@ -516,6 +585,11 @@ function Timeout(after, callback, args) {
this._onTimeout = callback;
this._timerArgs = args;
this._repeat = null;
+ this._destroyed = false;
+ this[async_id_symbol] = ++async_uid_fields[kAsyncUidCntr];
+ this[trigger_id_symbol] = initTriggerId();
+ if (async_hook_fields[kInit] > 0)
+ emitInit(this[async_id_symbol], 'Timeout', this[trigger_id_symbol], this);
}
@@ -570,6 +644,15 @@ Timeout.prototype.ref = function() {
Timeout.prototype.close = function() {
this._onTimeout = null;
if (this._handle) {
+ // Fewer checks may be possible, but these cover everything.
+ if (async_hook_fields[kDestroy] > 0 &&
+ this &&
+ typeof this[async_id_symbol] === 'number' &&
+ !this._destroyed) {
+ emitDestroy(this[async_id_symbol]);
+ this._destroyed = true;
+ }
+
this._idleTimeout = -1;
this._handle[kOnTimeout] = null;
this._handle.close();
@@ -673,11 +756,21 @@ function processImmediate() {
// 4.7) what is in this smaller function.
function tryOnImmediate(immediate, oldTail) {
var threw = true;
+ timerEmitBefore(immediate[async_id_symbol], immediate[trigger_id_symbol]);
try {
// make the actual call outside the try/catch to allow it to be optimized
runCallback(immediate);
threw = false;
} finally {
+ // clearImmediate checks _callback === null for kDestroy hooks.
+ immediate._callback = null;
+ if (!threw)
+ timerEmitAfter(immediate[async_id_symbol]);
+ if (async_hook_fields[kDestroy] > 0 && !immediate._destroyed) {
+ emitDestroy(immediate[async_id_symbol]);
+ immediate._destroyed = true;
+ }
+
if (threw && immediate._idleNext) {
// Handle any remaining on next tick, assuming we're still alive to do so.
const curHead = immediateQueue.head;
@@ -726,7 +819,12 @@ function Immediate() {
this._callback = null;
this._argv = null;
this._onImmediate = null;
+ this._destroyed = false;
this.domain = process.domain;
+ this[async_id_symbol] = ++async_uid_fields[kAsyncUidCntr];
+ this[trigger_id_symbol] = initTriggerId();
+ if (async_hook_fields[kInit] > 0)
+ emitInit(this[async_id_symbol], 'Immediate', this[trigger_id_symbol], this);
}
function setImmediate(callback, arg1, arg2, arg3) {
@@ -785,6 +883,13 @@ function createImmediate(args, callback) {
exports.clearImmediate = function(immediate) {
if (!immediate) return;
+ if (async_hook_fields[kDestroy] > 0 &&
+ immediate._callback !== null &&
+ !immediate._destroyed) {
+ emitDestroy(immediate[async_id_symbol]);
+ immediate._destroyed = true;
+ }
+
immediate._onImmediate = null;
immediateQueue.remove(immediate);