diff options
Diffstat (limited to 'lib/internal/worker.js')
-rw-r--r-- | lib/internal/worker.js | 221 |
1 files changed, 218 insertions, 3 deletions
diff --git a/lib/internal/worker.js b/lib/internal/worker.js index 73f7525aa7..c982478b93 100644 --- a/lib/internal/worker.js +++ b/lib/internal/worker.js @@ -1,24 +1,49 @@ 'use strict'; +const Buffer = require('buffer').Buffer; const EventEmitter = require('events'); +const assert = require('assert'); +const path = require('path'); const util = require('util'); +const { + ERR_INVALID_ARG_TYPE, + ERR_WORKER_NEED_ABSOLUTE_PATH, + ERR_WORKER_UNSERIALIZABLE_ERROR +} = require('internal/errors').codes; const { internalBinding } = require('internal/bootstrap/loaders'); const { MessagePort, MessageChannel } = internalBinding('messaging'); const { handle_onclose } = internalBinding('symbols'); +const { clearAsyncIdStack } = require('internal/async_hooks'); util.inherits(MessagePort, EventEmitter); +const { + Worker: WorkerImpl, + getEnvMessagePort, + threadId +} = internalBinding('worker'); + +const isMainThread = threadId === 0; + const kOnMessageListener = Symbol('kOnMessageListener'); +const kHandle = Symbol('kHandle'); +const kPort = Symbol('kPort'); +const kPublicPort = Symbol('kPublicPort'); +const kDispose = Symbol('kDispose'); +const kOnExit = Symbol('kOnExit'); +const kOnMessage = Symbol('kOnMessage'); +const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr'); +const kOnErrorMessage = Symbol('kOnErrorMessage'); const debug = util.debuglog('worker'); -// A MessagePort consists of a handle (that wraps around an +// A communication channel consisting of a handle (that wraps around an // uv_async_t) which can receive information from other threads and emits // .onmessage events, and a function used for sending data to a MessagePort // in some other thread. MessagePort.prototype[kOnMessageListener] = function onmessage(payload) { - debug('received message', payload); + debug(`[${threadId}] received message`, payload); // Emit the deserialized object to userland. this.emit('message', payload); }; @@ -79,6 +104,9 @@ MessagePort.prototype.close = function(cb) { originalClose.call(this); }; +const drainMessagePort = MessagePort.prototype.drain; +delete MessagePort.prototype.drain; + function setupPortReferencing(port, eventEmitter, eventName) { // Keep track of whether there are any workerMessage listeners: // If there are some, ref() the channel so it keeps the event loop alive. @@ -99,7 +127,194 @@ function setupPortReferencing(port, eventEmitter, eventName) { }); } + +class Worker extends EventEmitter { + constructor(filename, options = {}) { + super(); + debug(`[${threadId}] create new worker`, filename, options); + if (typeof filename !== 'string') { + throw new ERR_INVALID_ARG_TYPE('filename', 'string', filename); + } + + if (!options.eval && !path.isAbsolute(filename)) { + throw new ERR_WORKER_NEED_ABSOLUTE_PATH(filename); + } + + // Set up the C++ handle for the worker, as well as some internal wiring. + this[kHandle] = new WorkerImpl(); + this[kHandle].onexit = (code) => this[kOnExit](code); + this[kPort] = this[kHandle].messagePort; + this[kPort].on('message', (data) => this[kOnMessage](data)); + this[kPort].start(); + this[kPort].unref(); + debug(`[${threadId}] created Worker with ID ${this.threadId}`); + + const { port1, port2 } = new MessageChannel(); + this[kPublicPort] = port1; + this[kPublicPort].on('message', (message) => this.emit('message', message)); + setupPortReferencing(this[kPublicPort], this, 'message'); + this[kPort].postMessage({ + type: 'loadScript', + filename, + doEval: !!options.eval, + workerData: options.workerData, + publicPort: port2 + }, [port2]); + // Actually start the new thread now that everything is in place. + this[kHandle].startThread(); + } + + [kOnExit](code) { + debug(`[${threadId}] hears end event for Worker ${this.threadId}`); + drainMessagePort.call(this[kPublicPort]); + this[kDispose](); + this.emit('exit', code); + this.removeAllListeners(); + } + + [kOnCouldNotSerializeErr]() { + this.emit('error', new ERR_WORKER_UNSERIALIZABLE_ERROR()); + } + + [kOnErrorMessage](serialized) { + // This is what is called for uncaught exceptions. + const error = deserializeError(serialized); + this.emit('error', error); + } + + [kOnMessage](message) { + switch (message.type) { + case 'upAndRunning': + return this.emit('online'); + case 'couldNotSerializeError': + return this[kOnCouldNotSerializeErr](); + case 'errorMessage': + return this[kOnErrorMessage](message.error); + } + + assert.fail(`Unknown worker message type ${message.type}`); + } + + [kDispose]() { + this[kHandle].onexit = null; + this[kHandle] = null; + this[kPort] = null; + this[kPublicPort] = null; + } + + postMessage(...args) { + this[kPublicPort].postMessage(...args); + } + + terminate(callback) { + if (this[kHandle] === null) return; + + debug(`[${threadId}] terminates Worker with ID ${this.threadId}`); + + if (typeof callback !== 'undefined') + this.once('exit', (exitCode) => callback(null, exitCode)); + + this[kHandle].stopThread(); + } + + ref() { + if (this[kHandle] === null) return; + + this[kHandle].ref(); + this[kPublicPort].ref(); + } + + unref() { + if (this[kHandle] === null) return; + + this[kHandle].unref(); + this[kPublicPort].unref(); + } + + get threadId() { + if (this[kHandle] === null) return -1; + + return this[kHandle].threadId; + } +} + +let originalFatalException; + +function setupChild(evalScript) { + // Called during bootstrap to set up worker script execution. + debug(`[${threadId}] is setting up worker child environment`); + const port = getEnvMessagePort(); + + const publicWorker = require('worker'); + + port.on('message', (message) => { + if (message.type === 'loadScript') { + const { filename, doEval, workerData, publicPort } = message; + publicWorker.parentPort = publicPort; + setupPortReferencing(publicPort, publicPort, 'message'); + publicWorker.workerData = workerData; + debug(`[${threadId}] starts worker script ${filename} ` + + `(eval = ${eval}) at cwd = ${process.cwd()}`); + port.unref(); + port.postMessage({ type: 'upAndRunning' }); + if (doEval) { + evalScript('[worker eval]', filename); + } else { + process.argv[1] = filename; // script filename + require('module').runMain(); + } + return; + } + + assert.fail(`Unknown worker message type ${message.type}`); + }); + + port.start(); + + originalFatalException = process._fatalException; + process._fatalException = fatalException; + + function fatalException(error) { + debug(`[${threadId}] gets fatal exception`); + let caught = false; + try { + caught = originalFatalException.call(this, error); + } catch (e) { + error = e; + } + debug(`[${threadId}] fatal exception caught = ${caught}`); + + if (!caught) { + let serialized; + try { + serialized = serializeError(error); + } catch {} + debug(`[${threadId}] fatal exception serialized = ${!!serialized}`); + if (serialized) + port.postMessage({ type: 'errorMessage', error: serialized }); + else + port.postMessage({ type: 'couldNotSerializeError' }); + clearAsyncIdStack(); + } + } +} + +// TODO(addaleax): These can be improved a lot. +function serializeError(error) { + return Buffer.from(util.inspect(error), 'utf8'); +} + +function deserializeError(error) { + return Buffer.from(error.buffer, + error.byteOffset, + error.byteLength).toString('utf8'); +} + module.exports = { MessagePort, - MessageChannel + MessageChannel, + threadId, + Worker, + setupChild, + isMainThread }; |