/* SPDX-License-Identifier: GPL-2.0-or-later */

/**
 * This file implements the evaluation of a lazy-function graph. Its main objectives are:
 * - Only compute values that are actually used.
 * - Stay single threaded when nodes are executed quickly.
 * - Allow spreading the work over an arbitrary number of threads efficiently.
 *
 * This executor makes use of `FN_lazy_threading.hh` to enable multi-threading only when it seems
 * beneficial. It operates in two modes: single- and multi-threaded. The use of a task pool and
 * locks is avoided in single-threaded mode. Once multi-threading is enabled, the executor starts
 * using both. It is not possible to switch back from multi-threaded to single-threaded mode.
 *
 * The multi-threading design implemented in this executor requires *no* main thread that
 * coordinates everything. Instead, one thread triggers some initial work and then many threads
 * coordinate themselves in a distributed fashion. In an ideal situation, every thread ends up
 * processing a separate part of the graph, which results in less communication overhead. The way
 * TBB schedules tasks helps with that: a thread will next process the task that it added to a
 * task pool just before.
 *
 * Communication between threads is synchronized by using a mutex in every node. When a thread
 * wants to access the state of a node, its mutex has to be locked first (with some documented
 * exceptions). The assumption here is that most nodes are only ever touched by a single thread,
 * and therefore the lock contention is reduced the more nodes there are.
 *
 * Similar to how a #LazyFunction can be thought of as a state machine (see `FN_lazy_function.hh`),
 * each node can also be thought of as a state machine. The state of a node contains the evaluation
 * state of its inputs and outputs. Every time a node is executed, it has to advance its state in
 * some way (e.g. it requests a new input or computes a new output).
 *
 * When a node is executed, it may send notifications to other nodes which may in turn schedule
 * those nodes. For example, when the current node has computed one of its outputs, the computed
 * value is forwarded to all linked inputs, changing their node states in the process. If such an
 * input was the last missing required input of its node, that node is scheduled so that it is
 * executed next.
 *
 * When all tasks are completed, the executor gives back control to the caller, which may later
 * provide new inputs to the graph. This leads to new nodes being scheduled, and the process
 * starts again.
 */

#include <mutex>

#include "BLI_compute_context.hh"
#include "BLI_enumerable_thread_specific.hh"
#include "BLI_function_ref.hh"
#include "BLI_task.h"
#include "BLI_task.hh"
#include "BLI_timeit.hh"

#include "FN_lazy_function_graph_executor.hh"

namespace blender::fn::lazy_function {

enum class NodeScheduleState {
  /**
   * Default state of every node.
   */
  NotScheduled,
  /**
   * The node has been added to the task pool or is otherwise scheduled to be executed in the
   * future.
   */
  Scheduled,
  /**
   * The node is currently running.
   */
  Running,
  /**
   * The node is running and has been rescheduled while running. In this case the node has to run
   * again. This state exists because we don't want to add the node to the task pool twice;
   * otherwise the node might run twice at the same time, which is not allowed. Instead, once the
   * node is done running, it will reschedule itself.
   */
  RunningAndRescheduled,
};
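
/*
 * Illustrative sketch (not part of the executor, derived from #schedule_node and #run_node_task
 * below): the transitions between the schedule states.
 *
 *   NotScheduled ---- schedule_node() ----------------------> Scheduled
 *   Scheduled ------- run_node_task() picks the node up -----> Running
 *   Running --------- schedule_node() ----------------------> RunningAndRescheduled
 *   Running / RunningAndRescheduled -- node is done ---------> NotScheduled
 *                                                              (scheduled again if requested)
 */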

struct InputState {
  /**
   * Value of this input socket. By default, the value is empty. When other nodes are done
   * computing their outputs, the computed values will be forwarded to linked input sockets. The
   * value will then live here until it is found that it is not needed anymore.
   *
   * If #was_ready_for_execution is true, access does not require holding the node lock.
   */
  void *value = nullptr;
  /**
   * How the node intends to use this input. By default, all inputs may be used. Based on which
   * outputs are used, a node can decide that an input will definitely be used or is never used.
   * This allows freeing values early and avoids unnecessary computations.
   */
  ValueUsage usage = ValueUsage::Maybe;
  /**
   * Set to true once #value is set and will stay true afterwards. Access during execution of a
   * node does not require holding the node lock.
   */
  bool was_ready_for_execution = false;
};

struct OutputState {
  /**
   * Keeps track of how the output value is used. If a connected input becomes used, this output
   * has to become used as well. The output becomes unused when no input socket uses it anymore
   * and it is not an output of the graph.
   */
  ValueUsage usage = ValueUsage::Maybe;
  /**
   * This is a copy of #usage that is made right before node execution starts. This is done so
   * that the node gets a consistent view of what outputs are used, even when this changes while
   * the node is running (the node might be reevaluated in that case). Access during execution of
   * a node does not require holding the node lock.
   */
  ValueUsage usage_for_execution = ValueUsage::Maybe;
  /**
   * Number of linked sockets that might still use the value of this output.
   */
  int potential_target_sockets = 0;
  /**
   * Is set to true once the output has been computed and then stays true. Access does not require
   * holding the node lock.
   */
  bool has_been_computed = false;
  /**
   * Holds the output value for a short period of time while the node is initializing it and
   * before it's forwarded to input sockets. Access does not require holding the node lock.
   */
  void *value = nullptr;
};

struct NodeState {
  /**
   * Needs to be locked when any data in this state is accessed that is not explicitly marked as
   * not needing the lock.
   */
  mutable std::mutex mutex;
  /**
   * States of the individual input and output sockets. One can index into these arrays without
   * locking. However, to access data inside, a lock is needed unless noted otherwise.
   */
  MutableSpan<InputState> inputs;
  MutableSpan<OutputState> outputs;
  /**
   * Counts the number of inputs that still have to be provided to this node before it should run
   * again. This is used as an optimization so that nodes are not scheduled unnecessarily in many
   * cases.
   */
  int missing_required_inputs = 0;
  /**
   * Is set to true once the node is done with its work, i.e. when all outputs that may be used
   * have been computed.
   */
  bool node_has_finished = false;
  /**
   * Is true while the node still has to run its one-time initialization (create storage, load
   * unlinked inputs, request always-used inputs). Set to false once that has been done during the
   * node's first execution.
   */
  bool had_initialization = true;
  /**
   * Nodes with side effects should always be executed when their required inputs have been
   * computed.
   */
  bool has_side_effects = false;
  /**
   * A node is always in one specific schedule state. This helps to ensure that the same node does
   * not run twice at the same time accidentally.
   */
  NodeScheduleState schedule_state = NodeScheduleState::NotScheduled;
  /**
   * Custom storage of the node.
   */
  void *storage = nullptr;
};
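
/*
 * Illustrative note (not part of the executor): for a node with e.g. two inputs and one output,
 * the corresponding #NodeState owns two #InputState and one #OutputState, allocated from a
 * thread-local linear allocator in #construct_initial_node_state below. Unless a field above is
 * documented otherwise, data inside these states may only be accessed while #NodeState::mutex is
 * held (see #with_locked_node).
 */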

/**
 * Utility class that wraps a node whose state is locked. Having this as a separate class is
 * useful because it allows methods to communicate that they expect the node to be locked.
 */
struct LockedNode {
  /**
   * This is the node that is currently locked.
   */
  const Node &node;
  NodeState &node_state;

  /**
   * Used to delay notifying (and therefore locking) other nodes until the current node is not
   * locked anymore. This might not be strictly necessary to avoid deadlocks in the current code,
   * but it is a good measure to avoid accidentally adding a deadlock later on. By not locking
   * more than one node per thread at a time, deadlocks are avoided.
   *
   * The notifications will be sent right after the node is not locked anymore.
   */
  Vector<const OutputSocket *> delayed_required_outputs;
  Vector<const OutputSocket *> delayed_unused_outputs;

  LockedNode(const Node &node, NodeState &node_state) : node(node), node_state(node_state)
  {
  }
};

class Executor;
class GraphExecutorLFParams;

struct CurrentTask {
  /**
   * Mutex used to protect #scheduled_nodes when the executor uses multi-threading.
   */
  std::mutex mutex;
  /**
   * Nodes that have been scheduled to execute next.
   */
  Vector<const FunctionNode *> scheduled_nodes;
  /**
   * Makes it cheaper to check if there are any scheduled nodes because it avoids locking the
   * mutex.
   */
  std::atomic<bool> has_scheduled_nodes = false;
};

class Executor {
 private:
  const GraphExecutor &self_;
  /**
   * Remembers which inputs have been loaded from the caller already, to avoid loading them twice.
   * Atomics are used to make sure that every input is only retrieved once.
   */
  Array<std::atomic<uint8_t>> loaded_inputs_;
  /**
   * State of every node, indexed by #Node::index_in_graph.
   */
  Array<NodeState *> node_states_;
  /**
   * Parameters provided by the caller. This is always non-null while a node is running.
   */
  Params *params_ = nullptr;
  const Context *context_ = nullptr;
  /**
   * Used to distribute work on separate nodes to separate threads.
   * If this is null, the executor is in single-threaded mode.
   */
  std::atomic<TaskPool *> task_pool_ = nullptr;
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
  std::thread::id current_main_thread_;
#endif
  /**
   * A separate linear allocator for every thread. We could potentially reuse some memory, but
   * that doesn't seem worth it yet.
   */
  threading::EnumerableThreadSpecific<LinearAllocator<>> local_allocators_;
  LinearAllocator<> *main_local_allocator_ = nullptr;
  /**
   * Set to false when the first execution ends.
   */
  bool is_first_execution_ = true;

  friend GraphExecutorLFParams;

 public:
  Executor(const GraphExecutor &self) : self_(self), loaded_inputs_(self.graph_inputs_.size())
  {
    /* The indices are necessary because they are used as keys in #node_states_. */
    BLI_assert(self_.graph_.node_indices_are_valid());
    main_local_allocator_ = &local_allocators_.local();
  }

  ~Executor()
  {
    if (TaskPool *task_pool = task_pool_.load()) {
      BLI_task_pool_free(task_pool);
    }
    threading::parallel_for(node_states_.index_range(), 1024, [&](const IndexRange range) {
      for (const int node_index : range) {
        const Node &node = *self_.graph_.nodes()[node_index];
        NodeState &node_state = *node_states_[node_index];
        this->destruct_node_state(node, node_state);
      }
    });
  }
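
  /*
   * Illustrative sketch (not part of the executor): roughly what a single call to #execute below
   * does, in order.
   *
   *   if this is the first execution:
   *     initialize_node_states();          // one InputState/OutputState per socket
   *     set_always_unused_graph_inputs();  // unlinked graph inputs are never needed
   *     set_defaulted_graph_outputs();     // graph outputs without origin get their default
   *     schedule_side_effect_nodes();      // side-effect nodes always have to run
   *   schedule_newly_requested_outputs();  // request inputs of used, not yet computed outputs
   *   forward_newly_provided_inputs();     // push inputs the caller has provided by now
   *   run_task();                          // process scheduled nodes on this thread
   *   wait for the task pool if multi-threading has been enabled in the meantime
   */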

  /**
   * Main entry point to the execution of this graph.
   */
  void execute(Params &params, const Context &context)
  {
    params_ = &params;
    context_ = &context;
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
    current_main_thread_ = std::this_thread::get_id();
#endif
    const auto deferred_func = [&]() {
      /* Make sure the pointers are not dangling, even when they shouldn't be accessed by
       * anyone. */
      params_ = nullptr;
      context_ = nullptr;
      is_first_execution_ = false;
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
      current_main_thread_ = {};
#endif
    };
    BLI_SCOPED_DEFER(deferred_func);

    CurrentTask current_task;
    if (is_first_execution_) {
      this->initialize_node_states();
      /* Initialize atomics to zero. */
      memset(static_cast<void *>(loaded_inputs_.data()), 0, loaded_inputs_.size() * sizeof(bool));
      this->set_always_unused_graph_inputs();
      this->set_defaulted_graph_outputs();
      this->schedule_side_effect_nodes(current_task);
    }

    this->schedule_newly_requested_outputs(current_task);
    this->forward_newly_provided_inputs(current_task);

    this->run_task(current_task);

    if (TaskPool *task_pool = task_pool_.load()) {
      BLI_task_pool_work_and_wait(task_pool);
    }
  }

 private:
  void initialize_node_states()
  {
    Span<const Node *> nodes = self_.graph_.nodes();
    node_states_.reinitialize(nodes.size());

    /* Construct all node states in parallel. */
    threading::parallel_for(nodes.index_range(), 256, [&](const IndexRange range) {
      LinearAllocator<> &allocator = local_allocators_.local();
      for (const int i : range) {
        const Node &node = *nodes[i];
        NodeState &node_state = *allocator.construct<NodeState>().release();
        node_states_[i] = &node_state;
        this->construct_initial_node_state(allocator, node, node_state);
      }
    });
  }

  void construct_initial_node_state(LinearAllocator<> &allocator,
                                    const Node &node,
                                    NodeState &node_state)
  {
    const Span<const InputSocket *> node_inputs = node.inputs();
    const Span<const OutputSocket *> node_outputs = node.outputs();

    node_state.inputs = allocator.construct_array<InputState>(node_inputs.size());
    node_state.outputs = allocator.construct_array<OutputState>(node_outputs.size());

    for (const int i : node_outputs.index_range()) {
      OutputState &output_state = node_state.outputs[i];
      const OutputSocket &output_socket = *node_outputs[i];
      output_state.potential_target_sockets = output_socket.targets().size();
      if (output_state.potential_target_sockets == 0) {
        output_state.usage = ValueUsage::Unused;
      }
    }
  }

  void destruct_node_state(const Node &node, NodeState &node_state)
  {
    if (node.is_function()) {
      const LazyFunction &fn = static_cast<const FunctionNode &>(node).function();
      if (node_state.storage != nullptr) {
        fn.destruct_storage(node_state.storage);
      }
    }
    for (const int i : node.inputs().index_range()) {
      InputState &input_state = node_state.inputs[i];
      const InputSocket &input_socket = node.input(i);
      this->destruct_input_value_if_exists(input_state, input_socket.type());
    }
    std::destroy_at(&node_state);
  }

  void schedule_newly_requested_outputs(CurrentTask &current_task)
  {
    for (const int graph_output_index : self_.graph_outputs_.index_range()) {
      if (params_->get_output_usage(graph_output_index) != ValueUsage::Used) {
        continue;
      }
      if (params_->output_was_set(graph_output_index)) {
        continue;
      }
      const InputSocket &socket = *self_.graph_outputs_[graph_output_index];
      const Node &node = socket.node();
      NodeState &node_state = *node_states_[node.index_in_graph()];
      this->with_locked_node(node, node_state, current_task, [&](LockedNode &locked_node) {
        this->set_input_required(locked_node, socket);
      });
    }
  }

  void set_defaulted_graph_outputs()
  {
    for (const int graph_output_index : self_.graph_outputs_.index_range()) {
      const InputSocket &socket = *self_.graph_outputs_[graph_output_index];
      if (socket.origin() != nullptr) {
        continue;
      }
      const CPPType &type = socket.type();
      const void *default_value = socket.default_value();
      BLI_assert(default_value != nullptr);

      if (self_.logger_ != nullptr) {
        self_.logger_->log_socket_value(socket, {type, default_value}, *context_);
      }

      void *output_ptr = params_->get_output_data_ptr(graph_output_index);
      type.copy_construct(default_value, output_ptr);
      params_->output_set(graph_output_index);
    }
  }
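
  /**
   * Graph inputs whose socket has no targets can never be used, so the caller is notified right
   * away that it does not have to provide them.
   */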
  void set_always_unused_graph_inputs()
  {
    for (const int i : self_.graph_inputs_.index_range()) {
      const OutputSocket &socket = *self_.graph_inputs_[i];
      const Node &node = socket.node();
      const NodeState &node_state = *node_states_[node.index_in_graph()];
      const OutputState &output_state = node_state.outputs[socket.index()];
      if (output_state.usage == ValueUsage::Unused) {
        params_->set_input_unused(i);
      }
    }
  }

  void schedule_side_effect_nodes(CurrentTask &current_task)
  {
    if (self_.side_effect_provider_ != nullptr) {
      const Vector<const FunctionNode *> side_effect_nodes =
          self_.side_effect_provider_->get_nodes_with_side_effects(*context_);
      for (const FunctionNode *node : side_effect_nodes) {
        NodeState &node_state = *node_states_[node->index_in_graph()];
        node_state.has_side_effects = true;
        this->with_locked_node(*node, node_state, current_task, [&](LockedNode &locked_node) {
          this->schedule_node(locked_node, current_task);
        });
      }
    }
  }

  void forward_newly_provided_inputs(CurrentTask &current_task)
  {
    LinearAllocator<> &allocator = this->get_main_or_local_allocator();
    for (const int graph_input_index : self_.graph_inputs_.index_range()) {
      std::atomic<uint8_t> &was_loaded = loaded_inputs_[graph_input_index];
      if (was_loaded.load()) {
        continue;
      }
      void *input_data = params_->try_get_input_data_ptr(graph_input_index);
      if (input_data == nullptr) {
        continue;
      }
      if (was_loaded.fetch_or(1)) {
        /* The value was forwarded before. */
        continue;
      }
      this->forward_newly_provided_input(current_task, allocator, graph_input_index, input_data);
    }
  }

  void forward_newly_provided_input(CurrentTask &current_task,
                                    LinearAllocator<> &allocator,
                                    const int graph_input_index,
                                    void *input_data)
  {
    const OutputSocket &socket = *self_.graph_inputs_[graph_input_index];
    const CPPType &type = socket.type();
    void *buffer = allocator.allocate(type.size(), type.alignment());
    type.move_construct(input_data, buffer);
    this->forward_value_to_linked_inputs(socket, {type, buffer}, current_task);
  }
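
  /**
   * Called when it becomes known that the value of the given output socket is required. For a
   * graph input (dummy node) the value is requested from the caller; otherwise the output is
   * marked as used and its node is scheduled.
   */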
  void notify_output_required(const OutputSocket &socket, CurrentTask &current_task)
  {
    const Node &node = socket.node();
    const int index_in_node = socket.index();
    NodeState &node_state = *node_states_[node.index_in_graph()];
    OutputState &output_state = node_state.outputs[index_in_node];

    /* The notified output socket might be an input of the entire graph. In this case, notify the
     * caller that the input is required. */
    if (node.is_dummy()) {
      const int graph_input_index = self_.graph_inputs_.index_of(&socket);
      std::atomic<uint8_t> &was_loaded = loaded_inputs_[graph_input_index];
      if (was_loaded.load()) {
        return;
      }
      void *input_data = params_->try_get_input_data_ptr_or_request(graph_input_index);
      if (input_data == nullptr) {
        return;
      }
      if (was_loaded.fetch_or(1)) {
        /* The value was forwarded already. */
        return;
      }
      this->forward_newly_provided_input(
          current_task, this->get_main_or_local_allocator(), graph_input_index, input_data);
      return;
    }

    BLI_assert(node.is_function());
    this->with_locked_node(node, node_state, current_task, [&](LockedNode &locked_node) {
      if (output_state.usage == ValueUsage::Used) {
        return;
      }
      output_state.usage = ValueUsage::Used;
      this->schedule_node(locked_node, current_task);
    });
  }

  void notify_output_unused(const OutputSocket &socket, CurrentTask &current_task)
  {
    const Node &node = socket.node();
    const int index_in_node = socket.index();
    NodeState &node_state = *node_states_[node.index_in_graph()];
    OutputState &output_state = node_state.outputs[index_in_node];

    this->with_locked_node(node, node_state, current_task, [&](LockedNode &locked_node) {
      output_state.potential_target_sockets -= 1;
      if (output_state.potential_target_sockets == 0) {
        BLI_assert(output_state.usage != ValueUsage::Unused);
        if (output_state.usage == ValueUsage::Maybe) {
          output_state.usage = ValueUsage::Unused;
          if (node.is_dummy()) {
            const int graph_input_index = self_.graph_inputs_.index_of(&socket);
            params_->set_input_unused(graph_input_index);
          }
          else {
            this->schedule_node(locked_node, current_task);
          }
        }
      }
    });
  }

  void schedule_node(LockedNode &locked_node, CurrentTask &current_task)
  {
    BLI_assert(locked_node.node.is_function());
    switch (locked_node.node_state.schedule_state) {
      case NodeScheduleState::NotScheduled: {
        locked_node.node_state.schedule_state = NodeScheduleState::Scheduled;
        const FunctionNode &node = static_cast<const FunctionNode &>(locked_node.node);
        if (this->use_multi_threading()) {
          std::lock_guard lock{current_task.mutex};
          current_task.scheduled_nodes.append(&node);
        }
        else {
          current_task.scheduled_nodes.append(&node);
        }
        current_task.has_scheduled_nodes.store(true, std::memory_order_relaxed);
        break;
      }
      case NodeScheduleState::Scheduled: {
        break;
      }
      case NodeScheduleState::Running: {
        locked_node.node_state.schedule_state = NodeScheduleState::RunningAndRescheduled;
        break;
      }
      case NodeScheduleState::RunningAndRescheduled: {
        break;
      }
    }
  }

  void with_locked_node(const Node &node,
                        NodeState &node_state,
                        CurrentTask &current_task,
                        const FunctionRef<void(LockedNode &)> f)
  {
    BLI_assert(&node_state == node_states_[node.index_in_graph()]);

    LockedNode locked_node{node, node_state};
    if (this->use_multi_threading()) {
      std::lock_guard lock{node_state.mutex};
      threading::isolate_task([&]() { f(locked_node); });
    }
    else {
      f(locked_node);
    }

    this->send_output_required_notifications(locked_node.delayed_required_outputs, current_task);
    this->send_output_unused_notifications(locked_node.delayed_unused_outputs, current_task);
  }

  void send_output_required_notifications(const Span<const OutputSocket *> sockets,
                                          CurrentTask &current_task)
  {
    for (const OutputSocket *socket : sockets) {
      this->notify_output_required(*socket, current_task);
    }
  }

  void send_output_unused_notifications(const Span<const OutputSocket *> sockets,
                                        CurrentTask &current_task)
  {
    for (const OutputSocket *socket : sockets) {
      this->notify_output_unused(*socket, current_task);
    }
  }

  void run_task(CurrentTask &current_task)
  {
    while (!current_task.scheduled_nodes.is_empty()) {
      const FunctionNode &node = *current_task.scheduled_nodes.pop_last();
      if (current_task.scheduled_nodes.is_empty()) {
        current_task.has_scheduled_nodes.store(false, std::memory_order_relaxed);
      }
      this->run_node_task(node, current_task);
    }
  }
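
  /**
   * Processes a single scheduled node: its state is advanced while the node lock is held, the
   * wrapped #LazyFunction is executed without holding the lock, and afterwards the node is either
   * finished or rescheduled if that was requested in the meantime.
   */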
  void run_node_task(const FunctionNode &node, CurrentTask &current_task)
  {
    NodeState &node_state = *node_states_[node.index_in_graph()];
    LinearAllocator<> &allocator = this->get_main_or_local_allocator();
    const LazyFunction &fn = node.function();

    bool node_needs_execution = false;
    this->with_locked_node(node, node_state, current_task, [&](LockedNode &locked_node) {
      BLI_assert(node_state.schedule_state == NodeScheduleState::Scheduled);
      node_state.schedule_state = NodeScheduleState::Running;

      if (node_state.node_has_finished) {
        return;
      }

      bool required_uncomputed_output_exists = false;
      for (OutputState &output_state : node_state.outputs) {
        output_state.usage_for_execution = output_state.usage;
        if (output_state.usage == ValueUsage::Used && !output_state.has_been_computed) {
          required_uncomputed_output_exists = true;
        }
      }
      if (!required_uncomputed_output_exists && !node_state.has_side_effects) {
        return;
      }

      if (node_state.had_initialization) {
        /* Initialize storage. */
        node_state.storage = fn.init_storage(allocator);

        /* Load unlinked inputs. */
        for (const int input_index : node.inputs().index_range()) {
          const InputSocket &input_socket = node.input(input_index);
          if (input_socket.origin() != nullptr) {
            continue;
          }
          InputState &input_state = node_state.inputs[input_index];
          const CPPType &type = input_socket.type();
          const void *default_value = input_socket.default_value();
          BLI_assert(default_value != nullptr);
          if (self_.logger_ != nullptr) {
            self_.logger_->log_socket_value(input_socket, {type, default_value}, *context_);
          }
          void *buffer = allocator.allocate(type.size(), type.alignment());
          type.copy_construct(default_value, buffer);
          this->forward_value_to_input(locked_node, input_state, {type, buffer}, current_task);
        }

        /* Request linked inputs that are always needed. */
        const Span<Input> fn_inputs = fn.inputs();
        for (const int input_index : fn_inputs.index_range()) {
          const Input &fn_input = fn_inputs[input_index];
          if (fn_input.usage == ValueUsage::Used) {
            const InputSocket &input_socket = node.input(input_index);
            this->set_input_required(locked_node, input_socket);
          }
        }

        node_state.had_initialization = false;
      }

      for (const int input_index : node_state.inputs.index_range()) {
        InputState &input_state = node_state.inputs[input_index];
        if (input_state.was_ready_for_execution) {
          continue;
        }
        if (input_state.value != nullptr) {
          input_state.was_ready_for_execution = true;
          continue;
        }
        if (input_state.usage == ValueUsage::Used) {
          return;
        }
      }

      node_needs_execution = true;
    });

    if (node_needs_execution) {
      /* Importantly, the node must not be locked when it is executed. That would result in locks
       * being held for a long time in some cases, and in multiple locks being held by the same
       * thread in the same graph, which can lead to deadlocks. */
      this->execute_node(node, node_state, current_task);
    }
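
    /* Lock the node again to process the result of the execution (or of the early return in the
     * lambda above). */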
    this->with_locked_node(node, node_state, current_task, [&](LockedNode &locked_node) {
#ifdef DEBUG
      if (node_needs_execution) {
        this->assert_expected_outputs_have_been_computed(locked_node);
      }
#endif
      this->finish_node_if_possible(locked_node);
      const bool reschedule_requested = node_state.schedule_state ==
                                        NodeScheduleState::RunningAndRescheduled;
      node_state.schedule_state = NodeScheduleState::NotScheduled;
      if (reschedule_requested && !node_state.node_has_finished) {
        this->schedule_node(locked_node, current_task);
      }
    });
  }

  void assert_expected_outputs_have_been_computed(LockedNode &locked_node)
  {
    const FunctionNode &node = static_cast<const FunctionNode &>(locked_node.node);
    const NodeState &node_state = locked_node.node_state;

    if (node_state.missing_required_inputs > 0) {
      return;
    }
    if (node_state.schedule_state == NodeScheduleState::RunningAndRescheduled) {
      return;
    }
    Vector<const OutputSocket *> missing_outputs;
    for (const int i : node_state.outputs.index_range()) {
      const OutputState &output_state = node_state.outputs[i];
      if (output_state.usage_for_execution == ValueUsage::Used) {
        if (!output_state.has_been_computed) {
          missing_outputs.append(&node.output(i));
        }
      }
    }
    if (!missing_outputs.is_empty()) {
      if (self_.logger_ != nullptr) {
        self_.logger_->dump_when_outputs_are_missing(node, missing_outputs, *context_);
      }
      BLI_assert_unreachable();
    }
  }
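
  /**
   * Checks whether the node has computed everything that can still be used. If so, the node is
   * marked as finished, inputs that are not needed anymore are marked as unused, and remaining
   * input values as well as the node storage are destructed.
   */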
  void finish_node_if_possible(LockedNode &locked_node)
  {
    const Node &node = locked_node.node;
    NodeState &node_state = locked_node.node_state;

    if (node_state.node_has_finished) {
      /* Was finished already. */
      return;
    }
    /* If there are outputs that may still be used, the node is not done yet. */
    for (const OutputState &output_state : node_state.outputs) {
      if (output_state.usage != ValueUsage::Unused && !output_state.has_been_computed) {
        return;
      }
    }
    /* If the node is still waiting for inputs, it is not done yet. */
    for (const InputState &input_state : node_state.inputs) {
      if (input_state.usage == ValueUsage::Used && !input_state.was_ready_for_execution) {
        return;
      }
    }

    node_state.node_has_finished = true;

    for (const int input_index : node_state.inputs.index_range()) {
      const InputSocket &input_socket = node.input(input_index);
      InputState &input_state = node_state.inputs[input_index];
      if (input_state.usage == ValueUsage::Maybe) {
        this->set_input_unused(locked_node, input_socket);
      }
      else if (input_state.usage == ValueUsage::Used) {
        this->destruct_input_value_if_exists(input_state, input_socket.type());
      }
    }

    if (node_state.storage != nullptr) {
      if (node.is_function()) {
        const FunctionNode &fn_node = static_cast<const FunctionNode &>(node);
        fn_node.function().destruct_storage(node_state.storage);
      }
      node_state.storage = nullptr;
    }
  }

  void destruct_input_value_if_exists(InputState &input_state, const CPPType &type)
  {
    if (input_state.value != nullptr) {
      type.destruct(input_state.value);
      input_state.value = nullptr;
    }
  }

  void execute_node(const FunctionNode &node, NodeState &node_state, CurrentTask &current_task);

  void set_input_unused_during_execution(const Node &node,
                                         NodeState &node_state,
                                         const int input_index,
                                         CurrentTask &current_task)
  {
    const InputSocket &input_socket = node.input(input_index);
    this->with_locked_node(node, node_state, current_task, [&](LockedNode &locked_node) {
      this->set_input_unused(locked_node, input_socket);
    });
  }

  void set_input_unused(LockedNode &locked_node, const InputSocket &input_socket)
  {
    NodeState &node_state = locked_node.node_state;
    const int input_index = input_socket.index();
    InputState &input_state = node_state.inputs[input_index];

    BLI_assert(input_state.usage != ValueUsage::Used);
    if (input_state.usage == ValueUsage::Unused) {
      return;
    }
    input_state.usage = ValueUsage::Unused;

    this->destruct_input_value_if_exists(input_state, input_socket.type());
    if (input_state.was_ready_for_execution) {
      return;
    }
    const OutputSocket *origin = input_socket.origin();
    if (origin != nullptr) {
      locked_node.delayed_unused_outputs.append(origin);
    }
  }

  void *set_input_required_during_execution(const Node &node,
                                            NodeState &node_state,
                                            const int input_index,
                                            CurrentTask &current_task)
  {
    const InputSocket &input_socket = node.input(input_index);
    void *result;
    this->with_locked_node(node, node_state, current_task, [&](LockedNode &locked_node) {
      result = this->set_input_required(locked_node, input_socket);
    });
    return result;
  }
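
  /**
   * Marks the input as required. If its value is already available, the value is returned
   * directly. Otherwise the origin output is requested (the notification is sent once the node is
   * unlocked again) and null is returned.
   */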
  void *set_input_required(LockedNode &locked_node, const InputSocket &input_socket)
  {
    BLI_assert(&locked_node.node == &input_socket.node());
    NodeState &node_state = locked_node.node_state;
    const int input_index = input_socket.index();
    InputState &input_state = node_state.inputs[input_index];

    BLI_assert(input_state.usage != ValueUsage::Unused);

    if (input_state.value != nullptr) {
      input_state.was_ready_for_execution = true;
      return input_state.value;
    }
    if (input_state.usage == ValueUsage::Used) {
      return nullptr;
    }
    input_state.usage = ValueUsage::Used;
    node_state.missing_required_inputs += 1;

    const OutputSocket *origin_socket = input_socket.origin();
    /* Unlinked inputs are always loaded in advance. */
    BLI_assert(origin_socket != nullptr);
    locked_node.delayed_required_outputs.append(origin_socket);
    return nullptr;
  }

  void forward_value_to_linked_inputs(const OutputSocket &from_socket,
                                      GMutablePointer value_to_forward,
                                      CurrentTask &current_task)
  {
    BLI_assert(value_to_forward.get() != nullptr);
    LinearAllocator<> &allocator = this->get_main_or_local_allocator();
    const CPPType &type = *value_to_forward.type();

    if (self_.logger_ != nullptr) {
      self_.logger_->log_socket_value(from_socket, value_to_forward, *context_);
    }

    const Span<const InputSocket *> targets = from_socket.targets();
    for (const InputSocket *target_socket : targets) {
      const Node &target_node = target_socket->node();
      NodeState &node_state = *node_states_[target_node.index_in_graph()];
      const int input_index = target_socket->index();
      InputState &input_state = node_state.inputs[input_index];
      const bool is_last_target = target_socket == targets.last();
#ifdef DEBUG
      if (input_state.value != nullptr) {
        if (self_.logger_ != nullptr) {
          self_.logger_->dump_when_input_is_set_twice(*target_socket, from_socket, *context_);
        }
        BLI_assert_unreachable();
      }
#endif
      BLI_assert(!input_state.was_ready_for_execution);
      BLI_assert(target_socket->type() == type);
      BLI_assert(target_socket->origin() == &from_socket);

      if (self_.logger_ != nullptr) {
        self_.logger_->log_socket_value(*target_socket, value_to_forward, *context_);
      }
      if (target_node.is_dummy()) {
        /* Forward the value to the outside of the graph. */
        const int graph_output_index = self_.graph_outputs_.index_of_try(target_socket);
        if (graph_output_index != -1 &&
            params_->get_output_usage(graph_output_index) != ValueUsage::Unused) {
          void *dst_buffer = params_->get_output_data_ptr(graph_output_index);
          if (is_last_target) {
            type.move_construct(value_to_forward.get(), dst_buffer);
          }
          else {
            type.copy_construct(value_to_forward.get(), dst_buffer);
          }
          params_->output_set(graph_output_index);
        }
        continue;
      }
      this->with_locked_node(target_node, node_state, current_task, [&](LockedNode &locked_node) {
        if (input_state.usage == ValueUsage::Unused) {
          return;
        }
        if (is_last_target) {
          /* No need to make a copy if this is the last target. */
          this->forward_value_to_input(locked_node, input_state, value_to_forward, current_task);
          value_to_forward = {};
        }
        else {
          void *buffer = allocator.allocate(type.size(), type.alignment());
          type.copy_construct(value_to_forward.get(), buffer);
          this->forward_value_to_input(locked_node, input_state, {type, buffer}, current_task);
        }
      });
    }
    if (value_to_forward.get() != nullptr) {
      value_to_forward.destruct();
    }
  }

  void forward_value_to_input(LockedNode &locked_node,
                              InputState &input_state,
                              GMutablePointer value,
                              CurrentTask &current_task)
  {
    NodeState &node_state = locked_node.node_state;

    BLI_assert(input_state.value == nullptr);
    BLI_assert(!input_state.was_ready_for_execution);
    input_state.value = value.get();

    if (input_state.usage == ValueUsage::Used) {
      node_state.missing_required_inputs -= 1;
      if (node_state.missing_required_inputs == 0) {
        this->schedule_node(locked_node, current_task);
      }
    }
  }

  bool use_multi_threading() const
  {
    return task_pool_.load() != nullptr;
  }
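
  /**
   * Tries to switch the executor from single-threaded to multi-threaded mode. This only succeeds
   * when the caller supports multi-threading as well and more than one thread is available. Once
   * enabled, a task pool is created and the executor stays in multi-threaded mode.
   */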
  bool try_enable_multi_threading()
  {
    if (this->use_multi_threading()) {
      return true;
    }
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
    /* Only the current main thread is allowed to enable multi-threading, because the executor is
     * still in single-threaded mode. */
    if (current_main_thread_ != std::this_thread::get_id()) {
      BLI_assert_unreachable();
    }
#endif
    /* Check if the caller supports multi-threading. */
    if (!params_->try_enable_multi_threading()) {
      return false;
    }
    /* Avoid using multiple threads when only one thread can be used anyway. */
    if (BLI_system_thread_count() <= 1) {
      return false;
    }
    task_pool_.store(BLI_task_pool_create(this, TASK_PRIORITY_HIGH));
    return true;
  }

  /**
   * Allow other threads to steal all the nodes that are currently scheduled on this thread.
   */
  void move_scheduled_nodes_to_task_pool(CurrentTask &current_task)
  {
    BLI_assert(this->use_multi_threading());
    using FunctionNodeVector = Vector<const FunctionNode *>;
    FunctionNodeVector *nodes = MEM_new<FunctionNodeVector>(__func__);
    {
      std::lock_guard lock{current_task.mutex};
      if (current_task.scheduled_nodes.is_empty()) {
        /* Nothing to move; free the vector again to avoid leaking it. */
        MEM_delete(nodes);
        return;
      }
      *nodes = std::move(current_task.scheduled_nodes);
      current_task.has_scheduled_nodes.store(false, std::memory_order_relaxed);
    }
    /* All nodes are pushed as a single task in the pool. This avoids unnecessary threading
     * overhead when the nodes are fast to compute. */
    BLI_task_pool_push(
        task_pool_.load(),
        [](TaskPool *pool, void *data) {
          Executor &executor = *static_cast<Executor *>(BLI_task_pool_user_data(pool));
          FunctionNodeVector &nodes = *static_cast<FunctionNodeVector *>(data);
          CurrentTask new_current_task;
          new_current_task.scheduled_nodes = std::move(nodes);
          new_current_task.has_scheduled_nodes.store(true, std::memory_order_relaxed);
          executor.run_task(new_current_task);
        },
        nodes,
        true,
        [](TaskPool * /*pool*/, void *data) {
          MEM_delete(static_cast<FunctionNodeVector *>(data));
        });
  }

  LinearAllocator<> &get_main_or_local_allocator()
  {
    if (this->use_multi_threading()) {
      return local_allocators_.local();
    }
    return *main_local_allocator_;
  }
};
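
/**
 * #Params implementation that is passed to a node's #LazyFunction while it is executed. It
 * translates the parameter accessors used by the function into operations on the corresponding
 * socket states in the #Executor (e.g. requesting inputs or forwarding computed outputs).
 */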
class GraphExecutorLFParams final : public Params {
 private:
  Executor &executor_;
  const Node &node_;
  NodeState &node_state_;
  CurrentTask &current_task_;

 public:
  GraphExecutorLFParams(const LazyFunction &fn,
                        Executor &executor,
                        const Node &node,
                        NodeState &node_state,
                        CurrentTask &current_task)
      : Params(fn, executor.use_multi_threading()),
        executor_(executor),
        node_(node),
        node_state_(node_state),
        current_task_(current_task)
  {
  }

 private:
  void *try_get_input_data_ptr_impl(const int index) const override
  {
    const InputState &input_state = node_state_.inputs[index];
    if (input_state.was_ready_for_execution) {
      return input_state.value;
    }
    return nullptr;
  }

  void *try_get_input_data_ptr_or_request_impl(const int index) override
  {
    const InputState &input_state = node_state_.inputs[index];
    if (input_state.was_ready_for_execution) {
      return input_state.value;
    }
    return executor_.set_input_required_during_execution(
        node_, node_state_, index, current_task_);
  }

  void *get_output_data_ptr_impl(const int index) override
  {
    OutputState &output_state = node_state_.outputs[index];
    BLI_assert(!output_state.has_been_computed);
    if (output_state.value == nullptr) {
      LinearAllocator<> &allocator = executor_.get_main_or_local_allocator();
      const CPPType &type = node_.output(index).type();
      output_state.value = allocator.allocate(type.size(), type.alignment());
    }
    return output_state.value;
  }

  void output_set_impl(const int index) override
  {
    OutputState &output_state = node_state_.outputs[index];
    BLI_assert(!output_state.has_been_computed);
    BLI_assert(output_state.value != nullptr);
    const OutputSocket &output_socket = node_.output(index);
    executor_.forward_value_to_linked_inputs(
        output_socket, {output_socket.type(), output_state.value}, current_task_);
    output_state.value = nullptr;
    output_state.has_been_computed = true;
  }

  bool output_was_set_impl(const int index) const override
  {
    const OutputState &output_state = node_state_.outputs[index];
    return output_state.has_been_computed;
  }

  ValueUsage get_output_usage_impl(const int index) const override
  {
    const OutputState &output_state = node_state_.outputs[index];
    return output_state.usage_for_execution;
  }

  void set_input_unused_impl(const int index) override
  {
    executor_.set_input_unused_during_execution(node_, node_state_, index, current_task_);
  }

  bool try_enable_multi_threading_impl() override
  {
    return executor_.try_enable_multi_threading();
  }
};

/**
 * Actually execute the node.
 *
 * Making this `inline` results in a simpler back-trace in release builds.
 */
inline void Executor::execute_node(const FunctionNode &node,
                                   NodeState &node_state,
                                   CurrentTask &current_task)
{
  const LazyFunction &fn = node.function();
  GraphExecutorLFParams node_params{fn, *this, node, node_state, current_task};
  BLI_assert(context_ != nullptr);
  Context fn_context = *context_;
  fn_context.storage = node_state.storage;

  if (self_.logger_ != nullptr) {
    self_.logger_->log_before_node_execute(node, node_params, fn_context);
  }

  /* This is run when the execution of the node calls `lazy_threading::send_hint` to indicate that
   * the execution will take a while. In this case, other tasks waiting on this thread should be
   * allowed to be picked up by another thread. */
  auto blocking_hint_fn = [&]() {
    if (!current_task.has_scheduled_nodes.load()) {
      return;
    }
    if (!this->try_enable_multi_threading()) {
      return;
    }
    this->move_scheduled_nodes_to_task_pool(current_task);
  };

  lazy_threading::HintReceiver blocking_hint_receiver{blocking_hint_fn};
  fn.execute(node_params, fn_context);

  if (self_.logger_ != nullptr) {
    self_.logger_->log_after_node_execute(node, node_params, fn_context);
  }
}

GraphExecutor::GraphExecutor(const Graph &graph,
                             const Span<const OutputSocket *> graph_inputs,
                             const Span<const InputSocket *> graph_outputs,
                             const Logger *logger,
                             const SideEffectProvider *side_effect_provider)
    : graph_(graph),
      graph_inputs_(graph_inputs),
      graph_outputs_(graph_outputs),
      logger_(logger),
      side_effect_provider_(side_effect_provider)
{
  for (const OutputSocket *socket : graph_inputs_) {
    BLI_assert(socket->node().is_dummy());
    inputs_.append({"In", socket->type(), ValueUsage::Maybe});
  }
  for (const InputSocket *socket : graph_outputs_) {
    BLI_assert(socket->node().is_dummy());
    outputs_.append({"Out", socket->type()});
  }
}

void GraphExecutor::execute_impl(Params &params, const Context &context) const
{
  Executor &executor = *static_cast<Executor *>(context.storage);
  executor.execute(params, context);
}

void *GraphExecutor::init_storage(LinearAllocator<> &allocator) const
{
  Executor &executor = *allocator.construct<Executor>(*this).release();
  return &executor;
}

void GraphExecutor::destruct_storage(void *storage) const
{
  std::destroy_at(static_cast<Executor *>(storage));
}
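
/*
 * Illustrative sketch (not part of this file; the surrounding variables such as `graph`, `inputs`,
 * `outputs`, `logger`, `side_effect_provider` and `params` are assumed to exist in the caller):
 * how the storage lifecycle above is typically driven through the #LazyFunction interface.
 *
 *   GraphExecutor executor_fn{graph, inputs, outputs, &logger, &side_effect_provider};
 *   LinearAllocator<> allocator;
 *   void *storage = executor_fn.init_storage(allocator);  // constructs the internal #Executor
 *   Context context;
 *   context.storage = storage;
 *   executor_fn.execute(params, context);  // may be called again once more inputs are available
 *   executor_fn.destruct_storage(storage);
 */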

void GraphExecutorLogger::log_socket_value(const Socket &socket,
                                           const GPointer value,
                                           const Context &context) const
{
  UNUSED_VARS(socket, value, context);
}

void GraphExecutorLogger::log_before_node_execute(const FunctionNode &node,
                                                  const Params &params,
                                                  const Context &context) const
{
  UNUSED_VARS(node, params, context);
}

void GraphExecutorLogger::log_after_node_execute(const FunctionNode &node,
                                                 const Params &params,
                                                 const Context &context) const
{
  UNUSED_VARS(node, params, context);
}

Vector<const FunctionNode *> GraphExecutorSideEffectProvider::get_nodes_with_side_effects(
    const Context &context) const
{
  UNUSED_VARS(context);
  return {};
}

void GraphExecutorLogger::dump_when_outputs_are_missing(const FunctionNode &node,
                                                        Span<const OutputSocket *> missing_sockets,
                                                        const Context &context) const
{
  UNUSED_VARS(node, missing_sockets, context);
}

void GraphExecutorLogger::dump_when_input_is_set_twice(const InputSocket &target_socket,
                                                       const OutputSocket &from_socket,
                                                       const Context &context) const
{
  UNUSED_VARS(target_socket, from_socket, context);
}

}  // namespace blender::fn::lazy_function