Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/windirstat/llfio.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2021-01-01 21:00:17 +0300
committerNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2021-03-16 13:21:39 +0300
commit02c1625d7d7fda0ed37c48d6aa419943cbeeebb3 (patch)
treefa6eab9c049ddc11a5498fe24f7d50b46d6e20d9
parent7eeb88fccc7e7071a8065afbbffc36379e8b7091 (diff)
Implemented and debugged a Grand Unified Dispatch backend for dynamic_thread_pool_group. Works surprisingly nicely on Linux, haven't actually tested it on Mac OS nor FreeBSD, but no reason it shouldn't work just fine.
-rw-r--r--CMakeLists.txt22
-rw-r--r--include/llfio/revision.hpp6
-rw-r--r--include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp232
-rw-r--r--include/llfio/v2.0/dynamic_thread_pool_group.hpp40
-rw-r--r--include/llfio/v2.0/llfio.hpp2
-rw-r--r--test/tests/dynamic_thread_pool_group.cpp22
6 files changed, 268 insertions, 56 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index ccf2387d..d6112775 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -28,6 +28,7 @@ include(QuickCppLibUtils)
include(QuickCppLibPolicies)
option(LLFIO_USE_EXPERIMENTAL_SG14_STATUS_CODE "Whether to use SG14 status_code for failure handling" OFF)
+option(LLFIO_DISABLE_LIBDISPATCH "Whether to disable automatic discovery of libdispatch/Grand Unified Dispatch" OFF)
option(LLFIO_ENABLE_DEPENDENCY_SMOKE_TEST "Whether to build executables which are smoke tests that LLFIO is fully working. Used by various package managers such as vcpkg." OFF)
option(LLFIO_ASSUME_CROSS_COMPILING "Whether to assume we are cross compiling. Normally automatically detected, but if automatic detection doesn't work, a working <filesystem> will not be found during cmake configure." OFF)
option(UNIT_TESTS_BUILD_ALL "Whether to run all of the unit test suite." OFF)
@@ -283,6 +284,27 @@ int main() {
all_compile_definitions(PUBLIC LLFIO_FORCE_EXPERIMENTAL_FILESYSTEM=1 KERNELTEST_FORCE_EXPERIMENTAL_FILESYSTEM=1)
endif()
endif()
+# Do we have Grand Central Dispatch on this platform?
+if(NOT LLFIO_DISABLE_LIBDISPATCH)
+ function(check_have_libdispatch)
+ set(CMAKE_REQUIRED_LIBRARIES dispatch)
+ check_cxx_source_compiles("
+#include <dispatch/dispatch.h>
+int main() {
+ dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
+ return 0;
+}
+" LLFIO_HAS_LIBDISPATCH)
+ endfunction()
+ check_have_libdispatch()
+ if(LLFIO_HAS_LIBDISPATCH)
+ all_compile_definitions(PUBLIC LLFIO_FORCE_USE_LIBDISPATCH=1)
+ all_link_libraries(PUBLIC dispatch)
+ endif()
+endif()
+if(NOT LLFIO_HAS_LIBDISPATCH AND (CMAKE_SYSTEM_NAME MATCHES "FreeBSD" OR APPLE))
+ indented_message(FATAL_ERROR "FATAL: Grand Central Dispatch as libdispatch was not found on this FreeBSD or Mac OS system. libdispatch is required for LLFIO to build on those systems.")
+endif()
# Set any macros this library requires
all_compile_definitions(PRIVATE LLFIO_INCLUDE_STORAGE_PROFILE=1 LLFIO_ENABLE_TEST_IO_MULTIPLEXERS=1)
diff --git a/include/llfio/revision.hpp b/include/llfio/revision.hpp
index 4fec5a76..4c4d51a2 100644
--- a/include/llfio/revision.hpp
+++ b/include/llfio/revision.hpp
@@ -1,4 +1,4 @@
// Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time
-#define LLFIO_PREVIOUS_COMMIT_REF 4fa0e9bf0835d9d4d9dd8b0f93e6c650650fc0af
-#define LLFIO_PREVIOUS_COMMIT_DATE "2020-12-21 16:27:04 +00:00"
-#define LLFIO_PREVIOUS_COMMIT_UNIQUE 4fa0e9bf
+#define LLFIO_PREVIOUS_COMMIT_REF 151ca5af1030e9b0393730f33d50882ec103aac8
+#define LLFIO_PREVIOUS_COMMIT_DATE "2020-12-29 14:41:13 +00:00"
+#define LLFIO_PREVIOUS_COMMIT_UNIQUE 151ca5af
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
index 85586224..d40ed3b6 100644
--- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
+++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
@@ -24,6 +24,7 @@ Distributed under the Boost Software License, Version 1.0.
#include "../../dynamic_thread_pool_group.hpp"
+#include "../../file_handle.hpp"
#include "../../statfs.hpp"
#include <atomic>
@@ -32,12 +33,24 @@ Distributed under the Boost Software License, Version 1.0.
#include <unordered_set>
#include <vector>
+#include <iostream>
+
+#if LLFIO_FORCE_USE_LIBDISPATCH
+#include <dispatch/dispatch.h>
+#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD 1
+#else
#ifdef _WIN32
#include "windows/import.hpp"
#include <threadpoolapiset.h>
#else
-#include <pthread.h>
-#include <thread>
+#if __has_include(<dispatch/dispatch.h>)
+#include <dispatch/dispatch.h>
+#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD 1
+#else
+#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD 0
+#error Right now dynamic_thread_pool_group requires libdispatch to be available on POSIX. It should get auto discovered if installed, which is the default on BSD and Mac OS. Try installing libdispatch-dev if on Linux.
+#endif
+#endif
#endif
LLFIO_V2_NAMESPACE_BEGIN
@@ -46,7 +59,20 @@ namespace detail
{
struct global_dynamic_thread_pool_impl
{
-#ifdef _WIN32
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
+ using threadh_type = void *;
+ using grouph_type = dispatch_group_t;
+ static void _gcd_dispatch_callback(void *arg)
+ {
+ auto *workitem = (dynamic_thread_pool_group::work_item *) arg;
+ global_dynamic_thread_pool()._workerthread(workitem, nullptr);
+ }
+ static void _gcd_timer_callback(void *arg)
+ {
+ auto *workitem = (dynamic_thread_pool_group::work_item *) arg;
+ global_dynamic_thread_pool()._timerthread(workitem, nullptr);
+ }
+#elif defined(_WIN32)
using threadh_type = PTP_CALLBACK_INSTANCE;
using grouph_type = PTP_CALLBACK_ENVIRON;
static void CALLBACK _win32_worker_thread_callback(threadh_type threadh, PVOID Parameter, PTP_WORK /*unused*/)
@@ -59,7 +85,6 @@ namespace detail
auto *workitem = (dynamic_thread_pool_group::work_item *) Parameter;
global_dynamic_thread_pool()._timerthread(workitem, threadh);
}
-#else
#endif
std::mutex workqueue_lock;
@@ -75,11 +100,11 @@ namespace detail
{
size_t refcount{0};
deadline default_deadline;
- float average_busy{0};
+ float average_busy{0}, average_queuedepth{0};
std::chrono::steady_clock::time_point last_updated;
statfs_t statfs;
};
- std::unordered_map<file_handle::unique_id_type, io_aware_work_item_statfs, file_handle::unique_id_type_hasher> io_aware_work_item_handles;
+ std::unordered_map<fs_handle::unique_id_type, io_aware_work_item_statfs, fs_handle::unique_id_type_hasher> io_aware_work_item_handles;
global_dynamic_thread_pool_impl()
{
@@ -140,15 +165,22 @@ namespace detail
{
return errc::invalid_argument;
}
+ workitem->_timepoint1 = {};
+ workitem->_timepoint2 = {};
if(workitem->_nextwork == 0 || d.nsecs > 0)
{
if(nullptr == workitem->_internaltimerh)
{
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
+ (void) grouph;
+ workitem->_internaltimerh = (void *) (uintptr_t) -1;
+#elif defined(_WIN32)
workitem->_internaltimerh = CreateThreadpoolTimer(_win32_timer_thread_callback, workitem, grouph);
if(nullptr == workitem->_internaltimerh)
{
return win32_error();
}
+#endif
}
if(d.nsecs > 0)
{
@@ -213,7 +245,9 @@ class dynamic_thread_pool_group_impl final : public dynamic_thread_pool_group
std::atomic<bool> _stopping{false}, _stopped{true}, _completing{false};
result<void> _abnormal_completion_cause{success()}; // The cause of any abnormal group completion
-#ifdef _WIN32
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
+ dispatch_group_t _grouph;
+#elif defined(_WIN32)
TP_CALLBACK_ENVIRON _callbackenviron;
PTP_CALLBACK_ENVIRON _grouph{&_callbackenviron};
#endif
@@ -226,9 +260,14 @@ public:
{
auto &impl = detail::global_dynamic_thread_pool();
_nesting_level = detail::global_dynamic_thread_pool_thread_local_state().nesting_level;
-#ifdef _WIN32
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
+ _grouph = dispatch_group_create();
+ if(_grouph == nullptr)
+ {
+ return errc::not_enough_memory;
+ }
+#elif defined(_WIN32)
InitializeThreadpoolEnvironment(_grouph);
-#else
#endif
detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock);
// Append this group to the global work queue at its nesting level
@@ -250,13 +289,18 @@ public:
LLFIO_LOG_FUNCTION_CALL(this);
(void) wait();
auto &impl = detail::global_dynamic_thread_pool();
-#ifdef _WIN32
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
+ if(nullptr != _grouph)
+ {
+ dispatch_release(_grouph);
+ _grouph = nullptr;
+ }
+#elif defined(_WIN32)
if(nullptr != _grouph)
{
DestroyThreadpoolEnvironment(_grouph);
_grouph = nullptr;
}
-#else
#endif
detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock);
assert(impl.workqueue.size() > _nesting_level);
@@ -356,14 +400,45 @@ namespace detail
inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem)
{
(void) g;
- if(workitem->_nextwork != -1 && !workitem->_parent.load(std::memory_order_relaxed)->_stopping.load(std::memory_order_relaxed))
+ if(workitem->_nextwork != -1)
{
// If no work item for now, or there is a delay, schedule a timer
if(workitem->_nextwork == 0 || workitem->_timepoint1 != std::chrono::steady_clock::time_point() ||
workitem->_timepoint2 != std::chrono::system_clock::time_point())
{
assert(workitem->_internaltimerh != nullptr);
-#ifdef _WIN32
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
+ dispatch_time_t when;
+ if(workitem->_timepoint1 != std::chrono::steady_clock::time_point())
+ {
+ auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(workitem->_timepoint1 - std::chrono::steady_clock::now()).count();
+ if(duration > 1000000000LL)
+ {
+ // Because GCD has no way of cancelling timers, nor assigning them to a group,
+ // we clamp the timer to 1 second. Then if cancellation is ever done to the group,
+ // the worst possible wait is 1 second. _timerthread will reschedule the timer
+ // if it gets called short.
+ duration = 1000000000LL;
+ }
+ when = dispatch_time(DISPATCH_TIME_NOW, duration);
+ }
+ else if(workitem->_timepoint2 != std::chrono::system_clock::time_point())
+ {
+ deadline d(workitem->_timepoint2);
+ auto now = std::chrono::system_clock::now();
+ if(workitem->_timepoint2 - now > std::chrono::seconds(1))
+ {
+ d = now + std::chrono::seconds(1);
+ }
+ when = dispatch_walltime(&d.utc, 0);
+ }
+ else
+ {
+ when = dispatch_time(DISPATCH_TIME_NOW, 1); // smallest possible non immediate duration from now
+ }
+ // std::cout << "*** timer " << workitem << std::endl;
+ dispatch_after_f(when, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), workitem, _gcd_timer_callback);
+#elif defined(_WIN32)
LARGE_INTEGER li;
DWORD slop = 1000;
if(workitem->_timepoint1 != std::chrono::steady_clock::time_point())
@@ -392,12 +467,24 @@ namespace detail
ft.dwLowDateTime = li.LowPart;
// std::cout << "*** timer " << workitem << std::endl;
SetThreadpoolTimer((PTP_TIMER) workitem->_internaltimerh, &ft, 0, slop);
-#else
#endif
}
else
{
-#ifdef _WIN32
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
+ intptr_t priority = DISPATCH_QUEUE_PRIORITY_LOW;
+ if(workqueue.size() - workitem->_parent.load(std::memory_order_relaxed)->_nesting_level == 1)
+ {
+ priority = DISPATCH_QUEUE_PRIORITY_HIGH;
+ }
+ else if(workqueue.size() - workitem->_parent.load(std::memory_order_relaxed)->_nesting_level == 2)
+ {
+ priority = DISPATCH_QUEUE_PRIORITY_DEFAULT;
+ }
+ // std::cout << "*** submit " << workitem << std::endl;
+ dispatch_group_async_f(workitem->_parent.load(std::memory_order_relaxed)->_grouph, dispatch_get_global_queue(priority, 0), workitem,
+ _gcd_dispatch_callback);
+#elif defined(_WIN32)
// Set the priority of the group according to distance from the top
TP_CALLBACK_PRIORITY priority = TP_CALLBACK_PRIORITY_LOW;
if(workqueue.size() - workitem->_parent.load(std::memory_order_relaxed)->_nesting_level == 1)
@@ -411,7 +498,6 @@ namespace detail
SetThreadpoolCallbackPriority(workitem->_parent.load(std::memory_order_relaxed)->_grouph, priority);
// std::cout << "*** submit " << workitem << std::endl;
SubmitThreadpoolWork((PTP_WORK) workitem->_internalworkh);
-#else
#endif
}
}
@@ -437,7 +523,10 @@ namespace detail
for(auto *i : work)
{
_remove_from_list(group->_work_items_active, i);
-#ifdef _WIN32
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
+ i->_internalworkh = nullptr;
+ i->_internaltimerh = nullptr;
+#elif defined(_WIN32)
if(nullptr != i->_internaltimerh)
{
CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
@@ -448,7 +537,6 @@ namespace detail
CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
i->_internalworkh = nullptr;
}
-#else
#endif
i->_parent.store(nullptr, std::memory_order_release);
}
@@ -464,13 +552,14 @@ namespace detail
}
else
{
-#ifdef _WIN32
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
+ i->_internalworkh = (void *) (uintptr_t) -1;
+#elif defined(_WIN32)
i->_internalworkh = CreateThreadpoolWork(_win32_worker_thread_callback, i, group->_grouph);
if(nullptr == i->_internalworkh)
{
return win32_error();
}
-#else
#endif
OUTCOME_TRY(_prepare_work_item_delay(i, group->_grouph, d));
_append_to_list(group->_work_items_active, i);
@@ -494,7 +583,11 @@ namespace detail
inline void global_dynamic_thread_pool_impl::_work_item_done(_lock_guard &g, dynamic_thread_pool_group::work_item *i) noexcept
{
(void) g;
-#ifdef _WIN32
+ // std::cout << "*** _work_item_done " << i << std::endl;
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
+ i->_internaltimerh = nullptr;
+ i->_internalworkh = nullptr;
+#elif defined(_WIN32)
if(i->_internalworkh_inuse > 0)
{
i->_internalworkh_inuse = 2;
@@ -512,7 +605,6 @@ namespace detail
i->_internalworkh = nullptr;
}
}
-#else
#endif
_remove_from_list(i->_parent.load(std::memory_order_relaxed)->_work_items_active, i);
_append_to_list(i->_parent.load(std::memory_order_relaxed)->_work_items_done, i);
@@ -593,21 +685,39 @@ namespace detail
LLFIO_DEADLINE_TO_SLEEP_INIT(d);
if(!d || d.nsecs > 0)
{
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
+ while(group->_work_items_active.count > 0)
+ {
+ LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d);
+ dispatch_time_t timeout = DISPATCH_TIME_FOREVER;
+ if(d)
+ {
+ std::chrono::nanoseconds duration;
+ LLFIO_DEADLINE_TO_PARTIAL_TIMEOUT(duration, d);
+ timeout = dispatch_time(DISPATCH_TIME_NOW, duration.count());
+ }
+ g.unlock();
+ dispatch_group_wait(group->_grouph, timeout);
+ g.lock();
+ // if(1 == group->_work_items_active.count)
+ //{
+ // std::cout << "*** wait item remaining is " << group->_work_items_active.front << std::endl;
+ // std::this_thread::sleep_for(std::chrono::seconds(1));
+ //}
+ }
+#elif defined(_WIN32)
auto &tls = detail::global_dynamic_thread_pool_thread_local_state();
-#ifdef _WIN32
if(tls.current_callback_instance != nullptr)
{
// I am being called from within a thread worker. Tell
// the thread pool that I am not going to exit promptly.
CallbackMayRunLong(tls.current_callback_instance);
}
-#endif
// Is this a cancellation?
if(group->_stopping.load(std::memory_order_relaxed))
{
while(group->_work_items_active.count > 0)
{
-#ifdef _WIN32
auto *i = group->_work_items_active.front;
if(nullptr != i->_internalworkh)
{
@@ -662,8 +772,6 @@ namespace detail
// This item got cancelled before it started
_work_item_done(g, group->_work_items_active.front);
}
-#else
-#endif
}
assert(!group->_stopping.load(std::memory_order_relaxed));
}
@@ -671,7 +779,6 @@ namespace detail
{
while(group->_work_items_active.count > 0)
{
-#ifdef _WIN32
auto *i = group->_work_items_active.front;
if(nullptr != i->_internalworkh)
{
@@ -721,8 +828,6 @@ namespace detail
}
i->_internalworkh_inuse = 0;
}
-#else
-#endif
}
}
else
@@ -735,6 +840,7 @@ namespace detail
g.lock();
}
}
+#endif
}
if(group->_work_items_active.count > 0)
{
@@ -750,10 +856,40 @@ namespace detail
inline void global_dynamic_thread_pool_impl::_timerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type /*unused*/)
{
LLFIO_LOG_FUNCTION_CALL(this);
- // std::cout << "*** _timerthread " << workitem << std::endl;
+ assert(workitem->_nextwork != -1);
_lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock);
- workitem->_timepoint1 = {};
- workitem->_timepoint2 = {};
+ // std::cout << "*** _timerthread " << workitem << std::endl;
+ if(workitem->_parent.load(std::memory_order_relaxed)->_stopping.load(std::memory_order_relaxed))
+ {
+ _work_item_done(g, workitem);
+ return;
+ }
+ if(workitem->_timepoint1 != std::chrono::steady_clock::time_point())
+ {
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
+ auto now = std::chrono::steady_clock::now();
+ if(workitem->_timepoint1 - now > std::chrono::seconds(0))
+ {
+ // Timer fired short, so schedule it again
+ _submit_work_item(g, workitem);
+ return;
+ }
+#endif
+ workitem->_timepoint1 = {};
+ }
+ if(workitem->_timepoint2 != std::chrono::system_clock::time_point())
+ {
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
+ auto now = std::chrono::system_clock::now();
+ if(workitem->_timepoint2 - now > std::chrono::seconds(0))
+ {
+ // Timer fired short, so schedule it again
+ _submit_work_item(g, workitem);
+ return;
+ }
+#endif
+ workitem->_timepoint2 = {};
+ }
_work_item_next(g, workitem);
}
@@ -761,10 +897,13 @@ namespace detail
inline void global_dynamic_thread_pool_impl::_workerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh)
{
LLFIO_LOG_FUNCTION_CALL(this);
- // std::cout << "*** _workerthread " << workitem << " with work " << workitem->_nextwork << std::endl;
+ //{
+ // _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock);
+ // std::cout << "*** _workerthread " << workitem << " begins with work " << workitem->_nextwork << std::endl;
+ //}
assert(workitem->_nextwork != -1);
assert(workitem->_nextwork != 0);
- if(workitem->_parent.load(std::memory_order_relaxed)->_stopped.load(std::memory_order_relaxed))
+ if(workitem->_parent.load(std::memory_order_relaxed)->_stopping.load(std::memory_order_relaxed))
{
_lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock);
_work_item_done(g, workitem);
@@ -779,12 +918,17 @@ namespace detail
workitem->_nextwork = 0; // call next() next time
tls = old_thread_local_state;
_lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock);
+ // std::cout << "*** _workerthread " << workitem << " ends with work " << workitem->_nextwork << std::endl;
if(!r)
{
(void) stop(g, workitem->_parent.load(std::memory_order_relaxed), std::move(r));
_work_item_done(g, workitem);
workitem = nullptr;
}
+ else if(workitem->_parent.load(std::memory_order_relaxed)->_stopping.load(std::memory_order_relaxed))
+ {
+ _work_item_done(g, workitem);
+ }
else
{
_work_item_next(g, workitem);
@@ -885,21 +1029,31 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC intptr_t dynamic_thread_pool_group::io_aware_wor
(void) i->second.statfs.fill(*h.h, statfs_t::want::iosinprogress | statfs_t::want::iosbusytime);
i->second.last_updated = now;
- if(elapsed>std::chrono::seconds(5))
+ if(elapsed > std::chrono::seconds(5))
{
i->second.average_busy = i->second.statfs.f_iosbusytime;
+ i->second.average_queuedepth = (float) i->second.statfs.f_iosinprogress;
}
else
{
i->second.average_busy = (i->second.average_busy * 0.9f) + (i->second.statfs.f_iosbusytime * 0.1f);
+ i->second.average_queuedepth = (i->second.average_queuedepth * 0.9f) + (i->second.statfs.f_iosinprogress * 0.1f);
}
- if(i->second.average_busy < 0.90f)
+ if(i->second.average_busy < 0.95f && i->second.average_queuedepth < 4)
{
i->second.default_deadline = std::chrono::seconds(0); // remove pacing
}
- else if(i->second.statfs.f_iosinprogress > 1)
+#ifdef _WIN32
+ else if(i->second.average_queuedepth > 1) // windows appears to do a lot of i/o coalescing
+#else
+ else if(i->second.average_queuedepth > 32)
+#endif
{
- if((i->second.default_deadline.nsecs >> 4) > 0)
+ if(0 == i->second.default_deadline.nsecs)
+ {
+ i->second.default_deadline = std::chrono::milliseconds(1); // start with 1ms, it'll reduce from there if needed
+ }
+ else if((i->second.default_deadline.nsecs >> 4) > 0)
{
i->second.default_deadline.nsecs += i->second.default_deadline.nsecs >> 4;
}
diff --git a/include/llfio/v2.0/dynamic_thread_pool_group.hpp b/include/llfio/v2.0/dynamic_thread_pool_group.hpp
index 769e1393..a511c3b5 100644
--- a/include/llfio/v2.0/dynamic_thread_pool_group.hpp
+++ b/include/llfio/v2.0/dynamic_thread_pool_group.hpp
@@ -32,11 +32,13 @@ Distributed under the Boost Software License, Version 1.0.
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4251) // dll interface
+#pragma warning(disable : 4275) // dll interface
#endif
LLFIO_V2_NAMESPACE_EXPORT_BEGIN
class dynamic_thread_pool_group_impl;
+class io_handle;
namespace detail
{
@@ -87,7 +89,8 @@ you need to submit a separate work item for each possible amount of
concurrency (e.g. `std::thread::hardware_concurrency()`).
You can have as many or as few items of work as you like. You can
-dynamically submit additional work items at any time. The group of work items can
+dynamically submit additional work items at any time, except when a group
+is currently in the process of being stopped. The group of work items can
be waited upon to complete, after which the work group becomes reset as
if back to freshly constructed. You can also stop executing all the work
items in the group, even if they have not fully completed. If any work
@@ -131,21 +134,37 @@ of the initial `make_dynamic_thread_pool_group()`. The Win32 thread pool
API may perform dynamic memory allocation internally, but that is outside
our control.
+### POSIX
+
+If an installation of libdispatch is detected by LLFIO cmake during
+configuration, it is used preferentially. libdispatch is better known as
+Grand Central Dispatch, originally a Mac OS technology but since ported
+to a high quality kernel based implementation on recent FreeBSDs, and to
+a lower quality userspace based implementation on Linux. Generally
+libdispatch should get automatically found on Mac OS without additional
+effort; on FreeBSD it may need installing from ports; on Linux you would
+need to explicitly install `libdispatch-dev` or the equivalent. You can
+disable the automatic discovery in cmake of libdispatch by setting the
+cmake variable `LLFIO_DISABLE_LIBDISPATCH` to On.
+
### Linux
-On Linux, a similar strategy to Microsoft Windows' approach is used. We
+If libdispatch is not found, we have a custom Linux only userspace
+implementation. A a similar strategy to Microsoft Windows' approach is used. We
dynamically increase the number of kernel threads until none are sleeping
awaiting i/o. If more kernel threads are running than 1.5x the number of
CPUs in the system, the number of kernel threads is dynamically reduced.
-For portability, we also gate the maximum number of kernel threads to 500.
+For portability, we also gate the maximum number of kernel threads to 500,
+except where threads have been detected as being in prolonged wait states.
Note that **all** the kernel threads for the current process are considered,
not just the kernel threads created by this thread pool implementation.
Therefore, if you have alternative thread pool implementations (e.g. OpenMP,
`std::async`), those are also included in the dynamic adjustment.
As this is wholly implemented by this library, dynamic memory allocation
-occurs in the initial `make_dynamic_thread_pool_group()`, but otherwise
-the implementation does not perform dynamic memory allocations.
+occurs in the initial `make_dynamic_thread_pool_group()` and per thread
+creation, but otherwise the implementation does not perform dynamic memory
+allocations.
*/
class LLFIO_DECL dynamic_thread_pool_group
{
@@ -288,14 +307,15 @@ public:
For seekable handles, currently `reads`, `writes` and `barriers` are ignored. We
simply retrieve, periodically, `statfs_t::f_iosinprogress` and `statfs_t::f_iosbusytime`
- for the storage devices backing the seekable handle. If the i/o wait time exceeds
- 95% and the i/o in progress > 1, `next()` will start setting the default deadline passed to
+ for the storage devices backing the seekable handle. If the recent averaged i/o wait time exceeds
+ 95% and the i/o in progress > 32, `next()` will start setting the default deadline passed to
`io_aware_next()`. Thereafter, every 1/10th of a second, if `statfs_t::f_iosinprogress`
- is above 1, it will increase the deadline by 1/16th, whereas if it is
- below 2, it will decrease the deadline by 1/16th. The default deadline chosen is always the worst of all the
+ is above 32, it will increase the deadline by 1/16th, whereas if it is
+ below 32, it will decrease the deadline by 1/16th. The default deadline chosen is always the worst of all the
storage devices of all the handles. This will reduce concurrency within the kernel thread pool
in order to reduce congestion on the storage devices. If at any point `statfs_t::f_iosbusytime`
- drops below 95% as averaged across one second, the additional
+ drops below 95% as averaged across one second, and `statfs_t::f_iosinprogress` drops
+ below 4, the additional
throttling is completely removed. `io_aware_next()` can ignore the default deadline
passed into it, and can set any other deadline.
diff --git a/include/llfio/v2.0/llfio.hpp b/include/llfio/v2.0/llfio.hpp
index 5c0874de..dc7ed70a 100644
--- a/include/llfio/v2.0/llfio.hpp
+++ b/include/llfio/v2.0/llfio.hpp
@@ -63,7 +63,9 @@ import LLFIO_MODULE_NAME;
#include "utils.hpp"
#include "directory_handle.hpp"
+#ifndef LLFIO_EXCLUDE_DYNAMIC_THREAD_POOL_GROUP
#include "dynamic_thread_pool_group.hpp"
+#endif
#include "fast_random_file_handle.hpp"
#include "file_handle.hpp"
#include "process_handle.hpp"
diff --git a/test/tests/dynamic_thread_pool_group.cpp b/test/tests/dynamic_thread_pool_group.cpp
index 9c8c6de8..58d670c4 100644
--- a/test/tests/dynamic_thread_pool_group.cpp
+++ b/test/tests/dynamic_thread_pool_group.cpp
@@ -26,6 +26,8 @@ Distributed under the Boost Software License, Version 1.0.
#include "../test_kernel_decl.hpp"
+#include <cmath> // for sqrt
+
static inline void TestDynamicThreadPoolGroupWorks()
{
namespace llfio = LLFIO_V2_NAMESPACE;
@@ -85,7 +87,7 @@ static inline void TestDynamicThreadPoolGroupWorks()
BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 1);
BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == this);
// std::cout << " () executes " << work << std::endl;
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ std::this_thread::sleep_for(std::chrono::milliseconds(50));
auto *executed = (std::atomic<size_t> *) &shared->executed[work];
executed->fetch_add(1);
shared->concurrency.fetch_sub(1);
@@ -263,7 +265,8 @@ static inline void TestDynamicThreadPoolGroupNestingWorks()
}
}
work_item(work_item &&o) noexcept
- : nesting(o.nesting)
+ : _base(std::move(o))
+ , nesting(o.nesting)
, shared_states(o.shared_states)
, childwi(std::move(o.childwi))
{
@@ -331,6 +334,10 @@ static inline void TestDynamicThreadPoolGroupNestingWorks()
static inline void TestDynamicThreadPoolGroupIoAwareWorks()
{
+ // statfs_t::iosinprogress not implemented for these yet
+#if defined(__APPLE__) || defined(__FreeBSD__)
+ return;
+#endif
namespace llfio = LLFIO_V2_NAMESPACE;
static constexpr size_t WORK_ITEMS = 1000;
static constexpr size_t IO_SIZE = 1 * 65536;
@@ -407,7 +414,14 @@ static inline void TestDynamicThreadPoolGroupIoAwareWorks()
}
std::this_thread::sleep_for(std::chrono::milliseconds(250));
}
+ std::cout << "\nStopping ..." << std::endl;
tpg->stop().value();
+ while(!tpg->stopped())
+ {
+ std::cout << "\nCurrent concurrency is " << shared_state.concurrency.load(std::memory_order_relaxed) << " and current pacing is "
+ << (shared_state.current_pacing.load(std::memory_order_relaxed) / 1000.0) << " microseconds." << std::endl;
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ }
auto r = tpg->wait();
if(!r && r.error() != llfio::errc::operation_canceled)
{
@@ -417,8 +431,8 @@ static inline void TestDynamicThreadPoolGroupIoAwareWorks()
}
KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, works, "Tests that llfio::dynamic_thread_pool_group works as expected",
- TestDynamicThreadPoolGroupWorks())
+ TestDynamicThreadPoolGroupWorks())
KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, nested, "Tests that nesting of llfio::dynamic_thread_pool_group works as expected",
- TestDynamicThreadPoolGroupNestingWorks())
+ TestDynamicThreadPoolGroupNestingWorks())
KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, io_aware_work_item,
"Tests that llfio::dynamic_thread_pool_group::io_aware_work_item works as expected", TestDynamicThreadPoolGroupIoAwareWorks())