diff options
author | Anna Henningsen <anna@addaleax.net> | 2018-05-13 23:25:14 +0200 |
---|---|---|
committer | Anna Henningsen <anna@addaleax.net> | 2018-06-06 19:43:56 +0200 |
commit | ddefa0f2c58450a1fa6cb7cd3b38ba02e733d4db (patch) | |
tree | f97649e6b44fa3cb10cf80b40ac4bca7d8253695 /lib/internal/worker.js | |
parent | 147ea5e3d78effd6da8ea2bfbf6bb77b6aaf52da (diff) | |
download | android-node-v8-ddefa0f2c58450a1fa6cb7cd3b38ba02e733d4db.tar.gz android-node-v8-ddefa0f2c58450a1fa6cb7cd3b38ba02e733d4db.tar.bz2 android-node-v8-ddefa0f2c58450a1fa6cb7cd3b38ba02e733d4db.zip |
worker: enable stdio
Provide `stdin`, `stdout` and `stderr` options for the `Worker`
constructor, and make these available to the worker thread
under their usual names.
The default for `stdin` is an empty stream, the default for
`stdout` and `stderr` is redirecting to the parent thread’s
corresponding stdio streams.
PR-URL: https://github.com/nodejs/node/pull/20876
Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Shingo Inoue <leko.noor@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Tiancheng "Timothy" Gu <timothygu99@gmail.com>
Reviewed-By: John-David Dalton <john.david.dalton@gmail.com>
Reviewed-By: Gus Caplan <me@gus.host>
Diffstat (limited to 'lib/internal/worker.js')
-rw-r--r-- | lib/internal/worker.js | 166 |
1 files changed, 163 insertions, 3 deletions
diff --git a/lib/internal/worker.js b/lib/internal/worker.js index edd954d8a3..5bd4c215e0 100644 --- a/lib/internal/worker.js +++ b/lib/internal/worker.js @@ -5,6 +5,7 @@ const EventEmitter = require('events'); const assert = require('assert'); const path = require('path'); const util = require('util'); +const { Readable, Writable } = require('stream'); const { ERR_INVALID_ARG_TYPE, ERR_WORKER_NEED_ABSOLUTE_PATH, @@ -29,6 +30,7 @@ const isMainThread = threadId === 0; const kOnMessageListener = Symbol('kOnMessageListener'); const kHandle = Symbol('kHandle'); +const kName = Symbol('kName'); const kPort = Symbol('kPort'); const kPublicPort = Symbol('kPublicPort'); const kDispose = Symbol('kDispose'); @@ -36,6 +38,12 @@ const kOnExit = Symbol('kOnExit'); const kOnMessage = Symbol('kOnMessage'); const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr'); const kOnErrorMessage = Symbol('kOnErrorMessage'); +const kParentSideStdio = Symbol('kParentSideStdio'); +const kWritableCallbacks = Symbol('kWritableCallbacks'); +const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback'); +const kStartedReading = Symbol('kStartedReading'); +const kWaitingStreams = Symbol('kWaitingStreams'); +const kIncrementsPortRef = Symbol('kIncrementsPortRef'); const debug = util.debuglog('worker'); @@ -129,6 +137,72 @@ function setupPortReferencing(port, eventEmitter, eventName) { } +class ReadableWorkerStdio extends Readable { + constructor(port, name) { + super(); + this[kPort] = port; + this[kName] = name; + this[kIncrementsPortRef] = true; + this[kStartedReading] = false; + this.on('end', () => { + if (this[kIncrementsPortRef] && --this[kPort][kWaitingStreams] === 0) + this[kPort].unref(); + }); + } + + _read() { + if (!this[kStartedReading] && this[kIncrementsPortRef]) { + this[kStartedReading] = true; + if (this[kPort][kWaitingStreams]++ === 0) + this[kPort].ref(); + } + + this[kPort].postMessage({ + type: 'stdioWantsMoreData', + stream: this[kName] + }); + } +} + +class WritableWorkerStdio extends Writable { + constructor(port, name) { + super({ decodeStrings: false }); + this[kPort] = port; + this[kName] = name; + this[kWritableCallbacks] = []; + } + + _write(chunk, encoding, cb) { + this[kPort].postMessage({ + type: 'stdioPayload', + stream: this[kName], + chunk, + encoding + }); + this[kWritableCallbacks].push(cb); + if (this[kPort][kWaitingStreams]++ === 0) + this[kPort].ref(); + } + + _final(cb) { + this[kPort].postMessage({ + type: 'stdioPayload', + stream: this[kName], + chunk: null + }); + cb(); + } + + [kStdioWantsMoreDataCallback]() { + const cbs = this[kWritableCallbacks]; + this[kWritableCallbacks] = []; + for (const cb of cbs) + cb(); + if ((this[kPort][kWaitingStreams] -= cbs.length) === 0) + this[kPort].unref(); + } +} + class Worker extends EventEmitter { constructor(filename, options = {}) { super(); @@ -154,8 +228,25 @@ class Worker extends EventEmitter { this[kPort].on('message', (data) => this[kOnMessage](data)); this[kPort].start(); this[kPort].unref(); + this[kPort][kWaitingStreams] = 0; debug(`[${threadId}] created Worker with ID ${this.threadId}`); + let stdin = null; + if (options.stdin) + stdin = new WritableWorkerStdio(this[kPort], 'stdin'); + const stdout = new ReadableWorkerStdio(this[kPort], 'stdout'); + if (!options.stdout) { + stdout[kIncrementsPortRef] = false; + pipeWithoutWarning(stdout, process.stdout); + } + const stderr = new ReadableWorkerStdio(this[kPort], 'stderr'); + if (!options.stderr) { + stderr[kIncrementsPortRef] = false; + pipeWithoutWarning(stderr, process.stderr); + } + + this[kParentSideStdio] = { stdin, stdout, stderr }; + const { port1, port2 } = new MessageChannel(); this[kPublicPort] = port1; this[kPublicPort].on('message', (message) => this.emit('message', message)); @@ -165,7 +256,8 @@ class Worker extends EventEmitter { filename, doEval: !!options.eval, workerData: options.workerData, - publicPort: port2 + publicPort: port2, + hasStdin: !!options.stdin }, [port2]); // Actually start the new thread now that everything is in place. this[kHandle].startThread(); @@ -197,6 +289,16 @@ class Worker extends EventEmitter { return this[kOnCouldNotSerializeErr](); case 'errorMessage': return this[kOnErrorMessage](message.error); + case 'stdioPayload': + { + const { stream, chunk, encoding } = message; + return this[kParentSideStdio][stream].push(chunk, encoding); + } + case 'stdioWantsMoreData': + { + const { stream } = message; + return this[kParentSideStdio][stream][kStdioWantsMoreDataCallback](); + } } assert.fail(`Unknown worker message type ${message.type}`); @@ -207,6 +309,18 @@ class Worker extends EventEmitter { this[kHandle] = null; this[kPort] = null; this[kPublicPort] = null; + + const { stdout, stderr } = this[kParentSideStdio]; + this[kParentSideStdio] = null; + + if (!stdout._readableState.ended) { + debug(`[${threadId}] explicitly closes stdout for ${this.threadId}`); + stdout.push(null); + } + if (!stderr._readableState.ended) { + debug(`[${threadId}] explicitly closes stderr for ${this.threadId}`); + stderr.push(null); + } } postMessage(...args) { @@ -243,6 +357,27 @@ class Worker extends EventEmitter { return this[kHandle].threadId; } + + get stdin() { + return this[kParentSideStdio].stdin; + } + + get stdout() { + return this[kParentSideStdio].stdout; + } + + get stderr() { + return this[kParentSideStdio].stderr; + } +} + +const workerStdio = {}; +if (!isMainThread) { + const port = getEnvMessagePort(); + port[kWaitingStreams] = 0; + workerStdio.stdin = new ReadableWorkerStdio(port, 'stdin'); + workerStdio.stdout = new WritableWorkerStdio(port, 'stdout'); + workerStdio.stderr = new WritableWorkerStdio(port, 'stderr'); } let originalFatalException; @@ -256,10 +391,14 @@ function setupChild(evalScript) { port.on('message', (message) => { if (message.type === 'loadScript') { - const { filename, doEval, workerData, publicPort } = message; + const { filename, doEval, workerData, publicPort, hasStdin } = message; publicWorker.parentPort = publicPort; setupPortReferencing(publicPort, publicPort, 'message'); publicWorker.workerData = workerData; + + if (!hasStdin) + workerStdio.stdin.push(null); + debug(`[${threadId}] starts worker script ${filename} ` + `(eval = ${eval}) at cwd = ${process.cwd()}`); port.unref(); @@ -271,6 +410,14 @@ function setupChild(evalScript) { require('module').runMain(); } return; + } else if (message.type === 'stdioPayload') { + const { stream, chunk, encoding } = message; + workerStdio[stream].push(chunk, encoding); + return; + } else if (message.type === 'stdioWantsMoreData') { + const { stream } = message; + workerStdio[stream][kStdioWantsMoreDataCallback](); + return; } assert.fail(`Unknown worker message type ${message.type}`); @@ -317,11 +464,24 @@ function deserializeError(error) { error.byteLength).toString('utf8'); } +function pipeWithoutWarning(source, dest) { + const sourceMaxListeners = source._maxListeners; + const destMaxListeners = dest._maxListeners; + source.setMaxListeners(Infinity); + dest.setMaxListeners(Infinity); + + source.pipe(dest); + + source._maxListeners = sourceMaxListeners; + dest._maxListeners = destMaxListeners; +} + module.exports = { MessagePort, MessageChannel, threadId, Worker, setupChild, - isMainThread + isMainThread, + workerStdio }; |