summaryrefslogtreecommitdiff
path: root/src/node_messaging.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/node_messaging.cc')
-rw-r--r--src/node_messaging.cc29
1 files changed, 26 insertions, 3 deletions
diff --git a/src/node_messaging.cc b/src/node_messaging.cc
index b56cef2d77..352749ea48 100644
--- a/src/node_messaging.cc
+++ b/src/node_messaging.cc
@@ -57,7 +57,7 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
if (!deserializer->ReadUint32(&id))
return MaybeLocal<Object>();
CHECK_LE(id, message_ports_.size());
- return message_ports_[id]->object();
+ return message_ports_[id]->object(isolate);
};
MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
@@ -436,7 +436,7 @@ MessagePort* MessagePort::New(
void MessagePort::OnMessage() {
HandleScope handle_scope(env()->isolate());
- Local<Context> context = object()->CreationContext();
+ Local<Context> context = object(env()->isolate())->CreationContext();
// data_ can only ever be modified by the owner thread, so no need to lock.
// However, the message port may be transferred while it is processing
@@ -447,6 +447,13 @@ void MessagePort::OnMessage() {
{
// Get the head of the message queue.
Mutex::ScopedLock lock(data_->mutex_);
+
+ if (stop_event_loop_) {
+ CHECK(!data_->receiving_messages_);
+ uv_stop(env()->event_loop());
+ break;
+ }
+
if (!data_->receiving_messages_)
break;
if (data_->incoming_messages_.empty())
@@ -514,8 +521,9 @@ void MessagePort::Send(Message&& message) {
void MessagePort::Send(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
+ Local<Context> context = object(env->isolate())->CreationContext();
Message msg;
- if (msg.Serialize(env, object()->CreationContext(), args[0], args[1])
+ if (msg.Serialize(env, context, args[0], args[1])
.IsNothing()) {
return;
}
@@ -548,6 +556,14 @@ void MessagePort::Stop() {
data_->receiving_messages_ = false;
}
+void MessagePort::StopEventLoop() {
+ Mutex::ScopedLock lock(data_->mutex_);
+ data_->receiving_messages_ = false;
+ stop_event_loop_ = true;
+
+ TriggerAsync();
+}
+
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
MessagePort* port;
@@ -570,6 +586,12 @@ void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
port->Stop();
}
+void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
+ MessagePort* port;
+ ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
+ port->OnMessage();
+}
+
size_t MessagePort::self_size() const {
Mutex::ScopedLock lock(data_->mutex_);
size_t sz = sizeof(*this) + sizeof(*data_);
@@ -604,6 +626,7 @@ MaybeLocal<Function> GetMessagePortConstructor(
env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage);
env->SetProtoMethod(m, "start", MessagePort::Start);
env->SetProtoMethod(m, "stop", MessagePort::Stop);
+ env->SetProtoMethod(m, "drain", MessagePort::Drain);
env->SetProtoMethod(m, "close", HandleWrap::Close);
env->SetProtoMethod(m, "unref", HandleWrap::Unref);
env->SetProtoMethod(m, "ref", HandleWrap::Ref);