From ca7a32dcacdc633cc28aff41b924fafc0a4d2ac3 Mon Sep 17 00:00:00 2001 From: Yuri Gorshenin Date: Mon, 26 Jun 2017 18:27:47 +0300 Subject: Review fixes. --- base/base_tests/worker_thread_tests.cpp | 22 +++++++++++----------- base/worker_thread.cpp | 29 ++++++++++++++--------------- base/worker_thread.hpp | 17 +++++++++++------ 3 files changed, 36 insertions(+), 32 deletions(-) (limited to 'base') diff --git a/base/base_tests/worker_thread_tests.cpp b/base/base_tests/worker_thread_tests.cpp index c2c825f319..be601e1540 100644 --- a/base/base_tests/worker_thread_tests.cpp +++ b/base/base_tests/worker_thread_tests.cpp @@ -18,12 +18,12 @@ UNIT_TEST(WorkerThread_Smoke) { WorkerThread thread; - thread.Shutdown(WorkerThread::Exit::SkipPending); + TEST(thread.Shutdown(WorkerThread::Exit::SkipPending), ()); } { WorkerThread thread; - thread.Shutdown(WorkerThread::Exit::ExecPending); + TEST(thread.Shutdown(WorkerThread::Exit::ExecPending), ()); } } @@ -36,14 +36,14 @@ UNIT_TEST(WorkerThread_SimpleSync) bool done = false; WorkerThread thread; - thread.Push([&value]() { ++value; }); - thread.Push([&value]() { value *= 2; }); - thread.Push([&value]() { value = value * value * value; }); - thread.Push([&]() { + TEST(thread.Push([&value]() { ++value; }), ()); + TEST(thread.Push([&value]() { value *= 2; }), ()); + TEST(thread.Push([&value]() { value = value * value * value; }), ()); + TEST(thread.Push([&]() { lock_guard lk(mu); done = true; cv.notify_one(); - }); + }), ()); { unique_lock lk(mu); @@ -58,12 +58,12 @@ UNIT_TEST(WorkerThread_SimpleFlush) int value = 0; { WorkerThread thread; - thread.Push([&value]() { ++value; }); - thread.Push([&value]() { + TEST(thread.Push([&value]() { ++value; }), ()); + TEST(thread.Push([&value]() { for (int i = 0; i < 10; ++i) value *= 2; - }); - thread.Shutdown(WorkerThread::Exit::ExecPending); + }), ()); + TEST(thread.Shutdown(WorkerThread::Exit::ExecPending), ()); } TEST_EQUAL(value, 1024, ()); } diff --git a/base/worker_thread.cpp b/base/worker_thread.cpp index 9556fb581d..f0dc9af050 100644 --- a/base/worker_thread.cpp +++ b/base/worker_thread.cpp @@ -18,6 +18,8 @@ WorkerThread::~WorkerThread() void WorkerThread::ProcessTasks() { + queue pending; + unique_lock lk(m_mu, defer_lock); while (true) @@ -33,17 +35,11 @@ void WorkerThread::ProcessTasks() switch (m_exit) { case Exit::ExecPending: - { - while (!m_queue.empty()) - { - m_queue.front()(); - m_queue.pop(); - } + CHECK(pending.empty(), ()); + m_queue.swap(pending); break; - } case Exit::SkipPending: break; } - break; } @@ -55,19 +51,22 @@ void WorkerThread::ProcessTasks() task(); } + + while (!pending.empty()) + { + pending.front()(); + pending.pop(); + } } -void WorkerThread::Shutdown(Exit e) +bool WorkerThread::Shutdown(Exit e) { - ASSERT(m_checker.CalledOnOriginalThread(), ()); - - if (m_shutdown) - return; - - CHECK(!m_shutdown, ()); lock_guard lk(m_mu); + if (m_shutdown) + return false; m_shutdown = true; m_exit = e; m_cv.notify_one(); + return true; } } // namespace base diff --git a/base/worker_thread.hpp b/base/worker_thread.hpp index d8ddd48ae5..2e8b7b374b 100644 --- a/base/worker_thread.hpp +++ b/base/worker_thread.hpp @@ -13,7 +13,8 @@ namespace base { // This class represents a simple worker thread with a queue of tasks. // -// *NOTE* This class is not thread-safe. +// *NOTE* This class IS thread-safe, but it must be destroyed on the +// same thread it was created. class WorkerThread { public: @@ -28,18 +29,22 @@ public: WorkerThread(); ~WorkerThread(); + // Pushes task to the end of the thread's queue. Returns false when + // the thread is shut down. template - void Push(T && t) + bool Push(T && t) { - ASSERT(m_checker.CalledOnOriginalThread(), ()); - CHECK(!m_shutdown, ()); - std::lock_guard lk(m_mu); + if (m_shutdown) + return false; m_queue.emplace(std::forward(t)); m_cv.notify_one(); + return true; } - void Shutdown(Exit e); + // Sends a signal to the thread to shut down. Returns false when the + // thread was shut down previously. + bool Shutdown(Exit e); private: void ProcessTasks(); -- cgit v1.2.3