diff options
author | Anna Henningsen <anna@addaleax.net> | 2019-04-18 00:56:57 +0200 |
---|---|---|
committer | Anna Henningsen <anna@addaleax.net> | 2019-05-19 22:01:34 +0200 |
commit | 76f2168393d5b97aadce573415143383ccd3d78e (patch) | |
tree | dc1f2de83234e4fb1892e82d6d432f8ed54d2a98 /src/node_messaging.cc | |
parent | 13d2df530b613f5f41db3e9680fa85d55bea0f1e (diff) | |
download | android-node-v8-76f2168393d5b97aadce573415143383ccd3d78e.tar.gz android-node-v8-76f2168393d5b97aadce573415143383ccd3d78e.tar.bz2 android-node-v8-76f2168393d5b97aadce573415143383ccd3d78e.zip |
worker: add ability to unshift message from MessagePort
In combination with Atomics, this makes it possible to implement
generic synchronous functionality, e.g. `importScript()`, in Workers
purely by communicating with other threads.
This is a continuation of https://github.com/nodejs/node/pull/26686,
where a preference for a solution was voiced that allowed reading
individual messages, rather than emitting all messages through events.
PR-URL: https://github.com/nodejs/node/pull/27294
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Gus Caplan <me@gus.host>
Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
Diffstat (limited to 'src/node_messaging.cc')
-rw-r--r-- | src/node_messaging.cc | 118 |
1 files changed, 70 insertions, 48 deletions
diff --git a/src/node_messaging.cc b/src/node_messaging.cc index 98ef42df75..ea7b48779c 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -569,6 +569,40 @@ MessagePort* MessagePort::New( return port; } +MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context, + bool only_if_receiving) { + Message received; + { + // Get the head of the message queue. + Mutex::ScopedLock lock(data_->mutex_); + + Debug(this, "MessagePort has message"); + + bool wants_message = receiving_messages_ || !only_if_receiving; + // We have nothing to do if: + // - There are no pending messages + // - We are not intending to receive messages, and the message we would + // receive is not the final "close" message. + if (data_->incoming_messages_.empty() || + (!wants_message && + !data_->incoming_messages_.front().IsCloseMessage())) { + return env()->no_message_symbol(); + } + + received = std::move(data_->incoming_messages_.front()); + data_->incoming_messages_.pop_front(); + } + + if (received.IsCloseMessage()) { + Close(); + return env()->no_message_symbol(); + } + + if (!env()->can_call_into_js()) return MaybeLocal<Value>(); + + return received.Deserialize(env(), context); +} + void MessagePort::OnMessage() { Debug(this, "Running MessagePort::OnMessage()"); HandleScope handle_scope(env()->isolate()); @@ -579,32 +613,12 @@ void MessagePort::OnMessage() { // messages, so we need to check that this handle still owns its `data_` field // on every iteration. while (data_) { - Message received; - { - // Get the head of the message queue. - Mutex::ScopedLock lock(data_->mutex_); - - Debug(this, "MessagePort has message, receiving = %d", - static_cast<int>(receiving_messages_)); - - // We have nothing to do if: - // - There are no pending messages - // - We are not intending to receive messages, and the message we would - // receive is not the final "close" message. - if (data_->incoming_messages_.empty() || - (!receiving_messages_ && - !data_->incoming_messages_.front().IsCloseMessage())) { - break; - } + HandleScope handle_scope(env()->isolate()); + Context::Scope context_scope(context); - received = std::move(data_->incoming_messages_.front()); - data_->incoming_messages_.pop_front(); - } - - if (received.IsCloseMessage()) { - Close(); - return; - } + Local<Value> payload; + if (!ReceiveMessage(context, true).ToLocal(&payload)) break; + if (payload == env()->no_message_symbol()) break; if (!env()->can_call_into_js()) { Debug(this, "MessagePort drains queue because !can_call_into_js()"); @@ -612,28 +626,20 @@ void MessagePort::OnMessage() { continue; } - { - // Call the JS .onmessage() callback. - HandleScope handle_scope(env()->isolate()); - Context::Scope context_scope(context); - - Local<Object> event; - Local<Value> payload; - Local<Value> cb_args[1]; - if (!received.Deserialize(env(), context).ToLocal(&payload) || - !env()->message_event_object_template()->NewInstance(context) - .ToLocal(&event) || - event->Set(context, env()->data_string(), payload).IsNothing() || - event->Set(context, env()->target_string(), object()).IsNothing() || - (cb_args[0] = event, false) || - MakeCallback(env()->onmessage_string(), - arraysize(cb_args), - cb_args).IsEmpty()) { - // Re-schedule OnMessage() execution in case of failure. - if (data_) - TriggerAsync(); - return; - } + Local<Object> event; + Local<Value> cb_args[1]; + if (!env()->message_event_object_template()->NewInstance(context) + .ToLocal(&event) || + event->Set(context, env()->data_string(), payload).IsNothing() || + event->Set(context, env()->target_string(), object()).IsNothing() || + (cb_args[0] = event, false) || + MakeCallback(env()->onmessage_string(), + arraysize(cb_args), + cb_args).IsEmpty()) { + // Re-schedule OnMessage() execution in case of failure. + if (data_) + TriggerAsync(); + return; } } } @@ -754,11 +760,26 @@ void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) { void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) { MessagePort* port; - CHECK(args[0]->IsObject()); ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>()); port->OnMessage(); } +void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) { + CHECK(args[0]->IsObject()); + MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>()); + if (port == nullptr) { + // Return 'no messages' for a closed port. + args.GetReturnValue().Set( + Environment::GetCurrent(args)->no_message_symbol()); + return; + } + + MaybeLocal<Value> payload = + port->ReceiveMessage(port->object()->CreationContext(), false); + if (!payload.IsEmpty()) + args.GetReturnValue().Set(payload.ToLocalChecked()); +} + void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); if (!args[0]->IsObject() || @@ -875,6 +896,7 @@ static void InitMessaging(Local<Object> target, // the browser equivalents do not provide them. env->SetMethod(target, "stopMessagePort", MessagePort::Stop); env->SetMethod(target, "drainMessagePort", MessagePort::Drain); + env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage); env->SetMethod(target, "moveMessagePortToContext", MessagePort::MoveToContext); } |