Diffstat (limited to 'source/blender/blenlib/intern/task.c')
-rw-r--r-- | source/blender/blenlib/intern/task.c | 784
1 file changed, 512 insertions, 272 deletions
diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c index fc2d9674c2f..5d3c6b35ac1 100644 --- a/source/blender/blenlib/intern/task.c +++ b/source/blender/blenlib/intern/task.c @@ -32,6 +32,7 @@ #include "BLI_listbase.h" #include "BLI_math.h" +#include "BLI_mempool.h" #include "BLI_task.h" #include "BLI_threads.h" @@ -48,6 +49,39 @@ */ #define MEMPOOL_SIZE 256 +/* Number of tasks which are pushed directly to local thread queue. + * + * This allows thread to fetch next task without locking the whole queue. + */ +#define LOCAL_QUEUE_SIZE 1 + +/* Number of tasks which are allowed to be scheduled in a delayed manner. + * + * This allows to use less locks per graph node children schedule. More details + * could be found at TaskThreadLocalStorage::do_delayed_push. + */ +#define DELAYED_QUEUE_SIZE 4096 + +#ifndef NDEBUG +# define ASSERT_THREAD_ID(scheduler, thread_id) \ + do { \ + if (!BLI_thread_is_main()) { \ + TaskThread *thread = pthread_getspecific(scheduler->tls_id_key); \ + if (thread == NULL) { \ + BLI_assert(thread_id == 0); \ + } \ + else { \ + BLI_assert(thread_id == thread->id); \ + } \ + } \ + else { \ + BLI_assert(thread_id == 0); \ + } \ + } while (false) +#else +# define ASSERT_THREAD_ID(scheduler, thread_id) +#endif + typedef struct Task { struct Task *next, *prev; @@ -102,13 +136,35 @@ typedef struct TaskMemPoolStats { } TaskMemPoolStats; #endif +typedef struct TaskThreadLocalStorage { + /* Memory pool for faster task allocation. + * The idea is to re-use memory of finished/discarded tasks by this thread. + */ + TaskMemPool task_mempool; + + /* Local queue keeps thread alive by keeping small amount of tasks ready + * to be picked up without causing global thread locks for synchronization. + */ + int num_local_queue; + Task *local_queue[LOCAL_QUEUE_SIZE]; + + /* Thread can be marked for delayed tasks push. This is helpful when it's + * know that lots of subsequent task pushed will happen from the same thread + * without "interrupting" for task execution. + * + * We try to accumulate as much tasks as possible in a local queue without + * any locks first, and then we push all of them into a scheduler's queue + * from within a single mutex lock. + */ + bool do_delayed_push; + int num_delayed_queue; + Task *delayed_queue[DELAYED_QUEUE_SIZE]; +} TaskThreadLocalStorage; + struct TaskPool { TaskScheduler *scheduler; volatile size_t num; - volatile size_t done; - size_t num_threads; - size_t currently_running_tasks; ThreadMutex num_mutex; ThreadCondition num_cond; @@ -116,6 +172,11 @@ struct TaskPool { ThreadMutex user_mutex; volatile bool do_cancel; + volatile bool do_work; + + volatile bool is_suspended; + ListBase suspended_queue; + size_t num_suspended; /* If set, this pool may never be work_and_wait'ed, which means TaskScheduler * has to use its special background fallback thread in case we are in @@ -123,16 +184,20 @@ struct TaskPool { */ bool run_in_background; - /* This pool is used for caching task pointers for thread id 0. - * This could either point to a global scheduler's task_mempool[0] if the - * pool is handled form the main thread or point to task_mempool_local - * otherwise. - * - * This way we solve possible threading conflicts accessing same global - * memory pool from multiple threads from which wait_work() is called. + /* This is a task scheduler's ID of a thread at which pool was constructed. + * It will be used to access task TLS. 
+ */ + int thread_id; + + /* For the pools which are created from non-main thread which is not a + * scheduler worker thread we can't re-use any of scheduler's threads TLS + * and have to use our own one. */ - TaskMemPool *task_mempool; - TaskMemPool task_mempool_local; + bool use_local_tls; + TaskThreadLocalStorage local_tls; +#ifndef NDEBUG + pthread_t creator_thread_id; +#endif #ifdef DEBUG_STATS TaskMemPoolStats *mempool_stats; @@ -142,7 +207,6 @@ struct TaskPool { struct TaskScheduler { pthread_t *threads; struct TaskThread *task_threads; - TaskMemPool *task_mempool; int num_threads; bool background_thread_only; @@ -151,15 +215,19 @@ struct TaskScheduler { ThreadCondition queue_cond; volatile bool do_exit; + + /* NOTE: In pthread's TLS we store the whole TaskThread structure. */ + pthread_key_t tls_id_key; }; typedef struct TaskThread { TaskScheduler *scheduler; int id; + TaskThreadLocalStorage tls; } TaskThread; /* Helper */ -static void task_data_free(Task *task, const int thread_id) +BLI_INLINE void task_data_free(Task *task, const int thread_id) { if (task->free_taskdata) { if (task->freedata) { @@ -171,28 +239,54 @@ static void task_data_free(Task *task, const int thread_id) } } -BLI_INLINE TaskMemPool *get_task_mempool(TaskPool *pool, const int thread_id) +BLI_INLINE void initialize_task_tls(TaskThreadLocalStorage *tls) +{ + memset(tls, 0, sizeof(TaskThreadLocalStorage)); +} + +BLI_INLINE TaskThreadLocalStorage *get_task_tls(TaskPool *pool, + const int thread_id) { + TaskScheduler *scheduler = pool->scheduler; + BLI_assert(thread_id >= 0); + BLI_assert(thread_id <= scheduler->num_threads); + if (pool->use_local_tls && thread_id == 0) { + BLI_assert(pool->thread_id == 0); + BLI_assert(!BLI_thread_is_main()); + BLI_assert(pthread_equal(pthread_self(), pool->creator_thread_id)); + return &pool->local_tls; + } if (thread_id == 0) { - return pool->task_mempool; + BLI_assert(BLI_thread_is_main()); + return &scheduler->task_threads[pool->thread_id].tls; + } + return &scheduler->task_threads[thread_id].tls; +} + +BLI_INLINE void free_task_tls(TaskThreadLocalStorage *tls) +{ + TaskMemPool *task_mempool = &tls->task_mempool; + for (int i = 0; i < task_mempool->num_tasks; ++i) { + MEM_freeN(task_mempool->tasks[i]); } - return &pool->scheduler->task_mempool[thread_id]; } static Task *task_alloc(TaskPool *pool, const int thread_id) { - assert(thread_id <= pool->scheduler->num_threads); + BLI_assert(thread_id <= pool->scheduler->num_threads); if (thread_id != -1) { - assert(thread_id >= 0); - TaskMemPool *mem_pool = get_task_mempool(pool, thread_id); + BLI_assert(thread_id >= 0); + BLI_assert(thread_id <= pool->scheduler->num_threads); + TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id); + TaskMemPool *task_mempool = &tls->task_mempool; /* Try to re-use task memory from a thread local storage. */ - if (mem_pool->num_tasks > 0) { - --mem_pool->num_tasks; + if (task_mempool->num_tasks > 0) { + --task_mempool->num_tasks; /* Success! We've just avoided task allocation. */ #ifdef DEBUG_STATS pool->mempool_stats[thread_id].num_reuse++; #endif - return mem_pool->tasks[mem_pool->num_tasks]; + return task_mempool->tasks[task_mempool->num_tasks]; } /* We are doomed to allocate new task data. 
*/ #ifdef DEBUG_STATS @@ -205,13 +299,17 @@ static Task *task_alloc(TaskPool *pool, const int thread_id) static void task_free(TaskPool *pool, Task *task, const int thread_id) { task_data_free(task, thread_id); - assert(thread_id >= 0); - assert(thread_id <= pool->scheduler->num_threads); - TaskMemPool *mem_pool = get_task_mempool(pool, thread_id); - if (mem_pool->num_tasks < MEMPOOL_SIZE - 1) { + BLI_assert(thread_id >= 0); + BLI_assert(thread_id <= pool->scheduler->num_threads); + if (thread_id == 0) { + BLI_assert(pool->use_local_tls || BLI_thread_is_main()); + } + TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id); + TaskMemPool *task_mempool = &tls->task_mempool; + if (task_mempool->num_tasks < MEMPOOL_SIZE - 1) { /* Successfully allowed the task to be re-used later. */ - mem_pool->tasks[mem_pool->num_tasks] = task; - ++mem_pool->num_tasks; + task_mempool->tasks[task_mempool->num_tasks] = task; + ++task_mempool->num_tasks; } else { /* Local storage saturated, no other way than just discard @@ -237,8 +335,6 @@ static void task_pool_num_decrease(TaskPool *pool, size_t done) BLI_assert(pool->num >= done); pool->num -= done; - atomic_sub_and_fetch_z(&pool->currently_running_tasks, done); - pool->done += done; if (pool->num == 0) BLI_condition_notify_all(&pool->num_cond); @@ -246,11 +342,11 @@ static void task_pool_num_decrease(TaskPool *pool, size_t done) BLI_mutex_unlock(&pool->num_mutex); } -static void task_pool_num_increase(TaskPool *pool) +static void task_pool_num_increase(TaskPool *pool, size_t new) { BLI_mutex_lock(&pool->num_mutex); - pool->num++; + pool->num += new; BLI_condition_notify_all(&pool->num_cond); BLI_mutex_unlock(&pool->num_mutex); @@ -292,17 +388,10 @@ static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task continue; } - if (atomic_add_and_fetch_z(&pool->currently_running_tasks, 1) <= pool->num_threads || - pool->num_threads == 0) - { - *task = current_task; - found_task = true; - BLI_remlink(&scheduler->queue, *task); - break; - } - else { - atomic_sub_and_fetch_z(&pool->currently_running_tasks, 1); - } + *task = current_task; + found_task = true; + BLI_remlink(&scheduler->queue, *task); + break; } if (!found_task) BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex); @@ -313,23 +402,51 @@ static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task return true; } +BLI_INLINE void handle_local_queue(TaskThreadLocalStorage *tls, + const int thread_id) +{ + BLI_assert(!tls->do_delayed_push); + while (tls->num_local_queue > 0) { + /* We pop task from queue before handling it so handler of the task can + * push next job to the local queue. + */ + tls->num_local_queue--; + Task *local_task = tls->local_queue[tls->num_local_queue]; + /* TODO(sergey): Double-check work_and_wait() doesn't handle other's + * pool tasks. 
+ */ + TaskPool *local_pool = local_task->pool; + local_task->run(local_pool, local_task->taskdata, thread_id); + task_free(local_pool, local_task, thread_id); + } + BLI_assert(!tls->do_delayed_push); +} + static void *task_scheduler_thread_run(void *thread_p) { TaskThread *thread = (TaskThread *) thread_p; + TaskThreadLocalStorage *tls = &thread->tls; TaskScheduler *scheduler = thread->scheduler; int thread_id = thread->id; Task *task; + pthread_setspecific(scheduler->tls_id_key, thread); + /* keep popping off tasks */ while (task_scheduler_thread_wait_pop(scheduler, &task)) { TaskPool *pool = task->pool; /* run task */ + BLI_assert(!tls->do_delayed_push); task->run(pool, task->taskdata, thread_id); + BLI_assert(!tls->do_delayed_push); /* delete task */ task_free(pool, task, thread_id); + /* Handle all tasks from local queue. */ + handle_local_queue(tls, thread_id); + /* notify pool task was done */ task_pool_num_decrease(pool, 1); } @@ -359,30 +476,35 @@ TaskScheduler *BLI_task_scheduler_create(int num_threads) /* Add background-only thread if needed. */ if (num_threads == 0) { - scheduler->background_thread_only = true; - num_threads = 1; + scheduler->background_thread_only = true; + num_threads = 1; } + scheduler->task_threads = MEM_mallocN(sizeof(TaskThread) * (num_threads + 1), + "TaskScheduler task threads"); + + /* Initialize TLS for main thread. */ + initialize_task_tls(&scheduler->task_threads[0].tls); + + pthread_key_create(&scheduler->tls_id_key, NULL); + /* launch threads that will be waiting for work */ if (num_threads > 0) { int i; scheduler->num_threads = num_threads; scheduler->threads = MEM_callocN(sizeof(pthread_t) * num_threads, "TaskScheduler threads"); - scheduler->task_threads = MEM_callocN(sizeof(TaskThread) * num_threads, "TaskScheduler task threads"); for (i = 0; i < num_threads; i++) { - TaskThread *thread = &scheduler->task_threads[i]; + TaskThread *thread = &scheduler->task_threads[i + 1]; thread->scheduler = scheduler; thread->id = i + 1; + initialize_task_tls(&thread->tls); if (pthread_create(&scheduler->threads[i], NULL, task_scheduler_thread_run, thread) != 0) { fprintf(stderr, "TaskScheduler failed to launch thread %d/%d\n", i, num_threads); } } - - scheduler->task_mempool = MEM_callocN(sizeof(*scheduler->task_mempool) * (num_threads + 1), - "TaskScheduler task_mempool"); } return scheduler; @@ -398,6 +520,8 @@ void BLI_task_scheduler_free(TaskScheduler *scheduler) BLI_condition_notify_all(&scheduler->queue_cond); BLI_mutex_unlock(&scheduler->queue_mutex); + pthread_key_delete(scheduler->tls_id_key); + /* delete threads */ if (scheduler->threads) { int i; @@ -412,17 +536,12 @@ void BLI_task_scheduler_free(TaskScheduler *scheduler) /* Delete task thread data */ if (scheduler->task_threads) { - MEM_freeN(scheduler->task_threads); - } - - /* Delete task memory pool */ - if (scheduler->task_mempool) { - for (int i = 0; i <= scheduler->num_threads; ++i) { - for (int j = 0; j < scheduler->task_mempool[i].num_tasks; ++j) { - MEM_freeN(scheduler->task_mempool[i].tasks[j]); - } + for (int i = 0; i < scheduler->num_threads + 1; ++i) { + TaskThreadLocalStorage *tls = &scheduler->task_threads[i].tls; + free_task_tls(tls); } - MEM_freeN(scheduler->task_mempool); + + MEM_freeN(scheduler->task_threads); } /* delete leftover tasks */ @@ -445,7 +564,7 @@ int BLI_task_scheduler_num_threads(TaskScheduler *scheduler) static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriority priority) { - task_pool_num_increase(task->pool); + 
task_pool_num_increase(task->pool, 1); /* add task to queue */ BLI_mutex_lock(&scheduler->queue_mutex); @@ -459,6 +578,27 @@ static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriori BLI_mutex_unlock(&scheduler->queue_mutex); } +static void task_scheduler_push_all(TaskScheduler *scheduler, + TaskPool *pool, + Task **tasks, + int num_tasks) +{ + if (num_tasks == 0) { + return; + } + + task_pool_num_increase(pool, num_tasks); + + BLI_mutex_lock(&scheduler->queue_mutex); + + for (int i = 0; i < num_tasks; i++) { + BLI_addhead(&scheduler->queue, tasks[i]); + } + + BLI_condition_notify_all(&scheduler->queue_cond); + BLI_mutex_unlock(&scheduler->queue_mutex); +} + static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool) { Task *task, *nexttask; @@ -471,7 +611,7 @@ static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool) nexttask = task->next; if (task->pool == pool) { - task_data_free(task, 0); + task_data_free(task, pool->thread_id); BLI_freelinkN(&scheduler->queue, task); done++; @@ -486,7 +626,10 @@ static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool) /* Task Pool */ -static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, const bool is_background) +static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, + void *userdata, + const bool is_background, + const bool is_suspended) { TaskPool *pool = MEM_mallocN(sizeof(TaskPool), "TaskPool"); @@ -504,11 +647,13 @@ static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, c pool->scheduler = scheduler; pool->num = 0; - pool->done = 0; - pool->num_threads = 0; - pool->currently_running_tasks = 0; pool->do_cancel = false; + pool->do_work = false; + pool->is_suspended = is_suspended; + pool->num_suspended = 0; + pool->suspended_queue.first = pool->suspended_queue.last = NULL; pool->run_in_background = is_background; + pool->use_local_tls = false; BLI_mutex_init(&pool->num_mutex); BLI_condition_init(&pool->num_cond); @@ -517,11 +662,26 @@ static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, c BLI_mutex_init(&pool->user_mutex); if (BLI_thread_is_main()) { - pool->task_mempool = scheduler->task_mempool; + pool->thread_id = 0; } else { - pool->task_mempool = &pool->task_mempool_local; - pool->task_mempool_local.num_tasks = 0; + TaskThread *thread = pthread_getspecific(scheduler->tls_id_key); + if (thread == NULL) { + /* NOTE: Task pool is created from non-main thread which is not + * managed by the task scheduler. We identify ourselves as thread ID + * 0 but we do not use scheduler's TLS storage and use our own + * instead to avoid any possible threading conflicts. + */ + pool->thread_id = 0; + pool->use_local_tls = true; +#ifndef NDEBUG + pool->creator_thread_id = pthread_self(); +#endif + initialize_task_tls(&pool->local_tls); + } + else { + pool->thread_id = thread->id; + } } #ifdef DEBUG_STATS @@ -533,10 +693,10 @@ static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, c /* Ensure malloc will go fine from threads, * * This is needed because we could be in main thread here - * and malloc could be non-threda safe at this point because + * and malloc could be non-thread safe at this point because * no other jobs are running. 
*/ - BLI_begin_threaded_malloc(); + BLI_threaded_malloc_begin(); return pool; } @@ -548,7 +708,7 @@ static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, c */ TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata) { - return task_pool_create_ex(scheduler, userdata, false); + return task_pool_create_ex(scheduler, userdata, false, false); } /** @@ -563,25 +723,28 @@ TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata) */ TaskPool *BLI_task_pool_create_background(TaskScheduler *scheduler, void *userdata) { - return task_pool_create_ex(scheduler, userdata, true); + return task_pool_create_ex(scheduler, userdata, true, false); +} + +/** + * Similar to BLI_task_pool_create() but does not schedule any tasks for execution + * for until BLI_task_pool_work_and_wait() is called. This helps reducing therading + * overhead when pushing huge amount of small initial tasks from the main thread. + */ +TaskPool *BLI_task_pool_create_suspended(TaskScheduler *scheduler, void *userdata) +{ + return task_pool_create_ex(scheduler, userdata, false, true); } void BLI_task_pool_free(TaskPool *pool) { - BLI_task_pool_stop(pool); + BLI_task_pool_cancel(pool); BLI_mutex_end(&pool->num_mutex); BLI_condition_end(&pool->num_cond); BLI_mutex_end(&pool->user_mutex); - /* Free local memory pool, those pointers are lost forever. */ - if (pool->task_mempool == &pool->task_mempool_local) { - for (int i = 0; i < pool->task_mempool_local.num_tasks; i++) { - MEM_freeN(pool->task_mempool_local.tasks[i]); - } - } - #ifdef DEBUG_STATS printf("Thread ID Allocated Reused Discarded\n"); for (int i = 0; i < pool->scheduler->num_threads + 1; ++i) { @@ -594,9 +757,18 @@ void BLI_task_pool_free(TaskPool *pool) MEM_freeN(pool->mempool_stats); #endif + if (pool->use_local_tls) { + free_task_tls(&pool->local_tls); + } + MEM_freeN(pool); - BLI_end_threaded_malloc(); + BLI_threaded_malloc_end(); +} + +BLI_INLINE bool task_can_use_local_queues(TaskPool *pool, int thread_id) +{ + return (thread_id != -1 && (thread_id != pool->thread_id || pool->do_work)); } static void task_pool_push( @@ -604,14 +776,49 @@ static void task_pool_push( bool free_taskdata, TaskFreeFunction freedata, TaskPriority priority, int thread_id) { + /* Allocate task and fill it's properties. */ Task *task = task_alloc(pool, thread_id); - task->run = run; task->taskdata = taskdata; task->free_taskdata = free_taskdata; task->freedata = freedata; task->pool = pool; - + /* For suspended pools we put everything yo a global queue first + * and exit as soon as possible. + * + * This tasks will be moved to actual execution when pool is + * activated by work_and_wait(). + */ + if (pool->is_suspended) { + BLI_addhead(&pool->suspended_queue, task); + atomic_fetch_and_add_z(&pool->num_suspended, 1); + return; + } + /* Populate to any local queue first, this is cheapest push ever. */ + if (task_can_use_local_queues(pool, thread_id)) { + ASSERT_THREAD_ID(pool->scheduler, thread_id); + TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id); + /* Try to push to a local execution queue. + * These tasks will be picked up next. + */ + if (tls->num_local_queue < LOCAL_QUEUE_SIZE) { + tls->local_queue[tls->num_local_queue] = task; + tls->num_local_queue++; + return; + } + /* If we are in the delayed tasks push mode, we push tasks to a + * temporary local queue first without any locks, and then move them + * to global execution queue with a single lock. 
+ */ + if (tls->do_delayed_push && tls->num_delayed_queue < DELAYED_QUEUE_SIZE) { + tls->delayed_queue[tls->num_delayed_queue] = task; + tls->num_delayed_queue++; + return; + } + } + /* Do push to a global execution ppol, slowest possible method, + * causes quite reasonable amount of threading overhead. + */ task_scheduler_push(pool->scheduler, task, priority); } @@ -636,8 +843,25 @@ void BLI_task_pool_push_from_thread(TaskPool *pool, TaskRunFunction run, void BLI_task_pool_work_and_wait(TaskPool *pool) { + TaskThreadLocalStorage *tls = get_task_tls(pool, pool->thread_id); TaskScheduler *scheduler = pool->scheduler; + if (atomic_fetch_and_and_uint8((uint8_t *)&pool->is_suspended, 0)) { + if (pool->num_suspended) { + task_pool_num_increase(pool, pool->num_suspended); + BLI_mutex_lock(&scheduler->queue_mutex); + + BLI_movelisttolist(&scheduler->queue, &pool->suspended_queue); + + BLI_condition_notify_all(&scheduler->queue_cond); + BLI_mutex_unlock(&scheduler->queue_mutex); + } + } + + pool->do_work = true; + + ASSERT_THREAD_ID(pool->scheduler, pool->thread_id); + BLI_mutex_lock(&pool->num_mutex); while (pool->num != 0) { @@ -651,16 +875,12 @@ void BLI_task_pool_work_and_wait(TaskPool *pool) /* find task from this pool. if we get a task from another pool, * we can get into deadlock */ - if (pool->num_threads == 0 || - pool->currently_running_tasks < pool->num_threads) - { - for (task = scheduler->queue.first; task; task = task->next) { - if (task->pool == pool) { - work_task = task; - found_task = true; - BLI_remlink(&scheduler->queue, task); - break; - } + for (task = scheduler->queue.first; task; task = task->next) { + if (task->pool == pool) { + work_task = task; + found_task = true; + BLI_remlink(&scheduler->queue, task); + break; } } @@ -669,11 +889,15 @@ void BLI_task_pool_work_and_wait(TaskPool *pool) /* if found task, do it, otherwise wait until other tasks are done */ if (found_task) { /* run task */ - atomic_add_and_fetch_z(&pool->currently_running_tasks, 1); - work_task->run(pool, work_task->taskdata, 0); + BLI_assert(!tls->do_delayed_push); + work_task->run(pool, work_task->taskdata, pool->thread_id); + BLI_assert(!tls->do_delayed_push); /* delete task */ - task_free(pool, task, 0); + task_free(pool, task, pool->thread_id); + + /* Handle all tasks from local queue. */ + handle_local_queue(tls, pool->thread_id); /* notify pool task was done */ task_pool_num_decrease(pool, 1); @@ -688,22 +912,8 @@ void BLI_task_pool_work_and_wait(TaskPool *pool) } BLI_mutex_unlock(&pool->num_mutex); -} -int BLI_pool_get_num_threads(TaskPool *pool) -{ - if (pool->num_threads != 0) { - return pool->num_threads; - } - else { - return BLI_task_scheduler_num_threads(pool->scheduler); - } -} - -void BLI_pool_set_num_threads(TaskPool *pool, int num_threads) -{ - /* NOTE: Don't try to modify threads while tasks are running! 
*/ - pool->num_threads = num_threads; + handle_local_queue(tls, pool->thread_id); } void BLI_task_pool_cancel(TaskPool *pool) @@ -721,13 +931,6 @@ void BLI_task_pool_cancel(TaskPool *pool) pool->do_cancel = false; } -void BLI_task_pool_stop(TaskPool *pool) -{ - task_scheduler_clear(pool->scheduler, pool); - - BLI_assert(pool->num == 0); -} - bool BLI_task_pool_canceled(TaskPool *pool) { return pool->do_cancel; @@ -743,9 +946,28 @@ ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool) return &pool->user_mutex; } -size_t BLI_task_pool_tasks_done(TaskPool *pool) +void BLI_task_pool_delayed_push_begin(TaskPool *pool, int thread_id) { - return pool->done; + if (task_can_use_local_queues(pool, thread_id)) { + ASSERT_THREAD_ID(pool->scheduler, thread_id); + TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id); + tls->do_delayed_push = true; + } +} + +void BLI_task_pool_delayed_push_end(TaskPool *pool, int thread_id) +{ + if (task_can_use_local_queues(pool, thread_id)) { + ASSERT_THREAD_ID(pool->scheduler, thread_id); + TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id); + BLI_assert(tls->do_delayed_push); + task_scheduler_push_all(pool->scheduler, + pool, + tls->delayed_queue, + tls->num_delayed_queue); + tls->do_delayed_push = false; + tls->num_delayed_queue = 0; + } } /* Parallel range routines */ @@ -772,7 +994,6 @@ typedef struct ParallelRangeState { void *userdata; TaskParallelRangeFunc func; - TaskParallelRangeFuncEx func_ex; int iter; int chunk_size; @@ -782,8 +1003,7 @@ BLI_INLINE bool parallel_range_next_iter_get( ParallelRangeState * __restrict state, int * __restrict iter, int * __restrict count) { - uint32_t uval = atomic_fetch_and_add_uint32((uint32_t *)(&state->iter), state->chunk_size); - int previter = *(int32_t*)&uval; + int previter = atomic_fetch_and_add_int32(&state->iter, state->chunk_size); *iter = previter; *count = max_ii(0, min_ii(state->chunk_size, state->stop - previter)); @@ -794,51 +1014,67 @@ BLI_INLINE bool parallel_range_next_iter_get( static void parallel_range_func( TaskPool * __restrict pool, void *userdata_chunk, - int threadid) + int thread_id) { ParallelRangeState * __restrict state = BLI_task_pool_userdata(pool); + ParallelRangeTLS tls = { + .thread_id = thread_id, + .userdata_chunk = userdata_chunk, + }; int iter, count; - while (parallel_range_next_iter_get(state, &iter, &count)) { - int i; - - if (state->func_ex) { - for (i = 0; i < count; ++i) { - state->func_ex(state->userdata, userdata_chunk, iter + i, threadid); - } - } - else { - for (i = 0; i < count; ++i) { - state->func(state->userdata, iter + i); - } + for (int i = 0; i < count; ++i) { + state->func(state->userdata, iter + i, &tls); } } } +static void palallel_range_single_thread(const int start, int const stop, + void *userdata, + TaskParallelRangeFunc func, + const ParallelRangeSettings *settings) +{ + void *userdata_chunk = settings->userdata_chunk; + const size_t userdata_chunk_size = settings->userdata_chunk_size; + void *userdata_chunk_local = NULL; + const bool use_userdata_chunk = (userdata_chunk_size != 0) && (userdata_chunk != NULL); + if (use_userdata_chunk) { + userdata_chunk_local = MALLOCA(userdata_chunk_size); + memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size); + } + ParallelRangeTLS tls = { + .thread_id = 0, + .userdata_chunk = userdata_chunk_local, + }; + for (int i = start; i < stop; ++i) { + func(userdata, i, &tls); + } + if (settings->func_finalize != NULL) { + settings->func_finalize(userdata, userdata_chunk_local); + } + 
MALLOCA_FREE(userdata_chunk_local, userdata_chunk_size); +} + /** * This function allows to parallelized for loops in a similar way to OpenMP's 'parallel for' statement. * - * See public API doc for description of parameters. + * See public API doc of ParallelRangeSettings for description of all settings. */ -static void task_parallel_range_ex( - int start, int stop, - void *userdata, - void *userdata_chunk, - const size_t userdata_chunk_size, - TaskParallelRangeFunc func, - TaskParallelRangeFuncEx func_ex, - TaskParallelRangeFuncFinalize func_finalize, - const bool use_threading, - const bool use_dynamic_scheduling) +void BLI_task_parallel_range(const int start, const int stop, + void *userdata, + TaskParallelRangeFunc func, + const ParallelRangeSettings *settings) { TaskScheduler *task_scheduler; TaskPool *task_pool; ParallelRangeState state; int i, num_threads, num_tasks; + void *userdata_chunk = settings->userdata_chunk; + const size_t userdata_chunk_size = settings->userdata_chunk_size; void *userdata_chunk_local = NULL; void *userdata_chunk_array = NULL; - const bool use_userdata_chunk = (func_ex != NULL) && (userdata_chunk_size != 0) && (userdata_chunk != NULL); + const bool use_userdata_chunk = (userdata_chunk_size != 0) && (userdata_chunk != NULL); if (start == stop) { return; @@ -846,67 +1082,65 @@ static void task_parallel_range_ex( BLI_assert(start < stop); if (userdata_chunk_size != 0) { - BLI_assert(func_ex != NULL && func == NULL); BLI_assert(userdata_chunk != NULL); } /* If it's not enough data to be crunched, don't bother with tasks at all, * do everything from the main thread. */ - if (!use_threading) { - if (func_ex) { - if (use_userdata_chunk) { - userdata_chunk_local = MALLOCA(userdata_chunk_size); - memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size); - } - - for (i = start; i < stop; ++i) { - func_ex(userdata, userdata_chunk_local, i, 0); - } - - if (func_finalize) { - func_finalize(userdata, userdata_chunk_local); - } - - MALLOCA_FREE(userdata_chunk_local, userdata_chunk_size); - } - else { - for (i = start; i < stop; ++i) { - func(userdata, i); - } - } - + if (!settings->use_threading) { + palallel_range_single_thread(start, stop, + userdata, + func, + settings); return; } task_scheduler = BLI_task_scheduler_get(); - task_pool = BLI_task_pool_create(task_scheduler, &state); num_threads = BLI_task_scheduler_num_threads(task_scheduler); /* The idea here is to prevent creating task for each of the loop iterations * and instead have tasks which are evenly distributed across CPU cores and * pull next iter to be crunched using the queue. */ - num_tasks = num_threads * 2; + num_tasks = num_threads + 2; state.start = start; state.stop = stop; state.userdata = userdata; state.func = func; - state.func_ex = func_ex; state.iter = start; - if (use_dynamic_scheduling) { - state.chunk_size = 32; + switch (settings->scheduling_mode) { + case TASK_SCHEDULING_STATIC: + state.chunk_size = max_ii( + settings->min_iter_per_thread, + (stop - start) / (num_tasks)); + break; + case TASK_SCHEDULING_DYNAMIC: + /* TODO(sergey): Make it configurable from min_iter_per_thread. 
*/ + state.chunk_size = 32; + break; } - else { - state.chunk_size = max_ii(1, (stop - start) / (num_tasks)); + + num_tasks = min_ii(num_tasks, + max_ii(1, (stop - start) / state.chunk_size)); + + if (num_tasks == 1) { + palallel_range_single_thread(start, stop, + userdata, + func, + settings); + return; } - num_tasks = min_ii(num_tasks, (stop - start) / state.chunk_size); - atomic_fetch_and_add_uint32((uint32_t *)(&state.iter), 0); + task_pool = BLI_task_pool_create_suspended(task_scheduler, &state); + + /* NOTE: This way we are adding a memory barrier and ensure all worker + * threads can read and modify the value, without any locks. */ + atomic_fetch_and_add_int32(&state.iter, 0); if (use_userdata_chunk) { - userdata_chunk_array = MALLOCA(userdata_chunk_size * num_tasks); + userdata_chunk_array = MALLOCA(userdata_chunk_size * num_tasks); } for (i = 0; i < num_tasks; i++) { @@ -918,105 +1152,24 @@ static void task_parallel_range_ex( BLI_task_pool_push_from_thread(task_pool, parallel_range_func, userdata_chunk_local, false, - TASK_PRIORITY_HIGH, 0); + TASK_PRIORITY_HIGH, + task_pool->thread_id); } BLI_task_pool_work_and_wait(task_pool); BLI_task_pool_free(task_pool); if (use_userdata_chunk) { - if (func_finalize) { + if (settings->func_finalize != NULL) { for (i = 0; i < num_tasks; i++) { userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i); - func_finalize(userdata, userdata_chunk_local); + settings->func_finalize(userdata, userdata_chunk_local); } } MALLOCA_FREE(userdata_chunk_array, userdata_chunk_size * num_tasks); } } -/** - * This function allows to parallelize for loops in a similar way to OpenMP's 'parallel for' statement. - * - * \param start First index to process. - * \param stop Index to stop looping (excluded). - * \param userdata Common userdata passed to all instances of \a func. - * \param userdata_chunk Optional, each instance of looping chunks will get a copy of this data - * (similar to OpenMP's firstprivate). - * \param userdata_chunk_size Memory size of \a userdata_chunk. - * \param func_ex Callback function (advanced version). - * \param use_threading If \a true, actually split-execute loop in threads, else just do a sequential forloop - * (allows caller to use any kind of test to switch on parallelization or not). - * \param use_dynamic_scheduling If \a true, the whole range is divided in a lot of small chunks (of size 32 currently), - * otherwise whole range is split in a few big chunks (num_threads * 2 chunks currently). - */ -void BLI_task_parallel_range_ex( - int start, int stop, - void *userdata, - void *userdata_chunk, - const size_t userdata_chunk_size, - TaskParallelRangeFuncEx func_ex, - const bool use_threading, - const bool use_dynamic_scheduling) -{ - task_parallel_range_ex( - start, stop, userdata, userdata_chunk, userdata_chunk_size, NULL, func_ex, NULL, - use_threading, use_dynamic_scheduling); -} - -/** - * A simpler version of \a BLI_task_parallel_range_ex, which does not use \a use_dynamic_scheduling, - * and does not handle 'firstprivate'-like \a userdata_chunk. - * - * \param start First index to process. - * \param stop Index to stop looping (excluded). - * \param userdata Common userdata passed to all instances of \a func. - * \param func Callback function (simple version). - * \param use_threading If \a true, actually split-execute loop in threads, else just do a sequential forloop - * (allows caller to use any kind of test to switch on parallelization or not). 
- */ -void BLI_task_parallel_range( - int start, int stop, - void *userdata, - TaskParallelRangeFunc func, - const bool use_threading) -{ - task_parallel_range_ex(start, stop, userdata, NULL, 0, func, NULL, NULL, use_threading, false); -} - -/** - * This function allows to parallelize for loops in a similar way to OpenMP's 'parallel for' statement, - * with an additional 'finalize' func called from calling thread once whole range have been processed. - * - * \param start First index to process. - * \param stop Index to stop looping (excluded). - * \param userdata Common userdata passed to all instances of \a func. - * \param userdata_chunk Optional, each instance of looping chunks will get a copy of this data - * (similar to OpenMP's firstprivate). - * \param userdata_chunk_size Memory size of \a userdata_chunk. - * \param func_ex Callback function (advanced version). - * \param func_finalize Callback function, called after all workers have finished, - * useful to finalize accumulative tasks. - * \param use_threading If \a true, actually split-execute loop in threads, else just do a sequential forloop - * (allows caller to use any kind of test to switch on parallelization or not). - * \param use_dynamic_scheduling If \a true, the whole range is divided in a lot of small chunks (of size 32 currently), - * otherwise whole range is split in a few big chunks (num_threads * 2 chunks currently). - */ -void BLI_task_parallel_range_finalize( - int start, int stop, - void *userdata, - void *userdata_chunk, - const size_t userdata_chunk_size, - TaskParallelRangeFuncEx func_ex, - TaskParallelRangeFuncFinalize func_finalize, - const bool use_threading, - const bool use_dynamic_scheduling) -{ - task_parallel_range_ex( - start, stop, userdata, userdata_chunk, userdata_chunk_size, NULL, func_ex, func_finalize, - use_threading, use_dynamic_scheduling); -} - #undef MALLOCA #undef MALLOCA_FREE @@ -1103,14 +1256,14 @@ void BLI_task_parallel_listbase( } task_scheduler = BLI_task_scheduler_get(); - task_pool = BLI_task_pool_create(task_scheduler, &state); + task_pool = BLI_task_pool_create_suspended(task_scheduler, &state); num_threads = BLI_task_scheduler_num_threads(task_scheduler); /* The idea here is to prevent creating task for each of the loop iterations * and instead have tasks which are evenly distributed across CPU cores and * pull next iter to be crunched using the queue. */ - num_tasks = num_threads * 2; + num_tasks = num_threads + 2; state.index = 0; state.link = listbase->first; @@ -1124,7 +1277,8 @@ void BLI_task_parallel_listbase( BLI_task_pool_push_from_thread(task_pool, parallel_listbase_func, NULL, false, - TASK_PRIORITY_HIGH, 0); + TASK_PRIORITY_HIGH, + task_pool->thread_id); } BLI_task_pool_work_and_wait(task_pool); @@ -1132,3 +1286,89 @@ void BLI_task_parallel_listbase( BLI_spin_end(&state.lock); } + + +typedef struct ParallelMempoolState { + void *userdata; + TaskParallelMempoolFunc func; +} ParallelMempoolState; + +static void parallel_mempool_func( + TaskPool * __restrict pool, + void *taskdata, + int UNUSED(threadid)) +{ + ParallelMempoolState * __restrict state = BLI_task_pool_userdata(pool); + BLI_mempool_iter *iter = taskdata; + MempoolIterData *item; + + while ((item = BLI_mempool_iterstep(iter)) != NULL) { + state->func(state->userdata, item); + } +} + +/** + * This function allows to parallelize for loops over Mempool items. + * + * \param mempool: The iterable BLI_mempool to loop over. + * \param userdata: Common userdata passed to all instances of \a func. 
+ * \param func: Callback function. + * \param use_threading: If \a true, actually split-execute loop in threads, else just do a sequential for loop + * (allows caller to use any kind of test to switch on parallelization or not). + * + * \note There is no static scheduling here. + */ +void BLI_task_parallel_mempool( + BLI_mempool *mempool, + void *userdata, + TaskParallelMempoolFunc func, + const bool use_threading) +{ + TaskScheduler *task_scheduler; + TaskPool *task_pool; + ParallelMempoolState state; + int i, num_threads, num_tasks; + + if (BLI_mempool_len(mempool) == 0) { + return; + } + + if (!use_threading) { + BLI_mempool_iter iter; + BLI_mempool_iternew(mempool, &iter); + + for (void *item = BLI_mempool_iterstep(&iter); item != NULL; item = BLI_mempool_iterstep(&iter)) { + func(userdata, item); + } + return; + } + + task_scheduler = BLI_task_scheduler_get(); + task_pool = BLI_task_pool_create_suspended(task_scheduler, &state); + num_threads = BLI_task_scheduler_num_threads(task_scheduler); + + /* The idea here is to prevent creating task for each of the loop iterations + * and instead have tasks which are evenly distributed across CPU cores and + * pull next item to be crunched using the threaded-aware BLI_mempool_iter. + */ + num_tasks = num_threads + 2; + + state.userdata = userdata; + state.func = func; + + BLI_mempool_iter *mempool_iterators = BLI_mempool_iter_threadsafe_create(mempool, (size_t)num_tasks); + + for (i = 0; i < num_tasks; i++) { + /* Use this pool's pre-allocated tasks. */ + BLI_task_pool_push_from_thread(task_pool, + parallel_mempool_func, + &mempool_iterators[i], false, + TASK_PRIORITY_HIGH, + task_pool->thread_id); + } + + BLI_task_pool_work_and_wait(task_pool); + BLI_task_pool_free(task_pool); + + BLI_mempool_iter_threadsafe_free(mempool_iterators); +} |