From 9e446b3e9f8c8aa0dcfb974f306504f307b8fac2 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Thu, 26 Nov 2020 13:54:23 -0800 Subject: worker: add experimental BroadcastChannel Signed-off-by: James M Snell PR-URL: https://github.com/nodejs/node/pull/36271 Reviewed-By: Anna Henningsen Reviewed-By: Benjamin Gruenbaum --- src/node_messaging.cc | 217 +++++++++++++++++++++++++++++++++++++------------- 1 file changed, 161 insertions(+), 56 deletions(-) (limited to 'src/node_messaging.cc') diff --git a/src/node_messaging.cc b/src/node_messaging.cc index db3c24c3f85..74f75071429 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -161,7 +161,6 @@ MaybeLocal Message::Deserialize(Environment* env, std::move(shared_array_buffers_[i])); shared_array_buffers.push_back(sab); } - shared_array_buffers_.clear(); DeserializerDelegate delegate( this, env, host_objects, shared_array_buffers, wasm_modules_); @@ -178,7 +177,6 @@ MaybeLocal Message::Deserialize(Environment* env, ArrayBuffer::New(env->isolate(), std::move(array_buffers_[i])); deserializer.TransferArrayBuffer(i, ab); } - array_buffers_.clear(); if (deserializer.ReadHeader(context).IsNothing()) return {}; @@ -517,7 +515,20 @@ void Message::MemoryInfo(MemoryTracker* tracker) const { tracker->TrackField("transferables", transferables_); } -MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { } +// TODO(@jasnell): The name here will be an empty string if the +// one-to-one MessageChannel is used. In such cases, +// SiblingGroup::Get() will return nothing and group_ will be +// an empty pointer. @addaleax suggests that the code here +// could be clearer if attaching the SiblingGroup were a +// separate step rather than part of the constructor here. +MessagePortData::MessagePortData( + MessagePort* owner, + const std::string& name) + : owner_(owner), + group_(SiblingGroup::Get(name)) { + if (group_) + group_->Entangle(this); +} MessagePortData::~MessagePortData() { CHECK_NULL(owner_); @@ -529,7 +540,7 @@ void MessagePortData::MemoryInfo(MemoryTracker* tracker) const { tracker->TrackField("incoming_messages", incoming_messages_); } -void MessagePortData::AddToIncomingQueue(Message&& message) { +void MessagePortData::AddToIncomingQueue(std::shared_ptr message) { // This function will be called by other threads. Mutex::ScopedLock lock(mutex_); incoming_messages_.emplace_back(std::move(message)); @@ -541,32 +552,17 @@ void MessagePortData::AddToIncomingQueue(Message&& message) { } void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) { - CHECK_NULL(a->sibling_); - CHECK_NULL(b->sibling_); - a->sibling_ = b; - b->sibling_ = a; - a->sibling_mutex_ = b->sibling_mutex_; + CHECK(!a->group_); + CHECK(!b->group_); + b->group_ = a->group_ = std::make_shared(); + a->group_->Entangle(a); + a->group_->Entangle(b); } void MessagePortData::Disentangle() { - // Grab a copy of the sibling mutex, then replace it so that each sibling - // has its own sibling_mutex_ now. - std::shared_ptr sibling_mutex = sibling_mutex_; - Mutex::ScopedLock sibling_lock(*sibling_mutex); - sibling_mutex_ = std::make_shared(); - - MessagePortData* sibling = sibling_; - if (sibling_ != nullptr) { - sibling_->sibling_ = nullptr; - sibling_ = nullptr; - } - - // We close MessagePorts after disentanglement, so we enqueue a corresponding - // message and trigger the corresponding uv_async_t to let them know that - // this happened. - AddToIncomingQueue(Message()); - if (sibling != nullptr) { - sibling->AddToIncomingQueue(Message()); + if (group_) { + group_->Disentangle(this); + group_.reset(); } } @@ -576,12 +572,13 @@ MessagePort::~MessagePort() { MessagePort::MessagePort(Environment* env, Local context, - Local wrap) + Local wrap, + const std::string& name) : HandleWrap(env, wrap, reinterpret_cast(&async_), AsyncWrap::PROVIDER_MESSAGEPORT), - data_(new MessagePortData(this)) { + data_(new MessagePortData(this, name)) { auto onmessage = [](uv_async_t* handle) { // Called when data has been put into the queue. MessagePort* channel = ContainerOf(&MessagePort::async_, handle); @@ -647,7 +644,8 @@ void MessagePort::New(const FunctionCallbackInfo& args) { MessagePort* MessagePort::New( Environment* env, Local context, - std::unique_ptr data) { + std::unique_ptr data, + const std::string& name) { Context::Scope context_scope(context); Local ctor_templ = GetMessagePortConstructorTemplate(env); @@ -656,7 +654,7 @@ MessagePort* MessagePort::New( Local instance; if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance)) return nullptr; - MessagePort* port = new MessagePort(env, context, instance); + MessagePort* port = new MessagePort(env, context, instance, name); CHECK_NOT_NULL(port); if (port->IsHandleClosing()) { // Construction failed with an exception. @@ -681,7 +679,7 @@ MessagePort* MessagePort::New( MaybeLocal MessagePort::ReceiveMessage(Local context, bool only_if_receiving) { - Message received; + std::shared_ptr received; { // Get the head of the message queue. Mutex::ScopedLock lock(data_->mutex_); @@ -695,22 +693,22 @@ MaybeLocal MessagePort::ReceiveMessage(Local context, // receive is not the final "close" message. if (data_->incoming_messages_.empty() || (!wants_message && - !data_->incoming_messages_.front().IsCloseMessage())) { + !data_->incoming_messages_.front()->IsCloseMessage())) { return env()->no_message_symbol(); } - received = std::move(data_->incoming_messages_.front()); + received = data_->incoming_messages_.front(); data_->incoming_messages_.pop_front(); } - if (received.IsCloseMessage()) { + if (received->IsCloseMessage()) { Close(); return env()->no_message_symbol(); } if (!env()->can_call_into_js()) return MaybeLocal(); - return received.Deserialize(env(), context); + return received->Deserialize(env(), context); } void MessagePort::OnMessage() { @@ -829,13 +827,13 @@ Maybe MessagePort::PostMessage(Environment* env, Local obj = object(isolate); Local context = obj->CreationContext(); - Message msg; + std::shared_ptr msg = std::make_shared(); // Per spec, we need to both check if transfer list has the source port, and // serialize the input message, even if the MessagePort is closed or detached. Maybe serialization_maybe = - msg.Serialize(env, context, message_v, transfer_v, obj); + msg->Serialize(env, context, message_v, transfer_v, obj); if (data_ == nullptr) { return serialization_maybe; } @@ -843,26 +841,26 @@ Maybe MessagePort::PostMessage(Environment* env, return Nothing(); } - Mutex::ScopedLock lock(*data_->sibling_mutex_); - bool doomed = false; + std::string error; + Maybe res = data_->Dispatch(msg, &error); + if (res.IsNothing()) + return res; - // Check if the target port is posted to itself. - if (data_->sibling_ != nullptr) { - for (const auto& transferable : msg.transferables()) { - if (data_->sibling_ == transferable.get()) { - doomed = true; - ProcessEmitWarning(env, "The target port was posted to itself, and " - "the communication channel was lost"); - break; - } - } - } + if (!error.empty()) + ProcessEmitWarning(env, error.c_str()); - if (data_->sibling_ == nullptr || doomed) - return Just(true); + return res; +} - data_->sibling_->AddToIncomingQueue(std::move(msg)); - return Just(true); +Maybe MessagePortData::Dispatch( + std::shared_ptr message, + std::string* error) { + if (!group_) { + if (error != nullptr) + *error = "MessagePortData is not entangled."; + return Nothing(); + } + return group_->Dispatch(this, message, error); } static Maybe ReadIterable(Environment* env, @@ -969,7 +967,9 @@ void MessagePort::PostMessage(const FunctionCallbackInfo& args) { return; } - port->PostMessage(env, args[0], transfer_list); + Maybe res = port->PostMessage(env, args[0], transfer_list); + if (res.IsJust()) + args.GetReturnValue().Set(res.FromJust()); } void MessagePort::Start() { @@ -1273,6 +1273,99 @@ Maybe JSTransferable::Data::FinalizeTransferWrite( return ret; } +std::shared_ptr SiblingGroup::Get(const std::string& name) { + if (name.empty()) return {}; + Mutex::ScopedLock lock(SiblingGroup::groups_mutex_); + std::shared_ptr group; + auto i = groups_.find(name); + if (i == groups_.end() || i->second.expired()) { + group = std::make_shared(name); + groups_[name] = group; + } else { + group = i->second.lock(); + } + return group; +} + +void SiblingGroup::CheckSiblingGroup(const std::string& name) { + Mutex::ScopedLock lock(SiblingGroup::groups_mutex_); + auto i = groups_.find(name); + if (i != groups_.end() && i->second.expired()) + groups_.erase(name); +} + +SiblingGroup::SiblingGroup(const std::string& name) + : name_(name) { } + +SiblingGroup::~SiblingGroup() { + // If this is a named group, check to see if we can remove the group + if (!name_.empty()) + CheckSiblingGroup(name_); +} + +Maybe SiblingGroup::Dispatch( + MessagePortData* source, + std::shared_ptr message, + std::string* error) { + + Mutex::ScopedLock lock(group_mutex_); + + // The source MessagePortData is not part of this group. + if (ports_.find(source) == ports_.end()) { + if (error != nullptr) + *error = "Source MessagePort is not entangled with this group."; + return Nothing(); + } + + // There are no destination ports. + if (size() <= 1) + return Just(false); + + // Transferables cannot be used when there is more + // than a single destination. + if (size() > 2 && message->transferables().size()) { + if (error != nullptr) + *error = "Transferables cannot be used with multiple destinations."; + return Nothing(); + } + + for (MessagePortData* port : ports_) { + if (port == source) + continue; + // This loop should only be entered if there's only a single destination + for (const auto& transferable : message->transferables()) { + if (port == transferable.get()) { + if (error != nullptr) { + *error = "The target port was posted to itself, and the " + "communication channel was lost"; + } + return Just(true); + } + } + port->AddToIncomingQueue(message); + } + + return Just(true); +} + +void SiblingGroup::Entangle(MessagePortData* data) { + Mutex::ScopedLock lock(group_mutex_); + ports_.insert(data); +} + +void SiblingGroup::Disentangle(MessagePortData* data) { + Mutex::ScopedLock lock(group_mutex_); + ports_.erase(data); + + data->AddToIncomingQueue(std::make_shared()); + // If this is an anonymous group and there's another port, close it. + if (size() == 1 && name_.empty()) + (*(ports_.begin()))->AddToIncomingQueue(std::make_shared()); +} + +SiblingGroup::Map SiblingGroup::groups_; +Mutex SiblingGroup::groups_mutex_; + namespace { static void SetDeserializerCreateObjectFunction( @@ -1308,6 +1401,16 @@ static void MessageChannel(const FunctionCallbackInfo& args) { .Check(); } +static void BroadcastChannel(const FunctionCallbackInfo& args) { + CHECK(args[0]->IsString()); + Environment* env = Environment::GetCurrent(args); + Context::Scope context_scope(env->context()); + Utf8Value name(env->isolate(), args[0]); + MessagePort* port = + MessagePort::New(env, env->context(), nullptr, std::string(*name)); + args.GetReturnValue().Set(port->object()); +} + static void InitMessaging(Local target, Local unused, Local context, @@ -1352,6 +1455,7 @@ static void InitMessaging(Local target, MessagePort::MoveToContext); env->SetMethod(target, "setDeserializerCreateObjectFunction", SetDeserializerCreateObjectFunction); + env->SetMethod(target, "broadcastChannel", BroadcastChannel); { Local domexception = GetDOMException(context).ToLocalChecked(); @@ -1365,6 +1469,7 @@ static void InitMessaging(Local target, static void RegisterExternalReferences(ExternalReferenceRegistry* registry) { registry->Register(MessageChannel); + registry->Register(BroadcastChannel); registry->Register(JSTransferable::New); registry->Register(MessagePort::New); registry->Register(MessagePort::PostMessage); -- cgit v1.2.3