Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames M Snell <jasnell@gmail.com>2020-11-27 00:54:23 +0300
committerJames M Snell <jasnell@gmail.com>2020-12-01 19:02:28 +0300
commit9e446b3e9f8c8aa0dcfb974f306504f307b8fac2 (patch)
tree0cfecf19e0435fa40b9741c5eed35d56ae66d5da /src/node_messaging.cc
parent09fd8f13c87a455c833e83f494616d6cc774aafa (diff)
worker: add experimental BroadcastChannel
Signed-off-by: James M Snell <jasnell@gmail.com> PR-URL: https://github.com/nodejs/node/pull/36271 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Diffstat (limited to 'src/node_messaging.cc')
-rw-r--r--src/node_messaging.cc217
1 files changed, 161 insertions, 56 deletions
diff --git a/src/node_messaging.cc b/src/node_messaging.cc
index db3c24c3f85..74f75071429 100644
--- a/src/node_messaging.cc
+++ b/src/node_messaging.cc
@@ -161,7 +161,6 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
std::move(shared_array_buffers_[i]));
shared_array_buffers.push_back(sab);
}
- shared_array_buffers_.clear();
DeserializerDelegate delegate(
this, env, host_objects, shared_array_buffers, wasm_modules_);
@@ -178,7 +177,6 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
ArrayBuffer::New(env->isolate(), std::move(array_buffers_[i]));
deserializer.TransferArrayBuffer(i, ab);
}
- array_buffers_.clear();
if (deserializer.ReadHeader(context).IsNothing())
return {};
@@ -517,7 +515,20 @@ void Message::MemoryInfo(MemoryTracker* tracker) const {
tracker->TrackField("transferables", transferables_);
}
-MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { }
+// TODO(@jasnell): The name here will be an empty string if the
+// one-to-one MessageChannel is used. In such cases,
+// SiblingGroup::Get() will return nothing and group_ will be
+// an empty pointer. @addaleax suggests that the code here
+// could be clearer if attaching the SiblingGroup were a
+// separate step rather than part of the constructor here.
+MessagePortData::MessagePortData(
+ MessagePort* owner,
+ const std::string& name)
+ : owner_(owner),
+ group_(SiblingGroup::Get(name)) {
+ if (group_)
+ group_->Entangle(this);
+}
MessagePortData::~MessagePortData() {
CHECK_NULL(owner_);
@@ -529,7 +540,7 @@ void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
tracker->TrackField("incoming_messages", incoming_messages_);
}
-void MessagePortData::AddToIncomingQueue(Message&& message) {
+void MessagePortData::AddToIncomingQueue(std::shared_ptr<Message> message) {
// This function will be called by other threads.
Mutex::ScopedLock lock(mutex_);
incoming_messages_.emplace_back(std::move(message));
@@ -541,32 +552,17 @@ void MessagePortData::AddToIncomingQueue(Message&& message) {
}
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
- CHECK_NULL(a->sibling_);
- CHECK_NULL(b->sibling_);
- a->sibling_ = b;
- b->sibling_ = a;
- a->sibling_mutex_ = b->sibling_mutex_;
+ CHECK(!a->group_);
+ CHECK(!b->group_);
+ b->group_ = a->group_ = std::make_shared<SiblingGroup>();
+ a->group_->Entangle(a);
+ a->group_->Entangle(b);
}
void MessagePortData::Disentangle() {
- // Grab a copy of the sibling mutex, then replace it so that each sibling
- // has its own sibling_mutex_ now.
- std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_;
- Mutex::ScopedLock sibling_lock(*sibling_mutex);
- sibling_mutex_ = std::make_shared<Mutex>();
-
- MessagePortData* sibling = sibling_;
- if (sibling_ != nullptr) {
- sibling_->sibling_ = nullptr;
- sibling_ = nullptr;
- }
-
- // We close MessagePorts after disentanglement, so we enqueue a corresponding
- // message and trigger the corresponding uv_async_t to let them know that
- // this happened.
- AddToIncomingQueue(Message());
- if (sibling != nullptr) {
- sibling->AddToIncomingQueue(Message());
+ if (group_) {
+ group_->Disentangle(this);
+ group_.reset();
}
}
@@ -576,12 +572,13 @@ MessagePort::~MessagePort() {
MessagePort::MessagePort(Environment* env,
Local<Context> context,
- Local<Object> wrap)
+ Local<Object> wrap,
+ const std::string& name)
: HandleWrap(env,
wrap,
reinterpret_cast<uv_handle_t*>(&async_),
AsyncWrap::PROVIDER_MESSAGEPORT),
- data_(new MessagePortData(this)) {
+ data_(new MessagePortData(this, name)) {
auto onmessage = [](uv_async_t* handle) {
// Called when data has been put into the queue.
MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
@@ -647,7 +644,8 @@ void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
MessagePort* MessagePort::New(
Environment* env,
Local<Context> context,
- std::unique_ptr<MessagePortData> data) {
+ std::unique_ptr<MessagePortData> data,
+ const std::string& name) {
Context::Scope context_scope(context);
Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
@@ -656,7 +654,7 @@ MessagePort* MessagePort::New(
Local<Object> instance;
if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
return nullptr;
- MessagePort* port = new MessagePort(env, context, instance);
+ MessagePort* port = new MessagePort(env, context, instance, name);
CHECK_NOT_NULL(port);
if (port->IsHandleClosing()) {
// Construction failed with an exception.
@@ -681,7 +679,7 @@ MessagePort* MessagePort::New(
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
bool only_if_receiving) {
- Message received;
+ std::shared_ptr<Message> received;
{
// Get the head of the message queue.
Mutex::ScopedLock lock(data_->mutex_);
@@ -695,22 +693,22 @@ MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
// receive is not the final "close" message.
if (data_->incoming_messages_.empty() ||
(!wants_message &&
- !data_->incoming_messages_.front().IsCloseMessage())) {
+ !data_->incoming_messages_.front()->IsCloseMessage())) {
return env()->no_message_symbol();
}
- received = std::move(data_->incoming_messages_.front());
+ received = data_->incoming_messages_.front();
data_->incoming_messages_.pop_front();
}
- if (received.IsCloseMessage()) {
+ if (received->IsCloseMessage()) {
Close();
return env()->no_message_symbol();
}
if (!env()->can_call_into_js()) return MaybeLocal<Value>();
- return received.Deserialize(env(), context);
+ return received->Deserialize(env(), context);
}
void MessagePort::OnMessage() {
@@ -829,13 +827,13 @@ Maybe<bool> MessagePort::PostMessage(Environment* env,
Local<Object> obj = object(isolate);
Local<Context> context = obj->CreationContext();
- Message msg;
+ std::shared_ptr<Message> msg = std::make_shared<Message>();
// Per spec, we need to both check if transfer list has the source port, and
// serialize the input message, even if the MessagePort is closed or detached.
Maybe<bool> serialization_maybe =
- msg.Serialize(env, context, message_v, transfer_v, obj);
+ msg->Serialize(env, context, message_v, transfer_v, obj);
if (data_ == nullptr) {
return serialization_maybe;
}
@@ -843,26 +841,26 @@ Maybe<bool> MessagePort::PostMessage(Environment* env,
return Nothing<bool>();
}
- Mutex::ScopedLock lock(*data_->sibling_mutex_);
- bool doomed = false;
+ std::string error;
+ Maybe<bool> res = data_->Dispatch(msg, &error);
+ if (res.IsNothing())
+ return res;
- // Check if the target port is posted to itself.
- if (data_->sibling_ != nullptr) {
- for (const auto& transferable : msg.transferables()) {
- if (data_->sibling_ == transferable.get()) {
- doomed = true;
- ProcessEmitWarning(env, "The target port was posted to itself, and "
- "the communication channel was lost");
- break;
- }
- }
- }
+ if (!error.empty())
+ ProcessEmitWarning(env, error.c_str());
- if (data_->sibling_ == nullptr || doomed)
- return Just(true);
+ return res;
+}
- data_->sibling_->AddToIncomingQueue(std::move(msg));
- return Just(true);
+Maybe<bool> MessagePortData::Dispatch(
+ std::shared_ptr<Message> message,
+ std::string* error) {
+ if (!group_) {
+ if (error != nullptr)
+ *error = "MessagePortData is not entangled.";
+ return Nothing<bool>();
+ }
+ return group_->Dispatch(this, message, error);
}
static Maybe<bool> ReadIterable(Environment* env,
@@ -969,7 +967,9 @@ void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
return;
}
- port->PostMessage(env, args[0], transfer_list);
+ Maybe<bool> res = port->PostMessage(env, args[0], transfer_list);
+ if (res.IsJust())
+ args.GetReturnValue().Set(res.FromJust());
}
void MessagePort::Start() {
@@ -1273,6 +1273,99 @@ Maybe<bool> JSTransferable::Data::FinalizeTransferWrite(
return ret;
}
+std::shared_ptr<SiblingGroup> SiblingGroup::Get(const std::string& name) {
+ if (name.empty()) return {};
+ Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
+ std::shared_ptr<SiblingGroup> group;
+ auto i = groups_.find(name);
+ if (i == groups_.end() || i->second.expired()) {
+ group = std::make_shared<SiblingGroup>(name);
+ groups_[name] = group;
+ } else {
+ group = i->second.lock();
+ }
+ return group;
+}
+
+void SiblingGroup::CheckSiblingGroup(const std::string& name) {
+ Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
+ auto i = groups_.find(name);
+ if (i != groups_.end() && i->second.expired())
+ groups_.erase(name);
+}
+
+SiblingGroup::SiblingGroup(const std::string& name)
+ : name_(name) { }
+
+SiblingGroup::~SiblingGroup() {
+ // If this is a named group, check to see if we can remove the group
+ if (!name_.empty())
+ CheckSiblingGroup(name_);
+}
+
+Maybe<bool> SiblingGroup::Dispatch(
+ MessagePortData* source,
+ std::shared_ptr<Message> message,
+ std::string* error) {
+
+ Mutex::ScopedLock lock(group_mutex_);
+
+ // The source MessagePortData is not part of this group.
+ if (ports_.find(source) == ports_.end()) {
+ if (error != nullptr)
+ *error = "Source MessagePort is not entangled with this group.";
+ return Nothing<bool>();
+ }
+
+ // There are no destination ports.
+ if (size() <= 1)
+ return Just(false);
+
+ // Transferables cannot be used when there is more
+ // than a single destination.
+ if (size() > 2 && message->transferables().size()) {
+ if (error != nullptr)
+ *error = "Transferables cannot be used with multiple destinations.";
+ return Nothing<bool>();
+ }
+
+ for (MessagePortData* port : ports_) {
+ if (port == source)
+ continue;
+ // This loop should only be entered if there's only a single destination
+ for (const auto& transferable : message->transferables()) {
+ if (port == transferable.get()) {
+ if (error != nullptr) {
+ *error = "The target port was posted to itself, and the "
+ "communication channel was lost";
+ }
+ return Just(true);
+ }
+ }
+ port->AddToIncomingQueue(message);
+ }
+
+ return Just(true);
+}
+
+void SiblingGroup::Entangle(MessagePortData* data) {
+ Mutex::ScopedLock lock(group_mutex_);
+ ports_.insert(data);
+}
+
+void SiblingGroup::Disentangle(MessagePortData* data) {
+ Mutex::ScopedLock lock(group_mutex_);
+ ports_.erase(data);
+
+ data->AddToIncomingQueue(std::make_shared<Message>());
+ // If this is an anonymous group and there's another port, close it.
+ if (size() == 1 && name_.empty())
+ (*(ports_.begin()))->AddToIncomingQueue(std::make_shared<Message>());
+}
+
+SiblingGroup::Map SiblingGroup::groups_;
+Mutex SiblingGroup::groups_mutex_;
+
namespace {
static void SetDeserializerCreateObjectFunction(
@@ -1308,6 +1401,16 @@ static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
.Check();
}
+static void BroadcastChannel(const FunctionCallbackInfo<Value>& args) {
+ CHECK(args[0]->IsString());
+ Environment* env = Environment::GetCurrent(args);
+ Context::Scope context_scope(env->context());
+ Utf8Value name(env->isolate(), args[0]);
+ MessagePort* port =
+ MessagePort::New(env, env->context(), nullptr, std::string(*name));
+ args.GetReturnValue().Set(port->object());
+}
+
static void InitMessaging(Local<Object> target,
Local<Value> unused,
Local<Context> context,
@@ -1352,6 +1455,7 @@ static void InitMessaging(Local<Object> target,
MessagePort::MoveToContext);
env->SetMethod(target, "setDeserializerCreateObjectFunction",
SetDeserializerCreateObjectFunction);
+ env->SetMethod(target, "broadcastChannel", BroadcastChannel);
{
Local<Function> domexception = GetDOMException(context).ToLocalChecked();
@@ -1365,6 +1469,7 @@ static void InitMessaging(Local<Object> target,
static void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
registry->Register(MessageChannel);
+ registry->Register(BroadcastChannel);
registry->Register(JSTransferable::New);
registry->Register(MessagePort::New);
registry->Register(MessagePort::PostMessage);