Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAnna Henningsen <anna@addaleax.net>2021-02-11 15:30:34 +0300
committerMichaël Zasso <targos@protonmail.com>2021-02-28 16:46:24 +0300
commit31f4600b7ad07a9bcdf66d0918eefc6b6f4af874 (patch)
tree98b1b5509c7f84b4cf4dbf085a22745dcfce5ad9 /src
parentddae11213302254792758cea21f6e73ae0848552 (diff)
worker: fix interaction of terminate() with messaging port
When a Worker is terminated, its own handle and the public `MessagePort` are `.ref()`’ed, so that all relevant events, including the `'exit'` events, end up being received. However, this is problematic if messages end up being queued from the Worker between the beginning of the `.terminate()` call and its completion, and there are no `'message'` event handlers present at that time. In that situation, currently the messages would not end up being processed, and since the MessagePort is still `.ref()`’ed, it would keep the event loop alive indefinitely. To fix this: - Make sure that all messages end up being received by `drainMessagePort()`, including cases in which the port had been stopped (i.e. there are no `'message'` listeners) and cases in which we exceed the limit for messages being processed in one batch. - Unref the Worker’s internal ports manually after the Worker has exited. Either of these solutions should be solving this on its own, but I think it makes sense to make sure that both of them happen during cleanup. PR-URL: https://github.com/nodejs/node/pull/37319 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Colin Ihrig <cjihrig@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com>
Diffstat (limited to 'src')
-rw-r--r--src/node_messaging.cc21
-rw-r--r--src/node_messaging.h9
2 files changed, 20 insertions, 10 deletions
diff --git a/src/node_messaging.cc b/src/node_messaging.cc
index 78fb46ab6f2..2699bd2792e 100644
--- a/src/node_messaging.cc
+++ b/src/node_messaging.cc
@@ -565,7 +565,7 @@ MessagePort::MessagePort(Environment* env,
auto onmessage = [](uv_async_t* handle) {
// Called when data has been put into the queue.
MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
- channel->OnMessage();
+ channel->OnMessage(MessageProcessingMode::kNormalOperation);
};
CHECK_EQ(uv_async_init(env->event_loop(),
@@ -664,7 +664,7 @@ MessagePort* MessagePort::New(
}
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
- bool only_if_receiving) {
+ MessageProcessingMode mode) {
std::shared_ptr<Message> received;
{
// Get the head of the message queue.
@@ -672,7 +672,9 @@ MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
Debug(this, "MessagePort has message");
- bool wants_message = receiving_messages_ || !only_if_receiving;
+ bool wants_message =
+ receiving_messages_ ||
+ mode == MessageProcessingMode::kForceReadMessages;
// We have nothing to do if:
// - There are no pending messages
// - We are not intending to receive messages, and the message we would
@@ -697,16 +699,18 @@ MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
return received->Deserialize(env(), context);
}
-void MessagePort::OnMessage() {
+void MessagePort::OnMessage(MessageProcessingMode mode) {
Debug(this, "Running MessagePort::OnMessage()");
HandleScope handle_scope(env()->isolate());
Local<Context> context = object(env()->isolate())->CreationContext();
size_t processing_limit;
- {
+ if (mode == MessageProcessingMode::kNormalOperation) {
Mutex::ScopedLock(data_->mutex_);
processing_limit = std::max(data_->incoming_messages_.size(),
static_cast<size_t>(1000));
+ } else {
+ processing_limit = std::numeric_limits<size_t>::max();
}
// data_ can only ever be modified by the owner thread, so no need to lock.
@@ -738,7 +742,7 @@ void MessagePort::OnMessage() {
// Catch any exceptions from parsing the message itself (not from
// emitting it) as 'messageeror' events.
TryCatchScope try_catch(env());
- if (!ReceiveMessage(context, true).ToLocal(&payload)) {
+ if (!ReceiveMessage(context, mode).ToLocal(&payload)) {
if (try_catch.HasCaught() && !try_catch.HasTerminated())
message_error = try_catch.Exception();
goto reschedule;
@@ -999,7 +1003,7 @@ void MessagePort::CheckType(const FunctionCallbackInfo<Value>& args) {
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
MessagePort* port;
ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
- port->OnMessage();
+ port->OnMessage(MessageProcessingMode::kForceReadMessages);
}
void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
@@ -1018,7 +1022,8 @@ void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
}
MaybeLocal<Value> payload =
- port->ReceiveMessage(port->object()->CreationContext(), false);
+ port->ReceiveMessage(port->object()->CreationContext(),
+ MessageProcessingMode::kForceReadMessages);
if (!payload.IsEmpty())
args.GetReturnValue().Set(payload.ToLocalChecked());
}
diff --git a/src/node_messaging.h b/src/node_messaging.h
index ad065659772..2e63b22e4ce 100644
--- a/src/node_messaging.h
+++ b/src/node_messaging.h
@@ -285,11 +285,16 @@ class MessagePort : public HandleWrap {
SET_SELF_SIZE(MessagePort)
private:
+ enum class MessageProcessingMode {
+ kNormalOperation,
+ kForceReadMessages
+ };
+
void OnClose() override;
- void OnMessage();
+ void OnMessage(MessageProcessingMode mode);
void TriggerAsync();
v8::MaybeLocal<v8::Value> ReceiveMessage(v8::Local<v8::Context> context,
- bool only_if_receiving);
+ MessageProcessingMode mode);
std::unique_ptr<MessagePortData> data_ = nullptr;
bool receiving_messages_ = false;