13 files changed, 447 insertions, 109 deletions
diff --git a/source/blender/blenlib/BLI_lazy_threading.hh b/source/blender/blenlib/BLI_lazy_threading.hh new file mode 100644 index 00000000000..61532fe24f0 --- /dev/null +++ b/source/blender/blenlib/BLI_lazy_threading.hh @@ -0,0 +1,83 @@ +/* SPDX-License-Identifier: GPL-2.0-or-later */ + +#pragma once + +/** \file + * \ingroup bli + * + * The goal of "lazy threading" is to avoid using threads unless one can reasonably assume that it + * is worth distributing work over multiple threads. Using threads can lead to worse overall + * performance by introducing inter-thread communication overhead. Keeping all work on a single + * thread reduces this overhead to zero and also makes better use of the CPU cache. + * + * Functions like #parallel_for also solve this to some degree by using a "grain size". When the + * number of individual tasks is too small, no multi-threading is used. This works very well when + * there are many homogeneous tasks that can be expected to take approximately the same time. + * + * The situation becomes more difficult when: + * - The individual tasks are not homogeneous, i.e. they take different amounts of time to compute. + * - It is practically impossible to guess how long each task will take in advance. + * + * Given those constraints, a single grain size cannot be determined. One could just schedule all + * tasks individually but that would create a lot of overhead when the tasks happen to be very + * small. While TBB will keep all tasks on a single thread if the other threads are busy, if they + * are idle they will start stealing the work even if that's not beneficial for overall + * performance. + * + * This file provides a simple API that allows a task scheduler to properly handle tasks whose size + * is not known in advance. The key idea is this: + * + * > By default, all work stays on a single thread. If an individual task notices that it is about + * > to start a computation that will take a while, it notifies the task scheduler further up on the + * > stack. The scheduler then allows other threads to take over other tasks that were originally + * > meant for the current thread. + * + * This way, when all tasks are small, no threading overhead has to be paid. Whenever there is + * a task that keeps the current thread busy for a while, the other tasks are moved to a separate + * thread so that they can be executed without waiting for the long computation to finish. + * + * Consequently, the earlier a task knows during its execution that it will take a while, the + * better. That's because if it is blocking anyway, it's more efficient to move the other tasks to + * another thread earlier. + * + * To make this work, three things have to be solved: + * 1. The task scheduler has to be able to start single-threaded and become multi-threaded after + * tasks have started executing. This has to be solved in the specific task scheduler. + * 2. There has to be a way for the currently running task to tell the task scheduler that it is + * about to perform a computation that will take a while and that it would be reasonable to move + * other tasks to other threads. This part is implemented in the API provided by this file. + * 3. Individual tasks have to decide when a computation is long enough to justify talking to the + * scheduler. This is always based on heuristics that have to be fine-tuned over time.
One could + * assume that this means adding new work-size checks to many parts in Blender, but that's + * actually not necessary, because these checks exist already in the form of grain sizes passed + * to e.g. #parallel_for. The assumption here is that when the task thinks the current workload + * is big enough to justify using threads, it's also big enough to justify using another thread + * for the tasks waiting on the current thread. + */ + +#include "BLI_function_ref.hh" + +namespace blender::lazy_threading { + +/** + * Tell task schedulers on the current thread that it is about to start a long computation + * and that other waiting tasks should be moved to another thread if possible. + */ +void send_hint(); + +/** + * Used by the task scheduler to receive hints from current tasks that they will take a while. + * This should only be allocated on the stack. + */ +class HintReceiver { + public: + /** + * The passed-in function is called when a task signals that it will take a while. + * \note The function has to stay alive after the call to the constructor. So one must not pass a + * lambda directly into this constructor but store it in a separate variable on the stack first. + */ + HintReceiver(FunctionRef<void()> fn); + ~HintReceiver(); +}; + +} // namespace blender::lazy_threading diff --git a/source/blender/blenlib/BLI_task.hh b/source/blender/blenlib/BLI_task.hh index 33a781d3749..9f9a57be634 100644 --- a/source/blender/blenlib/BLI_task.hh +++ b/source/blender/blenlib/BLI_task.hh @@ -31,6 +31,7 @@ #endif #include "BLI_index_range.hh" +#include "BLI_lazy_threading.hh" #include "BLI_utildefines.h" namespace blender::threading { @@ -56,6 +57,7 @@ void parallel_for(IndexRange range, int64_t grain_size, const Function &function #ifdef WITH_TBB /* Invoking tbb for small workloads has a large overhead. */ if (range.size() >= grain_size) { + lazy_threading::send_hint(); tbb::parallel_for( tbb::blocked_range<int64_t>(range.first(), range.one_after_last(), grain_size), [&](const tbb::blocked_range<int64_t> &subrange) { @@ -78,6 +80,7 @@ Value parallel_reduce(IndexRange range, { #ifdef WITH_TBB if (range.size() >= grain_size) { + lazy_threading::send_hint(); return tbb::parallel_reduce( tbb::blocked_range<int64_t>(range.first(), range.one_after_last(), grain_size), identity, @@ -114,6 +117,7 @@ template<typename... Functions> void parallel_invoke(const bool use_threading, Functions &&...functions) { if (use_threading) { + lazy_threading::send_hint(); parallel_invoke(std::forward<Functions>(functions)...); } else { diff --git a/source/blender/blenlib/CMakeLists.txt b/source/blender/blenlib/CMakeLists.txt index 50dc11cbf0a..36acbd41ad7 100644 --- a/source/blender/blenlib/CMakeLists.txt +++ b/source/blender/blenlib/CMakeLists.txt @@ -85,6 +85,7 @@ set(SRC intern/kdtree_3d.c intern/kdtree_4d.c intern/lasso_2d.c + intern/lazy_threading.cc intern/length_parameterize.cc intern/listbase.c intern/math_base.c diff --git a/source/blender/blenlib/intern/lazy_threading.cc b/source/blender/blenlib/intern/lazy_threading.cc new file mode 100644 index 00000000000..803fd81a96d --- /dev/null +++ b/source/blender/blenlib/intern/lazy_threading.cc @@ -0,0 +1,30 @@ +/* SPDX-License-Identifier: GPL-2.0-or-later */ + +#include "BLI_lazy_threading.hh" +#include "BLI_vector.hh" + +namespace blender::lazy_threading { + +/** + * This is a #RawVector so that it can be destructed after Blender checks for memory leaks.
+ */ +thread_local RawVector<FunctionRef<void()>, 0> hint_receivers; + +void send_hint() +{ + for (const FunctionRef<void()> &fn : hint_receivers) { + fn(); + } +} + +HintReceiver::HintReceiver(const FunctionRef<void()> fn) +{ + hint_receivers.append(fn); +} + +HintReceiver::~HintReceiver() +{ + hint_receivers.pop_last(); +} + +} // namespace blender::lazy_threading diff --git a/source/blender/blenlib/intern/task_range.cc b/source/blender/blenlib/intern/task_range.cc index 7e405529f03..181b760bea1 100644 --- a/source/blender/blenlib/intern/task_range.cc +++ b/source/blender/blenlib/intern/task_range.cc @@ -12,6 +12,7 @@ #include "DNA_listBase.h" +#include "BLI_lazy_threading.hh" #include "BLI_task.h" #include "BLI_threads.h" @@ -104,6 +105,8 @@ void BLI_task_parallel_range(const int start, const size_t grainsize = MAX2(settings->min_iter_per_thread, 1); const tbb::blocked_range<int> range(start, stop, grainsize); + blender::lazy_threading::send_hint(); + if (settings->func_reduce) { parallel_reduce(range, task); if (settings->userdata_chunk) { diff --git a/source/blender/functions/FN_lazy_function.hh b/source/blender/functions/FN_lazy_function.hh index 59a3a90b0b0..4a539e7cbd1 100644 --- a/source/blender/functions/FN_lazy_function.hh +++ b/source/blender/functions/FN_lazy_function.hh @@ -43,6 +43,13 @@ #include "BLI_linear_allocator.hh" #include "BLI_vector.hh" +#include <atomic> +#include <thread> + +#ifdef DEBUG +# define FN_LAZY_FUNCTION_DEBUG_THREADS +#endif + namespace blender::fn::lazy_function { enum class ValueUsage { @@ -102,9 +109,13 @@ class Params { * The lazy-function this #Params has been prepared for. */ const LazyFunction &fn_; +#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS + std::thread::id main_thread_id_; + std::atomic<bool> allow_multi_threading_; +#endif public: - Params(const LazyFunction &fn); + Params(const LazyFunction &fn, bool allow_multi_threading_initially); /** * Get a pointer to an input value if the value is available already. Otherwise null is returned. @@ -154,7 +165,7 @@ class Params { * Typed utility methods that wrap the methods above. */ template<typename T> T extract_input(int index); - template<typename T> const T &get_input(int index); + template<typename T> const T &get_input(int index) const; template<typename T> T *try_get_input_data_ptr_or_request(int index); template<typename T> void set_output(int index, T &&value); @@ -163,7 +174,15 @@ class Params { */ void set_default_remaining_outputs(); + /** + * Returns true when the lazy-function is now allowed to use multi-threading when interacting + * with this #Params. That means it is allowed to call non-const methods from different threads. + */ + bool try_enable_multi_threading(); + private: + void assert_valid_thread() const; + /** * Methods that need to be implemented by subclasses.
Those are separate from the non-virtual * methods above to make it easy to insert additional debugging logic on top of the @@ -176,6 +195,7 @@ class Params { virtual bool output_was_set_impl(int index) const = 0; virtual ValueUsage get_output_usage_impl(int index) const = 0; virtual void set_input_unused_impl(int index) = 0; + virtual bool try_enable_multi_threading_impl(); }; /** @@ -312,7 +332,14 @@ inline void LazyFunction::execute(Params ¶ms, const Context &context) const /** \name #Params Inline Methods * \{ */ -inline Params::Params(const LazyFunction &fn) : fn_(fn) +inline Params::Params(const LazyFunction &fn, + [[maybe_unused]] bool allow_multi_threading_initially) + : fn_(fn) +#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS + , + main_thread_id_(std::this_thread::get_id()), + allow_multi_threading_(allow_multi_threading_initially) +#endif { } @@ -323,16 +350,19 @@ inline void *Params::try_get_input_data_ptr(const int index) const inline void *Params::try_get_input_data_ptr_or_request(const int index) { + this->assert_valid_thread(); return this->try_get_input_data_ptr_or_request_impl(index); } inline void *Params::get_output_data_ptr(const int index) { + this->assert_valid_thread(); return this->get_output_data_ptr_impl(index); } inline void Params::output_set(const int index) { + this->assert_valid_thread(); this->output_set_impl(index); } @@ -348,18 +378,20 @@ inline ValueUsage Params::get_output_usage(const int index) const inline void Params::set_input_unused(const int index) { + this->assert_valid_thread(); this->set_input_unused_impl(index); } template<typename T> inline T Params::extract_input(const int index) { + this->assert_valid_thread(); void *data = this->try_get_input_data_ptr(index); BLI_assert(data != nullptr); T return_value = std::move(*static_cast<T *>(data)); return return_value; } -template<typename T> inline const T &Params::get_input(const int index) +template<typename T> inline const T &Params::get_input(const int index) const { const void *data = this->try_get_input_data_ptr(index); BLI_assert(data != nullptr); @@ -368,17 +400,43 @@ template<typename T> inline const T &Params::get_input(const int index) template<typename T> inline T *Params::try_get_input_data_ptr_or_request(const int index) { + this->assert_valid_thread(); return static_cast<T *>(this->try_get_input_data_ptr_or_request(index)); } template<typename T> inline void Params::set_output(const int index, T &&value) { using DecayT = std::decay_t<T>; + this->assert_valid_thread(); void *data = this->get_output_data_ptr(index); new (data) DecayT(std::forward<T>(value)); this->output_set(index); } +inline bool Params::try_enable_multi_threading() +{ + this->assert_valid_thread(); + const bool success = this->try_enable_multi_threading_impl(); +#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS + if (success) { + allow_multi_threading_ = true; + } +#endif + return success; +} + +inline void Params::assert_valid_thread() const +{ +#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS + if (allow_multi_threading_) { + return; + } + if (main_thread_id_ != std::this_thread::get_id()) { + BLI_assert_unreachable(); + } +#endif +} + /** \} */ } // namespace blender::fn::lazy_function diff --git a/source/blender/functions/FN_lazy_function_execute.hh b/source/blender/functions/FN_lazy_function_execute.hh index ade23ad17c7..31bbddf5baf 100644 --- a/source/blender/functions/FN_lazy_function_execute.hh +++ b/source/blender/functions/FN_lazy_function_execute.hh @@ -41,6 +41,7 @@ class BasicParams : public Params { bool output_was_set_impl(const int 
index) const override; ValueUsage get_output_usage_impl(const int index) const override; void set_input_unused_impl(const int index) override; + bool try_enable_multi_threading_impl() override; }; namespace detail { diff --git a/source/blender/functions/intern/lazy_function.cc b/source/blender/functions/intern/lazy_function.cc index 46572283e9b..2d69af84f85 100644 --- a/source/blender/functions/intern/lazy_function.cc +++ b/source/blender/functions/intern/lazy_function.cc @@ -63,4 +63,9 @@ void Params::set_default_remaining_outputs() } } +bool Params::try_enable_multi_threading_impl() +{ + return false; +} + } // namespace blender::fn::lazy_function diff --git a/source/blender/functions/intern/lazy_function_execute.cc b/source/blender/functions/intern/lazy_function_execute.cc index 279056afa99..cea9b48d5bc 100644 --- a/source/blender/functions/intern/lazy_function_execute.cc +++ b/source/blender/functions/intern/lazy_function_execute.cc @@ -14,7 +14,7 @@ BasicParams::BasicParams(const LazyFunction &fn, MutableSpan<std::optional<ValueUsage>> input_usages, Span<ValueUsage> output_usages, MutableSpan<bool> set_outputs) - : Params(fn), + : Params(fn, true), inputs_(inputs), outputs_(outputs), input_usages_(input_usages), @@ -62,4 +62,9 @@ void BasicParams::set_input_unused_impl(const int index) input_usages_[index] = ValueUsage::Unused; } +bool BasicParams::try_enable_multi_threading_impl() +{ + return true; +} + } // namespace blender::fn::lazy_function diff --git a/source/blender/functions/intern/lazy_function_graph_executor.cc b/source/blender/functions/intern/lazy_function_graph_executor.cc index 176509bd687..f3ce2476085 100644 --- a/source/blender/functions/intern/lazy_function_graph_executor.cc +++ b/source/blender/functions/intern/lazy_function_graph_executor.cc @@ -3,18 +3,20 @@ /** * This file implements the evaluation of a lazy-function graph. Its main objectives are: * - Only compute values that are actually used. - * - Allow spreading the work over an arbitrary number of CPU cores. + * - Stay single-threaded when nodes are executed quickly. + * - Allow spreading the work over an arbitrary number of threads efficiently. * - * Other (simpler) executors with different main objectives could be implemented in the future. For - * some scenarios those could be simpler when many nodes do very little work or most nodes have to - * be processed sequentially. Those assumptions make the first and second objective less important - * respectively. + * This executor makes use of `BLI_lazy_threading.hh` to enable multi-threading only when it seems + * beneficial. It operates in two modes: single- and multi-threaded. The use of a task pool and + * locks is avoided in single-threaded mode. Once multi-threading is enabled, the executor starts + * using both. It is not possible to switch back from multi-threaded to single-threaded mode. * - * The design implemented in this executor requires *no* main thread that coordinates everything. - * Instead, one thread will trigger some initial work and then many threads coordinate themselves - * in a distributed fashion. In an ideal situation, every thread ends up processing a separate part - * of the graph which results in less communication overhead. The way TBB schedules tasks helps - * with that: a thread will next process the task that it added to a task pool just before. + * The multi-threading design implemented in this executor requires *no* main thread that + * coordinates everything.
Instead, one thread will trigger some initial work and then many threads + * coordinate themselves in a distributed fashion. In an ideal situation, every thread ends up + * processing a separate part of the graph which results in less communication overhead. The way + * TBB schedules tasks helps with that: a thread will next process the task that it added to a task + * pool just before. * * Communication between threads is synchronized by using a mutex in every node. When a thread * wants to access the state of a node, its mutex has to be locked first (with some documented @@ -26,15 +28,14 @@ * state of its inputs and outputs. Every time a node is executed, it has to advance its state in * some way (e.g. it requests a new input or computes a new output). * - * At the core of the executor is a task pool. Every task in that pool represents a node execution. - * When a node is executed it may send notifications to other nodes which may in turn add those - * nodes to the task pool. For example, the current node has computed one of its outputs, then the + * When a node is executed it may send notifications to other nodes which may in turn schedule + * those nodes. For example, when the current node has computed one of its outputs, then the * computed value is forwarded to all linked inputs, changing their node states in the process. If - * this input was the last missing required input, the node will be added to the task pool so that - * it is executed next. + * this input was the last missing required input, the node will be scheduled so that it is executed + * next. * - * When the task pool is empty, the executor gives back control to the caller which may later - * provide new inputs to the graph which in turn adds new nodes to the task pool and the process + * When all tasks are completed, the executor gives back control to the caller which may later - * provide new inputs to the graph which in turn leads to new nodes being scheduled and the process * starts again. */ @@ -190,27 +191,31 @@ struct LockedNode { */ Vector<const OutputSocket *> delayed_required_outputs; Vector<const OutputSocket *> delayed_unused_outputs; - Vector<const FunctionNode *> delayed_scheduled_nodes; LockedNode(const Node &node, NodeState &node_state) : node(node), node_state(node_state) { } }; +class Executor; +class GraphExecutorLFParams; + struct CurrentTask { /** - * The node that should be run on the same thread after the current node is done. This avoids - * some overhead by skipping a round trip through the task pool. + * Mutex used to protect #scheduled_nodes when the executor uses multi-threading. */ - std::atomic<const FunctionNode *> next_node = nullptr; + std::mutex mutex; /** - * Indicates that some node has been added to the task pool. + * Nodes that have been scheduled to execute next. */ - std::atomic<bool> added_node_to_pool = false; + Vector<const FunctionNode *> scheduled_nodes; + /** + * Makes it cheaper to check if there are any scheduled nodes because it avoids locking the + * mutex. + */ + std::atomic<bool> has_scheduled_nodes = false; }; -class GraphExecutorLFParams; - class Executor { private: const GraphExecutor &self_; @@ -230,13 +235,18 @@ class Executor { const Context *context_ = nullptr; /** * Used to distribute work on separate nodes to separate threads. + * If this is null, the executor is in single-threaded mode.
*/ - TaskPool *task_pool_ = nullptr; + std::atomic<TaskPool *> task_pool_ = nullptr; +#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS + std::thread::id current_main_thread_; +#endif /** * A separate linear allocator for every thread. We could potentially reuse some memory, but that * doesn't seem worth it yet. */ threading::EnumerableThreadSpecific<LinearAllocator<>> local_allocators_; + LinearAllocator<> *main_local_allocator_ = nullptr; /** * Set to false when the first execution ends. */ @@ -249,11 +259,14 @@ { /* The indices are necessary, because they are used as keys in #node_states_. */ BLI_assert(self_.graph_.node_indices_are_valid()); + main_local_allocator_ = &local_allocators_.local(); } ~Executor() { - BLI_task_pool_free(task_pool_); + if (TaskPool *task_pool = task_pool_.load()) { + BLI_task_pool_free(task_pool); + } threading::parallel_for(node_states_.index_range(), 1024, [&](const IndexRange range) { for (const int node_index : range) { const Node &node = *self_.graph_.nodes()[node_index]; @@ -270,18 +283,23 @@ { params_ = &params; context_ = &context; - BLI_SCOPED_DEFER([&]() { - /* Make sure the #params_ pointer is not dangling, even when it shouldn't be accessed by - * anyone. */ +#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS + current_main_thread_ = std::this_thread::get_id(); +#endif + const auto deferred_func = [&]() { + /* Make sure the pointers are not dangling, even though they shouldn't be accessed by anyone. */ params_ = nullptr; context_ = nullptr; is_first_execution_ = false; +#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS + current_main_thread_ = {}; +#endif + }; + BLI_SCOPED_DEFER(deferred_func); CurrentTask current_task; if (is_first_execution_) { this->initialize_node_states(); - task_pool_ = BLI_task_pool_create(this, TASK_PRIORITY_HIGH); /* Initialize atomics to zero. */ memset(static_cast<void *>(loaded_inputs_.data()), 0, loaded_inputs_.size() * sizeof(bool)); @@ -294,21 +312,11 @@ this->schedule_newly_requested_outputs(current_task); this->forward_newly_provided_inputs(current_task); - /* Avoid using task pool when there is no parallel work to do. */ - while (!current_task.added_node_to_pool) { - if (current_task.next_node == nullptr) { - /* Nothing to do.
*/ - return; - } - const FunctionNode &node = *current_task.next_node; - current_task.next_node = nullptr; - this->run_node_task(node, current_task); - } - if (current_task.next_node != nullptr) { - this->add_node_to_task_pool(*current_task.next_node); - } + this->run_task(current_task); - BLI_task_pool_work_and_wait(task_pool_); + if (TaskPool *task_pool = task_pool_.load()) { + BLI_task_pool_work_and_wait(task_pool); + } } private: @@ -426,7 +434,7 @@ class Executor { NodeState &node_state = *node_states_[node->index_in_graph()]; node_state.has_side_effects = true; this->with_locked_node(*node, node_state, current_task, [&](LockedNode &locked_node) { - this->schedule_node(locked_node); + this->schedule_node(locked_node, current_task); }); } } @@ -434,7 +442,7 @@ class Executor { void forward_newly_provided_inputs(CurrentTask ¤t_task) { - LinearAllocator<> &allocator = local_allocators_.local(); + LinearAllocator<> &allocator = this->get_main_or_local_allocator(); for (const int graph_input_index : self_.graph_inputs_.index_range()) { std::atomic<uint8_t> &was_loaded = loaded_inputs_[graph_input_index]; if (was_loaded.load()) { @@ -488,7 +496,7 @@ class Executor { return; } this->forward_newly_provided_input( - current_task, local_allocators_.local(), graph_input_index, input_data); + current_task, this->get_main_or_local_allocator(), graph_input_index, input_data); return; } @@ -498,7 +506,7 @@ class Executor { return; } output_state.usage = ValueUsage::Used; - this->schedule_node(locked_node); + this->schedule_node(locked_node, current_task); }); } @@ -520,25 +528,28 @@ class Executor { params_->set_input_unused(graph_input_index); } else { - this->schedule_node(locked_node); + this->schedule_node(locked_node, current_task); } } } }); } - void schedule_node(LockedNode &locked_node) + void schedule_node(LockedNode &locked_node, CurrentTask ¤t_task) { BLI_assert(locked_node.node.is_function()); switch (locked_node.node_state.schedule_state) { case NodeScheduleState::NotScheduled: { - /* Don't add the node to the task pool immediately, because the task pool might start - * executing it immediately (when Blender is started with a single thread). - * That would often result in a deadlock, because we are still holding the mutex of the - * current node. Also see comments in #LockedNode. 
*/ locked_node.node_state.schedule_state = NodeScheduleState::Scheduled; - locked_node.delayed_scheduled_nodes.append( - &static_cast<const FunctionNode &>(locked_node.node)); + const FunctionNode &node = static_cast<const FunctionNode &>(locked_node.node); + if (this->use_multi_threading()) { + std::lock_guard lock{current_task.mutex}; + current_task.scheduled_nodes.append(&node); + } + else { + current_task.scheduled_nodes.append(&node); + } + current_task.has_scheduled_nodes.store(true, std::memory_order_relaxed); break; } case NodeScheduleState::Scheduled: { @@ -562,14 +573,16 @@ class Executor { BLI_assert(&node_state == node_states_[node.index_in_graph()]); LockedNode locked_node{node, node_state}; - { + if (this->use_multi_threading()) { std::lock_guard lock{node_state.mutex}; threading::isolate_task([&]() { f(locked_node); }); } + else { + f(locked_node); + } this->send_output_required_notifications(locked_node.delayed_required_outputs, current_task); this->send_output_unused_notifications(locked_node.delayed_unused_outputs, current_task); - this->schedule_new_nodes(locked_node.delayed_scheduled_nodes, current_task); } void send_output_required_notifications(const Span<const OutputSocket *> sockets, @@ -588,49 +601,21 @@ class Executor { } } - void schedule_new_nodes(const Span<const FunctionNode *> nodes, CurrentTask ¤t_task) + void run_task(CurrentTask ¤t_task) { - for (const FunctionNode *node_to_schedule : nodes) { - /* Avoid a round trip through the task pool for the first node that is scheduled by the - * current node execution. Other nodes are added to the pool so that other threads can pick - * them up. */ - const FunctionNode *expected = nullptr; - if (current_task.next_node.compare_exchange_strong( - expected, node_to_schedule, std::memory_order_relaxed)) { - continue; + while (!current_task.scheduled_nodes.is_empty()) { + const FunctionNode &node = *current_task.scheduled_nodes.pop_last(); + if (current_task.scheduled_nodes.is_empty()) { + current_task.has_scheduled_nodes.store(false, std::memory_order_relaxed); } - this->add_node_to_task_pool(*node_to_schedule); - current_task.added_node_to_pool.store(true, std::memory_order_relaxed); - } - } - - void add_node_to_task_pool(const Node &node) - { - BLI_task_pool_push( - task_pool_, Executor::run_node_from_task_pool, (void *)&node, false, nullptr); - } - - static void run_node_from_task_pool(TaskPool *task_pool, void *task_data) - { - void *user_data = BLI_task_pool_user_data(task_pool); - Executor &executor = *static_cast<Executor *>(user_data); - const FunctionNode &node = *static_cast<const FunctionNode *>(task_data); - - /* This loop reduces the number of round trips through the task pool as long as the current - * node is scheduling more nodes. 
*/ CurrentTask current_task; current_task.next_node = &node; while (current_task.next_node != nullptr) { const FunctionNode &node_to_run = *current_task.next_node; current_task.next_node = nullptr; executor.run_node_task(node_to_run, current_task); } } void run_node_task(const FunctionNode &node, CurrentTask &current_task) { NodeState &node_state = *node_states_[node.index_in_graph()]; - LinearAllocator<> &allocator = local_allocators_.local(); + LinearAllocator<> &allocator = this->get_main_or_local_allocator(); const LazyFunction &fn = node.function(); bool node_needs_execution = false; @@ -672,7 +657,7 @@ } void *buffer = allocator.allocate(type.size(), type.alignment()); type.copy_construct(default_value, buffer); - this->forward_value_to_input(locked_node, input_state, {type, buffer}); + this->forward_value_to_input(locked_node, input_state, {type, buffer}, current_task); } /* Request linked inputs that are always needed. */ @@ -723,7 +708,7 @@ NodeScheduleState::RunningAndRescheduled; node_state.schedule_state = NodeScheduleState::NotScheduled; if (reschedule_requested && !node_state.node_has_finished) { - this->schedule_node(locked_node); + this->schedule_node(locked_node, current_task); } }); } @@ -887,7 +872,7 @@ CurrentTask &current_task) { BLI_assert(value_to_forward.get() != nullptr); - LinearAllocator<> &allocator = local_allocators_.local(); + LinearAllocator<> &allocator = this->get_main_or_local_allocator(); const CPPType &type = *value_to_forward.type(); if (self_.logger_ != nullptr) { @@ -938,13 +923,13 @@ } if (is_last_target) { /* No need to make a copy if this is the last target. */ - this->forward_value_to_input(locked_node, input_state, value_to_forward); + this->forward_value_to_input(locked_node, input_state, value_to_forward, current_task); value_to_forward = {}; } else { void *buffer = allocator.allocate(type.size(), type.alignment()); type.copy_construct(value_to_forward.get(), buffer); - this->forward_value_to_input(locked_node, input_state, {type, buffer}); + this->forward_value_to_input(locked_node, input_state, {type, buffer}, current_task); } }); } @@ -955,7 +940,8 @@ void forward_value_to_input(LockedNode &locked_node, InputState &input_state, - GMutablePointer value) + GMutablePointer value, + CurrentTask &current_task) { NodeState &node_state = locked_node.node_state; @@ -966,10 +952,82 @@ if (input_state.usage == ValueUsage::Used) { node_state.missing_required_inputs -= 1; if (node_state.missing_required_inputs == 0) { - this->schedule_node(locked_node); + this->schedule_node(locked_node, current_task); } } } + + bool use_multi_threading() const + { + return task_pool_.load() != nullptr; + } + + bool try_enable_multi_threading() + { + if (this->use_multi_threading()) { + return true; + } +#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS + /* Only the current main thread is allowed to enable multi-threading, because the executor is + * still in single-threaded mode. */ + if (current_main_thread_ != std::this_thread::get_id()) { + BLI_assert_unreachable(); + } +#endif + /* Check if the caller supports multi-threading. */ + if (!params_->try_enable_multi_threading()) { + return false; + } + /* Avoid using multiple threads when only one thread can be used anyway.
*/ + if (BLI_system_thread_count() <= 1) { + return false; + } + task_pool_.store(BLI_task_pool_create(this, TASK_PRIORITY_HIGH)); + return true; + } + + /** + * Allow other threads to steal all the nodes that are currently scheduled on this thread. + */ + void move_scheduled_nodes_to_task_pool(CurrentTask ¤t_task) + { + BLI_assert(this->use_multi_threading()); + using FunctionNodeVector = Vector<const FunctionNode *>; + FunctionNodeVector *nodes = MEM_new<FunctionNodeVector>(__func__); + { + std::lock_guard lock{current_task.mutex}; + if (current_task.scheduled_nodes.is_empty()) { + return; + } + *nodes = std::move(current_task.scheduled_nodes); + current_task.has_scheduled_nodes.store(false, std::memory_order_relaxed); + } + /* All nodes are pushed as a single task in the pool. This avoids unnecessary threading + * overhead when the nodes are fast to compute. */ + BLI_task_pool_push( + task_pool_.load(), + [](TaskPool *pool, void *data) { + Executor &executor = *static_cast<Executor *>(BLI_task_pool_user_data(pool)); + FunctionNodeVector &nodes = *static_cast<FunctionNodeVector *>(data); + CurrentTask new_current_task; + new_current_task.scheduled_nodes = std::move(nodes); + new_current_task.has_scheduled_nodes.store(true, std::memory_order_relaxed); + executor.run_task(new_current_task); + }, + nodes, + true, + [](TaskPool * /*pool*/, void *data) { + MEM_delete(static_cast<FunctionNodeVector *>(data)); + }); + } + + LinearAllocator<> &get_main_or_local_allocator() + { + if (this->use_multi_threading()) { + return local_allocators_.local(); + } + return *main_local_allocator_; + } }; class GraphExecutorLFParams final : public Params { @@ -985,7 +1043,7 @@ class GraphExecutorLFParams final : public Params { const Node &node, NodeState &node_state, CurrentTask ¤t_task) - : Params(fn), + : Params(fn, executor.use_multi_threading()), executor_(executor), node_(node), node_state_(node_state), @@ -1017,7 +1075,7 @@ class GraphExecutorLFParams final : public Params { OutputState &output_state = node_state_.outputs[index]; BLI_assert(!output_state.has_been_computed); if (output_state.value == nullptr) { - LinearAllocator<> &allocator = executor_.local_allocators_.local(); + LinearAllocator<> &allocator = executor_.get_main_or_local_allocator(); const CPPType &type = node_.output(index).type(); output_state.value = allocator.allocate(type.size(), type.alignment()); } @@ -1052,6 +1110,11 @@ class GraphExecutorLFParams final : public Params { { executor_.set_input_unused_during_execution(node_, node_state_, index, current_task_); } + + bool try_enable_multi_threading_impl() override + { + return executor_.try_enable_multi_threading(); + } }; /** @@ -1073,6 +1136,20 @@ inline void Executor::execute_node(const FunctionNode &node, self_.logger_->log_before_node_execute(node, node_params, fn_context); } + /* This is run when the execution of the node calls `lazy_threading::send_hint` to indicate that + * the execution will take a while. In this case, other tasks waiting on this thread should be + * allowed to be picked up by another thread. 
*/ + auto blocking_hint_fn = [&]() { + if (!current_task.has_scheduled_nodes.load()) { + return; + } + if (!this->try_enable_multi_threading()) { + return; + } + this->move_scheduled_nodes_to_task_pool(current_task); + }; + + lazy_threading::HintReceiver blocking_hint_receiver{blocking_hint_fn}; fn.execute(node_params, fn_context); if (self_.logger_ != nullptr) { diff --git a/source/blender/nodes/NOD_geometry_nodes_lazy_function.hh b/source/blender/nodes/NOD_geometry_nodes_lazy_function.hh index 929f20af1c8..240a0115f68 100644 --- a/source/blender/nodes/NOD_geometry_nodes_lazy_function.hh +++ b/source/blender/nodes/NOD_geometry_nodes_lazy_function.hh @@ -124,6 +124,11 @@ struct GeometryNodesLazyFunctionGraphInfo { * Mappings between the lazy-function graph and the #bNodeTree. */ GeometryNodeLazyFunctionGraphMapping mapping; + /** + * Approximate number of nodes in the graph if all sub-graphs were inlined. + * This can be used as a simple heuristic for the complexity of the node group. + */ + int num_inline_nodes_approximate = 0; GeometryNodesLazyFunctionGraphInfo(); ~GeometryNodesLazyFunctionGraphInfo(); @@ -148,6 +153,9 @@ class GeometryNodesLazyFunctionLogger : public fn::lazy_function::GraphExecutor: void dump_when_input_is_set_twice(const lf::InputSocket &target_socket, const lf::OutputSocket &from_socket, const lf::Context &context) const override; + void log_before_node_execute(const lf::FunctionNode &node, + const lf::Params ¶ms, + const lf::Context &context) const override; }; /** diff --git a/source/blender/nodes/geometry/nodes/node_geo_distribute_points_on_faces.cc b/source/blender/nodes/geometry/nodes/node_geo_distribute_points_on_faces.cc index b84ee33e26f..a007f6afcc7 100644 --- a/source/blender/nodes/geometry/nodes/node_geo_distribute_points_on_faces.cc +++ b/source/blender/nodes/geometry/nodes/node_geo_distribute_points_on_faces.cc @@ -533,6 +533,8 @@ static void node_geo_exec(GeoNodeExecParams params) attribute_outputs.rotation_id = StrongAnonymousAttributeID("Rotation"); } + lazy_threading::send_hint(); + geometry_set.modify_geometry_sets([&](GeometrySet &geometry_set) { point_distribution_calculate( geometry_set, selection_field, method, seed, attribute_outputs, params); diff --git a/source/blender/nodes/intern/geometry_nodes_lazy_function.cc b/source/blender/nodes/intern/geometry_nodes_lazy_function.cc index 996cea26718..af6861a59c0 100644 --- a/source/blender/nodes/intern/geometry_nodes_lazy_function.cc +++ b/source/blender/nodes/intern/geometry_nodes_lazy_function.cc @@ -16,6 +16,7 @@ #include "NOD_multi_function.hh" #include "NOD_node_declaration.hh" +#include "BLI_lazy_threading.hh" #include "BLI_map.hh" #include "DNA_ID.h" @@ -559,6 +560,7 @@ class LazyFunctionForViewerNode : public LazyFunction { class LazyFunctionForGroupNode : public LazyFunction { private: const bNode &group_node_; + bool has_many_nodes_ = false; std::optional<GeometryNodesLazyFunctionLogger> lf_logger_; std::optional<GeometryNodesLazyFunctionSideEffectProvider> lf_side_effect_provider_; std::optional<lf::GraphExecutor> graph_executor_; @@ -577,6 +579,8 @@ class LazyFunctionForGroupNode : public LazyFunction { bNodeTree *group_btree = reinterpret_cast<bNodeTree *>(group_node_.id); BLI_assert(group_btree != nullptr); + has_many_nodes_ = lf_graph_info.num_inline_nodes_approximate > 1000; + Vector<const lf::OutputSocket *> graph_inputs; for (const lf::OutputSocket *socket : lf_graph_info.mapping.group_input_sockets) { if (socket != nullptr) { @@ -608,6 +612,12 @@ class LazyFunctionForGroupNode : 
public LazyFunction { GeoNodesLFUserData *user_data = dynamic_cast<GeoNodesLFUserData *>(context.user_data); BLI_assert(user_data != nullptr); + if (has_many_nodes_) { + /* If the called node group has many nodes, it's likely that executing it takes a while even + * if every individual node is very small. */ + lazy_threading::send_hint(); + } + /* The compute context changes when entering a node group. */ bke::NodeGroupComputeContext compute_context{user_data->compute_context, group_node_.name}; GeoNodesLFUserData group_user_data = *user_data; @@ -699,6 +709,7 @@ struct GeometryNodesLazyFunctionGraphBuilder { this->add_default_inputs(); lf_graph_->update_node_indices(); + lf_graph_info_->num_inline_nodes_approximate += lf_graph_->nodes().size(); } private: @@ -915,6 +926,8 @@ struct GeometryNodesLazyFunctionGraphBuilder { mapping_->bsockets_by_lf_socket_map.add(&lf_socket, &bsocket); } mapping_->group_node_map.add(&bnode, &lf_node); + lf_graph_info_->num_inline_nodes_approximate += + group_lf_graph_info->num_inline_nodes_approximate; } void handle_geometry_node(const bNode &bnode) @@ -1358,4 +1371,52 @@ GeometryNodesLazyFunctionGraphInfo::~GeometryNodesLazyFunctionGraphInfo() } } +static void add_thread_id_debug_message(const GeometryNodesLazyFunctionGraphInfo &lf_graph_info, + const lf::FunctionNode &node, + const lf::Context &context) +{ + static std::atomic<int> thread_id_source = 0; + static thread_local const int thread_id = thread_id_source.fetch_add(1); + static thread_local const std::string thread_id_str = "Thread: " + std::to_string(thread_id); + + GeoNodesLFUserData *user_data = dynamic_cast<GeoNodesLFUserData *>(context.user_data); + BLI_assert(user_data != nullptr); + if (user_data->modifier_data->eval_log == nullptr) { + return; + } + geo_eval_log::GeoTreeLogger &tree_logger = + user_data->modifier_data->eval_log->get_local_tree_logger(*user_data->compute_context); + + /* Find corresponding node based on the socket mapping. */ + auto check_sockets = [&](const Span<const lf::Socket *> lf_sockets) { + for (const lf::Socket *lf_socket : lf_sockets) { + const Span<const bNodeSocket *> bsockets = + lf_graph_info.mapping.bsockets_by_lf_socket_map.lookup(lf_socket); + if (!bsockets.is_empty()) { + const bNodeSocket &bsocket = *bsockets[0]; + const bNode &bnode = bsocket.owner_node(); + tree_logger.debug_messages.append( + {tree_logger.allocator->copy_string(bnode.name), thread_id_str}); + return true; + } + } + return false; + }; + + if (check_sockets(node.inputs().cast<const lf::Socket *>())) { + return; + } + check_sockets(node.outputs().cast<const lf::Socket *>()); +} + +void GeometryNodesLazyFunctionLogger::log_before_node_execute(const lf::FunctionNode &node, + const lf::Params &UNUSED(params), + const lf::Context &context) const +{ + /* Enable this to see the threads that invoked a node. */ + if constexpr (false) { + add_thread_id_debug_message(lf_graph_info_, node, context); + } +} + } // namespace blender::nodes |
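
Editor's note: to illustrate the new API from BLI_lazy_threading.hh, here is a minimal usage sketch; it is not part of the patch. Only send_hint() and HintReceiver come from the header above. ToyQueue, run_queued_tasks() and long_running_task() are hypothetical stand-ins for a scheduler and a task that use the API.

#include "BLI_lazy_threading.hh"

#include <functional>
#include <queue>

/* A toy scheduler used only for illustration; the real users in this patch
 * are the task pool and the lazy-function graph executor. */
struct ToyQueue {
  std::queue<std::function<void()>> tasks;
  bool offloaded = false;
};

static void run_queued_tasks(ToyQueue &queue)
{
  /* Store the callback in a named variable first; the HintReceiver
   * constructor note above forbids passing a temporary lambda directly. */
  const auto on_long_computation = [&]() {
    /* A real scheduler would move the remaining tasks to worker threads
     * here; this flag is just a stand-in for that. */
    queue.offloaded = true;
  };
  blender::lazy_threading::HintReceiver receiver{on_long_computation};

  /* Run all tasks on the current thread until one of them sends a hint. */
  while (!queue.tasks.empty()) {
    queue.tasks.front()();
    queue.tasks.pop();
  }
}

static void long_running_task()
{
  /* A task that knows it is about to block for a while announces it, which
   * invokes on_long_computation in the scheduler above. */
  blender::lazy_threading::send_hint();
  /* ... expensive computation ... */
}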
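
Because the receivers are kept in a thread-local stack (see lazy_threading.cc above), hint receivers nest: when schedulers are layered on the same thread, a single send_hint() call notifies all of them. A self-contained sketch of that behavior:

#include "BLI_lazy_threading.hh"

static void nested_receivers_example()
{
  const auto outer_fn = []() { /* The outer executor offloads its scheduled nodes. */ };
  blender::lazy_threading::HintReceiver outer{outer_fn};
  {
    const auto inner_fn = []() { /* A nested executor does the same. */ };
    blender::lazy_threading::HintReceiver inner{inner_fn};

    /* Invokes both inner_fn and outer_fn, because send_hint() iterates over
     * all receivers registered on the current thread. */
    blender::lazy_threading::send_hint();
  }
  /* The inner receiver popped itself off the stack in its destructor; a hint
   * sent here would only reach outer_fn. */
}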
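
Finally, a sketch of how a lazy-function implementation might use the new Params::try_enable_multi_threading() before touching its params from multiple threads. MyHeavyFunction, the fixed output count and compute_value() are hypothetical, and the execute_impl() override name assumes the usual LazyFunction virtual, which is not shown in this diff; only the Params methods and the debug-build thread check come from the patch.

#include "BLI_task.hh"
#include "FN_lazy_function.hh"

namespace blender::fn::lazy_function {

/* Hypothetical per-output computation, standing in for real work. */
static int compute_value(const int i)
{
  return i * i;
}

class MyHeavyFunction : public LazyFunction {
 private:
  void execute_impl(Params &params, const Context & /*context*/) const override
  {
    /* By default only one thread may call the non-const methods of params;
     * in debug builds this is enforced by assert_valid_thread(). */
    if (params.try_enable_multi_threading()) {
      /* 16 outputs of type int are assumed for this sketch. */
      threading::parallel_for(IndexRange(16), 1, [&](const IndexRange range) {
        for (const int i : range) {
          /* Calling non-const methods from worker threads is allowed now. */
          params.set_output(i, compute_value(i));
        }
      });
    }
    else {
      /* The caller does not support multi-threading; stay on this thread. */
      for (const int i : IndexRange(16)) {
        params.set_output(i, compute_value(i));
      }
    }
  }
};

}  // namespace blender::fn::lazy_function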