diff options
Diffstat (limited to 'src/node_messaging.h')
-rw-r--r-- | src/node_messaging.h | 167 |
1 files changed, 167 insertions, 0 deletions
diff --git a/src/node_messaging.h b/src/node_messaging.h new file mode 100644 index 0000000000..7bd60163ea --- /dev/null +++ b/src/node_messaging.h @@ -0,0 +1,167 @@ +#ifndef SRC_NODE_MESSAGING_H_ +#define SRC_NODE_MESSAGING_H_ + +#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS + +#include "env.h" +#include "node_mutex.h" +#include <list> +#include <memory> + +namespace node { +namespace worker { + +class MessagePortData; +class MessagePort; + +// Represents a single communication message. +class Message { + public: + explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>()); + + Message(Message&& other) = default; + Message& operator=(Message&& other) = default; + Message& operator=(const Message&) = delete; + Message(const Message&) = delete; + + // Deserialize the contained JS value. May only be called once, and only + // after Serialize() has been called (e.g. by another thread). + v8::MaybeLocal<v8::Value> Deserialize(Environment* env, + v8::Local<v8::Context> context); + + // Serialize a JS value, and optionally transfer objects, into this message. + // The Message object retains ownership of all transferred objects until + // deserialization. + v8::Maybe<bool> Serialize(Environment* env, + v8::Local<v8::Context> context, + v8::Local<v8::Value> input, + v8::Local<v8::Value> transfer_list); + + private: + MallocedBuffer<char> main_message_buf_; + std::vector<MallocedBuffer<char>> array_buffer_contents_; + + friend class MessagePort; +}; + +// This contains all data for a `MessagePort` instance that is not tied to +// a specific Environment/Isolate/event loop, for easier transfer between those. +class MessagePortData { + public: + explicit MessagePortData(MessagePort* owner); + ~MessagePortData(); + + MessagePortData(MessagePortData&& other) = delete; + MessagePortData& operator=(MessagePortData&& other) = delete; + MessagePortData(const MessagePortData& other) = delete; + MessagePortData& operator=(const MessagePortData& other) = delete; + + // Add a message to the incoming queue and notify the receiver. + // This may be called from any thread. + void AddToIncomingQueue(Message&& message); + + // Returns true if and only this MessagePort is currently not entangled + // with another message port. + bool IsSiblingClosed() const; + + // Turns `a` and `b` into siblings, i.e. connects the sending side of one + // to the receiving side of the other. This is not thread-safe. + static void Entangle(MessagePortData* a, MessagePortData* b); + + // Removes any possible sibling. This is thread-safe (it acquires both + // `sibling_mutex_` and `mutex_`), and has to be because it is called once + // the corresponding JS handle handle wants to close + // which can happen on either side of a worker. + void Disentangle(); + + private: + // After disentangling this message port, the owner handle (if any) + // is asynchronously triggered, so that it can close down naturally. + void PingOwnerAfterDisentanglement(); + + // This mutex protects all fields below it, with the exception of + // sibling_. + mutable Mutex mutex_; + bool receiving_messages_ = false; + std::list<Message> incoming_messages_; + MessagePort* owner_ = nullptr; + // This mutex protects the sibling_ field and is shared between two entangled + // MessagePorts. If both mutexes are acquired, this one needs to be + // acquired first. + std::shared_ptr<Mutex> sibling_mutex_ = std::make_shared<Mutex>(); + MessagePortData* sibling_ = nullptr; + + friend class MessagePort; +}; + +// A message port that receives messages from other threads, including +// the uv_async_t handle that is used to notify the current event loop of +// new incoming messages. +class MessagePort : public HandleWrap { + public: + // Create a new MessagePort. The `context` argument specifies the Context + // instance that is used for creating the values emitted from this port. + MessagePort(Environment* env, + v8::Local<v8::Context> context, + v8::Local<v8::Object> wrap); + ~MessagePort(); + + // Create a new message port instance, optionally over an existing + // `MessagePortData` object. + static MessagePort* New(Environment* env, + v8::Local<v8::Context> context, + std::unique_ptr<MessagePortData> data = nullptr); + + // Send a message, i.e. deliver it into the sibling's incoming queue. + // If there is no sibling, i.e. this port is closed, + // this message is silently discarded. + void Send(Message&& message); + void Send(const v8::FunctionCallbackInfo<v8::Value>& args); + // Deliver a single message into this port's incoming queue. + void AddToIncomingQueue(Message&& message); + + // Start processing messages on this port as a receiving end. + void Start(); + // Stop processing messages on this port as a receiving end. + void Stop(); + + static void New(const v8::FunctionCallbackInfo<v8::Value>& args); + static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args); + static void Start(const v8::FunctionCallbackInfo<v8::Value>& args); + static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args); + + // Turns `a` and `b` into siblings, i.e. connects the sending side of one + // to the receiving side of the other. This is not thread-safe. + static void Entangle(MessagePort* a, MessagePort* b); + static void Entangle(MessagePort* a, MessagePortData* b); + + // Detach this port's data for transferring. After this, the MessagePortData + // is no longer associated with this handle, although it can still receive + // messages. + std::unique_ptr<MessagePortData> Detach(); + + bool IsSiblingClosed() const; + + size_t self_size() const override; + + private: + void OnClose() override; + void OnMessage(); + void TriggerAsync(); + inline uv_async_t* async(); + + std::unique_ptr<MessagePortData> data_ = nullptr; + + friend class MessagePortData; +}; + +v8::MaybeLocal<v8::Function> GetMessagePortConstructor( + Environment* env, v8::Local<v8::Context> context); + +} // namespace worker +} // namespace node + +#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS + + +#endif // SRC_NODE_MESSAGING_H_ |