From e50f1ddc6540680d2aafc1c76f8339d69350f84a Mon Sep 17 00:00:00 2001 From: Brecht Van Lommel Date: Fri, 5 Jun 2020 16:39:57 +0200 Subject: Cycles: use TBB for task pools and task scheduler No significant performance improvement is expected, but it means we have a single thread pool throughout Blender. And it should make adding more parallelization in the future easier. After previous refactoring commits this is basically a drop-in replacement. One difference is that the task pool had a mechanism for scheduling tasks to the front of the queue to minimize memory usage. TBB has a smarter algorithm to balance depth-first and breadth-first scheduling of tasks and we assume that removes the need to manually provide hints to the scheduler. Fixes T77533 --- intern/cycles/bvh/bvh_build.cpp | 20 +-- intern/cycles/bvh/bvh_sort.cpp | 2 +- intern/cycles/device/device.cpp | 2 +- intern/cycles/device/device.h | 3 +- intern/cycles/util/util_task.cpp | 362 +++------------------------------------ intern/cycles/util/util_task.h | 56 ++---- 6 files changed, 49 insertions(+), 396 deletions(-) (limited to 'intern') diff --git a/intern/cycles/bvh/bvh_build.cpp b/intern/cycles/bvh/bvh_build.cpp index 116576b101d..0235ac33c77 100644 --- a/intern/cycles/bvh/bvh_build.cpp +++ b/intern/cycles/bvh/bvh_build.cpp @@ -626,8 +626,8 @@ BVHNode *BVHBuild::build_node(const BVHObjectBinning &range, int level) /* Threaded build */ inner = new InnerNode(bounds); - task_pool.push([=] { thread_build_node(inner, 0, left, level + 1); }, true); - task_pool.push([=] { thread_build_node(inner, 1, right, level + 1); }, true); + task_pool.push([=] { thread_build_node(inner, 0, left, level + 1); }); + task_pool.push([=] { thread_build_node(inner, 1, right, level + 1); }); } if (do_unalinged_split) { @@ -742,16 +742,12 @@ BVHNode *BVHBuild::build_node(const BVHRange &range, /* Create tasks for left and right nodes, using copy for most arguments and * move for reference to avoid memory copies.
*/ - task_pool.push( - [=, refs = std::move(left_references)]() mutable { - thread_build_spatial_split_node(inner, 0, left, refs, level + 1); - }, - true); - task_pool.push( - [=, refs = std::move(right_references)]() mutable { - thread_build_spatial_split_node(inner, 1, right, refs, level + 1); - }, - true); + task_pool.push([=, refs = std::move(left_references)]() mutable { + thread_build_spatial_split_node(inner, 0, left, refs, level + 1); + }); + task_pool.push([=, refs = std::move(right_references)]() mutable { + thread_build_spatial_split_node(inner, 1, right, refs, level + 1); + }); } if (do_unalinged_split) { diff --git a/intern/cycles/bvh/bvh_sort.cpp b/intern/cycles/bvh/bvh_sort.cpp index 5bdded354bc..b01785b547a 100644 --- a/intern/cycles/bvh/bvh_sort.cpp +++ b/intern/cycles/bvh/bvh_sort.cpp @@ -147,7 +147,7 @@ static void bvh_reference_sort_threaded(TaskPool *task_pool, if (left < end) { if (start < right) { task_pool->push( - function_bind(bvh_reference_sort_threaded, task_pool, data, left, end, compare), true); + function_bind(bvh_reference_sort_threaded, task_pool, data, left, end, compare)); } else { start = left; diff --git a/intern/cycles/device/device.cpp b/intern/cycles/device/device.cpp index 41dd7894d93..263d3d24b13 100644 --- a/intern/cycles/device/device.cpp +++ b/intern/cycles/device/device.cpp @@ -77,7 +77,7 @@ std::ostream &operator<<(std::ostream &os, const DeviceRequestedFeatures &reques /* Device */ -Device::~Device() +Device::~Device() noexcept(false) { if (!background) { if (vertex_buffer != 0) { diff --git a/intern/cycles/device/device.h b/intern/cycles/device/device.h index dff981080a5..67828103394 100644 --- a/intern/cycles/device/device.h +++ b/intern/cycles/device/device.h @@ -319,7 +319,8 @@ class Device { virtual void mem_free_sub_ptr(device_ptr /*ptr*/){}; public: - virtual ~Device(); + /* noexcept needed to silence TBB warning. */ + virtual ~Device() noexcept(false); /* info */ DeviceInfo info; diff --git a/intern/cycles/util/util_task.cpp b/intern/cycles/util/util_task.cpp index eb07ec0bfa0..4fb61392e92 100644 --- a/intern/cycles/util/util_task.cpp +++ b/intern/cycles/util/util_task.cpp @@ -20,28 +20,12 @@ #include "util/util_system.h" #include "util/util_time.h" -//#define THREADING_DEBUG_ENABLED - -#ifdef THREADING_DEBUG_ENABLED -# include <stdio.h> -# define THREADING_DEBUG(...) \ - do { \ - printf(__VA_ARGS__); \ - fflush(stdout); \ - } while (0) -#else -# define THREADING_DEBUG(...) -#endif - CCL_NAMESPACE_BEGIN /* Task Pool */ -TaskPool::TaskPool() +TaskPool::TaskPool() : start_time(time_dt()), num_tasks_handled(0) { - num_tasks_handled = 0; - num = 0; - do_cancel = false; } TaskPool::~TaskPool() @@ -49,66 +33,15 @@ TaskPool::~TaskPool() cancel(); } -void TaskPool::push(TaskRunFunction &&task, bool front) +void TaskPool::push(TaskRunFunction &&task) { - TaskScheduler::Entry entry; - - entry.task = new TaskRunFunction(std::move(task)); - entry.pool = this; - - TaskScheduler::push(entry, front); + tbb_group.run(std::move(task)); + num_tasks_handled++; } void TaskPool::wait_work(Summary *stats) { - thread_scoped_lock num_lock(num_mutex); - - while (num != 0) { - num_lock.unlock(); - - thread_scoped_lock queue_lock(TaskScheduler::queue_mutex); - - /* find task from this pool.
if we get a task from another pool, - * we can get into deadlock */ - TaskScheduler::Entry work_entry; - bool found_entry = false; - list<TaskScheduler::Entry>::iterator it; - - for (it = TaskScheduler::queue.begin(); it != TaskScheduler::queue.end(); it++) { - TaskScheduler::Entry &entry = *it; - - if (entry.pool == this) { - work_entry = entry; - found_entry = true; - TaskScheduler::queue.erase(it); - break; - } - } - - queue_lock.unlock(); - - /* if found task, do it, otherwise wait until other tasks are done */ - if (found_entry) { - /* run task */ - (*work_entry.task)(); - - /* delete task */ - delete work_entry.task; - - /* notify pool task was done */ - num_decrease(1); - } - - num_lock.lock(); - if (num == 0) - break; - - if (!found_entry) { - THREADING_DEBUG("num==%d, Waiting for condition in TaskPool::wait_work !found_entry\n", num); - num_cond.wait(num_lock); - THREADING_DEBUG("num==%d, condition wait done in TaskPool::wait_work !found_entry\n", num); - } - } + tbb_group.wait(); if (stats != NULL) { stats->time_total = time_dt() - start_time; @@ -118,180 +51,21 @@ void TaskPool::wait_work(Summary *stats) void TaskPool::cancel() { - do_cancel = true; - - TaskScheduler::clear(this); - - { - thread_scoped_lock num_lock(num_mutex); - - while (num) { - THREADING_DEBUG("num==%d, Waiting for condition in TaskPool::cancel\n", num); - num_cond.wait(num_lock); - THREADING_DEBUG("num==%d condition wait done in TaskPool::cancel\n", num); - } - } - - do_cancel = false; + tbb_group.cancel(); + tbb_group.wait(); } bool TaskPool::canceled() { - return do_cancel; -} - -void TaskPool::num_decrease(int done) -{ - num_mutex.lock(); - num -= done; - - assert(num >= 0); - if (num == 0) { - THREADING_DEBUG("num==%d, notifying all in TaskPool::num_decrease\n", num); - num_cond.notify_all(); - } - - num_mutex.unlock(); -} - -void TaskPool::num_increase() -{ - thread_scoped_lock num_lock(num_mutex); - if (num_tasks_handled == 0) { - start_time = time_dt(); - } - num++; - num_tasks_handled++; - THREADING_DEBUG("num==%d, notifying all in TaskPool::num_increase\n", num); - num_cond.notify_all(); + return tbb_group.is_canceling(); } /* Task Scheduler */ thread_mutex TaskScheduler::mutex; int TaskScheduler::users = 0; -vector<thread *> TaskScheduler::threads; -bool TaskScheduler::do_exit = false; - -list<TaskScheduler::Entry> TaskScheduler::queue; -thread_mutex TaskScheduler::queue_mutex; -thread_condition_variable TaskScheduler::queue_cond; - -namespace { - -/* Get number of processors on each of the available nodes. The result is sized - * by the highest node index, and element corresponds to number of processors on - * that node. - * If node is not available, then the corresponding number of processors is - * zero. */ -void get_per_node_num_processors(vector<int> *num_per_node_processors) -{ - const int num_nodes = system_cpu_num_numa_nodes(); - if (num_nodes == 0) { - LOG(ERROR) << "Zero available NUMA nodes, is not supposed to happen."; - return; - } - num_per_node_processors->resize(num_nodes); - for (int node = 0; node < num_nodes; ++node) { - if (!system_cpu_is_numa_node_available(node)) { - (*num_per_node_processors)[node] = 0; - continue; - } - (*num_per_node_processors)[node] = system_cpu_num_numa_node_processors(node); - } -} - -/* Calculate total number of processors on all available nodes. - * This is similar to system_cpu_thread_count(), but uses pre-calculated number - * of processors on each of the node, avoiding extra system calls and checks for - * the node availability.
*/ -int get_num_total_processors(const vector<int> &num_per_node_processors) -{ - int num_total_processors = 0; - foreach (int num_node_processors, num_per_node_processors) { - num_total_processors += num_node_processors; - } - return num_total_processors; -} - -/* Compute NUMA node for every thread to run on, for the best performance. */ -vector<int> distribute_threads_on_nodes(const int num_threads) -{ - /* Start with all threads unassigned to any specific NUMA node. */ - vector<int> thread_nodes(num_threads, -1); - const int num_active_group_processors = system_cpu_num_active_group_processors(); - VLOG(1) << "Detected " << num_active_group_processors << " processors " - << "in active group."; - if (num_active_group_processors >= num_threads) { - /* If the current thread is set up in a way that its affinity allows to - * use at least requested number of threads we do not explicitly set - * affinity to the worker threads. - * This way we allow users to manually edit affinity of the parent - * thread, and here we follow that affinity. This way it's possible to - * have two Cycles/Blender instances running manually set to a different - * dies on a CPU. */ - VLOG(1) << "Not setting thread group affinity."; - return thread_nodes; - } - vector<int> num_per_node_processors; - get_per_node_num_processors(&num_per_node_processors); - if (num_per_node_processors.size() == 0) { - /* Error was already reported, here we can't do anything, so we simply - * leave default affinity to all the worker threads. */ - return thread_nodes; - } - const int num_nodes = num_per_node_processors.size(); - int thread_index = 0; - /* First pass: fill in all the nodes to their maximum. - * - * If there is less threads than the overall nodes capacity, some of the - * nodes or parts of them will idle. - * - * TODO(sergey): Consider picking up fastest nodes if number of threads - * fits on them. For example, on Threadripper2 we might consider using nodes - * 0 and 2 if user requested 32 render threads. */ - const int num_total_node_processors = get_num_total_processors(num_per_node_processors); - int current_node_index = 0; - while (thread_index < num_total_node_processors && thread_index < num_threads) { - const int num_node_processors = num_per_node_processors[current_node_index]; - for (int processor_index = 0; processor_index < num_node_processors; ++processor_index) { - VLOG(1) << "Scheduling thread " << thread_index << " to node " << current_node_index << "."; - thread_nodes[thread_index] = current_node_index; - ++thread_index; - if (thread_index == num_threads) { - /* All threads are scheduled on their nodes. */ - return thread_nodes; - } - } - ++current_node_index; - } - /* Second pass: keep scheduling threads to each node one by one, - * uniformly filling them in. - * This is where things becomes tricky to predict for the maximum - * performance: on the one hand this avoids too much threading overhead on - * few nodes, but for the final performance having all the overhead on one - * node might be better idea (since other nodes will have better chance of - * rendering faster). - * But more tricky is that nodes might have difference capacity, so we might - * want to do some weighted scheduling. For example, if node 0 has 16 - * processors and node 1 has 32 processors, we'd better schedule 1 extra - * thread on node 0 and 2 extra threads on node 1. */ - current_node_index = 0; - while (thread_index < num_threads) { - /* Skip unavailable nodes. */ - /* TODO(sergey): Add sanity check against deadlock.
*/ - while (num_per_node_processors[current_node_index] == 0) { - current_node_index = (current_node_index + 1) % num_nodes; - } - VLOG(1) << "Scheduling thread " << thread_index << " to node " << current_node_index << "."; - ++thread_index; - current_node_index = (current_node_index + 1) % num_nodes; - } - - return thread_nodes; -} - -} // namespace +int TaskScheduler::active_num_threads = 0; +tbb::global_control *TaskScheduler::global_control = nullptr; void TaskScheduler::init(int num_threads) { @@ -302,22 +76,15 @@ void TaskScheduler::init(int num_threads) if (users != 1) { return; } - do_exit = false; - const bool use_auto_threads = (num_threads == 0); - if (use_auto_threads) { + if (num_threads > 0) { /* Automatic number of threads. */ - num_threads = system_cpu_thread_count(); + VLOG(1) << "Overriding number of TBB threads to " << num_threads << "."; + global_control = new tbb::global_control(tbb::global_control::max_allowed_parallelism, + num_threads); + active_num_threads = num_threads; } - VLOG(1) << "Creating pool of " << num_threads << " threads."; - - /* Compute distribution on NUMA nodes. */ - vector<int> thread_nodes = distribute_threads_on_nodes(num_threads); - - /* Launch threads that will be waiting for work. */ - threads.resize(num_threads); - for (int thread_index = 0; thread_index < num_threads; ++thread_index) { - threads[thread_index] = new thread(function_bind(&TaskScheduler::thread_run), - thread_nodes[thread_index]); + else { + active_num_threads = system_cpu_thread_count(); } } @@ -326,105 +93,20 @@ void TaskScheduler::exit() thread_scoped_lock lock(mutex); users--; if (users == 0) { - VLOG(1) << "De-initializing thread pool of task scheduler."; - /* stop all waiting threads */ - TaskScheduler::queue_mutex.lock(); - do_exit = true; - TaskScheduler::queue_cond.notify_all(); - TaskScheduler::queue_mutex.unlock(); - - /* delete threads */ - foreach (thread *t, threads) { - t->join(); - delete t; - } - threads.clear(); + delete global_control; + global_control = nullptr; + active_num_threads = 0; } } void TaskScheduler::free_memory() { assert(users == 0); - threads.free_memory(); -} - -bool TaskScheduler::thread_wait_pop(Entry &entry) -{ - thread_scoped_lock queue_lock(queue_mutex); - - while (queue.empty() && !do_exit) - queue_cond.wait(queue_lock); - - if (queue.empty()) { - assert(do_exit); - return false; - } - - entry = queue.front(); - queue.pop_front(); - - return true; } -void TaskScheduler::thread_run() +int TaskScheduler::num_threads() { - Entry entry; - - /* todo: test affinity/denormal mask */ - - /* keep popping off tasks */ - while (thread_wait_pop(entry)) { - /* run task */ - (*entry.task)(); - - /* delete task */ - delete entry.task; - - /* notify pool task was done */ - entry.pool->num_decrease(1); - } -} - -void TaskScheduler::push(Entry &entry, bool front) -{ - entry.pool->num_increase(); - - /* add entry to queue */ - TaskScheduler::queue_mutex.lock(); - if (front) - TaskScheduler::queue.push_front(entry); - else - TaskScheduler::queue.push_back(entry); - - TaskScheduler::queue_cond.notify_one(); - TaskScheduler::queue_mutex.unlock(); -} - -void TaskScheduler::clear(TaskPool *pool) -{ - thread_scoped_lock queue_lock(TaskScheduler::queue_mutex); - - /* erase all tasks from this pool from the queue */ - list<Entry>::iterator it = queue.begin(); - int done = 0; - - while (it != queue.end()) { - Entry &entry = *it; - - if (entry.pool == pool) { - done++; - delete entry.task; - - it = queue.erase(it); - } - else - it++; - } - - queue_lock.unlock(); - - /* notify
done */ - pool->num_decrease(done); + return active_num_threads; } /* Dedicated Task Pool */ diff --git a/intern/cycles/util/util_task.h b/intern/cycles/util/util_task.h index a7232e68f60..ef1d2d70800 100644 --- a/intern/cycles/util/util_task.h +++ b/intern/cycles/util/util_task.h @@ -25,6 +25,10 @@ #define TBB_SUPPRESS_DEPRECATED_MESSAGES 1 #include <tbb/tbb.h> +#if TBB_INTERFACE_VERSION_MAJOR >= 10 +# define WITH_TBB_GLOBAL_CONTROL +#endif + CCL_NAMESPACE_BEGIN using tbb::blocked_range; @@ -62,24 +66,15 @@ class TaskPool { TaskPool(); ~TaskPool(); - void push(TaskRunFunction &&task, bool front = false); + void push(TaskRunFunction &&task); void wait_work(Summary *stats = NULL); /* work and wait until all tasks are done */ - void cancel(); /* cancel all tasks, keep worker threads running */ + void cancel(); /* cancel all tasks and wait until they are no longer executing */ bool canceled(); /* for worker threads, test if canceled */ protected: - friend class TaskScheduler; - - void num_decrease(int done); - void num_increase(); - - thread_mutex num_mutex; - thread_condition_variable num_cond; - - int num; - bool do_cancel; + tbb::task_group tbb_group; /* ** Statistics ** */ @@ -101,40 +96,19 @@ class TaskScheduler { static void exit(); static void free_memory(); - /* number of threads that can work on task */ - static int num_threads() - { - return threads.size(); - } - - /* test if any session is using the scheduler */ - static bool active() - { - return users != 0; - } + /* Approximate number of threads that will work on task, which may be lower + * or higher than the actual number of threads. Use as little as possible and + * leave splitting up tasks to the scheduler. */ + static int num_threads(); protected: - friend class TaskPool; - - struct Entry { - TaskRunFunction *task; - TaskPool *pool; - }; - static thread_mutex mutex; static int users; - static vector<thread *> threads; - static bool do_exit; - - static list<Entry> queue; - static thread_mutex queue_mutex; - static thread_condition_variable queue_cond; + static int active_num_threads; - static void thread_run(); - static bool thread_wait_pop(Entry &entry); - - static void push(Entry &entry, bool front); - static void clear(TaskPool *pool); +#ifdef WITH_TBB_GLOBAL_CONTROL + static tbb::global_control *global_control; +#endif }; /* Dedicated Task Pool -- cgit v1.2.3
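
For readers less familiar with the TBB primitives this commit switches to, the short standalone sketch below shows roughly how the rewritten TaskPool and TaskScheduler behave. It is not code from the commit; the names (limit, demo_pool) and the counting workload are illustrative assumptions, and the thread cap of 4 is arbitrary.

/* Minimal standalone sketch of the TBB primitives used by the new TaskPool.
 * Build with -ltbb. */
#include <atomic>
#include <cstdio>
#include <tbb/tbb.h>

int main()
{
  /* Roughly what TaskScheduler::init(4) does: cap TBB's worker parallelism.
   * When num_threads is 0, the commit skips this and lets TBB decide. */
  tbb::global_control limit(tbb::global_control::max_allowed_parallelism, 4);

  /* Roughly what TaskPool::push() and wait_work() now do: hand closures to a
   * tbb::task_group and block until the group finishes. The old "front"
   * scheduling hint is gone; TBB chooses the execution order. */
  tbb::task_group demo_pool;
  std::atomic<int> done(0);
  for (int i = 0; i < 16; i++) {
    demo_pool.run([&done] { done++; });
  }
  demo_pool.wait();

  /* Roughly what TaskPool::cancel() now does: stop scheduling pending tasks,
   * then wait for tasks that are already running. TaskPool::canceled() maps
   * to task_group::is_canceling(). */
  demo_pool.cancel();
  demo_pool.wait();

  printf("tasks completed: %d\n", done.load());
  return 0;
}

One behavioral consequence worth noting: cancel() now also waits for in-flight tasks, which is why the comment on TaskPool::cancel() in util_task.h changes from "keep worker threads running" to "wait until they are no longer executing".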