summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/api/worker_threads.md28
-rw-r--r--lib/internal/worker/io.js11
-rw-r--r--lib/worker_threads.js2
-rw-r--r--src/env.h1
-rw-r--r--src/node_messaging.cc118
-rw-r--r--src/node_messaging.h3
-rw-r--r--test/parallel/test-worker-message-port-receive-message.js25
7 files changed, 139 insertions, 49 deletions
diff --git a/doc/api/worker_threads.md b/doc/api/worker_threads.md
index 77afaaec9c..3c36a0a03c 100644
--- a/doc/api/worker_threads.md
+++ b/doc/api/worker_threads.md
@@ -125,6 +125,34 @@ if (isMainThread) {
}
```
+## worker.receiveMessageOnPort(port)
+<!-- YAML
+added: REPLACEME
+-->
+
+* `port` {MessagePort}
+
+* Returns: {Object|undefined}
+
+Receive a single message from a given `MessagePort`. If no message is available,
+`undefined` is returned, otherwise an object with a single `message` property
+that contains the message payload, corresponding to the oldest message in the
+`MessagePort`’s queue.
+
+```js
+const { MessageChannel, receiveMessageOnPort } = require('worker_threads');
+const { port1, port2 } = new MessageChannel();
+port1.postMessage({ hello: 'world' });
+
+console.log(receiveMessageOnPort(port2));
+// Prints: { message: { hello: 'world' } }
+console.log(receiveMessageOnPort(port2));
+// Prints: undefined
+```
+
+When this function is used, no `'message'` event will be emitted and the
+`onmessage` listener will not be invoked.
+
## worker.SHARE_ENV
<!-- YAML
added: v11.14.0
diff --git a/lib/internal/worker/io.js b/lib/internal/worker/io.js
index 8b7aedd39d..63c8a8d976 100644
--- a/lib/internal/worker/io.js
+++ b/lib/internal/worker/io.js
@@ -4,13 +4,15 @@ const { Object } = primordials;
const {
handle_onclose: handleOnCloseSymbol,
- oninit: onInitSymbol
+ oninit: onInitSymbol,
+ no_message_symbol: noMessageSymbol
} = internalBinding('symbols');
const {
MessagePort,
MessageChannel,
drainMessagePort,
moveMessagePortToContext,
+ receiveMessageOnPort: receiveMessageOnPort_,
stopMessagePort
} = internalBinding('messaging');
const {
@@ -235,6 +237,12 @@ function createWorkerStdio() {
};
}
+function receiveMessageOnPort(port) {
+ const message = receiveMessageOnPort_(port);
+ if (message === noMessageSymbol) return undefined;
+ return { message };
+}
+
module.exports = {
drainMessagePort,
messageTypes,
@@ -245,6 +253,7 @@ module.exports = {
moveMessagePortToContext,
MessagePort,
MessageChannel,
+ receiveMessageOnPort,
setupPortReferencing,
ReadableWorkerStdio,
WritableWorkerStdio,
diff --git a/lib/worker_threads.js b/lib/worker_threads.js
index 01d01e2a33..bd455edea2 100644
--- a/lib/worker_threads.js
+++ b/lib/worker_threads.js
@@ -11,6 +11,7 @@ const {
MessagePort,
MessageChannel,
moveMessagePortToContext,
+ receiveMessageOnPort
} = require('internal/worker/io');
module.exports = {
@@ -18,6 +19,7 @@ module.exports = {
MessagePort,
MessageChannel,
moveMessagePortToContext,
+ receiveMessageOnPort,
threadId,
SHARE_ENV,
Worker,
diff --git a/src/env.h b/src/env.h
index 70d335f3c5..7b7e9f6132 100644
--- a/src/env.h
+++ b/src/env.h
@@ -130,6 +130,7 @@ constexpr size_t kFsStatsBufferLength = kFsStatsFieldsNumber * 2;
// for the sake of convenience.
#define PER_ISOLATE_SYMBOL_PROPERTIES(V) \
V(handle_onclose_symbol, "handle_onclose") \
+ V(no_message_symbol, "no_message_symbol") \
V(oninit_symbol, "oninit") \
V(owner_symbol, "owner") \
diff --git a/src/node_messaging.cc b/src/node_messaging.cc
index 98ef42df75..ea7b48779c 100644
--- a/src/node_messaging.cc
+++ b/src/node_messaging.cc
@@ -569,6 +569,40 @@ MessagePort* MessagePort::New(
return port;
}
+MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
+ bool only_if_receiving) {
+ Message received;
+ {
+ // Get the head of the message queue.
+ Mutex::ScopedLock lock(data_->mutex_);
+
+ Debug(this, "MessagePort has message");
+
+ bool wants_message = receiving_messages_ || !only_if_receiving;
+ // We have nothing to do if:
+ // - There are no pending messages
+ // - We are not intending to receive messages, and the message we would
+ // receive is not the final "close" message.
+ if (data_->incoming_messages_.empty() ||
+ (!wants_message &&
+ !data_->incoming_messages_.front().IsCloseMessage())) {
+ return env()->no_message_symbol();
+ }
+
+ received = std::move(data_->incoming_messages_.front());
+ data_->incoming_messages_.pop_front();
+ }
+
+ if (received.IsCloseMessage()) {
+ Close();
+ return env()->no_message_symbol();
+ }
+
+ if (!env()->can_call_into_js()) return MaybeLocal<Value>();
+
+ return received.Deserialize(env(), context);
+}
+
void MessagePort::OnMessage() {
Debug(this, "Running MessagePort::OnMessage()");
HandleScope handle_scope(env()->isolate());
@@ -579,32 +613,12 @@ void MessagePort::OnMessage() {
// messages, so we need to check that this handle still owns its `data_` field
// on every iteration.
while (data_) {
- Message received;
- {
- // Get the head of the message queue.
- Mutex::ScopedLock lock(data_->mutex_);
-
- Debug(this, "MessagePort has message, receiving = %d",
- static_cast<int>(receiving_messages_));
-
- // We have nothing to do if:
- // - There are no pending messages
- // - We are not intending to receive messages, and the message we would
- // receive is not the final "close" message.
- if (data_->incoming_messages_.empty() ||
- (!receiving_messages_ &&
- !data_->incoming_messages_.front().IsCloseMessage())) {
- break;
- }
+ HandleScope handle_scope(env()->isolate());
+ Context::Scope context_scope(context);
- received = std::move(data_->incoming_messages_.front());
- data_->incoming_messages_.pop_front();
- }
-
- if (received.IsCloseMessage()) {
- Close();
- return;
- }
+ Local<Value> payload;
+ if (!ReceiveMessage(context, true).ToLocal(&payload)) break;
+ if (payload == env()->no_message_symbol()) break;
if (!env()->can_call_into_js()) {
Debug(this, "MessagePort drains queue because !can_call_into_js()");
@@ -612,28 +626,20 @@ void MessagePort::OnMessage() {
continue;
}
- {
- // Call the JS .onmessage() callback.
- HandleScope handle_scope(env()->isolate());
- Context::Scope context_scope(context);
-
- Local<Object> event;
- Local<Value> payload;
- Local<Value> cb_args[1];
- if (!received.Deserialize(env(), context).ToLocal(&payload) ||
- !env()->message_event_object_template()->NewInstance(context)
- .ToLocal(&event) ||
- event->Set(context, env()->data_string(), payload).IsNothing() ||
- event->Set(context, env()->target_string(), object()).IsNothing() ||
- (cb_args[0] = event, false) ||
- MakeCallback(env()->onmessage_string(),
- arraysize(cb_args),
- cb_args).IsEmpty()) {
- // Re-schedule OnMessage() execution in case of failure.
- if (data_)
- TriggerAsync();
- return;
- }
+ Local<Object> event;
+ Local<Value> cb_args[1];
+ if (!env()->message_event_object_template()->NewInstance(context)
+ .ToLocal(&event) ||
+ event->Set(context, env()->data_string(), payload).IsNothing() ||
+ event->Set(context, env()->target_string(), object()).IsNothing() ||
+ (cb_args[0] = event, false) ||
+ MakeCallback(env()->onmessage_string(),
+ arraysize(cb_args),
+ cb_args).IsEmpty()) {
+ // Re-schedule OnMessage() execution in case of failure.
+ if (data_)
+ TriggerAsync();
+ return;
}
}
}
@@ -754,11 +760,26 @@ void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
MessagePort* port;
- CHECK(args[0]->IsObject());
ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
port->OnMessage();
}
+void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
+ CHECK(args[0]->IsObject());
+ MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
+ if (port == nullptr) {
+ // Return 'no messages' for a closed port.
+ args.GetReturnValue().Set(
+ Environment::GetCurrent(args)->no_message_symbol());
+ return;
+ }
+
+ MaybeLocal<Value> payload =
+ port->ReceiveMessage(port->object()->CreationContext(), false);
+ if (!payload.IsEmpty())
+ args.GetReturnValue().Set(payload.ToLocalChecked());
+}
+
void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
if (!args[0]->IsObject() ||
@@ -875,6 +896,7 @@ static void InitMessaging(Local<Object> target,
// the browser equivalents do not provide them.
env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
+ env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
env->SetMethod(target, "moveMessagePortToContext",
MessagePort::MoveToContext);
}
diff --git a/src/node_messaging.h b/src/node_messaging.h
index 08a6798e3c..9f7929aa1c 100644
--- a/src/node_messaging.h
+++ b/src/node_messaging.h
@@ -163,6 +163,7 @@ class MessagePort : public HandleWrap {
static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void ReceiveMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
/* static */
static void MoveToContext(const v8::FunctionCallbackInfo<v8::Value>& args);
@@ -200,6 +201,8 @@ class MessagePort : public HandleWrap {
void OnClose() override;
void OnMessage();
void TriggerAsync();
+ v8::MaybeLocal<v8::Value> ReceiveMessage(v8::Local<v8::Context> context,
+ bool only_if_receiving);
std::unique_ptr<MessagePortData> data_ = nullptr;
bool receiving_messages_ = false;
diff --git a/test/parallel/test-worker-message-port-receive-message.js b/test/parallel/test-worker-message-port-receive-message.js
new file mode 100644
index 0000000000..9bd8e7ed64
--- /dev/null
+++ b/test/parallel/test-worker-message-port-receive-message.js
@@ -0,0 +1,25 @@
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const { MessageChannel, receiveMessageOnPort } = require('worker_threads');
+
+const { port1, port2 } = new MessageChannel();
+
+const message1 = { hello: 'world' };
+const message2 = { foo: 'bar' };
+
+// Make sure receiveMessageOnPort() works in a FIFO way, the same way it does
+// when we’re using events.
+assert.deepStrictEqual(receiveMessageOnPort(port2), undefined);
+port1.postMessage(message1);
+port1.postMessage(message2);
+assert.deepStrictEqual(receiveMessageOnPort(port2), { message: message1 });
+assert.deepStrictEqual(receiveMessageOnPort(port2), { message: message2 });
+assert.deepStrictEqual(receiveMessageOnPort(port2), undefined);
+assert.deepStrictEqual(receiveMessageOnPort(port2), undefined);
+
+// Make sure message handlers aren’t called.
+port2.on('message', common.mustNotCall());
+port1.postMessage(message1);
+assert.deepStrictEqual(receiveMessageOnPort(port2), { message: message1 });
+port1.close();