Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/windirstat/llfio.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp')
-rw-r--r--include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp206
1 files changed, 183 insertions, 23 deletions
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
index 7e6eb2ac..85586224 100644
--- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
+++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
@@ -24,8 +24,11 @@ Distributed under the Boost Software License, Version 1.0.
#include "../../dynamic_thread_pool_group.hpp"
+#include "../../statfs.hpp"
+
#include <atomic>
#include <mutex>
+#include <unordered_map>
#include <unordered_set>
#include <vector>
@@ -33,7 +36,8 @@ Distributed under the Boost Software License, Version 1.0.
#include "windows/import.hpp"
#include <threadpoolapiset.h>
#else
-#include <pthread>
+#include <pthread.h>
+#include <thread>
#endif
LLFIO_V2_NAMESPACE_BEGIN
@@ -66,6 +70,17 @@ namespace detail
};
std::vector<workqueue_item> workqueue;
+ std::mutex io_aware_work_item_handles_lock;
+ struct io_aware_work_item_statfs
+ {
+ size_t refcount{0};
+ deadline default_deadline;
+ float average_busy{0};
+ std::chrono::steady_clock::time_point last_updated;
+ statfs_t statfs;
+ };
+ std::unordered_map<file_handle::unique_id_type, io_aware_work_item_statfs, file_handle::unique_id_type_hasher> io_aware_work_item_handles;
+
global_dynamic_thread_pool_impl()
{
workqueue.reserve(4); // preallocate 4 levels of nesting
@@ -264,7 +279,7 @@ public:
{
for(auto *i : work)
{
- i->_parent = this;
+ i->_parent.store(this, std::memory_order_release);
detail::global_dynamic_thread_pool_impl::_append_to_list(_work_items_delayed, i);
}
return success();
@@ -341,7 +356,7 @@ namespace detail
inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem)
{
(void) g;
- if(workitem->_nextwork != -1 && !workitem->_parent->_stopping.load(std::memory_order_relaxed))
+ if(workitem->_nextwork != -1 && !workitem->_parent.load(std::memory_order_relaxed)->_stopping.load(std::memory_order_relaxed))
{
// If no work item for now, or there is a delay, schedule a timer
if(workitem->_nextwork == 0 || workitem->_timepoint1 != std::chrono::steady_clock::time_point() ||
@@ -350,6 +365,7 @@ namespace detail
assert(workitem->_internaltimerh != nullptr);
#ifdef _WIN32
LARGE_INTEGER li;
+ DWORD slop = 1000;
if(workitem->_timepoint1 != std::chrono::steady_clock::time_point())
{
li.QuadPart = std::chrono::duration_cast<std::chrono::nanoseconds>(workitem->_timepoint1 - std::chrono::steady_clock::now()).count() / 100;
@@ -357,6 +373,10 @@ namespace detail
{
li.QuadPart = 0;
}
+ if(li.QuadPart / 8 < (int64_t) slop)
+ {
+ slop = (DWORD)(li.QuadPart / 8);
+ }
li.QuadPart = -li.QuadPart; // negative is relative
}
else if(workitem->_timepoint2 != std::chrono::system_clock::time_point())
@@ -371,7 +391,7 @@ namespace detail
ft.dwHighDateTime = (DWORD) li.HighPart;
ft.dwLowDateTime = li.LowPart;
// std::cout << "*** timer " << workitem << std::endl;
- SetThreadpoolTimer((PTP_TIMER) workitem->_internaltimerh, &ft, 0, 1000);
+ SetThreadpoolTimer((PTP_TIMER) workitem->_internaltimerh, &ft, 0, slop);
#else
#endif
}
@@ -380,15 +400,15 @@ namespace detail
#ifdef _WIN32
// Set the priority of the group according to distance from the top
TP_CALLBACK_PRIORITY priority = TP_CALLBACK_PRIORITY_LOW;
- if(workqueue.size() - workitem->_parent->_nesting_level == 1)
+ if(workqueue.size() - workitem->_parent.load(std::memory_order_relaxed)->_nesting_level == 1)
{
priority = TP_CALLBACK_PRIORITY_HIGH;
}
- else if(workqueue.size() - workitem->_parent->_nesting_level == 2)
+ else if(workqueue.size() - workitem->_parent.load(std::memory_order_relaxed)->_nesting_level == 2)
{
priority = TP_CALLBACK_PRIORITY_NORMAL;
}
- SetThreadpoolCallbackPriority(workitem->_parent->_grouph, priority);
+ SetThreadpoolCallbackPriority(workitem->_parent.load(std::memory_order_relaxed)->_grouph, priority);
// std::cout << "*** submit " << workitem << std::endl;
SubmitThreadpoolWork((PTP_WORK) workitem->_internalworkh);
#else
@@ -408,7 +428,7 @@ namespace detail
}
for(auto *i : work)
{
- if(i->_parent != nullptr)
+ if(i->_parent.load(std::memory_order_relaxed) != nullptr)
{
return errc::address_in_use;
}
@@ -430,13 +450,13 @@ namespace detail
}
#else
#endif
- i->_parent = nullptr;
+ i->_parent.store(nullptr, std::memory_order_release);
}
});
for(auto *i : work)
{
deadline d(std::chrono::seconds(0));
- i->_parent = group;
+ i->_parent.store(group, std::memory_order_release);
i->_nextwork = i->next(d);
if(-1 == i->_nextwork)
{
@@ -494,16 +514,16 @@ namespace detail
}
#else
#endif
- _remove_from_list(i->_parent->_work_items_active, i);
- _append_to_list(i->_parent->_work_items_done, i);
- if(i->_parent->_work_items_active.count == 0)
+ _remove_from_list(i->_parent.load(std::memory_order_relaxed)->_work_items_active, i);
+ _append_to_list(i->_parent.load(std::memory_order_relaxed)->_work_items_done, i);
+ if(i->_parent.load(std::memory_order_relaxed)->_work_items_active.count == 0)
{
- auto *parent = i->_parent;
+ auto *parent = i->_parent.load(std::memory_order_relaxed);
i = nullptr;
auto *v = parent->_work_items_done.front, *n = v;
for(; v != nullptr; v = n)
{
- v->_parent = nullptr;
+ v->_parent.store(nullptr, std::memory_order_release);
v->_nextwork = -1;
n = v->_next;
}
@@ -541,10 +561,10 @@ namespace detail
{
deadline d(std::chrono::seconds(0));
workitem->_nextwork = workitem->next(d);
- auto r = _prepare_work_item_delay(workitem, workitem->_parent->_grouph, d);
+ auto r = _prepare_work_item_delay(workitem, workitem->_parent.load(std::memory_order_relaxed)->_grouph, d);
if(!r)
{
- (void) stop(g, workitem->_parent, std::move(r));
+ (void) stop(g, workitem->_parent.load(std::memory_order_relaxed), std::move(r));
_work_item_done(g, workitem);
return;
}
@@ -731,7 +751,9 @@ namespace detail
{
LLFIO_LOG_FUNCTION_CALL(this);
// std::cout << "*** _timerthread " << workitem << std::endl;
- _lock_guard g(workitem->_parent->_lock);
+ _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock);
+ workitem->_timepoint1 = {};
+ workitem->_timepoint2 = {};
_work_item_next(g, workitem);
}
@@ -742,9 +764,9 @@ namespace detail
// std::cout << "*** _workerthread " << workitem << " with work " << workitem->_nextwork << std::endl;
assert(workitem->_nextwork != -1);
assert(workitem->_nextwork != 0);
- if(workitem->_parent->_stopped.load(std::memory_order_relaxed))
+ if(workitem->_parent.load(std::memory_order_relaxed)->_stopped.load(std::memory_order_relaxed))
{
- _lock_guard g(workitem->_parent->_lock);
+ _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock);
_work_item_done(g, workitem);
return;
}
@@ -752,14 +774,14 @@ namespace detail
auto old_thread_local_state = tls;
tls.workitem = workitem;
tls.current_callback_instance = selfthreadh;
- tls.nesting_level = workitem->_parent->_nesting_level + 1;
+ tls.nesting_level = workitem->_parent.load(std::memory_order_relaxed)->_nesting_level + 1;
auto r = (*workitem)(workitem->_nextwork);
workitem->_nextwork = 0; // call next() next time
tls = old_thread_local_state;
- _lock_guard g(workitem->_parent->_lock);
+ _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock);
if(!r)
{
- (void) stop(g, workitem->_parent, std::move(r));
+ (void) stop(g, workitem->_parent.load(std::memory_order_relaxed), std::move(r));
_work_item_done(g, workitem);
workitem = nullptr;
}
@@ -770,4 +792,142 @@ namespace detail
}
} // namespace detail
+
+/****************************************** io_aware_work_item *********************************************/
+
+LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::io_aware_work_item::io_aware_work_item(span<io_handle_awareness> hs)
+ : _handles([](span<io_handle_awareness> hs) -> span<io_handle_awareness> {
+ float all = 0;
+ for(auto &i : hs)
+ {
+ all += i.reads + i.writes + i.barriers;
+ }
+ for(auto &i : hs)
+ {
+ if(all == 0.0f)
+ {
+ i.reads = i.writes = 0.5f;
+ i.barriers = 0.0f;
+ }
+ else
+ {
+ i.reads /= all;
+ i.writes /= all;
+ i.barriers /= all;
+ }
+ }
+ auto &impl = detail::global_dynamic_thread_pool();
+ detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.io_aware_work_item_handles_lock);
+ for(auto &h : hs)
+ {
+ if(!h.h->is_seekable())
+ {
+ throw std::runtime_error("Supplied handle is not seekable");
+ }
+ auto *fh = static_cast<file_handle *>(h.h);
+ auto unique_id = fh->unique_id();
+ auto it = impl.io_aware_work_item_handles.find(unique_id);
+ if(it == impl.io_aware_work_item_handles.end())
+ {
+ it = impl.io_aware_work_item_handles.emplace(unique_id, detail::global_dynamic_thread_pool_impl::io_aware_work_item_statfs{}).first;
+ auto r = it->second.statfs.fill(*fh, statfs_t::want::iosinprogress | statfs_t::want::iosbusytime);
+ if(!r || it->second.statfs.f_iosinprogress == (uint32_t) -1)
+ {
+ impl.io_aware_work_item_handles.erase(it);
+ if(!r)
+ {
+ r.value();
+ }
+ throw std::runtime_error("statfs::f_iosinprogress unavailable for supplied handle");
+ }
+ it->second.last_updated = std::chrono::steady_clock::now();
+ }
+ it->second.refcount++;
+ h._internal = &*it;
+ }
+ return hs;
+ }(hs))
+{
+ LLFIO_LOG_FUNCTION_CALL(this);
+}
+
+LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::io_aware_work_item::~io_aware_work_item()
+{
+ LLFIO_LOG_FUNCTION_CALL(this);
+ auto &impl = detail::global_dynamic_thread_pool();
+ detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.io_aware_work_item_handles_lock);
+ using value_type = decltype(impl.io_aware_work_item_handles)::value_type;
+ for(auto &h : _handles)
+ {
+ auto *i = (value_type *) h._internal;
+ if(0 == --i->second.refcount)
+ {
+ impl.io_aware_work_item_handles.erase(i->first);
+ }
+ }
+}
+
+LLFIO_HEADERS_ONLY_MEMFUNC_SPEC intptr_t dynamic_thread_pool_group::io_aware_work_item::next(deadline &d) noexcept
+{
+ LLFIO_LOG_FUNCTION_CALL(this);
+ {
+ auto &impl = detail::global_dynamic_thread_pool();
+ auto now = std::chrono::steady_clock::now();
+ detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.io_aware_work_item_handles_lock);
+ using value_type = decltype(impl.io_aware_work_item_handles)::value_type;
+ for(auto &h : _handles)
+ {
+ auto *i = (value_type *) h._internal;
+ if(std::chrono::duration_cast<std::chrono::milliseconds>(now - i->second.last_updated) >= std::chrono::milliseconds(100))
+ {
+ // auto old_iosinprogress = i->second.statfs.f_iosinprogress;
+ auto elapsed = now - i->second.last_updated;
+ (void) i->second.statfs.fill(*h.h, statfs_t::want::iosinprogress | statfs_t::want::iosbusytime);
+ i->second.last_updated = now;
+
+ if(elapsed>std::chrono::seconds(5))
+ {
+ i->second.average_busy = i->second.statfs.f_iosbusytime;
+ }
+ else
+ {
+ i->second.average_busy = (i->second.average_busy * 0.9f) + (i->second.statfs.f_iosbusytime * 0.1f);
+ }
+ if(i->second.average_busy < 0.90f)
+ {
+ i->second.default_deadline = std::chrono::seconds(0); // remove pacing
+ }
+ else if(i->second.statfs.f_iosinprogress > 1)
+ {
+ if((i->second.default_deadline.nsecs >> 4) > 0)
+ {
+ i->second.default_deadline.nsecs += i->second.default_deadline.nsecs >> 4;
+ }
+ else
+ {
+ i->second.default_deadline.nsecs++;
+ }
+ }
+ else
+ {
+ if(i->second.default_deadline.nsecs > (i->second.default_deadline.nsecs >> 4) && (i->second.default_deadline.nsecs >> 4) > 0)
+ {
+ i->second.default_deadline.nsecs -= i->second.default_deadline.nsecs >> 4;
+ }
+ else if(i->second.default_deadline.nsecs > 1)
+ {
+ i->second.default_deadline.nsecs--;
+ }
+ }
+ }
+ if(d.nsecs < i->second.default_deadline.nsecs)
+ {
+ d = i->second.default_deadline;
+ }
+ }
+ }
+ return io_aware_next(d);
+}
+
+
LLFIO_V2_NAMESPACE_END