Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnna Henningsen <anna@addaleax.net>2019-05-15 02:24:57 +0300
committerAnna Henningsen <anna@addaleax.net>2019-05-17 15:01:27 +0300
commit6019060cbb0620d685c8a106a4184475420e922b (patch)
tree07cd79823b255f184a7f6f1dfd4652a87f4699e1 /src/node_messaging.cc
parent001526cc4ce6b3ec55fddcc927d9f610b162a657 (diff)
worker: use special message as MessagePort close command
When a `MessagePort` connected to another `MessagePort` closes, the latter `MessagePort` will be closed as well. Until now, this is done by testing whether the ports are still entangled after processing messages. This leaves open a race condition window in which messages sent just before the closure can be lost when timing is unfortunate. (A description of the timing is in the test file.) This can be addressed by using a special message instead, which is the last message received by a `MessagePort`. This way, all previously sent messages are processed first. Fixes: https://github.com/nodejs/node/issues/22762 PR-URL: https://github.com/nodejs/node/pull/27705 Reviewed-By: Rich Trott <rtrott@gmail.com> Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
Diffstat (limited to 'src/node_messaging.cc')
-rw-r--r--src/node_messaging.cc53
1 files changed, 26 insertions, 27 deletions
diff --git a/src/node_messaging.cc b/src/node_messaging.cc
index b9212ba272d..98ef42df758 100644
--- a/src/node_messaging.cc
+++ b/src/node_messaging.cc
@@ -40,6 +40,10 @@ namespace worker {
Message::Message(MallocedBuffer<char>&& buffer)
: main_message_buf_(std::move(buffer)) {}
+bool Message::IsCloseMessage() const {
+ return main_message_buf_.data == nullptr;
+}
+
namespace {
// This is used to tell V8 how to read transferred host objects, like other
@@ -91,6 +95,8 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
MaybeLocal<Value> Message::Deserialize(Environment* env,
Local<Context> context) {
+ CHECK(!IsCloseMessage());
+
EscapableHandleScope handle_scope(env->isolate());
Context::Scope context_scope(context);
@@ -395,6 +401,7 @@ Maybe<bool> Message::Serialize(Environment* env,
// The serializer gave us a buffer allocated using `malloc()`.
std::pair<uint8_t*, size_t> data = serializer.Release();
+ CHECK_NOT_NULL(data.first);
main_message_buf_ =
MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
return Just(true);
@@ -430,11 +437,6 @@ void MessagePortData::AddToIncomingQueue(Message&& message) {
}
}
-bool MessagePortData::IsSiblingClosed() const {
- Mutex::ScopedLock lock(*sibling_mutex_);
- return sibling_ == nullptr;
-}
-
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
CHECK_NULL(a->sibling_);
CHECK_NULL(b->sibling_);
@@ -443,12 +445,6 @@ void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
a->sibling_mutex_ = b->sibling_mutex_;
}
-void MessagePortData::PingOwnerAfterDisentanglement() {
- Mutex::ScopedLock lock(mutex_);
- if (owner_ != nullptr)
- owner_->TriggerAsync();
-}
-
void MessagePortData::Disentangle() {
// Grab a copy of the sibling mutex, then replace it so that each sibling
// has its own sibling_mutex_ now.
@@ -462,11 +458,12 @@ void MessagePortData::Disentangle() {
sibling_ = nullptr;
}
- // We close MessagePorts after disentanglement, so we trigger the
- // corresponding uv_async_t to let them know that this happened.
- PingOwnerAfterDisentanglement();
+ // We close MessagePorts after disentanglement, so we enqueue a corresponding
+ // message and trigger the corresponding uv_async_t to let them know that
+ // this happened.
+ AddToIncomingQueue(Message());
if (sibling != nullptr) {
- sibling->PingOwnerAfterDisentanglement();
+ sibling->AddToIncomingQueue(Message());
}
}
@@ -590,14 +587,25 @@ void MessagePort::OnMessage() {
Debug(this, "MessagePort has message, receiving = %d",
static_cast<int>(receiving_messages_));
- if (!receiving_messages_)
- break;
- if (data_->incoming_messages_.empty())
+ // We have nothing to do if:
+ // - There are no pending messages
+ // - We are not intending to receive messages, and the message we would
+ // receive is not the final "close" message.
+ if (data_->incoming_messages_.empty() ||
+ (!receiving_messages_ &&
+ !data_->incoming_messages_.front().IsCloseMessage())) {
break;
+ }
+
received = std::move(data_->incoming_messages_.front());
data_->incoming_messages_.pop_front();
}
+ if (received.IsCloseMessage()) {
+ Close();
+ return;
+ }
+
if (!env()->can_call_into_js()) {
Debug(this, "MessagePort drains queue because !can_call_into_js()");
// In this case there is nothing to do but to drain the current queue.
@@ -628,15 +636,6 @@ void MessagePort::OnMessage() {
}
}
}
-
- if (data_ && data_->IsSiblingClosed()) {
- Close();
- }
-}
-
-bool MessagePort::IsSiblingClosed() const {
- CHECK(data_);
- return data_->IsSiblingClosed();
}
void MessagePort::OnClose() {