Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/node_messaging.cc53
-rw-r--r--src/node_messaging.h16
-rw-r--r--test/parallel/test-worker-message-port-message-before-close.js38
3 files changed, 71 insertions, 36 deletions
diff --git a/src/node_messaging.cc b/src/node_messaging.cc
index b9212ba272d..98ef42df758 100644
--- a/src/node_messaging.cc
+++ b/src/node_messaging.cc
@@ -40,6 +40,10 @@ namespace worker {
Message::Message(MallocedBuffer<char>&& buffer)
: main_message_buf_(std::move(buffer)) {}
+bool Message::IsCloseMessage() const {
+ return main_message_buf_.data == nullptr;
+}
+
namespace {
// This is used to tell V8 how to read transferred host objects, like other
@@ -91,6 +95,8 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
MaybeLocal<Value> Message::Deserialize(Environment* env,
Local<Context> context) {
+ CHECK(!IsCloseMessage());
+
EscapableHandleScope handle_scope(env->isolate());
Context::Scope context_scope(context);
@@ -395,6 +401,7 @@ Maybe<bool> Message::Serialize(Environment* env,
// The serializer gave us a buffer allocated using `malloc()`.
std::pair<uint8_t*, size_t> data = serializer.Release();
+ CHECK_NOT_NULL(data.first);
main_message_buf_ =
MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
return Just(true);
@@ -430,11 +437,6 @@ void MessagePortData::AddToIncomingQueue(Message&& message) {
}
}
-bool MessagePortData::IsSiblingClosed() const {
- Mutex::ScopedLock lock(*sibling_mutex_);
- return sibling_ == nullptr;
-}
-
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
CHECK_NULL(a->sibling_);
CHECK_NULL(b->sibling_);
@@ -443,12 +445,6 @@ void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
a->sibling_mutex_ = b->sibling_mutex_;
}
-void MessagePortData::PingOwnerAfterDisentanglement() {
- Mutex::ScopedLock lock(mutex_);
- if (owner_ != nullptr)
- owner_->TriggerAsync();
-}
-
void MessagePortData::Disentangle() {
// Grab a copy of the sibling mutex, then replace it so that each sibling
// has its own sibling_mutex_ now.
@@ -462,11 +458,12 @@ void MessagePortData::Disentangle() {
sibling_ = nullptr;
}
- // We close MessagePorts after disentanglement, so we trigger the
- // corresponding uv_async_t to let them know that this happened.
- PingOwnerAfterDisentanglement();
+ // We close MessagePorts after disentanglement, so we enqueue a corresponding
+ // message and trigger the corresponding uv_async_t to let them know that
+ // this happened.
+ AddToIncomingQueue(Message());
if (sibling != nullptr) {
- sibling->PingOwnerAfterDisentanglement();
+ sibling->AddToIncomingQueue(Message());
}
}
@@ -590,14 +587,25 @@ void MessagePort::OnMessage() {
Debug(this, "MessagePort has message, receiving = %d",
static_cast<int>(receiving_messages_));
- if (!receiving_messages_)
- break;
- if (data_->incoming_messages_.empty())
+ // We have nothing to do if:
+ // - There are no pending messages
+ // - We are not intending to receive messages, and the message we would
+ // receive is not the final "close" message.
+ if (data_->incoming_messages_.empty() ||
+ (!receiving_messages_ &&
+ !data_->incoming_messages_.front().IsCloseMessage())) {
break;
+ }
+
received = std::move(data_->incoming_messages_.front());
data_->incoming_messages_.pop_front();
}
+ if (received.IsCloseMessage()) {
+ Close();
+ return;
+ }
+
if (!env()->can_call_into_js()) {
Debug(this, "MessagePort drains queue because !can_call_into_js()");
// In this case there is nothing to do but to drain the current queue.
@@ -628,15 +636,6 @@ void MessagePort::OnMessage() {
}
}
}
-
- if (data_ && data_->IsSiblingClosed()) {
- Close();
- }
-}
-
-bool MessagePort::IsSiblingClosed() const {
- CHECK(data_);
- return data_->IsSiblingClosed();
}
void MessagePort::OnClose() {
diff --git a/src/node_messaging.h b/src/node_messaging.h
index 0a729c14108..08a6798e3cd 100644
--- a/src/node_messaging.h
+++ b/src/node_messaging.h
@@ -17,6 +17,9 @@ class MessagePort;
// Represents a single communication message.
class Message : public MemoryRetainer {
public:
+ // Create a Message with a specific underlying payload, in the format of the
+ // V8 ValueSerializer API. If `payload` is empty, this message indicates
+ // that the receiving message port should close itself.
explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>());
Message(Message&& other) = default;
@@ -24,6 +27,10 @@ class Message : public MemoryRetainer {
Message& operator=(const Message&) = delete;
Message(const Message&) = delete;
+ // Whether this is a message indicating that the port is to be closed.
+ // This is the last message to be received by a MessagePort.
+ bool IsCloseMessage() const;
+
// Deserialize the contained JS value. May only be called once, and only
// after Serialize() has been called (e.g. by another thread).
v8::MaybeLocal<v8::Value> Deserialize(Environment* env,
@@ -89,10 +96,6 @@ class MessagePortData : public MemoryRetainer {
// This may be called from any thread.
void AddToIncomingQueue(Message&& message);
- // Returns true if and only this MessagePort is currently not entangled
- // with another message port.
- bool IsSiblingClosed() const;
-
// Turns `a` and `b` into siblings, i.e. connects the sending side of one
// to the receiving side of the other. This is not thread-safe.
static void Entangle(MessagePortData* a, MessagePortData* b);
@@ -109,10 +112,6 @@ class MessagePortData : public MemoryRetainer {
SET_SELF_SIZE(MessagePortData)
private:
- // After disentangling this message port, the owner handle (if any)
- // is asynchronously triggered, so that it can close down naturally.
- void PingOwnerAfterDisentanglement();
-
// This mutex protects all fields below it, with the exception of
// sibling_.
mutable Mutex mutex_;
@@ -178,7 +177,6 @@ class MessagePort : public HandleWrap {
// messages.
std::unique_ptr<MessagePortData> Detach();
- bool IsSiblingClosed() const;
void Close(
v8::Local<v8::Value> close_callback = v8::Local<v8::Value>()) override;
diff --git a/test/parallel/test-worker-message-port-message-before-close.js b/test/parallel/test-worker-message-port-message-before-close.js
new file mode 100644
index 00000000000..ecaad9c8767
--- /dev/null
+++ b/test/parallel/test-worker-message-port-message-before-close.js
@@ -0,0 +1,38 @@
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const { once } = require('events');
+const { Worker, MessageChannel } = require('worker_threads');
+
+// This is a regression test for the race condition underlying
+// https://github.com/nodejs/node/issues/22762.
+// It ensures that all messages send before a MessagePort#close() call are
+// received. Previously, what could happen was a race condition like this:
+// - Thread 1 sends message A
+// - Thread 2 begins receiving/emitting message A
+// - Thread 1 sends message B
+// - Thread 1 closes its side of the channel
+// - Thread 2 finishes receiving/emitting message A
+// - Thread 2 sees that the port should be closed
+// - Thread 2 closes the port, discarding message B in the process.
+
+async function test() {
+ const worker = new Worker(`
+ require('worker_threads').parentPort.on('message', ({ port }) => {
+ port.postMessage('firstMessage');
+ port.postMessage('lastMessage');
+ port.close();
+ });
+ `, { eval: true });
+
+ for (let i = 0; i < 10000; i++) {
+ const { port1, port2 } = new MessageChannel();
+ worker.postMessage({ port: port2 }, [ port2 ]);
+ await once(port1, 'message'); // 'complexObject'
+ assert.deepStrictEqual(await once(port1, 'message'), ['lastMessage']);
+ }
+
+ worker.terminate();
+}
+
+test().then(common.mustCall());