diff options
Diffstat (limited to 'source/blender/blenlib/intern/task.c')
-rw-r--r-- | source/blender/blenlib/intern/task.c | 469 |
1 files changed, 346 insertions, 123 deletions
diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c index fc2d9674c2f..e050f3148b8 100644 --- a/source/blender/blenlib/intern/task.c +++ b/source/blender/blenlib/intern/task.c @@ -48,6 +48,39 @@ */ #define MEMPOOL_SIZE 256 +/* Number of tasks which are pushed directly to local thread queue. + * + * This allows thread to fetch next task without locking the whole queue. + */ +#define LOCAL_QUEUE_SIZE 1 + +/* Number of tasks which are allowed to be scheduled in a delayed manner. + * + * This allows to use less locks per graph node children schedule. More details + * could be found at TaskThreadLocalStorage::do_delayed_push. + */ +#define DELAYED_QUEUE_SIZE 4096 + +#ifndef NDEBUG +# define ASSERT_THREAD_ID(scheduler, thread_id) \ + do { \ + if (!BLI_thread_is_main()) { \ + TaskThread *thread = pthread_getspecific(scheduler->tls_id_key); \ + if (thread == NULL) { \ + BLI_assert(thread_id == 0); \ + } \ + else { \ + BLI_assert(thread_id == thread->id); \ + } \ + } \ + else { \ + BLI_assert(thread_id == 0); \ + } \ + } while (false) +#else +# define ASSERT_THREAD_ID(scheduler, thread_id) +#endif + typedef struct Task { struct Task *next, *prev; @@ -102,13 +135,35 @@ typedef struct TaskMemPoolStats { } TaskMemPoolStats; #endif +typedef struct TaskThreadLocalStorage { + /* Memory pool for faster task allocation. + * The idea is to re-use memory of finished/discarded tasks by this thread. + */ + TaskMemPool task_mempool; + + /* Local queue keeps thread alive by keeping small amount of tasks ready + * to be picked up without causing global thread locks for synchronization. + */ + int num_local_queue; + Task *local_queue[LOCAL_QUEUE_SIZE]; + + /* Thread can be marked for delayed tasks push. This is helpful when it's + * know that lots of subsequent task pushed will happen from the same thread + * without "interrupting" for task execution. + * + * We try to accumulate as much tasks as possible in a local queue without + * any locks first, and then we push all of them into a scheduler's queue + * from within a single mutex lock. + */ + bool do_delayed_push; + int num_delayed_queue; + Task *delayed_queue[DELAYED_QUEUE_SIZE]; +} TaskThreadLocalStorage; + struct TaskPool { TaskScheduler *scheduler; volatile size_t num; - volatile size_t done; - size_t num_threads; - size_t currently_running_tasks; ThreadMutex num_mutex; ThreadCondition num_cond; @@ -116,6 +171,11 @@ struct TaskPool { ThreadMutex user_mutex; volatile bool do_cancel; + volatile bool do_work; + + volatile bool is_suspended; + ListBase suspended_queue; + size_t num_suspended; /* If set, this pool may never be work_and_wait'ed, which means TaskScheduler * has to use its special background fallback thread in case we are in @@ -123,16 +183,20 @@ struct TaskPool { */ bool run_in_background; - /* This pool is used for caching task pointers for thread id 0. - * This could either point to a global scheduler's task_mempool[0] if the - * pool is handled form the main thread or point to task_mempool_local - * otherwise. - * - * This way we solve possible threading conflicts accessing same global - * memory pool from multiple threads from which wait_work() is called. + /* This is a task scheduler's ID of a thread at which pool was constructed. + * It will be used to access task TLS. + */ + int thread_id; + + /* For the pools which are created from non-main thread which is not a + * scheduler worker thread we can't re-use any of scheduler's threads TLS + * and have to use our own one. */ - TaskMemPool *task_mempool; - TaskMemPool task_mempool_local; + bool use_local_tls; + TaskThreadLocalStorage local_tls; +#ifndef NDEBUG + pthread_t creator_thread_id; +#endif #ifdef DEBUG_STATS TaskMemPoolStats *mempool_stats; @@ -142,7 +206,6 @@ struct TaskPool { struct TaskScheduler { pthread_t *threads; struct TaskThread *task_threads; - TaskMemPool *task_mempool; int num_threads; bool background_thread_only; @@ -151,15 +214,19 @@ struct TaskScheduler { ThreadCondition queue_cond; volatile bool do_exit; + + /* NOTE: In pthread's TLS we store the whole TaskThread structure. */ + pthread_key_t tls_id_key; }; typedef struct TaskThread { TaskScheduler *scheduler; int id; + TaskThreadLocalStorage tls; } TaskThread; /* Helper */ -static void task_data_free(Task *task, const int thread_id) +BLI_INLINE void task_data_free(Task *task, const int thread_id) { if (task->free_taskdata) { if (task->freedata) { @@ -171,28 +238,54 @@ static void task_data_free(Task *task, const int thread_id) } } -BLI_INLINE TaskMemPool *get_task_mempool(TaskPool *pool, const int thread_id) +BLI_INLINE void initialize_task_tls(TaskThreadLocalStorage *tls) +{ + memset(tls, 0, sizeof(TaskThreadLocalStorage)); +} + +BLI_INLINE TaskThreadLocalStorage *get_task_tls(TaskPool *pool, + const int thread_id) { + TaskScheduler *scheduler = pool->scheduler; + BLI_assert(thread_id >= 0); + BLI_assert(thread_id <= scheduler->num_threads); + if (pool->use_local_tls && thread_id == 0) { + BLI_assert(pool->thread_id == 0); + BLI_assert(!BLI_thread_is_main()); + BLI_assert(pthread_equal(pthread_self(), pool->creator_thread_id)); + return &pool->local_tls; + } if (thread_id == 0) { - return pool->task_mempool; + BLI_assert(BLI_thread_is_main()); + return &scheduler->task_threads[pool->thread_id].tls; + } + return &scheduler->task_threads[thread_id].tls; +} + +BLI_INLINE void free_task_tls(TaskThreadLocalStorage *tls) +{ + TaskMemPool *task_mempool = &tls->task_mempool; + for (int i = 0; i < task_mempool->num_tasks; ++i) { + MEM_freeN(task_mempool->tasks[i]); } - return &pool->scheduler->task_mempool[thread_id]; } static Task *task_alloc(TaskPool *pool, const int thread_id) { - assert(thread_id <= pool->scheduler->num_threads); + BLI_assert(thread_id <= pool->scheduler->num_threads); if (thread_id != -1) { - assert(thread_id >= 0); - TaskMemPool *mem_pool = get_task_mempool(pool, thread_id); + BLI_assert(thread_id >= 0); + BLI_assert(thread_id <= pool->scheduler->num_threads); + TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id); + TaskMemPool *task_mempool = &tls->task_mempool; /* Try to re-use task memory from a thread local storage. */ - if (mem_pool->num_tasks > 0) { - --mem_pool->num_tasks; + if (task_mempool->num_tasks > 0) { + --task_mempool->num_tasks; /* Success! We've just avoided task allocation. */ #ifdef DEBUG_STATS pool->mempool_stats[thread_id].num_reuse++; #endif - return mem_pool->tasks[mem_pool->num_tasks]; + return task_mempool->tasks[task_mempool->num_tasks]; } /* We are doomed to allocate new task data. */ #ifdef DEBUG_STATS @@ -205,13 +298,17 @@ static Task *task_alloc(TaskPool *pool, const int thread_id) static void task_free(TaskPool *pool, Task *task, const int thread_id) { task_data_free(task, thread_id); - assert(thread_id >= 0); - assert(thread_id <= pool->scheduler->num_threads); - TaskMemPool *mem_pool = get_task_mempool(pool, thread_id); - if (mem_pool->num_tasks < MEMPOOL_SIZE - 1) { + BLI_assert(thread_id >= 0); + BLI_assert(thread_id <= pool->scheduler->num_threads); + if (thread_id == 0) { + BLI_assert(pool->use_local_tls || BLI_thread_is_main()); + } + TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id); + TaskMemPool *task_mempool = &tls->task_mempool; + if (task_mempool->num_tasks < MEMPOOL_SIZE - 1) { /* Successfully allowed the task to be re-used later. */ - mem_pool->tasks[mem_pool->num_tasks] = task; - ++mem_pool->num_tasks; + task_mempool->tasks[task_mempool->num_tasks] = task; + ++task_mempool->num_tasks; } else { /* Local storage saturated, no other way than just discard @@ -237,8 +334,6 @@ static void task_pool_num_decrease(TaskPool *pool, size_t done) BLI_assert(pool->num >= done); pool->num -= done; - atomic_sub_and_fetch_z(&pool->currently_running_tasks, done); - pool->done += done; if (pool->num == 0) BLI_condition_notify_all(&pool->num_cond); @@ -246,11 +341,11 @@ static void task_pool_num_decrease(TaskPool *pool, size_t done) BLI_mutex_unlock(&pool->num_mutex); } -static void task_pool_num_increase(TaskPool *pool) +static void task_pool_num_increase(TaskPool *pool, size_t new) { BLI_mutex_lock(&pool->num_mutex); - pool->num++; + pool->num += new; BLI_condition_notify_all(&pool->num_cond); BLI_mutex_unlock(&pool->num_mutex); @@ -292,17 +387,10 @@ static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task continue; } - if (atomic_add_and_fetch_z(&pool->currently_running_tasks, 1) <= pool->num_threads || - pool->num_threads == 0) - { - *task = current_task; - found_task = true; - BLI_remlink(&scheduler->queue, *task); - break; - } - else { - atomic_sub_and_fetch_z(&pool->currently_running_tasks, 1); - } + *task = current_task; + found_task = true; + BLI_remlink(&scheduler->queue, *task); + break; } if (!found_task) BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex); @@ -313,23 +401,51 @@ static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task return true; } +BLI_INLINE void handle_local_queue(TaskThreadLocalStorage *tls, + const int thread_id) +{ + BLI_assert(!tls->do_delayed_push); + while (tls->num_local_queue > 0) { + /* We pop task from queue before handling it so handler of the task can + * push next job to the local queue. + */ + tls->num_local_queue--; + Task *local_task = tls->local_queue[tls->num_local_queue]; + /* TODO(sergey): Double-check work_and_wait() doesn't handle other's + * pool tasks. + */ + TaskPool *local_pool = local_task->pool; + local_task->run(local_pool, local_task->taskdata, thread_id); + task_free(local_pool, local_task, thread_id); + } + BLI_assert(!tls->do_delayed_push); +} + static void *task_scheduler_thread_run(void *thread_p) { TaskThread *thread = (TaskThread *) thread_p; + TaskThreadLocalStorage *tls = &thread->tls; TaskScheduler *scheduler = thread->scheduler; int thread_id = thread->id; Task *task; + pthread_setspecific(scheduler->tls_id_key, thread); + /* keep popping off tasks */ while (task_scheduler_thread_wait_pop(scheduler, &task)) { TaskPool *pool = task->pool; /* run task */ + BLI_assert(!tls->do_delayed_push); task->run(pool, task->taskdata, thread_id); + BLI_assert(!tls->do_delayed_push); /* delete task */ task_free(pool, task, thread_id); + /* Handle all tasks from local queue. */ + handle_local_queue(tls, thread_id); + /* notify pool task was done */ task_pool_num_decrease(pool, 1); } @@ -359,30 +475,35 @@ TaskScheduler *BLI_task_scheduler_create(int num_threads) /* Add background-only thread if needed. */ if (num_threads == 0) { - scheduler->background_thread_only = true; - num_threads = 1; + scheduler->background_thread_only = true; + num_threads = 1; } + scheduler->task_threads = MEM_mallocN(sizeof(TaskThread) * (num_threads + 1), + "TaskScheduler task threads"); + + /* Initialize TLS for main thread. */ + initialize_task_tls(&scheduler->task_threads[0].tls); + + pthread_key_create(&scheduler->tls_id_key, NULL); + /* launch threads that will be waiting for work */ if (num_threads > 0) { int i; scheduler->num_threads = num_threads; scheduler->threads = MEM_callocN(sizeof(pthread_t) * num_threads, "TaskScheduler threads"); - scheduler->task_threads = MEM_callocN(sizeof(TaskThread) * num_threads, "TaskScheduler task threads"); for (i = 0; i < num_threads; i++) { - TaskThread *thread = &scheduler->task_threads[i]; + TaskThread *thread = &scheduler->task_threads[i + 1]; thread->scheduler = scheduler; thread->id = i + 1; + initialize_task_tls(&thread->tls); if (pthread_create(&scheduler->threads[i], NULL, task_scheduler_thread_run, thread) != 0) { fprintf(stderr, "TaskScheduler failed to launch thread %d/%d\n", i, num_threads); } } - - scheduler->task_mempool = MEM_callocN(sizeof(*scheduler->task_mempool) * (num_threads + 1), - "TaskScheduler task_mempool"); } return scheduler; @@ -398,6 +519,8 @@ void BLI_task_scheduler_free(TaskScheduler *scheduler) BLI_condition_notify_all(&scheduler->queue_cond); BLI_mutex_unlock(&scheduler->queue_mutex); + pthread_key_delete(scheduler->tls_id_key); + /* delete threads */ if (scheduler->threads) { int i; @@ -412,17 +535,12 @@ void BLI_task_scheduler_free(TaskScheduler *scheduler) /* Delete task thread data */ if (scheduler->task_threads) { - MEM_freeN(scheduler->task_threads); - } - - /* Delete task memory pool */ - if (scheduler->task_mempool) { - for (int i = 0; i <= scheduler->num_threads; ++i) { - for (int j = 0; j < scheduler->task_mempool[i].num_tasks; ++j) { - MEM_freeN(scheduler->task_mempool[i].tasks[j]); - } + for (int i = 0; i < scheduler->num_threads + 1; ++i) { + TaskThreadLocalStorage *tls = &scheduler->task_threads[i].tls; + free_task_tls(tls); } - MEM_freeN(scheduler->task_mempool); + + MEM_freeN(scheduler->task_threads); } /* delete leftover tasks */ @@ -445,7 +563,7 @@ int BLI_task_scheduler_num_threads(TaskScheduler *scheduler) static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriority priority) { - task_pool_num_increase(task->pool); + task_pool_num_increase(task->pool, 1); /* add task to queue */ BLI_mutex_lock(&scheduler->queue_mutex); @@ -459,6 +577,27 @@ static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriori BLI_mutex_unlock(&scheduler->queue_mutex); } +static void task_scheduler_push_all(TaskScheduler *scheduler, + TaskPool *pool, + Task **tasks, + int num_tasks) +{ + if (num_tasks == 0) { + return; + } + + task_pool_num_increase(pool, num_tasks); + + BLI_mutex_lock(&scheduler->queue_mutex); + + for (int i = 0; i < num_tasks; i++) { + BLI_addhead(&scheduler->queue, tasks[i]); + } + + BLI_condition_notify_all(&scheduler->queue_cond); + BLI_mutex_unlock(&scheduler->queue_mutex); +} + static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool) { Task *task, *nexttask; @@ -471,7 +610,7 @@ static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool) nexttask = task->next; if (task->pool == pool) { - task_data_free(task, 0); + task_data_free(task, pool->thread_id); BLI_freelinkN(&scheduler->queue, task); done++; @@ -486,7 +625,10 @@ static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool) /* Task Pool */ -static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, const bool is_background) +static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, + void *userdata, + const bool is_background, + const bool is_suspended) { TaskPool *pool = MEM_mallocN(sizeof(TaskPool), "TaskPool"); @@ -504,11 +646,13 @@ static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, c pool->scheduler = scheduler; pool->num = 0; - pool->done = 0; - pool->num_threads = 0; - pool->currently_running_tasks = 0; pool->do_cancel = false; + pool->do_work = false; + pool->is_suspended = is_suspended; + pool->num_suspended = 0; + pool->suspended_queue.first = pool->suspended_queue.last = NULL; pool->run_in_background = is_background; + pool->use_local_tls = false; BLI_mutex_init(&pool->num_mutex); BLI_condition_init(&pool->num_cond); @@ -517,11 +661,26 @@ static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, c BLI_mutex_init(&pool->user_mutex); if (BLI_thread_is_main()) { - pool->task_mempool = scheduler->task_mempool; + pool->thread_id = 0; } else { - pool->task_mempool = &pool->task_mempool_local; - pool->task_mempool_local.num_tasks = 0; + TaskThread *thread = pthread_getspecific(scheduler->tls_id_key); + if (thread == NULL) { + /* NOTE: Task pool is created from non-main thread which is not + * managed by the task scheduler. We identify ourselves as thread ID + * 0 but we do not use scheduler's TLS storage and use our own + * instead to avoid any possible threading conflicts. + */ + pool->thread_id = 0; + pool->use_local_tls = true; +#ifndef NDEBUG + pool->creator_thread_id = pthread_self(); +#endif + initialize_task_tls(&pool->local_tls); + } + else { + pool->thread_id = thread->id; + } } #ifdef DEBUG_STATS @@ -548,7 +707,7 @@ static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, c */ TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata) { - return task_pool_create_ex(scheduler, userdata, false); + return task_pool_create_ex(scheduler, userdata, false, false); } /** @@ -563,25 +722,28 @@ TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata) */ TaskPool *BLI_task_pool_create_background(TaskScheduler *scheduler, void *userdata) { - return task_pool_create_ex(scheduler, userdata, true); + return task_pool_create_ex(scheduler, userdata, true, false); +} + +/** + * Similar to BLI_task_pool_create() but does not schedule any tasks for execution + * for until BLI_task_pool_work_and_wait() is called. This helps reducing therading + * overhead when pushing huge amount of small initial tasks from the main thread. + */ +TaskPool *BLI_task_pool_create_suspended(TaskScheduler *scheduler, void *userdata) +{ + return task_pool_create_ex(scheduler, userdata, false, true); } void BLI_task_pool_free(TaskPool *pool) { - BLI_task_pool_stop(pool); + BLI_task_pool_cancel(pool); BLI_mutex_end(&pool->num_mutex); BLI_condition_end(&pool->num_cond); BLI_mutex_end(&pool->user_mutex); - /* Free local memory pool, those pointers are lost forever. */ - if (pool->task_mempool == &pool->task_mempool_local) { - for (int i = 0; i < pool->task_mempool_local.num_tasks; i++) { - MEM_freeN(pool->task_mempool_local.tasks[i]); - } - } - #ifdef DEBUG_STATS printf("Thread ID Allocated Reused Discarded\n"); for (int i = 0; i < pool->scheduler->num_threads + 1; ++i) { @@ -594,24 +756,68 @@ void BLI_task_pool_free(TaskPool *pool) MEM_freeN(pool->mempool_stats); #endif + if (pool->use_local_tls) { + free_task_tls(&pool->local_tls); + } + MEM_freeN(pool); BLI_end_threaded_malloc(); } +BLI_INLINE bool task_can_use_local_queues(TaskPool *pool, int thread_id) +{ + return (thread_id != -1 && (thread_id != pool->thread_id || pool->do_work)); +} + static void task_pool_push( TaskPool *pool, TaskRunFunction run, void *taskdata, bool free_taskdata, TaskFreeFunction freedata, TaskPriority priority, int thread_id) { + /* Allocate task and fill it's properties. */ Task *task = task_alloc(pool, thread_id); - task->run = run; task->taskdata = taskdata; task->free_taskdata = free_taskdata; task->freedata = freedata; task->pool = pool; - + /* For suspended pools we put everything yo a global queue first + * and exit as soon as possible. + * + * This tasks will be moved to actual execution when pool is + * activated by work_and_wait(). + */ + if (pool->is_suspended) { + BLI_addhead(&pool->suspended_queue, task); + atomic_fetch_and_add_z(&pool->num_suspended, 1); + return; + } + /* Populate to any local queue first, this is cheapest push ever. */ + if (task_can_use_local_queues(pool, thread_id)) { + ASSERT_THREAD_ID(pool->scheduler, thread_id); + TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id); + /* Try to push to a local execution queue. + * These tasks will be picked up next. + */ + if (tls->num_local_queue < LOCAL_QUEUE_SIZE) { + tls->local_queue[tls->num_local_queue] = task; + tls->num_local_queue++; + return; + } + /* If we are in the delayed tasks push mode, we push tasks to a + * temporary local queue first without any locks, and then move them + * to global execution queue with a single lock. + */ + if (tls->do_delayed_push && tls->num_delayed_queue < DELAYED_QUEUE_SIZE) { + tls->delayed_queue[tls->num_delayed_queue] = task; + tls->num_delayed_queue++; + return; + } + } + /* Do push to a global execution ppol, slowest possible method, + * causes quite reasonable amount of threading overhead. + */ task_scheduler_push(pool->scheduler, task, priority); } @@ -636,8 +842,25 @@ void BLI_task_pool_push_from_thread(TaskPool *pool, TaskRunFunction run, void BLI_task_pool_work_and_wait(TaskPool *pool) { + TaskThreadLocalStorage *tls = get_task_tls(pool, pool->thread_id); TaskScheduler *scheduler = pool->scheduler; + if (atomic_fetch_and_and_uint8((uint8_t *)&pool->is_suspended, 0)) { + if (pool->num_suspended) { + task_pool_num_increase(pool, pool->num_suspended); + BLI_mutex_lock(&scheduler->queue_mutex); + + BLI_movelisttolist(&scheduler->queue, &pool->suspended_queue); + + BLI_condition_notify_all(&scheduler->queue_cond); + BLI_mutex_unlock(&scheduler->queue_mutex); + } + } + + pool->do_work = true; + + ASSERT_THREAD_ID(pool->scheduler, pool->thread_id); + BLI_mutex_lock(&pool->num_mutex); while (pool->num != 0) { @@ -651,16 +874,12 @@ void BLI_task_pool_work_and_wait(TaskPool *pool) /* find task from this pool. if we get a task from another pool, * we can get into deadlock */ - if (pool->num_threads == 0 || - pool->currently_running_tasks < pool->num_threads) - { - for (task = scheduler->queue.first; task; task = task->next) { - if (task->pool == pool) { - work_task = task; - found_task = true; - BLI_remlink(&scheduler->queue, task); - break; - } + for (task = scheduler->queue.first; task; task = task->next) { + if (task->pool == pool) { + work_task = task; + found_task = true; + BLI_remlink(&scheduler->queue, task); + break; } } @@ -669,11 +888,15 @@ void BLI_task_pool_work_and_wait(TaskPool *pool) /* if found task, do it, otherwise wait until other tasks are done */ if (found_task) { /* run task */ - atomic_add_and_fetch_z(&pool->currently_running_tasks, 1); - work_task->run(pool, work_task->taskdata, 0); + BLI_assert(!tls->do_delayed_push); + work_task->run(pool, work_task->taskdata, pool->thread_id); + BLI_assert(!tls->do_delayed_push); /* delete task */ - task_free(pool, task, 0); + task_free(pool, task, pool->thread_id); + + /* Handle all tasks from local queue. */ + handle_local_queue(tls, pool->thread_id); /* notify pool task was done */ task_pool_num_decrease(pool, 1); @@ -688,22 +911,8 @@ void BLI_task_pool_work_and_wait(TaskPool *pool) } BLI_mutex_unlock(&pool->num_mutex); -} - -int BLI_pool_get_num_threads(TaskPool *pool) -{ - if (pool->num_threads != 0) { - return pool->num_threads; - } - else { - return BLI_task_scheduler_num_threads(pool->scheduler); - } -} -void BLI_pool_set_num_threads(TaskPool *pool, int num_threads) -{ - /* NOTE: Don't try to modify threads while tasks are running! */ - pool->num_threads = num_threads; + handle_local_queue(tls, pool->thread_id); } void BLI_task_pool_cancel(TaskPool *pool) @@ -721,13 +930,6 @@ void BLI_task_pool_cancel(TaskPool *pool) pool->do_cancel = false; } -void BLI_task_pool_stop(TaskPool *pool) -{ - task_scheduler_clear(pool->scheduler, pool); - - BLI_assert(pool->num == 0); -} - bool BLI_task_pool_canceled(TaskPool *pool) { return pool->do_cancel; @@ -743,9 +945,28 @@ ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool) return &pool->user_mutex; } -size_t BLI_task_pool_tasks_done(TaskPool *pool) +void BLI_task_pool_delayed_push_begin(TaskPool *pool, int thread_id) +{ + if (task_can_use_local_queues(pool, thread_id)) { + ASSERT_THREAD_ID(pool->scheduler, thread_id); + TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id); + tls->do_delayed_push = true; + } +} + +void BLI_task_pool_delayed_push_end(TaskPool *pool, int thread_id) { - return pool->done; + if (task_can_use_local_queues(pool, thread_id)) { + ASSERT_THREAD_ID(pool->scheduler, thread_id); + TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id); + BLI_assert(tls->do_delayed_push); + task_scheduler_push_all(pool->scheduler, + pool, + tls->delayed_queue, + tls->num_delayed_queue); + tls->do_delayed_push = false; + tls->num_delayed_queue = 0; + } } /* Parallel range routines */ @@ -783,7 +1004,7 @@ BLI_INLINE bool parallel_range_next_iter_get( int * __restrict iter, int * __restrict count) { uint32_t uval = atomic_fetch_and_add_uint32((uint32_t *)(&state->iter), state->chunk_size); - int previter = *(int32_t*)&uval; + int previter = *(int32_t *)&uval; *iter = previter; *count = max_ii(0, min_ii(state->chunk_size, state->stop - previter)); @@ -906,7 +1127,7 @@ static void task_parallel_range_ex( atomic_fetch_and_add_uint32((uint32_t *)(&state.iter), 0); if (use_userdata_chunk) { - userdata_chunk_array = MALLOCA(userdata_chunk_size * num_tasks); + userdata_chunk_array = MALLOCA(userdata_chunk_size * num_tasks); } for (i = 0; i < num_tasks; i++) { @@ -918,7 +1139,8 @@ static void task_parallel_range_ex( BLI_task_pool_push_from_thread(task_pool, parallel_range_func, userdata_chunk_local, false, - TASK_PRIORITY_HIGH, 0); + TASK_PRIORITY_HIGH, + task_pool->thread_id); } BLI_task_pool_work_and_wait(task_pool); @@ -1124,7 +1346,8 @@ void BLI_task_parallel_listbase( BLI_task_pool_push_from_thread(task_pool, parallel_listbase_func, NULL, false, - TASK_PRIORITY_HIGH, 0); + TASK_PRIORITY_HIGH, + task_pool->thread_id); } BLI_task_pool_work_and_wait(task_pool); |