Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorFedor Indutny <fedor@indutny.com>2021-05-01 21:26:46 +0300
committerFedor Indutny <fedor@indutny.com>2021-05-05 04:39:28 +0300
commit7abc7e45b2e177307f67f33906663b5de02f2916 (patch)
treed99728b85e4339ab28757b406be1d1afc0241b59 /src
parent96431282705e19d93f37d2a397afb90b5a5a316e (diff)
node-api: faster threadsafe_function
Invoke threadsafe_function during the same tick and avoid marshalling costs between threads and/or churning event loop if either: 1. There's a queued call already 2. `Push()` is called while the main thread was running threadsafe_function PR-URL: https://github.com/nodejs/node/pull/38506 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Rich Trott <rtrott@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
Diffstat (limited to 'src')
-rw-r--r--src/node_api.cc88
1 files changed, 55 insertions, 33 deletions
diff --git a/src/node_api.cc b/src/node_api.cc
index 8dbf48d466d..4216356cb59 100644
--- a/src/node_api.cc
+++ b/src/node_api.cc
@@ -12,6 +12,7 @@
#include "tracing/traced_value.h"
#include "util-inl.h"
+#include <atomic>
#include <memory>
struct node_napi_env__ : public napi_env__ {
@@ -137,6 +138,7 @@ class ThreadSafeFunction : public node::AsyncResource {
*v8::String::Utf8Value(env_->isolate, name)),
thread_count(thread_count_),
is_closing(false),
+ dispatch_state(kDispatchIdle),
context(context_),
max_queue_size(max_queue_size_),
env(env_),
@@ -176,10 +178,8 @@ class ThreadSafeFunction : public node::AsyncResource {
return napi_closing;
}
} else {
- if (uv_async_send(&async) != 0) {
- return napi_generic_failure;
- }
queue.push(data);
+ Send();
return napi_ok;
}
}
@@ -211,9 +211,7 @@ class ThreadSafeFunction : public node::AsyncResource {
if (is_closing && max_queue_size > 0) {
cond->Signal(lock);
}
- if (uv_async_send(&async) != 0) {
- return napi_generic_failure;
- }
+ Send();
}
}
@@ -238,7 +236,6 @@ class ThreadSafeFunction : public node::AsyncResource {
cond = std::make_unique<node::ConditionVariable>();
}
if (max_queue_size == 0 || cond) {
- CHECK_EQ(0, uv_idle_init(loop, &idle));
return napi_ok;
}
@@ -263,21 +260,46 @@ class ThreadSafeFunction : public node::AsyncResource {
napi_status Unref() {
uv_unref(reinterpret_cast<uv_handle_t*>(&async));
- uv_unref(reinterpret_cast<uv_handle_t*>(&idle));
return napi_ok;
}
napi_status Ref() {
uv_ref(reinterpret_cast<uv_handle_t*>(&async));
- uv_ref(reinterpret_cast<uv_handle_t*>(&idle));
return napi_ok;
}
- void DispatchOne() {
+ inline void* Context() {
+ return context;
+ }
+
+ protected:
+ void Dispatch() {
+ bool has_more = true;
+
+ // Limit maximum synchronous iteration count to prevent event loop
+ // starvation. See `src/node_messaging.cc` for an inspiration.
+ unsigned int iterations_left = kMaxIterationCount;
+ while (has_more && --iterations_left != 0) {
+ dispatch_state = kDispatchRunning;
+ has_more = DispatchOne();
+
+ // Send() was called while we were executing the JS function
+ if (dispatch_state.exchange(kDispatchIdle) != kDispatchRunning) {
+ has_more = true;
+ }
+ }
+
+ if (has_more) {
+ Send();
+ }
+ }
+
+ bool DispatchOne() {
void* data = nullptr;
bool popped_value = false;
+ bool has_more = false;
{
node::Mutex::ScopedLock lock(this->mutex);
@@ -302,9 +324,9 @@ class ThreadSafeFunction : public node::AsyncResource {
cond->Signal(lock);
}
CloseHandlesAndMaybeDelete();
- } else {
- CHECK_EQ(0, uv_idle_stop(&idle));
}
+ } else {
+ has_more = true;
}
}
}
@@ -322,6 +344,8 @@ class ThreadSafeFunction : public node::AsyncResource {
call_js_cb(env, js_callback, context, data);
});
}
+
+ return has_more;
}
void Finalize() {
@@ -335,10 +359,6 @@ class ThreadSafeFunction : public node::AsyncResource {
EmptyQueueAndDelete();
}
- inline void* Context() {
- return context;
- }
-
void CloseHandlesAndMaybeDelete(bool set_closing = false) {
v8::HandleScope scope(env->isolate);
if (set_closing) {
@@ -358,18 +378,20 @@ class ThreadSafeFunction : public node::AsyncResource {
ThreadSafeFunction* ts_fn =
node::ContainerOf(&ThreadSafeFunction::async,
reinterpret_cast<uv_async_t*>(handle));
- v8::HandleScope scope(ts_fn->env->isolate);
- ts_fn->env->node_env()->CloseHandle(
- reinterpret_cast<uv_handle_t*>(&ts_fn->idle),
- [](uv_handle_t* handle) -> void {
- ThreadSafeFunction* ts_fn =
- node::ContainerOf(&ThreadSafeFunction::idle,
- reinterpret_cast<uv_idle_t*>(handle));
- ts_fn->Finalize();
- });
+ ts_fn->Finalize();
});
}
+ void Send() {
+ // Ask currently running Dispatch() to make one more iteration
+ unsigned char current_state = dispatch_state.fetch_or(kDispatchPending);
+ if ((current_state & kDispatchRunning) == kDispatchRunning) {
+ return;
+ }
+
+ CHECK_EQ(0, uv_async_send(&async));
+ }
+
// Default way of calling into JavaScript. Used when ThreadSafeFunction is
// without a call_js_cb_.
static void CallJs(napi_env env, napi_value cb, void* context, void* data) {
@@ -393,16 +415,10 @@ class ThreadSafeFunction : public node::AsyncResource {
}
}
- static void IdleCb(uv_idle_t* idle) {
- ThreadSafeFunction* ts_fn =
- node::ContainerOf(&ThreadSafeFunction::idle, idle);
- ts_fn->DispatchOne();
- }
-
static void AsyncCb(uv_async_t* async) {
ThreadSafeFunction* ts_fn =
node::ContainerOf(&ThreadSafeFunction::async, async);
- CHECK_EQ(0, uv_idle_start(&ts_fn->idle, IdleCb));
+ ts_fn->Dispatch();
}
static void Cleanup(void* data) {
@@ -411,14 +427,20 @@ class ThreadSafeFunction : public node::AsyncResource {
}
private:
+ static const unsigned char kDispatchIdle = 0;
+ static const unsigned char kDispatchRunning = 1 << 0;
+ static const unsigned char kDispatchPending = 1 << 1;
+
+ static const unsigned int kMaxIterationCount = 1000;
+
// These are variables protected by the mutex.
node::Mutex mutex;
std::unique_ptr<node::ConditionVariable> cond;
std::queue<void*> queue;
uv_async_t async;
- uv_idle_t idle;
size_t thread_count;
bool is_closing;
+ std::atomic_uchar dispatch_state;
// These are variables set once, upon creation, and then never again, which
// means we don't need the mutex to read them.