Welcome to mirror list, hosted at ThFree Co, Russian Federation.

git.blender.org/blender.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'source/blender/blenlib/intern/task_pool.cc')
-rw-r--r--source/blender/blenlib/intern/task_pool.cc546
1 files changed, 546 insertions, 0 deletions
diff --git a/source/blender/blenlib/intern/task_pool.cc b/source/blender/blenlib/intern/task_pool.cc
new file mode 100644
index 00000000000..cf328ec407c
--- /dev/null
+++ b/source/blender/blenlib/intern/task_pool.cc
@@ -0,0 +1,546 @@
+/*
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+/** \file
+ * \ingroup bli
+ *
+ * Task pool to run tasks in parallel.
+ */
+
+#include <memory>
+#include <stdlib.h>
+#include <utility>
+
+#include "MEM_guardedalloc.h"
+
+#include "DNA_listBase.h"
+
+#include "BLI_math.h"
+#include "BLI_mempool.h"
+#include "BLI_task.h"
+#include "BLI_threads.h"
+
+#ifdef WITH_TBB
+/* Quiet top level deprecation message, unrelated to API usage here. */
+# define TBB_SUPPRESS_DEPRECATED_MESSAGES 1
+# include <tbb/tbb.h>
+#endif
+
+/* Task
+ *
+ * Unit of work to execute. This is a C++ class to work with TBB. */
+
+class Task {
+ public:
+ TaskPool *pool;
+ TaskRunFunction run;
+ void *taskdata;
+ bool free_taskdata;
+ TaskFreeFunction freedata;
+
+ Task(TaskPool *pool,
+ TaskRunFunction run,
+ void *taskdata,
+ bool free_taskdata,
+ TaskFreeFunction freedata)
+ : pool(pool), run(run), taskdata(taskdata), free_taskdata(free_taskdata), freedata(freedata)
+ {
+ }
+
+ ~Task()
+ {
+ if (free_taskdata) {
+ if (freedata) {
+ freedata(pool, taskdata);
+ }
+ else {
+ MEM_freeN(taskdata);
+ }
+ }
+ }
+
+ /* Move constructor.
+ * For performance, ensure we never copy the task and only move it.
+ * For TBB version 2017 and earlier we apply a workaround to make up for
+ * the lack of move constructor support. */
+ Task(Task &&other)
+ : pool(other.pool),
+ run(other.run),
+ taskdata(other.taskdata),
+ free_taskdata(other.free_taskdata),
+ freedata(other.freedata)
+ {
+ other.pool = NULL;
+ other.run = NULL;
+ other.taskdata = NULL;
+ other.free_taskdata = false;
+ other.freedata = NULL;
+ }
+
+#if defined(WITH_TBB) && TBB_INTERFACE_VERSION_MAJOR < 10
+ Task(const Task &other)
+ : pool(other.pool),
+ run(other.run),
+ taskdata(other.taskdata),
+ free_taskdata(other.free_taskdata),
+ freedata(other.freedata)
+ {
+ ((Task &)other).pool = NULL;
+ ((Task &)other).run = NULL;
+ ((Task &)other).taskdata = NULL;
+ ((Task &)other).free_taskdata = false;
+ ((Task &)other).freedata = NULL;
+ }
+#else
+ Task(const Task &other) = delete;
+#endif
+
+ Task &operator=(const Task &other) = delete;
+ Task &operator=(Task &&other) = delete;
+
+ /* Execute task. */
+ void operator()() const
+ {
+#ifdef WITH_TBB
+ tbb::this_task_arena::isolate([this] { run(pool, taskdata); });
+#else
+ run(pool, taskdata);
+#endif
+ }
+};
+
+/* TBB Task Group.
+ *
+ * Subclass since there seems to be no other way to set priority. */
+
+#ifdef WITH_TBB
+class TBBTaskGroup : public tbb::task_group {
+ public:
+ TBBTaskGroup(TaskPriority priority)
+ {
+ switch (priority) {
+ case TASK_PRIORITY_LOW:
+ my_context.set_priority(tbb::priority_low);
+ break;
+ case TASK_PRIORITY_HIGH:
+ my_context.set_priority(tbb::priority_normal);
+ break;
+ }
+ }
+
+ ~TBBTaskGroup()
+ {
+ }
+};
+#endif
+
+/* Task Pool */
+
+typedef enum TaskPoolType {
+ TASK_POOL_TBB,
+ TASK_POOL_TBB_SUSPENDED,
+ TASK_POOL_NO_THREADS,
+ TASK_POOL_BACKGROUND,
+ TASK_POOL_BACKGROUND_SERIAL,
+} TaskPoolType;
+
+struct TaskPool {
+ TaskPoolType type;
+ bool use_threads;
+
+ ThreadMutex user_mutex;
+ void *userdata;
+
+ /* TBB task pool. */
+#ifdef WITH_TBB
+ TBBTaskGroup tbb_group;
+#endif
+ volatile bool is_suspended;
+ BLI_mempool *suspended_mempool;
+
+ /* Background task pool. */
+ ListBase background_threads;
+ ThreadQueue *background_queue;
+ volatile bool background_is_canceling;
+};
+
+/* TBB Task Pool.
+ *
+ * Task pool using the TBB scheduler for tasks. When building without TBB
+ * support or running Blender with -t 1, this reverts to single threaded.
+ *
+ * Tasks may be suspended until in all are created, to make it possible to
+ * initialize data structures and create tasks in a single pass. */
+
+static void tbb_task_pool_create(TaskPool *pool, TaskPriority priority)
+{
+ if (pool->type == TASK_POOL_TBB_SUSPENDED) {
+ pool->is_suspended = true;
+ pool->suspended_mempool = BLI_mempool_create(sizeof(Task), 512, 512, BLI_MEMPOOL_ALLOW_ITER);
+ }
+
+#ifdef WITH_TBB
+ if (pool->use_threads) {
+ new (&pool->tbb_group) TBBTaskGroup(priority);
+ }
+#else
+ UNUSED_VARS(priority);
+#endif
+}
+
+static void tbb_task_pool_run(TaskPool *pool, Task &&task)
+{
+ if (pool->is_suspended) {
+ /* Suspended task that will be executed in work_and_wait(). */
+ Task *task_mem = (Task *)BLI_mempool_alloc(pool->suspended_mempool);
+ new (task_mem) Task(std::move(task));
+#ifdef __GNUC__
+ /* Work around apparent compiler bug where task is not properly copied
+ * to task_mem. This appears unrelated to the use of placement new or
+ * move semantics, happens even writing to a plain C struct. Rather the
+ * call into TBB seems to have some indirect effect. */
+ std::atomic_thread_fence(std::memory_order_release);
+#endif
+ }
+#ifdef WITH_TBB
+ else if (pool->use_threads) {
+ /* Execute in TBB task group. */
+ pool->tbb_group.run(std::move(task));
+ }
+#endif
+ else {
+ /* Execute immediately. */
+ task();
+ }
+}
+
+static void tbb_task_pool_work_and_wait(TaskPool *pool)
+{
+ /* Start any suspended task now. */
+ if (pool->suspended_mempool) {
+ pool->is_suspended = false;
+
+ BLI_mempool_iter iter;
+ BLI_mempool_iternew(pool->suspended_mempool, &iter);
+ while (Task *task = (Task *)BLI_mempool_iterstep(&iter)) {
+ tbb_task_pool_run(pool, std::move(*task));
+ }
+
+ BLI_mempool_clear(pool->suspended_mempool);
+ }
+
+#ifdef WITH_TBB
+ if (pool->use_threads) {
+ /* This is called wait(), but internally it can actually do work. This
+ * matters because we don't want recursive usage of task pools to run
+ * out of threads and get stuck. */
+ pool->tbb_group.wait();
+ }
+#endif
+}
+
+static void tbb_task_pool_cancel(TaskPool *pool)
+{
+#ifdef WITH_TBB
+ if (pool->use_threads) {
+ pool->tbb_group.cancel();
+ pool->tbb_group.wait();
+ }
+#else
+ UNUSED_VARS(pool);
+#endif
+}
+
+static bool tbb_task_pool_canceled(TaskPool *pool)
+{
+#ifdef WITH_TBB
+ if (pool->use_threads) {
+ return pool->tbb_group.is_canceling();
+ }
+#else
+ UNUSED_VARS(pool);
+#endif
+
+ return false;
+}
+
+static void tbb_task_pool_free(TaskPool *pool)
+{
+#ifdef WITH_TBB
+ if (pool->use_threads) {
+ pool->tbb_group.~TBBTaskGroup();
+ }
+#endif
+
+ if (pool->suspended_mempool) {
+ BLI_mempool_destroy(pool->suspended_mempool);
+ }
+}
+
+/* Background Task Pool.
+ *
+ * Fallback for running background tasks when building without TBB. */
+
+static void *background_task_run(void *userdata)
+{
+ TaskPool *pool = (TaskPool *)userdata;
+ while (Task *task = (Task *)BLI_thread_queue_pop(pool->background_queue)) {
+ (*task)();
+ task->~Task();
+ MEM_freeN(task);
+ }
+ return NULL;
+}
+
+static void background_task_pool_create(TaskPool *pool)
+{
+ pool->background_queue = BLI_thread_queue_init();
+ BLI_threadpool_init(&pool->background_threads, background_task_run, 1);
+}
+
+static void background_task_pool_run(TaskPool *pool, Task &&task)
+{
+ Task *task_mem = (Task *)MEM_mallocN(sizeof(Task), __func__);
+ new (task_mem) Task(std::move(task));
+ BLI_thread_queue_push(pool->background_queue, task_mem);
+
+ if (BLI_available_threads(&pool->background_threads)) {
+ BLI_threadpool_insert(&pool->background_threads, pool);
+ }
+}
+
+static void background_task_pool_work_and_wait(TaskPool *pool)
+{
+ /* Signal background thread to stop waiting for new tasks if none are
+ * left, and wait for tasks and thread to finish. */
+ BLI_thread_queue_nowait(pool->background_queue);
+ BLI_thread_queue_wait_finish(pool->background_queue);
+ BLI_threadpool_clear(&pool->background_threads);
+}
+
+static void background_task_pool_cancel(TaskPool *pool)
+{
+ pool->background_is_canceling = true;
+
+ /* Remove tasks not yet started by background thread. */
+ BLI_thread_queue_nowait(pool->background_queue);
+ while (Task *task = (Task *)BLI_thread_queue_pop(pool->background_queue)) {
+ task->~Task();
+ MEM_freeN(task);
+ }
+
+ /* Let background thread finish or cancel task it is working on. */
+ BLI_threadpool_remove(&pool->background_threads, pool);
+ pool->background_is_canceling = false;
+}
+
+static bool background_task_pool_canceled(TaskPool *pool)
+{
+ return pool->background_is_canceling;
+}
+
+static void background_task_pool_free(TaskPool *pool)
+{
+ background_task_pool_work_and_wait(pool);
+
+ BLI_threadpool_end(&pool->background_threads);
+ BLI_thread_queue_free(pool->background_queue);
+}
+
+/* Task Pool */
+
+static TaskPool *task_pool_create_ex(void *userdata, TaskPoolType type, TaskPriority priority)
+{
+ const bool use_threads = BLI_task_scheduler_num_threads() > 1 && type != TASK_POOL_NO_THREADS;
+
+ /* Background task pool uses regular TBB scheduling if available. Only when
+ * building without TBB or running with -t 1 do we need to ensure these tasks
+ * do not block the main thread. */
+ if (type == TASK_POOL_BACKGROUND && use_threads) {
+ type = TASK_POOL_TBB;
+ }
+
+ /* Allocate task pool. */
+ TaskPool *pool = (TaskPool *)MEM_callocN(sizeof(TaskPool), "TaskPool");
+
+ pool->type = type;
+ pool->use_threads = use_threads;
+
+ pool->userdata = userdata;
+ BLI_mutex_init(&pool->user_mutex);
+
+ switch (type) {
+ case TASK_POOL_TBB:
+ case TASK_POOL_TBB_SUSPENDED:
+ case TASK_POOL_NO_THREADS:
+ tbb_task_pool_create(pool, priority);
+ break;
+ case TASK_POOL_BACKGROUND:
+ case TASK_POOL_BACKGROUND_SERIAL:
+ background_task_pool_create(pool);
+ break;
+ }
+
+ return pool;
+}
+
+/**
+ * Create a normal task pool. Tasks will be executed as soon as they are added.
+ */
+TaskPool *BLI_task_pool_create(void *userdata, TaskPriority priority)
+{
+ return task_pool_create_ex(userdata, TASK_POOL_TBB, priority);
+}
+
+/**
+ * Create a background task pool.
+ * In multi-threaded context, there is no differences with #BLI_task_pool_create(),
+ * but in single-threaded case it is ensured to have at least one worker thread to run on
+ * (i.e. you don't have to call #BLI_task_pool_work_and_wait
+ * on it to be sure it will be processed).
+ *
+ * \note Background pools are non-recursive
+ * (that is, you should not create other background pools in tasks assigned to a background pool,
+ * they could end never being executed, since the 'fallback' background thread is already
+ * busy with parent task in single-threaded context).
+ */
+TaskPool *BLI_task_pool_create_background(void *userdata, TaskPriority priority)
+{
+ return task_pool_create_ex(userdata, TASK_POOL_BACKGROUND, priority);
+}
+
+/**
+ * Similar to BLI_task_pool_create() but does not schedule any tasks for execution
+ * for until BLI_task_pool_work_and_wait() is called. This helps reducing threading
+ * overhead when pushing huge amount of small initial tasks from the main thread.
+ */
+TaskPool *BLI_task_pool_create_suspended(void *userdata, TaskPriority priority)
+{
+ return task_pool_create_ex(userdata, TASK_POOL_TBB_SUSPENDED, priority);
+}
+
+/**
+ * Single threaded task pool that executes pushed task immediately, for
+ * debugging purposes.
+ */
+TaskPool *BLI_task_pool_create_no_threads(void *userdata)
+{
+ return task_pool_create_ex(userdata, TASK_POOL_NO_THREADS, TASK_PRIORITY_HIGH);
+}
+
+/**
+ * Task pool that executes one task after the other, possibly on different threads
+ * but never in parallel.
+ */
+TaskPool *BLI_task_pool_create_background_serial(void *userdata, TaskPriority priority)
+{
+ return task_pool_create_ex(userdata, TASK_POOL_BACKGROUND_SERIAL, priority);
+}
+
+void BLI_task_pool_free(TaskPool *pool)
+{
+ switch (pool->type) {
+ case TASK_POOL_TBB:
+ case TASK_POOL_TBB_SUSPENDED:
+ case TASK_POOL_NO_THREADS:
+ tbb_task_pool_free(pool);
+ break;
+ case TASK_POOL_BACKGROUND:
+ case TASK_POOL_BACKGROUND_SERIAL:
+ background_task_pool_free(pool);
+ break;
+ }
+
+ BLI_mutex_end(&pool->user_mutex);
+
+ MEM_freeN(pool);
+}
+
+void BLI_task_pool_push(TaskPool *pool,
+ TaskRunFunction run,
+ void *taskdata,
+ bool free_taskdata,
+ TaskFreeFunction freedata)
+{
+ Task task(pool, run, taskdata, free_taskdata, freedata);
+
+ switch (pool->type) {
+ case TASK_POOL_TBB:
+ case TASK_POOL_TBB_SUSPENDED:
+ case TASK_POOL_NO_THREADS:
+ tbb_task_pool_run(pool, std::move(task));
+ break;
+ case TASK_POOL_BACKGROUND:
+ case TASK_POOL_BACKGROUND_SERIAL:
+ background_task_pool_run(pool, std::move(task));
+ break;
+ }
+}
+
+void BLI_task_pool_work_and_wait(TaskPool *pool)
+{
+ switch (pool->type) {
+ case TASK_POOL_TBB:
+ case TASK_POOL_TBB_SUSPENDED:
+ case TASK_POOL_NO_THREADS:
+ tbb_task_pool_work_and_wait(pool);
+ break;
+ case TASK_POOL_BACKGROUND:
+ case TASK_POOL_BACKGROUND_SERIAL:
+ background_task_pool_work_and_wait(pool);
+ break;
+ }
+}
+
+void BLI_task_pool_cancel(TaskPool *pool)
+{
+ switch (pool->type) {
+ case TASK_POOL_TBB:
+ case TASK_POOL_TBB_SUSPENDED:
+ case TASK_POOL_NO_THREADS:
+ tbb_task_pool_cancel(pool);
+ break;
+ case TASK_POOL_BACKGROUND:
+ case TASK_POOL_BACKGROUND_SERIAL:
+ background_task_pool_cancel(pool);
+ break;
+ }
+}
+
+bool BLI_task_pool_canceled(TaskPool *pool)
+{
+ switch (pool->type) {
+ case TASK_POOL_TBB:
+ case TASK_POOL_TBB_SUSPENDED:
+ case TASK_POOL_NO_THREADS:
+ return tbb_task_pool_canceled(pool);
+ case TASK_POOL_BACKGROUND:
+ case TASK_POOL_BACKGROUND_SERIAL:
+ return background_task_pool_canceled(pool);
+ }
+ BLI_assert("BLI_task_pool_canceled: Control flow should not come here!");
+ return false;
+}
+
+void *BLI_task_pool_user_data(TaskPool *pool)
+{
+ return pool->userdata;
+}
+
+ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool)
+{
+ return &pool->user_mutex;
+}