From c66ba10ca949bdcce5f1029634f46b3db092a378 Mon Sep 17 00:00:00 2001
From: "Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com)"
Date: Wed, 17 Mar 2021 12:23:58 +0000
Subject: Fix all tsan failures for both native and libdispatch backends of
 dynamic_thread_pool_group. If this passes CI, it's ready for merge.

---
 include/llfio/revision.hpp                         |   6 +-
 .../v2.0/detail/impl/dynamic_thread_pool_group.ipp | 112 ++++++++-------
 include/llfio/v2.0/dynamic_thread_pool_group.hpp   |  10 +-
 3 files changed, 50 insertions(+), 78 deletions(-)

diff --git a/include/llfio/revision.hpp b/include/llfio/revision.hpp
index f0eeb8c2..2cd9f8b5 100644
--- a/include/llfio/revision.hpp
+++ b/include/llfio/revision.hpp
@@ -1,4 +1,4 @@
 // Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time
-#define LLFIO_PREVIOUS_COMMIT_REF 45c0392e65da7e53984b3b734fbb810bb38561c8
-#define LLFIO_PREVIOUS_COMMIT_DATE "2021-03-11 11:47:09 +00:00"
-#define LLFIO_PREVIOUS_COMMIT_UNIQUE 45c0392e
+#define LLFIO_PREVIOUS_COMMIT_REF 67226948b9f00aebbf33c232d10c417ba1abb289
+#define LLFIO_PREVIOUS_COMMIT_DATE "2021-03-16 12:31:40 +00:00"
+#define LLFIO_PREVIOUS_COMMIT_UNIQUE 67226948

diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
index 9dff9425..287683bf 100644
--- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
+++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
@@ -188,7 +188,6 @@ Benchmarking llfio (Linux native) ...
 
 */
-
 /* Windows 4Kb and 64Kb Win32 thread pool
 
 Benchmarking llfio (Win32 thread pool (Vista+)) ...
@@ -1141,47 +1140,54 @@ namespace detail
 
   result<void> _prepare_work_item_delay(dynamic_thread_pool_group::work_item *workitem, grouph_type grouph, deadline d)
   {
+    (void) grouph;
     if(!d)
     {
       return errc::invalid_argument;
     }
-    workitem->_timepoint1 = {};
-    workitem->_timepoint2 = {};
-    assert(!workitem->_has_timer_set());
-    if(workitem->_nextwork == 0 || d.nsecs > 0)
+    if(workitem->_nextwork.load(std::memory_order_acquire) == 0 || d.nsecs > 0)
     {
       if(d.nsecs > 0)
       {
         if(d.steady)
         {
           workitem->_timepoint1 = std::chrono::steady_clock::now() + std::chrono::nanoseconds(d.nsecs);
+          workitem->_timepoint2 = {};
         }
         else
         {
+          workitem->_timepoint1 = {};
           workitem->_timepoint2 = d.to_time_point();
         }
       }
       else
       {
         workitem->_timepoint1 = std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1));
+        workitem->_timepoint2 = {};
       }
       assert(workitem->_has_timer_set());
+#if defined(_WIN32)
       if(nullptr == workitem->_internaltimerh)
       {
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
-        (void) grouph;
-        workitem->_internaltimerh = (void *) (uintptr_t) -1;
-#elif defined(_WIN32)
        workitem->_internaltimerh = CreateThreadpoolTimer(_win32_timer_thread_callback, workitem, grouph);
        if(nullptr == workitem->_internaltimerh)
        {
          return win32_error();
        }
-#else
-        (void) grouph;
-        workitem->_internaltimerh = (void *) (uintptr_t) -1;
+      }
 #endif
+    }
+    else
+    {
+      if(workitem->_timepoint1 != std::chrono::steady_clock::time_point())
+      {
+        workitem->_timepoint1 = {};
       }
+      if(workitem->_timepoint2 != std::chrono::system_clock::time_point())
+      {
+        workitem->_timepoint2 = {};
+      }
+      assert(!workitem->_has_timer_set());
     }
     return success();
   }
@@ -1296,6 +1302,7 @@ public:
     LLFIO_LOG_FUNCTION_CALL(this);
     (void) wait();
     auto &impl = detail::global_dynamic_thread_pool();
+    // detail::dynamic_thread_pool_group_impl_guard g1(_lock);
 #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
     if(nullptr != _grouph)
     {
@@ -1309,7 +1316,7 @@ public:
       _grouph = nullptr;
     }
 #endif
-    detail::global_dynamic_thread_pool_impl::workqueue_guard g(impl.workqueue_lock);
+    detail::global_dynamic_thread_pool_impl::workqueue_guard g2(impl.workqueue_lock);
     assert(impl.workqueue->nesting_level >= _nesting_level);
     for(auto *p = impl.workqueue.get(); p != nullptr; p = p->next.get())
     {
@@ -1648,13 +1655,13 @@ namespace detail
   {
     (void) submit_into_highest_priority;
     (void) defer_pool_wake;
-    if(workitem->_nextwork != -1)
+    const auto nextwork = workitem->_nextwork.load(std::memory_order_acquire);
+    if(nextwork != -1)
     {
       auto *parent = workitem->_parent.load(std::memory_order_relaxed);
       // If no work item for now, or there is a delay, schedule a timer
-      if(workitem->_nextwork == 0 || workitem->_has_timer_set())
+      if(nextwork == 0 || workitem->_has_timer_set())
       {
-        assert(workitem->_internaltimerh != nullptr);
 #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
         dispatch_time_t when;
         if(workitem->_has_timer_set_relative())
@@ -1802,8 +1809,6 @@ namespace detail
     }
 
 #if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
-    // Indicate that I can be executed again
-    workitem->_internalworkh = nullptr;
 #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
     std::cout << "*** DTP submits work item " << workitem << std::endl;
 #endif
@@ -1854,10 +1859,7 @@ namespace detail
       for(auto *i : work)
       {
         _remove_from_list(group->_work_items_active, i);
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
-        i->_internalworkh = nullptr;
-        i->_internaltimerh = nullptr;
-#elif defined(_WIN32)
+#if defined(_WIN32)
         if(nullptr != i->_internaltimerh)
         {
           CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
@@ -1876,16 +1878,14 @@ namespace detail
       {
         deadline d(std::chrono::seconds(0));
         i->_parent.store(group, std::memory_order_release);
-        i->_nextwork = i->next(d);
-        if(-1 == i->_nextwork)
+        i->_nextwork.store(i->next(d), std::memory_order_release);
+        if(-1 == i->_nextwork.load(std::memory_order_acquire))
        {
          _append_to_list(group->_work_items_done, i);
        }
        else
        {
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
-          i->_internalworkh = (void *) (uintptr_t) -1;
-#elif defined(_WIN32)
+#if defined(_WIN32)
           i->_internalworkh = CreateThreadpoolWork(_win32_worker_thread_callback, i, group->_grouph);
           if(nullptr == i->_internalworkh)
           {
@@ -1908,6 +1908,7 @@ namespace detail
           _submit_work_item(true, i, i != work.back());
         }
       }
+      g.lock();
       return success();
     }
     catch(...)
@@ -1923,10 +1924,7 @@ namespace detail
     auto *parent = i->_parent.load(std::memory_order_relaxed);
     _remove_from_list(parent->_work_items_active, i);
     _append_to_list(parent->_work_items_done, i);
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
-    i->_internaltimerh = nullptr;
-    i->_internalworkh = nullptr;
-#elif defined(_WIN32)
+#if defined(_WIN32)
     if(i->_internalworkh_inuse > 0)
     {
       i->_internalworkh_inuse = 2;
@@ -1944,12 +1942,6 @@ namespace detail
         i->_internalworkh = nullptr;
       }
     }
-#else
-    i->_internaltimerh = nullptr;
-    i->_internalworkh = nullptr;
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
-    std::cout << "*** DTP sets done work item " << i << std::endl;
-#endif
 #endif
     if(parent->_work_items_active.count == 0)
     {
@@ -1958,14 +1950,13 @@ namespace detail
      for(; v != nullptr; v = n)
      {
        v->_parent.store(nullptr, std::memory_order_release);
-        v->_nextwork = -1;
+        v->_nextwork.store(-1, std::memory_order_release);
        n = v->_next;
      }
      n = v = parent->_work_items_done.front;
      parent->_work_items_done.front = parent->_work_items_done.back = nullptr;
      parent->_work_items_done.count = 0;
      parent->_stopping.store(false, std::memory_order_release);
-      parent->_stopped.store(true, std::memory_order_release);
      parent->_completing.store(true, std::memory_order_release);  // cause submissions to enter _work_items_delayed
 #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
      std::cout << "*** DTP executes group_complete for group " << parent << std::endl;
@@ -1975,6 +1966,7 @@ namespace detail
        n = v->_next;
        v->group_complete(parent->_abnormal_completion_cause);
      }
+      parent->_stopped.store(true, std::memory_order_release);
      parent->_completing.store(false, std::memory_order_release);  // cease submitting to _work_items_delayed
 #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
      std::cout << "*** DTP group_complete done for group " << parent << ". _work_items_delayed.count = " << parent->_work_items_delayed.count << std::endl;
@@ -2207,26 +2199,6 @@ namespace detail
        }
      }
 #else
-#if 0
-      if(group->_stopping.load(std::memory_order_relaxed))
-      {
-        // Kill all work items not currently being executed immediately
-        for(bool done = false; !done;)
-        {
-          done = true;
-          for(auto *p = group->_work_items_active.front; p != nullptr; p = p->_next)
-          {
-            if(p->_internalworkh == nullptr)
-            {
-              _remove_from_list(group->_work_items_active, p);
-              _append_to_list(group->_work_items_done, p);
-              done = false;
-              break;
-            }
-          }
-        }
-      }
-#endif
      while(group->_work_items_active.count > 0)
      {
        LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d);
@@ -2250,7 +2222,7 @@ namespace detail
  inline void global_dynamic_thread_pool_impl::_timerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type /*unused*/)
  {
    LLFIO_LOG_FUNCTION_CALL(this);
-    assert(workitem->_nextwork != -1);
+    assert(workitem->_nextwork.load(std::memory_order_relaxed) != -1);
    assert(workitem->_has_timer_set());
    auto *parent = workitem->_parent.load(std::memory_order_relaxed);
    // std::cout << "*** _timerthread " << workitem << std::endl;
@@ -2287,10 +2259,10 @@ namespace detail
      workitem->_timepoint2 = {};
    }
    assert(!workitem->_has_timer_set());
-    if(workitem->_nextwork == 0)
+    if(workitem->_nextwork.load(std::memory_order_acquire) == 0)
    {
      deadline d(std::chrono::seconds(0));
-      workitem->_nextwork = workitem->next(d);
+      workitem->_nextwork.store(workitem->next(d), std::memory_order_release);
      auto r2 = _prepare_work_item_delay(workitem, parent->_grouph, d);
      if(!r2)
      {
@@ -2299,7 +2271,7 @@ namespace detail
        _work_item_done(g, workitem);
        return;
      }
-      if(-1 == workitem->_nextwork)
+      if(-1 == workitem->_nextwork.load(std::memory_order_relaxed))
      {
        dynamic_thread_pool_group_impl_guard g(parent->_lock);  // lock group
        _work_item_done(g, workitem);
@@ -2317,10 +2289,10 @@ namespace detail
    LLFIO_LOG_FUNCTION_CALL(this);
    //{
    //  _lock_guard g(parent->_lock);
-    //  std::cout << "*** _workerthread " << workitem << " begins with work " << workitem->_nextwork << std::endl;
+    //  std::cout << "*** _workerthread " << workitem << " begins with work " << workitem->_nextwork.load(std::memory_order_relaxed) << std::endl;
    //}
-    assert(workitem->_nextwork != -1);
-    assert(workitem->_nextwork != 0);
+    assert(workitem->_nextwork.load(std::memory_order_relaxed) != -1);
+    assert(workitem->_nextwork.load(std::memory_order_relaxed) != 0);
    auto *parent = workitem->_parent.load(std::memory_order_relaxed);
    if(parent->_stopping.load(std::memory_order_relaxed))
    {
@@ -2333,8 +2305,8 @@ namespace detail
    tls.workitem = workitem;
    tls.current_callback_instance = selfthreadh;
    tls.nesting_level = parent->_nesting_level + 1;
-    auto r = (*workitem)(workitem->_nextwork);
-    workitem->_nextwork = 0;  // call next() next time
+    auto r = (*workitem)(workitem->_nextwork.load(std::memory_order_acquire));
+    workitem->_nextwork.store(0, std::memory_order_release);  // call next() next time
    tls = old_thread_local_state;
    // std::cout << "*** _workerthread " << workitem << " ends with work " << workitem->_nextwork << std::endl;
    if(!r)
@@ -2352,7 +2324,7 @@ namespace detail
    else
    {
      deadline d(std::chrono::seconds(0));
-      workitem->_nextwork = workitem->next(d);
+      workitem->_nextwork.store(workitem->next(d), std::memory_order_release);
      auto r2 = _prepare_work_item_delay(workitem, parent->_grouph, d);
      if(!r2)
      {
@@ -2361,7 +2333,7 @@ namespace detail
        _work_item_done(g, workitem);
        return;
      }
-      if(-1 == workitem->_nextwork)
+      if(-1 == workitem->_nextwork.load(std::memory_order_relaxed))
      {
        dynamic_thread_pool_group_impl_guard g(parent->_lock);  // lock group
        _work_item_done(g, workitem);
diff --git a/include/llfio/v2.0/dynamic_thread_pool_group.hpp b/include/llfio/v2.0/dynamic_thread_pool_group.hpp
index 2f89ce98..09828a6e 100644
--- a/include/llfio/v2.0/dynamic_thread_pool_group.hpp
+++ b/include/llfio/v2.0/dynamic_thread_pool_group.hpp
@@ -194,7 +194,7 @@ public:
    void *_internalworkh{nullptr};
    void *_internaltimerh{nullptr};  // lazily created if next() ever returns a deadline
    work_item *_prev{nullptr}, *_next{nullptr}, *_next_scheduled{nullptr};
-    intptr_t _nextwork{-1};
+    std::atomic<intptr_t> _nextwork{-1};
    std::chrono::steady_clock::time_point _timepoint1;
    std::chrono::system_clock::time_point _timepoint2;
    int _internalworkh_inuse{0};
@@ -213,7 +213,7 @@ public:
        , _prev(o._prev)
        , _next(o._next)
        , _next_scheduled(o._next_scheduled)
-        , _nextwork(o._nextwork)
+        , _nextwork(o._nextwork.load(std::memory_order_relaxed))
        , _timepoint1(o._timepoint1)
        , _timepoint2(o._timepoint2)
        , _internalworkh_inuse(o._internalworkh_inuse)
@@ -227,7 +227,7 @@ public:
        abort();
      }
      o._prev = o._next = o._next_scheduled = nullptr;
-      o._nextwork = -1;
+      o._nextwork.store(-1, std::memory_order_relaxed);
      o._internalworkh_inuse = 0;
    }
    work_item &operator=(const work_item &) = delete;
@@ -236,8 +236,8 @@ public:
  public:
    virtual ~work_item()
    {
-      assert(_nextwork == -1);
-      if(_nextwork != -1)
+      assert(_nextwork.load(std::memory_order_relaxed) == -1);
+      if(_nextwork.load(std::memory_order_relaxed) != -1)
      {
        LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item destroyed before all work was done!");
        abort();
--
cgit v1.2.3
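The change that carries most of this patch is turning work_item::_nextwork from a plain intptr_t into a std::atomic<intptr_t>: the thread that publishes the next unit of work now uses release stores and the worker/timer callbacks that consume it use acquire loads, which provides the happens-before edge TSan was reporting as missing. Below is a minimal standalone sketch of that pattern, not LLFIO code: the toy_work_item type, the token value 42 and the spin-wait are illustrative assumptions; only the atomic field with acquire/release pairing is taken from the patch.

// Toy illustration (hypothetical names, not the LLFIO API): one thread
// publishes a "next work" token with a release store, another consumes it
// with an acquire load, mirroring the _nextwork accesses in this patch.
#include <atomic>
#include <cstdint>
#include <iostream>
#include <thread>

struct toy_work_item
{
  // Sketch's convention: -1 = nothing published yet, 0 = ask for more work,
  // >0 = a runnable work token (loosely mirroring _nextwork in the patch).
  std::atomic<std::intptr_t> nextwork{-1};
};

int main()
{
  toy_work_item item;

  // Submitting thread: publish a work token. The release store makes every
  // write sequenced before it visible to whoever acquire-loads the new value.
  std::thread submitter([&] { item.nextwork.store(42, std::memory_order_release); });

  // Worker thread: spin until a positive token appears, execute it, then
  // store 0 ("call next() next time"), as the patched _workerthread does.
  std::thread worker([&] {
    std::intptr_t work;
    while((work = item.nextwork.load(std::memory_order_acquire)) <= 0)
    {
      std::this_thread::yield();
    }
    std::cout << "executing work token " << work << std::endl;
    item.nextwork.store(0, std::memory_order_release);
  });

  submitter.join();
  worker.join();
  return 0;
}

Built with -fsanitize=thread, the pre-patch shape of this code (a plain intptr_t written by one thread and read by another with no synchronisation) is flagged as a data race, whereas the atomic acquire/release pairing above is not; that is also why the patch re-reads _nextwork through .load() even inside asserts and debug printing.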