Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mapsme/omim.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/base
diff options
context:
space:
mode:
authormgsergio <mgsergio@yandex.ru>2017-06-19 15:52:47 +0300
committerArsentiy Milchakov <milcars@mapswithme.com>2017-06-26 18:48:52 +0300
commit8187ce775d9f3c06ce92b0c0aace01c80ca958d3 (patch)
treefe85bf378d4b83be44795d304575315a955cc6d1 /base
parentd4edcee341e3f3bd38db6f66519984d08f189c1c (diff)
Merge pull request #6300 from ygorshenin/worker-thread
[base] WorkerThread.
Diffstat (limited to 'base')
-rw-r--r--base/CMakeLists.txt6
-rw-r--r--base/base.pro2
-rw-r--r--base/base_tests/CMakeLists.txt1
-rw-r--r--base/base_tests/base_tests.pro1
-rw-r--r--base/base_tests/worker_thread_tests.cpp70
-rw-r--r--base/worker_thread.cpp73
-rw-r--r--base/worker_thread.hpp58
7 files changed, 209 insertions, 2 deletions
diff --git a/base/CMakeLists.txt b/base/CMakeLists.txt
index e4d124537a..1ef8cf87ea 100644
--- a/base/CMakeLists.txt
+++ b/base/CMakeLists.txt
@@ -22,6 +22,8 @@ set(
get_time.hpp
gmtime.cpp
gmtime.hpp
+ internal/message.cpp
+ internal/message.hpp
levenshtein_dfa.cpp
levenshtein_dfa.hpp
limited_priority_queue.hpp
@@ -72,14 +74,14 @@ set(
threaded_container.cpp
threaded_container.hpp
threaded_list.hpp
- internal/message.cpp
- internal/message.hpp
timegm.cpp
timegm.hpp
timer.cpp
timer.hpp
uni_string_dfa.cpp
uni_string_dfa.hpp
+ worker_thread.cpp
+ worker_thread.hpp
)
add_library(${PROJECT_NAME} ${SRC})
diff --git a/base/base.pro b/base/base.pro
index b9bf3d586e..1f0e31b62f 100644
--- a/base/base.pro
+++ b/base/base.pro
@@ -33,6 +33,7 @@ SOURCES += \
timegm.cpp \
timer.cpp \
uni_string_dfa.cpp \
+ worker_thread.cpp \
HEADERS += \
SRC_FIRST.hpp \
@@ -91,3 +92,4 @@ HEADERS += \
timegm.hpp \
timer.hpp \
uni_string_dfa.hpp \
+ worker_thread.hpp \
diff --git a/base/base_tests/CMakeLists.txt b/base/base_tests/CMakeLists.txt
index 2a31f84d0c..724127020f 100644
--- a/base/base_tests/CMakeLists.txt
+++ b/base/base_tests/CMakeLists.txt
@@ -35,6 +35,7 @@ set(
timegm_test.cpp
timer_test.cpp
uni_string_dfa_test.cpp
+ worker_thread_tests.cpp
)
omim_add_test(${PROJECT_NAME} ${SRC})
diff --git a/base/base_tests/base_tests.pro b/base/base_tests/base_tests.pro
index a057e05ed7..c848dca3b9 100644
--- a/base/base_tests/base_tests.pro
+++ b/base/base_tests/base_tests.pro
@@ -45,5 +45,6 @@ SOURCES += \
timegm_test.cpp \
timer_test.cpp \
uni_string_dfa_test.cpp \
+ worker_thread_tests.cpp \
HEADERS +=
diff --git a/base/base_tests/worker_thread_tests.cpp b/base/base_tests/worker_thread_tests.cpp
new file mode 100644
index 0000000000..c2c825f319
--- /dev/null
+++ b/base/base_tests/worker_thread_tests.cpp
@@ -0,0 +1,70 @@
+#include "testing/testing.hpp"
+
+#include "base/worker_thread.hpp"
+
+#include <condition_variable>
+#include <mutex>
+
+using namespace base;
+using namespace std;
+
+namespace
+{
+UNIT_TEST(WorkerThread_Smoke)
+{
+ {
+ WorkerThread thread;
+ }
+
+ {
+ WorkerThread thread;
+ thread.Shutdown(WorkerThread::Exit::SkipPending);
+ }
+
+ {
+ WorkerThread thread;
+ thread.Shutdown(WorkerThread::Exit::ExecPending);
+ }
+}
+
+UNIT_TEST(WorkerThread_SimpleSync)
+{
+ int value = 0;
+
+ mutex mu;
+ condition_variable cv;
+ bool done = false;
+
+ WorkerThread thread;
+ thread.Push([&value]() { ++value; });
+ thread.Push([&value]() { value *= 2; });
+ thread.Push([&value]() { value = value * value * value; });
+ thread.Push([&]() {
+ lock_guard<mutex> lk(mu);
+ done = true;
+ cv.notify_one();
+ });
+
+ {
+ unique_lock<mutex> lk(mu);
+ cv.wait(lk, [&done]() { return done; });
+ }
+
+ TEST_EQUAL(value, 8, ());
+}
+
+UNIT_TEST(WorkerThread_SimpleFlush)
+{
+ int value = 0;
+ {
+ WorkerThread thread;
+ thread.Push([&value]() { ++value; });
+ thread.Push([&value]() {
+ for (int i = 0; i < 10; ++i)
+ value *= 2;
+ });
+ thread.Shutdown(WorkerThread::Exit::ExecPending);
+ }
+ TEST_EQUAL(value, 1024, ());
+}
+} // namespace
diff --git a/base/worker_thread.cpp b/base/worker_thread.cpp
new file mode 100644
index 0000000000..9556fb581d
--- /dev/null
+++ b/base/worker_thread.cpp
@@ -0,0 +1,73 @@
+#include "base/worker_thread.hpp"
+
+using namespace std;
+
+namespace base
+{
+WorkerThread::WorkerThread()
+{
+ m_thread = threads::SimpleThread(&WorkerThread::ProcessTasks, this);
+}
+
+WorkerThread::~WorkerThread()
+{
+ ASSERT(m_checker.CalledOnOriginalThread(), ());
+ Shutdown(Exit::SkipPending);
+ m_thread.join();
+}
+
+void WorkerThread::ProcessTasks()
+{
+ unique_lock<mutex> lk(m_mu, defer_lock);
+
+ while (true)
+ {
+ Task task;
+
+ {
+ lk.lock();
+ m_cv.wait(lk, [this]() { return m_shutdown || !m_queue.empty(); });
+
+ if (m_shutdown)
+ {
+ switch (m_exit)
+ {
+ case Exit::ExecPending:
+ {
+ while (!m_queue.empty())
+ {
+ m_queue.front()();
+ m_queue.pop();
+ }
+ break;
+ }
+ case Exit::SkipPending: break;
+ }
+
+ break;
+ }
+
+ CHECK(!m_queue.empty(), ());
+ task = move(m_queue.front());
+ m_queue.pop();
+ lk.unlock();
+ }
+
+ task();
+ }
+}
+
+void WorkerThread::Shutdown(Exit e)
+{
+ ASSERT(m_checker.CalledOnOriginalThread(), ());
+
+ if (m_shutdown)
+ return;
+
+ CHECK(!m_shutdown, ());
+ lock_guard<mutex> lk(m_mu);
+ m_shutdown = true;
+ m_exit = e;
+ m_cv.notify_one();
+}
+} // namespace base
diff --git a/base/worker_thread.hpp b/base/worker_thread.hpp
new file mode 100644
index 0000000000..d8ddd48ae5
--- /dev/null
+++ b/base/worker_thread.hpp
@@ -0,0 +1,58 @@
+#pragma once
+
+#include "base/assert.hpp"
+#include "base/thread.hpp"
+#include "base/thread_checker.hpp"
+
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+#include <utility>
+
+namespace base
+{
+// This class represents a simple worker thread with a queue of tasks.
+//
+// *NOTE* This class is not thread-safe.
+class WorkerThread
+{
+public:
+ enum class Exit
+ {
+ ExecPending,
+ SkipPending
+ };
+
+ using Task = std::function<void()>;
+
+ WorkerThread();
+ ~WorkerThread();
+
+ template <typename T>
+ void Push(T && t)
+ {
+ ASSERT(m_checker.CalledOnOriginalThread(), ());
+ CHECK(!m_shutdown, ());
+
+ std::lock_guard<std::mutex> lk(m_mu);
+ m_queue.emplace(std::forward<T>(t));
+ m_cv.notify_one();
+ }
+
+ void Shutdown(Exit e);
+
+private:
+ void ProcessTasks();
+
+ threads::SimpleThread m_thread;
+ std::mutex m_mu;
+ std::condition_variable m_cv;
+
+ bool m_shutdown = false;
+ Exit m_exit = Exit::SkipPending;
+
+ std::queue<Task> m_queue;
+
+ ThreadChecker m_checker;
+};
+} // namespace base