Diffstat (limited to 'source/blender/blenlib/intern/task.c')
-rw-r--r--  source/blender/blenlib/intern/task.c | 189
1 file changed, 110 insertions(+), 79 deletions(-)
diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c
index 91e821a8f1a..21c02c31a9d 100644
--- a/source/blender/blenlib/intern/task.c
+++ b/source/blender/blenlib/intern/task.c
@@ -35,6 +35,8 @@
#include "BLI_task.h"
#include "BLI_threads.h"
+#include "PIL_time.h"
+
#include "atomic_ops.h"
/* Define this to enable some detailed statistic print. */
@@ -48,6 +50,10 @@
*/
#define MEMPOOL_SIZE 256
+/* Parameters controlling how long we spin on nanosleep() before switching to real condition-controlled sleeping. */
+#define NANOSLEEP_MAX_SPINNING 1000 /* Number of failed attempts to get a task before falling back to condition waiting. */
+#define NANOSLEEP_DURATION (const struct timespec[]){{0, 200L}} /* Nanosleep duration (in nanoseconds). */
+
typedef struct Task {
struct Task *next, *prev;
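These two constants drive a two-stage wait: a worker that fails to grab a task first naps for NANOSLEEP_DURATION, and only after NANOSLEEP_MAX_SPINNING consecutive misses does it block on a condition variable. A minimal standalone sketch of that pattern, using plain pthreads and hypothetical names rather than the BLI wrappers, could look like this:

/* Sketch only: spin-then-sleep wait with hypothetical names, plain pthreads. */
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <time.h>

#define SPIN_MAX 1000                               /* misses before blocking */
static const struct timespec spin_nap = {0, 200L};  /* 200ns nap per miss */

typedef struct Waiter {
	pthread_mutex_t mutex;
	pthread_cond_t cond;
	size_t sleepers;  /* threads currently blocked on 'cond' */
} Waiter;

static void wait_for_work(Waiter *w, bool (*try_get_work)(void *), void *arg)
{
	int misses = 0;
	while (!try_get_work(arg)) {
		if (++misses > SPIN_MAX) {
			/* Ran out of patience: block until a producer notifies us. */
			pthread_mutex_lock(&w->mutex);
			w->sleepers++;
			pthread_cond_wait(&w->cond, &w->mutex);
			w->sleepers--;
			pthread_mutex_unlock(&w->mutex);
		}
		else {
			/* Cheap wait: give up the CPU for a very short time. */
			nanosleep(&spin_nap, NULL);
		}
	}
}

The short nap keeps wake-up latency close to busy-waiting while still letting the OS schedule other threads; the condition variable only comes into play once a worker has clearly run out of work.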
@@ -105,11 +111,9 @@ typedef struct TaskMemPoolStats {
struct TaskPool {
TaskScheduler *scheduler;
- volatile size_t num;
+ size_t num;
size_t num_threads;
size_t currently_running_tasks;
- ThreadMutex num_mutex;
- ThreadCondition num_cond;
void *userdata;
ThreadMutex user_mutex;
@@ -146,10 +150,14 @@ struct TaskScheduler {
bool background_thread_only;
ListBase queue;
- ThreadMutex queue_mutex;
- ThreadCondition queue_cond;
+ size_t num_queued;
+ SpinLock queue_spinlock;
+
+ ThreadMutex workers_mutex;
+ ThreadCondition workers_condition;
+ size_t workers_sleeping;
- volatile bool do_exit;
+ uint8_t do_exit;
};
typedef struct TaskThread {
@@ -231,36 +239,33 @@ static void task_free(TaskPool *pool, Task *task, const int thread_id)
static void task_pool_num_decrease(TaskPool *pool, size_t done)
{
- BLI_mutex_lock(&pool->num_mutex);
-
BLI_assert(pool->num >= done);
- pool->num -= done;
+ atomic_sub_and_fetch_z(&pool->num, done);
atomic_sub_and_fetch_z(&pool->currently_running_tasks, done);
- if (pool->num == 0)
- BLI_condition_notify_all(&pool->num_cond);
-
- BLI_mutex_unlock(&pool->num_mutex);
+ if (pool->num == 0 && pool->scheduler->workers_sleeping != 0) {
+ BLI_mutex_lock(&pool->scheduler->workers_mutex);
+ BLI_condition_notify_all(&pool->scheduler->workers_condition);
+ BLI_mutex_unlock(&pool->scheduler->workers_mutex);
+ }
}
static void task_pool_num_increase(TaskPool *pool)
{
- BLI_mutex_lock(&pool->num_mutex);
-
- pool->num++;
- BLI_condition_notify_all(&pool->num_cond);
+ atomic_add_and_fetch_z(&pool->num, 1);
- BLI_mutex_unlock(&pool->num_mutex);
+ if (pool->scheduler->workers_sleeping != 0) {
+ BLI_mutex_lock(&pool->scheduler->workers_mutex);
+ BLI_condition_notify_all(&pool->scheduler->workers_condition);
+ BLI_mutex_unlock(&pool->scheduler->workers_mutex);
+ }
}
static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task)
{
bool found_task = false;
- BLI_mutex_lock(&scheduler->queue_mutex);
-
- while (!scheduler->queue.first && !scheduler->do_exit)
- BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
+ int loop_count = 0;
do {
Task *current_task;
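The notification side of the same pattern is visible in task_pool_num_increase()/task_pool_num_decrease() above: the counters themselves are updated with atomics, and the workers mutex is only taken when somebody might actually be asleep. Continuing the hypothetical pthreads sketch:

/* Producer-side counterpart of the sketch above (hypothetical names). */
static void notify_workers(Waiter *w)
{
	if (w->sleepers != 0) {
		/* Only pay for the mutex when a worker may be blocked. */
		pthread_mutex_lock(&w->mutex);
		pthread_cond_broadcast(&w->cond);
		pthread_mutex_unlock(&w->mutex);
	}
}

As in the patch, the sleeper count is read outside the mutex, so a worker that blocks right after a producer saw sleepers == 0 is only woken by a later notification.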
@@ -276,38 +281,52 @@ static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task)
* So we only abort here if do_exit is set.
*/
if (scheduler->do_exit) {
- BLI_mutex_unlock(&scheduler->queue_mutex);
return false;
}
- for (current_task = scheduler->queue.first;
- current_task != NULL;
- current_task = current_task->next)
- {
- TaskPool *pool = current_task->pool;
+ if (scheduler->num_queued != 0) {
+ BLI_spin_lock(&scheduler->queue_spinlock);
+ for (current_task = scheduler->queue.first;
+ current_task != NULL;
+ current_task = current_task->next)
+ {
+ TaskPool *pool = current_task->pool;
+
+ if (scheduler->background_thread_only && !pool->run_in_background) {
+ continue;
+ }
- if (scheduler->background_thread_only && !pool->run_in_background) {
- continue;
+ if (atomic_add_and_fetch_z(&pool->currently_running_tasks, 1) <= pool->num_threads ||
+ pool->num_threads == 0)
+ {
+ *task = current_task;
+ found_task = true;
+ BLI_remlink(&scheduler->queue, *task);
+ atomic_sub_and_fetch_z(&scheduler->num_queued, 1);
+ break;
+ }
+ else {
+ atomic_sub_and_fetch_z(&pool->currently_running_tasks, 1);
+ }
}
+ BLI_spin_unlock(&scheduler->queue_spinlock);
+ }
- if (atomic_add_and_fetch_z(&pool->currently_running_tasks, 1) <= pool->num_threads ||
- pool->num_threads == 0)
- {
- *task = current_task;
- found_task = true;
- BLI_remlink(&scheduler->queue, *task);
- break;
+ if (!found_task) {
+ if (++loop_count > NANOSLEEP_MAX_SPINNING) {
+ BLI_mutex_lock(&scheduler->workers_mutex);
+ atomic_add_and_fetch_z(&scheduler->workers_sleeping, 1);
+ BLI_condition_wait(&scheduler->workers_condition, &scheduler->workers_mutex);
+ atomic_sub_and_fetch_z(&scheduler->workers_sleeping, 1);
+ BLI_mutex_unlock(&scheduler->workers_mutex);
}
else {
- atomic_sub_and_fetch_z(&pool->currently_running_tasks, 1);
+ nanosleep(NANOSLEEP_DURATION, NULL);
}
+// PIL_sleep_ms(1);
}
- if (!found_task)
- BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
} while (!found_task);
- BLI_mutex_unlock(&scheduler->queue_mutex);
-
return true;
}
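The per-pool thread limit inside the loop above is enforced with an optimistic reservation: the running-task counter is bumped first and rolled back if the limit turned out to be exceeded. Isolated, and reusing the same atomic_ops.h primitives this patch already depends on, the check amounts to the following (hypothetical helper name):

#include <stdbool.h>
#include <stddef.h>
#include "atomic_ops.h"

/* Hypothetical helper: try to reserve one running-task slot for a pool.
 * A limit of 0 means "unlimited", matching the num_threads == 0 case above. */
static bool try_reserve_running_slot(size_t *currently_running, size_t limit)
{
	if (atomic_add_and_fetch_z(currently_running, 1) <= limit || limit == 0) {
		return true;  /* Slot reserved; the caller may take the task. */
	}
	/* Over the limit: undo the optimistic increment and skip this pool. */
	atomic_sub_and_fetch_z(currently_running, 1);
	return false;
}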
@@ -341,11 +360,13 @@ TaskScheduler *BLI_task_scheduler_create(int num_threads)
/* multiple places can use this task scheduler, sharing the same
* threads, so we keep track of the number of users. */
- scheduler->do_exit = false;
+ scheduler->do_exit = 0;
BLI_listbase_clear(&scheduler->queue);
- BLI_mutex_init(&scheduler->queue_mutex);
- BLI_condition_init(&scheduler->queue_cond);
+ BLI_spin_init(&scheduler->queue_spinlock);
+
+ BLI_mutex_init(&scheduler->workers_mutex);
+ BLI_condition_init(&scheduler->workers_condition);
if (num_threads == 0) {
/* automatic number of threads will be main thread + num cores */
@@ -391,10 +412,12 @@ void BLI_task_scheduler_free(TaskScheduler *scheduler)
Task *task;
/* stop all waiting threads */
- BLI_mutex_lock(&scheduler->queue_mutex);
- scheduler->do_exit = true;
- BLI_condition_notify_all(&scheduler->queue_cond);
- BLI_mutex_unlock(&scheduler->queue_mutex);
+ atomic_fetch_and_or_uint8(&scheduler->do_exit, 1);
+ if (scheduler->workers_sleeping != 0) {
+ BLI_mutex_lock(&scheduler->workers_mutex);
+ BLI_condition_notify_all(&scheduler->workers_condition);
+ BLI_mutex_unlock(&scheduler->workers_mutex);
+ }
/* delete threads */
if (scheduler->threads) {
@@ -430,8 +453,10 @@ void BLI_task_scheduler_free(TaskScheduler *scheduler)
BLI_freelistN(&scheduler->queue);
/* delete mutex/condition */
- BLI_mutex_end(&scheduler->queue_mutex);
- BLI_condition_end(&scheduler->queue_cond);
+ BLI_spin_end(&scheduler->queue_spinlock);
+
+ BLI_mutex_end(&scheduler->workers_mutex);
+ BLI_condition_end(&scheduler->workers_condition);
MEM_freeN(scheduler);
}
@@ -446,15 +471,16 @@ static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriority priority)
task_pool_num_increase(task->pool);
/* add task to queue */
- BLI_mutex_lock(&scheduler->queue_mutex);
+ BLI_spin_lock(&scheduler->queue_spinlock);
if (priority == TASK_PRIORITY_HIGH)
BLI_addhead(&scheduler->queue, task);
else
BLI_addtail(&scheduler->queue, task);
- BLI_condition_notify_one(&scheduler->queue_cond);
- BLI_mutex_unlock(&scheduler->queue_mutex);
+ BLI_spin_unlock(&scheduler->queue_spinlock);
+
+ atomic_add_and_fetch_z(&scheduler->num_queued, 1);
}
static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool)
@@ -462,7 +488,7 @@ static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool)
Task *task, *nexttask;
size_t done = 0;
- BLI_mutex_lock(&scheduler->queue_mutex);
+ BLI_spin_lock(&scheduler->queue_spinlock);
/* free all tasks from this pool from the queue */
for (task = scheduler->queue.first; task; task = nexttask) {
@@ -471,12 +497,13 @@ static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool)
if (task->pool == pool) {
task_data_free(task, 0);
BLI_freelinkN(&scheduler->queue, task);
+ atomic_sub_and_fetch_z(&scheduler->num_queued, 1);
done++;
}
}
- BLI_mutex_unlock(&scheduler->queue_mutex);
+ BLI_spin_unlock(&scheduler->queue_spinlock);
/* notify done */
task_pool_num_decrease(pool, done);
@@ -507,9 +534,6 @@ static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, const bool is_background)
pool->do_cancel = false;
pool->run_in_background = is_background;
- BLI_mutex_init(&pool->num_mutex);
- BLI_condition_init(&pool->num_cond);
-
pool->userdata = userdata;
BLI_mutex_init(&pool->user_mutex);
@@ -567,9 +591,6 @@ void BLI_task_pool_free(TaskPool *pool)
{
BLI_task_pool_cancel(pool);
- BLI_mutex_end(&pool->num_mutex);
- BLI_condition_end(&pool->num_cond);
-
BLI_mutex_end(&pool->user_mutex);
/* Free local memory pool, those pointers are lost forever. */
@@ -634,35 +655,31 @@ void BLI_task_pool_push_from_thread(TaskPool *pool, TaskRunFunction run,
void BLI_task_pool_work_and_wait(TaskPool *pool)
{
TaskScheduler *scheduler = pool->scheduler;
-
- BLI_mutex_lock(&pool->num_mutex);
+ int loop_count = 0;
while (pool->num != 0) {
Task *task, *work_task = NULL;
bool found_task = false;
- BLI_mutex_unlock(&pool->num_mutex);
-
- BLI_mutex_lock(&scheduler->queue_mutex);
-
/* find task from this pool. if we get a task from another pool,
* we can get into deadlock */
if (pool->num_threads == 0 ||
pool->currently_running_tasks < pool->num_threads)
{
+ BLI_spin_lock(&scheduler->queue_spinlock);
for (task = scheduler->queue.first; task; task = task->next) {
if (task->pool == pool) {
work_task = task;
found_task = true;
BLI_remlink(&scheduler->queue, task);
+ atomic_sub_and_fetch_z(&scheduler->num_queued, 1);
break;
}
}
+ BLI_spin_unlock(&scheduler->queue_spinlock);
}
- BLI_mutex_unlock(&scheduler->queue_mutex);
-
/* if found task, do it, otherwise wait until other tasks are done */
if (found_task) {
/* run task */
@@ -674,17 +691,27 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
/* notify pool task was done */
task_pool_num_decrease(pool, 1);
+
+ /* Reset the 'failed' counter to zero. */
+ loop_count = 0;
}
- BLI_mutex_lock(&pool->num_mutex);
if (pool->num == 0)
break;
- if (!found_task)
- BLI_condition_wait(&pool->num_cond, &pool->num_mutex);
+ if (!found_task) {
+ if (++loop_count > NANOSLEEP_MAX_SPINNING) {
+ BLI_mutex_lock(&scheduler->workers_mutex);
+ atomic_add_and_fetch_z(&scheduler->workers_sleeping, 1);
+ BLI_condition_wait(&scheduler->workers_condition, &scheduler->workers_mutex);
+ atomic_sub_and_fetch_z(&scheduler->workers_sleeping, 1);
+ BLI_mutex_unlock(&scheduler->workers_mutex);
+ }
+ else {
+ nanosleep(NANOSLEEP_DURATION, NULL);
+ }
+ }
}
-
- BLI_mutex_unlock(&pool->num_mutex);
}
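For context, a typical caller of this pool pushes a batch of tasks and then joins through BLI_task_pool_work_and_wait(), which is exactly the loop patched above. The sketch below assumes the BLI_task.h API of this Blender version (BLI_task_pool_create(), BLI_task_pool_push() and the TaskRunFunction callback shape); treat those signatures as assumptions, not as part of this diff:

/* Hypothetical usage example; signatures assumed from BLI_task.h of this era. */
static void square_value_task(TaskPool *pool, void *taskdata, int threadid)
{
	(void)pool;
	(void)threadid;
	int *value = taskdata;
	*value = *value * *value;
}

static void run_example(TaskScheduler *scheduler, int *values, int count)
{
	TaskPool *pool = BLI_task_pool_create(scheduler, NULL);
	int i;

	for (i = 0; i < count; i++) {
		BLI_task_pool_push(pool, square_value_task, &values[i], false, TASK_PRIORITY_LOW);
	}
	/* The calling thread helps with its own pool's tasks, then spins/sleeps
	 * until pool->num drops to zero. */
	BLI_task_pool_work_and_wait(pool);
	BLI_task_pool_free(pool);
}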
void BLI_pool_set_num_threads(TaskPool *pool, int num_threads)
@@ -700,10 +727,14 @@ void BLI_task_pool_cancel(TaskPool *pool)
task_scheduler_clear(pool->scheduler, pool);
/* wait until all entries are cleared */
- BLI_mutex_lock(&pool->num_mutex);
- while (pool->num)
- BLI_condition_wait(&pool->num_cond, &pool->num_mutex);
- BLI_mutex_unlock(&pool->num_mutex);
+ while (pool->num) {
+ /* No real point in spinning here... */
+ BLI_mutex_lock(&pool->scheduler->workers_mutex);
+ atomic_add_and_fetch_z(&pool->scheduler->workers_sleeping, 1);
+ BLI_condition_wait(&pool->scheduler->workers_condition, &pool->scheduler->workers_mutex);
+ atomic_sub_and_fetch_z(&pool->scheduler->workers_sleeping, 1);
+ BLI_mutex_unlock(&pool->scheduler->workers_mutex);
+ }
pool->do_cancel = false;
}