From 476b74799c4504a248d2fe63ae060f8eead21c47 Mon Sep 17 00:00:00 2001
From: "Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com)" <s_sourceforge@nedprod.com>
Date: Wed, 13 Jan 2021 12:32:41 +0000
Subject: wip native linux threadpool implementation for dynamic_thread_pool_group.

---
 CMakeLists.txt                                     |   7 +-
 include/llfio/revision.hpp                         |   6 +-
 .../v2.0/detail/impl/dynamic_thread_pool_group.ipp | 536 +++++++++++++++++++--
 include/llfio/v2.0/detail/impl/posix/statfs.ipp    |   4 +-
 test/tests/dynamic_thread_pool_group.cpp           |  18 +-
 5 files changed, 524 insertions(+), 47 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 0a82bd45..11750c94 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -291,18 +291,19 @@ if(NOT LLFIO_DISABLE_LIBDISPATCH)
     check_cxx_source_compiles("
 #include <dispatch/dispatch.h>
 int main() {
-  dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
-  return 0;
+  return dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0) != nullptr;
 }
 " LLFIO_HAS_LIBDISPATCH_${postfix})
   endfunction()
   check_have_libdispatch(BUILTIN)
   if(NOT LLFIO_HAS_LIBDISPATCH_BUILTIN)
-    check_have_libdispatch(WITH_LIBDISPATCH)
+    check_have_libdispatch(WITH_LIBDISPATCH dispatch)
     if(LLFIO_HAS_LIBDISPATCH_WITH_LIBDISPATCH)
       all_link_libraries(PUBLIC dispatch)
     endif()
   endif()
+else()
+  all_compile_definitions(PUBLIC LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD=0)
 endif()
 if(NOT LLFIO_HAS_LIBDISPATCH_BUILTIN AND NOT LLFIO_HAS_LIBDISPATCH_WITH_LIBDISPATCH AND (CMAKE_SYSTEM_NAME MATCHES "FreeBSD" OR APPLE))
   indented_message(FATAL_ERROR "FATAL: Grand Central Dispatch as libdispatch was not found on this FreeBSD or Mac OS system. libdispatch is required for LLFIO to build on those systems.")
diff --git a/include/llfio/revision.hpp b/include/llfio/revision.hpp
index c56a2976..e5f28dcc 100644
--- a/include/llfio/revision.hpp
+++ b/include/llfio/revision.hpp
@@ -1,4 +1,4 @@
 // Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time
-#define LLFIO_PREVIOUS_COMMIT_REF 96bff4fddf442a2526cda850876b82c191e4030a
-#define LLFIO_PREVIOUS_COMMIT_DATE "2021-01-01 18:00:23 +00:00"
-#define LLFIO_PREVIOUS_COMMIT_UNIQUE 96bff4fd
+#define LLFIO_PREVIOUS_COMMIT_REF 45112b3cffebb5f8409c0edfc8c8879a0aeaf516
+#define LLFIO_PREVIOUS_COMMIT_DATE "2021-01-02 17:33:42 +00:00"
+#define LLFIO_PREVIOUS_COMMIT_UNIQUE 45112b3c
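[Editor's note: the CMake change above makes libdispatch detection optional rather than mandatory, and pins LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD to 0 when detection is skipped, which selects the new native Linux thread pool implemented below. Reading the `if(NOT LLFIO_DISABLE_LIBDISPATCH)` guard, a consumer ought to be able to force the native pool even where libdispatch is installed by configuring with the following — an inference from the guard, not something this patch documents:

    cmake -DLLFIO_DISABLE_LIBDISPATCH=ON <source-dir>

]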
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
index 57e1e525..a04aef3c 100644
--- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
+++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
@@ -47,13 +47,19 @@ Distributed under the Boost Software License, Version 1.0.
 #if __has_include(<dispatch/dispatch.h>)
 #include <dispatch/dispatch.h>
 #define LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD 1
-#else
-#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD 0
-#error Right now dynamic_thread_pool_group requires libdispatch to be available on POSIX. It should get auto discovered if installed, which is the default on BSD and Mac OS. Try installing libdispatch-dev if on Linux.
 #endif
 #endif
 #endif
 #endif
+#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+#if !defined(__linux__)
+#error dynamic_thread_pool_group requires Grand Central Dispatch (libdispatch) on non-Linux POSIX.
+#endif
+#include <condition_variable>
+#include <thread>
+#endif
+
+#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING 0
 
 LLFIO_V2_NAMESPACE_BEGIN
 
@@ -61,6 +67,15 @@ namespace detail
 {
   struct global_dynamic_thread_pool_impl
   {
+    std::mutex workqueue_lock;
+    using _lock_guard = std::unique_lock<std::mutex>;
+    struct workqueue_item
+    {
+      std::unordered_set<dynamic_thread_pool_group_impl *> items;
+      std::unordered_set<dynamic_thread_pool_group_impl *>::iterator currentgroup;
+      size_t currentgroupremaining{0};
+    };
+    std::vector<workqueue_item> workqueue;
 #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
     using threadh_type = void *;
     using grouph_type = dispatch_group_t;
@@ -87,15 +102,39 @@ namespace detail
       auto *workitem = (dynamic_thread_pool_group::work_item *) Parameter;
       global_dynamic_thread_pool()._timerthread(workitem, threadh);
     }
-#endif
+#else
+    using threadh_type = void *;
+    using grouph_type = void *;
+    struct thread_t
+    {
+      thread_t *_prev{nullptr}, *_next{nullptr};
+      std::thread thread;
+      std::condition_variable cond;
+      std::chrono::steady_clock::time_point last_did_work;
+      int state{0};  // <0 = dead, 0 = sleeping/please die, 1 = busy
+    };
+    struct threads_t
+    {
+      size_t count{0};
+      thread_t *front{nullptr}, *back{nullptr};
+    } threadpool_active, threadpool_sleeping;
+    std::atomic<size_t> total_submitted_workitems{0}, threadpool_threads{0}, threadpool_sleeping_count{0};
 
-    std::mutex workqueue_lock;
-    using _lock_guard = std::unique_lock<std::mutex>;
-    struct workqueue_item
+    std::mutex threadmetrics_lock;
+    struct threadmetrics_item
     {
-      std::unordered_set<dynamic_thread_pool_group_impl *> items;
+      threadmetrics_item *_prev{nullptr}, *_next{nullptr};
+      uint64_t threadid{0};
+      std::chrono::steady_clock::time_point last_updated, blocked_since;  // latter is set when the thread has been seen to get no CPU time
+      uint32_t diskfaults{0}, utime{0}, stime{0};  // cumulative ticks spent in user and system for this thread
     };
-    std::vector<workqueue_item> workqueue;
+    struct threadmetrics_t
+    {
+      size_t count{0};
+      threadmetrics_item *front{nullptr}, *back{nullptr};
+      uint32_t blocked{0}, running{0};
+    } threadmetrics;
+#endif
 
     std::mutex io_aware_work_item_handles_lock;
     struct io_aware_work_item_statfs
@@ -161,6 +200,77 @@ namespace detail
       what.count--;
     }
 
+#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+    inline void _execute_work(thread_t *self);
+
+    void _add_thread(_lock_guard & /*unused*/)
+    {
+      thread_t *p = nullptr;
+      try
+      {
+        p = new thread_t;
+        _append_to_list(threadpool_active, p);
+        p->thread = std::thread([this, p] { _execute_work(p); });
+      }
+      catch(...)
+      {
+        if(p != nullptr)
+        {
+          _remove_from_list(threadpool_active, p);
+          delete p;
+        }
+        // swallow the failure: the pool simply ends up with one fewer thread
+      }
+    }
+
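[Editor's note: _append_to_list() and _remove_from_list() are pre-existing helpers which appear only as trailing context in this patch (the `what.count--;` above). For orientation, a minimal sketch of the intrusive doubly-linked list maintenance they evidently perform, consistent with the _prev/_next/front/back/count fields used throughout this patch — the real LLFIO bodies may differ in detail:

    // Sketch only, not LLFIO's verbatim code. List must expose front/back/count,
    // T must expose _prev/_next. New entries are appended at the back, so the
    // front holds the oldest entry (e.g. the longest-sleeping thread).
    template <class List, class T> void _append_to_list(List &what, T *i)
    {
      if(what.back == nullptr)  // empty list: i becomes both front and back
      {
        i->_prev = i->_next = nullptr;
        what.front = what.back = i;
      }
      else  // link i after the current back
      {
        i->_prev = what.back;
        i->_next = nullptr;
        what.back->_next = i;
        what.back = i;
      }
      what.count++;
    }
    template <class List, class T> void _remove_from_list(List &what, T *i)
    {
      // Unlink i, updating front/back when i sat at either end
      if(i->_prev == nullptr) { what.front = i->_next; } else { i->_prev->_next = i->_next; }
      if(i->_next == nullptr) { what.back = i->_prev; } else { i->_next->_prev = i->_prev; }
      i->_prev = i->_next = nullptr;
      what.count--;
    }

]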
+ { + if(p != nullptr) + { + _remove_from_list(threadpool_active, p); + } + // drop failure + } + } + + bool _remove_thread(_lock_guard &g, threads_t &which) + { + if(which.count == 0) + { + return false; + } + // Threads which went to sleep the longest ago are at the front + auto *t = which.front; + assert(t->state == 0); + t->state--; +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << t << " is told to quit" << std::endl; +#endif + do + { + g.unlock(); + t->cond.notify_one(); + g.lock(); + } while(t->state >= -1); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << t << " has quit, deleting" << std::endl; +#endif + _remove_from_list(threadpool_active, t); + t->thread.join(); + delete t; + return true; + } + + ~global_dynamic_thread_pool_impl() + { + _lock_guard g(workqueue_lock); // lock global + while(threadpool_active.count > 0 || threadpool_sleeping.count > 0) + { + while(threadpool_sleeping.count > 0) + { + auto removed = _remove_thread(g, threadpool_sleeping); + assert(removed); + (void) removed; + } + if(threadpool_active.count > 0) + { + auto removed = _remove_thread(g, threadpool_active); + assert(removed); + (void) removed; + } + } + } +#endif + result _prepare_work_item_delay(dynamic_thread_pool_group::work_item *workitem, grouph_type grouph, deadline d) { if(!d) @@ -182,6 +292,9 @@ namespace detail { return win32_error(); } +#else + (void) grouph; + workitem->_internaltimerh = (void *) (uintptr_t) -1; #endif } if(d.nsecs > 0) @@ -195,6 +308,10 @@ namespace detail workitem->_timepoint2 = d.to_time_point(); } } + else + { + workitem->_timepoint1 = std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1)); + } } return success(); } @@ -207,7 +324,7 @@ namespace detail inline void _work_item_next(_lock_guard &g, dynamic_thread_pool_group::work_item *i) noexcept; inline result stop(_lock_guard &g, dynamic_thread_pool_group_impl *group, result err) noexcept; - inline result wait(_lock_guard &g, bool reap, const dynamic_thread_pool_group_impl *group, deadline d) noexcept; + inline result wait(_lock_guard &g, bool reap, dynamic_thread_pool_group_impl *group, deadline d) noexcept; inline void _timerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh); inline void _workerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh); @@ -245,6 +362,7 @@ class dynamic_thread_pool_group_impl final : public dynamic_thread_pool_group dynamic_thread_pool_group::work_item *front{nullptr}, *back{nullptr}; } _work_items_active, _work_items_done, _work_items_delayed; std::atomic _stopping{false}, _stopped{true}, _completing{false}; + std::atomic _waits{0}; result _abnormal_completion_cause{success()}; // The cause of any abnormal group completion #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD @@ -252,6 +370,8 @@ class dynamic_thread_pool_group_impl final : public dynamic_thread_pool_group #elif defined(_WIN32) TP_CALLBACK_ENVIRON _callbackenviron; PTP_CALLBACK_ENVIRON _grouph{&_callbackenviron}; +#else + void *_grouph{nullptr}; #endif public: @@ -271,13 +391,19 @@ public: #elif defined(_WIN32) InitializeThreadpoolEnvironment(_grouph); #endif - detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock); + detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock); // lock global // Append this group to the global work queue at its nesting level if(_nesting_level >= impl.workqueue.size()) { impl.workqueue.resize(_nesting_level + 1); } - 
@@ -245,6 +362,7 @@ class dynamic_thread_pool_group_impl final : public dynamic_thread_pool_group
     dynamic_thread_pool_group::work_item *front{nullptr}, *back{nullptr};
   } _work_items_active, _work_items_done, _work_items_delayed;
   std::atomic<bool> _stopping{false}, _stopped{true}, _completing{false};
+  std::atomic<unsigned> _waits{0};
   result<void> _abnormal_completion_cause{success()};  // The cause of any abnormal group completion
 
 #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
@@ -252,6 +370,8 @@
 #elif defined(_WIN32)
   TP_CALLBACK_ENVIRON _callbackenviron;
   PTP_CALLBACK_ENVIRON _grouph{&_callbackenviron};
+#else
+  void *_grouph{nullptr};
 #endif
 
 public:
@@ -271,13 +391,19 @@ public:
 #elif defined(_WIN32)
     InitializeThreadpoolEnvironment(_grouph);
 #endif
-    detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock);
+    detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock);  // lock global
     // Append this group to the global work queue at its nesting level
     if(_nesting_level >= impl.workqueue.size())
     {
       impl.workqueue.resize(_nesting_level + 1);
     }
-    impl.workqueue[_nesting_level].items.insert(this);
+    auto &wq = impl.workqueue[_nesting_level];
+    wq.items.insert(this);
+    if(wq.items.size() == 1)
+    {
+      wq.currentgroup = wq.items.begin();
+      wq.currentgroupremaining = _work_items_active.count;
+    }
     return success();
   }
   catch(...)
@@ -304,9 +430,21 @@ public:
       _grouph = nullptr;
     }
 #endif
-    detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock);
+    detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock);  // lock global
     assert(impl.workqueue.size() > _nesting_level);
-    impl.workqueue[_nesting_level].items.erase(this);
+    auto &wq = impl.workqueue[_nesting_level];
+    if(*wq.currentgroup == this)
+    {
+      if(wq.items.end() == ++wq.currentgroup)
+      {
+        wq.currentgroup = wq.items.begin();
+      }
+      if(!wq.items.empty())
+      {
+        wq.currentgroupremaining = (*wq.currentgroup)->_work_items_active.count;
+      }
+    }
+    wq.items.erase(this);
     while(!impl.workqueue.empty() && impl.workqueue.back().items.empty())
     {
       impl.workqueue.pop_back();
@@ -320,7 +458,6 @@ public:
     {
       return errc::operation_canceled;
     }
-    _stopped.store(false, std::memory_order_release);
     if(_completing.load(std::memory_order_relaxed))
    {
       for(auto *i : work)
@@ -330,8 +467,9 @@ public:
       }
       return success();
     }
+    _stopped.store(false, std::memory_order_release);
     auto &impl = detail::global_dynamic_thread_pool();
-    _lock_guard g(_lock);
+    _lock_guard g(_lock);  // lock group
     if(_work_items_active.count == 0 && _work_items_done.count == 0)
     {
       _abnormal_completion_cause = success();
@@ -352,7 +490,7 @@ public:
       return success();
     }
     auto &impl = detail::global_dynamic_thread_pool();
-    _lock_guard g(_lock);
+    _lock_guard g(_lock);  // lock group
     return impl.stop(g, this, errc::operation_canceled);
   }
 
@@ -368,8 +506,8 @@ public:
       return success();
     }
     auto &impl = detail::global_dynamic_thread_pool();
-    _lock_guard g(_lock);
-    return impl.wait(g, true, this, d);
+    _lock_guard g(_lock);  // lock group
+    return impl.wait(g, true, const_cast<dynamic_thread_pool_group_impl *>(this), d);
   }
 };
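[Editor's note: for orientation, a condensed sketch of how the class above is consumed, assembled from the public API usage visible in this patch (submit's span form, wait(), make_dynamic_thread_pool_group()); the exact work_item virtual signatures are taken from my reading of LLFIO's public header and should be treated as an assumption:

    #include "llfio/llfio.hpp"
    #include <atomic>

    namespace llfio = LLFIO_V2_NAMESPACE;

    // A work item which asks to be run 100 times, then declares itself done
    struct my_work final : public llfio::dynamic_thread_pool_group::work_item
    {
      std::atomic<int> remaining{100};
      virtual intptr_t next(llfio::deadline & /*d*/) noexcept override
      {
        int n = remaining.fetch_sub(1, std::memory_order_relaxed);
        return (n > 0) ? n : -1;  // -1 tells the pool there is no more work
      }
      virtual llfio::result<void> operator()(intptr_t /*work*/) noexcept override { return llfio::success(); }
    };

    int main()
    {
      auto tpg = llfio::make_dynamic_thread_pool_group().value();
      my_work wi;
      llfio::dynamic_thread_pool_group::work_item *p = &wi;
      tpg->submit(llfio::span<llfio::dynamic_thread_pool_group::work_item *>(&p, 1)).value();
      tpg->wait().value();  // blocks until the group completes
      return 0;
    }

]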
@@ -399,6 +537,221 @@ LLFIO_HEADERS_ONLY_FUNC_SPEC result<dynamic_thread_pool_group_ptr> make_dynamic_
 namespace detail
 {
+#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+  inline void global_dynamic_thread_pool_impl::_execute_work(thread_t *self)
+  {
+    pthread_setname_np(pthread_self(), "LLFIO DYN TPG");
+    self->last_did_work = std::chrono::steady_clock::now();
+    _lock_guard g(workqueue_lock);  // lock global
+    self->state++;  // busy
+    threadpool_threads.fetch_add(1, std::memory_order_release);
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+    std::cout << "*** DTP " << self << " begins." << std::endl;
+#endif
+    while(self->state > 0)
+    {
+    restart:
+      dynamic_thread_pool_group::work_item *workitem = nullptr;
+      std::chrono::steady_clock::time_point earliest_duration;
+      std::chrono::system_clock::time_point earliest_absolute;
+      if(!workqueue.empty())
+      {
+        auto wq = --workqueue.end();
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+        std::cout << "*** DTP " << self << " restarts from top of work queue" << std::endl;
+#endif
+        for(;;)
+        {
+          dynamic_thread_pool_group_impl *tpg = *wq->currentgroup;
+          _lock_guard gg(tpg->_lock);  // lock group
+          if(wq->currentgroupremaining > tpg->_work_items_active.count)
+          {
+            wq->currentgroupremaining = tpg->_work_items_active.count;
+          }
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+          std::cout << "*** DTP " << self << " sees " << wq->currentgroupremaining << " items remaining in group " << tpg << std::endl;
+#endif
+          if(wq->currentgroupremaining > 0 && tpg->_work_items_active.front->_internalworkh == nullptr)
+          {
+            auto *wi = tpg->_work_items_active.front;
+            _remove_from_list(tpg->_work_items_active, wi);
+            _append_to_list(tpg->_work_items_active, wi);
+            wq->currentgroupremaining--;
+            if(wi->_internaltimerh == nullptr)
+            {
+              workitem = wi;
+              workitem->_internalworkh = self;
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+              std::cout << "*** DTP " << self << " chooses work item " << workitem << " from group " << tpg << " distance from top "
+                        << (workqueue.end() - wq - 1) << std::endl;
+#endif
+              break;
+            }
+            bool invoketimer = false;
+            if(wi->_timepoint1 != std::chrono::steady_clock::time_point())
+            {
+              // Special constant for immediately rescheduled work items
+              if(wi->_timepoint1 == std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1)))
+              {
+                invoketimer = true;
+              }
+              else if(earliest_duration == std::chrono::steady_clock::time_point() || wi->_timepoint1 < earliest_duration)
+              {
+                earliest_duration = wi->_timepoint1;
+                if(wi->_timepoint1 <= std::chrono::steady_clock::now())
+                {
+                  invoketimer = true;
+                }
+              }
+            }
+            if(wi->_timepoint2 != std::chrono::system_clock::time_point() &&
+               (earliest_absolute == std::chrono::system_clock::time_point() || wi->_timepoint2 < earliest_absolute))
+            {
+              earliest_absolute = wi->_timepoint2;
+              if(wi->_timepoint2 <= std::chrono::system_clock::now())
+              {
+                invoketimer = true;
+              }
+            }
+            if(invoketimer)
+            {
+              wi->_internalworkh = self;
+              wi->_internaltimerh = nullptr;
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+              std::cout << "*** DTP " << self << " executes timer item " << wi << std::endl;
+#endif
+              gg.unlock();
+              g.unlock();
+              _timerthread(wi, nullptr);
+              g.lock();
+              // wi->_internalworkh should be null, however wi may also no longer exist
+              goto restart;
+            }
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+            std::cout << "*** DTP " << self << " timer item " << wi << " timer is not ready yet " << std::endl;
+#endif
+          }
+          else
+          {
+            auto startinggroup = wq->currentgroup;
+            do
+            {
+              gg.unlock();  // unlock group
+              if(++wq->currentgroup == wq->items.end())
+              {
+                wq->currentgroup = wq->items.begin();
+              }
+              tpg = *wq->currentgroup;
+              gg = _lock_guard(tpg->_lock);  // lock group
+              if(startinggroup == wq->currentgroup)
+              {
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+                std::cout << "*** DTP " << self << " workqueue distance " << (workqueue.end() - wq - 1) << " examining " << tpg
+                          << " finds _work_items_active.count = " << tpg->_work_items_active.count << "." << std::endl;
+#endif
+                if(tpg->_work_items_active.count == 0 || tpg->_work_items_active.front->_internalworkh != nullptr)
+                {
+                  // Nothing for me to do in this workqueue
+                  if(wq == workqueue.begin())
+                  {
+                    assert(workitem == nullptr);
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+                    std::cout << "*** DTP " << self << " finds work queue empty, going to sleep." << std::endl;
+#endif
+                    goto workqueue_empty;
+                  }
+                  gg.unlock();  // unlock group
+                  --wq;
+                  tpg = *wq->currentgroup;
+                  startinggroup = wq->currentgroup;
+                  gg = _lock_guard(tpg->_lock);  // lock group
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+                  std::cout << "*** DTP " << self << " moves work search up to distance from top = " << (workqueue.end() - wq - 1) << std::endl;
+#endif
+                  continue;
+                }
+              }
+            } while(tpg->_work_items_active.count == 0);
+            wq->currentgroupremaining = tpg->_work_items_active.count;
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+            std::cout << "*** DTP " << self << " chooses new group " << tpg << std::endl;
+#endif
+          }
+        }
+      }
+    workqueue_empty:
+      auto now = std::chrono::steady_clock::now();
+      if(workitem == nullptr)
+      {
+        std::chrono::steady_clock::duration duration(std::chrono::minutes(1));
+        if(earliest_duration != std::chrono::steady_clock::time_point())
+        {
+          if(earliest_duration - now < duration)
+          {
+            duration = earliest_duration - now;
+          }
+        }
+        else if(earliest_absolute != std::chrono::system_clock::time_point())
+        {
+          auto diff = earliest_absolute - std::chrono::system_clock::now();
+          if(diff > duration)
+          {
+            earliest_absolute = {};
+          }
+        }
+        else if(now - self->last_did_work >= std::chrono::minutes(1))
+        {
+          _remove_from_list(threadpool_active, self);
+          threadpool_threads.fetch_sub(1, std::memory_order_release);
+          self->thread.detach();
+          delete self;
+          return;
+        }
+        self->last_did_work = now;
+        _remove_from_list(threadpool_active, self);
+        _append_to_list(threadpool_sleeping, self);
+        self->state--;
+        threadpool_sleeping_count.fetch_add(1, std::memory_order_release);
+        if(earliest_absolute != std::chrono::system_clock::time_point())
+        {
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+          std::cout << "*** DTP " << self << " goes to sleep absolute" << std::endl;
+#endif
+          self->cond.wait_until(g, earliest_absolute);
+        }
+        else
+        {
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+          std::cout << "*** DTP " << self << " goes to sleep for " << std::chrono::duration_cast<std::chrono::milliseconds>(duration).count() << std::endl;
+#endif
+          self->cond.wait_for(g, duration);
+        }
+        self->state++;
+        _remove_from_list(threadpool_sleeping, self);
+        _append_to_list(threadpool_active, self);
+        threadpool_sleeping_count.fetch_sub(1, std::memory_order_release);
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+        std::cout << "*** DTP " << self << " wakes, state = " << self->state << std::endl;
+#endif
+        continue;
+      }
+      self->last_did_work = now;
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+      std::cout << "*** DTP " << self << " executes work item " << workitem << std::endl;
+#endif
+      total_submitted_workitems.fetch_sub(1, std::memory_order_relaxed);
+      g.unlock();
+      _workerthread(workitem, nullptr);
+      g.lock();
+      // workitem->_internalworkh should be null, however workitem may also no longer exist
+    }
+    self->state -= 2;  // dead
+    threadpool_threads.fetch_sub(1, std::memory_order_release);
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+    std::cout << "*** DTP " << self << " exits, state = " << self->state << std::endl;
+#endif
+  }
+#endif
+
   inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem)
   {
     (void) g;
@@ -502,6 +855,43 @@ namespace detail
SubmitThreadpoolWork((PTP_WORK) workitem->_internalworkh); #endif } +#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32) + // Indicate that I can be executed again + workitem->_internalworkh = nullptr; +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP submits work item " << workitem << std::endl; +#endif + const auto active_work_items = total_submitted_workitems.fetch_add(1, std::memory_order_relaxed) + 1; + const auto sleeping_count = threadpool_sleeping_count.load(std::memory_order_relaxed); + const auto threads = threadpool_threads.load(std::memory_order_relaxed); + if(sleeping_count > 0 || threads == 0) + { + g.unlock(); // unlock group + { + _lock_guard gg(workqueue_lock); // lock global + if(threadpool_active.count == 0 && threadpool_sleeping.count == 0) + { + _add_thread(gg); + _add_thread(gg); + _add_thread(gg); + _add_thread(gg); + } + else if(threadpool_sleeping.count > 0 && active_work_items > threadpool_active.count) + { + // Try to wake the most recently slept first + auto *t = threadpool_sleeping.back; + auto now = std::chrono::steady_clock::now(); + for(size_t n = std::min(active_work_items - threadpool_active.count, threadpool_sleeping.count); n > 0; n--) + { + t->last_did_work = now; // prevent reap + t->cond.notify_one(); + t = t->_prev; + } + } + } + g.lock(); // lock group + } +#endif } } @@ -607,6 +997,12 @@ namespace detail i->_internalworkh = nullptr; } } +#else + i->_internaltimerh = nullptr; + i->_internalworkh = nullptr; +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP sets done work item " << i << std::endl; +#endif #endif _remove_from_list(i->_parent.load(std::memory_order_relaxed)->_work_items_active, i); _append_to_list(i->_parent.load(std::memory_order_relaxed)->_work_items_done, i); @@ -626,24 +1022,55 @@ namespace detail parent->_work_items_done.count = 0; parent->_stopping.store(false, std::memory_order_release); parent->_stopped.store(true, std::memory_order_release); - parent->_completing.store(true, std::memory_order_release); + parent->_completing.store(true, std::memory_order_release); // cause submissions to enter _work_items_delayed +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP executes group_complete for group " << parent << std::endl; +#endif for(; v != nullptr; v = n) { n = v->_next; v->group_complete(parent->_abnormal_completion_cause); } - parent->_completing.store(false, std::memory_order_release); - // Did a least one group_complete() submit more work to myself? - while(parent->_work_items_delayed.count > 0) + parent->_completing.store(false, std::memory_order_release); // cease submitting to _work_items_delayed +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP group_complete done for group " << parent << ". _work_items_delayed.count = " << parent->_work_items_delayed.count << std::endl; +#endif + if(parent->_work_items_delayed.count > 0) { - i = parent->_work_items_delayed.front; - _remove_from_list(parent->_work_items_delayed, i); - auto r = submit(g, parent, {&i, 1}); - if(!r) + /* If there are waits on this group to complete, forward progress those now. 
+ */ + while(parent->_waits.load(std::memory_order_relaxed) > 0) { - parent->_work_items_delayed = {}; - (void) stop(g, parent, std::move(r)); - break; +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP group_complete blocks on waits for group " << parent << std::endl; +#endif + g.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + g.lock(); + } + // Now submit all delayed work + while(parent->_work_items_delayed.count > 0) + { + i = parent->_work_items_delayed.front; + _remove_from_list(parent->_work_items_delayed, i); + auto r = submit(g, parent, {&i, 1}); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP group_complete submits delayed work item " << i << " for group " << parent << " which saw error "; + if(r) + { + std::cout << "none" << std::endl; + } + else + { + std::cout << r.error().message() << std::endl; + } +#endif + if(!r) + { + parent->_work_items_delayed = {}; + (void) stop(g, parent, std::move(r)); + break; + } } } } @@ -682,11 +1109,24 @@ namespace detail return success(); } - inline result global_dynamic_thread_pool_impl::wait(_lock_guard &g, bool reap, const dynamic_thread_pool_group_impl *group, deadline d) noexcept + + inline result global_dynamic_thread_pool_impl::wait(_lock_guard &g, bool reap, dynamic_thread_pool_group_impl *group, deadline d) noexcept { LLFIO_DEADLINE_TO_SLEEP_INIT(d); if(!d || d.nsecs > 0) { + /* To ensure forward progress, we need to gate new waits during delayed work submission. + Otherwise waits may never exit if the window where _work_items_active.count == 0 is + missed. + */ + while(group->_work_items_delayed.count > 0) + { + g.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + g.lock(); + } + group->_waits.fetch_add(1, std::memory_order_release); + auto unwaitcount = make_scope_exit([&]() noexcept { group->_waits.fetch_sub(1, std::memory_order_release); }); #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD while(group->_work_items_active.count > 0) { @@ -842,6 +1282,34 @@ namespace detail g.lock(); } } +#else +#if 0 + if(group->_stopping.load(std::memory_order_relaxed)) + { + // Kill all work items not currently being executed immediately + for(bool done = false; !done;) + { + done = true; + for(auto *p = group->_work_items_active.front; p != nullptr; p = p->_next) + { + if(p->_internalworkh == nullptr) + { + _remove_from_list(group->_work_items_active, p); + _append_to_list(group->_work_items_done, p); + done = false; + break; + } + } + } + } +#endif + while(group->_work_items_active.count > 0) + { + LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d); + g.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + g.lock(); + } #endif } if(group->_work_items_active.count > 0) @@ -859,7 +1327,7 @@ namespace detail { LLFIO_LOG_FUNCTION_CALL(this); assert(workitem->_nextwork != -1); - _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock); + _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock); // lock group // std::cout << "*** _timerthread " << workitem << std::endl; if(workitem->_parent.load(std::memory_order_relaxed)->_stopping.load(std::memory_order_relaxed)) { @@ -907,7 +1375,7 @@ namespace detail assert(workitem->_nextwork != 0); if(workitem->_parent.load(std::memory_order_relaxed)->_stopping.load(std::memory_order_relaxed)) { - _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock); + _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock); // lock group _work_item_done(g, workitem); return; } @@ 
@@ -919,7 +1387,7 @@ namespace detail
       auto r = (*workitem)(workitem->_nextwork);
       workitem->_nextwork = 0;  // call next() next time
       tls = old_thread_local_state;
-      _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock);
+      _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock);  // lock group
       // std::cout << "*** _workerthread " << workitem << " ends with work " << workitem->_nextwork << std::endl;
       if(!r)
       {
diff --git a/include/llfio/v2.0/detail/impl/posix/statfs.ipp b/include/llfio/v2.0/detail/impl/posix/statfs.ipp
index 97aff0f0..092ca9f2 100644
--- a/include/llfio/v2.0/detail/impl/posix/statfs.ipp
+++ b/include/llfio/v2.0/detail/impl/posix/statfs.ipp
@@ -1,5 +1,5 @@
 /* Information about the volume storing a file
-(C) 2016-2017 Niall Douglas <http://www.nedproductions.biz/> (5 commits)
+(C) 2016-2020 Niall Douglas <http://www.nedproductions.biz/> (5 commits)
 File Created: Jan 2016
 
 
@@ -483,7 +483,7 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<std::pair<uint32_t, float>> statfs_t::_fi
     On Mac OS, getting the current i/o wait time appears to be privileged only? */
 #endif
-    return {-1, _allbits1_float};
+    return {-1, detail::constexpr_float_allbits_set_nan()};
   }
   catch(...)
   {
diff --git a/test/tests/dynamic_thread_pool_group.cpp b/test/tests/dynamic_thread_pool_group.cpp
index 58d670c4..a40d3b05 100644
--- a/test/tests/dynamic_thread_pool_group.cpp
+++ b/test/tests/dynamic_thread_pool_group.cpp
@@ -146,7 +146,12 @@ static inline void TestDynamicThreadPoolGroupWorks()
     BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == nullptr);
   };
   auto check = [&] {
-    shared_state.tpg->wait().value();
+    auto r = shared_state.tpg->wait();
+    if(!r)
+    {
+      std::cerr << "ERROR: wait() reports failure " << r.error().message() << std::endl;
+      r.value();
+    }
     BOOST_CHECK(!shared_state.tpg->stopping());
     BOOST_CHECK(shared_state.tpg->stopped());
     BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 0);
@@ -186,6 +191,9 @@ static inline void TestDynamicThreadPoolGroupWorks()
     }
     std::cout << "Maximum concurrency achieved with " << workitems.size() << " work items = " << shared_state.max_concurrency << "\n" << std::endl;
   };
+  auto print_exception_throw = llfio::make_scope_fail([]() noexcept {
+    std::cout << "NOTE: Exception throw occurred!" << std::endl;
+  });
 
   // Test a single work item
   reset(1);
@@ -430,9 +438,9 @@ static inline void TestDynamicThreadPoolGroupIoAwareWorks()
   BOOST_CHECK(paced > 0);
 }
 
-KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, works, "Tests that llfio::dynamic_thread_pool_group works as expected",
-                       TestDynamicThreadPoolGroupWorks())
+//KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, works, "Tests that llfio::dynamic_thread_pool_group works as expected",
+//                       TestDynamicThreadPoolGroupWorks())
 KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, nested, "Tests that nesting of llfio::dynamic_thread_pool_group works as expected",
                        TestDynamicThreadPoolGroupNestingWorks())
-KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, io_aware_work_item,
-                       "Tests that llfio::dynamic_thread_pool_group::io_aware_work_item works as expected", TestDynamicThreadPoolGroupIoAwareWorks())
+//KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, io_aware_work_item,
+//                       "Tests that llfio::dynamic_thread_pool_group::io_aware_work_item works as expected", TestDynamicThreadPoolGroupIoAwareWorks())
-- 
cgit v1.2.3
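[Editor's note: on the statfs.ipp change above — detail::constexpr_float_allbits_set_nan() replaces the old _allbits1_float, i.e. a float with every bit set, which is a quiet NaN (sign bit set, exponent all ones, non-zero mantissa). The helper's real definition lives elsewhere in LLFIO's detail headers and is not part of this patch; in C++20 one might sketch it as follows (an illustration, not LLFIO's code):

    #include <bit>      // std::bit_cast (C++20)
    #include <cmath>    // std::isnan
    #include <cstdint>

    constexpr float constexpr_float_allbits_set_nan()
    {
      static_assert(sizeof(float) == sizeof(std::uint32_t));
      return std::bit_cast<float>(std::uint32_t(0xffffffffu));  // all bits set => quiet NaN
    }

    int main() { return std::isnan(constexpr_float_allbits_set_nan()) ? 0 : 1; }

]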