diff options
author | Fedor Indutny <fedor@indutny.com> | 2021-05-01 21:26:46 +0300 |
---|---|---|
committer | Fedor Indutny <fedor@indutny.com> | 2021-05-05 04:39:28 +0300 |
commit | 7abc7e45b2e177307f67f33906663b5de02f2916 (patch) | |
tree | d99728b85e4339ab28757b406be1d1afc0241b59 /src | |
parent | 96431282705e19d93f37d2a397afb90b5a5a316e (diff) |
node-api: faster threadsafe_function
Invoke threadsafe_function during the same tick and avoid marshalling
costs between threads and/or churning event loop if either:
1. There's a queued call already
2. `Push()` is called while the main thread was running
threadsafe_function
PR-URL: https://github.com/nodejs/node/pull/38506
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Rich Trott <rtrott@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Diffstat (limited to 'src')
-rw-r--r-- | src/node_api.cc | 88 |
1 files changed, 55 insertions, 33 deletions
diff --git a/src/node_api.cc b/src/node_api.cc index 8dbf48d466d..4216356cb59 100644 --- a/src/node_api.cc +++ b/src/node_api.cc @@ -12,6 +12,7 @@ #include "tracing/traced_value.h" #include "util-inl.h" +#include <atomic> #include <memory> struct node_napi_env__ : public napi_env__ { @@ -137,6 +138,7 @@ class ThreadSafeFunction : public node::AsyncResource { *v8::String::Utf8Value(env_->isolate, name)), thread_count(thread_count_), is_closing(false), + dispatch_state(kDispatchIdle), context(context_), max_queue_size(max_queue_size_), env(env_), @@ -176,10 +178,8 @@ class ThreadSafeFunction : public node::AsyncResource { return napi_closing; } } else { - if (uv_async_send(&async) != 0) { - return napi_generic_failure; - } queue.push(data); + Send(); return napi_ok; } } @@ -211,9 +211,7 @@ class ThreadSafeFunction : public node::AsyncResource { if (is_closing && max_queue_size > 0) { cond->Signal(lock); } - if (uv_async_send(&async) != 0) { - return napi_generic_failure; - } + Send(); } } @@ -238,7 +236,6 @@ class ThreadSafeFunction : public node::AsyncResource { cond = std::make_unique<node::ConditionVariable>(); } if (max_queue_size == 0 || cond) { - CHECK_EQ(0, uv_idle_init(loop, &idle)); return napi_ok; } @@ -263,21 +260,46 @@ class ThreadSafeFunction : public node::AsyncResource { napi_status Unref() { uv_unref(reinterpret_cast<uv_handle_t*>(&async)); - uv_unref(reinterpret_cast<uv_handle_t*>(&idle)); return napi_ok; } napi_status Ref() { uv_ref(reinterpret_cast<uv_handle_t*>(&async)); - uv_ref(reinterpret_cast<uv_handle_t*>(&idle)); return napi_ok; } - void DispatchOne() { + inline void* Context() { + return context; + } + + protected: + void Dispatch() { + bool has_more = true; + + // Limit maximum synchronous iteration count to prevent event loop + // starvation. See `src/node_messaging.cc` for an inspiration. + unsigned int iterations_left = kMaxIterationCount; + while (has_more && --iterations_left != 0) { + dispatch_state = kDispatchRunning; + has_more = DispatchOne(); + + // Send() was called while we were executing the JS function + if (dispatch_state.exchange(kDispatchIdle) != kDispatchRunning) { + has_more = true; + } + } + + if (has_more) { + Send(); + } + } + + bool DispatchOne() { void* data = nullptr; bool popped_value = false; + bool has_more = false; { node::Mutex::ScopedLock lock(this->mutex); @@ -302,9 +324,9 @@ class ThreadSafeFunction : public node::AsyncResource { cond->Signal(lock); } CloseHandlesAndMaybeDelete(); - } else { - CHECK_EQ(0, uv_idle_stop(&idle)); } + } else { + has_more = true; } } } @@ -322,6 +344,8 @@ class ThreadSafeFunction : public node::AsyncResource { call_js_cb(env, js_callback, context, data); }); } + + return has_more; } void Finalize() { @@ -335,10 +359,6 @@ class ThreadSafeFunction : public node::AsyncResource { EmptyQueueAndDelete(); } - inline void* Context() { - return context; - } - void CloseHandlesAndMaybeDelete(bool set_closing = false) { v8::HandleScope scope(env->isolate); if (set_closing) { @@ -358,18 +378,20 @@ class ThreadSafeFunction : public node::AsyncResource { ThreadSafeFunction* ts_fn = node::ContainerOf(&ThreadSafeFunction::async, reinterpret_cast<uv_async_t*>(handle)); - v8::HandleScope scope(ts_fn->env->isolate); - ts_fn->env->node_env()->CloseHandle( - reinterpret_cast<uv_handle_t*>(&ts_fn->idle), - [](uv_handle_t* handle) -> void { - ThreadSafeFunction* ts_fn = - node::ContainerOf(&ThreadSafeFunction::idle, - reinterpret_cast<uv_idle_t*>(handle)); - ts_fn->Finalize(); - }); + ts_fn->Finalize(); }); } + void Send() { + // Ask currently running Dispatch() to make one more iteration + unsigned char current_state = dispatch_state.fetch_or(kDispatchPending); + if ((current_state & kDispatchRunning) == kDispatchRunning) { + return; + } + + CHECK_EQ(0, uv_async_send(&async)); + } + // Default way of calling into JavaScript. Used when ThreadSafeFunction is // without a call_js_cb_. static void CallJs(napi_env env, napi_value cb, void* context, void* data) { @@ -393,16 +415,10 @@ class ThreadSafeFunction : public node::AsyncResource { } } - static void IdleCb(uv_idle_t* idle) { - ThreadSafeFunction* ts_fn = - node::ContainerOf(&ThreadSafeFunction::idle, idle); - ts_fn->DispatchOne(); - } - static void AsyncCb(uv_async_t* async) { ThreadSafeFunction* ts_fn = node::ContainerOf(&ThreadSafeFunction::async, async); - CHECK_EQ(0, uv_idle_start(&ts_fn->idle, IdleCb)); + ts_fn->Dispatch(); } static void Cleanup(void* data) { @@ -411,14 +427,20 @@ class ThreadSafeFunction : public node::AsyncResource { } private: + static const unsigned char kDispatchIdle = 0; + static const unsigned char kDispatchRunning = 1 << 0; + static const unsigned char kDispatchPending = 1 << 1; + + static const unsigned int kMaxIterationCount = 1000; + // These are variables protected by the mutex. node::Mutex mutex; std::unique_ptr<node::ConditionVariable> cond; std::queue<void*> queue; uv_async_t async; - uv_idle_t idle; size_t thread_count; bool is_closing; + std::atomic_uchar dispatch_state; // These are variables set once, upon creation, and then never again, which // means we don't need the mutex to read them. |