diff options
author | Niall Douglas <s_github@nedprod.com> | 2021-01-19 18:43:54 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-19 18:43:54 +0300 |
commit | 0dfeb420c1df842ba76401f5ce3a92074dc3bd92 (patch) | |
tree | 55b0474adf303a0756eac2dd77563d2871bda9c0 | |
parent | 07a08c3404f6db01bf6bd8df7fecc1b0ca214214 (diff) | |
parent | 0d62d125a2ee2404976155f104dbe109430ae0ba (diff) |
Merge 0d62d125a2ee2404976155f104dbe109430ae0ba into 07a08c3404f6db01bf6bd8df7fecc1b0ca214214all_tests_passed_0dfeb420c1df842ba76401f5ce3a92074dc3bd92
-rw-r--r-- | CMakeLists.txt | 28 | ||||
-rw-r--r-- | cmake/QuickCppLibBootstrap.cmake | 4 | ||||
-rw-r--r-- | cmake/headers.cmake | 2 | ||||
-rw-r--r-- | cmake/tests.cmake | 2 | ||||
-rw-r--r-- | include/llfio/revision.hpp | 6 | ||||
-rw-r--r-- | include/llfio/v2.0/config.hpp | 8 | ||||
-rw-r--r-- | include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp | 1757 | ||||
-rw-r--r-- | include/llfio/v2.0/detail/impl/posix/statfs.ipp | 493 | ||||
-rw-r--r-- | include/llfio/v2.0/detail/impl/windows/directory_handle.ipp | 2 | ||||
-rw-r--r-- | include/llfio/v2.0/detail/impl/windows/file_handle.ipp | 111 | ||||
-rw-r--r-- | include/llfio/v2.0/detail/impl/windows/statfs.ipp | 21 | ||||
-rw-r--r-- | include/llfio/v2.0/dynamic_thread_pool_group.hpp | 484 | ||||
-rw-r--r-- | include/llfio/v2.0/fs_handle.hpp | 2 | ||||
-rw-r--r-- | include/llfio/v2.0/llfio.hpp | 7 | ||||
-rw-r--r-- | include/llfio/v2.0/logging.hpp | 31 | ||||
-rw-r--r-- | include/llfio/v2.0/statfs.hpp | 59 | ||||
-rw-r--r-- | include/llfio/v2.0/status_code.hpp | 6 | ||||
-rw-r--r-- | test/tests/dynamic_thread_pool_group.cpp | 452 | ||||
-rw-r--r-- | test/tests/statfs.cpp | 101 |
19 files changed, 3398 insertions, 178 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index ab59a446..19f71d06 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -28,6 +28,7 @@ include(QuickCppLibUtils) include(QuickCppLibPolicies) option(LLFIO_USE_EXPERIMENTAL_SG14_STATUS_CODE "Whether to use SG14 status_code for failure handling" OFF) +option(LLFIO_DISABLE_LIBDISPATCH "Whether to disable automatic discovery of libdispatch/Grand Unified Dispatch" OFF) ensure_git_subrepo("${CMAKE_CURRENT_SOURCE_DIR}/include/llfio/ntkernel-error-category/include" "https://github.com/ned14/ntkernel-error-category.git") @@ -275,6 +276,30 @@ int main() { all_compile_definitions(PUBLIC LLFIO_FORCE_EXPERIMENTAL_FILESYSTEM=1 KERNELTEST_FORCE_EXPERIMENTAL_FILESYSTEM=1) endif() endif() +# Do we have Grand Central Dispatch on this platform? +if(NOT LLFIO_DISABLE_LIBDISPATCH) + function(check_have_libdispatch postfix) + set(CMAKE_REQUIRED_LIBRARIES ${ARGN}) + check_cxx_source_compiles(" +#include <dispatch/dispatch.h> +int main() { + return dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0) != nullptr; +} +" LLFIO_HAS_LIBDISPATCH_${postfix}) + endfunction() + check_have_libdispatch(BUILTIN) + if(NOT LLFIO_HAS_LIBDISPATCH_BUILTIN) + check_have_libdispatch(WITH_LIBDISPATCH dispatch) + if(LLFIO_HAS_LIBDISPATCH_WITH_LIBDISPATCH) + all_link_libraries(PUBLIC dispatch) + endif() + endif() +else() + all_compile_definitions(PUBLIC LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD=0) +endif() +if(NOT LLFIO_HAS_LIBDISPATCH_BUILTIN AND NOT LLFIO_HAS_LIBDISPATCH_WITH_LIBDISPATCH AND (CMAKE_SYSTEM_NAME MATCHES "FreeBSD" OR APPLE)) + indented_message(FATAL_ERROR "FATAL: Grand Central Dispatch as libdispatch was not found on this FreeBSD or Mac OS system. libdispatch is required for LLFIO to build on those systems.") +endif() # Set any macros this library requires all_compile_definitions(PRIVATE LLFIO_INCLUDE_STORAGE_PROFILE=1 LLFIO_ENABLE_TEST_IO_MULTIPLEXERS=1) @@ -285,7 +310,8 @@ if(LLFIO_USE_EXPERIMENTAL_SG14_STATUS_CODE) all_compile_definitions(PUBLIC LLFIO_EXPERIMENTAL_STATUS_CODE=1) endif() if(WIN32) - all_compile_definitions(PRIVATE _WIN32_WINNT=0x600) ## Target WinVista + all_compile_definitions(PRIVATE _WIN32_WINNT=0x601) ## Target Win7 + target_compile_definitions(llfio_hl INTERFACE _WIN32_WINNT=0x601) ## Target Win7 if(NOT LLFIO_USE_EXPERIMENTAL_SG14_STATUS_CODE) target_link_libraries(llfio_hl INTERFACE ntkernel-error-category::hl) target_link_libraries(llfio_dl PUBLIC ntkernel-error-category::dl) diff --git a/cmake/QuickCppLibBootstrap.cmake b/cmake/QuickCppLibBootstrap.cmake index 0094ad4f..359c0eb7 100644 --- a/cmake/QuickCppLibBootstrap.cmake +++ b/cmake/QuickCppLibBootstrap.cmake @@ -28,7 +28,7 @@ foreach(item ${CMAKE_MODULE_PATH}) set(quickcpplib_done ON) endif() endforeach() -if(DEFINED quickcpplib_DIR) +if(NOT quickcpplib_done AND quickcpplib_DIR) find_package(quickcpplib QUIET CONFIG) if(quickcpplib_FOUND) if(EXISTS "${quickcpplib_DIR}/share/cmakelib") @@ -53,6 +53,8 @@ if(NOT quickcpplib_done) set(CTEST_QUICKCPPLIB_SCRIPTS "${CMAKE_SOURCE_DIR}/../quickcpplib/scripts") # Copy latest version of myself into end user file(COPY "${CTEST_QUICKCPPLIB_SCRIPTS}/../cmake/QuickCppLibBootstrap.cmake" DESTINATION "${CMAKE_SOURCE_DIR}/cmake/") + elseif(EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/quickcpplib/repo/cmakelib") + set(CTEST_QUICKCPPLIB_CLONE_DIR "${CMAKE_CURRENT_SOURCE_DIR}/quickcpplib") elseif(CMAKE_BINARY_DIR) # Place into root binary directory, same place as where find_quickcpplib_library() puts dependencies. set(CTEST_QUICKCPPLIB_CLONE_DIR "${CMAKE_BINARY_DIR}/quickcpplib") diff --git a/cmake/headers.cmake b/cmake/headers.cmake index af4511e6..88c7deb9 100644 --- a/cmake/headers.cmake +++ b/cmake/headers.cmake @@ -29,6 +29,7 @@ set(llfio_HEADERS "include/llfio/v2.0/detail/impl/cached_parent_handle_adapter.ipp" "include/llfio/v2.0/detail/impl/clone.ipp" "include/llfio/v2.0/detail/impl/config.ipp" + "include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp" "include/llfio/v2.0/detail/impl/fast_random_file_handle.ipp" "include/llfio/v2.0/detail/impl/io_multiplexer.ipp" "include/llfio/v2.0/detail/impl/path_discovery.ipp" @@ -77,6 +78,7 @@ set(llfio_HEADERS "include/llfio/v2.0/detail/impl/windows/test/iocp_multiplexer.ipp" "include/llfio/v2.0/detail/impl/windows/utils.ipp" "include/llfio/v2.0/directory_handle.hpp" + "include/llfio/v2.0/dynamic_thread_pool_group.hpp" "include/llfio/v2.0/fast_random_file_handle.hpp" "include/llfio/v2.0/file_handle.hpp" "include/llfio/v2.0/fs_handle.hpp" diff --git a/cmake/tests.cmake b/cmake/tests.cmake index 8448643e..0710e844 100644 --- a/cmake/tests.cmake +++ b/cmake/tests.cmake @@ -7,6 +7,7 @@ set(llfio_TESTS "test/tests/directory_handle_create_close/runner.cpp" "test/tests/directory_handle_enumerate/kernel_directory_handle_enumerate.cpp.hpp" "test/tests/directory_handle_enumerate/runner.cpp" + "test/tests/dynamic_thread_pool_group.cpp" "test/tests/fast_random_file_handle.cpp" "test/tests/file_handle_create_close/kernel_file_handle.cpp.hpp" "test/tests/file_handle_create_close/runner.cpp" @@ -27,6 +28,7 @@ set(llfio_TESTS "test/tests/section_handle_create_close/kernel_section_handle.cpp.hpp" "test/tests/section_handle_create_close/runner.cpp" "test/tests/shared_fs_mutex.cpp" + "test/tests/statfs.cpp" "test/tests/symlink_handle_create_close/kernel_symlink_handle.cpp.hpp" "test/tests/symlink_handle_create_close/runner.cpp" "test/tests/traverse.cpp" diff --git a/include/llfio/revision.hpp b/include/llfio/revision.hpp index 2eca1678..9e7774c5 100644 --- a/include/llfio/revision.hpp +++ b/include/llfio/revision.hpp @@ -1,4 +1,4 @@ // Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time -#define LLFIO_PREVIOUS_COMMIT_REF 7c2fd4d1070394326b39323e9beb53823336eca9 -#define LLFIO_PREVIOUS_COMMIT_DATE "2020-11-28 17:58:14 +00:00" -#define LLFIO_PREVIOUS_COMMIT_UNIQUE 7c2fd4d1 +#define LLFIO_PREVIOUS_COMMIT_REF ca8673c41b5a87fb834f9365c3dfbc6b668cd260 +#define LLFIO_PREVIOUS_COMMIT_DATE "2021-01-15 11:26:17 +00:00" +#define LLFIO_PREVIOUS_COMMIT_UNIQUE ca8673c4 diff --git a/include/llfio/v2.0/config.hpp b/include/llfio/v2.0/config.hpp index a25cb21b..e0cf7883 100644 --- a/include/llfio/v2.0/config.hpp +++ b/include/llfio/v2.0/config.hpp @@ -89,11 +89,11 @@ Distributed under the Boost Software License, Version 1.0. #if defined(_WIN32) #if !defined(_WIN32_WINNT) #define _WIN32_WINNT 0x0600 -#elif _WIN32_WINNT < 0x0600 -#error _WIN32_WINNT must at least be set to Windows Vista for LLFIO to work +#elif _WIN32_WINNT < 0x0601 +#error _WIN32_WINNT must at least be set to Windows 7 for LLFIO to work #endif -#if defined(NTDDI_VERSION) && NTDDI_VERSION < 0x06000000 -#error NTDDI_VERSION must at least be set to Windows Vista for LLFIO to work +#if defined(NTDDI_VERSION) && NTDDI_VERSION < 0x06010000 +#error NTDDI_VERSION must at least be set to Windows 7 for LLFIO to work #endif #endif diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp new file mode 100644 index 00000000..e5c7fbb9 --- /dev/null +++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp @@ -0,0 +1,1757 @@ +/* Dynamic thread pool group +(C) 2020 Niall Douglas <http://www.nedproductions.biz/> (9 commits) +File Created: Dec 2020 + + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License in the accompanying file +Licence.txt or at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + +Distributed under the Boost Software License, Version 1.0. + (See accompanying file Licence.txt or copy at + http://www.boost.org/LICENSE_1_0.txt) +*/ + +#include "../../dynamic_thread_pool_group.hpp" + +#include "../../file_handle.hpp" +#include "../../statfs.hpp" + +#include <atomic> +#include <mutex> +#include <unordered_map> +#include <unordered_set> +#include <vector> + +#include <iostream> + +#ifndef LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD +#if LLFIO_FORCE_USE_LIBDISPATCH +#include <dispatch/dispatch.h> +#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD 1 +#else +#ifdef _WIN32 +#include "windows/import.hpp" +#include <threadpoolapiset.h> +#else +#if __has_include(<dispatch/dispatch.h>) +#include <dispatch/dispatch.h> +#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD 1 +#endif +#endif +#endif +#endif +#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32) +#if !defined(__linux__) +#error dynamic_thread_pool_group requires Grand Central Dispatch (libdispatch) on non-Linux POSIX. +#endif +#include <dirent.h> /* Defines DT_* constants */ +#include <sys/syscall.h> + +#include <condition_variable> +#include <thread> +#endif + +#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING 0 + +LLFIO_V2_NAMESPACE_BEGIN + +namespace detail +{ + struct global_dynamic_thread_pool_impl + { + std::mutex workqueue_lock; + using _lock_guard = std::unique_lock<std::mutex>; + struct workqueue_item + { + std::unordered_set<dynamic_thread_pool_group_impl *> items; + std::unordered_set<dynamic_thread_pool_group_impl *>::iterator currentgroup; + size_t currentgroupremaining{0}; + }; + std::vector<workqueue_item> workqueue; +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + using threadh_type = void *; + using grouph_type = dispatch_group_t; + static void _gcd_dispatch_callback(void *arg) + { + auto *workitem = (dynamic_thread_pool_group::work_item *) arg; + global_dynamic_thread_pool()._workerthread(workitem, nullptr); + } + static void _gcd_timer_callback(void *arg) + { + auto *workitem = (dynamic_thread_pool_group::work_item *) arg; + global_dynamic_thread_pool()._timerthread(workitem, nullptr); + } +#elif defined(_WIN32) + using threadh_type = PTP_CALLBACK_INSTANCE; + using grouph_type = PTP_CALLBACK_ENVIRON; + static void CALLBACK _win32_worker_thread_callback(threadh_type threadh, PVOID Parameter, PTP_WORK /*unused*/) + { + auto *workitem = (dynamic_thread_pool_group::work_item *) Parameter; + global_dynamic_thread_pool()._workerthread(workitem, threadh); + } + static void CALLBACK _win32_timer_thread_callback(threadh_type threadh, PVOID Parameter, PTP_TIMER /*unused*/) + { + auto *workitem = (dynamic_thread_pool_group::work_item *) Parameter; + global_dynamic_thread_pool()._timerthread(workitem, threadh); + } +#else + using threadh_type = void *; + using grouph_type = void *; + struct thread_t + { + thread_t *_prev{nullptr}, *_next{nullptr}; + std::thread thread; + std::condition_variable cond; + std::chrono::steady_clock::time_point last_did_work; + int state{0}; // <0 = dead, 0 = sleeping/please die, 1 = busy + }; + struct threads_t + { + size_t count{0}; + thread_t *front{nullptr}, *back{nullptr}; + } threadpool_active, threadpool_sleeping; + std::atomic<size_t> total_submitted_workitems{0}, threadpool_threads{0}, threadpool_sleeping_count{0}; + + std::mutex threadmetrics_lock; + struct threadmetrics_threadid + { + char text[12]; // enough for a UINT32_MAX in decimal + constexpr threadmetrics_threadid() + : text{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} + { + } + threadmetrics_threadid(string_view sv) + { + memset(text, '0', sizeof(text)); + assert(sv.size() <= sizeof(text)); + if(sv.size() > sizeof(text)) + { + abort(); + } + memcpy(text + sizeof(text) - sv.size(), sv.data(), sv.size()); + } + int compare(const threadmetrics_threadid &o) const noexcept { return memcmp(text, o.text, sizeof(text)); } + bool operator<(const threadmetrics_threadid &o) const noexcept { return compare(o) < 0; } + bool operator==(const threadmetrics_threadid &o) const noexcept { return compare(o) == 0; } + }; + struct threadmetrics_item + { + threadmetrics_item *_prev{nullptr}, *_next{nullptr}; + std::chrono::steady_clock::time_point last_updated, blocked_since; // latter set if thread seen no time + threadmetrics_threadid threadid; + uint32_t diskfaults{(uint32_t) -1}, utime{(uint32_t) -1}, stime{(uint32_t) -1}; // culmulative ticks spent in user and system for this thread + + explicit threadmetrics_item(threadmetrics_threadid v) + : threadid(v) + { + } + string_view threadid_name() const noexcept { return string_view(threadid.text, sizeof(threadid.text)); } + }; + struct threadmetrics_t + { + size_t count{0}; + threadmetrics_item *front{nullptr}, *back{nullptr}; + uint32_t blocked{0}, running{0}; + } threadmetrics_queue; // items at end are least recently updated + std::vector<threadmetrics_item *> threadmetrics_sorted; + std::chrono::steady_clock::time_point threadmetrics_last_updated; +#ifdef __linux__ + int proc_self_task_fd{-1}; +#endif +#endif + + std::mutex io_aware_work_item_handles_lock; + struct io_aware_work_item_statfs + { + size_t refcount{0}; + deadline default_deadline; + float average_busy{0}, average_queuedepth{0}; + std::chrono::steady_clock::time_point last_updated; + statfs_t statfs; + }; + std::unordered_map<fs_handle::unique_id_type, io_aware_work_item_statfs, fs_handle::unique_id_type_hasher> io_aware_work_item_handles; + + global_dynamic_thread_pool_impl() + { + workqueue.reserve(4); // preallocate 4 levels of nesting +#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32) + populate_threadmetrics(std::chrono::steady_clock::now()); +#endif + } + + template <class T, class U> static void _append_to_list(T &what, U *v) + { + if(what.front == nullptr) + { + assert(what.back == nullptr); + v->_next = v->_prev = nullptr; + what.front = what.back = v; + } + else + { + v->_next = nullptr; + v->_prev = what.back; + what.back->_next = v; + what.back = v; + } + what.count++; + } + template <class T, class U> static void _remove_from_list(T &what, U *v) + { + if(v->_prev == nullptr && v->_next == nullptr) + { + assert(what.front == v); + assert(what.back == v); + what.front = what.back = nullptr; + } + else if(v->_prev == nullptr) + { + assert(what.front == v); + v->_next->_prev = nullptr; + what.front = v->_next; + v->_next = v->_prev = nullptr; + } + else if(v->_next == nullptr) + { + assert(what.back == v); + v->_prev->_next = nullptr; + what.back = v->_prev; + v->_next = v->_prev = nullptr; + } + else + { + v->_next->_prev = v->_prev; + v->_prev->_next = v->_next; + v->_next = v->_prev = nullptr; + } + what.count--; + } + +#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32) +#ifdef __linux__ + void populate_threadmetrics(std::chrono::steady_clock::time_point now) + { + static thread_local std::vector<char> kernelbuffer(1024); + static thread_local std::vector<threadmetrics_threadid> threadidsbuffer(1024 / sizeof(dirent)); + using getdents64_t = int (*)(int, char *, unsigned int); + static auto getdents = static_cast<getdents64_t>([](int fd, char *buf, unsigned count) -> int { return syscall(SYS_getdents64, fd, buf, count); }); + using dirent = dirent64; + int bytes = 0; + { + _lock_guard g(threadmetrics_lock); + if(now - threadmetrics_last_updated < std::chrono::milliseconds(100)) + { + return; + } + if(proc_self_task_fd < 0) + { + proc_self_task_fd = ::open("/proc/self/task", O_RDONLY | O_DIRECTORY | O_CLOEXEC); + if(proc_self_task_fd < 0) + { + posix_error().throw_exception(); + } + } + for(;;) + { + if(-1 == ::lseek64(proc_self_task_fd, 0, SEEK_SET)) + { + posix_error().throw_exception(); + } + bytes = getdents(proc_self_task_fd, kernelbuffer.data(), kernelbuffer.size()); + if(bytes < (int) kernelbuffer.size()) + { + break; + } + kernelbuffer.resize(kernelbuffer.size() << 1); + } + } + threadidsbuffer.clear(); + for(auto *dent = (dirent *) kernelbuffer.data();; dent = reinterpret_cast<dirent *>(reinterpret_cast<uintptr_t>(dent) + dent->d_reclen)) + { + if(dent->d_ino != 0u && dent->d_type == DT_DIR && dent->d_name[0] != '.') + { + size_t length = strchr(dent->d_name, 0) - dent->d_name; + threadidsbuffer.push_back(string_view(dent->d_name, length)); + } + if((bytes -= dent->d_reclen) <= 0) + { + break; + } + } + std::sort(threadidsbuffer.begin(), threadidsbuffer.end()); + _lock_guard g(threadmetrics_lock); + auto d_it = threadmetrics_sorted.begin(); + auto s_it = threadidsbuffer.begin(); + auto remove_item = [&] { + std::cout << "Removing thread " << (*d_it)->threadid_name() << std::endl; + if((*d_it)->blocked_since != std::chrono::steady_clock::time_point()) + { + threadmetrics_queue.blocked--; + } + else + { + threadmetrics_queue.running--; + } + _remove_from_list(threadmetrics_queue, *d_it); + d_it = threadmetrics_sorted.erase(d_it); + }; + auto add_item = [&] { + auto p = std::make_unique<threadmetrics_item>(*s_it++); + if(d_it != threadmetrics_sorted.end()) + { + ++d_it; + } + d_it = threadmetrics_sorted.insert(d_it, p.get()); + _append_to_list(threadmetrics_queue, p.get()); + std::cout << "Adding thread " << p->threadid_name() << std::endl; + p.release(); + threadmetrics_queue.running++; + }; + for(; d_it != threadmetrics_sorted.end() && s_it != threadidsbuffer.end();) + { + auto c = (*d_it)->threadid.compare(*s_it); + // std::cout << "Comparing " << (*d_it)->threadid_name() << " with " << string_view(s_it->text, 12) << " = " << c << std::endl; + if(0 == c) + { + ++d_it; + ++s_it; + continue; + } + if(c < 0) + { + // d_it has gone away + remove_item(); + } + if(c > 0) + { + // s_it is a new entry + add_item(); + } + } + while(d_it != threadmetrics_sorted.end()) + { + remove_item(); + } + while(s_it != threadidsbuffer.end()) + { + add_item(); + } +#if 0 + std::cout << "Threadmetrics:"; + for(auto *p : threadmetrics_sorted) + { + std::cout << "\n " << p->threadid_name(); + } + std::cout << std::endl; +#endif + assert(threadmetrics_sorted.size() == threadidsbuffer.size()); + assert(std::is_sorted(threadmetrics_sorted.begin(), threadmetrics_sorted.end(), + [](threadmetrics_item *a, threadmetrics_item *b) { return a->threadid < b->threadid; })); + threadmetrics_last_updated = now; + } +#endif + + inline void _execute_work(thread_t *self); + + void _add_thread(_lock_guard & /*unused*/) + { + thread_t *p = nullptr; + try + { + p = new thread_t; + _append_to_list(threadpool_active, p); + p->thread = std::thread([this, p] { _execute_work(p); }); + } + catch(...) + { + if(p != nullptr) + { + _remove_from_list(threadpool_active, p); + } + // drop failure + } + } + + bool _remove_thread(_lock_guard &g, threads_t &which) + { + if(which.count == 0) + { + return false; + } + // Threads which went to sleep the longest ago are at the front + auto *t = which.front; + assert(t->state == 0); + t->state--; +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << t << " is told to quit" << std::endl; +#endif + do + { + g.unlock(); + t->cond.notify_one(); + g.lock(); + } while(t->state >= -1); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << t << " has quit, deleting" << std::endl; +#endif + _remove_from_list(threadpool_active, t); + t->thread.join(); + delete t; + return true; + } + + ~global_dynamic_thread_pool_impl() + { + { + _lock_guard g(workqueue_lock); // lock global + while(threadpool_active.count > 0 || threadpool_sleeping.count > 0) + { + while(threadpool_sleeping.count > 0) + { + auto removed = _remove_thread(g, threadpool_sleeping); + assert(removed); + (void) removed; + } + if(threadpool_active.count > 0) + { + auto removed = _remove_thread(g, threadpool_active); + assert(removed); + (void) removed; + } + } + } + _lock_guard g(threadmetrics_lock); + for(auto *p : threadmetrics_sorted) + { + delete p; + } + threadmetrics_sorted.clear(); + threadmetrics_queue = {}; +#ifdef __linux__ + if(proc_self_task_fd > 0) + { + ::close(proc_self_task_fd); + proc_self_task_fd = -1; + } +#endif + } +#endif + + result<void> _prepare_work_item_delay(dynamic_thread_pool_group::work_item *workitem, grouph_type grouph, deadline d) + { + if(!d) + { + return errc::invalid_argument; + } + workitem->_timepoint1 = {}; + workitem->_timepoint2 = {}; + if(workitem->_nextwork == 0 || d.nsecs > 0) + { + if(nullptr == workitem->_internaltimerh) + { +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + (void) grouph; + workitem->_internaltimerh = (void *) (uintptr_t) -1; +#elif defined(_WIN32) + workitem->_internaltimerh = CreateThreadpoolTimer(_win32_timer_thread_callback, workitem, grouph); + if(nullptr == workitem->_internaltimerh) + { + return win32_error(); + } +#else + (void) grouph; + workitem->_internaltimerh = (void *) (uintptr_t) -1; +#endif + } + if(d.nsecs > 0) + { + if(d.steady) + { + workitem->_timepoint1 = std::chrono::steady_clock::now() + std::chrono::nanoseconds(d.nsecs); + } + else + { + workitem->_timepoint2 = d.to_time_point(); + } + } + else + { + workitem->_timepoint1 = std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1)); + } + } + return success(); + } + + inline void _submit_work_item(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem); + + inline result<void> submit(_lock_guard &g, dynamic_thread_pool_group_impl *group, span<dynamic_thread_pool_group::work_item *> work) noexcept; + + inline void _work_item_done(_lock_guard &g, dynamic_thread_pool_group::work_item *i) noexcept; + inline void _work_item_next(_lock_guard &g, dynamic_thread_pool_group::work_item *i) noexcept; + + inline result<void> stop(_lock_guard &g, dynamic_thread_pool_group_impl *group, result<void> err) noexcept; + inline result<void> wait(_lock_guard &g, bool reap, dynamic_thread_pool_group_impl *group, deadline d) noexcept; + + inline void _timerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh); + inline void _workerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh); + }; + struct global_dynamic_thread_pool_impl_thread_local_state_t + { + dynamic_thread_pool_group::work_item *workitem{nullptr}; + global_dynamic_thread_pool_impl::threadh_type current_callback_instance{nullptr}; + size_t nesting_level{0}; + }; + LLFIO_HEADERS_ONLY_FUNC_SPEC global_dynamic_thread_pool_impl_thread_local_state_t &global_dynamic_thread_pool_thread_local_state() noexcept + { + static thread_local global_dynamic_thread_pool_impl_thread_local_state_t tls; + return tls; + } + + LLFIO_HEADERS_ONLY_FUNC_SPEC global_dynamic_thread_pool_impl &global_dynamic_thread_pool() noexcept + { + static global_dynamic_thread_pool_impl impl; + return impl; + } +} // namespace detail + + +class dynamic_thread_pool_group_impl final : public dynamic_thread_pool_group +{ + friend struct detail::global_dynamic_thread_pool_impl; + + mutable std::mutex _lock; + using _lock_guard = detail::global_dynamic_thread_pool_impl::_lock_guard; + size_t _nesting_level{0}; + struct workitems_t + { + size_t count{0}; + dynamic_thread_pool_group::work_item *front{nullptr}, *back{nullptr}; + } _work_items_active, _work_items_done, _work_items_delayed; + std::atomic<bool> _stopping{false}, _stopped{true}, _completing{false}; + std::atomic<int> _waits{0}; + result<void> _abnormal_completion_cause{success()}; // The cause of any abnormal group completion + +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + dispatch_group_t _grouph; +#elif defined(_WIN32) + TP_CALLBACK_ENVIRON _callbackenviron; + PTP_CALLBACK_ENVIRON _grouph{&_callbackenviron}; +#else + void *_grouph{nullptr}; +#endif + +public: + result<void> init() + { + LLFIO_LOG_FUNCTION_CALL(this); + try + { + auto &impl = detail::global_dynamic_thread_pool(); + _nesting_level = detail::global_dynamic_thread_pool_thread_local_state().nesting_level; +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + _grouph = dispatch_group_create(); + if(_grouph == nullptr) + { + return errc::not_enough_memory; + } +#elif defined(_WIN32) + InitializeThreadpoolEnvironment(_grouph); +#endif + detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock); // lock global + // Append this group to the global work queue at its nesting level + if(_nesting_level >= impl.workqueue.size()) + { + impl.workqueue.resize(_nesting_level + 1); + } + auto &wq = impl.workqueue[_nesting_level]; + wq.items.insert(this); + if(wq.items.size() == 1) + { + wq.currentgroup = wq.items.begin(); + wq.currentgroupremaining = _work_items_active.count; + } + return success(); + } + catch(...) + { + return error_from_exception(); + } + } + + virtual ~dynamic_thread_pool_group_impl() + { + LLFIO_LOG_FUNCTION_CALL(this); + (void) wait(); + auto &impl = detail::global_dynamic_thread_pool(); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + if(nullptr != _grouph) + { + dispatch_release(_grouph); + _grouph = nullptr; + } +#elif defined(_WIN32) + if(nullptr != _grouph) + { + DestroyThreadpoolEnvironment(_grouph); + _grouph = nullptr; + } +#endif + detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock); // lock global + assert(impl.workqueue.size() > _nesting_level); + auto &wq = impl.workqueue[_nesting_level]; + if(*wq.currentgroup == this) + { + if(wq.items.end() == ++wq.currentgroup) + { + wq.currentgroup = wq.items.begin(); + } + if(!wq.items.empty()) + { + wq.currentgroupremaining = (*wq.currentgroup)->_work_items_active.count; + } + } + wq.items.erase(this); + while(!impl.workqueue.empty() && impl.workqueue.back().items.empty()) + { + impl.workqueue.pop_back(); + } + } + + virtual result<void> submit(span<work_item *> work) noexcept override + { + LLFIO_LOG_FUNCTION_CALL(this); + if(_stopping.load(std::memory_order_relaxed)) + { + return errc::operation_canceled; + } + if(_completing.load(std::memory_order_relaxed)) + { + for(auto *i : work) + { + i->_parent.store(this, std::memory_order_release); + detail::global_dynamic_thread_pool_impl::_append_to_list(_work_items_delayed, i); + } + return success(); + } + _stopped.store(false, std::memory_order_release); + auto &impl = detail::global_dynamic_thread_pool(); + _lock_guard g(_lock); // lock group + if(_work_items_active.count == 0 && _work_items_done.count == 0) + { + _abnormal_completion_cause = success(); + } + OUTCOME_TRY(impl.submit(g, this, work)); + if(_work_items_active.count == 0) + { + _stopped.store(true, std::memory_order_release); + } + return success(); + } + + virtual result<void> stop() noexcept override + { + LLFIO_LOG_FUNCTION_CALL(this); + if(_stopped.load(std::memory_order_relaxed)) + { + return success(); + } + auto &impl = detail::global_dynamic_thread_pool(); + _lock_guard g(_lock); // lock group + return impl.stop(g, this, errc::operation_canceled); + } + + virtual bool stopping() const noexcept override { return _stopping.load(std::memory_order_relaxed); } + + virtual bool stopped() const noexcept override { return _stopped.load(std::memory_order_relaxed); } + + virtual result<void> wait(deadline d = {}) const noexcept override + { + LLFIO_LOG_FUNCTION_CALL(this); + if(_stopped.load(std::memory_order_relaxed)) + { + return success(); + } + auto &impl = detail::global_dynamic_thread_pool(); + _lock_guard g(_lock); // lock group + return impl.wait(g, true, const_cast<dynamic_thread_pool_group_impl *>(this), d); + } +}; + +LLFIO_HEADERS_ONLY_MEMFUNC_SPEC size_t dynamic_thread_pool_group::current_nesting_level() noexcept +{ + return detail::global_dynamic_thread_pool_thread_local_state().nesting_level; +} + +LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::work_item *dynamic_thread_pool_group::current_work_item() noexcept +{ + return detail::global_dynamic_thread_pool_thread_local_state().workitem; +} + +LLFIO_HEADERS_ONLY_FUNC_SPEC result<dynamic_thread_pool_group_ptr> make_dynamic_thread_pool_group() noexcept +{ + try + { + auto ret = std::make_unique<dynamic_thread_pool_group_impl>(); + OUTCOME_TRY(ret->init()); + return dynamic_thread_pool_group_ptr(std::move(ret)); + } + catch(...) + { + return error_from_exception(); + } +} + +namespace detail +{ +#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32) + inline void global_dynamic_thread_pool_impl::_execute_work(thread_t *self) + { + pthread_setname_np(pthread_self(), "LLFIO DYN TPG"); + self->last_did_work = std::chrono::steady_clock::now(); + _lock_guard g(workqueue_lock); // lock global + self->state++; // busy + threadpool_threads.fetch_add(1, std::memory_order_release); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " begins." << std::endl; +#endif + size_t workqueue_depth = workqueue.size() - 1; + while(self->state > 0) + { + restart: + dynamic_thread_pool_group::work_item *workitem = nullptr; + std::chrono::steady_clock::time_point earliest_duration; + std::chrono::system_clock::time_point earliest_absolute; + if(!workqueue.empty()) + { + auto wq_it = (workqueue_depth >= workqueue.size()) ? --workqueue.end() : (workqueue.begin() + workqueue_depth); + auto *wq = &(*wq_it); + for(;;) + { + dynamic_thread_pool_group_impl *tpg = *wq->currentgroup; + _lock_guard gg(tpg->_lock); // lock group + if(wq->currentgroupremaining > tpg->_work_items_active.count) + { + wq->currentgroupremaining = tpg->_work_items_active.count; + } +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " sees " << wq->currentgroupremaining << " items remaining in group " << tpg << std::endl; +#endif + if(wq->currentgroupremaining > 0 && tpg->_work_items_active.front->_internalworkh == nullptr) + { + auto *wi = tpg->_work_items_active.front; + _remove_from_list(tpg->_work_items_active, wi); + _append_to_list(tpg->_work_items_active, wi); + wq->currentgroupremaining--; + if(wi->_internaltimerh == nullptr) + { + workitem = wi; + workitem->_internalworkh = self; +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " chooses work item " << workitem << " from group " << tpg << " distance from top " + << (workqueue.end() - wq_it - 1) << std::endl; +#endif + break; + } + bool invoketimer = false; + if(wi->_timepoint1 != std::chrono::steady_clock::time_point()) + { + // Special constant for immediately rescheduled work items + if(wi->_timepoint1 == std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1))) + { + invoketimer = true; + } + else if(earliest_duration == std::chrono::steady_clock::time_point() || wi->_timepoint1 < earliest_duration) + { + earliest_duration = wi->_timepoint1; + if(wi->_timepoint1 <= std::chrono::steady_clock::now()) + { + invoketimer = true; + } + } + } + if(wi->_timepoint2 != std::chrono::system_clock::time_point() && + (earliest_absolute == std::chrono::system_clock::time_point() || wi->_timepoint2 < earliest_absolute)) + { + earliest_absolute = wi->_timepoint2; + if(wi->_timepoint2 <= std::chrono::system_clock::now()) + { + invoketimer = true; + } + } + if(invoketimer) + { + wi->_internalworkh = self; + wi->_internaltimerh = nullptr; +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " executes timer item " << wi << std::endl; +#endif + gg.unlock(); + g.unlock(); + _timerthread(wi, nullptr); + g.lock(); + // wi->_internalworkh should be null, however wi may also no longer exist + goto restart; + } +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " timer item " << wi << " timer is not ready yet " << std::endl; +#endif + } + else + { + auto startinggroup = wq->currentgroup; + bool at_top = (workqueue_depth == workqueue.size() - 1); + do + { + gg.unlock(); // unlock group + if(++wq->currentgroup == wq->items.end()) + { + wq->currentgroup = wq->items.begin(); + } + if(!at_top) + { + workqueue_depth = workqueue.size() - 1; + wq_it = --workqueue.end(); + at_top = true; + wq = &(*wq_it); + startinggroup = wq->currentgroup; + } + tpg = *wq->currentgroup; + gg = _lock_guard(tpg->_lock); // lock group +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " workqueue distance " << (workqueue.end() - wq_it - 1) << " examining " << tpg + << " finds _work_items_active.count = " << tpg->_work_items_active.count << "." << std::endl; +#endif + if(startinggroup == wq->currentgroup) + { + if(tpg->_work_items_active.count == 0 || tpg->_work_items_active.front->_internalworkh != nullptr) + { + // Nothing for me to do in this workqueue + if(wq_it == workqueue.begin()) + { + assert(workitem == nullptr); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " finds work queue empty, going to sleep." << std::endl; +#endif + goto workqueue_empty; + } + gg.unlock(); // unlock group + --wq_it; + workqueue_depth = wq_it - workqueue.begin(); + wq = &(*wq_it); + tpg = *wq->currentgroup; + startinggroup = wq->currentgroup; + gg = _lock_guard(tpg->_lock); // lock group +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " moves work search up to distance from top = " << (workqueue.end() - wq_it - 1) << " examining " << tpg + << " finds _work_items_active.count = " << tpg->_work_items_active.count << "." << std::endl; +#endif + continue; + } + } + } while(tpg->_work_items_active.count == 0); + wq->currentgroupremaining = tpg->_work_items_active.count; +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " choose new group " << tpg << std::endl; +#endif + } + } + } + workqueue_empty: + auto now = std::chrono::steady_clock::now(); + if(workitem == nullptr) + { + std::chrono::steady_clock::duration duration(std::chrono::minutes(1)); + if(earliest_duration != std::chrono::steady_clock::time_point()) + { + if(now - earliest_duration < duration) + { + duration = now - earliest_duration; + } + } + else if(earliest_absolute != std::chrono::system_clock::time_point()) + { + auto diff = std::chrono::system_clock::now() - earliest_absolute; + if(diff > duration) + { + earliest_absolute = {}; + } + } + else if(now - self->last_did_work >= std::chrono::minutes(1)) + { + _remove_from_list(threadpool_active, self); + self->thread.detach(); + delete self; + return; + } + self->last_did_work = now; + _remove_from_list(threadpool_active, self); + _append_to_list(threadpool_sleeping, self); + self->state--; + threadpool_sleeping_count.fetch_add(1, std::memory_order_release); + if(earliest_absolute != std::chrono::system_clock::time_point()) + { +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " goes to sleep absolute" << std::endl; +#endif + self->cond.wait_until(g, earliest_absolute); + } + else + { +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " goes to sleep for " << std::chrono::duration_cast<std::chrono::milliseconds>(duration).count() << std::endl; +#endif + self->cond.wait_for(g, duration); + } + self->state++; + _remove_from_list(threadpool_sleeping, self); + _append_to_list(threadpool_active, self); + threadpool_sleeping_count.fetch_sub(1, std::memory_order_release); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " wakes, state = " << self->state << std::endl; +#endif + g.unlock(); + try + { + populate_threadmetrics(now); + } + catch(...) + { + } + g.lock(); + continue; + } + self->last_did_work = now; +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " executes work item " << workitem << std::endl; +#endif + total_submitted_workitems.fetch_sub(1, std::memory_order_relaxed); + g.unlock(); + _workerthread(workitem, nullptr); + // workitem->_internalworkh should be null, however workitem may also no longer exist + try + { + populate_threadmetrics(now); + } + catch(...) + { + } + g.lock(); + } + self->state -= 2; // dead + threadpool_threads.fetch_sub(1, std::memory_order_release); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " exits, state = " << self->state << std::endl; +#endif + } +#endif + + inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem) + { + (void) g; + if(workitem->_nextwork != -1) + { + // If no work item for now, or there is a delay, schedule a timer + if(workitem->_nextwork == 0 || workitem->_timepoint1 != std::chrono::steady_clock::time_point() || + workitem->_timepoint2 != std::chrono::system_clock::time_point()) + { + assert(workitem->_internaltimerh != nullptr); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + dispatch_time_t when; + if(workitem->_timepoint1 != std::chrono::steady_clock::time_point()) + { + auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(workitem->_timepoint1 - std::chrono::steady_clock::now()).count(); + if(duration > 1000000000LL) + { + // Because GCD has no way of cancelling timers, nor assigning them to a group, + // we clamp the timer to 1 second. Then if cancellation is ever done to the group, + // the worst possible wait is 1 second. _timerthread will reschedule the timer + // if it gets called short. + duration = 1000000000LL; + } + when = dispatch_time(DISPATCH_TIME_NOW, duration); + } + else if(workitem->_timepoint2 != std::chrono::system_clock::time_point()) + { + deadline d(workitem->_timepoint2); + auto now = std::chrono::system_clock::now(); + if(workitem->_timepoint2 - now > std::chrono::seconds(1)) + { + d = now + std::chrono::seconds(1); + } + when = dispatch_walltime(&d.utc, 0); + } + else + { + when = dispatch_time(DISPATCH_TIME_NOW, 1); // smallest possible non immediate duration from now + } + // std::cout << "*** timer " << workitem << std::endl; + dispatch_after_f(when, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), workitem, _gcd_timer_callback); +#elif defined(_WIN32) + LARGE_INTEGER li; + DWORD slop = 1000; + if(workitem->_timepoint1 != std::chrono::steady_clock::time_point()) + { + li.QuadPart = std::chrono::duration_cast<std::chrono::nanoseconds>(workitem->_timepoint1 - std::chrono::steady_clock::now()).count() / 100; + if(li.QuadPart < 0) + { + li.QuadPart = 0; + } + if(li.QuadPart / 8 < (int64_t) slop) + { + slop = (DWORD)(li.QuadPart / 8); + } + li.QuadPart = -li.QuadPart; // negative is relative + } + else if(workitem->_timepoint2 != std::chrono::system_clock::time_point()) + { + li = windows_nt_kernel::from_timepoint(workitem->_timepoint2); + } + else + { + li.QuadPart = -1; // smallest possible non immediate duration from now + } + FILETIME ft; + ft.dwHighDateTime = (DWORD) li.HighPart; + ft.dwLowDateTime = li.LowPart; + // std::cout << "*** timer " << workitem << std::endl; + SetThreadpoolTimer((PTP_TIMER) workitem->_internaltimerh, &ft, 0, slop); +#endif + } + else + { +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + intptr_t priority = DISPATCH_QUEUE_PRIORITY_LOW; + if(workqueue.size() - workitem->_parent.load(std::memory_order_relaxed)->_nesting_level == 1) + { + priority = DISPATCH_QUEUE_PRIORITY_HIGH; + } + else if(workqueue.size() - workitem->_parent.load(std::memory_order_relaxed)->_nesting_level == 2) + { + priority = DISPATCH_QUEUE_PRIORITY_DEFAULT; + } + // std::cout << "*** submit " << workitem << std::endl; + dispatch_group_async_f(workitem->_parent.load(std::memory_order_relaxed)->_grouph, dispatch_get_global_queue(priority, 0), workitem, + _gcd_dispatch_callback); +#elif defined(_WIN32) + // Set the priority of the group according to distance from the top + TP_CALLBACK_PRIORITY priority = TP_CALLBACK_PRIORITY_LOW; + if(workqueue.size() - workitem->_parent.load(std::memory_order_relaxed)->_nesting_level == 1) + { + priority = TP_CALLBACK_PRIORITY_HIGH; + } + else if(workqueue.size() - workitem->_parent.load(std::memory_order_relaxed)->_nesting_level == 2) + { + priority = TP_CALLBACK_PRIORITY_NORMAL; + } + SetThreadpoolCallbackPriority(workitem->_parent.load(std::memory_order_relaxed)->_grouph, priority); + // std::cout << "*** submit " << workitem << std::endl; + SubmitThreadpoolWork((PTP_WORK) workitem->_internalworkh); +#endif + } +#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32) + // Indicate that I can be executed again + workitem->_internalworkh = nullptr; +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP submits work item " << workitem << std::endl; +#endif + const auto active_work_items = total_submitted_workitems.fetch_add(1, std::memory_order_relaxed) + 1; + const auto sleeping_count = threadpool_sleeping_count.load(std::memory_order_relaxed); + const auto threads = threadpool_threads.load(std::memory_order_relaxed); + if(sleeping_count > 0 || threads == 0) + { + g.unlock(); // unlock group + { + _lock_guard gg(workqueue_lock); // lock global + if(threadpool_active.count == 0 && threadpool_sleeping.count == 0) + { + _add_thread(gg); + _add_thread(gg); + _add_thread(gg); + _add_thread(gg); + } + else if(threadpool_sleeping.count > 0 && active_work_items > threadpool_active.count) + { + // Try to wake the most recently slept first + auto *t = threadpool_sleeping.back; + auto now = std::chrono::steady_clock::now(); + for(size_t n = std::min(active_work_items - threadpool_active.count, threadpool_sleeping.count); n > 0; n--) + { + t->last_did_work = now; // prevent reap + t->cond.notify_one(); + t = t->_prev; + } + } + } + g.lock(); // lock group + } +#endif + } + } + + inline result<void> global_dynamic_thread_pool_impl::submit(_lock_guard &g, dynamic_thread_pool_group_impl *group, + span<dynamic_thread_pool_group::work_item *> work) noexcept + { + try + { + if(work.empty()) + { + return success(); + } + for(auto *i : work) + { + if(i->_parent.load(std::memory_order_relaxed) != nullptr) + { + return errc::address_in_use; + } + } + auto uninit = make_scope_exit([&]() noexcept { + for(auto *i : work) + { + _remove_from_list(group->_work_items_active, i); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + i->_internalworkh = nullptr; + i->_internaltimerh = nullptr; +#elif defined(_WIN32) + if(nullptr != i->_internaltimerh) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } + if(nullptr != i->_internalworkh) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } +#endif + i->_parent.store(nullptr, std::memory_order_release); + } + }); + for(auto *i : work) + { + deadline d(std::chrono::seconds(0)); + i->_parent.store(group, std::memory_order_release); + i->_nextwork = i->next(d); + if(-1 == i->_nextwork) + { + _append_to_list(group->_work_items_done, i); + } + else + { +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + i->_internalworkh = (void *) (uintptr_t) -1; +#elif defined(_WIN32) + i->_internalworkh = CreateThreadpoolWork(_win32_worker_thread_callback, i, group->_grouph); + if(nullptr == i->_internalworkh) + { + return win32_error(); + } +#endif + OUTCOME_TRY(_prepare_work_item_delay(i, group->_grouph, d)); + _append_to_list(group->_work_items_active, i); + } + } + uninit.release(); + { + for(auto *i : work) + { + _submit_work_item(g, i); + } + } + return success(); + } + catch(...) + { + return error_from_exception(); + } + } + + inline void global_dynamic_thread_pool_impl::_work_item_done(_lock_guard &g, dynamic_thread_pool_group::work_item *i) noexcept + { + (void) g; + // std::cout << "*** _work_item_done " << i << std::endl; +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + i->_internaltimerh = nullptr; + i->_internalworkh = nullptr; +#elif defined(_WIN32) + if(i->_internalworkh_inuse > 0) + { + i->_internalworkh_inuse = 2; + } + else + { + if(i->_internaltimerh != nullptr) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } + if(i->_internalworkh != nullptr) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } + } +#else + i->_internaltimerh = nullptr; + i->_internalworkh = nullptr; +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP sets done work item " << i << std::endl; +#endif +#endif + _remove_from_list(i->_parent.load(std::memory_order_relaxed)->_work_items_active, i); + _append_to_list(i->_parent.load(std::memory_order_relaxed)->_work_items_done, i); + if(i->_parent.load(std::memory_order_relaxed)->_work_items_active.count == 0) + { + auto *parent = i->_parent.load(std::memory_order_relaxed); + i = nullptr; + auto *v = parent->_work_items_done.front, *n = v; + for(; v != nullptr; v = n) + { + v->_parent.store(nullptr, std::memory_order_release); + v->_nextwork = -1; + n = v->_next; + } + n = v = parent->_work_items_done.front; + parent->_work_items_done.front = parent->_work_items_done.back = nullptr; + parent->_work_items_done.count = 0; + parent->_stopping.store(false, std::memory_order_release); + parent->_stopped.store(true, std::memory_order_release); + parent->_completing.store(true, std::memory_order_release); // cause submissions to enter _work_items_delayed +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP executes group_complete for group " << parent << std::endl; +#endif + for(; v != nullptr; v = n) + { + n = v->_next; + v->group_complete(parent->_abnormal_completion_cause); + } + parent->_completing.store(false, std::memory_order_release); // cease submitting to _work_items_delayed +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP group_complete done for group " << parent << ". _work_items_delayed.count = " << parent->_work_items_delayed.count << std::endl; +#endif + if(parent->_work_items_delayed.count > 0) + { + /* If there are waits on this group to complete, forward progress those now. + */ + while(parent->_waits.load(std::memory_order_relaxed) > 0) + { +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP group_complete blocks on waits for group " << parent << std::endl; +#endif + g.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + g.lock(); + } + // Now submit all delayed work + while(parent->_work_items_delayed.count > 0) + { + i = parent->_work_items_delayed.front; + _remove_from_list(parent->_work_items_delayed, i); + auto r = submit(g, parent, {&i, 1}); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP group_complete submits delayed work item " << i << " for group " << parent << " which saw error "; + if(r) + { + std::cout << "none" << std::endl; + } + else + { + std::cout << r.error().message() << std::endl; + } +#endif + if(!r) + { + parent->_work_items_delayed = {}; + (void) stop(g, parent, std::move(r)); + break; + } + } + } + } + } + inline void global_dynamic_thread_pool_impl::_work_item_next(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem) noexcept + { + assert(workitem->_nextwork != -1); + if(workitem->_nextwork == 0) + { + deadline d(std::chrono::seconds(0)); + workitem->_nextwork = workitem->next(d); + auto r = _prepare_work_item_delay(workitem, workitem->_parent.load(std::memory_order_relaxed)->_grouph, d); + if(!r) + { + (void) stop(g, workitem->_parent.load(std::memory_order_relaxed), std::move(r)); + _work_item_done(g, workitem); + return; + } + } + if(-1 == workitem->_nextwork) + { + _work_item_done(g, workitem); + return; + } + _submit_work_item(g, workitem); + } + + inline result<void> global_dynamic_thread_pool_impl::stop(_lock_guard &g, dynamic_thread_pool_group_impl *group, result<void> err) noexcept + { + (void) g; + if(group->_abnormal_completion_cause) + { + group->_abnormal_completion_cause = std::move(err); + } + group->_stopping.store(true, std::memory_order_release); + return success(); + } + + + inline result<void> global_dynamic_thread_pool_impl::wait(_lock_guard &g, bool reap, dynamic_thread_pool_group_impl *group, deadline d) noexcept + { + LLFIO_DEADLINE_TO_SLEEP_INIT(d); + if(!d || d.nsecs > 0) + { + /* To ensure forward progress, we need to gate new waits during delayed work submission. + Otherwise waits may never exit if the window where _work_items_active.count == 0 is + missed. + */ + while(group->_work_items_delayed.count > 0) + { + g.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + g.lock(); + } + group->_waits.fetch_add(1, std::memory_order_release); + auto unwaitcount = make_scope_exit([&]() noexcept { group->_waits.fetch_sub(1, std::memory_order_release); }); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + while(group->_work_items_active.count > 0) + { + LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d); + dispatch_time_t timeout = DISPATCH_TIME_FOREVER; + if(d) + { + std::chrono::nanoseconds duration; + LLFIO_DEADLINE_TO_PARTIAL_TIMEOUT(duration, d); + timeout = dispatch_time(DISPATCH_TIME_NOW, duration.count()); + } + g.unlock(); + dispatch_group_wait(group->_grouph, timeout); + g.lock(); + // if(1 == group->_work_items_active.count) + //{ + // std::cout << "*** wait item remaining is " << group->_work_items_active.front << std::endl; + // std::this_thread::sleep_for(std::chrono::seconds(1)); + //} + } +#elif defined(_WIN32) + auto &tls = detail::global_dynamic_thread_pool_thread_local_state(); + if(tls.current_callback_instance != nullptr) + { + // I am being called from within a thread worker. Tell + // the thread pool that I am not going to exit promptly. + CallbackMayRunLong(tls.current_callback_instance); + } + // Is this a cancellation? + if(group->_stopping.load(std::memory_order_relaxed)) + { + while(group->_work_items_active.count > 0) + { + auto *i = group->_work_items_active.front; + if(nullptr != i->_internalworkh) + { + if(0 == i->_internalworkh_inuse) + { + i->_internalworkh_inuse = 1; + } + g.unlock(); + WaitForThreadpoolWorkCallbacks((PTP_WORK) i->_internalworkh, true); + g.lock(); + if(i->_internalworkh_inuse == 2) + { + if(nullptr != i->_internalworkh) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } + if(nullptr != i->_internaltimerh) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } + } + i->_internalworkh_inuse = 0; + } + if(nullptr != i->_internaltimerh) + { + if(0 == i->_internalworkh_inuse) + { + i->_internalworkh_inuse = 1; + } + g.unlock(); + WaitForThreadpoolTimerCallbacks((PTP_TIMER) i->_internaltimerh, true); + g.lock(); + if(i->_internalworkh_inuse == 2) + { + if(nullptr != i->_internalworkh) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } + if(nullptr != i->_internaltimerh) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } + } + i->_internalworkh_inuse = 0; + } + if(group->_work_items_active.count > 0 && group->_work_items_active.front == i) + { + // This item got cancelled before it started + _work_item_done(g, group->_work_items_active.front); + } + } + assert(!group->_stopping.load(std::memory_order_relaxed)); + } + else if(!d) + { + while(group->_work_items_active.count > 0) + { + auto *i = group->_work_items_active.front; + if(nullptr != i->_internalworkh) + { + if(0 == i->_internalworkh_inuse) + { + i->_internalworkh_inuse = 1; + } + g.unlock(); + WaitForThreadpoolWorkCallbacks((PTP_WORK) i->_internalworkh, false); + g.lock(); + if(i->_internalworkh_inuse == 2) + { + if(nullptr != i->_internalworkh) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } + if(nullptr != i->_internaltimerh) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } + } + i->_internalworkh_inuse = 0; + } + if(nullptr != i->_internaltimerh) + { + if(0 == i->_internalworkh_inuse) + { + i->_internalworkh_inuse = 1; + } + g.unlock(); + WaitForThreadpoolTimerCallbacks((PTP_TIMER) i->_internaltimerh, false); + g.lock(); + if(i->_internalworkh_inuse == 2) + { + if(nullptr != i->_internalworkh) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } + if(nullptr != i->_internaltimerh) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } + } + i->_internalworkh_inuse = 0; + } + } + } + else + { + while(group->_work_items_active.count > 0) + { + LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d); + g.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + g.lock(); + } + } +#else +#if 0 + if(group->_stopping.load(std::memory_order_relaxed)) + { + // Kill all work items not currently being executed immediately + for(bool done = false; !done;) + { + done = true; + for(auto *p = group->_work_items_active.front; p != nullptr; p = p->_next) + { + if(p->_internalworkh == nullptr) + { + _remove_from_list(group->_work_items_active, p); + _append_to_list(group->_work_items_done, p); + done = false; + break; + } + } + } + } +#endif + while(group->_work_items_active.count > 0) + { + LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d); + g.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + g.lock(); + } +#endif + } + if(group->_work_items_active.count > 0) + { + return errc::timed_out; + } + if(reap) + { + return std::move(group->_abnormal_completion_cause); + } + return success(); + } + + inline void global_dynamic_thread_pool_impl::_timerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type /*unused*/) + { + LLFIO_LOG_FUNCTION_CALL(this); + assert(workitem->_nextwork != -1); + _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock); // lock group + // std::cout << "*** _timerthread " << workitem << std::endl; + if(workitem->_parent.load(std::memory_order_relaxed)->_stopping.load(std::memory_order_relaxed)) + { + _work_item_done(g, workitem); + return; + } + if(workitem->_timepoint1 != std::chrono::steady_clock::time_point()) + { +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + auto now = std::chrono::steady_clock::now(); + if(workitem->_timepoint1 - now > std::chrono::seconds(0)) + { + // Timer fired short, so schedule it again + _submit_work_item(g, workitem); + return; + } +#endif + workitem->_timepoint1 = {}; + } + if(workitem->_timepoint2 != std::chrono::system_clock::time_point()) + { +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + auto now = std::chrono::system_clock::now(); + if(workitem->_timepoint2 - now > std::chrono::seconds(0)) + { + // Timer fired short, so schedule it again + _submit_work_item(g, workitem); + return; + } +#endif + workitem->_timepoint2 = {}; + } + _work_item_next(g, workitem); + } + + // Worker thread entry point + inline void global_dynamic_thread_pool_impl::_workerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh) + { + LLFIO_LOG_FUNCTION_CALL(this); + //{ + // _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock); + // std::cout << "*** _workerthread " << workitem << " begins with work " << workitem->_nextwork << std::endl; + //} + assert(workitem->_nextwork != -1); + assert(workitem->_nextwork != 0); + if(workitem->_parent.load(std::memory_order_relaxed)->_stopping.load(std::memory_order_relaxed)) + { + _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock); // lock group + _work_item_done(g, workitem); + return; + } + auto &tls = detail::global_dynamic_thread_pool_thread_local_state(); + auto old_thread_local_state = tls; + tls.workitem = workitem; + tls.current_callback_instance = selfthreadh; + tls.nesting_level = workitem->_parent.load(std::memory_order_relaxed)->_nesting_level + 1; + auto r = (*workitem)(workitem->_nextwork); + workitem->_nextwork = 0; // call next() next time + tls = old_thread_local_state; + _lock_guard g(workitem->_parent.load(std::memory_order_relaxed)->_lock); // lock group + // std::cout << "*** _workerthread " << workitem << " ends with work " << workitem->_nextwork << std::endl; + if(!r) + { + (void) stop(g, workitem->_parent.load(std::memory_order_relaxed), std::move(r)); + _work_item_done(g, workitem); + workitem = nullptr; + } + else if(workitem->_parent.load(std::memory_order_relaxed)->_stopping.load(std::memory_order_relaxed)) + { + _work_item_done(g, workitem); + } + else + { + _work_item_next(g, workitem); + } + } +} // namespace detail + + +/****************************************** io_aware_work_item *********************************************/ + +LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::io_aware_work_item::io_aware_work_item(span<io_handle_awareness> hs) + : _handles([](span<io_handle_awareness> hs) -> span<io_handle_awareness> { + float all = 0; + for(auto &i : hs) + { + all += i.reads + i.writes + i.barriers; + } + for(auto &i : hs) + { + if(all == 0.0f) + { + i.reads = i.writes = 0.5f; + i.barriers = 0.0f; + } + else + { + i.reads /= all; + i.writes /= all; + i.barriers /= all; + } + } + auto &impl = detail::global_dynamic_thread_pool(); + detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.io_aware_work_item_handles_lock); + for(auto &h : hs) + { + if(!h.h->is_seekable()) + { + throw std::runtime_error("Supplied handle is not seekable"); + } + auto *fh = static_cast<file_handle *>(h.h); + auto unique_id = fh->unique_id(); + auto it = impl.io_aware_work_item_handles.find(unique_id); + if(it == impl.io_aware_work_item_handles.end()) + { + it = impl.io_aware_work_item_handles.emplace(unique_id, detail::global_dynamic_thread_pool_impl::io_aware_work_item_statfs{}).first; + auto r = it->second.statfs.fill(*fh, statfs_t::want::iosinprogress | statfs_t::want::iosbusytime); + if(!r || it->second.statfs.f_iosinprogress == (uint32_t) -1) + { + impl.io_aware_work_item_handles.erase(it); + if(!r) + { + r.value(); + } + throw std::runtime_error("statfs::f_iosinprogress unavailable for supplied handle"); + } + it->second.last_updated = std::chrono::steady_clock::now(); + } + it->second.refcount++; + h._internal = &*it; + } + return hs; + }(hs)) +{ + LLFIO_LOG_FUNCTION_CALL(this); +} + +LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::io_aware_work_item::~io_aware_work_item() +{ + LLFIO_LOG_FUNCTION_CALL(this); + auto &impl = detail::global_dynamic_thread_pool(); + detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.io_aware_work_item_handles_lock); + using value_type = decltype(impl.io_aware_work_item_handles)::value_type; + for(auto &h : _handles) + { + auto *i = (value_type *) h._internal; + if(0 == --i->second.refcount) + { + impl.io_aware_work_item_handles.erase(i->first); + } + } +} + +LLFIO_HEADERS_ONLY_MEMFUNC_SPEC intptr_t dynamic_thread_pool_group::io_aware_work_item::next(deadline &d) noexcept +{ + LLFIO_LOG_FUNCTION_CALL(this); + { + auto &impl = detail::global_dynamic_thread_pool(); + auto now = std::chrono::steady_clock::now(); + detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.io_aware_work_item_handles_lock); + using value_type = decltype(impl.io_aware_work_item_handles)::value_type; + for(auto &h : _handles) + { + auto *i = (value_type *) h._internal; + if(std::chrono::duration_cast<std::chrono::milliseconds>(now - i->second.last_updated) >= std::chrono::milliseconds(100)) + { + // auto old_iosinprogress = i->second.statfs.f_iosinprogress; + auto elapsed = now - i->second.last_updated; + (void) i->second.statfs.fill(*h.h, statfs_t::want::iosinprogress | statfs_t::want::iosbusytime); + i->second.last_updated = now; + + if(elapsed > std::chrono::seconds(5)) + { + i->second.average_busy = i->second.statfs.f_iosbusytime; + i->second.average_queuedepth = (float) i->second.statfs.f_iosinprogress; + } + else + { + i->second.average_busy = (i->second.average_busy * 0.9f) + (i->second.statfs.f_iosbusytime * 0.1f); + i->second.average_queuedepth = (i->second.average_queuedepth * 0.9f) + (i->second.statfs.f_iosinprogress * 0.1f); + } + if(i->second.average_busy < this->max_iosbusytime && i->second.average_queuedepth < this->min_iosinprogress) + { + i->second.default_deadline = std::chrono::seconds(0); // remove pacing + } + else if(i->second.average_queuedepth > this->max_iosinprogress) + { + if(0 == i->second.default_deadline.nsecs) + { + i->second.default_deadline = std::chrono::milliseconds(1); // start with 1ms, it'll reduce from there if needed + } + else if((i->second.default_deadline.nsecs >> 4) > 0) + { + i->second.default_deadline.nsecs += i->second.default_deadline.nsecs >> 4; + } + else + { + i->second.default_deadline.nsecs++; + } + } + else if(i->second.average_queuedepth < this->min_iosinprogress) + { + if(i->second.default_deadline.nsecs > (i->second.default_deadline.nsecs >> 4) && (i->second.default_deadline.nsecs >> 4) > 0) + { + i->second.default_deadline.nsecs -= i->second.default_deadline.nsecs >> 4; + } + else if(i->second.default_deadline.nsecs > 1) + { + i->second.default_deadline.nsecs--; + } + } + } + if(d.nsecs < i->second.default_deadline.nsecs) + { + d = i->second.default_deadline; + } + } + } + return io_aware_next(d); +} + + +LLFIO_V2_NAMESPACE_END diff --git a/include/llfio/v2.0/detail/impl/posix/statfs.ipp b/include/llfio/v2.0/detail/impl/posix/statfs.ipp index b3e819b0..092ca9f2 100644 --- a/include/llfio/v2.0/detail/impl/posix/statfs.ipp +++ b/include/llfio/v2.0/detail/impl/posix/statfs.ipp @@ -1,5 +1,5 @@ /* Information about the volume storing a file -(C) 2016-2017 Niall Douglas <http://www.nedproductions.biz/> (5 commits) +(C) 2016-2020 Niall Douglas <http://www.nedproductions.biz/> (5 commits) File Created: Jan 2016 @@ -25,10 +25,15 @@ Distributed under the Boost Software License, Version 1.0. #include "../../../handle.hpp" #include "../../../statfs.hpp" +#include <chrono> +#include <mutex> +#include <vector> + #include <sys/mount.h> #ifdef __linux__ #include <mntent.h> #include <sys/statfs.h> +#include <sys/sysmacros.h> #endif LLFIO_V2_NAMESPACE_BEGIN @@ -37,186 +42,189 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<size_t> statfs_t::fill(const handle &h, s { size_t ret = 0; #ifdef __linux__ - struct statfs64 s - { - }; - memset(&s, 0, sizeof(s)); - if(-1 == fstatfs64(h.native_handle().fd, &s)) - { - return posix_error(); - } - if(!!(wanted & want::bsize)) + if(!!(wanted & ~(want::iosinprogress | want::iosbusytime))) { - f_bsize = s.f_bsize; - ++ret; - } - if(!!(wanted & want::iosize)) - { - f_iosize = s.f_frsize; - ++ret; - } - if(!!(wanted & want::blocks)) - { - f_blocks = s.f_blocks; - ++ret; - } - if(!!(wanted & want::bfree)) - { - f_bfree = s.f_bfree; - ++ret; - } - if(!!(wanted & want::bavail)) - { - f_bavail = s.f_bavail; - ++ret; - } - if(!!(wanted & want::files)) - { - f_files = s.f_files; - ++ret; - } - if(!!(wanted & want::ffree)) - { - f_ffree = s.f_ffree; - ++ret; - } - if(!!(wanted & want::namemax)) - { - f_namemax = s.f_namelen; - ++ret; - } - // if(!!(wanted&&want::owner)) { f_owner =s.f_owner; ++ret; } - if(!!(wanted & want::fsid)) - { - f_fsid[0] = static_cast<unsigned>(s.f_fsid.__val[0]); - f_fsid[1] = static_cast<unsigned>(s.f_fsid.__val[1]); - ++ret; - } - if(!!(wanted & want::flags) || !!(wanted & want::fstypename) || !!(wanted & want::mntfromname) || !!(wanted & want::mntonname)) - { - try + struct statfs64 s { - struct mountentry - { - std::string mnt_fsname, mnt_dir, mnt_type, mnt_opts; - mountentry(const char *a, const char *b, const char *c, const char *d) - : mnt_fsname(a) - , mnt_dir(b) - , mnt_type(c) - , mnt_opts(d) - { - } - }; - std::vector<std::pair<mountentry, struct statfs64>> mountentries; + }; + memset(&s, 0, sizeof(s)); + if(-1 == fstatfs64(h.native_handle().fd, &s)) + { + return posix_error(); + } + if(!!(wanted & want::bsize)) + { + f_bsize = s.f_bsize; + ++ret; + } + if(!!(wanted & want::iosize)) + { + f_iosize = s.f_frsize; + ++ret; + } + if(!!(wanted & want::blocks)) + { + f_blocks = s.f_blocks; + ++ret; + } + if(!!(wanted & want::bfree)) + { + f_bfree = s.f_bfree; + ++ret; + } + if(!!(wanted & want::bavail)) + { + f_bavail = s.f_bavail; + ++ret; + } + if(!!(wanted & want::files)) + { + f_files = s.f_files; + ++ret; + } + if(!!(wanted & want::ffree)) + { + f_ffree = s.f_ffree; + ++ret; + } + if(!!(wanted & want::namemax)) + { + f_namemax = s.f_namelen; + ++ret; + } + // if(!!(wanted&&want::owner)) { f_owner =s.f_owner; ++ret; } + if(!!(wanted & want::fsid)) + { + f_fsid[0] = static_cast<unsigned>(s.f_fsid.__val[0]); + f_fsid[1] = static_cast<unsigned>(s.f_fsid.__val[1]); + ++ret; + } + if(!!(wanted & want::flags) || !!(wanted & want::fstypename) || !!(wanted & want::mntfromname) || !!(wanted & want::mntonname)) + { + try { - // Need to parse mount options on Linux - FILE *mtab = setmntent("/etc/mtab", "r"); - if(mtab == nullptr) - { - mtab = setmntent("/proc/mounts", "r"); - } - if(mtab == nullptr) - { - return posix_error(); - } - auto unmtab = make_scope_exit([mtab]() noexcept { endmntent(mtab); }); - struct mntent m + struct mountentry { + std::string mnt_fsname, mnt_dir, mnt_type, mnt_opts; + mountentry(const char *a, const char *b, const char *c, const char *d) + : mnt_fsname(a) + , mnt_dir(b) + , mnt_type(c) + , mnt_opts(d) + { + } }; - char buffer[32768]; - while(getmntent_r(mtab, &m, buffer, sizeof(buffer)) != nullptr) + std::vector<std::pair<mountentry, struct statfs64>> mountentries; { - struct statfs64 temp + // Need to parse mount options on Linux + FILE *mtab = setmntent("/etc/mtab", "r"); + if(mtab == nullptr) + { + mtab = setmntent("/proc/mounts", "r"); + } + if(mtab == nullptr) + { + return posix_error(); + } + auto unmtab = make_scope_exit([mtab]() noexcept { endmntent(mtab); }); + struct mntent m { }; - memset(&temp, 0, sizeof(temp)); - // std::cout << m.mnt_fsname << "," << m.mnt_dir << "," << m.mnt_type << "," << m.mnt_opts << std::endl; - if(0 == statfs64(m.mnt_dir, &temp)) + char buffer[32768]; + while(getmntent_r(mtab, &m, buffer, sizeof(buffer)) != nullptr) { - // std::cout << " " << temp.f_fsid.__val[0] << temp.f_fsid.__val[1] << " =? " << s.f_fsid.__val[0] << s.f_fsid.__val[1] << std::endl; - if(temp.f_type == s.f_type && (memcmp(&temp.f_fsid, &s.f_fsid, sizeof(s.f_fsid)) == 0)) + struct statfs64 temp + { + }; + memset(&temp, 0, sizeof(temp)); + // std::cout << m.mnt_fsname << "," << m.mnt_dir << "," << m.mnt_type << "," << m.mnt_opts << std::endl; + if(0 == statfs64(m.mnt_dir, &temp)) { - mountentries.emplace_back(mountentry(m.mnt_fsname, m.mnt_dir, m.mnt_type, m.mnt_opts), temp); + // std::cout << " " << temp.f_fsid.__val[0] << temp.f_fsid.__val[1] << " =? " << s.f_fsid.__val[0] << s.f_fsid.__val[1] << std::endl; + if(temp.f_type == s.f_type && (memcmp(&temp.f_fsid, &s.f_fsid, sizeof(s.f_fsid)) == 0)) + { + mountentries.emplace_back(mountentry(m.mnt_fsname, m.mnt_dir, m.mnt_type, m.mnt_opts), temp); + } } } } - } #ifndef LLFIO_COMPILING_FOR_GCOV - if(mountentries.empty()) - { - return errc::no_such_file_or_directory; - } - /* Choose the mount entry with the most closely matching statfs. You can't choose - exclusively based on mount point because of bind mounts. Note that we have a - particular problem in Docker: + if(mountentries.empty()) + { + return errc::no_such_file_or_directory; + } + /* Choose the mount entry with the most closely matching statfs. You can't choose + exclusively based on mount point because of bind mounts. Note that we have a + particular problem in Docker: - rootfs / rootfs rw 0 0 - overlay / overlay rw,relatime,lowerdir=... 0 0 - /dev/sda3 /etc xfs rw,relatime,... 0 0 - tmpfs /dev tmpfs rw,nosuid,... 0 0 - tmpfs /proc/acpi tmpfs rw,nosuid,... 0 0 - ... + rootfs / rootfs rw 0 0 + overlay / overlay rw,relatime,lowerdir=... 0 0 + /dev/sda3 /etc xfs rw,relatime,... 0 0 + tmpfs /dev tmpfs rw,nosuid,... 0 0 + tmpfs /proc/acpi tmpfs rw,nosuid,... 0 0 + ... - If f_type and f_fsid are identical for the statfs for the mount as for our file, - then you will get multiple mountentries, and there is no obvious way of - disambiguating them. What we do is match mount based on the longest match - of the mount point with the current path of our file. - */ - if(mountentries.size() > 1) - { - OUTCOME_TRY(auto currentfilepath_, h.current_path()); - string_view currentfilepath(currentfilepath_.native()); - std::vector<std::pair<size_t, size_t>> scores(mountentries.size()); - //std::cout << "*** For matching mount entries to file with path " << currentfilepath << ":\n"; - for(size_t n = 0; n < mountentries.size(); n++) + If f_type and f_fsid are identical for the statfs for the mount as for our file, + then you will get multiple mountentries, and there is no obvious way of + disambiguating them. What we do is match mount based on the longest match + of the mount point with the current path of our file. + */ + if(mountentries.size() > 1) { - scores[n].first = - (currentfilepath.substr(0, mountentries[n].first.mnt_dir.size()) == mountentries[n].first.mnt_dir) ? mountentries[n].first.mnt_dir.size() : 0; - //std::cout << "*** Mount entry " << mountentries[n].first.mnt_dir << " get score " << scores[n].first << std::endl; - scores[n].second = n; + OUTCOME_TRY(auto currentfilepath_, h.current_path()); + string_view currentfilepath(currentfilepath_.native()); + std::vector<std::pair<size_t, size_t>> scores(mountentries.size()); + // std::cout << "*** For matching mount entries to file with path " << currentfilepath << ":\n"; + for(size_t n = 0; n < mountentries.size(); n++) + { + scores[n].first = + (currentfilepath.substr(0, mountentries[n].first.mnt_dir.size()) == mountentries[n].first.mnt_dir) ? mountentries[n].first.mnt_dir.size() : 0; + // std::cout << "*** Mount entry " << mountentries[n].first.mnt_dir << " get score " << scores[n].first << std::endl; + scores[n].second = n; + } + std::sort(scores.begin(), scores.end()); + // Choose the item whose mnt_dir most matched the current path for our file. + auto temp(std::move(mountentries[scores.back().second])); + mountentries.clear(); + mountentries.push_back(std::move(temp)); } - std::sort(scores.begin(), scores.end()); - // Choose the item whose mnt_dir most matched the current path for our file. - auto temp(std::move(mountentries[scores.back().second])); - mountentries.clear(); - mountentries.push_back(std::move(temp)); - } #endif - if(!!(wanted & want::flags)) - { - f_flags.rdonly = static_cast<uint32_t>((s.f_flags & MS_RDONLY) != 0); - f_flags.noexec = static_cast<uint32_t>((s.f_flags & MS_NOEXEC) != 0); - f_flags.nosuid = static_cast<uint32_t>((s.f_flags & MS_NOSUID) != 0); - f_flags.acls = static_cast<uint32_t>(std::string::npos != mountentries.front().first.mnt_opts.find("acl") && - std::string::npos == mountentries.front().first.mnt_opts.find("noacl")); - f_flags.xattr = static_cast<uint32_t>(std::string::npos != mountentries.front().first.mnt_opts.find("xattr") && - std::string::npos == mountentries.front().first.mnt_opts.find("nouser_xattr")); - // out.f_flags.compression=0; - // Those filing systems supporting FALLOC_FL_PUNCH_HOLE - f_flags.extents = static_cast<uint32_t>(mountentries.front().first.mnt_type == "btrfs" || mountentries.front().first.mnt_type == "ext4" || - mountentries.front().first.mnt_type == "xfs" || mountentries.front().first.mnt_type == "tmpfs"); - ++ret; - } - if(!!(wanted & want::fstypename)) - { - f_fstypename = mountentries.front().first.mnt_type; - ++ret; - } - if(!!(wanted & want::mntfromname)) - { - f_mntfromname = mountentries.front().first.mnt_fsname; - ++ret; + if(!!(wanted & want::flags)) + { + f_flags.rdonly = static_cast<uint32_t>((s.f_flags & MS_RDONLY) != 0); + f_flags.noexec = static_cast<uint32_t>((s.f_flags & MS_NOEXEC) != 0); + f_flags.nosuid = static_cast<uint32_t>((s.f_flags & MS_NOSUID) != 0); + f_flags.acls = static_cast<uint32_t>(std::string::npos != mountentries.front().first.mnt_opts.find("acl") && + std::string::npos == mountentries.front().first.mnt_opts.find("noacl")); + f_flags.xattr = static_cast<uint32_t>(std::string::npos != mountentries.front().first.mnt_opts.find("xattr") && + std::string::npos == mountentries.front().first.mnt_opts.find("nouser_xattr")); + // out.f_flags.compression=0; + // Those filing systems supporting FALLOC_FL_PUNCH_HOLE + f_flags.extents = static_cast<uint32_t>(mountentries.front().first.mnt_type == "btrfs" || mountentries.front().first.mnt_type == "ext4" || + mountentries.front().first.mnt_type == "xfs" || mountentries.front().first.mnt_type == "tmpfs"); + ++ret; + } + if(!!(wanted & want::fstypename)) + { + f_fstypename = mountentries.front().first.mnt_type; + ++ret; + } + if(!!(wanted & want::mntfromname)) + { + f_mntfromname = mountentries.front().first.mnt_fsname; + ++ret; + } + if(!!(wanted & want::mntonname)) + { + f_mntonname = mountentries.front().first.mnt_dir; + ++ret; + } } - if(!!(wanted & want::mntonname)) + catch(...) { - f_mntonname = mountentries.front().first.mnt_dir; - ++ret; + return error_from_exception(); } } - catch(...) - { - return error_from_exception(); - } } #else struct statfs s; @@ -311,7 +319,176 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<size_t> statfs_t::fill(const handle &h, s ++ret; } #endif + if(!!(wanted & want::iosinprogress) || !!(wanted & want::iosbusytime)) + { + OUTCOME_TRY(auto ios, _fill_ios(h, f_mntfromname)); + if(!!(wanted & want::iosinprogress)) + { + f_iosinprogress = ios.first; + ++ret; + } + if(!!(wanted & want::iosbusytime)) + { + f_iosbusytime = ios.second; + ++ret; + } + } return ret; } +/******************************************* statfs_t ************************************************/ + +LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<std::pair<uint32_t, float>> statfs_t::_fill_ios(const handle &h, const std::string & /*unused*/) noexcept +{ + try + { +#ifdef __linux__ + struct stat s + { + }; + memset(&s, 0, sizeof(s)); + + if(-1 == ::fstat(h.native_handle().fd, &s)) + { + if(!h.is_symlink() || EBADF != errno) + { + return posix_error(); + } + // This is a hack, but symlink_handle includes this first so there is a chicken and egg dependency problem + OUTCOME_TRY(detail::stat_from_symlink(s, h)); + } + + static struct last_reading_t + { + struct item + { + dev_t st_dev; + size_t millis{0}; + std::chrono::steady_clock::time_point last_updated; + + uint32_t f_iosinprogress{0}; + float f_iosbusytime{0}; + }; + std::mutex lock; + std::vector<item> items; + } last_reading; + auto now = std::chrono::steady_clock::now(); + { + std::lock_guard<std::mutex> g(last_reading.lock); + for(auto &i : last_reading.items) + { + if(i.st_dev == s.st_dev) + { + if(std::chrono::duration_cast<std::chrono::milliseconds>(now - i.last_updated) < std::chrono::milliseconds(100)) + { + return {i.f_iosinprogress, i.f_iosbusytime}; // exit with old readings + } + break; + } + } + } + try + { + int fd = ::open("/proc/diskstats", O_RDONLY); + if(fd >= 0) + { + std::string diskstats; + diskstats.resize(4096); + for(;;) + { + auto read = ::read(fd, (char *) diskstats.data(), diskstats.size()); + if(read < 0) + { + return posix_error(); + } + if(read < (ssize_t) diskstats.size()) + { + ::close(fd); + diskstats.resize(read); + break; + } + try + { + diskstats.resize(diskstats.size() << 1); + } + catch(...) + { + ::close(fd); + throw; + } + } + /* Format is (https://www.kernel.org/doc/Documentation/iostats.txt): + <dev id major> <dev id minor> <device name> 01 02 03 04 05 06 07 08 09 10 ... + + Field 9 is i/o's currently in progress. + Field 10 is milliseconds spent doing i/o (cumulative). + */ + auto match_line = [&](string_view sv) { + int major = 0, minor = 0; + sscanf(sv.data(), "%d %d", &major, &minor); + // printf("Does %d,%d match %d,%d?\n", major, minor, major(s.st_dev), minor(s.st_dev)); + return (makedev(major, minor) == s.st_dev); + }; + for(size_t is = 0, ie = diskstats.find(10); ie != diskstats.npos; is = ie + 1, ie = diskstats.find(10, is)) + { + auto sv = string_view(diskstats).substr(is, ie - is); + if(match_line(sv)) + { + int major = 0, minor = 0; + char devicename[64]; + size_t fields[12]; + sscanf(sv.data(), "%d %d %s %zu %zu %zu %zu %zu %zu %zu %zu %zu %zu", &major, &minor, devicename, fields + 0, fields + 1, fields + 2, fields + 3, + fields + 4, fields + 5, fields + 6, fields + 7, fields + 8, fields + 9); + std::lock_guard<std::mutex> g(last_reading.lock); + auto it = last_reading.items.begin(); + for(; it != last_reading.items.end(); ++it) + { + if(it->st_dev == s.st_dev) + { + break; + } + } + if(it == last_reading.items.end()) + { + last_reading.items.emplace_back(); + it = --last_reading.items.end(); + it->st_dev = s.st_dev; + it->millis = fields[9]; + } + else + { + auto timediff = std::chrono::duration_cast<std::chrono::milliseconds>(now - it->last_updated); + it->f_iosbusytime = std::min((float) ((double) (fields[9] - it->millis) / timediff.count()), 1.0f); + it->millis = fields[9]; + } + it->f_iosinprogress = (uint32_t) fields[8]; + it->last_updated = now; + return {it->f_iosinprogress, it->f_iosbusytime}; + } + } + // It's totally possible that the dev_t reported by stat() + // does not appear in /proc/diskstats, if this occurs then + // return all bits one to indicate soft failure. + } + } + catch(...) + { + return error_from_exception(); + } +#else + /* On FreeBSD, want::iosinprogress and want::iosbusytime could be implemented + using libdevstat. See https://www.freebsd.org/cgi/man.cgi?query=devstat&sektion=3. + Code donations welcome! + + On Mac OS, getting the current i/o wait time appears to be privileged only? + */ +#endif + return {-1, detail::constexpr_float_allbits_set_nan()}; + } + catch(...) + { + return error_from_exception(); + } +} + LLFIO_V2_NAMESPACE_END diff --git a/include/llfio/v2.0/detail/impl/windows/directory_handle.ipp b/include/llfio/v2.0/detail/impl/windows/directory_handle.ipp index 10fcf849..26b3568c 100644 --- a/include/llfio/v2.0/detail/impl/windows/directory_handle.ipp +++ b/include/llfio/v2.0/detail/impl/windows/directory_handle.ipp @@ -25,6 +25,8 @@ http://www.boost.org/LICENSE_1_0.txt) #include "../../../directory_handle.hpp" #include "import.hpp" +#include "../../../file_handle.hpp" + LLFIO_V2_NAMESPACE_BEGIN result<directory_handle> directory_handle::directory(const path_handle &base, path_view_type path, mode _mode, creation _creation, caching _caching, flag flags) noexcept diff --git a/include/llfio/v2.0/detail/impl/windows/file_handle.ipp b/include/llfio/v2.0/detail/impl/windows/file_handle.ipp index 43bbf4df..ae66d46b 100644 --- a/include/llfio/v2.0/detail/impl/windows/file_handle.ipp +++ b/include/llfio/v2.0/detail/impl/windows/file_handle.ipp @@ -25,6 +25,11 @@ Distributed under the Boost Software License, Version 1.0. #include "../../../file_handle.hpp" #include "import.hpp" +#include "../../../statfs.hpp" + +#include <mutex> +#include <vector> + LLFIO_V2_NAMESPACE_BEGIN result<file_handle> file_handle::file(const path_handle &base, file_handle::path_view_type path, file_handle::mode _mode, file_handle::creation _creation, @@ -821,7 +826,7 @@ result<file_handle::extent_pair> file_handle::clone_extents_to(file_handle::exte } done = true; } - //assert(done); + // assert(done); dest_length = destoffset + extent.length; truncate_back_on_failure = false; LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d); @@ -870,4 +875,108 @@ result<file_handle::extent_type> file_handle::zero(file_handle::extent_pair exte return success(); } + +/******************************************* statfs_t ************************************************/ + +LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<std::pair<uint32_t, float>> statfs_t::_fill_ios(const handle & /*unused*/, const std::string &mntfromname) noexcept +{ + try + { + alignas(8) wchar_t buffer[32769]; + // Firstly open a handle to the volume + OUTCOME_TRY(auto volumeh, file_handle::file({}, mntfromname, handle::mode::none, handle::creation::open_existing, handle::caching::only_metadata)); + // Now ask the volume what physical disks it spans + auto *vde = reinterpret_cast<VOLUME_DISK_EXTENTS *>(buffer); + OVERLAPPED ol{}; + memset(&ol, 0, sizeof(ol)); + ol.Internal = static_cast<ULONG_PTR>(-1); + if(DeviceIoControl(volumeh.native_handle().h, IOCTL_VOLUME_GET_VOLUME_DISK_EXTENTS, nullptr, 0, vde, sizeof(buffer), nullptr, &ol) == 0) + { + if(ERROR_IO_PENDING == GetLastError()) + { + NTSTATUS ntstat = ntwait(volumeh.native_handle().h, ol, deadline()); + if(ntstat != 0) + { + return ntkernel_error(ntstat); + } + } + if(ERROR_SUCCESS != GetLastError()) + { + return win32_error(); + } + } + static struct last_reading_t + { + struct item + { + int64_t ReadTime{0}, WriteTime{0}, IdleTime{0}; + }; + std::mutex lock; + std::vector<item> items; + } last_reading; + + uint32_t iosinprogress = 0; + float iosbusytime = 0; + DWORD disk_extents = vde->NumberOfDiskExtents; + for(DWORD disk_extent = 0; disk_extent < disk_extents; disk_extent++) + { + alignas(8) wchar_t physicaldrivename[32] = L"\\\\.\\PhysicalDrive", *e = physicaldrivename + 17; + const auto DiskNumber = vde->Extents[disk_extent].DiskNumber; + if(DiskNumber >= 100) + { + *e++ = '0' + ((DiskNumber / 100) % 10); + } + if(DiskNumber >= 10) + { + *e++ = '0' + ((DiskNumber / 10) % 10); + } + *e++ = '0' + (DiskNumber % 10); + *e = 0; + OUTCOME_TRY(auto diskh, file_handle::file({}, path_view(physicaldrivename, e - physicaldrivename, path_view::zero_terminated), handle::mode::none, + handle::creation::open_existing, handle::caching::only_metadata)); + ol.Internal = static_cast<ULONG_PTR>(-1); + auto *dp = reinterpret_cast<DISK_PERFORMANCE *>(buffer); + if(DeviceIoControl(diskh.native_handle().h, IOCTL_DISK_PERFORMANCE, nullptr, 0, dp, sizeof(buffer), nullptr, &ol) == 0) + { + if(ERROR_IO_PENDING == GetLastError()) + { + NTSTATUS ntstat = ntwait(diskh.native_handle().h, ol, deadline()); + if(ntstat != 0) + { + return ntkernel_error(ntstat); + } + } + if(ERROR_SUCCESS != GetLastError()) + { + return win32_error(); + } + } + //printf("%llu,%llu,%llu\n", dp->ReadTime.QuadPart, dp->WriteTime.QuadPart, dp->IdleTime.QuadPart); + iosinprogress += dp->QueueDepth; + std::lock_guard<std::mutex> g(last_reading.lock); + if(last_reading.items.size() < DiskNumber + 1) + { + last_reading.items.resize(DiskNumber + 1); + } + else + { + uint64_t rd = (uint64_t) dp->ReadTime.QuadPart - (uint64_t) last_reading.items[DiskNumber].ReadTime; + uint64_t wd = (uint64_t) dp->WriteTime.QuadPart - (uint64_t) last_reading.items[DiskNumber].WriteTime; + uint64_t id = (uint64_t) dp->IdleTime.QuadPart - (uint64_t) last_reading.items[DiskNumber].IdleTime; + iosbusytime += 1 - (float) ((double) id / (rd + wd + id)); + } + last_reading.items[DiskNumber].ReadTime = dp->ReadTime.QuadPart; + last_reading.items[DiskNumber].WriteTime = dp->WriteTime.QuadPart; + last_reading.items[DiskNumber].IdleTime = dp->IdleTime.QuadPart; + } + iosinprogress /= disk_extents; + iosbusytime /= disk_extents; + return {iosinprogress, std::min(iosbusytime, 1.0f)}; + } + catch(...) + { + return error_from_exception(); + } +} + LLFIO_V2_NAMESPACE_END diff --git a/include/llfio/v2.0/detail/impl/windows/statfs.ipp b/include/llfio/v2.0/detail/impl/windows/statfs.ipp index 8b23b4e1..06e6040c 100644 --- a/include/llfio/v2.0/detail/impl/windows/statfs.ipp +++ b/include/llfio/v2.0/detail/impl/windows/statfs.ipp @@ -141,6 +141,13 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<size_t> statfs_t::fill(const handle &h, s f_iosize = ffssi->PhysicalBytesPerSectorForPerformance; ++ret; } + if(!!(wanted & want::iosinprogress) || !!(wanted & want::iosbusytime)) + { + if(f_mntfromname.empty()) + { + wanted |= want::mntfromname; + } + } if((wanted & want::mntfromname) || (wanted & want::mntonname)) { // Irrespective we need the path before figuring out the mounted device @@ -240,6 +247,20 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<size_t> statfs_t::fill(const handle &h, s break; } } + if(!!(wanted & want::iosinprogress) || !!(wanted & want::iosbusytime)) + { + OUTCOME_TRY(auto ios, _fill_ios(h, f_mntfromname)); + if(!!(wanted & want::iosinprogress)) + { + f_iosinprogress = ios.first; + ++ret; + } + if(!!(wanted & want::iosbusytime)) + { + f_iosbusytime = ios.second; + ++ret; + } + } return ret; } diff --git a/include/llfio/v2.0/dynamic_thread_pool_group.hpp b/include/llfio/v2.0/dynamic_thread_pool_group.hpp new file mode 100644 index 00000000..748bd94c --- /dev/null +++ b/include/llfio/v2.0/dynamic_thread_pool_group.hpp @@ -0,0 +1,484 @@ +/* Dynamic thread pool group +(C) 2020 Niall Douglas <http://www.nedproductions.biz/> (9 commits) +File Created: Dec 2020 + + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License in the accompanying file +Licence.txt or at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + +Distributed under the Boost Software License, Version 1.0. + (See accompanying file Licence.txt or copy at + http://www.boost.org/LICENSE_1_0.txt) +*/ + +#ifndef LLFIO_DYNAMIC_THREAD_POOL_GROUP_H +#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_H + +#include "deadline.h" + +#include <memory> // for unique_ptr and shared_ptr + +#ifdef _MSC_VER +#pragma warning(push) +#pragma warning(disable : 4251) // dll interface +#pragma warning(disable : 4275) // dll interface +#endif + +LLFIO_V2_NAMESPACE_EXPORT_BEGIN + +class dynamic_thread_pool_group_impl; +class io_handle; + +namespace detail +{ + struct global_dynamic_thread_pool_impl; + LLFIO_HEADERS_ONLY_FUNC_SPEC global_dynamic_thread_pool_impl &global_dynamic_thread_pool() noexcept; +} // namespace detail + +/*! \class dynamic_thread_pool_group +\brief Work group within the global dynamic thread pool. + +Some operating systems provide a per-process global kernel thread pool capable of +dynamically adjusting its kernel thread count to how many of the threads in +the pool are currently blocked. The platform will choose the exact strategy used, +but as an example of a strategy, one might keep creating new kernel threads +so long as the total threads currently running and not blocked on page faults, +i/o or syscalls, is below the hardware concurrency. Similarly, if more threads +are running and not blocked than hardware concurrency, one might remove kernel +threads from executing work. Such a strategy would dynamically increase +concurrency until all CPUs are busy, but reduce concurrency if more work is +being done than CPUs available. + +Such dynamic kernel thread pools are excellent for CPU bound processing, you +simply fire and forget work into them. However, for i/o bound processing, you +must be careful as there are gotchas. For non-seekable i/o, it is very possible +that there could be 100k handles upon which we do i/o. Doing i/o on +100k handles using a dynamic thread pool would in theory cause the creation +of 100k kernel threads, which would not be wise. A much better solution is +to use an `io_multiplexer` to await changes in large sets of i/o handles. + +For seekable i/o, the same problem applies, but worse again: an i/o bound problem +would cause a rapid increase in the number of kernel threads, which by +definition makes i/o even more congested. Basically the system runs off +into pathological performance loss. You must therefore never naively do +i/o bound work (e.g. with memory mapped files) from within a dynamic thread +pool without employing some mechanism to force concurrency downwards if +the backing storage is congested. + +## Work groups + +Instances of this class contain zero or more work items. Each work item +is asked for its next item of work, and if an item of work is available, +that item of work is executed by the global kernel thread pool at a time +of its choosing. It is NEVER possible that any one work item is concurrently +executed at a time, each work item is always sequentially executed with +respect to itself. The only concurrency possible is *across* work items. +Therefore, if you want to execute the same piece of code concurrently, +you need to submit a separate work item for each possible amount of +concurrency (e.g. `std::thread::hardware_concurrency()`). + +You can have as many or as few items of work as you like. You can +dynamically submit additional work items at any time, except when a group +is currently in the process of being stopped. The group of work items can +be waited upon to complete, after which the work group becomes reset as +if back to freshly constructed. You can also stop executing all the work +items in the group, even if they have not fully completed. If any work +item returns a failure, this equals a `stop()`, and the next `wait()` will +return that error. + +Work items may create sub work groups as part of their +operation. If they do so, the work items from such nested work groups are +scheduled preferentially. This ensures good forward progress, so if you +have 100 work items each of which do another 100 work items, you don't get +10,000 slowly progressing work. Rather, the work items in the first set +progress slowly, whereas the work items in the second set progress quickly. + +`work_item::next()` may optionally set a deadline to delay when that work +item ought to be processed again. Deadlines can be relative or absolute. + +## C++ 23 Executors + +As with elsewhere in LLFIO, as a low level facility, we don't implement +https://wg21.link/P0443 Executors, but it is trivially easy to implement +a dynamic equivalent to `std::static_thread_pool` using this class. + +## Implementation notes + +### Microsoft Windows + +On Microsoft Windows, the Win32 thread pool API is used (https://docs.microsoft.com/en-us/windows/win32/procthread/thread-pool-api). +This is an IOCP-aware thread pool which will dynamically increase the number +of kernel threads until none are blocked. If more kernel threads +are running than twice the number of CPUs in the system, the number of kernel +threads is dynamically reduced. The maximum number of kernel threads which +will run simultaneously is 500. Note that the Win32 thread pool is shared +across the process by multiple Windows facilities. + +Note that the Win32 thread pool has built in support for IOCP, so if you +have a custom i/o multiplexer, you can use the global Win32 thread pool +to execute i/o completions handling. See `CreateThreadpoolIo()` for more. + +No dynamic memory allocation is performed by this implementation outside +of the initial `make_dynamic_thread_pool_group()`. The Win32 thread pool +API may perform dynamic memory allocation internally, but that is outside +our control. + +### POSIX + +If an installation of libdispatch is detected by LLFIO cmake during +configuration, it is used preferentially. libdispatch is better known as +Grand Central Dispatch, originally a Mac OS technology but since ported +to a high quality kernel based implementation on recent FreeBSDs, and to +a lower quality userspace based implementation on Linux. Generally +libdispatch should get automatically found on Mac OS without additional +effort; on FreeBSD it may need installing from ports; on Linux you would +need to explicitly install `libdispatch-dev` or the equivalent. You can +disable the automatic discovery in cmake of libdispatch by setting the +cmake variable `LLFIO_DISABLE_LIBDISPATCH` to On. + +### Linux + +If libdispatch is not found, we have a custom Linux only userspace +implementation. A a similar strategy to Microsoft Windows' approach is used. We +dynamically increase the number of kernel threads until none are sleeping +awaiting i/o. If more kernel threads are running than 1.5x the number of +CPUs in the system, the number of kernel threads is dynamically reduced. +For portability, we also gate the maximum number of kernel threads to 500, +except where threads have been detected as being in prolonged wait states. +Note that **all** the kernel threads for the current process are considered, +not just the kernel threads created by this thread pool implementation. +Therefore, if you have alternative thread pool implementations (e.g. OpenMP, +`std::async`), those are also included in the dynamic adjustment. + +As this is wholly implemented by this library, dynamic memory allocation +occurs in the initial `make_dynamic_thread_pool_group()` and per thread +creation, but otherwise the implementation does not perform dynamic memory +allocations. +*/ +class LLFIO_DECL dynamic_thread_pool_group +{ +public: + //! An individual item of work within the work group. + class work_item + { + friend struct detail::global_dynamic_thread_pool_impl; + friend class dynamic_thread_pool_group_impl; + std::atomic<dynamic_thread_pool_group_impl *> _parent{nullptr}; + void *_internalworkh{nullptr}, *_internaltimerh{nullptr}; + work_item *_prev{nullptr}, *_next{nullptr}; + intptr_t _nextwork{-1}; + std::chrono::steady_clock::time_point _timepoint1; + std::chrono::system_clock::time_point _timepoint2; + int _internalworkh_inuse{0}; + + protected: + void *_private{nullptr}; + + constexpr work_item() {} + work_item(const work_item &o) = delete; + work_item(work_item &&o) noexcept + : _parent(o._parent.load(std::memory_order_relaxed)) + , _internalworkh(o._internalworkh) + , _internaltimerh(o._internaltimerh) + , _prev(o._prev) + , _next(o._next) + , _nextwork(o._nextwork) + , _timepoint1(o._timepoint1) + , _timepoint2(o._timepoint2) + , _internalworkh_inuse(o._internalworkh_inuse) + , _private(o._private) + { + assert(o._parent.load(std::memory_order_relaxed) == nullptr); + assert(o._internalworkh == nullptr); + assert(o._internaltimerh == nullptr); + if(o._parent.load(std::memory_order_relaxed) != nullptr || o._internalworkh != nullptr) + { + LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item was relocated in memory during use!"); + abort(); + } + o._prev = o._next = nullptr; + o._nextwork = -1; + o._internalworkh_inuse = 0; + o._private = nullptr; + } + work_item &operator=(const work_item &) = delete; + work_item &operator=(work_item &&) = delete; + + public: + virtual ~work_item() + { + assert(_nextwork == -1); + if(_nextwork != -1) + { + LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item destroyed before all work was done!"); + abort(); + } + assert(_internalworkh == nullptr); + assert(_internaltimerh == nullptr); + assert(_parent == nullptr); + if(_internalworkh != nullptr || _parent != nullptr) + { + LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item destroyed before group_complete() was executed!"); + abort(); + } + } + + //! Returns the parent work group between successful submission and just before `group_complete()`. + dynamic_thread_pool_group *parent() const noexcept { return reinterpret_cast<dynamic_thread_pool_group *>(_parent.load(std::memory_order_relaxed)); } + + /*! Invoked by the i/o thread pool to determine if this work item + has more work to do. + + \return If there is no work _currently_ available to do, but there + might be some later, you should return zero. You will be called again + later after other work has been done. If you return -1, you are + saying that no further work will be done, and the group need never + call you again. If you have more work you want to do, return any + other value. + \param d Optional delay before the next item of work ought to be + executed (return != 0), or `next()` ought to be called again to + determine the next item (return == 0). On entry `d` is set to no + delay, so if you don't modify it, the next item of work occurs + as soon as possible. + + Note that this function is called from multiple kernel threads. + You must NOT do any significant work in this function. + In particular do NOT call any dynamic thread pool group function, + as you will experience deadlock. + + `dynamic_thread_pool_group::current_work_item()` may have any + value during this call. + */ + virtual intptr_t next(deadline &d) noexcept = 0; + + /*! Invoked by the i/o thread pool to perform the next item of work. + + \return Any failure causes all remaining work in this group to + be cancelled as soon as possible. + \param work The value returned by `next()`. + + Note that this function is called from multiple kernel threads, + and may not be the kernel thread from which `next()` + was called. + + `dynamic_thread_pool_group::current_work_item()` will always be + `this` during this call. + */ + virtual result<void> operator()(intptr_t work) noexcept = 0; + + /*! Invoked by the i/o thread pool when all work in this thread + pool group is complete. + + `cancelled` indicates if this is an abnormal completion. If its + error compares equal to `errc::operation_cancelled`, then `stop()` + was called. + + Just before this is called for all work items submitted, the group + becomes reset to fresh, and `parent()` becomes null. You can resubmit + this work item, but do not submit other work items until their + `group_complete()` has been invoked. + + Note that this function is called from multiple kernel threads. + + `dynamic_thread_pool_group::current_work_item()` may have any + value during this call. + */ + virtual void group_complete(const result<void> &cancelled) noexcept { (void) cancelled; } + }; + + /*! \class io_aware_work_item + \brief A work item which paces when it next executes according to i/o congestion. + + Currently there is only a working implementation of this for the Microsoft Windows + and Linux platforms, due to lack of working `statfs_t::f_iosinprogress` on other + platforms. If retrieving that for a seekable handle does not work, the constructor + throws an exception. + + For seekable handles, currently `reads`, `writes` and `barriers` are ignored. We + simply retrieve, periodically, `statfs_t::f_iosinprogress` and `statfs_t::f_iosbusytime` + for the storage devices backing the seekable handle. If the recent averaged i/o wait time exceeds + `max_iosbusytime` and the i/o in progress > `max_iosinprogress`, `next()` will + start setting the default deadline passed to + `io_aware_next()`. Thereafter, every 1/10th of a second, if `statfs_t::f_iosinprogress` + is above `max_iosinprogress`, it will increase the deadline by 1/16th, whereas if it is + below `min_iosinprogress`, it will decrease the deadline by 1/16th. The default deadline + chosen is always the worst of all the + storage devices of all the handles. This will reduce concurrency within the kernel thread pool + in order to reduce congestion on the storage devices. If at any point `statfs_t::f_iosbusytime` + drops below `max_iosbusytime` as averaged across one second, and `statfs_t::f_iosinprogress` drops + below `min_iosinprogress`, the additional + throttling is completely removed. `io_aware_next()` can ignore the default deadline + passed into it, and can set any other deadline. + + For non-seekable handles, the handle must have an i/o multiplexer set upon it, and on + Microsoft Windows, that i/o multiplexer must be utilising the IOCP instance of the + global Win32 thread pool. For each `reads`, `writes` and `barriers` which is non-zero, + a corresponding zero length i/o is constructed and initiated. When the i/o completes, + and all readable handles in the work item's set have data waiting to be read, and all + writable handles in the work item's set have space to allow writes, only then is the + work item invoked with the next piece of work. + + \note Non-seekable handle support is not implemented yet. + */ + class LLFIO_DECL io_aware_work_item : public work_item + { + public: + //! Maximum i/o busyness above which throttling is to begin. + float max_iosbusytime{0.95f}; + //! Minimum i/o in progress to target if `iosbusytime` exceeded. The default of 16 suits SSDs, you want around 4 for spinning rust or NV-RAM. + uint32_t min_iosinprogress{16}; + //! Maximum i/o in progress to target if `iosbusytime` exceeded. The default of 32 suits SSDs, you want around 8 for spinning rust or NV-RAM. +#ifdef _WIN32 + uint32_t max_iosinprogress{1}; // windows appears to do a lot of i/o coalescing +#else + uint32_t max_iosinprogress{32}; +#endif + //! Information about an i/o handle this work item will use + struct io_handle_awareness + { + //! An i/o handle this work item will use + io_handle *h{nullptr}; + //! The relative amount of reading done by this work item from the handle. + float reads{0}; + //! The relative amount of writing done by this work item to the handle. + float writes{0}; + //! The relative amount of write barriering done by this work item to the handle. + float barriers{0}; + + void *_internal{nullptr}; + }; + + private: + const span<io_handle_awareness> _handles; + + LLFIO_HEADERS_ONLY_VIRTUAL_SPEC intptr_t next(deadline &d) noexcept override final; + + public: + constexpr io_aware_work_item() {} + /*! \brief Constructs a work item aware of i/o done to the handles in `hs`. + + Note that the `reads`, `writes` and `barriers` are normalised to proportions + out of `1.0` by this constructor, so if for example you had `reads/writes/barriers = 200/100/0`, + after normalisation those become `0.66/0.33/0.0` such that the total is `1.0`. + If `reads/writes/barriers = 0/0/0` on entry, they are replaced with `0.5/0.5/0.0`. + + Note that normalisation is across *all* i/o handles in the set, so three handles + each with `reads/writes/barriers = 200/100/0` on entry would have `0.22/0.11/0.0` + each after construction. + */ + explicit LLFIO_HEADERS_ONLY_MEMFUNC_SPEC io_aware_work_item(span<io_handle_awareness> hs); + io_aware_work_item(io_aware_work_item &&o) noexcept + : work_item(std::move(o)) + , _handles(o._handles) + { + } + LLFIO_HEADERS_ONLY_MEMFUNC_SPEC ~io_aware_work_item(); + + //! The handles originally registered during construction. + span<io_handle_awareness> handles() const noexcept { return _handles; } + + /*! \brief As for `work_item::next()`, but deadline may be extended to + reduce i/o congestion on the hardware devices to which the handles + refer. + */ + virtual intptr_t io_aware_next(deadline &d) noexcept = 0; + }; + + virtual ~dynamic_thread_pool_group() {} + + /*! \brief Threadsafe. Submit one or more work items for execution. Note that you can submit more later. + + Note that if the group is currently stopping, you cannot submit more + work until the group has stopped. An error code comparing equal to + `errc::operation_canceled` is returned if you try. + */ + virtual result<void> submit(span<work_item *> work) noexcept = 0; + //! \overload + result<void> submit(work_item *wi) noexcept { return submit(span<work_item *>(&wi, 1)); } + //! \overload + LLFIO_TEMPLATE(class T) + LLFIO_TREQUIRES(LLFIO_TPRED(!std::is_pointer<T>::value), LLFIO_TPRED(std::is_base_of<work_item, T>::value)) + result<void> submit(span<T> wi) noexcept + { + auto *wis = (T **) alloca(sizeof(T *) * wi.size()); + for(size_t n = 0; n < wi.size(); n++) + { + wis[n] = &wi[n]; + } + return submit(span<work_item *>((work_item **) wis, wi.size())); + } + + //! Threadsafe. Cancel any remaining work previously submitted, but without blocking (use `wait()` to block). + virtual result<void> stop() noexcept = 0; + + /*! \brief Threadsafe. True if a work item reported an error, or + `stop()` was called, but work items are still running. + */ + virtual bool stopping() const noexcept = 0; + + //! Threadsafe. True if all the work previously submitted is complete. + virtual bool stopped() const noexcept = 0; + + //! Threadsafe. Wait for work previously submitted to complete, returning any failures by any work item. + virtual result<void> wait(deadline d = {}) const noexcept = 0; + //! \overload + template <class Rep, class Period> result<bool> wait_for(const std::chrono::duration<Rep, Period> &duration) const noexcept + { + auto r = wait(duration); + if(!r && r.error() == errc::timed_out) + { + return false; + } + OUTCOME_TRY(std::move(r)); + return true; + } + //! \overload + template <class Clock, class Duration> result<bool> wait_until(const std::chrono::time_point<Clock, Duration> &timeout) const noexcept + { + auto r = wait(timeout); + if(!r && r.error() == errc::timed_out) + { + return false; + } + OUTCOME_TRY(std::move(r)); + return true; + } + + //! Returns the work item nesting level which would be used if a new dynamic thread pool group were created within the current work item. + static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC size_t current_nesting_level() noexcept; + //! Returns the work item the calling thread is running within, if any. + static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC work_item *current_work_item() noexcept; +}; +//! A unique ptr to a work group within the global dynamic thread pool. +using dynamic_thread_pool_group_ptr = std::unique_ptr<dynamic_thread_pool_group>; + +//! Creates a new work group within the global dynamic thread pool. +LLFIO_HEADERS_ONLY_FUNC_SPEC result<dynamic_thread_pool_group_ptr> make_dynamic_thread_pool_group() noexcept; + +// BEGIN make_free_functions.py +// END make_free_functions.py + +LLFIO_V2_NAMESPACE_END + +#ifdef _MSC_VER +#pragma warning(pop) +#endif + +#if LLFIO_HEADERS_ONLY == 1 && !defined(DOXYGEN_SHOULD_SKIP_THIS) +#define LLFIO_INCLUDED_BY_HEADER 1 +#include "detail/impl/dynamic_thread_pool_group.ipp" +#undef LLFIO_INCLUDED_BY_HEADER +#endif + +#endif diff --git a/include/llfio/v2.0/fs_handle.hpp b/include/llfio/v2.0/fs_handle.hpp index 091f2f2e..77efd39d 100644 --- a/include/llfio/v2.0/fs_handle.hpp +++ b/include/llfio/v2.0/fs_handle.hpp @@ -53,6 +53,8 @@ public: using path_view_type = path_view; //! The unique identifier type used by this handle using unique_id_type = QUICKCPPLIB_NAMESPACE::integers128::uint128; + //! A hasher for the unique identifier type used by this handle + using unique_id_type_hasher = QUICKCPPLIB_NAMESPACE::integers128::uint128_hasher; protected: mutable dev_t _devid{0}; diff --git a/include/llfio/v2.0/llfio.hpp b/include/llfio/v2.0/llfio.hpp index 7beb7620..dc7ed70a 100644 --- a/include/llfio/v2.0/llfio.hpp +++ b/include/llfio/v2.0/llfio.hpp @@ -62,14 +62,17 @@ import LLFIO_MODULE_NAME; #include "stat.hpp" #include "utils.hpp" +#include "directory_handle.hpp" +#ifndef LLFIO_EXCLUDE_DYNAMIC_THREAD_POOL_GROUP +#include "dynamic_thread_pool_group.hpp" +#endif +#include "fast_random_file_handle.hpp" #include "file_handle.hpp" #include "process_handle.hpp" -#include "directory_handle.hpp" #include "statfs.hpp" #ifdef LLFIO_INCLUDE_STORAGE_PROFILE #include "storage_profile.hpp" #endif -#include "fast_random_file_handle.hpp" #include "symlink_handle.hpp" #include "algorithm/clone.hpp" diff --git a/include/llfio/v2.0/logging.hpp b/include/llfio/v2.0/logging.hpp index 1dff759a..7ef30363 100644 --- a/include/llfio/v2.0/logging.hpp +++ b/include/llfio/v2.0/logging.hpp @@ -77,7 +77,7 @@ public: { reinterpret_cast<log_level &>(detail::thread_local_log_level()) = n; } - ~log_level_guard() {reinterpret_cast<log_level &>(detail::thread_local_log_level()) = _v; } + ~log_level_guard() { reinterpret_cast<log_level &>(detail::thread_local_log_level()) = _v; } }; // Infrastructure for recording the current path for when failure occurs @@ -262,8 +262,9 @@ namespace detail return span<char>(buffer, length); } // Strips a __PRETTY_FUNCTION__ of all instances of ::LLFIO_V2_NAMESPACE:: and ::LLFIO_V2_NAMESPACE:: - inline void strip_pretty_function(char *out, size_t bytes, const char *in) + inline void strip_pretty_function(char *_out, size_t bytes, const char *in) { + char *out = _out; const span<char> remove1 = llfio_namespace_string(); const span<char> remove2 = outcome_namespace_string(); for(--bytes; bytes && *in; --bytes) @@ -272,6 +273,32 @@ namespace detail in += remove1.size(); if(!strncmp(in, remove2.data(), remove2.size())) in += remove2.size(); + if(!strncmp(in, "basic_result<", 13)) + { + int count = 13; + for(--bytes; bytes && *in && count; --bytes, --count) + { + *out++ = *in++; + } + if(!*in || bytes ==0) + { + break; + } + count = 1; + while(*in && count > 0) + { + if(*in == '<') + { + count++; + } + else if(*in == '>') + { + count--; + } + in++; + } + in--; + } *out++ = *in++; } *out = 0; diff --git a/include/llfio/v2.0/statfs.hpp b/include/llfio/v2.0/statfs.hpp index 71732da5..c8af2fde 100644 --- a/include/llfio/v2.0/statfs.hpp +++ b/include/llfio/v2.0/statfs.hpp @@ -1,5 +1,5 @@ /* Information about the volume storing a file -(C) 2015-2017 Niall Douglas <http://www.nedproductions.biz/> (8 commits) +(C) 2015-2020 Niall Douglas <http://www.nedproductions.biz/> (8 commits) File Created: Dec 2015 @@ -41,13 +41,40 @@ LLFIO_V2_NAMESPACE_EXPORT_BEGIN class handle; +namespace detail +{ + inline constexpr float constexpr_float_allbits_set_nan() + { +#if defined(_MSC_VER) && !defined(__clang__) + // Not all bits 1, but I can't see how to do better whilst inside constexpr + return -NAN; +#else + return -__builtin_nanf("0xffffff"); // all bits 1 +#endif + } +} // namespace detail + /*! \struct statfs_t \brief Metadata about a filing system. Unsupported entries are all bits set. + +Note also that for some fields, a soft failure to read the requested value manifests +as all bits set. For example, `f_iosinprogress` might not be computable if the +filing system for your handle reports a `dev_t` from `fstat()` which does not +match anything in the system's disk hardware i/o stats. As this can be completely +benign (e.g. your handle is a socket), this is treated as a soft failure. + +Note for `f_iosinprogress` and `f_iosbusytime` that support is not implemented yet +outside Microsoft Windows and Linux. Note also that for Linux, filing systems +spanning multiple hardware devices have undefined outcomes, whereas on Windows +you are given the average of the values for all underlying hardware devices. +Code donations improving the support for these items on Mac OS, FreeBSD and Linux +would be welcomed. */ struct LLFIO_DECL statfs_t { static constexpr uint32_t _allbits1_32 = ~0U; static constexpr uint64_t _allbits1_64 = ~0ULL; + static constexpr float _allbits1_float = detail::constexpr_float_allbits_set_nan(); struct f_flags_t { uint32_t rdonly : 1; //!< Filing system is read only (Windows, POSIX) @@ -75,11 +102,31 @@ struct LLFIO_DECL statfs_t std::string f_mntfromname; /*!< mounted filesystem (Windows, POSIX) */ filesystem::path f_mntonname; /*!< directory on which mounted (Windows, POSIX) */ + uint32_t f_iosinprogress{_allbits1_32}; /*!< i/o's currently in progress (i.e. queue depth) (Windows, Linux) */ + float f_iosbusytime{_allbits1_float}; /*!< percentage of time spent doing i/o (1.0 = 100%) (Windows, Linux) */ + //! Used to indicate what metadata should be filled in - QUICKCPPLIB_BITFIELD_BEGIN(want) { flags = 1 << 0, bsize = 1 << 1, iosize = 1 << 2, blocks = 1 << 3, bfree = 1 << 4, bavail = 1 << 5, files = 1 << 6, ffree = 1 << 7, namemax = 1 << 8, owner = 1 << 9, fsid = 1 << 10, fstypename = 1 << 11, mntfromname = 1 << 12, mntonname = 1 << 13, all = static_cast<unsigned>(-1) } - QUICKCPPLIB_BITFIELD_END(want) + QUICKCPPLIB_BITFIELD_BEGIN(want){flags = 1 << 0, + bsize = 1 << 1, + iosize = 1 << 2, + blocks = 1 << 3, + bfree = 1 << 4, + bavail = 1 << 5, + files = 1 << 6, + ffree = 1 << 7, + namemax = 1 << 8, + owner = 1 << 9, + fsid = 1 << 10, + fstypename = 1 << 11, + mntfromname = 1 << 12, + mntonname = 1 << 13, + iosinprogress = 1 << 14, + iosbusytime = 1 << 15, + all = static_cast<unsigned>(-1)} QUICKCPPLIB_BITFIELD_END(want) //! Constructs a default initialised instance (all bits set) - statfs_t() {} // NOLINT Cannot be constexpr due to lack of constexpe string default constructor :( + statfs_t() + { + } // NOLINT Cannot be constexpr due to lack of constexpe string default constructor :( #ifdef __cpp_exceptions //! Constructs a filled instance, throwing as an exception any error which might occur explicit statfs_t(const handle &h, want wanted = want::all) @@ -94,6 +141,10 @@ struct LLFIO_DECL statfs_t #endif //! Fills in the structure with metadata, returning number of items filled in LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<size_t> fill(const handle &h, want wanted = want::all) noexcept; + +private: + // Implemented in file_handle.ipp on Windows, otherwise in statfs.ipp + static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<std::pair<uint32_t, float>> _fill_ios(const handle &h, const std::string &mntfromname) noexcept; }; LLFIO_V2_NAMESPACE_END diff --git a/include/llfio/v2.0/status_code.hpp b/include/llfio/v2.0/status_code.hpp index 80fd905d..03eb0f67 100644 --- a/include/llfio/v2.0/status_code.hpp +++ b/include/llfio/v2.0/status_code.hpp @@ -25,8 +25,6 @@ Distributed under the Boost Software License, Version 1.0. #ifndef LLFIO_STATUS_CODE_HPP #define LLFIO_STATUS_CODE_HPP -#include "logging.hpp" - /* The SG14 status code implementation is quite profoundly different to the error code implementation. In the error code implementation, std::error_code is fixed by the standard library, so we wrap it with extra metadata into @@ -68,6 +66,8 @@ as that (a) enables safe header only LLFIO on Windows (b) produces better codege #error LLFIO needs Outcome v2.2 or higher #endif +#include "logging.hpp" + LLFIO_V2_NAMESPACE_BEGIN #ifndef LLFIO_DISABLE_PATHS_IN_FAILURE_INFO @@ -351,6 +351,8 @@ LLFIO_V2_NAMESPACE_END #error LLFIO needs Outcome v2.2 or higher #endif +#include "logging.hpp" + LLFIO_V2_NAMESPACE_BEGIN namespace detail diff --git a/test/tests/dynamic_thread_pool_group.cpp b/test/tests/dynamic_thread_pool_group.cpp new file mode 100644 index 00000000..6e985fc8 --- /dev/null +++ b/test/tests/dynamic_thread_pool_group.cpp @@ -0,0 +1,452 @@ +/* Integration test kernel for dynamic_thread_pool_group +(C) 2020 Niall Douglas <http://www.nedproductions.biz/> (2 commits) +File Created: Dec 2020 + + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License in the accompanying file +Licence.txt or at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + +Distributed under the Boost Software License, Version 1.0. + (See accompanying file Licence.txt or copy at + http://www.boost.org/LICENSE_1_0.txt) +*/ + +//#define LLFIO_LOGGING_LEVEL 99 + +#include "../test_kernel_decl.hpp" + +#include <cmath> // for sqrt + +static inline void TestDynamicThreadPoolGroupWorks() +{ + namespace llfio = LLFIO_V2_NAMESPACE; + // llfio::log_level_guard llg(llfio::log_level::all); + struct work_item; + struct shared_state_t + { + std::atomic<intptr_t> p{0}; + std::atomic<size_t> concurrency{0}, max_concurrency{0}, group_completes{0}; + std::vector<size_t> executed; + llfio::dynamic_thread_pool_group_ptr tpg{llfio::make_dynamic_thread_pool_group().value()}; + std::atomic<bool> cancelling{false}; + } shared_state; + struct work_item final : public llfio::dynamic_thread_pool_group::work_item + { + using _base = llfio::dynamic_thread_pool_group::work_item; + shared_state_t *shared{nullptr}; + std::atomic<bool> within{false}; + + work_item() = default; + explicit work_item(shared_state_t *_shared) + : shared(_shared) + { + } + work_item(work_item &&o) noexcept + : _base(std::move(o)) + , shared(o.shared) + { + } + + virtual intptr_t next(llfio::deadline &d) noexcept override + { + bool expected = false; + BOOST_CHECK(within.compare_exchange_strong(expected, true)); + (void) d; + BOOST_CHECK(parent() == shared->tpg.get()); + auto ret = shared->p.fetch_sub(1); + if(ret < 0) + { + ret = -1; + } + // std::cout << " next() returns " << ret << std::endl; + expected = true; + BOOST_CHECK(within.compare_exchange_strong(expected, false)); + return ret; + } + virtual llfio::result<void> operator()(intptr_t work) noexcept override + { + bool expected = false; + BOOST_CHECK(within.compare_exchange_strong(expected, true)); + auto concurrency = shared->concurrency.fetch_add(1) + 1; + for(size_t expected_ = shared->max_concurrency; concurrency > expected_;) + { + shared->max_concurrency.compare_exchange_weak(expected_, concurrency); + } + BOOST_CHECK(parent() == shared->tpg.get()); + BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 1); + BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == this); + // std::cout << " () executes " << work << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + auto *executed = (std::atomic<size_t> *) &shared->executed[work]; + executed->fetch_add(1); + shared->concurrency.fetch_sub(1); + expected = true; + BOOST_CHECK(within.compare_exchange_strong(expected, false)); + return llfio::success(); + } + virtual void group_complete(const llfio::result<void> &cancelled) noexcept override + { + bool expected = false; + BOOST_CHECK(within.compare_exchange_strong(expected, true)); + BOOST_CHECK(parent() == nullptr); + BOOST_CHECK(shared->cancelling == cancelled.has_error()); + // std::cout << " group_complete()" << std::endl; + shared->group_completes.fetch_add(1); + expected = true; + BOOST_CHECK(within.compare_exchange_strong(expected, false)); + } + }; + std::vector<work_item> workitems; + auto reset = [&](size_t count) { + workitems.clear(); + shared_state.executed.clear(); + shared_state.executed.resize(count + 1); + for(size_t n = 0; n < count; n++) + { + workitems.emplace_back(&shared_state); + } + shared_state.p = (intptr_t) count; + shared_state.concurrency = 0; + shared_state.max_concurrency = 0; + shared_state.group_completes = 0; + }; + auto submit = [&] { + auto **wis = (llfio::dynamic_thread_pool_group::work_item **) alloca(sizeof(work_item *) * workitems.size()); + for(size_t n = 0; n < workitems.size(); n++) + { + wis[n] = &workitems[n]; + } + BOOST_CHECK(!shared_state.tpg->stopping()); + BOOST_CHECK(shared_state.tpg->stopped()); + BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 0); + BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == nullptr); + for(size_t n = 0; n < workitems.size(); n++) + { + BOOST_CHECK(workitems[n].parent() == nullptr); + } + shared_state.tpg->submit({wis, workitems.size()}).value(); + BOOST_CHECK(!shared_state.tpg->stopping()); + BOOST_CHECK(!shared_state.tpg->stopped()); + for(size_t n = 0; n < workitems.size(); n++) + { + BOOST_CHECK(workitems[n].parent() == shared_state.tpg.get()); + } + BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 0); + BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == nullptr); + }; + auto check = [&] { + auto r = shared_state.tpg->wait(); + if(!r) + { + std::cerr << "ERROR: wait() reports failure " << r.error().message() << std::endl; + r.value(); + } + BOOST_CHECK(!shared_state.tpg->stopping()); + BOOST_CHECK(shared_state.tpg->stopped()); + BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 0); + BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == nullptr); + for(size_t n = 0; n < workitems.size(); n++) + { + BOOST_CHECK(workitems[n].parent() == nullptr); + } + BOOST_CHECK(shared_state.group_completes == workitems.size()); + BOOST_CHECK(shared_state.executed[0] == 0); + if(shared_state.cancelling) + { + size_t executed = 0, notexecuted = 0; + for(size_t n = 1; n <= workitems.size(); n++) + { + if(shared_state.executed[n] == 1) + { + executed++; + } + else + { + notexecuted++; + } + } + std::cout << "During cancellation, executed " << executed << " and did not execute " << notexecuted << std::endl; + } + else + { + for(size_t n = 1; n <= workitems.size(); n++) + { + BOOST_CHECK(shared_state.executed[n] == 1); + if(shared_state.executed[n] != 1) + { + std::cout << "shared_state.executed[" << n << "] = " << shared_state.executed[n] << std::endl; + } + } + } + std::cout << "Maximum concurrency achieved with " << workitems.size() << " work items = " << shared_state.max_concurrency << "\n" << std::endl; + }; + auto print_exception_throw = llfio::make_scope_fail([]() noexcept { + std::cout << "NOTE: Exception throw occurred!" << std::endl; + }); + + // Test a single work item + reset(1); + submit(); + check(); + + // Test 10 work items + reset(10); + submit(); + check(); + + // Test 1000 work items + reset(1000); + submit(); + check(); + + // Test 1000 work items with stop + reset(1000); + submit(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + shared_state.cancelling = true; + shared_state.tpg->stop().value(); + BOOST_CHECK(shared_state.tpg->stopping()); + auto r = shared_state.tpg->wait(); + BOOST_CHECK(!shared_state.tpg->stopping()); + BOOST_REQUIRE(!r); + BOOST_CHECK(r.error() == llfio::errc::operation_canceled); + check(); +} + +static inline void TestDynamicThreadPoolGroupNestingWorks() +{ + if(std::thread::hardware_concurrency() < 4) + { + std::cout << "NOTE: Skipping TestDynamicThreadPoolGroupNestingWorks as hardware concurrency is below 4." << std::endl; + return; + } + namespace llfio = LLFIO_V2_NAMESPACE; + static constexpr size_t MAX_NESTING = 10; + static constexpr int COUNT_PER_WORK_ITEM = 1000; + struct shared_state_t + { + std::mutex lock; + std::unordered_map<uint64_t, size_t> time_bucket; + llfio::dynamic_thread_pool_group_ptr tpg; + double stddev{0}; + void calc_stddev() + { + stddev = 0; + uint64_t mean = 0, count = 0; + for(auto &i : time_bucket) + { + mean += i.first * i.second; + count += i.second; + } + mean /= count; + for(auto &i : time_bucket) + { + double diff = (double) abs((int64_t) i.first - (int64_t) mean); + stddev += diff * diff * i.second; + } + stddev /= count; + stddev = sqrt(stddev); + } + } shared_states[MAX_NESTING]; + struct work_item final : public llfio::dynamic_thread_pool_group::work_item + { + using _base = llfio::dynamic_thread_pool_group::work_item; + const size_t nesting{0}; + llfio::span<shared_state_t> shared_states; + std::atomic<int> count{COUNT_PER_WORK_ITEM}; + std::unique_ptr<work_item> childwi; + + work_item() = default; + explicit work_item(size_t _nesting, llfio::span<shared_state_t> _shared_states) + : nesting(_nesting) + , shared_states(_shared_states) + { + if(nesting + 1 < MAX_NESTING) + { + childwi = std::make_unique<work_item>(nesting + 1, shared_states); + } + } + work_item(work_item &&o) noexcept + : _base(std::move(o)) + , nesting(o.nesting) + , shared_states(o.shared_states) + , childwi(std::move(o.childwi)) + { + } + + virtual intptr_t next(llfio::deadline & /*unused*/) noexcept override + { + auto ret = count.fetch_sub(1); + if(ret <= 0) + { + ret = -1; + } + return ret; + } + virtual llfio::result<void> operator()(intptr_t work) noexcept override + { + auto supposed_nesting_level = llfio::dynamic_thread_pool_group::current_nesting_level(); + BOOST_CHECK(nesting + 1 == supposed_nesting_level); + if(nesting + 1 != supposed_nesting_level) + { + std::cerr << "current_nesting_level() reports " << supposed_nesting_level << " not " << (nesting + 1) << std::endl; + } + uint64_t idx = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); + std::lock_guard<std::mutex> g(shared_states[nesting].lock); + //std::cout << "wi " << this << " nesting " << nesting << " work " << work << std::endl; + if(COUNT_PER_WORK_ITEM == work && childwi) + { + if(!shared_states[nesting].tpg) + { + shared_states[nesting].tpg = llfio::make_dynamic_thread_pool_group().value(); + } + OUTCOME_TRY(shared_states[nesting].tpg->submit(childwi.get())); + } + shared_states[nesting].time_bucket[idx]++; + return llfio::success(); + } + // virtual void group_complete(const llfio::result<void> &/*unused*/) noexcept override { } + }; + std::vector<work_item> workitems; + for(size_t n = 0; n < 100; n++) + { + workitems.emplace_back(0, shared_states); + } + auto tpg = llfio::make_dynamic_thread_pool_group().value(); + tpg->submit(llfio::span<work_item>(workitems)).value(); + tpg->wait().value(); + for(size_t n = 0; n < MAX_NESTING - 1; n++) + { + std::unique_lock<std::mutex> g(shared_states[n].lock); + while(!shared_states[n].tpg) + { + g.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + g.lock(); + } + g.unlock(); + shared_states[n].tpg->wait().value(); + } + for(size_t n = 0; n < MAX_NESTING; n++) + { + shared_states[n].calc_stddev(); + std::cout << " Standard deviation for nesting level " << (n + 1) << " was " << shared_states[n].stddev << std::endl; + } + BOOST_CHECK(shared_states[MAX_NESTING - 1].stddev < shared_states[MAX_NESTING / 2].stddev / 2); +} + +static inline void TestDynamicThreadPoolGroupIoAwareWorks() +{ + // statfs_t::iosinprogress not implemented for these yet +#if defined(__APPLE__) || defined(__FreeBSD__) + return; +#endif + namespace llfio = LLFIO_V2_NAMESPACE; + static constexpr size_t WORK_ITEMS = 1000; + static constexpr size_t IO_SIZE = 1 * 65536; + struct shared_state_t + { + llfio::file_handle h; + llfio::dynamic_thread_pool_group::io_aware_work_item::io_handle_awareness awareness; + std::atomic<size_t> concurrency{0}, max_concurrency{0}; + std::atomic<uint64_t> current_pacing{0}; + } shared_state; + struct work_item final : public llfio::dynamic_thread_pool_group::io_aware_work_item + { + using _base = llfio::dynamic_thread_pool_group::io_aware_work_item; + shared_state_t *shared_state; + + work_item() = default; + explicit work_item(shared_state_t *_shared_state) + : _base({&_shared_state->awareness, 1}) + , shared_state(_shared_state) + { + } + work_item(work_item &&o) noexcept + : _base(std::move(o)) + , shared_state(o.shared_state) + { + } + + virtual intptr_t io_aware_next(llfio::deadline &d) noexcept override + { + shared_state->current_pacing.store(d.nsecs, std::memory_order_relaxed); + return 1; + } + virtual llfio::result<void> operator()(intptr_t /*unused*/) noexcept override + { + auto concurrency = shared_state->concurrency.fetch_add(1, std::memory_order_relaxed) + 1; + for(size_t expected_ = shared_state->max_concurrency; concurrency > expected_;) + { + shared_state->max_concurrency.compare_exchange_weak(expected_, concurrency); + } + static thread_local std::vector<llfio::byte, llfio::utils::page_allocator<llfio::byte>> buffer(IO_SIZE); + OUTCOME_TRY(shared_state->h.read((concurrency - 1) * IO_SIZE, {{buffer}})); + shared_state->concurrency.fetch_sub(1, std::memory_order_relaxed); + return llfio::success(); + } + // virtual void group_complete(const llfio::result<void> &/*unused*/) noexcept override { } + }; + shared_state.h = llfio::file_handle::temp_file({}, llfio::file_handle::mode::write, llfio::file_handle::creation::only_if_not_exist, + llfio::file_handle::caching::only_metadata) + .value(); + shared_state.awareness.h = &shared_state.h; + shared_state.h.truncate(WORK_ITEMS * IO_SIZE).value(); + alignas(4096) llfio::byte buffer[IO_SIZE]; + llfio::utils::random_fill((char *) buffer, sizeof(buffer)); + std::vector<work_item> workitems; + for(size_t n = 0; n < WORK_ITEMS; n++) + { + workitems.emplace_back(&shared_state); + shared_state.h.write(n * IO_SIZE, {{buffer, sizeof(buffer)}}).value(); + } + auto tpg = llfio::make_dynamic_thread_pool_group().value(); + tpg->submit(llfio::span<work_item>(workitems)).value(); + auto begin = std::chrono::steady_clock::now(); + size_t paced = 0; + while(std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - begin) < std::chrono::seconds(60)) + { + llfio::statfs_t statfs; + statfs.fill(shared_state.h, llfio::statfs_t::want::iosinprogress | llfio::statfs_t::want::iosbusytime | llfio::statfs_t::want::mntonname).value(); + std::cout << "\nStorage device at " << statfs.f_mntonname << " is at " << (100.0f * statfs.f_iosbusytime) << "% utilisation and has an i/o queue depth of " + << statfs.f_iosinprogress << ". Current concurrency is " << shared_state.concurrency.load(std::memory_order_relaxed) << " and current pacing is " + << (shared_state.current_pacing.load(std::memory_order_relaxed) / 1000.0) << " microseconds." << std::endl; + if(shared_state.current_pacing.load(std::memory_order_relaxed) > 0) + { + paced++; + } + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + } + std::cout << "\nStopping ..." << std::endl; + tpg->stop().value(); + while(!tpg->stopped()) + { + std::cout << "\nCurrent concurrency is " << shared_state.concurrency.load(std::memory_order_relaxed) << " and current pacing is " + << (shared_state.current_pacing.load(std::memory_order_relaxed) / 1000.0) << " microseconds." << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + auto r = tpg->wait(); + if(!r && r.error() != llfio::errc::operation_canceled) + { + r.value(); + } + BOOST_CHECK(paced > 0); +} + +KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, works, "Tests that llfio::dynamic_thread_pool_group works as expected", + TestDynamicThreadPoolGroupWorks()) +KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, nested, "Tests that nesting of llfio::dynamic_thread_pool_group works as expected", + TestDynamicThreadPoolGroupNestingWorks()) +//KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, io_aware_work_item, +// "Tests that llfio::dynamic_thread_pool_group::io_aware_work_item works as expected", TestDynamicThreadPoolGroupIoAwareWorks()) diff --git a/test/tests/statfs.cpp b/test/tests/statfs.cpp new file mode 100644 index 00000000..138fd4c1 --- /dev/null +++ b/test/tests/statfs.cpp @@ -0,0 +1,101 @@ +/* Integration test kernel for statfs +(C) 2020 Niall Douglas <http://www.nedproductions.biz/> (2 commits) +File Created: Dec 2020 + + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License in the accompanying file +Licence.txt or at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + +Distributed under the Boost Software License, Version 1.0. + (See accompanying file Licence.txt or copy at + http://www.boost.org/LICENSE_1_0.txt) +*/ + +#include "../test_kernel_decl.hpp" + +#include <future> +#include <cmath> + +static inline void TestStatfsIosInProgress() +{ + namespace llfio = LLFIO_V2_NAMESPACE; + llfio::file_handle h1 = llfio::file_handle::uniquely_named_file({}, llfio::file_handle::mode::write, llfio::file_handle::caching::all, + llfio::file_handle::flag::unlink_on_first_close) + .value(); + llfio::file_handle h2 = llfio::file_handle::temp_file().value(); + // h1 is within our build directory's volume, h2 is within our temp volume. + auto print_statfs = [](const llfio::file_handle &h, const llfio::statfs_t &statfs) { + std::cout << "\nFor file " << h.current_path().value() << ":"; + std::cout << "\n fundamental filesystem block size = " << statfs.f_bsize; + std::cout << "\n optimal transfer block size = " << statfs.f_iosize; + std::cout << "\n total data blocks in filesystem = " << statfs.f_blocks; + std::cout << "\n free blocks in filesystem = " << statfs.f_bfree; + std::cout << "\n free blocks avail to non-superuser = " << statfs.f_bavail; + std::cout << "\n total file nodes in filesystem = " << statfs.f_files; + std::cout << "\n free nodes avail to non-superuser = " << statfs.f_ffree; + std::cout << "\n maximum filename length = " << statfs.f_namemax; + std::cout << "\n filesystem type name = " << statfs.f_fstypename; + std::cout << "\n mounted filesystem = " << statfs.f_mntfromname; + std::cout << "\n directory on which mounted = " << statfs.f_mntonname; + std::cout << "\n i/o's currently in progress (i.e. queue depth) = " << statfs.f_iosinprogress; + std::cout << "\n percentage of time spent doing i/o (1.0 = 100%) = " << statfs.f_iosbusytime; + std::cout << std::endl; + }; + llfio::statfs_t s1base, s2base; + s1base.fill(h1).value(); + s2base.fill(h2).value(); + print_statfs(h1, s1base); + print_statfs(h2, s2base); + std::atomic<bool> done{false}; + auto do_io_to = [&done](llfio::file_handle &fh) { + std::vector<llfio::byte> buffer; + buffer.resize(4096); // 1048576 + llfio::utils::random_fill((char *) buffer.data(), buffer.size()); + while(!done) + { + fh.write(0, {{buffer.data(), buffer.size()}}).value(); + fh.barrier().value(); + } + }; + { + auto f = std::async(std::launch::async, [&] { do_io_to(h1); }); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + llfio::statfs_t s1load, s2load; + s1load.fill(h1).value(); + s2load.fill(h2).value(); + done = true; + print_statfs(h1, s1load); + print_statfs(h2, s2load); + // BOOST_CHECK(s1load.f_iosinprogress > s1base.f_iosinprogress); + BOOST_CHECK(std::isnan(s1base.f_iosbusytime) || s1load.f_iosbusytime > s1base.f_iosbusytime); + f.get(); + done = false; + } + { + auto f = std::async(std::launch::async, [&] { do_io_to(h2); }); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + llfio::statfs_t s1load, s2load; + s1load.fill(h1).value(); + s2load.fill(h2).value(); + done = true; + print_statfs(h1, s1load); + print_statfs(h2, s2load); + // BOOST_CHECK(s2load.f_iosinprogress > s2base.f_iosinprogress); + BOOST_CHECK(std::isnan(s2base.f_iosbusytime) || s2load.f_iosbusytime > s2base.f_iosbusytime); + f.get(); + done = false; + } +} + +KERNELTEST_TEST_KERNEL(integration, llfio, statfs, iosinprogress, "Tests that llfio::statfs_t::f_iosinprogress works as expected", TestStatfsIosInProgress()) |