diff options
Diffstat (limited to 'source/blender/blenlib/intern/task.c')
-rw-r--r-- | source/blender/blenlib/intern/task.c | 119 |
1 files changed, 86 insertions, 33 deletions
diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c index 07c67f001f9..d187a8d1968 100644 --- a/source/blender/blenlib/intern/task.c +++ b/source/blender/blenlib/intern/task.c @@ -29,9 +29,12 @@ #include "MEM_guardedalloc.h" #include "BLI_listbase.h" +#include "BLI_math.h" #include "BLI_task.h" #include "BLI_threads.h" +#include "atomic_ops.h" + /* Types */ typedef struct Task { @@ -48,6 +51,8 @@ struct TaskPool { volatile size_t num; volatile size_t done; + size_t num_threads; + size_t currently_running_tasks; ThreadMutex num_mutex; ThreadCondition num_cond; @@ -83,6 +88,7 @@ static void task_pool_num_decrease(TaskPool *pool, size_t done) BLI_assert(pool->num >= done); pool->num -= done; + atomic_sub_z(&pool->currently_running_tasks, done); pool->done += done; if (pool->num == 0) @@ -103,19 +109,37 @@ static void task_pool_num_increase(TaskPool *pool) static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task) { + bool found_task = false; BLI_mutex_lock(&scheduler->queue_mutex); while (!scheduler->queue.first && !scheduler->do_exit) BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex); - if (!scheduler->queue.first) { - BLI_mutex_unlock(&scheduler->queue_mutex); - BLI_assert(scheduler->do_exit); - return false; - } - - *task = scheduler->queue.first; - BLI_remlink(&scheduler->queue, *task); + do { + Task *current_task; + if (!scheduler->queue.first) { + BLI_mutex_unlock(&scheduler->queue_mutex); + BLI_assert(scheduler->do_exit); + return false; + } + for (current_task = scheduler->queue.first; + current_task != NULL; + current_task = current_task->next) + { + TaskPool *pool = current_task->pool; + if (pool->num_threads == 0 || + pool->currently_running_tasks < pool->num_threads) + { + *task = current_task; + found_task = true; + atomic_add_z(&pool->currently_running_tasks, 1); + BLI_remlink(&scheduler->queue, *task); + break; + } + } + if (!found_task) + BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex); + } while (!found_task); BLI_mutex_unlock(&scheduler->queue_mutex); @@ -287,6 +311,8 @@ TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata) pool->scheduler = scheduler; pool->num = 0; + pool->num_threads = 0; + pool->currently_running_tasks = 0; pool->do_cancel = false; BLI_mutex_init(&pool->num_mutex); @@ -350,12 +376,16 @@ void BLI_task_pool_work_and_wait(TaskPool *pool) /* find task from this pool. if we get a task from another pool, * we can get into deadlock */ - for (task = scheduler->queue.first; task; task = task->next) { - if (task->pool == pool) { - work_task = task; - found_task = true; - BLI_remlink(&scheduler->queue, task); - break; + if (pool->num_threads == 0 || + pool->currently_running_tasks < pool->num_threads) + { + for (task = scheduler->queue.first; task; task = task->next) { + if (task->pool == pool) { + work_task = task; + found_task = true; + BLI_remlink(&scheduler->queue, task); + break; + } } } @@ -364,6 +394,7 @@ void BLI_task_pool_work_and_wait(TaskPool *pool) /* if found task, do it, otherwise wait until other tasks are done */ if (found_task) { /* run task */ + atomic_add_z(&pool->currently_running_tasks, 1); work_task->run(pool, work_task->taskdata, 0); /* delete task */ @@ -386,6 +417,12 @@ void BLI_task_pool_work_and_wait(TaskPool *pool) BLI_mutex_unlock(&pool->num_mutex); } +void BLI_pool_set_num_threads(TaskPool *pool, int num_threads) +{ + /* NOTE: Don't try to modify threads while tasks are running! */ + pool->num_threads = num_threads; +} + void BLI_task_pool_cancel(TaskPool *pool) { pool->do_cancel = true; @@ -452,18 +489,21 @@ typedef struct ParallelRangeState { TaskParallelRangeFunc func; int iter; + int chunk_size; SpinLock lock; } ParallelRangeState; BLI_INLINE bool parallel_range_next_iter_get( - ParallelRangeState *state, - int *iter) + ParallelRangeState * __restrict state, + int * __restrict iter, int * __restrict count) { bool result = false; if (state->iter < state->stop) { BLI_spin_lock(&state->lock); if (state->iter < state->stop) { - *iter = state->iter++; + *count = min_ii(state->chunk_size, state->stop - state->iter); + *iter = state->iter; + state->iter += *count; result = true; } BLI_spin_unlock(&state->lock); @@ -472,14 +512,17 @@ BLI_INLINE bool parallel_range_next_iter_get( } static void parallel_range_func( - TaskPool *pool, + TaskPool * __restrict pool, void *UNUSED(taskdata), int UNUSED(threadid)) { - ParallelRangeState *state = BLI_task_pool_userdata(pool); - int iter; - while (parallel_range_next_iter_get(state, &iter)) { - state->func(state->userdata, iter); + ParallelRangeState * __restrict state = BLI_task_pool_userdata(pool); + int iter, count; + while (parallel_range_next_iter_get(state, &iter, &count)) { + int i; + for (i = 0; i < count; ++i) { + state->func(state->userdata, iter + i); + } } } @@ -487,12 +530,13 @@ void BLI_task_parallel_range_ex( int start, int stop, void *userdata, TaskParallelRangeFunc func, - const int range_threshold) + const int range_threshold, + const bool use_dynamic_scheduling) { TaskScheduler *task_scheduler; TaskPool *task_pool; ParallelRangeState state; - int i; + int i, num_threads, num_tasks; BLI_assert(start < stop); @@ -506,21 +550,30 @@ void BLI_task_parallel_range_ex( return; } - BLI_spin_init(&state.lock); - state.start = start; - state.stop = stop; - state.userdata = userdata; - state.func = func; - state.iter = start; - task_scheduler = BLI_task_scheduler_get(); task_pool = BLI_task_pool_create(task_scheduler, &state); + num_threads = BLI_task_scheduler_num_threads(task_scheduler); /* The idea here is to prevent creating task for each of the loop iterations * and instead have tasks which are evenly distributed across CPU cores and * pull next iter to be crunched using the queue. */ - for (i = 0; i < 2 * BLI_task_scheduler_num_threads(task_scheduler); i++) { + num_tasks = num_threads * 2; + + BLI_spin_init(&state.lock); + state.start = start; + state.stop = stop; + state.userdata = userdata; + state.func = func; + state.iter = start; + if (use_dynamic_scheduling) { + state.chunk_size = 32; + } + else { + state.chunk_size = (stop - start) / (num_tasks); + } + + for (i = 0; i < num_tasks; i++) { BLI_task_pool_push(task_pool, parallel_range_func, NULL, false, @@ -538,5 +591,5 @@ void BLI_task_parallel_range( void *userdata, TaskParallelRangeFunc func) { - BLI_task_parallel_range_ex(start, stop, userdata, func, 64); + BLI_task_parallel_range_ex(start, stop, userdata, func, 64, false); } |