git.blender.org/blender.git
author    Bastien Montagne <b.mont29@gmail.com>  2019-11-25 21:53:17 +0300
committer Bastien Montagne <b.mont29@gmail.com>  2019-11-25 21:54:40 +0300
commit    3f87ac368483f0dcf86a1e3057ce8a6fbbe702ac (patch)
tree      41a77f2a782ea2a841c511a4d934b90774e9fef7  /source/blender/blenlib/intern/task.c
parent    52f0d685ba9f33c559e1945c0e8c8d50d2447ffe (diff)
Revert "BLI_task: Add pooled threaded index range iterator."
This reverts commit f9028a3be1f77c01edca44a68894e2ba9d9cfb14. That commit is causing a weird heisenbug crash, only in Windows release builds... Reverting until we understand the issue.
Diffstat (limited to 'source/blender/blenlib/intern/task.c')
-rw-r--r--  source/blender/blenlib/intern/task.c | 453
1 file changed, 81 insertions(+), 372 deletions(-)
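
For context on what is being reverted, here is a minimal, hypothetical usage sketch of the pooled range-iterator API this commit removes. It is reconstructed only from the function signatures visible in the diff below (BLI_task_parallel_range_pool_init/push/work_and_wait/free) plus the public BLI_parallel_range_settings_defaults() helper assumed from BLI_task.h; the callback, arrays, and ranges are invented for illustration and this is not code from the Blender tree.

/* Hypothetical example only: reconstructed from the signatures removed in this
 * diff, not actual Blender code. */
#include "BLI_task.h"

static void double_value_func(void *__restrict userdata,
                              const int iter,
                              const TaskParallelTLS *__restrict tls)
{
  float *values = userdata;
  values[iter] *= 2.0f;
  (void)tls;
}

static void run_two_ranges_in_one_pool(float *array_a, int len_a, float *array_b, int len_b)
{
  TaskParallelSettings settings;
  BLI_parallel_range_settings_defaults(&settings);

  /* Per the removed doc-comments, 'tls' data and the finalize callback must be
   * left NULL in the settings used to create the pool itself. */
  TaskParallelRangePool *range_pool = BLI_task_parallel_range_pool_init(&settings);

  /* Each pushed range keeps its own userdata and callback; nothing runs yet. */
  BLI_task_parallel_range_pool_push(range_pool, 0, len_a, array_a, double_value_func, &settings);
  BLI_task_parallel_range_pool_push(range_pool, 0, len_b, array_b, double_value_func, &settings);

  /* All pushed ranges are crunched by one shared set of worker tasks. */
  BLI_task_parallel_range_pool_work_and_wait(range_pool);
  BLI_task_parallel_range_pool_free(range_pool);
}

Compared with calling BLI_task_parallel_range() once per array, the pool shares a single set of worker tasks across every pushed range, which is what the singly-linked TaskParallelRangeState list and the atomically-swapped current_state pointer in the removed code implement.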
diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c
index e926cbb18bc..0613b558aec 100644
--- a/source/blender/blenlib/intern/task.c
+++ b/source/blender/blenlib/intern/task.c
@@ -1042,56 +1042,15 @@ void BLI_task_pool_delayed_push_end(TaskPool *pool, int thread_id)
if (((_mem) != NULL) && ((_size) > 8192)) \
MEM_freeN((_mem))
-/* Stores all needed data to perform a parallelized iteration,
- * with a same operation (callback function).
- * It can be chained with other tasks in a single-linked list way. */
-typedef struct TaskParallelRangeState {
- struct TaskParallelRangeState *next;
-
- /* Start and end point of integer value iteration. */
+typedef struct ParallelRangeState {
int start, stop;
+ void *userdata;
- /* User-defined data, shared between all worker threads. */
- void *userdata_shared;
- /* User-defined callback function called for each value in [start, stop[ specified range. */
TaskParallelRangeFunc func;
- /* Each instance of looping chunks will get a copy of this data
- * (similar to OpenMP's firstprivate).
- */
- void *initial_tls_memory; /* Pointer to actual user-defined 'tls' data. */
- size_t tls_data_size; /* Size of that data. */
-
- void *flatten_tls_storage; /* 'tls' copies of initial_tls_memory for each running task. */
- /* Number of 'tls' copies in the array, i.e. number of worker threads. */
- size_t num_elements_in_tls_storage;
-
- /* Function called from calling thread once whole range have been processed. */
- TaskParallelFinalizeFunc func_finalize;
-
- /* Current value of the iterator, shared between all threads (atomically updated). */
- int iter_value;
- int iter_chunk_num; /* Amount of iterations to process in a single step. */
-} TaskParallelRangeState;
-
-/* Stores all the parallel tasks for a single pool. */
-typedef struct TaskParallelRangePool {
- /* The workers' task pool. */
- TaskPool *pool;
- /* The number of worker tasks we need to create. */
- int num_tasks;
- /* The total number of iterations in all the added ranges. */
- int num_total_iters;
- /* The size (number of items) processed at once by a worker task. */
+ int iter;
int chunk_size;
-
- /* Linked list of range tasks to process. */
- TaskParallelRangeState *parallel_range_states;
- /* Current range task being processed, swapped atomically. */
- TaskParallelRangeState *current_state;
- /* Scheduling settings common to all tasks. */
- TaskParallelSettings *settings;
-} TaskParallelRangePool;
+} ParallelRangeState;
BLI_INLINE void task_parallel_calc_chunk_size(const TaskParallelSettings *settings,
const int tot_items,
@@ -1154,102 +1113,66 @@ BLI_INLINE void task_parallel_calc_chunk_size(const TaskParallelSettings *settin
}
}
-BLI_INLINE void task_parallel_range_calc_chunk_size(TaskParallelRangePool *range_pool)
+BLI_INLINE void task_parallel_range_calc_chunk_size(const TaskParallelSettings *settings,
+ const int num_tasks,
+ ParallelRangeState *state)
{
- int num_iters = 0;
- int min_num_iters = INT_MAX;
- for (TaskParallelRangeState *state = range_pool->parallel_range_states; state != NULL;
- state = state->next) {
- const int ni = state->stop - state->start;
- num_iters += ni;
- if (min_num_iters > ni) {
- min_num_iters = ni;
- }
- }
- range_pool->num_total_iters = num_iters;
- /* Note: Passing min_num_iters here instead of num_iters kind of partially breaks the 'static'
- * scheduling, but pooled range iterator is inherently non-static anyway, so adding a small level
- * of dynamic scheduling here should be fine. */
task_parallel_calc_chunk_size(
- range_pool->settings, min_num_iters, range_pool->num_tasks, &range_pool->chunk_size);
+ settings, state->stop - state->start, num_tasks, &state->chunk_size);
}
-BLI_INLINE bool parallel_range_next_iter_get(TaskParallelRangePool *__restrict range_pool,
- int *__restrict r_iter,
- int *__restrict r_count,
- TaskParallelRangeState **__restrict r_state)
+BLI_INLINE bool parallel_range_next_iter_get(ParallelRangeState *__restrict state,
+ int *__restrict iter,
+ int *__restrict count)
{
- TaskParallelRangeState *state;
- int previter = INT32_MAX;
+ int previter = atomic_fetch_and_add_int32(&state->iter, state->chunk_size);
- do {
- if ((state = range_pool->current_state) == NULL) {
- break;
- }
+ *iter = previter;
+ *count = max_ii(0, min_ii(state->chunk_size, state->stop - previter));
- previter = atomic_fetch_and_add_int32(&state->iter_value, range_pool->chunk_size);
- *r_iter = previter;
- *r_count = max_ii(0, min_ii(range_pool->chunk_size, state->stop - previter));
-
- if (previter >= state->stop) {
- /* At this point the state we got is done, we need to go to the next one. In case some other
- * thread already did it, then this does nothing, and we'll just get current valid state
- * at start of the next loop. */
- atomic_cas_ptr((void **)&range_pool->current_state, state, state->next);
- }
- } while (state != NULL && previter >= state->stop);
-
- *r_state = state;
- return (state != NULL && previter < state->stop);
+ return (previter < state->stop);
}
-static void parallel_range_func(TaskPool *__restrict pool, void *tls_data_idx, int thread_id)
+static void parallel_range_func(TaskPool *__restrict pool, void *userdata_chunk, int thread_id)
{
- TaskParallelRangePool *__restrict range_pool = BLI_task_pool_userdata(pool);
+ ParallelRangeState *__restrict state = BLI_task_pool_userdata(pool);
TaskParallelTLS tls = {
.thread_id = thread_id,
- .userdata_chunk = NULL,
+ .userdata_chunk = userdata_chunk,
};
- TaskParallelRangeState *state;
int iter, count;
- while (parallel_range_next_iter_get(range_pool, &iter, &count, &state)) {
- tls.userdata_chunk = (char *)state->flatten_tls_storage +
- (((size_t)POINTER_AS_INT(tls_data_idx)) * state->tls_data_size);
+ while (parallel_range_next_iter_get(state, &iter, &count)) {
for (int i = 0; i < count; i++) {
- state->func(state->userdata_shared, iter + i, &tls);
+ state->func(state->userdata, iter + i, &tls);
}
}
}
-static void parallel_range_single_thread(TaskParallelRangePool *range_pool)
+static void parallel_range_single_thread(const int start,
+ int const stop,
+ void *userdata,
+ TaskParallelRangeFunc func,
+ const TaskParallelSettings *settings)
{
- for (TaskParallelRangeState *state = range_pool->parallel_range_states; state != NULL;
- state = state->next) {
- const int start = state->start;
- const int stop = state->stop;
- void *userdata = state->userdata_shared;
- TaskParallelRangeFunc func = state->func;
-
- void *initial_tls_memory = state->initial_tls_memory;
- const size_t tls_data_size = state->tls_data_size;
- void *flatten_tls_storage = NULL;
- const bool use_tls_data = (tls_data_size != 0) && (initial_tls_memory != NULL);
- if (use_tls_data) {
- flatten_tls_storage = MALLOCA(tls_data_size);
- memcpy(flatten_tls_storage, initial_tls_memory, tls_data_size);
- }
- TaskParallelTLS tls = {
- .thread_id = 0,
- .userdata_chunk = flatten_tls_storage,
- };
- for (int i = start; i < stop; i++) {
- func(userdata, i, &tls);
- }
- if (state->func_finalize != NULL) {
- state->func_finalize(userdata, flatten_tls_storage);
- }
- MALLOCA_FREE(flatten_tls_storage, tls_data_size);
+ void *userdata_chunk = settings->userdata_chunk;
+ const size_t userdata_chunk_size = settings->userdata_chunk_size;
+ void *userdata_chunk_local = NULL;
+ const bool use_userdata_chunk = (userdata_chunk_size != 0) && (userdata_chunk != NULL);
+ if (use_userdata_chunk) {
+ userdata_chunk_local = MALLOCA(userdata_chunk_size);
+ memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size);
}
+ TaskParallelTLS tls = {
+ .thread_id = 0,
+ .userdata_chunk = userdata_chunk_local,
+ };
+ for (int i = start; i < stop; i++) {
+ func(userdata, i, &tls);
+ }
+ if (settings->func_finalize != NULL) {
+ settings->func_finalize(userdata, userdata_chunk_local);
+ }
+ MALLOCA_FREE(userdata_chunk_local, userdata_chunk_size);
}
/**
@@ -1262,85 +1185,78 @@ void BLI_task_parallel_range(const int start,
const int stop,
void *userdata,
TaskParallelRangeFunc func,
- TaskParallelSettings *settings)
+ const TaskParallelSettings *settings)
{
+ TaskScheduler *task_scheduler;
+ TaskPool *task_pool;
+ ParallelRangeState state;
+ int i, num_threads, num_tasks;
+
+ void *userdata_chunk = settings->userdata_chunk;
+ const size_t userdata_chunk_size = settings->userdata_chunk_size;
+ void *userdata_chunk_local = NULL;
+ void *userdata_chunk_array = NULL;
+ const bool use_userdata_chunk = (userdata_chunk_size != 0) && (userdata_chunk != NULL);
+
if (start == stop) {
return;
}
BLI_assert(start < stop);
-
- TaskParallelRangeState state = {
- .next = NULL,
- .start = start,
- .stop = stop,
- .userdata_shared = userdata,
- .func = func,
- .iter_value = start,
- .initial_tls_memory = settings->userdata_chunk,
- .tls_data_size = settings->userdata_chunk_size,
- .func_finalize = settings->func_finalize,
- };
- TaskParallelRangePool range_pool = {
- .pool = NULL, .parallel_range_states = &state, .current_state = NULL, .settings = settings};
- int i, num_threads, num_tasks;
-
- void *tls_data = settings->userdata_chunk;
- const size_t tls_data_size = settings->userdata_chunk_size;
- if (tls_data_size != 0) {
- BLI_assert(tls_data != NULL);
+ if (userdata_chunk_size != 0) {
+ BLI_assert(userdata_chunk != NULL);
}
- const bool use_tls_data = (tls_data_size != 0) && (tls_data != NULL);
- void *flatten_tls_storage = NULL;
/* If it's not enough data to be crunched, don't bother with tasks at all,
- * do everything from the current thread.
+ * do everything from the main thread.
*/
if (!settings->use_threading) {
- parallel_range_single_thread(&range_pool);
+ parallel_range_single_thread(start, stop, userdata, func, settings);
return;
}
- TaskScheduler *task_scheduler = BLI_task_scheduler_get();
+ task_scheduler = BLI_task_scheduler_get();
num_threads = BLI_task_scheduler_num_threads(task_scheduler);
/* The idea here is to prevent creating task for each of the loop iterations
* and instead have tasks which are evenly distributed across CPU cores and
* pull next iter to be crunched using the queue.
*/
- range_pool.num_tasks = num_tasks = num_threads + 2;
+ num_tasks = num_threads + 2;
+
+ state.start = start;
+ state.stop = stop;
+ state.userdata = userdata;
+ state.func = func;
+ state.iter = start;
- task_parallel_range_calc_chunk_size(&range_pool);
- range_pool.num_tasks = num_tasks = min_ii(num_tasks,
- max_ii(1, (stop - start) / range_pool.chunk_size));
+ task_parallel_range_calc_chunk_size(settings, num_tasks, &state);
+ num_tasks = min_ii(num_tasks, max_ii(1, (stop - start) / state.chunk_size));
if (num_tasks == 1) {
- parallel_range_single_thread(&range_pool);
+ parallel_range_single_thread(start, stop, userdata, func, settings);
return;
}
- TaskPool *task_pool = range_pool.pool = BLI_task_pool_create_suspended(task_scheduler,
- &range_pool);
+ task_pool = BLI_task_pool_create_suspended(task_scheduler, &state);
/* NOTE: This way we are adding a memory barrier and ensure all worker
* threads can read and modify the value, without any locks. */
- atomic_cas_ptr((void **)&range_pool.current_state, NULL, &state);
- BLI_assert(range_pool.current_state == &state);
+ atomic_fetch_and_add_int32(&state.iter, 0);
- if (use_tls_data) {
- state.flatten_tls_storage = flatten_tls_storage = MALLOCA(tls_data_size * (size_t)num_tasks);
- state.tls_data_size = tls_data_size;
+ if (use_userdata_chunk) {
+ userdata_chunk_array = MALLOCA(userdata_chunk_size * num_tasks);
}
for (i = 0; i < num_tasks; i++) {
- if (use_tls_data) {
- void *userdata_chunk_local = (char *)flatten_tls_storage + (tls_data_size * (size_t)i);
- memcpy(userdata_chunk_local, tls_data, tls_data_size);
+ if (use_userdata_chunk) {
+ userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i);
+ memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size);
}
/* Use this pool's pre-allocated tasks. */
BLI_task_pool_push_from_thread(task_pool,
parallel_range_func,
- POINTER_FROM_INT(i),
+ userdata_chunk_local,
false,
TASK_PRIORITY_HIGH,
task_pool->thread_id);
@@ -1349,222 +1265,15 @@ void BLI_task_parallel_range(const int start,
BLI_task_pool_work_and_wait(task_pool);
BLI_task_pool_free(task_pool);
- if (use_tls_data) {
+ if (use_userdata_chunk) {
if (settings->func_finalize != NULL) {
for (i = 0; i < num_tasks; i++) {
- void *userdata_chunk_local = (char *)flatten_tls_storage + (tls_data_size * (size_t)i);
+ userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i);
settings->func_finalize(userdata, userdata_chunk_local);
}
}
- MALLOCA_FREE(flatten_tls_storage, tls_data_size * (size_t)num_tasks);
- }
-}
-
-/**
- * Initialize a task pool to parallelize several for loops at the same time.
- *
- * See public API doc of ParallelRangeSettings for description of all settings.
- * Note that loop-specific settings (like 'tls' data or finalize function) must be left NULL here.
- * Only settings controlling how iteration is parallelized must be defined, as those will affect
- * all loops added to that pool.
- */
-TaskParallelRangePool *BLI_task_parallel_range_pool_init(const TaskParallelSettings *settings)
-{
- TaskParallelRangePool *range_pool = MEM_callocN(sizeof(*range_pool), __func__);
-
- BLI_assert(settings->userdata_chunk == NULL);
- BLI_assert(settings->func_finalize == NULL);
- range_pool->settings = MEM_mallocN(sizeof(*range_pool->settings), __func__);
- *range_pool->settings = *settings;
-
- return range_pool;
-}
-
-/**
- * Add a loop task to the pool. It does not execute it at all.
- *
- * See public API doc of ParallelRangeSettings for description of all settings.
- * Note that only 'tls'-related data are used here.
- */
-void BLI_task_parallel_range_pool_push(TaskParallelRangePool *range_pool,
- const int start,
- const int stop,
- void *userdata,
- TaskParallelRangeFunc func,
- const TaskParallelSettings *settings)
-{
- BLI_assert(range_pool->pool == NULL);
-
- if (start == stop) {
- return;
- }
-
- BLI_assert(start < stop);
- if (settings->userdata_chunk_size != 0) {
- BLI_assert(settings->userdata_chunk != NULL);
- }
-
- TaskParallelRangeState *state = MEM_callocN(sizeof(*state), __func__);
- state->start = start;
- state->stop = stop;
- state->userdata_shared = userdata;
- state->func = func;
- state->iter_value = start;
- state->initial_tls_memory = settings->userdata_chunk;
- state->tls_data_size = settings->userdata_chunk_size;
- state->func_finalize = settings->func_finalize;
-
- state->next = range_pool->parallel_range_states;
- range_pool->parallel_range_states = state;
-}
-
-static void parallel_range_func_finalize(TaskPool *__restrict pool,
- void *v_state,
- int UNUSED(thread_id))
-{
- TaskParallelRangePool *__restrict range_pool = BLI_task_pool_userdata(pool);
- TaskParallelRangeState *state = v_state;
-
- for (int i = 0; i < range_pool->num_tasks; i++) {
- void *tls_data = (char *)state->flatten_tls_storage + (state->tls_data_size * (size_t)i);
- state->func_finalize(state->userdata_shared, tls_data);
- }
-}
-
-/**
- * Run all tasks pushed to the range_pool.
- *
- * Note that the range pool is re-usable (you may push new tasks into it and call this function
- * again).
- */
-void BLI_task_parallel_range_pool_work_and_wait(TaskParallelRangePool *range_pool)
-{
- BLI_assert(range_pool->pool == NULL);
-
- /* If it's not enough data to be crunched, don't bother with tasks at all,
- * do everything from the current thread.
- */
- if (!range_pool->settings->use_threading) {
- parallel_range_single_thread(range_pool);
- return;
- }
-
- TaskScheduler *task_scheduler = BLI_task_scheduler_get();
- const int num_threads = BLI_task_scheduler_num_threads(task_scheduler);
-
- /* The idea here is to prevent creating task for each of the loop iterations
- * and instead have tasks which are evenly distributed across CPU cores and
- * pull next iter to be crunched using the queue.
- */
- int num_tasks = num_threads + 2;
- range_pool->num_tasks = num_tasks;
-
- task_parallel_range_calc_chunk_size(range_pool);
- range_pool->num_tasks = num_tasks = min_ii(
- num_tasks, max_ii(1, range_pool->num_total_iters / range_pool->chunk_size));
-
- if (num_tasks == 1) {
- parallel_range_single_thread(range_pool);
- return;
- }
-
- /* We create all 'tls' data here in a single loop. */
- for (TaskParallelRangeState *state = range_pool->parallel_range_states; state != NULL;
- state = state->next) {
- void *userdata_chunk = state->initial_tls_memory;
- const size_t userdata_chunk_size = state->tls_data_size;
- if (userdata_chunk_size == 0) {
- BLI_assert(userdata_chunk == NULL);
- continue;
- }
-
- void *userdata_chunk_array = NULL;
- state->flatten_tls_storage = userdata_chunk_array = MALLOCA(userdata_chunk_size *
- (size_t)num_tasks);
- for (int i = 0; i < num_tasks; i++) {
- void *userdata_chunk_local = (char *)userdata_chunk_array +
- (userdata_chunk_size * (size_t)i);
- memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size);
- }
- }
-
- TaskPool *task_pool = range_pool->pool = BLI_task_pool_create_suspended(task_scheduler,
- range_pool);
-
- /* NOTE: This way we are adding a memory barrier and ensure all worker
- * threads can read and modify the value, without any locks. */
- atomic_cas_ptr((void **)&range_pool->current_state, NULL, range_pool->parallel_range_states);
- BLI_assert(range_pool->current_state == range_pool->parallel_range_states);
-
- for (int i = 0; i < num_tasks; i++) {
- BLI_task_pool_push_from_thread(task_pool,
- parallel_range_func,
- POINTER_FROM_INT(i),
- false,
- TASK_PRIORITY_HIGH,
- task_pool->thread_id);
- }
-
- BLI_task_pool_work_and_wait(task_pool);
-
- BLI_assert(range_pool->current_state == NULL);
-
- /* Finalize all tasks. */
- for (TaskParallelRangeState *state = range_pool->parallel_range_states; state != NULL;
- state = state->next) {
- const size_t userdata_chunk_size = state->tls_data_size;
- void *userdata_chunk_array = state->flatten_tls_storage;
- if (userdata_chunk_size == 0) {
- BLI_assert(userdata_chunk_array == NULL);
- MEM_freeN(state);
- continue;
- }
-
- if (state->func_finalize != NULL) {
- BLI_task_pool_push_from_thread(task_pool,
- parallel_range_func_finalize,
- state,
- false,
- TASK_PRIORITY_HIGH,
- task_pool->thread_id);
- }
- }
-
- BLI_task_pool_work_and_wait(task_pool);
- BLI_task_pool_free(task_pool);
- range_pool->pool = NULL;
-
- /* Cleanup all tasks. */
- TaskParallelRangeState *state_next;
- for (TaskParallelRangeState *state = range_pool->parallel_range_states; state != NULL;
- state = state_next) {
- state_next = state->next;
-
- const size_t userdata_chunk_size = state->tls_data_size;
- void *userdata_chunk_array = state->flatten_tls_storage;
- if (userdata_chunk_size != 0) {
- BLI_assert(userdata_chunk_array != NULL);
- MALLOCA_FREE(userdata_chunk_array, userdata_chunk_size * (size_t)num_tasks);
- }
-
- MEM_freeN(state);
+ MALLOCA_FREE(userdata_chunk_array, userdata_chunk_size * num_tasks);
}
- range_pool->parallel_range_states = NULL;
-}
-
-/**
- * Clear/free given \a range_pool.
- */
-void BLI_task_parallel_range_pool_free(TaskParallelRangePool *range_pool)
-{
- TaskParallelRangeState *state_next = NULL;
- for (TaskParallelRangeState *state = range_pool->parallel_range_states; state != NULL;
- state = state_next) {
- state_next = state->next;
- MEM_freeN(state);
- }
- MEM_freeN(range_pool->settings);
- MEM_freeN(range_pool);
}
typedef struct TaskParallelIteratorState {