diff options
-rw-r--r-- | source/blender/blenlib/intern/task.c | 189 |
1 files changed, 110 insertions, 79 deletions
diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c index 91e821a8f1a..21c02c31a9d 100644 --- a/source/blender/blenlib/intern/task.c +++ b/source/blender/blenlib/intern/task.c @@ -35,6 +35,8 @@ #include "BLI_task.h" #include "BLI_threads.h" +#include "PIL_time.h" + #include "atomic_ops.h" /* Define this to enable some detailed statistic print. */ @@ -48,6 +50,10 @@ */ #define MEMPOOL_SIZE 256 +/* Parameters controlling how much we spin in nanosleeps before switching to real condition-controlled sleeping. */ +#define NANOSLEEP_MAX_SPINNING 1000 /* Number of failed attempt to get a task before going to condition waiting. */ +#define NANOSLEEP_DURATION (const struct timespec[]){{0, 200L}} /* Nanosleep duration (in nano-seconds). */ + typedef struct Task { struct Task *next, *prev; @@ -105,11 +111,9 @@ typedef struct TaskMemPoolStats { struct TaskPool { TaskScheduler *scheduler; - volatile size_t num; + size_t num; size_t num_threads; size_t currently_running_tasks; - ThreadMutex num_mutex; - ThreadCondition num_cond; void *userdata; ThreadMutex user_mutex; @@ -146,10 +150,14 @@ struct TaskScheduler { bool background_thread_only; ListBase queue; - ThreadMutex queue_mutex; - ThreadCondition queue_cond; + size_t num_queued; + SpinLock queue_spinlock; + + ThreadMutex workers_mutex; + ThreadCondition workers_condition; + size_t workers_sleeping; - volatile bool do_exit; + uint8_t do_exit; }; typedef struct TaskThread { @@ -231,36 +239,33 @@ static void task_free(TaskPool *pool, Task *task, const int thread_id) static void task_pool_num_decrease(TaskPool *pool, size_t done) { - BLI_mutex_lock(&pool->num_mutex); - BLI_assert(pool->num >= done); - pool->num -= done; + atomic_sub_and_fetch_z(&pool->num, done); atomic_sub_and_fetch_z(&pool->currently_running_tasks, done); - if (pool->num == 0) - BLI_condition_notify_all(&pool->num_cond); - - BLI_mutex_unlock(&pool->num_mutex); + if (pool->num == 0 && pool->scheduler->workers_sleeping != 0) { + BLI_mutex_lock(&pool->scheduler->workers_mutex); + BLI_condition_notify_all(&pool->scheduler->workers_condition); + BLI_mutex_unlock(&pool->scheduler->workers_mutex); + } } static void task_pool_num_increase(TaskPool *pool) { - BLI_mutex_lock(&pool->num_mutex); - - pool->num++; - BLI_condition_notify_all(&pool->num_cond); + atomic_add_and_fetch_z(&pool->num, 1); - BLI_mutex_unlock(&pool->num_mutex); + if (pool->scheduler->workers_sleeping != 0) { + BLI_mutex_lock(&pool->scheduler->workers_mutex); + BLI_condition_notify_all(&pool->scheduler->workers_condition); + BLI_mutex_unlock(&pool->scheduler->workers_mutex); + } } static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task) { bool found_task = false; - BLI_mutex_lock(&scheduler->queue_mutex); - - while (!scheduler->queue.first && !scheduler->do_exit) - BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex); + int loop_count = 0; do { Task *current_task; @@ -276,38 +281,52 @@ static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task * So we only abort here if do_exit is set. */ if (scheduler->do_exit) { - BLI_mutex_unlock(&scheduler->queue_mutex); return false; } - for (current_task = scheduler->queue.first; - current_task != NULL; - current_task = current_task->next) - { - TaskPool *pool = current_task->pool; + if (scheduler->num_queued != 0) { + BLI_spin_lock(&scheduler->queue_spinlock); + for (current_task = scheduler->queue.first; + current_task != NULL; + current_task = current_task->next) + { + TaskPool *pool = current_task->pool; + + if (scheduler->background_thread_only && !pool->run_in_background) { + continue; + } - if (scheduler->background_thread_only && !pool->run_in_background) { - continue; + if (atomic_add_and_fetch_z(&pool->currently_running_tasks, 1) <= pool->num_threads || + pool->num_threads == 0) + { + *task = current_task; + found_task = true; + BLI_remlink(&scheduler->queue, *task); + atomic_sub_and_fetch_z(&scheduler->num_queued, 1); + break; + } + else { + atomic_sub_and_fetch_z(&pool->currently_running_tasks, 1); + } } + BLI_spin_unlock(&scheduler->queue_spinlock); + } - if (atomic_add_and_fetch_z(&pool->currently_running_tasks, 1) <= pool->num_threads || - pool->num_threads == 0) - { - *task = current_task; - found_task = true; - BLI_remlink(&scheduler->queue, *task); - break; + if (!found_task) { + if (++loop_count > NANOSLEEP_MAX_SPINNING) { + BLI_mutex_lock(&scheduler->workers_mutex); + atomic_add_and_fetch_z(&scheduler->workers_sleeping, 1); + BLI_condition_wait(&scheduler->workers_condition, &scheduler->workers_mutex); + atomic_sub_and_fetch_z(&scheduler->workers_sleeping, 1); + BLI_mutex_unlock(&scheduler->workers_mutex); } else { - atomic_sub_and_fetch_z(&pool->currently_running_tasks, 1); + nanosleep(NANOSLEEP_DURATION, NULL); } +// PIL_sleep_ms(1); } - if (!found_task) - BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex); } while (!found_task); - BLI_mutex_unlock(&scheduler->queue_mutex); - return true; } @@ -341,11 +360,13 @@ TaskScheduler *BLI_task_scheduler_create(int num_threads) /* multiple places can use this task scheduler, sharing the same * threads, so we keep track of the number of users. */ - scheduler->do_exit = false; + scheduler->do_exit = 0; BLI_listbase_clear(&scheduler->queue); - BLI_mutex_init(&scheduler->queue_mutex); - BLI_condition_init(&scheduler->queue_cond); + BLI_spin_init(&scheduler->queue_spinlock); + + BLI_mutex_init(&scheduler->workers_mutex); + BLI_condition_init(&scheduler->workers_condition); if (num_threads == 0) { /* automatic number of threads will be main thread + num cores */ @@ -391,10 +412,12 @@ void BLI_task_scheduler_free(TaskScheduler *scheduler) Task *task; /* stop all waiting threads */ - BLI_mutex_lock(&scheduler->queue_mutex); - scheduler->do_exit = true; - BLI_condition_notify_all(&scheduler->queue_cond); - BLI_mutex_unlock(&scheduler->queue_mutex); + atomic_fetch_and_or_uint8(&scheduler->do_exit, 1); + if (scheduler->workers_sleeping != 0) { + BLI_mutex_lock(&scheduler->workers_mutex); + BLI_condition_notify_all(&scheduler->workers_condition); + BLI_mutex_unlock(&scheduler->workers_mutex); + } /* delete threads */ if (scheduler->threads) { @@ -430,8 +453,10 @@ void BLI_task_scheduler_free(TaskScheduler *scheduler) BLI_freelistN(&scheduler->queue); /* delete mutex/condition */ - BLI_mutex_end(&scheduler->queue_mutex); - BLI_condition_end(&scheduler->queue_cond); + BLI_spin_end(&scheduler->queue_spinlock); + + BLI_mutex_end(&scheduler->workers_mutex); + BLI_condition_end(&scheduler->workers_condition); MEM_freeN(scheduler); } @@ -446,15 +471,16 @@ static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriori task_pool_num_increase(task->pool); /* add task to queue */ - BLI_mutex_lock(&scheduler->queue_mutex); + BLI_spin_lock(&scheduler->queue_spinlock); if (priority == TASK_PRIORITY_HIGH) BLI_addhead(&scheduler->queue, task); else BLI_addtail(&scheduler->queue, task); - BLI_condition_notify_one(&scheduler->queue_cond); - BLI_mutex_unlock(&scheduler->queue_mutex); + BLI_spin_unlock(&scheduler->queue_spinlock); + + atomic_add_and_fetch_z(&scheduler->num_queued, 1); } static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool) @@ -462,7 +488,7 @@ static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool) Task *task, *nexttask; size_t done = 0; - BLI_mutex_lock(&scheduler->queue_mutex); + BLI_spin_lock(&scheduler->queue_spinlock); /* free all tasks from this pool from the queue */ for (task = scheduler->queue.first; task; task = nexttask) { @@ -471,12 +497,13 @@ static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool) if (task->pool == pool) { task_data_free(task, 0); BLI_freelinkN(&scheduler->queue, task); + atomic_sub_and_fetch_z(&scheduler->num_queued, 1); done++; } } - BLI_mutex_unlock(&scheduler->queue_mutex); + BLI_spin_unlock(&scheduler->queue_spinlock); /* notify done */ task_pool_num_decrease(pool, done); @@ -507,9 +534,6 @@ static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, c pool->do_cancel = false; pool->run_in_background = is_background; - BLI_mutex_init(&pool->num_mutex); - BLI_condition_init(&pool->num_cond); - pool->userdata = userdata; BLI_mutex_init(&pool->user_mutex); @@ -567,9 +591,6 @@ void BLI_task_pool_free(TaskPool *pool) { BLI_task_pool_cancel(pool); - BLI_mutex_end(&pool->num_mutex); - BLI_condition_end(&pool->num_cond); - BLI_mutex_end(&pool->user_mutex); /* Free local memory pool, those pointers are lost forever. */ @@ -634,35 +655,31 @@ void BLI_task_pool_push_from_thread(TaskPool *pool, TaskRunFunction run, void BLI_task_pool_work_and_wait(TaskPool *pool) { TaskScheduler *scheduler = pool->scheduler; - - BLI_mutex_lock(&pool->num_mutex); + int loop_count = 0; while (pool->num != 0) { Task *task, *work_task = NULL; bool found_task = false; - BLI_mutex_unlock(&pool->num_mutex); - - BLI_mutex_lock(&scheduler->queue_mutex); - /* find task from this pool. if we get a task from another pool, * we can get into deadlock */ if (pool->num_threads == 0 || pool->currently_running_tasks < pool->num_threads) { + BLI_spin_lock(&scheduler->queue_spinlock); for (task = scheduler->queue.first; task; task = task->next) { if (task->pool == pool) { work_task = task; found_task = true; BLI_remlink(&scheduler->queue, task); + atomic_sub_and_fetch_z(&scheduler->num_queued, 1); break; } } + BLI_spin_unlock(&scheduler->queue_spinlock); } - BLI_mutex_unlock(&scheduler->queue_mutex); - /* if found task, do it, otherwise wait until other tasks are done */ if (found_task) { /* run task */ @@ -674,17 +691,27 @@ void BLI_task_pool_work_and_wait(TaskPool *pool) /* notify pool task was done */ task_pool_num_decrease(pool, 1); + + /* Reset the 'failde' counter to zero. */ + loop_count = 0; } - BLI_mutex_lock(&pool->num_mutex); if (pool->num == 0) break; - if (!found_task) - BLI_condition_wait(&pool->num_cond, &pool->num_mutex); + if (!found_task) { + if (++loop_count > NANOSLEEP_MAX_SPINNING) { + BLI_mutex_lock(&scheduler->workers_mutex); + atomic_add_and_fetch_z(&scheduler->workers_sleeping, 1); + BLI_condition_wait(&scheduler->workers_condition, &scheduler->workers_mutex); + atomic_sub_and_fetch_z(&scheduler->workers_sleeping, 1); + BLI_mutex_unlock(&scheduler->workers_mutex); + } + else { + nanosleep(NANOSLEEP_DURATION, NULL); + } + } } - - BLI_mutex_unlock(&pool->num_mutex); } void BLI_pool_set_num_threads(TaskPool *pool, int num_threads) @@ -700,10 +727,14 @@ void BLI_task_pool_cancel(TaskPool *pool) task_scheduler_clear(pool->scheduler, pool); /* wait until all entries are cleared */ - BLI_mutex_lock(&pool->num_mutex); - while (pool->num) - BLI_condition_wait(&pool->num_cond, &pool->num_mutex); - BLI_mutex_unlock(&pool->num_mutex); + while (pool->num) { + /* No real point in spinning here... */ + BLI_mutex_lock(&pool->scheduler->workers_mutex); + atomic_add_and_fetch_z(&pool->scheduler->workers_sleeping, 1); + BLI_condition_wait(&pool->scheduler->workers_condition, &pool->scheduler->workers_mutex); + atomic_sub_and_fetch_z(&pool->scheduler->workers_sleeping, 1); + BLI_mutex_unlock(&pool->scheduler->workers_mutex); + } pool->do_cancel = false; } |