Diffstat (limited to 'source/blender/blenlib/intern/task_pool.cc')
-rw-r--r--	source/blender/blenlib/intern/task_pool.cc	| 1143
1 file changed, 319 insertions(+), 824 deletions(-)
diff --git a/source/blender/blenlib/intern/task_pool.cc b/source/blender/blenlib/intern/task_pool.cc index 60ed156105c..da67412865b 100644 --- a/source/blender/blenlib/intern/task_pool.cc +++ b/source/blender/blenlib/intern/task_pool.cc @@ -17,731 +17,368 @@ /** \file * \ingroup bli * - * A generic task system which can be used for any task based subsystem. + * Task pool to run tasks in parallel. */ +#include <memory> #include <stdlib.h> +#include <utility> #include "MEM_guardedalloc.h" #include "DNA_listBase.h" -#include "BLI_listbase.h" #include "BLI_math.h" #include "BLI_mempool.h" #include "BLI_task.h" #include "BLI_threads.h" -#include "atomic_ops.h" - -/* Define this to enable some detailed statistic print. */ -#undef DEBUG_STATS - -/* Types */ - -/* Number of per-thread pre-allocated tasks. - * - * For more details see description of TaskMemPool. - */ -#define MEMPOOL_SIZE 256 - -/* Number of tasks which are pushed directly to local thread queue. - * - * This allows thread to fetch next task without locking the whole queue. - */ -#define LOCAL_QUEUE_SIZE 1 - -/* Number of tasks which are allowed to be scheduled in a delayed manner. - * - * This allows to use less locks per graph node children schedule. More details - * could be found at TaskThreadLocalStorage::do_delayed_push. - */ -#define DELAYED_QUEUE_SIZE 4096 - -#ifndef NDEBUG -# define ASSERT_THREAD_ID(scheduler, thread_id) \ - do { \ - if (!BLI_thread_is_main()) { \ - TaskThread *thread = (TaskThread *)pthread_getspecific(scheduler->tls_id_key); \ - if (thread == NULL) { \ - BLI_assert(thread_id == 0); \ - } \ - else { \ - BLI_assert(thread_id == thread->id); \ - } \ - } \ - else { \ - BLI_assert(thread_id == 0); \ - } \ - } while (false) -#else -# define ASSERT_THREAD_ID(scheduler, thread_id) +#ifdef WITH_TBB +/* Quiet top level deprecation message, unrelated to API usage here. */ +# define TBB_SUPPRESS_DEPRECATED_MESSAGES 1 +# include <tbb/tbb.h> #endif -typedef struct Task { - struct Task *next, *prev; +/* Task + * + * Unit of work to execute. This is a C++ class to work with TBB. */ +class Task { + public: + TaskPool *pool; TaskRunFunction run; void *taskdata; bool free_taskdata; TaskFreeFunction freedata; - TaskPool *pool; -} Task; -/* This is a per-thread storage of pre-allocated tasks. - * - * The idea behind this is simple: reduce amount of malloc() calls when pushing - * new task to the pool. This is done by keeping memory from the tasks which - * were finished already, so instead of freeing that memory we put it to the - * pool for the later re-use. - * - * The tricky part here is to avoid any inter-thread synchronization, hence no - * lock must exist around this pool. The pool will become an owner of the pointer - * from freed task, and only corresponding thread will be able to use this pool - * (no memory stealing and such). - * - * This leads to the following use of the pool: - * - * - task_push() should provide proper thread ID from which the task is being - * pushed from. - * - * - Task allocation function which check corresponding memory pool and if there - * is any memory in there it'll mark memory as re-used, remove it from the pool - * and use that memory for the new task. - * - * At this moment task queue owns the memory. - * - * - When task is done and task_free() is called the memory will be put to the - * pool which corresponds to a thread which handled the task. - */ -typedef struct TaskMemPool { - /* Number of pre-allocated tasks in the pool. 
*/ - int num_tasks; - /* Pre-allocated task memory pointers. */ - Task *tasks[MEMPOOL_SIZE]; -} TaskMemPool; - -#ifdef DEBUG_STATS -typedef struct TaskMemPoolStats { - /* Number of allocations. */ - int num_alloc; - /* Number of avoided allocations (pointer was re-used from the pool). */ - int num_reuse; - /* Number of discarded memory due to pool saturation, */ - int num_discard; -} TaskMemPoolStats; -#endif - -typedef struct TaskThreadLocalStorage { - /* Memory pool for faster task allocation. - * The idea is to re-use memory of finished/discarded tasks by this thread. - */ - TaskMemPool task_mempool; - - /* Local queue keeps thread alive by keeping small amount of tasks ready - * to be picked up without causing global thread locks for synchronization. - */ - int num_local_queue; - Task *local_queue[LOCAL_QUEUE_SIZE]; - - /* Thread can be marked for delayed tasks push. This is helpful when it's - * know that lots of subsequent task pushed will happen from the same thread - * without "interrupting" for task execution. - * - * We try to accumulate as much tasks as possible in a local queue without - * any locks first, and then we push all of them into a scheduler's queue - * from within a single mutex lock. - */ - bool do_delayed_push; - int num_delayed_queue; - Task *delayed_queue[DELAYED_QUEUE_SIZE]; -} TaskThreadLocalStorage; - -struct TaskPool { - TaskScheduler *scheduler; - - volatile size_t num; - ThreadMutex num_mutex; - ThreadCondition num_cond; - - void *userdata; - ThreadMutex user_mutex; + Task(TaskPool *pool, + TaskRunFunction run, + void *taskdata, + bool free_taskdata, + TaskFreeFunction freedata) + : pool(pool), run(run), taskdata(taskdata), free_taskdata(free_taskdata), freedata(freedata) + { + } - volatile bool do_cancel; - volatile bool do_work; + ~Task() + { + if (free_taskdata) { + if (freedata) { + freedata(pool, taskdata); + } + else { + MEM_freeN(taskdata); + } + } + } - volatile bool is_suspended; - bool start_suspended; - ListBase suspended_queue; - size_t num_suspended; + /* Move constructor. */ + Task(Task &&other) + : pool(other.pool), + run(other.run), + taskdata(other.taskdata), + free_taskdata(other.free_taskdata), + freedata(other.freedata) + { + other.pool = NULL; + other.run = NULL; + other.taskdata = NULL; + other.free_taskdata = false; + other.freedata = NULL; + } - TaskPriority priority; + /* Execute task. */ + void operator()() const + { + run(pool, taskdata); + } - /* If set, this pool may never be work_and_wait'ed, which means TaskScheduler - * has to use its special background fallback thread in case we are in - * single-threaded situation. - */ - bool run_in_background; + /* For performance, ensure we never copy the task and only move it. */ + Task(const Task &other) = delete; + Task &operator=(const Task &other) = delete; + Task &operator=(Task &&other) = delete; +}; - /* This is a task scheduler's ID of a thread at which pool was constructed. - * It will be used to access task TLS. - */ - int thread_id; +/* TBB Task Group. + * + * Subclass since there seems to be no other way to set priority. 
*/ + +#ifdef WITH_TBB +class TBBTaskGroup : public tbb::task_group { + public: + TBBTaskGroup(TaskPriority priority) + { + switch (priority) { + case TASK_PRIORITY_LOW: + my_context.set_priority(tbb::priority_low); + break; + case TASK_PRIORITY_HIGH: + my_context.set_priority(tbb::priority_normal); + break; + } + } - /* For the pools which are created from non-main thread which is not a - * scheduler worker thread we can't re-use any of scheduler's threads TLS - * and have to use our own one. - */ - bool use_local_tls; - TaskThreadLocalStorage local_tls; -#ifndef NDEBUG - pthread_t creator_thread_id; + ~TBBTaskGroup() + { + } +}; #endif -#ifdef DEBUG_STATS - TaskMemPoolStats *mempool_stats; -#endif -}; +/* Task Pool */ -struct TaskScheduler { - pthread_t *threads; - struct TaskThread *task_threads; - int num_threads; - bool background_thread_only; +typedef enum TaskPoolType { + TASK_POOL_TBB, + TASK_POOL_TBB_SUSPENDED, + TASK_POOL_NO_THREADS, + TASK_POOL_BACKGROUND, + TASK_POOL_BACKGROUND_SERIAL, +} TaskPoolType; - ListBase queue; - ThreadMutex queue_mutex; - ThreadCondition queue_cond; +struct TaskPool { + TaskPoolType type; + bool use_threads; - ThreadMutex startup_mutex; - ThreadCondition startup_cond; - volatile int num_thread_started; + ThreadMutex user_mutex; + void *userdata; - volatile bool do_exit; + /* TBB task pool. */ +#ifdef WITH_TBB + TBBTaskGroup tbb_group; +#endif + volatile bool is_suspended; + BLI_mempool *suspended_mempool; - /* NOTE: In pthread's TLS we store the whole TaskThread structure. */ - pthread_key_t tls_id_key; + /* Background task pool. */ + ListBase background_threads; + ThreadQueue *background_queue; + volatile bool background_is_canceling; }; -typedef struct TaskThread { - TaskScheduler *scheduler; - int id; - TaskThreadLocalStorage tls; -} TaskThread; - -/* Helper */ -BLI_INLINE void task_data_free(Task *task, const int UNUSED(thread_id)) -{ - if (task->free_taskdata) { - if (task->freedata) { - task->freedata(task->pool, task->taskdata); - } - else { - MEM_freeN(task->taskdata); - } - } -} - -BLI_INLINE void initialize_task_tls(TaskThreadLocalStorage *tls) -{ - memset(tls, 0, sizeof(TaskThreadLocalStorage)); -} +/* TBB Task Pool. + * + * Task pool using the TBB scheduler for tasks. When building without TBB + * support or running Blender with -t 1, this reverts to single threaded. + * + * Tasks may be suspended until in all are created, to make it possible to + * initialize data structures and create tasks in a single pass. 
*/ -BLI_INLINE TaskThreadLocalStorage *get_task_tls(TaskPool *pool, const int thread_id) +static void tbb_task_pool_create(TaskPool *pool, TaskPriority priority) { - TaskScheduler *scheduler = pool->scheduler; - BLI_assert(thread_id >= 0); - BLI_assert(thread_id <= scheduler->num_threads); - if (pool->use_local_tls && thread_id == 0) { - BLI_assert(pool->thread_id == 0); - BLI_assert(!BLI_thread_is_main()); - BLI_assert(pthread_equal(pthread_self(), pool->creator_thread_id)); - return &pool->local_tls; - } - if (thread_id == 0) { - BLI_assert(BLI_thread_is_main()); - return &scheduler->task_threads[pool->thread_id].tls; + if (pool->type == TASK_POOL_TBB_SUSPENDED) { + pool->is_suspended = true; + pool->suspended_mempool = BLI_mempool_create(sizeof(Task), 512, 512, BLI_MEMPOOL_ALLOW_ITER); } - return &scheduler->task_threads[thread_id].tls; -} -BLI_INLINE void free_task_tls(TaskThreadLocalStorage *tls) -{ - TaskMemPool *task_mempool = &tls->task_mempool; - for (int i = 0; i < task_mempool->num_tasks; i++) { - MEM_freeN(task_mempool->tasks[i]); +#ifdef WITH_TBB + if (pool->use_threads) { + new (&pool->tbb_group) TBBTaskGroup(priority); } -} - -static Task *task_alloc(TaskPool *pool, const int thread_id) -{ - BLI_assert(thread_id <= pool->scheduler->num_threads); - if (thread_id != -1) { - BLI_assert(thread_id >= 0); - BLI_assert(thread_id <= pool->scheduler->num_threads); - TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id); - TaskMemPool *task_mempool = &tls->task_mempool; - /* Try to re-use task memory from a thread local storage. */ - if (task_mempool->num_tasks > 0) { - --task_mempool->num_tasks; - /* Success! We've just avoided task allocation. */ -#ifdef DEBUG_STATS - pool->mempool_stats[thread_id].num_reuse++; #endif - return task_mempool->tasks[task_mempool->num_tasks]; - } - /* We are doomed to allocate new task data. */ -#ifdef DEBUG_STATS - pool->mempool_stats[thread_id].num_alloc++; -#endif - } - return (Task *)MEM_mallocN(sizeof(Task), "New task"); } -static void task_free(TaskPool *pool, Task *task, const int thread_id) +static void tbb_task_pool_run(TaskPool *pool, Task &&task) { - task_data_free(task, thread_id); - BLI_assert(thread_id >= 0); - BLI_assert(thread_id <= pool->scheduler->num_threads); - if (thread_id == 0) { - BLI_assert(pool->use_local_tls || BLI_thread_is_main()); + if (pool->is_suspended) { + /* Suspended task that will be executed in work_and_wait(). */ + Task *task_mem = (Task *)BLI_mempool_alloc(pool->suspended_mempool); + new (task_mem) Task(std::move(task)); +#ifdef __GNUC__ + /* Work around apparent compiler bug where task is not properly copied + * to task_mem. This appears unrelated to the use of placement new or + * move semantics, happens even writing to a plain C struct. Rather the + * call into TBB seems to have some indirect effect. */ + std::atomic_thread_fence(std::memory_order_release); +#endif } - TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id); - TaskMemPool *task_mempool = &tls->task_mempool; - if (task_mempool->num_tasks < MEMPOOL_SIZE - 1) { - /* Successfully allowed the task to be re-used later. */ - task_mempool->tasks[task_mempool->num_tasks] = task; - ++task_mempool->num_tasks; +#ifdef WITH_TBB + else if (pool->use_threads) { + /* Execute in TBB task group. */ + pool->tbb_group.run(std::move(task)); } - else { - /* Local storage saturated, no other way than just discard - * the memory. 
- * - * TODO(sergey): We can perhaps store such pointer in a global - * scheduler pool, maybe it'll be faster than discarding and - * allocating again. - */ - MEM_freeN(task); -#ifdef DEBUG_STATS - pool->mempool_stats[thread_id].num_discard++; #endif + else { + /* Execute immediately. */ + task(); } } -/* Task Scheduler */ - -static void task_pool_num_decrease(TaskPool *pool, size_t done) +static void tbb_task_pool_work_and_wait(TaskPool *pool) { - BLI_mutex_lock(&pool->num_mutex); - - BLI_assert(pool->num >= done); + /* Start any suspended task now. */ + if (pool->suspended_mempool) { + pool->is_suspended = false; - pool->num -= done; + BLI_mempool_iter iter; + BLI_mempool_iternew(pool->suspended_mempool, &iter); + while (Task *task = (Task *)BLI_mempool_iterstep(&iter)) { + tbb_task_pool_run(pool, std::move(*task)); + } - if (pool->num == 0) { - BLI_condition_notify_all(&pool->num_cond); + BLI_mempool_clear(pool->suspended_mempool); } - BLI_mutex_unlock(&pool->num_mutex); +#ifdef WITH_TBB + if (pool->use_threads) { + /* This is called wait(), but internally it can actually do work. This + * matters because we don't want recursive usage of task pools to run + * out of threads and get stuck. */ + pool->tbb_group.wait(); + } +#endif } -static void task_pool_num_increase(TaskPool *pool, size_t new_num) +static void tbb_task_pool_cancel(TaskPool *pool) { - BLI_mutex_lock(&pool->num_mutex); - - pool->num += new_num; - BLI_condition_notify_all(&pool->num_cond); - - BLI_mutex_unlock(&pool->num_mutex); +#ifdef WITH_TBB + if (pool->use_threads) { + pool->tbb_group.cancel(); + pool->tbb_group.wait(); + } +#endif } -static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task) +static bool tbb_task_pool_canceled(TaskPool *pool) { - bool found_task = false; - BLI_mutex_lock(&scheduler->queue_mutex); - - while (!scheduler->queue.first && !scheduler->do_exit) { - BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex); +#ifdef WITH_TBB + if (pool->use_threads) { + return pool->tbb_group.is_canceling(); } +#endif - do { - Task *current_task; - - /* Assuming we can only have a void queue in 'exit' case here seems logical - * (we should only be here after our worker thread has been woken up from a - * condition_wait(), which only happens after a new task was added to the queue), - * but it is wrong. - * Waiting on condition may wake up the thread even if condition is not signaled - * (spurious wake-ups), and some race condition may also empty the queue **after** - * condition has been signaled, but **before** awoken thread reaches this point... - * See http://stackoverflow.com/questions/8594591 - * - * So we only abort here if do_exit is set. 
- */ - if (scheduler->do_exit) { - BLI_mutex_unlock(&scheduler->queue_mutex); - return false; - } - - for (current_task = (Task *)scheduler->queue.first; current_task != NULL; - current_task = current_task->next) { - TaskPool *pool = current_task->pool; - - if (scheduler->background_thread_only && !pool->run_in_background) { - continue; - } - - *task = current_task; - found_task = true; - BLI_remlink(&scheduler->queue, *task); - break; - } - if (!found_task) { - BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex); - } - } while (!found_task); - - BLI_mutex_unlock(&scheduler->queue_mutex); - - return true; + return false; } -BLI_INLINE void handle_local_queue(TaskThreadLocalStorage *tls, const int thread_id) +static void tbb_task_pool_free(TaskPool *pool) { - BLI_assert(!tls->do_delayed_push); - while (tls->num_local_queue > 0) { - /* We pop task from queue before handling it so handler of the task can - * push next job to the local queue. - */ - tls->num_local_queue--; - Task *local_task = tls->local_queue[tls->num_local_queue]; - /* TODO(sergey): Double-check work_and_wait() doesn't handle other's - * pool tasks. - */ - TaskPool *local_pool = local_task->pool; - local_task->run(local_pool, local_task->taskdata, thread_id); - task_free(local_pool, local_task, thread_id); +#ifdef WITH_TBB + if (pool->use_threads) { + pool->tbb_group.~TBBTaskGroup(); } - BLI_assert(!tls->do_delayed_push); -} +#endif -static void *task_scheduler_thread_run(void *thread_p) -{ - TaskThread *thread = (TaskThread *)thread_p; - TaskThreadLocalStorage *tls = &thread->tls; - TaskScheduler *scheduler = thread->scheduler; - int thread_id = thread->id; - Task *task; - - pthread_setspecific(scheduler->tls_id_key, thread); - - /* signal the main thread when all threads have started */ - BLI_mutex_lock(&scheduler->startup_mutex); - scheduler->num_thread_started++; - if (scheduler->num_thread_started == scheduler->num_threads) { - BLI_condition_notify_one(&scheduler->startup_cond); + if (pool->suspended_mempool) { + BLI_mempool_destroy(pool->suspended_mempool); } - BLI_mutex_unlock(&scheduler->startup_mutex); - - /* keep popping off tasks */ - while (task_scheduler_thread_wait_pop(scheduler, &task)) { - TaskPool *pool = task->pool; - - /* run task */ - BLI_assert(!tls->do_delayed_push); - task->run(pool, task->taskdata, thread_id); - BLI_assert(!tls->do_delayed_push); - - /* delete task */ - task_free(pool, task, thread_id); +} - /* Handle all tasks from local queue. */ - handle_local_queue(tls, thread_id); +/* Background Task Pool. + * + * Fallback for running background tasks when building without TBB. */ - /* notify pool task was done */ - task_pool_num_decrease(pool, 1); +static void *background_task_run(void *userdata) +{ + TaskPool *pool = (TaskPool *)userdata; + while (Task *task = (Task *)BLI_thread_queue_pop(pool->background_queue)) { + (*task)(); + task->~Task(); + MEM_freeN(task); } - return NULL; } -TaskScheduler *BLI_task_scheduler_create(int num_threads) +static void background_task_pool_create(TaskPool *pool) { - TaskScheduler *scheduler = (TaskScheduler *)MEM_callocN(sizeof(TaskScheduler), "TaskScheduler"); - - /* multiple places can use this task scheduler, sharing the same - * threads, so we keep track of the number of users. 
*/ - scheduler->do_exit = false; - - BLI_listbase_clear(&scheduler->queue); - BLI_mutex_init(&scheduler->queue_mutex); - BLI_condition_init(&scheduler->queue_cond); - - BLI_mutex_init(&scheduler->startup_mutex); - BLI_condition_init(&scheduler->startup_cond); - scheduler->num_thread_started = 0; - - if (num_threads == 0) { - /* automatic number of threads will be main thread + num cores */ - num_threads = BLI_system_thread_count(); - } - - /* main thread will also work, so we count it too */ - num_threads -= 1; - - /* Add background-only thread if needed. */ - if (num_threads == 0) { - scheduler->background_thread_only = true; - num_threads = 1; - } - - scheduler->task_threads = (TaskThread *)MEM_mallocN(sizeof(TaskThread) * (num_threads + 1), - "TaskScheduler task threads"); - - /* Initialize TLS for main thread. */ - initialize_task_tls(&scheduler->task_threads[0].tls); - - pthread_key_create(&scheduler->tls_id_key, NULL); - - /* launch threads that will be waiting for work */ - if (num_threads > 0) { - int i; - - scheduler->num_threads = num_threads; - scheduler->threads = (pthread_t *)MEM_callocN(sizeof(pthread_t) * num_threads, - "TaskScheduler threads"); - - for (i = 0; i < num_threads; i++) { - TaskThread *thread = &scheduler->task_threads[i + 1]; - thread->scheduler = scheduler; - thread->id = i + 1; - initialize_task_tls(&thread->tls); - - if (pthread_create(&scheduler->threads[i], NULL, task_scheduler_thread_run, thread) != 0) { - fprintf(stderr, "TaskScheduler failed to launch thread %d/%d\n", i, num_threads); - } - } - } - - /* Wait for all worker threads to start before returning to caller to prevent the case where - * threads are still starting and pthread_join is called, which causes a deadlock on pthreads4w. - */ - BLI_mutex_lock(&scheduler->startup_mutex); - /* NOTE: Use loop here to avoid false-positive everything-is-ready caused by spontaneous thread - * wake up. 
*/ - while (scheduler->num_thread_started != num_threads) { - BLI_condition_wait(&scheduler->startup_cond, &scheduler->startup_mutex); - } - BLI_mutex_unlock(&scheduler->startup_mutex); - - return scheduler; + pool->background_queue = BLI_thread_queue_init(); + BLI_threadpool_init(&pool->background_threads, background_task_run, 1); + BLI_threadpool_insert(&pool->background_threads, pool); } -void BLI_task_scheduler_free(TaskScheduler *scheduler) +static void background_task_pool_run(TaskPool *pool, Task &&task) { - Task *task; - - /* stop all waiting threads */ - BLI_mutex_lock(&scheduler->queue_mutex); - scheduler->do_exit = true; - BLI_condition_notify_all(&scheduler->queue_cond); - BLI_mutex_unlock(&scheduler->queue_mutex); - - pthread_key_delete(scheduler->tls_id_key); - - /* delete threads */ - if (scheduler->threads) { - int i; - - for (i = 0; i < scheduler->num_threads; i++) { - if (pthread_join(scheduler->threads[i], NULL) != 0) { - fprintf(stderr, "TaskScheduler failed to join thread %d/%d\n", i, scheduler->num_threads); - } - } - - MEM_freeN(scheduler->threads); - } - - /* Delete task thread data */ - if (scheduler->task_threads) { - for (int i = 0; i < scheduler->num_threads + 1; i++) { - TaskThreadLocalStorage *tls = &scheduler->task_threads[i].tls; - free_task_tls(tls); - } - - MEM_freeN(scheduler->task_threads); - } - - /* delete leftover tasks */ - for (task = (Task *)scheduler->queue.first; task; task = task->next) { - task_data_free(task, 0); - } - BLI_freelistN(&scheduler->queue); - - /* delete mutex/condition */ - BLI_mutex_end(&scheduler->queue_mutex); - BLI_condition_end(&scheduler->queue_cond); - BLI_mutex_end(&scheduler->startup_mutex); - BLI_condition_end(&scheduler->startup_cond); - - MEM_freeN(scheduler); + Task *task_mem = (Task *)MEM_mallocN(sizeof(Task), __func__); + new (task_mem) Task(std::move(task)); + BLI_thread_queue_push(pool->background_queue, task_mem); } -int BLI_task_scheduler_num_threads(TaskScheduler *scheduler) +static void background_task_pool_work_and_wait(TaskPool *pool) { - return scheduler->num_threads + 1; + /* Signal background thread to stop waiting for new tasks if none are + * left, and wait for tasks and thread to finish. */ + BLI_thread_queue_nowait(pool->background_queue); + BLI_thread_queue_wait_finish(pool->background_queue); + BLI_threadpool_remove(&pool->background_threads, pool); } -static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriority priority) +static void background_task_pool_cancel(TaskPool *pool) { - task_pool_num_increase(task->pool, 1); - - /* add task to queue */ - BLI_mutex_lock(&scheduler->queue_mutex); + pool->background_is_canceling = true; - if (priority == TASK_PRIORITY_HIGH) { - BLI_addhead(&scheduler->queue, task); - } - else { - BLI_addtail(&scheduler->queue, task); + /* Remove tasks not yet started by background thread. */ + BLI_thread_queue_nowait(pool->background_queue); + while (Task *task = (Task *)BLI_thread_queue_pop(pool->background_queue)) { + task->~Task(); + MEM_freeN(task); } - BLI_condition_notify_one(&scheduler->queue_cond); - BLI_mutex_unlock(&scheduler->queue_mutex); + /* Let background thread finish or cancel task it is working on. 
*/ + BLI_threadpool_remove(&pool->background_threads, pool); + pool->background_is_canceling = false; } -static void task_scheduler_push_all(TaskScheduler *scheduler, - TaskPool *pool, - Task **tasks, - int num_tasks) +static bool background_task_pool_canceled(TaskPool *pool) { - if (num_tasks == 0) { - return; - } - - task_pool_num_increase(pool, num_tasks); - - BLI_mutex_lock(&scheduler->queue_mutex); - - for (int i = 0; i < num_tasks; i++) { - BLI_addhead(&scheduler->queue, tasks[i]); - } - - BLI_condition_notify_all(&scheduler->queue_cond); - BLI_mutex_unlock(&scheduler->queue_mutex); + return pool->background_is_canceling; } -static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool) +static void background_task_pool_free(TaskPool *pool) { - Task *task, *nexttask; - size_t done = 0; - - BLI_mutex_lock(&scheduler->queue_mutex); - - /* free all tasks from this pool from the queue */ - for (task = (Task *)scheduler->queue.first; task; task = nexttask) { - nexttask = task->next; - - if (task->pool == pool) { - task_data_free(task, pool->thread_id); - BLI_freelinkN(&scheduler->queue, task); - - done++; - } - } + background_task_pool_work_and_wait(pool); - BLI_mutex_unlock(&scheduler->queue_mutex); - - /* notify done */ - task_pool_num_decrease(pool, done); + BLI_threadpool_end(&pool->background_threads); + BLI_thread_queue_free(pool->background_queue); } /* Task Pool */ -static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, - void *userdata, - const bool is_background, - const bool is_suspended, - TaskPriority priority) +static TaskPool *task_pool_create_ex(void *userdata, TaskPoolType type, TaskPriority priority) { - TaskPool *pool = (TaskPool *)MEM_mallocN(sizeof(TaskPool), "TaskPool"); + /* Ensure malloc will go fine from threads, + * + * This is needed because we could be in main thread here + * and malloc could be non-thread safe at this point because + * no other jobs are running. + */ + BLI_threaded_malloc_begin(); -#ifndef NDEBUG - /* Assert we do not try to create a background pool from some parent task - - * those only work OK from main thread. */ - if (is_background) { - const pthread_t thread_id = pthread_self(); - int i = scheduler->num_threads; + const bool use_threads = BLI_task_scheduler_num_threads() > 1 && type != TASK_POOL_NO_THREADS; - while (i--) { - BLI_assert(!pthread_equal(scheduler->threads[i], thread_id)); - } + /* Background task pool uses regular TBB scheduling if available. Only when + * building without TBB or running with -t 1 do we need to ensure these tasks + * do not block the main thread. */ + if (type == TASK_POOL_BACKGROUND && use_threads) { + type = TASK_POOL_TBB; } -#endif - pool->scheduler = scheduler; - pool->num = 0; - pool->do_cancel = false; - pool->do_work = false; - pool->is_suspended = is_suspended; - pool->start_suspended = is_suspended; - pool->num_suspended = 0; - pool->suspended_queue.first = pool->suspended_queue.last = NULL; - pool->priority = priority; - pool->run_in_background = is_background; - pool->use_local_tls = false; - - BLI_mutex_init(&pool->num_mutex); - BLI_condition_init(&pool->num_cond); + /* Allocate task pool. 
*/ + TaskPool *pool = (TaskPool *)MEM_callocN(sizeof(TaskPool), "TaskPool"); + + pool->type = type; + pool->use_threads = use_threads; pool->userdata = userdata; BLI_mutex_init(&pool->user_mutex); - if (BLI_thread_is_main()) { - pool->thread_id = 0; - } - else { - TaskThread *thread = (TaskThread *)pthread_getspecific(scheduler->tls_id_key); - if (thread == NULL) { - /* NOTE: Task pool is created from non-main thread which is not - * managed by the task scheduler. We identify ourselves as thread ID - * 0 but we do not use scheduler's TLS storage and use our own - * instead to avoid any possible threading conflicts. - */ - pool->thread_id = 0; - pool->use_local_tls = true; -#ifndef NDEBUG - pool->creator_thread_id = pthread_self(); -#endif - initialize_task_tls(&pool->local_tls); - } - else { - pool->thread_id = thread->id; - } + switch (type) { + case TASK_POOL_TBB: + case TASK_POOL_TBB_SUSPENDED: + case TASK_POOL_NO_THREADS: + tbb_task_pool_create(pool, priority); + break; + case TASK_POOL_BACKGROUND: + case TASK_POOL_BACKGROUND_SERIAL: + background_task_pool_create(pool); + break; } -#ifdef DEBUG_STATS - pool->mempool_stats = (TaskMemPoolStats *)MEM_callocN( - sizeof(*pool->mempool_stats) * (scheduler->num_threads + 1), "per-taskpool mempool stats"); -#endif - - /* Ensure malloc will go fine from threads, - * - * This is needed because we could be in main thread here - * and malloc could be non-thread safe at this point because - * no other jobs are running. - */ - BLI_threaded_malloc_begin(); - return pool; } /** * Create a normal task pool. Tasks will be executed as soon as they are added. */ -TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata, TaskPriority priority) +TaskPool *BLI_task_pool_create(void *userdata, TaskPriority priority) { - return task_pool_create_ex(scheduler, userdata, false, false, priority); + return task_pool_create_ex(userdata, TASK_POOL_TBB, priority); } /** @@ -756,11 +393,9 @@ TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata, TaskPri * they could end never being executed, since the 'fallback' background thread is already * busy with parent task in single-threaded context). */ -TaskPool *BLI_task_pool_create_background(TaskScheduler *scheduler, - void *userdata, - TaskPriority priority) +TaskPool *BLI_task_pool_create_background(void *userdata, TaskPriority priority) { - return task_pool_create_ex(scheduler, userdata, true, false, priority); + return task_pool_create_ex(userdata, TASK_POOL_BACKGROUND, priority); } /** @@ -768,228 +403,114 @@ TaskPool *BLI_task_pool_create_background(TaskScheduler *scheduler, * for until BLI_task_pool_work_and_wait() is called. This helps reducing threading * overhead when pushing huge amount of small initial tasks from the main thread. */ -TaskPool *BLI_task_pool_create_suspended(TaskScheduler *scheduler, - void *userdata, - TaskPriority priority) +TaskPool *BLI_task_pool_create_suspended(void *userdata, TaskPriority priority) { - return task_pool_create_ex(scheduler, userdata, false, true, priority); + return task_pool_create_ex(userdata, TASK_POOL_TBB_SUSPENDED, priority); } -void BLI_task_pool_free(TaskPool *pool) +/** + * Single threaded task pool that executes pushed task immediately, for + * debugging purposes. 
+ */ +TaskPool *BLI_task_pool_create_no_threads(void *userdata) { - BLI_task_pool_cancel(pool); - - BLI_mutex_end(&pool->num_mutex); - BLI_condition_end(&pool->num_cond); + return task_pool_create_ex(userdata, TASK_POOL_NO_THREADS, TASK_PRIORITY_HIGH); +} - BLI_mutex_end(&pool->user_mutex); +/** + * Task pool that executeds one task after the other, possibly on different threads + * but never in parallel. + */ +TaskPool *BLI_task_pool_create_background_serial(void *userdata, TaskPriority priority) +{ + return task_pool_create_ex(userdata, TASK_POOL_BACKGROUND_SERIAL, priority); +} -#ifdef DEBUG_STATS - printf("Thread ID Allocated Reused Discarded\n"); - for (int i = 0; i < pool->scheduler->num_threads + 1; i++) { - printf("%02d %05d %05d %05d\n", - i, - pool->mempool_stats[i].num_alloc, - pool->mempool_stats[i].num_reuse, - pool->mempool_stats[i].num_discard); +void BLI_task_pool_free(TaskPool *pool) +{ + switch (pool->type) { + case TASK_POOL_TBB: + case TASK_POOL_TBB_SUSPENDED: + case TASK_POOL_NO_THREADS: + tbb_task_pool_free(pool); + break; + case TASK_POOL_BACKGROUND: + case TASK_POOL_BACKGROUND_SERIAL: + background_task_pool_free(pool); + break; } - MEM_freeN(pool->mempool_stats); -#endif - if (pool->use_local_tls) { - free_task_tls(&pool->local_tls); - } + BLI_mutex_end(&pool->user_mutex); MEM_freeN(pool); BLI_threaded_malloc_end(); } -BLI_INLINE bool task_can_use_local_queues(TaskPool *pool, int thread_id) -{ - return (thread_id != -1 && (thread_id != pool->thread_id || pool->do_work)); -} - -static void task_pool_push(TaskPool *pool, - TaskRunFunction run, - void *taskdata, - bool free_taskdata, - TaskFreeFunction freedata, - int thread_id) -{ - /* Allocate task and fill it's properties. */ - Task *task = task_alloc(pool, thread_id); - task->run = run; - task->taskdata = taskdata; - task->free_taskdata = free_taskdata; - task->freedata = freedata; - task->pool = pool; - /* For suspended pools we put everything yo a global queue first - * and exit as soon as possible. - * - * This tasks will be moved to actual execution when pool is - * activated by work_and_wait(). - */ - if (pool->is_suspended) { - BLI_addhead(&pool->suspended_queue, task); - atomic_fetch_and_add_z(&pool->num_suspended, 1); - return; - } - /* Populate to any local queue first, this is cheapest push ever. */ - if (task_can_use_local_queues(pool, thread_id)) { - ASSERT_THREAD_ID(pool->scheduler, thread_id); - TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id); - /* Try to push to a local execution queue. - * These tasks will be picked up next. - */ - if (tls->num_local_queue < LOCAL_QUEUE_SIZE) { - tls->local_queue[tls->num_local_queue] = task; - tls->num_local_queue++; - return; - } - /* If we are in the delayed tasks push mode, we push tasks to a - * temporary local queue first without any locks, and then move them - * to global execution queue with a single lock. - */ - if (tls->do_delayed_push && tls->num_delayed_queue < DELAYED_QUEUE_SIZE) { - tls->delayed_queue[tls->num_delayed_queue] = task; - tls->num_delayed_queue++; - return; - } - } - /* Do push to a global execution pool, slowest possible method, - * causes quite reasonable amount of threading overhead. 
- */ - task_scheduler_push(pool->scheduler, task, pool->priority); -} - void BLI_task_pool_push(TaskPool *pool, TaskRunFunction run, void *taskdata, bool free_taskdata, TaskFreeFunction freedata) { - task_pool_push(pool, run, taskdata, free_taskdata, freedata, -1); -} + Task task(pool, run, taskdata, free_taskdata, freedata); -void BLI_task_pool_push_from_thread(TaskPool *pool, - TaskRunFunction run, - void *taskdata, - bool free_taskdata, - TaskFreeFunction freedata, - int thread_id) -{ - task_pool_push(pool, run, taskdata, free_taskdata, freedata, thread_id); + switch (pool->type) { + case TASK_POOL_TBB: + case TASK_POOL_TBB_SUSPENDED: + case TASK_POOL_NO_THREADS: + tbb_task_pool_run(pool, std::move(task)); + break; + case TASK_POOL_BACKGROUND: + case TASK_POOL_BACKGROUND_SERIAL: + background_task_pool_run(pool, std::move(task)); + break; + } } void BLI_task_pool_work_and_wait(TaskPool *pool) { - TaskThreadLocalStorage *tls = get_task_tls(pool, pool->thread_id); - TaskScheduler *scheduler = pool->scheduler; - - if (atomic_fetch_and_and_uint8((uint8_t *)&pool->is_suspended, 0)) { - if (pool->num_suspended) { - task_pool_num_increase(pool, pool->num_suspended); - BLI_mutex_lock(&scheduler->queue_mutex); - - BLI_movelisttolist(&scheduler->queue, &pool->suspended_queue); - - BLI_condition_notify_all(&scheduler->queue_cond); - BLI_mutex_unlock(&scheduler->queue_mutex); - - pool->num_suspended = 0; - } - } - - pool->do_work = true; - - ASSERT_THREAD_ID(pool->scheduler, pool->thread_id); - - handle_local_queue(tls, pool->thread_id); - - BLI_mutex_lock(&pool->num_mutex); - - while (pool->num != 0) { - Task *task, *work_task = NULL; - bool found_task = false; - - BLI_mutex_unlock(&pool->num_mutex); - - BLI_mutex_lock(&scheduler->queue_mutex); - - /* find task from this pool. if we get a task from another pool, - * we can get into deadlock */ - - for (task = (Task *)scheduler->queue.first; task; task = task->next) { - if (task->pool == pool) { - work_task = task; - found_task = true; - BLI_remlink(&scheduler->queue, task); - break; - } - } - - BLI_mutex_unlock(&scheduler->queue_mutex); - - /* if found task, do it, otherwise wait until other tasks are done */ - if (found_task) { - /* run task */ - BLI_assert(!tls->do_delayed_push); - work_task->run(pool, work_task->taskdata, pool->thread_id); - BLI_assert(!tls->do_delayed_push); - - /* delete task */ - task_free(pool, task, pool->thread_id); - - /* Handle all tasks from local queue. 
*/ - handle_local_queue(tls, pool->thread_id); - - /* notify pool task was done */ - task_pool_num_decrease(pool, 1); - } - - BLI_mutex_lock(&pool->num_mutex); - if (pool->num == 0) { + switch (pool->type) { + case TASK_POOL_TBB: + case TASK_POOL_TBB_SUSPENDED: + case TASK_POOL_NO_THREADS: + tbb_task_pool_work_and_wait(pool); + break; + case TASK_POOL_BACKGROUND: + case TASK_POOL_BACKGROUND_SERIAL: + background_task_pool_work_and_wait(pool); break; - } - - if (!found_task) { - BLI_condition_wait(&pool->num_cond, &pool->num_mutex); - } } - - BLI_mutex_unlock(&pool->num_mutex); - - BLI_assert(tls->num_local_queue == 0); -} - -void BLI_task_pool_work_wait_and_reset(TaskPool *pool) -{ - BLI_task_pool_work_and_wait(pool); - - pool->do_work = false; - pool->is_suspended = pool->start_suspended; } void BLI_task_pool_cancel(TaskPool *pool) { - pool->do_cancel = true; - - task_scheduler_clear(pool->scheduler, pool); - - /* wait until all entries are cleared */ - BLI_mutex_lock(&pool->num_mutex); - while (pool->num) { - BLI_condition_wait(&pool->num_cond, &pool->num_mutex); + switch (pool->type) { + case TASK_POOL_TBB: + case TASK_POOL_TBB_SUSPENDED: + case TASK_POOL_NO_THREADS: + tbb_task_pool_cancel(pool); + break; + case TASK_POOL_BACKGROUND: + case TASK_POOL_BACKGROUND_SERIAL: + background_task_pool_cancel(pool); + break; } - BLI_mutex_unlock(&pool->num_mutex); - - pool->do_cancel = false; } bool BLI_task_pool_canceled(TaskPool *pool) { - return pool->do_cancel; + switch (pool->type) { + case TASK_POOL_TBB: + case TASK_POOL_TBB_SUSPENDED: + case TASK_POOL_NO_THREADS: + return tbb_task_pool_canceled(pool); + case TASK_POOL_BACKGROUND: + case TASK_POOL_BACKGROUND_SERIAL: + return background_task_pool_canceled(pool); + } + BLI_assert("BLI_task_pool_canceled: Control flow should not come here!"); + return false; } void *BLI_task_pool_user_data(TaskPool *pool) @@ -1000,30 +521,4 @@ void *BLI_task_pool_user_data(TaskPool *pool) ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool) { return &pool->user_mutex; -} - -int BLI_task_pool_creator_thread_id(TaskPool *pool) -{ - return pool->thread_id; -} - -void BLI_task_pool_delayed_push_begin(TaskPool *pool, int thread_id) -{ - if (task_can_use_local_queues(pool, thread_id)) { - ASSERT_THREAD_ID(pool->scheduler, thread_id); - TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id); - tls->do_delayed_push = true; - } -} - -void BLI_task_pool_delayed_push_end(TaskPool *pool, int thread_id) -{ - if (task_can_use_local_queues(pool, thread_id)) { - ASSERT_THREAD_ID(pool->scheduler, thread_id); - TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id); - BLI_assert(tls->do_delayed_push); - task_scheduler_push_all(pool->scheduler, pool, tls->delayed_queue, tls->num_delayed_queue); - tls->do_delayed_push = false; - tls->num_delayed_queue = 0; - } -} +}
\ No newline at end of file
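
Usage of the rewritten pool API, as a minimal sketch based only on the functions visible in this diff. The task function, the TileData struct and process_tiles() are illustrative and not part of the commit, and the two-argument run-function signature is assumed from how Task::operator() invokes run(pool, taskdata):

/* Hypothetical example, not part of this commit. */
#include "BLI_task.h"
#include "MEM_guardedalloc.h"

struct TileData {
  int index;
};

/* Assumed signature: void (*TaskRunFunction)(TaskPool *pool, void *taskdata). */
static void tile_run_func(TaskPool *pool, void *taskdata)
{
  TileData *tile = (TileData *)taskdata;
  (void)pool;
  /* ... do the actual work for tile->index here ... */
}

static void process_tiles(const int num_tiles)
{
  /* Suspended pool: pushed tasks are queued in the pool's mempool and only
   * start executing once BLI_task_pool_work_and_wait() is called. */
  TaskPool *pool = BLI_task_pool_create_suspended(NULL, TASK_PRIORITY_HIGH);

  for (int i = 0; i < num_tiles; i++) {
    TileData *tile = (TileData *)MEM_callocN(sizeof(TileData), __func__);
    tile->index = i;
    /* free_taskdata=true with freedata=NULL: the pool calls MEM_freeN()
     * on taskdata when the Task is destroyed. */
    BLI_task_pool_push(pool, tile_run_func, tile, true, NULL);
  }

  BLI_task_pool_work_and_wait(pool);
  BLI_task_pool_free(pool);
}

With BLI_task_pool_create() instead of the suspended variant, each push schedules the task on the TBB task group right away (or runs it inline when built without TBB or running with a single thread), and work_and_wait() only waits for, and helps with, the remaining tasks.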