summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/env.h3
-rw-r--r--src/node_messaging.cc217
-rw-r--r--src/node_messaging.h6
-rw-r--r--test/parallel/test-worker-message-port-terminate-transfer-list.js26
-rw-r--r--test/parallel/test-worker-message-port.js63
5 files changed, 238 insertions, 77 deletions
diff --git a/src/env.h b/src/env.h
index 15c2f95313..87d71bbc9f 100644
--- a/src/env.h
+++ b/src/env.h
@@ -211,6 +211,7 @@ constexpr size_t kFsStatsBufferLength =
V(dns_soa_string, "SOA") \
V(dns_srv_string, "SRV") \
V(dns_txt_string, "TXT") \
+ V(done_string, "done") \
V(duration_string, "duration") \
V(emit_warning_string, "emitWarning") \
V(encoding_string, "encoding") \
@@ -272,6 +273,7 @@ constexpr size_t kFsStatsBufferLength =
V(modulus_string, "modulus") \
V(name_string, "name") \
V(netmask_string, "netmask") \
+ V(next_string, "next") \
V(nistcurve_string, "nistCurve") \
V(nsname_string, "nsname") \
V(ocsp_request_string, "OCSPRequest") \
@@ -353,6 +355,7 @@ constexpr size_t kFsStatsBufferLength =
V(ticketkeycallback_string, "onticketkeycallback") \
V(timeout_string, "timeout") \
V(tls_ticket_string, "tlsTicket") \
+ V(transfer_string, "transfer") \
V(ttl_string, "ttl") \
V(type_string, "type") \
V(uid_string, "uid") \
diff --git a/src/node_messaging.cc b/src/node_messaging.cc
index 46f06b747e..5aec784f60 100644
--- a/src/node_messaging.cc
+++ b/src/node_messaging.cc
@@ -31,6 +31,7 @@ using v8::Object;
using v8::ObjectTemplate;
using v8::SharedArrayBuffer;
using v8::String;
+using v8::Symbol;
using v8::Value;
using v8::ValueDeserializer;
using v8::ValueSerializer;
@@ -304,7 +305,7 @@ class SerializerDelegate : public ValueSerializer::Delegate {
Maybe<bool> Message::Serialize(Environment* env,
Local<Context> context,
Local<Value> input,
- Local<Value> transfer_list_v,
+ const TransferList& transfer_list_v,
Local<Object> source_port) {
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(context);
@@ -317,72 +318,66 @@ Maybe<bool> Message::Serialize(Environment* env,
delegate.serializer = &serializer;
std::vector<Local<ArrayBuffer>> array_buffers;
- if (transfer_list_v->IsArray()) {
- Local<Array> transfer_list = transfer_list_v.As<Array>();
- uint32_t length = transfer_list->Length();
- for (uint32_t i = 0; i < length; ++i) {
- Local<Value> entry;
- if (!transfer_list->Get(context, i).ToLocal(&entry))
- return Nothing<bool>();
- // Currently, we support ArrayBuffers and MessagePorts.
- if (entry->IsArrayBuffer()) {
- Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
- // If we cannot render the ArrayBuffer unusable in this Isolate and
- // take ownership of its memory, copying the buffer will have to do.
- if (!ab->IsDetachable() || ab->IsExternal() ||
- !env->isolate_data()->uses_node_allocator()) {
- continue;
- }
- if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
- array_buffers.end()) {
- ThrowDataCloneException(
- context,
- FIXED_ONE_BYTE_STRING(
- env->isolate(),
- "Transfer list contains duplicate ArrayBuffer"));
- return Nothing<bool>();
- }
- // We simply use the array index in the `array_buffers` list as the
- // ID that we write into the serialized buffer.
- uint32_t id = array_buffers.size();
- array_buffers.push_back(ab);
- serializer.TransferArrayBuffer(id, ab);
- continue;
- } else if (env->message_port_constructor_template()
- ->HasInstance(entry)) {
- // Check if the source MessagePort is being transferred.
- if (!source_port.IsEmpty() && entry == source_port) {
- ThrowDataCloneException(
- context,
- FIXED_ONE_BYTE_STRING(env->isolate(),
- "Transfer list contains source port"));
- return Nothing<bool>();
- }
- MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
- if (port == nullptr || port->IsDetached()) {
- ThrowDataCloneException(
- context,
- FIXED_ONE_BYTE_STRING(
- env->isolate(),
- "MessagePort in transfer list is already detached"));
- return Nothing<bool>();
- }
- if (std::find(delegate.ports_.begin(), delegate.ports_.end(), port) !=
- delegate.ports_.end()) {
- ThrowDataCloneException(
- context,
- FIXED_ONE_BYTE_STRING(
- env->isolate(),
- "Transfer list contains duplicate MessagePort"));
- return Nothing<bool>();
- }
- delegate.ports_.push_back(port);
+ for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
+ Local<Value> entry = transfer_list_v[i];
+ // Currently, we support ArrayBuffers and MessagePorts.
+ if (entry->IsArrayBuffer()) {
+ Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
+ // If we cannot render the ArrayBuffer unusable in this Isolate and
+ // take ownership of its memory, copying the buffer will have to do.
+ if (!ab->IsDetachable() || ab->IsExternal() ||
+ !env->isolate_data()->uses_node_allocator()) {
continue;
}
-
- THROW_ERR_INVALID_TRANSFER_OBJECT(env);
- return Nothing<bool>();
+ if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
+ array_buffers.end()) {
+ ThrowDataCloneException(
+ context,
+ FIXED_ONE_BYTE_STRING(
+ env->isolate(),
+ "Transfer list contains duplicate ArrayBuffer"));
+ return Nothing<bool>();
+ }
+ // We simply use the array index in the `array_buffers` list as the
+ // ID that we write into the serialized buffer.
+ uint32_t id = array_buffers.size();
+ array_buffers.push_back(ab);
+ serializer.TransferArrayBuffer(id, ab);
+ continue;
+ } else if (env->message_port_constructor_template()
+ ->HasInstance(entry)) {
+ // Check if the source MessagePort is being transferred.
+ if (!source_port.IsEmpty() && entry == source_port) {
+ ThrowDataCloneException(
+ context,
+ FIXED_ONE_BYTE_STRING(env->isolate(),
+ "Transfer list contains source port"));
+ return Nothing<bool>();
+ }
+ MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
+ if (port == nullptr || port->IsDetached()) {
+ ThrowDataCloneException(
+ context,
+ FIXED_ONE_BYTE_STRING(
+ env->isolate(),
+ "MessagePort in transfer list is already detached"));
+ return Nothing<bool>();
+ }
+ if (std::find(delegate.ports_.begin(), delegate.ports_.end(), port) !=
+ delegate.ports_.end()) {
+ ThrowDataCloneException(
+ context,
+ FIXED_ONE_BYTE_STRING(
+ env->isolate(),
+ "Transfer list contains duplicate MessagePort"));
+ return Nothing<bool>();
+ }
+ delegate.ports_.push_back(port);
+ continue;
}
+
+ THROW_ERR_INVALID_TRANSFER_OBJECT(env);
+ return Nothing<bool>();
}
serializer.WriteHeader();
@@ -664,7 +659,7 @@ std::unique_ptr<MessagePortData> MessagePort::Detach() {
Maybe<bool> MessagePort::PostMessage(Environment* env,
Local<Value> message_v,
- Local<Value> transfer_v) {
+ const TransferList& transfer_v) {
Isolate* isolate = env->isolate();
Local<Object> obj = object(isolate);
Local<Context> context = obj->CreationContext();
@@ -705,20 +700,98 @@ Maybe<bool> MessagePort::PostMessage(Environment* env,
return Just(true);
}
+static Maybe<bool> ReadIterable(Environment* env,
+ Local<Context> context,
+ // NOLINTNEXTLINE(runtime/references)
+ TransferList& transfer_list,
+ Local<Value> object) {
+ if (!object->IsObject()) return Just(false);
+
+ if (object->IsArray()) {
+ Local<Array> arr = object.As<Array>();
+ size_t length = arr->Length();
+ transfer_list.AllocateSufficientStorage(length);
+ for (size_t i = 0; i < length; i++) {
+ if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
+ return Nothing<bool>();
+ }
+ return Just(true);
+ }
+
+ Isolate* isolate = env->isolate();
+ Local<Value> iterator_method;
+ if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
+ .ToLocal(&iterator_method)) return Nothing<bool>();
+ if (!iterator_method->IsFunction()) return Just(false);
+
+ Local<Value> iterator;
+ if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
+ .ToLocal(&iterator)) return Nothing<bool>();
+ if (!iterator->IsObject()) return Just(false);
+
+ Local<Value> next;
+ if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
+ return Nothing<bool>();
+ if (!next->IsFunction()) return Just(false);
+
+ std::vector<Local<Value>> entries;
+ while (env->can_call_into_js()) {
+ Local<Value> result;
+ if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
+ .ToLocal(&result)) return Nothing<bool>();
+ if (!result->IsObject()) return Just(false);
+
+ Local<Value> done;
+ if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
+ return Nothing<bool>();
+ if (done->BooleanValue(isolate)) break;
+
+ Local<Value> val;
+ if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
+ return Nothing<bool>();
+ entries.push_back(val);
+ }
+
+ transfer_list.AllocateSufficientStorage(entries.size());
+ std::copy(entries.begin(), entries.end(), &transfer_list[0]);
+ return Just(true);
+}
+
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
+ Local<Object> obj = args.This();
+ Local<Context> context = obj->CreationContext();
+
if (args.Length() == 0) {
return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
"MessagePort.postMessage");
}
+
if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
// Browsers ignore null or undefined, and otherwise accept an array or an
// options object.
- // TODO(addaleax): Add support for an options object and generic sequence
- // support.
- // Refs: https://github.com/nodejs/node/pull/28033#discussion_r289964991
return THROW_ERR_INVALID_ARG_TYPE(env,
- "Optional transferList argument must be an array");
+ "Optional transferList argument must be an iterable");
+ }
+
+ TransferList transfer_list;
+ if (args[1]->IsObject()) {
+ bool was_iterable;
+ if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
+ return;
+ if (!was_iterable) {
+ Local<Value> transfer_option;
+ if (!args[1].As<Object>()->Get(context, env->transfer_string())
+ .ToLocal(&transfer_option)) return;
+ if (!transfer_option->IsUndefined()) {
+ if (!ReadIterable(env, context, transfer_list, transfer_option)
+ .To(&was_iterable)) return;
+ if (!was_iterable) {
+ return THROW_ERR_INVALID_ARG_TYPE(env,
+ "Optional options.transfer argument must be an iterable");
+ }
+ }
+ }
}
MessagePort* port = Unwrap<MessagePort>(args.This());
@@ -727,13 +800,11 @@ void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
// transfers.
if (port == nullptr) {
Message msg;
- Local<Object> obj = args.This();
- Local<Context> context = obj->CreationContext();
- USE(msg.Serialize(env, context, args[0], args[1], obj));
+ USE(msg.Serialize(env, context, args[0], transfer_list, obj));
return;
}
- port->PostMessage(env, args[0], args[1]);
+ port->PostMessage(env, args[0], transfer_list);
}
void MessagePort::Start() {
diff --git a/src/node_messaging.h b/src/node_messaging.h
index d9f25a95d7..054521b056 100644
--- a/src/node_messaging.h
+++ b/src/node_messaging.h
@@ -14,6 +14,8 @@ namespace worker {
class MessagePortData;
class MessagePort;
+typedef MaybeStackBuffer<v8::Local<v8::Value>, 8> TransferList;
+
// Represents a single communication message.
class Message : public MemoryRetainer {
public:
@@ -44,7 +46,7 @@ class Message : public MemoryRetainer {
v8::Maybe<bool> Serialize(Environment* env,
v8::Local<v8::Context> context,
v8::Local<v8::Value> input,
- v8::Local<v8::Value> transfer_list,
+ const TransferList& transfer_list,
v8::Local<v8::Object> source_port =
v8::Local<v8::Object>());
@@ -149,7 +151,7 @@ class MessagePort : public HandleWrap {
// serialized with transfers, then silently discarded.
v8::Maybe<bool> PostMessage(Environment* env,
v8::Local<v8::Value> message,
- v8::Local<v8::Value> transfer);
+ const TransferList& transfer);
// Start processing messages on this port as a receiving end.
void Start();
diff --git a/test/parallel/test-worker-message-port-terminate-transfer-list.js b/test/parallel/test-worker-message-port-terminate-transfer-list.js
new file mode 100644
index 0000000000..a066405d9d
--- /dev/null
+++ b/test/parallel/test-worker-message-port-terminate-transfer-list.js
@@ -0,0 +1,26 @@
+'use strict';
+const common = require('../common');
+
+const { parentPort, MessageChannel, Worker } = require('worker_threads');
+
+// Do not use isMainThread so that this test itself can be run inside a Worker.
+if (!process.env.HAS_STARTED_WORKER) {
+ process.env.HAS_STARTED_WORKER = 1;
+ const w = new Worker(__filename);
+ w.once('message', common.mustCall(() => {
+ w.once('message', common.mustNotCall());
+ setTimeout(() => w.terminate(), 100);
+ }));
+} else {
+ const { port1 } = new MessageChannel();
+
+ parentPort.postMessage('ready');
+
+ // Make sure we don’t end up running JS after the infinite loop is broken.
+ port1.postMessage({}, {
+ transfer: (function*() { while (true); })()
+ });
+
+ parentPort.postMessage('UNREACHABLE');
+ process.kill(process.pid, 'SIGINT');
+}
diff --git a/test/parallel/test-worker-message-port.js b/test/parallel/test-worker-message-port.js
index d1c58216fd..d128dc7edb 100644
--- a/test/parallel/test-worker-message-port.js
+++ b/test/parallel/test-worker-message-port.js
@@ -72,22 +72,81 @@ const { MessageChannel, MessagePort } = require('worker_threads');
{
const { port1, port2 } = new MessageChannel();
- port2.on('message', common.mustCall(4));
+ port2.on('message', common.mustCall(6));
port1.postMessage(1, null);
port1.postMessage(2, undefined);
port1.postMessage(3, []);
port1.postMessage(4, {});
+ port1.postMessage(5, { transfer: undefined });
+ port1.postMessage(6, { transfer: [] });
const err = {
constructor: TypeError,
code: 'ERR_INVALID_ARG_TYPE',
- message: 'Optional transferList argument must be an array'
+ message: 'Optional transferList argument must be an iterable'
};
assert.throws(() => port1.postMessage(5, 0), err);
assert.throws(() => port1.postMessage(5, false), err);
assert.throws(() => port1.postMessage(5, 'X'), err);
assert.throws(() => port1.postMessage(5, Symbol('X')), err);
+
+ const err2 = {
+ constructor: TypeError,
+ code: 'ERR_INVALID_ARG_TYPE',
+ message: 'Optional options.transfer argument must be an iterable'
+ };
+
+ assert.throws(() => port1.postMessage(5, { transfer: null }), err2);
+ assert.throws(() => port1.postMessage(5, { transfer: 0 }), err2);
+ assert.throws(() => port1.postMessage(5, { transfer: false }), err2);
+ assert.throws(() => port1.postMessage(5, { transfer: {} }), err2);
+ assert.throws(() => port1.postMessage(5, {
+ transfer: { [Symbol.iterator]() { return {}; } }
+ }), err2);
+ assert.throws(() => port1.postMessage(5, {
+ transfer: { [Symbol.iterator]() { return { next: 42 }; } }
+ }), err2);
+ assert.throws(() => port1.postMessage(5, {
+ transfer: { [Symbol.iterator]() { return { next: null }; } }
+ }), err2);
+ port1.close();
+}
+
+{
+ // Make sure these ArrayBuffers end up detached, i.e. are actually being
+ // transferred because the transfer list provides them.
+ const { port1, port2 } = new MessageChannel();
+ port2.on('message', common.mustCall((msg) => {
+ assert.strictEqual(msg.ab.byteLength, 10);
+ }, 4));
+
+ {
+ const ab = new ArrayBuffer(10);
+ port1.postMessage({ ab }, [ ab ]);
+ assert.strictEqual(ab.byteLength, 0);
+ }
+
+ {
+ const ab = new ArrayBuffer(10);
+ port1.postMessage({ ab }, { transfer: [ ab ] });
+ assert.strictEqual(ab.byteLength, 0);
+ }
+
+ {
+ const ab = new ArrayBuffer(10);
+ port1.postMessage({ ab }, (function*() { yield ab; })());
+ assert.strictEqual(ab.byteLength, 0);
+ }
+
+ {
+ const ab = new ArrayBuffer(10);
+ port1.postMessage({ ab }, {
+ transfer: (function*() { yield ab; })()
+ });
+ assert.strictEqual(ab.byteLength, 0);
+ }
+
port1.close();
}