author | Sergey Sharybin <sergey.vfx@gmail.com> | 2014-12-23 08:04:03 +0300
committer | Sergey Sharybin <sergey.vfx@gmail.com> | 2014-12-23 08:04:03 +0300
commit | 9282d305bdc56522543129436db1e8a5d19cf39f (patch)
tree | b6d0cdcc7dd3f4113cf9e4f9813c40f44d29746e /source/blender/blenlib/intern/task.c
parent | de724a258eda45d1fed2b2176006c3b2df8abea2 (diff)
parent | 646a96bf8edc211a06f3df652101c265ee166e8d (diff)
Merge branch 'master' into texture_nodes_refactor
Conflicts:
source/blender/nodes/texture/nodes/node_texture_math.c
Diffstat (limited to 'source/blender/blenlib/intern/task.c')
-rw-r--r-- | source/blender/blenlib/intern/task.c | 193
1 file changed, 179 insertions, 14 deletions
diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c
index 8d867b9f295..d187a8d1968 100644
--- a/source/blender/blenlib/intern/task.c
+++ b/source/blender/blenlib/intern/task.c
@@ -29,9 +29,12 @@
 #include "MEM_guardedalloc.h"
 
 #include "BLI_listbase.h"
+#include "BLI_math.h"
 #include "BLI_task.h"
 #include "BLI_threads.h"
 
+#include "atomic_ops.h"
+
 /* Types */
 
 typedef struct Task {
@@ -48,6 +51,8 @@ struct TaskPool {
     volatile size_t num;
     volatile size_t done;
+    size_t num_threads;
+    size_t currently_running_tasks;
     ThreadMutex num_mutex;
     ThreadCondition num_cond;
@@ -83,6 +88,7 @@ static void task_pool_num_decrease(TaskPool *pool, size_t done)
     BLI_assert(pool->num >= done);
 
     pool->num -= done;
+    atomic_sub_z(&pool->currently_running_tasks, done);
     pool->done += done;
 
     if (pool->num == 0)
@@ -103,19 +109,37 @@ static void task_pool_num_increase(TaskPool *pool)
 static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task)
 {
+    bool found_task = false;
     BLI_mutex_lock(&scheduler->queue_mutex);
 
     while (!scheduler->queue.first && !scheduler->do_exit)
         BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
 
-    if (!scheduler->queue.first) {
-        BLI_mutex_unlock(&scheduler->queue_mutex);
-        BLI_assert(scheduler->do_exit);
-        return false;
-    }
-
-    *task = scheduler->queue.first;
-    BLI_remlink(&scheduler->queue, *task);
+    do {
+        Task *current_task;
+        if (!scheduler->queue.first) {
+            BLI_mutex_unlock(&scheduler->queue_mutex);
+            BLI_assert(scheduler->do_exit);
+            return false;
+        }
+        for (current_task = scheduler->queue.first;
+             current_task != NULL;
+             current_task = current_task->next)
+        {
+            TaskPool *pool = current_task->pool;
+            if (pool->num_threads == 0 ||
+                pool->currently_running_tasks < pool->num_threads)
+            {
+                *task = current_task;
+                found_task = true;
+                atomic_add_z(&pool->currently_running_tasks, 1);
+                BLI_remlink(&scheduler->queue, *task);
+                break;
+            }
+        }
+        if (!found_task)
+            BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
+    } while (!found_task);
 
     BLI_mutex_unlock(&scheduler->queue_mutex);
@@ -287,6 +311,8 @@ TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata)
     pool->scheduler = scheduler;
     pool->num = 0;
+    pool->num_threads = 0;
+    pool->currently_running_tasks = 0;
     pool->do_cancel = false;
 
     BLI_mutex_init(&pool->num_mutex);
@@ -350,12 +376,16 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
         /* find task from this pool. if we get a task from another pool,
          * we can get into deadlock */
-        for (task = scheduler->queue.first; task; task = task->next) {
-            if (task->pool == pool) {
-                work_task = task;
-                found_task = true;
-                BLI_remlink(&scheduler->queue, task);
-                break;
+        if (pool->num_threads == 0 ||
+            pool->currently_running_tasks < pool->num_threads)
+        {
+            for (task = scheduler->queue.first; task; task = task->next) {
+                if (task->pool == pool) {
+                    work_task = task;
+                    found_task = true;
+                    BLI_remlink(&scheduler->queue, task);
+                    break;
+                }
             }
         }
@@ -364,6 +394,7 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
         /* if found task, do it, otherwise wait until other tasks are done */
         if (found_task) {
             /* run task */
+            atomic_add_z(&pool->currently_running_tasks, 1);
             work_task->run(pool, work_task->taskdata, 0);
 
             /* delete task */
@@ -386,6 +417,12 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
     BLI_mutex_unlock(&pool->num_mutex);
 }
 
+void BLI_pool_set_num_threads(TaskPool *pool, int num_threads)
+{
+    /* NOTE: Don't try to modify threads while tasks are running! */
+    pool->num_threads = num_threads;
+}
+
 void BLI_task_pool_cancel(TaskPool *pool)
 {
     pool->do_cancel = true;
@@ -428,3 +465,131 @@ size_t BLI_task_pool_tasks_done(TaskPool *pool)
     return pool->done;
 }
+
+/* Parallel range routines */
+
+/**
+ *
+ * Main functions:
+ * - #BLI_task_parallel_range
+ *
+ * TODO:
+ * - #BLI_task_parallel_foreach_listbase (#ListBase - double linked list)
+ * - #BLI_task_parallel_foreach_link (#Link - single linked list)
+ * - #BLI_task_parallel_foreach_ghash/gset (#GHash/#GSet - hash & set)
+ * - #BLI_task_parallel_foreach_mempool (#BLI_mempool - iterate over mempools)
+ *
+ * Possible improvements:
+ *
+ * - Chunk iterations to reduce number of spin locks.
+ */
+
+typedef struct ParallelRangeState {
+    int start, stop;
+    void *userdata;
+    TaskParallelRangeFunc func;
+
+    int iter;
+    int chunk_size;
+    SpinLock lock;
+} ParallelRangeState;
+
+BLI_INLINE bool parallel_range_next_iter_get(
+        ParallelRangeState * __restrict state,
+        int * __restrict iter, int * __restrict count)
+{
+    bool result = false;
+    if (state->iter < state->stop) {
+        BLI_spin_lock(&state->lock);
+        if (state->iter < state->stop) {
+            *count = min_ii(state->chunk_size, state->stop - state->iter);
+            *iter = state->iter;
+            state->iter += *count;
+            result = true;
+        }
+        BLI_spin_unlock(&state->lock);
+    }
+    return result;
+}
+
+static void parallel_range_func(
+        TaskPool * __restrict pool,
+        void *UNUSED(taskdata),
+        int UNUSED(threadid))
+{
+    ParallelRangeState * __restrict state = BLI_task_pool_userdata(pool);
+    int iter, count;
+    while (parallel_range_next_iter_get(state, &iter, &count)) {
+        int i;
+        for (i = 0; i < count; ++i) {
+            state->func(state->userdata, iter + i);
+        }
+    }
+}
+
+void BLI_task_parallel_range_ex(
+        int start, int stop,
+        void *userdata,
+        TaskParallelRangeFunc func,
+        const int range_threshold,
+        const bool use_dynamic_scheduling)
+{
+    TaskScheduler *task_scheduler;
+    TaskPool *task_pool;
+    ParallelRangeState state;
+    int i, num_threads, num_tasks;
+
+    BLI_assert(start < stop);
+
+    /* If it's not enough data to be crunched, don't bother with tasks at all,
+     * do everything from the main thread.
+     */
+    if (stop - start < range_threshold) {
+        for (i = start; i < stop; ++i) {
+            func(userdata, i);
+        }
+        return;
+    }
+
+    task_scheduler = BLI_task_scheduler_get();
+    task_pool = BLI_task_pool_create(task_scheduler, &state);
+    num_threads = BLI_task_scheduler_num_threads(task_scheduler);
+
+    /* The idea here is to prevent creating task for each of the loop iterations
+     * and instead have tasks which are evenly distributed across CPU cores and
+     * pull next iter to be crunched using the queue.
+     */
+    num_tasks = num_threads * 2;
+
+    BLI_spin_init(&state.lock);
+    state.start = start;
+    state.stop = stop;
+    state.userdata = userdata;
+    state.func = func;
+    state.iter = start;
+    if (use_dynamic_scheduling) {
+        state.chunk_size = 32;
+    }
+    else {
+        state.chunk_size = (stop - start) / (num_tasks);
+    }
+
+    for (i = 0; i < num_tasks; i++) {
+        BLI_task_pool_push(task_pool,
+                           parallel_range_func,
+                           NULL, false,
+                           TASK_PRIORITY_HIGH);
+    }
+
+    BLI_task_pool_work_and_wait(task_pool);
+    BLI_task_pool_free(task_pool);
+
+    BLI_spin_end(&state.lock);
+}
+
+void BLI_task_parallel_range(
+        int start, int stop,
+        void *userdata,
+        TaskParallelRangeFunc func)
+{
+    BLI_task_parallel_range_ex(start, stop, userdata, func, 64, false);
+}
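For readers unfamiliar with the new API, here is a minimal usage sketch of the `BLI_task_parallel_range()` entry point added above, assuming only what the diff shows: the callback matches how `parallel_range_func()` invokes it, `void (*)(void *userdata, int iter)`, and ranges shorter than the default threshold of 64 simply run inline on the calling thread. The `ScaleData` struct, `scale_value()` callback, and `scale_all_values()` wrapper are hypothetical, made up for illustration.

```c
#include "BLI_task.h"

/* Hypothetical payload handed through the userdata pointer. */
typedef struct ScaleData {
    float *values;
    float factor;
} ScaleData;

/* Matches TaskParallelRangeFunc as used by parallel_range_func() above:
 * called once for each iteration index in [start, stop). */
static void scale_value(void *userdata, int iter)
{
    ScaleData *data = userdata;
    data->values[iter] *= data->factor;
}

void scale_all_values(float *values, int count, float factor)
{
    ScaleData data;
    data.values = values;
    data.factor = factor;

    /* Splits [0, count) across num_threads * 2 tasks; for count < 64
     * the loop runs inline on the calling thread instead. */
    BLI_task_parallel_range(0, count, &data, scale_value);
}
```

The `_ex` variant exposes the two tuning knobs: a call like `BLI_task_parallel_range_ex(0, count, &data, scale_value, 64, true)` keeps the same 64-iteration threshold but switches to dynamic scheduling, where each task repeatedly grabs 32-iteration chunks under the spin lock rather than receiving one evenly sized slice of the range.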
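The per-pool thread cap could be exercised as in the sketch below, assuming only the `TaskRunFunction` signature visible in the diff (`pool, taskdata, threadid`); the `compress_chunk()` task body and `compress_all()` driver are hypothetical. A cap of 0, the default set in `BLI_task_pool_create()`, preserves the old unlimited behaviour, since the scheduler checks `pool->num_threads == 0` before comparing against `currently_running_tasks`.

```c
#include "BLI_task.h"

/* Hypothetical task body; signature follows the work_task->run() call above. */
static void compress_chunk(TaskPool *pool, void *taskdata, int threadid)
{
    (void)pool;
    (void)threadid;
    /* ... compress the buffer pointed to by taskdata ... */
    (void)taskdata;
}

void compress_all(TaskScheduler *scheduler, void **chunks, int num_chunks)
{
    TaskPool *pool = BLI_task_pool_create(scheduler, NULL);
    int i;

    /* Allow at most two of this pool's tasks to run concurrently; tasks
     * from other pools on the same scheduler keep using the remaining
     * threads. Per the NOTE in the diff, set the cap before pushing. */
    BLI_pool_set_num_threads(pool, 2);

    for (i = 0; i < num_chunks; i++) {
        BLI_task_pool_push(pool, compress_chunk, chunks[i], false,
                           TASK_PRIORITY_LOW);
    }

    BLI_task_pool_work_and_wait(pool);
    BLI_task_pool_free(pool);
}
```

Note that the cap is advisory per pool rather than a scheduler-wide change: `task_scheduler_thread_wait_pop()` merely skips over tasks whose pool is saturated and keeps scanning the queue for work from other pools.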