diff options
author | James M Snell <jasnell@gmail.com> | 2020-11-27 00:54:23 +0300 |
---|---|---|
committer | James M Snell <jasnell@gmail.com> | 2020-12-01 19:02:28 +0300 |
commit | 9e446b3e9f8c8aa0dcfb974f306504f307b8fac2 (patch) | |
tree | 0cfecf19e0435fa40b9741c5eed35d56ae66d5da | |
parent | 09fd8f13c87a455c833e83f494616d6cc774aafa (diff) |
worker: add experimental BroadcastChannel
Signed-off-by: James M Snell <jasnell@gmail.com>
PR-URL: https://github.com/nodejs/node/pull/36271
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
-rw-r--r-- | doc/api/worker_threads.md | 92 | ||||
-rw-r--r-- | lib/internal/worker/io.js | 87 | ||||
-rw-r--r-- | lib/worker_threads.js | 2 | ||||
-rw-r--r-- | src/node_messaging.cc | 217 | ||||
-rw-r--r-- | src/node_messaging.h | 76 | ||||
-rw-r--r-- | test/parallel/test-worker-broadcastchannel-wpt.js | 148 | ||||
-rw-r--r-- | test/parallel/test-worker-broadcastchannel.js | 135 | ||||
-rw-r--r-- | tools/doc/type-parser.js | 4 |
8 files changed, 689 insertions, 72 deletions
diff --git a/doc/api/worker_threads.md b/doc/api/worker_threads.md index bfc754b77ca..8c4c5b3854f 100644 --- a/doc/api/worker_threads.md +++ b/doc/api/worker_threads.md @@ -274,6 +274,98 @@ if (isMainThread) { } ``` +## Class: `BroadcastChannel extends EventTarget` +<!-- YAML +added: REPLACEME +--> + +> Stability: 1 - Experimental + +Instances of `BroadcastChannel` allow asynchronous one-to-many communication +with all other `BroadcastChannel` instances bound to the same channel name. + +```js +'use strict'; + +const { + isMainThread, + BroadcastChannel, + Worker +} = require('worker_threads'); + +const bc = new BroadcastChannel('hello'); + +if (isMainThread) { + let c = 0; + bc.onmessage = (event) => { + console.log(event.data); + if (++c === 10) bc.close(); + }; + for (let n = 0; n < 10; n++) + new Worker(__filename); +} else { + bc.postMessage('hello from every worker'); + bc.close(); +} +``` + +### `new BroadcastChannel(name)` +<!-- YAML +added: REPLACEME +--> + +* `name` {any} The name of the channel to connect to. Any JavaScript value + that can be converted to a string using ``${name}`` is permitted. + +### `broadcastChannel.close()` +<!-- YAML +added: REPLACEME +--> + +Closes the `BroadcastChannel` connection. + +### `broadcastChannel.onmessage` +<!-- YAML +added: REPLACEME +--> + +* Type: {Function} Invoked with a single `MessageEvent` argument + when a message is received. + +### `broadcastChannel.onmessageerror` +<!-- YAML +added: REPLACEME +--> + +* Type: {Function} Invoked with a received message cannot be + deserialized. + +### `broadcastChannel.postMessage(message)` +<!-- YAML +added: REPLACEME +--> + +* `message` {any} Any cloneable JavaScript value. + +### `broadcastChannel.ref()` +<!-- YAML +added: REPLACEME +--> + +Opposite of `unref()`. Calling `ref()` on a previously `unref()`ed +BroadcastChannel will *not* let the program exit if it's the only active handle +left (the default behavior). If the port is `ref()`ed, calling `ref()` again +will have no effect. + +### `broadcastChannel.unref()` +<!-- YAML +added: REPLACEME +--> + +Calling `unref()` on a BroadcastChannel will allow the thread to exit if this +is the only active handle in the event system. If the BroadcastChannel is +already `unref()`ed calling `unref()` again will have no effect. + ## Class: `MessageChannel` <!-- YAML added: v10.5.0 diff --git a/lib/internal/worker/io.js b/lib/internal/worker/io.js index 73958dbc2d7..2352ac6ebf8 100644 --- a/lib/internal/worker/io.js +++ b/lib/internal/worker/io.js @@ -19,11 +19,13 @@ const { const { MessagePort, MessageChannel, + broadcastChannel, drainMessagePort, moveMessagePortToContext, receiveMessageOnPort: receiveMessageOnPort_, stopMessagePort, - checkMessagePort + checkMessagePort, + DOMException, } = internalBinding('messaging'); const { getEnvMessagePort @@ -41,14 +43,20 @@ const { } = require('internal/event_target'); const { inspect } = require('internal/util/inspect'); const { - ERR_INVALID_ARG_TYPE -} = require('internal/errors').codes; + codes: { + ERR_INVALID_ARG_TYPE, + ERR_MISSING_ARGS, + } +} = require('internal/errors'); const kData = Symbol('kData'); +const kHandle = Symbol('kHandle'); const kIncrementsPortRef = Symbol('kIncrementsPortRef'); const kLastEventId = Symbol('kLastEventId'); const kName = Symbol('kName'); const kOrigin = Symbol('kOrigin'); +const kOnMessage = Symbol('kOnMessage'); +const kOnMessageError = Symbol('kOnMessageError'); const kPort = Symbol('kPort'); const kPorts = Symbol('kPorts'); const kWaitingStreams = Symbol('kWaitingStreams'); @@ -324,6 +332,76 @@ function receiveMessageOnPort(port) { return { message }; } +function onMessageEvent(type, data) { + this.dispatchEvent(new MessageEvent(type, { data })); +} + +class BroadcastChannel extends EventTarget { + constructor(name) { + if (arguments.length === 0) + throw new ERR_MISSING_ARGS('name'); + super(); + this[kName] = `${name}`; + this[kHandle] = broadcastChannel(this[kName]); + this[kOnMessage] = onMessageEvent.bind(this, 'message'); + this[kOnMessageError] = onMessageEvent.bind(this, 'messageerror'); + this[kHandle].on('message', this[kOnMessage]); + this[kHandle].on('messageerror', this[kOnMessageError]); + } + + [inspect.custom](depth, options) { + if (depth < 0) + return 'BroadcastChannel'; + + const opts = { + ...options, + depth: options.depth == null ? null : options.depth - 1 + }; + + return `BroadcastChannel ${inspect({ + name: this[kName], + active: this[kHandle] !== undefined, + }, opts)}`; + } + + get name() { return this[kName]; } + + close() { + if (this[kHandle] === undefined) + return; + this[kHandle].off('message', this[kOnMessage]); + this[kHandle].off('messageerror', this[kOnMessageError]); + this[kOnMessage] = undefined; + this[kOnMessageError] = undefined; + this[kHandle].close(); + this[kHandle] = undefined; + } + + postMessage(message) { + if (arguments.length === 0) + throw new ERR_MISSING_ARGS('message'); + if (this[kHandle] === undefined) + throw new DOMException('BroadcastChannel is closed.'); + if (this[kHandle].postMessage(message) === undefined) + throw new DOMException('Message could not be posted.'); + } + + ref() { + if (this[kHandle]) + this[kHandle].ref(); + return this; + } + + unref() { + if (this[kHandle]) + this[kHandle].unref(); + return this; + } +} + +defineEventHandler(BroadcastChannel.prototype, 'message'); +defineEventHandler(BroadcastChannel.prototype, 'messageerror'); + module.exports = { drainMessagePort, messageTypes, @@ -339,5 +417,6 @@ module.exports = { setupPortReferencing, ReadableWorkerStdio, WritableWorkerStdio, - createWorkerStdio + createWorkerStdio, + BroadcastChannel, }; diff --git a/lib/worker_threads.js b/lib/worker_threads.js index 1b9c6ae945e..efa1c51112a 100644 --- a/lib/worker_threads.js +++ b/lib/worker_threads.js @@ -13,6 +13,7 @@ const { MessageChannel, moveMessagePortToContext, receiveMessageOnPort, + BroadcastChannel, } = require('internal/worker/io'); const { @@ -32,4 +33,5 @@ module.exports = { Worker, parentPort: null, workerData: null, + BroadcastChannel, }; diff --git a/src/node_messaging.cc b/src/node_messaging.cc index db3c24c3f85..74f75071429 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -161,7 +161,6 @@ MaybeLocal<Value> Message::Deserialize(Environment* env, std::move(shared_array_buffers_[i])); shared_array_buffers.push_back(sab); } - shared_array_buffers_.clear(); DeserializerDelegate delegate( this, env, host_objects, shared_array_buffers, wasm_modules_); @@ -178,7 +177,6 @@ MaybeLocal<Value> Message::Deserialize(Environment* env, ArrayBuffer::New(env->isolate(), std::move(array_buffers_[i])); deserializer.TransferArrayBuffer(i, ab); } - array_buffers_.clear(); if (deserializer.ReadHeader(context).IsNothing()) return {}; @@ -517,7 +515,20 @@ void Message::MemoryInfo(MemoryTracker* tracker) const { tracker->TrackField("transferables", transferables_); } -MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { } +// TODO(@jasnell): The name here will be an empty string if the +// one-to-one MessageChannel is used. In such cases, +// SiblingGroup::Get() will return nothing and group_ will be +// an empty pointer. @addaleax suggests that the code here +// could be clearer if attaching the SiblingGroup were a +// separate step rather than part of the constructor here. +MessagePortData::MessagePortData( + MessagePort* owner, + const std::string& name) + : owner_(owner), + group_(SiblingGroup::Get(name)) { + if (group_) + group_->Entangle(this); +} MessagePortData::~MessagePortData() { CHECK_NULL(owner_); @@ -529,7 +540,7 @@ void MessagePortData::MemoryInfo(MemoryTracker* tracker) const { tracker->TrackField("incoming_messages", incoming_messages_); } -void MessagePortData::AddToIncomingQueue(Message&& message) { +void MessagePortData::AddToIncomingQueue(std::shared_ptr<Message> message) { // This function will be called by other threads. Mutex::ScopedLock lock(mutex_); incoming_messages_.emplace_back(std::move(message)); @@ -541,32 +552,17 @@ void MessagePortData::AddToIncomingQueue(Message&& message) { } void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) { - CHECK_NULL(a->sibling_); - CHECK_NULL(b->sibling_); - a->sibling_ = b; - b->sibling_ = a; - a->sibling_mutex_ = b->sibling_mutex_; + CHECK(!a->group_); + CHECK(!b->group_); + b->group_ = a->group_ = std::make_shared<SiblingGroup>(); + a->group_->Entangle(a); + a->group_->Entangle(b); } void MessagePortData::Disentangle() { - // Grab a copy of the sibling mutex, then replace it so that each sibling - // has its own sibling_mutex_ now. - std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_; - Mutex::ScopedLock sibling_lock(*sibling_mutex); - sibling_mutex_ = std::make_shared<Mutex>(); - - MessagePortData* sibling = sibling_; - if (sibling_ != nullptr) { - sibling_->sibling_ = nullptr; - sibling_ = nullptr; - } - - // We close MessagePorts after disentanglement, so we enqueue a corresponding - // message and trigger the corresponding uv_async_t to let them know that - // this happened. - AddToIncomingQueue(Message()); - if (sibling != nullptr) { - sibling->AddToIncomingQueue(Message()); + if (group_) { + group_->Disentangle(this); + group_.reset(); } } @@ -576,12 +572,13 @@ MessagePort::~MessagePort() { MessagePort::MessagePort(Environment* env, Local<Context> context, - Local<Object> wrap) + Local<Object> wrap, + const std::string& name) : HandleWrap(env, wrap, reinterpret_cast<uv_handle_t*>(&async_), AsyncWrap::PROVIDER_MESSAGEPORT), - data_(new MessagePortData(this)) { + data_(new MessagePortData(this, name)) { auto onmessage = [](uv_async_t* handle) { // Called when data has been put into the queue. MessagePort* channel = ContainerOf(&MessagePort::async_, handle); @@ -647,7 +644,8 @@ void MessagePort::New(const FunctionCallbackInfo<Value>& args) { MessagePort* MessagePort::New( Environment* env, Local<Context> context, - std::unique_ptr<MessagePortData> data) { + std::unique_ptr<MessagePortData> data, + const std::string& name) { Context::Scope context_scope(context); Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env); @@ -656,7 +654,7 @@ MessagePort* MessagePort::New( Local<Object> instance; if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance)) return nullptr; - MessagePort* port = new MessagePort(env, context, instance); + MessagePort* port = new MessagePort(env, context, instance, name); CHECK_NOT_NULL(port); if (port->IsHandleClosing()) { // Construction failed with an exception. @@ -681,7 +679,7 @@ MessagePort* MessagePort::New( MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context, bool only_if_receiving) { - Message received; + std::shared_ptr<Message> received; { // Get the head of the message queue. Mutex::ScopedLock lock(data_->mutex_); @@ -695,22 +693,22 @@ MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context, // receive is not the final "close" message. if (data_->incoming_messages_.empty() || (!wants_message && - !data_->incoming_messages_.front().IsCloseMessage())) { + !data_->incoming_messages_.front()->IsCloseMessage())) { return env()->no_message_symbol(); } - received = std::move(data_->incoming_messages_.front()); + received = data_->incoming_messages_.front(); data_->incoming_messages_.pop_front(); } - if (received.IsCloseMessage()) { + if (received->IsCloseMessage()) { Close(); return env()->no_message_symbol(); } if (!env()->can_call_into_js()) return MaybeLocal<Value>(); - return received.Deserialize(env(), context); + return received->Deserialize(env(), context); } void MessagePort::OnMessage() { @@ -829,13 +827,13 @@ Maybe<bool> MessagePort::PostMessage(Environment* env, Local<Object> obj = object(isolate); Local<Context> context = obj->CreationContext(); - Message msg; + std::shared_ptr<Message> msg = std::make_shared<Message>(); // Per spec, we need to both check if transfer list has the source port, and // serialize the input message, even if the MessagePort is closed or detached. Maybe<bool> serialization_maybe = - msg.Serialize(env, context, message_v, transfer_v, obj); + msg->Serialize(env, context, message_v, transfer_v, obj); if (data_ == nullptr) { return serialization_maybe; } @@ -843,26 +841,26 @@ Maybe<bool> MessagePort::PostMessage(Environment* env, return Nothing<bool>(); } - Mutex::ScopedLock lock(*data_->sibling_mutex_); - bool doomed = false; + std::string error; + Maybe<bool> res = data_->Dispatch(msg, &error); + if (res.IsNothing()) + return res; - // Check if the target port is posted to itself. - if (data_->sibling_ != nullptr) { - for (const auto& transferable : msg.transferables()) { - if (data_->sibling_ == transferable.get()) { - doomed = true; - ProcessEmitWarning(env, "The target port was posted to itself, and " - "the communication channel was lost"); - break; - } - } - } + if (!error.empty()) + ProcessEmitWarning(env, error.c_str()); - if (data_->sibling_ == nullptr || doomed) - return Just(true); + return res; +} - data_->sibling_->AddToIncomingQueue(std::move(msg)); - return Just(true); +Maybe<bool> MessagePortData::Dispatch( + std::shared_ptr<Message> message, + std::string* error) { + if (!group_) { + if (error != nullptr) + *error = "MessagePortData is not entangled."; + return Nothing<bool>(); + } + return group_->Dispatch(this, message, error); } static Maybe<bool> ReadIterable(Environment* env, @@ -969,7 +967,9 @@ void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) { return; } - port->PostMessage(env, args[0], transfer_list); + Maybe<bool> res = port->PostMessage(env, args[0], transfer_list); + if (res.IsJust()) + args.GetReturnValue().Set(res.FromJust()); } void MessagePort::Start() { @@ -1273,6 +1273,99 @@ Maybe<bool> JSTransferable::Data::FinalizeTransferWrite( return ret; } +std::shared_ptr<SiblingGroup> SiblingGroup::Get(const std::string& name) { + if (name.empty()) return {}; + Mutex::ScopedLock lock(SiblingGroup::groups_mutex_); + std::shared_ptr<SiblingGroup> group; + auto i = groups_.find(name); + if (i == groups_.end() || i->second.expired()) { + group = std::make_shared<SiblingGroup>(name); + groups_[name] = group; + } else { + group = i->second.lock(); + } + return group; +} + +void SiblingGroup::CheckSiblingGroup(const std::string& name) { + Mutex::ScopedLock lock(SiblingGroup::groups_mutex_); + auto i = groups_.find(name); + if (i != groups_.end() && i->second.expired()) + groups_.erase(name); +} + +SiblingGroup::SiblingGroup(const std::string& name) + : name_(name) { } + +SiblingGroup::~SiblingGroup() { + // If this is a named group, check to see if we can remove the group + if (!name_.empty()) + CheckSiblingGroup(name_); +} + +Maybe<bool> SiblingGroup::Dispatch( + MessagePortData* source, + std::shared_ptr<Message> message, + std::string* error) { + + Mutex::ScopedLock lock(group_mutex_); + + // The source MessagePortData is not part of this group. + if (ports_.find(source) == ports_.end()) { + if (error != nullptr) + *error = "Source MessagePort is not entangled with this group."; + return Nothing<bool>(); + } + + // There are no destination ports. + if (size() <= 1) + return Just(false); + + // Transferables cannot be used when there is more + // than a single destination. + if (size() > 2 && message->transferables().size()) { + if (error != nullptr) + *error = "Transferables cannot be used with multiple destinations."; + return Nothing<bool>(); + } + + for (MessagePortData* port : ports_) { + if (port == source) + continue; + // This loop should only be entered if there's only a single destination + for (const auto& transferable : message->transferables()) { + if (port == transferable.get()) { + if (error != nullptr) { + *error = "The target port was posted to itself, and the " + "communication channel was lost"; + } + return Just(true); + } + } + port->AddToIncomingQueue(message); + } + + return Just(true); +} + +void SiblingGroup::Entangle(MessagePortData* data) { + Mutex::ScopedLock lock(group_mutex_); + ports_.insert(data); +} + +void SiblingGroup::Disentangle(MessagePortData* data) { + Mutex::ScopedLock lock(group_mutex_); + ports_.erase(data); + + data->AddToIncomingQueue(std::make_shared<Message>()); + // If this is an anonymous group and there's another port, close it. + if (size() == 1 && name_.empty()) + (*(ports_.begin()))->AddToIncomingQueue(std::make_shared<Message>()); +} + +SiblingGroup::Map SiblingGroup::groups_; +Mutex SiblingGroup::groups_mutex_; + namespace { static void SetDeserializerCreateObjectFunction( @@ -1308,6 +1401,16 @@ static void MessageChannel(const FunctionCallbackInfo<Value>& args) { .Check(); } +static void BroadcastChannel(const FunctionCallbackInfo<Value>& args) { + CHECK(args[0]->IsString()); + Environment* env = Environment::GetCurrent(args); + Context::Scope context_scope(env->context()); + Utf8Value name(env->isolate(), args[0]); + MessagePort* port = + MessagePort::New(env, env->context(), nullptr, std::string(*name)); + args.GetReturnValue().Set(port->object()); +} + static void InitMessaging(Local<Object> target, Local<Value> unused, Local<Context> context, @@ -1352,6 +1455,7 @@ static void InitMessaging(Local<Object> target, MessagePort::MoveToContext); env->SetMethod(target, "setDeserializerCreateObjectFunction", SetDeserializerCreateObjectFunction); + env->SetMethod(target, "broadcastChannel", BroadcastChannel); { Local<Function> domexception = GetDOMException(context).ToLocalChecked(); @@ -1365,6 +1469,7 @@ static void InitMessaging(Local<Object> target, static void RegisterExternalReferences(ExternalReferenceRegistry* registry) { registry->Register(MessageChannel); + registry->Register(BroadcastChannel); registry->Register(JSTransferable::New); registry->Register(MessagePort::New); registry->Register(MessagePort::PostMessage); diff --git a/src/node_messaging.h b/src/node_messaging.h index 7ef02226480..22c11321ef7 100644 --- a/src/node_messaging.h +++ b/src/node_messaging.h @@ -5,7 +5,11 @@ #include "env.h" #include "node_mutex.h" -#include <list> +#include "v8.h" +#include <deque> +#include <string> +#include <unordered_map> +#include <set> namespace node { namespace worker { @@ -45,6 +49,7 @@ class Message : public MemoryRetainer { // V8 ValueSerializer API. If `payload` is empty, this message indicates // that the receiving message port should close itself. explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>()); + ~Message() = default; Message(Message&& other) = default; Message& operator=(Message&& other) = default; @@ -105,11 +110,58 @@ class Message : public MemoryRetainer { friend class MessagePort; }; +class SiblingGroup { + public: + // Named SiblingGroup, Used for one-to-many BroadcastChannels. + static std::shared_ptr<SiblingGroup> Get(const std::string& name); + + // Anonymous SiblingGroup, Used for one-to-one MessagePort pairs. + SiblingGroup() = default; + explicit SiblingGroup(const std::string& name); + ~SiblingGroup(); + + // Dispatches the Message to the collection of associated + // ports. If there is more than one destination port and + // the Message contains transferables, Dispatch will fail. + // Returns Just(true) if successful and the message was + // dispatched to at least one destination. Returns Just(false) + // if there were no destinations. Returns Nothing<bool>() + // if there was an error. If error is not nullptr, it will + // be set to an error message or warning message as appropriate. + v8::Maybe<bool> Dispatch( + MessagePortData* source, + std::shared_ptr<Message> message, + std::string* error = nullptr); + + void Entangle(MessagePortData* data); + + void Disentangle(MessagePortData* data); + + const std::string& name() const { return name_; } + + size_t size() const { return ports_.size(); } + + private: + std::string name_; + std::set<MessagePortData*> ports_; + Mutex group_mutex_; + + static void CheckSiblingGroup(const std::string& name); + + using Map = + std::unordered_map<std::string, std::weak_ptr<SiblingGroup>>; + + static Mutex groups_mutex_; + static Map groups_; +}; + // This contains all data for a `MessagePort` instance that is not tied to // a specific Environment/Isolate/event loop, for easier transfer between those. class MessagePortData : public TransferData { public: - explicit MessagePortData(MessagePort* owner); + explicit MessagePortData( + MessagePort* owner, + const std::string& name = std::string()); ~MessagePortData() override; MessagePortData(MessagePortData&& other) = delete; @@ -119,7 +171,10 @@ class MessagePortData : public TransferData { // Add a message to the incoming queue and notify the receiver. // This may be called from any thread. - void AddToIncomingQueue(Message&& message); + void AddToIncomingQueue(std::shared_ptr<Message> message); + v8::Maybe<bool> Dispatch( + std::shared_ptr<Message> message, + std::string* error = nullptr); // Turns `a` and `b` into siblings, i.e. connects the sending side of one // to the receiving side of the other. This is not thread-safe. @@ -144,14 +199,9 @@ class MessagePortData : public TransferData { // This mutex protects all fields below it, with the exception of // sibling_. mutable Mutex mutex_; - std::list<Message> incoming_messages_; + std::deque<std::shared_ptr<Message>> incoming_messages_; MessagePort* owner_ = nullptr; - // This mutex protects the sibling_ field and is shared between two entangled - // MessagePorts. If both mutexes are acquired, this one needs to be - // acquired first. - std::shared_ptr<Mutex> sibling_mutex_ = std::make_shared<Mutex>(); - MessagePortData* sibling_ = nullptr; - + std::shared_ptr<SiblingGroup> group_; friend class MessagePort; }; @@ -166,7 +216,8 @@ class MessagePort : public HandleWrap { // creating MessagePort instances. MessagePort(Environment* env, v8::Local<v8::Context> context, - v8::Local<v8::Object> wrap); + v8::Local<v8::Object> wrap, + const std::string& name = std::string()); public: ~MessagePort() override; @@ -175,7 +226,8 @@ class MessagePort : public HandleWrap { // `MessagePortData` object. static MessagePort* New(Environment* env, v8::Local<v8::Context> context, - std::unique_ptr<MessagePortData> data = nullptr); + std::unique_ptr<MessagePortData> data = nullptr, + const std::string& name = std::string()); // Send a message, i.e. deliver it into the sibling's incoming queue. // If this port is closed, or if there is no sibling, this message is diff --git a/test/parallel/test-worker-broadcastchannel-wpt.js b/test/parallel/test-worker-broadcastchannel-wpt.js new file mode 100644 index 00000000000..3a44366d5c9 --- /dev/null +++ b/test/parallel/test-worker-broadcastchannel-wpt.js @@ -0,0 +1,148 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { + BroadcastChannel, +} = require('worker_threads'); + +{ + const c1 = new BroadcastChannel('eventType').unref(); + const c2 = new BroadcastChannel('eventType'); + + c2.onmessage = common.mustCall((e) => { + assert(e instanceof MessageEvent); + assert.strictEqual(e.target, c2); + assert.strictEqual(e.type, 'message'); + assert.strictEqual(e.data, 'hello world'); + c2.close(); + }); + c1.postMessage('hello world'); +} + +{ + // Messages are delivered in port creation order. + // TODO(@jasnell): The ordering here is different than + // what the browsers would implement due to the different + // dispatching algorithm under the covers. What's not + // immediately clear is whether the ordering is spec + // mandated. In this test, c1 should receive events + // first, then c2, then c3. In the Node.js dispatching + // algorithm this means the ordering is: + // from c3 (c1 from c3) + // done (c1 from c2) + // from c1 (c2 from c1) + // from c3 (c2 from c3) + // from c1 (c3 from c1) + // done (c3 from c2) + // + // Whereas in the browser-ordering (as illustrated in the + // Web Platform Tests) it would be: + // from c1 (c2 from c1) + // from c1 (c3 from c1) + // from c3 (c1 from c3) + // from c3 (c2 from c3) + // done (c1 from c2) + // done (c3 from c2) + const c1 = new BroadcastChannel('order'); + const c2 = new BroadcastChannel('order'); + const c3 = new BroadcastChannel('order'); + + const events = []; + let doneCount = 0; + const handler = common.mustCall((e) => { + events.push(e); + if (e.data === 'done') { + doneCount++; + if (doneCount === 2) { + assert.strictEqual(events.length, 6); + assert.strictEqual(events[0].data, 'from c3'); + assert.strictEqual(events[1].data, 'done'); + assert.strictEqual(events[2].data, 'from c1'); + assert.strictEqual(events[3].data, 'from c3'); + assert.strictEqual(events[4].data, 'from c1'); + assert.strictEqual(events[5].data, 'done'); + c1.close(); + c2.close(); + c3.close(); + } + } + }, 6); + c1.onmessage = handler; + c2.onmessage = handler; + c3.onmessage = handler; + + c1.postMessage('from c1'); + c3.postMessage('from c3'); + c2.postMessage('done'); +} + +{ + // Messages aren't delivered to a closed port + const c1 = new BroadcastChannel('closed1').unref(); + const c2 = new BroadcastChannel('closed1'); + const c3 = new BroadcastChannel('closed1'); + + c2.onmessage = common.mustNotCall(); + c2.close(); + c3.onmessage = common.mustCall(() => c3.close()); + c1.postMessage('test'); +} + +{ + // Messages aren't delivered to a port closed after calling postMessage. + const c1 = new BroadcastChannel('closed2').unref(); + const c2 = new BroadcastChannel('closed2'); + const c3 = new BroadcastChannel('closed2'); + + c2.onmessage = common.mustNotCall(); + c3.onmessage = common.mustCall(() => c3.close()); + c1.postMessage('test'); + c2.close(); +} + +{ + // Closing and creating channels during message delivery works correctly + const c1 = new BroadcastChannel('create-in-onmessage').unref(); + const c2 = new BroadcastChannel('create-in-onmessage'); + + c2.onmessage = common.mustCall((e) => { + assert.strictEqual(e.data, 'first'); + c2.close(); + const c3 = new BroadcastChannel('create-in-onmessage'); + c3.onmessage = common.mustCall((event) => { + assert.strictEqual(event.data, 'done'); + c3.close(); + }); + c1.postMessage('done'); + }); + c1.postMessage('first'); + c2.postMessage('second'); +} + +{ + // Closing a channel in onmessage prevents already queued tasks + // from firing onmessage events + const c1 = new BroadcastChannel('close-in-onmessage2').unref(); + const c2 = new BroadcastChannel('close-in-onmessage2'); + const c3 = new BroadcastChannel('close-in-onmessage2'); + const events = []; + c1.onmessage = (e) => events.push('c1: ' + e.data); + c2.onmessage = (e) => events.push('c2: ' + e.data); + c3.onmessage = (e) => events.push('c3: ' + e.data); + + // c2 closes itself when it receives the first message + c2.addEventListener('message', common.mustCall(() => c2.close())); + + c3.addEventListener('message', common.mustCall((e) => { + if (e.data === 'done') { + assert.deepStrictEqual(events, [ + 'c2: first', + 'c3: first', + 'c3: done']); + c3.close(); + } + }, 2)); + c1.postMessage('first'); + c1.postMessage('done'); +} diff --git a/test/parallel/test-worker-broadcastchannel.js b/test/parallel/test-worker-broadcastchannel.js new file mode 100644 index 00000000000..1f104a5edf4 --- /dev/null +++ b/test/parallel/test-worker-broadcastchannel.js @@ -0,0 +1,135 @@ +'use strict'; + +const common = require('../common'); +const { + BroadcastChannel, + Worker, +} = require('worker_threads'); +const assert = require('assert'); + +assert.throws(() => new BroadcastChannel(Symbol('test')), { + message: /Cannot convert a Symbol value to a string/ +}); + +assert.throws(() => new BroadcastChannel(), { + message: /The "name" argument must be specified/ +}); + +// These should all just work +[undefined, 1, null, 'test', 1n, false, Infinity].forEach((i) => { + const bc = new BroadcastChannel(i); + assert.strictEqual(bc.name, `${i}`); + bc.close(); +}); + +{ + // Empty postMessage throws + const bc = new BroadcastChannel('whatever'); + assert.throws(() => bc.postMessage(), { + message: /The "message" argument must be specified/ + }); + bc.close(); + // Calling close multiple times should not throw + bc.close(); + + // Calling postMessage after close should throw + assert.throws(() => bc.postMessage(null), { + message: /BroadcastChannel is closed/ + }); +} + +{ + const bc1 = new BroadcastChannel('channel1'); + const bc2 = new BroadcastChannel('channel1'); + const bc3 = new BroadcastChannel('channel1'); + const bc4 = new BroadcastChannel('channel2'); + assert.strictEqual(bc1.name, 'channel1'); + assert.strictEqual(bc2.name, 'channel1'); + assert.strictEqual(bc3.name, 'channel1'); + assert.strictEqual(bc4.name, 'channel2'); + bc1.addEventListener('message', common.mustCall((event) => { + assert.strictEqual(event.data, 'hello'); + bc1.close(); + bc2.close(); + bc4.close(); + })); + bc3.addEventListener('message', common.mustCall((event) => { + assert.strictEqual(event.data, 'hello'); + bc3.close(); + })); + bc2.addEventListener('message', common.mustNotCall()); + bc4.addEventListener('message', common.mustNotCall()); + bc2.postMessage('hello'); +} + +{ + const bc1 = new BroadcastChannel('onmessage-channel1'); + const bc2 = new BroadcastChannel('onmessage-channel1'); + const bc3 = new BroadcastChannel('onmessage-channel1'); + const bc4 = new BroadcastChannel('onmessage-channel2'); + assert.strictEqual(bc1.name, 'onmessage-channel1'); + assert.strictEqual(bc2.name, 'onmessage-channel1'); + assert.strictEqual(bc3.name, 'onmessage-channel1'); + assert.strictEqual(bc4.name, 'onmessage-channel2'); + bc1.onmessage = common.mustCall((event) => { + assert.strictEqual(event.data, 'hello'); + bc1.close(); + bc2.close(); + bc4.close(); + }); + bc3.onmessage = common.mustCall((event) => { + assert.strictEqual(event.data, 'hello'); + bc3.close(); + }); + bc2.onmessage = common.mustNotCall(); + bc4.onmessage = common.mustNotCall(); + bc2.postMessage('hello'); +} + +{ + const bc = new BroadcastChannel('worker1'); + new Worker(` + const assert = require('assert'); + const { BroadcastChannel } = require('worker_threads'); + const bc = new BroadcastChannel('worker1'); + bc.addEventListener('message', (event) => { + assert.strictEqual(event.data, 123); + // If this close() is not executed, the test should hang and timeout. + // If the test does hang and timeout in CI, then the first step should + // be to check that the two bc.close() calls are being made. + bc.close(); + }); + bc.postMessage(321); + `, { eval: true }); + bc.addEventListener('message', common.mustCall(({ data }) => { + assert.strictEqual(data, 321); + bc.postMessage(123); + bc.close(); + })); +} + +{ + const bc1 = new BroadcastChannel('channel3'); + const bc2 = new BroadcastChannel('channel3'); + bc2.postMessage(new SharedArrayBuffer(10)); + bc1.addEventListener('message', common.mustCall(({ data }) => { + assert(data instanceof SharedArrayBuffer); + bc1.close(); + bc2.close(); + })); +} + +{ + const bc1 = new BroadcastChannel('channel3'); + const mc = new MessageChannel(); + assert.throws(() => bc1.postMessage(mc), { + message: /Object that needs transfer was found/ + }); + assert.throws(() => bc1.postMessage(Symbol()), { + message: /Symbol\(\) could not be cloned/ + }); + bc1.close(); + assert.throws(() => bc1.postMessage(Symbol()), { + message: /BroadcastChannel is closed/ + }); +} diff --git a/tools/doc/type-parser.js b/tools/doc/type-parser.js index e3d032319de..72a2ec41994 100644 --- a/tools/doc/type-parser.js +++ b/tools/doc/type-parser.js @@ -40,6 +40,10 @@ const customTypesMap = { 'WebAssembly.Instance': `${jsDocPrefix}Reference/Global_Objects/WebAssembly/Instance`, + 'BroadcastChannel': + 'worker_threads.html#worker_threads_class_broadcastchannel_' + + 'extends_eventtarget', + 'Iterable': `${jsDocPrefix}Reference/Iteration_protocols#The_iterable_protocol`, 'Iterator': |