diff options
Diffstat (limited to 'source/blender/blenlib')
-rw-r--r-- | source/blender/blenlib/BLI_task.h | 108 | ||||
-rw-r--r-- | source/blender/blenlib/BLI_threads.h | 32 | ||||
-rw-r--r-- | source/blender/blenlib/CMakeLists.txt | 2 | ||||
-rw-r--r-- | source/blender/blenlib/intern/task.c | 424 | ||||
-rw-r--r-- | source/blender/blenlib/intern/threads.c | 114 |
5 files changed, 583 insertions, 97 deletions
diff --git a/source/blender/blenlib/BLI_task.h b/source/blender/blenlib/BLI_task.h new file mode 100644 index 00000000000..f57d42858c7 --- /dev/null +++ b/source/blender/blenlib/BLI_task.h @@ -0,0 +1,108 @@ +/* + * ***** BEGIN GPL LICENSE BLOCK ***** + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + * ***** END GPL LICENSE BLOCK ***** + */ + +#ifndef __BLI_TASK_H__ +#define __BLI_TASK_H__ + +/** \file BLI_task.h + * \ingroup bli + */ + +#ifdef __cplusplus +extern "C" { +#endif + +#include "BLI_threads.h" +#include "BLI_utildefines.h" + +/* Task Scheduler + * + * Central scheduler that holds running threads ready to execute tasks. A single + * queue holds the task from all pools. + * + * Init/exit must be called before/after any task pools are created/freed, and + * must be called from the main threads. All other scheduler and pool functions + * are thread-safe. */ + +typedef struct TaskScheduler TaskScheduler; + +enum { + TASK_SCHEDULER_AUTO_THREADS = 0, + TASK_SCHEDULER_SINGLE_THREAD = 1 +}; + +TaskScheduler *BLI_task_scheduler_create(int num_threads); +void BLI_task_scheduler_free(TaskScheduler *scheduler); + +int BLI_task_scheduler_num_threads(TaskScheduler *scheduler); + +/* Task Pool + * + * Pool of tasks that will be executed by the central TaskScheduler. For each + * pool, we can wait for all tasks to be done, or cancel them before they are + * done. + * + * Running tasks may spawn new tasks. + * + * Pools may be nested, i.e. a thread running a task can create another task + * pool with smaller tasks. When other threads are busy they will continue + * working on their own tasks, if not they will join in, no new threads will + * be launched. + */ + +typedef enum TaskPriority { + TASK_PRIORITY_LOW, + TASK_PRIORITY_HIGH +} TaskPriority; + +typedef struct TaskPool TaskPool; +typedef void (*TaskRunFunction)(TaskPool *pool, void *taskdata, int threadid); + +TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata); +void BLI_task_pool_free(TaskPool *pool); + +void BLI_task_pool_push(TaskPool *pool, TaskRunFunction run, + void *taskdata, bool free_taskdata, TaskPriority priority); + +/* work and wait until all tasks are done */ +void BLI_task_pool_work_and_wait(TaskPool *pool); +/* cancel all tasks, keep worker threads running */ +void BLI_task_pool_cancel(TaskPool *pool); +/* stop all worker threads */ +void BLI_task_pool_stop(TaskPool *pool); + +/* for worker threads, test if cancelled */ +bool BLI_task_pool_cancelled(TaskPool *pool); + +/* optional userdata pointer to pass along to run function */ +void *BLI_task_pool_userdata(TaskPool *pool); + +/* optional mutex to use from run function */ +ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool); + +/* number of tasks done, for stats, don't use this to make decisions */ +size_t BLI_task_pool_tasks_done(TaskPool *pool); + +#ifdef __cplusplus +} +#endif + +#endif + diff --git a/source/blender/blenlib/BLI_threads.h b/source/blender/blenlib/BLI_threads.h index 38ad0827246..7a3ee1dd0f4 100644 --- a/source/blender/blenlib/BLI_threads.h +++ b/source/blender/blenlib/BLI_threads.h @@ -45,6 +45,7 @@ extern "C" { #define BLENDER_MAX_THREADS 64 struct ListBase; +struct TaskScheduler; /* Threading API */ @@ -52,6 +53,8 @@ struct ListBase; void BLI_threadapi_init(void); void BLI_threadapi_exit(void); +struct TaskScheduler *BLI_task_scheduler_get(void); + void BLI_init_threads(struct ListBase *threadbase, void *(*do_thread)(void *), int tot); int BLI_available_threads(struct ListBase *threadbase); int BLI_available_thread_index(struct ListBase *threadbase); @@ -101,6 +104,7 @@ ThreadMutex *BLI_mutex_alloc(void); void BLI_mutex_free(ThreadMutex *mutex); void BLI_mutex_lock(ThreadMutex *mutex); +bool BLI_mutex_trylock(ThreadMutex *mutex); void BLI_mutex_unlock(ThreadMutex *mutex); /* Spin Lock */ @@ -144,27 +148,15 @@ void BLI_ticket_mutex_free(TicketMutex *ticket); void BLI_ticket_mutex_lock(TicketMutex *ticket); void BLI_ticket_mutex_unlock(TicketMutex *ticket); -/* ThreadedWorker - * - * A simple tool for dispatching work to a limited number of threads - * in a transparent fashion from the caller's perspective. */ - -struct ThreadedWorker; - -/* Create a new worker supporting tot parallel threads. - * When new work in inserted and all threads are busy, sleep(sleep_time) before checking again - */ -struct ThreadedWorker *BLI_create_worker(void *(*do_thread)(void *), int tot, int sleep_time); - -/* join all working threads */ -void BLI_end_worker(struct ThreadedWorker *worker); - -/* also ends all working threads */ -void BLI_destroy_worker(struct ThreadedWorker *worker); +/* Condition */ + +typedef pthread_cond_t ThreadCondition; -/* Spawns a new work thread if possible, sleeps until one is available otherwise - * NOTE: inserting work is NOT thread safe, so make sure it is only done from one thread */ -void BLI_insert_work(struct ThreadedWorker *worker, void *param); +void BLI_condition_init(ThreadCondition *cond); +void BLI_condition_wait(ThreadCondition *cond, ThreadMutex *mutex); +void BLI_condition_notify_one(ThreadCondition *cond); +void BLI_condition_notify_all(ThreadCondition *cond); +void BLI_condition_end(ThreadCondition *cond); /* ThreadWorkQueue * diff --git a/source/blender/blenlib/CMakeLists.txt b/source/blender/blenlib/CMakeLists.txt index 65ba545ef13..d855d45760a 100644 --- a/source/blender/blenlib/CMakeLists.txt +++ b/source/blender/blenlib/CMakeLists.txt @@ -94,6 +94,7 @@ set(SRC intern/string.c intern/string_cursor_utf8.c intern/string_utf8.c + intern/task.c intern/threads.c intern/time.c intern/uvproject.c @@ -160,6 +161,7 @@ set(SRC BLI_string_cursor_utf8.h BLI_string_utf8.h BLI_sys_types.h + BLI_task.h BLI_threads.h BLI_utildefines.h BLI_uvproject.h diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c new file mode 100644 index 00000000000..7fa108b906f --- /dev/null +++ b/source/blender/blenlib/intern/task.c @@ -0,0 +1,424 @@ +/* + * ***** BEGIN GPL LICENSE BLOCK ***** + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + * ***** END GPL LICENSE BLOCK ***** + */ + +#include <stdlib.h> + +#include "MEM_guardedalloc.h" + +#include "BLI_listbase.h" +#include "BLI_task.h" +#include "BLI_threads.h" + +/* Types */ + +typedef struct Task { + struct Task *next, *prev; + + TaskRunFunction run; + void *taskdata; + bool free_taskdata; + TaskPool *pool; +} Task; + +struct TaskPool { + TaskScheduler *scheduler; + + volatile size_t num; + volatile size_t done; + ThreadMutex num_mutex; + ThreadCondition num_cond; + + void *userdata; + ThreadMutex user_mutex; + + volatile bool do_cancel; +}; + +struct TaskScheduler { + pthread_t *threads; + struct TaskThread *task_threads; + int num_threads; + + ListBase queue; + ThreadMutex queue_mutex; + ThreadCondition queue_cond; + + volatile bool do_exit; +}; + +typedef struct TaskThread { + TaskScheduler *scheduler; + int id; +} TaskThread; + +/* Task Scheduler */ + +static void task_pool_num_decrease(TaskPool *pool, size_t done) +{ + BLI_mutex_lock(&pool->num_mutex); + + BLI_assert(pool->num >= done); + + pool->num -= done; + pool->done += done; + + if (pool->num == 0) + BLI_condition_notify_all(&pool->num_cond); + + BLI_mutex_unlock(&pool->num_mutex); +} + +static void task_pool_num_increase(TaskPool *pool) +{ + BLI_mutex_lock(&pool->num_mutex); + + pool->num++; + BLI_condition_notify_all(&pool->num_cond); + + BLI_mutex_unlock(&pool->num_mutex); +} + +static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task) +{ + BLI_mutex_lock(&scheduler->queue_mutex); + + while (!scheduler->queue.first && !scheduler->do_exit) + BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex); + + if (!scheduler->queue.first) { + BLI_mutex_unlock(&scheduler->queue_mutex); + BLI_assert(scheduler->do_exit); + return false; + } + + *task = scheduler->queue.first; + BLI_remlink(&scheduler->queue, *task); + + BLI_mutex_unlock(&scheduler->queue_mutex); + + return true; +} + +static void *task_scheduler_thread_run(void *thread_p) +{ + TaskThread *thread = (TaskThread *) thread_p; + TaskScheduler *scheduler = thread->scheduler; + int thread_id = thread->id; + Task *task; + + /* keep popping off tasks */ + while (task_scheduler_thread_wait_pop(scheduler, &task)) { + TaskPool *pool = task->pool; + + /* run task */ + task->run(pool, task->taskdata, thread_id); + + /* delete task */ + if (task->free_taskdata) + MEM_freeN(task->taskdata); + MEM_freeN(task); + + /* notify pool task was done */ + task_pool_num_decrease(pool, 1); + } + + return NULL; +} + +TaskScheduler *BLI_task_scheduler_create(int num_threads) +{ + TaskScheduler *scheduler = MEM_callocN(sizeof(TaskScheduler), "TaskScheduler"); + + /* multiple places can use this task scheduler, sharing the same + * threads, so we keep track of the number of users. */ + scheduler->do_exit = false; + + scheduler->queue.first = scheduler->queue.last = NULL; + BLI_mutex_init(&scheduler->queue_mutex); + BLI_condition_init(&scheduler->queue_cond); + + if (num_threads == 0) { + /* automatic number of threads will be main thread + num cores */ + num_threads = BLI_system_thread_count(); + } + + /* main thread will also work, so we count it too */ + num_threads -= 1; + + /* launch threads that will be waiting for work */ + if (num_threads > 0) { + int i; + + scheduler->num_threads = num_threads; + scheduler->threads = MEM_callocN(sizeof(pthread_t) * num_threads, "TaskScheduler threads"); + scheduler->task_threads = MEM_callocN(sizeof(TaskThread) * num_threads, "TaskScheduler task threads"); + + for (i = 0; i < num_threads; i++) { + TaskThread *thread = &scheduler->task_threads[i]; + thread->scheduler = scheduler; + thread->id = i + 1; + + if (pthread_create(&scheduler->threads[i], NULL, task_scheduler_thread_run, thread) != 0) { + fprintf(stderr, "TaskScheduler failed to launch thread %d/%d\n", i, num_threads); + MEM_freeN(thread); + } + } + } + + return scheduler; +} + +void BLI_task_scheduler_free(TaskScheduler *scheduler) +{ + Task *task; + + /* stop all waiting threads */ + BLI_mutex_lock(&scheduler->queue_mutex); + scheduler->do_exit = true; + BLI_condition_notify_all(&scheduler->queue_cond); + BLI_mutex_unlock(&scheduler->queue_mutex); + + /* delete threads */ + if (scheduler->threads) { + int i; + + for (i = 0; i < scheduler->num_threads; i++) { + if (pthread_join(scheduler->threads[i], NULL) != 0) + fprintf(stderr, "TaskScheduler failed to join thread %d/%d\n", i, scheduler->num_threads); + } + + MEM_freeN(scheduler->threads); + } + + /* Delete task thread data */ + if (scheduler->task_threads) { + MEM_freeN(scheduler->task_threads); + } + + /* delete leftover tasks */ + for (task = scheduler->queue.first; task; task = task->next) { + if (task->free_taskdata) + MEM_freeN(task->taskdata); + } + BLI_freelistN(&scheduler->queue); + + /* delete mutex/condition */ + BLI_mutex_end(&scheduler->queue_mutex); + BLI_condition_end(&scheduler->queue_cond); + + MEM_freeN(scheduler); +} + +int BLI_task_scheduler_num_threads(TaskScheduler *scheduler) +{ + return scheduler->num_threads + 1; +} + +static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriority priority) +{ + task_pool_num_increase(task->pool); + + /* add task to queue */ + BLI_mutex_lock(&scheduler->queue_mutex); + + if (priority == TASK_PRIORITY_HIGH) + BLI_addhead(&scheduler->queue, task); + else + BLI_addtail(&scheduler->queue, task); + + BLI_condition_notify_one(&scheduler->queue_cond); + BLI_mutex_unlock(&scheduler->queue_mutex); +} + +static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool) +{ + Task *task, *nexttask; + size_t done = 0; + + BLI_mutex_lock(&scheduler->queue_mutex); + + /* free all tasks from this pool from the queue */ + for (task = scheduler->queue.first; task; task = nexttask) { + nexttask = task->next; + + if (task->pool == pool) { + if (task->free_taskdata) + MEM_freeN(task->taskdata); + BLI_freelinkN(&scheduler->queue, task); + + done++; + } + } + + BLI_mutex_unlock(&scheduler->queue_mutex); + + /* notify done */ + task_pool_num_decrease(pool, done); +} + +/* Task Pool */ + +TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata) +{ + TaskPool *pool = MEM_callocN(sizeof(TaskPool), "TaskPool"); + + pool->scheduler = scheduler; + pool->num = 0; + pool->do_cancel = false; + + BLI_mutex_init(&pool->num_mutex); + BLI_condition_init(&pool->num_cond); + + pool->userdata = userdata; + BLI_mutex_init(&pool->user_mutex); + + /* Ensure malloc will go fine from threads, + * + * This is needed because we could be in main thread here + * and malloc could be non-threda safe at this point because + * no other jobs are running. + */ + BLI_begin_threaded_malloc(); + + return pool; +} + +void BLI_task_pool_free(TaskPool *pool) +{ + BLI_task_pool_stop(pool); + + BLI_mutex_end(&pool->num_mutex); + BLI_condition_end(&pool->num_cond); + + BLI_mutex_end(&pool->user_mutex); + + MEM_freeN(pool); + + BLI_end_threaded_malloc(); +} + +void BLI_task_pool_push(TaskPool *pool, TaskRunFunction run, + void *taskdata, bool free_taskdata, TaskPriority priority) +{ + Task *task = MEM_callocN(sizeof(Task), "Task"); + + task->run = run; + task->taskdata = taskdata; + task->free_taskdata = free_taskdata; + task->pool = pool; + + task_scheduler_push(pool->scheduler, task, priority); +} + +void BLI_task_pool_work_and_wait(TaskPool *pool) +{ + TaskScheduler *scheduler = pool->scheduler; + + BLI_mutex_lock(&pool->num_mutex); + + while (pool->num != 0) { + Task *task, *work_task = NULL; + bool found_task = false; + + BLI_mutex_unlock(&pool->num_mutex); + + BLI_mutex_lock(&scheduler->queue_mutex); + + /* find task from this pool. if we get a task from another pool, + * we can get into deadlock */ + + for (task = scheduler->queue.first; task; task = task->next) { + if (task->pool == pool) { + work_task = task; + found_task = true; + BLI_remlink(&scheduler->queue, task); + break; + } + } + + BLI_mutex_unlock(&scheduler->queue_mutex); + + /* if found task, do it, otherwise wait until other tasks are done */ + if (found_task) { + /* run task */ + work_task->run(pool, work_task->taskdata, 0); + + /* delete task */ + if (work_task->free_taskdata) + MEM_freeN(work_task->taskdata); + MEM_freeN(work_task); + + /* notify pool task was done */ + task_pool_num_decrease(pool, 1); + } + + BLI_mutex_lock(&pool->num_mutex); + if (pool->num == 0) + break; + + if (!found_task) + BLI_condition_wait(&pool->num_cond, &pool->num_mutex); + } + + BLI_mutex_unlock(&pool->num_mutex); +} + +void BLI_task_pool_cancel(TaskPool *pool) +{ + pool->do_cancel = true; + + task_scheduler_clear(pool->scheduler, pool); + + /* wait until all entries are cleared */ + BLI_mutex_lock(&pool->num_mutex); + while (pool->num) + BLI_condition_wait(&pool->num_cond, &pool->num_mutex); + BLI_mutex_unlock(&pool->num_mutex); + + pool->do_cancel = false; +} + +void BLI_task_pool_stop(TaskPool *pool) +{ + task_scheduler_clear(pool->scheduler, pool); + + BLI_assert(pool->num == 0); +} + +bool BLI_task_pool_cancelled(TaskPool *pool) +{ + return pool->do_cancel; +} + +void *BLI_task_pool_userdata(TaskPool *pool) +{ + return pool->userdata; +} + +ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool) +{ + return &pool->user_mutex; +} + +size_t BLI_task_pool_tasks_done(TaskPool *pool) +{ + return pool->done; +} + diff --git a/source/blender/blenlib/intern/threads.c b/source/blender/blenlib/intern/threads.c index c8b84d9310a..64682965649 100644 --- a/source/blender/blenlib/intern/threads.c +++ b/source/blender/blenlib/intern/threads.c @@ -37,6 +37,7 @@ #include "BLI_listbase.h" #include "BLI_gsqueue.h" +#include "BLI_task.h" #include "BLI_threads.h" #include "PIL_time.h" @@ -63,6 +64,9 @@ extern pthread_key_t gomp_tls_key; static void *thread_tls_data; #endif +/* We're using one global task scheduler for all kind of tasks. */ +static TaskScheduler *task_scheduler = NULL; + /* ********** basic thread control API ************ * * Many thread cases have an X amount of jobs, and only an Y amount of @@ -151,9 +155,26 @@ void BLI_threadapi_init(void) void BLI_threadapi_exit(void) { + if (task_scheduler) { + BLI_task_scheduler_free(task_scheduler); + } BLI_spin_end(&_malloc_lock); } +TaskScheduler *BLI_task_scheduler_get(void) +{ + if (task_scheduler == NULL) { + int tot_thread = BLI_system_thread_count(); + + /* Do a lazy initialization, so it happes after + * command line arguments parsing + */ + task_scheduler = BLI_task_scheduler_create(tot_thread); + } + + return task_scheduler; +} + /* tot = 0 only initializes malloc mutex in a safe way (see sequence.c) * problem otherwise: scene render will kill of the mutex! */ @@ -419,6 +440,11 @@ void BLI_mutex_unlock(ThreadMutex *mutex) pthread_mutex_unlock(mutex); } +bool BLI_mutex_trylock(ThreadMutex *mutex) +{ + return (pthread_mutex_trylock(mutex) == 0); +} + void BLI_mutex_end(ThreadMutex *mutex) { pthread_mutex_destroy(mutex); @@ -563,97 +589,31 @@ void BLI_ticket_mutex_unlock(TicketMutex *ticket) /* ************************************************ */ -typedef struct ThreadedWorker { - ListBase threadbase; - void *(*work_fnct)(void *); - char busy[RE_MAX_THREAD]; - int total; - int sleep_time; -} ThreadedWorker; - -typedef struct WorkParam { - ThreadedWorker *worker; - void *param; - int index; -} WorkParam; +/* Condition */ -static void *exec_work_fnct(void *v_param) +void BLI_condition_init(ThreadCondition *cond) { - WorkParam *p = (WorkParam *)v_param; - void *value; - - value = p->worker->work_fnct(p->param); - - p->worker->busy[p->index] = 0; - MEM_freeN(p); - - return value; + pthread_cond_init(cond, NULL); } -ThreadedWorker *BLI_create_worker(void *(*do_thread)(void *), int tot, int sleep_time) +void BLI_condition_wait(ThreadCondition *cond, ThreadMutex *mutex) { - ThreadedWorker *worker; - - (void)sleep_time; /* unused */ - - worker = MEM_callocN(sizeof(ThreadedWorker), "threadedworker"); - - if (tot > RE_MAX_THREAD) { - tot = RE_MAX_THREAD; - } - else if (tot < 1) { - tot = 1; - } - - worker->total = tot; - worker->work_fnct = do_thread; - - BLI_init_threads(&worker->threadbase, exec_work_fnct, tot); - - return worker; + pthread_cond_wait(cond, mutex); } -void BLI_end_worker(ThreadedWorker *worker) +void BLI_condition_notify_one(ThreadCondition *cond) { - BLI_remove_threads(&worker->threadbase); + pthread_cond_signal(cond); } -void BLI_destroy_worker(ThreadedWorker *worker) +void BLI_condition_notify_all(ThreadCondition *cond) { - BLI_end_worker(worker); - BLI_freelistN(&worker->threadbase); - MEM_freeN(worker); + pthread_cond_broadcast(cond); } -void BLI_insert_work(ThreadedWorker *worker, void *param) +void BLI_condition_end(ThreadCondition *cond) { - WorkParam *p = MEM_callocN(sizeof(WorkParam), "workparam"); - int index; - - if (BLI_available_threads(&worker->threadbase) == 0) { - index = worker->total; - while (index == worker->total) { - PIL_sleep_ms(worker->sleep_time); - - for (index = 0; index < worker->total; index++) { - if (worker->busy[index] == 0) { - BLI_remove_thread_index(&worker->threadbase, index); - break; - } - } - } - } - else { - index = BLI_available_thread_index(&worker->threadbase); - } - - worker->busy[index] = 1; - - p->param = param; - p->index = index; - p->worker = worker; - - BLI_insert_thread(&worker->threadbase, p); + pthread_cond_destroy(cond); } /* ************************************************ */ |