summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/node_messaging.cc99
-rw-r--r--src/node_messaging.h31
-rw-r--r--test/parallel/test-worker-message-port-transfer-closed.js54
-rw-r--r--test/parallel/test-worker-message-port-transfer-self.js33
-rw-r--r--test/parallel/test-worker-message-port-transfer-target.js24
5 files changed, 213 insertions, 28 deletions
diff --git a/src/node_messaging.cc b/src/node_messaging.cc
index 7ddd3c4165..712add06d3 100644
--- a/src/node_messaging.cc
+++ b/src/node_messaging.cc
@@ -144,7 +144,7 @@ void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {
namespace {
-void ThrowDataCloneError(Environment* env, Local<String> message) {
+void ThrowDataCloneException(Environment* env, Local<String> message) {
Local<Value> argv[] = {
message,
FIXED_ONE_BYTE_STRING(env->isolate(), "DataCloneError")
@@ -168,7 +168,7 @@ class SerializerDelegate : public ValueSerializer::Delegate {
: env_(env), context_(context), msg_(m) {}
void ThrowDataCloneError(Local<String> message) override {
- ThrowDataCloneError(env_, message);
+ ThrowDataCloneException(env_, message);
}
Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
@@ -239,7 +239,8 @@ class SerializerDelegate : public ValueSerializer::Delegate {
Maybe<bool> Message::Serialize(Environment* env,
Local<Context> context,
Local<Value> input,
- Local<Value> transfer_list_v) {
+ Local<Value> transfer_list_v,
+ Local<Object> source_port) {
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(context);
@@ -273,8 +274,23 @@ Maybe<bool> Message::Serialize(Environment* env,
continue;
} else if (env->message_port_constructor_template()
->HasInstance(entry)) {
+ // Check if the source MessagePort is being transferred.
+ if (!source_port.IsEmpty() && entry == source_port) {
+ ThrowDataCloneException(
+ env,
+ FIXED_ONE_BYTE_STRING(env->isolate(),
+ "Transfer list contains source port"));
+ return Nothing<bool>();
+ }
MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
- CHECK_NE(port, nullptr);
+ if (port == nullptr || port->IsDetached()) {
+ ThrowDataCloneException(
+ env,
+ FIXED_ONE_BYTE_STRING(
+ env->isolate(),
+ "MessagePort in transfer list is already detached"));
+ return Nothing<bool>();
+ }
delegate.ports_.push_back(port);
continue;
}
@@ -410,6 +426,10 @@ uv_async_t* MessagePort::async() {
return reinterpret_cast<uv_async_t*>(GetHandle());
}
+bool MessagePort::IsDetached() const {
+ return data_ == nullptr || IsHandleClosing();
+}
+
void MessagePort::TriggerAsync() {
if (IsHandleClosing()) return;
CHECK_EQ(uv_async_send(async()), 0);
@@ -552,36 +572,69 @@ std::unique_ptr<MessagePortData> MessagePort::Detach() {
}
-void MessagePort::Send(Message&& message) {
- Mutex::ScopedLock lock(*data_->sibling_mutex_);
- if (data_->sibling_ == nullptr)
- return;
- data_->sibling_->AddToIncomingQueue(std::move(message));
-}
+Maybe<bool> MessagePort::PostMessage(Environment* env,
+ Local<Value> message_v,
+ Local<Value> transfer_v) {
+ Isolate* isolate = env->isolate();
+ Local<Object> obj = object(isolate);
+ Local<Context> context = obj->CreationContext();
-void MessagePort::Send(const FunctionCallbackInfo<Value>& args) {
- Environment* env = Environment::GetCurrent(args);
- Local<Context> context = object(env->isolate())->CreationContext();
Message msg;
- if (msg.Serialize(env, context, args[0], args[1])
- .IsNothing()) {
- return;
+
+ // Per spec, we need to both check if transfer list has the source port, and
+ // serialize the input message, even if the MessagePort is closed or detached.
+
+ Maybe<bool> serialization_maybe =
+ msg.Serialize(env, context, message_v, transfer_v, obj);
+ if (data_ == nullptr) {
+ return serialization_maybe;
+ }
+ if (serialization_maybe.IsNothing()) {
+ return Nothing<bool>();
+ }
+
+ Mutex::ScopedLock lock(*data_->sibling_mutex_);
+ bool doomed = false;
+
+ // Check if the target port is posted to itself.
+ if (data_->sibling_ != nullptr) {
+ for (const auto& port_data : msg.message_ports()) {
+ if (data_->sibling_ == port_data.get()) {
+ doomed = true;
+ ProcessEmitWarning(env, "The target port was posted to itself, and "
+ "the communication channel was lost");
+ break;
+ }
+ }
}
- Send(std::move(msg));
+
+ if (data_->sibling_ == nullptr || doomed)
+ return Just(true);
+
+ data_->sibling_->AddToIncomingQueue(std::move(msg));
+ return Just(true);
}
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
- MessagePort* port;
- ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
- if (!port->data_) {
- return THROW_ERR_CLOSED_MESSAGE_PORT(env);
- }
if (args.Length() == 0) {
return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
"MessagePort.postMessage");
}
- port->Send(args);
+
+ MessagePort* port = Unwrap<MessagePort>(args.This());
+ // Even if the backing MessagePort object has already been deleted, we still
+ // want to serialize the message to ensure spec-compliant behavior w.r.t.
+ // transfers.
+ if (port == nullptr) {
+ Message msg;
+ Local<Object> obj = args.This();
+ Local<Context> context = obj->CreationContext();
+ USE(msg.Serialize(env, context, args[0], args[1], obj));
+ return;
+ }
+
+ port->PostMessage(env, args[0], args[1]);
}
void MessagePort::Start() {
diff --git a/src/node_messaging.h b/src/node_messaging.h
index 28122c526c..62ae633b9e 100644
--- a/src/node_messaging.h
+++ b/src/node_messaging.h
@@ -32,10 +32,14 @@ class Message {
// Serialize a JS value, and optionally transfer objects, into this message.
// The Message object retains ownership of all transferred objects until
// deserialization.
+ // The source_port parameter, if provided, will make Serialize() throw a
+ // "DataCloneError" DOMException if source_port is found in transfer_list.
v8::Maybe<bool> Serialize(Environment* env,
v8::Local<v8::Context> context,
v8::Local<v8::Value> input,
- v8::Local<v8::Value> transfer_list);
+ v8::Local<v8::Value> transfer_list,
+ v8::Local<v8::Object> source_port =
+ v8::Local<v8::Object>());
// Internal method of Message that is called when a new SharedArrayBuffer
// object is encountered in the incoming value's structure.
@@ -44,6 +48,13 @@ class Message {
// and that transfers ownership of `data` to this message.
void AddMessagePort(std::unique_ptr<MessagePortData>&& data);
+ // The MessagePorts that will be transferred, as recorded by Serialize().
+ // Used for warning user about posting the target MessagePort to itself,
+ // which will as a side effect destroy the communication channel.
+ const std::vector<std::unique_ptr<MessagePortData>>& message_ports() const {
+ return message_ports_;
+ }
+
private:
MallocedBuffer<char> main_message_buf_;
std::vector<MallocedBuffer<char>> array_buffer_contents_;
@@ -122,10 +133,11 @@ class MessagePort : public HandleWrap {
std::unique_ptr<MessagePortData> data = nullptr);
// Send a message, i.e. deliver it into the sibling's incoming queue.
- // If there is no sibling, i.e. this port is closed,
- // this message is silently discarded.
- void Send(Message&& message);
- void Send(const v8::FunctionCallbackInfo<v8::Value>& args);
+ // If this port is closed, or if there is no sibling, this message is
+ // serialized with transfers, then silently discarded.
+ v8::Maybe<bool> PostMessage(Environment* env,
+ v8::Local<v8::Value> message,
+ v8::Local<v8::Value> transfer);
// Deliver a single message into this port's incoming queue.
void AddToIncomingQueue(Message&& message);
@@ -157,6 +169,15 @@ class MessagePort : public HandleWrap {
void Close(
v8::Local<v8::Value> close_callback = v8::Local<v8::Value>()) override;
+ // Returns true if either data_ has been freed, or if the handle is being
+ // closed. Equivalent to the [[Detached]] internal slot in the HTML Standard.
+ //
+ // If checking if a JavaScript MessagePort object is detached, this method
+ // alone is often not enough, since the backing C++ MessagePort object may
+ // have been deleted already. For all intents and purposes, an object with a
+ // NULL pointer to the C++ MessagePort object is also detached.
+ inline bool IsDetached() const;
+
size_t self_size() const override;
private:
diff --git a/test/parallel/test-worker-message-port-transfer-closed.js b/test/parallel/test-worker-message-port-transfer-closed.js
new file mode 100644
index 0000000000..435a3789fc
--- /dev/null
+++ b/test/parallel/test-worker-message-port-transfer-closed.js
@@ -0,0 +1,54 @@
+// Flags: --experimental-worker
+'use strict';
+
+const common = require('../common');
+const assert = require('assert');
+const { MessageChannel } = require('worker_threads');
+
+// This tests various behaviors around transferring MessagePorts with closing
+// or closed handles.
+
+const { port1, port2 } = new MessageChannel();
+
+const arrayBuf = new ArrayBuffer(10);
+port1.onmessage = common.mustNotCall();
+port2.onmessage = common.mustNotCall();
+
+function testSingle(closedPort, potentiallyOpenPort) {
+ assert.throws(common.mustCall(() => {
+ potentiallyOpenPort.postMessage(null, [arrayBuf, closedPort]);
+ }), common.mustCall((err) => {
+ assert.strictEqual(err.name, 'DataCloneError');
+ assert.strictEqual(err.message,
+ 'MessagePort in transfer list is already detached');
+ assert.strictEqual(err.code, 25);
+ assert.ok(err instanceof Error);
+
+ const DOMException = err.constructor;
+ assert.ok(err instanceof DOMException);
+ assert.strictEqual(DOMException.name, 'DOMException');
+
+ return true;
+ }));
+
+ // arrayBuf must not be transferred, even though it is present earlier in the
+ // transfer list than the closedPort.
+ assert.strictEqual(arrayBuf.byteLength, 10);
+}
+
+function testBothClosed() {
+ testSingle(port1, port2);
+ testSingle(port2, port1);
+}
+
+// Even though the port handles may not be completely closed in C++ land, the
+// observable behavior must be that the closing/detachment is synchronous and
+// instant.
+
+port1.close(common.mustCall(testBothClosed));
+testSingle(port1, port2);
+port2.close(common.mustCall(testBothClosed));
+testBothClosed();
+
+setTimeout(common.mustNotCall('The communication channel is still open'),
+ common.platformTimeout(1000)).unref();
diff --git a/test/parallel/test-worker-message-port-transfer-self.js b/test/parallel/test-worker-message-port-transfer-self.js
new file mode 100644
index 0000000000..1855023cdf
--- /dev/null
+++ b/test/parallel/test-worker-message-port-transfer-self.js
@@ -0,0 +1,33 @@
+// Flags: --experimental-worker
+'use strict';
+
+const common = require('../common');
+const assert = require('assert');
+const { MessageChannel } = require('worker_threads');
+
+const { port1, port2 } = new MessageChannel();
+
+assert.throws(common.mustCall(() => {
+ port1.postMessage(null, [port1]);
+}), common.mustCall((err) => {
+ assert.strictEqual(err.name, 'DataCloneError');
+ assert.strictEqual(err.message, 'Transfer list contains source port');
+ assert.strictEqual(err.code, 25);
+ assert.ok(err instanceof Error);
+
+ const DOMException = err.constructor;
+ assert.ok(err instanceof DOMException);
+ assert.strictEqual(DOMException.name, 'DOMException');
+
+ return true;
+}));
+
+// The failed transfer should not affect the ports in anyway.
+port2.onmessage = common.mustCall((message) => {
+ assert.strictEqual(message, 2);
+ port1.close();
+
+ setTimeout(common.mustNotCall('The communication channel is still open'),
+ common.platformTimeout(1000)).unref();
+});
+port1.postMessage(2);
diff --git a/test/parallel/test-worker-message-port-transfer-target.js b/test/parallel/test-worker-message-port-transfer-target.js
new file mode 100644
index 0000000000..8e6354d826
--- /dev/null
+++ b/test/parallel/test-worker-message-port-transfer-target.js
@@ -0,0 +1,24 @@
+// Flags: --experimental-worker
+'use strict';
+
+const common = require('../common');
+const assert = require('assert');
+const { MessageChannel } = require('worker_threads');
+
+const { port1, port2 } = new MessageChannel();
+
+const arrayBuf = new ArrayBuffer(10);
+
+common.expectWarning('Warning',
+ 'The target port was posted to itself, and the ' +
+ 'communication channel was lost',
+ common.noWarnCode);
+port2.onmessage = common.mustNotCall();
+port2.postMessage(null, [port1, arrayBuf]);
+
+// arrayBuf must be transferred, despite the fact that port2 never received the
+// message.
+assert.strictEqual(arrayBuf.byteLength, 0);
+
+setTimeout(common.mustNotCall('The communication channel is still open'),
+ common.platformTimeout(1000)).unref();