Welcome to mirror list, hosted at ThFree Co, Russian Federation.

git.blender.org/blender.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--source/blender/blenlib/intern/task.c76
1 files changed, 70 insertions, 6 deletions
diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c
index a7449f97bfc..4e73ff66338 100644
--- a/source/blender/blenlib/intern/task.c
+++ b/source/blender/blenlib/intern/task.c
@@ -48,6 +48,27 @@
*/
#define MEMPOOL_SIZE 256
+/* Number of tasks which are pushed directly to local thread queue.
+ *
+ * This allows thread to fetch next task without locking the whole queue.
+ */
+#define LOCALQUEUE_SIZE 1
+
+#ifndef NDEBUG
+# define ASSERT_THREAD_ID(scheduler, thread_id) \
+ do { \
+ if (!BLI_thread_is_main()) { \
+ TaskThread *thread = pthread_getspecific(scheduler->tls_id_key); \
+ BLI_assert(thread_id == thread->id); \
+ } \
+ else { \
+ BLI_assert(thread_id == 0); \
+ } \
+ } while (false)
+#else
+# define ASSERT_THREAD_ID(scheduler, thread_id)
+#endif
+
typedef struct Task {
struct Task *next, *prev;
@@ -104,6 +125,8 @@ typedef struct TaskMemPoolStats {
typedef struct TaskThreadLocalStorage {
TaskMemPool task_mempool;
+ int num_local_queue;
+ Task *local_queue[LOCALQUEUE_SIZE];
} TaskThreadLocalStorage;
struct TaskPool {
@@ -117,6 +140,7 @@ struct TaskPool {
ThreadMutex user_mutex;
volatile bool do_cancel;
+ volatile bool do_work;
/* If set, this pool may never be work_and_wait'ed, which means TaskScheduler
* has to use its special background fallback thread in case we are in
@@ -175,6 +199,9 @@ BLI_INLINE TaskThreadLocalStorage *get_task_tls(TaskPool *pool,
TaskScheduler *scheduler = pool->scheduler;
BLI_assert(thread_id >= 0);
BLI_assert(thread_id <= scheduler->num_threads);
+ if (thread_id == 0) {
+ return &scheduler->task_threads[pool->thread_id].tls;
+ }
return &scheduler->task_threads[thread_id].tls;
}
@@ -314,9 +341,28 @@ static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task
return true;
}
+BLI_INLINE void handle_local_queue(TaskThreadLocalStorage *tls,
+ const int thread_id)
+{
+ while (tls->num_local_queue > 0) {
+ /* We pop task from queue before handling it so handler of the task can
+ * push next job to the local queue.
+ */
+ tls->num_local_queue--;
+ Task *local_task = tls->local_queue[tls->num_local_queue];
+ /* TODO(sergey): Double-check work_and_wait() doesn't handle other's
+ * pool tasks.
+ */
+ TaskPool *local_pool = local_task->pool;
+ local_task->run(local_pool, local_task->taskdata, thread_id);
+ task_free(local_pool, local_task, thread_id);
+ }
+}
+
static void *task_scheduler_thread_run(void *thread_p)
{
TaskThread *thread = (TaskThread *) thread_p;
+ TaskThreadLocalStorage *tls = &thread->tls;
TaskScheduler *scheduler = thread->scheduler;
int thread_id = thread->id;
Task *task;
@@ -333,6 +379,9 @@ static void *task_scheduler_thread_run(void *thread_p)
/* delete task */
task_free(pool, task, thread_id);
+ /* Handle all tasks from local queue. */
+ handle_local_queue(tls, thread_id);
+
/* notify pool task was done */
task_pool_num_decrease(pool, 1);
}
@@ -506,6 +555,7 @@ static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, c
pool->scheduler = scheduler;
pool->num = 0;
pool->do_cancel = false;
+ pool->do_work = false;
pool->run_in_background = is_background;
BLI_mutex_init(&pool->num_mutex);
@@ -603,6 +653,19 @@ static void task_pool_push(
task->freedata = freedata;
task->pool = pool;
+ if (thread_id != -1 &&
+ (thread_id != pool->thread_id || pool->do_work))
+ {
+ ASSERT_THREAD_ID(pool->scheduler, thread_id);
+
+ TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
+ if (tls->num_local_queue < LOCALQUEUE_SIZE) {
+ tls->local_queue[tls->num_local_queue] = task;
+ tls->num_local_queue++;
+ return;
+ }
+ }
+
task_scheduler_push(pool->scheduler, task, priority);
}
@@ -627,14 +690,12 @@ void BLI_task_pool_push_from_thread(TaskPool *pool, TaskRunFunction run,
void BLI_task_pool_work_and_wait(TaskPool *pool)
{
+ TaskThreadLocalStorage *tls = get_task_tls(pool, pool->thread_id);
TaskScheduler *scheduler = pool->scheduler;
-#ifndef NDEBUG
- if (!BLI_thread_is_main()) {
- TaskThread *thread = pthread_getspecific(scheduler->tls_id_key);
- BLI_assert(pool->thread_id == thread->id);
- }
-#endif
+ pool->do_work = true;
+
+ ASSERT_THREAD_ID(pool->scheduler, pool->thread_id);
BLI_mutex_lock(&pool->num_mutex);
@@ -668,6 +729,9 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
/* delete task */
task_free(pool, task, pool->thread_id);
+ /* Handle all tasks from local queue. */
+ handle_local_queue(tls, pool->thread_id);
+
/* notify pool task was done */
task_pool_num_decrease(pool, 1);
}