diff options
author | James M Snell <jasnell@gmail.com> | 2020-11-27 00:54:23 +0300 |
---|---|---|
committer | James M Snell <jasnell@gmail.com> | 2020-12-01 19:02:28 +0300 |
commit | 9e446b3e9f8c8aa0dcfb974f306504f307b8fac2 (patch) | |
tree | 0cfecf19e0435fa40b9741c5eed35d56ae66d5da /src/node_messaging.h | |
parent | 09fd8f13c87a455c833e83f494616d6cc774aafa (diff) |
worker: add experimental BroadcastChannel
Signed-off-by: James M Snell <jasnell@gmail.com>
PR-URL: https://github.com/nodejs/node/pull/36271
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Diffstat (limited to 'src/node_messaging.h')
-rw-r--r-- | src/node_messaging.h | 76 |
1 files changed, 64 insertions, 12 deletions
diff --git a/src/node_messaging.h b/src/node_messaging.h index 7ef02226480..22c11321ef7 100644 --- a/src/node_messaging.h +++ b/src/node_messaging.h @@ -5,7 +5,11 @@ #include "env.h" #include "node_mutex.h" -#include <list> +#include "v8.h" +#include <deque> +#include <string> +#include <unordered_map> +#include <set> namespace node { namespace worker { @@ -45,6 +49,7 @@ class Message : public MemoryRetainer { // V8 ValueSerializer API. If `payload` is empty, this message indicates // that the receiving message port should close itself. explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>()); + ~Message() = default; Message(Message&& other) = default; Message& operator=(Message&& other) = default; @@ -105,11 +110,58 @@ class Message : public MemoryRetainer { friend class MessagePort; }; +class SiblingGroup { + public: + // Named SiblingGroup, Used for one-to-many BroadcastChannels. + static std::shared_ptr<SiblingGroup> Get(const std::string& name); + + // Anonymous SiblingGroup, Used for one-to-one MessagePort pairs. + SiblingGroup() = default; + explicit SiblingGroup(const std::string& name); + ~SiblingGroup(); + + // Dispatches the Message to the collection of associated + // ports. If there is more than one destination port and + // the Message contains transferables, Dispatch will fail. + // Returns Just(true) if successful and the message was + // dispatched to at least one destination. Returns Just(false) + // if there were no destinations. Returns Nothing<bool>() + // if there was an error. If error is not nullptr, it will + // be set to an error message or warning message as appropriate. + v8::Maybe<bool> Dispatch( + MessagePortData* source, + std::shared_ptr<Message> message, + std::string* error = nullptr); + + void Entangle(MessagePortData* data); + + void Disentangle(MessagePortData* data); + + const std::string& name() const { return name_; } + + size_t size() const { return ports_.size(); } + + private: + std::string name_; + std::set<MessagePortData*> ports_; + Mutex group_mutex_; + + static void CheckSiblingGroup(const std::string& name); + + using Map = + std::unordered_map<std::string, std::weak_ptr<SiblingGroup>>; + + static Mutex groups_mutex_; + static Map groups_; +}; + // This contains all data for a `MessagePort` instance that is not tied to // a specific Environment/Isolate/event loop, for easier transfer between those. class MessagePortData : public TransferData { public: - explicit MessagePortData(MessagePort* owner); + explicit MessagePortData( + MessagePort* owner, + const std::string& name = std::string()); ~MessagePortData() override; MessagePortData(MessagePortData&& other) = delete; @@ -119,7 +171,10 @@ class MessagePortData : public TransferData { // Add a message to the incoming queue and notify the receiver. // This may be called from any thread. - void AddToIncomingQueue(Message&& message); + void AddToIncomingQueue(std::shared_ptr<Message> message); + v8::Maybe<bool> Dispatch( + std::shared_ptr<Message> message, + std::string* error = nullptr); // Turns `a` and `b` into siblings, i.e. connects the sending side of one // to the receiving side of the other. This is not thread-safe. @@ -144,14 +199,9 @@ class MessagePortData : public TransferData { // This mutex protects all fields below it, with the exception of // sibling_. mutable Mutex mutex_; - std::list<Message> incoming_messages_; + std::deque<std::shared_ptr<Message>> incoming_messages_; MessagePort* owner_ = nullptr; - // This mutex protects the sibling_ field and is shared between two entangled - // MessagePorts. If both mutexes are acquired, this one needs to be - // acquired first. - std::shared_ptr<Mutex> sibling_mutex_ = std::make_shared<Mutex>(); - MessagePortData* sibling_ = nullptr; - + std::shared_ptr<SiblingGroup> group_; friend class MessagePort; }; @@ -166,7 +216,8 @@ class MessagePort : public HandleWrap { // creating MessagePort instances. MessagePort(Environment* env, v8::Local<v8::Context> context, - v8::Local<v8::Object> wrap); + v8::Local<v8::Object> wrap, + const std::string& name = std::string()); public: ~MessagePort() override; @@ -175,7 +226,8 @@ class MessagePort : public HandleWrap { // `MessagePortData` object. static MessagePort* New(Environment* env, v8::Local<v8::Context> context, - std::unique_ptr<MessagePortData> data = nullptr); + std::unique_ptr<MessagePortData> data = nullptr, + const std::string& name = std::string()); // Send a message, i.e. deliver it into the sibling's incoming queue. // If this port is closed, or if there is no sibling, this message is |