summaryrefslogtreecommitdiff
path: root/src/node_messaging.cc
diff options
context:
space:
mode:
authorAnna Henningsen <anna@addaleax.net>2019-04-18 00:56:57 +0200
committerAnna Henningsen <anna@addaleax.net>2019-05-19 22:01:34 +0200
commit76f2168393d5b97aadce573415143383ccd3d78e (patch)
treedc1f2de83234e4fb1892e82d6d432f8ed54d2a98 /src/node_messaging.cc
parent13d2df530b613f5f41db3e9680fa85d55bea0f1e (diff)
downloadandroid-node-v8-76f2168393d5b97aadce573415143383ccd3d78e.tar.gz
android-node-v8-76f2168393d5b97aadce573415143383ccd3d78e.tar.bz2
android-node-v8-76f2168393d5b97aadce573415143383ccd3d78e.zip
worker: add ability to unshift message from MessagePort
In combination with Atomics, this makes it possible to implement generic synchronous functionality, e.g. `importScript()`, in Workers purely by communicating with other threads. This is a continuation of https://github.com/nodejs/node/pull/26686, where a preference for a solution was voiced that allowed reading individual messages, rather than emitting all messages through events. PR-URL: https://github.com/nodejs/node/pull/27294 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Gus Caplan <me@gus.host> Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
Diffstat (limited to 'src/node_messaging.cc')
-rw-r--r--src/node_messaging.cc118
1 files changed, 70 insertions, 48 deletions
diff --git a/src/node_messaging.cc b/src/node_messaging.cc
index 98ef42df75..ea7b48779c 100644
--- a/src/node_messaging.cc
+++ b/src/node_messaging.cc
@@ -569,6 +569,40 @@ MessagePort* MessagePort::New(
return port;
}
+MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
+ bool only_if_receiving) {
+ Message received;
+ {
+ // Get the head of the message queue.
+ Mutex::ScopedLock lock(data_->mutex_);
+
+ Debug(this, "MessagePort has message");
+
+ bool wants_message = receiving_messages_ || !only_if_receiving;
+ // We have nothing to do if:
+ // - There are no pending messages
+ // - We are not intending to receive messages, and the message we would
+ // receive is not the final "close" message.
+ if (data_->incoming_messages_.empty() ||
+ (!wants_message &&
+ !data_->incoming_messages_.front().IsCloseMessage())) {
+ return env()->no_message_symbol();
+ }
+
+ received = std::move(data_->incoming_messages_.front());
+ data_->incoming_messages_.pop_front();
+ }
+
+ if (received.IsCloseMessage()) {
+ Close();
+ return env()->no_message_symbol();
+ }
+
+ if (!env()->can_call_into_js()) return MaybeLocal<Value>();
+
+ return received.Deserialize(env(), context);
+}
+
void MessagePort::OnMessage() {
Debug(this, "Running MessagePort::OnMessage()");
HandleScope handle_scope(env()->isolate());
@@ -579,32 +613,12 @@ void MessagePort::OnMessage() {
// messages, so we need to check that this handle still owns its `data_` field
// on every iteration.
while (data_) {
- Message received;
- {
- // Get the head of the message queue.
- Mutex::ScopedLock lock(data_->mutex_);
-
- Debug(this, "MessagePort has message, receiving = %d",
- static_cast<int>(receiving_messages_));
-
- // We have nothing to do if:
- // - There are no pending messages
- // - We are not intending to receive messages, and the message we would
- // receive is not the final "close" message.
- if (data_->incoming_messages_.empty() ||
- (!receiving_messages_ &&
- !data_->incoming_messages_.front().IsCloseMessage())) {
- break;
- }
+ HandleScope handle_scope(env()->isolate());
+ Context::Scope context_scope(context);
- received = std::move(data_->incoming_messages_.front());
- data_->incoming_messages_.pop_front();
- }
-
- if (received.IsCloseMessage()) {
- Close();
- return;
- }
+ Local<Value> payload;
+ if (!ReceiveMessage(context, true).ToLocal(&payload)) break;
+ if (payload == env()->no_message_symbol()) break;
if (!env()->can_call_into_js()) {
Debug(this, "MessagePort drains queue because !can_call_into_js()");
@@ -612,28 +626,20 @@ void MessagePort::OnMessage() {
continue;
}
- {
- // Call the JS .onmessage() callback.
- HandleScope handle_scope(env()->isolate());
- Context::Scope context_scope(context);
-
- Local<Object> event;
- Local<Value> payload;
- Local<Value> cb_args[1];
- if (!received.Deserialize(env(), context).ToLocal(&payload) ||
- !env()->message_event_object_template()->NewInstance(context)
- .ToLocal(&event) ||
- event->Set(context, env()->data_string(), payload).IsNothing() ||
- event->Set(context, env()->target_string(), object()).IsNothing() ||
- (cb_args[0] = event, false) ||
- MakeCallback(env()->onmessage_string(),
- arraysize(cb_args),
- cb_args).IsEmpty()) {
- // Re-schedule OnMessage() execution in case of failure.
- if (data_)
- TriggerAsync();
- return;
- }
+ Local<Object> event;
+ Local<Value> cb_args[1];
+ if (!env()->message_event_object_template()->NewInstance(context)
+ .ToLocal(&event) ||
+ event->Set(context, env()->data_string(), payload).IsNothing() ||
+ event->Set(context, env()->target_string(), object()).IsNothing() ||
+ (cb_args[0] = event, false) ||
+ MakeCallback(env()->onmessage_string(),
+ arraysize(cb_args),
+ cb_args).IsEmpty()) {
+ // Re-schedule OnMessage() execution in case of failure.
+ if (data_)
+ TriggerAsync();
+ return;
}
}
}
@@ -754,11 +760,26 @@ void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
MessagePort* port;
- CHECK(args[0]->IsObject());
ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
port->OnMessage();
}
+void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
+ CHECK(args[0]->IsObject());
+ MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
+ if (port == nullptr) {
+ // Return 'no messages' for a closed port.
+ args.GetReturnValue().Set(
+ Environment::GetCurrent(args)->no_message_symbol());
+ return;
+ }
+
+ MaybeLocal<Value> payload =
+ port->ReceiveMessage(port->object()->CreationContext(), false);
+ if (!payload.IsEmpty())
+ args.GetReturnValue().Set(payload.ToLocalChecked());
+}
+
void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
if (!args[0]->IsObject() ||
@@ -875,6 +896,7 @@ static void InitMessaging(Local<Object> target,
// the browser equivalents do not provide them.
env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
+ env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
env->SetMethod(target, "moveMessagePortToContext",
MessagePort::MoveToContext);
}