summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/internal/bootstrap/node.js43
-rw-r--r--lib/internal/process/main_thread_only.js10
-rw-r--r--lib/internal/process/stdio.js26
-rw-r--r--lib/internal/process/worker_thread_only.js51
-rw-r--r--lib/internal/worker.js274
-rw-r--r--lib/internal/worker/io.js245
-rw-r--r--lib/worker_threads.js7
-rw-r--r--node.gyp1
8 files changed, 358 insertions, 299 deletions
diff --git a/lib/internal/bootstrap/node.js b/lib/internal/bootstrap/node.js
index 4de951ca54..81ec1fc12b 100644
--- a/lib/internal/bootstrap/node.js
+++ b/lib/internal/bootstrap/node.js
@@ -140,9 +140,13 @@ function startup() {
}
if (isMainThread) {
- mainThreadSetup.setupStdio();
+ const { getStdout, getStdin, getStderr } =
+ NativeModule.require('internal/process/stdio').getMainThreadStdio();
+ setupProcessStdio(getStdout, getStdin, getStderr);
} else {
- workerThreadSetup.setupStdio();
+ const { getStdout, getStdin, getStderr } =
+ workerThreadSetup.initializeWorkerStdio();
+ setupProcessStdio(getStdout, getStdin, getStderr);
}
if (global.__coverage__)
@@ -312,8 +316,14 @@ function startup() {
function startExecution() {
// This means we are in a Worker context, and any script execution
// will be directed by the worker module.
- if (internalBinding('worker').getEnvMessagePort() !== undefined) {
- NativeModule.require('internal/worker').setupChild();
+ if (!isMainThread) {
+ const workerThreadSetup = NativeModule.require(
+ 'internal/process/worker_thread_only'
+ );
+ // Set up the message port and start listening
+ const { workerFatalExeception } = workerThreadSetup.setup();
+ // Overwrite fatalException
+ process._fatalException = workerFatalExeception;
return;
}
@@ -505,6 +515,31 @@ function setupProcessObject() {
EventEmitter.call(process);
}
+function setupProcessStdio(getStdout, getStdin, getStderr) {
+ Object.defineProperty(process, 'stdout', {
+ configurable: true,
+ enumerable: true,
+ get: getStdout
+ });
+
+ Object.defineProperty(process, 'stderr', {
+ configurable: true,
+ enumerable: true,
+ get: getStderr
+ });
+
+ Object.defineProperty(process, 'stdin', {
+ configurable: true,
+ enumerable: true,
+ get: getStdin
+ });
+
+ process.openStdin = function() {
+ process.stdin.resume();
+ return process.stdin;
+ };
+}
+
function setupGlobalVariables() {
Object.defineProperty(global, Symbol.toStringTag, {
value: 'global',
diff --git a/lib/internal/process/main_thread_only.js b/lib/internal/process/main_thread_only.js
index 862194ae46..42579e9da8 100644
--- a/lib/internal/process/main_thread_only.js
+++ b/lib/internal/process/main_thread_only.js
@@ -16,15 +16,6 @@ const {
validateString
} = require('internal/validators');
-const {
- setupProcessStdio,
- getMainThreadStdio
-} = require('internal/process/stdio');
-
-function setupStdio() {
- setupProcessStdio(getMainThreadStdio());
-}
-
// The execution of this function itself should not cause any side effects.
function wrapProcessMethods(binding) {
function chdir(directory) {
@@ -174,7 +165,6 @@ function setupChildProcessIpcChannel() {
}
module.exports = {
- setupStdio,
wrapProcessMethods,
setupSignalHandlers,
setupChildProcessIpcChannel,
diff --git a/lib/internal/process/stdio.js b/lib/internal/process/stdio.js
index 5e9ff6b260..bf5f6df15f 100644
--- a/lib/internal/process/stdio.js
+++ b/lib/internal/process/stdio.js
@@ -1,6 +1,5 @@
'use strict';
-exports.setupProcessStdio = setupProcessStdio;
exports.getMainThreadStdio = getMainThreadStdio;
function dummyDestroy(err, cb) { cb(err); }
@@ -134,31 +133,6 @@ function getMainThreadStdio() {
};
}
-function setupProcessStdio({ getStdout, getStdin, getStderr }) {
- Object.defineProperty(process, 'stdout', {
- configurable: true,
- enumerable: true,
- get: getStdout
- });
-
- Object.defineProperty(process, 'stderr', {
- configurable: true,
- enumerable: true,
- get: getStderr
- });
-
- Object.defineProperty(process, 'stdin', {
- configurable: true,
- enumerable: true,
- get: getStdin
- });
-
- process.openStdin = function() {
- process.stdin.resume();
- return process.stdin;
- };
-}
-
function createWritableStdioStream(fd) {
var stream;
const tty_wrap = internalBinding('tty_wrap');
diff --git a/lib/internal/process/worker_thread_only.js b/lib/internal/process/worker_thread_only.js
index 834ba6078f..a9332fb427 100644
--- a/lib/internal/process/worker_thread_only.js
+++ b/lib/internal/process/worker_thread_only.js
@@ -2,23 +2,54 @@
// This file contains process bootstrappers that can only be
// run in the worker thread.
+const {
+ getEnvMessagePort,
+ threadId
+} = internalBinding('worker');
+
+const debug = require('util').debuglog('worker');
const {
- setupProcessStdio
-} = require('internal/process/stdio');
+ kWaitingStreams,
+ ReadableWorkerStdio,
+ WritableWorkerStdio
+} = require('internal/worker/io');
const {
- workerStdio
+ createMessageHandler,
+ createWorkerFatalExeception
} = require('internal/worker');
-function setupStdio() {
- setupProcessStdio({
- getStdout: () => workerStdio.stdout,
- getStderr: () => workerStdio.stderr,
- getStdin: () => workerStdio.stdin
- });
+const workerStdio = {};
+
+function initializeWorkerStdio() {
+ const port = getEnvMessagePort();
+ port[kWaitingStreams] = 0;
+ workerStdio.stdin = new ReadableWorkerStdio(port, 'stdin');
+ workerStdio.stdout = new WritableWorkerStdio(port, 'stdout');
+ workerStdio.stderr = new WritableWorkerStdio(port, 'stderr');
+
+ return {
+ getStdout() { return workerStdio.stdout; },
+ getStderr() { return workerStdio.stderr; },
+ getStdin() { return workerStdio.stdin; }
+ };
+}
+
+function setup() {
+ debug(`[${threadId}] is setting up worker child environment`);
+
+ const port = getEnvMessagePort();
+ const publicWorker = require('worker_threads');
+ port.on('message', createMessageHandler(publicWorker, port, workerStdio));
+ port.start();
+
+ return {
+ workerFatalExeception: createWorkerFatalExeception(port)
+ };
}
module.exports = {
- setupStdio
+ initializeWorkerStdio,
+ setup
};
diff --git a/lib/internal/worker.js b/lib/internal/worker.js
index c079afbeb1..c4393d0459 100644
--- a/lib/internal/worker.js
+++ b/lib/internal/worker.js
@@ -4,35 +4,37 @@ const EventEmitter = require('events');
const assert = require('assert');
const path = require('path');
const util = require('util');
-const { Readable, Writable } = require('stream');
const {
ERR_WORKER_PATH,
ERR_WORKER_UNSERIALIZABLE_ERROR,
ERR_WORKER_UNSUPPORTED_EXTENSION,
} = require('internal/errors').codes;
const { validateString } = require('internal/validators');
+const { clearAsyncIdStack } = require('internal/async_hooks');
-const { MessagePort, MessageChannel } = internalBinding('messaging');
const {
- handle_onclose: handleOnCloseSymbol,
- oninit: onInitSymbol
-} = internalBinding('symbols');
-const { clearAsyncIdStack } = require('internal/async_hooks');
+ drainMessagePort,
+ MessageChannel,
+ messageTypes,
+ kPort,
+ kIncrementsPortRef,
+ kWaitingStreams,
+ kStdioWantsMoreDataCallback,
+ setupPortReferencing,
+ ReadableWorkerStdio,
+ WritableWorkerStdio,
+} = require('internal/worker/io');
const { serializeError, deserializeError } = require('internal/error-serdes');
const { pathToFileURL } = require('url');
const {
Worker: WorkerImpl,
- getEnvMessagePort,
threadId
} = internalBinding('worker');
const isMainThread = threadId === 0;
-const kOnMessageListener = Symbol('kOnMessageListener');
const kHandle = Symbol('kHandle');
-const kName = Symbol('kName');
-const kPort = Symbol('kPort');
const kPublicPort = Symbol('kPublicPort');
const kDispose = Symbol('kDispose');
const kOnExit = Symbol('kOnExit');
@@ -40,213 +42,9 @@ const kOnMessage = Symbol('kOnMessage');
const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr');
const kOnErrorMessage = Symbol('kOnErrorMessage');
const kParentSideStdio = Symbol('kParentSideStdio');
-const kWritableCallbacks = Symbol('kWritableCallbacks');
-const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');
-const kStartedReading = Symbol('kStartedReading');
-const kWaitingStreams = Symbol('kWaitingStreams');
-const kIncrementsPortRef = Symbol('kIncrementsPortRef');
const debug = util.debuglog('worker');
-const messageTypes = {
- UP_AND_RUNNING: 'upAndRunning',
- COULD_NOT_SERIALIZE_ERROR: 'couldNotSerializeError',
- ERROR_MESSAGE: 'errorMessage',
- STDIO_PAYLOAD: 'stdioPayload',
- STDIO_WANTS_MORE_DATA: 'stdioWantsMoreData',
- LOAD_SCRIPT: 'loadScript'
-};
-
-// We have to mess with the MessagePort prototype a bit, so that a) we can make
-// it inherit from EventEmitter, even though it is a C++ class, and b) we do
-// not provide methods that are not present in the Browser and not documented
-// on our side (e.g. hasRef).
-// Save a copy of the original set of methods as a shallow clone.
-const MessagePortPrototype = Object.create(
- Object.getPrototypeOf(MessagePort.prototype),
- Object.getOwnPropertyDescriptors(MessagePort.prototype));
-// Set up the new inheritance chain.
-Object.setPrototypeOf(MessagePort, EventEmitter);
-Object.setPrototypeOf(MessagePort.prototype, EventEmitter.prototype);
-// Finally, purge methods we don't want to be public.
-delete MessagePort.prototype.stop;
-delete MessagePort.prototype.drain;
-MessagePort.prototype.ref = MessagePortPrototype.ref;
-MessagePort.prototype.unref = MessagePortPrototype.unref;
-
-// A communication channel consisting of a handle (that wraps around an
-// uv_async_t) which can receive information from other threads and emits
-// .onmessage events, and a function used for sending data to a MessagePort
-// in some other thread.
-MessagePort.prototype[kOnMessageListener] = function onmessage(payload) {
- debug(`[${threadId}] received message`, payload);
- // Emit the deserialized object to userland.
- this.emit('message', payload);
-};
-
-// This is for compatibility with the Web's MessagePort API. It makes sense to
-// provide it as an `EventEmitter` in Node.js, but if somebody overrides
-// `onmessage`, we'll switch over to the Web API model.
-Object.defineProperty(MessagePort.prototype, 'onmessage', {
- enumerable: true,
- configurable: true,
- get() {
- return this[kOnMessageListener];
- },
- set(value) {
- this[kOnMessageListener] = value;
- if (typeof value === 'function') {
- this.ref();
- MessagePortPrototype.start.call(this);
- } else {
- this.unref();
- MessagePortPrototype.stop.call(this);
- }
- }
-});
-
-// This is called from inside the `MessagePort` constructor.
-function oninit() {
- setupPortReferencing(this, this, 'message');
-}
-
-Object.defineProperty(MessagePort.prototype, onInitSymbol, {
- enumerable: true,
- writable: false,
- value: oninit
-});
-
-// This is called after the underlying `uv_async_t` has been closed.
-function onclose() {
- if (typeof this.onclose === 'function') {
- // Not part of the Web standard yet, but there aren't many reasonable
- // alternatives in a non-EventEmitter usage setting.
- // Refs: https://github.com/whatwg/html/issues/1766
- this.onclose();
- }
- this.emit('close');
-}
-
-Object.defineProperty(MessagePort.prototype, handleOnCloseSymbol, {
- enumerable: false,
- writable: false,
- value: onclose
-});
-
-MessagePort.prototype.close = function(cb) {
- if (typeof cb === 'function')
- this.once('close', cb);
- MessagePortPrototype.close.call(this);
-};
-
-Object.defineProperty(MessagePort.prototype, util.inspect.custom, {
- enumerable: false,
- writable: false,
- value: function inspect() { // eslint-disable-line func-name-matching
- let ref;
- try {
- // This may throw when `this` does not refer to a native object,
- // e.g. when accessing the prototype directly.
- ref = MessagePortPrototype.hasRef.call(this);
- } catch { return this; }
- return Object.assign(Object.create(MessagePort.prototype),
- ref === undefined ? {
- active: false,
- } : {
- active: true,
- refed: ref
- },
- this);
- }
-});
-
-function setupPortReferencing(port, eventEmitter, eventName) {
- // Keep track of whether there are any workerMessage listeners:
- // If there are some, ref() the channel so it keeps the event loop alive.
- // If there are none or all are removed, unref() the channel so the worker
- // can shutdown gracefully.
- port.unref();
- eventEmitter.on('newListener', (name) => {
- if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
- port.ref();
- MessagePortPrototype.start.call(port);
- }
- });
- eventEmitter.on('removeListener', (name) => {
- if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
- MessagePortPrototype.stop.call(port);
- port.unref();
- }
- });
-}
-
-
-class ReadableWorkerStdio extends Readable {
- constructor(port, name) {
- super();
- this[kPort] = port;
- this[kName] = name;
- this[kIncrementsPortRef] = true;
- this[kStartedReading] = false;
- this.on('end', () => {
- if (this[kIncrementsPortRef] && --this[kPort][kWaitingStreams] === 0)
- this[kPort].unref();
- });
- }
-
- _read() {
- if (!this[kStartedReading] && this[kIncrementsPortRef]) {
- this[kStartedReading] = true;
- if (this[kPort][kWaitingStreams]++ === 0)
- this[kPort].ref();
- }
-
- this[kPort].postMessage({
- type: messageTypes.STDIO_WANTS_MORE_DATA,
- stream: this[kName]
- });
- }
-}
-
-class WritableWorkerStdio extends Writable {
- constructor(port, name) {
- super({ decodeStrings: false });
- this[kPort] = port;
- this[kName] = name;
- this[kWritableCallbacks] = [];
- }
-
- _write(chunk, encoding, cb) {
- this[kPort].postMessage({
- type: messageTypes.STDIO_PAYLOAD,
- stream: this[kName],
- chunk,
- encoding
- });
- this[kWritableCallbacks].push(cb);
- if (this[kPort][kWaitingStreams]++ === 0)
- this[kPort].ref();
- }
-
- _final(cb) {
- this[kPort].postMessage({
- type: messageTypes.STDIO_PAYLOAD,
- stream: this[kName],
- chunk: null
- });
- cb();
- }
-
- [kStdioWantsMoreDataCallback]() {
- const cbs = this[kWritableCallbacks];
- this[kWritableCallbacks] = [];
- for (const cb of cbs)
- cb();
- if ((this[kPort][kWaitingStreams] -= cbs.length) === 0)
- this[kPort].unref();
- }
-}
-
class Worker extends EventEmitter {
constructor(filename, options = {}) {
super();
@@ -314,8 +112,8 @@ class Worker extends EventEmitter {
[kOnExit](code) {
debug(`[${threadId}] hears end event for Worker ${this.threadId}`);
- MessagePortPrototype.drain.call(this[kPublicPort]);
- MessagePortPrototype.drain.call(this[kPort]);
+ drainMessagePort(this[kPublicPort]);
+ drainMessagePort(this[kPort]);
this[kDispose]();
this.emit('exit', code);
this.removeAllListeners();
@@ -421,25 +219,8 @@ class Worker extends EventEmitter {
}
}
-const workerStdio = {};
-if (!isMainThread) {
- const port = getEnvMessagePort();
- port[kWaitingStreams] = 0;
- workerStdio.stdin = new ReadableWorkerStdio(port, 'stdin');
- workerStdio.stdout = new WritableWorkerStdio(port, 'stdout');
- workerStdio.stderr = new WritableWorkerStdio(port, 'stderr');
-}
-
-let originalFatalException;
-
-function setupChild() {
- // Called during bootstrap to set up worker script execution.
- debug(`[${threadId}] is setting up worker child environment`);
- const port = getEnvMessagePort();
-
- const publicWorker = require('worker_threads');
-
- port.on('message', (message) => {
+function createMessageHandler(publicWorker, port, workerStdio) {
+ return function(message) {
if (message.type === messageTypes.LOAD_SCRIPT) {
const { filename, doEval, workerData, publicPort, hasStdin } = message;
publicWorker.parentPort = publicPort;
@@ -471,14 +252,15 @@ function setupChild() {
}
assert.fail(`Unknown worker message type ${message.type}`);
- });
-
- port.start();
+ };
+}
- originalFatalException = process._fatalException;
- process._fatalException = fatalException;
+function createWorkerFatalExeception(port) {
+ const {
+ fatalException: originalFatalException
+ } = require('internal/process/execution');
- function fatalException(error) {
+ return function(error) {
debug(`[${threadId}] gets fatal exception`);
let caught = false;
try {
@@ -505,7 +287,7 @@ function setupChild() {
process.exit();
}
- }
+ };
}
function pipeWithoutWarning(source, dest) {
@@ -521,11 +303,9 @@ function pipeWithoutWarning(source, dest) {
}
module.exports = {
- MessagePort,
- MessageChannel,
+ createMessageHandler,
+ createWorkerFatalExeception,
threadId,
Worker,
- setupChild,
- isMainThread,
- workerStdio
+ isMainThread
};
diff --git a/lib/internal/worker/io.js b/lib/internal/worker/io.js
new file mode 100644
index 0000000000..d249ba8508
--- /dev/null
+++ b/lib/internal/worker/io.js
@@ -0,0 +1,245 @@
+'use strict';
+
+const {
+ handle_onclose: handleOnCloseSymbol,
+ oninit: onInitSymbol
+} = internalBinding('symbols');
+const {
+ MessagePort,
+ MessageChannel
+} = internalBinding('messaging');
+const { threadId } = internalBinding('worker');
+
+const { Readable, Writable } = require('stream');
+const EventEmitter = require('events');
+const util = require('util');
+const debug = util.debuglog('worker');
+
+const kIncrementsPortRef = Symbol('kIncrementsPortRef');
+const kName = Symbol('kName');
+const kOnMessageListener = Symbol('kOnMessageListener');
+const kPort = Symbol('kPort');
+const kWaitingStreams = Symbol('kWaitingStreams');
+const kWritableCallbacks = Symbol('kWritableCallbacks');
+const kStartedReading = Symbol('kStartedReading');
+const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');
+
+const messageTypes = {
+ UP_AND_RUNNING: 'upAndRunning',
+ COULD_NOT_SERIALIZE_ERROR: 'couldNotSerializeError',
+ ERROR_MESSAGE: 'errorMessage',
+ STDIO_PAYLOAD: 'stdioPayload',
+ STDIO_WANTS_MORE_DATA: 'stdioWantsMoreData',
+ LOAD_SCRIPT: 'loadScript'
+};
+
+// Original drain from C++
+const originalDrain = MessagePort.prototype.drain;
+
+function drainMessagePort(port) {
+ return originalDrain.call(port);
+}
+
+// We have to mess with the MessagePort prototype a bit, so that a) we can make
+// it inherit from EventEmitter, even though it is a C++ class, and b) we do
+// not provide methods that are not present in the Browser and not documented
+// on our side (e.g. hasRef).
+// Save a copy of the original set of methods as a shallow clone.
+const MessagePortPrototype = Object.create(
+ Object.getPrototypeOf(MessagePort.prototype),
+ Object.getOwnPropertyDescriptors(MessagePort.prototype));
+// Set up the new inheritance chain.
+Object.setPrototypeOf(MessagePort, EventEmitter);
+Object.setPrototypeOf(MessagePort.prototype, EventEmitter.prototype);
+// Finally, purge methods we don't want to be public.
+delete MessagePort.prototype.stop;
+delete MessagePort.prototype.drain;
+MessagePort.prototype.ref = MessagePortPrototype.ref;
+MessagePort.prototype.unref = MessagePortPrototype.unref;
+
+// A communication channel consisting of a handle (that wraps around an
+// uv_async_t) which can receive information from other threads and emits
+// .onmessage events, and a function used for sending data to a MessagePort
+// in some other thread.
+MessagePort.prototype[kOnMessageListener] = function onmessage(payload) {
+ debug(`[${threadId}] received message`, payload);
+ // Emit the deserialized object to userland.
+ this.emit('message', payload);
+};
+
+// This is for compatibility with the Web's MessagePort API. It makes sense to
+// provide it as an `EventEmitter` in Node.js, but if somebody overrides
+// `onmessage`, we'll switch over to the Web API model.
+Object.defineProperty(MessagePort.prototype, 'onmessage', {
+ enumerable: true,
+ configurable: true,
+ get() {
+ return this[kOnMessageListener];
+ },
+ set(value) {
+ this[kOnMessageListener] = value;
+ if (typeof value === 'function') {
+ this.ref();
+ MessagePortPrototype.start.call(this);
+ } else {
+ this.unref();
+ MessagePortPrototype.stop.call(this);
+ }
+ }
+});
+
+// This is called from inside the `MessagePort` constructor.
+function oninit() {
+ setupPortReferencing(this, this, 'message');
+}
+
+Object.defineProperty(MessagePort.prototype, onInitSymbol, {
+ enumerable: true,
+ writable: false,
+ value: oninit
+});
+
+// This is called after the underlying `uv_async_t` has been closed.
+function onclose() {
+ if (typeof this.onclose === 'function') {
+ // Not part of the Web standard yet, but there aren't many reasonable
+ // alternatives in a non-EventEmitter usage setting.
+ // Refs: https://github.com/whatwg/html/issues/1766
+ this.onclose();
+ }
+ this.emit('close');
+}
+
+Object.defineProperty(MessagePort.prototype, handleOnCloseSymbol, {
+ enumerable: false,
+ writable: false,
+ value: onclose
+});
+
+MessagePort.prototype.close = function(cb) {
+ if (typeof cb === 'function')
+ this.once('close', cb);
+ MessagePortPrototype.close.call(this);
+};
+
+Object.defineProperty(MessagePort.prototype, util.inspect.custom, {
+ enumerable: false,
+ writable: false,
+ value: function inspect() { // eslint-disable-line func-name-matching
+ let ref;
+ try {
+ // This may throw when `this` does not refer to a native object,
+ // e.g. when accessing the prototype directly.
+ ref = MessagePortPrototype.hasRef.call(this);
+ } catch { return this; }
+ return Object.assign(Object.create(MessagePort.prototype),
+ ref === undefined ? {
+ active: false,
+ } : {
+ active: true,
+ refed: ref
+ },
+ this);
+ }
+});
+
+function setupPortReferencing(port, eventEmitter, eventName) {
+ // Keep track of whether there are any workerMessage listeners:
+ // If there are some, ref() the channel so it keeps the event loop alive.
+ // If there are none or all are removed, unref() the channel so the worker
+ // can shutdown gracefully.
+ port.unref();
+ eventEmitter.on('newListener', (name) => {
+ if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
+ port.ref();
+ MessagePortPrototype.start.call(port);
+ }
+ });
+ eventEmitter.on('removeListener', (name) => {
+ if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
+ MessagePortPrototype.stop.call(port);
+ port.unref();
+ }
+ });
+}
+
+
+class ReadableWorkerStdio extends Readable {
+ constructor(port, name) {
+ super();
+ this[kPort] = port;
+ this[kName] = name;
+ this[kIncrementsPortRef] = true;
+ this[kStartedReading] = false;
+ this.on('end', () => {
+ if (this[kIncrementsPortRef] && --this[kPort][kWaitingStreams] === 0)
+ this[kPort].unref();
+ });
+ }
+
+ _read() {
+ if (!this[kStartedReading] && this[kIncrementsPortRef]) {
+ this[kStartedReading] = true;
+ if (this[kPort][kWaitingStreams]++ === 0)
+ this[kPort].ref();
+ }
+
+ this[kPort].postMessage({
+ type: messageTypes.STDIO_WANTS_MORE_DATA,
+ stream: this[kName]
+ });
+ }
+}
+
+class WritableWorkerStdio extends Writable {
+ constructor(port, name) {
+ super({ decodeStrings: false });
+ this[kPort] = port;
+ this[kName] = name;
+ this[kWritableCallbacks] = [];
+ }
+
+ _write(chunk, encoding, cb) {
+ this[kPort].postMessage({
+ type: messageTypes.STDIO_PAYLOAD,
+ stream: this[kName],
+ chunk,
+ encoding
+ });
+ this[kWritableCallbacks].push(cb);
+ if (this[kPort][kWaitingStreams]++ === 0)
+ this[kPort].ref();
+ }
+
+ _final(cb) {
+ this[kPort].postMessage({
+ type: messageTypes.STDIO_PAYLOAD,
+ stream: this[kName],
+ chunk: null
+ });
+ cb();
+ }
+
+ [kStdioWantsMoreDataCallback]() {
+ const cbs = this[kWritableCallbacks];
+ this[kWritableCallbacks] = [];
+ for (const cb of cbs)
+ cb();
+ if ((this[kPort][kWaitingStreams] -= cbs.length) === 0)
+ this[kPort].unref();
+ }
+}
+
+module.exports = {
+ drainMessagePort,
+ messageTypes,
+ kPort,
+ kIncrementsPortRef,
+ kWaitingStreams,
+ kStdioWantsMoreDataCallback,
+ MessagePort,
+ MessageChannel,
+ setupPortReferencing,
+ ReadableWorkerStdio,
+ WritableWorkerStdio
+};
diff --git a/lib/worker_threads.js b/lib/worker_threads.js
index 0609650cd5..828edb6bff 100644
--- a/lib/worker_threads.js
+++ b/lib/worker_threads.js
@@ -2,12 +2,15 @@
const {
isMainThread,
- MessagePort,
- MessageChannel,
threadId,
Worker
} = require('internal/worker');
+const {
+ MessagePort,
+ MessageChannel
+} = require('internal/worker/io');
+
module.exports = {
isMainThread,
MessagePort,
diff --git a/node.gyp b/node.gyp
index 8db51f6857..d2c3560131 100644
--- a/node.gyp
+++ b/node.gyp
@@ -181,6 +181,7 @@
'lib/internal/stream_base_commons.js',
'lib/internal/vm/source_text_module.js',
'lib/internal/worker.js',
+ 'lib/internal/worker/io.js',
'lib/internal/streams/lazy_transform.js',
'lib/internal/streams/async_iterator.js',
'lib/internal/streams/buffer_list.js',