Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/windirstat/llfio.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2020-12-29 17:41:08 +0300
committerNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2021-03-16 13:21:39 +0300
commit7eeb88fccc7e7071a8065afbbffc36379e8b7091 (patch)
treecf7ed48c4f889641b9f8d08c80d9399613ca3e63
parentdfa6771c4f8ba0635a252a39d13a48c7c5c75489 (diff)
Implement dynamic_thread_pool_group::io_aware_work_item.
-rw-r--r--include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp206
-rw-r--r--include/llfio/v2.0/detail/impl/posix/statfs.ipp18
-rw-r--r--include/llfio/v2.0/detail/impl/windows/file_handle.ipp8
-rw-r--r--include/llfio/v2.0/detail/impl/windows/statfs.ipp8
-rw-r--r--include/llfio/v2.0/dynamic_thread_pool_group.hpp103
-rw-r--r--include/llfio/v2.0/fs_handle.hpp2
-rw-r--r--include/llfio/v2.0/statfs.hpp6
-rw-r--r--test/tests/dynamic_thread_pool_group.cpp95
-rw-r--r--test/tests/statfs.cpp6
9 files changed, 397 insertions, 55 deletions
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
index 7e6eb2ac..85586224 100644
--- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
+++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
@@ -24,8 +24,11 @@ Distributed under the Boost Software License, Version 1.0.
#include "../../dynamic_thread_pool_group.hpp"
+#include "../../statfs.hpp"
+
#include <atomic>
#include <mutex>
+#include <unordered_map>
#include <unordered_set>
#include <vector>
@@ -33,7 +36,8 @@ Distributed under the Boost Software License, Version 1.0.
#include "windows/import.hpp"
#include <threadpoolapiset.h>
#else
-#include <pthread>
+#include <pthread.h>
+#include <thread>
#endif
LLFIO_V2_NAMESPACE_BEGIN
@@ -66,6 +70,17 @@ namespace detail
};
std::vector<workqueue_item> workqueue;
+ std::mutex io_aware_work_item_handles_lock;
+ struct io_aware_work_item_statfs
+ {
+ size_t refcount{0};
+ deadline default_deadline;
+ float average_busy{0};
+ std::chrono::steady_clock::time_point last_updated;
+ statfs_t statfs;
+ };
+ std::unordered_map<file_handle::unique_id_type, io_aware_work_item_statfs, file_handle::unique_id_type_hasher> io_aware_work_item_handles;
+
global_dynamic_thread_pool_impl()
{
workqueue.reserve(4); // preallocate 4 levels of nesting
@@ -264,7 +279,7 @@ public:
{
for(auto *i : work)
{
- i->_parent = this;
+ i->_parent.store(this, std::memory_order_release);
detail::global_dynamic_thread_pool_impl::_append_to_list(_work_items_delayed, i);
}
return success();
@@ -341,7 +356,7 @@ namespace detail
inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem)
{
(void) g;
- if(workitem->_nextwork != -1 && !workitem->_parent->_stopping.load(std::memory_order_relaxed))
+ if(workitem->_nextwork != -1 && !workitem->_parent.load(std::memory_order_relaxed)->_stopping.load(std::memory_order_relaxed))
{
// If no work item for now, or there is a delay, schedule a timer
if(workitem->_nextwork == 0 || workitem->_timepoint1 != std::chrono::steady_clock::time_point() ||
@@ -350,6 +365,7 @@ namespace detail
assert(workitem->_internaltimerh != nullptr);
#ifdef _WIN32
LARGE_INTEGER li;
+ DWORD slop = 1000;
if(workitem->_timepoint1 != std::chrono::steady_clock::time_point())
{
li.QuadPart = std::chrono::duration_cast<std::chrono::nanoseconds>(workitem->_timepoint1 - std::chrono::steady_clock::now()).count() / 100;
@@ -357,6 +373,10 @@ namespace detail
{
li.QuadPart = 0;
}
+ if(li.QuadPart / 8 < (int64_t) slop)
+ {
+ slop = (DWORD)(li.QuadPart / 8);
+ }
li.QuadPart = -li.QuadPart; // negative is relative
}
else if(workitem->_timepoint2 != std::chrono::system_clock::time_point())
@@ -371,7 +391,7 @@ namespace detail
ft.dwHighDateTime = (DWORD) li.HighPart;
ft.dwLowDateTime = li.LowPart;
// std::cout << "*** timer " << workitem << std::endl;
- SetThreadpoolTimer((PTP_TIMER) workitem->_internaltimerh, &ft, 0, 1000);
+ SetThreadpoolTimer((PTP_TIMER) workitem->_internaltimerh, &ft, 0, slop);
#else
#endif
}
@@ -380,15 +400,15 @@ namespace detail
#ifdef _WIN32
// Set the priority of the group according to distance from the top
TP_CALLBACK_PRIORITY priority = TP_CALLBACK_PRIORITY_LOW;
- if(workqueue.size() - workitem->_parent->_nesting_level == 1)
+ if(workqueue.size() - workitem->_parent.load(std::memory_order_relaxed)->_nesting_level == 1)
{
priority = TP_CALLBACK_PRIORITY_HIGH;
}
- else if(workqueue.size() - workitem->_parent->_nesting_level == 2)
+ else if(workqueue.size() - workitem->_parent.load(std::memory_order_relaxed)->_nesting_level == 2)
{
priority = TP_CALLBACK_PRIORITY_NORMAL;
}
- SetThreadpoolCallbackPriority(workitem->_parent->_grouph, priority);
+ SetThreadpoolCallbackPriority(workitem->_parent.load(std::memory_order_relaxed)->_grouph, priority);
// std::cout << "*** submit " << workitem << std::endl;
SubmitThreadpoolWork((PTP_WORK) workitem->_internalworkh);
#else
@@ -408,7 +428,7 @@ namespace detail
}
for(auto *i : work)
{
- if(i->_parent != nullptr)
+ if(i->_parent.load(std::memory_order_relaxed) != nullptr)
{
return errc::address_in_use;
}
@@ -430,13 +450,13 @@ namespace detail
}
#else
#endif
- i->_parent = nullptr;
+ i->_parent.store(nullptr, std::memory_order_release);
}
});
for(auto *i : work)
{
deadline d(std::chrono::seconds(0));
- i->_parent = group;
+ i->_parent.store(group, std::memory_order_release);
i->_nextwork = i->next(d);
if(-1 == i->_nextwork)
{
@@ -494,16 +514,16 @@ namespace detail
}
#else
#endif
- _remove_from_list(i->_parent->_work_items_active, i);
- _append_to_list(i->_parent->_work_items_done, i);
- if(i->_parent->_work_items_active.count == 0)
+ _remove_from_list(i->_parent.load(std::memory_order_relaxed)->_work_items_active, i);
+ _append_to_list(i->_parent.load(std::memory_order_relaxed)->_work_items_done, i);
+ if(i->_parent.load(std::memory_order_relaxed)->_work_items_active.count == 0)
{
- auto *parent = i->_parent;
+ auto *parent = i->_parent.load(std::memory_order_relaxed);
i = nullptr;
auto *v = parent->_work_items_done.front, *n = v;
for(; v != nullptr; v = n)
{
- v->_parent = nullptr;
+ v->_parent.store(nullptr, std::memory_order_release);
v->_nextwork = -1;
n = v->_next;
}
@@ -541,10 +561,10 @@ namespace detail
{
deadline d(std::chrono::seconds(0));
workitem->_nextwork = workitem->next(d);
- auto r = _prepare_work_item_delay(workitem, workitem->_parent->_grouph, d);
+ auto r = _prepare_work_item_delay(workitem, workitem->_parent.load(std::memory_order_relaxed)->_grouph, d);
if(!r)
{
- (void) stop(g, workitem->_parent, std::move(r));
+ (void) stop(g, workitem->_parent.load(std::memory_order_relaxed), std::move(r));
_work_item_done(g, workitem);
return;
}
@@ -731,7 +751,9 @@ namespace detail
{
LLFIO_LOG_FUNCTION_CALL(this);
// std::cout << "*** _timerthread " << workitem << std::endl;
- _lock_guard g(workitem->_parent->_lock);
+ _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock);
+ workitem->_timepoint1 = {};
+ workitem->_timepoint2 = {};
_work_item_next(g, workitem);
}
@@ -742,9 +764,9 @@ namespace detail
// std::cout << "*** _workerthread " << workitem << " with work " << workitem->_nextwork << std::endl;
assert(workitem->_nextwork != -1);
assert(workitem->_nextwork != 0);
- if(workitem->_parent->_stopped.load(std::memory_order_relaxed))
+ if(workitem->_parent.load(std::memory_order_relaxed)->_stopped.load(std::memory_order_relaxed))
{
- _lock_guard g(workitem->_parent->_lock);
+ _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock);
_work_item_done(g, workitem);
return;
}
@@ -752,14 +774,14 @@ namespace detail
auto old_thread_local_state = tls;
tls.workitem = workitem;
tls.current_callback_instance = selfthreadh;
- tls.nesting_level = workitem->_parent->_nesting_level + 1;
+ tls.nesting_level = workitem->_parent.load(std::memory_order_relaxed)->_nesting_level + 1;
auto r = (*workitem)(workitem->_nextwork);
workitem->_nextwork = 0; // call next() next time
tls = old_thread_local_state;
- _lock_guard g(workitem->_parent->_lock);
+ _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock);
if(!r)
{
- (void) stop(g, workitem->_parent, std::move(r));
+ (void) stop(g, workitem->_parent.load(std::memory_order_relaxed), std::move(r));
_work_item_done(g, workitem);
workitem = nullptr;
}
@@ -770,4 +792,142 @@ namespace detail
}
} // namespace detail
+
+/****************************************** io_aware_work_item *********************************************/
+
+LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::io_aware_work_item::io_aware_work_item(span<io_handle_awareness> hs)
+ : _handles([](span<io_handle_awareness> hs) -> span<io_handle_awareness> {
+ float all = 0;
+ for(auto &i : hs)
+ {
+ all += i.reads + i.writes + i.barriers;
+ }
+ for(auto &i : hs)
+ {
+ if(all == 0.0f)
+ {
+ i.reads = i.writes = 0.5f;
+ i.barriers = 0.0f;
+ }
+ else
+ {
+ i.reads /= all;
+ i.writes /= all;
+ i.barriers /= all;
+ }
+ }
+ auto &impl = detail::global_dynamic_thread_pool();
+ detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.io_aware_work_item_handles_lock);
+ for(auto &h : hs)
+ {
+ if(!h.h->is_seekable())
+ {
+ throw std::runtime_error("Supplied handle is not seekable");
+ }
+ auto *fh = static_cast<file_handle *>(h.h);
+ auto unique_id = fh->unique_id();
+ auto it = impl.io_aware_work_item_handles.find(unique_id);
+ if(it == impl.io_aware_work_item_handles.end())
+ {
+ it = impl.io_aware_work_item_handles.emplace(unique_id, detail::global_dynamic_thread_pool_impl::io_aware_work_item_statfs{}).first;
+ auto r = it->second.statfs.fill(*fh, statfs_t::want::iosinprogress | statfs_t::want::iosbusytime);
+ if(!r || it->second.statfs.f_iosinprogress == (uint32_t) -1)
+ {
+ impl.io_aware_work_item_handles.erase(it);
+ if(!r)
+ {
+ r.value();
+ }
+ throw std::runtime_error("statfs::f_iosinprogress unavailable for supplied handle");
+ }
+ it->second.last_updated = std::chrono::steady_clock::now();
+ }
+ it->second.refcount++;
+ h._internal = &*it;
+ }
+ return hs;
+ }(hs))
+{
+ LLFIO_LOG_FUNCTION_CALL(this);
+}
+
+LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::io_aware_work_item::~io_aware_work_item()
+{
+ LLFIO_LOG_FUNCTION_CALL(this);
+ auto &impl = detail::global_dynamic_thread_pool();
+ detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.io_aware_work_item_handles_lock);
+ using value_type = decltype(impl.io_aware_work_item_handles)::value_type;
+ for(auto &h : _handles)
+ {
+ auto *i = (value_type *) h._internal;
+ if(0 == --i->second.refcount)
+ {
+ impl.io_aware_work_item_handles.erase(i->first);
+ }
+ }
+}
+
+LLFIO_HEADERS_ONLY_MEMFUNC_SPEC intptr_t dynamic_thread_pool_group::io_aware_work_item::next(deadline &d) noexcept
+{
+ LLFIO_LOG_FUNCTION_CALL(this);
+ {
+ auto &impl = detail::global_dynamic_thread_pool();
+ auto now = std::chrono::steady_clock::now();
+ detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.io_aware_work_item_handles_lock);
+ using value_type = decltype(impl.io_aware_work_item_handles)::value_type;
+ for(auto &h : _handles)
+ {
+ auto *i = (value_type *) h._internal;
+ if(std::chrono::duration_cast<std::chrono::milliseconds>(now - i->second.last_updated) >= std::chrono::milliseconds(100))
+ {
+ // auto old_iosinprogress = i->second.statfs.f_iosinprogress;
+ auto elapsed = now - i->second.last_updated;
+ (void) i->second.statfs.fill(*h.h, statfs_t::want::iosinprogress | statfs_t::want::iosbusytime);
+ i->second.last_updated = now;
+
+ if(elapsed>std::chrono::seconds(5))
+ {
+ i->second.average_busy = i->second.statfs.f_iosbusytime;
+ }
+ else
+ {
+ i->second.average_busy = (i->second.average_busy * 0.9f) + (i->second.statfs.f_iosbusytime * 0.1f);
+ }
+ if(i->second.average_busy < 0.90f)
+ {
+ i->second.default_deadline = std::chrono::seconds(0); // remove pacing
+ }
+ else if(i->second.statfs.f_iosinprogress > 1)
+ {
+ if((i->second.default_deadline.nsecs >> 4) > 0)
+ {
+ i->second.default_deadline.nsecs += i->second.default_deadline.nsecs >> 4;
+ }
+ else
+ {
+ i->second.default_deadline.nsecs++;
+ }
+ }
+ else
+ {
+ if(i->second.default_deadline.nsecs > (i->second.default_deadline.nsecs >> 4) && (i->second.default_deadline.nsecs >> 4) > 0)
+ {
+ i->second.default_deadline.nsecs -= i->second.default_deadline.nsecs >> 4;
+ }
+ else if(i->second.default_deadline.nsecs > 1)
+ {
+ i->second.default_deadline.nsecs--;
+ }
+ }
+ }
+ if(d.nsecs < i->second.default_deadline.nsecs)
+ {
+ d = i->second.default_deadline;
+ }
+ }
+ }
+ return io_aware_next(d);
+}
+
+
LLFIO_V2_NAMESPACE_END
diff --git a/include/llfio/v2.0/detail/impl/posix/statfs.ipp b/include/llfio/v2.0/detail/impl/posix/statfs.ipp
index 8b2ebcf3..97aff0f0 100644
--- a/include/llfio/v2.0/detail/impl/posix/statfs.ipp
+++ b/include/llfio/v2.0/detail/impl/posix/statfs.ipp
@@ -42,7 +42,7 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<size_t> statfs_t::fill(const handle &h, s
{
size_t ret = 0;
#ifdef __linux__
- if(!!(wanted & ~(want::iosinprogress | want::ioswaittime)))
+ if(!!(wanted & ~(want::iosinprogress | want::iosbusytime)))
{
struct statfs64 s
{
@@ -319,7 +319,7 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<size_t> statfs_t::fill(const handle &h, s
++ret;
}
#endif
- if(!!(wanted & want::iosinprogress) || !!(wanted & want::ioswaittime))
+ if(!!(wanted & want::iosinprogress) || !!(wanted & want::iosbusytime))
{
OUTCOME_TRY(auto ios, _fill_ios(h, f_mntfromname));
if(!!(wanted & want::iosinprogress))
@@ -327,9 +327,9 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<size_t> statfs_t::fill(const handle &h, s
f_iosinprogress = ios.first;
++ret;
}
- if(!!(wanted & want::ioswaittime))
+ if(!!(wanted & want::iosbusytime))
{
- f_ioswaittime = ios.second;
+ f_iosbusytime = ios.second;
++ret;
}
}
@@ -367,7 +367,7 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<std::pair<uint32_t, float>> statfs_t::_fi
std::chrono::steady_clock::time_point last_updated;
uint32_t f_iosinprogress{0};
- float f_ioswaittime{0};
+ float f_iosbusytime{0};
};
std::mutex lock;
std::vector<item> items;
@@ -381,7 +381,7 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<std::pair<uint32_t, float>> statfs_t::_fi
{
if(std::chrono::duration_cast<std::chrono::milliseconds>(now - i.last_updated) < std::chrono::milliseconds(100))
{
- return {i.f_iosinprogress, i.f_ioswaittime}; // exit with old readings
+ return {i.f_iosinprogress, i.f_iosbusytime}; // exit with old readings
}
break;
}
@@ -458,12 +458,12 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<std::pair<uint32_t, float>> statfs_t::_fi
else
{
auto timediff = std::chrono::duration_cast<std::chrono::milliseconds>(now - it->last_updated);
- it->f_ioswaittime = std::min((float) ((double) (fields[9] - it->millis) / timediff.count()), 1.0f);
+ it->f_iosbusytime = std::min((float) ((double) (fields[9] - it->millis) / timediff.count()), 1.0f);
it->millis = fields[9];
}
it->f_iosinprogress = (uint32_t) fields[8];
it->last_updated = now;
- return {it->f_iosinprogress, it->f_ioswaittime};
+ return {it->f_iosinprogress, it->f_iosbusytime};
}
}
// It's totally possible that the dev_t reported by stat()
@@ -476,7 +476,7 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<std::pair<uint32_t, float>> statfs_t::_fi
return error_from_exception();
}
#else
- /* On FreeBSD, want::iosinprogress and want::ioswaittime could be implemented
+ /* On FreeBSD, want::iosinprogress and want::iosbusytime could be implemented
using libdevstat. See https://www.freebsd.org/cgi/man.cgi?query=devstat&sektion=3.
Code donations welcome!
diff --git a/include/llfio/v2.0/detail/impl/windows/file_handle.ipp b/include/llfio/v2.0/detail/impl/windows/file_handle.ipp
index 047a87b3..6e9c7a25 100644
--- a/include/llfio/v2.0/detail/impl/windows/file_handle.ipp
+++ b/include/llfio/v2.0/detail/impl/windows/file_handle.ipp
@@ -916,7 +916,7 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<std::pair<uint32_t, float>> statfs_t::_fi
} last_reading;
uint32_t iosinprogress = 0;
- float ioswaittime = 0;
+ float iosbusytime = 0;
DWORD disk_extents = vde->NumberOfDiskExtents;
for(DWORD disk_extent = 0; disk_extent < disk_extents; disk_extent++)
{
@@ -963,15 +963,15 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<std::pair<uint32_t, float>> statfs_t::_fi
uint64_t rd = (uint64_t) dp->ReadTime.QuadPart - (uint64_t) last_reading.items[DiskNumber].ReadTime;
uint64_t wd = (uint64_t) dp->WriteTime.QuadPart - (uint64_t) last_reading.items[DiskNumber].WriteTime;
uint64_t id = (uint64_t) dp->IdleTime.QuadPart - (uint64_t) last_reading.items[DiskNumber].IdleTime;
- ioswaittime += 1 - (float) ((double) id / (rd + wd + id));
+ iosbusytime += 1 - (float) ((double) id / (rd + wd + id));
}
last_reading.items[DiskNumber].ReadTime = dp->ReadTime.QuadPart;
last_reading.items[DiskNumber].WriteTime = dp->WriteTime.QuadPart;
last_reading.items[DiskNumber].IdleTime = dp->IdleTime.QuadPart;
}
iosinprogress /= disk_extents;
- ioswaittime /= disk_extents;
- return {iosinprogress, std::min(ioswaittime, 1.0f)};
+ iosbusytime /= disk_extents;
+ return {iosinprogress, std::min(iosbusytime, 1.0f)};
}
catch(...)
{
diff --git a/include/llfio/v2.0/detail/impl/windows/statfs.ipp b/include/llfio/v2.0/detail/impl/windows/statfs.ipp
index 82ed8ba4..06e6040c 100644
--- a/include/llfio/v2.0/detail/impl/windows/statfs.ipp
+++ b/include/llfio/v2.0/detail/impl/windows/statfs.ipp
@@ -141,7 +141,7 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<size_t> statfs_t::fill(const handle &h, s
f_iosize = ffssi->PhysicalBytesPerSectorForPerformance;
++ret;
}
- if(!!(wanted & want::iosinprogress) || !!(wanted & want::ioswaittime))
+ if(!!(wanted & want::iosinprogress) || !!(wanted & want::iosbusytime))
{
if(f_mntfromname.empty())
{
@@ -247,7 +247,7 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<size_t> statfs_t::fill(const handle &h, s
break;
}
}
- if(!!(wanted & want::iosinprogress) || !!(wanted & want::ioswaittime))
+ if(!!(wanted & want::iosinprogress) || !!(wanted & want::iosbusytime))
{
OUTCOME_TRY(auto ios, _fill_ios(h, f_mntfromname));
if(!!(wanted & want::iosinprogress))
@@ -255,9 +255,9 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<size_t> statfs_t::fill(const handle &h, s
f_iosinprogress = ios.first;
++ret;
}
- if(!!(wanted & want::ioswaittime))
+ if(!!(wanted & want::iosbusytime))
{
- f_ioswaittime = ios.second;
+ f_iosbusytime = ios.second;
++ret;
}
}
diff --git a/include/llfio/v2.0/dynamic_thread_pool_group.hpp b/include/llfio/v2.0/dynamic_thread_pool_group.hpp
index d3b6279d..769e1393 100644
--- a/include/llfio/v2.0/dynamic_thread_pool_group.hpp
+++ b/include/llfio/v2.0/dynamic_thread_pool_group.hpp
@@ -155,7 +155,7 @@ public:
{
friend struct detail::global_dynamic_thread_pool_impl;
friend class dynamic_thread_pool_group_impl;
- dynamic_thread_pool_group_impl *_parent{nullptr};
+ std::atomic<dynamic_thread_pool_group_impl *> _parent{nullptr};
void *_internalworkh{nullptr}, *_internaltimerh{nullptr};
work_item *_prev{nullptr}, *_next{nullptr};
intptr_t _nextwork{-1};
@@ -164,10 +164,12 @@ public:
int _internalworkh_inuse{0};
protected:
- work_item() = default;
+ void *_private{nullptr};
+
+ constexpr work_item() {}
work_item(const work_item &o) = delete;
work_item(work_item &&o) noexcept
- : _parent(o._parent)
+ : _parent(o._parent.load(std::memory_order_relaxed))
, _internalworkh(o._internalworkh)
, _internaltimerh(o._internaltimerh)
, _prev(o._prev)
@@ -175,17 +177,21 @@ public:
, _nextwork(o._nextwork)
, _timepoint1(o._timepoint1)
, _timepoint2(o._timepoint2)
+ , _internalworkh_inuse(o._internalworkh_inuse)
+ , _private(o._private)
{
- assert(o._parent == nullptr);
+ assert(o._parent.load(std::memory_order_relaxed) == nullptr);
assert(o._internalworkh == nullptr);
assert(o._internaltimerh == nullptr);
- if(o._parent != nullptr || o._internalworkh != nullptr)
+ if(o._parent.load(std::memory_order_relaxed) != nullptr || o._internalworkh != nullptr)
{
LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item was relocated in memory during use!");
abort();
}
o._prev = o._next = nullptr;
o._nextwork = -1;
+ o._internalworkh_inuse = 0;
+ o._private = nullptr;
}
work_item &operator=(const work_item &) = delete;
work_item &operator=(work_item &&) = delete;
@@ -210,7 +216,7 @@ public:
}
//! Returns the parent work group between successful submission and just before `group_complete()`.
- dynamic_thread_pool_group *parent() const noexcept { return reinterpret_cast<dynamic_thread_pool_group *>(_parent); }
+ dynamic_thread_pool_group *parent() const noexcept { return reinterpret_cast<dynamic_thread_pool_group *>(_parent.load(std::memory_order_relaxed)); }
/*! Invoked by the i/o thread pool to determine if this work item
has more work to do.
@@ -272,6 +278,91 @@ public:
virtual void group_complete(const result<void> &cancelled) noexcept { (void) cancelled; }
};
+ /*! \class io_aware_work_item
+ \brief A work item which paces when it next executes according to i/o congestion.
+
+ Currently there is only a working implementation of this for the Microsoft Windows
+ and Linux platforms, due to lack of working `statfs_t::f_iosinprogress` on other
+ platforms. If retrieving that for a seekable handle does not work, the constructor
+ throws an exception.
+
+ For seekable handles, currently `reads`, `writes` and `barriers` are ignored. We
+ simply retrieve, periodically, `statfs_t::f_iosinprogress` and `statfs_t::f_iosbusytime`
+ for the storage devices backing the seekable handle. If the i/o busy time exceeds
+ 90% and the i/o in progress > 1, `next()` will start setting the default deadline passed to
+ `io_aware_next()`. Thereafter, every 1/10th of a second, if `statfs_t::f_iosinprogress`
+ is above 1, it will increase the deadline by 1/16th, whereas if it is
+ below 2, it will decrease the deadline by 1/16th. The default deadline chosen is always the worst of all the
+ storage devices of all the handles. This will reduce concurrency within the kernel thread pool
+ in order to reduce congestion on the storage devices. If at any point `statfs_t::f_iosbusytime`
+ drops below 90% as averaged across one second, the additional
+ throttling is completely removed. `io_aware_next()` can ignore the default deadline
+ passed into it, and can set any other deadline.
+
+ For non-seekable handles, the handle must have an i/o multiplexer set upon it, and on
+ Microsoft Windows, that i/o multiplexer must be utilising the IOCP instance of the
+ global Win32 thread pool. For each `reads`, `writes` and `barriers` which is non-zero,
+ a corresponding zero length i/o is constructed and initiated. When the i/o completes,
+ and all readable handles in the work item's set have data waiting to be read, and all
+ writable handles in the work item's set have space to allow writes, only then is the
+ work item invoked with the next piece of work.
+
+ \note Non-seekable handle support is not implemented yet.
+ */
+ class LLFIO_DECL io_aware_work_item : public work_item
+ {
+ public:
+ //! Information about an i/o handle this work item will use
+ struct io_handle_awareness
+ {
+ //! An i/o handle this work item will use
+ io_handle *h{nullptr};
+ //! The relative amount of reading done by this work item from the handle.
+ float reads{0};
+ //! The relative amount of writing done by this work item to the handle.
+ float writes{0};
+ //! The relative amount of write barriering done by this work item to the handle.
+ float barriers{0};
+
+ void *_internal{nullptr};
+ };
+
+ private:
+ const span<io_handle_awareness> _handles;
+
+ LLFIO_HEADERS_ONLY_VIRTUAL_SPEC intptr_t next(deadline &d) noexcept override final;
+
+ public:
+ constexpr io_aware_work_item() {}
+ /*! \brief Constructs a work item aware of i/o done to the handles in `hs`.
+
+ Note that the `reads`, `writes` and `barriers` are normalised to proportions
+ out of `1.0` by this constructor, so if for example you had `reads/writes/barriers = 200/100/0`,
+ after normalisation those become `0.66/0.33/0.0` such that the total is `1.0`.
+ If `reads/writes/barriers = 0/0/0` on entry, they are replaced with `0.5/0.5/0.0`.
+
+ Note that normalisation is across *all* i/o handles in the set, so three handles
+ each with `reads/writes/barriers = 200/100/0` on entry would have `0.22/0.11/0.0`
+ each after construction.
+ */
+ explicit LLFIO_HEADERS_ONLY_MEMFUNC_SPEC io_aware_work_item(span<io_handle_awareness> hs);
+ io_aware_work_item(io_aware_work_item &&o) noexcept
+ : work_item(std::move(o))
+ , _handles(o._handles)
+ {
+ }
+ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC ~io_aware_work_item();
+
+ //! The handles originally registered during construction.
+ span<io_handle_awareness> handles() const noexcept { return _handles; }
+
+ /*! \brief As for `work_item::next()`, but deadline may be extended to
+ reduce i/o congestion on the hardware devices to which the handles
+ refer.
+ */
+ virtual intptr_t io_aware_next(deadline &d) noexcept = 0;
+ };
+
virtual ~dynamic_thread_pool_group() {}
/*! \brief Threadsafe. Submit one or more work items for execution. Note that you can submit more later.
diff --git a/include/llfio/v2.0/fs_handle.hpp b/include/llfio/v2.0/fs_handle.hpp
index 8700a87b..1be9bdaf 100644
--- a/include/llfio/v2.0/fs_handle.hpp
+++ b/include/llfio/v2.0/fs_handle.hpp
@@ -144,6 +144,8 @@ public:
using path_view_type = path_view;
//! The unique identifier type used by this handle
using unique_id_type = QUICKCPPLIB_NAMESPACE::integers128::uint128;
+ //! A hasher for the unique identifier type used by this handle
+ using unique_id_type_hasher = QUICKCPPLIB_NAMESPACE::integers128::uint128_hasher;
protected:
mutable dev_t _devid{0};
diff --git a/include/llfio/v2.0/statfs.hpp b/include/llfio/v2.0/statfs.hpp
index 06421ee4..c8af2fde 100644
--- a/include/llfio/v2.0/statfs.hpp
+++ b/include/llfio/v2.0/statfs.hpp
@@ -63,7 +63,7 @@ filing system for your handle reports a `dev_t` from `fstat()` which does not
match anything in the system's disk hardware i/o stats. As this can be completely
benign (e.g. your handle is a socket), this is treated as a soft failure.
-Note for `f_iosinprogress` and `f_ioswaittime` that support is not implemented yet
+Note for `f_iosinprogress` and `f_iosbusytime` that support is not implemented yet
outside Microsoft Windows and Linux. Note also that for Linux, filing systems
spanning multiple hardware devices have undefined outcomes, whereas on Windows
you are given the average of the values for all underlying hardware devices.
@@ -103,7 +103,7 @@ struct LLFIO_DECL statfs_t
filesystem::path f_mntonname; /*!< directory on which mounted (Windows, POSIX) */
uint32_t f_iosinprogress{_allbits1_32}; /*!< i/o's currently in progress (i.e. queue depth) (Windows, Linux) */
- float f_ioswaittime{_allbits1_float}; /*!< percentage of time spent doing i/o (1.0 = 100%) (Windows, Linux) */
+ float f_iosbusytime{_allbits1_float}; /*!< percentage of time spent doing i/o (1.0 = 100%) (Windows, Linux) */
//! Used to indicate what metadata should be filled in
QUICKCPPLIB_BITFIELD_BEGIN(want){flags = 1 << 0,
@@ -121,7 +121,7 @@ struct LLFIO_DECL statfs_t
mntfromname = 1 << 12,
mntonname = 1 << 13,
iosinprogress = 1 << 14,
- ioswaittime = 1 << 15,
+ iosbusytime = 1 << 15,
all = static_cast<unsigned>(-1)} QUICKCPPLIB_BITFIELD_END(want)
//! Constructs a default initialised instance (all bits set)
statfs_t()
diff --git a/test/tests/dynamic_thread_pool_group.cpp b/test/tests/dynamic_thread_pool_group.cpp
index 8e068583..9c8c6de8 100644
--- a/test/tests/dynamic_thread_pool_group.cpp
+++ b/test/tests/dynamic_thread_pool_group.cpp
@@ -231,7 +231,7 @@ static inline void TestDynamicThreadPoolGroupNestingWorks()
uint64_t mean = 0, count = 0;
for(auto &i : time_bucket)
{
- mean += i.first*i.second;
+ mean += i.first * i.second;
count += i.second;
}
mean /= count;
@@ -329,7 +329,96 @@ static inline void TestDynamicThreadPoolGroupNestingWorks()
BOOST_CHECK(shared_states[MAX_NESTING - 1].stddev < shared_states[MAX_NESTING / 2].stddev / 2);
}
+static inline void TestDynamicThreadPoolGroupIoAwareWorks()
+{
+ namespace llfio = LLFIO_V2_NAMESPACE;
+ static constexpr size_t WORK_ITEMS = 1000;
+ static constexpr size_t IO_SIZE = 1 * 65536;
+ struct shared_state_t
+ {
+ llfio::file_handle h;
+ llfio::dynamic_thread_pool_group::io_aware_work_item::io_handle_awareness awareness;
+ std::atomic<size_t> concurrency{0}, max_concurrency{0};
+ std::atomic<uint64_t> current_pacing{0};
+ } shared_state;
+ struct work_item final : public llfio::dynamic_thread_pool_group::io_aware_work_item
+ {
+ using _base = llfio::dynamic_thread_pool_group::io_aware_work_item;
+ shared_state_t *shared_state;
+
+ work_item() = default;
+ explicit work_item(shared_state_t *_shared_state)
+ : _base({&_shared_state->awareness, 1})
+ , shared_state(_shared_state)
+ {
+ }
+ work_item(work_item &&o) noexcept
+ : _base(std::move(o))
+ , shared_state(o.shared_state)
+ {
+ }
+
+ virtual intptr_t io_aware_next(llfio::deadline &d) noexcept override
+ {
+ shared_state->current_pacing.store(d.nsecs, std::memory_order_relaxed);
+ return 1;
+ }
+ virtual llfio::result<void> operator()(intptr_t /*unused*/) noexcept override
+ {
+ auto concurrency = shared_state->concurrency.fetch_add(1, std::memory_order_relaxed) + 1;
+ for(size_t expected_ = shared_state->max_concurrency; concurrency > expected_;)
+ {
+ shared_state->max_concurrency.compare_exchange_weak(expected_, concurrency);
+ }
+ static thread_local std::vector<llfio::byte, llfio::utils::page_allocator<llfio::byte>> buffer(IO_SIZE);
+ OUTCOME_TRY(shared_state->h.read((concurrency - 1) * IO_SIZE, {{buffer}}));
+ shared_state->concurrency.fetch_sub(1, std::memory_order_relaxed);
+ return llfio::success();
+ }
+ // virtual void group_complete(const llfio::result<void> &/*unused*/) noexcept override { }
+ };
+ shared_state.h = llfio::file_handle::temp_file({}, llfio::file_handle::mode::write, llfio::file_handle::creation::only_if_not_exist,
+ llfio::file_handle::caching::only_metadata)
+ .value();
+ shared_state.awareness.h = &shared_state.h;
+ shared_state.h.truncate(WORK_ITEMS * IO_SIZE).value();
+ alignas(4096) llfio::byte buffer[IO_SIZE];
+ llfio::utils::random_fill((char *) buffer, sizeof(buffer));
+ std::vector<work_item> workitems;
+ for(size_t n = 0; n < WORK_ITEMS; n++)
+ {
+ workitems.emplace_back(&shared_state);
+ shared_state.h.write(n * IO_SIZE, {{buffer, sizeof(buffer)}}).value();
+ }
+ auto tpg = llfio::make_dynamic_thread_pool_group().value();
+ tpg->submit(llfio::span<work_item>(workitems)).value();
+ auto begin = std::chrono::steady_clock::now();
+ size_t paced = 0;
+ while(std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - begin) < std::chrono::seconds(60))
+ {
+ llfio::statfs_t statfs;
+ statfs.fill(shared_state.h, llfio::statfs_t::want::iosinprogress | llfio::statfs_t::want::iosbusytime | llfio::statfs_t::want::mntonname).value();
+ std::cout << "\nStorage device at " << statfs.f_mntonname << " is at " << (100.0f * statfs.f_iosbusytime) << "% utilisation and has an i/o queue depth of "
+ << statfs.f_iosinprogress << ". Current concurrency is " << shared_state.concurrency.load(std::memory_order_relaxed) << " and current pacing is "
+ << (shared_state.current_pacing.load(std::memory_order_relaxed) / 1000.0) << " microseconds." << std::endl;
+ if(shared_state.current_pacing.load(std::memory_order_relaxed) > 0)
+ {
+ paced++;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(250));
+ }
+ tpg->stop().value();
+ auto r = tpg->wait();
+ if(!r && r.error() != llfio::errc::operation_canceled)
+ {
+ r.value();
+ }
+ BOOST_CHECK(paced > 0);
+}
+
KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, works, "Tests that llfio::dynamic_thread_pool_group works as expected",
- TestDynamicThreadPoolGroupWorks())
+ TestDynamicThreadPoolGroupWorks())
KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, nested, "Tests that nesting of llfio::dynamic_thread_pool_group works as expected",
- TestDynamicThreadPoolGroupNestingWorks())
+ TestDynamicThreadPoolGroupNestingWorks())
+KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, io_aware_work_item,
+ "Tests that llfio::dynamic_thread_pool_group::io_aware_work_item works as expected", TestDynamicThreadPoolGroupIoAwareWorks())
diff --git a/test/tests/statfs.cpp b/test/tests/statfs.cpp
index 6d4d9881..138fd4c1 100644
--- a/test/tests/statfs.cpp
+++ b/test/tests/statfs.cpp
@@ -49,7 +49,7 @@ static inline void TestStatfsIosInProgress()
std::cout << "\n mounted filesystem = " << statfs.f_mntfromname;
std::cout << "\n directory on which mounted = " << statfs.f_mntonname;
std::cout << "\n i/o's currently in progress (i.e. queue depth) = " << statfs.f_iosinprogress;
- std::cout << "\n percentage of time spent doing i/o (1.0 = 100%) = " << statfs.f_ioswaittime;
+ std::cout << "\n percentage of time spent doing i/o (1.0 = 100%) = " << statfs.f_iosbusytime;
std::cout << std::endl;
};
llfio::statfs_t s1base, s2base;
@@ -78,7 +78,7 @@ static inline void TestStatfsIosInProgress()
print_statfs(h1, s1load);
print_statfs(h2, s2load);
// BOOST_CHECK(s1load.f_iosinprogress > s1base.f_iosinprogress);
- BOOST_CHECK(std::isnan(s1base.f_ioswaittime) || s1load.f_ioswaittime > s1base.f_ioswaittime);
+ BOOST_CHECK(std::isnan(s1base.f_iosbusytime) || s1load.f_iosbusytime > s1base.f_iosbusytime);
f.get();
done = false;
}
@@ -92,7 +92,7 @@ static inline void TestStatfsIosInProgress()
print_statfs(h1, s1load);
print_statfs(h2, s2load);
// BOOST_CHECK(s2load.f_iosinprogress > s2base.f_iosinprogress);
- BOOST_CHECK(std::isnan(s2base.f_ioswaittime) || s2load.f_ioswaittime > s2base.f_ioswaittime);
+ BOOST_CHECK(std::isnan(s2base.f_iosbusytime) || s2load.f_iosbusytime > s2base.f_iosbusytime);
f.get();
done = false;
}