Welcome to mirror list, hosted at ThFree Co, Russian Federation.

git.blender.org/blender.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'source/blender/blenlib/intern/task_pool.cc')
-rw-r--r--source/blender/blenlib/intern/task_pool.cc1167
1 files changed, 345 insertions, 822 deletions
diff --git a/source/blender/blenlib/intern/task_pool.cc b/source/blender/blenlib/intern/task_pool.cc
index 8085d495248..c4d60673492 100644
--- a/source/blender/blenlib/intern/task_pool.cc
+++ b/source/blender/blenlib/intern/task_pool.cc
@@ -17,731 +17,396 @@
/** \file
* \ingroup bli
*
- * A generic task system which can be used for any task based subsystem.
+ * Task pool to run tasks in parallel.
*/
+#include <memory>
#include <stdlib.h>
+#include <utility>
#include "MEM_guardedalloc.h"
#include "DNA_listBase.h"
-#include "BLI_listbase.h"
#include "BLI_math.h"
#include "BLI_mempool.h"
#include "BLI_task.h"
#include "BLI_threads.h"
-#include "atomic_ops.h"
-
-/* Define this to enable some detailed statistic print. */
-#undef DEBUG_STATS
-
-/* Types */
-
-/* Number of per-thread pre-allocated tasks.
- *
- * For more details see description of TaskMemPool.
- */
-#define MEMPOOL_SIZE 256
-
-/* Number of tasks which are pushed directly to local thread queue.
- *
- * This allows thread to fetch next task without locking the whole queue.
- */
-#define LOCAL_QUEUE_SIZE 1
-
-/* Number of tasks which are allowed to be scheduled in a delayed manner.
- *
- * This allows to use less locks per graph node children schedule. More details
- * could be found at TaskThreadLocalStorage::do_delayed_push.
- */
-#define DELAYED_QUEUE_SIZE 4096
-
-#ifndef NDEBUG
-# define ASSERT_THREAD_ID(scheduler, thread_id) \
- do { \
- if (!BLI_thread_is_main()) { \
- TaskThread *thread = (TaskThread *)pthread_getspecific(scheduler->tls_id_key); \
- if (thread == NULL) { \
- BLI_assert(thread_id == 0); \
- } \
- else { \
- BLI_assert(thread_id == thread->id); \
- } \
- } \
- else { \
- BLI_assert(thread_id == 0); \
- } \
- } while (false)
-#else
-# define ASSERT_THREAD_ID(scheduler, thread_id)
+#ifdef WITH_TBB
+/* Quiet top level deprecation message, unrelated to API usage here. */
+# define TBB_SUPPRESS_DEPRECATED_MESSAGES 1
+# include <tbb/tbb.h>
#endif
-typedef struct Task {
- struct Task *next, *prev;
+/* Task
+ *
+ * Unit of work to execute. This is a C++ class to work with TBB. */
+class Task {
+ public:
+ TaskPool *pool;
TaskRunFunction run;
void *taskdata;
bool free_taskdata;
TaskFreeFunction freedata;
- TaskPool *pool;
-} Task;
-/* This is a per-thread storage of pre-allocated tasks.
- *
- * The idea behind this is simple: reduce amount of malloc() calls when pushing
- * new task to the pool. This is done by keeping memory from the tasks which
- * were finished already, so instead of freeing that memory we put it to the
- * pool for the later re-use.
- *
- * The tricky part here is to avoid any inter-thread synchronization, hence no
- * lock must exist around this pool. The pool will become an owner of the pointer
- * from freed task, and only corresponding thread will be able to use this pool
- * (no memory stealing and such).
- *
- * This leads to the following use of the pool:
- *
- * - task_push() should provide proper thread ID from which the task is being
- * pushed from.
- *
- * - Task allocation function which check corresponding memory pool and if there
- * is any memory in there it'll mark memory as re-used, remove it from the pool
- * and use that memory for the new task.
- *
- * At this moment task queue owns the memory.
- *
- * - When task is done and task_free() is called the memory will be put to the
- * pool which corresponds to a thread which handled the task.
- */
-typedef struct TaskMemPool {
- /* Number of pre-allocated tasks in the pool. */
- int num_tasks;
- /* Pre-allocated task memory pointers. */
- Task *tasks[MEMPOOL_SIZE];
-} TaskMemPool;
-
-#ifdef DEBUG_STATS
-typedef struct TaskMemPoolStats {
- /* Number of allocations. */
- int num_alloc;
- /* Number of avoided allocations (pointer was re-used from the pool). */
- int num_reuse;
- /* Number of discarded memory due to pool saturation, */
- int num_discard;
-} TaskMemPoolStats;
-#endif
-
-typedef struct TaskThreadLocalStorage {
- /* Memory pool for faster task allocation.
- * The idea is to re-use memory of finished/discarded tasks by this thread.
- */
- TaskMemPool task_mempool;
-
- /* Local queue keeps thread alive by keeping small amount of tasks ready
- * to be picked up without causing global thread locks for synchronization.
- */
- int num_local_queue;
- Task *local_queue[LOCAL_QUEUE_SIZE];
-
- /* Thread can be marked for delayed tasks push. This is helpful when it's
- * know that lots of subsequent task pushed will happen from the same thread
- * without "interrupting" for task execution.
- *
- * We try to accumulate as much tasks as possible in a local queue without
- * any locks first, and then we push all of them into a scheduler's queue
- * from within a single mutex lock.
- */
- bool do_delayed_push;
- int num_delayed_queue;
- Task *delayed_queue[DELAYED_QUEUE_SIZE];
-} TaskThreadLocalStorage;
-
-struct TaskPool {
- TaskScheduler *scheduler;
-
- volatile size_t num;
- ThreadMutex num_mutex;
- ThreadCondition num_cond;
-
- void *userdata;
- ThreadMutex user_mutex;
+ Task(TaskPool *pool,
+ TaskRunFunction run,
+ void *taskdata,
+ bool free_taskdata,
+ TaskFreeFunction freedata)
+ : pool(pool), run(run), taskdata(taskdata), free_taskdata(free_taskdata), freedata(freedata)
+ {
+ }
- volatile bool do_cancel;
- volatile bool do_work;
+ ~Task()
+ {
+ if (free_taskdata) {
+ if (freedata) {
+ freedata(pool, taskdata);
+ }
+ else {
+ MEM_freeN(taskdata);
+ }
+ }
+ }
- volatile bool is_suspended;
- bool start_suspended;
- ListBase suspended_queue;
- size_t num_suspended;
+ /* Move constructor.
+ * For performance, ensure we never copy the task and only move it.
+ * For TBB version 2017 and earlier we apply a workaround to make up for
+ * the lack of move constructor support. */
+ Task(Task &&other)
+ : pool(other.pool),
+ run(other.run),
+ taskdata(other.taskdata),
+ free_taskdata(other.free_taskdata),
+ freedata(other.freedata)
+ {
+ other.pool = NULL;
+ other.run = NULL;
+ other.taskdata = NULL;
+ other.free_taskdata = false;
+ other.freedata = NULL;
+ }
+
+#if defined(WITH_TBB) && TBB_INTERFACE_VERSION_MAJOR < 10
+ Task(const Task &other)
+ : pool(other.pool),
+ run(other.run),
+ taskdata(other.taskdata),
+ free_taskdata(other.free_taskdata),
+ freedata(other.freedata)
+ {
+ ((Task &)other).pool = NULL;
+ ((Task &)other).run = NULL;
+ ((Task &)other).taskdata = NULL;
+ ((Task &)other).free_taskdata = false;
+ ((Task &)other).freedata = NULL;
+ }
+#else
+ Task(const Task &other) = delete;
+#endif
- TaskPriority priority;
+ Task &operator=(const Task &other) = delete;
+ Task &operator=(Task &&other) = delete;
- /* If set, this pool may never be work_and_wait'ed, which means TaskScheduler
- * has to use its special background fallback thread in case we are in
- * single-threaded situation.
- */
- bool run_in_background;
+ /* Execute task. */
+ void operator()() const
+ {
+ run(pool, taskdata);
+ }
+};
- /* This is a task scheduler's ID of a thread at which pool was constructed.
- * It will be used to access task TLS.
- */
- int thread_id;
+/* TBB Task Group.
+ *
+ * Subclass since there seems to be no other way to set priority. */
+
+#ifdef WITH_TBB
+class TBBTaskGroup : public tbb::task_group {
+ public:
+ TBBTaskGroup(TaskPriority priority)
+ {
+ switch (priority) {
+ case TASK_PRIORITY_LOW:
+ my_context.set_priority(tbb::priority_low);
+ break;
+ case TASK_PRIORITY_HIGH:
+ my_context.set_priority(tbb::priority_normal);
+ break;
+ }
+ }
- /* For the pools which are created from non-main thread which is not a
- * scheduler worker thread we can't re-use any of scheduler's threads TLS
- * and have to use our own one.
- */
- bool use_local_tls;
- TaskThreadLocalStorage local_tls;
-#ifndef NDEBUG
- pthread_t creator_thread_id;
+ ~TBBTaskGroup()
+ {
+ }
+};
#endif
-#ifdef DEBUG_STATS
- TaskMemPoolStats *mempool_stats;
-#endif
-};
+/* Task Pool */
-struct TaskScheduler {
- pthread_t *threads;
- struct TaskThread *task_threads;
- int num_threads;
- bool background_thread_only;
+typedef enum TaskPoolType {
+ TASK_POOL_TBB,
+ TASK_POOL_TBB_SUSPENDED,
+ TASK_POOL_NO_THREADS,
+ TASK_POOL_BACKGROUND,
+ TASK_POOL_BACKGROUND_SERIAL,
+} TaskPoolType;
- ListBase queue;
- ThreadMutex queue_mutex;
- ThreadCondition queue_cond;
+struct TaskPool {
+ TaskPoolType type;
+ bool use_threads;
- ThreadMutex startup_mutex;
- ThreadCondition startup_cond;
- volatile int num_thread_started;
+ ThreadMutex user_mutex;
+ void *userdata;
- volatile bool do_exit;
+ /* TBB task pool. */
+#ifdef WITH_TBB
+ TBBTaskGroup tbb_group;
+#endif
+ volatile bool is_suspended;
+ BLI_mempool *suspended_mempool;
- /* NOTE: In pthread's TLS we store the whole TaskThread structure. */
- pthread_key_t tls_id_key;
+ /* Background task pool. */
+ ListBase background_threads;
+ ThreadQueue *background_queue;
+ volatile bool background_is_canceling;
};
-typedef struct TaskThread {
- TaskScheduler *scheduler;
- int id;
- TaskThreadLocalStorage tls;
-} TaskThread;
-
-/* Helper */
-BLI_INLINE void task_data_free(Task *task, const int thread_id)
-{
- if (task->free_taskdata) {
- if (task->freedata) {
- task->freedata(task->pool, task->taskdata, thread_id);
- }
- else {
- MEM_freeN(task->taskdata);
- }
- }
-}
-
-BLI_INLINE void initialize_task_tls(TaskThreadLocalStorage *tls)
-{
- memset(tls, 0, sizeof(TaskThreadLocalStorage));
-}
+/* TBB Task Pool.
+ *
+ * Task pool using the TBB scheduler for tasks. When building without TBB
+ * support or running Blender with -t 1, this reverts to single threaded.
+ *
+ * Tasks may be suspended until in all are created, to make it possible to
+ * initialize data structures and create tasks in a single pass. */
-BLI_INLINE TaskThreadLocalStorage *get_task_tls(TaskPool *pool, const int thread_id)
+static void tbb_task_pool_create(TaskPool *pool, TaskPriority priority)
{
- TaskScheduler *scheduler = pool->scheduler;
- BLI_assert(thread_id >= 0);
- BLI_assert(thread_id <= scheduler->num_threads);
- if (pool->use_local_tls && thread_id == 0) {
- BLI_assert(pool->thread_id == 0);
- BLI_assert(!BLI_thread_is_main());
- BLI_assert(pthread_equal(pthread_self(), pool->creator_thread_id));
- return &pool->local_tls;
+ if (pool->type == TASK_POOL_TBB_SUSPENDED) {
+ pool->is_suspended = true;
+ pool->suspended_mempool = BLI_mempool_create(sizeof(Task), 512, 512, BLI_MEMPOOL_ALLOW_ITER);
}
- if (thread_id == 0) {
- BLI_assert(BLI_thread_is_main());
- return &scheduler->task_threads[pool->thread_id].tls;
- }
- return &scheduler->task_threads[thread_id].tls;
-}
-BLI_INLINE void free_task_tls(TaskThreadLocalStorage *tls)
-{
- TaskMemPool *task_mempool = &tls->task_mempool;
- for (int i = 0; i < task_mempool->num_tasks; i++) {
- MEM_freeN(task_mempool->tasks[i]);
+#ifdef WITH_TBB
+ if (pool->use_threads) {
+ new (&pool->tbb_group) TBBTaskGroup(priority);
}
-}
-
-static Task *task_alloc(TaskPool *pool, const int thread_id)
-{
- BLI_assert(thread_id <= pool->scheduler->num_threads);
- if (thread_id != -1) {
- BLI_assert(thread_id >= 0);
- BLI_assert(thread_id <= pool->scheduler->num_threads);
- TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
- TaskMemPool *task_mempool = &tls->task_mempool;
- /* Try to re-use task memory from a thread local storage. */
- if (task_mempool->num_tasks > 0) {
- --task_mempool->num_tasks;
- /* Success! We've just avoided task allocation. */
-#ifdef DEBUG_STATS
- pool->mempool_stats[thread_id].num_reuse++;
-#endif
- return task_mempool->tasks[task_mempool->num_tasks];
- }
- /* We are doomed to allocate new task data. */
-#ifdef DEBUG_STATS
- pool->mempool_stats[thread_id].num_alloc++;
+#else
+ UNUSED_VARS(priority);
#endif
- }
- return (Task *)MEM_mallocN(sizeof(Task), "New task");
}
-static void task_free(TaskPool *pool, Task *task, const int thread_id)
+static void tbb_task_pool_run(TaskPool *pool, Task &&task)
{
- task_data_free(task, thread_id);
- BLI_assert(thread_id >= 0);
- BLI_assert(thread_id <= pool->scheduler->num_threads);
- if (thread_id == 0) {
- BLI_assert(pool->use_local_tls || BLI_thread_is_main());
+ if (pool->is_suspended) {
+ /* Suspended task that will be executed in work_and_wait(). */
+ Task *task_mem = (Task *)BLI_mempool_alloc(pool->suspended_mempool);
+ new (task_mem) Task(std::move(task));
+#ifdef __GNUC__
+ /* Work around apparent compiler bug where task is not properly copied
+ * to task_mem. This appears unrelated to the use of placement new or
+ * move semantics, happens even writing to a plain C struct. Rather the
+ * call into TBB seems to have some indirect effect. */
+ std::atomic_thread_fence(std::memory_order_release);
+#endif
}
- TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
- TaskMemPool *task_mempool = &tls->task_mempool;
- if (task_mempool->num_tasks < MEMPOOL_SIZE - 1) {
- /* Successfully allowed the task to be re-used later. */
- task_mempool->tasks[task_mempool->num_tasks] = task;
- ++task_mempool->num_tasks;
+#ifdef WITH_TBB
+ else if (pool->use_threads) {
+ /* Execute in TBB task group. */
+ pool->tbb_group.run(std::move(task));
}
- else {
- /* Local storage saturated, no other way than just discard
- * the memory.
- *
- * TODO(sergey): We can perhaps store such pointer in a global
- * scheduler pool, maybe it'll be faster than discarding and
- * allocating again.
- */
- MEM_freeN(task);
-#ifdef DEBUG_STATS
- pool->mempool_stats[thread_id].num_discard++;
#endif
+ else {
+ /* Execute immediately. */
+ task();
}
}
-/* Task Scheduler */
-
-static void task_pool_num_decrease(TaskPool *pool, size_t done)
+static void tbb_task_pool_work_and_wait(TaskPool *pool)
{
- BLI_mutex_lock(&pool->num_mutex);
+ /* Start any suspended task now. */
+ if (pool->suspended_mempool) {
+ pool->is_suspended = false;
- BLI_assert(pool->num >= done);
-
- pool->num -= done;
+ BLI_mempool_iter iter;
+ BLI_mempool_iternew(pool->suspended_mempool, &iter);
+ while (Task *task = (Task *)BLI_mempool_iterstep(&iter)) {
+ tbb_task_pool_run(pool, std::move(*task));
+ }
- if (pool->num == 0) {
- BLI_condition_notify_all(&pool->num_cond);
+ BLI_mempool_clear(pool->suspended_mempool);
}
- BLI_mutex_unlock(&pool->num_mutex);
+#ifdef WITH_TBB
+ if (pool->use_threads) {
+ /* This is called wait(), but internally it can actually do work. This
+ * matters because we don't want recursive usage of task pools to run
+ * out of threads and get stuck. */
+ pool->tbb_group.wait();
+ }
+#endif
}
-static void task_pool_num_increase(TaskPool *pool, size_t new_num)
+static void tbb_task_pool_cancel(TaskPool *pool)
{
- BLI_mutex_lock(&pool->num_mutex);
-
- pool->num += new_num;
- BLI_condition_notify_all(&pool->num_cond);
-
- BLI_mutex_unlock(&pool->num_mutex);
+#ifdef WITH_TBB
+ if (pool->use_threads) {
+ pool->tbb_group.cancel();
+ pool->tbb_group.wait();
+ }
+#else
+ UNUSED_VARS(pool);
+#endif
}
-static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task)
+static bool tbb_task_pool_canceled(TaskPool *pool)
{
- bool found_task = false;
- BLI_mutex_lock(&scheduler->queue_mutex);
-
- while (!scheduler->queue.first && !scheduler->do_exit) {
- BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
+#ifdef WITH_TBB
+ if (pool->use_threads) {
+ return pool->tbb_group.is_canceling();
}
+#else
+ UNUSED_VARS(pool);
+#endif
- do {
- Task *current_task;
-
- /* Assuming we can only have a void queue in 'exit' case here seems logical
- * (we should only be here after our worker thread has been woken up from a
- * condition_wait(), which only happens after a new task was added to the queue),
- * but it is wrong.
- * Waiting on condition may wake up the thread even if condition is not signaled
- * (spurious wake-ups), and some race condition may also empty the queue **after**
- * condition has been signaled, but **before** awoken thread reaches this point...
- * See http://stackoverflow.com/questions/8594591
- *
- * So we only abort here if do_exit is set.
- */
- if (scheduler->do_exit) {
- BLI_mutex_unlock(&scheduler->queue_mutex);
- return false;
- }
-
- for (current_task = (Task *)scheduler->queue.first; current_task != NULL;
- current_task = current_task->next) {
- TaskPool *pool = current_task->pool;
-
- if (scheduler->background_thread_only && !pool->run_in_background) {
- continue;
- }
-
- *task = current_task;
- found_task = true;
- BLI_remlink(&scheduler->queue, *task);
- break;
- }
- if (!found_task) {
- BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
- }
- } while (!found_task);
-
- BLI_mutex_unlock(&scheduler->queue_mutex);
-
- return true;
+ return false;
}
-BLI_INLINE void handle_local_queue(TaskThreadLocalStorage *tls, const int thread_id)
+static void tbb_task_pool_free(TaskPool *pool)
{
- BLI_assert(!tls->do_delayed_push);
- while (tls->num_local_queue > 0) {
- /* We pop task from queue before handling it so handler of the task can
- * push next job to the local queue.
- */
- tls->num_local_queue--;
- Task *local_task = tls->local_queue[tls->num_local_queue];
- /* TODO(sergey): Double-check work_and_wait() doesn't handle other's
- * pool tasks.
- */
- TaskPool *local_pool = local_task->pool;
- local_task->run(local_pool, local_task->taskdata, thread_id);
- task_free(local_pool, local_task, thread_id);
+#ifdef WITH_TBB
+ if (pool->use_threads) {
+ pool->tbb_group.~TBBTaskGroup();
}
- BLI_assert(!tls->do_delayed_push);
-}
+#endif
-static void *task_scheduler_thread_run(void *thread_p)
-{
- TaskThread *thread = (TaskThread *)thread_p;
- TaskThreadLocalStorage *tls = &thread->tls;
- TaskScheduler *scheduler = thread->scheduler;
- int thread_id = thread->id;
- Task *task;
-
- pthread_setspecific(scheduler->tls_id_key, thread);
-
- /* signal the main thread when all threads have started */
- BLI_mutex_lock(&scheduler->startup_mutex);
- scheduler->num_thread_started++;
- if (scheduler->num_thread_started == scheduler->num_threads) {
- BLI_condition_notify_one(&scheduler->startup_cond);
+ if (pool->suspended_mempool) {
+ BLI_mempool_destroy(pool->suspended_mempool);
}
- BLI_mutex_unlock(&scheduler->startup_mutex);
-
- /* keep popping off tasks */
- while (task_scheduler_thread_wait_pop(scheduler, &task)) {
- TaskPool *pool = task->pool;
-
- /* run task */
- BLI_assert(!tls->do_delayed_push);
- task->run(pool, task->taskdata, thread_id);
- BLI_assert(!tls->do_delayed_push);
-
- /* delete task */
- task_free(pool, task, thread_id);
+}
- /* Handle all tasks from local queue. */
- handle_local_queue(tls, thread_id);
+/* Background Task Pool.
+ *
+ * Fallback for running background tasks when building without TBB. */
- /* notify pool task was done */
- task_pool_num_decrease(pool, 1);
+static void *background_task_run(void *userdata)
+{
+ TaskPool *pool = (TaskPool *)userdata;
+ while (Task *task = (Task *)BLI_thread_queue_pop(pool->background_queue)) {
+ (*task)();
+ task->~Task();
+ MEM_freeN(task);
}
-
return NULL;
}
-TaskScheduler *BLI_task_scheduler_create(int num_threads)
+static void background_task_pool_create(TaskPool *pool)
{
- TaskScheduler *scheduler = (TaskScheduler *)MEM_callocN(sizeof(TaskScheduler), "TaskScheduler");
-
- /* multiple places can use this task scheduler, sharing the same
- * threads, so we keep track of the number of users. */
- scheduler->do_exit = false;
-
- BLI_listbase_clear(&scheduler->queue);
- BLI_mutex_init(&scheduler->queue_mutex);
- BLI_condition_init(&scheduler->queue_cond);
-
- BLI_mutex_init(&scheduler->startup_mutex);
- BLI_condition_init(&scheduler->startup_cond);
- scheduler->num_thread_started = 0;
-
- if (num_threads == 0) {
- /* automatic number of threads will be main thread + num cores */
- num_threads = BLI_system_thread_count();
- }
-
- /* main thread will also work, so we count it too */
- num_threads -= 1;
-
- /* Add background-only thread if needed. */
- if (num_threads == 0) {
- scheduler->background_thread_only = true;
- num_threads = 1;
- }
-
- scheduler->task_threads = (TaskThread *)MEM_mallocN(sizeof(TaskThread) * (num_threads + 1),
- "TaskScheduler task threads");
-
- /* Initialize TLS for main thread. */
- initialize_task_tls(&scheduler->task_threads[0].tls);
-
- pthread_key_create(&scheduler->tls_id_key, NULL);
-
- /* launch threads that will be waiting for work */
- if (num_threads > 0) {
- int i;
-
- scheduler->num_threads = num_threads;
- scheduler->threads = (pthread_t *)MEM_callocN(sizeof(pthread_t) * num_threads,
- "TaskScheduler threads");
-
- for (i = 0; i < num_threads; i++) {
- TaskThread *thread = &scheduler->task_threads[i + 1];
- thread->scheduler = scheduler;
- thread->id = i + 1;
- initialize_task_tls(&thread->tls);
-
- if (pthread_create(&scheduler->threads[i], NULL, task_scheduler_thread_run, thread) != 0) {
- fprintf(stderr, "TaskScheduler failed to launch thread %d/%d\n", i, num_threads);
- }
- }
- }
-
- /* Wait for all worker threads to start before returning to caller to prevent the case where
- * threads are still starting and pthread_join is called, which causes a deadlock on pthreads4w.
- */
- BLI_mutex_lock(&scheduler->startup_mutex);
- /* NOTE: Use loop here to avoid false-positive everything-is-ready caused by spontaneous thread
- * wake up. */
- while (scheduler->num_thread_started != num_threads) {
- BLI_condition_wait(&scheduler->startup_cond, &scheduler->startup_mutex);
- }
- BLI_mutex_unlock(&scheduler->startup_mutex);
-
- return scheduler;
+ pool->background_queue = BLI_thread_queue_init();
+ BLI_threadpool_init(&pool->background_threads, background_task_run, 1);
}
-void BLI_task_scheduler_free(TaskScheduler *scheduler)
+static void background_task_pool_run(TaskPool *pool, Task &&task)
{
- Task *task;
-
- /* stop all waiting threads */
- BLI_mutex_lock(&scheduler->queue_mutex);
- scheduler->do_exit = true;
- BLI_condition_notify_all(&scheduler->queue_cond);
- BLI_mutex_unlock(&scheduler->queue_mutex);
-
- pthread_key_delete(scheduler->tls_id_key);
-
- /* delete threads */
- if (scheduler->threads) {
- int i;
-
- for (i = 0; i < scheduler->num_threads; i++) {
- if (pthread_join(scheduler->threads[i], NULL) != 0) {
- fprintf(stderr, "TaskScheduler failed to join thread %d/%d\n", i, scheduler->num_threads);
- }
- }
+ Task *task_mem = (Task *)MEM_mallocN(sizeof(Task), __func__);
+ new (task_mem) Task(std::move(task));
+ BLI_thread_queue_push(pool->background_queue, task_mem);
- MEM_freeN(scheduler->threads);
+ if (BLI_available_threads(&pool->background_threads)) {
+ BLI_threadpool_insert(&pool->background_threads, pool);
}
-
- /* Delete task thread data */
- if (scheduler->task_threads) {
- for (int i = 0; i < scheduler->num_threads + 1; i++) {
- TaskThreadLocalStorage *tls = &scheduler->task_threads[i].tls;
- free_task_tls(tls);
- }
-
- MEM_freeN(scheduler->task_threads);
- }
-
- /* delete leftover tasks */
- for (task = (Task *)scheduler->queue.first; task; task = task->next) {
- task_data_free(task, 0);
- }
- BLI_freelistN(&scheduler->queue);
-
- /* delete mutex/condition */
- BLI_mutex_end(&scheduler->queue_mutex);
- BLI_condition_end(&scheduler->queue_cond);
- BLI_mutex_end(&scheduler->startup_mutex);
- BLI_condition_end(&scheduler->startup_cond);
-
- MEM_freeN(scheduler);
}
-int BLI_task_scheduler_num_threads(TaskScheduler *scheduler)
+static void background_task_pool_work_and_wait(TaskPool *pool)
{
- return scheduler->num_threads + 1;
+ /* Signal background thread to stop waiting for new tasks if none are
+ * left, and wait for tasks and thread to finish. */
+ BLI_thread_queue_nowait(pool->background_queue);
+ BLI_thread_queue_wait_finish(pool->background_queue);
+ BLI_threadpool_clear(&pool->background_threads);
}
-static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriority priority)
+static void background_task_pool_cancel(TaskPool *pool)
{
- task_pool_num_increase(task->pool, 1);
+ pool->background_is_canceling = true;
- /* add task to queue */
- BLI_mutex_lock(&scheduler->queue_mutex);
-
- if (priority == TASK_PRIORITY_HIGH) {
- BLI_addhead(&scheduler->queue, task);
- }
- else {
- BLI_addtail(&scheduler->queue, task);
+ /* Remove tasks not yet started by background thread. */
+ BLI_thread_queue_nowait(pool->background_queue);
+ while (Task *task = (Task *)BLI_thread_queue_pop(pool->background_queue)) {
+ task->~Task();
+ MEM_freeN(task);
}
- BLI_condition_notify_one(&scheduler->queue_cond);
- BLI_mutex_unlock(&scheduler->queue_mutex);
+ /* Let background thread finish or cancel task it is working on. */
+ BLI_threadpool_remove(&pool->background_threads, pool);
+ pool->background_is_canceling = false;
}
-static void task_scheduler_push_all(TaskScheduler *scheduler,
- TaskPool *pool,
- Task **tasks,
- int num_tasks)
+static bool background_task_pool_canceled(TaskPool *pool)
{
- if (num_tasks == 0) {
- return;
- }
-
- task_pool_num_increase(pool, num_tasks);
-
- BLI_mutex_lock(&scheduler->queue_mutex);
-
- for (int i = 0; i < num_tasks; i++) {
- BLI_addhead(&scheduler->queue, tasks[i]);
- }
-
- BLI_condition_notify_all(&scheduler->queue_cond);
- BLI_mutex_unlock(&scheduler->queue_mutex);
+ return pool->background_is_canceling;
}
-static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool)
+static void background_task_pool_free(TaskPool *pool)
{
- Task *task, *nexttask;
- size_t done = 0;
-
- BLI_mutex_lock(&scheduler->queue_mutex);
-
- /* free all tasks from this pool from the queue */
- for (task = (Task *)scheduler->queue.first; task; task = nexttask) {
- nexttask = task->next;
-
- if (task->pool == pool) {
- task_data_free(task, pool->thread_id);
- BLI_freelinkN(&scheduler->queue, task);
+ background_task_pool_work_and_wait(pool);
- done++;
- }
- }
-
- BLI_mutex_unlock(&scheduler->queue_mutex);
-
- /* notify done */
- task_pool_num_decrease(pool, done);
+ BLI_threadpool_end(&pool->background_threads);
+ BLI_thread_queue_free(pool->background_queue);
}
/* Task Pool */
-static TaskPool *task_pool_create_ex(TaskScheduler *scheduler,
- void *userdata,
- const bool is_background,
- const bool is_suspended,
- TaskPriority priority)
+static TaskPool *task_pool_create_ex(void *userdata, TaskPoolType type, TaskPriority priority)
{
- TaskPool *pool = (TaskPool *)MEM_mallocN(sizeof(TaskPool), "TaskPool");
+ /* Ensure malloc will go fine from threads,
+ *
+ * This is needed because we could be in main thread here
+ * and malloc could be non-thread safe at this point because
+ * no other jobs are running.
+ */
+ BLI_threaded_malloc_begin();
-#ifndef NDEBUG
- /* Assert we do not try to create a background pool from some parent task -
- * those only work OK from main thread. */
- if (is_background) {
- const pthread_t thread_id = pthread_self();
- int i = scheduler->num_threads;
+ const bool use_threads = BLI_task_scheduler_num_threads() > 1 && type != TASK_POOL_NO_THREADS;
- while (i--) {
- BLI_assert(!pthread_equal(scheduler->threads[i], thread_id));
- }
+ /* Background task pool uses regular TBB scheduling if available. Only when
+ * building without TBB or running with -t 1 do we need to ensure these tasks
+ * do not block the main thread. */
+ if (type == TASK_POOL_BACKGROUND && use_threads) {
+ type = TASK_POOL_TBB;
}
-#endif
- pool->scheduler = scheduler;
- pool->num = 0;
- pool->do_cancel = false;
- pool->do_work = false;
- pool->is_suspended = is_suspended;
- pool->start_suspended = is_suspended;
- pool->num_suspended = 0;
- pool->suspended_queue.first = pool->suspended_queue.last = NULL;
- pool->priority = priority;
- pool->run_in_background = is_background;
- pool->use_local_tls = false;
-
- BLI_mutex_init(&pool->num_mutex);
- BLI_condition_init(&pool->num_cond);
+ /* Allocate task pool. */
+ TaskPool *pool = (TaskPool *)MEM_callocN(sizeof(TaskPool), "TaskPool");
+
+ pool->type = type;
+ pool->use_threads = use_threads;
pool->userdata = userdata;
BLI_mutex_init(&pool->user_mutex);
- if (BLI_thread_is_main()) {
- pool->thread_id = 0;
- }
- else {
- TaskThread *thread = (TaskThread *)pthread_getspecific(scheduler->tls_id_key);
- if (thread == NULL) {
- /* NOTE: Task pool is created from non-main thread which is not
- * managed by the task scheduler. We identify ourselves as thread ID
- * 0 but we do not use scheduler's TLS storage and use our own
- * instead to avoid any possible threading conflicts.
- */
- pool->thread_id = 0;
- pool->use_local_tls = true;
-#ifndef NDEBUG
- pool->creator_thread_id = pthread_self();
-#endif
- initialize_task_tls(&pool->local_tls);
- }
- else {
- pool->thread_id = thread->id;
- }
+ switch (type) {
+ case TASK_POOL_TBB:
+ case TASK_POOL_TBB_SUSPENDED:
+ case TASK_POOL_NO_THREADS:
+ tbb_task_pool_create(pool, priority);
+ break;
+ case TASK_POOL_BACKGROUND:
+ case TASK_POOL_BACKGROUND_SERIAL:
+ background_task_pool_create(pool);
+ break;
}
-#ifdef DEBUG_STATS
- pool->mempool_stats = (TaskMemPoolStats *)MEM_callocN(
- sizeof(*pool->mempool_stats) * (scheduler->num_threads + 1), "per-taskpool mempool stats");
-#endif
-
- /* Ensure malloc will go fine from threads,
- *
- * This is needed because we could be in main thread here
- * and malloc could be non-thread safe at this point because
- * no other jobs are running.
- */
- BLI_threaded_malloc_begin();
-
return pool;
}
/**
* Create a normal task pool. Tasks will be executed as soon as they are added.
*/
-TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata, TaskPriority priority)
+TaskPool *BLI_task_pool_create(void *userdata, TaskPriority priority)
{
- return task_pool_create_ex(scheduler, userdata, false, false, priority);
+ return task_pool_create_ex(userdata, TASK_POOL_TBB, priority);
}
/**
@@ -756,11 +421,9 @@ TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata, TaskPri
* they could end never being executed, since the 'fallback' background thread is already
* busy with parent task in single-threaded context).
*/
-TaskPool *BLI_task_pool_create_background(TaskScheduler *scheduler,
- void *userdata,
- TaskPriority priority)
+TaskPool *BLI_task_pool_create_background(void *userdata, TaskPriority priority)
{
- return task_pool_create_ex(scheduler, userdata, true, false, priority);
+ return task_pool_create_ex(userdata, TASK_POOL_BACKGROUND, priority);
}
/**
@@ -768,231 +431,117 @@ TaskPool *BLI_task_pool_create_background(TaskScheduler *scheduler,
* for until BLI_task_pool_work_and_wait() is called. This helps reducing threading
* overhead when pushing huge amount of small initial tasks from the main thread.
*/
-TaskPool *BLI_task_pool_create_suspended(TaskScheduler *scheduler,
- void *userdata,
- TaskPriority priority)
+TaskPool *BLI_task_pool_create_suspended(void *userdata, TaskPriority priority)
{
- return task_pool_create_ex(scheduler, userdata, false, true, priority);
+ return task_pool_create_ex(userdata, TASK_POOL_TBB_SUSPENDED, priority);
}
-void BLI_task_pool_free(TaskPool *pool)
+/**
+ * Single threaded task pool that executes pushed task immediately, for
+ * debugging purposes.
+ */
+TaskPool *BLI_task_pool_create_no_threads(void *userdata)
{
- BLI_task_pool_cancel(pool);
-
- BLI_mutex_end(&pool->num_mutex);
- BLI_condition_end(&pool->num_cond);
+ return task_pool_create_ex(userdata, TASK_POOL_NO_THREADS, TASK_PRIORITY_HIGH);
+}
- BLI_mutex_end(&pool->user_mutex);
+/**
+ * Task pool that executes one task after the other, possibly on different threads
+ * but never in parallel.
+ */
+TaskPool *BLI_task_pool_create_background_serial(void *userdata, TaskPriority priority)
+{
+ return task_pool_create_ex(userdata, TASK_POOL_BACKGROUND_SERIAL, priority);
+}
-#ifdef DEBUG_STATS
- printf("Thread ID Allocated Reused Discarded\n");
- for (int i = 0; i < pool->scheduler->num_threads + 1; i++) {
- printf("%02d %05d %05d %05d\n",
- i,
- pool->mempool_stats[i].num_alloc,
- pool->mempool_stats[i].num_reuse,
- pool->mempool_stats[i].num_discard);
+void BLI_task_pool_free(TaskPool *pool)
+{
+ switch (pool->type) {
+ case TASK_POOL_TBB:
+ case TASK_POOL_TBB_SUSPENDED:
+ case TASK_POOL_NO_THREADS:
+ tbb_task_pool_free(pool);
+ break;
+ case TASK_POOL_BACKGROUND:
+ case TASK_POOL_BACKGROUND_SERIAL:
+ background_task_pool_free(pool);
+ break;
}
- MEM_freeN(pool->mempool_stats);
-#endif
- if (pool->use_local_tls) {
- free_task_tls(&pool->local_tls);
- }
+ BLI_mutex_end(&pool->user_mutex);
MEM_freeN(pool);
BLI_threaded_malloc_end();
}
-BLI_INLINE bool task_can_use_local_queues(TaskPool *pool, int thread_id)
-{
- return (thread_id != -1 && (thread_id != pool->thread_id || pool->do_work));
-}
-
-static void task_pool_push(TaskPool *pool,
- TaskRunFunction run,
- void *taskdata,
- bool free_taskdata,
- TaskFreeFunction freedata,
- int thread_id)
-{
- /* Allocate task and fill it's properties. */
- Task *task = task_alloc(pool, thread_id);
- task->run = run;
- task->taskdata = taskdata;
- task->free_taskdata = free_taskdata;
- task->freedata = freedata;
- task->pool = pool;
- /* For suspended pools we put everything yo a global queue first
- * and exit as soon as possible.
- *
- * This tasks will be moved to actual execution when pool is
- * activated by work_and_wait().
- */
- if (pool->is_suspended) {
- BLI_addhead(&pool->suspended_queue, task);
- atomic_fetch_and_add_z(&pool->num_suspended, 1);
- return;
- }
- /* Populate to any local queue first, this is cheapest push ever. */
- if (task_can_use_local_queues(pool, thread_id)) {
- ASSERT_THREAD_ID(pool->scheduler, thread_id);
- TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
- /* Try to push to a local execution queue.
- * These tasks will be picked up next.
- */
- if (tls->num_local_queue < LOCAL_QUEUE_SIZE) {
- tls->local_queue[tls->num_local_queue] = task;
- tls->num_local_queue++;
- return;
- }
- /* If we are in the delayed tasks push mode, we push tasks to a
- * temporary local queue first without any locks, and then move them
- * to global execution queue with a single lock.
- */
- if (tls->do_delayed_push && tls->num_delayed_queue < DELAYED_QUEUE_SIZE) {
- tls->delayed_queue[tls->num_delayed_queue] = task;
- tls->num_delayed_queue++;
- return;
- }
- }
- /* Do push to a global execution pool, slowest possible method,
- * causes quite reasonable amount of threading overhead.
- */
- task_scheduler_push(pool->scheduler, task, pool->priority);
-}
-
void BLI_task_pool_push(TaskPool *pool,
TaskRunFunction run,
void *taskdata,
bool free_taskdata,
TaskFreeFunction freedata)
{
- task_pool_push(pool, run, taskdata, free_taskdata, freedata, -1);
-}
+ Task task(pool, run, taskdata, free_taskdata, freedata);
-void BLI_task_pool_push_from_thread(TaskPool *pool,
- TaskRunFunction run,
- void *taskdata,
- bool free_taskdata,
- TaskFreeFunction freedata,
- int thread_id)
-{
- task_pool_push(pool, run, taskdata, free_taskdata, freedata, thread_id);
+ switch (pool->type) {
+ case TASK_POOL_TBB:
+ case TASK_POOL_TBB_SUSPENDED:
+ case TASK_POOL_NO_THREADS:
+ tbb_task_pool_run(pool, std::move(task));
+ break;
+ case TASK_POOL_BACKGROUND:
+ case TASK_POOL_BACKGROUND_SERIAL:
+ background_task_pool_run(pool, std::move(task));
+ break;
+ }
}
void BLI_task_pool_work_and_wait(TaskPool *pool)
{
- TaskThreadLocalStorage *tls = get_task_tls(pool, pool->thread_id);
- TaskScheduler *scheduler = pool->scheduler;
-
- if (atomic_fetch_and_and_uint8((uint8_t *)&pool->is_suspended, 0)) {
- if (pool->num_suspended) {
- task_pool_num_increase(pool, pool->num_suspended);
- BLI_mutex_lock(&scheduler->queue_mutex);
-
- BLI_movelisttolist(&scheduler->queue, &pool->suspended_queue);
-
- BLI_condition_notify_all(&scheduler->queue_cond);
- BLI_mutex_unlock(&scheduler->queue_mutex);
-
- pool->num_suspended = 0;
- }
- }
-
- pool->do_work = true;
-
- ASSERT_THREAD_ID(pool->scheduler, pool->thread_id);
-
- handle_local_queue(tls, pool->thread_id);
-
- BLI_mutex_lock(&pool->num_mutex);
-
- while (pool->num != 0) {
- Task *task, *work_task = NULL;
- bool found_task = false;
-
- BLI_mutex_unlock(&pool->num_mutex);
-
- BLI_mutex_lock(&scheduler->queue_mutex);
-
- /* find task from this pool. if we get a task from another pool,
- * we can get into deadlock */
-
- for (task = (Task *)scheduler->queue.first; task; task = task->next) {
- if (task->pool == pool) {
- work_task = task;
- found_task = true;
- BLI_remlink(&scheduler->queue, task);
- break;
- }
- }
-
- BLI_mutex_unlock(&scheduler->queue_mutex);
-
- /* if found task, do it, otherwise wait until other tasks are done */
- if (found_task) {
- /* run task */
- BLI_assert(!tls->do_delayed_push);
- work_task->run(pool, work_task->taskdata, pool->thread_id);
- BLI_assert(!tls->do_delayed_push);
-
- /* delete task */
- task_free(pool, task, pool->thread_id);
-
- /* Handle all tasks from local queue. */
- handle_local_queue(tls, pool->thread_id);
-
- /* notify pool task was done */
- task_pool_num_decrease(pool, 1);
- }
-
- BLI_mutex_lock(&pool->num_mutex);
- if (pool->num == 0) {
+ switch (pool->type) {
+ case TASK_POOL_TBB:
+ case TASK_POOL_TBB_SUSPENDED:
+ case TASK_POOL_NO_THREADS:
+ tbb_task_pool_work_and_wait(pool);
+ break;
+ case TASK_POOL_BACKGROUND:
+ case TASK_POOL_BACKGROUND_SERIAL:
+ background_task_pool_work_and_wait(pool);
break;
- }
-
- if (!found_task) {
- BLI_condition_wait(&pool->num_cond, &pool->num_mutex);
- }
}
-
- BLI_mutex_unlock(&pool->num_mutex);
-
- BLI_assert(tls->num_local_queue == 0);
-}
-
-void BLI_task_pool_work_wait_and_reset(TaskPool *pool)
-{
- BLI_task_pool_work_and_wait(pool);
-
- pool->do_work = false;
- pool->is_suspended = pool->start_suspended;
}
void BLI_task_pool_cancel(TaskPool *pool)
{
- pool->do_cancel = true;
-
- task_scheduler_clear(pool->scheduler, pool);
-
- /* wait until all entries are cleared */
- BLI_mutex_lock(&pool->num_mutex);
- while (pool->num) {
- BLI_condition_wait(&pool->num_cond, &pool->num_mutex);
+ switch (pool->type) {
+ case TASK_POOL_TBB:
+ case TASK_POOL_TBB_SUSPENDED:
+ case TASK_POOL_NO_THREADS:
+ tbb_task_pool_cancel(pool);
+ break;
+ case TASK_POOL_BACKGROUND:
+ case TASK_POOL_BACKGROUND_SERIAL:
+ background_task_pool_cancel(pool);
+ break;
}
- BLI_mutex_unlock(&pool->num_mutex);
-
- pool->do_cancel = false;
}
bool BLI_task_pool_canceled(TaskPool *pool)
{
- return pool->do_cancel;
+ switch (pool->type) {
+ case TASK_POOL_TBB:
+ case TASK_POOL_TBB_SUSPENDED:
+ case TASK_POOL_NO_THREADS:
+ return tbb_task_pool_canceled(pool);
+ case TASK_POOL_BACKGROUND:
+ case TASK_POOL_BACKGROUND_SERIAL:
+ return background_task_pool_canceled(pool);
+ }
+ BLI_assert("BLI_task_pool_canceled: Control flow should not come here!");
+ return false;
}
-void *BLI_task_pool_userdata(TaskPool *pool)
+void *BLI_task_pool_user_data(TaskPool *pool)
{
return pool->userdata;
}
@@ -1001,29 +550,3 @@ ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool)
{
return &pool->user_mutex;
}
-
-int BLI_task_pool_creator_thread_id(TaskPool *pool)
-{
- return pool->thread_id;
-}
-
-void BLI_task_pool_delayed_push_begin(TaskPool *pool, int thread_id)
-{
- if (task_can_use_local_queues(pool, thread_id)) {
- ASSERT_THREAD_ID(pool->scheduler, thread_id);
- TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
- tls->do_delayed_push = true;
- }
-}
-
-void BLI_task_pool_delayed_push_end(TaskPool *pool, int thread_id)
-{
- if (task_can_use_local_queues(pool, thread_id)) {
- ASSERT_THREAD_ID(pool->scheduler, thread_id);
- TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
- BLI_assert(tls->do_delayed_push);
- task_scheduler_push_all(pool->scheduler, pool, tls->delayed_queue, tls->num_delayed_queue);
- tls->do_delayed_push = false;
- tls->num_delayed_queue = 0;
- }
-}