diff options
Diffstat (limited to 'src/node_messaging.cc')
-rw-r--r-- | src/node_messaging.cc | 53 |
1 files changed, 26 insertions, 27 deletions
diff --git a/src/node_messaging.cc b/src/node_messaging.cc index b9212ba272d..98ef42df758 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -40,6 +40,10 @@ namespace worker { Message::Message(MallocedBuffer<char>&& buffer) : main_message_buf_(std::move(buffer)) {} +bool Message::IsCloseMessage() const { + return main_message_buf_.data == nullptr; +} + namespace { // This is used to tell V8 how to read transferred host objects, like other @@ -91,6 +95,8 @@ class DeserializerDelegate : public ValueDeserializer::Delegate { MaybeLocal<Value> Message::Deserialize(Environment* env, Local<Context> context) { + CHECK(!IsCloseMessage()); + EscapableHandleScope handle_scope(env->isolate()); Context::Scope context_scope(context); @@ -395,6 +401,7 @@ Maybe<bool> Message::Serialize(Environment* env, // The serializer gave us a buffer allocated using `malloc()`. std::pair<uint8_t*, size_t> data = serializer.Release(); + CHECK_NOT_NULL(data.first); main_message_buf_ = MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second); return Just(true); @@ -430,11 +437,6 @@ void MessagePortData::AddToIncomingQueue(Message&& message) { } } -bool MessagePortData::IsSiblingClosed() const { - Mutex::ScopedLock lock(*sibling_mutex_); - return sibling_ == nullptr; -} - void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) { CHECK_NULL(a->sibling_); CHECK_NULL(b->sibling_); @@ -443,12 +445,6 @@ void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) { a->sibling_mutex_ = b->sibling_mutex_; } -void MessagePortData::PingOwnerAfterDisentanglement() { - Mutex::ScopedLock lock(mutex_); - if (owner_ != nullptr) - owner_->TriggerAsync(); -} - void MessagePortData::Disentangle() { // Grab a copy of the sibling mutex, then replace it so that each sibling // has its own sibling_mutex_ now. @@ -462,11 +458,12 @@ void MessagePortData::Disentangle() { sibling_ = nullptr; } - // We close MessagePorts after disentanglement, so we trigger the - // corresponding uv_async_t to let them know that this happened. - PingOwnerAfterDisentanglement(); + // We close MessagePorts after disentanglement, so we enqueue a corresponding + // message and trigger the corresponding uv_async_t to let them know that + // this happened. + AddToIncomingQueue(Message()); if (sibling != nullptr) { - sibling->PingOwnerAfterDisentanglement(); + sibling->AddToIncomingQueue(Message()); } } @@ -590,14 +587,25 @@ void MessagePort::OnMessage() { Debug(this, "MessagePort has message, receiving = %d", static_cast<int>(receiving_messages_)); - if (!receiving_messages_) - break; - if (data_->incoming_messages_.empty()) + // We have nothing to do if: + // - There are no pending messages + // - We are not intending to receive messages, and the message we would + // receive is not the final "close" message. + if (data_->incoming_messages_.empty() || + (!receiving_messages_ && + !data_->incoming_messages_.front().IsCloseMessage())) { break; + } + received = std::move(data_->incoming_messages_.front()); data_->incoming_messages_.pop_front(); } + if (received.IsCloseMessage()) { + Close(); + return; + } + if (!env()->can_call_into_js()) { Debug(this, "MessagePort drains queue because !can_call_into_js()"); // In this case there is nothing to do but to drain the current queue. @@ -628,15 +636,6 @@ void MessagePort::OnMessage() { } } } - - if (data_ && data_->IsSiblingClosed()) { - Close(); - } -} - -bool MessagePort::IsSiblingClosed() const { - CHECK(data_); - return data_->IsSiblingClosed(); } void MessagePort::OnClose() { |