Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mapsme/omim.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYuri Gorshenin <y@maps.me>2017-09-29 14:12:12 +0300
committerArsentiy Milchakov <milcars@mapswithme.com>2017-09-29 16:27:27 +0300
commit851b67edbe8fbf3fd86cb9ac110ad3d3882e959c (patch)
tree39c1a9e7adb6b4d6e883eec2ce9cd9b909651383
parentb8cdc1b24e963d7d38271c97d1333c3c7ac69b90 (diff)
Review fixes.
-rw-r--r--base/base_tests/worker_thread_tests.cpp2
-rw-r--r--base/worker_thread.cpp55
-rw-r--r--base/worker_thread.hpp35
3 files changed, 47 insertions, 45 deletions
diff --git a/base/base_tests/worker_thread_tests.cpp b/base/base_tests/worker_thread_tests.cpp
index e478236cbf..6b07cb6b88 100644
--- a/base/base_tests/worker_thread_tests.cpp
+++ b/base/base_tests/worker_thread_tests.cpp
@@ -89,7 +89,7 @@ UNIT_TEST(WorkerThread_PushFromPendingTask)
p.set_value();
}
-UNIT_TEST(WorkerThread_PushDelayedTask)
+UNIT_TEST(WorkerThread_DelayedAndImmediateTasks)
{
int const kNumTasks = 100;
diff --git a/base/worker_thread.cpp b/base/worker_thread.cpp
index 8c15175127..c42dff9364 100644
--- a/base/worker_thread.cpp
+++ b/base/worker_thread.cpp
@@ -1,5 +1,7 @@
#include "base/worker_thread.hpp"
+#include <array>
+
using namespace std;
namespace base
@@ -28,8 +30,6 @@ bool WorkerThread::Push(Task const & t)
bool WorkerThread::PushDelayed(Duration const & delay, Task && t)
{
- // NOTE: this code depends on the fact that steady_clock is the same
- // for different threads.
auto const when = Now() + delay;
return TouchQueues([&]() { m_delayed.emplace(when, move(t)); });
}
@@ -47,16 +47,16 @@ void WorkerThread::ProcessTasks()
while (true)
{
- Task task;
+ array<Task, QUEUE_TYPE_COUNT> tasks;
{
unique_lock<mutex> lk(m_mu);
if (!m_delayed.empty())
{
- // We need to wait for the moment when the delayed task must
- // be executed, but may be interrupted earlier, in case of
- // immediate task or another delayed task that must be
- // executed earlier.
+ // We need to wait until the moment when the earliest delayed
+ // task may be executed, given that an immediate task or a
+ // delayed task with an earlier execution time may arrive
+ // while we are waiting.
auto const when = m_delayed.top().m_when;
m_cv.wait_until(lk, when, [this, when]() {
return m_shutdown || !m_immediate.empty() || m_delayed.top().m_when < when;
@@ -64,9 +64,8 @@ void WorkerThread::ProcessTasks()
}
else
{
- // When there is no delayed tasks in the queue, we need to
- // wait until there is at least one immediate task or delayed
- // task.
+ // When there are no delayed tasks in the queue, we need to
+ // wait until there is at least one immediate or delayed task.
m_cv.wait(lk,
[this]() { return m_shutdown || !m_immediate.empty() || !m_delayed.empty(); });
}
@@ -91,38 +90,24 @@ void WorkerThread::ProcessTasks()
auto const canExecImmediate = !m_immediate.empty();
auto const canExecDelayed = !m_delayed.empty() && Now() >= m_delayed.top().m_when;
- if (!canExecImmediate && !canExecDelayed)
- continue;
-
- ASSERT(canExecImmediate || canExecDelayed, ());
- bool execImmediate = canExecImmediate;
- bool execDelayed = canExecDelayed;
-
- if (canExecImmediate && canExecDelayed)
- {
- // Tasks are executed in the Round-Robin order to prevent
- // bias.
- execImmediate = m_lastQueue == QueueType::Delayed;
- execDelayed = m_lastQueue == QueueType::Immediate;
- }
-
- if (execImmediate)
+ if (canExecImmediate)
{
- task = move(m_immediate.front());
+ tasks[QUEUE_TYPE_IMMEDIATE] = move(m_immediate.front());
m_immediate.pop();
- m_lastQueue = QueueType::Immediate;
}
- else
+
+ if (canExecDelayed)
{
- ASSERT(execDelayed, ());
- task = move(m_delayed.top().m_task);
+ tasks[QUEUE_TYPE_DELAYED] = move(m_delayed.top().m_task);
m_delayed.pop();
- m_lastQueue = QueueType::Delayed;
}
}
- if (task)
- task();
+ for (auto const & task : tasks)
+ {
+ if (task)
+ task();
+ }
}
for (; !pendingImmediate.empty(); pendingImmediate.pop())
@@ -131,7 +116,7 @@ void WorkerThread::ProcessTasks()
for (; !pendingDelayed.empty(); pendingDelayed.pop())
{
auto const & top = pendingDelayed.top();
- while(true)
+ while (true)
{
auto const now = Now();
if (now >= top.m_when)
diff --git a/base/worker_thread.hpp b/base/worker_thread.hpp
index 6a4a4af554..1408e89c0c 100644
--- a/base/worker_thread.hpp
+++ b/base/worker_thread.hpp
@@ -33,11 +33,28 @@ public:
WorkerThread();
~WorkerThread() override;
- // Pushes task to the end of the thread's queue. Returns false when
- // the thread is shut down.
+ // Pushes task to the end of the thread's queue of immediate tasks.
+ // Returns false when the thread is shut down.
+ //
+ // The task |t| is going to be executed after all immediate tasks
+ // that were pushed pushed before it.
bool Push(Task && t) override;
bool Push(Task const & t) override;
+ // Pushes task to the thread's queue of delayed tasks. Returns false
+ // when the thread is shut down.
+ //
+ // The task |t| is going to be executed not earlier than after
+ // |delay|. No other guarantees about execution order are made. In
+ // particular, when executing:
+ //
+ // PushDelayed(3ms, task1);
+ // PushDelayed(1ms, task2);
+ //
+ // there is no guarantee that |task2| will be executed before |task1|.
+ //
+ // NOTE: current implementation depends on the fact that
+ // steady_clock is the same for different threads.
bool PushDelayed(Duration const & delay, Task && t);
bool PushDelayed(Duration const & delay, Task const & t);
@@ -48,6 +65,13 @@ public:
TimePoint Now() const { return Clock::now(); }
private:
+ enum QueueType
+ {
+ QUEUE_TYPE_IMMEDIATE,
+ QUEUE_TYPE_DELAYED,
+ QUEUE_TYPE_COUNT
+ };
+
struct DelayedTask
{
template <typename T>
@@ -66,12 +90,6 @@ private:
using DelayedQueue =
std::priority_queue<DelayedTask, std::vector<DelayedTask>, std::greater<DelayedTask>>;
- enum class QueueType
- {
- Immediate,
- Delayed
- };
-
template <typename Fn>
bool TouchQueues(Fn && fn)
{
@@ -94,7 +112,6 @@ private:
ImmediateQueue m_immediate;
DelayedQueue m_delayed;
- QueueType m_lastQueue = QueueType::Immediate;
ThreadChecker m_checker;
};