git.blender.org/blender.git
 source/blender/blenlib/BLI_lazy_threading.hh                                |  83
 source/blender/blenlib/BLI_task.hh                                          |   4
 source/blender/blenlib/CMakeLists.txt                                       |   1
 source/blender/blenlib/intern/lazy_threading.cc                             |  30
 source/blender/blenlib/intern/task_range.cc                                 |   3
 source/blender/functions/FN_lazy_function.hh                                |  66
 source/blender/functions/FN_lazy_function_execute.hh                        |   1
 source/blender/functions/intern/lazy_function.cc                            |   5
 source/blender/functions/intern/lazy_function_execute.cc                    |   7
 source/blender/functions/intern/lazy_function_graph_executor.cc             | 285
 source/blender/nodes/NOD_geometry_nodes_lazy_function.hh                    |   8
 source/blender/nodes/geometry/nodes/node_geo_distribute_points_on_faces.cc  |   2
 source/blender/nodes/intern/geometry_nodes_lazy_function.cc                 |  61
 13 files changed, 447 insertions(+), 109 deletions(-)
diff --git a/source/blender/blenlib/BLI_lazy_threading.hh b/source/blender/blenlib/BLI_lazy_threading.hh
new file mode 100644
index 00000000000..61532fe24f0
--- /dev/null
+++ b/source/blender/blenlib/BLI_lazy_threading.hh
@@ -0,0 +1,83 @@
+/* SPDX-License-Identifier: GPL-2.0-or-later */
+
+#pragma once
+
+/** \file
+ * \ingroup bli
+ *
+ * The goal of "lazy threading" is to avoid using threads unless one can reasonably assume that it
+ * is worth distributing work over multiple threads. Using threads can lead to worse overall
+ * performance by introducing inter-thread communication overhead. Keeping all work on a single
+ * thread reduces this overhead to zero and also makes better use of the CPU cache.
+ *
+ * Functions like #parallel_for also solve this to some degree by using a "grain size". When the
+ * number of individual tasks is too small, no multi-threading is used. This works very well when
+ * there are many homogeneous tasks that can be expected to take approximately the same time.
+ *
+ * The situation becomes more difficult when:
+ * - The individual tasks are not homogeneous, i.e. they take different amounts of time to compute.
+ * - It is practically impossible to guess how long each task will take in advance.
+ *
+ * Given those constraints, a single grain size cannot be determined. One could just schedule all
+ * tasks individually but that would create a lot of overhead when the tasks happen to be very
+ * small. While TBB will keep all tasks on a single thread if the other threads are busy, idle
+ * threads will start stealing the work even if that's not beneficial for overall
+ * performance.
+ *
+ * This file provides a simple API that allows a task scheduler to properly handle tasks whose size
+ * is not known in advance. The key idea is this:
+ *
+ * > By default, all work stays on a single thread. If an individual task notices that it is about to
+ * > start a computation that will take a while, it notifies the task scheduler further up on the
+ * > stack. The scheduler then allows other threads to take over other tasks that were originally
+ * > meant for the current thread.
+ *
+ * This way, when all tasks are small, no threading overhead has to be paid. Whenever there is
+ * a task that keeps the current thread busy for a while, the other tasks are moved to a separate
+ * thread so that they can be executed without waiting for the long computation to finish.
+ *
+ * Consequently, the earlier a task knows during its execution that it will take a while, the
+ * better. That's because if it is blocking anyway, it's more efficient to move the other tasks to
+ * another thread earlier.
+ *
+ * To make this work, three things have to be solved:
+ * 1. The task scheduler has to be able to start single-threaded and become multi-threaded after
+ * tasks have started executing. This has to be solved in the specific task scheduler.
+ * 2. There has to be a way for the currently running task to tell the task scheduler that it is
+ * about to perform a computation that will take a while and that it would be reasonable to move
+ * other tasks to other threads. This part is implemented in the API provided by this file.
+ * 3. Individual tasks have to decide when a computation is long enough to justify talking to the
+ * scheduler. This is always based on heuristics that have to be fine-tuned over time. One could
+ * assume that this means adding new work-size checks to many parts in Blender, but that's
+ * actually not necessary, because these checks exist already in the form of grain sizes passed
+ * to e.g. #parallel_for. The assumption here is that when the task thinks the current workload
+ * is big enough to justify using threads, it's also big enough to justify moving the tasks that
+ * are waiting on the current thread to another thread.
+ */
+
+#include "BLI_function_ref.hh"
+
+namespace blender::lazy_threading {
+
+/**
+ * Tell task schedulers on the current thread that the current task is about to start a long
+ * computation and that other waiting tasks should be moved to another thread if possible.
+ */
+void send_hint();
+
+/**
+ * Used by the task scheduler to receive hints from current tasks that they will take a while.
+ * This should only be allocated on the stack.
+ */
+class HintReceiver {
+ public:
+ /**
+ * The passed in function is called when a task signals that it will take a while.
+   * \note The referenced function has to outlive the #HintReceiver. So one must not pass a
+   * temporary lambda directly into this constructor but store it in a variable on the stack first.
+ */
+ HintReceiver(FunctionRef<void()> fn);
+ ~HintReceiver();
+};
+
+} // namespace blender::lazy_threading
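
For readers skimming the patch, here is a minimal usage sketch of the new header. It is not part of this commit; `run_tasks_lazily` and its task queue are hypothetical and only illustrate the intended division of labor: a scheduler installs a #HintReceiver before running queued tasks on the current thread, and a hint from a long-running task tells it to offload the remaining work.

/* Usage sketch only, not part of this commit. */
#include "BLI_function_ref.hh"
#include "BLI_lazy_threading.hh"
#include "BLI_vector.hh"

namespace blender::lazy_threading_example {

/* Run queued tasks on the current thread until one of them hints that it is about to block for a
 * while; at that point a real scheduler would hand the remaining tasks to a task pool. */
static void run_tasks_lazily(Vector<FunctionRef<void()>> tasks)
{
  bool offload_remaining = false;
  /* Stored in a named variable first, because #HintReceiver only keeps a reference to it. */
  auto on_hint = [&]() { offload_remaining = true; };
  lazy_threading::HintReceiver receiver{on_hint};

  while (!tasks.is_empty()) {
    if (offload_remaining) {
      /* A real scheduler would push the remaining tasks to e.g. a #TaskPool here so that idle
       * threads can pick them up. */
      break;
    }
    const FunctionRef<void()> task = tasks.pop_last();
    /* If this task calls e.g. #threading::parallel_for with a large enough range,
     * #lazy_threading::send_hint runs and `on_hint` is invoked. */
    task();
  }
}

}  // namespace blender::lazy_threading_example

The graph executor changed later in this patch follows the same pattern, except that it moves the remaining scheduled nodes to a task pool instead of breaking out of its loop.
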
diff --git a/source/blender/blenlib/BLI_task.hh b/source/blender/blenlib/BLI_task.hh
index 33a781d3749..9f9a57be634 100644
--- a/source/blender/blenlib/BLI_task.hh
+++ b/source/blender/blenlib/BLI_task.hh
@@ -31,6 +31,7 @@
#endif
#include "BLI_index_range.hh"
+#include "BLI_lazy_threading.hh"
#include "BLI_utildefines.h"
namespace blender::threading {
@@ -56,6 +57,7 @@ void parallel_for(IndexRange range, int64_t grain_size, const Function &function
#ifdef WITH_TBB
/* Invoking tbb for small workloads has a large overhead. */
if (range.size() >= grain_size) {
+ lazy_threading::send_hint();
tbb::parallel_for(
tbb::blocked_range<int64_t>(range.first(), range.one_after_last(), grain_size),
[&](const tbb::blocked_range<int64_t> &subrange) {
@@ -78,6 +80,7 @@ Value parallel_reduce(IndexRange range,
{
#ifdef WITH_TBB
if (range.size() >= grain_size) {
+ lazy_threading::send_hint();
return tbb::parallel_reduce(
tbb::blocked_range<int64_t>(range.first(), range.one_after_last(), grain_size),
identity,
@@ -114,6 +117,7 @@ template<typename... Functions>
void parallel_invoke(const bool use_threading, Functions &&...functions)
{
if (use_threading) {
+ lazy_threading::send_hint();
parallel_invoke(std::forward<Functions>(functions)...);
}
else {
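
As a consequence of the hunks above, every existing grain-size check now doubles as the heuristic described in point 3 of the header comment. A sketch under that assumption (`scale_values` is a hypothetical helper, not part of the patch):

#include "BLI_span.hh"
#include "BLI_task.hh"

static void scale_values(blender::MutableSpan<float> values)
{
  using namespace blender;
  /* For fewer than 4096 elements this runs inline and sends no hint. For larger spans,
   * lazy_threading::send_hint() is called before the work is spread over threads. */
  threading::parallel_for(values.index_range(), 4096, [&](const IndexRange range) {
    for (const int i : range) {
      values[i] *= 0.5f;
    }
  });
}
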
diff --git a/source/blender/blenlib/CMakeLists.txt b/source/blender/blenlib/CMakeLists.txt
index 50dc11cbf0a..36acbd41ad7 100644
--- a/source/blender/blenlib/CMakeLists.txt
+++ b/source/blender/blenlib/CMakeLists.txt
@@ -85,6 +85,7 @@ set(SRC
intern/kdtree_3d.c
intern/kdtree_4d.c
intern/lasso_2d.c
+ intern/lazy_threading.cc
intern/length_parameterize.cc
intern/listbase.c
intern/math_base.c
diff --git a/source/blender/blenlib/intern/lazy_threading.cc b/source/blender/blenlib/intern/lazy_threading.cc
new file mode 100644
index 00000000000..803fd81a96d
--- /dev/null
+++ b/source/blender/blenlib/intern/lazy_threading.cc
@@ -0,0 +1,30 @@
+/* SPDX-License-Identifier: GPL-2.0-or-later */
+
+#include "BLI_lazy_threading.hh"
+#include "BLI_vector.hh"
+
+namespace blender::lazy_threading {
+
+/**
+ * This is a #RawVector so that it can be destructed after Blender checks for memory leaks.
+ */
+thread_local RawVector<FunctionRef<void()>, 0> hint_receivers;
+
+void send_hint()
+{
+ for (const FunctionRef<void()> &fn : hint_receivers) {
+ fn();
+ }
+}
+
+HintReceiver::HintReceiver(const FunctionRef<void()> fn)
+{
+ hint_receivers.append(fn);
+}
+
+HintReceiver::~HintReceiver()
+{
+ hint_receivers.pop_last();
+}
+
+} // namespace blender::lazy_threading
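
Because the receivers are kept on a thread-local stack and #send_hint iterates all of them, nested schedulers on the same thread are each notified by a single hint. A small illustration (not part of this commit):

#include <iostream>

#include "BLI_lazy_threading.hh"

static void nested_receivers_example()
{
  using namespace blender;
  auto outer_fn = []() { std::cout << "outer scheduler notified\n"; };
  lazy_threading::HintReceiver outer{outer_fn};
  {
    auto inner_fn = []() { std::cout << "inner scheduler notified\n"; };
    lazy_threading::HintReceiver inner{inner_fn};
    /* Prints both messages: the hint reaches every receiver on this thread. */
    lazy_threading::send_hint();
  }
  /* Only the outer receiver is left on the stack now. */
  lazy_threading::send_hint();
}
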
diff --git a/source/blender/blenlib/intern/task_range.cc b/source/blender/blenlib/intern/task_range.cc
index 7e405529f03..181b760bea1 100644
--- a/source/blender/blenlib/intern/task_range.cc
+++ b/source/blender/blenlib/intern/task_range.cc
@@ -12,6 +12,7 @@
#include "DNA_listBase.h"
+#include "BLI_lazy_threading.hh"
#include "BLI_task.h"
#include "BLI_threads.h"
@@ -104,6 +105,8 @@ void BLI_task_parallel_range(const int start,
const size_t grainsize = MAX2(settings->min_iter_per_thread, 1);
const tbb::blocked_range<int> range(start, stop, grainsize);
+ blender::lazy_threading::send_hint();
+
if (settings->func_reduce) {
parallel_reduce(range, task);
if (settings->userdata_chunk) {
diff --git a/source/blender/functions/FN_lazy_function.hh b/source/blender/functions/FN_lazy_function.hh
index 59a3a90b0b0..4a539e7cbd1 100644
--- a/source/blender/functions/FN_lazy_function.hh
+++ b/source/blender/functions/FN_lazy_function.hh
@@ -43,6 +43,13 @@
#include "BLI_linear_allocator.hh"
#include "BLI_vector.hh"
+#include <atomic>
+#include <thread>
+
+#ifdef DEBUG
+# define FN_LAZY_FUNCTION_DEBUG_THREADS
+#endif
+
namespace blender::fn::lazy_function {
enum class ValueUsage {
@@ -102,9 +109,13 @@ class Params {
* The lazy-function this #Params has been prepared for.
*/
const LazyFunction &fn_;
+#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
+ std::thread::id main_thread_id_;
+ std::atomic<bool> allow_multi_threading_;
+#endif
public:
- Params(const LazyFunction &fn);
+ Params(const LazyFunction &fn, bool allow_multi_threading_initially);
/**
* Get a pointer to an input value if the value is available already. Otherwise null is returned.
@@ -154,7 +165,7 @@ class Params {
* Typed utility methods that wrap the methods above.
*/
template<typename T> T extract_input(int index);
- template<typename T> const T &get_input(int index);
+ template<typename T> const T &get_input(int index) const;
template<typename T> T *try_get_input_data_ptr_or_request(int index);
template<typename T> void set_output(int index, T &&value);
@@ -163,7 +174,15 @@ class Params {
*/
void set_default_remaining_outputs();
+ /**
+ * Returns true when the lazy-function is now allowed to use multi-threading when interacting
+   * with this #Params. That means it is allowed to call non-const methods from different threads.
+ */
+ bool try_enable_multi_threading();
+
private:
+ void assert_valid_thread() const;
+
/**
* Methods that need to be implemented by subclasses. Those are separate from the non-virtual
* methods above to make it easy to insert additional debugging logic on top of the
@@ -176,6 +195,7 @@ class Params {
virtual bool output_was_set_impl(int index) const = 0;
virtual ValueUsage get_output_usage_impl(int index) const = 0;
virtual void set_input_unused_impl(int index) = 0;
+ virtual bool try_enable_multi_threading_impl();
};
/**
@@ -312,7 +332,14 @@ inline void LazyFunction::execute(Params &params, const Context &context) const
/** \name #Params Inline Methods
* \{ */
-inline Params::Params(const LazyFunction &fn) : fn_(fn)
+inline Params::Params(const LazyFunction &fn,
+ [[maybe_unused]] bool allow_multi_threading_initially)
+ : fn_(fn)
+#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
+ ,
+ main_thread_id_(std::this_thread::get_id()),
+ allow_multi_threading_(allow_multi_threading_initially)
+#endif
{
}
@@ -323,16 +350,19 @@ inline void *Params::try_get_input_data_ptr(const int index) const
inline void *Params::try_get_input_data_ptr_or_request(const int index)
{
+ this->assert_valid_thread();
return this->try_get_input_data_ptr_or_request_impl(index);
}
inline void *Params::get_output_data_ptr(const int index)
{
+ this->assert_valid_thread();
return this->get_output_data_ptr_impl(index);
}
inline void Params::output_set(const int index)
{
+ this->assert_valid_thread();
this->output_set_impl(index);
}
@@ -348,18 +378,20 @@ inline ValueUsage Params::get_output_usage(const int index) const
inline void Params::set_input_unused(const int index)
{
+ this->assert_valid_thread();
this->set_input_unused_impl(index);
}
template<typename T> inline T Params::extract_input(const int index)
{
+ this->assert_valid_thread();
void *data = this->try_get_input_data_ptr(index);
BLI_assert(data != nullptr);
T return_value = std::move(*static_cast<T *>(data));
return return_value;
}
-template<typename T> inline const T &Params::get_input(const int index)
+template<typename T> inline const T &Params::get_input(const int index) const
{
const void *data = this->try_get_input_data_ptr(index);
BLI_assert(data != nullptr);
@@ -368,17 +400,43 @@ template<typename T> inline const T &Params::get_input(const int index)
template<typename T> inline T *Params::try_get_input_data_ptr_or_request(const int index)
{
+ this->assert_valid_thread();
return static_cast<T *>(this->try_get_input_data_ptr_or_request(index));
}
template<typename T> inline void Params::set_output(const int index, T &&value)
{
using DecayT = std::decay_t<T>;
+ this->assert_valid_thread();
void *data = this->get_output_data_ptr(index);
new (data) DecayT(std::forward<T>(value));
this->output_set(index);
}
+inline bool Params::try_enable_multi_threading()
+{
+ this->assert_valid_thread();
+ const bool success = this->try_enable_multi_threading_impl();
+#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
+ if (success) {
+ allow_multi_threading_ = true;
+ }
+#endif
+ return success;
+}
+
+inline void Params::assert_valid_thread() const
+{
+#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
+ if (allow_multi_threading_) {
+ return;
+ }
+ if (main_thread_id_ != std::this_thread::get_id()) {
+ BLI_assert_unreachable();
+ }
+#endif
+}
+
/** \} */
} // namespace blender::fn::lazy_function
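
To see how a lazy-function is expected to use the new #Params::try_enable_multi_threading, consider the sketch below. The function is hypothetical and not part of this commit; the constructor boilerplate mirrors how inputs_/outputs_ are declared elsewhere in this module. The important part is that non-const #Params methods are only called from worker threads after the caller has granted permission, which keeps #assert_valid_thread satisfied in debug builds.

/* Sketch only; `ExampleTwoOutputsFunction` is hypothetical. */
#include "BLI_task.hh"
#include "FN_lazy_function.hh"

namespace blender::fn::lazy_function::examples {

class ExampleTwoOutputsFunction : public LazyFunction {
 public:
  ExampleTwoOutputsFunction()
  {
    debug_name_ = "Example Two Outputs";
    inputs_.append({"Value", CPPType::get<int>()});
    outputs_.append({"Double", CPPType::get<int>()});
    outputs_.append({"Square", CPPType::get<int>()});
  }

  void execute_impl(Params &params, const Context & /*context*/) const override
  {
    const int value = params.get_input<int>(0);
    if (params.try_enable_multi_threading()) {
      /* The caller supports it, so the outputs may now be set from different threads. */
      threading::parallel_invoke([&]() { params.set_output(0, value * 2); },
                                 [&]() { params.set_output(1, value * value); });
    }
    else {
      /* The caller is single-threaded only; keep everything on the current thread. */
      params.set_output(0, value * 2);
      params.set_output(1, value * value);
    }
  }
};

}  // namespace blender::fn::lazy_function::examples

In this patch, #BasicParams simply grants the request, while the graph executor forwards it up the call stack and lazily creates its task pool on success.
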
diff --git a/source/blender/functions/FN_lazy_function_execute.hh b/source/blender/functions/FN_lazy_function_execute.hh
index ade23ad17c7..31bbddf5baf 100644
--- a/source/blender/functions/FN_lazy_function_execute.hh
+++ b/source/blender/functions/FN_lazy_function_execute.hh
@@ -41,6 +41,7 @@ class BasicParams : public Params {
bool output_was_set_impl(const int index) const override;
ValueUsage get_output_usage_impl(const int index) const override;
void set_input_unused_impl(const int index) override;
+ bool try_enable_multi_threading_impl() override;
};
namespace detail {
diff --git a/source/blender/functions/intern/lazy_function.cc b/source/blender/functions/intern/lazy_function.cc
index 46572283e9b..2d69af84f85 100644
--- a/source/blender/functions/intern/lazy_function.cc
+++ b/source/blender/functions/intern/lazy_function.cc
@@ -63,4 +63,9 @@ void Params::set_default_remaining_outputs()
}
}
+bool Params::try_enable_multi_threading_impl()
+{
+ return false;
+}
+
} // namespace blender::fn::lazy_function
diff --git a/source/blender/functions/intern/lazy_function_execute.cc b/source/blender/functions/intern/lazy_function_execute.cc
index 279056afa99..cea9b48d5bc 100644
--- a/source/blender/functions/intern/lazy_function_execute.cc
+++ b/source/blender/functions/intern/lazy_function_execute.cc
@@ -14,7 +14,7 @@ BasicParams::BasicParams(const LazyFunction &fn,
MutableSpan<std::optional<ValueUsage>> input_usages,
Span<ValueUsage> output_usages,
MutableSpan<bool> set_outputs)
- : Params(fn),
+ : Params(fn, true),
inputs_(inputs),
outputs_(outputs),
input_usages_(input_usages),
@@ -62,4 +62,9 @@ void BasicParams::set_input_unused_impl(const int index)
input_usages_[index] = ValueUsage::Unused;
}
+bool BasicParams::try_enable_multi_threading_impl()
+{
+ return true;
+}
+
} // namespace blender::fn::lazy_function
diff --git a/source/blender/functions/intern/lazy_function_graph_executor.cc b/source/blender/functions/intern/lazy_function_graph_executor.cc
index 176509bd687..f3ce2476085 100644
--- a/source/blender/functions/intern/lazy_function_graph_executor.cc
+++ b/source/blender/functions/intern/lazy_function_graph_executor.cc
@@ -3,18 +3,20 @@
/**
* This file implements the evaluation of a lazy-function graph. Its main objectives are:
* - Only compute values that are actually used.
- * - Allow spreading the work over an arbitrary number of CPU cores.
+ * - Stay single-threaded when nodes are executed quickly.
+ * - Allow spreading the work over an arbitrary number of threads efficiently.
*
- * Other (simpler) executors with different main objectives could be implemented in the future. For
- * some scenarios those could be simpler when many nodes do very little work or most nodes have to
- * be processed sequentially. Those assumptions make the first and second objective less important
- * respectively.
+ * This executor makes use of `BLI_lazy_threading.hh` to enable multi-threading only when it seems
+ * beneficial. It operates in two modes: single- and multi-threaded. The use of a task pool and
+ * locks is avoided in single-threaded mode. Once multi-threading is enabled, the executor starts
+ * using both. It is not possible to switch back from multi-threaded to single-threaded mode.
*
- * The design implemented in this executor requires *no* main thread that coordinates everything.
- * Instead, one thread will trigger some initial work and then many threads coordinate themselves
- * in a distributed fashion. In an ideal situation, every thread ends up processing a separate part
- * of the graph which results in less communication overhead. The way TBB schedules tasks helps
- * with that: a thread will next process the task that it added to a task pool just before.
+ * The multi-threading design implemented in this executor requires *no* main thread that
+ * coordinates everything. Instead, one thread will trigger some initial work and then many threads
+ * coordinate themselves in a distributed fashion. In an ideal situation, every thread ends up
+ * processing a separate part of the graph which results in less communication overhead. The way
+ * TBB schedules tasks helps with that: a thread will next process the task that it added to a task
+ * pool just before.
*
* Communication between threads is synchronized by using a mutex in every node. When a thread
* wants to access the state of a node, its mutex has to be locked first (with some documented
@@ -26,15 +28,14 @@
* state of its inputs and outputs. Every time a node is executed, it has to advance its state in
* some way (e.g. it requests a new input or computes a new output).
*
- * At the core of the executor is a task pool. Every task in that pool represents a node execution.
- * When a node is executed it may send notifications to other nodes which may in turn add those
- * nodes to the task pool. For example, the current node has computed one of its outputs, then the
+ * When a node is executed it may send notifications to other nodes which may in turn schedule
+ * those nodes. For example, when the current node has computed one of its outputs, then the
* computed value is forwarded to all linked inputs, changing their node states in the process. If
- * this input was the last missing required input, the node will be added to the task pool so that
- * it is executed next.
+ * this input was the last missing required input, the node will be scheduled so that it is
+ * executed next.
*
- * When the task pool is empty, the executor gives back control to the caller which may later
- * provide new inputs to the graph which in turn adds new nodes to the task pool and the process
+ * When all tasks are completed, the executor gives back control to the caller which may later
+ * provide new inputs to the graph which in turn leads to new nodes being scheduled and the process
* starts again.
*/
@@ -190,27 +191,31 @@ struct LockedNode {
*/
Vector<const OutputSocket *> delayed_required_outputs;
Vector<const OutputSocket *> delayed_unused_outputs;
- Vector<const FunctionNode *> delayed_scheduled_nodes;
LockedNode(const Node &node, NodeState &node_state) : node(node), node_state(node_state)
{
}
};
+class Executor;
+class GraphExecutorLFParams;
+
struct CurrentTask {
/**
- * The node that should be run on the same thread after the current node is done. This avoids
- * some overhead by skipping a round trip through the task pool.
+ * Mutex used to protect #scheduled_nodes when the executor uses multi-threading.
*/
- std::atomic<const FunctionNode *> next_node = nullptr;
+ std::mutex mutex;
/**
- * Indicates that some node has been added to the task pool.
+ * Nodes that have been scheduled to execute next.
*/
- std::atomic<bool> added_node_to_pool = false;
+ Vector<const FunctionNode *> scheduled_nodes;
+ /**
+ * Makes it cheaper to check if there are any scheduled nodes because it avoids locking the
+ * mutex.
+ */
+ std::atomic<bool> has_scheduled_nodes = false;
};
-class GraphExecutorLFParams;
-
class Executor {
private:
const GraphExecutor &self_;
@@ -230,13 +235,18 @@ class Executor {
const Context *context_ = nullptr;
/**
* Used to distribute work on separate nodes to separate threads.
+   * If this is null, the executor is in single-threaded mode.
*/
- TaskPool *task_pool_ = nullptr;
+ std::atomic<TaskPool *> task_pool_ = nullptr;
+#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
+ std::thread::id current_main_thread_;
+#endif
/**
* A separate linear allocator for every thread. We could potentially reuse some memory, but that
* doesn't seem worth it yet.
*/
threading::EnumerableThreadSpecific<LinearAllocator<>> local_allocators_;
+ LinearAllocator<> *main_local_allocator_ = nullptr;
/**
* Set to false when the first execution ends.
*/
@@ -249,11 +259,14 @@ class Executor {
{
/* The indices are necessary, because they are used as keys in #node_states_. */
BLI_assert(self_.graph_.node_indices_are_valid());
+ main_local_allocator_ = &local_allocators_.local();
}
~Executor()
{
- BLI_task_pool_free(task_pool_);
+ if (TaskPool *task_pool = task_pool_.load()) {
+ BLI_task_pool_free(task_pool);
+ }
threading::parallel_for(node_states_.index_range(), 1024, [&](const IndexRange range) {
for (const int node_index : range) {
const Node &node = *self_.graph_.nodes()[node_index];
@@ -270,18 +283,23 @@ class Executor {
{
params_ = &params;
context_ = &context;
- BLI_SCOPED_DEFER([&]() {
- /* Make sure the #params_ pointer is not dangling, even when it shouldn't be accessed by
- * anyone. */
+#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
+ current_main_thread_ = std::this_thread::get_id();
+#endif
+ const auto deferred_func = [&]() {
+      /* Make sure the pointers are not dangling, even though they shouldn't be accessed by anyone. */
params_ = nullptr;
context_ = nullptr;
is_first_execution_ = false;
- });
+#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
+ current_main_thread_ = {};
+#endif
+ };
+ BLI_SCOPED_DEFER(deferred_func);
CurrentTask current_task;
if (is_first_execution_) {
this->initialize_node_states();
- task_pool_ = BLI_task_pool_create(this, TASK_PRIORITY_HIGH);
/* Initialize atomics to zero. */
memset(static_cast<void *>(loaded_inputs_.data()), 0, loaded_inputs_.size() * sizeof(bool));
@@ -294,21 +312,11 @@ class Executor {
this->schedule_newly_requested_outputs(current_task);
this->forward_newly_provided_inputs(current_task);
- /* Avoid using task pool when there is no parallel work to do. */
- while (!current_task.added_node_to_pool) {
- if (current_task.next_node == nullptr) {
- /* Nothing to do. */
- return;
- }
- const FunctionNode &node = *current_task.next_node;
- current_task.next_node = nullptr;
- this->run_node_task(node, current_task);
- }
- if (current_task.next_node != nullptr) {
- this->add_node_to_task_pool(*current_task.next_node);
- }
+ this->run_task(current_task);
- BLI_task_pool_work_and_wait(task_pool_);
+ if (TaskPool *task_pool = task_pool_.load()) {
+ BLI_task_pool_work_and_wait(task_pool);
+ }
}
private:
@@ -426,7 +434,7 @@ class Executor {
NodeState &node_state = *node_states_[node->index_in_graph()];
node_state.has_side_effects = true;
this->with_locked_node(*node, node_state, current_task, [&](LockedNode &locked_node) {
- this->schedule_node(locked_node);
+ this->schedule_node(locked_node, current_task);
});
}
}
@@ -434,7 +442,7 @@ class Executor {
void forward_newly_provided_inputs(CurrentTask &current_task)
{
- LinearAllocator<> &allocator = local_allocators_.local();
+ LinearAllocator<> &allocator = this->get_main_or_local_allocator();
for (const int graph_input_index : self_.graph_inputs_.index_range()) {
std::atomic<uint8_t> &was_loaded = loaded_inputs_[graph_input_index];
if (was_loaded.load()) {
@@ -488,7 +496,7 @@ class Executor {
return;
}
this->forward_newly_provided_input(
- current_task, local_allocators_.local(), graph_input_index, input_data);
+ current_task, this->get_main_or_local_allocator(), graph_input_index, input_data);
return;
}
@@ -498,7 +506,7 @@ class Executor {
return;
}
output_state.usage = ValueUsage::Used;
- this->schedule_node(locked_node);
+ this->schedule_node(locked_node, current_task);
});
}
@@ -520,25 +528,28 @@ class Executor {
params_->set_input_unused(graph_input_index);
}
else {
- this->schedule_node(locked_node);
+ this->schedule_node(locked_node, current_task);
}
}
}
});
}
- void schedule_node(LockedNode &locked_node)
+ void schedule_node(LockedNode &locked_node, CurrentTask &current_task)
{
BLI_assert(locked_node.node.is_function());
switch (locked_node.node_state.schedule_state) {
case NodeScheduleState::NotScheduled: {
- /* Don't add the node to the task pool immediately, because the task pool might start
- * executing it immediately (when Blender is started with a single thread).
- * That would often result in a deadlock, because we are still holding the mutex of the
- * current node. Also see comments in #LockedNode. */
locked_node.node_state.schedule_state = NodeScheduleState::Scheduled;
- locked_node.delayed_scheduled_nodes.append(
- &static_cast<const FunctionNode &>(locked_node.node));
+ const FunctionNode &node = static_cast<const FunctionNode &>(locked_node.node);
+ if (this->use_multi_threading()) {
+ std::lock_guard lock{current_task.mutex};
+ current_task.scheduled_nodes.append(&node);
+ }
+ else {
+ current_task.scheduled_nodes.append(&node);
+ }
+ current_task.has_scheduled_nodes.store(true, std::memory_order_relaxed);
break;
}
case NodeScheduleState::Scheduled: {
@@ -562,14 +573,16 @@ class Executor {
BLI_assert(&node_state == node_states_[node.index_in_graph()]);
LockedNode locked_node{node, node_state};
- {
+ if (this->use_multi_threading()) {
std::lock_guard lock{node_state.mutex};
threading::isolate_task([&]() { f(locked_node); });
}
+ else {
+ f(locked_node);
+ }
this->send_output_required_notifications(locked_node.delayed_required_outputs, current_task);
this->send_output_unused_notifications(locked_node.delayed_unused_outputs, current_task);
- this->schedule_new_nodes(locked_node.delayed_scheduled_nodes, current_task);
}
void send_output_required_notifications(const Span<const OutputSocket *> sockets,
@@ -588,49 +601,21 @@ class Executor {
}
}
- void schedule_new_nodes(const Span<const FunctionNode *> nodes, CurrentTask &current_task)
+ void run_task(CurrentTask &current_task)
{
- for (const FunctionNode *node_to_schedule : nodes) {
- /* Avoid a round trip through the task pool for the first node that is scheduled by the
- * current node execution. Other nodes are added to the pool so that other threads can pick
- * them up. */
- const FunctionNode *expected = nullptr;
- if (current_task.next_node.compare_exchange_strong(
- expected, node_to_schedule, std::memory_order_relaxed)) {
- continue;
+ while (!current_task.scheduled_nodes.is_empty()) {
+ const FunctionNode &node = *current_task.scheduled_nodes.pop_last();
+ if (current_task.scheduled_nodes.is_empty()) {
+ current_task.has_scheduled_nodes.store(false, std::memory_order_relaxed);
}
- this->add_node_to_task_pool(*node_to_schedule);
- current_task.added_node_to_pool.store(true, std::memory_order_relaxed);
- }
- }
-
- void add_node_to_task_pool(const Node &node)
- {
- BLI_task_pool_push(
- task_pool_, Executor::run_node_from_task_pool, (void *)&node, false, nullptr);
- }
-
- static void run_node_from_task_pool(TaskPool *task_pool, void *task_data)
- {
- void *user_data = BLI_task_pool_user_data(task_pool);
- Executor &executor = *static_cast<Executor *>(user_data);
- const FunctionNode &node = *static_cast<const FunctionNode *>(task_data);
-
- /* This loop reduces the number of round trips through the task pool as long as the current
- * node is scheduling more nodes. */
- CurrentTask current_task;
- current_task.next_node = &node;
- while (current_task.next_node != nullptr) {
- const FunctionNode &node_to_run = *current_task.next_node;
- current_task.next_node = nullptr;
- executor.run_node_task(node_to_run, current_task);
+ this->run_node_task(node, current_task);
}
}
void run_node_task(const FunctionNode &node, CurrentTask &current_task)
{
NodeState &node_state = *node_states_[node.index_in_graph()];
- LinearAllocator<> &allocator = local_allocators_.local();
+ LinearAllocator<> &allocator = this->get_main_or_local_allocator();
const LazyFunction &fn = node.function();
bool node_needs_execution = false;
@@ -672,7 +657,7 @@ class Executor {
}
void *buffer = allocator.allocate(type.size(), type.alignment());
type.copy_construct(default_value, buffer);
- this->forward_value_to_input(locked_node, input_state, {type, buffer});
+ this->forward_value_to_input(locked_node, input_state, {type, buffer}, current_task);
}
/* Request linked inputs that are always needed. */
@@ -723,7 +708,7 @@ class Executor {
NodeScheduleState::RunningAndRescheduled;
node_state.schedule_state = NodeScheduleState::NotScheduled;
if (reschedule_requested && !node_state.node_has_finished) {
- this->schedule_node(locked_node);
+ this->schedule_node(locked_node, current_task);
}
});
}
@@ -887,7 +872,7 @@ class Executor {
CurrentTask &current_task)
{
BLI_assert(value_to_forward.get() != nullptr);
- LinearAllocator<> &allocator = local_allocators_.local();
+ LinearAllocator<> &allocator = this->get_main_or_local_allocator();
const CPPType &type = *value_to_forward.type();
if (self_.logger_ != nullptr) {
@@ -938,13 +923,13 @@ class Executor {
}
if (is_last_target) {
/* No need to make a copy if this is the last target. */
- this->forward_value_to_input(locked_node, input_state, value_to_forward);
+ this->forward_value_to_input(locked_node, input_state, value_to_forward, current_task);
value_to_forward = {};
}
else {
void *buffer = allocator.allocate(type.size(), type.alignment());
type.copy_construct(value_to_forward.get(), buffer);
- this->forward_value_to_input(locked_node, input_state, {type, buffer});
+ this->forward_value_to_input(locked_node, input_state, {type, buffer}, current_task);
}
});
}
@@ -955,7 +940,8 @@ class Executor {
void forward_value_to_input(LockedNode &locked_node,
InputState &input_state,
- GMutablePointer value)
+ GMutablePointer value,
+ CurrentTask &current_task)
{
NodeState &node_state = locked_node.node_state;
@@ -966,10 +952,82 @@ class Executor {
if (input_state.usage == ValueUsage::Used) {
node_state.missing_required_inputs -= 1;
if (node_state.missing_required_inputs == 0) {
- this->schedule_node(locked_node);
+ this->schedule_node(locked_node, current_task);
}
}
}
+
+ bool use_multi_threading() const
+ {
+ return task_pool_.load() != nullptr;
+ }
+
+ bool try_enable_multi_threading()
+ {
+ if (this->use_multi_threading()) {
+ return true;
+ }
+#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
+    /* Only the current main thread is allowed to enable multi-threading, because the executor is
+ * still in single-threaded mode. */
+ if (current_main_thread_ != std::this_thread::get_id()) {
+ BLI_assert_unreachable();
+ }
+#endif
+    /* Check if the caller supports multi-threading. */
+ if (!params_->try_enable_multi_threading()) {
+ return false;
+ }
+ /* Avoid using multiple threads when only one thread can be used anyway. */
+ if (BLI_system_thread_count() <= 1) {
+ return false;
+ }
+ task_pool_.store(BLI_task_pool_create(this, TASK_PRIORITY_HIGH));
+ return true;
+ }
+
+ /**
+ * Allow other threads to steal all the nodes that are currently scheduled on this thread.
+ */
+ void move_scheduled_nodes_to_task_pool(CurrentTask &current_task)
+ {
+ BLI_assert(this->use_multi_threading());
+ using FunctionNodeVector = Vector<const FunctionNode *>;
+    FunctionNodeVector *nodes = nullptr;
+    {
+      std::lock_guard lock{current_task.mutex};
+      if (current_task.scheduled_nodes.is_empty()) {
+        return;
+      }
+      /* Allocate only after the early return above, so that nothing is leaked. */
+      nodes = MEM_new<FunctionNodeVector>(__func__);
+      *nodes = std::move(current_task.scheduled_nodes);
+      current_task.has_scheduled_nodes.store(false, std::memory_order_relaxed);
+    }
+ /* All nodes are pushed as a single task in the pool. This avoids unnecessary threading
+ * overhead when the nodes are fast to compute. */
+ BLI_task_pool_push(
+ task_pool_.load(),
+ [](TaskPool *pool, void *data) {
+ Executor &executor = *static_cast<Executor *>(BLI_task_pool_user_data(pool));
+ FunctionNodeVector &nodes = *static_cast<FunctionNodeVector *>(data);
+ CurrentTask new_current_task;
+ new_current_task.scheduled_nodes = std::move(nodes);
+ new_current_task.has_scheduled_nodes.store(true, std::memory_order_relaxed);
+ executor.run_task(new_current_task);
+ },
+ nodes,
+ true,
+ [](TaskPool * /*pool*/, void *data) {
+ MEM_delete(static_cast<FunctionNodeVector *>(data));
+ });
+ }
+
+ LinearAllocator<> &get_main_or_local_allocator()
+ {
+ if (this->use_multi_threading()) {
+ return local_allocators_.local();
+ }
+ return *main_local_allocator_;
+ }
};
class GraphExecutorLFParams final : public Params {
@@ -985,7 +1043,7 @@ class GraphExecutorLFParams final : public Params {
const Node &node,
NodeState &node_state,
CurrentTask &current_task)
- : Params(fn),
+ : Params(fn, executor.use_multi_threading()),
executor_(executor),
node_(node),
node_state_(node_state),
@@ -1017,7 +1075,7 @@ class GraphExecutorLFParams final : public Params {
OutputState &output_state = node_state_.outputs[index];
BLI_assert(!output_state.has_been_computed);
if (output_state.value == nullptr) {
- LinearAllocator<> &allocator = executor_.local_allocators_.local();
+ LinearAllocator<> &allocator = executor_.get_main_or_local_allocator();
const CPPType &type = node_.output(index).type();
output_state.value = allocator.allocate(type.size(), type.alignment());
}
@@ -1052,6 +1110,11 @@ class GraphExecutorLFParams final : public Params {
{
executor_.set_input_unused_during_execution(node_, node_state_, index, current_task_);
}
+
+ bool try_enable_multi_threading_impl() override
+ {
+ return executor_.try_enable_multi_threading();
+ }
};
/**
@@ -1073,6 +1136,20 @@ inline void Executor::execute_node(const FunctionNode &node,
self_.logger_->log_before_node_execute(node, node_params, fn_context);
}
+ /* This is run when the execution of the node calls `lazy_threading::send_hint` to indicate that
+ * the execution will take a while. In this case, other tasks waiting on this thread should be
+ * allowed to be picked up by another thread. */
+ auto blocking_hint_fn = [&]() {
+ if (!current_task.has_scheduled_nodes.load()) {
+ return;
+ }
+ if (!this->try_enable_multi_threading()) {
+ return;
+ }
+ this->move_scheduled_nodes_to_task_pool(current_task);
+ };
+
+ lazy_threading::HintReceiver blocking_hint_receiver{blocking_hint_fn};
fn.execute(node_params, fn_context);
if (self_.logger_ != nullptr) {
diff --git a/source/blender/nodes/NOD_geometry_nodes_lazy_function.hh b/source/blender/nodes/NOD_geometry_nodes_lazy_function.hh
index 929f20af1c8..240a0115f68 100644
--- a/source/blender/nodes/NOD_geometry_nodes_lazy_function.hh
+++ b/source/blender/nodes/NOD_geometry_nodes_lazy_function.hh
@@ -124,6 +124,11 @@ struct GeometryNodesLazyFunctionGraphInfo {
* Mappings between the lazy-function graph and the #bNodeTree.
*/
GeometryNodeLazyFunctionGraphMapping mapping;
+ /**
+ * Approximate number of nodes in the graph if all sub-graphs were inlined.
+ * This can be used as a simple heuristic for the complexity of the node group.
+ */
+ int num_inline_nodes_approximate = 0;
GeometryNodesLazyFunctionGraphInfo();
~GeometryNodesLazyFunctionGraphInfo();
@@ -148,6 +153,9 @@ class GeometryNodesLazyFunctionLogger : public fn::lazy_function::GraphExecutor:
void dump_when_input_is_set_twice(const lf::InputSocket &target_socket,
const lf::OutputSocket &from_socket,
const lf::Context &context) const override;
+ void log_before_node_execute(const lf::FunctionNode &node,
+ const lf::Params &params,
+ const lf::Context &context) const override;
};
/**
diff --git a/source/blender/nodes/geometry/nodes/node_geo_distribute_points_on_faces.cc b/source/blender/nodes/geometry/nodes/node_geo_distribute_points_on_faces.cc
index b84ee33e26f..a007f6afcc7 100644
--- a/source/blender/nodes/geometry/nodes/node_geo_distribute_points_on_faces.cc
+++ b/source/blender/nodes/geometry/nodes/node_geo_distribute_points_on_faces.cc
@@ -533,6 +533,8 @@ static void node_geo_exec(GeoNodeExecParams params)
attribute_outputs.rotation_id = StrongAnonymousAttributeID("Rotation");
}
+ lazy_threading::send_hint();
+
geometry_set.modify_geometry_sets([&](GeometrySet &geometry_set) {
point_distribution_calculate(
geometry_set, selection_field, method, seed, attribute_outputs, params);
diff --git a/source/blender/nodes/intern/geometry_nodes_lazy_function.cc b/source/blender/nodes/intern/geometry_nodes_lazy_function.cc
index 996cea26718..af6861a59c0 100644
--- a/source/blender/nodes/intern/geometry_nodes_lazy_function.cc
+++ b/source/blender/nodes/intern/geometry_nodes_lazy_function.cc
@@ -16,6 +16,7 @@
#include "NOD_multi_function.hh"
#include "NOD_node_declaration.hh"
+#include "BLI_lazy_threading.hh"
#include "BLI_map.hh"
#include "DNA_ID.h"
@@ -559,6 +560,7 @@ class LazyFunctionForViewerNode : public LazyFunction {
class LazyFunctionForGroupNode : public LazyFunction {
private:
const bNode &group_node_;
+ bool has_many_nodes_ = false;
std::optional<GeometryNodesLazyFunctionLogger> lf_logger_;
std::optional<GeometryNodesLazyFunctionSideEffectProvider> lf_side_effect_provider_;
std::optional<lf::GraphExecutor> graph_executor_;
@@ -577,6 +579,8 @@ class LazyFunctionForGroupNode : public LazyFunction {
bNodeTree *group_btree = reinterpret_cast<bNodeTree *>(group_node_.id);
BLI_assert(group_btree != nullptr);
+ has_many_nodes_ = lf_graph_info.num_inline_nodes_approximate > 1000;
+
Vector<const lf::OutputSocket *> graph_inputs;
for (const lf::OutputSocket *socket : lf_graph_info.mapping.group_input_sockets) {
if (socket != nullptr) {
@@ -608,6 +612,12 @@ class LazyFunctionForGroupNode : public LazyFunction {
GeoNodesLFUserData *user_data = dynamic_cast<GeoNodesLFUserData *>(context.user_data);
BLI_assert(user_data != nullptr);
+ if (has_many_nodes_) {
+ /* If the called node group has many nodes, it's likely that executing it takes a while even
+ * if every individual node is very small. */
+ lazy_threading::send_hint();
+ }
+
/* The compute context changes when entering a node group. */
bke::NodeGroupComputeContext compute_context{user_data->compute_context, group_node_.name};
GeoNodesLFUserData group_user_data = *user_data;
@@ -699,6 +709,7 @@ struct GeometryNodesLazyFunctionGraphBuilder {
this->add_default_inputs();
lf_graph_->update_node_indices();
+ lf_graph_info_->num_inline_nodes_approximate += lf_graph_->nodes().size();
}
private:
@@ -915,6 +926,8 @@ struct GeometryNodesLazyFunctionGraphBuilder {
mapping_->bsockets_by_lf_socket_map.add(&lf_socket, &bsocket);
}
mapping_->group_node_map.add(&bnode, &lf_node);
+ lf_graph_info_->num_inline_nodes_approximate +=
+ group_lf_graph_info->num_inline_nodes_approximate;
}
void handle_geometry_node(const bNode &bnode)
@@ -1358,4 +1371,52 @@ GeometryNodesLazyFunctionGraphInfo::~GeometryNodesLazyFunctionGraphInfo()
}
}
+static void add_thread_id_debug_message(const GeometryNodesLazyFunctionGraphInfo &lf_graph_info,
+ const lf::FunctionNode &node,
+ const lf::Context &context)
+{
+ static std::atomic<int> thread_id_source = 0;
+ static thread_local const int thread_id = thread_id_source.fetch_add(1);
+ static thread_local const std::string thread_id_str = "Thread: " + std::to_string(thread_id);
+
+ GeoNodesLFUserData *user_data = dynamic_cast<GeoNodesLFUserData *>(context.user_data);
+ BLI_assert(user_data != nullptr);
+ if (user_data->modifier_data->eval_log == nullptr) {
+ return;
+ }
+ geo_eval_log::GeoTreeLogger &tree_logger =
+ user_data->modifier_data->eval_log->get_local_tree_logger(*user_data->compute_context);
+
+ /* Find corresponding node based on the socket mapping. */
+ auto check_sockets = [&](const Span<const lf::Socket *> lf_sockets) {
+ for (const lf::Socket *lf_socket : lf_sockets) {
+ const Span<const bNodeSocket *> bsockets =
+ lf_graph_info.mapping.bsockets_by_lf_socket_map.lookup(lf_socket);
+ if (!bsockets.is_empty()) {
+ const bNodeSocket &bsocket = *bsockets[0];
+ const bNode &bnode = bsocket.owner_node();
+ tree_logger.debug_messages.append(
+ {tree_logger.allocator->copy_string(bnode.name), thread_id_str});
+ return true;
+ }
+ }
+ return false;
+ };
+
+ if (check_sockets(node.inputs().cast<const lf::Socket *>())) {
+ return;
+ }
+ check_sockets(node.outputs().cast<const lf::Socket *>());
+}
+
+void GeometryNodesLazyFunctionLogger::log_before_node_execute(const lf::FunctionNode &node,
+ const lf::Params &UNUSED(params),
+ const lf::Context &context) const
+{
+ /* Enable this to see the threads that invoked a node. */
+ if constexpr (false) {
+ add_thread_id_debug_message(lf_graph_info_, node, context);
+ }
+}
+
} // namespace blender::nodes