Welcome to mirror list, hosted at ThFree Co, Russian Federation.

git.blender.org/blender.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBastien Montagne <mont29>2019-10-30 14:23:45 +0300
committerBastien Montagne <b.mont29@gmail.com>2019-10-30 14:23:45 +0300
commit29433da4c6e4b2da6f3fa3d1868c5039b4e6be70 (patch)
treed4419c5337f4d47557381d144d2c340f2f85e788 /source/blender/blenlib/intern/task.c
parent3e7af19bf1debc18ce65e611711fb9d96421521e (diff)
BLI_task: Add new generic `BLI_task_parallel_iterator()`.
This new function is part of the 'parallel for loops' functions. It takes an iterator callback to generate items to be processed, in addition to the usual 'process' func callback. This allows to use common code from BLI_task for a wide range of custom iteratiors, whithout having to re-invent the wheel of the whole tasks & data chuncks handling. This supports all settings features from `BLI_task_parallel_range()`, including dynamic and static (if total number of items is knwon) scheduling, TLS data and its finalize callback, etc. One question here is whether we should provide usercode with a spinlock by default, or enforce it to always handle its own sync mechanism. I kept it, since imho it will be needed very often, and generating one is pretty cheap even if unused... ---------- Additionaly, this commit converts (currently unused) `BLI_task_parallel_listbase()` to use that generic code. This was done mostly as proof of concept, but performance-wise it shows some interesting data, roughly: - Very light processing (that should not be threaded anyway) is several times slower, which is expected due to more overhead in loop management code. - Heavier processing can be up to 10% quicker (probably thanks to the switch from dynamic to static scheduling, which reduces a lot locking to fill-in the per-tasks chunks of data). Similar speed-up in non-threaded case comes as a surprise though, not sure what can explain that. While this conversion is not really needed, imho we should keep it (instead of existing code for that function), it's easier to have complex handling logic in as few places as possible, for maintaining and for improving it. Note: That work was initially done to allow for D5372 to be possible... Unfortunately that one proved to be not better than orig code on performances point of view. Reviewed By: sergey Differential Revision: https://developer.blender.org/D5371
Diffstat (limited to 'source/blender/blenlib/intern/task.c')
-rw-r--r--source/blender/blenlib/intern/task.c362
1 files changed, 261 insertions, 101 deletions
diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c
index 6cdaec97d9a..bb69dc6452f 100644
--- a/source/blender/blenlib/intern/task.c
+++ b/source/blender/blenlib/intern/task.c
@@ -149,7 +149,7 @@ typedef struct TaskThreadLocalStorage {
* without "interrupting" for task execution.
*
* We try to accumulate as much tasks as possible in a local queue without
- * any locks first, and then we push all of them into a schedulers queue
+ * any locks first, and then we push all of them into a scheduler's queue
* from within a single mutex lock.
*/
bool do_delayed_push;
@@ -1052,14 +1052,20 @@ typedef struct ParallelRangeState {
int chunk_size;
} ParallelRangeState;
-BLI_INLINE void task_parallel_range_calc_chunk_size(const TaskParallelSettings *settings,
- const int num_tasks,
- ParallelRangeState *state)
+BLI_INLINE void task_parallel_calc_chunk_size(const TaskParallelSettings *settings,
+ const int tot_items,
+ int num_tasks,
+ int *r_chunk_size)
{
- const int tot_items = state->stop - state->start;
int chunk_size = 0;
- if (settings->min_iter_per_thread > 0) {
+ if (!settings->use_threading) {
+ /* Some users of this helper will still need a valid chunk size in case processing is not
+ * threaded. We can use a bigger one than in default threaded case then. */
+ chunk_size = 1024;
+ num_tasks = 1;
+ }
+ else if (settings->min_iter_per_thread > 0) {
/* Already set by user, no need to do anything here. */
chunk_size = settings->min_iter_per_thread;
}
@@ -1091,16 +1097,30 @@ BLI_INLINE void task_parallel_range_calc_chunk_size(const TaskParallelSettings *
BLI_assert(chunk_size > 0);
- switch (settings->scheduling_mode) {
- case TASK_SCHEDULING_STATIC:
- state->chunk_size = max_ii(chunk_size, tot_items / (num_tasks));
- break;
- case TASK_SCHEDULING_DYNAMIC:
- state->chunk_size = chunk_size;
- break;
+ if (tot_items > 0) {
+ switch (settings->scheduling_mode) {
+ case TASK_SCHEDULING_STATIC:
+ *r_chunk_size = max_ii(chunk_size, tot_items / num_tasks);
+ break;
+ case TASK_SCHEDULING_DYNAMIC:
+ *r_chunk_size = chunk_size;
+ break;
+ }
+ }
+ else {
+ /* If total amount of items is unknown, we can only use dynamic scheduling. */
+ *r_chunk_size = chunk_size;
}
}
+BLI_INLINE void task_parallel_range_calc_chunk_size(const TaskParallelSettings *settings,
+ const int num_tasks,
+ ParallelRangeState *state)
+{
+ task_parallel_calc_chunk_size(
+ settings, state->stop - state->start, num_tasks, &state->chunk_size);
+}
+
BLI_INLINE bool parallel_range_next_iter_get(ParallelRangeState *__restrict state,
int *__restrict iter,
int *__restrict count)
@@ -1256,77 +1276,239 @@ void BLI_task_parallel_range(const int start,
}
}
-#undef MALLOCA
-#undef MALLOCA_FREE
-
-typedef struct ParallelListbaseState {
+typedef struct TaskParallelIteratorState {
void *userdata;
- TaskParallelListbaseFunc func;
+ TaskParallelIteratorIterFunc iter_func;
+ TaskParallelIteratorFunc func;
+
+ /* *** Data used to 'acquire' chunks of items from the iterator. *** */
+ /* Common data also passed to the generator callback. */
+ TaskParallelIteratorStateShared iter_shared;
+ /* Total number of items. If unknown, set it to a negative number. */
+ int tot_items;
+} TaskParallelIteratorState;
+
+BLI_INLINE void task_parallel_iterator_calc_chunk_size(const TaskParallelSettings *settings,
+ const int num_tasks,
+ TaskParallelIteratorState *state)
+{
+ task_parallel_calc_chunk_size(
+ settings, state->tot_items, num_tasks, &state->iter_shared.chunk_size);
+}
- int chunk_size;
- int index;
- Link *link;
- SpinLock lock;
-} ParallelListState;
-
-BLI_INLINE Link *parallel_listbase_next_iter_get(ParallelListState *__restrict state,
- int *__restrict index,
- int *__restrict count)
+static void parallel_iterator_func_do(TaskParallelIteratorState *__restrict state,
+ void *userdata_chunk,
+ int threadid)
{
- int task_count = 0;
- BLI_spin_lock(&state->lock);
- Link *result = state->link;
- if (LIKELY(result != NULL)) {
- *index = state->index;
- while (state->link != NULL && task_count < state->chunk_size) {
- task_count++;
- state->link = state->link->next;
+ TaskParallelTLS tls = {
+ .thread_id = threadid,
+ .userdata_chunk = userdata_chunk,
+ };
+
+ void **current_chunk_items;
+ int *current_chunk_indices;
+ int current_chunk_size;
+
+ const size_t items_size = sizeof(*current_chunk_items) * (size_t)state->iter_shared.chunk_size;
+ const size_t indices_size = sizeof(*current_chunk_indices) *
+ (size_t)state->iter_shared.chunk_size;
+
+ current_chunk_items = MALLOCA(items_size);
+ current_chunk_indices = MALLOCA(indices_size);
+ current_chunk_size = 0;
+
+ for (bool do_abort = false; !do_abort;) {
+ if (state->iter_shared.spin_lock != NULL) {
+ BLI_spin_lock(state->iter_shared.spin_lock);
+ }
+
+ /* Get current status. */
+ int index = state->iter_shared.next_index;
+ void *item = state->iter_shared.next_item;
+ int i;
+
+ /* 'Acquire' a chunk of items from the iterator function. */
+ for (i = 0; i < state->iter_shared.chunk_size && !state->iter_shared.is_finished; i++) {
+ current_chunk_indices[i] = index;
+ current_chunk_items[i] = item;
+ state->iter_func(state->userdata, &tls, &item, &index, &state->iter_shared.is_finished);
+ }
+
+ /* Update current status. */
+ state->iter_shared.next_index = index;
+ state->iter_shared.next_item = item;
+ current_chunk_size = i;
+
+ do_abort = state->iter_shared.is_finished;
+
+ if (state->iter_shared.spin_lock != NULL) {
+ BLI_spin_unlock(state->iter_shared.spin_lock);
+ }
+
+ for (i = 0; i < current_chunk_size; ++i) {
+ state->func(state->userdata, current_chunk_items[i], current_chunk_indices[i], &tls);
}
- state->index += task_count;
}
- BLI_spin_unlock(&state->lock);
- *count = task_count;
- return result;
+
+ MALLOCA_FREE(current_chunk_items, items_size);
+ MALLOCA_FREE(current_chunk_indices, indices_size);
}
-static void parallel_listbase_func(TaskPool *__restrict pool,
- void *UNUSED(taskdata),
- int UNUSED(threadid))
+static void parallel_iterator_func(TaskPool *__restrict pool, void *userdata_chunk, int threadid)
{
- ParallelListState *__restrict state = BLI_task_pool_userdata(pool);
- Link *link;
- int index, count;
+ TaskParallelIteratorState *__restrict state = BLI_task_pool_userdata(pool);
- while ((link = parallel_listbase_next_iter_get(state, &index, &count)) != NULL) {
- for (int i = 0; i < count; i++) {
- state->func(state->userdata, link, index + i);
- link = link->next;
+ parallel_iterator_func_do(state, userdata_chunk, threadid);
+}
+
+static void task_parallel_iterator_no_threads(const TaskParallelSettings *settings,
+ TaskParallelIteratorState *state)
+{
+ /* Prepare user's TLS data. */
+ void *userdata_chunk = settings->userdata_chunk;
+ const size_t userdata_chunk_size = settings->userdata_chunk_size;
+ void *userdata_chunk_local = NULL;
+ const bool use_userdata_chunk = (userdata_chunk_size != 0) && (userdata_chunk != NULL);
+ if (use_userdata_chunk) {
+ userdata_chunk_local = MALLOCA(userdata_chunk_size);
+ memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size);
+ }
+
+ /* Also marking it as non-threaded for the iterator callback. */
+ state->iter_shared.spin_lock = NULL;
+
+ parallel_iterator_func_do(state, userdata_chunk, 0);
+
+ if (use_userdata_chunk) {
+ if (settings->func_finalize != NULL) {
+ settings->func_finalize(state->userdata, userdata_chunk_local);
}
+ MALLOCA_FREE(userdata_chunk_local, userdata_chunk_size);
}
}
-static void task_parallel_listbase_no_threads(struct ListBase *listbase,
- void *userdata,
- TaskParallelListbaseFunc func)
+static void task_parallel_iterator_do(const TaskParallelSettings *settings,
+ TaskParallelIteratorState *state)
{
- int i = 0;
- for (Link *link = listbase->first; link != NULL; link = link->next, i++) {
- func(userdata, link, i);
+ TaskScheduler *task_scheduler = BLI_task_scheduler_get();
+ const int num_threads = BLI_task_scheduler_num_threads(task_scheduler);
+
+ task_parallel_iterator_calc_chunk_size(settings, num_threads, state);
+
+ if (!settings->use_threading) {
+ task_parallel_iterator_no_threads(settings, state);
+ return;
}
+
+ const int chunk_size = state->iter_shared.chunk_size;
+ const int tot_items = state->tot_items;
+ const size_t num_tasks = tot_items >= 0 ?
+ (size_t)min_ii(num_threads, state->tot_items / chunk_size) :
+ (size_t)num_threads;
+
+ BLI_assert(num_tasks > 0);
+ if (num_tasks == 1) {
+ task_parallel_iterator_no_threads(settings, state);
+ return;
+ }
+
+ SpinLock spin_lock;
+ BLI_spin_init(&spin_lock);
+ state->iter_shared.spin_lock = &spin_lock;
+
+ void *userdata_chunk = settings->userdata_chunk;
+ const size_t userdata_chunk_size = settings->userdata_chunk_size;
+ void *userdata_chunk_local = NULL;
+ void *userdata_chunk_array = NULL;
+ const bool use_userdata_chunk = (userdata_chunk_size != 0) && (userdata_chunk != NULL);
+
+ TaskPool *task_pool = BLI_task_pool_create_suspended(task_scheduler, state);
+
+ if (use_userdata_chunk) {
+ userdata_chunk_array = MALLOCA(userdata_chunk_size * num_tasks);
+ }
+
+ for (size_t i = 0; i < num_tasks; i++) {
+ if (use_userdata_chunk) {
+ userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i);
+ memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size);
+ }
+ /* Use this pool's pre-allocated tasks. */
+ BLI_task_pool_push_from_thread(task_pool,
+ parallel_iterator_func,
+ userdata_chunk_local,
+ false,
+ TASK_PRIORITY_HIGH,
+ task_pool->thread_id);
+ }
+
+ BLI_task_pool_work_and_wait(task_pool);
+ BLI_task_pool_free(task_pool);
+
+ if (use_userdata_chunk) {
+ if (settings->func_finalize != NULL) {
+ for (size_t i = 0; i < num_tasks; i++) {
+ userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i);
+ settings->func_finalize(state->userdata, userdata_chunk_local);
+ }
+ }
+ MALLOCA_FREE(userdata_chunk_array, userdata_chunk_size * num_tasks);
+ }
+
+ BLI_spin_end(&spin_lock);
+ state->iter_shared.spin_lock = NULL;
}
-/* NOTE: The idea here is to compensate for rather measurable threading
- * overhead caused by fetching tasks. With too many CPU threads we are starting
- * to spend too much time in those overheads. */
-BLI_INLINE int task_parallel_listbasecalc_chunk_size(const int num_threads)
+/**
+ * This function allows to parallelize for loops using a generic iterator.
+ *
+ * \param userdata: Common userdata passed to all instances of \a func.
+ * \param iter_func: Callback function used to generate chunks of items.
+ * \param init_item: The initial item, if necessary (may be NULL if unused).
+ * \param init_index: The initial index.
+ * \param tot_items: The total amount of items to iterate over
+ * (if unkown, set it to a negative number).
+ * \param func: Callback function.
+ * \param settings: See public API doc of TaskParallelSettings for description of all settings.
+ *
+ * \note Static scheduling is only available when \a tot_items is >= 0.
+ */
+
+void BLI_task_parallel_iterator(void *userdata,
+ TaskParallelIteratorIterFunc iter_func,
+ void *init_item,
+ const int init_index,
+ const int tot_items,
+ TaskParallelIteratorFunc func,
+ const TaskParallelSettings *settings)
{
- if (num_threads > 32) {
- return 128;
- }
- else if (num_threads > 16) {
- return 64;
+ TaskParallelIteratorState state = {0};
+
+ state.tot_items = tot_items;
+ state.iter_shared.next_index = init_index;
+ state.iter_shared.next_item = init_item;
+ state.iter_shared.is_finished = false;
+ state.userdata = userdata;
+ state.iter_func = iter_func;
+ state.func = func;
+
+ task_parallel_iterator_do(settings, &state);
+}
+
+static void task_parallel_listbase_get(void *__restrict UNUSED(userdata),
+ const TaskParallelTLS *__restrict UNUSED(tls),
+ void **r_next_item,
+ int *r_next_index,
+ bool *r_do_abort)
+{
+ /* Get current status. */
+ Link *link = *r_next_item;
+
+ if (link->next == NULL) {
+ *r_do_abort = true;
}
- return 32;
+ *r_next_item = link->next;
+ (*r_next_index)++;
}
/**
@@ -1335,58 +1517,36 @@ BLI_INLINE int task_parallel_listbasecalc_chunk_size(const int num_threads)
* \param listbase: The double linked list to loop over.
* \param userdata: Common userdata passed to all instances of \a func.
* \param func: Callback function.
- * \param use_threading: If \a true, actually split-execute loop in threads,
- * else just do a sequential forloop
- * (allows caller to use any kind of test to switch on parallelization or not).
+ * \param settings: See public API doc of ParallelRangeSettings for description of all settings.
*
* \note There is no static scheduling here,
* since it would need another full loop over items to count them.
*/
-void BLI_task_parallel_listbase(struct ListBase *listbase,
+void BLI_task_parallel_listbase(ListBase *listbase,
void *userdata,
- TaskParallelListbaseFunc func,
- const bool use_threading)
+ TaskParallelIteratorFunc func,
+ const TaskParallelSettings *settings)
{
if (BLI_listbase_is_empty(listbase)) {
return;
}
- if (!use_threading) {
- task_parallel_listbase_no_threads(listbase, userdata, func);
- return;
- }
- TaskScheduler *task_scheduler = BLI_task_scheduler_get();
- const int num_threads = BLI_task_scheduler_num_threads(task_scheduler);
- /* TODO(sergey): Consider making chunk size configurable. */
- const int chunk_size = task_parallel_listbasecalc_chunk_size(num_threads);
- const int num_tasks = min_ii(num_threads, BLI_listbase_count(listbase) / chunk_size);
- if (num_tasks <= 1) {
- task_parallel_listbase_no_threads(listbase, userdata, func);
- return;
- }
- ParallelListState state;
- TaskPool *task_pool = BLI_task_pool_create_suspended(task_scheduler, &state);
+ TaskParallelIteratorState state = {0};
- state.index = 0;
- state.link = listbase->first;
+ state.tot_items = BLI_listbase_count(listbase);
+ state.iter_shared.next_index = 0;
+ state.iter_shared.next_item = listbase->first;
+ state.iter_shared.is_finished = false;
state.userdata = userdata;
+ state.iter_func = task_parallel_listbase_get;
state.func = func;
- state.chunk_size = chunk_size;
- BLI_spin_init(&state.lock);
- BLI_assert(num_tasks > 0);
- for (int i = 0; i < num_tasks; i++) {
- /* Use this pool's pre-allocated tasks. */
- BLI_task_pool_push_from_thread(
- task_pool, parallel_listbase_func, NULL, false, TASK_PRIORITY_HIGH, task_pool->thread_id);
- }
-
- BLI_task_pool_work_and_wait(task_pool);
- BLI_task_pool_free(task_pool);
-
- BLI_spin_end(&state.lock);
+ task_parallel_iterator_do(settings, &state);
}
+#undef MALLOCA
+#undef MALLOCA_FREE
+
typedef struct ParallelMempoolState {
void *userdata;
TaskParallelMempoolFunc func;