Welcome to mirror list, hosted at ThFree Co, Russian Federation.

git.blender.org/blender.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'source/blender/blenlib/intern/task.c')
-rw-r--r--source/blender/blenlib/intern/task.c1848
1 files changed, 916 insertions, 932 deletions
diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c
index 3a6613b2612..57ce4f16b1a 100644
--- a/source/blender/blenlib/intern/task.c
+++ b/source/blender/blenlib/intern/task.c
@@ -59,33 +59,33 @@
#define DELAYED_QUEUE_SIZE 4096
#ifndef NDEBUG
-# define ASSERT_THREAD_ID(scheduler, thread_id) \
- do { \
- if (!BLI_thread_is_main()) { \
- TaskThread *thread = pthread_getspecific(scheduler->tls_id_key); \
- if (thread == NULL) { \
- BLI_assert(thread_id == 0); \
- } \
- else { \
- BLI_assert(thread_id == thread->id); \
- } \
- } \
- else { \
- BLI_assert(thread_id == 0); \
- } \
- } while (false)
+# define ASSERT_THREAD_ID(scheduler, thread_id) \
+ do { \
+ if (!BLI_thread_is_main()) { \
+ TaskThread *thread = pthread_getspecific(scheduler->tls_id_key); \
+ if (thread == NULL) { \
+ BLI_assert(thread_id == 0); \
+ } \
+ else { \
+ BLI_assert(thread_id == thread->id); \
+ } \
+ } \
+ else { \
+ BLI_assert(thread_id == 0); \
+ } \
+ } while (false)
#else
# define ASSERT_THREAD_ID(scheduler, thread_id)
#endif
typedef struct Task {
- struct Task *next, *prev;
+ struct Task *next, *prev;
- TaskRunFunction run;
- void *taskdata;
- bool free_taskdata;
- TaskFreeFunction freedata;
- TaskPool *pool;
+ TaskRunFunction run;
+ void *taskdata;
+ bool free_taskdata;
+ TaskFreeFunction freedata;
+ TaskPool *pool;
} Task;
/* This is a per-thread storage of pre-allocated tasks.
@@ -115,471 +115,467 @@ typedef struct Task {
* pool which corresponds to a thread which handled the task.
*/
typedef struct TaskMemPool {
- /* Number of pre-allocated tasks in the pool. */
- int num_tasks;
- /* Pre-allocated task memory pointers. */
- Task *tasks[MEMPOOL_SIZE];
+ /* Number of pre-allocated tasks in the pool. */
+ int num_tasks;
+ /* Pre-allocated task memory pointers. */
+ Task *tasks[MEMPOOL_SIZE];
} TaskMemPool;
#ifdef DEBUG_STATS
typedef struct TaskMemPoolStats {
- /* Number of allocations. */
- int num_alloc;
- /* Number of avoided allocations (pointer was re-used from the pool). */
- int num_reuse;
- /* Number of discarded memory due to pool saturation, */
- int num_discard;
+ /* Number of allocations. */
+ int num_alloc;
+ /* Number of avoided allocations (pointer was re-used from the pool). */
+ int num_reuse;
+ /* Number of discarded memory due to pool saturation, */
+ int num_discard;
} TaskMemPoolStats;
#endif
typedef struct TaskThreadLocalStorage {
- /* Memory pool for faster task allocation.
- * The idea is to re-use memory of finished/discarded tasks by this thread.
- */
- TaskMemPool task_mempool;
-
- /* Local queue keeps thread alive by keeping small amount of tasks ready
- * to be picked up without causing global thread locks for synchronization.
- */
- int num_local_queue;
- Task *local_queue[LOCAL_QUEUE_SIZE];
-
- /* Thread can be marked for delayed tasks push. This is helpful when it's
- * know that lots of subsequent task pushed will happen from the same thread
- * without "interrupting" for task execution.
- *
- * We try to accumulate as much tasks as possible in a local queue without
- * any locks first, and then we push all of them into a scheduler's queue
- * from within a single mutex lock.
- */
- bool do_delayed_push;
- int num_delayed_queue;
- Task *delayed_queue[DELAYED_QUEUE_SIZE];
+ /* Memory pool for faster task allocation.
+ * The idea is to re-use memory of finished/discarded tasks by this thread.
+ */
+ TaskMemPool task_mempool;
+
+ /* Local queue keeps thread alive by keeping small amount of tasks ready
+ * to be picked up without causing global thread locks for synchronization.
+ */
+ int num_local_queue;
+ Task *local_queue[LOCAL_QUEUE_SIZE];
+
+ /* Thread can be marked for delayed tasks push. This is helpful when it's
+ * know that lots of subsequent task pushed will happen from the same thread
+ * without "interrupting" for task execution.
+ *
+ * We try to accumulate as much tasks as possible in a local queue without
+ * any locks first, and then we push all of them into a scheduler's queue
+ * from within a single mutex lock.
+ */
+ bool do_delayed_push;
+ int num_delayed_queue;
+ Task *delayed_queue[DELAYED_QUEUE_SIZE];
} TaskThreadLocalStorage;
struct TaskPool {
- TaskScheduler *scheduler;
-
- volatile size_t num;
- ThreadMutex num_mutex;
- ThreadCondition num_cond;
-
- void *userdata;
- ThreadMutex user_mutex;
-
- volatile bool do_cancel;
- volatile bool do_work;
-
- volatile bool is_suspended;
- bool start_suspended;
- ListBase suspended_queue;
- size_t num_suspended;
-
- /* If set, this pool may never be work_and_wait'ed, which means TaskScheduler
- * has to use its special background fallback thread in case we are in
- * single-threaded situation.
- */
- bool run_in_background;
-
- /* This is a task scheduler's ID of a thread at which pool was constructed.
- * It will be used to access task TLS.
- */
- int thread_id;
-
- /* For the pools which are created from non-main thread which is not a
- * scheduler worker thread we can't re-use any of scheduler's threads TLS
- * and have to use our own one.
- */
- bool use_local_tls;
- TaskThreadLocalStorage local_tls;
+ TaskScheduler *scheduler;
+
+ volatile size_t num;
+ ThreadMutex num_mutex;
+ ThreadCondition num_cond;
+
+ void *userdata;
+ ThreadMutex user_mutex;
+
+ volatile bool do_cancel;
+ volatile bool do_work;
+
+ volatile bool is_suspended;
+ bool start_suspended;
+ ListBase suspended_queue;
+ size_t num_suspended;
+
+ /* If set, this pool may never be work_and_wait'ed, which means TaskScheduler
+ * has to use its special background fallback thread in case we are in
+ * single-threaded situation.
+ */
+ bool run_in_background;
+
+ /* This is a task scheduler's ID of a thread at which pool was constructed.
+ * It will be used to access task TLS.
+ */
+ int thread_id;
+
+ /* For the pools which are created from non-main thread which is not a
+ * scheduler worker thread we can't re-use any of scheduler's threads TLS
+ * and have to use our own one.
+ */
+ bool use_local_tls;
+ TaskThreadLocalStorage local_tls;
#ifndef NDEBUG
- pthread_t creator_thread_id;
+ pthread_t creator_thread_id;
#endif
#ifdef DEBUG_STATS
- TaskMemPoolStats *mempool_stats;
+ TaskMemPoolStats *mempool_stats;
#endif
};
struct TaskScheduler {
- pthread_t *threads;
- struct TaskThread *task_threads;
- int num_threads;
- bool background_thread_only;
+ pthread_t *threads;
+ struct TaskThread *task_threads;
+ int num_threads;
+ bool background_thread_only;
- ListBase queue;
- ThreadMutex queue_mutex;
- ThreadCondition queue_cond;
+ ListBase queue;
+ ThreadMutex queue_mutex;
+ ThreadCondition queue_cond;
- volatile bool do_exit;
+ volatile bool do_exit;
- /* NOTE: In pthread's TLS we store the whole TaskThread structure. */
- pthread_key_t tls_id_key;
+ /* NOTE: In pthread's TLS we store the whole TaskThread structure. */
+ pthread_key_t tls_id_key;
};
typedef struct TaskThread {
- TaskScheduler *scheduler;
- int id;
- TaskThreadLocalStorage tls;
+ TaskScheduler *scheduler;
+ int id;
+ TaskThreadLocalStorage tls;
} TaskThread;
/* Helper */
BLI_INLINE void task_data_free(Task *task, const int thread_id)
{
- if (task->free_taskdata) {
- if (task->freedata) {
- task->freedata(task->pool, task->taskdata, thread_id);
- }
- else {
- MEM_freeN(task->taskdata);
- }
- }
+ if (task->free_taskdata) {
+ if (task->freedata) {
+ task->freedata(task->pool, task->taskdata, thread_id);
+ }
+ else {
+ MEM_freeN(task->taskdata);
+ }
+ }
}
BLI_INLINE void initialize_task_tls(TaskThreadLocalStorage *tls)
{
- memset(tls, 0, sizeof(TaskThreadLocalStorage));
+ memset(tls, 0, sizeof(TaskThreadLocalStorage));
}
-BLI_INLINE TaskThreadLocalStorage *get_task_tls(TaskPool *pool,
- const int thread_id)
+BLI_INLINE TaskThreadLocalStorage *get_task_tls(TaskPool *pool, const int thread_id)
{
- TaskScheduler *scheduler = pool->scheduler;
- BLI_assert(thread_id >= 0);
- BLI_assert(thread_id <= scheduler->num_threads);
- if (pool->use_local_tls && thread_id == 0) {
- BLI_assert(pool->thread_id == 0);
- BLI_assert(!BLI_thread_is_main());
- BLI_assert(pthread_equal(pthread_self(), pool->creator_thread_id));
- return &pool->local_tls;
- }
- if (thread_id == 0) {
- BLI_assert(BLI_thread_is_main());
- return &scheduler->task_threads[pool->thread_id].tls;
- }
- return &scheduler->task_threads[thread_id].tls;
+ TaskScheduler *scheduler = pool->scheduler;
+ BLI_assert(thread_id >= 0);
+ BLI_assert(thread_id <= scheduler->num_threads);
+ if (pool->use_local_tls && thread_id == 0) {
+ BLI_assert(pool->thread_id == 0);
+ BLI_assert(!BLI_thread_is_main());
+ BLI_assert(pthread_equal(pthread_self(), pool->creator_thread_id));
+ return &pool->local_tls;
+ }
+ if (thread_id == 0) {
+ BLI_assert(BLI_thread_is_main());
+ return &scheduler->task_threads[pool->thread_id].tls;
+ }
+ return &scheduler->task_threads[thread_id].tls;
}
BLI_INLINE void free_task_tls(TaskThreadLocalStorage *tls)
{
- TaskMemPool *task_mempool = &tls->task_mempool;
- for (int i = 0; i < task_mempool->num_tasks; ++i) {
- MEM_freeN(task_mempool->tasks[i]);
- }
+ TaskMemPool *task_mempool = &tls->task_mempool;
+ for (int i = 0; i < task_mempool->num_tasks; ++i) {
+ MEM_freeN(task_mempool->tasks[i]);
+ }
}
static Task *task_alloc(TaskPool *pool, const int thread_id)
{
- BLI_assert(thread_id <= pool->scheduler->num_threads);
- if (thread_id != -1) {
- BLI_assert(thread_id >= 0);
- BLI_assert(thread_id <= pool->scheduler->num_threads);
- TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
- TaskMemPool *task_mempool = &tls->task_mempool;
- /* Try to re-use task memory from a thread local storage. */
- if (task_mempool->num_tasks > 0) {
- --task_mempool->num_tasks;
- /* Success! We've just avoided task allocation. */
+ BLI_assert(thread_id <= pool->scheduler->num_threads);
+ if (thread_id != -1) {
+ BLI_assert(thread_id >= 0);
+ BLI_assert(thread_id <= pool->scheduler->num_threads);
+ TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
+ TaskMemPool *task_mempool = &tls->task_mempool;
+ /* Try to re-use task memory from a thread local storage. */
+ if (task_mempool->num_tasks > 0) {
+ --task_mempool->num_tasks;
+ /* Success! We've just avoided task allocation. */
#ifdef DEBUG_STATS
- pool->mempool_stats[thread_id].num_reuse++;
+ pool->mempool_stats[thread_id].num_reuse++;
#endif
- return task_mempool->tasks[task_mempool->num_tasks];
- }
- /* We are doomed to allocate new task data. */
+ return task_mempool->tasks[task_mempool->num_tasks];
+ }
+ /* We are doomed to allocate new task data. */
#ifdef DEBUG_STATS
- pool->mempool_stats[thread_id].num_alloc++;
+ pool->mempool_stats[thread_id].num_alloc++;
#endif
- }
- return MEM_mallocN(sizeof(Task), "New task");
+ }
+ return MEM_mallocN(sizeof(Task), "New task");
}
static void task_free(TaskPool *pool, Task *task, const int thread_id)
{
- task_data_free(task, thread_id);
- BLI_assert(thread_id >= 0);
- BLI_assert(thread_id <= pool->scheduler->num_threads);
- if (thread_id == 0) {
- BLI_assert(pool->use_local_tls || BLI_thread_is_main());
- }
- TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
- TaskMemPool *task_mempool = &tls->task_mempool;
- if (task_mempool->num_tasks < MEMPOOL_SIZE - 1) {
- /* Successfully allowed the task to be re-used later. */
- task_mempool->tasks[task_mempool->num_tasks] = task;
- ++task_mempool->num_tasks;
- }
- else {
- /* Local storage saturated, no other way than just discard
- * the memory.
- *
- * TODO(sergey): We can perhaps store such pointer in a global
- * scheduler pool, maybe it'll be faster than discarding and
- * allocating again.
- */
- MEM_freeN(task);
+ task_data_free(task, thread_id);
+ BLI_assert(thread_id >= 0);
+ BLI_assert(thread_id <= pool->scheduler->num_threads);
+ if (thread_id == 0) {
+ BLI_assert(pool->use_local_tls || BLI_thread_is_main());
+ }
+ TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
+ TaskMemPool *task_mempool = &tls->task_mempool;
+ if (task_mempool->num_tasks < MEMPOOL_SIZE - 1) {
+ /* Successfully allowed the task to be re-used later. */
+ task_mempool->tasks[task_mempool->num_tasks] = task;
+ ++task_mempool->num_tasks;
+ }
+ else {
+ /* Local storage saturated, no other way than just discard
+ * the memory.
+ *
+ * TODO(sergey): We can perhaps store such pointer in a global
+ * scheduler pool, maybe it'll be faster than discarding and
+ * allocating again.
+ */
+ MEM_freeN(task);
#ifdef DEBUG_STATS
- pool->mempool_stats[thread_id].num_discard++;
+ pool->mempool_stats[thread_id].num_discard++;
#endif
- }
+ }
}
/* Task Scheduler */
static void task_pool_num_decrease(TaskPool *pool, size_t done)
{
- BLI_mutex_lock(&pool->num_mutex);
+ BLI_mutex_lock(&pool->num_mutex);
- BLI_assert(pool->num >= done);
+ BLI_assert(pool->num >= done);
- pool->num -= done;
+ pool->num -= done;
- if (pool->num == 0) {
- BLI_condition_notify_all(&pool->num_cond);
- }
+ if (pool->num == 0) {
+ BLI_condition_notify_all(&pool->num_cond);
+ }
- BLI_mutex_unlock(&pool->num_mutex);
+ BLI_mutex_unlock(&pool->num_mutex);
}
static void task_pool_num_increase(TaskPool *pool, size_t new)
{
- BLI_mutex_lock(&pool->num_mutex);
+ BLI_mutex_lock(&pool->num_mutex);
- pool->num += new;
- BLI_condition_notify_all(&pool->num_cond);
+ pool->num += new;
+ BLI_condition_notify_all(&pool->num_cond);
- BLI_mutex_unlock(&pool->num_mutex);
+ BLI_mutex_unlock(&pool->num_mutex);
}
static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task)
{
- bool found_task = false;
- BLI_mutex_lock(&scheduler->queue_mutex);
-
- while (!scheduler->queue.first && !scheduler->do_exit) {
- BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
- }
-
- do {
- Task *current_task;
-
- /* Assuming we can only have a void queue in 'exit' case here seems logical
- * (we should only be here after our worker thread has been woken up from a
- * condition_wait(), which only happens after a new task was added to the queue),
- * but it is wrong.
- * Waiting on condition may wake up the thread even if condition is not signaled
- * (spurious wake-ups), and some race condition may also empty the queue **after**
- * condition has been signaled, but **before** awoken thread reaches this point...
- * See http://stackoverflow.com/questions/8594591
- *
- * So we only abort here if do_exit is set.
- */
- if (scheduler->do_exit) {
- BLI_mutex_unlock(&scheduler->queue_mutex);
- return false;
- }
-
- for (current_task = scheduler->queue.first;
- current_task != NULL;
- current_task = current_task->next)
- {
- TaskPool *pool = current_task->pool;
-
- if (scheduler->background_thread_only && !pool->run_in_background) {
- continue;
- }
-
- *task = current_task;
- found_task = true;
- BLI_remlink(&scheduler->queue, *task);
- break;
- }
- if (!found_task) {
- BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
- }
- } while (!found_task);
-
- BLI_mutex_unlock(&scheduler->queue_mutex);
-
- return true;
+ bool found_task = false;
+ BLI_mutex_lock(&scheduler->queue_mutex);
+
+ while (!scheduler->queue.first && !scheduler->do_exit) {
+ BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
+ }
+
+ do {
+ Task *current_task;
+
+ /* Assuming we can only have a void queue in 'exit' case here seems logical
+ * (we should only be here after our worker thread has been woken up from a
+ * condition_wait(), which only happens after a new task was added to the queue),
+ * but it is wrong.
+ * Waiting on condition may wake up the thread even if condition is not signaled
+ * (spurious wake-ups), and some race condition may also empty the queue **after**
+ * condition has been signaled, but **before** awoken thread reaches this point...
+ * See http://stackoverflow.com/questions/8594591
+ *
+ * So we only abort here if do_exit is set.
+ */
+ if (scheduler->do_exit) {
+ BLI_mutex_unlock(&scheduler->queue_mutex);
+ return false;
+ }
+
+ for (current_task = scheduler->queue.first; current_task != NULL;
+ current_task = current_task->next) {
+ TaskPool *pool = current_task->pool;
+
+ if (scheduler->background_thread_only && !pool->run_in_background) {
+ continue;
+ }
+
+ *task = current_task;
+ found_task = true;
+ BLI_remlink(&scheduler->queue, *task);
+ break;
+ }
+ if (!found_task) {
+ BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
+ }
+ } while (!found_task);
+
+ BLI_mutex_unlock(&scheduler->queue_mutex);
+
+ return true;
}
-BLI_INLINE void handle_local_queue(TaskThreadLocalStorage *tls,
- const int thread_id)
+BLI_INLINE void handle_local_queue(TaskThreadLocalStorage *tls, const int thread_id)
{
- BLI_assert(!tls->do_delayed_push);
- while (tls->num_local_queue > 0) {
- /* We pop task from queue before handling it so handler of the task can
- * push next job to the local queue.
- */
- tls->num_local_queue--;
- Task *local_task = tls->local_queue[tls->num_local_queue];
- /* TODO(sergey): Double-check work_and_wait() doesn't handle other's
- * pool tasks.
- */
- TaskPool *local_pool = local_task->pool;
- local_task->run(local_pool, local_task->taskdata, thread_id);
- task_free(local_pool, local_task, thread_id);
- }
- BLI_assert(!tls->do_delayed_push);
+ BLI_assert(!tls->do_delayed_push);
+ while (tls->num_local_queue > 0) {
+ /* We pop task from queue before handling it so handler of the task can
+ * push next job to the local queue.
+ */
+ tls->num_local_queue--;
+ Task *local_task = tls->local_queue[tls->num_local_queue];
+ /* TODO(sergey): Double-check work_and_wait() doesn't handle other's
+ * pool tasks.
+ */
+ TaskPool *local_pool = local_task->pool;
+ local_task->run(local_pool, local_task->taskdata, thread_id);
+ task_free(local_pool, local_task, thread_id);
+ }
+ BLI_assert(!tls->do_delayed_push);
}
static void *task_scheduler_thread_run(void *thread_p)
{
- TaskThread *thread = (TaskThread *) thread_p;
- TaskThreadLocalStorage *tls = &thread->tls;
- TaskScheduler *scheduler = thread->scheduler;
- int thread_id = thread->id;
- Task *task;
+ TaskThread *thread = (TaskThread *)thread_p;
+ TaskThreadLocalStorage *tls = &thread->tls;
+ TaskScheduler *scheduler = thread->scheduler;
+ int thread_id = thread->id;
+ Task *task;
- pthread_setspecific(scheduler->tls_id_key, thread);
+ pthread_setspecific(scheduler->tls_id_key, thread);
- /* keep popping off tasks */
- while (task_scheduler_thread_wait_pop(scheduler, &task)) {
- TaskPool *pool = task->pool;
+ /* keep popping off tasks */
+ while (task_scheduler_thread_wait_pop(scheduler, &task)) {
+ TaskPool *pool = task->pool;
- /* run task */
- BLI_assert(!tls->do_delayed_push);
- task->run(pool, task->taskdata, thread_id);
- BLI_assert(!tls->do_delayed_push);
+ /* run task */
+ BLI_assert(!tls->do_delayed_push);
+ task->run(pool, task->taskdata, thread_id);
+ BLI_assert(!tls->do_delayed_push);
- /* delete task */
- task_free(pool, task, thread_id);
+ /* delete task */
+ task_free(pool, task, thread_id);
- /* Handle all tasks from local queue. */
- handle_local_queue(tls, thread_id);
+ /* Handle all tasks from local queue. */
+ handle_local_queue(tls, thread_id);
- /* notify pool task was done */
- task_pool_num_decrease(pool, 1);
- }
+ /* notify pool task was done */
+ task_pool_num_decrease(pool, 1);
+ }
- return NULL;
+ return NULL;
}
TaskScheduler *BLI_task_scheduler_create(int num_threads)
{
- TaskScheduler *scheduler = MEM_callocN(sizeof(TaskScheduler), "TaskScheduler");
+ TaskScheduler *scheduler = MEM_callocN(sizeof(TaskScheduler), "TaskScheduler");
- /* multiple places can use this task scheduler, sharing the same
- * threads, so we keep track of the number of users. */
- scheduler->do_exit = false;
+ /* multiple places can use this task scheduler, sharing the same
+ * threads, so we keep track of the number of users. */
+ scheduler->do_exit = false;
- BLI_listbase_clear(&scheduler->queue);
- BLI_mutex_init(&scheduler->queue_mutex);
- BLI_condition_init(&scheduler->queue_cond);
+ BLI_listbase_clear(&scheduler->queue);
+ BLI_mutex_init(&scheduler->queue_mutex);
+ BLI_condition_init(&scheduler->queue_cond);
- if (num_threads == 0) {
- /* automatic number of threads will be main thread + num cores */
- num_threads = BLI_system_thread_count();
- }
+ if (num_threads == 0) {
+ /* automatic number of threads will be main thread + num cores */
+ num_threads = BLI_system_thread_count();
+ }
- /* main thread will also work, so we count it too */
- num_threads -= 1;
+ /* main thread will also work, so we count it too */
+ num_threads -= 1;
- /* Add background-only thread if needed. */
- if (num_threads == 0) {
- scheduler->background_thread_only = true;
- num_threads = 1;
- }
+ /* Add background-only thread if needed. */
+ if (num_threads == 0) {
+ scheduler->background_thread_only = true;
+ num_threads = 1;
+ }
- scheduler->task_threads = MEM_mallocN(sizeof(TaskThread) * (num_threads + 1),
- "TaskScheduler task threads");
+ scheduler->task_threads = MEM_mallocN(sizeof(TaskThread) * (num_threads + 1),
+ "TaskScheduler task threads");
- /* Initialize TLS for main thread. */
- initialize_task_tls(&scheduler->task_threads[0].tls);
+ /* Initialize TLS for main thread. */
+ initialize_task_tls(&scheduler->task_threads[0].tls);
- pthread_key_create(&scheduler->tls_id_key, NULL);
+ pthread_key_create(&scheduler->tls_id_key, NULL);
- /* launch threads that will be waiting for work */
- if (num_threads > 0) {
- int i;
+ /* launch threads that will be waiting for work */
+ if (num_threads > 0) {
+ int i;
- scheduler->num_threads = num_threads;
- scheduler->threads = MEM_callocN(sizeof(pthread_t) * num_threads, "TaskScheduler threads");
+ scheduler->num_threads = num_threads;
+ scheduler->threads = MEM_callocN(sizeof(pthread_t) * num_threads, "TaskScheduler threads");
- for (i = 0; i < num_threads; i++) {
- TaskThread *thread = &scheduler->task_threads[i + 1];
- thread->scheduler = scheduler;
- thread->id = i + 1;
- initialize_task_tls(&thread->tls);
+ for (i = 0; i < num_threads; i++) {
+ TaskThread *thread = &scheduler->task_threads[i + 1];
+ thread->scheduler = scheduler;
+ thread->id = i + 1;
+ initialize_task_tls(&thread->tls);
- if (pthread_create(&scheduler->threads[i], NULL, task_scheduler_thread_run, thread) != 0) {
- fprintf(stderr, "TaskScheduler failed to launch thread %d/%d\n", i, num_threads);
- }
- }
- }
+ if (pthread_create(&scheduler->threads[i], NULL, task_scheduler_thread_run, thread) != 0) {
+ fprintf(stderr, "TaskScheduler failed to launch thread %d/%d\n", i, num_threads);
+ }
+ }
+ }
- return scheduler;
+ return scheduler;
}
void BLI_task_scheduler_free(TaskScheduler *scheduler)
{
- Task *task;
-
- /* stop all waiting threads */
- BLI_mutex_lock(&scheduler->queue_mutex);
- scheduler->do_exit = true;
- BLI_condition_notify_all(&scheduler->queue_cond);
- BLI_mutex_unlock(&scheduler->queue_mutex);
-
- pthread_key_delete(scheduler->tls_id_key);
-
- /* delete threads */
- if (scheduler->threads) {
- int i;
-
- for (i = 0; i < scheduler->num_threads; i++) {
- if (pthread_join(scheduler->threads[i], NULL) != 0) {
- fprintf(stderr, "TaskScheduler failed to join thread %d/%d\n", i, scheduler->num_threads);
- }
- }
-
- MEM_freeN(scheduler->threads);
- }
-
- /* Delete task thread data */
- if (scheduler->task_threads) {
- for (int i = 0; i < scheduler->num_threads + 1; ++i) {
- TaskThreadLocalStorage *tls = &scheduler->task_threads[i].tls;
- free_task_tls(tls);
- }
-
- MEM_freeN(scheduler->task_threads);
- }
-
- /* delete leftover tasks */
- for (task = scheduler->queue.first; task; task = task->next) {
- task_data_free(task, 0);
- }
- BLI_freelistN(&scheduler->queue);
-
- /* delete mutex/condition */
- BLI_mutex_end(&scheduler->queue_mutex);
- BLI_condition_end(&scheduler->queue_cond);
-
- MEM_freeN(scheduler);
+ Task *task;
+
+ /* stop all waiting threads */
+ BLI_mutex_lock(&scheduler->queue_mutex);
+ scheduler->do_exit = true;
+ BLI_condition_notify_all(&scheduler->queue_cond);
+ BLI_mutex_unlock(&scheduler->queue_mutex);
+
+ pthread_key_delete(scheduler->tls_id_key);
+
+ /* delete threads */
+ if (scheduler->threads) {
+ int i;
+
+ for (i = 0; i < scheduler->num_threads; i++) {
+ if (pthread_join(scheduler->threads[i], NULL) != 0) {
+ fprintf(stderr, "TaskScheduler failed to join thread %d/%d\n", i, scheduler->num_threads);
+ }
+ }
+
+ MEM_freeN(scheduler->threads);
+ }
+
+ /* Delete task thread data */
+ if (scheduler->task_threads) {
+ for (int i = 0; i < scheduler->num_threads + 1; ++i) {
+ TaskThreadLocalStorage *tls = &scheduler->task_threads[i].tls;
+ free_task_tls(tls);
+ }
+
+ MEM_freeN(scheduler->task_threads);
+ }
+
+ /* delete leftover tasks */
+ for (task = scheduler->queue.first; task; task = task->next) {
+ task_data_free(task, 0);
+ }
+ BLI_freelistN(&scheduler->queue);
+
+ /* delete mutex/condition */
+ BLI_mutex_end(&scheduler->queue_mutex);
+ BLI_condition_end(&scheduler->queue_cond);
+
+ MEM_freeN(scheduler);
}
int BLI_task_scheduler_num_threads(TaskScheduler *scheduler)
{
- return scheduler->num_threads + 1;
+ return scheduler->num_threads + 1;
}
static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriority priority)
{
- task_pool_num_increase(task->pool, 1);
+ task_pool_num_increase(task->pool, 1);
- /* add task to queue */
- BLI_mutex_lock(&scheduler->queue_mutex);
+ /* add task to queue */
+ BLI_mutex_lock(&scheduler->queue_mutex);
- if (priority == TASK_PRIORITY_HIGH) {
- BLI_addhead(&scheduler->queue, task);
- }
- else {
- BLI_addtail(&scheduler->queue, task);
- }
+ if (priority == TASK_PRIORITY_HIGH) {
+ BLI_addhead(&scheduler->queue, task);
+ }
+ else {
+ BLI_addtail(&scheduler->queue, task);
+ }
- BLI_condition_notify_one(&scheduler->queue_cond);
- BLI_mutex_unlock(&scheduler->queue_mutex);
+ BLI_condition_notify_one(&scheduler->queue_cond);
+ BLI_mutex_unlock(&scheduler->queue_mutex);
}
static void task_scheduler_push_all(TaskScheduler *scheduler,
@@ -587,45 +583,45 @@ static void task_scheduler_push_all(TaskScheduler *scheduler,
Task **tasks,
int num_tasks)
{
- if (num_tasks == 0) {
- return;
- }
+ if (num_tasks == 0) {
+ return;
+ }
- task_pool_num_increase(pool, num_tasks);
+ task_pool_num_increase(pool, num_tasks);
- BLI_mutex_lock(&scheduler->queue_mutex);
+ BLI_mutex_lock(&scheduler->queue_mutex);
- for (int i = 0; i < num_tasks; i++) {
- BLI_addhead(&scheduler->queue, tasks[i]);
- }
+ for (int i = 0; i < num_tasks; i++) {
+ BLI_addhead(&scheduler->queue, tasks[i]);
+ }
- BLI_condition_notify_all(&scheduler->queue_cond);
- BLI_mutex_unlock(&scheduler->queue_mutex);
+ BLI_condition_notify_all(&scheduler->queue_cond);
+ BLI_mutex_unlock(&scheduler->queue_mutex);
}
static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool)
{
- Task *task, *nexttask;
- size_t done = 0;
+ Task *task, *nexttask;
+ size_t done = 0;
- BLI_mutex_lock(&scheduler->queue_mutex);
+ BLI_mutex_lock(&scheduler->queue_mutex);
- /* free all tasks from this pool from the queue */
- for (task = scheduler->queue.first; task; task = nexttask) {
- nexttask = task->next;
+ /* free all tasks from this pool from the queue */
+ for (task = scheduler->queue.first; task; task = nexttask) {
+ nexttask = task->next;
- if (task->pool == pool) {
- task_data_free(task, pool->thread_id);
- BLI_freelinkN(&scheduler->queue, task);
+ if (task->pool == pool) {
+ task_data_free(task, pool->thread_id);
+ BLI_freelinkN(&scheduler->queue, task);
- done++;
- }
- }
+ done++;
+ }
+ }
- BLI_mutex_unlock(&scheduler->queue_mutex);
+ BLI_mutex_unlock(&scheduler->queue_mutex);
- /* notify done */
- task_pool_num_decrease(pool, done);
+ /* notify done */
+ task_pool_num_decrease(pool, done);
}
/* Task Pool */
@@ -635,76 +631,75 @@ static TaskPool *task_pool_create_ex(TaskScheduler *scheduler,
const bool is_background,
const bool is_suspended)
{
- TaskPool *pool = MEM_mallocN(sizeof(TaskPool), "TaskPool");
+ TaskPool *pool = MEM_mallocN(sizeof(TaskPool), "TaskPool");
#ifndef NDEBUG
- /* Assert we do not try to create a background pool from some parent task -
- * those only work OK from main thread. */
- if (is_background) {
- const pthread_t thread_id = pthread_self();
- int i = scheduler->num_threads;
-
- while (i--) {
- BLI_assert(!pthread_equal(scheduler->threads[i], thread_id));
- }
- }
+ /* Assert we do not try to create a background pool from some parent task -
+ * those only work OK from main thread. */
+ if (is_background) {
+ const pthread_t thread_id = pthread_self();
+ int i = scheduler->num_threads;
+
+ while (i--) {
+ BLI_assert(!pthread_equal(scheduler->threads[i], thread_id));
+ }
+ }
#endif
- pool->scheduler = scheduler;
- pool->num = 0;
- pool->do_cancel = false;
- pool->do_work = false;
- pool->is_suspended = is_suspended;
- pool->start_suspended = is_suspended;
- pool->num_suspended = 0;
- pool->suspended_queue.first = pool->suspended_queue.last = NULL;
- pool->run_in_background = is_background;
- pool->use_local_tls = false;
-
- BLI_mutex_init(&pool->num_mutex);
- BLI_condition_init(&pool->num_cond);
-
- pool->userdata = userdata;
- BLI_mutex_init(&pool->user_mutex);
-
- if (BLI_thread_is_main()) {
- pool->thread_id = 0;
- }
- else {
- TaskThread *thread = pthread_getspecific(scheduler->tls_id_key);
- if (thread == NULL) {
- /* NOTE: Task pool is created from non-main thread which is not
- * managed by the task scheduler. We identify ourselves as thread ID
- * 0 but we do not use scheduler's TLS storage and use our own
- * instead to avoid any possible threading conflicts.
- */
- pool->thread_id = 0;
- pool->use_local_tls = true;
+ pool->scheduler = scheduler;
+ pool->num = 0;
+ pool->do_cancel = false;
+ pool->do_work = false;
+ pool->is_suspended = is_suspended;
+ pool->start_suspended = is_suspended;
+ pool->num_suspended = 0;
+ pool->suspended_queue.first = pool->suspended_queue.last = NULL;
+ pool->run_in_background = is_background;
+ pool->use_local_tls = false;
+
+ BLI_mutex_init(&pool->num_mutex);
+ BLI_condition_init(&pool->num_cond);
+
+ pool->userdata = userdata;
+ BLI_mutex_init(&pool->user_mutex);
+
+ if (BLI_thread_is_main()) {
+ pool->thread_id = 0;
+ }
+ else {
+ TaskThread *thread = pthread_getspecific(scheduler->tls_id_key);
+ if (thread == NULL) {
+ /* NOTE: Task pool is created from non-main thread which is not
+ * managed by the task scheduler. We identify ourselves as thread ID
+ * 0 but we do not use scheduler's TLS storage and use our own
+ * instead to avoid any possible threading conflicts.
+ */
+ pool->thread_id = 0;
+ pool->use_local_tls = true;
#ifndef NDEBUG
- pool->creator_thread_id = pthread_self();
+ pool->creator_thread_id = pthread_self();
#endif
- initialize_task_tls(&pool->local_tls);
- }
- else {
- pool->thread_id = thread->id;
- }
- }
+ initialize_task_tls(&pool->local_tls);
+ }
+ else {
+ pool->thread_id = thread->id;
+ }
+ }
#ifdef DEBUG_STATS
- pool->mempool_stats =
- MEM_callocN(sizeof(*pool->mempool_stats) * (scheduler->num_threads + 1),
- "per-taskpool mempool stats");
+ pool->mempool_stats = MEM_callocN(sizeof(*pool->mempool_stats) * (scheduler->num_threads + 1),
+ "per-taskpool mempool stats");
#endif
- /* Ensure malloc will go fine from threads,
- *
- * This is needed because we could be in main thread here
- * and malloc could be non-thread safe at this point because
- * no other jobs are running.
- */
- BLI_threaded_malloc_begin();
+ /* Ensure malloc will go fine from threads,
+ *
+ * This is needed because we could be in main thread here
+ * and malloc could be non-thread safe at this point because
+ * no other jobs are running.
+ */
+ BLI_threaded_malloc_begin();
- return pool;
+ return pool;
}
/**
@@ -714,7 +709,7 @@ static TaskPool *task_pool_create_ex(TaskScheduler *scheduler,
*/
TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata)
{
- return task_pool_create_ex(scheduler, userdata, false, false);
+ return task_pool_create_ex(scheduler, userdata, false, false);
}
/**
@@ -729,7 +724,7 @@ TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata)
*/
TaskPool *BLI_task_pool_create_background(TaskScheduler *scheduler, void *userdata)
{
- return task_pool_create_ex(scheduler, userdata, true, false);
+ return task_pool_create_ex(scheduler, userdata, true, false);
}
/**
@@ -739,256 +734,263 @@ TaskPool *BLI_task_pool_create_background(TaskScheduler *scheduler, void *userda
*/
TaskPool *BLI_task_pool_create_suspended(TaskScheduler *scheduler, void *userdata)
{
- return task_pool_create_ex(scheduler, userdata, false, true);
+ return task_pool_create_ex(scheduler, userdata, false, true);
}
void BLI_task_pool_free(TaskPool *pool)
{
- BLI_task_pool_cancel(pool);
+ BLI_task_pool_cancel(pool);
- BLI_mutex_end(&pool->num_mutex);
- BLI_condition_end(&pool->num_cond);
+ BLI_mutex_end(&pool->num_mutex);
+ BLI_condition_end(&pool->num_cond);
- BLI_mutex_end(&pool->user_mutex);
+ BLI_mutex_end(&pool->user_mutex);
#ifdef DEBUG_STATS
- printf("Thread ID Allocated Reused Discarded\n");
- for (int i = 0; i < pool->scheduler->num_threads + 1; ++i) {
- printf("%02d %05d %05d %05d\n",
- i,
- pool->mempool_stats[i].num_alloc,
- pool->mempool_stats[i].num_reuse,
- pool->mempool_stats[i].num_discard);
- }
- MEM_freeN(pool->mempool_stats);
+ printf("Thread ID Allocated Reused Discarded\n");
+ for (int i = 0; i < pool->scheduler->num_threads + 1; ++i) {
+ printf("%02d %05d %05d %05d\n",
+ i,
+ pool->mempool_stats[i].num_alloc,
+ pool->mempool_stats[i].num_reuse,
+ pool->mempool_stats[i].num_discard);
+ }
+ MEM_freeN(pool->mempool_stats);
#endif
- if (pool->use_local_tls) {
- free_task_tls(&pool->local_tls);
- }
+ if (pool->use_local_tls) {
+ free_task_tls(&pool->local_tls);
+ }
- MEM_freeN(pool);
+ MEM_freeN(pool);
- BLI_threaded_malloc_end();
+ BLI_threaded_malloc_end();
}
BLI_INLINE bool task_can_use_local_queues(TaskPool *pool, int thread_id)
{
- return (thread_id != -1 && (thread_id != pool->thread_id || pool->do_work));
+ return (thread_id != -1 && (thread_id != pool->thread_id || pool->do_work));
}
-static void task_pool_push(
- TaskPool *pool, TaskRunFunction run, void *taskdata,
- bool free_taskdata, TaskFreeFunction freedata, TaskPriority priority,
- int thread_id)
+static void task_pool_push(TaskPool *pool,
+ TaskRunFunction run,
+ void *taskdata,
+ bool free_taskdata,
+ TaskFreeFunction freedata,
+ TaskPriority priority,
+ int thread_id)
{
- /* Allocate task and fill it's properties. */
- Task *task = task_alloc(pool, thread_id);
- task->run = run;
- task->taskdata = taskdata;
- task->free_taskdata = free_taskdata;
- task->freedata = freedata;
- task->pool = pool;
- /* For suspended pools we put everything yo a global queue first
- * and exit as soon as possible.
- *
- * This tasks will be moved to actual execution when pool is
- * activated by work_and_wait().
- */
- if (pool->is_suspended) {
- BLI_addhead(&pool->suspended_queue, task);
- atomic_fetch_and_add_z(&pool->num_suspended, 1);
- return;
- }
- /* Populate to any local queue first, this is cheapest push ever. */
- if (task_can_use_local_queues(pool, thread_id)) {
- ASSERT_THREAD_ID(pool->scheduler, thread_id);
- TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
- /* Try to push to a local execution queue.
- * These tasks will be picked up next.
- */
- if (tls->num_local_queue < LOCAL_QUEUE_SIZE) {
- tls->local_queue[tls->num_local_queue] = task;
- tls->num_local_queue++;
- return;
- }
- /* If we are in the delayed tasks push mode, we push tasks to a
- * temporary local queue first without any locks, and then move them
- * to global execution queue with a single lock.
- */
- if (tls->do_delayed_push && tls->num_delayed_queue < DELAYED_QUEUE_SIZE) {
- tls->delayed_queue[tls->num_delayed_queue] = task;
- tls->num_delayed_queue++;
- return;
- }
- }
- /* Do push to a global execution pool, slowest possible method,
- * causes quite reasonable amount of threading overhead.
- */
- task_scheduler_push(pool->scheduler, task, priority);
+ /* Allocate task and fill it's properties. */
+ Task *task = task_alloc(pool, thread_id);
+ task->run = run;
+ task->taskdata = taskdata;
+ task->free_taskdata = free_taskdata;
+ task->freedata = freedata;
+ task->pool = pool;
+ /* For suspended pools we put everything yo a global queue first
+ * and exit as soon as possible.
+ *
+ * This tasks will be moved to actual execution when pool is
+ * activated by work_and_wait().
+ */
+ if (pool->is_suspended) {
+ BLI_addhead(&pool->suspended_queue, task);
+ atomic_fetch_and_add_z(&pool->num_suspended, 1);
+ return;
+ }
+ /* Populate to any local queue first, this is cheapest push ever. */
+ if (task_can_use_local_queues(pool, thread_id)) {
+ ASSERT_THREAD_ID(pool->scheduler, thread_id);
+ TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
+ /* Try to push to a local execution queue.
+ * These tasks will be picked up next.
+ */
+ if (tls->num_local_queue < LOCAL_QUEUE_SIZE) {
+ tls->local_queue[tls->num_local_queue] = task;
+ tls->num_local_queue++;
+ return;
+ }
+ /* If we are in the delayed tasks push mode, we push tasks to a
+ * temporary local queue first without any locks, and then move them
+ * to global execution queue with a single lock.
+ */
+ if (tls->do_delayed_push && tls->num_delayed_queue < DELAYED_QUEUE_SIZE) {
+ tls->delayed_queue[tls->num_delayed_queue] = task;
+ tls->num_delayed_queue++;
+ return;
+ }
+ }
+ /* Do push to a global execution pool, slowest possible method,
+ * causes quite reasonable amount of threading overhead.
+ */
+ task_scheduler_push(pool->scheduler, task, priority);
}
-void BLI_task_pool_push_ex(
- TaskPool *pool, TaskRunFunction run, void *taskdata,
- bool free_taskdata, TaskFreeFunction freedata, TaskPriority priority)
+void BLI_task_pool_push_ex(TaskPool *pool,
+ TaskRunFunction run,
+ void *taskdata,
+ bool free_taskdata,
+ TaskFreeFunction freedata,
+ TaskPriority priority)
{
- task_pool_push(pool, run, taskdata, free_taskdata, freedata, priority, -1);
+ task_pool_push(pool, run, taskdata, free_taskdata, freedata, priority, -1);
}
void BLI_task_pool_push(
- TaskPool *pool, TaskRunFunction run, void *taskdata, bool free_taskdata, TaskPriority priority)
+ TaskPool *pool, TaskRunFunction run, void *taskdata, bool free_taskdata, TaskPriority priority)
{
- BLI_task_pool_push_ex(pool, run, taskdata, free_taskdata, NULL, priority);
+ BLI_task_pool_push_ex(pool, run, taskdata, free_taskdata, NULL, priority);
}
-void BLI_task_pool_push_from_thread(TaskPool *pool, TaskRunFunction run,
- void *taskdata, bool free_taskdata, TaskPriority priority, int thread_id)
+void BLI_task_pool_push_from_thread(TaskPool *pool,
+ TaskRunFunction run,
+ void *taskdata,
+ bool free_taskdata,
+ TaskPriority priority,
+ int thread_id)
{
- task_pool_push(pool, run, taskdata, free_taskdata, NULL, priority, thread_id);
+ task_pool_push(pool, run, taskdata, free_taskdata, NULL, priority, thread_id);
}
void BLI_task_pool_work_and_wait(TaskPool *pool)
{
- TaskThreadLocalStorage *tls = get_task_tls(pool, pool->thread_id);
- TaskScheduler *scheduler = pool->scheduler;
+ TaskThreadLocalStorage *tls = get_task_tls(pool, pool->thread_id);
+ TaskScheduler *scheduler = pool->scheduler;
- if (atomic_fetch_and_and_uint8((uint8_t *)&pool->is_suspended, 0)) {
- if (pool->num_suspended) {
- task_pool_num_increase(pool, pool->num_suspended);
- BLI_mutex_lock(&scheduler->queue_mutex);
+ if (atomic_fetch_and_and_uint8((uint8_t *)&pool->is_suspended, 0)) {
+ if (pool->num_suspended) {
+ task_pool_num_increase(pool, pool->num_suspended);
+ BLI_mutex_lock(&scheduler->queue_mutex);
- BLI_movelisttolist(&scheduler->queue, &pool->suspended_queue);
+ BLI_movelisttolist(&scheduler->queue, &pool->suspended_queue);
- BLI_condition_notify_all(&scheduler->queue_cond);
- BLI_mutex_unlock(&scheduler->queue_mutex);
+ BLI_condition_notify_all(&scheduler->queue_cond);
+ BLI_mutex_unlock(&scheduler->queue_mutex);
- pool->num_suspended = 0;
- }
- }
+ pool->num_suspended = 0;
+ }
+ }
- pool->do_work = true;
+ pool->do_work = true;
- ASSERT_THREAD_ID(pool->scheduler, pool->thread_id);
+ ASSERT_THREAD_ID(pool->scheduler, pool->thread_id);
- handle_local_queue(tls, pool->thread_id);
+ handle_local_queue(tls, pool->thread_id);
- BLI_mutex_lock(&pool->num_mutex);
+ BLI_mutex_lock(&pool->num_mutex);
- while (pool->num != 0) {
- Task *task, *work_task = NULL;
- bool found_task = false;
+ while (pool->num != 0) {
+ Task *task, *work_task = NULL;
+ bool found_task = false;
- BLI_mutex_unlock(&pool->num_mutex);
+ BLI_mutex_unlock(&pool->num_mutex);
- BLI_mutex_lock(&scheduler->queue_mutex);
+ BLI_mutex_lock(&scheduler->queue_mutex);
- /* find task from this pool. if we get a task from another pool,
- * we can get into deadlock */
+ /* find task from this pool. if we get a task from another pool,
+ * we can get into deadlock */
- for (task = scheduler->queue.first; task; task = task->next) {
- if (task->pool == pool) {
- work_task = task;
- found_task = true;
- BLI_remlink(&scheduler->queue, task);
- break;
- }
- }
+ for (task = scheduler->queue.first; task; task = task->next) {
+ if (task->pool == pool) {
+ work_task = task;
+ found_task = true;
+ BLI_remlink(&scheduler->queue, task);
+ break;
+ }
+ }
- BLI_mutex_unlock(&scheduler->queue_mutex);
+ BLI_mutex_unlock(&scheduler->queue_mutex);
- /* if found task, do it, otherwise wait until other tasks are done */
- if (found_task) {
- /* run task */
- BLI_assert(!tls->do_delayed_push);
- work_task->run(pool, work_task->taskdata, pool->thread_id);
- BLI_assert(!tls->do_delayed_push);
+ /* if found task, do it, otherwise wait until other tasks are done */
+ if (found_task) {
+ /* run task */
+ BLI_assert(!tls->do_delayed_push);
+ work_task->run(pool, work_task->taskdata, pool->thread_id);
+ BLI_assert(!tls->do_delayed_push);
- /* delete task */
- task_free(pool, task, pool->thread_id);
+ /* delete task */
+ task_free(pool, task, pool->thread_id);
- /* Handle all tasks from local queue. */
- handle_local_queue(tls, pool->thread_id);
+ /* Handle all tasks from local queue. */
+ handle_local_queue(tls, pool->thread_id);
- /* notify pool task was done */
- task_pool_num_decrease(pool, 1);
- }
+ /* notify pool task was done */
+ task_pool_num_decrease(pool, 1);
+ }
- BLI_mutex_lock(&pool->num_mutex);
- if (pool->num == 0) {
- break;
- }
+ BLI_mutex_lock(&pool->num_mutex);
+ if (pool->num == 0) {
+ break;
+ }
- if (!found_task) {
- BLI_condition_wait(&pool->num_cond, &pool->num_mutex);
- }
- }
+ if (!found_task) {
+ BLI_condition_wait(&pool->num_cond, &pool->num_mutex);
+ }
+ }
- BLI_mutex_unlock(&pool->num_mutex);
+ BLI_mutex_unlock(&pool->num_mutex);
- BLI_assert(tls->num_local_queue == 0);
+ BLI_assert(tls->num_local_queue == 0);
}
void BLI_task_pool_work_wait_and_reset(TaskPool *pool)
{
- BLI_task_pool_work_and_wait(pool);
+ BLI_task_pool_work_and_wait(pool);
- pool->do_work = false;
- pool->is_suspended = pool->start_suspended;
+ pool->do_work = false;
+ pool->is_suspended = pool->start_suspended;
}
void BLI_task_pool_cancel(TaskPool *pool)
{
- pool->do_cancel = true;
+ pool->do_cancel = true;
- task_scheduler_clear(pool->scheduler, pool);
+ task_scheduler_clear(pool->scheduler, pool);
- /* wait until all entries are cleared */
- BLI_mutex_lock(&pool->num_mutex);
- while (pool->num) {
- BLI_condition_wait(&pool->num_cond, &pool->num_mutex);
- }
- BLI_mutex_unlock(&pool->num_mutex);
+ /* wait until all entries are cleared */
+ BLI_mutex_lock(&pool->num_mutex);
+ while (pool->num) {
+ BLI_condition_wait(&pool->num_cond, &pool->num_mutex);
+ }
+ BLI_mutex_unlock(&pool->num_mutex);
- pool->do_cancel = false;
+ pool->do_cancel = false;
}
bool BLI_task_pool_canceled(TaskPool *pool)
{
- return pool->do_cancel;
+ return pool->do_cancel;
}
void *BLI_task_pool_userdata(TaskPool *pool)
{
- return pool->userdata;
+ return pool->userdata;
}
ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool)
{
- return &pool->user_mutex;
+ return &pool->user_mutex;
}
void BLI_task_pool_delayed_push_begin(TaskPool *pool, int thread_id)
{
- if (task_can_use_local_queues(pool, thread_id)) {
- ASSERT_THREAD_ID(pool->scheduler, thread_id);
- TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
- tls->do_delayed_push = true;
- }
+ if (task_can_use_local_queues(pool, thread_id)) {
+ ASSERT_THREAD_ID(pool->scheduler, thread_id);
+ TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
+ tls->do_delayed_push = true;
+ }
}
void BLI_task_pool_delayed_push_end(TaskPool *pool, int thread_id)
{
- if (task_can_use_local_queues(pool, thread_id)) {
- ASSERT_THREAD_ID(pool->scheduler, thread_id);
- TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
- BLI_assert(tls->do_delayed_push);
- task_scheduler_push_all(pool->scheduler,
- pool,
- tls->delayed_queue,
- tls->num_delayed_queue);
- tls->do_delayed_push = false;
- tls->num_delayed_queue = 0;
- }
+ if (task_can_use_local_queues(pool, thread_id)) {
+ ASSERT_THREAD_ID(pool->scheduler, thread_id);
+ TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
+ BLI_assert(tls->do_delayed_push);
+ task_scheduler_push_all(pool->scheduler, pool, tls->delayed_queue, tls->num_delayed_queue);
+ tls->do_delayed_push = false;
+ tls->num_delayed_queue = 0;
+ }
}
/* Parallel range routines */
@@ -1007,72 +1009,72 @@ void BLI_task_pool_delayed_push_end(TaskPool *pool, int thread_id)
/* Allows to avoid using malloc for userdata_chunk in tasks, when small enough. */
#define MALLOCA(_size) ((_size) <= 8192) ? alloca((_size)) : MEM_mallocN((_size), __func__)
-#define MALLOCA_FREE(_mem, _size) if (((_mem) != NULL) && ((_size) > 8192)) MEM_freeN((_mem))
+#define MALLOCA_FREE(_mem, _size) \
+ if (((_mem) != NULL) && ((_size) > 8192)) \
+ MEM_freeN((_mem))
typedef struct ParallelRangeState {
- int start, stop;
- void *userdata;
+ int start, stop;
+ void *userdata;
- TaskParallelRangeFunc func;
+ TaskParallelRangeFunc func;
- int iter;
- int chunk_size;
+ int iter;
+ int chunk_size;
} ParallelRangeState;
-BLI_INLINE bool parallel_range_next_iter_get(
- ParallelRangeState * __restrict state,
- int * __restrict iter, int * __restrict count)
+BLI_INLINE bool parallel_range_next_iter_get(ParallelRangeState *__restrict state,
+ int *__restrict iter,
+ int *__restrict count)
{
- int previter = atomic_fetch_and_add_int32(&state->iter, state->chunk_size);
+ int previter = atomic_fetch_and_add_int32(&state->iter, state->chunk_size);
- *iter = previter;
- *count = max_ii(0, min_ii(state->chunk_size, state->stop - previter));
+ *iter = previter;
+ *count = max_ii(0, min_ii(state->chunk_size, state->stop - previter));
- return (previter < state->stop);
+ return (previter < state->stop);
}
-static void parallel_range_func(
- TaskPool * __restrict pool,
- void *userdata_chunk,
- int thread_id)
+static void parallel_range_func(TaskPool *__restrict pool, void *userdata_chunk, int thread_id)
{
- ParallelRangeState * __restrict state = BLI_task_pool_userdata(pool);
- ParallelRangeTLS tls = {
- .thread_id = thread_id,
- .userdata_chunk = userdata_chunk,
- };
- int iter, count;
- while (parallel_range_next_iter_get(state, &iter, &count)) {
- for (int i = 0; i < count; ++i) {
- state->func(state->userdata, iter + i, &tls);
- }
- }
+ ParallelRangeState *__restrict state = BLI_task_pool_userdata(pool);
+ ParallelRangeTLS tls = {
+ .thread_id = thread_id,
+ .userdata_chunk = userdata_chunk,
+ };
+ int iter, count;
+ while (parallel_range_next_iter_get(state, &iter, &count)) {
+ for (int i = 0; i < count; ++i) {
+ state->func(state->userdata, iter + i, &tls);
+ }
+ }
}
-static void parallel_range_single_thread(const int start, int const stop,
+static void parallel_range_single_thread(const int start,
+ int const stop,
void *userdata,
TaskParallelRangeFunc func,
const ParallelRangeSettings *settings)
{
- void *userdata_chunk = settings->userdata_chunk;
- const size_t userdata_chunk_size = settings->userdata_chunk_size;
- void *userdata_chunk_local = NULL;
- const bool use_userdata_chunk = (userdata_chunk_size != 0) && (userdata_chunk != NULL);
- if (use_userdata_chunk) {
- userdata_chunk_local = MALLOCA(userdata_chunk_size);
- memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size);
- }
- ParallelRangeTLS tls = {
- .thread_id = 0,
- .userdata_chunk = userdata_chunk_local,
- };
- for (int i = start; i < stop; ++i) {
- func(userdata, i, &tls);
- }
- if (settings->func_finalize != NULL) {
- settings->func_finalize(userdata, userdata_chunk_local);
- }
- MALLOCA_FREE(userdata_chunk_local, userdata_chunk_size);
+ void *userdata_chunk = settings->userdata_chunk;
+ const size_t userdata_chunk_size = settings->userdata_chunk_size;
+ void *userdata_chunk_local = NULL;
+ const bool use_userdata_chunk = (userdata_chunk_size != 0) && (userdata_chunk != NULL);
+ if (use_userdata_chunk) {
+ userdata_chunk_local = MALLOCA(userdata_chunk_size);
+ memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size);
+ }
+ ParallelRangeTLS tls = {
+ .thread_id = 0,
+ .userdata_chunk = userdata_chunk_local,
+ };
+ for (int i = start; i < stop; ++i) {
+ func(userdata, i, &tls);
+ }
+ if (settings->func_finalize != NULL) {
+ settings->func_finalize(userdata, userdata_chunk_local);
+ }
+ MALLOCA_FREE(userdata_chunk_local, userdata_chunk_size);
}
/**
@@ -1080,176 +1082,166 @@ static void parallel_range_single_thread(const int start, int const stop,
*
* See public API doc of ParallelRangeSettings for description of all settings.
*/
-void BLI_task_parallel_range(const int start, const int stop,
+void BLI_task_parallel_range(const int start,
+ const int stop,
void *userdata,
TaskParallelRangeFunc func,
const ParallelRangeSettings *settings)
{
- TaskScheduler *task_scheduler;
- TaskPool *task_pool;
- ParallelRangeState state;
- int i, num_threads, num_tasks;
-
- void *userdata_chunk = settings->userdata_chunk;
- const size_t userdata_chunk_size = settings->userdata_chunk_size;
- void *userdata_chunk_local = NULL;
- void *userdata_chunk_array = NULL;
- const bool use_userdata_chunk = (userdata_chunk_size != 0) && (userdata_chunk != NULL);
-
- if (start == stop) {
- return;
- }
-
- BLI_assert(start < stop);
- if (userdata_chunk_size != 0) {
- BLI_assert(userdata_chunk != NULL);
- }
-
- /* If it's not enough data to be crunched, don't bother with tasks at all,
- * do everything from the main thread.
- */
- if (!settings->use_threading) {
- parallel_range_single_thread(start, stop,
- userdata,
- func,
- settings);
- return;
- }
-
- task_scheduler = BLI_task_scheduler_get();
- num_threads = BLI_task_scheduler_num_threads(task_scheduler);
-
- /* The idea here is to prevent creating task for each of the loop iterations
- * and instead have tasks which are evenly distributed across CPU cores and
- * pull next iter to be crunched using the queue.
- */
- num_tasks = num_threads + 2;
-
- state.start = start;
- state.stop = stop;
- state.userdata = userdata;
- state.func = func;
- state.iter = start;
- switch (settings->scheduling_mode) {
- case TASK_SCHEDULING_STATIC:
- state.chunk_size = max_ii(
- settings->min_iter_per_thread,
- (stop - start) / (num_tasks));
- break;
- case TASK_SCHEDULING_DYNAMIC:
- /* TODO(sergey): Make it configurable from min_iter_per_thread. */
- state.chunk_size = 32;
- break;
- }
-
- num_tasks = min_ii(num_tasks,
- max_ii(1, (stop - start) / state.chunk_size));
-
- if (num_tasks == 1) {
- parallel_range_single_thread(start, stop,
- userdata,
- func,
- settings);
- return;
- }
-
- task_pool = BLI_task_pool_create_suspended(task_scheduler, &state);
-
- /* NOTE: This way we are adding a memory barrier and ensure all worker
- * threads can read and modify the value, without any locks. */
- atomic_fetch_and_add_int32(&state.iter, 0);
-
- if (use_userdata_chunk) {
- userdata_chunk_array = MALLOCA(userdata_chunk_size * num_tasks);
- }
-
- for (i = 0; i < num_tasks; i++) {
- if (use_userdata_chunk) {
- userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i);
- memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size);
- }
- /* Use this pool's pre-allocated tasks. */
- BLI_task_pool_push_from_thread(task_pool,
- parallel_range_func,
- userdata_chunk_local, false,
- TASK_PRIORITY_HIGH,
- task_pool->thread_id);
- }
-
- BLI_task_pool_work_and_wait(task_pool);
- BLI_task_pool_free(task_pool);
-
- if (use_userdata_chunk) {
- if (settings->func_finalize != NULL) {
- for (i = 0; i < num_tasks; i++) {
- userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i);
- settings->func_finalize(userdata, userdata_chunk_local);
- }
- }
- MALLOCA_FREE(userdata_chunk_array, userdata_chunk_size * num_tasks);
- }
+ TaskScheduler *task_scheduler;
+ TaskPool *task_pool;
+ ParallelRangeState state;
+ int i, num_threads, num_tasks;
+
+ void *userdata_chunk = settings->userdata_chunk;
+ const size_t userdata_chunk_size = settings->userdata_chunk_size;
+ void *userdata_chunk_local = NULL;
+ void *userdata_chunk_array = NULL;
+ const bool use_userdata_chunk = (userdata_chunk_size != 0) && (userdata_chunk != NULL);
+
+ if (start == stop) {
+ return;
+ }
+
+ BLI_assert(start < stop);
+ if (userdata_chunk_size != 0) {
+ BLI_assert(userdata_chunk != NULL);
+ }
+
+ /* If it's not enough data to be crunched, don't bother with tasks at all,
+ * do everything from the main thread.
+ */
+ if (!settings->use_threading) {
+ parallel_range_single_thread(start, stop, userdata, func, settings);
+ return;
+ }
+
+ task_scheduler = BLI_task_scheduler_get();
+ num_threads = BLI_task_scheduler_num_threads(task_scheduler);
+
+ /* The idea here is to prevent creating task for each of the loop iterations
+ * and instead have tasks which are evenly distributed across CPU cores and
+ * pull next iter to be crunched using the queue.
+ */
+ num_tasks = num_threads + 2;
+
+ state.start = start;
+ state.stop = stop;
+ state.userdata = userdata;
+ state.func = func;
+ state.iter = start;
+ switch (settings->scheduling_mode) {
+ case TASK_SCHEDULING_STATIC:
+ state.chunk_size = max_ii(settings->min_iter_per_thread, (stop - start) / (num_tasks));
+ break;
+ case TASK_SCHEDULING_DYNAMIC:
+ /* TODO(sergey): Make it configurable from min_iter_per_thread. */
+ state.chunk_size = 32;
+ break;
+ }
+
+ num_tasks = min_ii(num_tasks, max_ii(1, (stop - start) / state.chunk_size));
+
+ if (num_tasks == 1) {
+ parallel_range_single_thread(start, stop, userdata, func, settings);
+ return;
+ }
+
+ task_pool = BLI_task_pool_create_suspended(task_scheduler, &state);
+
+ /* NOTE: This way we are adding a memory barrier and ensure all worker
+ * threads can read and modify the value, without any locks. */
+ atomic_fetch_and_add_int32(&state.iter, 0);
+
+ if (use_userdata_chunk) {
+ userdata_chunk_array = MALLOCA(userdata_chunk_size * num_tasks);
+ }
+
+ for (i = 0; i < num_tasks; i++) {
+ if (use_userdata_chunk) {
+ userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i);
+ memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size);
+ }
+ /* Use this pool's pre-allocated tasks. */
+ BLI_task_pool_push_from_thread(task_pool,
+ parallel_range_func,
+ userdata_chunk_local,
+ false,
+ TASK_PRIORITY_HIGH,
+ task_pool->thread_id);
+ }
+
+ BLI_task_pool_work_and_wait(task_pool);
+ BLI_task_pool_free(task_pool);
+
+ if (use_userdata_chunk) {
+ if (settings->func_finalize != NULL) {
+ for (i = 0; i < num_tasks; i++) {
+ userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i);
+ settings->func_finalize(userdata, userdata_chunk_local);
+ }
+ }
+ MALLOCA_FREE(userdata_chunk_array, userdata_chunk_size * num_tasks);
+ }
}
#undef MALLOCA
#undef MALLOCA_FREE
typedef struct ParallelListbaseState {
- void *userdata;
- TaskParallelListbaseFunc func;
+ void *userdata;
+ TaskParallelListbaseFunc func;
- int chunk_size;
- int index;
- Link *link;
- SpinLock lock;
+ int chunk_size;
+ int index;
+ Link *link;
+ SpinLock lock;
} ParallelListState;
-BLI_INLINE Link *parallel_listbase_next_iter_get(
- ParallelListState * __restrict state,
- int * __restrict index,
- int * __restrict count)
+BLI_INLINE Link *parallel_listbase_next_iter_get(ParallelListState *__restrict state,
+ int *__restrict index,
+ int *__restrict count)
{
- int task_count = 0;
- BLI_spin_lock(&state->lock);
- Link *result = state->link;
- if (LIKELY(result != NULL)) {
- *index = state->index;
- while (state->link != NULL && task_count < state->chunk_size) {
- ++task_count;
- state->link = state->link->next;
- }
- state->index += task_count;
- }
- BLI_spin_unlock(&state->lock);
- *count = task_count;
- return result;
+ int task_count = 0;
+ BLI_spin_lock(&state->lock);
+ Link *result = state->link;
+ if (LIKELY(result != NULL)) {
+ *index = state->index;
+ while (state->link != NULL && task_count < state->chunk_size) {
+ ++task_count;
+ state->link = state->link->next;
+ }
+ state->index += task_count;
+ }
+ BLI_spin_unlock(&state->lock);
+ *count = task_count;
+ return result;
}
-static void parallel_listbase_func(
- TaskPool * __restrict pool,
- void *UNUSED(taskdata),
- int UNUSED(threadid))
+static void parallel_listbase_func(TaskPool *__restrict pool,
+ void *UNUSED(taskdata),
+ int UNUSED(threadid))
{
- ParallelListState * __restrict state = BLI_task_pool_userdata(pool);
- Link *link;
- int index, count;
-
- while ((link = parallel_listbase_next_iter_get(state, &index, &count)) != NULL) {
- for (int i = 0; i < count; ++i) {
- state->func(state->userdata, link, index + i);
- link = link->next;
- }
- }
+ ParallelListState *__restrict state = BLI_task_pool_userdata(pool);
+ Link *link;
+ int index, count;
+
+ while ((link = parallel_listbase_next_iter_get(state, &index, &count)) != NULL) {
+ for (int i = 0; i < count; ++i) {
+ state->func(state->userdata, link, index + i);
+ link = link->next;
+ }
+ }
}
-static void task_parallel_listbase_no_threads(
- struct ListBase *listbase,
- void *userdata,
- TaskParallelListbaseFunc func)
+static void task_parallel_listbase_no_threads(struct ListBase *listbase,
+ void *userdata,
+ TaskParallelListbaseFunc func)
{
- int i = 0;
- for (Link *link = listbase->first; link != NULL; link = link->next, ++i) {
- func(userdata, link, i);
- }
+ int i = 0;
+ for (Link *link = listbase->first; link != NULL; link = link->next, ++i) {
+ func(userdata, link, i);
+ }
}
/* NOTE: The idea here is to compensate for rather measurable threading
@@ -1257,13 +1249,13 @@ static void task_parallel_listbase_no_threads(
* to spend too much time in those overheads. */
BLI_INLINE int task_parallel_listbasecalc_chunk_size(const int num_threads)
{
- if (num_threads > 32) {
- return 128;
- }
- else if (num_threads > 16) {
- return 64;
- }
- return 32;
+ if (num_threads > 32) {
+ return 128;
+ }
+ else if (num_threads > 16) {
+ return 64;
+ }
+ return 32;
}
/**
@@ -1277,75 +1269,65 @@ BLI_INLINE int task_parallel_listbasecalc_chunk_size(const int num_threads)
*
* \note There is no static scheduling here, since it would need another full loop over items to count them...
*/
-void BLI_task_parallel_listbase(
- struct ListBase *listbase,
- void *userdata,
- TaskParallelListbaseFunc func,
- const bool use_threading)
+void BLI_task_parallel_listbase(struct ListBase *listbase,
+ void *userdata,
+ TaskParallelListbaseFunc func,
+ const bool use_threading)
{
- if (BLI_listbase_is_empty(listbase)) {
- return;
- }
- if (!use_threading) {
- task_parallel_listbase_no_threads(listbase, userdata, func);
- return;
- }
- TaskScheduler *task_scheduler = BLI_task_scheduler_get();
- const int num_threads = BLI_task_scheduler_num_threads(task_scheduler);
- /* TODO(sergey): Consider making chunk size configurable. */
- const int chunk_size = task_parallel_listbasecalc_chunk_size(num_threads);
- const int num_tasks = min_ii(
- num_threads,
- BLI_listbase_count(listbase) / chunk_size);
- if (num_tasks <= 1) {
- task_parallel_listbase_no_threads(listbase, userdata, func);
- return;
- }
-
- ParallelListState state;
- TaskPool *task_pool = BLI_task_pool_create_suspended(task_scheduler, &state);
-
- state.index = 0;
- state.link = listbase->first;
- state.userdata = userdata;
- state.func = func;
- state.chunk_size = chunk_size;
- BLI_spin_init(&state.lock);
-
- BLI_assert(num_tasks > 0);
- for (int i = 0; i < num_tasks; i++) {
- /* Use this pool's pre-allocated tasks. */
- BLI_task_pool_push_from_thread(task_pool,
- parallel_listbase_func,
- NULL, false,
- TASK_PRIORITY_HIGH,
- task_pool->thread_id);
- }
-
- BLI_task_pool_work_and_wait(task_pool);
- BLI_task_pool_free(task_pool);
-
- BLI_spin_end(&state.lock);
+ if (BLI_listbase_is_empty(listbase)) {
+ return;
+ }
+ if (!use_threading) {
+ task_parallel_listbase_no_threads(listbase, userdata, func);
+ return;
+ }
+ TaskScheduler *task_scheduler = BLI_task_scheduler_get();
+ const int num_threads = BLI_task_scheduler_num_threads(task_scheduler);
+ /* TODO(sergey): Consider making chunk size configurable. */
+ const int chunk_size = task_parallel_listbasecalc_chunk_size(num_threads);
+ const int num_tasks = min_ii(num_threads, BLI_listbase_count(listbase) / chunk_size);
+ if (num_tasks <= 1) {
+ task_parallel_listbase_no_threads(listbase, userdata, func);
+ return;
+ }
+
+ ParallelListState state;
+ TaskPool *task_pool = BLI_task_pool_create_suspended(task_scheduler, &state);
+
+ state.index = 0;
+ state.link = listbase->first;
+ state.userdata = userdata;
+ state.func = func;
+ state.chunk_size = chunk_size;
+ BLI_spin_init(&state.lock);
+
+ BLI_assert(num_tasks > 0);
+ for (int i = 0; i < num_tasks; i++) {
+ /* Use this pool's pre-allocated tasks. */
+ BLI_task_pool_push_from_thread(
+ task_pool, parallel_listbase_func, NULL, false, TASK_PRIORITY_HIGH, task_pool->thread_id);
+ }
+
+ BLI_task_pool_work_and_wait(task_pool);
+ BLI_task_pool_free(task_pool);
+
+ BLI_spin_end(&state.lock);
}
-
typedef struct ParallelMempoolState {
- void *userdata;
- TaskParallelMempoolFunc func;
+ void *userdata;
+ TaskParallelMempoolFunc func;
} ParallelMempoolState;
-static void parallel_mempool_func(
- TaskPool * __restrict pool,
- void *taskdata,
- int UNUSED(threadid))
+static void parallel_mempool_func(TaskPool *__restrict pool, void *taskdata, int UNUSED(threadid))
{
- ParallelMempoolState * __restrict state = BLI_task_pool_userdata(pool);
- BLI_mempool_iter *iter = taskdata;
- MempoolIterData *item;
+ ParallelMempoolState *__restrict state = BLI_task_pool_userdata(pool);
+ BLI_mempool_iter *iter = taskdata;
+ MempoolIterData *item;
- while ((item = BLI_mempool_iterstep(iter)) != NULL) {
- state->func(state->userdata, item);
- }
+ while ((item = BLI_mempool_iterstep(iter)) != NULL) {
+ state->func(state->userdata, item);
+ }
}
/**
@@ -1359,57 +1341,59 @@ static void parallel_mempool_func(
*
* \note There is no static scheduling here.
*/
-void BLI_task_parallel_mempool(
- BLI_mempool *mempool,
- void *userdata,
- TaskParallelMempoolFunc func,
- const bool use_threading)
+void BLI_task_parallel_mempool(BLI_mempool *mempool,
+ void *userdata,
+ TaskParallelMempoolFunc func,
+ const bool use_threading)
{
- TaskScheduler *task_scheduler;
- TaskPool *task_pool;
- ParallelMempoolState state;
- int i, num_threads, num_tasks;
-
- if (BLI_mempool_len(mempool) == 0) {
- return;
- }
-
- if (!use_threading) {
- BLI_mempool_iter iter;
- BLI_mempool_iternew(mempool, &iter);
-
- for (void *item = BLI_mempool_iterstep(&iter); item != NULL; item = BLI_mempool_iterstep(&iter)) {
- func(userdata, item);
- }
- return;
- }
-
- task_scheduler = BLI_task_scheduler_get();
- task_pool = BLI_task_pool_create_suspended(task_scheduler, &state);
- num_threads = BLI_task_scheduler_num_threads(task_scheduler);
-
- /* The idea here is to prevent creating task for each of the loop iterations
- * and instead have tasks which are evenly distributed across CPU cores and
- * pull next item to be crunched using the threaded-aware BLI_mempool_iter.
- */
- num_tasks = num_threads + 2;
-
- state.userdata = userdata;
- state.func = func;
-
- BLI_mempool_iter *mempool_iterators = BLI_mempool_iter_threadsafe_create(mempool, (size_t)num_tasks);
-
- for (i = 0; i < num_tasks; i++) {
- /* Use this pool's pre-allocated tasks. */
- BLI_task_pool_push_from_thread(task_pool,
- parallel_mempool_func,
- &mempool_iterators[i], false,
- TASK_PRIORITY_HIGH,
- task_pool->thread_id);
- }
-
- BLI_task_pool_work_and_wait(task_pool);
- BLI_task_pool_free(task_pool);
-
- BLI_mempool_iter_threadsafe_free(mempool_iterators);
+ TaskScheduler *task_scheduler;
+ TaskPool *task_pool;
+ ParallelMempoolState state;
+ int i, num_threads, num_tasks;
+
+ if (BLI_mempool_len(mempool) == 0) {
+ return;
+ }
+
+ if (!use_threading) {
+ BLI_mempool_iter iter;
+ BLI_mempool_iternew(mempool, &iter);
+
+ for (void *item = BLI_mempool_iterstep(&iter); item != NULL;
+ item = BLI_mempool_iterstep(&iter)) {
+ func(userdata, item);
+ }
+ return;
+ }
+
+ task_scheduler = BLI_task_scheduler_get();
+ task_pool = BLI_task_pool_create_suspended(task_scheduler, &state);
+ num_threads = BLI_task_scheduler_num_threads(task_scheduler);
+
+ /* The idea here is to prevent creating task for each of the loop iterations
+ * and instead have tasks which are evenly distributed across CPU cores and
+ * pull next item to be crunched using the threaded-aware BLI_mempool_iter.
+ */
+ num_tasks = num_threads + 2;
+
+ state.userdata = userdata;
+ state.func = func;
+
+ BLI_mempool_iter *mempool_iterators = BLI_mempool_iter_threadsafe_create(mempool,
+ (size_t)num_tasks);
+
+ for (i = 0; i < num_tasks; i++) {
+ /* Use this pool's pre-allocated tasks. */
+ BLI_task_pool_push_from_thread(task_pool,
+ parallel_mempool_func,
+ &mempool_iterators[i],
+ false,
+ TASK_PRIORITY_HIGH,
+ task_pool->thread_id);
+ }
+
+ BLI_task_pool_work_and_wait(task_pool);
+ BLI_task_pool_free(task_pool);
+
+ BLI_mempool_iter_threadsafe_free(mempool_iterators);
}