Welcome to mirror list, hosted at ThFree Co, Russian Federation.

git.blender.org/blender.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBastien Montagne <montagne29@wanadoo.fr>2016-12-21 22:33:42 +0300
committerBastien Montagne <montagne29@wanadoo.fr>2017-03-03 19:42:33 +0300
commite0d486eb368dc1a0df2c4ab4a99f9bc29e2dd7db (patch)
tree8bb0e44d0fba63f7f9073f363110013ae26a69fe /source/blender
parent7b92b64742d671a6a858583928305a58739a037b (diff)
Attempt to address performances issues of task scheduler with lots of very small tasks.
This is partially based on Sergey's work from D2421, but pushing the things a bit further. Basically: - We keep a sheduler-counter of TODO tasks, which avoids us to do any locking (even of the spinlock) when queue is empty, in workers. - We spin/nanosleep a bit (less than a ms) when we cannot find a task, before going into real condition-waiting sleep. - We keep a counter of condition-sleeping threads, and only use condition notifications in case we know some are waiting on it. In other words, when no tasks are available, we spend a bit of time in a rather high-activity but very cheap and totally lock-free loop, before going into more expansive real condition-waiting sleep. No noticeable speedup in complex production scene (barbershop one), here master, D2421 and this code give roughly same performances (about 30% slower in new than in old despgraph). But with testfile from T50027 and new depsgraph, after initial bake, with master I have ~14fps, with D2421 ~14.5fps, and with this code ~19.5fps. Note that in theory, we could get completely rid of condition and stay in the nanosleep loop, but this implies some rather high 'noise' (about 3% of CPU usage here with 8 cores), and going into condition-waiting state after a few hundreds of micro-seconds does not give any measurable slow down for me. Also note that this code is only working on POSIX systems (so no Windows, not sure how to do our nanosleeps on this OS :/ ). Reviewers: sergey Differential Revision: https://developer.blender.org/D2426
Diffstat (limited to 'source/blender')
-rw-r--r--source/blender/blenlib/intern/task.c189
1 files changed, 110 insertions, 79 deletions
diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c
index 91e821a8f1a..21c02c31a9d 100644
--- a/source/blender/blenlib/intern/task.c
+++ b/source/blender/blenlib/intern/task.c
@@ -35,6 +35,8 @@
#include "BLI_task.h"
#include "BLI_threads.h"
+#include "PIL_time.h"
+
#include "atomic_ops.h"
/* Define this to enable some detailed statistic print. */
@@ -48,6 +50,10 @@
*/
#define MEMPOOL_SIZE 256
+/* Parameters controlling how much we spin in nanosleeps before switching to real condition-controlled sleeping. */
+#define NANOSLEEP_MAX_SPINNING 1000 /* Number of failed attempt to get a task before going to condition waiting. */
+#define NANOSLEEP_DURATION (const struct timespec[]){{0, 200L}} /* Nanosleep duration (in nano-seconds). */
+
typedef struct Task {
struct Task *next, *prev;
@@ -105,11 +111,9 @@ typedef struct TaskMemPoolStats {
struct TaskPool {
TaskScheduler *scheduler;
- volatile size_t num;
+ size_t num;
size_t num_threads;
size_t currently_running_tasks;
- ThreadMutex num_mutex;
- ThreadCondition num_cond;
void *userdata;
ThreadMutex user_mutex;
@@ -146,10 +150,14 @@ struct TaskScheduler {
bool background_thread_only;
ListBase queue;
- ThreadMutex queue_mutex;
- ThreadCondition queue_cond;
+ size_t num_queued;
+ SpinLock queue_spinlock;
+
+ ThreadMutex workers_mutex;
+ ThreadCondition workers_condition;
+ size_t workers_sleeping;
- volatile bool do_exit;
+ uint8_t do_exit;
};
typedef struct TaskThread {
@@ -231,36 +239,33 @@ static void task_free(TaskPool *pool, Task *task, const int thread_id)
static void task_pool_num_decrease(TaskPool *pool, size_t done)
{
- BLI_mutex_lock(&pool->num_mutex);
-
BLI_assert(pool->num >= done);
- pool->num -= done;
+ atomic_sub_and_fetch_z(&pool->num, done);
atomic_sub_and_fetch_z(&pool->currently_running_tasks, done);
- if (pool->num == 0)
- BLI_condition_notify_all(&pool->num_cond);
-
- BLI_mutex_unlock(&pool->num_mutex);
+ if (pool->num == 0 && pool->scheduler->workers_sleeping != 0) {
+ BLI_mutex_lock(&pool->scheduler->workers_mutex);
+ BLI_condition_notify_all(&pool->scheduler->workers_condition);
+ BLI_mutex_unlock(&pool->scheduler->workers_mutex);
+ }
}
static void task_pool_num_increase(TaskPool *pool)
{
- BLI_mutex_lock(&pool->num_mutex);
-
- pool->num++;
- BLI_condition_notify_all(&pool->num_cond);
+ atomic_add_and_fetch_z(&pool->num, 1);
- BLI_mutex_unlock(&pool->num_mutex);
+ if (pool->scheduler->workers_sleeping != 0) {
+ BLI_mutex_lock(&pool->scheduler->workers_mutex);
+ BLI_condition_notify_all(&pool->scheduler->workers_condition);
+ BLI_mutex_unlock(&pool->scheduler->workers_mutex);
+ }
}
static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task)
{
bool found_task = false;
- BLI_mutex_lock(&scheduler->queue_mutex);
-
- while (!scheduler->queue.first && !scheduler->do_exit)
- BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
+ int loop_count = 0;
do {
Task *current_task;
@@ -276,38 +281,52 @@ static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task
* So we only abort here if do_exit is set.
*/
if (scheduler->do_exit) {
- BLI_mutex_unlock(&scheduler->queue_mutex);
return false;
}
- for (current_task = scheduler->queue.first;
- current_task != NULL;
- current_task = current_task->next)
- {
- TaskPool *pool = current_task->pool;
+ if (scheduler->num_queued != 0) {
+ BLI_spin_lock(&scheduler->queue_spinlock);
+ for (current_task = scheduler->queue.first;
+ current_task != NULL;
+ current_task = current_task->next)
+ {
+ TaskPool *pool = current_task->pool;
+
+ if (scheduler->background_thread_only && !pool->run_in_background) {
+ continue;
+ }
- if (scheduler->background_thread_only && !pool->run_in_background) {
- continue;
+ if (atomic_add_and_fetch_z(&pool->currently_running_tasks, 1) <= pool->num_threads ||
+ pool->num_threads == 0)
+ {
+ *task = current_task;
+ found_task = true;
+ BLI_remlink(&scheduler->queue, *task);
+ atomic_sub_and_fetch_z(&scheduler->num_queued, 1);
+ break;
+ }
+ else {
+ atomic_sub_and_fetch_z(&pool->currently_running_tasks, 1);
+ }
}
+ BLI_spin_unlock(&scheduler->queue_spinlock);
+ }
- if (atomic_add_and_fetch_z(&pool->currently_running_tasks, 1) <= pool->num_threads ||
- pool->num_threads == 0)
- {
- *task = current_task;
- found_task = true;
- BLI_remlink(&scheduler->queue, *task);
- break;
+ if (!found_task) {
+ if (++loop_count > NANOSLEEP_MAX_SPINNING) {
+ BLI_mutex_lock(&scheduler->workers_mutex);
+ atomic_add_and_fetch_z(&scheduler->workers_sleeping, 1);
+ BLI_condition_wait(&scheduler->workers_condition, &scheduler->workers_mutex);
+ atomic_sub_and_fetch_z(&scheduler->workers_sleeping, 1);
+ BLI_mutex_unlock(&scheduler->workers_mutex);
}
else {
- atomic_sub_and_fetch_z(&pool->currently_running_tasks, 1);
+ nanosleep(NANOSLEEP_DURATION, NULL);
}
+// PIL_sleep_ms(1);
}
- if (!found_task)
- BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
} while (!found_task);
- BLI_mutex_unlock(&scheduler->queue_mutex);
-
return true;
}
@@ -341,11 +360,13 @@ TaskScheduler *BLI_task_scheduler_create(int num_threads)
/* multiple places can use this task scheduler, sharing the same
* threads, so we keep track of the number of users. */
- scheduler->do_exit = false;
+ scheduler->do_exit = 0;
BLI_listbase_clear(&scheduler->queue);
- BLI_mutex_init(&scheduler->queue_mutex);
- BLI_condition_init(&scheduler->queue_cond);
+ BLI_spin_init(&scheduler->queue_spinlock);
+
+ BLI_mutex_init(&scheduler->workers_mutex);
+ BLI_condition_init(&scheduler->workers_condition);
if (num_threads == 0) {
/* automatic number of threads will be main thread + num cores */
@@ -391,10 +412,12 @@ void BLI_task_scheduler_free(TaskScheduler *scheduler)
Task *task;
/* stop all waiting threads */
- BLI_mutex_lock(&scheduler->queue_mutex);
- scheduler->do_exit = true;
- BLI_condition_notify_all(&scheduler->queue_cond);
- BLI_mutex_unlock(&scheduler->queue_mutex);
+ atomic_fetch_and_or_uint8(&scheduler->do_exit, 1);
+ if (scheduler->workers_sleeping != 0) {
+ BLI_mutex_lock(&scheduler->workers_mutex);
+ BLI_condition_notify_all(&scheduler->workers_condition);
+ BLI_mutex_unlock(&scheduler->workers_mutex);
+ }
/* delete threads */
if (scheduler->threads) {
@@ -430,8 +453,10 @@ void BLI_task_scheduler_free(TaskScheduler *scheduler)
BLI_freelistN(&scheduler->queue);
/* delete mutex/condition */
- BLI_mutex_end(&scheduler->queue_mutex);
- BLI_condition_end(&scheduler->queue_cond);
+ BLI_spin_end(&scheduler->queue_spinlock);
+
+ BLI_mutex_end(&scheduler->workers_mutex);
+ BLI_condition_end(&scheduler->workers_condition);
MEM_freeN(scheduler);
}
@@ -446,15 +471,16 @@ static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriori
task_pool_num_increase(task->pool);
/* add task to queue */
- BLI_mutex_lock(&scheduler->queue_mutex);
+ BLI_spin_lock(&scheduler->queue_spinlock);
if (priority == TASK_PRIORITY_HIGH)
BLI_addhead(&scheduler->queue, task);
else
BLI_addtail(&scheduler->queue, task);
- BLI_condition_notify_one(&scheduler->queue_cond);
- BLI_mutex_unlock(&scheduler->queue_mutex);
+ BLI_spin_unlock(&scheduler->queue_spinlock);
+
+ atomic_add_and_fetch_z(&scheduler->num_queued, 1);
}
static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool)
@@ -462,7 +488,7 @@ static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool)
Task *task, *nexttask;
size_t done = 0;
- BLI_mutex_lock(&scheduler->queue_mutex);
+ BLI_spin_lock(&scheduler->queue_spinlock);
/* free all tasks from this pool from the queue */
for (task = scheduler->queue.first; task; task = nexttask) {
@@ -471,12 +497,13 @@ static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool)
if (task->pool == pool) {
task_data_free(task, 0);
BLI_freelinkN(&scheduler->queue, task);
+ atomic_sub_and_fetch_z(&scheduler->num_queued, 1);
done++;
}
}
- BLI_mutex_unlock(&scheduler->queue_mutex);
+ BLI_spin_unlock(&scheduler->queue_spinlock);
/* notify done */
task_pool_num_decrease(pool, done);
@@ -507,9 +534,6 @@ static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, c
pool->do_cancel = false;
pool->run_in_background = is_background;
- BLI_mutex_init(&pool->num_mutex);
- BLI_condition_init(&pool->num_cond);
-
pool->userdata = userdata;
BLI_mutex_init(&pool->user_mutex);
@@ -567,9 +591,6 @@ void BLI_task_pool_free(TaskPool *pool)
{
BLI_task_pool_cancel(pool);
- BLI_mutex_end(&pool->num_mutex);
- BLI_condition_end(&pool->num_cond);
-
BLI_mutex_end(&pool->user_mutex);
/* Free local memory pool, those pointers are lost forever. */
@@ -634,35 +655,31 @@ void BLI_task_pool_push_from_thread(TaskPool *pool, TaskRunFunction run,
void BLI_task_pool_work_and_wait(TaskPool *pool)
{
TaskScheduler *scheduler = pool->scheduler;
-
- BLI_mutex_lock(&pool->num_mutex);
+ int loop_count = 0;
while (pool->num != 0) {
Task *task, *work_task = NULL;
bool found_task = false;
- BLI_mutex_unlock(&pool->num_mutex);
-
- BLI_mutex_lock(&scheduler->queue_mutex);
-
/* find task from this pool. if we get a task from another pool,
* we can get into deadlock */
if (pool->num_threads == 0 ||
pool->currently_running_tasks < pool->num_threads)
{
+ BLI_spin_lock(&scheduler->queue_spinlock);
for (task = scheduler->queue.first; task; task = task->next) {
if (task->pool == pool) {
work_task = task;
found_task = true;
BLI_remlink(&scheduler->queue, task);
+ atomic_sub_and_fetch_z(&scheduler->num_queued, 1);
break;
}
}
+ BLI_spin_unlock(&scheduler->queue_spinlock);
}
- BLI_mutex_unlock(&scheduler->queue_mutex);
-
/* if found task, do it, otherwise wait until other tasks are done */
if (found_task) {
/* run task */
@@ -674,17 +691,27 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
/* notify pool task was done */
task_pool_num_decrease(pool, 1);
+
+ /* Reset the 'failde' counter to zero. */
+ loop_count = 0;
}
- BLI_mutex_lock(&pool->num_mutex);
if (pool->num == 0)
break;
- if (!found_task)
- BLI_condition_wait(&pool->num_cond, &pool->num_mutex);
+ if (!found_task) {
+ if (++loop_count > NANOSLEEP_MAX_SPINNING) {
+ BLI_mutex_lock(&scheduler->workers_mutex);
+ atomic_add_and_fetch_z(&scheduler->workers_sleeping, 1);
+ BLI_condition_wait(&scheduler->workers_condition, &scheduler->workers_mutex);
+ atomic_sub_and_fetch_z(&scheduler->workers_sleeping, 1);
+ BLI_mutex_unlock(&scheduler->workers_mutex);
+ }
+ else {
+ nanosleep(NANOSLEEP_DURATION, NULL);
+ }
+ }
}
-
- BLI_mutex_unlock(&pool->num_mutex);
}
void BLI_pool_set_num_threads(TaskPool *pool, int num_threads)
@@ -700,10 +727,14 @@ void BLI_task_pool_cancel(TaskPool *pool)
task_scheduler_clear(pool->scheduler, pool);
/* wait until all entries are cleared */
- BLI_mutex_lock(&pool->num_mutex);
- while (pool->num)
- BLI_condition_wait(&pool->num_cond, &pool->num_mutex);
- BLI_mutex_unlock(&pool->num_mutex);
+ while (pool->num) {
+ /* No real point in spinning here... */
+ BLI_mutex_lock(&pool->scheduler->workers_mutex);
+ atomic_add_and_fetch_z(&pool->scheduler->workers_sleeping, 1);
+ BLI_condition_wait(&pool->scheduler->workers_condition, &pool->scheduler->workers_mutex);
+ atomic_sub_and_fetch_z(&pool->scheduler->workers_sleeping, 1);
+ BLI_mutex_unlock(&pool->scheduler->workers_mutex);
+ }
pool->do_cancel = false;
}