Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'src/node_messaging.h')
-rw-r--r--src/node_messaging.h76
1 files changed, 64 insertions, 12 deletions
diff --git a/src/node_messaging.h b/src/node_messaging.h
index 7ef02226480..22c11321ef7 100644
--- a/src/node_messaging.h
+++ b/src/node_messaging.h
@@ -5,7 +5,11 @@
#include "env.h"
#include "node_mutex.h"
-#include <list>
+#include "v8.h"
+#include <deque>
+#include <string>
+#include <unordered_map>
+#include <set>
namespace node {
namespace worker {
@@ -45,6 +49,7 @@ class Message : public MemoryRetainer {
// V8 ValueSerializer API. If `payload` is empty, this message indicates
// that the receiving message port should close itself.
explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>());
+ ~Message() = default;
Message(Message&& other) = default;
Message& operator=(Message&& other) = default;
@@ -105,11 +110,58 @@ class Message : public MemoryRetainer {
friend class MessagePort;
};
+class SiblingGroup {
+ public:
+ // Named SiblingGroup, Used for one-to-many BroadcastChannels.
+ static std::shared_ptr<SiblingGroup> Get(const std::string& name);
+
+ // Anonymous SiblingGroup, Used for one-to-one MessagePort pairs.
+ SiblingGroup() = default;
+ explicit SiblingGroup(const std::string& name);
+ ~SiblingGroup();
+
+ // Dispatches the Message to the collection of associated
+ // ports. If there is more than one destination port and
+ // the Message contains transferables, Dispatch will fail.
+ // Returns Just(true) if successful and the message was
+ // dispatched to at least one destination. Returns Just(false)
+ // if there were no destinations. Returns Nothing<bool>()
+ // if there was an error. If error is not nullptr, it will
+ // be set to an error message or warning message as appropriate.
+ v8::Maybe<bool> Dispatch(
+ MessagePortData* source,
+ std::shared_ptr<Message> message,
+ std::string* error = nullptr);
+
+ void Entangle(MessagePortData* data);
+
+ void Disentangle(MessagePortData* data);
+
+ const std::string& name() const { return name_; }
+
+ size_t size() const { return ports_.size(); }
+
+ private:
+ std::string name_;
+ std::set<MessagePortData*> ports_;
+ Mutex group_mutex_;
+
+ static void CheckSiblingGroup(const std::string& name);
+
+ using Map =
+ std::unordered_map<std::string, std::weak_ptr<SiblingGroup>>;
+
+ static Mutex groups_mutex_;
+ static Map groups_;
+};
+
// This contains all data for a `MessagePort` instance that is not tied to
// a specific Environment/Isolate/event loop, for easier transfer between those.
class MessagePortData : public TransferData {
public:
- explicit MessagePortData(MessagePort* owner);
+ explicit MessagePortData(
+ MessagePort* owner,
+ const std::string& name = std::string());
~MessagePortData() override;
MessagePortData(MessagePortData&& other) = delete;
@@ -119,7 +171,10 @@ class MessagePortData : public TransferData {
// Add a message to the incoming queue and notify the receiver.
// This may be called from any thread.
- void AddToIncomingQueue(Message&& message);
+ void AddToIncomingQueue(std::shared_ptr<Message> message);
+ v8::Maybe<bool> Dispatch(
+ std::shared_ptr<Message> message,
+ std::string* error = nullptr);
// Turns `a` and `b` into siblings, i.e. connects the sending side of one
// to the receiving side of the other. This is not thread-safe.
@@ -144,14 +199,9 @@ class MessagePortData : public TransferData {
// This mutex protects all fields below it, with the exception of
// sibling_.
mutable Mutex mutex_;
- std::list<Message> incoming_messages_;
+ std::deque<std::shared_ptr<Message>> incoming_messages_;
MessagePort* owner_ = nullptr;
- // This mutex protects the sibling_ field and is shared between two entangled
- // MessagePorts. If both mutexes are acquired, this one needs to be
- // acquired first.
- std::shared_ptr<Mutex> sibling_mutex_ = std::make_shared<Mutex>();
- MessagePortData* sibling_ = nullptr;
-
+ std::shared_ptr<SiblingGroup> group_;
friend class MessagePort;
};
@@ -166,7 +216,8 @@ class MessagePort : public HandleWrap {
// creating MessagePort instances.
MessagePort(Environment* env,
v8::Local<v8::Context> context,
- v8::Local<v8::Object> wrap);
+ v8::Local<v8::Object> wrap,
+ const std::string& name = std::string());
public:
~MessagePort() override;
@@ -175,7 +226,8 @@ class MessagePort : public HandleWrap {
// `MessagePortData` object.
static MessagePort* New(Environment* env,
v8::Local<v8::Context> context,
- std::unique_ptr<MessagePortData> data = nullptr);
+ std::unique_ptr<MessagePortData> data = nullptr,
+ const std::string& name = std::string());
// Send a message, i.e. deliver it into the sibling's incoming queue.
// If this port is closed, or if there is no sibling, this message is