git.blender.org/blender.git

Diffstat (limited to 'source/blender/blenlib/intern/task.c')
-rw-r--r--  source/blender/blenlib/intern/task.c  193
1 file changed, 179 insertions(+), 14 deletions(-)
diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c
index 8d867b9f295..d187a8d1968 100644
--- a/source/blender/blenlib/intern/task.c
+++ b/source/blender/blenlib/intern/task.c
@@ -29,9 +29,12 @@
#include "MEM_guardedalloc.h"
#include "BLI_listbase.h"
+#include "BLI_math.h"
#include "BLI_task.h"
#include "BLI_threads.h"
+#include "atomic_ops.h"
+
/* Types */
typedef struct Task {
@@ -48,6 +51,8 @@ struct TaskPool {
     volatile size_t num;
     volatile size_t done;
+    size_t num_threads;
+    size_t currently_running_tasks;
     ThreadMutex num_mutex;
     ThreadCondition num_cond;
@@ -83,6 +88,7 @@ static void task_pool_num_decrease(TaskPool *pool, size_t done)
     BLI_assert(pool->num >= done);
     pool->num -= done;
+    atomic_sub_z(&pool->currently_running_tasks, done);
     pool->done += done;
 
     if (pool->num == 0)
@@ -103,19 +109,37 @@ static void task_pool_num_increase(TaskPool *pool)
 static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task)
 {
+    bool found_task = false;
     BLI_mutex_lock(&scheduler->queue_mutex);
 
     while (!scheduler->queue.first && !scheduler->do_exit)
         BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
 
-    if (!scheduler->queue.first) {
-        BLI_mutex_unlock(&scheduler->queue_mutex);
-        BLI_assert(scheduler->do_exit);
-        return false;
-    }
-
-    *task = scheduler->queue.first;
-    BLI_remlink(&scheduler->queue, *task);
+    do {
+        Task *current_task;
+        if (!scheduler->queue.first) {
+            BLI_mutex_unlock(&scheduler->queue_mutex);
+            BLI_assert(scheduler->do_exit);
+            return false;
+        }
+        for (current_task = scheduler->queue.first;
+             current_task != NULL;
+             current_task = current_task->next)
+        {
+            TaskPool *pool = current_task->pool;
+            if (pool->num_threads == 0 ||
+                pool->currently_running_tasks < pool->num_threads)
+            {
+                *task = current_task;
+                found_task = true;
+                atomic_add_z(&pool->currently_running_tasks, 1);
+                BLI_remlink(&scheduler->queue, *task);
+                break;
+            }
+        }
+        if (!found_task)
+            BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
+    } while (!found_task);
 
     BLI_mutex_unlock(&scheduler->queue_mutex);
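
The reworked task_scheduler_thread_wait_pop above no longer pops the queue
head unconditionally: it scans for the first task whose pool is either
uncapped (num_threads == 0) or still below its cap, and if nothing qualifies
it sleeps on queue_cond again before rescanning. The same scan-and-wait shape,
reduced to a standalone sketch with POSIX threads (all names here are
illustrative, not Blender API):

#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct Item { struct Item *next; bool runnable; } Item;

static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
static Item *queue_head = NULL;
static bool do_exit = false;

/* Return the first runnable item, sleeping whenever none qualifies.
 * Unlinking the item from the list is elided to keep the sketch short. */
static Item *wait_pop(void)
{
    Item *found = NULL;
    pthread_mutex_lock(&queue_mutex);
    while (found == NULL && !(do_exit && queue_head == NULL)) {
        for (Item *it = queue_head; it != NULL; it = it->next) {
            if (it->runnable) {
                found = it;
                break;
            }
        }
        if (found == NULL) {
            /* Releases the mutex while asleep, reacquires it on wake. */
            pthread_cond_wait(&queue_cond, &queue_mutex);
        }
    }
    pthread_mutex_unlock(&queue_mutex);
    return found;  /* NULL only on shutdown */
}
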
@@ -287,6 +311,8 @@ TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata)
     pool->scheduler = scheduler;
     pool->num = 0;
+    pool->num_threads = 0;
+    pool->currently_running_tasks = 0;
     pool->do_cancel = false;
 
     BLI_mutex_init(&pool->num_mutex);
@@ -350,12 +376,16 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
         /* find task from this pool. if we get a task from another pool,
          * we can get into deadlock */
-        for (task = scheduler->queue.first; task; task = task->next) {
-            if (task->pool == pool) {
-                work_task = task;
-                found_task = true;
-                BLI_remlink(&scheduler->queue, task);
-                break;
+        if (pool->num_threads == 0 ||
+            pool->currently_running_tasks < pool->num_threads)
+        {
+            for (task = scheduler->queue.first; task; task = task->next) {
+                if (task->pool == pool) {
+                    work_task = task;
+                    found_task = true;
+                    BLI_remlink(&scheduler->queue, task);
+                    break;
+                }
             }
         }
@@ -364,6 +394,7 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
         /* if found task, do it, otherwise wait until other tasks are done */
         if (found_task) {
             /* run task */
+            atomic_add_z(&pool->currently_running_tasks, 1);
             work_task->run(pool, work_task->taskdata, 0);
 
             /* delete task */
@@ -386,6 +417,12 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
     BLI_mutex_unlock(&pool->num_mutex);
 }
 
+void BLI_pool_set_num_threads(TaskPool *pool, int num_threads)
+{
+    /* NOTE: Don't try to modify threads while tasks are running! */
+    pool->num_threads = num_threads;
+}
+
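
The setter above pairs with the rest of the pool API: create the pool, cap it
before pushing work, then push and wait as usual. A minimal usage sketch,
assuming the run-callback signature that work_task->run() is invoked with
earlier in this file; my_task_func, job_data and run_with_cap are hypothetical:

#include "BLI_task.h"

static void my_task_func(TaskPool *pool, void *taskdata, int threadid)
{
    (void)pool; (void)threadid;
    /* ... crunch the data behind taskdata ... */
    (void)taskdata;
}

static void run_with_cap(TaskScheduler *scheduler, void *job_data)
{
    TaskPool *pool = BLI_task_pool_create(scheduler, NULL);
    int i;

    /* Per the NOTE above: set the cap before any task is pushed. */
    BLI_pool_set_num_threads(pool, 2);

    for (i = 0; i < 16; i++) {
        BLI_task_pool_push(pool, my_task_func, job_data, false, TASK_PRIORITY_HIGH);
    }

    BLI_task_pool_work_and_wait(pool);
    BLI_task_pool_free(pool);
}

With the cap set to 2, workers skip this pool's tasks in wait_pop once two of
them are already running, so the remaining tasks stay queued no matter how
many scheduler threads exist.
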
 void BLI_task_pool_cancel(TaskPool *pool)
 {
     pool->do_cancel = true;
@@ -428,3 +465,131 @@ size_t BLI_task_pool_tasks_done(TaskPool *pool)
     return pool->done;
 }
 
+/* Parallel range routines */
+
+/**
+ *
+ * Main functions:
+ * - #BLI_task_parallel_range
+ *
+ * TODO:
+ * - #BLI_task_parallel_foreach_listbase (#ListBase - double linked list)
+ * - #BLI_task_parallel_foreach_link (#Link - single linked list)
+ * - #BLI_task_parallel_foreach_ghash/gset (#GHash/#GSet - hash & set)
+ * - #BLI_task_parallel_foreach_mempool (#BLI_mempool - iterate over mempools)
+ *
+ * Possible improvements:
+ *
+ * - Chunk iterations to reduce number of spin locks.
+ */
+
+typedef struct ParallelRangeState {
+    int start, stop;
+    void *userdata;
+    TaskParallelRangeFunc func;
+
+    int iter;
+    int chunk_size;
+    SpinLock lock;
+} ParallelRangeState;
+
+BLI_INLINE bool parallel_range_next_iter_get(
+        ParallelRangeState * __restrict state,
+        int * __restrict iter, int * __restrict count)
+{
+    bool result = false;
+    if (state->iter < state->stop) {
+        BLI_spin_lock(&state->lock);
+        if (state->iter < state->stop) {
+            *count = min_ii(state->chunk_size, state->stop - state->iter);
+            *iter = state->iter;
+            state->iter += *count;
+            result = true;
+        }
+        BLI_spin_unlock(&state->lock);
+    }
+    return result;
+}
+
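
Note the double check of state->iter in parallel_range_next_iter_get: the
unlocked test lets workers bail out cheaply once the range is exhausted, and
the test is repeated under the spinlock because another worker may have
claimed the remaining iterations in between. The same chunk-claiming idea can
also be written lock-free with a single fetch-and-add; a C11 sketch for
comparison (an illustrative alternative, not what this patch does):

#include <stdatomic.h>
#include <stdbool.h>

typedef struct Range {
    atomic_int iter;   /* next unclaimed index */
    int stop;
    int chunk_size;
} Range;

static bool range_next_chunk(Range *range, int *iter, int *count)
{
    /* Claim chunk_size iterations in one atomic step; claims that overshoot
     * 'stop' are clamped below, so no lock is needed. */
    int claimed = atomic_fetch_add(&range->iter, range->chunk_size);
    if (claimed >= range->stop) {
        return false;
    }
    *iter = claimed;
    *count = (range->stop - claimed < range->chunk_size) ?
             range->stop - claimed : range->chunk_size;
    return true;
}
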
+static void parallel_range_func(
+        TaskPool * __restrict pool,
+        void *UNUSED(taskdata),
+        int UNUSED(threadid))
+{
+    ParallelRangeState * __restrict state = BLI_task_pool_userdata(pool);
+    int iter, count;
+    while (parallel_range_next_iter_get(state, &iter, &count)) {
+        int i;
+        for (i = 0; i < count; ++i) {
+            state->func(state->userdata, iter + i);
+        }
+    }
+}
+
+void BLI_task_parallel_range_ex(
+        int start, int stop,
+        void *userdata,
+        TaskParallelRangeFunc func,
+        const int range_threshold,
+        const bool use_dynamic_scheduling)
+{
+    TaskScheduler *task_scheduler;
+    TaskPool *task_pool;
+    ParallelRangeState state;
+    int i, num_threads, num_tasks;
+
+    BLI_assert(start < stop);
+
+    /* If there's not enough data to be crunched, don't bother with tasks at all;
+     * do everything from the main thread.
+     */
+    if (stop - start < range_threshold) {
+        for (i = start; i < stop; ++i) {
+            func(userdata, i);
+        }
+        return;
+    }
+
+    task_scheduler = BLI_task_scheduler_get();
+    task_pool = BLI_task_pool_create(task_scheduler, &state);
+    num_threads = BLI_task_scheduler_num_threads(task_scheduler);
+
+    /* The idea here is to avoid creating a task per loop iteration and instead
+     * have tasks which are evenly distributed across CPU cores and pull the
+     * next iter to be crunched from the shared state.
+     */
+    num_tasks = num_threads * 2;
+
+    BLI_spin_init(&state.lock);
+    state.start = start;
+    state.stop = stop;
+    state.userdata = userdata;
+    state.func = func;
+    state.iter = start;
+    if (use_dynamic_scheduling) {
+        state.chunk_size = 32;
+    }
+    else {
+        /* Clamp to 1 so a huge thread count can never produce a zero chunk
+         * size, which would make parallel_range_next_iter_get() loop forever. */
+        state.chunk_size = max_ii(1, (stop - start) / num_tasks);
+    }
+
+    for (i = 0; i < num_tasks; i++) {
+        BLI_task_pool_push(task_pool,
+                           parallel_range_func,
+                           NULL, false,
+                           TASK_PRIORITY_HIGH);
+    }
+
+    BLI_task_pool_work_and_wait(task_pool);
+    BLI_task_pool_free(task_pool);
+
+    BLI_spin_end(&state.lock);
+}
+
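
To make the chunk sizing concrete: with start = 0, stop = 1024 and four
scheduler threads, num_tasks = 4 * 2 = 8. Static scheduling gives
chunk_size = 1024 / 8 = 128, so each task claims one large chunk and the
spinlock is taken only eight times, which is cheap but degrades when
iteration costs are uneven. Dynamic scheduling fixes chunk_size at 32,
yielding 1024 / 32 = 32 chunks that are handed out on demand to whichever
worker finishes first, at the price of more passes through the spinlock.
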
+void BLI_task_parallel_range(
+        int start, int stop,
+        void *userdata,
+        TaskParallelRangeFunc func)
+{
+    BLI_task_parallel_range_ex(start, stop, userdata, func, 64, false);
+}
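
Finally, a usage sketch for the new entry point. The callback signature
follows how state->func is invoked in parallel_range_func above; ScaleData,
scale_one and scale_all are hypothetical names:

#include "BLI_task.h"

typedef struct ScaleData {
    float *values;
    float factor;
} ScaleData;

static void scale_one(void *userdata, int index)
{
    ScaleData *data = userdata;
    data->values[index] *= data->factor;
}

static void scale_all(float *values, int num_values)
{
    ScaleData data = {values, 2.0f};

    /* Ranges shorter than the default threshold of 64 run serially on the
     * caller's thread; longer ranges are split across the task scheduler. */
    BLI_task_parallel_range(0, num_values, &data, scale_one);
}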