diff options
-rw-r--r-- | source/blender/blenlib/intern/task.c | 76 |
1 files changed, 70 insertions, 6 deletions
diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c index a7449f97bfc..4e73ff66338 100644 --- a/source/blender/blenlib/intern/task.c +++ b/source/blender/blenlib/intern/task.c @@ -48,6 +48,27 @@ */ #define MEMPOOL_SIZE 256 +/* Number of tasks which are pushed directly to local thread queue. + * + * This allows thread to fetch next task without locking the whole queue. + */ +#define LOCALQUEUE_SIZE 1 + +#ifndef NDEBUG +# define ASSERT_THREAD_ID(scheduler, thread_id) \ + do { \ + if (!BLI_thread_is_main()) { \ + TaskThread *thread = pthread_getspecific(scheduler->tls_id_key); \ + BLI_assert(thread_id == thread->id); \ + } \ + else { \ + BLI_assert(thread_id == 0); \ + } \ + } while (false) +#else +# define ASSERT_THREAD_ID(scheduler, thread_id) +#endif + typedef struct Task { struct Task *next, *prev; @@ -104,6 +125,8 @@ typedef struct TaskMemPoolStats { typedef struct TaskThreadLocalStorage { TaskMemPool task_mempool; + int num_local_queue; + Task *local_queue[LOCALQUEUE_SIZE]; } TaskThreadLocalStorage; struct TaskPool { @@ -117,6 +140,7 @@ struct TaskPool { ThreadMutex user_mutex; volatile bool do_cancel; + volatile bool do_work; /* If set, this pool may never be work_and_wait'ed, which means TaskScheduler * has to use its special background fallback thread in case we are in @@ -175,6 +199,9 @@ BLI_INLINE TaskThreadLocalStorage *get_task_tls(TaskPool *pool, TaskScheduler *scheduler = pool->scheduler; BLI_assert(thread_id >= 0); BLI_assert(thread_id <= scheduler->num_threads); + if (thread_id == 0) { + return &scheduler->task_threads[pool->thread_id].tls; + } return &scheduler->task_threads[thread_id].tls; } @@ -314,9 +341,28 @@ static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task return true; } +BLI_INLINE void handle_local_queue(TaskThreadLocalStorage *tls, + const int thread_id) +{ + while (tls->num_local_queue > 0) { + /* We pop task from queue before handling it so handler of the task can + * push next job to the local queue. + */ + tls->num_local_queue--; + Task *local_task = tls->local_queue[tls->num_local_queue]; + /* TODO(sergey): Double-check work_and_wait() doesn't handle other's + * pool tasks. + */ + TaskPool *local_pool = local_task->pool; + local_task->run(local_pool, local_task->taskdata, thread_id); + task_free(local_pool, local_task, thread_id); + } +} + static void *task_scheduler_thread_run(void *thread_p) { TaskThread *thread = (TaskThread *) thread_p; + TaskThreadLocalStorage *tls = &thread->tls; TaskScheduler *scheduler = thread->scheduler; int thread_id = thread->id; Task *task; @@ -333,6 +379,9 @@ static void *task_scheduler_thread_run(void *thread_p) /* delete task */ task_free(pool, task, thread_id); + /* Handle all tasks from local queue. */ + handle_local_queue(tls, thread_id); + /* notify pool task was done */ task_pool_num_decrease(pool, 1); } @@ -506,6 +555,7 @@ static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, c pool->scheduler = scheduler; pool->num = 0; pool->do_cancel = false; + pool->do_work = false; pool->run_in_background = is_background; BLI_mutex_init(&pool->num_mutex); @@ -603,6 +653,19 @@ static void task_pool_push( task->freedata = freedata; task->pool = pool; + if (thread_id != -1 && + (thread_id != pool->thread_id || pool->do_work)) + { + ASSERT_THREAD_ID(pool->scheduler, thread_id); + + TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id); + if (tls->num_local_queue < LOCALQUEUE_SIZE) { + tls->local_queue[tls->num_local_queue] = task; + tls->num_local_queue++; + return; + } + } + task_scheduler_push(pool->scheduler, task, priority); } @@ -627,14 +690,12 @@ void BLI_task_pool_push_from_thread(TaskPool *pool, TaskRunFunction run, void BLI_task_pool_work_and_wait(TaskPool *pool) { + TaskThreadLocalStorage *tls = get_task_tls(pool, pool->thread_id); TaskScheduler *scheduler = pool->scheduler; -#ifndef NDEBUG - if (!BLI_thread_is_main()) { - TaskThread *thread = pthread_getspecific(scheduler->tls_id_key); - BLI_assert(pool->thread_id == thread->id); - } -#endif + pool->do_work = true; + + ASSERT_THREAD_ID(pool->scheduler, pool->thread_id); BLI_mutex_lock(&pool->num_mutex); @@ -668,6 +729,9 @@ void BLI_task_pool_work_and_wait(TaskPool *pool) /* delete task */ task_free(pool, task, pool->thread_id); + /* Handle all tasks from local queue. */ + handle_local_queue(tls, pool->thread_id); + /* notify pool task was done */ task_pool_num_decrease(pool, 1); } |