summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnna Henningsen <anna@addaleax.net>2019-05-15 01:24:57 +0200
committerAnna Henningsen <anna@addaleax.net>2019-05-17 14:01:27 +0200
commit6019060cbb0620d685c8a106a4184475420e922b (patch)
tree07cd79823b255f184a7f6f1dfd4652a87f4699e1
parent001526cc4ce6b3ec55fddcc927d9f610b162a657 (diff)
downloadandroid-node-v8-6019060cbb0620d685c8a106a4184475420e922b.tar.gz
android-node-v8-6019060cbb0620d685c8a106a4184475420e922b.tar.bz2
android-node-v8-6019060cbb0620d685c8a106a4184475420e922b.zip
worker: use special message as MessagePort close command
When a `MessagePort` connected to another `MessagePort` closes, the latter `MessagePort` will be closed as well. Until now, this is done by testing whether the ports are still entangled after processing messages. This leaves open a race condition window in which messages sent just before the closure can be lost when timing is unfortunate. (A description of the timing is in the test file.) This can be addressed by using a special message instead, which is the last message received by a `MessagePort`. This way, all previously sent messages are processed first. Fixes: https://github.com/nodejs/node/issues/22762 PR-URL: https://github.com/nodejs/node/pull/27705 Reviewed-By: Rich Trott <rtrott@gmail.com> Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
-rw-r--r--src/node_messaging.cc53
-rw-r--r--src/node_messaging.h16
-rw-r--r--test/parallel/test-worker-message-port-message-before-close.js38
3 files changed, 71 insertions, 36 deletions
diff --git a/src/node_messaging.cc b/src/node_messaging.cc
index b9212ba272..98ef42df75 100644
--- a/src/node_messaging.cc
+++ b/src/node_messaging.cc
@@ -40,6 +40,10 @@ namespace worker {
Message::Message(MallocedBuffer<char>&& buffer)
: main_message_buf_(std::move(buffer)) {}
+bool Message::IsCloseMessage() const {
+ return main_message_buf_.data == nullptr;
+}
+
namespace {
// This is used to tell V8 how to read transferred host objects, like other
@@ -91,6 +95,8 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
MaybeLocal<Value> Message::Deserialize(Environment* env,
Local<Context> context) {
+ CHECK(!IsCloseMessage());
+
EscapableHandleScope handle_scope(env->isolate());
Context::Scope context_scope(context);
@@ -395,6 +401,7 @@ Maybe<bool> Message::Serialize(Environment* env,
// The serializer gave us a buffer allocated using `malloc()`.
std::pair<uint8_t*, size_t> data = serializer.Release();
+ CHECK_NOT_NULL(data.first);
main_message_buf_ =
MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
return Just(true);
@@ -430,11 +437,6 @@ void MessagePortData::AddToIncomingQueue(Message&& message) {
}
}
-bool MessagePortData::IsSiblingClosed() const {
- Mutex::ScopedLock lock(*sibling_mutex_);
- return sibling_ == nullptr;
-}
-
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
CHECK_NULL(a->sibling_);
CHECK_NULL(b->sibling_);
@@ -443,12 +445,6 @@ void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
a->sibling_mutex_ = b->sibling_mutex_;
}
-void MessagePortData::PingOwnerAfterDisentanglement() {
- Mutex::ScopedLock lock(mutex_);
- if (owner_ != nullptr)
- owner_->TriggerAsync();
-}
-
void MessagePortData::Disentangle() {
// Grab a copy of the sibling mutex, then replace it so that each sibling
// has its own sibling_mutex_ now.
@@ -462,11 +458,12 @@ void MessagePortData::Disentangle() {
sibling_ = nullptr;
}
- // We close MessagePorts after disentanglement, so we trigger the
- // corresponding uv_async_t to let them know that this happened.
- PingOwnerAfterDisentanglement();
+ // We close MessagePorts after disentanglement, so we enqueue a corresponding
+ // message and trigger the corresponding uv_async_t to let them know that
+ // this happened.
+ AddToIncomingQueue(Message());
if (sibling != nullptr) {
- sibling->PingOwnerAfterDisentanglement();
+ sibling->AddToIncomingQueue(Message());
}
}
@@ -590,14 +587,25 @@ void MessagePort::OnMessage() {
Debug(this, "MessagePort has message, receiving = %d",
static_cast<int>(receiving_messages_));
- if (!receiving_messages_)
- break;
- if (data_->incoming_messages_.empty())
+ // We have nothing to do if:
+ // - There are no pending messages
+ // - We are not intending to receive messages, and the message we would
+ // receive is not the final "close" message.
+ if (data_->incoming_messages_.empty() ||
+ (!receiving_messages_ &&
+ !data_->incoming_messages_.front().IsCloseMessage())) {
break;
+ }
+
received = std::move(data_->incoming_messages_.front());
data_->incoming_messages_.pop_front();
}
+ if (received.IsCloseMessage()) {
+ Close();
+ return;
+ }
+
if (!env()->can_call_into_js()) {
Debug(this, "MessagePort drains queue because !can_call_into_js()");
// In this case there is nothing to do but to drain the current queue.
@@ -628,15 +636,6 @@ void MessagePort::OnMessage() {
}
}
}
-
- if (data_ && data_->IsSiblingClosed()) {
- Close();
- }
-}
-
-bool MessagePort::IsSiblingClosed() const {
- CHECK(data_);
- return data_->IsSiblingClosed();
}
void MessagePort::OnClose() {
diff --git a/src/node_messaging.h b/src/node_messaging.h
index 0a729c1410..08a6798e3c 100644
--- a/src/node_messaging.h
+++ b/src/node_messaging.h
@@ -17,6 +17,9 @@ class MessagePort;
// Represents a single communication message.
class Message : public MemoryRetainer {
public:
+ // Create a Message with a specific underlying payload, in the format of the
+ // V8 ValueSerializer API. If `payload` is empty, this message indicates
+ // that the receiving message port should close itself.
explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>());
Message(Message&& other) = default;
@@ -24,6 +27,10 @@ class Message : public MemoryRetainer {
Message& operator=(const Message&) = delete;
Message(const Message&) = delete;
+ // Whether this is a message indicating that the port is to be closed.
+ // This is the last message to be received by a MessagePort.
+ bool IsCloseMessage() const;
+
// Deserialize the contained JS value. May only be called once, and only
// after Serialize() has been called (e.g. by another thread).
v8::MaybeLocal<v8::Value> Deserialize(Environment* env,
@@ -89,10 +96,6 @@ class MessagePortData : public MemoryRetainer {
// This may be called from any thread.
void AddToIncomingQueue(Message&& message);
- // Returns true if and only this MessagePort is currently not entangled
- // with another message port.
- bool IsSiblingClosed() const;
-
// Turns `a` and `b` into siblings, i.e. connects the sending side of one
// to the receiving side of the other. This is not thread-safe.
static void Entangle(MessagePortData* a, MessagePortData* b);
@@ -109,10 +112,6 @@ class MessagePortData : public MemoryRetainer {
SET_SELF_SIZE(MessagePortData)
private:
- // After disentangling this message port, the owner handle (if any)
- // is asynchronously triggered, so that it can close down naturally.
- void PingOwnerAfterDisentanglement();
-
// This mutex protects all fields below it, with the exception of
// sibling_.
mutable Mutex mutex_;
@@ -178,7 +177,6 @@ class MessagePort : public HandleWrap {
// messages.
std::unique_ptr<MessagePortData> Detach();
- bool IsSiblingClosed() const;
void Close(
v8::Local<v8::Value> close_callback = v8::Local<v8::Value>()) override;
diff --git a/test/parallel/test-worker-message-port-message-before-close.js b/test/parallel/test-worker-message-port-message-before-close.js
new file mode 100644
index 0000000000..ecaad9c876
--- /dev/null
+++ b/test/parallel/test-worker-message-port-message-before-close.js
@@ -0,0 +1,38 @@
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const { once } = require('events');
+const { Worker, MessageChannel } = require('worker_threads');
+
+// This is a regression test for the race condition underlying
+// https://github.com/nodejs/node/issues/22762.
+// It ensures that all messages send before a MessagePort#close() call are
+// received. Previously, what could happen was a race condition like this:
+// - Thread 1 sends message A
+// - Thread 2 begins receiving/emitting message A
+// - Thread 1 sends message B
+// - Thread 1 closes its side of the channel
+// - Thread 2 finishes receiving/emitting message A
+// - Thread 2 sees that the port should be closed
+// - Thread 2 closes the port, discarding message B in the process.
+
+async function test() {
+ const worker = new Worker(`
+ require('worker_threads').parentPort.on('message', ({ port }) => {
+ port.postMessage('firstMessage');
+ port.postMessage('lastMessage');
+ port.close();
+ });
+ `, { eval: true });
+
+ for (let i = 0; i < 10000; i++) {
+ const { port1, port2 } = new MessageChannel();
+ worker.postMessage({ port: port2 }, [ port2 ]);
+ await once(port1, 'message'); // 'complexObject'
+ assert.deepStrictEqual(await once(port1, 'message'), ['lastMessage']);
+ }
+
+ worker.terminate();
+}
+
+test().then(common.mustCall());