summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/api/errors.md23
-rw-r--r--doc/api/process.md27
-rw-r--r--doc/api/vm.md2
-rw-r--r--doc/api/worker.md299
-rw-r--r--lib/inspector.js2
-rw-r--r--lib/internal/bootstrap/node.js15
-rw-r--r--lib/internal/errors.js5
-rw-r--r--lib/internal/process.js6
-rw-r--r--lib/internal/process/methods.js7
-rw-r--r--lib/internal/process/stdio.js7
-rw-r--r--lib/internal/util/inspector.js4
-rw-r--r--lib/internal/worker.js221
-rw-r--r--lib/worker.js17
-rw-r--r--node.gyp2
-rw-r--r--src/async_wrap.h1
-rw-r--r--src/base_object-inl.h8
-rw-r--r--src/base_object.h4
-rw-r--r--src/bootstrapper.cc14
-rw-r--r--src/callback_scope.cc5
-rw-r--r--src/env-inl.h31
-rw-r--r--src/env.cc27
-rw-r--r--src/env.h29
-rw-r--r--src/js_stream.cc15
-rw-r--r--src/node.cc116
-rw-r--r--src/node_errors.h4
-rw-r--r--src/node_internals.h5
-rw-r--r--src/node_messaging.cc29
-rw-r--r--src/node_messaging.h5
-rw-r--r--src/node_worker.cc428
-rw-r--r--src/node_worker.h83
-rw-r--r--test/fixtures/worker-script.mjs3
-rw-r--r--test/parallel/test-message-channel-sharedarraybuffer.js28
-rw-r--r--test/parallel/test-message-channel.js22
-rw-r--r--test/parallel/test-worker-cleanup-handles.js30
-rw-r--r--test/parallel/test-worker-dns-terminate.js15
-rw-r--r--test/parallel/test-worker-esmodule.js11
-rw-r--r--test/parallel/test-worker-memory.js41
-rw-r--r--test/parallel/test-worker-nexttick-terminate.js20
-rw-r--r--test/parallel/test-worker-syntax-error-file.js18
-rw-r--r--test/parallel/test-worker-syntax-error.js17
-rw-r--r--test/parallel/test-worker-uncaught-exception-async.js20
-rw-r--r--test/parallel/test-worker-uncaught-exception.js18
-rw-r--r--test/parallel/test-worker.js18
-rw-r--r--test/sequential/test-async-wrap-getasyncid.js1
44 files changed, 1629 insertions, 74 deletions
diff --git a/doc/api/errors.md b/doc/api/errors.md
index 0a49757da3..972d0971c4 100644
--- a/doc/api/errors.md
+++ b/doc/api/errors.md
@@ -1313,6 +1313,13 @@ but not provided in the `transferList` for that call.
An [ES6 module][] could not be resolved.
+<a id="ERR_MISSING_PLATFORM_FOR_WORKER"></a>
+### ERR_MISSING_PLATFORM_FOR_WORKER
+
+The V8 platform used by this instance of Node.js does not support creating
+Workers. This is caused by lack of embedder support for Workers. In particular,
+this error will not occur with standard builds of Node.js.
+
<a id="ERR_MODULE_RESOLUTION_LEGACY"></a>
### ERR_MODULE_RESOLUTION_LEGACY
@@ -1722,6 +1729,22 @@ The fulfilled value of a linking promise is not a `vm.Module` object.
The current module's status does not allow for this operation. The specific
meaning of the error depends on the specific function.
+<a id="ERR_WORKER_NEED_ABSOLUTE_PATH"></a>
+### ERR_WORKER_NEED_ABSOLUTE_PATH
+
+The path for the main script of a worker is not an absolute path.
+
+<a id="ERR_WORKER_UNSERIALIZABLE_ERROR"></a>
+### ERR_WORKER_UNSERIALIZABLE_ERROR
+
+All attempts at serializing an uncaught exception from a worker thread failed.
+
+<a id="ERR_WORKER_UNSUPPORTED_EXTENSION"></a>
+### ERR_WORKER_UNSUPPORTED_EXTENSION
+
+The pathname used for the main script of a worker has an
+unknown file extension.
+
<a id="ERR_ZLIB_INITIALIZATION_FAILED"></a>
### ERR_ZLIB_INITIALIZATION_FAILED
diff --git a/doc/api/process.md b/doc/api/process.md
index 8205efbd78..0a9c52a44c 100644
--- a/doc/api/process.md
+++ b/doc/api/process.md
@@ -410,6 +410,8 @@ added: v0.7.0
The `process.abort()` method causes the Node.js process to exit immediately and
generate a core file.
+This feature is not available in [`Worker`][] threads.
+
## process.arch
<!-- YAML
added: v0.5.0
@@ -517,6 +519,8 @@ try {
}
```
+This feature is not available in [`Worker`][] threads.
+
## process.config
<!-- YAML
added: v0.7.7
@@ -918,6 +922,8 @@ console.log(process.env.test);
// => 1
```
+`process.env` is read-only in [`Worker`][] threads.
+
## process.execArgv
<!-- YAML
added: v0.7.7
@@ -1030,6 +1036,9 @@ If it is necessary to terminate the Node.js process due to an error condition,
throwing an *uncaught* error and allowing the process to terminate accordingly
is safer than calling `process.exit()`.
+In [`Worker`][] threads, this function stops the current thread rather
+than the current process.
+
## process.exitCode
<!-- YAML
added: v0.11.8
@@ -1203,6 +1212,7 @@ console.log(process.getgroups()); // [ 27, 30, 46, 1000 ]
This function is only available on POSIX platforms (i.e. not Windows or
Android).
+This feature is not available in [`Worker`][] threads.
## process.kill(pid[, signal])
<!-- YAML
@@ -1306,6 +1316,9 @@ The _heap_ is where objects, strings, and closures are stored. Variables are
stored in the _stack_ and the actual JavaScript code resides in the
_code segment_.
+When using [`Worker`][] threads, `rss` will be a value that is valid for the
+entire process, while the other fields will only refer to the current thread.
+
## process.nextTick(callback[, ...args])
<!-- YAML
added: v0.1.26
@@ -1569,6 +1582,7 @@ if (process.getegid && process.setegid) {
This function is only available on POSIX platforms (i.e. not Windows or
Android).
+This feature is not available in [`Worker`][] threads.
## process.seteuid(id)
<!-- YAML
@@ -1596,6 +1610,7 @@ if (process.geteuid && process.seteuid) {
This function is only available on POSIX platforms (i.e. not Windows or
Android).
+This feature is not available in [`Worker`][] threads.
## process.setgid(id)
<!-- YAML
@@ -1623,6 +1638,7 @@ if (process.getgid && process.setgid) {
This function is only available on POSIX platforms (i.e. not Windows or
Android).
+This feature is not available in [`Worker`][] threads.
## process.setgroups(groups)
<!-- YAML
@@ -1639,6 +1655,7 @@ The `groups` array can contain numeric group IDs, group names or both.
This function is only available on POSIX platforms (i.e. not Windows or
Android).
+This feature is not available in [`Worker`][] threads.
## process.setuid(id)
<!-- YAML
@@ -1664,6 +1681,7 @@ if (process.getuid && process.setuid) {
This function is only available on POSIX platforms (i.e. not Windows or
Android).
+This feature is not available in [`Worker`][] threads.
## process.setUncaughtExceptionCaptureCallback(fn)
<!-- YAML
@@ -1700,6 +1718,8 @@ a [Writable][] stream.
`process.stderr` differs from other Node.js streams in important ways, see
[note on process I/O][] for more information.
+This feature is not available in [`Worker`][] threads.
+
## process.stdin
* {Stream}
@@ -1732,6 +1752,8 @@ In "old" streams mode the `stdin` stream is paused by default, so one
must call `process.stdin.resume()` to read from it. Note also that calling
`process.stdin.resume()` itself would switch stream to "old" mode.
+This feature is not available in [`Worker`][] threads.
+
## process.stdout
* {Stream}
@@ -1750,6 +1772,8 @@ process.stdin.pipe(process.stdout);
`process.stdout` differs from other Node.js streams in important ways, see
[note on process I/O][] for more information.
+This feature is not available in [`Worker`][] threads.
+
### A note on process I/O
`process.stdout` and `process.stderr` differ from other Node.js streams in
@@ -1865,6 +1889,8 @@ console.log(
);
```
+This feature is not available in [`Worker`][] threads.
+
## process.uptime()
<!-- YAML
added: v0.5.0
@@ -1992,6 +2018,7 @@ cases:
[`ChildProcess`]: child_process.html#child_process_class_childprocess
[`Error`]: errors.html#errors_class_error
[`EventEmitter`]: events.html#events_class_eventemitter
+[`Worker`]: worker.html#worker_worker
[`console.error()`]: console.html#console_console_error_data_args
[`console.log()`]: console.html#console_console_log_data_args
[`domain`]: domain.html
diff --git a/doc/api/vm.md b/doc/api/vm.md
index ff2f8c4ad0..f3b79199c1 100644
--- a/doc/api/vm.md
+++ b/doc/api/vm.md
@@ -174,7 +174,7 @@ const contextifiedSandbox = vm.createContext({ secret: 42 });
Creates a new ES `Module` object.
-*Note*: Properties assigned to the `import.meta` object that are objects may
+Properties assigned to the `import.meta` object that are objects may
allow the `Module` to access information outside the specified `context`, if the
object is created in the top level context. Use `vm.runInContext()` to create
objects in a specific context.
diff --git a/doc/api/worker.md b/doc/api/worker.md
index 974ff2e467..3517a4c86a 100644
--- a/doc/api/worker.md
+++ b/doc/api/worker.md
@@ -4,6 +4,94 @@
> Stability: 1 - Experimental
+The `worker` module provides a way to create multiple environments running
+on independent threads, and to create message channels between them. It
+can be accessed using:
+
+```js
+const worker = require('worker');
+```
+
+Workers are useful for performing CPU-intensive JavaScript operations; do not
+use them for I/O, since Node.js’s built-in mechanisms for performing operations
+asynchronously already treat it more efficiently than Worker threads can.
+
+Workers, unlike child processes or when using the `cluster` module, can also
+share memory efficiently by transferring `ArrayBuffer` instances or sharing
+`SharedArrayBuffer` instances between them.
+
+## Example
+
+```js
+const { Worker, isMainThread, parentPort, workerData } = require('worker');
+
+if (isMainThread) {
+ module.exports = async function parseJSAsync(script) {
+ return new Promise((resolve, reject) => {
+ const worker = new Worker(__filename, {
+ workerData: script
+ });
+ worker.on('message', resolve);
+ worker.on('error', reject);
+ worker.on('exit', (code) => {
+ if (code !== 0)
+ reject(new Error(`Worker stopped with exit code ${code}`));
+ });
+ });
+ };
+} else {
+ const { parse } = require('some-js-parsing-library');
+ const script = workerData;
+ parentPort.postMessage(parse(script));
+}
+```
+
+Note that this example spawns a Worker thread for each `parse` call.
+In practice, it is strongly recommended to use a pool of Workers for these
+kinds of tasks, since the overhead of creating Workers would likely exceed the
+benefit of handing the work off to it.
+
+## worker.isMainThread
+<!-- YAML
+added: REPLACEME
+-->
+
+* {boolean}
+
+Is `true` if this code is not running inside of a [`Worker`][] thread.
+
+## worker.parentPort
+<!-- YAML
+added: REPLACEME
+-->
+
+* {null|MessagePort}
+
+If this thread was spawned as a [`Worker`][], this will be a [`MessagePort`][]
+allowing communication with the parent thread. Messages sent using
+`parentPort.postMessage()` will be available in the parent thread
+using `worker.on('message')`, and messages sent from the parent thread
+using `worker.postMessage()` will be available in this thread using
+`parentPort.on('message')`.
+
+## worker.threadId
+<!-- YAML
+added: REPLACEME
+-->
+
+* {integer}
+
+An integer identifier for the current thread. On the corresponding worker object
+(if there is any), it is available as [`worker.threadId`][].
+
+## worker.workerData
+<!-- YAML
+added: REPLACEME
+-->
+
+An arbitrary JavaScript value that contains a clone of the data passed
+to this thread’s `Worker` constructor.
+
## Class: MessageChannel
<!-- YAML
added: REPLACEME
@@ -21,7 +109,7 @@ const { MessageChannel } = require('worker');
const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log('received', message));
port2.postMessage({ foo: 'bar' });
-// prints: received { foo: 'bar' }
+// prints: received { foo: 'bar' } from the `port1.on('message')` listener
```
## Class: MessagePort
@@ -141,13 +229,220 @@ If listeners are attached or removed using `.on('message')`, the port will
be `ref()`ed and `unref()`ed automatically depending on whether
listeners for the event exist.
+## Class: Worker
+<!-- YAML
+added: REPLACEME
+-->
+
+The `Worker` class represents an independent JavaScript execution thread.
+Most Node.js APIs are available inside of it.
+
+Notable differences inside a Worker environment are:
+
+- The [`process.stdin`][], [`process.stdout`][] and [`process.stderr`][]
+ properties are set to `null`.
+- The [`require('worker').isMainThread`][] property is set to `false`.
+- The [`require('worker').parentPort`][] message port is available,
+- [`process.exit()`][] does not stop the whole program, just the single thread,
+ and [`process.abort()`][] is not available.
+- [`process.chdir()`][] and `process` methods that set group or user ids
+ are not available.
+- [`process.env`][] is a read-only reference to the environment variables.
+- [`process.title`][] cannot be modified.
+- Signals will not be delivered through [`process.on('...')`][Signals events].
+- Execution may stop at any point as a result of [`worker.terminate()`][]
+ being invoked.
+- IPC channels from parent processes are not accessible.
+
+Currently, the following differences also exist until they are addressed:
+
+- The [`inspector`][] module is not available yet.
+- Native addons are not supported yet.
+
+Creating `Worker` instances inside of other `Worker`s is possible.
+
+Like [Web Workers][] and the [`cluster` module][], two-way communication can be
+achieved through inter-thread message passing. Internally, a `Worker` has a
+built-in pair of [`MessagePort`][]s that are already associated with each other
+when the `Worker` is created. While the `MessagePort` object on the parent side
+is not directly exposed, its functionalities are exposed through
+[`worker.postMessage()`][] and the [`worker.on('message')`][] event
+on the `Worker` object for the parent thread.
+
+To create custom messaging channels (which is encouraged over using the default
+global channel because it facilitates separation of concerns), users can create
+a `MessageChannel` object on either thread and pass one of the
+`MessagePort`s on that `MessageChannel` to the other thread through a
+pre-existing channel, such as the global one.
+
+See [`port.postMessage()`][] for more information on how messages are passed,
+and what kind of JavaScript values can be successfully transported through
+the thread barrier.
+
+For example:
+
+```js
+const assert = require('assert');
+const { Worker, MessageChannel, MessagePort, isMainThread } = require('worker');
+if (isMainThread) {
+ const worker = new Worker(__filename);
+ const subChannel = new MessageChannel();
+ worker.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]);
+ subChannel.port2.on('message', (value) => {
+ console.log('received:', value);
+ });
+} else {
+ require('worker').once('workerMessage', (value) => {
+ assert(value.hereIsYourPort instanceof MessagePort);
+ value.hereIsYourPort.postMessage('the worker is sending this');
+ value.hereIsYourPort.close();
+ });
+}
+```
+
+### new Worker(filename, options)
+
+* `filename` {string} The absolute path to the Worker’s main script.
+ If `options.eval` is true, this is a string containing JavaScript code rather
+ than a path.
+* `options` {Object}
+ * `eval` {boolean} If true, interpret the first argument to the constructor
+ as a script that is executed once the worker is online.
+ * `data` {any} Any JavaScript value that will be cloned and made
+ available as [`require('worker').workerData`][]. The cloning will occur as
+ described in the [HTML structured clone algorithm][], and an error will be
+ thrown if the object cannot be cloned (e.g. because it contains
+ `function`s).
+
+### Event: 'error'
+<!-- YAML
+added: REPLACEME
+-->
+
+* `err` {Error}
+
+The `'error'` event is emitted if the worker thread throws an uncaught
+exception. In that case, the worker will be terminated.
+
+### Event: 'exit'
+<!-- YAML
+added: REPLACEME
+-->
+
+* `exitCode` {integer}
+
+The `'exit'` event is emitted once the worker has stopped. If the worker
+exited by calling [`process.exit()`][], the `exitCode` parameter will be the
+passed exit code. If the worker was terminated, the `exitCode` parameter will
+be `1`.
+
+### Event: 'message'
+<!-- YAML
+added: REPLACEME
+-->
+
+* `value` {any} The transmitted value
+
+The `'message'` event is emitted when the worker thread has invoked
+[`require('worker').postMessage()`][]. See the [`port.on('message')`][] event
+for more details.
+
+### Event: 'online'
+<!-- YAML
+added: REPLACEME
+-->
+
+The `'online'` event is emitted when the worker thread has started executing
+JavaScript code.
+
+### worker.postMessage(value[, transferList])
+<!-- YAML
+added: REPLACEME
+-->
+
+* `value` {any}
+* `transferList` {Object[]}
+
+Send a message to the worker that will be received via
+[`require('worker').on('workerMessage')`][]. See [`port.postMessage()`][] for
+more details.
+
+### worker.ref()
+<!-- YAML
+added: REPLACEME
+-->
+
+Opposite of `unref()`, calling `ref()` on a previously `unref()`ed worker will
+*not* let the program exit if it's the only active handle left (the default
+behavior). If the worker is `ref()`ed, calling `ref()` again will have
+no effect.
+
+### worker.terminate([callback])
+<!-- YAML
+added: REPLACEME
+-->
+
+* `callback` {Function}
+
+Stop all JavaScript execution in the worker thread as soon as possible.
+`callback` is an optional function that is invoked once this operation is known
+to have completed.
+
+**Warning**: Currently, not all code in the internals of Node.js is prepared to
+expect termination at arbitrary points in time and may crash if it encounters
+that condition. Consequently, you should currently only call `.terminate()` if
+it is known that the Worker thread is not accessing Node.js core modules other
+than what is exposed in the `worker` module.
+
+### worker.threadId
+<!-- YAML
+added: REPLACEME
+-->
+
+* {integer}
+
+An integer identifier for the referenced thread. Inside the worker thread,
+it is available as [`require('worker').threadId`][].
+
+### worker.unref()
+<!-- YAML
+added: REPLACEME
+-->
+
+Calling `unref()` on a worker will allow the thread to exit if this is the only
+active handle in the event system. If the worker is already `unref()`ed calling
+`unref()` again will have no effect.
+
[`Buffer`]: buffer.html
-[child processes]: child_process.html
[`EventEmitter`]: events.html
[`MessagePort`]: #worker_class_messageport
[`port.postMessage()`]: #worker_port_postmessage_value_transferlist
+[`Worker`]: #worker_class_worker
+[`worker.terminate()`]: #worker_worker_terminate_callback
+[`worker.postMessage()`]: #worker_worker_postmessage_value_transferlist_1
+[`worker.on('message')`]: #worker_event_message_1
+[`worker.threadId`]: #worker_worker_threadid_1
+[`port.on('message')`]: #worker_event_message
+[`process.exit()`]: process.html#process_process_exit_code
+[`process.abort()`]: process.html#process_process_abort
+[`process.chdir()`]: process.html#process_process_chdir_directory
+[`process.env`]: process.html#process_process_env
+[`process.stdin`]: process.html#process_process_stdin
+[`process.stderr`]: process.html#process_process_stderr
+[`process.stdout`]: process.html#process_process_stdout
+[`process.title`]: process.html#process_process_title
+[`require('worker').workerData`]: #worker_worker_workerdata
+[`require('worker').on('workerMessage')`]: #worker_event_workermessage
+[`require('worker').postMessage()`]: #worker_worker_postmessage_value_transferlist
+[`require('worker').isMainThread`]: #worker_worker_ismainthread
+[`require('worker').threadId`]: #worker_worker_threadid
+[`cluster` module]: cluster.html
+[`inspector`]: inspector.html
[v8.serdes]: v8.html#v8_serialization_api
[`SharedArrayBuffer`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer
+[Signals events]: process.html#process_signal_events
[`Uint8Array`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Uint8Array
[browser `MessagePort`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort
+[child processes]: child_process.html
[HTML structured clone algorithm]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm
+[Web Workers]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API
diff --git a/lib/inspector.js b/lib/inspector.js
index 3285c1040a..f4ec71fd6c 100644
--- a/lib/inspector.js
+++ b/lib/inspector.js
@@ -12,7 +12,7 @@ const {
const util = require('util');
const { Connection, open, url } = process.binding('inspector');
-if (!Connection)
+if (!Connection || !require('internal/worker').isMainThread)
throw new ERR_INSPECTOR_NOT_AVAILABLE();
const connectionSymbol = Symbol('connectionProperty');
diff --git a/lib/internal/bootstrap/node.js b/lib/internal/bootstrap/node.js
index 6477c2d828..4817ec110a 100644
--- a/lib/internal/bootstrap/node.js
+++ b/lib/internal/bootstrap/node.js
@@ -24,6 +24,7 @@
_shouldAbortOnUncaughtToggle },
{ internalBinding, NativeModule }) {
const exceptionHandlerState = { captureFn: null };
+ const isMainThread = internalBinding('worker').threadId === 0;
function startup() {
const EventEmitter = NativeModule.require('events');
@@ -100,7 +101,9 @@
NativeModule.require('internal/inspector_async_hook').setup();
}
- _process.setupChannel();
+ if (isMainThread)
+ _process.setupChannel();
+
_process.setupRawDebug(_rawDebug);
const browserGlobals = !process._noBrowserGlobals;
@@ -175,8 +178,11 @@
// are running from a script and running the REPL - but there are a few
// others like the debugger or running --eval arguments. Here we decide
// which mode we run in.
-
- if (NativeModule.exists('_third_party_main')) {
+ if (internalBinding('worker').getEnvMessagePort() !== undefined) {
+ // This means we are in a Worker context, and any script execution
+ // will be directed by the worker module.
+ NativeModule.require('internal/worker').setupChild(evalScript);
+ } else if (NativeModule.exists('_third_party_main')) {
// To allow people to extend Node in different ways, this hook allows
// one to drop a file lib/_third_party_main.js into the build
// directory which will be executed instead of Node's normal loading.
@@ -542,7 +548,7 @@
return `process.binding('inspector').callAndPauseOnStart(${fn}, {})`;
}
- function evalScript(name) {
+ function evalScript(name, body = wrapForBreakOnFirstLine(process._eval)) {
const CJSModule = NativeModule.require('internal/modules/cjs/loader');
const path = NativeModule.require('path');
const cwd = tryGetCwd(path);
@@ -550,7 +556,6 @@
const module = new CJSModule(name);
module.filename = path.join(cwd, name);
module.paths = CJSModule._nodeModulePaths(cwd);
- const body = wrapForBreakOnFirstLine(process._eval);
const script = `global.__filename = ${JSON.stringify(name)};\n` +
'global.exports = exports;\n' +
'global.module = module;\n' +
diff --git a/lib/internal/errors.js b/lib/internal/errors.js
index 89c0139f8b..d59531debb 100644
--- a/lib/internal/errors.js
+++ b/lib/internal/errors.js
@@ -844,4 +844,9 @@ E('ERR_VM_MODULE_NOT_LINKED',
E('ERR_VM_MODULE_NOT_MODULE',
'Provided module is not an instance of Module', Error);
E('ERR_VM_MODULE_STATUS', 'Module status %s', Error);
+E('ERR_WORKER_NEED_ABSOLUTE_PATH',
+ 'The worker script filename must be an absolute path. Received "%s"',
+ TypeError);
+E('ERR_WORKER_UNSERIALIZABLE_ERROR',
+ 'Serializing an uncaught exception failed', Error);
E('ERR_ZLIB_INITIALIZATION_FAILED', 'Initialization failed', Error);
diff --git a/lib/internal/process.js b/lib/internal/process.js
index 0f0e40d6a0..f01be32be4 100644
--- a/lib/internal/process.js
+++ b/lib/internal/process.js
@@ -16,6 +16,7 @@ const util = require('util');
const constants = process.binding('constants').os.signals;
const assert = require('assert').strict;
const { deprecate } = require('internal/util');
+const { isMainThread } = require('internal/worker');
process.assert = deprecate(
function(x, msg) {
@@ -186,6 +187,11 @@ function setupKillAndExit() {
function setupSignalHandlers() {
+ if (!isMainThread) {
+ // Worker threads don't receive signals.
+ return;
+ }
+
const signalWraps = Object.create(null);
let Signal;
diff --git a/lib/internal/process/methods.js b/lib/internal/process/methods.js
index 91aca398b3..9a954f6a9b 100644
--- a/lib/internal/process/methods.js
+++ b/lib/internal/process/methods.js
@@ -8,11 +8,18 @@ const {
validateMode,
validateUint32
} = require('internal/validators');
+const {
+ isMainThread
+} = require('internal/worker');
function setupProcessMethods(_chdir, _cpuUsage, _hrtime, _memoryUsage,
_rawDebug, _umask, _initgroups, _setegid,
_seteuid, _setgid, _setuid, _setgroups) {
// Non-POSIX platforms like Windows don't have certain methods.
+ // Workers also lack these methods since they change process-global state.
+ if (!isMainThread)
+ return;
+
if (_setgid !== undefined) {
setupPosixMethods(_initgroups, _setegid, _seteuid,
_setgid, _setuid, _setgroups);
diff --git a/lib/internal/process/stdio.js b/lib/internal/process/stdio.js
index eaba4dfca1..76e6ab8514 100644
--- a/lib/internal/process/stdio.js
+++ b/lib/internal/process/stdio.js
@@ -6,6 +6,7 @@ const {
ERR_UNKNOWN_STDIN_TYPE,
ERR_UNKNOWN_STREAM_TYPE
} = require('internal/errors').codes;
+const { isMainThread } = require('internal/worker');
exports.setup = setupStdio;
@@ -16,6 +17,8 @@ function setupStdio() {
function getStdout() {
if (stdout) return stdout;
+ if (!isMainThread)
+ return new (require('stream').Writable)({ write(b, e, cb) { cb(); } });
stdout = createWritableStdioStream(1);
stdout.destroySoon = stdout.destroy;
stdout._destroy = function(er, cb) {
@@ -31,6 +34,8 @@ function setupStdio() {
function getStderr() {
if (stderr) return stderr;
+ if (!isMainThread)
+ return new (require('stream').Writable)({ write(b, e, cb) { cb(); } });
stderr = createWritableStdioStream(2);
stderr.destroySoon = stderr.destroy;
stderr._destroy = function(er, cb) {
@@ -46,6 +51,8 @@ function setupStdio() {
function getStdin() {
if (stdin) return stdin;
+ if (!isMainThread)
+ return new (require('stream').Readable)({ read() { this.push(null); } });
const tty_wrap = process.binding('tty_wrap');
const fd = 0;
diff --git a/lib/internal/util/inspector.js b/lib/internal/util/inspector.js
index 634d330233..3dd73415de 100644
--- a/lib/internal/util/inspector.js
+++ b/lib/internal/util/inspector.js
@@ -1,6 +1,8 @@
'use strict';
-const hasInspector = process.config.variables.v8_enable_inspector === 1;
+// TODO(addaleax): Figure out how to integrate the inspector with workers.
+const hasInspector = process.config.variables.v8_enable_inspector === 1 &&
+ require('internal/worker').isMainThread;
const inspector = hasInspector ? require('inspector') : undefined;
let session;
diff --git a/lib/internal/worker.js b/lib/internal/worker.js
index 73f7525aa7..c982478b93 100644
--- a/lib/internal/worker.js
+++ b/lib/internal/worker.js
@@ -1,24 +1,49 @@
'use strict';
+const Buffer = require('buffer').Buffer;
const EventEmitter = require('events');
+const assert = require('assert');
+const path = require('path');
const util = require('util');
+const {
+ ERR_INVALID_ARG_TYPE,
+ ERR_WORKER_NEED_ABSOLUTE_PATH,
+ ERR_WORKER_UNSERIALIZABLE_ERROR
+} = require('internal/errors').codes;
const { internalBinding } = require('internal/bootstrap/loaders');
const { MessagePort, MessageChannel } = internalBinding('messaging');
const { handle_onclose } = internalBinding('symbols');
+const { clearAsyncIdStack } = require('internal/async_hooks');
util.inherits(MessagePort, EventEmitter);
+const {
+ Worker: WorkerImpl,
+ getEnvMessagePort,
+ threadId
+} = internalBinding('worker');
+
+const isMainThread = threadId === 0;
+
const kOnMessageListener = Symbol('kOnMessageListener');
+const kHandle = Symbol('kHandle');
+const kPort = Symbol('kPort');
+const kPublicPort = Symbol('kPublicPort');
+const kDispose = Symbol('kDispose');
+const kOnExit = Symbol('kOnExit');
+const kOnMessage = Symbol('kOnMessage');
+const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr');
+const kOnErrorMessage = Symbol('kOnErrorMessage');
const debug = util.debuglog('worker');
-// A MessagePort consists of a handle (that wraps around an
+// A communication channel consisting of a handle (that wraps around an
// uv_async_t) which can receive information from other threads and emits
// .onmessage events, and a function used for sending data to a MessagePort
// in some other thread.
MessagePort.prototype[kOnMessageListener] = function onmessage(payload) {
- debug('received message', payload);
+ debug(`[${threadId}] received message`, payload);
// Emit the deserialized object to userland.
this.emit('message', payload);
};
@@ -79,6 +104,9 @@ MessagePort.prototype.close = function(cb) {
originalClose.call(this);
};
+const drainMessagePort = MessagePort.prototype.drain;
+delete MessagePort.prototype.drain;
+
function setupPortReferencing(port, eventEmitter, eventName) {
// Keep track of whether there are any workerMessage listeners:
// If there are some, ref() the channel so it keeps the event loop alive.
@@ -99,7 +127,194 @@ function setupPortReferencing(port, eventEmitter, eventName) {
});
}
+
+class Worker extends EventEmitter {
+ constructor(filename, options = {}) {
+ super();
+ debug(`[${threadId}] create new worker`, filename, options);
+ if (typeof filename !== 'string') {
+ throw new ERR_INVALID_ARG_TYPE('filename', 'string', filename);
+ }
+
+ if (!options.eval && !path.isAbsolute(filename)) {
+ throw new ERR_WORKER_NEED_ABSOLUTE_PATH(filename);
+ }
+
+ // Set up the C++ handle for the worker, as well as some internal wiring.
+ this[kHandle] = new WorkerImpl();
+ this[kHandle].onexit = (code) => this[kOnExit](code);
+ this[kPort] = this[kHandle].messagePort;
+ this[kPort].on('message', (data) => this[kOnMessage](data));
+ this[kPort].start();
+ this[kPort].unref();
+ debug(`[${threadId}] created Worker with ID ${this.threadId}`);
+
+ const { port1, port2 } = new MessageChannel();
+ this[kPublicPort] = port1;
+ this[kPublicPort].on('message', (message) => this.emit('message', message));
+ setupPortReferencing(this[kPublicPort], this, 'message');
+ this[kPort].postMessage({
+ type: 'loadScript',
+ filename,
+ doEval: !!options.eval,
+ workerData: options.workerData,
+ publicPort: port2
+ }, [port2]);
+ // Actually start the new thread now that everything is in place.
+ this[kHandle].startThread();
+ }
+
+ [kOnExit](code) {
+ debug(`[${threadId}] hears end event for Worker ${this.threadId}`);
+ drainMessagePort.call(this[kPublicPort]);
+ this[kDispose]();
+ this.emit('exit', code);
+ this.removeAllListeners();
+ }
+
+ [kOnCouldNotSerializeErr]() {
+ this.emit('error', new ERR_WORKER_UNSERIALIZABLE_ERROR());
+ }
+
+ [kOnErrorMessage](serialized) {
+ // This is what is called for uncaught exceptions.
+ const error = deserializeError(serialized);
+ this.emit('error', error);
+ }
+
+ [kOnMessage](message) {
+ switch (message.type) {
+ case 'upAndRunning':
+ return this.emit('online');
+ case 'couldNotSerializeError':
+ return this[kOnCouldNotSerializeErr]();
+ case 'errorMessage':
+ return this[kOnErrorMessage](message.error);
+ }
+
+ assert.fail(`Unknown worker message type ${message.type}`);
+ }
+
+ [kDispose]() {
+ this[kHandle].onexit = null;
+ this[kHandle] = null;
+ this[kPort] = null;
+ this[kPublicPort] = null;
+ }
+
+ postMessage(...args) {
+ this[kPublicPort].postMessage(...args);
+ }
+
+ terminate(callback) {
+ if (this[kHandle] === null) return;
+
+ debug(`[${threadId}] terminates Worker with ID ${this.threadId}`);
+
+ if (typeof callback !== 'undefined')
+ this.once('exit', (exitCode) => callback(null, exitCode));
+
+ this[kHandle].stopThread();
+ }
+
+ ref() {
+ if (this[kHandle] === null) return;
+
+ this[kHandle].ref();
+ this[kPublicPort].ref();
+ }
+
+ unref() {
+ if (this[kHandle] === null) return;
+
+ this[kHandle].unref();
+ this[kPublicPort].unref();
+ }
+
+ get threadId() {
+ if (this[kHandle] === null) return -1;
+
+ return this[kHandle].threadId;
+ }
+}
+
+let originalFatalException;
+
+function setupChild(evalScript) {
+ // Called during bootstrap to set up worker script execution.
+ debug(`[${threadId}] is setting up worker child environment`);
+ const port = getEnvMessagePort();
+
+ const publicWorker = require('worker');
+
+ port.on('message', (message) => {
+ if (message.type === 'loadScript') {
+ const { filename, doEval, workerData, publicPort } = message;
+ publicWorker.parentPort = publicPort;
+ setupPortReferencing(publicPort, publicPort, 'message');
+ publicWorker.workerData = workerData;
+ debug(`[${threadId}] starts worker script ${filename} ` +
+ `(eval = ${eval}) at cwd = ${process.cwd()}`);
+ port.unref();
+ port.postMessage({ type: 'upAndRunning' });
+ if (doEval) {
+ evalScript('[worker eval]', filename);
+ } else {
+ process.argv[1] = filename; // script filename
+ require('module').runMain();
+ }
+ return;
+ }
+
+ assert.fail(`Unknown worker message type ${message.type}`);
+ });
+
+ port.start();
+
+ originalFatalException = process._fatalException;
+ process._fatalException = fatalException;
+
+ function fatalException(error) {
+ debug(`[${threadId}] gets fatal exception`);
+ let caught = false;
+ try {
+ caught = originalFatalException.call(this, error);
+ } catch (e) {
+ error = e;
+ }
+ debug(`[${threadId}] fatal exception caught = ${caught}`);
+
+ if (!caught) {
+ let serialized;
+ try {
+ serialized = serializeError(error);
+ } catch {}
+ debug(`[${threadId}] fatal exception serialized = ${!!serialized}`);
+ if (serialized)
+ port.postMessage({ type: 'errorMessage', error: serialized });
+ else
+ port.postMessage({ type: 'couldNotSerializeError' });
+ clearAsyncIdStack();
+ }
+ }
+}
+
+// TODO(addaleax): These can be improved a lot.
+function serializeError(error) {
+ return Buffer.from(util.inspect(error), 'utf8');
+}
+
+function deserializeError(error) {
+ return Buffer.from(error.buffer,
+ error.byteOffset,
+ error.byteLength).toString('utf8');
+}
+
module.exports = {
MessagePort,
- MessageChannel
+ MessageChannel,
+ threadId,
+ Worker,
+ setupChild,
+ isMainThread
};
diff --git a/lib/worker.js b/lib/worker.js
index d67fb4efe4..0609650cd5 100644
--- a/lib/worker.js
+++ b/lib/worker.js
@@ -1,5 +1,18 @@
'use strict';
-const { MessagePort, MessageChannel } = require('internal/worker');
+const {
+ isMainThread,
+ MessagePort,
+ MessageChannel,
+ threadId,
+ Worker
+} = require('internal/worker');
-module.exports = { MessagePort, MessageChannel };
+module.exports = {
+ isMainThread,
+ MessagePort,
+ MessageChannel,
+ threadId,
+ Worker,
+ parentPort: null
+};
diff --git a/node.gyp b/node.gyp
index 804c102a5b..e9a2cb46a8 100644
--- a/node.gyp
+++ b/node.gyp
@@ -349,6 +349,7 @@
'src/node_v8.cc',
'src/node_stat_watcher.cc',
'src/node_watchdog.cc',
+ 'src/node_worker.cc',
'src/node_zlib.cc',
'src/node_i18n.cc',
'src/pipe_wrap.cc',
@@ -407,6 +408,7 @@
'src/node_wrap.h',
'src/node_revert.h',
'src/node_i18n.h',
+ 'src/node_worker.h',
'src/pipe_wrap.h',
'src/tty_wrap.h',
'src/tcp_wrap.h',
diff --git a/src/async_wrap.h b/src/async_wrap.h
index cf269a4c1f..b2f96477b4 100644
--- a/src/async_wrap.h
+++ b/src/async_wrap.h
@@ -67,6 +67,7 @@ namespace node {
V(TTYWRAP) \
V(UDPSENDWRAP) \
V(UDPWRAP) \
+ V(WORKER) \
V(WRITEWRAP) \
V(ZLIB)
diff --git a/src/base_object-inl.h b/src/base_object-inl.h
index 3bd854639b..06a2922397 100644
--- a/src/base_object-inl.h
+++ b/src/base_object-inl.h
@@ -65,6 +65,14 @@ v8::Local<v8::Object> BaseObject::object() {
return PersistentToLocal(env_->isolate(), persistent_handle_);
}
+v8::Local<v8::Object> BaseObject::object(v8::Isolate* isolate) {
+ v8::Local<v8::Object> handle = object();
+#ifdef DEBUG
+ CHECK_EQ(handle->CreationContext()->GetIsolate(), isolate);
+ CHECK_EQ(env_->isolate(), isolate);
+#endif
+ return handle;
+}
Environment* BaseObject::env() const {
return env_;
diff --git a/src/base_object.h b/src/base_object.h
index e0b6084340..38291d598f 100644
--- a/src/base_object.h
+++ b/src/base_object.h
@@ -43,6 +43,10 @@ class BaseObject {
// persistent.IsEmpty() is true.
inline v8::Local<v8::Object> object();
+ // Same as the above, except it additionally verifies that this object
+ // is associated with the passed Isolate in debug mode.
+ inline v8::Local<v8::Object> object(v8::Isolate* isolate);
+
inline Persistent<v8::Object>& persistent();
inline Environment* env() const;
diff --git a/src/bootstrapper.cc b/src/bootstrapper.cc
index 35c7c4dc69..f9db02562d 100644
--- a/src/bootstrapper.cc
+++ b/src/bootstrapper.cc
@@ -114,12 +114,14 @@ void SetupBootstrapObject(Environment* env,
BOOTSTRAP_METHOD(_umask, Umask);
#if defined(__POSIX__) && !defined(__ANDROID__) && !defined(__CloudABI__)
- BOOTSTRAP_METHOD(_initgroups, InitGroups);
- BOOTSTRAP_METHOD(_setegid, SetEGid);
- BOOTSTRAP_METHOD(_seteuid, SetEUid);
- BOOTSTRAP_METHOD(_setgid, SetGid);
- BOOTSTRAP_METHOD(_setuid, SetUid);
- BOOTSTRAP_METHOD(_setgroups, SetGroups);
+ if (env->is_main_thread()) {
+ BOOTSTRAP_METHOD(_initgroups, InitGroups);
+ BOOTSTRAP_METHOD(_setegid, SetEGid);
+ BOOTSTRAP_METHOD(_seteuid, SetEUid);
+ BOOTSTRAP_METHOD(_setgid, SetGid);
+ BOOTSTRAP_METHOD(_setuid, SetUid);
+ BOOTSTRAP_METHOD(_setgroups, SetGroups);
+ }
#endif // __POSIX__ && !defined(__ANDROID__) && !defined(__CloudABI__)
Local<String> should_abort_on_uncaught_toggle =
diff --git a/src/callback_scope.cc b/src/callback_scope.cc
index 9eac7beb03..23e6d5b063 100644
--- a/src/callback_scope.cc
+++ b/src/callback_scope.cc
@@ -79,6 +79,11 @@ void InternalCallbackScope::Close() {
closed_ = true;
HandleScope handle_scope(env_->isolate());
+ if (!env_->can_call_into_js()) return;
+ if (failed_ && !env_->is_main_thread() && env_->is_stopping_worker()) {
+ env_->async_hooks()->clear_async_id_stack();
+ }
+
if (pushed_ids_)
env_->async_hooks()->pop_async_id(async_context_.async_id);
diff --git a/src/env-inl.h b/src/env-inl.h
index 50328bd77c..eeb419b4a0 100644
--- a/src/env-inl.h
+++ b/src/env-inl.h
@@ -582,13 +582,42 @@ void Environment::SetUnrefImmediate(native_immediate_callback cb,
}
inline bool Environment::can_call_into_js() const {
- return can_call_into_js_;
+ return can_call_into_js_ && (is_main_thread() || !is_stopping_worker());
}
inline void Environment::set_can_call_into_js(bool can_call_into_js) {
can_call_into_js_ = can_call_into_js;
}
+inline bool Environment::is_main_thread() const {
+ return thread_id_ == 0;
+}
+
+inline double Environment::thread_id() const {
+ return thread_id_;
+}
+
+inline void Environment::set_thread_id(double id) {
+ thread_id_ = id;
+}
+
+inline worker::Worker* Environment::worker_context() const {
+ return worker_context_;
+}
+
+inline void Environment::set_worker_context(worker::Worker* context) {
+ CHECK_EQ(worker_context_, nullptr); // Should be set only once.
+ worker_context_ = context;
+}
+
+inline void Environment::add_sub_worker_context(worker::Worker* context) {
+ sub_worker_contexts_.insert(context);
+}
+
+inline void Environment::remove_sub_worker_context(worker::Worker* context) {
+ sub_worker_contexts_.erase(context);
+}
+
inline performance::performance_state* Environment::performance_state() {
return performance_state_.get();
}
diff --git a/src/env.cc b/src/env.cc
index 090b43968b..8df59d1546 100644
--- a/src/env.cc
+++ b/src/env.cc
@@ -4,6 +4,7 @@
#include "node_buffer.h"
#include "node_platform.h"
#include "node_file.h"
+#include "node_worker.h"
#include "tracing/agent.h"
#include <stdio.h>
@@ -25,6 +26,7 @@ using v8::StackTrace;
using v8::String;
using v8::Symbol;
using v8::Value;
+using worker::Worker;
IsolateData::IsolateData(Isolate* isolate,
uv_loop_t* event_loop,
@@ -444,7 +446,9 @@ void Environment::RunAndClearNativeImmediates() {
if (it->refed_)
ref_count++;
if (UNLIKELY(try_catch.HasCaught())) {
- FatalException(isolate(), try_catch);
+ if (!try_catch.HasTerminated())
+ FatalException(isolate(), try_catch);
+
// Bail out, remove the already executed callbacks from list
// and set up a new TryCatch for the other pending callbacks.
std::move_backward(it, list.end(), list.begin() + (list.end() - it));
@@ -632,4 +636,25 @@ void Environment::AsyncHooks::grow_async_ids_stack() {
uv_key_t Environment::thread_local_env = {};
+void Environment::Exit(int exit_code) {
+ if (is_main_thread())
+ exit(exit_code);
+ else
+ worker_context_->Exit(exit_code);
+}
+
+void Environment::stop_sub_worker_contexts() {
+ while (!sub_worker_contexts_.empty()) {
+ Worker* w = *sub_worker_contexts_.begin();
+ remove_sub_worker_context(w);
+ w->Exit(1);
+ w->JoinThread();
+ }
+}
+
+bool Environment::is_stopping_worker() const {
+ CHECK(!is_main_thread());
+ return worker_context_->is_stopped();
+}
+
} // namespace node
diff --git a/src/env.h b/src/env.h
index cdb592732a..cf6873e5fe 100644
--- a/src/env.h
+++ b/src/env.h
@@ -55,6 +55,10 @@ namespace performance {
class performance_state;
}
+namespace worker {
+class Worker;
+}
+
namespace loader {
class ModuleWrap;
@@ -193,7 +197,10 @@ struct PackageConfig {
V(mac_string, "mac") \
V(main_string, "main") \
V(max_buffer_string, "maxBuffer") \
+ V(max_semi_space_size_string, "maxSemiSpaceSize") \
+ V(max_old_space_size_string, "maxOldSpaceSize") \
V(message_string, "message") \
+ V(message_port_string, "messagePort") \
V(message_port_constructor_string, "MessagePort") \
V(minttl_string, "minttl") \
V(modulus_string, "modulus") \
@@ -280,6 +287,7 @@ struct PackageConfig {
V(subject_string, "subject") \
V(subjectaltname_string, "subjectaltname") \
V(syscall_string, "syscall") \
+ V(thread_id_string, "threadId") \
V(ticketkeycallback_string, "onticketkeycallback") \
V(timeout_string, "timeout") \
V(tls_ticket_string, "tlsTicket") \
@@ -328,6 +336,7 @@ struct PackageConfig {
V(http2stream_constructor_template, v8::ObjectTemplate) \
V(immediate_callback_function, v8::Function) \
V(inspector_console_api_object, v8::Object) \
+ V(message_port, v8::Object) \
V(message_port_constructor_template, v8::FunctionTemplate) \
V(pbkdf2_constructor_template, v8::ObjectTemplate) \
V(pipe_constructor_template, v8::FunctionTemplate) \
@@ -601,6 +610,7 @@ class Environment {
void RegisterHandleCleanups();
void CleanupHandles();
+ void Exit(int code);
// Register clean-up cb to be called on environment destruction.
inline void RegisterHandleCleanup(uv_handle_t* handle,
@@ -714,6 +724,18 @@ class Environment {
inline bool can_call_into_js() const;
inline void set_can_call_into_js(bool can_call_into_js);
+ // TODO(addaleax): This should be inline.
+ bool is_stopping_worker() const;
+
+ inline bool is_main_thread() const;
+ inline double thread_id() const;
+ inline void set_thread_id(double id);
+ inline worker::Worker* worker_context() const;
+ inline void set_worker_context(worker::Worker* context);
+ inline void add_sub_worker_context(worker::Worker* context);
+ inline void remove_sub_worker_context(worker::Worker* context);
+ void stop_sub_worker_contexts();
+
inline void ThrowError(const char* errmsg);
inline void ThrowTypeError(const char* errmsg);
inline void ThrowRangeError(const char* errmsg);
@@ -855,12 +877,15 @@ class Environment {
std::vector<double> destroy_async_id_list_;
AliasedBuffer<uint32_t, v8::Uint32Array> should_abort_on_uncaught_toggle_;
-
int should_not_abort_scope_counter_ = 0;
std::unique_ptr<performance::performance_state> performance_state_;
std::unordered_map<std::string, uint64_t> performance_marks_;
+
bool can_call_into_js_ = true;
+ double thread_id_ = 0;
+ std::unordered_set<worker::Worker*> sub_worker_contexts_;
+
#if HAVE_INSPECTOR
std::unique_ptr<inspector::Agent> inspector_agent_;
@@ -893,6 +918,8 @@ class Environment {
std::vector<std::unique_ptr<fs::FileHandleReadWrap>>
file_handle_read_wrap_freelist_;
+ worker::Worker* worker_context_ = nullptr;
+
struct ExitCallback {
void (*cb_)(void* arg);
void* arg_;
diff --git a/src/js_stream.cc b/src/js_stream.cc
index c766c322e3..e562a62f3d 100644
--- a/src/js_stream.cc
+++ b/src/js_stream.cc
@@ -44,7 +44,8 @@ bool JSStream::IsClosing() {
TryCatch try_catch(env()->isolate());
Local<Value> value;
if (!MakeCallback(env()->isclosing_string(), 0, nullptr).ToLocal(&value)) {
- FatalException(env()->isolate(), try_catch);
+ if (!try_catch.HasTerminated())
+ FatalException(env()->isolate(), try_catch);
return true;
}
return value->IsTrue();
@@ -59,7 +60,8 @@ int JSStream::ReadStart() {
int value_int = UV_EPROTO;
if (!MakeCallback(env()->onreadstart_string(), 0, nullptr).ToLocal(&value) ||
!value->Int32Value(env()->context()).To(&value_int)) {
- FatalException(env()->isolate(), try_catch);
+ if (!try_catch.HasTerminated())
+ FatalException(env()->isolate(), try_catch);
}
return value_int;
}
@@ -73,7 +75,8 @@ int JSStream::ReadStop() {
int value_int = UV_EPROTO;
if (!MakeCallback(env()->onreadstop_string(), 0, nullptr).ToLocal(&value) ||
!value->Int32Value(env()->context()).To(&value_int)) {
- FatalException(env()->isolate(), try_catch);
+ if (!try_catch.HasTerminated())
+ FatalException(env()->isolate(), try_catch);
}
return value_int;
}
@@ -94,7 +97,8 @@ int JSStream::DoShutdown(ShutdownWrap* req_wrap) {
arraysize(argv),
argv).ToLocal(&value) ||
!value->Int32Value(env()->context()).To(&value_int)) {
- FatalException(env()->isolate(), try_catch);
+ if (!try_catch.HasTerminated())
+ FatalException(env()->isolate(), try_catch);
}
return value_int;
}
@@ -128,7 +132,8 @@ int JSStream::DoWrite(WriteWrap* w,
arraysize(argv),
argv).ToLocal(&value) ||
!value->Int32Value(env()->context()).To(&value_int)) {
- FatalException(env()->isolate(), try_catch);
+ if (!try_catch.HasTerminated())
+ FatalException(env()->isolate(), try_catch);
}
return value_int;
}
diff --git a/src/node.cc b/src/node.cc
index baa97281b0..663e4a222e 100644
--- a/src/node.cc
+++ b/src/node.cc
@@ -1021,9 +1021,9 @@ void AppendExceptionLine(Environment* env,
}
-static void ReportException(Environment* env,
- Local<Value> er,
- Local<Message> message) {
+void ReportException(Environment* env,
+ Local<Value> er,
+ Local<Message> message) {
CHECK(!er.IsEmpty());
HandleScope scope(env->isolate());
@@ -1110,9 +1110,9 @@ static void ReportException(Environment* env, const TryCatch& try_catch) {
// Executes a str within the current v8 context.
-static Local<Value> ExecuteString(Environment* env,
- Local<String> source,
- Local<String> filename) {
+static MaybeLocal<Value> ExecuteString(Environment* env,
+ Local<String> source,
+ Local<String> filename) {
EscapableHandleScope scope(env->isolate());
TryCatch try_catch(env->isolate());
@@ -1125,13 +1125,19 @@ static Local<Value> ExecuteString(Environment* env,
v8::Script::Compile(env->context(), source, &origin);
if (script.IsEmpty()) {
ReportException(env, try_catch);
- exit(3);
+ env->Exit(3);
+ return MaybeLocal<Value>();
}
MaybeLocal<Value> result = script.ToLocalChecked()->Run(env->context());
if (result.IsEmpty()) {
+ if (try_catch.HasTerminated()) {
+ env->isolate()->CancelTerminateExecution();
+ return MaybeLocal<Value>();
+ }
ReportException(env, try_catch);
- exit(4);
+ env->Exit(4);
+ return MaybeLocal<Value>();
}
return scope.Escape(result.ToLocalChecked());
@@ -1230,6 +1236,7 @@ static void Abort(const FunctionCallbackInfo<Value>& args) {
void Chdir(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
+ CHECK(env->is_main_thread());
CHECK_EQ(args.Length(), 1);
CHECK(args[0]->IsString());
@@ -1411,6 +1418,7 @@ static void GetEGid(const FunctionCallbackInfo<Value>& args) {
void SetGid(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
+ CHECK(env->is_main_thread());
CHECK_EQ(args.Length(), 1);
CHECK(args[0]->IsUint32() || args[0]->IsString());
@@ -1430,6 +1438,7 @@ void SetGid(const FunctionCallbackInfo<Value>& args) {
void SetEGid(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
+ CHECK(env->is_main_thread());
CHECK_EQ(args.Length(), 1);
CHECK(args[0]->IsUint32() || args[0]->IsString());
@@ -1449,6 +1458,7 @@ void SetEGid(const FunctionCallbackInfo<Value>& args) {
void SetUid(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
+ CHECK(env->is_main_thread());
CHECK_EQ(args.Length(), 1);
CHECK(args[0]->IsUint32() || args[0]->IsString());
@@ -1468,6 +1478,7 @@ void SetUid(const FunctionCallbackInfo<Value>& args) {
void SetEUid(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
+ CHECK(env->is_main_thread());
CHECK_EQ(args.Length(), 1);
CHECK(args[0]->IsUint32() || args[0]->IsString());
@@ -1629,9 +1640,10 @@ static void WaitForInspectorDisconnect(Environment* env) {
static void Exit(const FunctionCallbackInfo<Value>& args) {
- WaitForInspectorDisconnect(Environment::GetCurrent(args));
+ Environment* env = Environment::GetCurrent(args);
+ WaitForInspectorDisconnect(env);
v8_platform.StopTracingAgent();
- exit(args[0]->Int32Value());
+ env->Exit(args[0]->Int32Value());
}
@@ -2040,6 +2052,9 @@ void FatalException(Isolate* isolate,
Local<Value> caught =
fatal_exception_function->Call(process_object, 1, &error);
+ if (fatal_try_catch.HasTerminated())
+ return;
+
if (fatal_try_catch.HasCaught()) {
// The fatal exception function threw, so we must exit
ReportException(env, fatal_try_catch);
@@ -2053,6 +2068,12 @@ void FatalException(Isolate* isolate,
void FatalException(Isolate* isolate, const TryCatch& try_catch) {
+ // If we try to print out a termination exception, we'd just get 'null',
+ // so just crashing here with that information seems like a better idea,
+ // and in particular it seems like we should handle terminations at the call
+ // site for this function rather than by printing them out somewhere.
+ CHECK(!try_catch.HasTerminated());
+
HandleScope scope(isolate);
if (!try_catch.IsVerbose()) {
FatalException(isolate, try_catch.Exception(), try_catch.Message());
@@ -2574,11 +2595,12 @@ void SetupProcessObject(Environment* env,
Local<Object> process = env->process_object();
auto title_string = FIXED_ONE_BYTE_STRING(env->isolate(), "title");
- CHECK(process->SetAccessor(env->context(),
- title_string,
- ProcessTitleGetter,
- ProcessTitleSetter,
- env->as_external()).FromJust());
+ CHECK(process->SetAccessor(
+ env->context(),
+ title_string,
+ ProcessTitleGetter,
+ env->is_main_thread() ? ProcessTitleSetter : nullptr,
+ env->as_external()).FromJust());
// process.version
READONLY_PROPERTY(process,
@@ -2862,25 +2884,27 @@ void SetupProcessObject(Environment* env,
CHECK(process->SetAccessor(env->context(),
debug_port_string,
DebugPortGetter,
- DebugPortSetter,
+ env->is_main_thread() ? DebugPortSetter : nullptr,
env->as_external()).FromJust());
// define various internal methods
- env->SetMethod(process,
- "_startProfilerIdleNotifier",
- StartProfilerIdleNotifier);
- env->SetMethod(process,
- "_stopProfilerIdleNotifier",
- StopProfilerIdleNotifier);
+ if (env->is_main_thread()) {
+ env->SetMethod(process,
+ "_startProfilerIdleNotifier",
+ StartProfilerIdleNotifier);
+ env->SetMethod(process,
+ "_stopProfilerIdleNotifier",
+ StopProfilerIdleNotifier);
+ env->SetMethod(process, "abort", Abort);
+ env->SetMethod(process, "chdir", Chdir);
+ env->SetMethod(process, "umask", Umask);
+ }
+
env->SetMethod(process, "_getActiveRequests", GetActiveRequests);
env->SetMethod(process, "_getActiveHandles", GetActiveHandles);
env->SetMethod(process, "reallyExit", Exit);
- env->SetMethod(process, "abort", Abort);
- env->SetMethod(process, "chdir", Chdir);
env->SetMethod(process, "cwd", Cwd);
- env->SetMethod(process, "umask", Umask);
-
#if defined(__POSIX__) && !defined(__ANDROID__) && !defined(__CloudABI__)
env->SetMethod(process, "getuid", GetUid);
env->SetMethod(process, "geteuid", GetEUid);
@@ -2890,16 +2914,17 @@ void SetupProcessObject(Environment* env,
#endif // __POSIX__ && !defined(__ANDROID__) && !defined(__CloudABI__)
env->SetMethod(process, "_kill", Kill);
+ env->SetMethod(process, "dlopen", DLOpen);
- env->SetMethod(process, "_debugProcess", DebugProcess);
- env->SetMethod(process, "_debugEnd", DebugEnd);
+ if (env->is_main_thread()) {
+ env->SetMethod(process, "_debugProcess", DebugProcess);
+ env->SetMethod(process, "_debugEnd", DebugEnd);
+ }
env->SetMethod(process, "hrtime", Hrtime);
env->SetMethod(process, "cpuUsage", CPUUsage);
- env->SetMethod(process, "dlopen", DLOpen);
-
env->SetMethod(process, "uptime", Uptime);
env->SetMethod(process, "memoryUsage", MemoryUsage);
}
@@ -2935,8 +2960,10 @@ void RawDebug(const FunctionCallbackInfo<Value>& args) {
}
-static Local<Function> GetBootstrapper(Environment* env, Local<String> source,
- Local<String> script_name) {
+static MaybeLocal<Function> GetBootstrapper(
+ Environment* env,
+ Local<String> source,
+ Local<String> script_name) {
EscapableHandleScope scope(env->isolate());
TryCatch try_catch(env->isolate());
@@ -2947,16 +2974,17 @@ static Local<Function> GetBootstrapper(Environment* env, Local<String> source,
try_catch.SetVerbose(false);
// Execute the bootstrapper javascript file
- Local<Value> bootstrapper_v = ExecuteString(env, source, script_name);
+ MaybeLocal<Value> bootstrapper_v = ExecuteString(env, source, script_name);
+ if (bootstrapper_v.IsEmpty()) // This happens when execution was interrupted.
+ return MaybeLocal<Function>();
+
if (try_catch.HasCaught()) {
ReportException(env, try_catch);
exit(10);
}
- CHECK(bootstrapper_v->IsFunction());
- Local<Function> bootstrapper = Local<Function>::Cast(bootstrapper_v);
-
- return scope.Escape(bootstrapper);
+ CHECK(bootstrapper_v.ToLocalChecked()->IsFunction());
+ return scope.Escape(bootstrapper_v.ToLocalChecked().As<Function>());
}
static bool ExecuteBootstrapper(Environment* env, Local<Function> bootstrapper,
@@ -2995,13 +3023,18 @@ void LoadEnvironment(Environment* env) {
// node_js2c.
Local<String> loaders_name =
FIXED_ONE_BYTE_STRING(env->isolate(), "internal/bootstrap/loaders.js");
- Local<Function> loaders_bootstrapper =
+ MaybeLocal<Function> loaders_bootstrapper =
GetBootstrapper(env, LoadersBootstrapperSource(env), loaders_name);
Local<String> node_name =
FIXED_ONE_BYTE_STRING(env->isolate(), "internal/bootstrap/node.js");
- Local<Function> node_bootstrapper =
+ MaybeLocal<Function> node_bootstrapper =
GetBootstrapper(env, NodeBootstrapperSource(env), node_name);
+ if (loaders_bootstrapper.IsEmpty() || node_bootstrapper.IsEmpty()) {
+ // Execution was interrupted.
+ return;
+ }
+
// Add a reference to the global object
Local<Object> global = env->context()->Global();
@@ -3049,7 +3082,7 @@ void LoadEnvironment(Environment* env) {
// Bootstrap internal loaders
Local<Value> bootstrapped_loaders;
- if (!ExecuteBootstrapper(env, loaders_bootstrapper,
+ if (!ExecuteBootstrapper(env, loaders_bootstrapper.ToLocalChecked(),
arraysize(loaders_bootstrapper_args),
loaders_bootstrapper_args,
&bootstrapped_loaders)) {
@@ -3065,7 +3098,7 @@ void LoadEnvironment(Environment* env) {
bootstrapper,
bootstrapped_loaders
};
- if (!ExecuteBootstrapper(env, node_bootstrapper,
+ if (!ExecuteBootstrapper(env, node_bootstrapper.ToLocalChecked(),
arraysize(node_bootstrapper_args),
node_bootstrapper_args,
&bootstrapped_node)) {
@@ -4279,6 +4312,7 @@ inline int Start(Isolate* isolate, IsolateData* isolate_data,
WaitForInspectorDisconnect(&env);
env.set_can_call_into_js(false);
+ env.stop_sub_worker_contexts();
env.RunCleanup();
RunAtExit(&env);
diff --git a/src/node_errors.h b/src/node_errors.h
index 931ce7b8fd..2c97088cc5 100644
--- a/src/node_errors.h
+++ b/src/node_errors.h
@@ -34,6 +34,7 @@ namespace node {
V(ERR_MISSING_ARGS, TypeError) \
V(ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST, TypeError) \
V(ERR_MISSING_MODULE, Error) \
+ V(ERR_MISSING_PLATFORM_FOR_WORKER, Error) \
V(ERR_SCRIPT_EXECUTION_INTERRUPTED, Error) \
V(ERR_SCRIPT_EXECUTION_TIMEOUT, Error) \
V(ERR_STRING_TOO_LONG, Error) \
@@ -68,6 +69,9 @@ namespace node {
V(ERR_MEMORY_ALLOCATION_FAILED, "Failed to allocate memory") \
V(ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST, \
"MessagePort was found in message but not listed in transferList") \
+ V(ERR_MISSING_PLATFORM_FOR_WORKER, \
+ "The V8 platform used by this instance of Node does not support " \
+ "creating Workers") \
V(ERR_SCRIPT_EXECUTION_INTERRUPTED, \
"Script execution was interrupted by `SIGINT`") \
V(ERR_TRANSFERRING_EXTERNALIZED_SHAREDARRAYBUFFER, \
diff --git a/src/node_internals.h b/src/node_internals.h
index a5d8ed0e5d..7760eb26c6 100644
--- a/src/node_internals.h
+++ b/src/node_internals.h
@@ -137,6 +137,7 @@ struct sockaddr;
V(util) \
V(uv) \
V(v8) \
+ V(worker) \
V(zlib)
#define NODE_BUILTIN_MODULES(V) \
@@ -314,6 +315,10 @@ class FatalTryCatch : public v8::TryCatch {
Environment* env_;
};
+void ReportException(Environment* env,
+ v8::Local<v8::Value> er,
+ v8::Local<v8::Message> message);
+
v8::Maybe<bool> ProcessEmitWarning(Environment* env, const char* fmt, ...);
v8::Maybe<bool> ProcessEmitDeprecationWarning(Environment* env,
const char* warning,
diff --git a/src/node_messaging.cc b/src/node_messaging.cc
index b56cef2d77..352749ea48 100644
--- a/src/node_messaging.cc
+++ b/src/node_messaging.cc
@@ -57,7 +57,7 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
if (!deserializer->ReadUint32(&id))
return MaybeLocal<Object>();
CHECK_LE(id, message_ports_.size());
- return message_ports_[id]->object();
+ return message_ports_[id]->object(isolate);
};
MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
@@ -436,7 +436,7 @@ MessagePort* MessagePort::New(
void MessagePort::OnMessage() {
HandleScope handle_scope(env()->isolate());
- Local<Context> context = object()->CreationContext();
+ Local<Context> context = object(env()->isolate())->CreationContext();
// data_ can only ever be modified by the owner thread, so no need to lock.
// However, the message port may be transferred while it is processing
@@ -447,6 +447,13 @@ void MessagePort::OnMessage() {
{
// Get the head of the message queue.
Mutex::ScopedLock lock(data_->mutex_);
+
+ if (stop_event_loop_) {
+ CHECK(!data_->receiving_messages_);
+ uv_stop(env()->event_loop());
+ break;
+ }
+
if (!data_->receiving_messages_)
break;
if (data_->incoming_messages_.empty())
@@ -514,8 +521,9 @@ void MessagePort::Send(Message&& message) {
void MessagePort::Send(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
+ Local<Context> context = object(env->isolate())->CreationContext();
Message msg;
- if (msg.Serialize(env, object()->CreationContext(), args[0], args[1])
+ if (msg.Serialize(env, context, args[0], args[1])
.IsNothing()) {
return;
}
@@ -548,6 +556,14 @@ void MessagePort::Stop() {
data_->receiving_messages_ = false;
}
+void MessagePort::StopEventLoop() {
+ Mutex::ScopedLock lock(data_->mutex_);
+ data_->receiving_messages_ = false;
+ stop_event_loop_ = true;
+
+ TriggerAsync();
+}
+
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
MessagePort* port;
@@ -570,6 +586,12 @@ void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
port->Stop();
}
+void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
+ MessagePort* port;
+ ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
+ port->OnMessage();
+}
+
size_t MessagePort::self_size() const {
Mutex::ScopedLock lock(data_->mutex_);
size_t sz = sizeof(*this) + sizeof(*data_);
@@ -604,6 +626,7 @@ MaybeLocal<Function> GetMessagePortConstructor(
env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage);
env->SetProtoMethod(m, "start", MessagePort::Start);
env->SetProtoMethod(m, "stop", MessagePort::Stop);
+ env->SetProtoMethod(m, "drain", MessagePort::Drain);
env->SetProtoMethod(m, "close", HandleWrap::Close);
env->SetProtoMethod(m, "unref", HandleWrap::Unref);
env->SetProtoMethod(m, "ref", HandleWrap::Ref);
diff --git a/src/node_messaging.h b/src/node_messaging.h
index ff8fcc7243..9a13437d19 100644
--- a/src/node_messaging.h
+++ b/src/node_messaging.h
@@ -133,11 +133,15 @@ class MessagePort : public HandleWrap {
void Start();
// Stop processing messages on this port as a receiving end.
void Stop();
+ // Stop processing messages on this port as a receiving end,
+ // and stop the event loop that this port is associated with.
+ void StopEventLoop();
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args);
// Turns `a` and `b` into siblings, i.e. connects the sending side of one
// to the receiving side of the other. This is not thread-safe.
@@ -160,6 +164,7 @@ class MessagePort : public HandleWrap {
inline uv_async_t* async();
std::unique_ptr<MessagePortData> data_ = nullptr;
+ bool stop_event_loop_ = false;
friend class MessagePortData;
};
diff --git a/src/node_worker.cc b/src/node_worker.cc
new file mode 100644
index 0000000000..366dca353d
--- /dev/null
+++ b/src/node_worker.cc
@@ -0,0 +1,428 @@
+#include "node_worker.h"
+#include "node_errors.h"
+#include "node_internals.h"
+#include "node_buffer.h"
+#include "node_perf.h"
+#include "util.h"
+#include "util-inl.h"
+#include "async_wrap.h"
+#include "async_wrap-inl.h"
+
+using v8::ArrayBuffer;
+using v8::Context;
+using v8::Function;
+using v8::FunctionCallbackInfo;
+using v8::FunctionTemplate;
+using v8::HandleScope;
+using v8::Integer;
+using v8::Isolate;
+using v8::Local;
+using v8::Locker;
+using v8::Number;
+using v8::Object;
+using v8::SealHandleScope;
+using v8::String;
+using v8::Value;
+
+namespace node {
+namespace worker {
+
+namespace {
+
+double next_thread_id = 1;
+Mutex next_thread_id_mutex;
+
+} // anonymous namespace
+
+Worker::Worker(Environment* env, Local<Object> wrap)
+ : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER) {
+ // Generate a new thread id.
+ {
+ Mutex::ScopedLock next_thread_id_lock(next_thread_id_mutex);
+ thread_id_ = next_thread_id++;
+ }
+ wrap->Set(env->context(),
+ env->thread_id_string(),
+ Number::New(env->isolate(), thread_id_)).FromJust();
+
+ // Set up everything that needs to be set up in the parent environment.
+ parent_port_ = MessagePort::New(env, env->context());
+ if (parent_port_ == nullptr) {
+ // This can happen e.g. because execution is terminating.
+ return;
+ }
+
+ child_port_data_.reset(new MessagePortData(nullptr));
+ MessagePort::Entangle(parent_port_, child_port_data_.get());
+
+ object()->Set(env->context(),
+ env->message_port_string(),
+ parent_port_->object()).FromJust();
+
+ array_buffer_allocator_.reset(CreateArrayBufferAllocator());
+
+ isolate_ = NewIsolate(array_buffer_allocator_.get());
+ CHECK_NE(isolate_, nullptr);
+ CHECK_EQ(uv_loop_init(&loop_), 0);
+
+ thread_exit_async_.reset(new uv_async_t);
+ thread_exit_async_->data = this;
+ CHECK_EQ(uv_async_init(env->event_loop(),
+ thread_exit_async_.get(),
+ [](uv_async_t* handle) {
+ static_cast<Worker*>(handle->data)->OnThreadStopped();
+ }), 0);
+
+ {
+ // Enter an environment capable of executing code in the child Isolate
+ // (and only in it).
+ Locker locker(isolate_);
+ Isolate::Scope isolate_scope(isolate_);
+ HandleScope handle_scope(isolate_);
+
+ isolate_data_.reset(CreateIsolateData(isolate_,
+ &loop_,
+ env->isolate_data()->platform(),
+ array_buffer_allocator_.get()));
+ CHECK(isolate_data_);
+
+ Local<Context> context = NewContext(isolate_);
+ Context::Scope context_scope(context);
+
+ // TODO(addaleax): Use CreateEnvironment(), or generally another public API.
+ env_.reset(new Environment(isolate_data_.get(),
+ context,
+ nullptr));
+ CHECK_NE(env_, nullptr);
+ env_->set_abort_on_uncaught_exception(false);
+ env_->set_worker_context(this);
+ env_->set_thread_id(thread_id_);
+
+ env_->Start(0, nullptr, 0, nullptr, env->profiler_idle_notifier_started());
+ }
+
+ // The new isolate won't be bothered on this thread again.
+ isolate_->DiscardThreadSpecificMetadata();
+}
+
+bool Worker::is_stopped() const {
+ Mutex::ScopedLock stopped_lock(stopped_mutex_);
+ return stopped_;
+}
+
+void Worker::Run() {
+ MultiIsolatePlatform* platform = isolate_data_->platform();
+ CHECK_NE(platform, nullptr);
+
+ {
+ Locker locker(isolate_);
+ Isolate::Scope isolate_scope(isolate_);
+ SealHandleScope outer_seal(isolate_);
+
+ {
+ Context::Scope context_scope(env_->context());
+ HandleScope handle_scope(isolate_);
+
+ {
+ HandleScope handle_scope(isolate_);
+ Mutex::ScopedLock lock(mutex_);
+ // Set up the message channel for receiving messages in the child.
+ child_port_ = MessagePort::New(env_.get(),
+ env_->context(),
+ std::move(child_port_data_));
+ // MessagePort::New() may return nullptr if execution is terminated
+ // within it.
+ if (child_port_ != nullptr)
+ env_->set_message_port(child_port_->object(isolate_));
+ }
+
+ if (!is_stopped()) {
+ HandleScope handle_scope(isolate_);
+ Environment::AsyncCallbackScope callback_scope(env_.get());
+ env_->async_hooks()->push_async_ids(1, 0);
+ // This loads the Node bootstrapping code.
+ LoadEnvironment(env_.get());
+ env_->async_hooks()->pop_async_id(1);
+ }
+
+ {
+ SealHandleScope seal(isolate_);
+ bool more;
+ env_->performance_state()->Mark(
+ node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
+ do {
+ if (is_stopped()) break;
+ uv_run(&loop_, UV_RUN_DEFAULT);
+ if (is_stopped()) break;
+
+ platform->DrainBackgroundTasks(isolate_);
+
+ more = uv_loop_alive(&loop_);
+ if (more && !is_stopped())
+ continue;
+
+ EmitBeforeExit(env_.get());
+
+ // Emit `beforeExit` if the loop became alive either after emitting
+ // event, or after running some callbacks.
+ more = uv_loop_alive(&loop_);
+ } while (more == true);
+ env_->performance_state()->Mark(
+ node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT);
+ }
+ }
+
+ {
+ int exit_code;
+ bool stopped = is_stopped();
+ if (!stopped)
+ exit_code = EmitExit(env_.get());
+ Mutex::ScopedLock lock(mutex_);
+ if (exit_code_ == 0 && !stopped)
+ exit_code_ = exit_code;
+ }
+
+ env_->set_can_call_into_js(false);
+ Isolate::DisallowJavascriptExecutionScope disallow_js(isolate_,
+ Isolate::DisallowJavascriptExecutionScope::THROW_ON_FAILURE);
+
+ // Grab the parent-to-child channel and render is unusable.
+ MessagePort* child_port;
+ {
+ Mutex::ScopedLock lock(mutex_);
+ child_port = child_port_;
+ child_port_ = nullptr;
+ }
+
+ {
+ Context::Scope context_scope(env_->context());
+ child_port->Close();
+ env_->stop_sub_worker_contexts();
+ env_->RunCleanup();
+ RunAtExit(env_.get());
+
+ {
+ Mutex::ScopedLock stopped_lock(stopped_mutex_);
+ stopped_ = true;
+ }
+
+ env_->RunCleanup();
+
+ // This call needs to be made while the `Environment` is still alive
+ // because we assume that it is available for async tracking in the
+ // NodePlatform implementation.
+ platform->DrainBackgroundTasks(isolate_);
+ }
+
+ env_.reset();
+ }
+
+ DisposeIsolate();
+
+ // Need to run the loop one more time to close the platform's uv_async_t
+ uv_run(&loop_, UV_RUN_ONCE);
+
+ {
+ Mutex::ScopedLock lock(mutex_);
+ CHECK(thread_exit_async_);
+ scheduled_on_thread_stopped_ = true;
+ uv_async_send(thread_exit_async_.get());
+ }
+}
+
+void Worker::DisposeIsolate() {
+ if (isolate_ == nullptr)
+ return;
+
+ CHECK(isolate_data_);
+ MultiIsolatePlatform* platform = isolate_data_->platform();
+ platform->CancelPendingDelayedTasks(isolate_);
+
+ isolate_data_.reset();
+
+ isolate_->Dispose();
+ isolate_ = nullptr;
+}
+
+void Worker::JoinThread() {
+ if (thread_joined_)
+ return;
+ CHECK_EQ(uv_thread_join(&tid_), 0);
+ thread_joined_ = true;
+
+ env()->remove_sub_worker_context(this);
+
+ if (thread_exit_async_) {
+ env()->CloseHandle(thread_exit_async_.release(), [](uv_async_t* async) {
+ delete async;
+ });
+
+ if (scheduled_on_thread_stopped_)
+ OnThreadStopped();
+ }
+}
+
+void Worker::OnThreadStopped() {
+ Mutex::ScopedLock lock(mutex_);
+ scheduled_on_thread_stopped_ = false;
+
+ {
+ Mutex::ScopedLock stopped_lock(stopped_mutex_);
+ CHECK(stopped_);
+ }
+
+ CHECK_EQ(child_port_, nullptr);
+ parent_port_ = nullptr;
+
+ // It's okay to join the thread while holding the mutex because
+ // OnThreadStopped means it's no longer doing any work that might grab it
+ // and really just silently exiting.
+ JoinThread();
+
+ {
+ HandleScope handle_scope(env()->isolate());
+ Context::Scope context_scope(env()->context());
+
+ // Reset the parent port as we're closing it now anyway.
+ object()->Set(env()->context(),
+ env()->message_port_string(),
+ Undefined(env()->isolate())).FromJust();
+
+ Local<Value> code = Integer::New(env()->isolate(), exit_code_);
+ MakeCallback(env()->onexit_string(), 1, &code);
+ }
+
+ // JoinThread() cleared all libuv handles bound to this Worker,
+ // the C++ object is no longer needed for anything now.
+ MakeWeak();
+}
+
+Worker::~Worker() {
+ Mutex::ScopedLock lock(mutex_);
+ JoinThread();
+
+ CHECK(stopped_);
+ CHECK(thread_joined_);
+ CHECK_EQ(child_port_, nullptr);
+ CHECK_EQ(uv_loop_close(&loop_), 0);
+
+ // This has most likely already happened within the worker thread -- this
+ // is just in case Worker creation failed early.
+ DisposeIsolate();
+}
+
+void Worker::New(const FunctionCallbackInfo<Value>& args) {
+ Environment* env = Environment::GetCurrent(args);
+
+ CHECK(args.IsConstructCall());
+
+ if (env->isolate_data()->platform() == nullptr) {
+ THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
+ return;
+ }
+
+ new Worker(env, args.This());
+}
+
+void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
+ Worker* w;
+ ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
+ Mutex::ScopedLock lock(w->mutex_);
+
+ w->env()->add_sub_worker_context(w);
+ w->stopped_ = false;
+ CHECK_EQ(uv_thread_create(&w->tid_, [](void* arg) {
+ static_cast<Worker*>(arg)->Run();
+ }, static_cast<void*>(w)), 0);
+ w->thread_joined_ = false;
+}
+
+void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
+ Worker* w;
+ ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
+
+ w->Exit(1);
+ w->JoinThread();
+}
+
+void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
+ Worker* w;
+ ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
+ if (w->thread_exit_async_)
+ uv_ref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
+}
+
+void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
+ Worker* w;
+ ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
+ if (w->thread_exit_async_)
+ uv_unref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
+}
+
+void Worker::Exit(int code) {
+ Mutex::ScopedLock lock(mutex_);
+ Mutex::ScopedLock stopped_lock(stopped_mutex_);
+ if (!stopped_) {
+ CHECK_NE(env_, nullptr);
+ stopped_ = true;
+ exit_code_ = code;
+ if (child_port_ != nullptr)
+ child_port_->StopEventLoop();
+ isolate_->TerminateExecution();
+ }
+}
+
+size_t Worker::self_size() const {
+ return sizeof(*this);
+}
+
+namespace {
+
+// Return the MessagePort that is global for this Environment and communicates
+// with the internal [kPort] port of the JS Worker class in the parent thread.
+void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
+ Environment* env = Environment::GetCurrent(args);
+ Local<Object> port = env->message_port();
+ if (!port.IsEmpty()) {
+ CHECK_EQ(port->CreationContext()->GetIsolate(), args.GetIsolate());
+ args.GetReturnValue().Set(port);
+ }
+}
+
+void InitWorker(Local<Object> target,
+ Local<Value> unused,
+ Local<Context> context,
+ void* priv) {
+ Environment* env = Environment::GetCurrent(context);
+
+ {
+ Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
+
+ w->InstanceTemplate()->SetInternalFieldCount(1);
+
+ AsyncWrap::AddWrapMethods(env, w);
+ env->SetProtoMethod(w, "startThread", Worker::StartThread);
+ env->SetProtoMethod(w, "stopThread", Worker::StopThread);
+ env->SetProtoMethod(w, "ref", Worker::Ref);
+ env->SetProtoMethod(w, "unref", Worker::Unref);
+
+ Local<String> workerString =
+ FIXED_ONE_BYTE_STRING(env->isolate(), "Worker");
+ w->SetClassName(workerString);
+ target->Set(workerString, w->GetFunction());
+ }
+
+ env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
+
+ auto thread_id_string = FIXED_ONE_BYTE_STRING(env->isolate(), "threadId");
+ target->Set(env->context(),
+ thread_id_string,
+ Number::New(env->isolate(), env->thread_id())).FromJust();
+}
+
+} // anonymous namespace
+
+} // namespace worker
+} // namespace node
+
+NODE_MODULE_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)
diff --git a/src/node_worker.h b/src/node_worker.h
new file mode 100644
index 0000000000..0a98d2f11e
--- /dev/null
+++ b/src/node_worker.h
@@ -0,0 +1,83 @@
+#ifndef SRC_NODE_WORKER_H_
+#define SRC_NODE_WORKER_H_
+
+#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
+
+#include "node_messaging.h"
+#include <unordered_map>
+
+namespace node {
+namespace worker {
+
+// A worker thread, as represented in its parent thread.
+class Worker : public AsyncWrap {
+ public:
+ Worker(Environment* env, v8::Local<v8::Object> wrap);
+ ~Worker();
+
+ // Run the worker. This is only called from the worker thread.
+ void Run();
+
+ // Forcibly exit the thread with a specified exit code. This may be called
+ // from any thread.
+ void Exit(int code);
+
+ // Wait for the worker thread to stop (in a blocking manner).
+ void JoinThread();
+
+ size_t self_size() const override;
+ bool is_stopped() const;
+
+ static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void StartThread(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void StopThread(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void GetMessagePort(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void Ref(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void Unref(const v8::FunctionCallbackInfo<v8::Value>& args);
+
+ private:
+ void OnThreadStopped();
+ void DisposeIsolate();
+
+ uv_loop_t loop_;
+ DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
+ DeleteFnPtr<Environment, FreeEnvironment> env_;
+ v8::Isolate* isolate_ = nullptr;
+ DeleteFnPtr<ArrayBufferAllocator, FreeArrayBufferAllocator>
+ array_buffer_allocator_;
+ uv_thread_t tid_;
+
+ // This mutex protects access to all variables listed below it.
+ mutable Mutex mutex_;
+
+ // Currently only used for telling the parent thread that the child
+ // thread exited.
+ std::unique_ptr<uv_async_t> thread_exit_async_;
+ bool scheduled_on_thread_stopped_ = false;
+
+ // This mutex only protects stopped_. If both locks are acquired, this needs
+ // to be the latter one.
+ mutable Mutex stopped_mutex_;
+ bool stopped_ = true;
+
+ bool thread_joined_ = true;
+ int exit_code_ = 0;
+ double thread_id_ = -1;
+
+ std::unique_ptr<MessagePortData> child_port_data_;
+
+ // The child port is always kept alive by the child Environment's persistent
+ // handle to it.
+ MessagePort* child_port_ = nullptr;
+ // This is always kept alive because the JS object associated with the Worker
+ // instance refers to it via its [kPort] property.
+ MessagePort* parent_port_ = nullptr;
+};
+
+} // namespace worker
+} // namespace node
+
+#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
+
+
+#endif // SRC_NODE_WORKER_H_
diff --git a/test/fixtures/worker-script.mjs b/test/fixtures/worker-script.mjs
new file mode 100644
index 0000000000..b712248b27
--- /dev/null
+++ b/test/fixtures/worker-script.mjs
@@ -0,0 +1,3 @@
+import worker from 'worker';
+
+worker.parentPort.postMessage('Hello, world!');
diff --git a/test/parallel/test-message-channel-sharedarraybuffer.js b/test/parallel/test-message-channel-sharedarraybuffer.js
new file mode 100644
index 0000000000..7ae922adbc
--- /dev/null
+++ b/test/parallel/test-message-channel-sharedarraybuffer.js
@@ -0,0 +1,28 @@
+// Flags: --expose-gc --experimental-worker
+'use strict';
+
+const common = require('../common');
+const assert = require('assert');
+const { Worker } = require('worker');
+
+{
+ const sharedArrayBuffer = new SharedArrayBuffer(12);
+ const local = Buffer.from(sharedArrayBuffer);
+
+ const w = new Worker(`
+ const { parentPort } = require('worker');
+ parentPort.on('message', ({ sharedArrayBuffer }) => {
+ const local = Buffer.from(sharedArrayBuffer);
+ local.write('world!', 6);
+ parentPort.postMessage('written!');
+ });
+ `, { eval: true });
+ w.on('message', common.mustCall(() => {
+ assert.strictEqual(local.toString(), 'Hello world!');
+ global.gc();
+ w.terminate();
+ }));
+ w.postMessage({ sharedArrayBuffer });
+ // This would be a race condition if the memory regions were overlapping
+ local.write('Hello ');
+}
diff --git a/test/parallel/test-message-channel.js b/test/parallel/test-message-channel.js
index 0facaa1d83..eb13fa57c6 100644
--- a/test/parallel/test-message-channel.js
+++ b/test/parallel/test-message-channel.js
@@ -2,7 +2,7 @@
'use strict';
const common = require('../common');
const assert = require('assert');
-const { MessageChannel } = require('worker');
+const { MessageChannel, MessagePort, Worker } = require('worker');
{
const channel = new MessageChannel();
@@ -24,3 +24,23 @@ const { MessageChannel } = require('worker');
channel.port2.on('close', common.mustCall());
channel.port2.close();
}
+
+{
+ const channel = new MessageChannel();
+
+ const w = new Worker(`
+ const { MessagePort } = require('worker');
+ const assert = require('assert');
+ require('worker').parentPort.on('message', ({ port }) => {
+ assert(port instanceof MessagePort);
+ port.postMessage('works');
+ });
+ `, { eval: true });
+ w.postMessage({ port: channel.port2 }, [ channel.port2 ]);
+ assert(channel.port1 instanceof MessagePort);
+ assert(channel.port2 instanceof MessagePort);
+ channel.port1.on('message', common.mustCall((message) => {
+ assert.strictEqual(message, 'works');
+ w.terminate();
+ }));
+}
diff --git a/test/parallel/test-worker-cleanup-handles.js b/test/parallel/test-worker-cleanup-handles.js
new file mode 100644
index 0000000000..ba4f6aa51a
--- /dev/null
+++ b/test/parallel/test-worker-cleanup-handles.js
@@ -0,0 +1,30 @@
+// Flags: --experimental-worker
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const { Worker, isMainThread, parentPort } = require('worker');
+const { Server } = require('net');
+const fs = require('fs');
+
+if (isMainThread) {
+ const w = new Worker(__filename);
+ let fd = null;
+ w.on('message', common.mustCall((fd_) => {
+ assert.strictEqual(typeof fd_, 'number');
+ fd = fd_;
+ }));
+ w.on('exit', common.mustCall((code) => {
+ if (fd === -1) {
+ // This happens when server sockets don’t have file descriptors,
+ // i.e. on Windows.
+ return;
+ }
+ common.expectsError(() => fs.fstatSync(fd),
+ { code: 'EBADF' });
+ }));
+} else {
+ const server = new Server();
+ server.listen(0);
+ parentPort.postMessage(server._handle.fd);
+ server.unref();
+}
diff --git a/test/parallel/test-worker-dns-terminate.js b/test/parallel/test-worker-dns-terminate.js
new file mode 100644
index 0000000000..079a29d52e
--- /dev/null
+++ b/test/parallel/test-worker-dns-terminate.js
@@ -0,0 +1,15 @@
+// Flags: --experimental-worker
+'use strict';
+const common = require('../common');
+const { Worker } = require('worker');
+
+const w = new Worker(`
+const dns = require('dns');
+dns.lookup('nonexistent.org', () => {});
+require('worker').parentPort.postMessage('0');
+`, { eval: true });
+
+w.on('message', common.mustCall(() => {
+ // This should not crash the worker during a DNS request.
+ w.terminate(common.mustCall());
+}));
diff --git a/test/parallel/test-worker-esmodule.js b/test/parallel/test-worker-esmodule.js
new file mode 100644
index 0000000000..4189eeca3f
--- /dev/null
+++ b/test/parallel/test-worker-esmodule.js
@@ -0,0 +1,11 @@
+// Flags: --experimental-worker --experimental-modules
+'use strict';
+const common = require('../common');
+const fixtures = require('../common/fixtures');
+const assert = require('assert');
+const { Worker } = require('worker');
+
+const w = new Worker(fixtures.path('worker-script.mjs'));
+w.on('message', common.mustCall((message) => {
+ assert.strictEqual(message, 'Hello, world!');
+}));
diff --git a/test/parallel/test-worker-memory.js b/test/parallel/test-worker-memory.js
new file mode 100644
index 0000000000..34b1e0acaf
--- /dev/null
+++ b/test/parallel/test-worker-memory.js
@@ -0,0 +1,41 @@
+// Flags: --experimental-worker
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const util = require('util');
+const { Worker } = require('worker');
+
+const numWorkers = +process.env.JOBS || require('os').cpus().length;
+
+// Verify that a Worker's memory isn't kept in memory after the thread finishes.
+
+function run(n, done) {
+ if (n <= 0)
+ return done();
+ const worker = new Worker(
+ 'require(\'worker\').parentPort.postMessage(2 + 2)',
+ { eval: true });
+ worker.on('message', common.mustCall((value) => {
+ assert.strictEqual(value, 4);
+ }));
+ worker.on('exit', common.mustCall(() => {
+ run(n - 1, done);
+ }));
+}
+
+const startStats = process.memoryUsage();
+let finished = 0;
+for (let i = 0; i < numWorkers; ++i) {
+ run(60 / numWorkers, () => {
+ if (++finished === numWorkers) {
+ const finishStats = process.memoryUsage();
+ // A typical value for this ratio would be ~1.15.
+ // 5 as a upper limit is generous, but the main point is that we
+ // don't have the memory of 50 Isolates/Node.js environments just lying
+ // around somewhere.
+ assert.ok(finishStats.rss / startStats.rss < 5,
+ 'Unexpected memory overhead: ' +
+ util.inspect([startStats, finishStats]));
+ }
+ });
+}
diff --git a/test/parallel/test-worker-nexttick-terminate.js b/test/parallel/test-worker-nexttick-terminate.js
new file mode 100644
index 0000000000..b010a7dbe5
--- /dev/null
+++ b/test/parallel/test-worker-nexttick-terminate.js
@@ -0,0 +1,20 @@
+// Flags: --experimental-worker
+'use strict';
+const common = require('../common');
+const { Worker } = require('worker');
+
+// Checks that terminating in the middle of `process.nextTick()` does not
+// Crash the process.
+
+const w = new Worker(`
+require('worker').parentPort.postMessage('0');
+process.nextTick(() => {
+ while(1);
+});
+`, { eval: true });
+
+w.on('message', common.mustCall(() => {
+ setTimeout(() => {
+ w.terminate(common.mustCall());
+ }, 1);
+}));
diff --git a/test/parallel/test-worker-syntax-error-file.js b/test/parallel/test-worker-syntax-error-file.js
new file mode 100644
index 0000000000..37798f3343
--- /dev/null
+++ b/test/parallel/test-worker-syntax-error-file.js
@@ -0,0 +1,18 @@
+// Flags: --experimental-worker
+'use strict';
+const common = require('../common');
+const fixtures = require('../common/fixtures');
+const assert = require('assert');
+const { Worker } = require('worker');
+
+// Do not use isMainThread so that this test itself can be run inside a Worker.
+if (!process.env.HAS_STARTED_WORKER) {
+ process.env.HAS_STARTED_WORKER = 1;
+ const w = new Worker(fixtures.path('syntax', 'bad_syntax.js'));
+ w.on('message', common.mustNotCall());
+ w.on('error', common.mustCall((err) => {
+ assert(/SyntaxError/.test(err));
+ }));
+} else {
+ throw new Error('foo');
+}
diff --git a/test/parallel/test-worker-syntax-error.js b/test/parallel/test-worker-syntax-error.js
new file mode 100644
index 0000000000..8f9812a721
--- /dev/null
+++ b/test/parallel/test-worker-syntax-error.js
@@ -0,0 +1,17 @@
+// Flags: --experimental-worker
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const { Worker } = require('worker');
+
+// Do not use isMainThread so that this test itself can be run inside a Worker.
+if (!process.env.HAS_STARTED_WORKER) {
+ process.env.HAS_STARTED_WORKER = 1;
+ const w = new Worker('abc)', { eval: true });
+ w.on('message', common.mustNotCall());
+ w.on('error', common.mustCall((err) => {
+ assert(/SyntaxError/.test(err));
+ }));
+} else {
+ throw new Error('foo');
+}
diff --git a/test/parallel/test-worker-uncaught-exception-async.js b/test/parallel/test-worker-uncaught-exception-async.js
new file mode 100644
index 0000000000..c1d2a5f4fc
--- /dev/null
+++ b/test/parallel/test-worker-uncaught-exception-async.js
@@ -0,0 +1,20 @@
+// Flags: --experimental-worker
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const { Worker } = require('worker');
+
+// Do not use isMainThread so that this test itself can be run inside a Worker.
+if (!process.env.HAS_STARTED_WORKER) {
+ process.env.HAS_STARTED_WORKER = 1;
+ const w = new Worker(__filename);
+ w.on('message', common.mustNotCall());
+ w.on('error', common.mustCall((err) => {
+ // TODO(addaleax): be more specific here
+ assert(/foo/.test(err));
+ }));
+} else {
+ setImmediate(() => {
+ throw new Error('foo');
+ });
+}
diff --git a/test/parallel/test-worker-uncaught-exception.js b/test/parallel/test-worker-uncaught-exception.js
new file mode 100644
index 0000000000..b0e3ad11fa
--- /dev/null
+++ b/test/parallel/test-worker-uncaught-exception.js
@@ -0,0 +1,18 @@
+// Flags: --experimental-worker
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const { Worker } = require('worker');
+
+// Do not use isMainThread so that this test itself can be run inside a Worker.
+if (!process.env.HAS_STARTED_WORKER) {
+ process.env.HAS_STARTED_WORKER = 1;
+ const w = new Worker(__filename);
+ w.on('message', common.mustNotCall());
+ w.on('error', common.mustCall((err) => {
+ // TODO(addaleax): be more specific here
+ assert(/foo/.test(err));
+ }));
+} else {
+ throw new Error('foo');
+}
diff --git a/test/parallel/test-worker.js b/test/parallel/test-worker.js
new file mode 100644
index 0000000000..3fa6e67a34
--- /dev/null
+++ b/test/parallel/test-worker.js
@@ -0,0 +1,18 @@
+// Flags: --experimental-worker
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const { Worker, isMainThread, parentPort } = require('worker');
+
+if (isMainThread) {
+ const w = new Worker(__filename);
+ w.on('message', common.mustCall((message) => {
+ assert.strictEqual(message, 'Hello, world!');
+ }));
+} else {
+ setImmediate(() => {
+ process.nextTick(() => {
+ parentPort.postMessage('Hello, world!');
+ });
+ });
+}
diff --git a/test/sequential/test-async-wrap-getasyncid.js b/test/sequential/test-async-wrap-getasyncid.js
index 84a3e3b1f4..af08d7b656 100644
--- a/test/sequential/test-async-wrap-getasyncid.js
+++ b/test/sequential/test-async-wrap-getasyncid.js
@@ -38,6 +38,7 @@ common.crashOnUnhandledRejection();
// TODO(addaleax): Test for these
delete providers.STREAMPIPE;
delete providers.MESSAGEPORT;
+ delete providers.WORKER;
const objKeys = Object.keys(providers);
if (objKeys.length > 0)