git.blender.org/blender.git
Diffstat (limited to 'intern/cycles/util/util_task.cpp')
 intern/cycles/util/util_task.cpp | 711 ++++++++++++++++++++++----------------
 1 file changed, 352 insertions(+), 359 deletions(-)
diff --git a/intern/cycles/util/util_task.cpp b/intern/cycles/util/util_task.cpp
index ce166af206a..4b11ce73ea9 100644
--- a/intern/cycles/util/util_task.cpp
+++ b/intern/cycles/util/util_task.cpp
@@ -23,10 +23,14 @@
//#define THREADING_DEBUG_ENABLED
#ifdef THREADING_DEBUG_ENABLED
-#include <stdio.h>
-#define THREADING_DEBUG(...) do { printf(__VA_ARGS__); fflush(stdout); } while(0)
+# include <stdio.h>
+# define THREADING_DEBUG(...) \
+ do { \
+ printf(__VA_ARGS__); \
+ fflush(stdout); \
+ } while (0)
#else
-#define THREADING_DEBUG(...)
+# define THREADING_DEBUG(...)
#endif
CCL_NAMESPACE_BEGIN
@@ -35,156 +39,156 @@ CCL_NAMESPACE_BEGIN
TaskPool::TaskPool()
{
- num_tasks_handled = 0;
- num = 0;
- do_cancel = false;
+ num_tasks_handled = 0;
+ num = 0;
+ do_cancel = false;
}
TaskPool::~TaskPool()
{
- stop();
+ stop();
}
void TaskPool::push(Task *task, bool front)
{
- TaskScheduler::Entry entry;
+ TaskScheduler::Entry entry;
- entry.task = task;
- entry.pool = this;
+ entry.task = task;
+ entry.pool = this;
- TaskScheduler::push(entry, front);
+ TaskScheduler::push(entry, front);
}
-void TaskPool::push(const TaskRunFunction& run, bool front)
+void TaskPool::push(const TaskRunFunction &run, bool front)
{
- push(new Task(run), front);
+ push(new Task(run), front);
}
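For orientation (not part of this diff), a caller typically drives a TaskPool as sketched below; `process_tile` is a hypothetical worker whose signature is assumed to match TaskRunFunction from util_task.h:

    /* Sketch of typical TaskPool usage; process_tile is invented for
     * illustration and must match the TaskRunFunction signature. */
    void render_all_tiles()
    {
      TaskPool pool;
      for (int tile = 0; tile < 64; tile++) {
        /* push() wraps the callable in a heap-allocated Task and hands it
         * to the shared TaskScheduler queue; the pool takes ownership. */
        pool.push(function_bind(&process_tile, tile));
      }
      /* Block until all tasks pushed to this pool are done; see wait_work()
       * below for how the waiting thread helps execute them. */
      pool.wait_work();
    }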
void TaskPool::wait_work(Summary *stats)
{
- thread_scoped_lock num_lock(num_mutex);
+ thread_scoped_lock num_lock(num_mutex);
- while(num != 0) {
- num_lock.unlock();
+ while (num != 0) {
+ num_lock.unlock();
- thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);
+ thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);
- /* find task from this pool. if we get a task from another pool,
- * we can get into deadlock */
- TaskScheduler::Entry work_entry;
- bool found_entry = false;
- list<TaskScheduler::Entry>::iterator it;
+ /* find task from this pool. if we get a task from another pool,
+ * we can get into deadlock */
+ TaskScheduler::Entry work_entry;
+ bool found_entry = false;
+ list<TaskScheduler::Entry>::iterator it;
- for(it = TaskScheduler::queue.begin(); it != TaskScheduler::queue.end(); it++) {
- TaskScheduler::Entry& entry = *it;
+ for (it = TaskScheduler::queue.begin(); it != TaskScheduler::queue.end(); it++) {
+ TaskScheduler::Entry &entry = *it;
- if(entry.pool == this) {
- work_entry = entry;
- found_entry = true;
- TaskScheduler::queue.erase(it);
- break;
- }
- }
+ if (entry.pool == this) {
+ work_entry = entry;
+ found_entry = true;
+ TaskScheduler::queue.erase(it);
+ break;
+ }
+ }
- queue_lock.unlock();
+ queue_lock.unlock();
- /* if found task, do it, otherwise wait until other tasks are done */
- if(found_entry) {
- /* run task */
- work_entry.task->run(0);
+ /* if found task, do it, otherwise wait until other tasks are done */
+ if (found_entry) {
+ /* run task */
+ work_entry.task->run(0);
- /* delete task */
- delete work_entry.task;
+ /* delete task */
+ delete work_entry.task;
- /* notify pool task was done */
- num_decrease(1);
- }
+ /* notify pool task was done */
+ num_decrease(1);
+ }
- num_lock.lock();
- if(num == 0)
- break;
+ num_lock.lock();
+ if (num == 0)
+ break;
- if(!found_entry) {
- THREADING_DEBUG("num==%d, Waiting for condition in TaskPool::wait_work !found_entry\n", num);
- num_cond.wait(num_lock);
- THREADING_DEBUG("num==%d, condition wait done in TaskPool::wait_work !found_entry\n", num);
- }
- }
+ if (!found_entry) {
+ THREADING_DEBUG("num==%d, Waiting for condition in TaskPool::wait_work !found_entry\n", num);
+ num_cond.wait(num_lock);
+ THREADING_DEBUG("num==%d, condition wait done in TaskPool::wait_work !found_entry\n", num);
+ }
+ }
- if(stats != NULL) {
- stats->time_total = time_dt() - start_time;
- stats->num_tasks_handled = num_tasks_handled;
- }
+ if (stats != NULL) {
+ stats->time_total = time_dt() - start_time;
+ stats->num_tasks_handled = num_tasks_handled;
+ }
}
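The loop above is a help-while-waiting pattern: the waiting thread drains tasks belonging to its own pool instead of sleeping, and deliberately skips foreign tasks to avoid a cycle where two pools each execute a task that waits on the other. A minimal standard-library rendition of the same idea, with all names invented and the two mutexes of the real code folded into one for brevity:

    #include <algorithm>
    #include <condition_variable>
    #include <deque>
    #include <functional>
    #include <mutex>

    struct MiniJob {
      const void *owner;         /* the pool this job belongs to */
      std::function<void()> fn;
    };

    std::mutex mini_mutex;
    std::condition_variable mini_cond;
    std::deque<MiniJob> mini_queue;
    int mini_pending = 0;        /* jobs of *our* owner still unfinished;
                                  * workers decrement it and notify mini_cond */

    void mini_wait(const void *owner)
    {
      std::unique_lock<std::mutex> lock(mini_mutex);
      while (mini_pending != 0) {
        auto it = std::find_if(mini_queue.begin(), mini_queue.end(),
                               [owner](const MiniJob &j) { return j.owner == owner; });
        if (it != mini_queue.end()) {
          MiniJob job = *it;     /* take one of our own jobs... */
          mini_queue.erase(it);
          lock.unlock();
          job.fn();              /* ...and run it instead of idling */
          lock.lock();
          mini_pending--;
        }
        else {
          mini_cond.wait(lock);  /* nothing of ours queued; wait for workers */
        }
      }
    }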
void TaskPool::cancel()
{
- do_cancel = true;
+ do_cancel = true;
- TaskScheduler::clear(this);
+ TaskScheduler::clear(this);
- {
- thread_scoped_lock num_lock(num_mutex);
+ {
+ thread_scoped_lock num_lock(num_mutex);
- while(num) {
- THREADING_DEBUG("num==%d, Waiting for condition in TaskPool::cancel\n", num);
- num_cond.wait(num_lock);
- THREADING_DEBUG("num==%d condition wait done in TaskPool::cancel\n", num);
- }
- }
+ while (num) {
+ THREADING_DEBUG("num==%d, Waiting for condition in TaskPool::cancel\n", num);
+ num_cond.wait(num_lock);
+ THREADING_DEBUG("num==%d condition wait done in TaskPool::cancel\n", num);
+ }
+ }
- do_cancel = false;
+ do_cancel = false;
}
void TaskPool::stop()
{
- TaskScheduler::clear(this);
+ TaskScheduler::clear(this);
- assert(num == 0);
+ assert(num == 0);
}
bool TaskPool::canceled()
{
- return do_cancel;
+ return do_cancel;
}
bool TaskPool::finished()
{
- thread_scoped_lock num_lock(num_mutex);
- return num == 0;
+ thread_scoped_lock num_lock(num_mutex);
+ return num == 0;
}
void TaskPool::num_decrease(int done)
{
- num_mutex.lock();
- num -= done;
+ num_mutex.lock();
+ num -= done;
- assert(num >= 0);
- if(num == 0) {
- THREADING_DEBUG("num==%d, notifying all in TaskPool::num_decrease\n", num);
- num_cond.notify_all();
- }
+ assert(num >= 0);
+ if (num == 0) {
+ THREADING_DEBUG("num==%d, notifying all in TaskPool::num_decrease\n", num);
+ num_cond.notify_all();
+ }
- num_mutex.unlock();
+ num_mutex.unlock();
}
void TaskPool::num_increase()
{
- thread_scoped_lock num_lock(num_mutex);
- if(num_tasks_handled == 0) {
- start_time = time_dt();
- }
- num++;
- num_tasks_handled++;
- THREADING_DEBUG("num==%d, notifying all in TaskPool::num_increase\n", num);
- num_cond.notify_all();
+ thread_scoped_lock num_lock(num_mutex);
+ if (num_tasks_handled == 0) {
+ start_time = time_dt();
+ }
+ num++;
+ num_tasks_handled++;
+ THREADING_DEBUG("num==%d, notifying all in TaskPool::num_increase\n", num);
+ num_cond.notify_all();
}
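Taken together, num, num_mutex and num_cond form a hand-rolled countdown latch: increment on push, decrement on completion, and wake waiters whenever the count changes. A compressed standard-C++ equivalent (illustrative only, not the Cycles API):

    #include <condition_variable>
    #include <mutex>

    class CountLatch {
      std::mutex m;
      std::condition_variable cv;
      int count = 0;

     public:
      void add(int n = 1)
      {
        std::lock_guard<std::mutex> lock(m);
        count += n;
        cv.notify_all(); /* mirror num_increase(): wake waiters to re-check */
      }
      void done(int n = 1)
      {
        std::lock_guard<std::mutex> lock(m);
        count -= n;
        if (count == 0)
          cv.notify_all();
      }
      void wait()
      {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [this] { return count == 0; });
      }
    };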
/* Task Scheduler */
thread_mutex TaskScheduler::mutex;
int TaskScheduler::users = 0;
-vector<thread*> TaskScheduler::threads;
+vector<thread *> TaskScheduler::threads;
bool TaskScheduler::do_exit = false;
list<TaskScheduler::Entry> TaskScheduler::queue;
@@ -198,412 +202,401 @@ namespace {
* that node.
* If node is not available, then the corresponding number of processors is
* zero. */
-void get_per_node_num_processors(vector<int>* num_per_node_processors)
-{
- const int num_nodes = system_cpu_num_numa_nodes();
- if(num_nodes == 0) {
- LOG(ERROR) << "Zero available NUMA nodes, this is not supposed to happen.";
- return;
- }
- num_per_node_processors->resize(num_nodes);
- for(int node = 0; node < num_nodes; ++node) {
- if(!system_cpu_is_numa_node_available(node)) {
- (*num_per_node_processors)[node] = 0;
- continue;
- }
- (*num_per_node_processors)[node] =
- system_cpu_num_numa_node_processors(node);
- }
+void get_per_node_num_processors(vector<int> *num_per_node_processors)
+{
+ const int num_nodes = system_cpu_num_numa_nodes();
+ if (num_nodes == 0) {
+ LOG(ERROR) << "Zero available NUMA nodes, this is not supposed to happen.";
+ return;
+ }
+ num_per_node_processors->resize(num_nodes);
+ for (int node = 0; node < num_nodes; ++node) {
+ if (!system_cpu_is_numa_node_available(node)) {
+ (*num_per_node_processors)[node] = 0;
+ continue;
+ }
+ (*num_per_node_processors)[node] = system_cpu_num_numa_node_processors(node);
+ }
}
/* Calculate total number of processors on all available nodes.
* This is similar to system_cpu_thread_count(), but uses pre-calculated number
* of processors on each of the node, avoiding extra system calls and checks for
* the node availability. */
-int get_num_total_processors(const vector<int>& num_per_node_processors)
+int get_num_total_processors(const vector<int> &num_per_node_processors)
{
- int num_total_processors = 0;
- foreach(int num_node_processors, num_per_node_processors) {
- num_total_processors += num_node_processors;
- }
- return num_total_processors;
+ int num_total_processors = 0;
+ foreach (int num_node_processors, num_per_node_processors) {
+ num_total_processors += num_node_processors;
+ }
+ return num_total_processors;
}
/* Compute NUMA node for every thread to run on, for the best performance. */
vector<int> distribute_threads_on_nodes(const int num_threads)
{
- /* Start with all threads unassigned to any specific NUMA node. */
- vector<int> thread_nodes(num_threads, -1);
- const int num_active_group_processors =
- system_cpu_num_active_group_processors();
- VLOG(1) << "Detected " << num_active_group_processors << " processors "
- << "in active group.";
- if(num_active_group_processors >= num_threads) {
- /* If the current thread is set up in a way that its affinity allows using
- * at least the requested number of threads, we do not explicitly set
- * affinity for the worker threads.
- * This way we allow users to manually edit affinity of the parent
- * thread, and here we follow that affinity. This way it's possible to
- * have two Cycles/Blender instances running, manually set to different
- * dies on a CPU. */
- VLOG(1) << "Not setting thread group affinity.";
- return thread_nodes;
- }
- vector<int> num_per_node_processors;
- get_per_node_num_processors(&num_per_node_processors);
- if(num_per_node_processors.size() == 0) {
- /* Error was already reported, here we can't do anything, so we simply
- * leave default affinity to all the worker threads. */
- return thread_nodes;
- }
- const int num_nodes = num_per_node_processors.size();
- int thread_index = 0;
- /* First pass: fill in all the nodes to their maximum.
- *
- * If there are fewer threads than the overall node capacity, some of the
- * nodes or parts of them will idle.
- *
- * TODO(sergey): Consider picking up fastest nodes if number of threads
- * fits on them. For example, on Threadripper2 we might consider using nodes
- * 0 and 2 if user requested 32 render threads. */
- const int num_total_node_processors =
- get_num_total_processors(num_per_node_processors);
- int current_node_index = 0;
- while(thread_index < num_total_node_processors &&
- thread_index < num_threads) {
- const int num_node_processors =
- num_per_node_processors[current_node_index];
- for(int processor_index = 0;
- processor_index < num_node_processors;
- ++processor_index)
- {
- VLOG(1) << "Scheduling thread " << thread_index << " to node "
- << current_node_index << ".";
- thread_nodes[thread_index] = current_node_index;
- ++thread_index;
- if(thread_index == num_threads) {
- /* All threads are scheduled on their nodes. */
- return thread_nodes;
- }
- }
- ++current_node_index;
- }
- /* Second pass: keep scheduling threads to each node one by one, uniformly
- * filling them in.
- * This is where things become tricky to predict for maximum
- * performance: on the one hand this avoids too much threading overhead on
- * few nodes, but for the final performance having all the overhead on one
- * node might be a better idea (since other nodes will have a better chance
- * of rendering faster).
- * Trickier still, nodes might have different capacity, so we might
- * want to do some weighted scheduling. For example, if node 0 has 16
- * processors and node 1 has 32 processors, we'd better schedule 1 extra
- * thread on node 0 and 2 extra threads on node 1. */
- current_node_index = 0;
- while(thread_index < num_threads) {
- /* Skip unavailable nodes. */
- /* TODO(sergey): Add sanity check against deadlock. */
- while(num_per_node_processors[current_node_index] == 0) {
- current_node_index = (current_node_index + 1) % num_nodes;
- }
- VLOG(1) << "Scheduling thread " << thread_index << " to node "
- << current_node_index << ".";
- ++thread_index;
- current_node_index = (current_node_index + 1) % num_nodes;
- }
-
- return thread_nodes;
+ /* Start with all threads unassigned to any specific NUMA node. */
+ vector<int> thread_nodes(num_threads, -1);
+ const int num_active_group_processors = system_cpu_num_active_group_processors();
+ VLOG(1) << "Detected " << num_active_group_processors << " processors "
+ << "in active group.";
+ if (num_active_group_processors >= num_threads) {
+ /* If the current thread is set up in a way that its affinity allows using
+ * at least the requested number of threads, we do not explicitly set
+ * affinity for the worker threads.
+ * This way we allow users to manually edit affinity of the parent
+ * thread, and here we follow that affinity. This way it's possible to
+ * have two Cycles/Blender instances running, manually set to different
+ * dies on a CPU. */
+ VLOG(1) << "Not setting thread group affinity.";
+ return thread_nodes;
+ }
+ vector<int> num_per_node_processors;
+ get_per_node_num_processors(&num_per_node_processors);
+ if (num_per_node_processors.size() == 0) {
+ /* Error was already reported, here we can't do anything, so we simply
+ * leave default affinity to all the worker threads. */
+ return thread_nodes;
+ }
+ const int num_nodes = num_per_node_processors.size();
+ int thread_index = 0;
+ /* First pass: fill in all the nodes to their maximum.
+ *
+ * If there are fewer threads than the overall node capacity, some of the
+ * nodes or parts of them will idle.
+ *
+ * TODO(sergey): Consider picking up fastest nodes if number of threads
+ * fits on them. For example, on Threadripper2 we might consider using nodes
+ * 0 and 2 if user requested 32 render threads. */
+ const int num_total_node_processors = get_num_total_processors(num_per_node_processors);
+ int current_node_index = 0;
+ while (thread_index < num_total_node_processors && thread_index < num_threads) {
+ const int num_node_processors = num_per_node_processors[current_node_index];
+ for (int processor_index = 0; processor_index < num_node_processors; ++processor_index) {
+ VLOG(1) << "Scheduling thread " << thread_index << " to node " << current_node_index << ".";
+ thread_nodes[thread_index] = current_node_index;
+ ++thread_index;
+ if (thread_index == num_threads) {
+ /* All threads are scheduled on their nodes. */
+ return thread_nodes;
+ }
+ }
+ ++current_node_index;
+ }
+ /* Second pass: keep scheduling threads to each node one by one, uniformly
+ * filling them in.
+ * This is where things become tricky to predict for maximum
+ * performance: on the one hand this avoids too much threading overhead on
+ * few nodes, but for the final performance having all the overhead on one
+ * node might be a better idea (since other nodes will have a better chance
+ * of rendering faster).
+ * Trickier still, nodes might have different capacity, so we might
+ * want to do some weighted scheduling. For example, if node 0 has 16
+ * processors and node 1 has 32 processors, we'd better schedule 1 extra
+ * thread on node 0 and 2 extra threads on node 1. */
+ current_node_index = 0;
+ while (thread_index < num_threads) {
+ /* Skip unavailable nodes. */
+ /* TODO(sergey): Add sanity check against deadlock. */
+ while (num_per_node_processors[current_node_index] == 0) {
+ current_node_index = (current_node_index + 1) % num_nodes;
+ }
+ VLOG(1) << "Scheduling thread " << thread_index << " to node " << current_node_index << ".";
+ ++thread_index;
+ current_node_index = (current_node_index + 1) % num_nodes;
+ }
+
+ return thread_nodes;
}
} // namespace
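A concrete trace helps here. On a hypothetical machine with two NUMA nodes of 4 processors each and 10 requested threads, the first pass pins threads 0-3 to node 0 and 4-7 to node 1, and the second pass round-robins the two overflow threads. The sketch below reproduces the two passes with the NUMA queries replaced by a fixed capacity table, so it can run anywhere:

    /* Stand-alone sketch of the two-pass distribution; not Cycles code. */
    #include <cstdio>
    #include <vector>

    std::vector<int> distribute(const std::vector<int> &capacity, int num_threads)
    {
      std::vector<int> nodes(num_threads, -1);
      int thread = 0, node = 0, total = 0;
      for (int c : capacity)
        total += c;
      /* First pass: fill nodes to capacity, in order. */
      while (thread < total && thread < num_threads) {
        for (int p = 0; p < capacity[node] && thread < num_threads; ++p)
          nodes[thread++] = node;
        ++node;
      }
      /* Second pass: round-robin the overflow across non-empty nodes. */
      node = 0;
      while (thread < num_threads) {
        while (capacity[node] == 0)
          node = (node + 1) % (int)capacity.size();
        nodes[thread++] = node;
        node = (node + 1) % (int)capacity.size();
      }
      return nodes;
    }

    int main()
    {
      /* Two nodes with 4 processors each, 10 threads requested:
       * prints 0 0 0 0 1 1 1 1 0 1. */
      for (int n : distribute({4, 4}, 10))
        printf("%d ", n);
      printf("\n");
    }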
void TaskScheduler::init(int num_threads)
{
- thread_scoped_lock lock(mutex);
- /* Multiple cycles instances can use this task scheduler, sharing the same
- * threads, so we keep track of the number of users. */
- ++users;
- if(users != 1) {
- return;
- }
- do_exit = false;
- const bool use_auto_threads = (num_threads == 0);
- if(use_auto_threads) {
- /* Automatic number of threads. */
- num_threads = system_cpu_thread_count();
- }
- VLOG(1) << "Creating pool of " << num_threads << " threads.";
-
- /* Compute distribution on NUMA nodes. */
- vector<int> thread_nodes = distribute_threads_on_nodes(num_threads);
-
- /* Launch threads that will be waiting for work. */
- threads.resize(num_threads);
- for(int thread_index = 0; thread_index < num_threads; ++thread_index) {
- threads[thread_index] = new thread(
- function_bind(&TaskScheduler::thread_run, thread_index + 1),
- thread_nodes[thread_index]);
- }
+ thread_scoped_lock lock(mutex);
+ /* Multiple cycles instances can use this task scheduler, sharing the same
+ * threads, so we keep track of the number of users. */
+ ++users;
+ if (users != 1) {
+ return;
+ }
+ do_exit = false;
+ const bool use_auto_threads = (num_threads == 0);
+ if (use_auto_threads) {
+ /* Automatic number of threads. */
+ num_threads = system_cpu_thread_count();
+ }
+ VLOG(1) << "Creating pool of " << num_threads << " threads.";
+
+ /* Compute distribution on NUMA nodes. */
+ vector<int> thread_nodes = distribute_threads_on_nodes(num_threads);
+
+ /* Launch threads that will be waiting for work. */
+ threads.resize(num_threads);
+ for (int thread_index = 0; thread_index < num_threads; ++thread_index) {
+ threads[thread_index] = new thread(function_bind(&TaskScheduler::thread_run, thread_index + 1),
+ thread_nodes[thread_index]);
+ }
}
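Because init() and exit() are reference counted, every user of the shared pool brackets its lifetime with the pair; only the first init() spawns threads and only the last exit() joins them. A hypothetical pairing (not from this diff):

    TaskScheduler::init(0); /* 0 requests the automatic thread count */
    {
      TaskPool pool;
      /* ... pool.push(...) work here, then pool.wait_work() ... */
    }
    TaskScheduler::exit(); /* last user joins and deletes the worker threads */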
void TaskScheduler::exit()
{
- thread_scoped_lock lock(mutex);
- users--;
- if(users == 0) {
- VLOG(1) << "De-initializing thread pool of task scheduler.";
- /* stop all waiting threads */
- TaskScheduler::queue_mutex.lock();
- do_exit = true;
- TaskScheduler::queue_cond.notify_all();
- TaskScheduler::queue_mutex.unlock();
-
- /* delete threads */
- foreach(thread *t, threads) {
- t->join();
- delete t;
- }
- threads.clear();
- }
+ thread_scoped_lock lock(mutex);
+ users--;
+ if (users == 0) {
+ VLOG(1) << "De-initializing thread pool of task scheduler.";
+ /* stop all waiting threads */
+ TaskScheduler::queue_mutex.lock();
+ do_exit = true;
+ TaskScheduler::queue_cond.notify_all();
+ TaskScheduler::queue_mutex.unlock();
+
+ /* delete threads */
+ foreach (thread *t, threads) {
+ t->join();
+ delete t;
+ }
+ threads.clear();
+ }
}
void TaskScheduler::free_memory()
{
- assert(users == 0);
- threads.free_memory();
+ assert(users == 0);
+ threads.free_memory();
}
-bool TaskScheduler::thread_wait_pop(Entry& entry)
+bool TaskScheduler::thread_wait_pop(Entry &entry)
{
- thread_scoped_lock queue_lock(queue_mutex);
+ thread_scoped_lock queue_lock(queue_mutex);
- while(queue.empty() && !do_exit)
- queue_cond.wait(queue_lock);
+ while (queue.empty() && !do_exit)
+ queue_cond.wait(queue_lock);
- if(queue.empty()) {
- assert(do_exit);
- return false;
- }
+ if (queue.empty()) {
+ assert(do_exit);
+ return false;
+ }
- entry = queue.front();
- queue.pop_front();
+ entry = queue.front();
+ queue.pop_front();
- return true;
+ return true;
}
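The explicit while-wait loop above guards against spurious wakeups. For comparison, the predicate overload of std::condition_variable::wait expresses the same pop in standard C++ (a sketch with invented names, not util_task code):

    #include <condition_variable>
    #include <list>
    #include <mutex>

    template<typename T>
    bool wait_pop(std::list<T> &queue,
                  std::mutex &queue_mutex,
                  std::condition_variable &queue_cond,
                  const bool &do_exit,
                  T &entry)
    {
      std::unique_lock<std::mutex> lock(queue_mutex);
      /* Sleep until there is work or shutdown was requested. */
      queue_cond.wait(lock, [&] { return !queue.empty() || do_exit; });
      if (queue.empty()) {
        return false; /* woken only because do_exit was set */
      }
      entry = queue.front();
      queue.pop_front();
      return true;
    }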
void TaskScheduler::thread_run(int thread_id)
{
- Entry entry;
+ Entry entry;
- /* todo: test affinity/denormal mask */
+ /* todo: test affinity/denormal mask */
- /* keep popping off tasks */
- while(thread_wait_pop(entry)) {
- /* run task */
- entry.task->run(thread_id);
+ /* keep popping off tasks */
+ while (thread_wait_pop(entry)) {
+ /* run task */
+ entry.task->run(thread_id);
- /* delete task */
- delete entry.task;
+ /* delete task */
+ delete entry.task;
- /* notify pool task was done */
- entry.pool->num_decrease(1);
- }
+ /* notify pool task was done */
+ entry.pool->num_decrease(1);
+ }
}
-void TaskScheduler::push(Entry& entry, bool front)
+void TaskScheduler::push(Entry &entry, bool front)
{
- entry.pool->num_increase();
+ entry.pool->num_increase();
- /* add entry to queue */
- TaskScheduler::queue_mutex.lock();
- if(front)
- TaskScheduler::queue.push_front(entry);
- else
- TaskScheduler::queue.push_back(entry);
+ /* add entry to queue */
+ TaskScheduler::queue_mutex.lock();
+ if (front)
+ TaskScheduler::queue.push_front(entry);
+ else
+ TaskScheduler::queue.push_back(entry);
- TaskScheduler::queue_cond.notify_one();
- TaskScheduler::queue_mutex.unlock();
+ TaskScheduler::queue_cond.notify_one();
+ TaskScheduler::queue_mutex.unlock();
}
void TaskScheduler::clear(TaskPool *pool)
{
- thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);
+ thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);
- /* erase all tasks from this pool from the queue */
- list<Entry>::iterator it = queue.begin();
- int done = 0;
+ /* erase all tasks from this pool from the queue */
+ list<Entry>::iterator it = queue.begin();
+ int done = 0;
- while(it != queue.end()) {
- Entry& entry = *it;
+ while (it != queue.end()) {
+ Entry &entry = *it;
- if(entry.pool == pool) {
- done++;
- delete entry.task;
+ if (entry.pool == pool) {
+ done++;
+ delete entry.task;
- it = queue.erase(it);
- }
- else
- it++;
- }
+ it = queue.erase(it);
+ }
+ else
+ it++;
+ }
- queue_lock.unlock();
+ queue_lock.unlock();
- /* notify done */
- pool->num_decrease(done);
+ /* notify done */
+ pool->num_decrease(done);
}
/* Dedicated Task Pool */
DedicatedTaskPool::DedicatedTaskPool()
{
- do_cancel = false;
- do_exit = false;
- num = 0;
+ do_cancel = false;
+ do_exit = false;
+ num = 0;
- worker_thread = new thread(function_bind(&DedicatedTaskPool::thread_run, this));
+ worker_thread = new thread(function_bind(&DedicatedTaskPool::thread_run, this));
}
DedicatedTaskPool::~DedicatedTaskPool()
{
- stop();
- worker_thread->join();
- delete worker_thread;
+ stop();
+ worker_thread->join();
+ delete worker_thread;
}
void DedicatedTaskPool::push(Task *task, bool front)
{
- num_increase();
+ num_increase();
- /* add task to queue */
- queue_mutex.lock();
- if(front)
- queue.push_front(task);
- else
- queue.push_back(task);
+ /* add task to queue */
+ queue_mutex.lock();
+ if (front)
+ queue.push_front(task);
+ else
+ queue.push_back(task);
- queue_cond.notify_one();
- queue_mutex.unlock();
+ queue_cond.notify_one();
+ queue_mutex.unlock();
}
-void DedicatedTaskPool::push(const TaskRunFunction& run, bool front)
+void DedicatedTaskPool::push(const TaskRunFunction &run, bool front)
{
- push(new Task(run), front);
+ push(new Task(run), front);
}
void DedicatedTaskPool::wait()
{
- thread_scoped_lock num_lock(num_mutex);
+ thread_scoped_lock num_lock(num_mutex);
- while(num)
- num_cond.wait(num_lock);
+ while (num)
+ num_cond.wait(num_lock);
}
void DedicatedTaskPool::cancel()
{
- do_cancel = true;
+ do_cancel = true;
- clear();
- wait();
+ clear();
+ wait();
- do_cancel = false;
+ do_cancel = false;
}
void DedicatedTaskPool::stop()
{
- clear();
+ clear();
- do_exit = true;
- queue_cond.notify_all();
+ do_exit = true;
+ queue_cond.notify_all();
- wait();
+ wait();
- assert(num == 0);
+ assert(num == 0);
}
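stop() shows the shutdown order that lets the single worker terminate cleanly: discard queued tasks, raise do_exit, wake the worker so thread_wait_pop() returns false, then wait for any in-flight task. Hypothetical usage of the pool itself (task names invented; as with TaskPool, the callables must match TaskRunFunction):

    DedicatedTaskPool updater;
    /* One background thread executes pushes in FIFO order (push_back is the
     * default), so the two tasks below run strictly one after the other. */
    updater.push(function_bind(&rebuild_bvh));
    updater.push(function_bind(&upload_textures));
    updater.wait(); /* block until both tasks have finished */
    /* ~DedicatedTaskPool() calls stop() and joins the worker thread. */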
bool DedicatedTaskPool::canceled()
{
- return do_cancel;
+ return do_cancel;
}
void DedicatedTaskPool::num_decrease(int done)
{
- thread_scoped_lock num_lock(num_mutex);
- num -= done;
+ thread_scoped_lock num_lock(num_mutex);
+ num -= done;
- assert(num >= 0);
- if(num == 0)
- num_cond.notify_all();
+ assert(num >= 0);
+ if (num == 0)
+ num_cond.notify_all();
}
void DedicatedTaskPool::num_increase()
{
- thread_scoped_lock num_lock(num_mutex);
- num++;
- num_cond.notify_all();
+ thread_scoped_lock num_lock(num_mutex);
+ num++;
+ num_cond.notify_all();
}
-bool DedicatedTaskPool::thread_wait_pop(Task*& task)
+bool DedicatedTaskPool::thread_wait_pop(Task *&task)
{
- thread_scoped_lock queue_lock(queue_mutex);
+ thread_scoped_lock queue_lock(queue_mutex);
- while(queue.empty() && !do_exit)
- queue_cond.wait(queue_lock);
+ while (queue.empty() && !do_exit)
+ queue_cond.wait(queue_lock);
- if(queue.empty()) {
- assert(do_exit);
- return false;
- }
+ if (queue.empty()) {
+ assert(do_exit);
+ return false;
+ }
- task = queue.front();
- queue.pop_front();
+ task = queue.front();
+ queue.pop_front();
- return true;
+ return true;
}
void DedicatedTaskPool::thread_run()
{
- Task *task;
+ Task *task;
- /* keep popping off tasks */
- while(thread_wait_pop(task)) {
- /* run task */
- task->run(0);
+ /* keep popping off tasks */
+ while (thread_wait_pop(task)) {
+ /* run task */
+ task->run(0);
- /* delete task */
- delete task;
+ /* delete task */
+ delete task;
- /* notify task was done */
- num_decrease(1);
- }
+ /* notify task was done */
+ num_decrease(1);
+ }
}
void DedicatedTaskPool::clear()
{
- thread_scoped_lock queue_lock(queue_mutex);
+ thread_scoped_lock queue_lock(queue_mutex);
- /* erase all tasks from the queue */
- list<Task*>::iterator it = queue.begin();
- int done = 0;
+ /* erase all tasks from the queue */
+ list<Task *>::iterator it = queue.begin();
+ int done = 0;
- while(it != queue.end()) {
- done++;
- delete *it;
+ while (it != queue.end()) {
+ done++;
+ delete *it;
- it = queue.erase(it);
- }
+ it = queue.erase(it);
+ }
- queue_lock.unlock();
+ queue_lock.unlock();
- /* notify done */
- num_decrease(done);
+ /* notify done */
+ num_decrease(done);
}
string TaskPool::Summary::full_report() const
{
- string report = "";
- report += string_printf("Total time: %f\n", time_total);
- report += string_printf("Tasks handled: %d\n", num_tasks_handled);
- return report;
+ string report = "";
+ report += string_printf("Total time: %f\n", time_total);
+ report += string_printf("Tasks handled: %d\n", num_tasks_handled);
+ return report;
}
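Given the format strings above, a report from a hypothetical run looks like (values illustrative):

    Total time: 2.134567
    Tasks handled: 128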
CCL_NAMESPACE_END