Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mapsme/omim.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/base
diff options
context:
space:
mode:
authorYuri Gorshenin <y@maps.me>2015-05-22 15:44:38 +0300
committerAlex Zolotarev <alex@maps.me>2015-09-23 02:48:40 +0300
commit12ff790df7d5775763f1d841131185f316b85b15 (patch)
treef108d12d1729417b653ac075611831c298cb210d /base
parentdb394e88b2a5c6c2d9814f3c82586cc8de9c8289 (diff)
[threading] Fixed scheduled task API and tests.
Diffstat (limited to 'base')
-rw-r--r--base/base_tests/scheduled_task_test.cpp113
-rw-r--r--base/scheduled_task.cpp73
-rw-r--r--base/scheduled_task.hpp51
-rw-r--r--base/thread.hpp2
4 files changed, 125 insertions, 114 deletions
diff --git a/base/base_tests/scheduled_task_test.cpp b/base/base_tests/scheduled_task_test.cpp
index 7e2334dc2f..352f0c8225 100644
--- a/base/base_tests/scheduled_task_test.cpp
+++ b/base/base_tests/scheduled_task_test.cpp
@@ -5,77 +5,74 @@
#include "std/atomic.hpp"
#include "std/bind.hpp"
-
namespace
{
-void add_int(atomic<int> & val, int a) { val += a; }
+milliseconds const kTimeInaccuracy(1);
+
+void AddInt(atomic<int> & value, int a) { value += a; }
-void mul_int(atomic<int> & val, int b)
+void MulInt(atomic<int> & value, int m)
{
- int value = val;
- while (!val.compare_exchange_weak(value, value * b))
+ int v = value;
+ while (!value.compare_exchange_weak(v, v * m))
;
}
} // namespace
-/// @todo Next tests are based on assumptions that some delays are suitable for
-/// performing needed checks, before a task will fire.
-
-UNIT_TEST(ScheduledTask_Smoke)
+// ScheduledTask start (stop) is a memory barrier because it starts
+// (stops) a thread. That's why it's ok to create time points before
+// ScheduledTask creation and after ScheduledTask completion. Also,
+// we're assuming that steady_clocks are consistent between CPU cores.
+UNIT_TEST(ScheduledTask_SimpleAdd)
{
- atomic<int> val(0);
-
- ScheduledTask t(bind(&add_int, ref(val), 10), 1000);
-
- // Assume that t thread isn't fired yet.
- TEST_EQUAL(val, 0, ());
-
- threads::Sleep(1100);
-
- TEST_EQUAL(val, 10, ());
+ steady_clock::time_point const start = steady_clock::now();
+ milliseconds const delay(1000);
+
+ atomic<int> value(0);
+ ScheduledTask task1(bind(&AddInt, ref(value), 1), delay);
+ ScheduledTask task2(bind(&AddInt, ref(value), 2), delay);
+ task1.WaitForCompletion();
+ task2.WaitForCompletion();
+ TEST_EQUAL(value, 3, ());
+
+ steady_clock::time_point const end = steady_clock::now();
+ milliseconds const elapsed = duration_cast<milliseconds>(end - start);
+ TEST(elapsed >= delay - kTimeInaccuracy, (elapsed.count(), delay.count()));
}
-UNIT_TEST(ScheduledTask_CancelInfinite)
+UNIT_TEST(ScheduledTask_SimpleMul)
{
- atomic<int> val(2);
-
- ScheduledTask t0(bind(&add_int, ref(val), 10), static_cast<unsigned>(-1));
-
- t0.CancelBlocking();
-
- TEST_EQUAL(val, 2, ());
+ steady_clock::time_point const start = steady_clock::now();
+ milliseconds const delay(1500);
+
+ atomic<int> value(1);
+ ScheduledTask task1(bind(&MulInt, ref(value), 2), delay);
+ ScheduledTask task2(bind(&MulInt, ref(value), 3), delay);
+ task1.WaitForCompletion();
+ task2.WaitForCompletion();
+ TEST_EQUAL(value, 6, ());
+
+ steady_clock::time_point const end = steady_clock::now();
+ milliseconds const elapsed = duration_cast<milliseconds>(end - start);
+ TEST(elapsed >= delay - kTimeInaccuracy, (elapsed.count(), delay.count()));
}
-UNIT_TEST(ScheduledTask_Cancel)
+UNIT_TEST(ScheduledTask_CancelNoBlocking)
{
- atomic<int> val(2);
-
- ScheduledTask t0(bind(&add_int, ref(val), 10), 500);
- ScheduledTask t1(bind(&mul_int, ref(val), 2), 1000);
-
- TEST_EQUAL(val, 2, ());
-
- // Assume that t0 thread isn't fired yet.
- t0.CancelBlocking();
-
- threads::Sleep(1100);
-
- TEST_EQUAL(val, 4, ());
-}
-
-UNIT_TEST(ScheduledTask_NoWaitInCancel)
-{
- atomic<int> val(2);
-
- ScheduledTask t0(bind(&add_int, ref(val), 10), 1000);
- ScheduledTask t1(bind(&mul_int, ref(val), 3), 500);
-
- t0.CancelBlocking();
-
- // Assume that t1 thread isn't fired yet.
- val += 3;
-
- threads::Sleep(600);
-
- TEST_EQUAL(val, 15, ());
+ steady_clock::time_point const start = steady_clock::now();
+ milliseconds const delay(1500);
+
+ atomic<int> value(0);
+ ScheduledTask task(bind(&AddInt, ref(value), 1), delay);
+
+ task.CancelNoBlocking();
+ task.WaitForCompletion();
+
+ if (task.WasStarted())
+ {
+ TEST_EQUAL(value, 1, ());
+ steady_clock::time_point const end = steady_clock::now();
+ milliseconds const elapsed = duration_cast<milliseconds>(end - start);
+ TEST(elapsed >= delay - kTimeInaccuracy, (elapsed.count(), delay.count()));
+ }
}
diff --git a/base/scheduled_task.cpp b/base/scheduled_task.cpp
index 09ac7504b0..6ad2605fe3 100644
--- a/base/scheduled_task.cpp
+++ b/base/scheduled_task.cpp
@@ -1,64 +1,73 @@
#include "base/scheduled_task.hpp"
-#include "base/timer.hpp"
-#include "../base/logging.hpp"
+#include "base/timer.hpp"
+#include "base/logging.hpp"
-#include "../std/algorithm.hpp"
-#include "../std/chrono.hpp"
+#include "std/algorithm.hpp"
+#include "std/mutex.hpp"
-ScheduledTask::Routine::Routine(fn_t const & fn,
- unsigned ms,
- condition_variable & condVar)
- : m_fn(fn),
- m_ms(ms),
- m_condVar(condVar)
-{}
+ScheduledTask::Routine::Routine(fn_t const & fn, milliseconds delay, atomic<bool> & started)
+ : m_fn(fn), m_delay(delay), m_started(started)
+{
+}
void ScheduledTask::Routine::Do()
{
- unique_lock<mutex> lock(m_mutex);
+ mutex mu;
+ unique_lock<mutex> lock(mu);
- milliseconds timeLeft(m_ms);
- while (!IsCancelled() && timeLeft != milliseconds::zero())
+ steady_clock::time_point const end = steady_clock::now() + m_delay;
+ while (!IsCancelled())
{
- my::Timer t;
- m_condVar.wait_for(lock, timeLeft, [this]()
- {
- return IsCancelled();
- });
- milliseconds timeElapsed(static_cast<unsigned>(t.ElapsedSeconds() * 1000));
- timeLeft -= min(timeLeft, timeElapsed);
+ steady_clock::time_point const current = steady_clock::now();
+ if (current >= end)
+ break;
+ m_cv.wait_for(lock, end - current, [this]()
+ {
+ return IsCancelled();
+ });
}
if (!IsCancelled())
+ {
+ m_started = true;
m_fn();
+ }
}
void ScheduledTask::Routine::Cancel()
{
threads::IRoutine::Cancel();
- m_condVar.notify_one();
+ m_cv.notify_one();
}
-ScheduledTask::ScheduledTask(fn_t const & fn, unsigned ms)
+ScheduledTask::ScheduledTask(fn_t const & fn, milliseconds ms) : m_started(false)
{
- m_thread.Create(make_unique<Routine>(fn, ms, m_condVar));
+ m_thread.Create(make_unique<Routine>(fn, ms, m_started));
}
ScheduledTask::~ScheduledTask()
{
- CancelBlocking();
+ CHECK(m_threadChecker.CalledOnOriginalThread(), ());
+ m_thread.Cancel();
}
-bool ScheduledTask::CancelNoBlocking()
+bool ScheduledTask::WasStarted() const
{
- if (!m_thread.GetRoutine())
- return false;
- m_thread.GetRoutine()->Cancel();
- return true;
+ CHECK(m_threadChecker.CalledOnOriginalThread(), ());
+ return m_started;
}
-void ScheduledTask::CancelBlocking()
+void ScheduledTask::CancelNoBlocking()
{
- m_thread.Cancel();
+ CHECK(m_threadChecker.CalledOnOriginalThread(), ());
+ threads::IRoutine * routine = m_thread.GetRoutine();
+ CHECK(routine, ());
+ routine->Cancel();
+}
+
+void ScheduledTask::WaitForCompletion()
+{
+ CHECK(m_threadChecker.CalledOnOriginalThread(), ());
+ m_thread.Join();
}
diff --git a/base/scheduled_task.hpp b/base/scheduled_task.hpp
index 0ad46bbb66..91a070db2a 100644
--- a/base/scheduled_task.hpp
+++ b/base/scheduled_task.hpp
@@ -2,49 +2,54 @@
#include "base/condition.hpp"
#include "base/thread.hpp"
+#include "base/thread_checker.hpp"
+#include "std/chrono.hpp"
#include "std/condition_variable.hpp"
#include "std/function.hpp"
-#include "std/mutex.hpp"
#include "std/unique_ptr.hpp"
-/// Class, which performs any function when the specified
-/// amount of time is elapsed.
+/// Class, which performs any function when the specified amount of
+/// time is elapsed. This class is not thread safe.
class ScheduledTask
{
typedef function<void()> fn_t;
class Routine : public threads::IRoutine
{
- fn_t m_fn;
- unsigned m_ms;
-
- condition_variable & m_condVar;
- mutex m_mutex;
+ fn_t const m_fn;
+ milliseconds const m_delay;
+ condition_variable m_cv;
+ atomic<bool> & m_started;
public:
- Routine(fn_t const & fn, unsigned ms, condition_variable & condVar);
+ Routine(fn_t const & fn, milliseconds delay, atomic<bool> & started);
+
+ // IRoutine overrides:
+ void Do() override;
- virtual void Do();
- virtual void Cancel();
+ // my::Cancellable overrides:
+ void Cancel() override;
};
- /// The construction and destruction order is strict here: m_cond is
- /// used by routine that will be executed on m_thread.
- mutex m_mutex;
- condition_variable m_condVar;
+ /// The construction and destruction order is strict here: m_started
+ /// is used by routine that will be executed on m_thread.
+ atomic<bool> m_started;
threads::Thread m_thread;
+ ThreadChecker m_threadChecker;
+
public:
- /// Constructor by function and time in miliseconds.
- ScheduledTask(fn_t const & fn, unsigned ms);
+ ScheduledTask(fn_t const & fn, milliseconds ms);
~ScheduledTask();
- /// @name Task could be cancelled before time elapses.
- //@{
- /// @return false If the task is already running or in some intermediate state.
- bool CancelNoBlocking();
- void CancelBlocking();
- //@}
+ /// Returns true if task was started after delay.
+ bool WasStarted() const;
+
+ /// Cancels task without waiting for worker thread termination.
+ void CancelNoBlocking();
+
+ /// Waits for task's completion and worker thread termination.
+ void WaitForCompletion();
};
diff --git a/base/thread.hpp b/base/thread.hpp
index 85e1f277a2..a953a11830 100644
--- a/base/thread.hpp
+++ b/base/thread.hpp
@@ -31,7 +31,7 @@ public:
///
/// Thread class manages lifetime of a running IRoutine and guarantees
/// that it will be possible to access the IRoutine after
-/// Thread::Create()call until destruction of a Thread object. In the
+/// Thread::Create() call until cancellation or destruction. In the
/// latter case, system thread will be responsible for deletion of a
/// IRoutine.
class Thread