diff options
Diffstat (limited to 'base')
-rw-r--r-- | base/CMakeLists.txt | 6 | ||||
-rw-r--r-- | base/base.pro | 2 | ||||
-rw-r--r-- | base/base_tests/CMakeLists.txt | 1 | ||||
-rw-r--r-- | base/base_tests/base_tests.pro | 1 | ||||
-rw-r--r-- | base/base_tests/logging_test.cpp | 17 | ||||
-rw-r--r-- | base/base_tests/worker_thread_tests.cpp | 89 | ||||
-rw-r--r-- | base/logging.hpp | 9 | ||||
-rw-r--r-- | base/newtype.hpp | 19 | ||||
-rw-r--r-- | base/worker_thread.cpp | 69 | ||||
-rw-r--r-- | base/worker_thread.hpp | 63 |
10 files changed, 265 insertions, 11 deletions
diff --git a/base/CMakeLists.txt b/base/CMakeLists.txt index e5abe74ff4..9112a18640 100644 --- a/base/CMakeLists.txt +++ b/base/CMakeLists.txt @@ -22,6 +22,8 @@ set( get_time.hpp gmtime.cpp gmtime.hpp + internal/message.cpp + internal/message.hpp levenshtein_dfa.cpp levenshtein_dfa.hpp limited_priority_queue.hpp @@ -74,14 +76,14 @@ set( threaded_container.cpp threaded_container.hpp threaded_list.hpp - internal/message.cpp - internal/message.hpp timegm.cpp timegm.hpp timer.cpp timer.hpp uni_string_dfa.cpp uni_string_dfa.hpp + worker_thread.cpp + worker_thread.hpp ) add_library(${PROJECT_NAME} ${SRC}) diff --git a/base/base.pro b/base/base.pro index 2a1b95af0d..c9fdc71316 100644 --- a/base/base.pro +++ b/base/base.pro @@ -34,6 +34,7 @@ SOURCES += \ timegm.cpp \ timer.cpp \ uni_string_dfa.cpp \ + worker_thread.cpp \ HEADERS += \ SRC_FIRST.hpp \ @@ -93,3 +94,4 @@ HEADERS += \ timegm.hpp \ timer.hpp \ uni_string_dfa.hpp \ + worker_thread.hpp \ diff --git a/base/base_tests/CMakeLists.txt b/base/base_tests/CMakeLists.txt index e04f68622a..2de3cf2b09 100644 --- a/base/base_tests/CMakeLists.txt +++ b/base/base_tests/CMakeLists.txt @@ -36,6 +36,7 @@ set( timegm_test.cpp timer_test.cpp uni_string_dfa_test.cpp + worker_thread_tests.cpp ) omim_add_test(${PROJECT_NAME} ${SRC}) diff --git a/base/base_tests/base_tests.pro b/base/base_tests/base_tests.pro index 57ddc14452..6b7bce0be3 100644 --- a/base/base_tests/base_tests.pro +++ b/base/base_tests/base_tests.pro @@ -46,5 +46,6 @@ SOURCES += \ timegm_test.cpp \ timer_test.cpp \ uni_string_dfa_test.cpp \ + worker_thread_tests.cpp \ HEADERS += diff --git a/base/base_tests/logging_test.cpp b/base/base_tests/logging_test.cpp index ff5e50ad16..b7b39c994b 100644 --- a/base/base_tests/logging_test.cpp +++ b/base/base_tests/logging_test.cpp @@ -18,6 +18,12 @@ namespace g_SomeFunctionCalled = true; return 3; } + + bool BoolFunction(bool result, bool & called) + { + called = true; + return result; + } } UNIT_TEST(Logging_Level) @@ -43,3 +49,14 @@ UNIT_TEST(NullMessage) char const * ptr = 0; LOG(LINFO, ("Null message test", ptr)); } + +UNIT_TEST(Logging_ConditionalLog) +{ + bool isCalled = false; + CLOG(LINFO, BoolFunction(true, isCalled), ("This should not be displayed")); + TEST(isCalled, ()); + + isCalled = false; + CLOG(LWARNING, BoolFunction(false, isCalled), ("This should be displayed")); + TEST(isCalled, ()); +} diff --git a/base/base_tests/worker_thread_tests.cpp b/base/base_tests/worker_thread_tests.cpp new file mode 100644 index 0000000000..715f5852c9 --- /dev/null +++ b/base/base_tests/worker_thread_tests.cpp @@ -0,0 +1,89 @@ +#include "testing/testing.hpp" + +#include "base/worker_thread.hpp" + +#include <condition_variable> +#include <future> +#include <mutex> + +using namespace base; +using namespace std; + +namespace +{ +UNIT_TEST(WorkerThread_Smoke) +{ + { + WorkerThread thread; + } + + { + WorkerThread thread; + TEST(thread.Shutdown(WorkerThread::Exit::SkipPending), ()); + } + + { + WorkerThread thread; + TEST(thread.Shutdown(WorkerThread::Exit::ExecPending), ()); + } +} + +UNIT_TEST(WorkerThread_SimpleSync) +{ + int value = 0; + + mutex mu; + condition_variable cv; + bool done = false; + + WorkerThread thread; + TEST(thread.Push([&value]() { ++value; }), ()); + TEST(thread.Push([&value]() { value *= 2; }), ()); + TEST(thread.Push([&value]() { value = value * value * value; }), ()); + TEST(thread.Push([&]() { + lock_guard<mutex> lk(mu); + done = true; + cv.notify_one(); + }), ()); + + { + unique_lock<mutex> lk(mu); + cv.wait(lk, [&done]() { return done; }); + } + + TEST_EQUAL(value, 8, ()); +} + +UNIT_TEST(WorkerThread_SimpleFlush) +{ + int value = 0; + { + WorkerThread thread; + TEST(thread.Push([&value]() { ++value; }), ()); + TEST(thread.Push([&value]() { + for (int i = 0; i < 10; ++i) + value *= 2; + }), ()); + TEST(thread.Shutdown(WorkerThread::Exit::ExecPending), ()); + } + TEST_EQUAL(value, 1024, ()); +} + +UNIT_TEST(WorkerThread_PushFromPendingTask) +{ + // promise - future pair is used as a socketpair here to pass a + // signal from the main thread to the worker thread. + promise<void> p; + auto f = p.get_future(); + + WorkerThread thread; + bool const rv = thread.Push([&f, &thread]() { + f.get(); + bool const rv = thread.Push([]() { TEST(false, ("This task should not be executed")); }); + TEST(!rv, ()); + }); + TEST(rv, ()); + thread.Shutdown(WorkerThread::Exit::ExecPending); + p.set_value(); +} +} // namespace diff --git a/base/logging.hpp b/base/logging.hpp index 163041e0fb..10fdf9502c 100644 --- a/base/logging.hpp +++ b/base/logging.hpp @@ -70,6 +70,7 @@ using ::my::LINFO; using ::my::LWARNING; using ::my::LERROR; using ::my::LCRITICAL; +using ::my::NUM_LOG_LEVELS; // Logging macro. // Example usage: LOG(LINFO, (Calc(), m_Var, "Some string constant")); @@ -87,3 +88,11 @@ using ::my::LCRITICAL; if ((level) >= ::my::g_LogLevel) \ ::my::LogMessage(level, my::SrcPoint(), ::my::impl::Message msg); \ } while (false) + +// Conditional log. Logs @msg with level @level in case when @X returns false. +#define CLOG(level, X, msg) \ + do \ + { \ + if (!(X)) \ + LOG(level, (SRC(), "CLOG(" #X ")", ::my::impl::Message msg)); \ + } while (false) diff --git a/base/newtype.hpp b/base/newtype.hpp index f17194e779..50f0cab9fc 100644 --- a/base/newtype.hpp +++ b/base/newtype.hpp @@ -1,6 +1,7 @@ #pragma once #include <iostream> +#include <string> #include <type_traits> namespace my @@ -144,7 +145,7 @@ private: namespace newtype_default_output { template <typename Type, typename Tag> -string SimpleDebugPrint(NewType<Type, Tag> const & nt) +std::string SimpleDebugPrint(NewType<Type, Tag> const & nt) { return ::DebugPrint(nt.Get()); } @@ -155,12 +156,12 @@ string SimpleDebugPrint(NewType<Type, Tag> const & nt) struct NAME ## _tag; \ using NAME = my::NewType<REPR, NAME ## _tag> -#define NEWTYPE_SIMPLE_OUTPUT(NAME) \ - inline string DebugPrint(NAME const & nt) \ - { \ - return my::newtype_default_output::SimpleDebugPrint(nt); \ - } \ - inline ostream & operator<<(ostream & ost, NAME const & nt) \ - { \ - return ost << my::newtype_default_output::SimpleDebugPrint(nt); \ +#define NEWTYPE_SIMPLE_OUTPUT(NAME) \ + inline std::string DebugPrint(NAME const & nt) \ + { \ + return my::newtype_default_output::SimpleDebugPrint(nt); \ + } \ + inline std::ostream & operator<<(std::ostream & ost, NAME const & nt) \ + { \ + return ost << my::newtype_default_output::SimpleDebugPrint(nt); \ } diff --git a/base/worker_thread.cpp b/base/worker_thread.cpp new file mode 100644 index 0000000000..dffbdc2ee2 --- /dev/null +++ b/base/worker_thread.cpp @@ -0,0 +1,69 @@ +#include "base/worker_thread.hpp" + +using namespace std; + +namespace base +{ +WorkerThread::WorkerThread() +{ + m_thread = threads::SimpleThread(&WorkerThread::ProcessTasks, this); +} + +WorkerThread::~WorkerThread() +{ + ASSERT(m_checker.CalledOnOriginalThread(), ()); + Shutdown(Exit::SkipPending); + m_thread.join(); +} + +void WorkerThread::ProcessTasks() +{ + queue<Task> pending; + + while (true) + { + Task task; + + { + unique_lock<mutex> lk(m_mu); + m_cv.wait(lk, [this]() { return m_shutdown || !m_queue.empty(); }); + + if (m_shutdown) + { + switch (m_exit) + { + case Exit::ExecPending: + CHECK(pending.empty(), ()); + m_queue.swap(pending); + break; + case Exit::SkipPending: break; + } + break; + } + + CHECK(!m_queue.empty(), ()); + task = move(m_queue.front()); + m_queue.pop(); + } + + task(); + } + + while (!pending.empty()) + { + pending.front()(); + pending.pop(); + } +} + +bool WorkerThread::Shutdown(Exit e) +{ + lock_guard<mutex> lk(m_mu); + if (m_shutdown) + return false; + m_shutdown = true; + m_exit = e; + m_cv.notify_one(); + return true; +} +} // namespace base diff --git a/base/worker_thread.hpp b/base/worker_thread.hpp new file mode 100644 index 0000000000..2e8b7b374b --- /dev/null +++ b/base/worker_thread.hpp @@ -0,0 +1,63 @@ +#pragma once + +#include "base/assert.hpp" +#include "base/thread.hpp" +#include "base/thread_checker.hpp" + +#include <condition_variable> +#include <mutex> +#include <queue> +#include <utility> + +namespace base +{ +// This class represents a simple worker thread with a queue of tasks. +// +// *NOTE* This class IS thread-safe, but it must be destroyed on the +// same thread it was created. +class WorkerThread +{ +public: + enum class Exit + { + ExecPending, + SkipPending + }; + + using Task = std::function<void()>; + + WorkerThread(); + ~WorkerThread(); + + // Pushes task to the end of the thread's queue. Returns false when + // the thread is shut down. + template <typename T> + bool Push(T && t) + { + std::lock_guard<std::mutex> lk(m_mu); + if (m_shutdown) + return false; + m_queue.emplace(std::forward<T>(t)); + m_cv.notify_one(); + return true; + } + + // Sends a signal to the thread to shut down. Returns false when the + // thread was shut down previously. + bool Shutdown(Exit e); + +private: + void ProcessTasks(); + + threads::SimpleThread m_thread; + std::mutex m_mu; + std::condition_variable m_cv; + + bool m_shutdown = false; + Exit m_exit = Exit::SkipPending; + + std::queue<Task> m_queue; + + ThreadChecker m_checker; +}; +} // namespace base |