From 8187ce775d9f3c06ce92b0c0aace01c80ca958d3 Mon Sep 17 00:00:00 2001 From: mgsergio Date: Mon, 19 Jun 2017 15:52:47 +0300 Subject: Merge pull request #6300 from ygorshenin/worker-thread [base] WorkerThread. --- base/CMakeLists.txt | 6 ++- base/base.pro | 2 + base/base_tests/CMakeLists.txt | 1 + base/base_tests/base_tests.pro | 1 + base/base_tests/worker_thread_tests.cpp | 70 +++++++++++++++++++++++++++++++ base/worker_thread.cpp | 73 +++++++++++++++++++++++++++++++++ base/worker_thread.hpp | 58 ++++++++++++++++++++++++++ 7 files changed, 209 insertions(+), 2 deletions(-) create mode 100644 base/base_tests/worker_thread_tests.cpp create mode 100644 base/worker_thread.cpp create mode 100644 base/worker_thread.hpp (limited to 'base') diff --git a/base/CMakeLists.txt b/base/CMakeLists.txt index e4d124537a..1ef8cf87ea 100644 --- a/base/CMakeLists.txt +++ b/base/CMakeLists.txt @@ -22,6 +22,8 @@ set( get_time.hpp gmtime.cpp gmtime.hpp + internal/message.cpp + internal/message.hpp levenshtein_dfa.cpp levenshtein_dfa.hpp limited_priority_queue.hpp @@ -72,14 +74,14 @@ set( threaded_container.cpp threaded_container.hpp threaded_list.hpp - internal/message.cpp - internal/message.hpp timegm.cpp timegm.hpp timer.cpp timer.hpp uni_string_dfa.cpp uni_string_dfa.hpp + worker_thread.cpp + worker_thread.hpp ) add_library(${PROJECT_NAME} ${SRC}) diff --git a/base/base.pro b/base/base.pro index b9bf3d586e..1f0e31b62f 100644 --- a/base/base.pro +++ b/base/base.pro @@ -33,6 +33,7 @@ SOURCES += \ timegm.cpp \ timer.cpp \ uni_string_dfa.cpp \ + worker_thread.cpp \ HEADERS += \ SRC_FIRST.hpp \ @@ -91,3 +92,4 @@ HEADERS += \ timegm.hpp \ timer.hpp \ uni_string_dfa.hpp \ + worker_thread.hpp \ diff --git a/base/base_tests/CMakeLists.txt b/base/base_tests/CMakeLists.txt index 2a31f84d0c..724127020f 100644 --- a/base/base_tests/CMakeLists.txt +++ b/base/base_tests/CMakeLists.txt @@ -35,6 +35,7 @@ set( timegm_test.cpp timer_test.cpp uni_string_dfa_test.cpp + worker_thread_tests.cpp ) omim_add_test(${PROJECT_NAME} ${SRC}) diff --git a/base/base_tests/base_tests.pro b/base/base_tests/base_tests.pro index a057e05ed7..c848dca3b9 100644 --- a/base/base_tests/base_tests.pro +++ b/base/base_tests/base_tests.pro @@ -45,5 +45,6 @@ SOURCES += \ timegm_test.cpp \ timer_test.cpp \ uni_string_dfa_test.cpp \ + worker_thread_tests.cpp \ HEADERS += diff --git a/base/base_tests/worker_thread_tests.cpp b/base/base_tests/worker_thread_tests.cpp new file mode 100644 index 0000000000..c2c825f319 --- /dev/null +++ b/base/base_tests/worker_thread_tests.cpp @@ -0,0 +1,70 @@ +#include "testing/testing.hpp" + +#include "base/worker_thread.hpp" + +#include +#include + +using namespace base; +using namespace std; + +namespace +{ +UNIT_TEST(WorkerThread_Smoke) +{ + { + WorkerThread thread; + } + + { + WorkerThread thread; + thread.Shutdown(WorkerThread::Exit::SkipPending); + } + + { + WorkerThread thread; + thread.Shutdown(WorkerThread::Exit::ExecPending); + } +} + +UNIT_TEST(WorkerThread_SimpleSync) +{ + int value = 0; + + mutex mu; + condition_variable cv; + bool done = false; + + WorkerThread thread; + thread.Push([&value]() { ++value; }); + thread.Push([&value]() { value *= 2; }); + thread.Push([&value]() { value = value * value * value; }); + thread.Push([&]() { + lock_guard lk(mu); + done = true; + cv.notify_one(); + }); + + { + unique_lock lk(mu); + cv.wait(lk, [&done]() { return done; }); + } + + TEST_EQUAL(value, 8, ()); +} + +UNIT_TEST(WorkerThread_SimpleFlush) +{ + int value = 0; + { + WorkerThread thread; + thread.Push([&value]() { ++value; }); + thread.Push([&value]() { + for (int i = 0; i < 10; ++i) + value *= 2; + }); + thread.Shutdown(WorkerThread::Exit::ExecPending); + } + TEST_EQUAL(value, 1024, ()); +} +} // namespace diff --git a/base/worker_thread.cpp b/base/worker_thread.cpp new file mode 100644 index 0000000000..9556fb581d --- /dev/null +++ b/base/worker_thread.cpp @@ -0,0 +1,73 @@ +#include "base/worker_thread.hpp" + +using namespace std; + +namespace base +{ +WorkerThread::WorkerThread() +{ + m_thread = threads::SimpleThread(&WorkerThread::ProcessTasks, this); +} + +WorkerThread::~WorkerThread() +{ + ASSERT(m_checker.CalledOnOriginalThread(), ()); + Shutdown(Exit::SkipPending); + m_thread.join(); +} + +void WorkerThread::ProcessTasks() +{ + unique_lock lk(m_mu, defer_lock); + + while (true) + { + Task task; + + { + lk.lock(); + m_cv.wait(lk, [this]() { return m_shutdown || !m_queue.empty(); }); + + if (m_shutdown) + { + switch (m_exit) + { + case Exit::ExecPending: + { + while (!m_queue.empty()) + { + m_queue.front()(); + m_queue.pop(); + } + break; + } + case Exit::SkipPending: break; + } + + break; + } + + CHECK(!m_queue.empty(), ()); + task = move(m_queue.front()); + m_queue.pop(); + lk.unlock(); + } + + task(); + } +} + +void WorkerThread::Shutdown(Exit e) +{ + ASSERT(m_checker.CalledOnOriginalThread(), ()); + + if (m_shutdown) + return; + + CHECK(!m_shutdown, ()); + lock_guard lk(m_mu); + m_shutdown = true; + m_exit = e; + m_cv.notify_one(); +} +} // namespace base diff --git a/base/worker_thread.hpp b/base/worker_thread.hpp new file mode 100644 index 0000000000..d8ddd48ae5 --- /dev/null +++ b/base/worker_thread.hpp @@ -0,0 +1,58 @@ +#pragma once + +#include "base/assert.hpp" +#include "base/thread.hpp" +#include "base/thread_checker.hpp" + +#include +#include +#include +#include + +namespace base +{ +// This class represents a simple worker thread with a queue of tasks. +// +// *NOTE* This class is not thread-safe. +class WorkerThread +{ +public: + enum class Exit + { + ExecPending, + SkipPending + }; + + using Task = std::function; + + WorkerThread(); + ~WorkerThread(); + + template + void Push(T && t) + { + ASSERT(m_checker.CalledOnOriginalThread(), ()); + CHECK(!m_shutdown, ()); + + std::lock_guard lk(m_mu); + m_queue.emplace(std::forward(t)); + m_cv.notify_one(); + } + + void Shutdown(Exit e); + +private: + void ProcessTasks(); + + threads::SimpleThread m_thread; + std::mutex m_mu; + std::condition_variable m_cv; + + bool m_shutdown = false; + Exit m_exit = Exit::SkipPending; + + std::queue m_queue; + + ThreadChecker m_checker; +}; +} // namespace base -- cgit v1.2.3