Welcome to mirror list, hosted at ThFree Co, Russian Federation.

thread_pool_delayed.hpp « base - github.com/mapsme/omim.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 2bc1ab9105927e55c6de3520e85e152800b3b2a9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
#pragma once

#include "base/assert.hpp"
#include "base/bidirectional_map.hpp"
#include "base/linked_map.hpp"
#include "base/task_loop.hpp"
#include "base/thread.hpp"
#include "base/thread_checker.hpp"

#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <map>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>
#include <vector>

namespace base
{
namespace thread_pool
{
namespace delayed
{
// A simple thread pool that executes tasks taken from its queues.
//
// *NOTE* The pool itself IS NOT thread-safe: it must be destroyed on
// the very thread that created it.  The Push* methods, however, are
// thread-safe.
class ThreadPool : public TaskLoop
{
public:
  using Clock = std::chrono::steady_clock;
  using Duration = Clock::duration;
  using TimePoint = Clock::time_point;

  // Bounds of the id ranges for immediate and delayed tasks: the
  // immediate range starts at 1 and ends at half of the maximal
  // TaskId, the delayed range covers the rest up to the maximal
  // TaskId.  Outside of this class they are meant for tests only.
  static constexpr TaskId kImmediateMinId = 1;
  static constexpr TaskId kImmediateMaxId = std::numeric_limits<TaskId>::max() / 2;
  static constexpr TaskId kDelayedMinId = kImmediateMaxId + 1;
  static constexpr TaskId kDelayedMaxId = std::numeric_limits<TaskId>::max();

  // Shutdown policy.
  // NOTE(review): judging by the names, this selects whether tasks
  // that are still queued at shutdown are executed or skipped -
  // confirm against the implementation.
  enum class Exit
  {
    ExecPending,
    SkipPending
  };

  // By default |threadsCount| is 1 and the exit policy is
  // Exit::SkipPending.
  explicit ThreadPool(size_t threadsCount = 1, Exit e = Exit::SkipPending);
  ~ThreadPool() override;

  // Appends task |t| to the queue of immediate tasks.
  //
  // |t| is going to be executed after all immediate tasks that were
  // pushed before it.
  PushResult Push(Task && t) override;
  PushResult Push(Task const & t) override;

  // Adds task |t| to the queue of delayed tasks.
  //
  // |t| is going to be executed not earlier than |delay| from now.
  // No other guarantees about the execution order are made.  In
  // particular, after
  //
  // PushDelayed(3ms, task1);
  // PushDelayed(1ms, task2);
  //
  // |task2| is not guaranteed to be executed before |task1|.
  //
  // NOTE: the current implementation relies on steady_clock being
  // the same for different threads.
  PushResult PushDelayed(Duration const & delay, Task && t);
  PushResult PushDelayed(Duration const & delay, Task const & t);

  // Cancels the task with the given |id| if it is still queued and
  // has not started running.  Returns true on success; returns false
  // when the pool is shut down, the task is not found or the task is
  // already running.
  bool Cancel(TaskId id);

  // Signals the pool to shut down.  Returns false when the pool was
  // already shut down before this call.
  bool Shutdown(Exit e);

  // Signals the pool to shut down and waits for completion.
  void ShutdownAndJoin();
  bool IsShutDown();

  // Current time point of |Clock|.
  static TimePoint Now()
  {
    return Clock::now();
  }

private:
  // Kinds of task queues; QUEUE_TYPE_COUNT equals the number of kinds.
  enum QueueType
  {
    QUEUE_TYPE_IMMEDIATE,
    QUEUE_TYPE_DELAYED,
    QUEUE_TYPE_COUNT
  };

  // A task bundled with its id and the time point attached to it.
  struct DelayedTask
  {
    template <typename T>
    DelayedTask(TaskId id, TimePoint const & when, T && task)
      : m_id(id), m_when(when), m_task(std::forward<T>(task))
    {
    }

    // Orders tasks by time point first; tasks with equal time points
    // are ordered by id.
    bool operator<(DelayedTask const & rhs) const
    {
      if (m_when != rhs.m_when)
        return m_when < rhs.m_when;

      return m_id < rhs.m_id;
    }

    // Mirror of operator<.
    bool operator>(DelayedTask const & rhs) const
    {
      return rhs < *this;
    }

    TaskId m_id = kNoId;
    TimePoint m_when = {};
    Task m_task = {};
  };

  // Comparator for pointer-like objects: compares the pointees with
  // operator< instead of comparing the pointers themselves.
  template <typename T>
  struct DeRef
  {
    bool operator()(T const & a, T const & b) const
    {
      return *a < *b;
    }
  };

  using ImmediateQueue = base::LinkedMap<TaskId, Task>;

  using DelayedValue = std::shared_ptr<DelayedTask>;

  // Delayed tasks addressable both by id and by value.
  // NOTE(review): the template arguments suggest an unordered_map
  // from ids to tasks plus a multimap from tasks to ids ordered via
  // DeRef (i.e. by DelayedTask::operator<) - confirm against
  // BidirectionalMap.
  class DelayedQueue : public BidirectionalMap<TaskId, DelayedValue,
                                              std::unordered_map, std::hash<TaskId>,
                                              std::multimap, DeRef<DelayedValue>>
  {
  public:
    // Returns the value stored first in the values-to-keys container.
    // The queue must not be empty, this is enforced by CHECK.
    Value const & GetFirstValue() const
    {
      auto const & valuesToKeys = GetValuesToKeys();
      CHECK(!valuesToKeys.empty(), ());
      auto const first = valuesToKeys.begin();
      return first->first;
    }
  };

  // Internal helpers, declared here and defined elsewhere.
  // NOTE(review): presumably these back Push/PushDelayed - verify in
  // the implementation.
  template <typename T>
  PushResult AddImmediate(T && task);
  template <typename T>
  PushResult AddDelayed(Duration const & delay, T && task);
  template <typename Add>
  PushResult AddTask(Add && add);

  void ProcessTasks();

  std::vector<::threads::SimpleThread> m_threads;
  std::mutex m_mu;
  std::condition_variable m_cv;

  bool m_shutdown = false;
  Exit m_exit = Exit::SkipPending;

  ImmediateQueue m_immediate;
  DelayedQueue m_delayed;

  // Have no default member initializers here.
  TaskId m_immediateLastId;
  TaskId m_delayedLastId;

  ThreadChecker m_checker;
};
}  // namespace delayed
}  // namespace thread_pool
}  // namespace base