diff options
Diffstat (limited to 'include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp')
-rw-r--r-- | include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp | 206 |
1 files changed, 183 insertions, 23 deletions
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp index 7e6eb2ac..85586224 100644 --- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp +++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp @@ -24,8 +24,11 @@ Distributed under the Boost Software License, Version 1.0. #include "../../dynamic_thread_pool_group.hpp" +#include "../../statfs.hpp" + #include <atomic> #include <mutex> +#include <unordered_map> #include <unordered_set> #include <vector> @@ -33,7 +36,8 @@ Distributed under the Boost Software License, Version 1.0. #include "windows/import.hpp" #include <threadpoolapiset.h> #else -#include <pthread> +#include <pthread.h> +#include <thread> #endif LLFIO_V2_NAMESPACE_BEGIN @@ -66,6 +70,17 @@ namespace detail }; std::vector<workqueue_item> workqueue; + std::mutex io_aware_work_item_handles_lock; + struct io_aware_work_item_statfs + { + size_t refcount{0}; + deadline default_deadline; + float average_busy{0}; + std::chrono::steady_clock::time_point last_updated; + statfs_t statfs; + }; + std::unordered_map<file_handle::unique_id_type, io_aware_work_item_statfs, file_handle::unique_id_type_hasher> io_aware_work_item_handles; + global_dynamic_thread_pool_impl() { workqueue.reserve(4); // preallocate 4 levels of nesting @@ -264,7 +279,7 @@ public: { for(auto *i : work) { - i->_parent = this; + i->_parent.store(this, std::memory_order_release); detail::global_dynamic_thread_pool_impl::_append_to_list(_work_items_delayed, i); } return success(); @@ -341,7 +356,7 @@ namespace detail inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem) { (void) g; - if(workitem->_nextwork != -1 && !workitem->_parent->_stopping.load(std::memory_order_relaxed)) + if(workitem->_nextwork != -1 && !workitem->_parent.load(std::memory_order_relaxed)->_stopping.load(std::memory_order_relaxed)) { // If no work item for now, or there is a delay, schedule a timer if(workitem->_nextwork == 0 || workitem->_timepoint1 != std::chrono::steady_clock::time_point() || @@ -350,6 +365,7 @@ namespace detail assert(workitem->_internaltimerh != nullptr); #ifdef _WIN32 LARGE_INTEGER li; + DWORD slop = 1000; if(workitem->_timepoint1 != std::chrono::steady_clock::time_point()) { li.QuadPart = std::chrono::duration_cast<std::chrono::nanoseconds>(workitem->_timepoint1 - std::chrono::steady_clock::now()).count() / 100; @@ -357,6 +373,10 @@ namespace detail { li.QuadPart = 0; } + if(li.QuadPart / 8 < (int64_t) slop) + { + slop = (DWORD)(li.QuadPart / 8); + } li.QuadPart = -li.QuadPart; // negative is relative } else if(workitem->_timepoint2 != std::chrono::system_clock::time_point()) @@ -371,7 +391,7 @@ namespace detail ft.dwHighDateTime = (DWORD) li.HighPart; ft.dwLowDateTime = li.LowPart; // std::cout << "*** timer " << workitem << std::endl; - SetThreadpoolTimer((PTP_TIMER) workitem->_internaltimerh, &ft, 0, 1000); + SetThreadpoolTimer((PTP_TIMER) workitem->_internaltimerh, &ft, 0, slop); #else #endif } @@ -380,15 +400,15 @@ namespace detail #ifdef _WIN32 // Set the priority of the group according to distance from the top TP_CALLBACK_PRIORITY priority = TP_CALLBACK_PRIORITY_LOW; - if(workqueue.size() - workitem->_parent->_nesting_level == 1) + if(workqueue.size() - workitem->_parent.load(std::memory_order_relaxed)->_nesting_level == 1) { priority = TP_CALLBACK_PRIORITY_HIGH; } - else if(workqueue.size() - workitem->_parent->_nesting_level == 2) + else if(workqueue.size() - workitem->_parent.load(std::memory_order_relaxed)->_nesting_level == 2) { priority = TP_CALLBACK_PRIORITY_NORMAL; } - SetThreadpoolCallbackPriority(workitem->_parent->_grouph, priority); + SetThreadpoolCallbackPriority(workitem->_parent.load(std::memory_order_relaxed)->_grouph, priority); // std::cout << "*** submit " << workitem << std::endl; SubmitThreadpoolWork((PTP_WORK) workitem->_internalworkh); #else @@ -408,7 +428,7 @@ namespace detail } for(auto *i : work) { - if(i->_parent != nullptr) + if(i->_parent.load(std::memory_order_relaxed) != nullptr) { return errc::address_in_use; } @@ -430,13 +450,13 @@ namespace detail } #else #endif - i->_parent = nullptr; + i->_parent.store(nullptr, std::memory_order_release); } }); for(auto *i : work) { deadline d(std::chrono::seconds(0)); - i->_parent = group; + i->_parent.store(group, std::memory_order_release); i->_nextwork = i->next(d); if(-1 == i->_nextwork) { @@ -494,16 +514,16 @@ namespace detail } #else #endif - _remove_from_list(i->_parent->_work_items_active, i); - _append_to_list(i->_parent->_work_items_done, i); - if(i->_parent->_work_items_active.count == 0) + _remove_from_list(i->_parent.load(std::memory_order_relaxed)->_work_items_active, i); + _append_to_list(i->_parent.load(std::memory_order_relaxed)->_work_items_done, i); + if(i->_parent.load(std::memory_order_relaxed)->_work_items_active.count == 0) { - auto *parent = i->_parent; + auto *parent = i->_parent.load(std::memory_order_relaxed); i = nullptr; auto *v = parent->_work_items_done.front, *n = v; for(; v != nullptr; v = n) { - v->_parent = nullptr; + v->_parent.store(nullptr, std::memory_order_release); v->_nextwork = -1; n = v->_next; } @@ -541,10 +561,10 @@ namespace detail { deadline d(std::chrono::seconds(0)); workitem->_nextwork = workitem->next(d); - auto r = _prepare_work_item_delay(workitem, workitem->_parent->_grouph, d); + auto r = _prepare_work_item_delay(workitem, workitem->_parent.load(std::memory_order_relaxed)->_grouph, d); if(!r) { - (void) stop(g, workitem->_parent, std::move(r)); + (void) stop(g, workitem->_parent.load(std::memory_order_relaxed), std::move(r)); _work_item_done(g, workitem); return; } @@ -731,7 +751,9 @@ namespace detail { LLFIO_LOG_FUNCTION_CALL(this); // std::cout << "*** _timerthread " << workitem << std::endl; - _lock_guard g(workitem->_parent->_lock); + _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock); + workitem->_timepoint1 = {}; + workitem->_timepoint2 = {}; _work_item_next(g, workitem); } @@ -742,9 +764,9 @@ namespace detail // std::cout << "*** _workerthread " << workitem << " with work " << workitem->_nextwork << std::endl; assert(workitem->_nextwork != -1); assert(workitem->_nextwork != 0); - if(workitem->_parent->_stopped.load(std::memory_order_relaxed)) + if(workitem->_parent.load(std::memory_order_relaxed)->_stopped.load(std::memory_order_relaxed)) { - _lock_guard g(workitem->_parent->_lock); + _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock); _work_item_done(g, workitem); return; } @@ -752,14 +774,14 @@ namespace detail auto old_thread_local_state = tls; tls.workitem = workitem; tls.current_callback_instance = selfthreadh; - tls.nesting_level = workitem->_parent->_nesting_level + 1; + tls.nesting_level = workitem->_parent.load(std::memory_order_relaxed)->_nesting_level + 1; auto r = (*workitem)(workitem->_nextwork); workitem->_nextwork = 0; // call next() next time tls = old_thread_local_state; - _lock_guard g(workitem->_parent->_lock); + _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock); if(!r) { - (void) stop(g, workitem->_parent, std::move(r)); + (void) stop(g, workitem->_parent.load(std::memory_order_relaxed), std::move(r)); _work_item_done(g, workitem); workitem = nullptr; } @@ -770,4 +792,142 @@ namespace detail } } // namespace detail + +/****************************************** io_aware_work_item *********************************************/ + +LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::io_aware_work_item::io_aware_work_item(span<io_handle_awareness> hs) + : _handles([](span<io_handle_awareness> hs) -> span<io_handle_awareness> { + float all = 0; + for(auto &i : hs) + { + all += i.reads + i.writes + i.barriers; + } + for(auto &i : hs) + { + if(all == 0.0f) + { + i.reads = i.writes = 0.5f; + i.barriers = 0.0f; + } + else + { + i.reads /= all; + i.writes /= all; + i.barriers /= all; + } + } + auto &impl = detail::global_dynamic_thread_pool(); + detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.io_aware_work_item_handles_lock); + for(auto &h : hs) + { + if(!h.h->is_seekable()) + { + throw std::runtime_error("Supplied handle is not seekable"); + } + auto *fh = static_cast<file_handle *>(h.h); + auto unique_id = fh->unique_id(); + auto it = impl.io_aware_work_item_handles.find(unique_id); + if(it == impl.io_aware_work_item_handles.end()) + { + it = impl.io_aware_work_item_handles.emplace(unique_id, detail::global_dynamic_thread_pool_impl::io_aware_work_item_statfs{}).first; + auto r = it->second.statfs.fill(*fh, statfs_t::want::iosinprogress | statfs_t::want::iosbusytime); + if(!r || it->second.statfs.f_iosinprogress == (uint32_t) -1) + { + impl.io_aware_work_item_handles.erase(it); + if(!r) + { + r.value(); + } + throw std::runtime_error("statfs::f_iosinprogress unavailable for supplied handle"); + } + it->second.last_updated = std::chrono::steady_clock::now(); + } + it->second.refcount++; + h._internal = &*it; + } + return hs; + }(hs)) +{ + LLFIO_LOG_FUNCTION_CALL(this); +} + +LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::io_aware_work_item::~io_aware_work_item() +{ + LLFIO_LOG_FUNCTION_CALL(this); + auto &impl = detail::global_dynamic_thread_pool(); + detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.io_aware_work_item_handles_lock); + using value_type = decltype(impl.io_aware_work_item_handles)::value_type; + for(auto &h : _handles) + { + auto *i = (value_type *) h._internal; + if(0 == --i->second.refcount) + { + impl.io_aware_work_item_handles.erase(i->first); + } + } +} + +LLFIO_HEADERS_ONLY_MEMFUNC_SPEC intptr_t dynamic_thread_pool_group::io_aware_work_item::next(deadline &d) noexcept +{ + LLFIO_LOG_FUNCTION_CALL(this); + { + auto &impl = detail::global_dynamic_thread_pool(); + auto now = std::chrono::steady_clock::now(); + detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.io_aware_work_item_handles_lock); + using value_type = decltype(impl.io_aware_work_item_handles)::value_type; + for(auto &h : _handles) + { + auto *i = (value_type *) h._internal; + if(std::chrono::duration_cast<std::chrono::milliseconds>(now - i->second.last_updated) >= std::chrono::milliseconds(100)) + { + // auto old_iosinprogress = i->second.statfs.f_iosinprogress; + auto elapsed = now - i->second.last_updated; + (void) i->second.statfs.fill(*h.h, statfs_t::want::iosinprogress | statfs_t::want::iosbusytime); + i->second.last_updated = now; + + if(elapsed>std::chrono::seconds(5)) + { + i->second.average_busy = i->second.statfs.f_iosbusytime; + } + else + { + i->second.average_busy = (i->second.average_busy * 0.9f) + (i->second.statfs.f_iosbusytime * 0.1f); + } + if(i->second.average_busy < 0.90f) + { + i->second.default_deadline = std::chrono::seconds(0); // remove pacing + } + else if(i->second.statfs.f_iosinprogress > 1) + { + if((i->second.default_deadline.nsecs >> 4) > 0) + { + i->second.default_deadline.nsecs += i->second.default_deadline.nsecs >> 4; + } + else + { + i->second.default_deadline.nsecs++; + } + } + else + { + if(i->second.default_deadline.nsecs > (i->second.default_deadline.nsecs >> 4) && (i->second.default_deadline.nsecs >> 4) > 0) + { + i->second.default_deadline.nsecs -= i->second.default_deadline.nsecs >> 4; + } + else if(i->second.default_deadline.nsecs > 1) + { + i->second.default_deadline.nsecs--; + } + } + } + if(d.nsecs < i->second.default_deadline.nsecs) + { + d = i->second.default_deadline; + } + } + } + return io_aware_next(d); +} + + LLFIO_V2_NAMESPACE_END |