Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'src/node_messaging.cc')
-rw-r--r--src/node_messaging.cc53
1 files changed, 26 insertions, 27 deletions
diff --git a/src/node_messaging.cc b/src/node_messaging.cc
index b9212ba272d..98ef42df758 100644
--- a/src/node_messaging.cc
+++ b/src/node_messaging.cc
@@ -40,6 +40,10 @@ namespace worker {
Message::Message(MallocedBuffer<char>&& buffer)
: main_message_buf_(std::move(buffer)) {}
+bool Message::IsCloseMessage() const {
+ return main_message_buf_.data == nullptr;
+}
+
namespace {
// This is used to tell V8 how to read transferred host objects, like other
@@ -91,6 +95,8 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
MaybeLocal<Value> Message::Deserialize(Environment* env,
Local<Context> context) {
+ CHECK(!IsCloseMessage());
+
EscapableHandleScope handle_scope(env->isolate());
Context::Scope context_scope(context);
@@ -395,6 +401,7 @@ Maybe<bool> Message::Serialize(Environment* env,
// The serializer gave us a buffer allocated using `malloc()`.
std::pair<uint8_t*, size_t> data = serializer.Release();
+ CHECK_NOT_NULL(data.first);
main_message_buf_ =
MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
return Just(true);
@@ -430,11 +437,6 @@ void MessagePortData::AddToIncomingQueue(Message&& message) {
}
}
-bool MessagePortData::IsSiblingClosed() const {
- Mutex::ScopedLock lock(*sibling_mutex_);
- return sibling_ == nullptr;
-}
-
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
CHECK_NULL(a->sibling_);
CHECK_NULL(b->sibling_);
@@ -443,12 +445,6 @@ void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
a->sibling_mutex_ = b->sibling_mutex_;
}
-void MessagePortData::PingOwnerAfterDisentanglement() {
- Mutex::ScopedLock lock(mutex_);
- if (owner_ != nullptr)
- owner_->TriggerAsync();
-}
-
void MessagePortData::Disentangle() {
// Grab a copy of the sibling mutex, then replace it so that each sibling
// has its own sibling_mutex_ now.
@@ -462,11 +458,12 @@ void MessagePortData::Disentangle() {
sibling_ = nullptr;
}
- // We close MessagePorts after disentanglement, so we trigger the
- // corresponding uv_async_t to let them know that this happened.
- PingOwnerAfterDisentanglement();
+ // We close MessagePorts after disentanglement, so we enqueue a corresponding
+ // message and trigger the corresponding uv_async_t to let them know that
+ // this happened.
+ AddToIncomingQueue(Message());
if (sibling != nullptr) {
- sibling->PingOwnerAfterDisentanglement();
+ sibling->AddToIncomingQueue(Message());
}
}
@@ -590,14 +587,25 @@ void MessagePort::OnMessage() {
Debug(this, "MessagePort has message, receiving = %d",
static_cast<int>(receiving_messages_));
- if (!receiving_messages_)
- break;
- if (data_->incoming_messages_.empty())
+ // We have nothing to do if:
+ // - There are no pending messages
+ // - We are not intending to receive messages, and the message we would
+ // receive is not the final "close" message.
+ if (data_->incoming_messages_.empty() ||
+ (!receiving_messages_ &&
+ !data_->incoming_messages_.front().IsCloseMessage())) {
break;
+ }
+
received = std::move(data_->incoming_messages_.front());
data_->incoming_messages_.pop_front();
}
+ if (received.IsCloseMessage()) {
+ Close();
+ return;
+ }
+
if (!env()->can_call_into_js()) {
Debug(this, "MessagePort drains queue because !can_call_into_js()");
// In this case there is nothing to do but to drain the current queue.
@@ -628,15 +636,6 @@ void MessagePort::OnMessage() {
}
}
}
-
- if (data_ && data_->IsSiblingClosed()) {
- Close();
- }
-}
-
-bool MessagePort::IsSiblingClosed() const {
- CHECK(data_);
- return data_->IsSiblingClosed();
}
void MessagePort::OnClose() {