Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames M Snell <jasnell@gmail.com>2020-11-27 00:54:23 +0300
committerJames M Snell <jasnell@gmail.com>2020-12-01 19:02:28 +0300
commit9e446b3e9f8c8aa0dcfb974f306504f307b8fac2 (patch)
tree0cfecf19e0435fa40b9741c5eed35d56ae66d5da
parent09fd8f13c87a455c833e83f494616d6cc774aafa (diff)
worker: add experimental BroadcastChannel
Signed-off-by: James M Snell <jasnell@gmail.com> PR-URL: https://github.com/nodejs/node/pull/36271 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
-rw-r--r--doc/api/worker_threads.md92
-rw-r--r--lib/internal/worker/io.js87
-rw-r--r--lib/worker_threads.js2
-rw-r--r--src/node_messaging.cc217
-rw-r--r--src/node_messaging.h76
-rw-r--r--test/parallel/test-worker-broadcastchannel-wpt.js148
-rw-r--r--test/parallel/test-worker-broadcastchannel.js135
-rw-r--r--tools/doc/type-parser.js4
8 files changed, 689 insertions, 72 deletions
diff --git a/doc/api/worker_threads.md b/doc/api/worker_threads.md
index bfc754b77ca..8c4c5b3854f 100644
--- a/doc/api/worker_threads.md
+++ b/doc/api/worker_threads.md
@@ -274,6 +274,98 @@ if (isMainThread) {
}
```
+## Class: `BroadcastChannel extends EventTarget`
+<!-- YAML
+added: REPLACEME
+-->
+
+> Stability: 1 - Experimental
+
+Instances of `BroadcastChannel` allow asynchronous one-to-many communication
+with all other `BroadcastChannel` instances bound to the same channel name.
+
+```js
+'use strict';
+
+const {
+ isMainThread,
+ BroadcastChannel,
+ Worker
+} = require('worker_threads');
+
+const bc = new BroadcastChannel('hello');
+
+if (isMainThread) {
+ let c = 0;
+ bc.onmessage = (event) => {
+ console.log(event.data);
+ if (++c === 10) bc.close();
+ };
+ for (let n = 0; n < 10; n++)
+ new Worker(__filename);
+} else {
+ bc.postMessage('hello from every worker');
+ bc.close();
+}
+```
+
+### `new BroadcastChannel(name)`
+<!-- YAML
+added: REPLACEME
+-->
+
+* `name` {any} The name of the channel to connect to. Any JavaScript value
+ that can be converted to a string using ``${name}`` is permitted.
+
+### `broadcastChannel.close()`
+<!-- YAML
+added: REPLACEME
+-->
+
+Closes the `BroadcastChannel` connection.
+
+### `broadcastChannel.onmessage`
+<!-- YAML
+added: REPLACEME
+-->
+
+* Type: {Function} Invoked with a single `MessageEvent` argument
+ when a message is received.
+
+### `broadcastChannel.onmessageerror`
+<!-- YAML
+added: REPLACEME
+-->
+
+* Type: {Function} Invoked with a received message cannot be
+ deserialized.
+
+### `broadcastChannel.postMessage(message)`
+<!-- YAML
+added: REPLACEME
+-->
+
+* `message` {any} Any cloneable JavaScript value.
+
+### `broadcastChannel.ref()`
+<!-- YAML
+added: REPLACEME
+-->
+
+Opposite of `unref()`. Calling `ref()` on a previously `unref()`ed
+BroadcastChannel will *not* let the program exit if it's the only active handle
+left (the default behavior). If the port is `ref()`ed, calling `ref()` again
+will have no effect.
+
+### `broadcastChannel.unref()`
+<!-- YAML
+added: REPLACEME
+-->
+
+Calling `unref()` on a BroadcastChannel will allow the thread to exit if this
+is the only active handle in the event system. If the BroadcastChannel is
+already `unref()`ed calling `unref()` again will have no effect.
+
## Class: `MessageChannel`
<!-- YAML
added: v10.5.0
diff --git a/lib/internal/worker/io.js b/lib/internal/worker/io.js
index 73958dbc2d7..2352ac6ebf8 100644
--- a/lib/internal/worker/io.js
+++ b/lib/internal/worker/io.js
@@ -19,11 +19,13 @@ const {
const {
MessagePort,
MessageChannel,
+ broadcastChannel,
drainMessagePort,
moveMessagePortToContext,
receiveMessageOnPort: receiveMessageOnPort_,
stopMessagePort,
- checkMessagePort
+ checkMessagePort,
+ DOMException,
} = internalBinding('messaging');
const {
getEnvMessagePort
@@ -41,14 +43,20 @@ const {
} = require('internal/event_target');
const { inspect } = require('internal/util/inspect');
const {
- ERR_INVALID_ARG_TYPE
-} = require('internal/errors').codes;
+ codes: {
+ ERR_INVALID_ARG_TYPE,
+ ERR_MISSING_ARGS,
+ }
+} = require('internal/errors');
const kData = Symbol('kData');
+const kHandle = Symbol('kHandle');
const kIncrementsPortRef = Symbol('kIncrementsPortRef');
const kLastEventId = Symbol('kLastEventId');
const kName = Symbol('kName');
const kOrigin = Symbol('kOrigin');
+const kOnMessage = Symbol('kOnMessage');
+const kOnMessageError = Symbol('kOnMessageError');
const kPort = Symbol('kPort');
const kPorts = Symbol('kPorts');
const kWaitingStreams = Symbol('kWaitingStreams');
@@ -324,6 +332,76 @@ function receiveMessageOnPort(port) {
return { message };
}
+function onMessageEvent(type, data) {
+ this.dispatchEvent(new MessageEvent(type, { data }));
+}
+
+class BroadcastChannel extends EventTarget {
+ constructor(name) {
+ if (arguments.length === 0)
+ throw new ERR_MISSING_ARGS('name');
+ super();
+ this[kName] = `${name}`;
+ this[kHandle] = broadcastChannel(this[kName]);
+ this[kOnMessage] = onMessageEvent.bind(this, 'message');
+ this[kOnMessageError] = onMessageEvent.bind(this, 'messageerror');
+ this[kHandle].on('message', this[kOnMessage]);
+ this[kHandle].on('messageerror', this[kOnMessageError]);
+ }
+
+ [inspect.custom](depth, options) {
+ if (depth < 0)
+ return 'BroadcastChannel';
+
+ const opts = {
+ ...options,
+ depth: options.depth == null ? null : options.depth - 1
+ };
+
+ return `BroadcastChannel ${inspect({
+ name: this[kName],
+ active: this[kHandle] !== undefined,
+ }, opts)}`;
+ }
+
+ get name() { return this[kName]; }
+
+ close() {
+ if (this[kHandle] === undefined)
+ return;
+ this[kHandle].off('message', this[kOnMessage]);
+ this[kHandle].off('messageerror', this[kOnMessageError]);
+ this[kOnMessage] = undefined;
+ this[kOnMessageError] = undefined;
+ this[kHandle].close();
+ this[kHandle] = undefined;
+ }
+
+ postMessage(message) {
+ if (arguments.length === 0)
+ throw new ERR_MISSING_ARGS('message');
+ if (this[kHandle] === undefined)
+ throw new DOMException('BroadcastChannel is closed.');
+ if (this[kHandle].postMessage(message) === undefined)
+ throw new DOMException('Message could not be posted.');
+ }
+
+ ref() {
+ if (this[kHandle])
+ this[kHandle].ref();
+ return this;
+ }
+
+ unref() {
+ if (this[kHandle])
+ this[kHandle].unref();
+ return this;
+ }
+}
+
+defineEventHandler(BroadcastChannel.prototype, 'message');
+defineEventHandler(BroadcastChannel.prototype, 'messageerror');
+
module.exports = {
drainMessagePort,
messageTypes,
@@ -339,5 +417,6 @@ module.exports = {
setupPortReferencing,
ReadableWorkerStdio,
WritableWorkerStdio,
- createWorkerStdio
+ createWorkerStdio,
+ BroadcastChannel,
};
diff --git a/lib/worker_threads.js b/lib/worker_threads.js
index 1b9c6ae945e..efa1c51112a 100644
--- a/lib/worker_threads.js
+++ b/lib/worker_threads.js
@@ -13,6 +13,7 @@ const {
MessageChannel,
moveMessagePortToContext,
receiveMessageOnPort,
+ BroadcastChannel,
} = require('internal/worker/io');
const {
@@ -32,4 +33,5 @@ module.exports = {
Worker,
parentPort: null,
workerData: null,
+ BroadcastChannel,
};
diff --git a/src/node_messaging.cc b/src/node_messaging.cc
index db3c24c3f85..74f75071429 100644
--- a/src/node_messaging.cc
+++ b/src/node_messaging.cc
@@ -161,7 +161,6 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
std::move(shared_array_buffers_[i]));
shared_array_buffers.push_back(sab);
}
- shared_array_buffers_.clear();
DeserializerDelegate delegate(
this, env, host_objects, shared_array_buffers, wasm_modules_);
@@ -178,7 +177,6 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
ArrayBuffer::New(env->isolate(), std::move(array_buffers_[i]));
deserializer.TransferArrayBuffer(i, ab);
}
- array_buffers_.clear();
if (deserializer.ReadHeader(context).IsNothing())
return {};
@@ -517,7 +515,20 @@ void Message::MemoryInfo(MemoryTracker* tracker) const {
tracker->TrackField("transferables", transferables_);
}
-MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { }
+// TODO(@jasnell): The name here will be an empty string if the
+// one-to-one MessageChannel is used. In such cases,
+// SiblingGroup::Get() will return nothing and group_ will be
+// an empty pointer. @addaleax suggests that the code here
+// could be clearer if attaching the SiblingGroup were a
+// separate step rather than part of the constructor here.
+MessagePortData::MessagePortData(
+ MessagePort* owner,
+ const std::string& name)
+ : owner_(owner),
+ group_(SiblingGroup::Get(name)) {
+ if (group_)
+ group_->Entangle(this);
+}
MessagePortData::~MessagePortData() {
CHECK_NULL(owner_);
@@ -529,7 +540,7 @@ void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
tracker->TrackField("incoming_messages", incoming_messages_);
}
-void MessagePortData::AddToIncomingQueue(Message&& message) {
+void MessagePortData::AddToIncomingQueue(std::shared_ptr<Message> message) {
// This function will be called by other threads.
Mutex::ScopedLock lock(mutex_);
incoming_messages_.emplace_back(std::move(message));
@@ -541,32 +552,17 @@ void MessagePortData::AddToIncomingQueue(Message&& message) {
}
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
- CHECK_NULL(a->sibling_);
- CHECK_NULL(b->sibling_);
- a->sibling_ = b;
- b->sibling_ = a;
- a->sibling_mutex_ = b->sibling_mutex_;
+ CHECK(!a->group_);
+ CHECK(!b->group_);
+ b->group_ = a->group_ = std::make_shared<SiblingGroup>();
+ a->group_->Entangle(a);
+ a->group_->Entangle(b);
}
void MessagePortData::Disentangle() {
- // Grab a copy of the sibling mutex, then replace it so that each sibling
- // has its own sibling_mutex_ now.
- std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_;
- Mutex::ScopedLock sibling_lock(*sibling_mutex);
- sibling_mutex_ = std::make_shared<Mutex>();
-
- MessagePortData* sibling = sibling_;
- if (sibling_ != nullptr) {
- sibling_->sibling_ = nullptr;
- sibling_ = nullptr;
- }
-
- // We close MessagePorts after disentanglement, so we enqueue a corresponding
- // message and trigger the corresponding uv_async_t to let them know that
- // this happened.
- AddToIncomingQueue(Message());
- if (sibling != nullptr) {
- sibling->AddToIncomingQueue(Message());
+ if (group_) {
+ group_->Disentangle(this);
+ group_.reset();
}
}
@@ -576,12 +572,13 @@ MessagePort::~MessagePort() {
MessagePort::MessagePort(Environment* env,
Local<Context> context,
- Local<Object> wrap)
+ Local<Object> wrap,
+ const std::string& name)
: HandleWrap(env,
wrap,
reinterpret_cast<uv_handle_t*>(&async_),
AsyncWrap::PROVIDER_MESSAGEPORT),
- data_(new MessagePortData(this)) {
+ data_(new MessagePortData(this, name)) {
auto onmessage = [](uv_async_t* handle) {
// Called when data has been put into the queue.
MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
@@ -647,7 +644,8 @@ void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
MessagePort* MessagePort::New(
Environment* env,
Local<Context> context,
- std::unique_ptr<MessagePortData> data) {
+ std::unique_ptr<MessagePortData> data,
+ const std::string& name) {
Context::Scope context_scope(context);
Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
@@ -656,7 +654,7 @@ MessagePort* MessagePort::New(
Local<Object> instance;
if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
return nullptr;
- MessagePort* port = new MessagePort(env, context, instance);
+ MessagePort* port = new MessagePort(env, context, instance, name);
CHECK_NOT_NULL(port);
if (port->IsHandleClosing()) {
// Construction failed with an exception.
@@ -681,7 +679,7 @@ MessagePort* MessagePort::New(
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
bool only_if_receiving) {
- Message received;
+ std::shared_ptr<Message> received;
{
// Get the head of the message queue.
Mutex::ScopedLock lock(data_->mutex_);
@@ -695,22 +693,22 @@ MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
// receive is not the final "close" message.
if (data_->incoming_messages_.empty() ||
(!wants_message &&
- !data_->incoming_messages_.front().IsCloseMessage())) {
+ !data_->incoming_messages_.front()->IsCloseMessage())) {
return env()->no_message_symbol();
}
- received = std::move(data_->incoming_messages_.front());
+ received = data_->incoming_messages_.front();
data_->incoming_messages_.pop_front();
}
- if (received.IsCloseMessage()) {
+ if (received->IsCloseMessage()) {
Close();
return env()->no_message_symbol();
}
if (!env()->can_call_into_js()) return MaybeLocal<Value>();
- return received.Deserialize(env(), context);
+ return received->Deserialize(env(), context);
}
void MessagePort::OnMessage() {
@@ -829,13 +827,13 @@ Maybe<bool> MessagePort::PostMessage(Environment* env,
Local<Object> obj = object(isolate);
Local<Context> context = obj->CreationContext();
- Message msg;
+ std::shared_ptr<Message> msg = std::make_shared<Message>();
// Per spec, we need to both check if transfer list has the source port, and
// serialize the input message, even if the MessagePort is closed or detached.
Maybe<bool> serialization_maybe =
- msg.Serialize(env, context, message_v, transfer_v, obj);
+ msg->Serialize(env, context, message_v, transfer_v, obj);
if (data_ == nullptr) {
return serialization_maybe;
}
@@ -843,26 +841,26 @@ Maybe<bool> MessagePort::PostMessage(Environment* env,
return Nothing<bool>();
}
- Mutex::ScopedLock lock(*data_->sibling_mutex_);
- bool doomed = false;
+ std::string error;
+ Maybe<bool> res = data_->Dispatch(msg, &error);
+ if (res.IsNothing())
+ return res;
- // Check if the target port is posted to itself.
- if (data_->sibling_ != nullptr) {
- for (const auto& transferable : msg.transferables()) {
- if (data_->sibling_ == transferable.get()) {
- doomed = true;
- ProcessEmitWarning(env, "The target port was posted to itself, and "
- "the communication channel was lost");
- break;
- }
- }
- }
+ if (!error.empty())
+ ProcessEmitWarning(env, error.c_str());
- if (data_->sibling_ == nullptr || doomed)
- return Just(true);
+ return res;
+}
- data_->sibling_->AddToIncomingQueue(std::move(msg));
- return Just(true);
+Maybe<bool> MessagePortData::Dispatch(
+ std::shared_ptr<Message> message,
+ std::string* error) {
+ if (!group_) {
+ if (error != nullptr)
+ *error = "MessagePortData is not entangled.";
+ return Nothing<bool>();
+ }
+ return group_->Dispatch(this, message, error);
}
static Maybe<bool> ReadIterable(Environment* env,
@@ -969,7 +967,9 @@ void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
return;
}
- port->PostMessage(env, args[0], transfer_list);
+ Maybe<bool> res = port->PostMessage(env, args[0], transfer_list);
+ if (res.IsJust())
+ args.GetReturnValue().Set(res.FromJust());
}
void MessagePort::Start() {
@@ -1273,6 +1273,99 @@ Maybe<bool> JSTransferable::Data::FinalizeTransferWrite(
return ret;
}
+std::shared_ptr<SiblingGroup> SiblingGroup::Get(const std::string& name) {
+ if (name.empty()) return {};
+ Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
+ std::shared_ptr<SiblingGroup> group;
+ auto i = groups_.find(name);
+ if (i == groups_.end() || i->second.expired()) {
+ group = std::make_shared<SiblingGroup>(name);
+ groups_[name] = group;
+ } else {
+ group = i->second.lock();
+ }
+ return group;
+}
+
+void SiblingGroup::CheckSiblingGroup(const std::string& name) {
+ Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
+ auto i = groups_.find(name);
+ if (i != groups_.end() && i->second.expired())
+ groups_.erase(name);
+}
+
+SiblingGroup::SiblingGroup(const std::string& name)
+ : name_(name) { }
+
+SiblingGroup::~SiblingGroup() {
+ // If this is a named group, check to see if we can remove the group
+ if (!name_.empty())
+ CheckSiblingGroup(name_);
+}
+
+Maybe<bool> SiblingGroup::Dispatch(
+ MessagePortData* source,
+ std::shared_ptr<Message> message,
+ std::string* error) {
+
+ Mutex::ScopedLock lock(group_mutex_);
+
+ // The source MessagePortData is not part of this group.
+ if (ports_.find(source) == ports_.end()) {
+ if (error != nullptr)
+ *error = "Source MessagePort is not entangled with this group.";
+ return Nothing<bool>();
+ }
+
+ // There are no destination ports.
+ if (size() <= 1)
+ return Just(false);
+
+ // Transferables cannot be used when there is more
+ // than a single destination.
+ if (size() > 2 && message->transferables().size()) {
+ if (error != nullptr)
+ *error = "Transferables cannot be used with multiple destinations.";
+ return Nothing<bool>();
+ }
+
+ for (MessagePortData* port : ports_) {
+ if (port == source)
+ continue;
+ // This loop should only be entered if there's only a single destination
+ for (const auto& transferable : message->transferables()) {
+ if (port == transferable.get()) {
+ if (error != nullptr) {
+ *error = "The target port was posted to itself, and the "
+ "communication channel was lost";
+ }
+ return Just(true);
+ }
+ }
+ port->AddToIncomingQueue(message);
+ }
+
+ return Just(true);
+}
+
+void SiblingGroup::Entangle(MessagePortData* data) {
+ Mutex::ScopedLock lock(group_mutex_);
+ ports_.insert(data);
+}
+
+void SiblingGroup::Disentangle(MessagePortData* data) {
+ Mutex::ScopedLock lock(group_mutex_);
+ ports_.erase(data);
+
+ data->AddToIncomingQueue(std::make_shared<Message>());
+ // If this is an anonymous group and there's another port, close it.
+ if (size() == 1 && name_.empty())
+ (*(ports_.begin()))->AddToIncomingQueue(std::make_shared<Message>());
+}
+
+SiblingGroup::Map SiblingGroup::groups_;
+Mutex SiblingGroup::groups_mutex_;
+
namespace {
static void SetDeserializerCreateObjectFunction(
@@ -1308,6 +1401,16 @@ static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
.Check();
}
+static void BroadcastChannel(const FunctionCallbackInfo<Value>& args) {
+ CHECK(args[0]->IsString());
+ Environment* env = Environment::GetCurrent(args);
+ Context::Scope context_scope(env->context());
+ Utf8Value name(env->isolate(), args[0]);
+ MessagePort* port =
+ MessagePort::New(env, env->context(), nullptr, std::string(*name));
+ args.GetReturnValue().Set(port->object());
+}
+
static void InitMessaging(Local<Object> target,
Local<Value> unused,
Local<Context> context,
@@ -1352,6 +1455,7 @@ static void InitMessaging(Local<Object> target,
MessagePort::MoveToContext);
env->SetMethod(target, "setDeserializerCreateObjectFunction",
SetDeserializerCreateObjectFunction);
+ env->SetMethod(target, "broadcastChannel", BroadcastChannel);
{
Local<Function> domexception = GetDOMException(context).ToLocalChecked();
@@ -1365,6 +1469,7 @@ static void InitMessaging(Local<Object> target,
static void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
registry->Register(MessageChannel);
+ registry->Register(BroadcastChannel);
registry->Register(JSTransferable::New);
registry->Register(MessagePort::New);
registry->Register(MessagePort::PostMessage);
diff --git a/src/node_messaging.h b/src/node_messaging.h
index 7ef02226480..22c11321ef7 100644
--- a/src/node_messaging.h
+++ b/src/node_messaging.h
@@ -5,7 +5,11 @@
#include "env.h"
#include "node_mutex.h"
-#include <list>
+#include "v8.h"
+#include <deque>
+#include <string>
+#include <unordered_map>
+#include <set>
namespace node {
namespace worker {
@@ -45,6 +49,7 @@ class Message : public MemoryRetainer {
// V8 ValueSerializer API. If `payload` is empty, this message indicates
// that the receiving message port should close itself.
explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>());
+ ~Message() = default;
Message(Message&& other) = default;
Message& operator=(Message&& other) = default;
@@ -105,11 +110,58 @@ class Message : public MemoryRetainer {
friend class MessagePort;
};
+class SiblingGroup {
+ public:
+ // Named SiblingGroup, Used for one-to-many BroadcastChannels.
+ static std::shared_ptr<SiblingGroup> Get(const std::string& name);
+
+ // Anonymous SiblingGroup, Used for one-to-one MessagePort pairs.
+ SiblingGroup() = default;
+ explicit SiblingGroup(const std::string& name);
+ ~SiblingGroup();
+
+ // Dispatches the Message to the collection of associated
+ // ports. If there is more than one destination port and
+ // the Message contains transferables, Dispatch will fail.
+ // Returns Just(true) if successful and the message was
+ // dispatched to at least one destination. Returns Just(false)
+ // if there were no destinations. Returns Nothing<bool>()
+ // if there was an error. If error is not nullptr, it will
+ // be set to an error message or warning message as appropriate.
+ v8::Maybe<bool> Dispatch(
+ MessagePortData* source,
+ std::shared_ptr<Message> message,
+ std::string* error = nullptr);
+
+ void Entangle(MessagePortData* data);
+
+ void Disentangle(MessagePortData* data);
+
+ const std::string& name() const { return name_; }
+
+ size_t size() const { return ports_.size(); }
+
+ private:
+ std::string name_;
+ std::set<MessagePortData*> ports_;
+ Mutex group_mutex_;
+
+ static void CheckSiblingGroup(const std::string& name);
+
+ using Map =
+ std::unordered_map<std::string, std::weak_ptr<SiblingGroup>>;
+
+ static Mutex groups_mutex_;
+ static Map groups_;
+};
+
// This contains all data for a `MessagePort` instance that is not tied to
// a specific Environment/Isolate/event loop, for easier transfer between those.
class MessagePortData : public TransferData {
public:
- explicit MessagePortData(MessagePort* owner);
+ explicit MessagePortData(
+ MessagePort* owner,
+ const std::string& name = std::string());
~MessagePortData() override;
MessagePortData(MessagePortData&& other) = delete;
@@ -119,7 +171,10 @@ class MessagePortData : public TransferData {
// Add a message to the incoming queue and notify the receiver.
// This may be called from any thread.
- void AddToIncomingQueue(Message&& message);
+ void AddToIncomingQueue(std::shared_ptr<Message> message);
+ v8::Maybe<bool> Dispatch(
+ std::shared_ptr<Message> message,
+ std::string* error = nullptr);
// Turns `a` and `b` into siblings, i.e. connects the sending side of one
// to the receiving side of the other. This is not thread-safe.
@@ -144,14 +199,9 @@ class MessagePortData : public TransferData {
// This mutex protects all fields below it, with the exception of
// sibling_.
mutable Mutex mutex_;
- std::list<Message> incoming_messages_;
+ std::deque<std::shared_ptr<Message>> incoming_messages_;
MessagePort* owner_ = nullptr;
- // This mutex protects the sibling_ field and is shared between two entangled
- // MessagePorts. If both mutexes are acquired, this one needs to be
- // acquired first.
- std::shared_ptr<Mutex> sibling_mutex_ = std::make_shared<Mutex>();
- MessagePortData* sibling_ = nullptr;
-
+ std::shared_ptr<SiblingGroup> group_;
friend class MessagePort;
};
@@ -166,7 +216,8 @@ class MessagePort : public HandleWrap {
// creating MessagePort instances.
MessagePort(Environment* env,
v8::Local<v8::Context> context,
- v8::Local<v8::Object> wrap);
+ v8::Local<v8::Object> wrap,
+ const std::string& name = std::string());
public:
~MessagePort() override;
@@ -175,7 +226,8 @@ class MessagePort : public HandleWrap {
// `MessagePortData` object.
static MessagePort* New(Environment* env,
v8::Local<v8::Context> context,
- std::unique_ptr<MessagePortData> data = nullptr);
+ std::unique_ptr<MessagePortData> data = nullptr,
+ const std::string& name = std::string());
// Send a message, i.e. deliver it into the sibling's incoming queue.
// If this port is closed, or if there is no sibling, this message is
diff --git a/test/parallel/test-worker-broadcastchannel-wpt.js b/test/parallel/test-worker-broadcastchannel-wpt.js
new file mode 100644
index 00000000000..3a44366d5c9
--- /dev/null
+++ b/test/parallel/test-worker-broadcastchannel-wpt.js
@@ -0,0 +1,148 @@
+'use strict';
+
+const common = require('../common');
+const assert = require('assert');
+const {
+ BroadcastChannel,
+} = require('worker_threads');
+
+{
+ const c1 = new BroadcastChannel('eventType').unref();
+ const c2 = new BroadcastChannel('eventType');
+
+ c2.onmessage = common.mustCall((e) => {
+ assert(e instanceof MessageEvent);
+ assert.strictEqual(e.target, c2);
+ assert.strictEqual(e.type, 'message');
+ assert.strictEqual(e.data, 'hello world');
+ c2.close();
+ });
+ c1.postMessage('hello world');
+}
+
+{
+ // Messages are delivered in port creation order.
+ // TODO(@jasnell): The ordering here is different than
+ // what the browsers would implement due to the different
+ // dispatching algorithm under the covers. What's not
+ // immediately clear is whether the ordering is spec
+ // mandated. In this test, c1 should receive events
+ // first, then c2, then c3. In the Node.js dispatching
+ // algorithm this means the ordering is:
+ // from c3 (c1 from c3)
+ // done (c1 from c2)
+ // from c1 (c2 from c1)
+ // from c3 (c2 from c3)
+ // from c1 (c3 from c1)
+ // done (c3 from c2)
+ //
+ // Whereas in the browser-ordering (as illustrated in the
+ // Web Platform Tests) it would be:
+ // from c1 (c2 from c1)
+ // from c1 (c3 from c1)
+ // from c3 (c1 from c3)
+ // from c3 (c2 from c3)
+ // done (c1 from c2)
+ // done (c3 from c2)
+ const c1 = new BroadcastChannel('order');
+ const c2 = new BroadcastChannel('order');
+ const c3 = new BroadcastChannel('order');
+
+ const events = [];
+ let doneCount = 0;
+ const handler = common.mustCall((e) => {
+ events.push(e);
+ if (e.data === 'done') {
+ doneCount++;
+ if (doneCount === 2) {
+ assert.strictEqual(events.length, 6);
+ assert.strictEqual(events[0].data, 'from c3');
+ assert.strictEqual(events[1].data, 'done');
+ assert.strictEqual(events[2].data, 'from c1');
+ assert.strictEqual(events[3].data, 'from c3');
+ assert.strictEqual(events[4].data, 'from c1');
+ assert.strictEqual(events[5].data, 'done');
+ c1.close();
+ c2.close();
+ c3.close();
+ }
+ }
+ }, 6);
+ c1.onmessage = handler;
+ c2.onmessage = handler;
+ c3.onmessage = handler;
+
+ c1.postMessage('from c1');
+ c3.postMessage('from c3');
+ c2.postMessage('done');
+}
+
+{
+ // Messages aren't delivered to a closed port
+ const c1 = new BroadcastChannel('closed1').unref();
+ const c2 = new BroadcastChannel('closed1');
+ const c3 = new BroadcastChannel('closed1');
+
+ c2.onmessage = common.mustNotCall();
+ c2.close();
+ c3.onmessage = common.mustCall(() => c3.close());
+ c1.postMessage('test');
+}
+
+{
+ // Messages aren't delivered to a port closed after calling postMessage.
+ const c1 = new BroadcastChannel('closed2').unref();
+ const c2 = new BroadcastChannel('closed2');
+ const c3 = new BroadcastChannel('closed2');
+
+ c2.onmessage = common.mustNotCall();
+ c3.onmessage = common.mustCall(() => c3.close());
+ c1.postMessage('test');
+ c2.close();
+}
+
+{
+ // Closing and creating channels during message delivery works correctly
+ const c1 = new BroadcastChannel('create-in-onmessage').unref();
+ const c2 = new BroadcastChannel('create-in-onmessage');
+
+ c2.onmessage = common.mustCall((e) => {
+ assert.strictEqual(e.data, 'first');
+ c2.close();
+ const c3 = new BroadcastChannel('create-in-onmessage');
+ c3.onmessage = common.mustCall((event) => {
+ assert.strictEqual(event.data, 'done');
+ c3.close();
+ });
+ c1.postMessage('done');
+ });
+ c1.postMessage('first');
+ c2.postMessage('second');
+}
+
+{
+ // Closing a channel in onmessage prevents already queued tasks
+ // from firing onmessage events
+ const c1 = new BroadcastChannel('close-in-onmessage2').unref();
+ const c2 = new BroadcastChannel('close-in-onmessage2');
+ const c3 = new BroadcastChannel('close-in-onmessage2');
+ const events = [];
+ c1.onmessage = (e) => events.push('c1: ' + e.data);
+ c2.onmessage = (e) => events.push('c2: ' + e.data);
+ c3.onmessage = (e) => events.push('c3: ' + e.data);
+
+ // c2 closes itself when it receives the first message
+ c2.addEventListener('message', common.mustCall(() => c2.close()));
+
+ c3.addEventListener('message', common.mustCall((e) => {
+ if (e.data === 'done') {
+ assert.deepStrictEqual(events, [
+ 'c2: first',
+ 'c3: first',
+ 'c3: done']);
+ c3.close();
+ }
+ }, 2));
+ c1.postMessage('first');
+ c1.postMessage('done');
+}
diff --git a/test/parallel/test-worker-broadcastchannel.js b/test/parallel/test-worker-broadcastchannel.js
new file mode 100644
index 00000000000..1f104a5edf4
--- /dev/null
+++ b/test/parallel/test-worker-broadcastchannel.js
@@ -0,0 +1,135 @@
+'use strict';
+
+const common = require('../common');
+const {
+ BroadcastChannel,
+ Worker,
+} = require('worker_threads');
+const assert = require('assert');
+
+assert.throws(() => new BroadcastChannel(Symbol('test')), {
+ message: /Cannot convert a Symbol value to a string/
+});
+
+assert.throws(() => new BroadcastChannel(), {
+ message: /The "name" argument must be specified/
+});
+
+// These should all just work
+[undefined, 1, null, 'test', 1n, false, Infinity].forEach((i) => {
+ const bc = new BroadcastChannel(i);
+ assert.strictEqual(bc.name, `${i}`);
+ bc.close();
+});
+
+{
+ // Empty postMessage throws
+ const bc = new BroadcastChannel('whatever');
+ assert.throws(() => bc.postMessage(), {
+ message: /The "message" argument must be specified/
+ });
+ bc.close();
+ // Calling close multiple times should not throw
+ bc.close();
+
+ // Calling postMessage after close should throw
+ assert.throws(() => bc.postMessage(null), {
+ message: /BroadcastChannel is closed/
+ });
+}
+
+{
+ const bc1 = new BroadcastChannel('channel1');
+ const bc2 = new BroadcastChannel('channel1');
+ const bc3 = new BroadcastChannel('channel1');
+ const bc4 = new BroadcastChannel('channel2');
+ assert.strictEqual(bc1.name, 'channel1');
+ assert.strictEqual(bc2.name, 'channel1');
+ assert.strictEqual(bc3.name, 'channel1');
+ assert.strictEqual(bc4.name, 'channel2');
+ bc1.addEventListener('message', common.mustCall((event) => {
+ assert.strictEqual(event.data, 'hello');
+ bc1.close();
+ bc2.close();
+ bc4.close();
+ }));
+ bc3.addEventListener('message', common.mustCall((event) => {
+ assert.strictEqual(event.data, 'hello');
+ bc3.close();
+ }));
+ bc2.addEventListener('message', common.mustNotCall());
+ bc4.addEventListener('message', common.mustNotCall());
+ bc2.postMessage('hello');
+}
+
+{
+ const bc1 = new BroadcastChannel('onmessage-channel1');
+ const bc2 = new BroadcastChannel('onmessage-channel1');
+ const bc3 = new BroadcastChannel('onmessage-channel1');
+ const bc4 = new BroadcastChannel('onmessage-channel2');
+ assert.strictEqual(bc1.name, 'onmessage-channel1');
+ assert.strictEqual(bc2.name, 'onmessage-channel1');
+ assert.strictEqual(bc3.name, 'onmessage-channel1');
+ assert.strictEqual(bc4.name, 'onmessage-channel2');
+ bc1.onmessage = common.mustCall((event) => {
+ assert.strictEqual(event.data, 'hello');
+ bc1.close();
+ bc2.close();
+ bc4.close();
+ });
+ bc3.onmessage = common.mustCall((event) => {
+ assert.strictEqual(event.data, 'hello');
+ bc3.close();
+ });
+ bc2.onmessage = common.mustNotCall();
+ bc4.onmessage = common.mustNotCall();
+ bc2.postMessage('hello');
+}
+
+{
+ const bc = new BroadcastChannel('worker1');
+ new Worker(`
+ const assert = require('assert');
+ const { BroadcastChannel } = require('worker_threads');
+ const bc = new BroadcastChannel('worker1');
+ bc.addEventListener('message', (event) => {
+ assert.strictEqual(event.data, 123);
+ // If this close() is not executed, the test should hang and timeout.
+ // If the test does hang and timeout in CI, then the first step should
+ // be to check that the two bc.close() calls are being made.
+ bc.close();
+ });
+ bc.postMessage(321);
+ `, { eval: true });
+ bc.addEventListener('message', common.mustCall(({ data }) => {
+ assert.strictEqual(data, 321);
+ bc.postMessage(123);
+ bc.close();
+ }));
+}
+
+{
+ const bc1 = new BroadcastChannel('channel3');
+ const bc2 = new BroadcastChannel('channel3');
+ bc2.postMessage(new SharedArrayBuffer(10));
+ bc1.addEventListener('message', common.mustCall(({ data }) => {
+ assert(data instanceof SharedArrayBuffer);
+ bc1.close();
+ bc2.close();
+ }));
+}
+
+{
+ const bc1 = new BroadcastChannel('channel3');
+ const mc = new MessageChannel();
+ assert.throws(() => bc1.postMessage(mc), {
+ message: /Object that needs transfer was found/
+ });
+ assert.throws(() => bc1.postMessage(Symbol()), {
+ message: /Symbol\(\) could not be cloned/
+ });
+ bc1.close();
+ assert.throws(() => bc1.postMessage(Symbol()), {
+ message: /BroadcastChannel is closed/
+ });
+}
diff --git a/tools/doc/type-parser.js b/tools/doc/type-parser.js
index e3d032319de..72a2ec41994 100644
--- a/tools/doc/type-parser.js
+++ b/tools/doc/type-parser.js
@@ -40,6 +40,10 @@ const customTypesMap = {
'WebAssembly.Instance':
`${jsDocPrefix}Reference/Global_Objects/WebAssembly/Instance`,
+ 'BroadcastChannel':
+ 'worker_threads.html#worker_threads_class_broadcastchannel_' +
+ 'extends_eventtarget',
+
'Iterable':
`${jsDocPrefix}Reference/Iteration_protocols#The_iterable_protocol`,
'Iterator':