author     Sergey Sharybin <sergey.vfx@gmail.com>  2017-03-06 17:40:05 +0300
committer  Sergey Sharybin <sergey.vfx@gmail.com>  2017-03-07 19:32:01 +0300
commit     55c2cd85f0bcf39820013c6ebba1d375d078323d (patch)
tree       e6b04d9ff73aea256f434447d8f683773adfe783  /source/blender/blenlib/intern/task.c
parent     2f722f1a4966f4eae695521bbcf3d5b2bf02a8dd (diff)
Task scheduler: Initial implementation of local task queues
The idea is to allow a certain number of tasks to be pushed from a worker thread to its own local queue, so the thread can pick up more work without locking the whole scheduler queue. This should allow us to remove some hacks from the depsgraph which were added there to keep threads alive.
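As an editorial illustration of that scheme, here is a minimal standalone sketch (not Blender's code: the LocalQueue type and both helper names are hypothetical). The whole trick is a tiny per-thread array that is pushed to and drained without taking any lock:

#include <stdbool.h>
#include <stdio.h>

#define LOCALQUEUE_SIZE 1

typedef struct Task {
	void (*run)(void *taskdata);
	void *taskdata;
} Task;

typedef struct LocalQueue {
	int num;
	Task *tasks[LOCALQUEUE_SIZE];
} LocalQueue;

/* Keep the task on the current thread when there is room; the caller falls
 * back to the global, mutex-protected scheduler queue when this returns false. */
static bool local_queue_push(LocalQueue *queue, Task *task)
{
	if (queue->num < LOCALQUEUE_SIZE) {
		queue->tasks[queue->num++] = task;
		return true;
	}
	return false;
}

/* Drain the local queue without touching any mutex. The task is popped
 * before it runs, so its handler may push a follow-up task back here. */
static void local_queue_drain(LocalQueue *queue)
{
	while (queue->num > 0) {
		Task *task = queue->tasks[--queue->num];
		task->run(task->taskdata);
	}
}

static void print_task(void *taskdata)
{
	printf("running: %s\n", (const char *)taskdata);
}

int main(void)
{
	LocalQueue queue = {0};
	Task task = {print_task, "hello from the local queue"};
	if (local_queue_push(&queue, &task)) {
		local_queue_drain(&queue);  /* No locking involved on this path. */
	}
	return 0;
}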
Diffstat (limited to 'source/blender/blenlib/intern/task.c')
-rw-r--r--  source/blender/blenlib/intern/task.c  76
1 file changed, 70 insertions(+), 6 deletions(-)
diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c
index a7449f97bfc..4e73ff66338 100644
--- a/source/blender/blenlib/intern/task.c
+++ b/source/blender/blenlib/intern/task.c
@@ -48,6 +48,27 @@
  */
 #define MEMPOOL_SIZE 256
 
+/* Number of tasks which are pushed directly to the local thread queue.
+ *
+ * This allows a thread to fetch its next task without locking the whole queue.
+ */
+#define LOCALQUEUE_SIZE 1
+
+#ifndef NDEBUG
+#  define ASSERT_THREAD_ID(scheduler, thread_id) \
+	do { \
+		if (!BLI_thread_is_main()) { \
+			TaskThread *thread = pthread_getspecific(scheduler->tls_id_key); \
+			BLI_assert(thread_id == thread->id); \
+		} \
+		else { \
+			BLI_assert(thread_id == 0); \
+		} \
+	} while (false)
+#else
+#  define ASSERT_THREAD_ID(scheduler, thread_id)
+#endif
+
 typedef struct Task {
 	struct Task *next, *prev;
@@ -104,6 +125,8 @@ typedef struct TaskMemPoolStats {
 
 typedef struct TaskThreadLocalStorage {
 	TaskMemPool task_mempool;
+	int num_local_queue;
+	Task *local_queue[LOCALQUEUE_SIZE];
 } TaskThreadLocalStorage;
 
 struct TaskPool {
@@ -117,6 +140,7 @@ struct TaskPool {
 	ThreadMutex user_mutex;
 
 	volatile bool do_cancel;
+	volatile bool do_work;
 
 	/* If set, this pool may never be work_and_wait'ed, which means TaskScheduler
 	 * has to use its special background fallback thread in case we are in
@@ -175,6 +199,9 @@ BLI_INLINE TaskThreadLocalStorage *get_task_tls(TaskPool *pool,
 	TaskScheduler *scheduler = pool->scheduler;
 	BLI_assert(thread_id >= 0);
 	BLI_assert(thread_id <= scheduler->num_threads);
+	if (thread_id == 0) {
+		return &scheduler->task_threads[pool->thread_id].tls;
+	}
 	return &scheduler->task_threads[thread_id].tls;
 }
@@ -314,9 +341,28 @@ static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task
 	return true;
 }
 
+BLI_INLINE void handle_local_queue(TaskThreadLocalStorage *tls,
+                                   const int thread_id)
+{
+	while (tls->num_local_queue > 0) {
+		/* We pop the task from the queue before handling it, so the task's
+		 * handler can push a follow-up job to the local queue.
+		 */
+		tls->num_local_queue--;
+		Task *local_task = tls->local_queue[tls->num_local_queue];
+		/* TODO(sergey): Double-check that work_and_wait() doesn't handle
+		 * another pool's tasks.
+		 */
+		TaskPool *local_pool = local_task->pool;
+		local_task->run(local_pool, local_task->taskdata, thread_id);
+		task_free(local_pool, local_task, thread_id);
+	}
+}
+
 static void *task_scheduler_thread_run(void *thread_p)
 {
 	TaskThread *thread = (TaskThread *) thread_p;
+	TaskThreadLocalStorage *tls = &thread->tls;
 	TaskScheduler *scheduler = thread->scheduler;
 	int thread_id = thread->id;
 	Task *task;
@@ -333,6 +379,9 @@ static void *task_scheduler_thread_run(void *thread_p)
 		/* delete task */
 		task_free(pool, task, thread_id);
 
+		/* Handle all tasks from the local queue. */
+		handle_local_queue(tls, thread_id);
+
 		/* notify pool task was done */
 		task_pool_num_decrease(pool, 1);
 	}
@@ -506,6 +555,7 @@ static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, c
 	pool->scheduler = scheduler;
 	pool->num = 0;
 	pool->do_cancel = false;
+	pool->do_work = false;
 	pool->run_in_background = is_background;
 
 	BLI_mutex_init(&pool->num_mutex);
@@ -603,6 +653,19 @@ static void task_pool_push(
 	task->freedata = freedata;
 	task->pool = pool;
 
+	if (thread_id != -1 &&
+	    (thread_id != pool->thread_id || pool->do_work))
+	{
+		ASSERT_THREAD_ID(pool->scheduler, thread_id);
+
+		TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
+		if (tls->num_local_queue < LOCALQUEUE_SIZE) {
+			tls->local_queue[tls->num_local_queue] = task;
+			tls->num_local_queue++;
+			return;
+		}
+	}
+
 	task_scheduler_push(pool->scheduler, task, priority);
 }
@@ -627,14 +690,12 @@ void BLI_task_pool_push_from_thread(TaskPool *pool, TaskRunFunction run,
 
 void BLI_task_pool_work_and_wait(TaskPool *pool)
 {
+	TaskThreadLocalStorage *tls = get_task_tls(pool, pool->thread_id);
 	TaskScheduler *scheduler = pool->scheduler;
 
-#ifndef NDEBUG
-	if (!BLI_thread_is_main()) {
-		TaskThread *thread = pthread_getspecific(scheduler->tls_id_key);
-		BLI_assert(pool->thread_id == thread->id);
-	}
-#endif
+	pool->do_work = true;
+
+	ASSERT_THREAD_ID(pool->scheduler, pool->thread_id);
 
 	BLI_mutex_lock(&pool->num_mutex);
@@ -668,6 +729,9 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
 			/* delete task */
 			task_free(pool, task, pool->thread_id);
 
+			/* Handle all tasks from the local queue. */
+			handle_local_queue(tls, pool->thread_id);
+
 			/* notify pool task was done */
 			task_pool_num_decrease(pool, 1);
 		}
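
For illustration only, a sketch of how a task handler might chain follow-up work so it benefits from the new fast path. The Node type and its processing are hypothetical, and the full BLI_task_pool_push_from_thread() parameter list is assumed from the Blender 2.7x BLI_task.h rather than shown in this diff:

#include <stddef.h>
#include "BLI_task.h"

typedef struct Node { struct Node *child; } Node;  /* Hypothetical payload. */

static void node_task(TaskPool *pool, void *taskdata, int thread_id)
{
	Node *node = (Node *)taskdata;
	/* ... process node ... */
	if (node->child != NULL) {
		/* Passing our own thread_id (instead of -1) routes the new task
		 * through the fast path in task_pool_push(): it lands on this
		 * thread's local queue when there is room, so no scheduler mutex
		 * is taken and the thread stays busy. */
		BLI_task_pool_push_from_thread(pool, node_task, node->child, false,
		                               TASK_PRIORITY_HIGH, thread_id);
	}
}

With LOCALQUEUE_SIZE of 1 only one such follow-up task can be parked locally at a time; anything beyond that spills over to the shared queue, which is what keeps work visible to idle threads.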