diff options
Diffstat (limited to 'lib/internal/process/worker_thread_only.js')
-rw-r--r-- | lib/internal/process/worker_thread_only.js | 97 |
1 files changed, 89 insertions, 8 deletions
diff --git a/lib/internal/process/worker_thread_only.js b/lib/internal/process/worker_thread_only.js index a9332fb427..d26159ab45 100644 --- a/lib/internal/process/worker_thread_only.js +++ b/lib/internal/process/worker_thread_only.js @@ -7,18 +7,21 @@ const { threadId } = internalBinding('worker'); -const debug = require('util').debuglog('worker'); - const { + messageTypes, + kStdioWantsMoreDataCallback, kWaitingStreams, ReadableWorkerStdio, WritableWorkerStdio } = require('internal/worker/io'); -const { - createMessageHandler, - createWorkerFatalExeception -} = require('internal/worker'); +let debuglog; +function debug(...args) { + if (!debuglog) { + debuglog = require('util').debuglog('worker'); + } + return debuglog(...args); +} const workerStdio = {}; @@ -36,12 +39,90 @@ function initializeWorkerStdio() { }; } +function createMessageHandler(port) { + const publicWorker = require('worker_threads'); + + return function(message) { + if (message.type === messageTypes.LOAD_SCRIPT) { + const { filename, doEval, workerData, publicPort, hasStdin } = message; + publicWorker.parentPort = publicPort; + publicWorker.workerData = workerData; + + if (!hasStdin) + workerStdio.stdin.push(null); + + debug(`[${threadId}] starts worker script ${filename} ` + + `(eval = ${eval}) at cwd = ${process.cwd()}`); + port.unref(); + port.postMessage({ type: messageTypes.UP_AND_RUNNING }); + if (doEval) { + const { evalScript } = require('internal/process/execution'); + evalScript('[worker eval]', filename); + } else { + process.argv[1] = filename; // script filename + require('module').runMain(); + } + return; + } else if (message.type === messageTypes.STDIO_PAYLOAD) { + const { stream, chunk, encoding } = message; + workerStdio[stream].push(chunk, encoding); + return; + } else if (message.type === messageTypes.STDIO_WANTS_MORE_DATA) { + const { stream } = message; + workerStdio[stream][kStdioWantsMoreDataCallback](); + return; + } + + require('assert').fail(`Unknown worker message type ${message.type}`); + }; +} + +// XXX(joyeecheung): this has to be returned as an anonymous function +// wrapped in a closure, see the comment of the original +// process._fatalException in lib/internal/process/execution.js +function createWorkerFatalExeception(port) { + const { + fatalException: originalFatalException + } = require('internal/process/execution'); + + return (error) => { + debug(`[${threadId}] gets fatal exception`); + let caught = false; + try { + caught = originalFatalException.call(this, error); + } catch (e) { + error = e; + } + debug(`[${threadId}] fatal exception caught = ${caught}`); + + if (!caught) { + let serialized; + try { + const { serializeError } = require('internal/error-serdes'); + serialized = serializeError(error); + } catch {} + debug(`[${threadId}] fatal exception serialized = ${!!serialized}`); + if (serialized) + port.postMessage({ + type: messageTypes.ERROR_MESSAGE, + error: serialized + }); + else + port.postMessage({ type: messageTypes.COULD_NOT_SERIALIZE_ERROR }); + + const { clearAsyncIdStack } = require('internal/async_hooks'); + clearAsyncIdStack(); + + process.exit(); + } + }; +} + function setup() { debug(`[${threadId}] is setting up worker child environment`); const port = getEnvMessagePort(); - const publicWorker = require('worker_threads'); - port.on('message', createMessageHandler(publicWorker, port, workerStdio)); + port.on('message', createMessageHandler(port)); port.start(); return { |