Diffstat (limited to 'source/blender/functions/intern')
5 files changed, 1478 insertions, 0 deletions
diff --git a/source/blender/functions/intern/cpp_types.cc b/source/blender/functions/intern/cpp_types.cc
index 5c43fffdd61..f046da30994 100644
--- a/source/blender/functions/intern/cpp_types.cc
+++ b/source/blender/functions/intern/cpp_types.cc
@@ -16,3 +16,6 @@ MAKE_FIELD_CPP_TYPE(BoolField, bool);
 MAKE_FIELD_CPP_TYPE(Int8Field, int8_t);
 MAKE_FIELD_CPP_TYPE(Int32Field, int32_t);
 MAKE_FIELD_CPP_TYPE(StringField, std::string);
+BLI_CPP_TYPE_MAKE(StringValueOrFieldVector,
+                  blender::Vector<blender::fn::ValueOrField<std::string>>,
+                  CPPTypeFlags::None);
diff --git a/source/blender/functions/intern/lazy_function.cc b/source/blender/functions/intern/lazy_function.cc
new file mode 100644
index 00000000000..46572283e9b
--- /dev/null
+++ b/source/blender/functions/intern/lazy_function.cc
@@ -0,0 +1,66 @@
+/* SPDX-License-Identifier: GPL-2.0-or-later */
+
+/** \file
+ * \ingroup fn
+ */
+
+#include "BLI_array.hh"
+
+#include "FN_lazy_function.hh"
+
+namespace blender::fn::lazy_function {
+
+std::string LazyFunction::name() const
+{
+  return debug_name_;
+}
+
+std::string LazyFunction::input_name(int index) const
+{
+  return inputs_[index].debug_name;
+}
+
+std::string LazyFunction::output_name(int index) const
+{
+  return outputs_[index].debug_name;
+}
+
+void *LazyFunction::init_storage(LinearAllocator<> &UNUSED(allocator)) const
+{
+  return nullptr;
+}
+
+void LazyFunction::destruct_storage(void *storage) const
+{
+  BLI_assert(storage == nullptr);
+  UNUSED_VARS_NDEBUG(storage);
+}
+
+bool LazyFunction::always_used_inputs_available(const Params &params) const
+{
+  for (const int i : inputs_.index_range()) {
+    const Input &fn_input = inputs_[i];
+    if (fn_input.usage == ValueUsage::Used) {
+      if (params.try_get_input_data_ptr(i) == nullptr) {
+        return false;
+      }
+    }
+  }
+  return true;
+}
+
+void Params::set_default_remaining_outputs()
+{
+  for (const int i : fn_.outputs().index_range()) {
+    if (this->output_was_set(i)) {
+      continue;
+    }
+    const Output &fn_output = fn_.outputs()[i];
+    const CPPType &type = *fn_output.type;
+    void *data_ptr = this->get_output_data_ptr(i);
+    type.value_initialize(data_ptr);
+    this->output_set(i);
+  }
+}
+
+} // namespace blender::fn::lazy_function
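The defaults above complement the interface declared in FN_lazy_function.hh, which is not part of this diff. As a rough sketch of how a concrete function might plug into it, here is a hypothetical LazyFunction subclass; the protected members (debug_name_, inputs_, outputs_), execute_impl() and the typed Params helpers are assumptions based on that header:

    class AddFunction : public LazyFunction {
     public:
      AddFunction()
      {
        debug_name_ = "Add";
        /* Inputs default to ValueUsage::Used, so always_used_inputs_available() only
         * returns true once both values have been provided. */
        inputs_.append({"A", CPPType::get<int>()});
        inputs_.append({"B", CPPType::get<int>()});
        outputs_.append({"Result", CPPType::get<int>()});
      }

      void execute_impl(Params &params, const Context &UNUSED(context)) const override
      {
        const int a = params.get_input<int>(0);
        const int b = params.get_input<int>(1);
        params.set_output(0, a + b);
      }
    };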
true; +} + +bool BasicParams::output_was_set_impl(const int index) const +{ + return set_outputs_[index]; +} + +ValueUsage BasicParams::get_output_usage_impl(const int index) const +{ + return output_usages_[index]; +} + +void BasicParams::set_input_unused_impl(const int index) +{ + input_usages_[index] = ValueUsage::Unused; +} + +} // namespace blender::fn::lazy_function diff --git a/source/blender/functions/intern/lazy_function_graph.cc b/source/blender/functions/intern/lazy_function_graph.cc new file mode 100644 index 00000000000..cc55b70d166 --- /dev/null +++ b/source/blender/functions/intern/lazy_function_graph.cc @@ -0,0 +1,181 @@ +/* SPDX-License-Identifier: GPL-2.0-or-later */ + +#include "BLI_dot_export.hh" + +#include "FN_lazy_function_graph.hh" + +namespace blender::fn::lazy_function { + +Graph::~Graph() +{ + for (Node *node : nodes_) { + for (InputSocket *socket : node->inputs_) { + std::destroy_at(socket); + } + for (OutputSocket *socket : node->outputs_) { + std::destroy_at(socket); + } + std::destroy_at(node); + } +} + +FunctionNode &Graph::add_function(const LazyFunction &fn) +{ + const Span<Input> inputs = fn.inputs(); + const Span<Output> outputs = fn.outputs(); + + FunctionNode &node = *allocator_.construct<FunctionNode>().release(); + node.fn_ = &fn; + node.inputs_ = allocator_.construct_elements_and_pointer_array<InputSocket>(inputs.size()); + node.outputs_ = allocator_.construct_elements_and_pointer_array<OutputSocket>(outputs.size()); + + for (const int i : inputs.index_range()) { + InputSocket &socket = *node.inputs_[i]; + socket.index_in_node_ = i; + socket.is_input_ = true; + socket.node_ = &node; + socket.type_ = inputs[i].type; + } + for (const int i : outputs.index_range()) { + OutputSocket &socket = *node.outputs_[i]; + socket.index_in_node_ = i; + socket.is_input_ = false; + socket.node_ = &node; + socket.type_ = outputs[i].type; + } + + nodes_.append(&node); + return node; +} + +DummyNode &Graph::add_dummy(Span<const CPPType *> input_types, Span<const CPPType *> output_types) +{ + DummyNode &node = *allocator_.construct<DummyNode>().release(); + node.fn_ = nullptr; + node.inputs_ = allocator_.construct_elements_and_pointer_array<InputSocket>(input_types.size()); + node.outputs_ = allocator_.construct_elements_and_pointer_array<OutputSocket>( + output_types.size()); + + for (const int i : input_types.index_range()) { + InputSocket &socket = *node.inputs_[i]; + socket.index_in_node_ = i; + socket.is_input_ = true; + socket.node_ = &node; + socket.type_ = input_types[i]; + } + for (const int i : output_types.index_range()) { + OutputSocket &socket = *node.outputs_[i]; + socket.index_in_node_ = i; + socket.is_input_ = false; + socket.node_ = &node; + socket.type_ = output_types[i]; + } + + nodes_.append(&node); + return node; +} + +void Graph::add_link(OutputSocket &from, InputSocket &to) +{ + BLI_assert(to.origin_ == nullptr); + BLI_assert(from.type_ == to.type_); + to.origin_ = &from; + from.targets_.append(&to); +} + +void Graph::update_node_indices() +{ + for (const int i : nodes_.index_range()) { + nodes_[i]->index_in_graph_ = i; + } +} + +bool Graph::node_indices_are_valid() const +{ + for (const int i : nodes_.index_range()) { + if (nodes_[i]->index_in_graph_ != i) { + return false; + } + } + return true; +} + +std::string Socket::name() const +{ + if (node_->is_function()) { + const FunctionNode &fn_node = static_cast<const FunctionNode &>(*node_); + const LazyFunction &fn = fn_node.function(); + if (is_input_) { + return fn.input_name(index_in_node_); + 
diff --git a/source/blender/functions/intern/lazy_function_graph.cc b/source/blender/functions/intern/lazy_function_graph.cc
new file mode 100644
index 00000000000..cc55b70d166
--- /dev/null
+++ b/source/blender/functions/intern/lazy_function_graph.cc
@@ -0,0 +1,181 @@
+/* SPDX-License-Identifier: GPL-2.0-or-later */
+
+#include "BLI_dot_export.hh"
+
+#include "FN_lazy_function_graph.hh"
+
+namespace blender::fn::lazy_function {
+
+Graph::~Graph()
+{
+  for (Node *node : nodes_) {
+    for (InputSocket *socket : node->inputs_) {
+      std::destroy_at(socket);
+    }
+    for (OutputSocket *socket : node->outputs_) {
+      std::destroy_at(socket);
+    }
+    std::destroy_at(node);
+  }
+}
+
+FunctionNode &Graph::add_function(const LazyFunction &fn)
+{
+  const Span<Input> inputs = fn.inputs();
+  const Span<Output> outputs = fn.outputs();
+
+  FunctionNode &node = *allocator_.construct<FunctionNode>().release();
+  node.fn_ = &fn;
+  node.inputs_ = allocator_.construct_elements_and_pointer_array<InputSocket>(inputs.size());
+  node.outputs_ = allocator_.construct_elements_and_pointer_array<OutputSocket>(outputs.size());
+
+  for (const int i : inputs.index_range()) {
+    InputSocket &socket = *node.inputs_[i];
+    socket.index_in_node_ = i;
+    socket.is_input_ = true;
+    socket.node_ = &node;
+    socket.type_ = inputs[i].type;
+  }
+  for (const int i : outputs.index_range()) {
+    OutputSocket &socket = *node.outputs_[i];
+    socket.index_in_node_ = i;
+    socket.is_input_ = false;
+    socket.node_ = &node;
+    socket.type_ = outputs[i].type;
+  }
+
+  nodes_.append(&node);
+  return node;
+}
+
+DummyNode &Graph::add_dummy(Span<const CPPType *> input_types, Span<const CPPType *> output_types)
+{
+  DummyNode &node = *allocator_.construct<DummyNode>().release();
+  node.fn_ = nullptr;
+  node.inputs_ = allocator_.construct_elements_and_pointer_array<InputSocket>(input_types.size());
+  node.outputs_ = allocator_.construct_elements_and_pointer_array<OutputSocket>(
+      output_types.size());
+
+  for (const int i : input_types.index_range()) {
+    InputSocket &socket = *node.inputs_[i];
+    socket.index_in_node_ = i;
+    socket.is_input_ = true;
+    socket.node_ = &node;
+    socket.type_ = input_types[i];
+  }
+  for (const int i : output_types.index_range()) {
+    OutputSocket &socket = *node.outputs_[i];
+    socket.index_in_node_ = i;
+    socket.is_input_ = false;
+    socket.node_ = &node;
+    socket.type_ = output_types[i];
+  }
+
+  nodes_.append(&node);
+  return node;
+}
+
+void Graph::add_link(OutputSocket &from, InputSocket &to)
+{
+  BLI_assert(to.origin_ == nullptr);
+  BLI_assert(from.type_ == to.type_);
+  to.origin_ = &from;
+  from.targets_.append(&to);
+}
+
+void Graph::update_node_indices()
+{
+  for (const int i : nodes_.index_range()) {
+    nodes_[i]->index_in_graph_ = i;
+  }
+}
+
+bool Graph::node_indices_are_valid() const
+{
+  for (const int i : nodes_.index_range()) {
+    if (nodes_[i]->index_in_graph_ != i) {
+      return false;
+    }
+  }
+  return true;
+}
+
+std::string Socket::name() const
+{
+  if (node_->is_function()) {
+    const FunctionNode &fn_node = static_cast<const FunctionNode &>(*node_);
+    const LazyFunction &fn = fn_node.function();
+    if (is_input_) {
+      return fn.input_name(index_in_node_);
+    }
+    return fn.output_name(index_in_node_);
+  }
+  return "Unnamed";
+}
+
+std::string Node::name() const
+{
+  if (fn_ == nullptr) {
+    return static_cast<const DummyNode *>(this)->name_;
+  }
+  return fn_->name();
+}
+
+std::string Graph::to_dot() const
+{
+  dot::DirectedGraph digraph;
+  digraph.set_rankdir(dot::Attr_rankdir::LeftToRight);
+
+  Map<const Node *, dot::NodeWithSocketsRef> dot_nodes;
+
+  for (const Node *node : nodes_) {
+    dot::Node &dot_node = digraph.new_node("");
+    if (node->is_dummy()) {
+      dot_node.set_background_color("lightblue");
+    }
+    else {
+      dot_node.set_background_color("white");
+    }
+
+    Vector<std::string> input_names;
+    Vector<std::string> output_names;
+    for (const InputSocket *socket : node->inputs()) {
+      input_names.append(socket->name());
+    }
+    for (const OutputSocket *socket : node->outputs()) {
+      output_names.append(socket->name());
+    }
+
+    dot_nodes.add_new(node,
+                      dot::NodeWithSocketsRef(dot_node, node->name(), input_names, output_names));
+  }
+
+  for (const Node *node : nodes_) {
+    for (const InputSocket *socket : node->inputs()) {
+      const dot::NodeWithSocketsRef &to_dot_node = dot_nodes.lookup(&socket->node());
+      const dot::NodePort to_dot_port = to_dot_node.input(socket->index());
+
+      if (const OutputSocket *origin = socket->origin()) {
+        dot::NodeWithSocketsRef &from_dot_node = dot_nodes.lookup(&origin->node());
+        digraph.new_edge(from_dot_node.output(origin->index()), to_dot_port);
+      }
+      else if (const void *default_value = socket->default_value()) {
+        const CPPType &type = socket->type();
+        std::string value_string;
+        if (type.is_printable()) {
+          value_string = type.to_string(default_value);
+        }
+        else {
+          value_string = "<" + type.name() + ">";
+        }
+        dot::Node &default_value_dot_node = digraph.new_node(value_string);
+        default_value_dot_node.set_shape(dot::Attr_shape::Ellipse);
+        digraph.new_edge(default_value_dot_node, to_dot_port);
+      }
+    }
+  }
+
+  return digraph.to_dot_string();
+}
+
+} // namespace blender::fn::lazy_function
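Tying the graph API together: dummy nodes model the boundary of the graph (graph inputs are outputs of a dummy node and vice versa), function nodes wrap LazyFunction instances, and add_link() connects the sockets. A sketch, again using the hypothetical AddFunction and assuming the socket accessors declared in FN_lazy_function_graph.hh:

    Graph graph;
    const CPPType &int_type = CPPType::get<int>();

    DummyNode &input_node = graph.add_dummy({}, {&int_type, &int_type});
    DummyNode &output_node = graph.add_dummy({&int_type}, {});

    static AddFunction add_fn; /* Must outlive the graph; only a pointer is stored. */
    FunctionNode &add_node = graph.add_function(add_fn);

    graph.add_link(input_node.output(0), add_node.input(0));
    graph.add_link(input_node.output(1), add_node.input(1));
    graph.add_link(add_node.output(0), output_node.input(0));

    /* Executors rely on valid indices (see node_indices_are_valid() above). */
    graph.update_node_indices();

    const std::string dot = graph.to_dot(); /* Paste into Graphviz to inspect. */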
diff --git a/source/blender/functions/intern/lazy_function_graph_executor.cc b/source/blender/functions/intern/lazy_function_graph_executor.cc
new file mode 100644
index 00000000000..176509bd687
--- /dev/null
+++ b/source/blender/functions/intern/lazy_function_graph_executor.cc
@@ -0,0 +1,1163 @@
+/* SPDX-License-Identifier: GPL-2.0-or-later */
+
+/**
+ * This file implements the evaluation of a lazy-function graph. Its main objectives are:
+ * - Only compute values that are actually used.
+ * - Allow spreading the work over an arbitrary number of CPU cores.
+ *
+ * Other (simpler) executors with different main objectives could be implemented in the future.
+ * For some scenarios those could be simpler, e.g. when many nodes do very little work or when
+ * most nodes have to be processed sequentially. Those assumptions make the first and second
+ * objective less important respectively.
+ *
+ * The design implemented in this executor requires *no* main thread that coordinates everything.
+ * Instead, one thread will trigger some initial work and then many threads coordinate themselves
+ * in a distributed fashion. In an ideal situation, every thread ends up processing a separate
+ * part of the graph, which results in less communication overhead. The way TBB schedules tasks
+ * helps with that: a thread will next process the task that it added to a task pool just before.
+ *
+ * Communication between threads is synchronized by using a mutex in every node. When a thread
+ * wants to access the state of a node, its mutex has to be locked first (with some documented
+ * exceptions). The assumption here is that most nodes are only ever touched by a single thread
+ * and therefore the lock contention is reduced the more nodes there are.
+ *
+ * Similar to how a #LazyFunction can be thought of as a state machine (see
+ * `FN_lazy_function.hh`), each node can also be thought of as a state machine. The state of a
+ * node contains the evaluation state of its inputs and outputs. Every time a node is executed,
+ * it has to advance its state in some way (e.g. it requests a new input or computes a new
+ * output).
+ *
+ * At the core of the executor is a task pool. Every task in that pool represents a node
+ * execution. When a node is executed it may send notifications to other nodes which may in turn
+ * add those nodes to the task pool. For example, when the current node has computed one of its
+ * outputs, the computed value is forwarded to all linked inputs, changing their node states in
+ * the process. If such an input was the last missing required input, the node will be added to
+ * the task pool so that it is executed next.
+ *
+ * When the task pool is empty, the executor gives back control to the caller, who may later
+ * provide new inputs to the graph, which in turn adds new nodes to the task pool and the process
+ * starts again.
+ */
+
+#include <mutex>
+
+#include "BLI_compute_context.hh"
+#include "BLI_enumerable_thread_specific.hh"
+#include "BLI_function_ref.hh"
+#include "BLI_task.h"
+#include "BLI_task.hh"
+#include "BLI_timeit.hh"
+
+#include "FN_lazy_function_graph_executor.hh"
+
+namespace blender::fn::lazy_function {
+
+enum class NodeScheduleState {
+  /**
+   * Default state of every node.
+   */
+  NotScheduled,
+  /**
+   * The node has been added to the task pool or is otherwise scheduled to be executed in the
+   * future.
+   */
+  Scheduled,
+  /**
+   * The node is currently running.
+   */
+  Running,
+  /**
+   * The node is running and has been rescheduled while running. In this case the node runs
+   * again. This state exists because we don't want to add the node to the task pool twice;
+   * otherwise the node might run twice at the same time, which is not allowed. Instead, once
+   * the node is done running, it will reschedule itself.
+   */
+  RunningAndRescheduled,
+};
+
+struct InputState {
+  /**
+   * Value of this input socket. By default, the value is empty. When other nodes are done
+   * computing their outputs, the computed values will be forwarded to linked input sockets.
+   * The value will then live here until it is found that it is not needed anymore.
+   *
+   * If #was_ready_for_execution is true, access does not require holding the node lock.
+   */
+  void *value = nullptr;
+  /**
+   * How the node intends to use this input. By default, all inputs may be used. Based on which
+   * outputs are used, a node can decide that an input will definitely be used or is never used.
+   * This allows freeing values early and avoids unnecessary computations.
+   */
+  ValueUsage usage = ValueUsage::Maybe;
+  /**
+   * Set to true once #value is set and will stay true afterwards. Access during execution of a
+   * node does not require holding the node lock.
+   */
+  bool was_ready_for_execution = false;
+};
+
+struct OutputState {
+  /**
+   * Keeps track of how the output value is used. If a connected input becomes used, this output
+   * has to become used as well. The output becomes unused when it is used by no input socket
+   * anymore and it's not an output of the graph.
+   */
+  ValueUsage usage = ValueUsage::Maybe;
+  /**
+   * This is a copy of #usage that is made right before node execution starts. This is done so
+   * that the node gets a consistent view of which outputs are used, even when this changes
+   * while the node is running (the node might be reevaluated in that case). Access during
+   * execution of a node does not require holding the node lock.
+   */
+  ValueUsage usage_for_execution = ValueUsage::Maybe;
+  /**
+   * Number of linked sockets that might still use the value of this output.
+   */
+  int potential_target_sockets = 0;
+  /**
+   * Is set to true once the output has been computed and then stays true. Access does not
+   * require holding the node lock.
+   */
+  bool has_been_computed = false;
+  /**
+   * Holds the output value for a short period of time while the node is initializing it and
+   * before it's forwarded to input sockets. Access does not require holding the node lock.
+   */
+  void *value = nullptr;
+};
+
+struct NodeState {
+  /**
+   * Needs to be locked when any data in this state is accessed that is not explicitly marked
+   * as not needing the lock.
+   */
+  mutable std::mutex mutex;
+  /**
+   * States of the individual input and output sockets. One can index into these arrays without
+   * locking. However, to access data inside, a lock is needed unless noted otherwise.
+   */
+  MutableSpan<InputState> inputs;
+  MutableSpan<OutputState> outputs;
+  /**
+   * Counts the number of inputs that still have to be provided to this node until it should
+   * run again. This is used as an optimization so that nodes are not scheduled unnecessarily
+   * in many cases.
+   */
+  int missing_required_inputs = 0;
+  /**
+   * Is set to true once the node is done with its work, i.e. when all outputs that may be used
+   * have been computed.
+   */
+  bool node_has_finished = false;
+  /**
+   * True until the node runs for the first time; the first run initializes storage and
+   * unlinked inputs.
+   */
+  bool had_initialization = true;
+  /**
+   * Nodes with side effects should always be executed when their required inputs have been
+   * computed.
+   */
+  bool has_side_effects = false;
+  /**
+   * A node is always in one specific schedule state. This helps to ensure that the same node
+   * does not run twice at the same time accidentally.
+   */
+  NodeScheduleState schedule_state = NodeScheduleState::NotScheduled;
+  /**
+   * Custom storage of the node.
+   */
+  void *storage = nullptr;
+};
+
+/**
+ * Utility class that wraps a node whose state is locked. Having this as a separate class is
+ * useful because it allows methods to communicate that they expect the node to be locked.
+ */
+struct LockedNode {
+  /**
+   * This is the node that is currently locked.
+   */
+  const Node &node;
+  NodeState &node_state;
+
+  /**
+   * Used to delay notifying (and therefore locking) other nodes until the current node is not
+   * locked anymore. This might not be strictly necessary to avoid deadlocks in the current
+   * code, but it is a good measure to avoid accidentally adding a deadlock later on. By not
+   * locking more than one node per thread at a time, deadlocks are avoided.
+   *
+   * The notifications will be sent right after the node is not locked anymore.
+   */
+  Vector<const OutputSocket *> delayed_required_outputs;
+  Vector<const OutputSocket *> delayed_unused_outputs;
+  Vector<const FunctionNode *> delayed_scheduled_nodes;
+
+  LockedNode(const Node &node, NodeState &node_state) : node(node), node_state(node_state)
+  {
+  }
+};
+
+struct CurrentTask {
+  /**
+   * The node that should be run on the same thread after the current node is done. This avoids
+   * some overhead by skipping a round trip through the task pool.
+   */
+  std::atomic<const FunctionNode *> next_node = nullptr;
+  /**
+   * Indicates that some node has been added to the task pool.
+   */
+  std::atomic<bool> added_node_to_pool = false;
+};
+
+class GraphExecutorLFParams;
+
+class Executor {
+ private:
+  const GraphExecutor &self_;
+  /**
+   * Remembers which inputs have been loaded from the caller already, to avoid loading them
+   * twice. Atomics are used to make sure that every input is only retrieved once.
+   */
+  Array<std::atomic<uint8_t>> loaded_inputs_;
+  /**
+   * State of every node, indexed by #Node::index_in_graph.
+   */
+  Array<NodeState *> node_states_;
+  /**
+   * Parameters provided by the caller. This is always non-null while a node is running.
+   */
+  Params *params_ = nullptr;
+  const Context *context_ = nullptr;
+  /**
+   * Used to distribute work on separate nodes to separate threads.
+   */
+  TaskPool *task_pool_ = nullptr;
+  /**
+   * A separate linear allocator for every thread. We could potentially reuse some memory, but
+   * that doesn't seem worth it yet.
+   */
+  threading::EnumerableThreadSpecific<LinearAllocator<>> local_allocators_;
+  /**
+   * Set to false when the first execution ends.
+   */
+  bool is_first_execution_ = true;
+
+  friend GraphExecutorLFParams;
+
+ public:
+  Executor(const GraphExecutor &self) : self_(self), loaded_inputs_(self.graph_inputs_.size())
+  {
+    /* The indices are necessary because they are used as keys in #node_states_. */
+    BLI_assert(self_.graph_.node_indices_are_valid());
+  }
+
+  ~Executor()
+  {
+    BLI_task_pool_free(task_pool_);
+    threading::parallel_for(node_states_.index_range(), 1024, [&](const IndexRange range) {
+      for (const int node_index : range) {
+        const Node &node = *self_.graph_.nodes()[node_index];
+        NodeState &node_state = *node_states_[node_index];
+        this->destruct_node_state(node, node_state);
+      }
+    });
+  }
+
+  /**
+   * Main entry point to the execution of this graph.
+   */
+  void execute(Params &params, const Context &context)
+  {
+    params_ = &params;
+    context_ = &context;
+    BLI_SCOPED_DEFER([&]() {
+      /* Make sure the #params_ pointer is not dangling, even when it shouldn't be accessed by
+       * anyone. */
+      params_ = nullptr;
+      context_ = nullptr;
+      is_first_execution_ = false;
+    });
+
+    CurrentTask current_task;
+    if (is_first_execution_) {
+      this->initialize_node_states();
+      task_pool_ = BLI_task_pool_create(this, TASK_PRIORITY_HIGH);
+
+      /* Initialize atomics to zero. */
+      memset(static_cast<void *>(loaded_inputs_.data()), 0, loaded_inputs_.size() * sizeof(bool));
+
+      this->set_always_unused_graph_inputs();
+      this->set_defaulted_graph_outputs();
+      this->schedule_side_effect_nodes(current_task);
+    }
+
+    this->schedule_newly_requested_outputs(current_task);
+    this->forward_newly_provided_inputs(current_task);
+
+    /* Avoid using the task pool when there is no parallel work to do. */
+    while (!current_task.added_node_to_pool) {
+      if (current_task.next_node == nullptr) {
+        /* Nothing to do. */
+        return;
+      }
+      const FunctionNode &node = *current_task.next_node;
+      current_task.next_node = nullptr;
+      this->run_node_task(node, current_task);
+    }
+    if (current_task.next_node != nullptr) {
+      this->add_node_to_task_pool(*current_task.next_node);
+    }
+
+    BLI_task_pool_work_and_wait(task_pool_);
+  }
+
+ private:
+  void initialize_node_states()
+  {
+    Span<const Node *> nodes = self_.graph_.nodes();
+    node_states_.reinitialize(nodes.size());
+
+    /* Construct all node states in parallel. */
+    threading::parallel_for(nodes.index_range(), 256, [&](const IndexRange range) {
+      LinearAllocator<> &allocator = local_allocators_.local();
+      for (const int i : range) {
+        const Node &node = *nodes[i];
+        NodeState &node_state = *allocator.construct<NodeState>().release();
+        node_states_[i] = &node_state;
+        this->construct_initial_node_state(allocator, node, node_state);
+      }
+    });
+  }
+
+  void construct_initial_node_state(LinearAllocator<> &allocator,
+                                    const Node &node,
+                                    NodeState &node_state)
+  {
+    const Span<const InputSocket *> node_inputs = node.inputs();
+    const Span<const OutputSocket *> node_outputs = node.outputs();
+
+    node_state.inputs = allocator.construct_array<InputState>(node_inputs.size());
+    node_state.outputs = allocator.construct_array<OutputState>(node_outputs.size());
+
+    for (const int i : node_outputs.index_range()) {
+      OutputState &output_state = node_state.outputs[i];
+      const OutputSocket &output_socket = *node_outputs[i];
+      output_state.potential_target_sockets = output_socket.targets().size();
+      if (output_state.potential_target_sockets == 0) {
+        output_state.usage = ValueUsage::Unused;
+      }
+    }
+  }
+
+  void destruct_node_state(const Node &node, NodeState &node_state)
+  {
+    if (node.is_function()) {
+      const LazyFunction &fn = static_cast<const FunctionNode &>(node).function();
+      if (node_state.storage != nullptr) {
+        fn.destruct_storage(node_state.storage);
+      }
+    }
+    for (const int i : node.inputs().index_range()) {
+      InputState &input_state = node_state.inputs[i];
+      const InputSocket &input_socket = node.input(i);
+      this->destruct_input_value_if_exists(input_state, input_socket.type());
+    }
+    std::destroy_at(&node_state);
+  }
+
+  void schedule_newly_requested_outputs(CurrentTask &current_task)
+  {
+    for (const int graph_output_index : self_.graph_outputs_.index_range()) {
+      if (params_->get_output_usage(graph_output_index) != ValueUsage::Used) {
+        continue;
+      }
+      if (params_->output_was_set(graph_output_index)) {
+        continue;
+      }
+      const InputSocket &socket = *self_.graph_outputs_[graph_output_index];
+      const Node &node = socket.node();
+      NodeState &node_state = *node_states_[node.index_in_graph()];
+      this->with_locked_node(node, node_state, current_task, [&](LockedNode &locked_node) {
+        this->set_input_required(locked_node, socket);
+      });
+    }
+  }
+
+  void set_defaulted_graph_outputs()
+  {
+    for (const int graph_output_index : self_.graph_outputs_.index_range()) {
+      const InputSocket &socket = *self_.graph_outputs_[graph_output_index];
+      if (socket.origin() != nullptr) {
+        continue;
+      }
+      const CPPType &type = socket.type();
+      const void *default_value = socket.default_value();
+      BLI_assert(default_value != nullptr);
+
+      if (self_.logger_ != nullptr) {
+        self_.logger_->log_socket_value(socket, {type, default_value}, *context_);
+      }
+
+      void *output_ptr = params_->get_output_data_ptr(graph_output_index);
+      type.copy_construct(default_value, output_ptr);
+      params_->output_set(graph_output_index);
+    }
+  }
+
+  void set_always_unused_graph_inputs()
+  {
+    for (const int i : self_.graph_inputs_.index_range()) {
+      const OutputSocket &socket = *self_.graph_inputs_[i];
+      const Node &node = socket.node();
+      const NodeState &node_state = *node_states_[node.index_in_graph()];
+      const OutputState &output_state = node_state.outputs[socket.index()];
+      if (output_state.usage == ValueUsage::Unused) {
+        params_->set_input_unused(i);
+      }
+    }
+  }
+
+  void schedule_side_effect_nodes(CurrentTask &current_task)
+  {
+    if (self_.side_effect_provider_ != nullptr) {
+      const Vector<const FunctionNode *> side_effect_nodes =
+          self_.side_effect_provider_->get_nodes_with_side_effects(*context_);
+      for (const FunctionNode *node : side_effect_nodes) {
+        NodeState &node_state = *node_states_[node->index_in_graph()];
+        node_state.has_side_effects = true;
+        this->with_locked_node(*node, node_state, current_task, [&](LockedNode &locked_node) {
+          this->schedule_node(locked_node);
+        });
+      }
+    }
+  }
+
+  void forward_newly_provided_inputs(CurrentTask &current_task)
+  {
+    LinearAllocator<> &allocator = local_allocators_.local();
+    for (const int graph_input_index : self_.graph_inputs_.index_range()) {
+      std::atomic<uint8_t> &was_loaded = loaded_inputs_[graph_input_index];
+      if (was_loaded.load()) {
+        continue;
+      }
+      void *input_data = params_->try_get_input_data_ptr(graph_input_index);
+      if (input_data == nullptr) {
+        continue;
+      }
+      if (was_loaded.fetch_or(1)) {
+        /* The value was forwarded before. */
+        continue;
+      }
+      this->forward_newly_provided_input(current_task, allocator, graph_input_index, input_data);
+    }
+  }
+
+  void forward_newly_provided_input(CurrentTask &current_task,
+                                    LinearAllocator<> &allocator,
+                                    const int graph_input_index,
+                                    void *input_data)
+  {
+    const OutputSocket &socket = *self_.graph_inputs_[graph_input_index];
+    const CPPType &type = socket.type();
+    void *buffer = allocator.allocate(type.size(), type.alignment());
+    type.move_construct(input_data, buffer);
+    this->forward_value_to_linked_inputs(socket, {type, buffer}, current_task);
+  }
+
+  void notify_output_required(const OutputSocket &socket, CurrentTask &current_task)
+  {
+    const Node &node = socket.node();
+    const int index_in_node = socket.index();
+    NodeState &node_state = *node_states_[node.index_in_graph()];
+    OutputState &output_state = node_state.outputs[index_in_node];
+
+    /* The notified output socket might be an input of the entire graph. In this case, notify
+     * the caller that the input is required. */
+    if (node.is_dummy()) {
+      const int graph_input_index = self_.graph_inputs_.index_of(&socket);
+      std::atomic<uint8_t> &was_loaded = loaded_inputs_[graph_input_index];
+      if (was_loaded.load()) {
+        return;
+      }
+      void *input_data = params_->try_get_input_data_ptr_or_request(graph_input_index);
+      if (input_data == nullptr) {
+        return;
+      }
+      if (was_loaded.fetch_or(1)) {
+        /* The value was forwarded already. */
+        return;
+      }
+      this->forward_newly_provided_input(
+          current_task, local_allocators_.local(), graph_input_index, input_data);
+      return;
+    }
+
+    BLI_assert(node.is_function());
+    this->with_locked_node(node, node_state, current_task, [&](LockedNode &locked_node) {
+      if (output_state.usage == ValueUsage::Used) {
+        return;
+      }
+      output_state.usage = ValueUsage::Used;
+      this->schedule_node(locked_node);
+    });
+  }
+
+  void notify_output_unused(const OutputSocket &socket, CurrentTask &current_task)
+  {
+    const Node &node = socket.node();
+    const int index_in_node = socket.index();
+    NodeState &node_state = *node_states_[node.index_in_graph()];
+    OutputState &output_state = node_state.outputs[index_in_node];
+
+    this->with_locked_node(node, node_state, current_task, [&](LockedNode &locked_node) {
+      output_state.potential_target_sockets -= 1;
+      if (output_state.potential_target_sockets == 0) {
+        BLI_assert(output_state.usage != ValueUsage::Unused);
+        if (output_state.usage == ValueUsage::Maybe) {
+          output_state.usage = ValueUsage::Unused;
+          if (node.is_dummy()) {
+            const int graph_input_index = self_.graph_inputs_.index_of(&socket);
+            params_->set_input_unused(graph_input_index);
+          }
+          else {
+            this->schedule_node(locked_node);
+          }
+        }
+      }
+    });
+  }
+
+  void schedule_node(LockedNode &locked_node)
+  {
+    BLI_assert(locked_node.node.is_function());
+    switch (locked_node.node_state.schedule_state) {
+      case NodeScheduleState::NotScheduled: {
+        /* Don't add the node to the task pool immediately, because the task pool might start
+         * executing it immediately (when Blender is started with a single thread).
+         * That would often result in a deadlock, because we are still holding the mutex of the
+         * current node. Also see comments in #LockedNode. */
+        locked_node.node_state.schedule_state = NodeScheduleState::Scheduled;
+        locked_node.delayed_scheduled_nodes.append(
+            &static_cast<const FunctionNode &>(locked_node.node));
+        break;
+      }
+      case NodeScheduleState::Scheduled: {
+        break;
+      }
+      case NodeScheduleState::Running: {
+        locked_node.node_state.schedule_state = NodeScheduleState::RunningAndRescheduled;
+        break;
+      }
+      case NodeScheduleState::RunningAndRescheduled: {
+        break;
+      }
+    }
+  }
+
+  void with_locked_node(const Node &node,
+                        NodeState &node_state,
+                        CurrentTask &current_task,
+                        const FunctionRef<void(LockedNode &)> f)
+  {
+    BLI_assert(&node_state == node_states_[node.index_in_graph()]);
+
+    LockedNode locked_node{node, node_state};
+    {
+      std::lock_guard lock{node_state.mutex};
+      threading::isolate_task([&]() { f(locked_node); });
+    }
+
+    this->send_output_required_notifications(locked_node.delayed_required_outputs, current_task);
+    this->send_output_unused_notifications(locked_node.delayed_unused_outputs, current_task);
+    this->schedule_new_nodes(locked_node.delayed_scheduled_nodes, current_task);
+  }
+
+  void send_output_required_notifications(const Span<const OutputSocket *> sockets,
+                                          CurrentTask &current_task)
+  {
+    for (const OutputSocket *socket : sockets) {
+      this->notify_output_required(*socket, current_task);
+    }
+  }
+
+  void send_output_unused_notifications(const Span<const OutputSocket *> sockets,
+                                        CurrentTask &current_task)
+  {
+    for (const OutputSocket *socket : sockets) {
+      this->notify_output_unused(*socket, current_task);
+    }
+  }
+
+  void schedule_new_nodes(const Span<const FunctionNode *> nodes, CurrentTask &current_task)
+  {
+    for (const FunctionNode *node_to_schedule : nodes) {
+      /* Avoid a round trip through the task pool for the first node that is scheduled by the
+       * current node execution. Other nodes are added to the pool so that other threads can
+       * pick them up. */
+      const FunctionNode *expected = nullptr;
+      if (current_task.next_node.compare_exchange_strong(
+              expected, node_to_schedule, std::memory_order_relaxed)) {
+        continue;
+      }
+      this->add_node_to_task_pool(*node_to_schedule);
+      current_task.added_node_to_pool.store(true, std::memory_order_relaxed);
+    }
+  }
+
+  void add_node_to_task_pool(const Node &node)
+  {
+    BLI_task_pool_push(
+        task_pool_, Executor::run_node_from_task_pool, (void *)&node, false, nullptr);
+  }
+
+  static void run_node_from_task_pool(TaskPool *task_pool, void *task_data)
+  {
+    void *user_data = BLI_task_pool_user_data(task_pool);
+    Executor &executor = *static_cast<Executor *>(user_data);
+    const FunctionNode &node = *static_cast<const FunctionNode *>(task_data);
+
+    /* This loop reduces the number of round trips through the task pool as long as the current
+     * node is scheduling more nodes. */
+    CurrentTask current_task;
+    current_task.next_node = &node;
+    while (current_task.next_node != nullptr) {
+      const FunctionNode &node_to_run = *current_task.next_node;
+      current_task.next_node = nullptr;
+      executor.run_node_task(node_to_run, current_task);
+    }
+  }
+
+  void run_node_task(const FunctionNode &node, CurrentTask &current_task)
+  {
+    NodeState &node_state = *node_states_[node.index_in_graph()];
+    LinearAllocator<> &allocator = local_allocators_.local();
+    const LazyFunction &fn = node.function();
+
+    bool node_needs_execution = false;
+    this->with_locked_node(node, node_state, current_task, [&](LockedNode &locked_node) {
+      BLI_assert(node_state.schedule_state == NodeScheduleState::Scheduled);
+      node_state.schedule_state = NodeScheduleState::Running;
+
+      if (node_state.node_has_finished) {
+        return;
+      }
+
+      bool required_uncomputed_output_exists = false;
+      for (OutputState &output_state : node_state.outputs) {
+        output_state.usage_for_execution = output_state.usage;
+        if (output_state.usage == ValueUsage::Used && !output_state.has_been_computed) {
+          required_uncomputed_output_exists = true;
+        }
+      }
+      if (!required_uncomputed_output_exists && !node_state.has_side_effects) {
+        return;
+      }
+
+      if (node_state.had_initialization) {
+        /* Initialize storage. */
+        node_state.storage = fn.init_storage(allocator);
+
+        /* Load unlinked inputs. */
+        for (const int input_index : node.inputs().index_range()) {
+          const InputSocket &input_socket = node.input(input_index);
+          if (input_socket.origin() != nullptr) {
+            continue;
+          }
+          InputState &input_state = node_state.inputs[input_index];
+          const CPPType &type = input_socket.type();
+          const void *default_value = input_socket.default_value();
+          BLI_assert(default_value != nullptr);
+          if (self_.logger_ != nullptr) {
+            self_.logger_->log_socket_value(input_socket, {type, default_value}, *context_);
+          }
+          void *buffer = allocator.allocate(type.size(), type.alignment());
+          type.copy_construct(default_value, buffer);
+          this->forward_value_to_input(locked_node, input_state, {type, buffer});
+        }
+
+        /* Request linked inputs that are always needed. */
+        const Span<Input> fn_inputs = fn.inputs();
+        for (const int input_index : fn_inputs.index_range()) {
+          const Input &fn_input = fn_inputs[input_index];
+          if (fn_input.usage == ValueUsage::Used) {
+            const InputSocket &input_socket = node.input(input_index);
+            this->set_input_required(locked_node, input_socket);
+          }
+        }
+
+        node_state.had_initialization = false;
+      }
+
+      for (const int input_index : node_state.inputs.index_range()) {
+        InputState &input_state = node_state.inputs[input_index];
+        if (input_state.was_ready_for_execution) {
+          continue;
+        }
+        if (input_state.value != nullptr) {
+          input_state.was_ready_for_execution = true;
+          continue;
+        }
+        if (input_state.usage == ValueUsage::Used) {
+          return;
+        }
+      }
+
+      node_needs_execution = true;
+    });
+
+    if (node_needs_execution) {
+      /* Importantly, the node must not be locked when it is executed. That would result in
+       * locks being held very long in some cases and in multiple locks being held by the same
+       * thread in the same graph, which can lead to deadlocks. */
+      this->execute_node(node, node_state, current_task);
+    }
+
+    this->with_locked_node(node, node_state, current_task, [&](LockedNode &locked_node) {
+#ifdef DEBUG
+      if (node_needs_execution) {
+        this->assert_expected_outputs_have_been_computed(locked_node);
+      }
+#endif
+      this->finish_node_if_possible(locked_node);
+      const bool reschedule_requested = node_state.schedule_state ==
+                                        NodeScheduleState::RunningAndRescheduled;
+      node_state.schedule_state = NodeScheduleState::NotScheduled;
+      if (reschedule_requested && !node_state.node_has_finished) {
+        this->schedule_node(locked_node);
+      }
+    });
+  }
+
+  void assert_expected_outputs_have_been_computed(LockedNode &locked_node)
+  {
+    const FunctionNode &node = static_cast<const FunctionNode &>(locked_node.node);
+    const NodeState &node_state = locked_node.node_state;
+
+    if (node_state.missing_required_inputs > 0) {
+      return;
+    }
+    if (node_state.schedule_state == NodeScheduleState::RunningAndRescheduled) {
+      return;
+    }
+    Vector<const OutputSocket *> missing_outputs;
+    for (const int i : node_state.outputs.index_range()) {
+      const OutputState &output_state = node_state.outputs[i];
+      if (output_state.usage_for_execution == ValueUsage::Used) {
+        if (!output_state.has_been_computed) {
+          missing_outputs.append(&node.output(i));
+        }
+      }
+    }
+    if (!missing_outputs.is_empty()) {
+      if (self_.logger_ != nullptr) {
+        self_.logger_->dump_when_outputs_are_missing(node, missing_outputs, *context_);
+      }
+      BLI_assert_unreachable();
+    }
+  }
+
+  void finish_node_if_possible(LockedNode &locked_node)
+  {
+    const Node &node = locked_node.node;
+    NodeState &node_state = locked_node.node_state;
+
+    if (node_state.node_has_finished) {
+      /* Was finished already. */
+      return;
+    }
+    /* If there are outputs that may still be used, the node is not done yet. */
+    for (const OutputState &output_state : node_state.outputs) {
+      if (output_state.usage != ValueUsage::Unused && !output_state.has_been_computed) {
+        return;
+      }
+    }
+    /* If the node is still waiting for inputs, it is not done yet. */
+    for (const InputState &input_state : node_state.inputs) {
+      if (input_state.usage == ValueUsage::Used && !input_state.was_ready_for_execution) {
+        return;
+      }
+    }
+
+    node_state.node_has_finished = true;
+
+    for (const int input_index : node_state.inputs.index_range()) {
+      const InputSocket &input_socket = node.input(input_index);
+      InputState &input_state = node_state.inputs[input_index];
+      if (input_state.usage == ValueUsage::Maybe) {
+        this->set_input_unused(locked_node, input_socket);
+      }
+      else if (input_state.usage == ValueUsage::Used) {
+        this->destruct_input_value_if_exists(input_state, input_socket.type());
+      }
+    }
+
+    if (node_state.storage != nullptr) {
+      if (node.is_function()) {
+        const FunctionNode &fn_node = static_cast<const FunctionNode &>(node);
+        fn_node.function().destruct_storage(node_state.storage);
+      }
+      node_state.storage = nullptr;
+    }
+  }
+
+  void destruct_input_value_if_exists(InputState &input_state, const CPPType &type)
+  {
+    if (input_state.value != nullptr) {
+      type.destruct(input_state.value);
+      input_state.value = nullptr;
+    }
+  }
+
+  void execute_node(const FunctionNode &node, NodeState &node_state, CurrentTask &current_task);
+
+  void set_input_unused_during_execution(const Node &node,
+                                         NodeState &node_state,
+                                         const int input_index,
+                                         CurrentTask &current_task)
+  {
+    const InputSocket &input_socket = node.input(input_index);
+    this->with_locked_node(node, node_state, current_task, [&](LockedNode &locked_node) {
+      this->set_input_unused(locked_node, input_socket);
+    });
+  }
+
+  void set_input_unused(LockedNode &locked_node, const InputSocket &input_socket)
+  {
+    NodeState &node_state = locked_node.node_state;
+    const int input_index = input_socket.index();
+    InputState &input_state = node_state.inputs[input_index];
+
+    BLI_assert(input_state.usage != ValueUsage::Used);
+    if (input_state.usage == ValueUsage::Unused) {
+      return;
+    }
+    input_state.usage = ValueUsage::Unused;
+
+    this->destruct_input_value_if_exists(input_state, input_socket.type());
+    if (input_state.was_ready_for_execution) {
+      return;
+    }
+    const OutputSocket *origin = input_socket.origin();
+    if (origin != nullptr) {
+      locked_node.delayed_unused_outputs.append(origin);
+    }
+  }
+
+  void *set_input_required_during_execution(const Node &node,
+                                            NodeState &node_state,
+                                            const int input_index,
+                                            CurrentTask &current_task)
+  {
+    const InputSocket &input_socket = node.input(input_index);
+    void *result;
+    this->with_locked_node(node, node_state, current_task, [&](LockedNode &locked_node) {
+      result = this->set_input_required(locked_node, input_socket);
+    });
+    return result;
+  }
+
+  void *set_input_required(LockedNode &locked_node, const InputSocket &input_socket)
+  {
+    BLI_assert(&locked_node.node == &input_socket.node());
+    NodeState &node_state = locked_node.node_state;
+    const int input_index = input_socket.index();
+    InputState &input_state = node_state.inputs[input_index];
+
+    BLI_assert(input_state.usage != ValueUsage::Unused);
+
+    if (input_state.value != nullptr) {
+      input_state.was_ready_for_execution = true;
+      return input_state.value;
+    }
+    if (input_state.usage == ValueUsage::Used) {
+      return nullptr;
+    }
+    input_state.usage = ValueUsage::Used;
+    node_state.missing_required_inputs += 1;
+
+    const OutputSocket *origin_socket = input_socket.origin();
+    /* Unlinked inputs are always loaded in advance. */
+    BLI_assert(origin_socket != nullptr);
+    locked_node.delayed_required_outputs.append(origin_socket);
+    return nullptr;
+  }
+
+  void forward_value_to_linked_inputs(const OutputSocket &from_socket,
+                                      GMutablePointer value_to_forward,
+                                      CurrentTask &current_task)
+  {
+    BLI_assert(value_to_forward.get() != nullptr);
+    LinearAllocator<> &allocator = local_allocators_.local();
+    const CPPType &type = *value_to_forward.type();
+
+    if (self_.logger_ != nullptr) {
+      self_.logger_->log_socket_value(from_socket, value_to_forward, *context_);
+    }
+
+    const Span<const InputSocket *> targets = from_socket.targets();
+    for (const InputSocket *target_socket : targets) {
+      const Node &target_node = target_socket->node();
+      NodeState &node_state = *node_states_[target_node.index_in_graph()];
+      const int input_index = target_socket->index();
+      InputState &input_state = node_state.inputs[input_index];
+      const bool is_last_target = target_socket == targets.last();
+#ifdef DEBUG
+      if (input_state.value != nullptr) {
+        if (self_.logger_ != nullptr) {
+          self_.logger_->dump_when_input_is_set_twice(*target_socket, from_socket, *context_);
+        }
+        BLI_assert_unreachable();
+      }
+#endif
+      BLI_assert(!input_state.was_ready_for_execution);
+      BLI_assert(target_socket->type() == type);
+      BLI_assert(target_socket->origin() == &from_socket);
+
+      if (self_.logger_ != nullptr) {
+        self_.logger_->log_socket_value(*target_socket, value_to_forward, *context_);
+      }
+      if (target_node.is_dummy()) {
+        /* Forward the value to the outside of the graph. */
+        const int graph_output_index = self_.graph_outputs_.index_of_try(target_socket);
+        if (graph_output_index != -1 &&
+            params_->get_output_usage(graph_output_index) != ValueUsage::Unused) {
+          void *dst_buffer = params_->get_output_data_ptr(graph_output_index);
+          if (is_last_target) {
+            type.move_construct(value_to_forward.get(), dst_buffer);
+          }
+          else {
+            type.copy_construct(value_to_forward.get(), dst_buffer);
+          }
+          params_->output_set(graph_output_index);
+        }
+        continue;
+      }
+      this->with_locked_node(target_node, node_state, current_task, [&](LockedNode &locked_node) {
+        if (input_state.usage == ValueUsage::Unused) {
+          return;
+        }
+        if (is_last_target) {
+          /* No need to make a copy if this is the last target. */
+          this->forward_value_to_input(locked_node, input_state, value_to_forward);
+          value_to_forward = {};
+        }
+        else {
+          void *buffer = allocator.allocate(type.size(), type.alignment());
+          type.copy_construct(value_to_forward.get(), buffer);
+          this->forward_value_to_input(locked_node, input_state, {type, buffer});
+        }
+      });
+    }
+    if (value_to_forward.get() != nullptr) {
+      value_to_forward.destruct();
+    }
+  }
+
+  void forward_value_to_input(LockedNode &locked_node,
+                              InputState &input_state,
+                              GMutablePointer value)
+  {
+    NodeState &node_state = locked_node.node_state;
+
+    BLI_assert(input_state.value == nullptr);
+    BLI_assert(!input_state.was_ready_for_execution);
+    input_state.value = value.get();
+
+    if (input_state.usage == ValueUsage::Used) {
+      node_state.missing_required_inputs -= 1;
+      if (node_state.missing_required_inputs == 0) {
+        this->schedule_node(locked_node);
+      }
+    }
+  }
+};
+
+class GraphExecutorLFParams final : public Params {
+ private:
+  Executor &executor_;
+  const Node &node_;
+  NodeState &node_state_;
+  CurrentTask &current_task_;
+
+ public:
+  GraphExecutorLFParams(const LazyFunction &fn,
+                        Executor &executor,
+                        const Node &node,
+                        NodeState &node_state,
+                        CurrentTask &current_task)
+      : Params(fn),
+        executor_(executor),
+        node_(node),
+        node_state_(node_state),
+        current_task_(current_task)
+  {
+  }
+
+ private:
+  void *try_get_input_data_ptr_impl(const int index) const override
+  {
+    const InputState &input_state = node_state_.inputs[index];
+    if (input_state.was_ready_for_execution) {
+      return input_state.value;
+    }
+    return nullptr;
+  }
+
+  void *try_get_input_data_ptr_or_request_impl(const int index) override
+  {
+    const InputState &input_state = node_state_.inputs[index];
+    if (input_state.was_ready_for_execution) {
+      return input_state.value;
+    }
+    return executor_.set_input_required_during_execution(node_, node_state_, index, current_task_);
+  }
+
+  void *get_output_data_ptr_impl(const int index) override
+  {
+    OutputState &output_state = node_state_.outputs[index];
+    BLI_assert(!output_state.has_been_computed);
+    if (output_state.value == nullptr) {
+      LinearAllocator<> &allocator = executor_.local_allocators_.local();
+      const CPPType &type = node_.output(index).type();
+      output_state.value = allocator.allocate(type.size(), type.alignment());
+    }
+    return output_state.value;
+  }
+
+  void output_set_impl(const int index) override
+  {
+    OutputState &output_state = node_state_.outputs[index];
+    BLI_assert(!output_state.has_been_computed);
+    BLI_assert(output_state.value != nullptr);
+    const OutputSocket &output_socket = node_.output(index);
+    executor_.forward_value_to_linked_inputs(
+        output_socket, {output_socket.type(), output_state.value}, current_task_);
+    output_state.value = nullptr;
+    output_state.has_been_computed = true;
+  }
+
+  bool output_was_set_impl(const int index) const override
+  {
+    const OutputState &output_state = node_state_.outputs[index];
+    return output_state.has_been_computed;
+  }
+
+  ValueUsage get_output_usage_impl(const int index) const override
+  {
+    const OutputState &output_state = node_state_.outputs[index];
+    return output_state.usage_for_execution;
+  }
+
+  void set_input_unused_impl(const int index) override
+  {
+    executor_.set_input_unused_during_execution(node_, node_state_, index, current_task_);
+  }
+};
+
+/**
+ * Actually execute the node.
+ *
+ * Making this `inline` results in a simpler back-trace in release builds.
+ */
+inline void Executor::execute_node(const FunctionNode &node,
+                                   NodeState &node_state,
+                                   CurrentTask &current_task)
+{
+  const LazyFunction &fn = node.function();
+  GraphExecutorLFParams node_params{fn, *this, node, node_state, current_task};
+  BLI_assert(context_ != nullptr);
+  Context fn_context = *context_;
+  fn_context.storage = node_state.storage;
+
+  if (self_.logger_ != nullptr) {
+    self_.logger_->log_before_node_execute(node, node_params, fn_context);
+  }
+
+  fn.execute(node_params, fn_context);
+
+  if (self_.logger_ != nullptr) {
+    self_.logger_->log_after_node_execute(node, node_params, fn_context);
+  }
+}
+
+GraphExecutor::GraphExecutor(const Graph &graph,
+                             const Span<const OutputSocket *> graph_inputs,
+                             const Span<const InputSocket *> graph_outputs,
+                             const Logger *logger,
+                             const SideEffectProvider *side_effect_provider)
+    : graph_(graph),
+      graph_inputs_(graph_inputs),
+      graph_outputs_(graph_outputs),
+      logger_(logger),
+      side_effect_provider_(side_effect_provider)
+{
+  for (const OutputSocket *socket : graph_inputs_) {
+    BLI_assert(socket->node().is_dummy());
+    inputs_.append({"In", socket->type(), ValueUsage::Maybe});
+  }
+  for (const InputSocket *socket : graph_outputs_) {
+    BLI_assert(socket->node().is_dummy());
+    outputs_.append({"Out", socket->type()});
+  }
+}
+
+void GraphExecutor::execute_impl(Params &params, const Context &context) const
+{
+  Executor &executor = *static_cast<Executor *>(context.storage);
+  executor.execute(params, context);
+}
+
+void *GraphExecutor::init_storage(LinearAllocator<> &allocator) const
+{
+  Executor &executor = *allocator.construct<Executor>(*this).release();
+  return &executor;
+}
+
+void GraphExecutor::destruct_storage(void *storage) const
+{
+  std::destroy_at(static_cast<Executor *>(storage));
+}
+
+void GraphExecutorLogger::log_socket_value(const Socket &socket,
+                                           const GPointer value,
+                                           const Context &context) const
+{
+  UNUSED_VARS(socket, value, context);
+}
+
+void GraphExecutorLogger::log_before_node_execute(const FunctionNode &node,
+                                                  const Params &params,
+                                                  const Context &context) const
+{
+  UNUSED_VARS(node, params, context);
+}
+
+void GraphExecutorLogger::log_after_node_execute(const FunctionNode &node,
+                                                 const Params &params,
+                                                 const Context &context) const
+{
+  UNUSED_VARS(node, params, context);
+}
+
+Vector<const FunctionNode *> GraphExecutorSideEffectProvider::get_nodes_with_side_effects(
+    const Context &context) const
+{
+  UNUSED_VARS(context);
+  return {};
+}
+
+void GraphExecutorLogger::dump_when_outputs_are_missing(const FunctionNode &node,
+                                                        Span<const OutputSocket *> missing_sockets,
+                                                        const Context &context) const
+{
+  UNUSED_VARS(node, missing_sockets, context);
+}
+
+void GraphExecutorLogger::dump_when_input_is_set_twice(const InputSocket &target_socket,
+                                                       const OutputSocket &from_socket,
+                                                       const Context &context) const
+{
+  UNUSED_VARS(target_socket, from_socket, context);
+}
+
+} // namespace blender::fn::lazy_function
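Finally, how the pieces might fit together: GraphExecutor is itself a LazyFunction whose storage holds the Executor, so it is driven through the same Params interface, e.g. via BasicParams from lazy_function_execute.cc. A sketch that evaluates the graph from the earlier example (socket wiring as above; the Context layout is an assumption):

    Vector<const OutputSocket *> graph_inputs = {&input_node.output(0), &input_node.output(1)};
    Vector<const InputSocket *> graph_outputs = {&output_node.input(0)};
    GraphExecutor executor{graph, graph_inputs, graph_outputs, nullptr, nullptr};

    int a = 3;
    int b = 4;
    int result = 0;
    const CPPType &int_type = CPPType::get<int>();
    Array<GMutablePointer> in_values = {{&int_type, &a}, {&int_type, &b}};
    Array<GMutablePointer> out_values = {{&int_type, &result}};
    Array<std::optional<ValueUsage>> in_usages(2);
    Array<ValueUsage> out_usages = {ValueUsage::Used};
    Array<bool> outputs_set(1, false);
    BasicParams params{executor, in_values, out_values, in_usages, out_usages, outputs_set};

    LinearAllocator<> allocator;
    Context context;
    context.storage = executor.init_storage(allocator);
    executor.execute(params, context);
    executor.destruct_storage(context.storage);

Because all graph inputs are provided up front and the single output is marked as used, the executor runs to completion in this one call; with lazily provided inputs it would instead request them through `in_usages` and give control back to the caller.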