Welcome to mirror list, hosted at ThFree Co, Russian Federation.

git.blender.org/blender.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergey Sharybin <sergey.vfx@gmail.com>2017-05-31 16:24:09 +0300
committerSergey Sharybin <sergey.vfx@gmail.com>2017-05-31 16:44:08 +0300
commita481908232ef20449e6ad6951769677e0b108ca8 (patch)
tree596743c09a8f1572c8ef58d07eb165bdb72377b6
parent2ae697393664dd94cc74ce2dfd7c691e8483cba9 (diff)
Task scheduler: Optimize subsequent pushing bunch of tasks
The idea is to accumulate all new tasks in a thread local queue first without doing any thread synchronization (aka, locks and conditional variables) and move those tasks to a scheduler queue once they are all ready. This way we avoid per-task-pool lock and only have one lock per bunch of tasks. This is particularly handy when scheduling new dependency graph node children. Brings FPS of cached simulation from the linked below file from ~30 to ~50. See documentation for BLI_task_pool_delayed_push_{begin, end} and for TaskThreadLocalStorage::do_delayed_push. Fixes T50027: Rigidbody playback and simulation performance regression with new depsgraph Thanks Bastien for the review!
-rw-r--r--source/blender/blenlib/BLI_task.h7
-rw-r--r--source/blender/blenlib/intern/task.c114
-rw-r--r--source/blender/depsgraph/intern/eval/deg_eval.cc2
3 files changed, 115 insertions, 8 deletions
diff --git a/source/blender/blenlib/BLI_task.h b/source/blender/blenlib/BLI_task.h
index c3c587275e1..721327d26a8 100644
--- a/source/blender/blenlib/BLI_task.h
+++ b/source/blender/blenlib/BLI_task.h
@@ -106,6 +106,13 @@ void *BLI_task_pool_userdata(TaskPool *pool);
/* optional mutex to use from run function */
ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool);
+/* Delayed push, use that to reduce thread overhead by accumulating
+ * all new tasks into local queue first and pushing it to scheduler
+ * from within a single mutex lock.
+ */
+void BLI_task_pool_delayed_push_begin(TaskPool *pool, int thread_id);
+void BLI_task_pool_delayed_push_end(TaskPool *pool, int thread_id);
+
/* Parallel for routines */
typedef void (*TaskParallelRangeFunc)(void *userdata, const int iter);
typedef void (*TaskParallelRangeFuncEx)(void *userdata, void *userdata_chunk, const int iter, const int thread_id);
diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c
index 3766a72319b..a1eae8f1955 100644
--- a/source/blender/blenlib/intern/task.c
+++ b/source/blender/blenlib/intern/task.c
@@ -54,6 +54,13 @@
*/
#define LOCAL_QUEUE_SIZE 1
+/* Number of tasks which are allowed to be scheduled in a delayed manner.
+ *
+ * This allows to use less locks per graph node children schedule. More details
+ * could be found at TaskThreadLocalStorage::do_delayed_push.
+ */
+#define DELAYED_QUEUE_SIZE 4096
+
#ifndef NDEBUG
# define ASSERT_THREAD_ID(scheduler, thread_id) \
do { \
@@ -129,9 +136,28 @@ typedef struct TaskMemPoolStats {
#endif
typedef struct TaskThreadLocalStorage {
+ /* Memory pool for faster task allocation.
+ * The idea is to re-use memory of finished/discarded tasks by this thread.
+ */
TaskMemPool task_mempool;
+
+ /* Local queue keeps thread alive by keeping small amount of tasks ready
+ * to be picked up without causing global thread locks for synchronization.
+ */
int num_local_queue;
Task *local_queue[LOCAL_QUEUE_SIZE];
+
+ /* Thread can be marked for delayed tasks push. This is helpful when it's
+ * know that lots of subsequent task pushed will happen from the same thread
+ * without "interrupting" for task execution.
+ *
+ * We try to accumulate as much tasks as possible in a local queue without
+ * any locks first, and then we push all of them into a scheduler's queue
+ * from within a single mutex lock.
+ */
+ bool do_delayed_push;
+ int num_delayed_queue;
+ Task *delayed_queue[DELAYED_QUEUE_SIZE];
} TaskThreadLocalStorage;
struct TaskPool {
@@ -378,6 +404,7 @@ static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task
BLI_INLINE void handle_local_queue(TaskThreadLocalStorage *tls,
const int thread_id)
{
+ BLI_assert(!tls->do_delayed_push);
while (tls->num_local_queue > 0) {
/* We pop task from queue before handling it so handler of the task can
* push next job to the local queue.
@@ -391,6 +418,7 @@ BLI_INLINE void handle_local_queue(TaskThreadLocalStorage *tls,
local_task->run(local_pool, local_task->taskdata, thread_id);
task_free(local_pool, local_task, thread_id);
}
+ BLI_assert(!tls->do_delayed_push);
}
static void *task_scheduler_thread_run(void *thread_p)
@@ -408,7 +436,9 @@ static void *task_scheduler_thread_run(void *thread_p)
TaskPool *pool = task->pool;
/* run task */
+ BLI_assert(!tls->do_delayed_push);
task->run(pool, task->taskdata, thread_id);
+ BLI_assert(!tls->do_delayed_push);
/* delete task */
task_free(pool, task, thread_id);
@@ -547,6 +577,27 @@ static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriori
BLI_mutex_unlock(&scheduler->queue_mutex);
}
+static void task_scheduler_push_all(TaskScheduler *scheduler,
+ TaskPool *pool,
+ Task **tasks,
+ int num_tasks)
+{
+ if (num_tasks == 0) {
+ return;
+ }
+
+ task_pool_num_increase(pool, num_tasks);
+
+ BLI_mutex_lock(&scheduler->queue_mutex);
+
+ for (int i = 0; i < num_tasks; i++) {
+ BLI_addhead(&scheduler->queue, tasks[i]);
+ }
+
+ BLI_condition_notify_all(&scheduler->queue_cond);
+ BLI_mutex_unlock(&scheduler->queue_mutex);
+}
+
static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool)
{
Task *task, *nexttask;
@@ -714,38 +765,59 @@ void BLI_task_pool_free(TaskPool *pool)
BLI_end_threaded_malloc();
}
+BLI_INLINE bool task_can_use_local_queues(TaskPool *pool, int thread_id)
+{
+ return (thread_id != -1 && (thread_id != pool->thread_id || pool->do_work));
+}
+
static void task_pool_push(
TaskPool *pool, TaskRunFunction run, void *taskdata,
bool free_taskdata, TaskFreeFunction freedata, TaskPriority priority,
int thread_id)
{
+ /* Allocate task and fill it's properties. */
Task *task = task_alloc(pool, thread_id);
-
task->run = run;
task->taskdata = taskdata;
task->free_taskdata = free_taskdata;
task->freedata = freedata;
task->pool = pool;
-
+ /* For suspended pools we put everything yo a global queue first
+ * and exit as soon as possible.
+ *
+ * This tasks will be moved to actual execution when pool is
+ * activated by work_and_wait().
+ */
if (pool->is_suspended) {
BLI_addhead(&pool->suspended_queue, task);
atomic_fetch_and_add_z(&pool->num_suspended, 1);
return;
}
-
- if (thread_id != -1 &&
- (thread_id != pool->thread_id || pool->do_work))
- {
+ /* Populate to any local queue first, this is cheapest push ever. */
+ if (task_can_use_local_queues(pool, thread_id)) {
ASSERT_THREAD_ID(pool->scheduler, thread_id);
-
TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
+ /* Try to push to a local execution queue.
+ * These tasks will be picked up next.
+ */
if (tls->num_local_queue < LOCAL_QUEUE_SIZE) {
tls->local_queue[tls->num_local_queue] = task;
tls->num_local_queue++;
return;
}
+ /* If we are in the delayed tasks push mode, we push tasks to a
+ * temporary local queue first without any locks, and then move them
+ * to global execution queue with a single lock.
+ */
+ if (tls->do_delayed_push && tls->num_delayed_queue < DELAYED_QUEUE_SIZE) {
+ tls->delayed_queue[tls->num_delayed_queue] = task;
+ tls->num_delayed_queue++;
+ return;
+ }
}
-
+ /* Do push to a global execution ppol, slowest possible method,
+ * causes quite reasonable amount of threading overhead.
+ */
task_scheduler_push(pool->scheduler, task, priority);
}
@@ -816,7 +888,9 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
/* if found task, do it, otherwise wait until other tasks are done */
if (found_task) {
/* run task */
+ BLI_assert(!tls->do_delayed_push);
work_task->run(pool, work_task->taskdata, pool->thread_id);
+ BLI_assert(!tls->do_delayed_push);
/* delete task */
task_free(pool, task, pool->thread_id);
@@ -871,6 +945,30 @@ ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool)
return &pool->user_mutex;
}
+void BLI_task_pool_delayed_push_begin(TaskPool *pool, int thread_id)
+{
+ if (task_can_use_local_queues(pool, thread_id)) {
+ ASSERT_THREAD_ID(pool->scheduler, thread_id);
+ TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
+ tls->do_delayed_push = true;
+ }
+}
+
+void BLI_task_pool_delayed_push_end(TaskPool *pool, int thread_id)
+{
+ if (task_can_use_local_queues(pool, thread_id)) {
+ ASSERT_THREAD_ID(pool->scheduler, thread_id);
+ TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
+ BLI_assert(tls->do_delayed_push);
+ task_scheduler_push_all(pool->scheduler,
+ pool,
+ tls->delayed_queue,
+ tls->num_delayed_queue);
+ tls->do_delayed_push = false;
+ tls->num_delayed_queue = 0;
+ }
+}
+
/* Parallel range routines */
/**
diff --git a/source/blender/depsgraph/intern/eval/deg_eval.cc b/source/blender/depsgraph/intern/eval/deg_eval.cc
index e739bc9dbb5..54947ddbb5e 100644
--- a/source/blender/depsgraph/intern/eval/deg_eval.cc
+++ b/source/blender/depsgraph/intern/eval/deg_eval.cc
@@ -126,7 +126,9 @@ static void deg_task_run_func(TaskPool *pool,
#endif
}
+ BLI_task_pool_delayed_push_begin(pool, thread_id);
schedule_children(pool, state->graph, node, state->layers, thread_id);
+ BLI_task_pool_delayed_push_end(pool, thread_id);
}
typedef struct CalculatePengindData {