summaryrefslogtreecommitdiff
path: root/lib/internal/worker.js
diff options
context:
space:
mode:
authorAnna Henningsen <anna@addaleax.net>2018-05-13 23:25:14 +0200
committerAnna Henningsen <anna@addaleax.net>2018-06-06 19:43:56 +0200
commitddefa0f2c58450a1fa6cb7cd3b38ba02e733d4db (patch)
treef97649e6b44fa3cb10cf80b40ac4bca7d8253695 /lib/internal/worker.js
parent147ea5e3d78effd6da8ea2bfbf6bb77b6aaf52da (diff)
downloadandroid-node-v8-ddefa0f2c58450a1fa6cb7cd3b38ba02e733d4db.tar.gz
android-node-v8-ddefa0f2c58450a1fa6cb7cd3b38ba02e733d4db.tar.bz2
android-node-v8-ddefa0f2c58450a1fa6cb7cd3b38ba02e733d4db.zip
worker: enable stdio
Provide `stdin`, `stdout` and `stderr` options for the `Worker` constructor, and make these available to the worker thread under their usual names. The default for `stdin` is an empty stream, the default for `stdout` and `stderr` is redirecting to the parent thread’s corresponding stdio streams. PR-URL: https://github.com/nodejs/node/pull/20876 Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Shingo Inoue <leko.noor@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Tiancheng "Timothy" Gu <timothygu99@gmail.com> Reviewed-By: John-David Dalton <john.david.dalton@gmail.com> Reviewed-By: Gus Caplan <me@gus.host>
Diffstat (limited to 'lib/internal/worker.js')
-rw-r--r--lib/internal/worker.js166
1 files changed, 163 insertions, 3 deletions
diff --git a/lib/internal/worker.js b/lib/internal/worker.js
index edd954d8a3..5bd4c215e0 100644
--- a/lib/internal/worker.js
+++ b/lib/internal/worker.js
@@ -5,6 +5,7 @@ const EventEmitter = require('events');
const assert = require('assert');
const path = require('path');
const util = require('util');
+const { Readable, Writable } = require('stream');
const {
ERR_INVALID_ARG_TYPE,
ERR_WORKER_NEED_ABSOLUTE_PATH,
@@ -29,6 +30,7 @@ const isMainThread = threadId === 0;
const kOnMessageListener = Symbol('kOnMessageListener');
const kHandle = Symbol('kHandle');
+const kName = Symbol('kName');
const kPort = Symbol('kPort');
const kPublicPort = Symbol('kPublicPort');
const kDispose = Symbol('kDispose');
@@ -36,6 +38,12 @@ const kOnExit = Symbol('kOnExit');
const kOnMessage = Symbol('kOnMessage');
const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr');
const kOnErrorMessage = Symbol('kOnErrorMessage');
+const kParentSideStdio = Symbol('kParentSideStdio');
+const kWritableCallbacks = Symbol('kWritableCallbacks');
+const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');
+const kStartedReading = Symbol('kStartedReading');
+const kWaitingStreams = Symbol('kWaitingStreams');
+const kIncrementsPortRef = Symbol('kIncrementsPortRef');
const debug = util.debuglog('worker');
@@ -129,6 +137,72 @@ function setupPortReferencing(port, eventEmitter, eventName) {
}
+class ReadableWorkerStdio extends Readable {
+ constructor(port, name) {
+ super();
+ this[kPort] = port;
+ this[kName] = name;
+ this[kIncrementsPortRef] = true;
+ this[kStartedReading] = false;
+ this.on('end', () => {
+ if (this[kIncrementsPortRef] && --this[kPort][kWaitingStreams] === 0)
+ this[kPort].unref();
+ });
+ }
+
+ _read() {
+ if (!this[kStartedReading] && this[kIncrementsPortRef]) {
+ this[kStartedReading] = true;
+ if (this[kPort][kWaitingStreams]++ === 0)
+ this[kPort].ref();
+ }
+
+ this[kPort].postMessage({
+ type: 'stdioWantsMoreData',
+ stream: this[kName]
+ });
+ }
+}
+
+class WritableWorkerStdio extends Writable {
+ constructor(port, name) {
+ super({ decodeStrings: false });
+ this[kPort] = port;
+ this[kName] = name;
+ this[kWritableCallbacks] = [];
+ }
+
+ _write(chunk, encoding, cb) {
+ this[kPort].postMessage({
+ type: 'stdioPayload',
+ stream: this[kName],
+ chunk,
+ encoding
+ });
+ this[kWritableCallbacks].push(cb);
+ if (this[kPort][kWaitingStreams]++ === 0)
+ this[kPort].ref();
+ }
+
+ _final(cb) {
+ this[kPort].postMessage({
+ type: 'stdioPayload',
+ stream: this[kName],
+ chunk: null
+ });
+ cb();
+ }
+
+ [kStdioWantsMoreDataCallback]() {
+ const cbs = this[kWritableCallbacks];
+ this[kWritableCallbacks] = [];
+ for (const cb of cbs)
+ cb();
+ if ((this[kPort][kWaitingStreams] -= cbs.length) === 0)
+ this[kPort].unref();
+ }
+}
+
class Worker extends EventEmitter {
constructor(filename, options = {}) {
super();
@@ -154,8 +228,25 @@ class Worker extends EventEmitter {
this[kPort].on('message', (data) => this[kOnMessage](data));
this[kPort].start();
this[kPort].unref();
+ this[kPort][kWaitingStreams] = 0;
debug(`[${threadId}] created Worker with ID ${this.threadId}`);
+ let stdin = null;
+ if (options.stdin)
+ stdin = new WritableWorkerStdio(this[kPort], 'stdin');
+ const stdout = new ReadableWorkerStdio(this[kPort], 'stdout');
+ if (!options.stdout) {
+ stdout[kIncrementsPortRef] = false;
+ pipeWithoutWarning(stdout, process.stdout);
+ }
+ const stderr = new ReadableWorkerStdio(this[kPort], 'stderr');
+ if (!options.stderr) {
+ stderr[kIncrementsPortRef] = false;
+ pipeWithoutWarning(stderr, process.stderr);
+ }
+
+ this[kParentSideStdio] = { stdin, stdout, stderr };
+
const { port1, port2 } = new MessageChannel();
this[kPublicPort] = port1;
this[kPublicPort].on('message', (message) => this.emit('message', message));
@@ -165,7 +256,8 @@ class Worker extends EventEmitter {
filename,
doEval: !!options.eval,
workerData: options.workerData,
- publicPort: port2
+ publicPort: port2,
+ hasStdin: !!options.stdin
}, [port2]);
// Actually start the new thread now that everything is in place.
this[kHandle].startThread();
@@ -197,6 +289,16 @@ class Worker extends EventEmitter {
return this[kOnCouldNotSerializeErr]();
case 'errorMessage':
return this[kOnErrorMessage](message.error);
+ case 'stdioPayload':
+ {
+ const { stream, chunk, encoding } = message;
+ return this[kParentSideStdio][stream].push(chunk, encoding);
+ }
+ case 'stdioWantsMoreData':
+ {
+ const { stream } = message;
+ return this[kParentSideStdio][stream][kStdioWantsMoreDataCallback]();
+ }
}
assert.fail(`Unknown worker message type ${message.type}`);
@@ -207,6 +309,18 @@ class Worker extends EventEmitter {
this[kHandle] = null;
this[kPort] = null;
this[kPublicPort] = null;
+
+ const { stdout, stderr } = this[kParentSideStdio];
+ this[kParentSideStdio] = null;
+
+ if (!stdout._readableState.ended) {
+ debug(`[${threadId}] explicitly closes stdout for ${this.threadId}`);
+ stdout.push(null);
+ }
+ if (!stderr._readableState.ended) {
+ debug(`[${threadId}] explicitly closes stderr for ${this.threadId}`);
+ stderr.push(null);
+ }
}
postMessage(...args) {
@@ -243,6 +357,27 @@ class Worker extends EventEmitter {
return this[kHandle].threadId;
}
+
+ get stdin() {
+ return this[kParentSideStdio].stdin;
+ }
+
+ get stdout() {
+ return this[kParentSideStdio].stdout;
+ }
+
+ get stderr() {
+ return this[kParentSideStdio].stderr;
+ }
+}
+
+const workerStdio = {};
+if (!isMainThread) {
+ const port = getEnvMessagePort();
+ port[kWaitingStreams] = 0;
+ workerStdio.stdin = new ReadableWorkerStdio(port, 'stdin');
+ workerStdio.stdout = new WritableWorkerStdio(port, 'stdout');
+ workerStdio.stderr = new WritableWorkerStdio(port, 'stderr');
}
let originalFatalException;
@@ -256,10 +391,14 @@ function setupChild(evalScript) {
port.on('message', (message) => {
if (message.type === 'loadScript') {
- const { filename, doEval, workerData, publicPort } = message;
+ const { filename, doEval, workerData, publicPort, hasStdin } = message;
publicWorker.parentPort = publicPort;
setupPortReferencing(publicPort, publicPort, 'message');
publicWorker.workerData = workerData;
+
+ if (!hasStdin)
+ workerStdio.stdin.push(null);
+
debug(`[${threadId}] starts worker script ${filename} ` +
`(eval = ${eval}) at cwd = ${process.cwd()}`);
port.unref();
@@ -271,6 +410,14 @@ function setupChild(evalScript) {
require('module').runMain();
}
return;
+ } else if (message.type === 'stdioPayload') {
+ const { stream, chunk, encoding } = message;
+ workerStdio[stream].push(chunk, encoding);
+ return;
+ } else if (message.type === 'stdioWantsMoreData') {
+ const { stream } = message;
+ workerStdio[stream][kStdioWantsMoreDataCallback]();
+ return;
}
assert.fail(`Unknown worker message type ${message.type}`);
@@ -317,11 +464,24 @@ function deserializeError(error) {
error.byteLength).toString('utf8');
}
+function pipeWithoutWarning(source, dest) {
+ const sourceMaxListeners = source._maxListeners;
+ const destMaxListeners = dest._maxListeners;
+ source.setMaxListeners(Infinity);
+ dest.setMaxListeners(Infinity);
+
+ source.pipe(dest);
+
+ source._maxListeners = sourceMaxListeners;
+ dest._maxListeners = destMaxListeners;
+}
+
module.exports = {
MessagePort,
MessageChannel,
threadId,
Worker,
setupChild,
- isMainThread
+ isMainThread,
+ workerStdio
};