aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/node_messaging.cc19
-rw-r--r--test/parallel/test-worker-message-port-close-while-receiving.js15
-rw-r--r--test/parallel/test-worker-message-port-infinite-message-loop.js29
3 files changed, 63 insertions, 0 deletions
diff --git a/src/node_messaging.cc b/src/node_messaging.cc
index 5aec784f60..19065fdb7d 100644
--- a/src/node_messaging.cc
+++ b/src/node_messaging.cc
@@ -604,11 +604,30 @@ void MessagePort::OnMessage() {
HandleScope handle_scope(env()->isolate());
Local<Context> context = object(env()->isolate())->CreationContext();
+ size_t processing_limit;
+ {
+    Mutex::ScopedLock lock(data_->mutex_);
+ processing_limit = std::max(data_->incoming_messages_.size(),
+ static_cast<size_t>(1000));
+ }
+
// data_ can only ever be modified by the owner thread, so no need to lock.
// However, the message port may be transferred while it is processing
// messages, so we need to check that this handle still owns its `data_` field
// on every iteration.
while (data_) {
+ if (processing_limit-- == 0) {
+ // Prevent event loop starvation by only processing those messages without
+ // interruption that were already present when the OnMessage() call was
+ // first triggered, but at least 1000 messages because otherwise the
+ // overhead of repeatedly triggering the uv_async_t instance becomes
+      // noticeable, at least on Windows.
+ // (That might require more investigation by somebody more familiar with
+ // Windows.)
+ TriggerAsync();
+ return;
+ }
+
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(context);
diff --git a/test/parallel/test-worker-message-port-close-while-receiving.js b/test/parallel/test-worker-message-port-close-while-receiving.js
new file mode 100644
index 0000000000..d6f73caff1
--- /dev/null
+++ b/test/parallel/test-worker-message-port-close-while-receiving.js
@@ -0,0 +1,15 @@
+'use strict';
+const common = require('../common');
+
+const { MessageChannel } = require('worker_threads');
+
+// Make sure that closing a message port while receiving messages on it does
+// not stop messages that are already in the queue from being emitted.
+
+const { port1, port2 } = new MessageChannel();
+
+port1.on('message', common.mustCall(() => {
+ port1.close();
+}, 2));
+port2.postMessage('foo');
+port2.postMessage('bar');
diff --git a/test/parallel/test-worker-message-port-infinite-message-loop.js b/test/parallel/test-worker-message-port-infinite-message-loop.js
new file mode 100644
index 0000000000..640b3383ca
--- /dev/null
+++ b/test/parallel/test-worker-message-port-infinite-message-loop.js
@@ -0,0 +1,29 @@
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+
+const { MessageChannel } = require('worker_threads');
+
+// Make sure that an infinite asynchronous .on('message')/postMessage loop
+// does not lead to a stack overflow and does not starve the event loop.
+// We schedule timeouts both from before the .on('message') handler and
+// inside of it, which both should run.
+
+const { port1, port2 } = new MessageChannel();
+let count = 0;
+port1.on('message', () => {
+ if (count === 0) {
+ setTimeout(common.mustCall(() => {
+ port1.close();
+ }), 0);
+ }
+
+ port2.postMessage(0);
+ assert(count++ < 10000, `hit ${count} loop iterations`);
+});
+
+port2.postMessage(0);
+
+// This is part of the test -- the event loop should be available and not stall
+// out due to the recursive .postMessage() calls.
+setTimeout(common.mustCall(), 0);