From 5558cb7bc5f3a74589e579e8504f76cde117bb03 Mon Sep 17 00:00:00 2001 From: "Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com)" Date: Wed, 17 Feb 2021 11:54:44 +0000 Subject: more wip --- include/llfio/revision.hpp | 6 +- .../v2.0/detail/impl/dynamic_thread_pool_group.ipp | 225 +++++++++++++-------- include/llfio/v2.0/detail/impl/posix/statfs.ipp | 4 +- .../llfio/v2.0/detail/impl/windows/file_handle.ipp | 4 +- include/llfio/v2.0/detail/impl/windows/statfs.ipp | 2 +- test/tests/dynamic_thread_pool_group.cpp | 9 +- 6 files changed, 151 insertions(+), 99 deletions(-) diff --git a/include/llfio/revision.hpp b/include/llfio/revision.hpp index 3456e101..a70b6e74 100644 --- a/include/llfio/revision.hpp +++ b/include/llfio/revision.hpp @@ -1,4 +1,4 @@ // Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time -#define LLFIO_PREVIOUS_COMMIT_REF b40a7594bd14dfbeaca42caf77b00df27df27b95 -#define LLFIO_PREVIOUS_COMMIT_DATE "2021-02-09 12:57:41 +00:00" -#define LLFIO_PREVIOUS_COMMIT_UNIQUE b40a7594 +#define LLFIO_PREVIOUS_COMMIT_REF c9e8352c9314ef2f2aa58fd9c15cdc36da9c99ac +#define LLFIO_PREVIOUS_COMMIT_DATE "2021-02-17 09:27:27 +00:00" +#define LLFIO_PREVIOUS_COMMIT_UNIQUE c9e8352c diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp index 3fa590a8..07446ea0 100644 --- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp +++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp @@ -1,5 +1,5 @@ /* Dynamic thread pool group -(C) 2020 Niall Douglas (9 commits) +(C) 2020-2021 Niall Douglas (9 commits) File Created: Dec 2020 @@ -122,7 +122,7 @@ namespace detail thread_t *front{nullptr}, *back{nullptr}; } threadpool_active, threadpool_sleeping; std::atomic total_submitted_workitems{0}, threadpool_threads{0}, threadpool_sleeping_count{0}; - std::atomic ms_sleep_for_more_work{60000}; + std::atomic ms_sleep_for_more_work{5000}; // TODO put back to 60000 std::mutex threadmetrics_lock; struct threadmetrics_threadid @@ -676,8 +676,25 @@ namespace detail } workitem->_timepoint1 = {}; workitem->_timepoint2 = {}; + assert(!workitem->_has_timer_set()); if(workitem->_nextwork == 0 || d.nsecs > 0) { + if(d.nsecs > 0) + { + if(d.steady) + { + workitem->_timepoint1 = std::chrono::steady_clock::now() + std::chrono::nanoseconds(d.nsecs); + } + else + { + workitem->_timepoint2 = d.to_time_point(); + } + } + else + { + workitem->_timepoint1 = std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1)); + } + assert(workitem->_has_timer_set()); if(nullptr == workitem->_internaltimerh) { #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD @@ -694,31 +711,15 @@ namespace detail workitem->_internaltimerh = (void *) (uintptr_t) -1; #endif } - if(d.nsecs > 0) - { - if(d.steady) - { - workitem->_timepoint1 = std::chrono::steady_clock::now() + std::chrono::nanoseconds(d.nsecs); - } - else - { - workitem->_timepoint2 = d.to_time_point(); - } - } - else - { - workitem->_timepoint1 = std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1)); - } } return success(); } - inline void _submit_work_item(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem, bool defer_pool_wake); + inline void _submit_work_item(_lock_guard &g, bool item_in_timer_list, dynamic_thread_pool_group::work_item *workitem, bool defer_pool_wake); inline result submit(_lock_guard &g, dynamic_thread_pool_group_impl *group, span work) noexcept; inline void _work_item_done(_lock_guard &g, dynamic_thread_pool_group::work_item *i) noexcept; - inline void _work_item_next(_lock_guard &g, dynamic_thread_pool_group::work_item *i) noexcept; inline result stop(_lock_guard &g, dynamic_thread_pool_group_impl *group, result err) noexcept; inline result wait(_lock_guard &g, bool reap, dynamic_thread_pool_group_impl *group, deadline d) noexcept; @@ -1015,6 +1016,12 @@ namespace detail } for(;;) { + if(workqueue_depth >= workqueue.size()) + { + goto restart; + } + wq_it = workqueue.begin() + workqueue_depth; + wq = &(*wq_it); dynamic_thread_pool_group_impl *tpg = *wq->currentgroup; _lock_guard gg(tpg->_lock); // lock group if(started_from_top) @@ -1178,7 +1185,16 @@ namespace detail auto now = std::chrono::steady_clock::now(); if(workitem == nullptr) { - std::chrono::steady_clock::duration duration(std::chrono::minutes(1)); + const std::chrono::steady_clock::duration max_sleep(std::chrono::milliseconds(ms_sleep_for_more_work.load(std::memory_order_relaxed))); + if(now - self->last_did_work >= max_sleep) + { + _remove_from_list(threadpool_active, self); + threadpool_threads.fetch_sub(1, std::memory_order_release); + self->thread.detach(); + delete self; + return; + } + std::chrono::steady_clock::duration duration(max_sleep); if(earliest_duration != std::chrono::steady_clock::time_point()) { if(now - earliest_duration < duration) @@ -1194,15 +1210,6 @@ namespace detail earliest_absolute = {}; } } - else if(now - self->last_did_work >= std::chrono::milliseconds(ms_sleep_for_more_work.load(std::memory_order_relaxed))) - { - _remove_from_list(threadpool_active, self); - threadpool_threads.fetch_sub(1, std::memory_order_release); - self->thread.detach(); - delete self; - return; - } - self->last_did_work = now; _remove_from_list(threadpool_active, self); _append_to_list(threadpool_sleeping, self); self->state--; @@ -1271,30 +1278,45 @@ namespace detail } #endif - inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem, bool defer_pool_wake) + inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, bool item_in_timer_list, dynamic_thread_pool_group::work_item *workitem, + bool defer_pool_wake) { (void) g; (void) defer_pool_wake; if(workitem->_nextwork != -1) { + auto *parent = workitem->_parent.load(std::memory_order_relaxed); // If no work item for now, or there is a delay, schedule a timer if(workitem->_nextwork == 0 || workitem->_has_timer_set()) { assert(workitem->_internaltimerh != nullptr); + if(!item_in_timer_list) + { + _remove_from_list(parent->_work_items_active, workitem); + _append_to_list(parent->_work_items_timer, workitem); + } #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD dispatch_time_t when; if(workitem->_has_timer_set_relative()) { - auto duration = std::chrono::duration_cast(workitem->_timepoint1 - std::chrono::steady_clock::now()).count(); - if(duration > 1000000000LL) + // Special constant for immediately rescheduled work items + if(workitem->_timepoint1 == std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1))) + { + when = dispatch_time(DISPATCH_TIME_NOW, 0); + } + else { - // Because GCD has no way of cancelling timers, nor assigning them to a group, - // we clamp the timer to 1 second. Then if cancellation is ever done to the group, - // the worst possible wait is 1 second. _timerthread will reschedule the timer - // if it gets called short. - duration = 1000000000LL; + auto duration = std::chrono::duration_cast(workitem->_timepoint1 - std::chrono::steady_clock::now()).count(); + if(duration > 1000000000LL) + { + // Because GCD has no way of cancelling timers, nor assigning them to a group, + // we clamp the timer to 1 second. Then if cancellation is ever done to the group, + // the worst possible wait is 1 second. _timerthread will reschedule the timer + // if it gets called short. + duration = 1000000000LL; + } + when = dispatch_time(DISPATCH_TIME_NOW, duration); } - when = dispatch_time(DISPATCH_TIME_NOW, duration); } else if(workitem->_has_timer_set_absolute()) { @@ -1317,16 +1339,24 @@ namespace detail DWORD slop = 1000; if(workitem->_has_timer_set_relative()) { - li.QuadPart = std::chrono::duration_cast(workitem->_timepoint1 - std::chrono::steady_clock::now()).count() / 100; - if(li.QuadPart < 0) + // Special constant for immediately rescheduled work items + if(workitem->_timepoint1 == std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1))) { - li.QuadPart = 0; + li.QuadPart = -1; // smallest possible non immediate duration from now } - if(li.QuadPart / 8 < (int64_t) slop) + else { - slop = (DWORD)(li.QuadPart / 8); + li.QuadPart = std::chrono::duration_cast(workitem->_timepoint1 - std::chrono::steady_clock::now()).count() / 100; + if(li.QuadPart < 0) + { + li.QuadPart = 0; + } + if(li.QuadPart / 8 < (int64_t) slop) + { + slop = (DWORD)(li.QuadPart / 8); + } + li.QuadPart = -li.QuadPart; // negative is relative } - li.QuadPart = -li.QuadPart; // negative is relative } else if(workitem->_has_timer_set_absolute()) { @@ -1345,35 +1375,40 @@ namespace detail } else { + if(item_in_timer_list) + { + _remove_from_list(parent->_work_items_timer, workitem); + _append_to_list(parent->_work_items_active, workitem); + } #if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD intptr_t priority = DISPATCH_QUEUE_PRIORITY_LOW; - if(workqueue.size() - workitem->_parent.load(std::memory_order_relaxed)->_nesting_level == 1) + if(workqueue.size() - parent->_nesting_level == 1) { priority = DISPATCH_QUEUE_PRIORITY_HIGH; } - else if(workqueue.size() - workitem->_parent.load(std::memory_order_relaxed)->_nesting_level == 2) + else if(workqueue.size() - parent->_nesting_level == 2) { priority = DISPATCH_QUEUE_PRIORITY_DEFAULT; } // std::cout << "*** submit " << workitem << std::endl; - dispatch_group_async_f(workitem->_parent.load(std::memory_order_relaxed)->_grouph, dispatch_get_global_queue(priority, 0), workitem, - _gcd_dispatch_callback); + dispatch_group_async_f(parent->_grouph, dispatch_get_global_queue(priority, 0), workitem, _gcd_dispatch_callback); #elif defined(_WIN32) // Set the priority of the group according to distance from the top TP_CALLBACK_PRIORITY priority = TP_CALLBACK_PRIORITY_LOW; - if(workqueue.size() - workitem->_parent.load(std::memory_order_relaxed)->_nesting_level == 1) + if(workqueue.size() - parent->_nesting_level == 1) { priority = TP_CALLBACK_PRIORITY_HIGH; } - else if(workqueue.size() - workitem->_parent.load(std::memory_order_relaxed)->_nesting_level == 2) + else if(workqueue.size() - parent->_nesting_level == 2) { priority = TP_CALLBACK_PRIORITY_NORMAL; } - SetThreadpoolCallbackPriority(workitem->_parent.load(std::memory_order_relaxed)->_grouph, priority); + SetThreadpoolCallbackPriority(parent->_grouph, priority); // std::cout << "*** submit " << workitem << std::endl; SubmitThreadpoolWork((PTP_WORK) workitem->_internalworkh); #endif } + #if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32) // Indicate that I can be executed again workitem->_internalworkh = nullptr; @@ -1491,7 +1526,7 @@ namespace detail group->_newly_added_active_work_items++; group->_active_work_items_remaining++; #endif - _submit_work_item(g, i, i != work.back()); + _submit_work_item(g, i->_has_timer_set(), i, i != work.back()); } } return success(); @@ -1605,28 +1640,6 @@ namespace detail } } } - inline void global_dynamic_thread_pool_impl::_work_item_next(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem) noexcept - { - assert(workitem->_nextwork != -1); - if(workitem->_nextwork == 0) - { - deadline d(std::chrono::seconds(0)); - workitem->_nextwork = workitem->next(d); - auto r = _prepare_work_item_delay(workitem, workitem->_parent.load(std::memory_order_relaxed)->_grouph, d); - if(!r) - { - (void) stop(g, workitem->_parent.load(std::memory_order_relaxed), std::move(r)); - _work_item_done(g, workitem); - return; - } - } - if(-1 == workitem->_nextwork) - { - _work_item_done(g, workitem); - return; - } - _submit_work_item(g, workitem, false); - } inline result global_dynamic_thread_pool_impl::stop(_lock_guard &g, dynamic_thread_pool_group_impl *group, result err) noexcept { @@ -1872,9 +1885,11 @@ namespace detail { LLFIO_LOG_FUNCTION_CALL(this); assert(workitem->_nextwork != -1); - _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock); // lock group + assert(workitem->_has_timer_set()); + auto *parent = workitem->_parent.load(std::memory_order_relaxed); + _lock_guard g(parent->_lock); // lock group // std::cout << "*** _timerthread " << workitem << std::endl; - if(workitem->_parent.load(std::memory_order_relaxed)->_stopping.load(std::memory_order_relaxed)) + if(parent->_stopping.load(std::memory_order_relaxed)) { _work_item_done(g, workitem); return; @@ -1905,7 +1920,32 @@ namespace detail #endif workitem->_timepoint2 = {}; } - _work_item_next(g, workitem); + assert(!workitem->_has_timer_set()); + if(workitem->_nextwork == 0) + { + deadline d(std::chrono::seconds(0)); + workitem->_nextwork = workitem->next(d); + auto r2 = _prepare_work_item_delay(workitem, parent->_grouph, d); + if(!workitem->_has_timer_set()) + { + _remove_from_list(parent->_work_items_timer, workitem); + _append_to_list(parent->_work_items_active, workitem); + } + if(!r2) + { + (void) stop(g, parent, std::move(r2)); + _work_item_done(g, workitem); + return; + } + if(-1 == workitem->_nextwork) + { + _work_item_done(g, workitem); + return; + } + _submit_work_item(g, false, workitem, false); + return; + } + _submit_work_item(g, true, workitem, false); } // Worker thread entry point @@ -1913,14 +1953,15 @@ namespace detail { LLFIO_LOG_FUNCTION_CALL(this); //{ - // _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock); + // _lock_guard g(parent->_lock); // std::cout << "*** _workerthread " << workitem << " begins with work " << workitem->_nextwork << std::endl; //} assert(workitem->_nextwork != -1); assert(workitem->_nextwork != 0); - if(workitem->_parent.load(std::memory_order_relaxed)->_stopping.load(std::memory_order_relaxed)) + auto *parent = workitem->_parent.load(std::memory_order_relaxed); + if(parent->_stopping.load(std::memory_order_relaxed)) { - _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock); // lock group + _lock_guard g(parent->_lock); // lock group _work_item_done(g, workitem); return; } @@ -1928,25 +1969,39 @@ namespace detail auto old_thread_local_state = tls; tls.workitem = workitem; tls.current_callback_instance = selfthreadh; - tls.nesting_level = workitem->_parent.load(std::memory_order_relaxed)->_nesting_level + 1; + tls.nesting_level = parent->_nesting_level + 1; auto r = (*workitem)(workitem->_nextwork); workitem->_nextwork = 0; // call next() next time tls = old_thread_local_state; - _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock); // lock group + _lock_guard g(parent->_lock); // lock group // std::cout << "*** _workerthread " << workitem << " ends with work " << workitem->_nextwork << std::endl; if(!r) { - (void) stop(g, workitem->_parent.load(std::memory_order_relaxed), std::move(r)); + (void) stop(g, parent, std::move(r)); _work_item_done(g, workitem); workitem = nullptr; } - else if(workitem->_parent.load(std::memory_order_relaxed)->_stopping.load(std::memory_order_relaxed)) + else if(parent->_stopping.load(std::memory_order_relaxed)) { _work_item_done(g, workitem); } else { - _work_item_next(g, workitem); + deadline d(std::chrono::seconds(0)); + workitem->_nextwork = workitem->next(d); + auto r2 = _prepare_work_item_delay(workitem, parent->_grouph, d); + if(!r2) + { + (void) stop(g, parent, std::move(r2)); + _work_item_done(g, workitem); + return; + } + if(-1 == workitem->_nextwork) + { + _work_item_done(g, workitem); + return; + } + _submit_work_item(g, false, workitem, false); } } } // namespace detail diff --git a/include/llfio/v2.0/detail/impl/posix/statfs.ipp b/include/llfio/v2.0/detail/impl/posix/statfs.ipp index 092ca9f2..c8455ef6 100644 --- a/include/llfio/v2.0/detail/impl/posix/statfs.ipp +++ b/include/llfio/v2.0/detail/impl/posix/statfs.ipp @@ -171,7 +171,7 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result statfs_t::fill(const handle &h, s */ if(mountentries.size() > 1) { - OUTCOME_TRY(auto currentfilepath_, h.current_path()); + OUTCOME_TRY(auto &¤tfilepath_, h.current_path()); string_view currentfilepath(currentfilepath_.native()); std::vector> scores(mountentries.size()); // std::cout << "*** For matching mount entries to file with path " << currentfilepath << ":\n"; @@ -321,7 +321,7 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result statfs_t::fill(const handle &h, s #endif if(!!(wanted & want::iosinprogress) || !!(wanted & want::iosbusytime)) { - OUTCOME_TRY(auto ios, _fill_ios(h, f_mntfromname)); + OUTCOME_TRY(auto &&ios, _fill_ios(h, f_mntfromname)); if(!!(wanted & want::iosinprogress)) { f_iosinprogress = ios.first; diff --git a/include/llfio/v2.0/detail/impl/windows/file_handle.ipp b/include/llfio/v2.0/detail/impl/windows/file_handle.ipp index 6e9c7a25..5b36810f 100644 --- a/include/llfio/v2.0/detail/impl/windows/file_handle.ipp +++ b/include/llfio/v2.0/detail/impl/windows/file_handle.ipp @@ -884,7 +884,7 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result> statfs_t::_fi { alignas(8) wchar_t buffer[32769]; // Firstly open a handle to the volume - OUTCOME_TRY(auto volumeh, file_handle::file({}, mntfromname, handle::mode::none, handle::creation::open_existing, handle::caching::only_metadata)); + OUTCOME_TRY(auto &&volumeh, file_handle::file({}, mntfromname, handle::mode::none, handle::creation::open_existing, handle::caching::only_metadata)); // Now ask the volume what physical disks it spans auto *vde = reinterpret_cast(buffer); OVERLAPPED ol{}; @@ -932,7 +932,7 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result> statfs_t::_fi } *e++ = '0' + (DiskNumber % 10); *e = 0; - OUTCOME_TRY(auto diskh, file_handle::file({}, path_view(physicaldrivename, e - physicaldrivename, path_view::zero_terminated), handle::mode::none, + OUTCOME_TRY(auto &&diskh, file_handle::file({}, path_view(physicaldrivename, e - physicaldrivename, path_view::zero_terminated), handle::mode::none, handle::creation::open_existing, handle::caching::only_metadata)); ol.Internal = static_cast(-1); auto *dp = reinterpret_cast(buffer); diff --git a/include/llfio/v2.0/detail/impl/windows/statfs.ipp b/include/llfio/v2.0/detail/impl/windows/statfs.ipp index 06e6040c..a712433b 100644 --- a/include/llfio/v2.0/detail/impl/windows/statfs.ipp +++ b/include/llfio/v2.0/detail/impl/windows/statfs.ipp @@ -249,7 +249,7 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result statfs_t::fill(const handle &h, s } if(!!(wanted & want::iosinprogress) || !!(wanted & want::iosbusytime)) { - OUTCOME_TRY(auto ios, _fill_ios(h, f_mntfromname)); + OUTCOME_TRY(auto &&ios, _fill_ios(h, f_mntfromname)); if(!!(wanted & want::iosinprogress)) { f_iosinprogress = ios.first; diff --git a/test/tests/dynamic_thread_pool_group.cpp b/test/tests/dynamic_thread_pool_group.cpp index dfc46e9a..4989c585 100644 --- a/test/tests/dynamic_thread_pool_group.cpp +++ b/test/tests/dynamic_thread_pool_group.cpp @@ -191,15 +191,12 @@ static inline void TestDynamicThreadPoolGroupWorks() } std::cout << "Maximum concurrency achieved with " << workitems.size() << " work items = " << shared_state.max_concurrency << "\n" << std::endl; }; - auto print_exception_throw = llfio::make_scope_fail([]() noexcept { - std::cout << "NOTE: Exception throw occurred!" << std::endl; - }); + auto print_exception_throw = llfio::make_scope_fail([]() noexcept { std::cout << "NOTE: Exception throw occurred!" << std::endl; }); // Test a single work item reset(1); submit(); check(); - exit(0); // Test 10 work items reset(10); @@ -305,7 +302,7 @@ static inline void TestDynamicThreadPoolGroupNestingWorks() } uint64_t idx = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); std::lock_guard g(shared_states[nesting].lock); - //std::cout << "wi " << this << " nesting " << nesting << " work " << work << std::endl; + // std::cout << "wi " << this << " nesting " << nesting << " work " << work << std::endl; if(COUNT_PER_WORK_ITEM == work && childwi) { if(!shared_states[nesting].tpg) @@ -449,5 +446,5 @@ KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, works, "Te TestDynamicThreadPoolGroupWorks()) KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, nested, "Tests that nesting of llfio::dynamic_thread_pool_group works as expected", TestDynamicThreadPoolGroupNestingWorks()) -//KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, io_aware_work_item, +// KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, io_aware_work_item, // "Tests that llfio::dynamic_thread_pool_group::io_aware_work_item works as expected", TestDynamicThreadPoolGroupIoAwareWorks()) -- cgit v1.2.3