summaryrefslogtreecommitdiff
path: root/src/node_messaging.h
diff options
context:
space:
mode:
authorAnna Henningsen <anna@addaleax.net>2017-09-05 22:38:32 +0200
committerAnna Henningsen <anna@addaleax.net>2018-06-06 19:43:44 +0200
commite7a2367471177c96f454a18319cf8d2fb25482f9 (patch)
tree9027c9bac22dae6f50ed8222a1a1cbd5e573ffe8 /src/node_messaging.h
parent2e886e9f452e95a4761a3d85540ba561538b4438 (diff)
downloadandroid-node-v8-e7a2367471177c96f454a18319cf8d2fb25482f9.tar.gz
android-node-v8-e7a2367471177c96f454a18319cf8d2fb25482f9.tar.bz2
android-node-v8-e7a2367471177c96f454a18319cf8d2fb25482f9.zip
worker: implement `MessagePort` and `MessageChannel`
Implement `MessagePort` and `MessageChannel` along the lines of the DOM classes of the same names. `MessagePort`s initially support transferring only `ArrayBuffer`s. Thanks to Stephen Belanger for reviewing this change in its original form, to Benjamin Gruenbaum for reviewing the added tests in their original form, and to Olivia Hugger for reviewing the documentation in its original form. Refs: https://github.com/ayojs/ayo/pull/98 PR-URL: https://github.com/nodejs/node/pull/20876 Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Shingo Inoue <leko.noor@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Tiancheng "Timothy" Gu <timothygu99@gmail.com> Reviewed-By: John-David Dalton <john.david.dalton@gmail.com> Reviewed-By: Gus Caplan <me@gus.host>
Diffstat (limited to 'src/node_messaging.h')
-rw-r--r--src/node_messaging.h167
1 files changed, 167 insertions, 0 deletions
diff --git a/src/node_messaging.h b/src/node_messaging.h
new file mode 100644
index 0000000000..7bd60163ea
--- /dev/null
+++ b/src/node_messaging.h
@@ -0,0 +1,167 @@
+#ifndef SRC_NODE_MESSAGING_H_
+#define SRC_NODE_MESSAGING_H_
+
+#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
+
+#include "env.h"
+#include "node_mutex.h"
+#include <list>
+#include <memory>
+
+namespace node {
+namespace worker {
+
+class MessagePortData;
+class MessagePort;
+
+// Represents a single communication message.
+class Message {
+ public:
+ explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>());
+
+ Message(Message&& other) = default;
+ Message& operator=(Message&& other) = default;
+ Message& operator=(const Message&) = delete;
+ Message(const Message&) = delete;
+
+ // Deserialize the contained JS value. May only be called once, and only
+ // after Serialize() has been called (e.g. by another thread).
+ v8::MaybeLocal<v8::Value> Deserialize(Environment* env,
+ v8::Local<v8::Context> context);
+
+ // Serialize a JS value, and optionally transfer objects, into this message.
+ // The Message object retains ownership of all transferred objects until
+ // deserialization.
+ v8::Maybe<bool> Serialize(Environment* env,
+ v8::Local<v8::Context> context,
+ v8::Local<v8::Value> input,
+ v8::Local<v8::Value> transfer_list);
+
+ private:
+ MallocedBuffer<char> main_message_buf_;
+ std::vector<MallocedBuffer<char>> array_buffer_contents_;
+
+ friend class MessagePort;
+};
+
+// This contains all data for a `MessagePort` instance that is not tied to
+// a specific Environment/Isolate/event loop, for easier transfer between those.
+class MessagePortData {
+ public:
+ explicit MessagePortData(MessagePort* owner);
+ ~MessagePortData();
+
+ MessagePortData(MessagePortData&& other) = delete;
+ MessagePortData& operator=(MessagePortData&& other) = delete;
+ MessagePortData(const MessagePortData& other) = delete;
+ MessagePortData& operator=(const MessagePortData& other) = delete;
+
+ // Add a message to the incoming queue and notify the receiver.
+ // This may be called from any thread.
+ void AddToIncomingQueue(Message&& message);
+
+ // Returns true if and only this MessagePort is currently not entangled
+ // with another message port.
+ bool IsSiblingClosed() const;
+
+ // Turns `a` and `b` into siblings, i.e. connects the sending side of one
+ // to the receiving side of the other. This is not thread-safe.
+ static void Entangle(MessagePortData* a, MessagePortData* b);
+
+ // Removes any possible sibling. This is thread-safe (it acquires both
+ // `sibling_mutex_` and `mutex_`), and has to be because it is called once
+ // the corresponding JS handle handle wants to close
+ // which can happen on either side of a worker.
+ void Disentangle();
+
+ private:
+ // After disentangling this message port, the owner handle (if any)
+ // is asynchronously triggered, so that it can close down naturally.
+ void PingOwnerAfterDisentanglement();
+
+ // This mutex protects all fields below it, with the exception of
+ // sibling_.
+ mutable Mutex mutex_;
+ bool receiving_messages_ = false;
+ std::list<Message> incoming_messages_;
+ MessagePort* owner_ = nullptr;
+ // This mutex protects the sibling_ field and is shared between two entangled
+ // MessagePorts. If both mutexes are acquired, this one needs to be
+ // acquired first.
+ std::shared_ptr<Mutex> sibling_mutex_ = std::make_shared<Mutex>();
+ MessagePortData* sibling_ = nullptr;
+
+ friend class MessagePort;
+};
+
+// A message port that receives messages from other threads, including
+// the uv_async_t handle that is used to notify the current event loop of
+// new incoming messages.
+class MessagePort : public HandleWrap {
+ public:
+ // Create a new MessagePort. The `context` argument specifies the Context
+ // instance that is used for creating the values emitted from this port.
+ MessagePort(Environment* env,
+ v8::Local<v8::Context> context,
+ v8::Local<v8::Object> wrap);
+ ~MessagePort();
+
+ // Create a new message port instance, optionally over an existing
+ // `MessagePortData` object.
+ static MessagePort* New(Environment* env,
+ v8::Local<v8::Context> context,
+ std::unique_ptr<MessagePortData> data = nullptr);
+
+ // Send a message, i.e. deliver it into the sibling's incoming queue.
+ // If there is no sibling, i.e. this port is closed,
+ // this message is silently discarded.
+ void Send(Message&& message);
+ void Send(const v8::FunctionCallbackInfo<v8::Value>& args);
+ // Deliver a single message into this port's incoming queue.
+ void AddToIncomingQueue(Message&& message);
+
+ // Start processing messages on this port as a receiving end.
+ void Start();
+ // Stop processing messages on this port as a receiving end.
+ void Stop();
+
+ static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args);
+
+ // Turns `a` and `b` into siblings, i.e. connects the sending side of one
+ // to the receiving side of the other. This is not thread-safe.
+ static void Entangle(MessagePort* a, MessagePort* b);
+ static void Entangle(MessagePort* a, MessagePortData* b);
+
+ // Detach this port's data for transferring. After this, the MessagePortData
+ // is no longer associated with this handle, although it can still receive
+ // messages.
+ std::unique_ptr<MessagePortData> Detach();
+
+ bool IsSiblingClosed() const;
+
+ size_t self_size() const override;
+
+ private:
+ void OnClose() override;
+ void OnMessage();
+ void TriggerAsync();
+ inline uv_async_t* async();
+
+ std::unique_ptr<MessagePortData> data_ = nullptr;
+
+ friend class MessagePortData;
+};
+
+v8::MaybeLocal<v8::Function> GetMessagePortConstructor(
+ Environment* env, v8::Local<v8::Context> context);
+
+} // namespace worker
+} // namespace node
+
+#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
+
+
+#endif // SRC_NODE_MESSAGING_H_