summaryrefslogtreecommitdiff
path: root/lib/internal/worker.js
diff options
context:
space:
mode:
Diffstat (limited to 'lib/internal/worker.js')
-rw-r--r--lib/internal/worker.js221
1 files changed, 218 insertions, 3 deletions
diff --git a/lib/internal/worker.js b/lib/internal/worker.js
index 73f7525aa7..c982478b93 100644
--- a/lib/internal/worker.js
+++ b/lib/internal/worker.js
@@ -1,24 +1,49 @@
'use strict';
+const Buffer = require('buffer').Buffer;
const EventEmitter = require('events');
+const assert = require('assert');
+const path = require('path');
const util = require('util');
+const {
+ ERR_INVALID_ARG_TYPE,
+ ERR_WORKER_NEED_ABSOLUTE_PATH,
+ ERR_WORKER_UNSERIALIZABLE_ERROR
+} = require('internal/errors').codes;
const { internalBinding } = require('internal/bootstrap/loaders');
const { MessagePort, MessageChannel } = internalBinding('messaging');
const { handle_onclose } = internalBinding('symbols');
+const { clearAsyncIdStack } = require('internal/async_hooks');
util.inherits(MessagePort, EventEmitter);
+const {
+ Worker: WorkerImpl,
+ getEnvMessagePort,
+ threadId
+} = internalBinding('worker');
+
+const isMainThread = threadId === 0;
+
const kOnMessageListener = Symbol('kOnMessageListener');
+const kHandle = Symbol('kHandle');
+const kPort = Symbol('kPort');
+const kPublicPort = Symbol('kPublicPort');
+const kDispose = Symbol('kDispose');
+const kOnExit = Symbol('kOnExit');
+const kOnMessage = Symbol('kOnMessage');
+const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr');
+const kOnErrorMessage = Symbol('kOnErrorMessage');
const debug = util.debuglog('worker');
-// A MessagePort consists of a handle (that wraps around an
+// A communication channel consisting of a handle (that wraps around an
// uv_async_t) which can receive information from other threads and emits
// .onmessage events, and a function used for sending data to a MessagePort
// in some other thread.
MessagePort.prototype[kOnMessageListener] = function onmessage(payload) {
- debug('received message', payload);
+ debug(`[${threadId}] received message`, payload);
// Emit the deserialized object to userland.
this.emit('message', payload);
};
@@ -79,6 +104,9 @@ MessagePort.prototype.close = function(cb) {
originalClose.call(this);
};
+const drainMessagePort = MessagePort.prototype.drain;
+delete MessagePort.prototype.drain;
+
function setupPortReferencing(port, eventEmitter, eventName) {
// Keep track of whether there are any workerMessage listeners:
// If there are some, ref() the channel so it keeps the event loop alive.
@@ -99,7 +127,194 @@ function setupPortReferencing(port, eventEmitter, eventName) {
});
}
+
+class Worker extends EventEmitter {
+ constructor(filename, options = {}) {
+ super();
+ debug(`[${threadId}] create new worker`, filename, options);
+ if (typeof filename !== 'string') {
+ throw new ERR_INVALID_ARG_TYPE('filename', 'string', filename);
+ }
+
+ if (!options.eval && !path.isAbsolute(filename)) {
+ throw new ERR_WORKER_NEED_ABSOLUTE_PATH(filename);
+ }
+
+ // Set up the C++ handle for the worker, as well as some internal wiring.
+ this[kHandle] = new WorkerImpl();
+ this[kHandle].onexit = (code) => this[kOnExit](code);
+ this[kPort] = this[kHandle].messagePort;
+ this[kPort].on('message', (data) => this[kOnMessage](data));
+ this[kPort].start();
+ this[kPort].unref();
+ debug(`[${threadId}] created Worker with ID ${this.threadId}`);
+
+ const { port1, port2 } = new MessageChannel();
+ this[kPublicPort] = port1;
+ this[kPublicPort].on('message', (message) => this.emit('message', message));
+ setupPortReferencing(this[kPublicPort], this, 'message');
+ this[kPort].postMessage({
+ type: 'loadScript',
+ filename,
+ doEval: !!options.eval,
+ workerData: options.workerData,
+ publicPort: port2
+ }, [port2]);
+ // Actually start the new thread now that everything is in place.
+ this[kHandle].startThread();
+ }
+
+ [kOnExit](code) {
+ debug(`[${threadId}] hears end event for Worker ${this.threadId}`);
+ drainMessagePort.call(this[kPublicPort]);
+ this[kDispose]();
+ this.emit('exit', code);
+ this.removeAllListeners();
+ }
+
+ [kOnCouldNotSerializeErr]() {
+ this.emit('error', new ERR_WORKER_UNSERIALIZABLE_ERROR());
+ }
+
+ [kOnErrorMessage](serialized) {
+ // This is what is called for uncaught exceptions.
+ const error = deserializeError(serialized);
+ this.emit('error', error);
+ }
+
+ [kOnMessage](message) {
+ switch (message.type) {
+ case 'upAndRunning':
+ return this.emit('online');
+ case 'couldNotSerializeError':
+ return this[kOnCouldNotSerializeErr]();
+ case 'errorMessage':
+ return this[kOnErrorMessage](message.error);
+ }
+
+ assert.fail(`Unknown worker message type ${message.type}`);
+ }
+
+ [kDispose]() {
+ this[kHandle].onexit = null;
+ this[kHandle] = null;
+ this[kPort] = null;
+ this[kPublicPort] = null;
+ }
+
+ postMessage(...args) {
+ this[kPublicPort].postMessage(...args);
+ }
+
+ terminate(callback) {
+ if (this[kHandle] === null) return;
+
+ debug(`[${threadId}] terminates Worker with ID ${this.threadId}`);
+
+ if (typeof callback !== 'undefined')
+ this.once('exit', (exitCode) => callback(null, exitCode));
+
+ this[kHandle].stopThread();
+ }
+
+ ref() {
+ if (this[kHandle] === null) return;
+
+ this[kHandle].ref();
+ this[kPublicPort].ref();
+ }
+
+ unref() {
+ if (this[kHandle] === null) return;
+
+ this[kHandle].unref();
+ this[kPublicPort].unref();
+ }
+
+ get threadId() {
+ if (this[kHandle] === null) return -1;
+
+ return this[kHandle].threadId;
+ }
+}
+
+let originalFatalException;
+
+function setupChild(evalScript) {
+ // Called during bootstrap to set up worker script execution.
+ debug(`[${threadId}] is setting up worker child environment`);
+ const port = getEnvMessagePort();
+
+ const publicWorker = require('worker');
+
+ port.on('message', (message) => {
+ if (message.type === 'loadScript') {
+ const { filename, doEval, workerData, publicPort } = message;
+ publicWorker.parentPort = publicPort;
+ setupPortReferencing(publicPort, publicPort, 'message');
+ publicWorker.workerData = workerData;
+ debug(`[${threadId}] starts worker script ${filename} ` +
+ `(eval = ${eval}) at cwd = ${process.cwd()}`);
+ port.unref();
+ port.postMessage({ type: 'upAndRunning' });
+ if (doEval) {
+ evalScript('[worker eval]', filename);
+ } else {
+ process.argv[1] = filename; // script filename
+ require('module').runMain();
+ }
+ return;
+ }
+
+ assert.fail(`Unknown worker message type ${message.type}`);
+ });
+
+ port.start();
+
+ originalFatalException = process._fatalException;
+ process._fatalException = fatalException;
+
+ function fatalException(error) {
+ debug(`[${threadId}] gets fatal exception`);
+ let caught = false;
+ try {
+ caught = originalFatalException.call(this, error);
+ } catch (e) {
+ error = e;
+ }
+ debug(`[${threadId}] fatal exception caught = ${caught}`);
+
+ if (!caught) {
+ let serialized;
+ try {
+ serialized = serializeError(error);
+ } catch {}
+ debug(`[${threadId}] fatal exception serialized = ${!!serialized}`);
+ if (serialized)
+ port.postMessage({ type: 'errorMessage', error: serialized });
+ else
+ port.postMessage({ type: 'couldNotSerializeError' });
+ clearAsyncIdStack();
+ }
+ }
+}
+
+// TODO(addaleax): These can be improved a lot.
+function serializeError(error) {
+ return Buffer.from(util.inspect(error), 'utf8');
+}
+
+function deserializeError(error) {
+ return Buffer.from(error.buffer,
+ error.byteOffset,
+ error.byteLength).toString('utf8');
+}
+
module.exports = {
MessagePort,
- MessageChannel
+ MessageChannel,
+ threadId,
+ Worker,
+ setupChild,
+ isMainThread
};