diff options
-rw-r--r-- | doc/api/worker_threads.md | 27 | ||||
-rw-r--r-- | lib/internal/worker/io.js | 2 | ||||
-rw-r--r-- | lib/worker_threads.js | 4 | ||||
-rw-r--r-- | src/node_messaging.cc | 39 | ||||
-rw-r--r-- | src/node_messaging.h | 5 | ||||
-rw-r--r-- | test/parallel/test-worker-message-port-move.js | 69 |
6 files changed, 142 insertions, 4 deletions
diff --git a/doc/api/worker_threads.md b/doc/api/worker_threads.md index 5152cc8c4d..16787723c6 100644 --- a/doc/api/worker_threads.md +++ b/doc/api/worker_threads.md @@ -70,6 +70,30 @@ if (isMainThread) { } ``` +## worker.moveMessagePortToContext(port, contextifiedSandbox) +<!-- YAML +added: REPLACEME +--> + +* `port` {MessagePort} The message port which will be transferred. +* `contextifiedSandbox` {Object} A [contextified][] object as returned by the + `vm.createContext()` method. + +* Returns: {MessagePort} + +Transfer a `MessagePort` to a different [`vm`][] Context. The original `port` +object will be rendered unusable, and the returned `MessagePort` instance will +take its place. + +The returned `MessagePort` will be an object in the target context, and will +inherit from its global `Object` class. Objects passed to the +[`port.onmessage()`][] listener will also be created in the target context +and inherit from its global `Object` class. + +However, the created `MessagePort` will no longer inherit from +[`EventEmitter`][], and only [`port.onmessage()`][] can be used to receive +events using it. + ## worker.parentPort <!-- YAML added: v10.5.0 @@ -583,6 +607,7 @@ active handle in the event system. If the worker is already `unref()`ed calling [`Worker`]: #worker_threads_class_worker [`cluster` module]: cluster.html [`port.on('message')`]: #worker_threads_event_message +[`port.onmessage()`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort/onmessage [`port.postMessage()`]: #worker_threads_port_postmessage_value_transferlist [`process.abort()`]: process.html#process_process_abort [`process.chdir()`]: process.html#process_process_chdir_directory @@ -600,6 +625,7 @@ active handle in the event system. If the worker is already `unref()`ed calling [`require('worker_threads').threadId`]: #worker_threads_worker_threadid [`require('worker_threads').workerData`]: #worker_threads_worker_workerdata [`trace_events`]: tracing.html +[`vm`]: vm.html [`worker.on('message')`]: #worker_threads_event_message_1 [`worker.postMessage()`]: #worker_threads_worker_postmessage_value_transferlist [`worker.terminate()`]: #worker_threads_worker_terminate_callback @@ -610,4 +636,5 @@ active handle in the event system. If the worker is already `unref()`ed calling [Web Workers]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API [browser `MessagePort`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort [child processes]: child_process.html +[contextified]: vm.html#vm_what_does_it_mean_to_contextify_an_object [v8.serdes]: v8.html#v8_serialization_api diff --git a/lib/internal/worker/io.js b/lib/internal/worker/io.js index 387dc9df74..2f1352fdf9 100644 --- a/lib/internal/worker/io.js +++ b/lib/internal/worker/io.js @@ -8,6 +8,7 @@ const { MessagePort, MessageChannel, drainMessagePort, + moveMessagePortToContext, stopMessagePort } = internalBinding('messaging'); const { threadId } = internalBinding('worker'); @@ -233,6 +234,7 @@ module.exports = { kIncrementsPortRef, kWaitingStreams, kStdioWantsMoreDataCallback, + moveMessagePortToContext, MessagePort, MessageChannel, setupPortReferencing, diff --git a/lib/worker_threads.js b/lib/worker_threads.js index 2fe1a87246..722e47caf1 100644 --- a/lib/worker_threads.js +++ b/lib/worker_threads.js @@ -8,13 +8,15 @@ const { const { MessagePort, - MessageChannel + MessageChannel, + moveMessagePortToContext, } = require('internal/worker/io'); module.exports = { isMainThread, MessagePort, MessageChannel, + moveMessagePortToContext, threadId, Worker, parentPort: null, diff --git a/src/node_messaging.cc b/src/node_messaging.cc index 9b9bab0814..a76c64d743 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -2,11 +2,13 @@ #include "async_wrap-inl.h" #include "debug_utils.h" +#include "node_contextify.h" #include "node_buffer.h" #include "node_errors.h" #include "node_process.h" #include "util.h" +using node::contextify::ContextifyContext; using v8::Array; using v8::ArrayBuffer; using v8::ArrayBufferCreationMode; @@ -760,6 +762,35 @@ void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) { port->OnMessage(); } +void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) { + Environment* env = Environment::GetCurrent(args); + if (!args[0]->IsObject() || + !env->message_port_constructor_template()->HasInstance(args[0])) { + return THROW_ERR_INVALID_ARG_TYPE(env, + "First argument needs to be a MessagePort instance"); + } + MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>()); + CHECK_NOT_NULL(port); + + Local<Value> context_arg = args[1]; + ContextifyContext* context_wrapper; + if (!context_arg->IsObject() || + (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox( + env, context_arg.As<Object>())) == nullptr) { + return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument"); + } + + std::unique_ptr<MessagePortData> data; + if (!port->IsDetached()) + data = port->Detach(); + + Context::Scope context_scope(context_wrapper->context()); + MessagePort* target = + MessagePort::New(env, context_wrapper->context(), std::move(data)); + if (target != nullptr) + args.GetReturnValue().Set(target->object()); +} + void MessagePort::Entangle(MessagePort* a, MessagePort* b) { Entangle(a, b->data_.get()); } @@ -816,9 +847,9 @@ static void MessageChannel(const FunctionCallbackInfo<Value>& args) { MessagePort* port2 = MessagePort::New(env, context); MessagePort::Entangle(port1, port2); - args.This()->Set(env->context(), env->port1_string(), port1->object()) + args.This()->Set(context, env->port1_string(), port1->object()) .FromJust(); - args.This()->Set(env->context(), env->port2_string(), port2->object()) + args.This()->Set(context, env->port2_string(), port2->object()) .FromJust(); } @@ -833,7 +864,7 @@ static void InitMessaging(Local<Object> target, FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel"); Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel); templ->SetClassName(message_channel_string); - target->Set(env->context(), + target->Set(context, message_channel_string, templ->GetFunction(context).ToLocalChecked()).FromJust(); } @@ -847,6 +878,8 @@ static void InitMessaging(Local<Object> target, // the browser equivalents do not provide them. env->SetMethod(target, "stopMessagePort", MessagePort::Stop); env->SetMethod(target, "drainMessagePort", MessagePort::Drain); + env->SetMethod(target, "moveMessagePortToContext", + MessagePort::MoveToContext); } } // anonymous namespace diff --git a/src/node_messaging.h b/src/node_messaging.h index 99e344b8ca..aa2559af2c 100644 --- a/src/node_messaging.h +++ b/src/node_messaging.h @@ -158,12 +158,17 @@ class MessagePort : public HandleWrap { // Stop processing messages on this port as a receiving end. void Stop(); + /* constructor */ static void New(const v8::FunctionCallbackInfo<v8::Value>& args); + /* prototype methods */ static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args); static void Start(const v8::FunctionCallbackInfo<v8::Value>& args); static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args); static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args); + /* static */ + static void MoveToContext(const v8::FunctionCallbackInfo<v8::Value>& args); + // Turns `a` and `b` into siblings, i.e. connects the sending side of one // to the receiving side of the other. This is not thread-safe. static void Entangle(MessagePort* a, MessagePort* b); diff --git a/test/parallel/test-worker-message-port-move.js b/test/parallel/test-worker-message-port-move.js new file mode 100644 index 0000000000..6508213977 --- /dev/null +++ b/test/parallel/test-worker-message-port-move.js @@ -0,0 +1,69 @@ +/* global port */ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const vm = require('vm'); +const { + MessagePort, MessageChannel, moveMessagePortToContext +} = require('worker_threads'); + +const context = vm.createContext(); +const { port1, port2 } = new MessageChannel(); +context.port = moveMessagePortToContext(port1, context); +context.global = context; +Object.assign(context, { + global: context, + assert, + MessagePort, + MessageChannel +}); + +vm.runInContext('(' + function() { + { + assert(port.postMessage instanceof Function); + assert(port.constructor instanceof Function); + for (let obj = port; obj !== null; obj = Object.getPrototypeOf(obj)) { + for (const key of Object.getOwnPropertyNames(obj)) { + if (typeof obj[key] === 'object' && obj[key] !== null) { + assert(obj[key] instanceof Object); + } else if (typeof obj[key] === 'function') { + assert(obj[key] instanceof Function); + } + } + } + + assert(!(port instanceof MessagePort)); + assert.strictEqual(port.onmessage, undefined); + port.onmessage = function({ data }) { + assert(data instanceof Object); + port.postMessage(data); + }; + port.start(); + } + + { + let threw = false; + try { + port.postMessage(global); + } catch (e) { + assert.strictEqual(e.constructor.name, 'DOMException'); + assert(e instanceof Object); + assert(e instanceof Error); + threw = true; + } + assert(threw); + } + + { + const newDummyPort = new (port.constructor)(); + assert(!(newDummyPort instanceof MessagePort)); + assert(newDummyPort.close instanceof Function); + newDummyPort.close(); + } +} + ')()', context); + +port2.on('message', common.mustCall((msg) => { + assert(msg instanceof Object); + port2.close(); +})); +port2.postMessage({}); |