diff options
author | Bastien Montagne <montagne29@wanadoo.fr> | 2016-05-16 18:15:18 +0300 |
---|---|---|
committer | Bastien Montagne <montagne29@wanadoo.fr> | 2016-05-16 18:15:18 +0300 |
commit | 688858d3a807536d2bdcead7b50fc4d0496dab44 (patch) | |
tree | 8304480488491a0f3cef72e731da32a7ad9ceac0 | |
parent | 5a7429c36302b872b8de2eaec88984a9b98cc613 (diff) |
BLI_task: Add new 'BLI_task_parallel_range_finalize()'.
Together with the extended loop callback and userdata_chunk, this makes it possible to perform
cumulative tasks (like aggregation) in a lock-free way, using a local userdata_chunk to store temporary data,
and, once all workers have finished, to merge those userdata_chunks in the finalize callback
(called from the calling thread, so there is no need to lock here either).
Note that this changes how userdata_chunk is handled (it is now managed fully from the 'main' thread,
which means a given worker thread will always get the same userdata_chunk, and it is no longer
re-initialized to the init value at the start of each iteration chunk).
-rw-r--r-- | source/blender/blenlib/BLI_task.h | 14 | ||||
-rw-r--r-- | source/blender/blenlib/intern/task.c | 85 |
2 files changed, 77 insertions, 22 deletions
diff --git a/source/blender/blenlib/BLI_task.h b/source/blender/blenlib/BLI_task.h index c511ec432ee..967e0be6d0a 100644 --- a/source/blender/blenlib/BLI_task.h +++ b/source/blender/blenlib/BLI_task.h @@ -119,11 +119,13 @@ size_t BLI_task_pool_tasks_done(TaskPool *pool); /* Parallel for routines */ typedef void (*TaskParallelRangeFunc)(void *userdata, const int iter); typedef void (*TaskParallelRangeFuncEx)(void *userdata, void *userdata_chunk, const int iter, const int thread_id); +typedef void (*TaskParallelRangeFuncFinalize)(void *userdata, void *userdata_chunk); void BLI_task_parallel_range_ex( int start, int stop, void *userdata, void *userdata_chunk, - const size_t userdata_chunk_size, TaskParallelRangeFuncEx func_ex, + const size_t userdata_chunk_size, + TaskParallelRangeFuncEx func_ex, const bool use_threading, const bool use_dynamic_scheduling); void BLI_task_parallel_range( @@ -132,6 +134,16 @@ void BLI_task_parallel_range( TaskParallelRangeFunc func, const bool use_threading); +void BLI_task_parallel_range_finalize( + int start, int stop, + void *userdata, + void *userdata_chunk, + const size_t userdata_chunk_size, + TaskParallelRangeFuncEx func_ex, + TaskParallelRangeFuncFinalize func_finalize, + const bool use_threading, + const bool use_dynamic_scheduling); + typedef void (*TaskParallelListbaseFunc)(void *userdata, struct Link *iter, int index); diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c index a34d9e97fd5..4ef42dcce6f 100644 --- a/source/blender/blenlib/intern/task.c +++ b/source/blender/blenlib/intern/task.c @@ -768,8 +768,6 @@ size_t BLI_task_pool_tasks_done(TaskPool *pool) typedef struct ParallelRangeState { int start, stop; void *userdata; - void *userdata_chunk; - size_t userdata_chunk_size; TaskParallelRangeFunc func; TaskParallelRangeFuncEx func_ex; @@ -792,24 +790,16 @@ BLI_INLINE bool parallel_range_next_iter_get( static void parallel_range_func( TaskPool * __restrict pool, - void 
*UNUSED(taskdata), + void *userdata_chunk, int threadid) { ParallelRangeState * __restrict state = BLI_task_pool_userdata(pool); int iter, count; - const bool use_userdata_chunk = (state->func_ex != NULL) && - (state->userdata_chunk_size != 0) && (state->userdata_chunk != NULL); - void *userdata_chunk = use_userdata_chunk ? MALLOCA(state->userdata_chunk_size) : NULL; - while (parallel_range_next_iter_get(state, &iter, &count)) { int i; if (state->func_ex) { - if (use_userdata_chunk) { - memcpy(userdata_chunk, state->userdata_chunk, state->userdata_chunk_size); - } - for (i = 0; i < count; ++i) { state->func_ex(state->userdata, userdata_chunk, iter + i, threadid); } @@ -820,8 +810,6 @@ static void parallel_range_func( } } } - - MALLOCA_FREE(userdata_chunk, state->userdata_chunk_size); } /** @@ -836,6 +824,7 @@ static void task_parallel_range_ex( const size_t userdata_chunk_size, TaskParallelRangeFunc func, TaskParallelRangeFuncEx func_ex, + TaskParallelRangeFuncFinalize func_finalize, const bool use_threading, const bool use_dynamic_scheduling) { @@ -844,6 +833,10 @@ static void task_parallel_range_ex( ParallelRangeState state; int i, num_threads, num_tasks; + void *userdata_chunk_local = NULL; + void *userdata_chunk_array = NULL; + const bool use_userdata_chunk = (func_ex != NULL) && (userdata_chunk_size != 0) && (userdata_chunk != NULL); + if (start == stop) { return; } @@ -859,9 +852,6 @@ static void task_parallel_range_ex( */ if (!use_threading) { if (func_ex) { - const bool use_userdata_chunk = (userdata_chunk_size != 0) && (userdata_chunk != NULL); - void *userdata_chunk_local = NULL; - if (use_userdata_chunk) { userdata_chunk_local = MALLOCA(userdata_chunk_size); memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size); @@ -871,6 +861,10 @@ static void task_parallel_range_ex( func_ex(userdata, userdata_chunk, i, 0); } + if (func_finalize) { + func_finalize(userdata, userdata_chunk); + } + MALLOCA_FREE(userdata_chunk_local, userdata_chunk_size); } 
else { @@ -895,8 +889,6 @@ static void task_parallel_range_ex( state.start = start; state.stop = stop; state.userdata = userdata; - state.userdata_chunk = userdata_chunk; - state.userdata_chunk_size = userdata_chunk_size; state.func = func; state.func_ex = func_ex; state.iter = start; @@ -910,15 +902,34 @@ static void task_parallel_range_ex( num_tasks = min_ii(num_tasks, (stop - start) / state.chunk_size); atomic_fetch_and_add_uint32((uint32_t *)(&state.iter), 0); + if (use_userdata_chunk) { + userdata_chunk_array = MALLOCA(userdata_chunk_size * num_tasks); + } + for (i = 0; i < num_tasks; i++) { + if (use_userdata_chunk) { + userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i); + memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size); + } + /* Use this pool's pre-allocated tasks. */ BLI_task_pool_push_from_thread(task_pool, parallel_range_func, - NULL, false, + userdata_chunk_local, false, TASK_PRIORITY_HIGH, 0); } BLI_task_pool_work_and_wait(task_pool); BLI_task_pool_free(task_pool); + + if (use_userdata_chunk) { + if (func_finalize) { + for (i = 0; i < num_tasks; i++) { + userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i); + func_finalize(userdata, userdata_chunk_local); + } + } + MALLOCA_FREE(userdata_chunk_array, userdata_chunk_size * num_tasks); + } } /** @@ -946,7 +957,7 @@ void BLI_task_parallel_range_ex( const bool use_dynamic_scheduling) { task_parallel_range_ex( - start, stop, userdata, userdata_chunk, userdata_chunk_size, NULL, func_ex, + start, stop, userdata, userdata_chunk, userdata_chunk_size, NULL, func_ex, NULL, use_threading, use_dynamic_scheduling); } @@ -967,7 +978,39 @@ void BLI_task_parallel_range( TaskParallelRangeFunc func, const bool use_threading) { - task_parallel_range_ex(start, stop, userdata, NULL, 0, func, NULL, use_threading, false); + task_parallel_range_ex(start, stop, userdata, NULL, 0, func, NULL, NULL, use_threading, false); +} + +/** + * This function allows to 
parallelize for loops in a similar way to OpenMP's 'parallel for' statement, + * with an additional 'finalize' func called from calling thread once whole range have been processed. + * + * \param start First index to process. + * \param stop Index to stop looping (excluded). + * \param userdata Common userdata passed to all instances of \a func. + * \param userdata_chunk Optional, each instance of looping chunks will get a copy of this data + * (similar to OpenMP's firstprivate). + * \param userdata_chunk_size Memory size of \a userdata_chunk. + * \param func_ex Callback function (advanced version). + * \param func_finalize Callback function, called after all workers have finisehd, useful to finalize accumulative tasks. + * \param use_threading If \a true, actually split-execute loop in threads, else just do a sequential forloop + * (allows caller to use any kind of test to switch on parallelization or not). + * \param use_dynamic_scheduling If \a true, the whole range is divided in a lot of small chunks (of size 32 currently), + * otherwise whole range is split in a few big chunks (num_threads * 2 chunks currently). + */ +void BLI_task_parallel_range_finalize( + int start, int stop, + void *userdata, + void *userdata_chunk, + const size_t userdata_chunk_size, + TaskParallelRangeFuncEx func_ex, + TaskParallelRangeFuncFinalize func_finalize, + const bool use_threading, + const bool use_dynamic_scheduling) +{ + task_parallel_range_ex( + start, stop, userdata, userdata_chunk, userdata_chunk_size, NULL, func_ex, func_finalize, + use_threading, use_dynamic_scheduling); } #undef MALLOCA |