summaryrefslogtreecommitdiff
path: root/src/node_messaging.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/node_messaging.h')
-rw-r--r--src/node_messaging.h167
1 files changed, 167 insertions, 0 deletions
diff --git a/src/node_messaging.h b/src/node_messaging.h
new file mode 100644
index 0000000000..7bd60163ea
--- /dev/null
+++ b/src/node_messaging.h
@@ -0,0 +1,167 @@
+#ifndef SRC_NODE_MESSAGING_H_
+#define SRC_NODE_MESSAGING_H_
+
+#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
+
+#include "env.h"
+#include "node_mutex.h"
+#include <list>
+#include <memory>
+
+namespace node {
+namespace worker {
+
+class MessagePortData;
+class MessagePort;
+
+// Represents a single communication message.
+class Message {
+ public:
+ explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>());
+
+ Message(Message&& other) = default;
+ Message& operator=(Message&& other) = default;
+ Message& operator=(const Message&) = delete;
+ Message(const Message&) = delete;
+
+ // Deserialize the contained JS value. May only be called once, and only
+ // after Serialize() has been called (e.g. by another thread).
+ v8::MaybeLocal<v8::Value> Deserialize(Environment* env,
+ v8::Local<v8::Context> context);
+
+ // Serialize a JS value, and optionally transfer objects, into this message.
+ // The Message object retains ownership of all transferred objects until
+ // deserialization.
+ v8::Maybe<bool> Serialize(Environment* env,
+ v8::Local<v8::Context> context,
+ v8::Local<v8::Value> input,
+ v8::Local<v8::Value> transfer_list);
+
+ private:
+ MallocedBuffer<char> main_message_buf_;
+ std::vector<MallocedBuffer<char>> array_buffer_contents_;
+
+ friend class MessagePort;
+};
+
+// This contains all data for a `MessagePort` instance that is not tied to
+// a specific Environment/Isolate/event loop, for easier transfer between those.
+class MessagePortData {
+ public:
+ explicit MessagePortData(MessagePort* owner);
+ ~MessagePortData();
+
+ MessagePortData(MessagePortData&& other) = delete;
+ MessagePortData& operator=(MessagePortData&& other) = delete;
+ MessagePortData(const MessagePortData& other) = delete;
+ MessagePortData& operator=(const MessagePortData& other) = delete;
+
+ // Add a message to the incoming queue and notify the receiver.
+ // This may be called from any thread.
+ void AddToIncomingQueue(Message&& message);
+
+ // Returns true if and only this MessagePort is currently not entangled
+ // with another message port.
+ bool IsSiblingClosed() const;
+
+ // Turns `a` and `b` into siblings, i.e. connects the sending side of one
+ // to the receiving side of the other. This is not thread-safe.
+ static void Entangle(MessagePortData* a, MessagePortData* b);
+
+ // Removes any possible sibling. This is thread-safe (it acquires both
+ // `sibling_mutex_` and `mutex_`), and has to be because it is called once
+ // the corresponding JS handle handle wants to close
+ // which can happen on either side of a worker.
+ void Disentangle();
+
+ private:
+ // After disentangling this message port, the owner handle (if any)
+ // is asynchronously triggered, so that it can close down naturally.
+ void PingOwnerAfterDisentanglement();
+
+ // This mutex protects all fields below it, with the exception of
+ // sibling_.
+ mutable Mutex mutex_;
+ bool receiving_messages_ = false;
+ std::list<Message> incoming_messages_;
+ MessagePort* owner_ = nullptr;
+ // This mutex protects the sibling_ field and is shared between two entangled
+ // MessagePorts. If both mutexes are acquired, this one needs to be
+ // acquired first.
+ std::shared_ptr<Mutex> sibling_mutex_ = std::make_shared<Mutex>();
+ MessagePortData* sibling_ = nullptr;
+
+ friend class MessagePort;
+};
+
+// A message port that receives messages from other threads, including
+// the uv_async_t handle that is used to notify the current event loop of
+// new incoming messages.
+class MessagePort : public HandleWrap {
+ public:
+ // Create a new MessagePort. The `context` argument specifies the Context
+ // instance that is used for creating the values emitted from this port.
+ MessagePort(Environment* env,
+ v8::Local<v8::Context> context,
+ v8::Local<v8::Object> wrap);
+ ~MessagePort();
+
+ // Create a new message port instance, optionally over an existing
+ // `MessagePortData` object.
+ static MessagePort* New(Environment* env,
+ v8::Local<v8::Context> context,
+ std::unique_ptr<MessagePortData> data = nullptr);
+
+ // Send a message, i.e. deliver it into the sibling's incoming queue.
+ // If there is no sibling, i.e. this port is closed,
+ // this message is silently discarded.
+ void Send(Message&& message);
+ void Send(const v8::FunctionCallbackInfo<v8::Value>& args);
+ // Deliver a single message into this port's incoming queue.
+ void AddToIncomingQueue(Message&& message);
+
+ // Start processing messages on this port as a receiving end.
+ void Start();
+ // Stop processing messages on this port as a receiving end.
+ void Stop();
+
+ static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args);
+
+ // Turns `a` and `b` into siblings, i.e. connects the sending side of one
+ // to the receiving side of the other. This is not thread-safe.
+ static void Entangle(MessagePort* a, MessagePort* b);
+ static void Entangle(MessagePort* a, MessagePortData* b);
+
+ // Detach this port's data for transferring. After this, the MessagePortData
+ // is no longer associated with this handle, although it can still receive
+ // messages.
+ std::unique_ptr<MessagePortData> Detach();
+
+ bool IsSiblingClosed() const;
+
+ size_t self_size() const override;
+
+ private:
+ void OnClose() override;
+ void OnMessage();
+ void TriggerAsync();
+ inline uv_async_t* async();
+
+ std::unique_ptr<MessagePortData> data_ = nullptr;
+
+ friend class MessagePortData;
+};
+
+v8::MaybeLocal<v8::Function> GetMessagePortConstructor(
+ Environment* env, v8::Local<v8::Context> context);
+
+} // namespace worker
+} // namespace node
+
+#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
+
+
+#endif // SRC_NODE_MESSAGING_H_