aboutsummaryrefslogtreecommitdiff
path: root/lib/internal/process/worker_thread_only.js
diff options
context:
space:
mode:
Diffstat (limited to 'lib/internal/process/worker_thread_only.js')
-rw-r--r--lib/internal/process/worker_thread_only.js97
1 files changed, 89 insertions, 8 deletions
diff --git a/lib/internal/process/worker_thread_only.js b/lib/internal/process/worker_thread_only.js
index a9332fb427..d26159ab45 100644
--- a/lib/internal/process/worker_thread_only.js
+++ b/lib/internal/process/worker_thread_only.js
@@ -7,18 +7,21 @@ const {
threadId
} = internalBinding('worker');
-const debug = require('util').debuglog('worker');
-
const {
+ messageTypes,
+ kStdioWantsMoreDataCallback,
kWaitingStreams,
ReadableWorkerStdio,
WritableWorkerStdio
} = require('internal/worker/io');
-const {
- createMessageHandler,
- createWorkerFatalExeception
-} = require('internal/worker');
+let debuglog;
+function debug(...args) {
+ if (!debuglog) {
+ debuglog = require('util').debuglog('worker');
+ }
+ return debuglog(...args);
+}
const workerStdio = {};
@@ -36,12 +39,90 @@ function initializeWorkerStdio() {
};
}
+function createMessageHandler(port) {
+ const publicWorker = require('worker_threads');
+
+ return function(message) {
+ if (message.type === messageTypes.LOAD_SCRIPT) {
+ const { filename, doEval, workerData, publicPort, hasStdin } = message;
+ publicWorker.parentPort = publicPort;
+ publicWorker.workerData = workerData;
+
+ if (!hasStdin)
+ workerStdio.stdin.push(null);
+
+ debug(`[${threadId}] starts worker script ${filename} ` +
+ `(eval = ${eval}) at cwd = ${process.cwd()}`);
+ port.unref();
+ port.postMessage({ type: messageTypes.UP_AND_RUNNING });
+ if (doEval) {
+ const { evalScript } = require('internal/process/execution');
+ evalScript('[worker eval]', filename);
+ } else {
+ process.argv[1] = filename; // script filename
+ require('module').runMain();
+ }
+ return;
+ } else if (message.type === messageTypes.STDIO_PAYLOAD) {
+ const { stream, chunk, encoding } = message;
+ workerStdio[stream].push(chunk, encoding);
+ return;
+ } else if (message.type === messageTypes.STDIO_WANTS_MORE_DATA) {
+ const { stream } = message;
+ workerStdio[stream][kStdioWantsMoreDataCallback]();
+ return;
+ }
+
+ require('assert').fail(`Unknown worker message type ${message.type}`);
+ };
+}
+
+// XXX(joyeecheung): this has to be returned as an anonymous function
+// wrapped in a closure, see the comment of the original
+// process._fatalException in lib/internal/process/execution.js
+function createWorkerFatalExeception(port) {
+ const {
+ fatalException: originalFatalException
+ } = require('internal/process/execution');
+
+ return (error) => {
+ debug(`[${threadId}] gets fatal exception`);
+ let caught = false;
+ try {
+ caught = originalFatalException.call(this, error);
+ } catch (e) {
+ error = e;
+ }
+ debug(`[${threadId}] fatal exception caught = ${caught}`);
+
+ if (!caught) {
+ let serialized;
+ try {
+ const { serializeError } = require('internal/error-serdes');
+ serialized = serializeError(error);
+ } catch {}
+ debug(`[${threadId}] fatal exception serialized = ${!!serialized}`);
+ if (serialized)
+ port.postMessage({
+ type: messageTypes.ERROR_MESSAGE,
+ error: serialized
+ });
+ else
+ port.postMessage({ type: messageTypes.COULD_NOT_SERIALIZE_ERROR });
+
+ const { clearAsyncIdStack } = require('internal/async_hooks');
+ clearAsyncIdStack();
+
+ process.exit();
+ }
+ };
+}
+
function setup() {
debug(`[${threadId}] is setting up worker child environment`);
const port = getEnvMessagePort();
- const publicWorker = require('worker_threads');
- port.on('message', createMessageHandler(publicWorker, port, workerStdio));
+ port.on('message', createMessageHandler(port));
port.start();
return {