aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/api/worker_threads.md27
-rw-r--r--lib/internal/worker/io.js2
-rw-r--r--lib/worker_threads.js4
-rw-r--r--src/node_messaging.cc39
-rw-r--r--src/node_messaging.h5
-rw-r--r--test/parallel/test-worker-message-port-move.js69
6 files changed, 142 insertions, 4 deletions
diff --git a/doc/api/worker_threads.md b/doc/api/worker_threads.md
index 5152cc8c4d..16787723c6 100644
--- a/doc/api/worker_threads.md
+++ b/doc/api/worker_threads.md
@@ -70,6 +70,30 @@ if (isMainThread) {
}
```
+## worker.moveMessagePortToContext(port, contextifiedSandbox)
+<!-- YAML
+added: REPLACEME
+-->
+
+* `port` {MessagePort} The message port which will be transferred.
+* `contextifiedSandbox` {Object} A [contextified][] object as returned by the
+ `vm.createContext()` method.
+
+* Returns: {MessagePort}
+
+Transfer a `MessagePort` to a different [`vm`][] Context. The original `port`
+object will be rendered unusable, and the returned `MessagePort` instance will
+take its place.
+
+The returned `MessagePort` will be an object in the target context, and will
+inherit from its global `Object` class. Objects passed to the
+[`port.onmessage()`][] listener will also be created in the target context
+and inherit from its global `Object` class.
+
+However, the created `MessagePort` will no longer inherit from
+[`EventEmitter`][], and only [`port.onmessage()`][] can be used to receive
+events using it.
+
## worker.parentPort
<!-- YAML
added: v10.5.0
@@ -583,6 +607,7 @@ active handle in the event system. If the worker is already `unref()`ed calling
[`Worker`]: #worker_threads_class_worker
[`cluster` module]: cluster.html
[`port.on('message')`]: #worker_threads_event_message
+[`port.onmessage()`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort/onmessage
[`port.postMessage()`]: #worker_threads_port_postmessage_value_transferlist
[`process.abort()`]: process.html#process_process_abort
[`process.chdir()`]: process.html#process_process_chdir_directory
@@ -600,6 +625,7 @@ active handle in the event system. If the worker is already `unref()`ed calling
[`require('worker_threads').threadId`]: #worker_threads_worker_threadid
[`require('worker_threads').workerData`]: #worker_threads_worker_workerdata
[`trace_events`]: tracing.html
+[`vm`]: vm.html
[`worker.on('message')`]: #worker_threads_event_message_1
[`worker.postMessage()`]: #worker_threads_worker_postmessage_value_transferlist
[`worker.terminate()`]: #worker_threads_worker_terminate_callback
@@ -610,4 +636,5 @@ active handle in the event system. If the worker is already `unref()`ed calling
[Web Workers]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API
[browser `MessagePort`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort
[child processes]: child_process.html
+[contextified]: vm.html#vm_what_does_it_mean_to_contextify_an_object
[v8.serdes]: v8.html#v8_serialization_api
diff --git a/lib/internal/worker/io.js b/lib/internal/worker/io.js
index 387dc9df74..2f1352fdf9 100644
--- a/lib/internal/worker/io.js
+++ b/lib/internal/worker/io.js
@@ -8,6 +8,7 @@ const {
MessagePort,
MessageChannel,
drainMessagePort,
+ moveMessagePortToContext,
stopMessagePort
} = internalBinding('messaging');
const { threadId } = internalBinding('worker');
@@ -233,6 +234,7 @@ module.exports = {
kIncrementsPortRef,
kWaitingStreams,
kStdioWantsMoreDataCallback,
+ moveMessagePortToContext,
MessagePort,
MessageChannel,
setupPortReferencing,
diff --git a/lib/worker_threads.js b/lib/worker_threads.js
index 2fe1a87246..722e47caf1 100644
--- a/lib/worker_threads.js
+++ b/lib/worker_threads.js
@@ -8,13 +8,15 @@ const {
const {
MessagePort,
- MessageChannel
+ MessageChannel,
+ moveMessagePortToContext,
} = require('internal/worker/io');
module.exports = {
isMainThread,
MessagePort,
MessageChannel,
+ moveMessagePortToContext,
threadId,
Worker,
parentPort: null,
diff --git a/src/node_messaging.cc b/src/node_messaging.cc
index 9b9bab0814..a76c64d743 100644
--- a/src/node_messaging.cc
+++ b/src/node_messaging.cc
@@ -2,11 +2,13 @@
#include "async_wrap-inl.h"
#include "debug_utils.h"
+#include "node_contextify.h"
#include "node_buffer.h"
#include "node_errors.h"
#include "node_process.h"
#include "util.h"
+using node::contextify::ContextifyContext;
using v8::Array;
using v8::ArrayBuffer;
using v8::ArrayBufferCreationMode;
@@ -760,6 +762,35 @@ void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
port->OnMessage();
}
+void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
+ Environment* env = Environment::GetCurrent(args);
+ if (!args[0]->IsObject() ||
+ !env->message_port_constructor_template()->HasInstance(args[0])) {
+ return THROW_ERR_INVALID_ARG_TYPE(env,
+ "First argument needs to be a MessagePort instance");
+ }
+ MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
+ CHECK_NOT_NULL(port);
+
+ Local<Value> context_arg = args[1];
+ ContextifyContext* context_wrapper;
+ if (!context_arg->IsObject() ||
+ (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
+ env, context_arg.As<Object>())) == nullptr) {
+ return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
+ }
+
+ std::unique_ptr<MessagePortData> data;
+ if (!port->IsDetached())
+ data = port->Detach();
+
+ Context::Scope context_scope(context_wrapper->context());
+ MessagePort* target =
+ MessagePort::New(env, context_wrapper->context(), std::move(data));
+ if (target != nullptr)
+ args.GetReturnValue().Set(target->object());
+}
+
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
Entangle(a, b->data_.get());
}
@@ -816,9 +847,9 @@ static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
MessagePort* port2 = MessagePort::New(env, context);
MessagePort::Entangle(port1, port2);
- args.This()->Set(env->context(), env->port1_string(), port1->object())
+ args.This()->Set(context, env->port1_string(), port1->object())
.FromJust();
- args.This()->Set(env->context(), env->port2_string(), port2->object())
+ args.This()->Set(context, env->port2_string(), port2->object())
.FromJust();
}
@@ -833,7 +864,7 @@ static void InitMessaging(Local<Object> target,
FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel");
Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel);
templ->SetClassName(message_channel_string);
- target->Set(env->context(),
+ target->Set(context,
message_channel_string,
templ->GetFunction(context).ToLocalChecked()).FromJust();
}
@@ -847,6 +878,8 @@ static void InitMessaging(Local<Object> target,
// the browser equivalents do not provide them.
env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
+ env->SetMethod(target, "moveMessagePortToContext",
+ MessagePort::MoveToContext);
}
} // anonymous namespace
diff --git a/src/node_messaging.h b/src/node_messaging.h
index 99e344b8ca..aa2559af2c 100644
--- a/src/node_messaging.h
+++ b/src/node_messaging.h
@@ -158,12 +158,17 @@ class MessagePort : public HandleWrap {
// Stop processing messages on this port as a receiving end.
void Stop();
+ /* constructor */
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
+ /* prototype methods */
static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args);
+ /* static */
+ static void MoveToContext(const v8::FunctionCallbackInfo<v8::Value>& args);
+
// Turns `a` and `b` into siblings, i.e. connects the sending side of one
// to the receiving side of the other. This is not thread-safe.
static void Entangle(MessagePort* a, MessagePort* b);
diff --git a/test/parallel/test-worker-message-port-move.js b/test/parallel/test-worker-message-port-move.js
new file mode 100644
index 0000000000..6508213977
--- /dev/null
+++ b/test/parallel/test-worker-message-port-move.js
@@ -0,0 +1,69 @@
+/* global port */
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const vm = require('vm');
+const {
+ MessagePort, MessageChannel, moveMessagePortToContext
+} = require('worker_threads');
+
+const context = vm.createContext();
+const { port1, port2 } = new MessageChannel();
+context.port = moveMessagePortToContext(port1, context);
+context.global = context;
+Object.assign(context, {
+ global: context,
+ assert,
+ MessagePort,
+ MessageChannel
+});
+
+vm.runInContext('(' + function() {
+ {
+ assert(port.postMessage instanceof Function);
+ assert(port.constructor instanceof Function);
+ for (let obj = port; obj !== null; obj = Object.getPrototypeOf(obj)) {
+ for (const key of Object.getOwnPropertyNames(obj)) {
+ if (typeof obj[key] === 'object' && obj[key] !== null) {
+ assert(obj[key] instanceof Object);
+ } else if (typeof obj[key] === 'function') {
+ assert(obj[key] instanceof Function);
+ }
+ }
+ }
+
+ assert(!(port instanceof MessagePort));
+ assert.strictEqual(port.onmessage, undefined);
+ port.onmessage = function({ data }) {
+ assert(data instanceof Object);
+ port.postMessage(data);
+ };
+ port.start();
+ }
+
+ {
+ let threw = false;
+ try {
+ port.postMessage(global);
+ } catch (e) {
+ assert.strictEqual(e.constructor.name, 'DOMException');
+ assert(e instanceof Object);
+ assert(e instanceof Error);
+ threw = true;
+ }
+ assert(threw);
+ }
+
+ {
+ const newDummyPort = new (port.constructor)();
+ assert(!(newDummyPort instanceof MessagePort));
+ assert(newDummyPort.close instanceof Function);
+ newDummyPort.close();
+ }
+} + ')()', context);
+
+port2.on('message', common.mustCall((msg) => {
+ assert(msg instanceof Object);
+ port2.close();
+}));
+port2.postMessage({});