diff options
-rw-r--r-- | doc/api/worker_threads.md | 28 | ||||
-rw-r--r-- | lib/internal/worker/io.js | 11 | ||||
-rw-r--r-- | lib/worker_threads.js | 2 | ||||
-rw-r--r-- | src/env.h | 1 | ||||
-rw-r--r-- | src/node_messaging.cc | 118 | ||||
-rw-r--r-- | src/node_messaging.h | 3 | ||||
-rw-r--r-- | test/parallel/test-worker-message-port-receive-message.js | 25 |
7 files changed, 139 insertions, 49 deletions
diff --git a/doc/api/worker_threads.md b/doc/api/worker_threads.md index 77afaaec9c..3c36a0a03c 100644 --- a/doc/api/worker_threads.md +++ b/doc/api/worker_threads.md @@ -125,6 +125,34 @@ if (isMainThread) { } ``` +## worker.receiveMessageOnPort(port) +<!-- YAML +added: REPLACEME +--> + +* `port` {MessagePort} + +* Returns: {Object|undefined} + +Receive a single message from a given `MessagePort`. If no message is available, +`undefined` is returned, otherwise an object with a single `message` property +that contains the message payload, corresponding to the oldest message in the +`MessagePort`’s queue. + +```js +const { MessageChannel, receiveMessageOnPort } = require('worker_threads'); +const { port1, port2 } = new MessageChannel(); +port1.postMessage({ hello: 'world' }); + +console.log(receiveMessageOnPort(port2)); +// Prints: { message: { hello: 'world' } } +console.log(receiveMessageOnPort(port2)); +// Prints: undefined +``` + +When this function is used, no `'message'` event will be emitted and the +`onmessage` listener will not be invoked. + ## worker.SHARE_ENV <!-- YAML added: v11.14.0 diff --git a/lib/internal/worker/io.js b/lib/internal/worker/io.js index 8b7aedd39d..63c8a8d976 100644 --- a/lib/internal/worker/io.js +++ b/lib/internal/worker/io.js @@ -4,13 +4,15 @@ const { Object } = primordials; const { handle_onclose: handleOnCloseSymbol, - oninit: onInitSymbol + oninit: onInitSymbol, + no_message_symbol: noMessageSymbol } = internalBinding('symbols'); const { MessagePort, MessageChannel, drainMessagePort, moveMessagePortToContext, + receiveMessageOnPort: receiveMessageOnPort_, stopMessagePort } = internalBinding('messaging'); const { @@ -235,6 +237,12 @@ function createWorkerStdio() { }; } +function receiveMessageOnPort(port) { + const message = receiveMessageOnPort_(port); + if (message === noMessageSymbol) return undefined; + return { message }; +} + module.exports = { drainMessagePort, messageTypes, @@ -245,6 +253,7 @@ module.exports = { moveMessagePortToContext, MessagePort, MessageChannel, + receiveMessageOnPort, setupPortReferencing, ReadableWorkerStdio, WritableWorkerStdio, diff --git a/lib/worker_threads.js b/lib/worker_threads.js index 01d01e2a33..bd455edea2 100644 --- a/lib/worker_threads.js +++ b/lib/worker_threads.js @@ -11,6 +11,7 @@ const { MessagePort, MessageChannel, moveMessagePortToContext, + receiveMessageOnPort } = require('internal/worker/io'); module.exports = { @@ -18,6 +19,7 @@ module.exports = { MessagePort, MessageChannel, moveMessagePortToContext, + receiveMessageOnPort, threadId, SHARE_ENV, Worker, @@ -130,6 +130,7 @@ constexpr size_t kFsStatsBufferLength = kFsStatsFieldsNumber * 2; // for the sake of convenience. #define PER_ISOLATE_SYMBOL_PROPERTIES(V) \ V(handle_onclose_symbol, "handle_onclose") \ + V(no_message_symbol, "no_message_symbol") \ V(oninit_symbol, "oninit") \ V(owner_symbol, "owner") \ diff --git a/src/node_messaging.cc b/src/node_messaging.cc index 98ef42df75..ea7b48779c 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -569,6 +569,40 @@ MessagePort* MessagePort::New( return port; } +MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context, + bool only_if_receiving) { + Message received; + { + // Get the head of the message queue. + Mutex::ScopedLock lock(data_->mutex_); + + Debug(this, "MessagePort has message"); + + bool wants_message = receiving_messages_ || !only_if_receiving; + // We have nothing to do if: + // - There are no pending messages + // - We are not intending to receive messages, and the message we would + // receive is not the final "close" message. + if (data_->incoming_messages_.empty() || + (!wants_message && + !data_->incoming_messages_.front().IsCloseMessage())) { + return env()->no_message_symbol(); + } + + received = std::move(data_->incoming_messages_.front()); + data_->incoming_messages_.pop_front(); + } + + if (received.IsCloseMessage()) { + Close(); + return env()->no_message_symbol(); + } + + if (!env()->can_call_into_js()) return MaybeLocal<Value>(); + + return received.Deserialize(env(), context); +} + void MessagePort::OnMessage() { Debug(this, "Running MessagePort::OnMessage()"); HandleScope handle_scope(env()->isolate()); @@ -579,32 +613,12 @@ void MessagePort::OnMessage() { // messages, so we need to check that this handle still owns its `data_` field // on every iteration. while (data_) { - Message received; - { - // Get the head of the message queue. - Mutex::ScopedLock lock(data_->mutex_); - - Debug(this, "MessagePort has message, receiving = %d", - static_cast<int>(receiving_messages_)); - - // We have nothing to do if: - // - There are no pending messages - // - We are not intending to receive messages, and the message we would - // receive is not the final "close" message. - if (data_->incoming_messages_.empty() || - (!receiving_messages_ && - !data_->incoming_messages_.front().IsCloseMessage())) { - break; - } + HandleScope handle_scope(env()->isolate()); + Context::Scope context_scope(context); - received = std::move(data_->incoming_messages_.front()); - data_->incoming_messages_.pop_front(); - } - - if (received.IsCloseMessage()) { - Close(); - return; - } + Local<Value> payload; + if (!ReceiveMessage(context, true).ToLocal(&payload)) break; + if (payload == env()->no_message_symbol()) break; if (!env()->can_call_into_js()) { Debug(this, "MessagePort drains queue because !can_call_into_js()"); @@ -612,28 +626,20 @@ void MessagePort::OnMessage() { continue; } - { - // Call the JS .onmessage() callback. - HandleScope handle_scope(env()->isolate()); - Context::Scope context_scope(context); - - Local<Object> event; - Local<Value> payload; - Local<Value> cb_args[1]; - if (!received.Deserialize(env(), context).ToLocal(&payload) || - !env()->message_event_object_template()->NewInstance(context) - .ToLocal(&event) || - event->Set(context, env()->data_string(), payload).IsNothing() || - event->Set(context, env()->target_string(), object()).IsNothing() || - (cb_args[0] = event, false) || - MakeCallback(env()->onmessage_string(), - arraysize(cb_args), - cb_args).IsEmpty()) { - // Re-schedule OnMessage() execution in case of failure. - if (data_) - TriggerAsync(); - return; - } + Local<Object> event; + Local<Value> cb_args[1]; + if (!env()->message_event_object_template()->NewInstance(context) + .ToLocal(&event) || + event->Set(context, env()->data_string(), payload).IsNothing() || + event->Set(context, env()->target_string(), object()).IsNothing() || + (cb_args[0] = event, false) || + MakeCallback(env()->onmessage_string(), + arraysize(cb_args), + cb_args).IsEmpty()) { + // Re-schedule OnMessage() execution in case of failure. + if (data_) + TriggerAsync(); + return; } } } @@ -754,11 +760,26 @@ void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) { void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) { MessagePort* port; - CHECK(args[0]->IsObject()); ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>()); port->OnMessage(); } +void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) { + CHECK(args[0]->IsObject()); + MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>()); + if (port == nullptr) { + // Return 'no messages' for a closed port. + args.GetReturnValue().Set( + Environment::GetCurrent(args)->no_message_symbol()); + return; + } + + MaybeLocal<Value> payload = + port->ReceiveMessage(port->object()->CreationContext(), false); + if (!payload.IsEmpty()) + args.GetReturnValue().Set(payload.ToLocalChecked()); +} + void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); if (!args[0]->IsObject() || @@ -875,6 +896,7 @@ static void InitMessaging(Local<Object> target, // the browser equivalents do not provide them. env->SetMethod(target, "stopMessagePort", MessagePort::Stop); env->SetMethod(target, "drainMessagePort", MessagePort::Drain); + env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage); env->SetMethod(target, "moveMessagePortToContext", MessagePort::MoveToContext); } diff --git a/src/node_messaging.h b/src/node_messaging.h index 08a6798e3c..9f7929aa1c 100644 --- a/src/node_messaging.h +++ b/src/node_messaging.h @@ -163,6 +163,7 @@ class MessagePort : public HandleWrap { static void Start(const v8::FunctionCallbackInfo<v8::Value>& args); static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args); static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args); + static void ReceiveMessage(const v8::FunctionCallbackInfo<v8::Value>& args); /* static */ static void MoveToContext(const v8::FunctionCallbackInfo<v8::Value>& args); @@ -200,6 +201,8 @@ class MessagePort : public HandleWrap { void OnClose() override; void OnMessage(); void TriggerAsync(); + v8::MaybeLocal<v8::Value> ReceiveMessage(v8::Local<v8::Context> context, + bool only_if_receiving); std::unique_ptr<MessagePortData> data_ = nullptr; bool receiving_messages_ = false; diff --git a/test/parallel/test-worker-message-port-receive-message.js b/test/parallel/test-worker-message-port-receive-message.js new file mode 100644 index 0000000000..9bd8e7ed64 --- /dev/null +++ b/test/parallel/test-worker-message-port-receive-message.js @@ -0,0 +1,25 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const { MessageChannel, receiveMessageOnPort } = require('worker_threads'); + +const { port1, port2 } = new MessageChannel(); + +const message1 = { hello: 'world' }; +const message2 = { foo: 'bar' }; + +// Make sure receiveMessageOnPort() works in a FIFO way, the same way it does +// when we’re using events. +assert.deepStrictEqual(receiveMessageOnPort(port2), undefined); +port1.postMessage(message1); +port1.postMessage(message2); +assert.deepStrictEqual(receiveMessageOnPort(port2), { message: message1 }); +assert.deepStrictEqual(receiveMessageOnPort(port2), { message: message2 }); +assert.deepStrictEqual(receiveMessageOnPort(port2), undefined); +assert.deepStrictEqual(receiveMessageOnPort(port2), undefined); + +// Make sure message handlers aren’t called. +port2.on('message', common.mustNotCall()); +port1.postMessage(message1); +assert.deepStrictEqual(receiveMessageOnPort(port2), { message: message1 }); +port1.close(); |