Welcome to mirror list, hosted at ThFree Co, Russian Federation.

thread_pool_computational.hpp « base - github.com/mapsme/omim.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 027a163d0f94707e88208364125061aaf2b36aa6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#pragma once

#include "base/assert.hpp"
#include "base/thread_utils.hpp"

#include <atomic>
#include <functional>
#include <future>
#include <memory>
#include <queue>
#include <thread>

namespace base
{
using namespace threads;
namespace thread_pool
{
namespace computational
{
// ThreadPool is needed for easy parallelization of tasks.
// ThreadPool can accept tasks that return result as std::future.
// When the destructor is called, all threads will join.
// Warning: ThreadPool works with std::thread instead of SimpleThread and therefore
// should not be used when the JVM is needed.
class ThreadPool
{
public:
  using FunctionType = FunctionWrapper;
  using Threads = std::vector<std::thread>;

  // Constructs a ThreadPool.
  // threadCount - number of threads used by the thread pool.
  // Warning: The constructor may throw exceptions.
  ThreadPool(size_t threadCount) : m_done(false), m_joiner(m_threads)
  {
    CHECK_GREATER(threadCount, 0, ());

    m_threads.reserve(threadCount);
    try
    {
      for (size_t i = 0; i < threadCount; i++)
        m_threads.emplace_back(&ThreadPool::Worker, this);
    }
    catch (...)  // std::system_error etc.
    {
      Stop();
      throw;
    }
  }

  // Destroys the ThreadPool.
  // This function will block until all runnables have been completed.
  ~ThreadPool()
  {
    {
      std::unique_lock<std::mutex> lock(m_mutex);
      m_done = true;
    }
    m_condition.notify_all();
  }

  // Submit task for execution.
  // func - task to be performed.
  // args - arguments for func.
  // The function will return the object future.
  // Warning: If the thread pool is stopped then the call will be ignored.
  template <typename F, typename... Args>
  auto Submit(F && func, Args &&... args) -> std::future<decltype(func(args...))>
  {
    {
      std::unique_lock<std::mutex> lock(m_mutex);
      if (m_done)
        return {};
    }
    using ResultType = decltype(func(args...));
    std::packaged_task<ResultType()> task(std::bind(std::forward<F>(func),
                                                    std::forward<Args>(args)...));
    std::future<ResultType> result(task.get_future());
    {
      std::unique_lock<std::mutex> lock(m_mutex);
      m_queue.emplace(std::move(task));
    }
    m_condition.notify_one();
    return result;
  }

  // Stop a ThreadPool.
  // Removes the tasks that are not yet started from the queue.
  // Unlike the destructor, this function does not wait for all runnables to complete:
  // the tasks will stop as soon as possible.
  void Stop()
  {
    {
      std::unique_lock<std::mutex> lock(m_mutex);
      auto empty = std::queue<FunctionType>();
      m_queue.swap(empty);
      m_done = true;
    }
    m_condition.notify_all();
  }

private:
  void Worker()
  {
    while (true)
    {
      FunctionType task;
      {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_condition.wait(lock, [&] {
          return m_done || !m_queue.empty();
        });

        if (m_done && m_queue.empty())
          return;

        // It may seem that at this point the queue may be empty, provided that m_done == false and
        // m_queue.empty() == true. But it is not possible that the queue is not empty guarantees
        // check in m_condition.wait.
        task = std::move(m_queue.front());
        m_queue.pop();
      }

      task();
    }
  }

  bool m_done;
  std::mutex m_mutex;
  std::condition_variable m_condition;
  std::queue<FunctionType> m_queue;
  Threads m_threads;
  ThreadsJoiner<> m_joiner;
};
}  // namespace computational
}  // namespace thread_pool
}  // namespace base