Welcome to mirror list, hosted at ThFree Co, Russian Federation.

git.blender.org/blender.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJacques Lucke <jacques@blender.org>2022-09-20 11:59:12 +0300
committerJacques Lucke <jacques@blender.org>2022-09-20 12:08:05 +0300
commit5c81d3bd4691214164a7f071d239ef6c84dba8dc (patch)
treecbab752ca58da40b45c15540707a2b0a22d769da /source/blender/functions/intern/lazy_function_graph_executor.cc
parent7a239812ca55153fc9751b6a87bc5c4deb76456b (diff)
Geometry Nodes: improve evaluator with lazy threading
In large node setup the threading overhead was sometimes very significant. That's especially true when most nodes do very little work. This commit improves the scheduling by not using multi-threading in many cases unless it's likely that it will be worth it. For more details see the comments in `BLI_lazy_threading.hh`. Differential Revision: https://developer.blender.org/D15976
Diffstat (limited to 'source/blender/functions/intern/lazy_function_graph_executor.cc')
-rw-r--r--source/blender/functions/intern/lazy_function_graph_executor.cc285
1 files changed, 181 insertions, 104 deletions
diff --git a/source/blender/functions/intern/lazy_function_graph_executor.cc b/source/blender/functions/intern/lazy_function_graph_executor.cc
index 176509bd687..f3ce2476085 100644
--- a/source/blender/functions/intern/lazy_function_graph_executor.cc
+++ b/source/blender/functions/intern/lazy_function_graph_executor.cc
@@ -3,18 +3,20 @@
/**
* This file implements the evaluation of a lazy-function graph. It's main objectives are:
* - Only compute values that are actually used.
- * - Allow spreading the work over an arbitrary number of CPU cores.
+ * - Stay single threaded when nodes are executed quickly.
+ * - Allow spreading the work over an arbitrary number of threads efficiently.
*
- * Other (simpler) executors with different main objectives could be implemented in the future. For
- * some scenarios those could be simpler when many nodes do very little work or most nodes have to
- * be processed sequentially. Those assumptions make the first and second objective less important
- * respectively.
+ * This executor makes use of `FN_lazy_threading.hh` to enable multi-threading only when it seems
+ * benefitial. It operates in two modes: single- and multi-threaded. The use of a task pool and
+ * locks is avoided in single-threaded mode. Once multi-threading is enabled the executor starts
+ * using both. It is not possible to switch back from multi-threaded to single-threaded mode.
*
- * The design implemented in this executor requires *no* main thread that coordinates everything.
- * Instead, one thread will trigger some initial work and then many threads coordinate themselves
- * in a distributed fashion. In an ideal situation, every thread ends up processing a separate part
- * of the graph which results in less communication overhead. The way TBB schedules tasks helps
- * with that: a thread will next process the task that it added to a task pool just before.
+ * The multi-threading design implemented in this executor requires *no* main thread that
+ * coordinates everything. Instead, one thread will trigger some initial work and then many threads
+ * coordinate themselves in a distributed fashion. In an ideal situation, every thread ends up
+ * processing a separate part of the graph which results in less communication overhead. The way
+ * TBB schedules tasks helps with that: a thread will next process the task that it added to a task
+ * pool just before.
*
* Communication between threads is synchronized by using a mutex in every node. When a thread
* wants to access the state of a node, its mutex has to be locked first (with some documented
@@ -26,15 +28,14 @@
* state of its inputs and outputs. Every time a node is executed, it has to advance its state in
* some way (e.g. it requests a new input or computes a new output).
*
- * At the core of the executor is a task pool. Every task in that pool represents a node execution.
- * When a node is executed it may send notifications to other nodes which may in turn add those
- * nodes to the task pool. For example, the current node has computed one of its outputs, then the
+ * When a node is executed it may send notifications to other nodes which may in turn schedule
+ * those nodes. For example, when the current node has computed one of its outputs, then the
* computed value is forwarded to all linked inputs, changing their node states in the process. If
- * this input was the last missing required input, the node will be added to the task pool so that
- * it is executed next.
+ * this input was the last missing required input, the node will be scheduled that it is executed
+ * next.
*
- * When the task pool is empty, the executor gives back control to the caller which may later
- * provide new inputs to the graph which in turn adds new nodes to the task pool and the process
+ * When all tasks are completed, the executor gives back control to the caller which may later
+ * provide new inputs to the graph which in turn leads to new nodes being scheduled and the process
* starts again.
*/
@@ -190,27 +191,31 @@ struct LockedNode {
*/
Vector<const OutputSocket *> delayed_required_outputs;
Vector<const OutputSocket *> delayed_unused_outputs;
- Vector<const FunctionNode *> delayed_scheduled_nodes;
LockedNode(const Node &node, NodeState &node_state) : node(node), node_state(node_state)
{
}
};
+class Executor;
+class GraphExecutorLFParams;
+
struct CurrentTask {
/**
- * The node that should be run on the same thread after the current node is done. This avoids
- * some overhead by skipping a round trip through the task pool.
+ * Mutex used to protect #scheduled_nodes when the executor uses multi-threading.
*/
- std::atomic<const FunctionNode *> next_node = nullptr;
+ std::mutex mutex;
/**
- * Indicates that some node has been added to the task pool.
+ * Nodes that have been scheduled to execute next.
*/
- std::atomic<bool> added_node_to_pool = false;
+ Vector<const FunctionNode *> scheduled_nodes;
+ /**
+ * Makes it cheaper to check if there are any scheduled nodes because it avoids locking the
+ * mutex.
+ */
+ std::atomic<bool> has_scheduled_nodes = false;
};
-class GraphExecutorLFParams;
-
class Executor {
private:
const GraphExecutor &self_;
@@ -230,13 +235,18 @@ class Executor {
const Context *context_ = nullptr;
/**
* Used to distribute work on separate nodes to separate threads.
+ * If this is empty, the executor is in single threaded mode.
*/
- TaskPool *task_pool_ = nullptr;
+ std::atomic<TaskPool *> task_pool_ = nullptr;
+#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
+ std::thread::id current_main_thread_;
+#endif
/**
* A separate linear allocator for every thread. We could potentially reuse some memory, but that
* doesn't seem worth it yet.
*/
threading::EnumerableThreadSpecific<LinearAllocator<>> local_allocators_;
+ LinearAllocator<> *main_local_allocator_ = nullptr;
/**
* Set to false when the first execution ends.
*/
@@ -249,11 +259,14 @@ class Executor {
{
/* The indices are necessary, because they are used as keys in #node_states_. */
BLI_assert(self_.graph_.node_indices_are_valid());
+ main_local_allocator_ = &local_allocators_.local();
}
~Executor()
{
- BLI_task_pool_free(task_pool_);
+ if (TaskPool *task_pool = task_pool_.load()) {
+ BLI_task_pool_free(task_pool);
+ }
threading::parallel_for(node_states_.index_range(), 1024, [&](const IndexRange range) {
for (const int node_index : range) {
const Node &node = *self_.graph_.nodes()[node_index];
@@ -270,18 +283,23 @@ class Executor {
{
params_ = &params;
context_ = &context;
- BLI_SCOPED_DEFER([&]() {
- /* Make sure the #params_ pointer is not dangling, even when it shouldn't be accessed by
- * anyone. */
+#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
+ current_main_thread_ = std::this_thread::get_id();
+#endif
+ const auto deferred_func = [&]() {
+ /* Make sure the pointers are not dangling, even when it shouldn't be accessed by anyone. */
params_ = nullptr;
context_ = nullptr;
is_first_execution_ = false;
- });
+#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
+ current_main_thread_ = {};
+#endif
+ };
+ BLI_SCOPED_DEFER(deferred_func);
CurrentTask current_task;
if (is_first_execution_) {
this->initialize_node_states();
- task_pool_ = BLI_task_pool_create(this, TASK_PRIORITY_HIGH);
/* Initialize atomics to zero. */
memset(static_cast<void *>(loaded_inputs_.data()), 0, loaded_inputs_.size() * sizeof(bool));
@@ -294,21 +312,11 @@ class Executor {
this->schedule_newly_requested_outputs(current_task);
this->forward_newly_provided_inputs(current_task);
- /* Avoid using task pool when there is no parallel work to do. */
- while (!current_task.added_node_to_pool) {
- if (current_task.next_node == nullptr) {
- /* Nothing to do. */
- return;
- }
- const FunctionNode &node = *current_task.next_node;
- current_task.next_node = nullptr;
- this->run_node_task(node, current_task);
- }
- if (current_task.next_node != nullptr) {
- this->add_node_to_task_pool(*current_task.next_node);
- }
+ this->run_task(current_task);
- BLI_task_pool_work_and_wait(task_pool_);
+ if (TaskPool *task_pool = task_pool_.load()) {
+ BLI_task_pool_work_and_wait(task_pool);
+ }
}
private:
@@ -426,7 +434,7 @@ class Executor {
NodeState &node_state = *node_states_[node->index_in_graph()];
node_state.has_side_effects = true;
this->with_locked_node(*node, node_state, current_task, [&](LockedNode &locked_node) {
- this->schedule_node(locked_node);
+ this->schedule_node(locked_node, current_task);
});
}
}
@@ -434,7 +442,7 @@ class Executor {
void forward_newly_provided_inputs(CurrentTask &current_task)
{
- LinearAllocator<> &allocator = local_allocators_.local();
+ LinearAllocator<> &allocator = this->get_main_or_local_allocator();
for (const int graph_input_index : self_.graph_inputs_.index_range()) {
std::atomic<uint8_t> &was_loaded = loaded_inputs_[graph_input_index];
if (was_loaded.load()) {
@@ -488,7 +496,7 @@ class Executor {
return;
}
this->forward_newly_provided_input(
- current_task, local_allocators_.local(), graph_input_index, input_data);
+ current_task, this->get_main_or_local_allocator(), graph_input_index, input_data);
return;
}
@@ -498,7 +506,7 @@ class Executor {
return;
}
output_state.usage = ValueUsage::Used;
- this->schedule_node(locked_node);
+ this->schedule_node(locked_node, current_task);
});
}
@@ -520,25 +528,28 @@ class Executor {
params_->set_input_unused(graph_input_index);
}
else {
- this->schedule_node(locked_node);
+ this->schedule_node(locked_node, current_task);
}
}
}
});
}
- void schedule_node(LockedNode &locked_node)
+ void schedule_node(LockedNode &locked_node, CurrentTask &current_task)
{
BLI_assert(locked_node.node.is_function());
switch (locked_node.node_state.schedule_state) {
case NodeScheduleState::NotScheduled: {
- /* Don't add the node to the task pool immediately, because the task pool might start
- * executing it immediately (when Blender is started with a single thread).
- * That would often result in a deadlock, because we are still holding the mutex of the
- * current node. Also see comments in #LockedNode. */
locked_node.node_state.schedule_state = NodeScheduleState::Scheduled;
- locked_node.delayed_scheduled_nodes.append(
- &static_cast<const FunctionNode &>(locked_node.node));
+ const FunctionNode &node = static_cast<const FunctionNode &>(locked_node.node);
+ if (this->use_multi_threading()) {
+ std::lock_guard lock{current_task.mutex};
+ current_task.scheduled_nodes.append(&node);
+ }
+ else {
+ current_task.scheduled_nodes.append(&node);
+ }
+ current_task.has_scheduled_nodes.store(true, std::memory_order_relaxed);
break;
}
case NodeScheduleState::Scheduled: {
@@ -562,14 +573,16 @@ class Executor {
BLI_assert(&node_state == node_states_[node.index_in_graph()]);
LockedNode locked_node{node, node_state};
- {
+ if (this->use_multi_threading()) {
std::lock_guard lock{node_state.mutex};
threading::isolate_task([&]() { f(locked_node); });
}
+ else {
+ f(locked_node);
+ }
this->send_output_required_notifications(locked_node.delayed_required_outputs, current_task);
this->send_output_unused_notifications(locked_node.delayed_unused_outputs, current_task);
- this->schedule_new_nodes(locked_node.delayed_scheduled_nodes, current_task);
}
void send_output_required_notifications(const Span<const OutputSocket *> sockets,
@@ -588,49 +601,21 @@ class Executor {
}
}
- void schedule_new_nodes(const Span<const FunctionNode *> nodes, CurrentTask &current_task)
+ void run_task(CurrentTask &current_task)
{
- for (const FunctionNode *node_to_schedule : nodes) {
- /* Avoid a round trip through the task pool for the first node that is scheduled by the
- * current node execution. Other nodes are added to the pool so that other threads can pick
- * them up. */
- const FunctionNode *expected = nullptr;
- if (current_task.next_node.compare_exchange_strong(
- expected, node_to_schedule, std::memory_order_relaxed)) {
- continue;
+ while (!current_task.scheduled_nodes.is_empty()) {
+ const FunctionNode &node = *current_task.scheduled_nodes.pop_last();
+ if (current_task.scheduled_nodes.is_empty()) {
+ current_task.has_scheduled_nodes.store(false, std::memory_order_relaxed);
}
- this->add_node_to_task_pool(*node_to_schedule);
- current_task.added_node_to_pool.store(true, std::memory_order_relaxed);
- }
- }
-
- void add_node_to_task_pool(const Node &node)
- {
- BLI_task_pool_push(
- task_pool_, Executor::run_node_from_task_pool, (void *)&node, false, nullptr);
- }
-
- static void run_node_from_task_pool(TaskPool *task_pool, void *task_data)
- {
- void *user_data = BLI_task_pool_user_data(task_pool);
- Executor &executor = *static_cast<Executor *>(user_data);
- const FunctionNode &node = *static_cast<const FunctionNode *>(task_data);
-
- /* This loop reduces the number of round trips through the task pool as long as the current
- * node is scheduling more nodes. */
- CurrentTask current_task;
- current_task.next_node = &node;
- while (current_task.next_node != nullptr) {
- const FunctionNode &node_to_run = *current_task.next_node;
- current_task.next_node = nullptr;
- executor.run_node_task(node_to_run, current_task);
+ this->run_node_task(node, current_task);
}
}
void run_node_task(const FunctionNode &node, CurrentTask &current_task)
{
NodeState &node_state = *node_states_[node.index_in_graph()];
- LinearAllocator<> &allocator = local_allocators_.local();
+ LinearAllocator<> &allocator = this->get_main_or_local_allocator();
const LazyFunction &fn = node.function();
bool node_needs_execution = false;
@@ -672,7 +657,7 @@ class Executor {
}
void *buffer = allocator.allocate(type.size(), type.alignment());
type.copy_construct(default_value, buffer);
- this->forward_value_to_input(locked_node, input_state, {type, buffer});
+ this->forward_value_to_input(locked_node, input_state, {type, buffer}, current_task);
}
/* Request linked inputs that are always needed. */
@@ -723,7 +708,7 @@ class Executor {
NodeScheduleState::RunningAndRescheduled;
node_state.schedule_state = NodeScheduleState::NotScheduled;
if (reschedule_requested && !node_state.node_has_finished) {
- this->schedule_node(locked_node);
+ this->schedule_node(locked_node, current_task);
}
});
}
@@ -887,7 +872,7 @@ class Executor {
CurrentTask &current_task)
{
BLI_assert(value_to_forward.get() != nullptr);
- LinearAllocator<> &allocator = local_allocators_.local();
+ LinearAllocator<> &allocator = this->get_main_or_local_allocator();
const CPPType &type = *value_to_forward.type();
if (self_.logger_ != nullptr) {
@@ -938,13 +923,13 @@ class Executor {
}
if (is_last_target) {
/* No need to make a copy if this is the last target. */
- this->forward_value_to_input(locked_node, input_state, value_to_forward);
+ this->forward_value_to_input(locked_node, input_state, value_to_forward, current_task);
value_to_forward = {};
}
else {
void *buffer = allocator.allocate(type.size(), type.alignment());
type.copy_construct(value_to_forward.get(), buffer);
- this->forward_value_to_input(locked_node, input_state, {type, buffer});
+ this->forward_value_to_input(locked_node, input_state, {type, buffer}, current_task);
}
});
}
@@ -955,7 +940,8 @@ class Executor {
void forward_value_to_input(LockedNode &locked_node,
InputState &input_state,
- GMutablePointer value)
+ GMutablePointer value,
+ CurrentTask &current_task)
{
NodeState &node_state = locked_node.node_state;
@@ -966,10 +952,82 @@ class Executor {
if (input_state.usage == ValueUsage::Used) {
node_state.missing_required_inputs -= 1;
if (node_state.missing_required_inputs == 0) {
- this->schedule_node(locked_node);
+ this->schedule_node(locked_node, current_task);
}
}
}
+
+ bool use_multi_threading() const
+ {
+ return task_pool_.load() != nullptr;
+ }
+
+ bool try_enable_multi_threading()
+ {
+ if (this->use_multi_threading()) {
+ return true;
+ }
+#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
+ /* Only the current main thread is allowed to enabled multi-threading, because the executor is
+ * still in single-threaded mode. */
+ if (current_main_thread_ != std::this_thread::get_id()) {
+ BLI_assert_unreachable();
+ }
+#endif
+ /* Check of the caller supports multi-threading. */
+ if (!params_->try_enable_multi_threading()) {
+ return false;
+ }
+ /* Avoid using multiple threads when only one thread can be used anyway. */
+ if (BLI_system_thread_count() <= 1) {
+ return false;
+ }
+ task_pool_.store(BLI_task_pool_create(this, TASK_PRIORITY_HIGH));
+ return true;
+ }
+
+ /**
+ * Allow other threads to steal all the nodes that are currently scheduled on this thread.
+ */
+ void move_scheduled_nodes_to_task_pool(CurrentTask &current_task)
+ {
+ BLI_assert(this->use_multi_threading());
+ using FunctionNodeVector = Vector<const FunctionNode *>;
+ FunctionNodeVector *nodes = MEM_new<FunctionNodeVector>(__func__);
+ {
+ std::lock_guard lock{current_task.mutex};
+ if (current_task.scheduled_nodes.is_empty()) {
+ return;
+ }
+ *nodes = std::move(current_task.scheduled_nodes);
+ current_task.has_scheduled_nodes.store(false, std::memory_order_relaxed);
+ }
+ /* All nodes are pushed as a single task in the pool. This avoids unnecessary threading
+ * overhead when the nodes are fast to compute. */
+ BLI_task_pool_push(
+ task_pool_.load(),
+ [](TaskPool *pool, void *data) {
+ Executor &executor = *static_cast<Executor *>(BLI_task_pool_user_data(pool));
+ FunctionNodeVector &nodes = *static_cast<FunctionNodeVector *>(data);
+ CurrentTask new_current_task;
+ new_current_task.scheduled_nodes = std::move(nodes);
+ new_current_task.has_scheduled_nodes.store(true, std::memory_order_relaxed);
+ executor.run_task(new_current_task);
+ },
+ nodes,
+ true,
+ [](TaskPool * /*pool*/, void *data) {
+ MEM_delete(static_cast<FunctionNodeVector *>(data));
+ });
+ }
+
+ LinearAllocator<> &get_main_or_local_allocator()
+ {
+ if (this->use_multi_threading()) {
+ return local_allocators_.local();
+ }
+ return *main_local_allocator_;
+ }
};
class GraphExecutorLFParams final : public Params {
@@ -985,7 +1043,7 @@ class GraphExecutorLFParams final : public Params {
const Node &node,
NodeState &node_state,
CurrentTask &current_task)
- : Params(fn),
+ : Params(fn, executor.use_multi_threading()),
executor_(executor),
node_(node),
node_state_(node_state),
@@ -1017,7 +1075,7 @@ class GraphExecutorLFParams final : public Params {
OutputState &output_state = node_state_.outputs[index];
BLI_assert(!output_state.has_been_computed);
if (output_state.value == nullptr) {
- LinearAllocator<> &allocator = executor_.local_allocators_.local();
+ LinearAllocator<> &allocator = executor_.get_main_or_local_allocator();
const CPPType &type = node_.output(index).type();
output_state.value = allocator.allocate(type.size(), type.alignment());
}
@@ -1052,6 +1110,11 @@ class GraphExecutorLFParams final : public Params {
{
executor_.set_input_unused_during_execution(node_, node_state_, index, current_task_);
}
+
+ bool try_enable_multi_threading_impl() override
+ {
+ return executor_.try_enable_multi_threading();
+ }
};
/**
@@ -1073,6 +1136,20 @@ inline void Executor::execute_node(const FunctionNode &node,
self_.logger_->log_before_node_execute(node, node_params, fn_context);
}
+ /* This is run when the execution of the node calls `lazy_threading::send_hint` to indicate that
+ * the execution will take a while. In this case, other tasks waiting on this thread should be
+ * allowed to be picked up by another thread. */
+ auto blocking_hint_fn = [&]() {
+ if (!current_task.has_scheduled_nodes.load()) {
+ return;
+ }
+ if (!this->try_enable_multi_threading()) {
+ return;
+ }
+ this->move_scheduled_nodes_to_task_pool(current_task);
+ };
+
+ lazy_threading::HintReceiver blocking_hint_receiver{blocking_hint_fn};
fn.execute(node_params, fn_context);
if (self_.logger_ != nullptr) {