github.com/windirstat/llfio.git
author    Niall Douglas <s_github@nedprod.com>  2021-03-17 19:04:35 +0300
committer GitHub <noreply@github.com>  2021-03-17 19:04:35 +0300
commit    3354ed31eea1534321d2ea0c05e82e572297609c (patch)
tree      e672081eba86c4edf1db01238317b4379e0352b3
parent    17a15470b8d079625732bccfc96a3dd45e18f1c1 (diff)
parent    c66ba10ca949bdcce5f1029634f46b3db092a378 (diff)
Merge pull request #68 from ned14/dynamic_thread_pool_group
Dynamic thread pool group
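This merge adds the dynamic_thread_pool_group facility implemented in the diff below. As orientation, here is a minimal usage sketch: make_dynamic_thread_pool_group(), submit() and wait() appear verbatim in this diff, while the work_item virtuals next() and operator()() are assumed from the public header, which this page does not show.

#include "llfio/llfio.hpp"

#include <atomic>
#include <cstdint>

namespace llfio = LLFIO_V2_NAMESPACE;

// Hypothetical work item which executes 100 units of work.
struct my_work final : llfio::dynamic_thread_pool_group::work_item
{
  std::atomic<int> remaining{100};
  // Assumed signature: return the next work index, or -1 when done.
  virtual intptr_t next(llfio::deadline & /*d*/) noexcept override
  {
    int n = remaining.fetch_sub(1, std::memory_order_relaxed);
    return (n > 0) ? n : -1;
  }
  // Assumed signature: execute one unit of work.
  virtual llfio::result<void> operator()(intptr_t /*work*/) noexcept override
  {
    return llfio::success();
  }
};

int main()
{
  my_work items[4];
  llfio::dynamic_thread_pool_group::work_item *ptrs[] = {&items[0], &items[1], &items[2], &items[3]};
  auto group = llfio::make_dynamic_thread_pool_group().value();
  group->submit(ptrs).value();  // items begin executing on the pool
  group->wait().value();        // block until every item completes
  return 0;
}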
-rw-r--r--  CMakeLists.txt                                                 |   32
-rw-r--r--  cmake/headers.cmake                                            |    2
-rw-r--r--  cmake/tests.cmake                                              |    2
-rw-r--r--  include/llfio/revision.hpp                                     |    6
-rw-r--r--  include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp   | 2491
-rw-r--r--  include/llfio/v2.0/detail/impl/posix/directory_handle.ipp     |   57
-rw-r--r--  include/llfio/v2.0/detail/impl/posix/statfs.ipp                |  494
-rw-r--r--  include/llfio/v2.0/detail/impl/windows/directory_handle.ipp   |    2
-rw-r--r--  include/llfio/v2.0/detail/impl/windows/file_handle.ipp        |  111
-rw-r--r--  include/llfio/v2.0/detail/impl/windows/statfs.ipp              |   21
-rw-r--r--  include/llfio/v2.0/dynamic_thread_pool_group.hpp               |  530
-rw-r--r--  include/llfio/v2.0/fs_handle.hpp                               |    2
-rw-r--r--  include/llfio/v2.0/llfio.hpp                                   |    5
-rw-r--r--  include/llfio/v2.0/logging.hpp                                 |   31
-rw-r--r--  include/llfio/v2.0/statfs.hpp                                  |   59
-rw-r--r--  include/llfio/v2.0/status_code.hpp                             |    6
-rw-r--r--  programs/CMakeLists.txt                                        |    7
-rw-r--r--  programs/benchmark-async/main.cpp                              |    4
-rw-r--r--  programs/benchmark-dynamic_thread_pool_group/main.cpp          |  232
-rw-r--r--  test/tests/dynamic_thread_pool_group.cpp                       |  450
-rw-r--r--  test/tests/statfs.cpp                                          |  101
21 files changed, 4448 insertions, 197 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index dec1ade8..aa6e8d20 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -32,6 +32,11 @@ option(LLFIO_ENABLE_DEPENDENCY_SMOKE_TEST "Whether to build executables which ar
option(LLFIO_ASSUME_CROSS_COMPILING "Whether to assume we are cross compiling. Normally automatically detected, but if automatic detection doesn't work, a working <filesystem> will not be found during cmake configure." OFF)
option(UNIT_TESTS_BUILD_ALL "Whether to run all of the unit test suite." OFF)
set(UNIT_TESTS_CXX_VERSION "latest" CACHE STRING "The version of C++ to use in the header-only unit tests")
+if(CMAKE_SYSTEM_NAME MATCHES "FreeBSD" OR APPLE)
+  option(LLFIO_USE_LIBDISPATCH "Whether to use libdispatch/Grand Central Dispatch (defaults to ON on BSD/Mac OS)" ON)
+else()
+  option(LLFIO_USE_LIBDISPATCH "Whether to use libdispatch/Grand Central Dispatch (defaults to ON on BSD/Mac OS)" OFF)
+endif()
ensure_git_subrepo("${CMAKE_CURRENT_SOURCE_DIR}/include/llfio/ntkernel-error-category/include" "https://github.com/ned14/ntkernel-error-category.git")
@@ -283,6 +288,30 @@ int main() {
all_compile_definitions(PUBLIC LLFIO_FORCE_EXPERIMENTAL_FILESYSTEM=1 KERNELTEST_FORCE_EXPERIMENTAL_FILESYSTEM=1)
endif()
endif()
+# Do we have Grand Central Dispatch on this platform?
+if(LLFIO_USE_LIBDISPATCH)
+ function(check_have_libdispatch postfix)
+ set(CMAKE_REQUIRED_LIBRARIES ${ARGN})
+ check_cxx_source_compiles("
+#include <dispatch/dispatch.h>
+int main() {
+ return dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0) != nullptr;
+}
+" LLFIO_HAS_LIBDISPATCH_${postfix})
+ endfunction()
+ check_have_libdispatch(BUILTIN)
+ if(NOT LLFIO_HAS_LIBDISPATCH_BUILTIN)
+ check_have_libdispatch(WITH_LIBDISPATCH dispatch)
+ if(LLFIO_HAS_LIBDISPATCH_WITH_LIBDISPATCH)
+ all_link_libraries(PUBLIC dispatch)
+ endif()
+ endif()
+else()
+ all_compile_definitions(PUBLIC LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD=0)
+endif()
+if(NOT LLFIO_HAS_LIBDISPATCH_BUILTIN AND NOT LLFIO_HAS_LIBDISPATCH_WITH_LIBDISPATCH AND (CMAKE_SYSTEM_NAME MATCHES "FreeBSD" OR APPLE))
+ indented_message(FATAL_ERROR "FATAL: Grand Central Dispatch as libdispatch was not found on this FreeBSD or Mac OS system. libdispatch is required for LLFIO to build on those systems.")
+endif()
# Set any macros this library requires
all_compile_definitions(PRIVATE LLFIO_INCLUDE_STORAGE_PROFILE=1 LLFIO_ENABLE_TEST_IO_MULTIPLEXERS=1)
@@ -293,7 +322,8 @@ if(LLFIO_USE_EXPERIMENTAL_SG14_STATUS_CODE)
all_compile_definitions(PUBLIC LLFIO_EXPERIMENTAL_STATUS_CODE=1)
endif()
if(WIN32)
- all_compile_definitions(PRIVATE _WIN32_WINNT=0x601) ## Target Win7
+ all_compile_definitions(PRIVATE _WIN32_WINNT=0x601) ## Target Win7
+ target_compile_definitions(llfio_hl INTERFACE _WIN32_WINNT=0x601) ## Target Win7
if(NOT LLFIO_USE_EXPERIMENTAL_SG14_STATUS_CODE)
target_link_libraries(llfio_hl INTERFACE ntkernel-error-category::hl)
target_link_libraries(llfio_dl PUBLIC ntkernel-error-category::dl)
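The hunk above probes for libdispatch by test-compiling a small program, first against the default libraries and then explicitly linking dispatch; failing both is fatal on FreeBSD and Mac OS. For readers unfamiliar with the API being probed, a minimal self-contained Grand Central Dispatch program (an illustration only, not part of this patch):

// Build: c++ gcd_demo.cpp             (Mac OS)
//        c++ gcd_demo.cpp -ldispatch  (FreeBSD/Linux with libdispatch)
#include <dispatch/dispatch.h>
#include <cstdint>
#include <cstdio>

static void work(void *arg)
{
  std::printf("work item %d\n", (int) (intptr_t) arg);
}

int main()
{
  dispatch_group_t group = dispatch_group_create();
  dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
  for(intptr_t n = 0; n < 4; n++)
  {
    dispatch_group_async_f(group, queue, (void *) n, work);  // enqueue onto the global pool
  }
  dispatch_group_wait(group, DISPATCH_TIME_FOREVER);  // block until all four complete
  dispatch_release(group);
  return 0;
}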
diff --git a/cmake/headers.cmake b/cmake/headers.cmake
index af4511e6..88c7deb9 100644
--- a/cmake/headers.cmake
+++ b/cmake/headers.cmake
@@ -29,6 +29,7 @@ set(llfio_HEADERS
"include/llfio/v2.0/detail/impl/cached_parent_handle_adapter.ipp"
"include/llfio/v2.0/detail/impl/clone.ipp"
"include/llfio/v2.0/detail/impl/config.ipp"
+ "include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp"
"include/llfio/v2.0/detail/impl/fast_random_file_handle.ipp"
"include/llfio/v2.0/detail/impl/io_multiplexer.ipp"
"include/llfio/v2.0/detail/impl/path_discovery.ipp"
@@ -77,6 +78,7 @@ set(llfio_HEADERS
"include/llfio/v2.0/detail/impl/windows/test/iocp_multiplexer.ipp"
"include/llfio/v2.0/detail/impl/windows/utils.ipp"
"include/llfio/v2.0/directory_handle.hpp"
+ "include/llfio/v2.0/dynamic_thread_pool_group.hpp"
"include/llfio/v2.0/fast_random_file_handle.hpp"
"include/llfio/v2.0/file_handle.hpp"
"include/llfio/v2.0/fs_handle.hpp"
diff --git a/cmake/tests.cmake b/cmake/tests.cmake
index b216ccba..de7d54d4 100644
--- a/cmake/tests.cmake
+++ b/cmake/tests.cmake
@@ -7,6 +7,7 @@ set(llfio_TESTS
"test/tests/directory_handle_create_close/runner.cpp"
"test/tests/directory_handle_enumerate/kernel_directory_handle_enumerate.cpp.hpp"
"test/tests/directory_handle_enumerate/runner.cpp"
+ "test/tests/dynamic_thread_pool_group.cpp"
"test/tests/fast_random_file_handle.cpp"
"test/tests/file_handle_create_close/kernel_file_handle.cpp.hpp"
"test/tests/file_handle_create_close/runner.cpp"
@@ -28,6 +29,7 @@ set(llfio_TESTS
"test/tests/section_handle_create_close/kernel_section_handle.cpp.hpp"
"test/tests/section_handle_create_close/runner.cpp"
"test/tests/shared_fs_mutex.cpp"
+ "test/tests/statfs.cpp"
"test/tests/symlink_handle_create_close/kernel_symlink_handle.cpp.hpp"
"test/tests/symlink_handle_create_close/runner.cpp"
"test/tests/traverse.cpp"
diff --git a/include/llfio/revision.hpp b/include/llfio/revision.hpp
index 4f6234d6..2cd9f8b5 100644
--- a/include/llfio/revision.hpp
+++ b/include/llfio/revision.hpp
@@ -1,4 +1,4 @@
// Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time
-#define LLFIO_PREVIOUS_COMMIT_REF 545a722a055dcc38e7f80520a7a34008bfa9a86f
-#define LLFIO_PREVIOUS_COMMIT_DATE "2021-03-15 10:40:32 +00:00"
-#define LLFIO_PREVIOUS_COMMIT_UNIQUE 545a722a
+#define LLFIO_PREVIOUS_COMMIT_REF 67226948b9f00aebbf33c232d10c417ba1abb289
+#define LLFIO_PREVIOUS_COMMIT_DATE "2021-03-16 12:31:40 +00:00"
+#define LLFIO_PREVIOUS_COMMIT_UNIQUE 67226948
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
new file mode 100644
index 00000000..287683bf
--- /dev/null
+++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
@@ -0,0 +1,2491 @@
+/* Dynamic thread pool group
+(C) 2020-2021 Niall Douglas <http://www.nedproductions.biz/> (9 commits)
+File Created: Dec 2020
+
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License in the accompanying file
+Licence.txt or at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+
+Distributed under the Boost Software License, Version 1.0.
+ (See accompanying file Licence.txt or copy at
+ http://www.boost.org/LICENSE_1_0.txt)
+*/
+
+#include "../../dynamic_thread_pool_group.hpp"
+
+#include "../../file_handle.hpp"
+#include "../../statfs.hpp"
+
+#include "quickcpplib/spinlock.hpp"
+
+#include <atomic>
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include <iostream>
+
+#ifndef LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
+#if LLFIO_FORCE_USE_LIBDISPATCH
+#include <dispatch/dispatch.h>
+#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD 1
+#else
+#ifdef _WIN32
+#include "windows/import.hpp"
+#include <threadpoolapiset.h>
+#else
+#if __has_include(<dispatch/dispatch.h>)
+#include <dispatch/dispatch.h>
+#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD 1
+#endif
+#endif
+#endif
+#endif
+#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+#if !defined(__linux__)
+#error dynamic_thread_pool_group requires Grand Central Dispatch (libdispatch) on non-Linux POSIX.
+#endif
+#include <dirent.h> /* Defines DT_* constants */
+#include <sys/syscall.h>
+
+#include <condition_variable>
+#include <thread>
+#endif
+
+#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING 0
+
+/* NOTE that the Linux results are from a VM on the same machine as the Windows results,
+so they are not directly comparable.
+
+Linux 4Kb and 64Kb
+
+Benchmarking asio ...
+ For 1 work items got 38182.6 SHA256 hashes/sec with 1 maximum concurrency.
+ For 2 work items got 68664 SHA256 hashes/sec with 2 maximum concurrency.
+ For 4 work items got 87036.4 SHA256 hashes/sec with 4 maximum concurrency.
+ For 8 work items got 78702.2 SHA256 hashes/sec with 8 maximum concurrency.
+ For 16 work items got 51911.2 SHA256 hashes/sec with 16 maximum concurrency.
+ For 32 work items got 553964 SHA256 hashes/sec with 31 maximum concurrency.
+ For 64 work items got 713844 SHA256 hashes/sec with 36 maximum concurrency.
+ For 128 work items got 700172 SHA256 hashes/sec with 37 maximum concurrency.
+ For 256 work items got 716099 SHA256 hashes/sec with 37 maximum concurrency.
+ For 512 work items got 703323 SHA256 hashes/sec with 37 maximum concurrency.
+ For 1024 work items got 722827 SHA256 hashes/sec with 38 maximum concurrency.
+
+Benchmarking asio ...
+ For 1 work items got 3917.88 SHA256 hashes/sec with 1 maximum concurrency.
+ For 2 work items got 7798.29 SHA256 hashes/sec with 2 maximum concurrency.
+ For 4 work items got 14395.2 SHA256 hashes/sec with 4 maximum concurrency.
+ For 8 work items got 23633.4 SHA256 hashes/sec with 8 maximum concurrency.
+ For 16 work items got 31771.1 SHA256 hashes/sec with 16 maximum concurrency.
+ For 32 work items got 57978 SHA256 hashes/sec with 32 maximum concurrency.
+ For 64 work items got 66200.6 SHA256 hashes/sec with 64 maximum concurrency.
+ For 128 work items got 65706.5 SHA256 hashes/sec with 64 maximum concurrency.
+ For 256 work items got 65717.5 SHA256 hashes/sec with 64 maximum concurrency.
+ For 512 work items got 65652.4 SHA256 hashes/sec with 64 maximum concurrency.
+ For 1024 work items got 65580.3 SHA256 hashes/sec with 64 maximum concurrency.
+
+
Windows 4Kb and 64Kb
+
+Benchmarking asio ...
+ For 1 work items got 51216.7 SHA256 hashes/sec with 1 maximum concurrency.
+ For 2 work items got 97691 SHA256 hashes/sec with 2 maximum concurrency.
+ For 4 work items got 184381 SHA256 hashes/sec with 4 maximum concurrency.
+ For 8 work items got 305270 SHA256 hashes/sec with 8 maximum concurrency.
+ For 16 work items got 520728 SHA256 hashes/sec with 16 maximum concurrency.
+ For 32 work items got 482729 SHA256 hashes/sec with 32 maximum concurrency.
+ For 64 work items got 1.02629e+06 SHA256 hashes/sec with 64 maximum concurrency.
+ For 128 work items got 1.01816e+06 SHA256 hashes/sec with 64 maximum concurrency.
+ For 256 work items got 1.01672e+06 SHA256 hashes/sec with 64 maximum concurrency.
+ For 512 work items got 1.01727e+06 SHA256 hashes/sec with 64 maximum concurrency.
+ For 1024 work items got 1.01477e+06 SHA256 hashes/sec with 64 maximum concurrency.
+
+Benchmarking asio ...
+ For 1 work items got 4069.92 SHA256 hashes/sec with 1 maximum concurrency.
+ For 2 work items got 8099.1 SHA256 hashes/sec with 2 maximum concurrency.
+ For 4 work items got 16021.7 SHA256 hashes/sec with 4 maximum concurrency.
+ For 8 work items got 30275.2 SHA256 hashes/sec with 8 maximum concurrency.
+ For 16 work items got 40972.5 SHA256 hashes/sec with 16 maximum concurrency.
+ For 32 work items got 70919.2 SHA256 hashes/sec with 32 maximum concurrency.
+ For 64 work items got 71917 SHA256 hashes/sec with 64 maximum concurrency.
+ For 128 work items got 71111.8 SHA256 hashes/sec with 64 maximum concurrency.
+ For 256 work items got 70963.5 SHA256 hashes/sec with 64 maximum concurrency.
+ For 512 work items got 70956.3 SHA256 hashes/sec with 64 maximum concurrency.
+ For 1024 work items got 70989.9 SHA256 hashes/sec with 64 maximum concurrency.
+*/
+
+
+/* Linux 4Kb and 64Kb libdispatch
+
+Benchmarking llfio (Grand Central Dispatch) ...
+ For 1 work items got 33942.7 SHA256 hashes/sec with 1 maximum concurrency.
+ For 2 work items got 91275.8 SHA256 hashes/sec with 2 maximum concurrency.
+ For 4 work items got 191446 SHA256 hashes/sec with 4 maximum concurrency.
+ For 8 work items got 325776 SHA256 hashes/sec with 8 maximum concurrency.
+ For 16 work items got 405282 SHA256 hashes/sec with 16 maximum concurrency.
+ For 32 work items got 408015 SHA256 hashes/sec with 31 maximum concurrency.
+ For 64 work items got 412343 SHA256 hashes/sec with 32 maximum concurrency.
+ For 128 work items got 450024 SHA256 hashes/sec with 41 maximum concurrency.
+ For 256 work items got 477885 SHA256 hashes/sec with 46 maximum concurrency.
+ For 512 work items got 531752 SHA256 hashes/sec with 48 maximum concurrency.
+ For 1024 work items got 608181 SHA256 hashes/sec with 44 maximum concurrency.
+
+Benchmarking llfio (Grand Central Dispatch) ...
+ For 1 work items got 3977.21 SHA256 hashes/sec with 1 maximum concurrency.
+ For 2 work items got 7980.09 SHA256 hashes/sec with 2 maximum concurrency.
+ For 4 work items got 15075.6 SHA256 hashes/sec with 4 maximum concurrency.
+ For 8 work items got 24427.3 SHA256 hashes/sec with 8 maximum concurrency.
+ For 16 work items got 41858.7 SHA256 hashes/sec with 16 maximum concurrency.
+ For 32 work items got 64896.4 SHA256 hashes/sec with 32 maximum concurrency.
+ For 64 work items got 65683.6 SHA256 hashes/sec with 34 maximum concurrency.
+ For 128 work items got 65476.1 SHA256 hashes/sec with 35 maximum concurrency.
+ For 256 work items got 65210.6 SHA256 hashes/sec with 36 maximum concurrency.
+ For 512 work items got 65241.1 SHA256 hashes/sec with 36 maximum concurrency.
+ For 1024 work items got 65205.3 SHA256 hashes/sec with 37 maximum concurrency.
+*/
+
+/* Linux 4Kb and 64Kb native
+
+Benchmarking llfio (Linux native) ...
+ For 1 work items got 65160.3 SHA256 hashes/sec with 1 maximum concurrency.
+ For 2 work items got 126586 SHA256 hashes/sec with 2 maximum concurrency.
+ For 4 work items got 246616 SHA256 hashes/sec with 4 maximum concurrency.
+ For 8 work items got 478938 SHA256 hashes/sec with 8 maximum concurrency.
+ For 16 work items got 529919 SHA256 hashes/sec with 15 maximum concurrency.
+ For 32 work items got 902885 SHA256 hashes/sec with 32 maximum concurrency.
+ For 64 work items got 919633 SHA256 hashes/sec with 34 maximum concurrency.
+ For 128 work items got 919695 SHA256 hashes/sec with 35 maximum concurrency.
+ For 256 work items got 923159 SHA256 hashes/sec with 36 maximum concurrency.
+ For 512 work items got 922961 SHA256 hashes/sec with 37 maximum concurrency.
+ For 1024 work items got 926624 SHA256 hashes/sec with 38 maximum concurrency.
+
+Benchmarking llfio (Linux native) ...
+ For 1 work items got 4193.79 SHA256 hashes/sec with 1 maximum concurrency.
+ For 2 work items got 8422.44 SHA256 hashes/sec with 2 maximum concurrency.
+ For 4 work items got 12521.7 SHA256 hashes/sec with 3 maximum concurrency.
+ For 8 work items got 20028.4 SHA256 hashes/sec with 6 maximum concurrency.
+ For 16 work items got 30657.4 SHA256 hashes/sec with 10 maximum concurrency.
+ For 32 work items got 53217.4 SHA256 hashes/sec with 20 maximum concurrency.
+ For 64 work items got 65452.3 SHA256 hashes/sec with 32 maximum concurrency.
+ For 128 work items got 65396.3 SHA256 hashes/sec with 32 maximum concurrency.
+ For 256 work items got 65363.7 SHA256 hashes/sec with 32 maximum concurrency.
+ For 512 work items got 65198.2 SHA256 hashes/sec with 32 maximum concurrency.
+ For 1024 work items got 65003.9 SHA256 hashes/sec with 34 maximum concurrency.
+*/
+
+
+/* Windows 4Kb and 64Kb Win32 thread pool
+
+Benchmarking llfio (Win32 thread pool (Vista+)) ...
+ For 1 work items got 57995.3 SHA256 hashes/sec with 1 maximum concurrency.
+ For 2 work items got 120267 SHA256 hashes/sec with 2 maximum concurrency.
+ For 4 work items got 238139 SHA256 hashes/sec with 4 maximum concurrency.
+ For 8 work items got 413488 SHA256 hashes/sec with 8 maximum concurrency.
+ For 16 work items got 575423 SHA256 hashes/sec with 16 maximum concurrency.
+ For 32 work items got 720938 SHA256 hashes/sec with 31 maximum concurrency.
+ For 64 work items got 703460 SHA256 hashes/sec with 30 maximum concurrency.
+ For 128 work items got 678257 SHA256 hashes/sec with 29 maximum concurrency.
+ For 256 work items got 678898 SHA256 hashes/sec with 29 maximum concurrency.
+ For 512 work items got 671729 SHA256 hashes/sec with 28 maximum concurrency.
+ For 1024 work items got 674433 SHA256 hashes/sec with 30 maximum concurrency.
+
+Benchmarking llfio (Win32 thread pool (Vista+)) ...
+ For 1 work items got 4132.18 SHA256 hashes/sec with 1 maximum concurrency.
+ For 2 work items got 8197.21 SHA256 hashes/sec with 2 maximum concurrency.
+ For 4 work items got 16281.3 SHA256 hashes/sec with 4 maximum concurrency.
+ For 8 work items got 27447.5 SHA256 hashes/sec with 8 maximum concurrency.
+ For 16 work items got 42621.3 SHA256 hashes/sec with 16 maximum concurrency.
+ For 32 work items got 69857.7 SHA256 hashes/sec with 32 maximum concurrency.
+ For 64 work items got 68797.9 SHA256 hashes/sec with 33 maximum concurrency.
+ For 128 work items got 68980.4 SHA256 hashes/sec with 33 maximum concurrency.
+ For 256 work items got 70370.8 SHA256 hashes/sec with 33 maximum concurrency.
+ For 512 work items got 70365.8 SHA256 hashes/sec with 33 maximum concurrency.
+ For 1024 work items got 70794.6 SHA256 hashes/sec with 33 maximum concurrency.
+*/
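+
+/* The "maximum concurrency" column above is presumably a high water mark of
+how many work items were observed executing at once. A hypothetical sketch of
+one way to measure it (NOT the actual programs/benchmark-dynamic_thread_pool_group
+code, which this page does not show):
+
+  std::atomic<int> executing{0}, max_concurrency{0};
+
+  void work_item_body()
+  {
+    int now = executing.fetch_add(1, std::memory_order_relaxed) + 1;
+    int prev = max_concurrency.load(std::memory_order_relaxed);
+    while(prev < now && !max_concurrency.compare_exchange_weak(prev, now, std::memory_order_relaxed))
+    {
+    }
+    // ... SHA256 one buffer, bump the hashes counter ...
+    executing.fetch_sub(1, std::memory_order_relaxed);
+  }
+*/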
+
+
+LLFIO_V2_NAMESPACE_BEGIN
+
+namespace detail
+{
+ struct dynamic_thread_pool_group_impl_guard : std::unique_lock<std::mutex>
+ {
+ using std::unique_lock<std::mutex>::unique_lock;
+ };
+#if 0
+ template <class T> class fake_atomic
+ {
+ T _v;
+
+ public:
+ constexpr fake_atomic(T v)
+ : _v(v)
+ {
+ }
+ T load(std::memory_order /*unused*/) const { return _v; }
+ void store(T v, std::memory_order /*unused*/) { _v = v; }
+ T fetch_add(T v, std::memory_order /*unused*/)
+ {
+ _v += v;
+ return _v - v;
+ }
+ T fetch_sub(T v, std::memory_order /*unused*/)
+ {
+ _v -= v;
+ return _v + v;
+ }
+ };
+#endif
+ struct global_dynamic_thread_pool_impl_workqueue_item
+ {
+ const size_t nesting_level;
+ std::shared_ptr<global_dynamic_thread_pool_impl_workqueue_item> next;
+ std::unordered_set<dynamic_thread_pool_group_impl *> items; // Do NOT use without holding workqueue_lock
+
+ explicit global_dynamic_thread_pool_impl_workqueue_item(size_t _nesting_level, std::shared_ptr<global_dynamic_thread_pool_impl_workqueue_item> &&preceding)
+ : nesting_level(_nesting_level)
+ , next(preceding)
+ {
+ }
+
+#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+ static constexpr unsigned TOTAL_NEXTACTIVES = 1;
+ struct next_active_base_t
+ {
+ std::atomic<unsigned> count{0};
+ QUICKCPPLIB_NAMESPACE::configurable_spinlock::spinlock<unsigned> lock;
+ dynamic_thread_pool_group::work_item *front{nullptr}, *back{nullptr};
+
+ next_active_base_t() = default;
+ next_active_base_t(const next_active_base_t &o)
+ : count(o.count.load(std::memory_order_relaxed))
+ , front(o.front)
+ , back(o.back)
+ {
+ }
+ };
+ struct alignas(64) next_active_work_t : next_active_base_t
+ {
+ char _padding[64 - sizeof(next_active_base_t)]; // 40 bytes?
+ } next_actives[TOTAL_NEXTACTIVES];
+ static_assert(sizeof(next_active_work_t) == 64, "next_active_work_t is not a cacheline");
+ next_active_base_t next_timer_relative, next_timer_absolute;
+
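+    /* Pop the next available work item. The scan starts at the queue slot
+    hinted by threadidx and proceeds round robin; each slot is only
+    try_lock()ed so that a slot busy with another thread is skipped rather
+    than waited upon, and the scan gives up once a full pass finds every
+    slot empty. With a single slot configured, that slot is simply locked. */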
+ dynamic_thread_pool_group::work_item *next_active(unsigned &count, size_t threadidx)
+ {
+ if(TOTAL_NEXTACTIVES > 1)
+ {
+ threadidx &= (TOTAL_NEXTACTIVES - 1);
+ const size_t original_threadidx = threadidx;
+ bool all_empty = true;
+ for(;;)
+ {
+ next_active_base_t &x = next_actives[threadidx];
+ if(x.count.load(std::memory_order_relaxed) > 0)
+ {
+ all_empty = false;
+ if(x.lock.try_lock())
+ {
+ auto *ret = x.front;
+ if(ret != nullptr)
+ {
+ x.front = ret->_next_scheduled;
+ count = x.count.fetch_sub(1, std::memory_order_relaxed);
+ if(x.front == nullptr)
+ {
+ assert(x.back == ret);
+ x.back = nullptr;
+ }
+ ret->_next_scheduled = nullptr;
+ x.lock.unlock();
+ return ret;
+ }
+ x.lock.unlock();
+ }
+ }
+ if(++threadidx >= TOTAL_NEXTACTIVES)
+ {
+ threadidx = 0;
+ }
+ if(threadidx == original_threadidx)
+ {
+ if(all_empty)
+ {
+ return nullptr;
+ }
+ all_empty = true;
+ }
+ }
+ }
+ else
+ {
+ next_active_base_t &x = next_actives[0];
+ if(x.count.load(std::memory_order_relaxed) > 0)
+ {
+ x.lock.lock();
+ auto *ret = x.front;
+ if(ret != nullptr)
+ {
+ x.front = ret->_next_scheduled;
+ count = x.count.fetch_sub(1, std::memory_order_relaxed);
+ if(x.front == nullptr)
+ {
+ assert(x.back == ret);
+ x.back = nullptr;
+ }
+ ret->_next_scheduled = nullptr;
+ x.lock.unlock();
+ return ret;
+ }
+ x.lock.unlock();
+ }
+ }
+ return nullptr;
+ }
+
+ private:
+ next_active_base_t &_choose_next_active()
+ {
+ unsigned idx = (unsigned) -1, max_count = (unsigned) -1;
+ for(unsigned n = 0; n < TOTAL_NEXTACTIVES; n++)
+ {
+ auto c = next_actives[n].count.load(std::memory_order_relaxed);
+ if(c < max_count)
+ {
+ idx = n;
+ max_count = c;
+ }
+ }
+ for(;;)
+ {
+ if(next_actives[idx].lock.try_lock())
+ {
+ return next_actives[idx];
+ }
+ if(++idx >= TOTAL_NEXTACTIVES)
+ {
+ idx = 0;
+ }
+ }
+ }
+
+ public:
+ void append_active(dynamic_thread_pool_group::work_item *p)
+ {
+ next_active_base_t &x = _choose_next_active();
+ x.count.fetch_add(1, std::memory_order_relaxed);
+ if(x.back == nullptr)
+ {
+ assert(x.front == nullptr);
+ x.front = x.back = p;
+ x.lock.unlock();
+ return;
+ }
+ p->_next_scheduled = nullptr;
+ x.back->_next_scheduled = p;
+ x.back = p;
+ x.lock.unlock();
+ }
+ void prepend_active(dynamic_thread_pool_group::work_item *p)
+ {
+ next_active_base_t &x = _choose_next_active();
+ x.count.fetch_add(1, std::memory_order_relaxed);
+ if(x.front == nullptr)
+ {
+ assert(x.back == nullptr);
+ x.front = x.back = p;
+ x.lock.unlock();
+ return;
+ }
+ p->_next_scheduled = x.front;
+ x.front = p;
+ x.lock.unlock();
+ }
+
+ // x must be LOCKED on entry
+ template <int which> dynamic_thread_pool_group::work_item *next_timer()
+ {
+ if(which == 0)
+ {
+ return nullptr;
+ }
+ next_active_base_t &x = (which == 1) ? next_timer_relative : next_timer_absolute;
+ // x.lock.lock();
+ auto *ret = x.front;
+ if(ret == nullptr)
+ {
+ assert(x.back == nullptr);
+ x.lock.unlock();
+ return nullptr;
+ }
+ x.front = ret->_next_scheduled;
+ if(x.front == nullptr)
+ {
+ assert(x.back == ret);
+ x.back = nullptr;
+ }
+ ret->_next_scheduled = nullptr;
+ x.count.fetch_sub(1, std::memory_order_relaxed);
+ x.lock.unlock();
+ return ret;
+ }
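+    /* Queue a timed work item, keeping the relative (steady clock) and
+    absolute (system clock) timer queues ordered by deadline so that workers
+    need only examine each queue's front. */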
+ void append_timer(dynamic_thread_pool_group::work_item *i)
+ {
+ if(i->_timepoint1 != std::chrono::steady_clock::time_point())
+ {
+ next_timer_relative.lock.lock();
+ next_timer_relative.count.fetch_add(1, std::memory_order_relaxed);
+ if(next_timer_relative.front == nullptr)
+ {
+ next_timer_relative.front = next_timer_relative.back = i;
+ }
+ else
+ {
+ bool done = false;
+ for(dynamic_thread_pool_group::work_item *p = nullptr, *n = next_timer_relative.front; n != nullptr; p = n, n = n->_next_scheduled)
+ {
+ if(n->_timepoint1 <= i->_timepoint1)
+ {
+ if(p == nullptr)
+ {
+ i->_next_scheduled = n;
+ next_timer_relative.front = i;
+ }
+ else
+ {
+ i->_next_scheduled = n;
+ p->_next_scheduled = i;
+ }
+ done = true;
+ break;
+ }
+ }
+ if(!done)
+ {
+ next_timer_relative.back->_next_scheduled = i;
+ i->_next_scheduled = nullptr;
+ next_timer_relative.back = i;
+ }
+ }
+ next_timer_relative.lock.unlock();
+ }
+ else
+ {
+ next_timer_absolute.lock.lock();
+ next_timer_absolute.count.fetch_add(1, std::memory_order_relaxed);
+ if(next_timer_absolute.front == nullptr)
+ {
+ next_timer_absolute.front = next_timer_absolute.back = i;
+ }
+ else
+ {
+ bool done = false;
+ for(dynamic_thread_pool_group::work_item *p = nullptr, *n = next_timer_absolute.front; n != nullptr; p = n, n = n->_next_scheduled)
+ {
+ if(n->_timepoint2 <= i->_timepoint2)
+ {
+ if(p == nullptr)
+ {
+ i->_next_scheduled = n;
+ next_timer_absolute.front = i;
+ }
+ else
+ {
+ i->_next_scheduled = n;
+ p->_next_scheduled = i;
+ }
+ done = true;
+ break;
+ }
+ }
+ if(!done)
+ {
+ next_timer_absolute.back->_next_scheduled = i;
+ i->_next_scheduled = nullptr;
+ next_timer_absolute.back = i;
+ }
+ }
+ next_timer_absolute.lock.unlock();
+ }
+ }
+#endif
+ };
+ struct global_dynamic_thread_pool_impl
+ {
+ using _spinlock_type = QUICKCPPLIB_NAMESPACE::configurable_spinlock::spinlock<unsigned>;
+
+ _spinlock_type workqueue_lock;
+ struct workqueue_guard : std::unique_lock<_spinlock_type>
+ {
+ using std::unique_lock<_spinlock_type>::unique_lock;
+ };
+ std::shared_ptr<global_dynamic_thread_pool_impl_workqueue_item> workqueue;
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
+ using threadh_type = void *;
+ using grouph_type = dispatch_group_t;
+ static void _gcd_dispatch_callback(void *arg)
+ {
+ auto *workitem = (dynamic_thread_pool_group::work_item *) arg;
+ global_dynamic_thread_pool()._workerthread(workitem, nullptr);
+ }
+ static void _gcd_timer_callback(void *arg)
+ {
+ auto *workitem = (dynamic_thread_pool_group::work_item *) arg;
+ global_dynamic_thread_pool()._timerthread(workitem, nullptr);
+ }
+#elif defined(_WIN32)
+ using threadh_type = PTP_CALLBACK_INSTANCE;
+ using grouph_type = PTP_CALLBACK_ENVIRON;
+ static void CALLBACK _win32_worker_thread_callback(threadh_type threadh, PVOID Parameter, PTP_WORK /*unused*/)
+ {
+ auto *workitem = (dynamic_thread_pool_group::work_item *) Parameter;
+ global_dynamic_thread_pool()._workerthread(workitem, threadh);
+ }
+ static void CALLBACK _win32_timer_thread_callback(threadh_type threadh, PVOID Parameter, PTP_TIMER /*unused*/)
+ {
+ auto *workitem = (dynamic_thread_pool_group::work_item *) Parameter;
+ global_dynamic_thread_pool()._timerthread(workitem, threadh);
+ }
+#else
+ global_dynamic_thread_pool_impl_workqueue_item first_execute{(size_t) -1, {}};
+ using threadh_type = void *;
+ using grouph_type = void *;
+ std::mutex threadpool_lock;
+ struct threadpool_guard : std::unique_lock<std::mutex>
+ {
+ using std::unique_lock<std::mutex>::unique_lock;
+ };
+ struct thread_t
+ {
+ thread_t *_prev{nullptr}, *_next{nullptr};
+ std::thread thread;
+ std::condition_variable cond;
+ std::chrono::steady_clock::time_point last_did_work;
+ std::atomic<int> state{0}; // <0 = dead, 0 = sleeping/please die, 1 = busy
+ };
+ struct threads_t
+ {
+ size_t count{0};
+ thread_t *front{nullptr}, *back{nullptr};
+ } threadpool_active, threadpool_sleeping;
+ std::atomic<size_t> total_submitted_workitems{0}, threadpool_threads{0};
+ std::atomic<uint32_t> ms_sleep_for_more_work{20000};
+
+ std::mutex threadmetrics_lock;
+ struct threadmetrics_guard : std::unique_lock<std::mutex>
+ {
+ using std::unique_lock<std::mutex>::unique_lock;
+ };
+ struct threadmetrics_threadid
+ {
+ char text[12]; // enough for a UINT32_MAX in decimal
+ constexpr threadmetrics_threadid()
+ : text{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
+ {
+ }
+ threadmetrics_threadid(string_view sv)
+ {
+ memset(text, '0', sizeof(text));
+ assert(sv.size() <= sizeof(text));
+ if(sv.size() > sizeof(text))
+ {
+ abort();
+ }
+ memcpy(text + sizeof(text) - sv.size(), sv.data(), sv.size());
+ }
+ int compare(const threadmetrics_threadid &o) const noexcept { return memcmp(text, o.text, sizeof(text)); }
+ bool operator<(const threadmetrics_threadid &o) const noexcept { return compare(o) < 0; }
+ bool operator==(const threadmetrics_threadid &o) const noexcept { return compare(o) == 0; }
+ };
+ struct threadmetrics_item
+ {
+ threadmetrics_item *_prev{nullptr}, *_next{nullptr};
+      std::chrono::steady_clock::time_point last_updated, blocked_since;  // the latter is set if the thread has accrued no CPU time since the last sample
+ threadmetrics_threadid threadid;
+      uint32_t diskfaults{(uint32_t) -1}, utime{(uint32_t) -1}, stime{(uint32_t) -1};  // cumulative ticks spent in user and system for this thread
+
+ explicit threadmetrics_item(threadmetrics_threadid v)
+ : threadid(v)
+ {
+ }
+ string_view threadid_name() const noexcept { return string_view(threadid.text, sizeof(threadid.text)); }
+ };
+ struct threadmetrics_t
+ {
+ size_t count{0};
+ threadmetrics_item *front{nullptr}, *back{nullptr};
+ uint32_t blocked{0}, running{0};
+ } threadmetrics_queue; // items at front are least recently updated
+ std::vector<threadmetrics_item *> threadmetrics_sorted; // sorted by threadid
+ std::chrono::steady_clock::time_point threadmetrics_last_updated;
+ std::atomic<unsigned> populate_threadmetrics_reentrancy{0};
+#ifdef __linux__
+ std::mutex proc_self_task_fd_lock;
+ int proc_self_task_fd{-1};
+#endif
+#endif
+
+ std::mutex io_aware_work_item_handles_lock;
+ struct io_aware_work_item_handles_guard : std::unique_lock<std::mutex>
+ {
+ using std::unique_lock<std::mutex>::unique_lock;
+ };
+ struct io_aware_work_item_statfs
+ {
+ size_t refcount{0};
+ deadline default_deadline;
+ float average_busy{0}, average_queuedepth{0};
+ std::chrono::steady_clock::time_point last_updated;
+ statfs_t statfs;
+ };
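+    /* One entry per distinct filesystem, shared by all io_aware_work_items
+    whose handles live on it: a refcount, the default deadline applied to
+    work, rolling averages of recent statfs busy/queue-depth samples, and
+    the cached statfs_t itself. */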
+ std::unordered_map<fs_handle::unique_id_type, io_aware_work_item_statfs, fs_handle::unique_id_type_hasher> io_aware_work_item_handles;
+
+ global_dynamic_thread_pool_impl()
+ {
+#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+ populate_threadmetrics(std::chrono::steady_clock::now());
+#endif
+ }
+
+ template <class T, class U> static void _append_to_list(T &what, U *v)
+ {
+ if(what.front == nullptr)
+ {
+ assert(what.back == nullptr);
+ v->_next = v->_prev = nullptr;
+ what.front = what.back = v;
+ }
+ else
+ {
+ v->_next = nullptr;
+ v->_prev = what.back;
+ what.back->_next = v;
+ what.back = v;
+ }
+ what.count++;
+ }
+ template <class T, class U> static void _prepend_to_list(T &what, U *v)
+ {
+ if(what.front == nullptr)
+ {
+ assert(what.back == nullptr);
+ v->_next = v->_prev = nullptr;
+ what.front = what.back = v;
+ }
+ else
+ {
+ v->_prev = nullptr;
+ v->_next = what.front;
+ what.front->_prev = v;
+ what.front = v;
+ }
+ what.count++;
+ }
+ template <class T, class U> static void _remove_from_list(T &what, U *v)
+ {
+ if(v->_prev == nullptr && v->_next == nullptr)
+ {
+ assert(what.front == v);
+ assert(what.back == v);
+ what.front = what.back = nullptr;
+ }
+ else if(v->_prev == nullptr)
+ {
+ assert(what.front == v);
+ v->_next->_prev = nullptr;
+ what.front = v->_next;
+ v->_next = v->_prev = nullptr;
+ }
+ else if(v->_next == nullptr)
+ {
+ assert(what.back == v);
+ v->_prev->_next = nullptr;
+ what.back = v->_prev;
+ v->_next = v->_prev = nullptr;
+ }
+ else
+ {
+ v->_next->_prev = v->_prev;
+ v->_prev->_next = v->_next;
+ v->_next = v->_prev = nullptr;
+ }
+ what.count--;
+ }
+
+#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+ inline void _execute_work(thread_t *self);
+
+ void _add_thread(threadpool_guard & /*unused*/)
+ {
+ thread_t *p = nullptr;
+ try
+ {
+ p = new thread_t;
+ _append_to_list(threadpool_active, p);
+ p->thread = std::thread([this, p] { _execute_work(p); });
+ }
+ catch(...)
+ {
+ if(p != nullptr)
+ {
+ _remove_from_list(threadpool_active, p);
+ }
+ // drop failure
+ }
+ }
+
+ bool _remove_thread(threadpool_guard &g, threads_t &which)
+ {
+ if(which.count == 0)
+ {
+ return false;
+ }
+ // Threads which went to sleep the longest ago are at the front
+ auto *t = which.front;
+ if(t->state.load(std::memory_order_acquire) < 0)
+ {
+ // He's already exiting
+ return false;
+ }
+ assert(t->state.load(std::memory_order_acquire) == 0);
+ t->state.fetch_sub(1, std::memory_order_release);
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << t << " is told to quit" << std::endl;
+#endif
+ do
+ {
+ g.unlock();
+ t->cond.notify_one();
+ g.lock();
+ } while(t->state.load(std::memory_order_acquire) >= -1);
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << t << " has quit, deleting" << std::endl;
+#endif
+ _remove_from_list(threadpool_active, t);
+ t->thread.join();
+ delete t;
+ return true;
+ }
+
+ ~global_dynamic_thread_pool_impl()
+ {
+ {
+ threadpool_guard g(threadpool_lock);
+ while(threadpool_active.count > 0 || threadpool_sleeping.count > 0)
+ {
+ while(threadpool_sleeping.count > 0)
+ {
+ auto removed = _remove_thread(g, threadpool_sleeping);
+ assert(removed);
+ (void) removed;
+ }
+ if(threadpool_active.count > 0)
+ {
+ auto removed = _remove_thread(g, threadpool_active);
+ assert(removed);
+ (void) removed;
+ }
+ }
+ }
+ threadmetrics_guard g(threadmetrics_lock);
+ for(auto *p : threadmetrics_sorted)
+ {
+ delete p;
+ }
+ threadmetrics_sorted.clear();
+ threadmetrics_queue = {};
+#ifdef __linux__
+ if(proc_self_task_fd > 0)
+ {
+ ::close(proc_self_task_fd);
+ proc_self_task_fd = -1;
+ }
+#endif
+ }
+
+#ifdef __linux__
+    // You are guaranteed only one of these EVER executes at a time. Locking is probably overkill, but it is also probably harmless
+ bool update_threadmetrics(threadmetrics_guard &&g, std::chrono::steady_clock::time_point now, threadmetrics_item *new_items)
+ {
+ auto update_item = [&](threadmetrics_item *item) {
+ char path[64] = "/proc/self/task/", *pend = path + 16, *tend = item->threadid.text;
+ while(*tend == '0' && (tend - item->threadid.text) < (ssize_t) sizeof(item->threadid.text))
+ {
+ ++tend;
+ }
+ while((tend - item->threadid.text) < (ssize_t) sizeof(item->threadid.text))
+ {
+ *pend++ = *tend++;
+ }
+ memcpy(pend, "/stat", 6);
+ int fd = ::open(path, O_RDONLY);
+ if(-1 == fd)
+ {
+ // Thread may have exited since we last populated
+ if(item->blocked_since == std::chrono::steady_clock::time_point())
+ {
+ threadmetrics_queue.running--;
+ threadmetrics_queue.blocked++;
+ }
+ item->blocked_since = now;
+ item->last_updated = now;
+ return;
+ }
+ char buffer[1024];
+ auto bytesread = ::read(fd, buffer, sizeof(buffer));
+ ::close(fd);
+      buffer[std::min((size_t) bytesread, sizeof(buffer) - 1)] = 0;  // null terminate the bytes actually read
+ char state = 0;
+ unsigned long majflt = 0, utime = 0, stime = 0;
+ sscanf(buffer, "%*d %*s %c %*d %*d %*d %*d %*d %*u %*u %*u %lu %*u %lu %lu", &state, &majflt, &utime, &stime);
+ if(item->utime != (uint32_t) -1 || item->stime != (uint32_t) -1)
+ {
+ if(item->utime == (uint32_t) utime && item->stime == (uint32_t) stime && state != 'R')
+ {
+ // This thread made no progress since last time
+ if(item->blocked_since == std::chrono::steady_clock::time_point())
+ {
+ threadmetrics_queue.running--;
+ threadmetrics_queue.blocked++;
+ item->blocked_since = now;
+ }
+ }
+ else
+ {
+ if(item->blocked_since != std::chrono::steady_clock::time_point())
+ {
+ threadmetrics_queue.running++;
+ threadmetrics_queue.blocked--;
+ item->blocked_since = std::chrono::steady_clock::time_point();
+ }
+ }
+ }
+ // std::cout << "Threadmetrics " << path << " " << state << " " << majflt << " " << utime << " " << stime << ". Previously " << item->diskfaults << " "
+ // << item->utime << " " << item->stime << std::endl;
+ item->diskfaults = (uint32_t) majflt;
+ item->utime = (uint32_t) utime;
+ item->stime = (uint32_t) stime;
+ item->last_updated = now;
+ };
+ if(new_items != nullptr)
+ {
+ for(; new_items != nullptr; new_items = new_items->_next)
+ {
+ update_item(new_items);
+ }
+ return false;
+ }
+ if(threadmetrics_queue.count == 0)
+ {
+ return false;
+ }
+ size_t updated = 0;
+ while(now - threadmetrics_queue.front->last_updated >= std::chrono::milliseconds(200) && updated++ < 4)
+ {
+ auto *p = threadmetrics_queue.front;
+ update_item(p);
+ _remove_from_list(threadmetrics_queue, p);
+ _append_to_list(threadmetrics_queue, p);
+ }
+ // if(updated > 0)
+ {
+ static const auto min_hardware_concurrency = std::thread::hardware_concurrency();
+ static const auto max_hardware_concurrency = min_hardware_concurrency + 3;
+ auto threadmetrics_running = (ssize_t) threadmetrics_queue.running;
+ auto threadmetrics_blocked = (ssize_t) threadmetrics_queue.blocked;
+ g.unlock(); // drop threadmetrics_lock
+
+ threadpool_guard gg(threadpool_lock);
+ // Adjust for the number of threads sleeping for more work
+ threadmetrics_running += threadpool_sleeping.count;
+ threadmetrics_blocked -= threadpool_sleeping.count;
+ if(threadmetrics_blocked < 0)
+ {
+ threadmetrics_blocked = 0;
+ }
+ const auto desired_concurrency = std::min((ssize_t) min_hardware_concurrency, (ssize_t) total_submitted_workitems.load(std::memory_order_relaxed));
+ auto toadd = std::max((ssize_t) 0, std::min(desired_concurrency - threadmetrics_running, desired_concurrency - (ssize_t) threadpool_active.count));
+ auto toremove = std::max((ssize_t) 0, (ssize_t) threadmetrics_running - (ssize_t) max_hardware_concurrency);
+ if(toadd > 0 || toremove > 0)
+ {
+ // std::cout << "total active = " << threadpool_active.count << " total idle = " << threadpool_sleeping.count
+ // << " threadmetrics_running = " << threadmetrics_running << " threadmetrics_blocked = " << threadmetrics_blocked << " toadd = " << toadd
+ // << " toremove = " << toremove << std::endl;
+ if(toadd > 0)
+ {
+ _add_thread(gg);
+ }
+ if(toremove > 0 && threadpool_active.count > 1)
+ {
+ // Kill myself, but not if I'm the final thread who might need to run timers
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+ // Returns true if calling thread is to exit
+ bool populate_threadmetrics(std::chrono::steady_clock::time_point now)
+ {
+ if(populate_threadmetrics_reentrancy.exchange(1, std::memory_order_relaxed) == 1)
+ {
+ return false;
+ }
+ auto unpopulate_threadmetrics_reentrancy = make_scope_exit([this]() noexcept { populate_threadmetrics_reentrancy.store(0, std::memory_order_relaxed); });
+
+ static thread_local std::vector<char> kernelbuffer(1024);
+ static thread_local std::vector<threadmetrics_threadid> threadidsbuffer(1024 / sizeof(dirent));
+ using getdents64_t = int (*)(int, char *, unsigned int);
+ static auto getdents = static_cast<getdents64_t>([](int fd, char *buf, unsigned count) -> int { return syscall(SYS_getdents64, fd, buf, count); });
+ using dirent = dirent64;
+ size_t bytes = 0;
+ {
+ threadmetrics_guard g(threadmetrics_lock);
+ if(now - threadmetrics_last_updated < std::chrono::milliseconds(250) &&
+ threadmetrics_queue.running + threadmetrics_queue.blocked >= threadpool_threads.load(std::memory_order_relaxed))
+ {
+ if(now - threadmetrics_last_updated < std::chrono::milliseconds(100))
+ {
+ return false;
+ }
+ return update_threadmetrics(std::move(g), now, nullptr);
+ }
+ threadmetrics_last_updated = now;
+ if(proc_self_task_fd < 0)
+ {
+ proc_self_task_fd = ::open("/proc/self/task", O_RDONLY | O_DIRECTORY | O_CLOEXEC);
+ if(proc_self_task_fd < 0)
+ {
+ posix_error().throw_exception();
+ }
+ }
+ }
+ {
+ std::lock_guard<std::mutex> g(proc_self_task_fd_lock);
+ /* It turns out that /proc/self/task is quite racy in the Linux kernel, so keep
+ looping this until it stops telling obvious lies.
+ */
+ for(auto done = false; !done;)
+ {
+ if(-1 == ::lseek64(proc_self_task_fd, 0, SEEK_SET))
+ {
+ posix_error().throw_exception();
+ }
+ auto _bytes = getdents(proc_self_task_fd, kernelbuffer.data(), kernelbuffer.size());
+ // std::cout << "getdents(" << (kernelbuffer.size()-bytes) << ") returns " << _bytes << std::endl;
+ if(_bytes < 0 && errno != EINVAL)
+ {
+ posix_error().throw_exception();
+ }
+ if(_bytes >= 0 && kernelbuffer.size() - (size_t) _bytes >= sizeof(dirent) + 16)
+ {
+ bytes = (size_t) _bytes;
+ threadidsbuffer.clear();
+ for(auto *dent = (dirent *) kernelbuffer.data();; dent = reinterpret_cast<dirent *>(reinterpret_cast<uintptr_t>(dent) + dent->d_reclen))
+ {
+ if(dent->d_ino != 0u && dent->d_type == DT_DIR && dent->d_name[0] != '.')
+ {
+ size_t length = strchr(dent->d_name, 0) - dent->d_name;
+ threadidsbuffer.push_back(string_view(dent->d_name, length));
+ }
+ if((bytes -= dent->d_reclen) <= 0)
+ {
+ break;
+ }
+ }
+ auto mythreadcount = threadpool_threads.load(std::memory_order_relaxed);
+ if(threadidsbuffer.size() >= mythreadcount)
+ {
+ // std::cout << "Parsed from /proc " << threadidsbuffer.size() << " entries, should be at least " << mythreadcount << std::endl;
+ std::sort(threadidsbuffer.begin(), threadidsbuffer.end());
+ done = true;
+ break;
+ }
+#ifndef NDEBUG
+ std::cout << "NOTE: /proc returned " << threadidsbuffer.size() << " items when we know for a fact at least " << mythreadcount
+ << " threads exist, retrying!" << std::endl;
+#endif
+ continue;
+ }
+ kernelbuffer.resize(kernelbuffer.size() << 1);
+ }
+ }
+ threadmetrics_item *firstnewitem = nullptr;
+ threadmetrics_guard g(threadmetrics_lock);
+#if 0
+ {
+ std::stringstream s;
+ s << "Parsed from /proc " << threadidsbuffer.size() << " entries (should be at least " << threadpool_threads.load(std::memory_order_relaxed) << "):";
+ for(auto &i : threadidsbuffer)
+ {
+ s << " " << string_view(i.text, 12);
+ }
+ std::cout << s.str() << std::endl;
+ }
+#endif
+#if 0
+ {
+ auto d_it = threadmetrics_sorted.begin();
+ auto s_it = threadidsbuffer.begin();
+ for(; d_it != threadmetrics_sorted.end() && s_it != threadidsbuffer.end(); ++d_it, ++s_it)
+ {
+ std::cout << (*d_it)->threadid_name() << " " << string_view(s_it->text, 12) << "\n";
+ }
+ for(; d_it != threadmetrics_sorted.end(); ++d_it)
+ {
+ std::cout << (*d_it)->threadid_name() << " XXXXXXXXXXXX\n";
+ }
+ for(; s_it != threadidsbuffer.end(); ++s_it)
+ {
+ std::cout << "XXXXXXXXXXXX " << string_view(s_it->text, 12) << "\n";
+ }
+ std::cout << std::flush;
+ }
+#endif
+ auto d_it = threadmetrics_sorted.begin();
+ auto s_it = threadidsbuffer.begin();
+ auto remove_item = [&] {
+ // std::cout << "Removing thread metrics for " << (*d_it)->threadid_name() << std::endl;
+ if((*d_it)->blocked_since != std::chrono::steady_clock::time_point())
+ {
+ threadmetrics_queue.blocked--;
+ }
+ else
+ {
+ threadmetrics_queue.running--;
+ }
+ _remove_from_list(threadmetrics_queue, *d_it);
+ delete *d_it;
+ d_it = threadmetrics_sorted.erase(d_it);
+ };
+ auto add_item = [&] {
+ auto p = std::make_unique<threadmetrics_item>(*s_it);
+ d_it = threadmetrics_sorted.insert(d_it, p.get());
+ _append_to_list(threadmetrics_queue, p.get());
+ // std::cout << "Adding thread metrics for " << p->threadid_name() << std::endl;
+ if(firstnewitem == nullptr)
+ {
+ firstnewitem = p.get();
+ }
+ p.release();
+ threadmetrics_queue.running++;
+ };
+ // std::cout << "Compare" << std::endl;
+ for(; d_it != threadmetrics_sorted.end() && s_it != threadidsbuffer.end();)
+ {
+ auto c = (*d_it)->threadid.compare(*s_it);
+ // std::cout << "Comparing " << (*d_it)->threadid_name() << " with " << string_view(s_it->text, 12) << " = " << c << std::endl;
+ if(0 == c)
+ {
+ ++d_it;
+ ++s_it;
+ continue;
+ }
+ if(c < 0)
+ {
+ // d_it has gone away
+ remove_item();
+ }
+ if(c > 0)
+ {
+ // s_it is a new entry
+ add_item();
+ }
+ }
+ // std::cout << "Tail dest" << std::endl;
+ while(d_it != threadmetrics_sorted.end())
+ {
+ remove_item();
+ }
+ // std::cout << "Tail source" << std::endl;
+ while(s_it != threadidsbuffer.end())
+ {
+ add_item();
+ ++d_it;
+ ++s_it;
+ }
+ assert(threadmetrics_sorted.size() == threadidsbuffer.size());
+#if 0
+ if(!std::is_sorted(threadmetrics_sorted.begin(), threadmetrics_sorted.end(),
+ [](threadmetrics_item *a, threadmetrics_item *b) { return a->threadid < b->threadid; }))
+ {
+ std::cout << "Threadmetrics:";
+ for(auto *p : threadmetrics_sorted)
+ {
+ std::cout << "\n " << p->threadid_name();
+ }
+ std::cout << std::endl;
+ abort();
+ }
+#endif
+ assert(threadmetrics_queue.running + threadmetrics_queue.blocked == threadidsbuffer.size());
+ return update_threadmetrics(std::move(g), now, firstnewitem);
+ }
+#endif
+#endif
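+
+/* As a standalone illustration of the /proc parse used by update_threadmetrics()
+above (a sketch, not part of this patch; the fields per proc(5) are pid, comm,
+state, ppid, pgrp, session, tty_nr, tpgid, flags, minflt, cminflt, majflt,
+cmajflt, utime, stime):
+
+  #include <cstdio>
+  int main()
+  {
+    char state = 0;
+    unsigned long majflt = 0, utime = 0, stime = 0;
+    FILE *f = std::fopen("/proc/self/stat", "r");
+    if(f != nullptr)
+    {
+      std::fscanf(f, "%*d %*s %c %*d %*d %*d %*d %*d %*u %*u %*u %lu %*u %lu %lu", &state, &majflt, &utime, &stime);
+      std::fclose(f);
+      std::printf("state=%c majflt=%lu utime=%lu stime=%lu\n", state, majflt, utime, stime);
+    }
+    return 0;
+  }
+
+A thread whose utime + stime did not advance between samples, and whose state
+is not 'R', is what the pool counts as blocked. */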
+
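+  /* Record any requested delay on the work item itself: a relative deadline
+  becomes a steady clock timepoint in _timepoint1, an absolute deadline a
+  system clock timepoint in _timepoint2, and a zero deadline a one-tick
+  sentinel in _timepoint1. On Windows this also lazily creates the Win32
+  thread pool timer object. */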
+ result<void> _prepare_work_item_delay(dynamic_thread_pool_group::work_item *workitem, grouph_type grouph, deadline d)
+ {
+ (void) grouph;
+ if(!d)
+ {
+ return errc::invalid_argument;
+ }
+ if(workitem->_nextwork.load(std::memory_order_acquire) == 0 || d.nsecs > 0)
+ {
+ if(d.nsecs > 0)
+ {
+ if(d.steady)
+ {
+ workitem->_timepoint1 = std::chrono::steady_clock::now() + std::chrono::nanoseconds(d.nsecs);
+ workitem->_timepoint2 = {};
+ }
+ else
+ {
+ workitem->_timepoint1 = {};
+ workitem->_timepoint2 = d.to_time_point();
+ }
+ }
+ else
+ {
+ workitem->_timepoint1 = std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1));
+ workitem->_timepoint2 = {};
+ }
+ assert(workitem->_has_timer_set());
+#if defined(_WIN32)
+ if(nullptr == workitem->_internaltimerh)
+ {
+ workitem->_internaltimerh = CreateThreadpoolTimer(_win32_timer_thread_callback, workitem, grouph);
+ if(nullptr == workitem->_internaltimerh)
+ {
+ return win32_error();
+ }
+ }
+#endif
+ }
+ else
+ {
+ if(workitem->_timepoint1 != std::chrono::steady_clock::time_point())
+ {
+ workitem->_timepoint1 = {};
+ }
+ if(workitem->_timepoint2 != std::chrono::system_clock::time_point())
+ {
+ workitem->_timepoint2 = {};
+ }
+ assert(!workitem->_has_timer_set());
+ }
+ return success();
+ }
+
+ inline void _submit_work_item(bool submit_into_highest_priority, dynamic_thread_pool_group::work_item *workitem, bool defer_pool_wake);
+
+ inline result<void> submit(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group_impl *group,
+ span<dynamic_thread_pool_group::work_item *> work) noexcept;
+
+ inline void _work_item_done(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group::work_item *i) noexcept;
+
+ inline result<void> stop(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group_impl *group, result<void> err) noexcept;
+ inline result<void> wait(dynamic_thread_pool_group_impl_guard &g, bool reap, dynamic_thread_pool_group_impl *group, deadline d) noexcept;
+
+ inline void _timerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh);
+ inline void _workerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh);
+ };
+ struct global_dynamic_thread_pool_impl_thread_local_state_t
+ {
+ dynamic_thread_pool_group::work_item *workitem{nullptr};
+ global_dynamic_thread_pool_impl::threadh_type current_callback_instance{nullptr};
+ size_t nesting_level{0};
+ };
+ LLFIO_HEADERS_ONLY_FUNC_SPEC global_dynamic_thread_pool_impl_thread_local_state_t &global_dynamic_thread_pool_thread_local_state() noexcept
+ {
+ static thread_local global_dynamic_thread_pool_impl_thread_local_state_t tls;
+ return tls;
+ }
+
+ LLFIO_HEADERS_ONLY_FUNC_SPEC global_dynamic_thread_pool_impl &global_dynamic_thread_pool() noexcept
+ {
+ static global_dynamic_thread_pool_impl impl;
+ return impl;
+ }
+} // namespace detail
+
+
+LLFIO_HEADERS_ONLY_MEMFUNC_SPEC const char *dynamic_thread_pool_group::implementation_description() noexcept
+{
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
+ return "Grand Central Dispatch";
+#elif defined(_WIN32)
+ return "Win32 thread pool (Vista+)";
+#elif defined(__linux__)
+ return "Linux native";
+#else
+#error Unknown platform
+#endif
+}
+
+class dynamic_thread_pool_group_impl final : public dynamic_thread_pool_group
+{
+ friend struct detail::global_dynamic_thread_pool_impl;
+
+ mutable std::mutex _lock;
+ size_t _nesting_level{0};
+ struct workitems_t
+ {
+ size_t count{0};
+ dynamic_thread_pool_group::work_item *front{nullptr}, *back{nullptr};
+ } _work_items_active, _work_items_done, _work_items_delayed;
+ std::atomic<bool> _stopping{false}, _stopped{true}, _completing{false};
+ std::atomic<int> _waits{0};
+ result<void> _abnormal_completion_cause{success()}; // The cause of any abnormal group completion
+
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
+ dispatch_group_t _grouph;
+#elif defined(_WIN32)
+ TP_CALLBACK_ENVIRON _callbackenviron;
+ PTP_CALLBACK_ENVIRON _grouph{&_callbackenviron};
+#else
+ void *_grouph{nullptr};
+ size_t _newly_added_active_work_items{0};
+ size_t _timer_work_items_remaining{0};
+ size_t _active_work_items_remaining{0};
+#endif
+
+public:
+ result<void> init()
+ {
+ LLFIO_LOG_FUNCTION_CALL(this);
+ try
+ {
+ auto &impl = detail::global_dynamic_thread_pool();
+ _nesting_level = detail::global_dynamic_thread_pool_thread_local_state().nesting_level;
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
+ _grouph = dispatch_group_create();
+ if(_grouph == nullptr)
+ {
+ return errc::not_enough_memory;
+ }
+#elif defined(_WIN32)
+ InitializeThreadpoolEnvironment(_grouph);
+#endif
+ detail::global_dynamic_thread_pool_impl::workqueue_guard g(impl.workqueue_lock);
+ // Append this group to the global work queue at its nesting level
+ if(!impl.workqueue || impl.workqueue->nesting_level <= _nesting_level)
+ {
+ impl.workqueue = std::make_shared<detail::global_dynamic_thread_pool_impl_workqueue_item>(_nesting_level, std::move(impl.workqueue));
+ }
+ impl.workqueue->items.insert(this);
+ return success();
+ }
+ catch(...)
+ {
+ return error_from_exception();
+ }
+ }
+
+ virtual ~dynamic_thread_pool_group_impl()
+ {
+ LLFIO_LOG_FUNCTION_CALL(this);
+ (void) wait();
+ auto &impl = detail::global_dynamic_thread_pool();
+ // detail::dynamic_thread_pool_group_impl_guard g1(_lock);
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
+ if(nullptr != _grouph)
+ {
+ dispatch_release(_grouph);
+ _grouph = nullptr;
+ }
+#elif defined(_WIN32)
+ if(nullptr != _grouph)
+ {
+ DestroyThreadpoolEnvironment(_grouph);
+ _grouph = nullptr;
+ }
+#endif
+ detail::global_dynamic_thread_pool_impl::workqueue_guard g2(impl.workqueue_lock);
+ assert(impl.workqueue->nesting_level >= _nesting_level);
+ for(auto *p = impl.workqueue.get(); p != nullptr; p = p->next.get())
+ {
+ if(p->nesting_level == _nesting_level)
+ {
+ p->items.erase(this);
+ break;
+ }
+ }
+ while(impl.workqueue && impl.workqueue->items.empty())
+ {
+ impl.workqueue = std::move(impl.workqueue->next);
+ }
+ }
+
+ virtual result<void> submit(span<work_item *> work) noexcept override
+ {
+ LLFIO_LOG_FUNCTION_CALL(this);
+ if(_stopping.load(std::memory_order_relaxed))
+ {
+ return errc::operation_canceled;
+ }
+ if(_completing.load(std::memory_order_relaxed))
+ {
+ for(auto *i : work)
+ {
+ i->_parent.store(this, std::memory_order_release);
+ detail::global_dynamic_thread_pool_impl::_append_to_list(_work_items_delayed, i);
+ }
+ return success();
+ }
+ _stopped.store(false, std::memory_order_release);
+ auto &impl = detail::global_dynamic_thread_pool();
+ detail::dynamic_thread_pool_group_impl_guard g(_lock); // lock group
+ if(_work_items_active.count == 0 && _work_items_done.count == 0)
+ {
+ _abnormal_completion_cause = success();
+ }
+ OUTCOME_TRY(impl.submit(g, this, work));
+ if(_work_items_active.count == 0)
+ {
+ _stopped.store(true, std::memory_order_release);
+ }
+ return success();
+ }
+
+ virtual result<void> stop() noexcept override
+ {
+ LLFIO_LOG_FUNCTION_CALL(this);
+ if(_stopped.load(std::memory_order_relaxed))
+ {
+ return success();
+ }
+ auto &impl = detail::global_dynamic_thread_pool();
+ detail::dynamic_thread_pool_group_impl_guard g(_lock); // lock group
+ return impl.stop(g, this, errc::operation_canceled);
+ }
+
+ virtual bool stopping() const noexcept override { return _stopping.load(std::memory_order_relaxed); }
+
+ virtual bool stopped() const noexcept override { return _stopped.load(std::memory_order_relaxed); }
+
+ virtual result<void> wait(deadline d = {}) const noexcept override
+ {
+ LLFIO_LOG_FUNCTION_CALL(this);
+ if(_stopped.load(std::memory_order_relaxed))
+ {
+ return success();
+ }
+ auto &impl = detail::global_dynamic_thread_pool();
+ detail::dynamic_thread_pool_group_impl_guard g(_lock); // lock group
+ return impl.wait(g, true, const_cast<dynamic_thread_pool_group_impl *>(this), d);
+ }
+};
+
+LLFIO_HEADERS_ONLY_MEMFUNC_SPEC size_t dynamic_thread_pool_group::current_nesting_level() noexcept
+{
+ return detail::global_dynamic_thread_pool_thread_local_state().nesting_level;
+}
+
+LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::work_item *dynamic_thread_pool_group::current_work_item() noexcept
+{
+ return detail::global_dynamic_thread_pool_thread_local_state().workitem;
+}
+
+LLFIO_HEADERS_ONLY_MEMFUNC_SPEC uint32_t dynamic_thread_pool_group::ms_sleep_for_more_work() noexcept
+{
+#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+ return detail::global_dynamic_thread_pool().ms_sleep_for_more_work.load(std::memory_order_relaxed);
+#else
+ return 0;
+#endif
+}
+
+LLFIO_HEADERS_ONLY_MEMFUNC_SPEC uint32_t dynamic_thread_pool_group::ms_sleep_for_more_work(uint32_t v) noexcept
+{
+#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+ if(0 == v)
+ {
+ v = 1;
+ }
+ detail::global_dynamic_thread_pool().ms_sleep_for_more_work.store(v, std::memory_order_relaxed);
+ return v;
+#else
+ (void) v;
+ return 0;
+#endif
+}
+
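+
+/* For example, to shrink the idle thread linger from the 20000ms default
+(see ms_sleep_for_more_work{20000} above) to five seconds on the Linux
+native backend (the call returns 0 and does nothing under GCD or Win32):
+
+  dynamic_thread_pool_group::ms_sleep_for_more_work(5000);
+*/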
+LLFIO_HEADERS_ONLY_FUNC_SPEC result<dynamic_thread_pool_group_ptr> make_dynamic_thread_pool_group() noexcept
+{
+ try
+ {
+ auto ret = std::make_unique<dynamic_thread_pool_group_impl>();
+ OUTCOME_TRY(ret->init());
+ return dynamic_thread_pool_group_ptr(std::move(ret));
+ }
+ catch(...)
+ {
+ return error_from_exception();
+ }
+}
+
+namespace detail
+{
+#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+ inline void global_dynamic_thread_pool_impl::_execute_work(thread_t *self)
+ {
+ pthread_setname_np(pthread_self(), "LLFIO DYN TPG");
+ self->last_did_work = std::chrono::steady_clock::now();
+ self->state.fetch_add(1, std::memory_order_release); // busy
+ const unsigned mythreadidx = threadpool_threads.fetch_add(1, std::memory_order_release);
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " begins." << std::endl;
+#endif
+ while(self->state.load(std::memory_order_relaxed) > 0)
+ {
+ dynamic_thread_pool_group::work_item *workitem = nullptr;
+ bool workitem_is_timer = false;
+ std::chrono::steady_clock::time_point now_steady, earliest_duration;
+ std::chrono::system_clock::time_point now_system, earliest_absolute;
+ // Start from highest priority work group, executing any timers due before selecting a work item
+ {
+ auto examine_wq = [&](global_dynamic_thread_pool_impl_workqueue_item &wq) -> dynamic_thread_pool_group::work_item * {
+ if(wq.next_timer_relative.count.load(std::memory_order_relaxed) > 0)
+ {
+ if(now_steady == std::chrono::steady_clock::time_point())
+ {
+ now_steady = std::chrono::steady_clock::now();
+ }
+ wq.next_timer_relative.lock.lock();
+ if(wq.next_timer_relative.front != nullptr)
+ {
+ if(wq.next_timer_relative.front->_timepoint1 <= now_steady)
+ {
+ workitem = wq.next_timer<1>(); // unlocks wq.next_timer_relative.lock
+ workitem_is_timer = true;
+ return workitem;
+ }
+ if(earliest_duration == std::chrono::steady_clock::time_point() || wq.next_timer_relative.front->_timepoint1 < earliest_duration)
+ {
+ earliest_duration = wq.next_timer_relative.front->_timepoint1;
+ }
+ }
+ wq.next_timer_relative.lock.unlock();
+ }
+ if(wq.next_timer_absolute.count.load(std::memory_order_relaxed) > 0)
+ {
+ if(now_system == std::chrono::system_clock::time_point())
+ {
+ now_system = std::chrono::system_clock::now();
+ }
+ wq.next_timer_absolute.lock.lock();
+ if(wq.next_timer_absolute.front != nullptr)
+ {
+ if(wq.next_timer_absolute.front->_timepoint2 <= now_system)
+ {
+ workitem = wq.next_timer<2>(); // unlocks wq.next_timer_absolute.lock
+ workitem_is_timer = true;
+ return workitem;
+ }
+ if(earliest_absolute == std::chrono::system_clock::time_point() || wq.next_timer_absolute.front->_timepoint2 < earliest_absolute)
+ {
+ earliest_absolute = wq.next_timer_absolute.front->_timepoint2;
+ }
+ }
+ wq.next_timer_absolute.lock.unlock();
+ }
+ unsigned count = 0;
+ return wq.next_active(count, mythreadidx);
+ };
+ workitem = examine_wq(first_execute);
+ if(workitem == nullptr)
+ {
+ workqueue_lock.lock();
+ auto lock_wq = workqueue; // take shared_ptr to highest priority collection of work groups
+ workqueue_lock.unlock();
+ while(lock_wq)
+ {
+ workitem = examine_wq(*lock_wq);
+ if(workitem != nullptr)
+ {
+ // workqueue_lock.lock();
+ // std::cout << "workitem = " << workitem << " nesting_level = " << wq.nesting_level << " count = " << count << std::endl;
+ // workqueue_lock.unlock();
+ break;
+ }
+ workqueue_lock.lock();
+ lock_wq = lock_wq->next;
+ workqueue_lock.unlock();
+ }
+ }
+ }
+ if(now_steady == std::chrono::steady_clock::time_point())
+ {
+ now_steady = std::chrono::steady_clock::now();
+ }
+ // If there are no timers, and no work to do, time to either die or sleep
+ if(workitem == nullptr)
+ {
+ const std::chrono::steady_clock::duration max_sleep(std::chrono::milliseconds(ms_sleep_for_more_work.load(std::memory_order_relaxed)));
+ if(now_steady - self->last_did_work >= max_sleep)
+ {
+ threadpool_guard g(threadpool_lock);
+ // If there are any timers running, leave at least one worker thread
+ if(threadpool_active.count > 1 ||
+ (earliest_duration == std::chrono::steady_clock::time_point() && earliest_absolute == std::chrono::system_clock::time_point()))
+ {
+ _remove_from_list(threadpool_active, self);
+ threadpool_threads.fetch_sub(1, std::memory_order_release);
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " exits due to no new work for ms_sleep_for_more_work" << std::endl;
+#endif
+ self->thread.detach();
+ delete self;
+ return;
+ }
+ }
+ std::chrono::steady_clock::duration duration(max_sleep);
+ if(earliest_duration != std::chrono::steady_clock::time_point())
+ {
+          if(earliest_duration - now_steady < duration)
+          {
+            duration = earliest_duration - now_steady;
+ }
+ }
+ else if(earliest_absolute != std::chrono::system_clock::time_point())
+ {
+ if(now_system == std::chrono::system_clock::time_point())
+ {
+ now_system = std::chrono::system_clock::now();
+ }
+          auto diff = earliest_absolute - now_system;
+ if(diff > duration)
+ {
+ earliest_absolute = {};
+ }
+ }
+ threadpool_guard g(threadpool_lock);
+ _remove_from_list(threadpool_active, self);
+ _append_to_list(threadpool_sleeping, self);
+ self->state.fetch_sub(1, std::memory_order_release);
+ if(earliest_absolute != std::chrono::system_clock::time_point())
+ {
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " goes to sleep absolute" << std::endl;
+#endif
+ self->cond.wait_until(g, earliest_absolute);
+ }
+ else
+ {
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " goes to sleep for " << std::chrono::duration_cast<std::chrono::milliseconds>(duration).count() << std::endl;
+#endif
+ self->cond.wait_for(g, duration);
+ }
+ self->state.fetch_add(1, std::memory_order_release);
+ _remove_from_list(threadpool_sleeping, self);
+ _append_to_list(threadpool_active, self);
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " wakes, state = " << self->state << std::endl;
+#endif
+ g.unlock();
+ try
+ {
+ populate_threadmetrics(now_steady);
+ }
+ catch(...)
+ {
+ }
+ continue;
+ }
+ self->last_did_work = now_steady;
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " executes work item " << workitem << std::endl;
+#endif
+ total_submitted_workitems.fetch_sub(1, std::memory_order_relaxed);
+ if(workitem_is_timer)
+ {
+ _timerthread(workitem, nullptr);
+ }
+ else
+ {
+ _workerthread(workitem, nullptr);
+ }
+    // workitem->_internalworkh should be null; however, workitem may no longer exist
+ try
+ {
+ if(populate_threadmetrics(now_steady))
+ {
+ threadpool_guard g(threadpool_lock);
+ _remove_from_list(threadpool_active, self);
+ threadpool_threads.fetch_sub(1, std::memory_order_release);
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " exits due to threadmetrics saying we exceed max concurrency" << std::endl;
+#endif
+ self->thread.detach();
+ delete self;
+ return;
+ }
+ }
+ catch(...)
+ {
+ }
+ }
+ self->state.fetch_sub(2, std::memory_order_release); // dead
+ threadpool_threads.fetch_sub(1, std::memory_order_release);
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << self << " exits due to state request, state = " << self->state << std::endl;
+#endif
+ }
+#endif
+
+ inline void global_dynamic_thread_pool_impl::_submit_work_item(bool submit_into_highest_priority, dynamic_thread_pool_group::work_item *workitem,
+ bool defer_pool_wake)
+ {
+ (void) submit_into_highest_priority;
+ (void) defer_pool_wake;
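+    // _nextwork encoding: -1 = the work item is done, 0 = ask next() for more work (possibly after a timer), any other value = work to execute now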
+ const auto nextwork = workitem->_nextwork.load(std::memory_order_acquire);
+ if(nextwork != -1)
+ {
+ auto *parent = workitem->_parent.load(std::memory_order_relaxed);
+ // If no work item for now, or there is a delay, schedule a timer
+ if(nextwork == 0 || workitem->_has_timer_set())
+ {
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
+ dispatch_time_t when;
+ if(workitem->_has_timer_set_relative())
+ {
+ // Special constant for immediately rescheduled work items
+ if(workitem->_timepoint1 == std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1)))
+ {
+ when = dispatch_time(DISPATCH_TIME_NOW, 0);
+ }
+ else
+ {
+ auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(workitem->_timepoint1 - std::chrono::steady_clock::now()).count();
+ if(duration > 1000000000LL)
+ {
+ // Because GCD has no way of cancelling timers, nor assigning them to a group,
+ // we clamp the timer to 1 second. Then if cancellation is ever done to the group,
+ // the worst possible wait is 1 second. _timerthread will reschedule the timer
+ // if it gets called short.
+ duration = 1000000000LL;
+ }
+ when = dispatch_time(DISPATCH_TIME_NOW, duration);
+ }
+ }
+ else if(workitem->_has_timer_set_absolute())
+ {
+ deadline d(workitem->_timepoint2);
+ auto now = std::chrono::system_clock::now();
+ if(workitem->_timepoint2 - now > std::chrono::seconds(1))
+ {
+ d = now + std::chrono::seconds(1);
+ }
+ when = dispatch_walltime(&d.utc, 0);
+ }
+ else
+ {
+        when = dispatch_time(DISPATCH_TIME_NOW, 1); // smallest possible non-immediate duration from now
+ }
+ // std::cout << "*** timer " << workitem << std::endl;
+ dispatch_after_f(when, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), workitem, _gcd_timer_callback);
+#elif defined(_WIN32)
+ LARGE_INTEGER li;
+ DWORD slop = 1000;
+ if(workitem->_has_timer_set_relative())
+ {
+ // Special constant for immediately rescheduled work items
+ if(workitem->_timepoint1 == std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1)))
+ {
+            li.QuadPart = -1; // smallest possible non-immediate duration from now
+ }
+ else
+ {
+ li.QuadPart = std::chrono::duration_cast<std::chrono::nanoseconds>(workitem->_timepoint1 - std::chrono::steady_clock::now()).count() / 100;
+ if(li.QuadPart < 0)
+ {
+ li.QuadPart = 0;
+ }
+ if(li.QuadPart / 8 < (int64_t) slop)
+ {
+ slop = (DWORD)(li.QuadPart / 8);
+ }
+ li.QuadPart = -li.QuadPart; // negative is relative
+ }
+ }
+ else if(workitem->_has_timer_set_absolute())
+ {
+ li = windows_nt_kernel::from_timepoint(workitem->_timepoint2);
+ }
+ else
+ {
+          li.QuadPart = -1; // smallest possible non-immediate duration from now
+ }
+ FILETIME ft;
+ ft.dwHighDateTime = (DWORD) li.HighPart;
+ ft.dwLowDateTime = li.LowPart;
+ // std::cout << "*** timer " << workitem << std::endl;
+ SetThreadpoolTimer((PTP_TIMER) workitem->_internaltimerh, &ft, 0, slop);
+#else
+ workqueue_guard gg(workqueue_lock);
+ for(auto *p = workqueue.get(); p != nullptr; p = p->next.get())
+ {
+ if(p->nesting_level == parent->_nesting_level)
+ {
+ p->append_timer(workitem);
+ break;
+ }
+ }
+#endif
+ }
+ else
+ {
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
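+      // Map the group's nesting depth onto a GCD queue priority: top level gets HIGH, one below gets DEFAULT, anything deeper gets LOW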
+ intptr_t priority = DISPATCH_QUEUE_PRIORITY_LOW;
+ {
+ global_dynamic_thread_pool_impl::workqueue_guard gg(workqueue_lock);
+ if(workqueue->nesting_level == parent->_nesting_level)
+ {
+ priority = DISPATCH_QUEUE_PRIORITY_HIGH;
+ }
+ else if(workqueue->nesting_level == parent->_nesting_level + 1)
+ {
+ priority = DISPATCH_QUEUE_PRIORITY_DEFAULT;
+ }
+ }
+ // std::cout << "*** submit " << workitem << std::endl;
+ dispatch_group_async_f(parent->_grouph, dispatch_get_global_queue(priority, 0), workitem, _gcd_dispatch_callback);
+#elif defined(_WIN32)
+ // Set the priority of the group according to distance from the top
+ TP_CALLBACK_PRIORITY priority = TP_CALLBACK_PRIORITY_LOW;
+ {
+ global_dynamic_thread_pool_impl::workqueue_guard gg(workqueue_lock);
+ if(workqueue->nesting_level == parent->_nesting_level)
+ {
+ priority = TP_CALLBACK_PRIORITY_HIGH;
+ }
+ else if(workqueue->nesting_level == parent->_nesting_level + 1)
+ {
+ priority = TP_CALLBACK_PRIORITY_NORMAL;
+ }
+ }
+ SetThreadpoolCallbackPriority(parent->_grouph, priority);
+ // std::cout << "*** submit " << workitem << std::endl;
+ SubmitThreadpoolWork((PTP_WORK) workitem->_internalworkh);
+#else
+ global_dynamic_thread_pool_impl::workqueue_guard gg(workqueue_lock);
+ if(submit_into_highest_priority)
+ {
+        // TODO: It would be super nice if we prepended this instead when it came from a timer
+ first_execute.append_active(workitem);
+ // std::cout << "append_active _nesting_level = " << parent->_nesting_level << std::endl;
+ }
+ else
+ {
+ for(auto *p = workqueue.get(); p != nullptr; p = p->next.get())
+ {
+ if(p->nesting_level == parent->_nesting_level)
+ {
+            // TODO: It would be super nice if we prepended this instead when it came from a timer
+ p->append_active(workitem);
+ // std::cout << "append_active _nesting_level = " << parent->_nesting_level << std::endl;
+ break;
+ }
+ }
+ }
+#endif
+ }
+
+#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP submits work item " << workitem << std::endl;
+#endif
+ const auto active_work_items = total_submitted_workitems.fetch_add(1, std::memory_order_relaxed) + 1;
+ if(!defer_pool_wake)
+ {
+ {
+ threadpool_guard gg(threadpool_lock);
+ if(threadpool_active.count == 0 && threadpool_sleeping.count == 0)
+ {
+ _add_thread(gg);
+ }
+ else if(threadpool_sleeping.count > 0 && active_work_items > threadpool_active.count)
+ {
+ // Try to wake the most recently slept first
+ auto *t = threadpool_sleeping.back;
+ auto now = std::chrono::steady_clock::now();
+ for(size_t n = std::min(active_work_items - threadpool_active.count, threadpool_sleeping.count); n > 0; n--)
+ {
+ t->last_did_work = now; // prevent reap
+ t->cond.notify_one();
+ t = t->_prev;
+ }
+ }
+ }
+ }
+#endif
+ }
+ }
+
+ inline result<void> global_dynamic_thread_pool_impl::submit(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group_impl *group,
+ span<dynamic_thread_pool_group::work_item *> work) noexcept
+ {
+ try
+ {
+ if(work.empty())
+ {
+ return success();
+ }
+ for(auto *i : work)
+ {
+ if(i->_parent.load(std::memory_order_relaxed) != nullptr)
+ {
+ return errc::address_in_use;
+ }
+ }
+ auto uninit = make_scope_exit([&]() noexcept {
+ for(auto *i : work)
+ {
+ _remove_from_list(group->_work_items_active, i);
+#if defined(_WIN32)
+ if(nullptr != i->_internaltimerh)
+ {
+ CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
+ i->_internaltimerh = nullptr;
+ }
+ if(nullptr != i->_internalworkh)
+ {
+ CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
+ i->_internalworkh = nullptr;
+ }
+#endif
+ i->_parent.store(nullptr, std::memory_order_release);
+ }
+ });
+ for(auto *i : work)
+ {
+ deadline d(std::chrono::seconds(0));
+ i->_parent.store(group, std::memory_order_release);
+ i->_nextwork.store(i->next(d), std::memory_order_release);
+ if(-1 == i->_nextwork.load(std::memory_order_acquire))
+ {
+ _append_to_list(group->_work_items_done, i);
+ }
+ else
+ {
+#if defined(_WIN32)
+ i->_internalworkh = CreateThreadpoolWork(_win32_worker_thread_callback, i, group->_grouph);
+ if(nullptr == i->_internalworkh)
+ {
+ return win32_error();
+ }
+#endif
+ OUTCOME_TRY(_prepare_work_item_delay(i, group->_grouph, d));
+ _prepend_to_list(group->_work_items_active, i);
+ }
+ }
+ uninit.release();
+ g.unlock();
+ {
+ for(auto *i : work)
+ {
+#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+ group->_newly_added_active_work_items++;
+ group->_active_work_items_remaining++;
+#endif
+ _submit_work_item(true, i, i != work.back());
+ }
+ }
+ g.lock();
+ return success();
+ }
+ catch(...)
+ {
+ return error_from_exception();
+ }
+ }
+
+ inline void global_dynamic_thread_pool_impl::_work_item_done(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group::work_item *i) noexcept
+ {
+ (void) g;
+ // std::cout << "*** _work_item_done " << i << std::endl;
+ auto *parent = i->_parent.load(std::memory_order_relaxed);
+ _remove_from_list(parent->_work_items_active, i);
+ _append_to_list(parent->_work_items_done, i);
+#if defined(_WIN32)
+ if(i->_internalworkh_inuse > 0)
+ {
+ i->_internalworkh_inuse = 2;
+ }
+ else
+ {
+ if(i->_internaltimerh != nullptr)
+ {
+ CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
+ i->_internaltimerh = nullptr;
+ }
+ if(i->_internalworkh != nullptr)
+ {
+ CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
+ i->_internalworkh = nullptr;
+ }
+ }
+#endif
+ if(parent->_work_items_active.count == 0)
+ {
+ i = nullptr;
+ auto *v = parent->_work_items_done.front, *n = v;
+ for(; v != nullptr; v = n)
+ {
+ v->_parent.store(nullptr, std::memory_order_release);
+ v->_nextwork.store(-1, std::memory_order_release);
+ n = v->_next;
+ }
+ n = v = parent->_work_items_done.front;
+ parent->_work_items_done.front = parent->_work_items_done.back = nullptr;
+ parent->_work_items_done.count = 0;
+ parent->_stopping.store(false, std::memory_order_release);
+ parent->_completing.store(true, std::memory_order_release); // cause submissions to enter _work_items_delayed
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP executes group_complete for group " << parent << std::endl;
+#endif
+ for(; v != nullptr; v = n)
+ {
+ n = v->_next;
+ v->group_complete(parent->_abnormal_completion_cause);
+ }
+ parent->_stopped.store(true, std::memory_order_release);
+ parent->_completing.store(false, std::memory_order_release); // cease submitting to _work_items_delayed
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP group_complete done for group " << parent << ". _work_items_delayed.count = " << parent->_work_items_delayed.count << std::endl;
+#endif
+ if(parent->_work_items_delayed.count > 0)
+ {
+      /* If there are threads waiting on this group to complete, give them forward progress now.
+ */
+ while(parent->_waits.load(std::memory_order_relaxed) > 0)
+ {
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP group_complete blocks on waits for group " << parent << std::endl;
+#endif
+ g.unlock();
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ g.lock();
+ }
+ // Now submit all delayed work
+ while(parent->_work_items_delayed.count > 0)
+ {
+ i = parent->_work_items_delayed.front;
+ _remove_from_list(parent->_work_items_delayed, i);
+ auto r = submit(g, parent, {&i, 1});
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP group_complete submits delayed work item " << i << " for group " << parent << " which saw error ";
+ if(r)
+ {
+ std::cout << "none" << std::endl;
+ }
+ else
+ {
+ std::cout << r.error().message() << std::endl;
+ }
+#endif
+ if(!r)
+ {
+ parent->_work_items_delayed = {};
+ (void) stop(g, parent, std::move(r));
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ inline result<void> global_dynamic_thread_pool_impl::stop(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group_impl *group,
+ result<void> err) noexcept
+ {
+ (void) g;
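+    // Keep only the first failure: the cause is overwritten only while it still holds success()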
+ if(group->_abnormal_completion_cause)
+ {
+ group->_abnormal_completion_cause = std::move(err);
+ }
+ group->_stopping.store(true, std::memory_order_release);
+ return success();
+ }
+
+
+ inline result<void> global_dynamic_thread_pool_impl::wait(dynamic_thread_pool_group_impl_guard &g, bool reap, dynamic_thread_pool_group_impl *group,
+ deadline d) noexcept
+ {
+ LLFIO_DEADLINE_TO_SLEEP_INIT(d);
+ if(!d || d.nsecs > 0)
+ {
+ /* To ensure forward progress, we need to gate new waits during delayed work submission.
+ Otherwise waits may never exit if the window where _work_items_active.count == 0 is
+ missed.
+ */
+ while(group->_work_items_delayed.count > 0)
+ {
+ g.unlock();
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ g.lock();
+ }
+ group->_waits.fetch_add(1, std::memory_order_release);
+ auto unwaitcount = make_scope_exit([&]() noexcept { group->_waits.fetch_sub(1, std::memory_order_release); });
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
+ while(group->_work_items_active.count > 0)
+ {
+ LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d);
+ dispatch_time_t timeout = DISPATCH_TIME_FOREVER;
+ if(d)
+ {
+ std::chrono::nanoseconds duration;
+ LLFIO_DEADLINE_TO_PARTIAL_TIMEOUT(duration, d);
+ timeout = dispatch_time(DISPATCH_TIME_NOW, duration.count());
+ }
+ g.unlock();
+ dispatch_group_wait(group->_grouph, timeout);
+ g.lock();
+ // if(1 == group->_work_items_active.count)
+ //{
+ // std::cout << "*** wait item remaining is " << group->_work_items_active.front << std::endl;
+ // std::this_thread::sleep_for(std::chrono::seconds(1));
+ //}
+ }
+#elif defined(_WIN32)
+ auto &tls = detail::global_dynamic_thread_pool_thread_local_state();
+ if(tls.current_callback_instance != nullptr)
+ {
+ // I am being called from within a thread worker. Tell
+ // the thread pool that I am not going to exit promptly.
+ CallbackMayRunLong(tls.current_callback_instance);
+ }
+ // Is this a cancellation?
+ if(group->_stopping.load(std::memory_order_relaxed))
+ {
+ while(group->_work_items_active.count > 0)
+ {
+ auto *i = group->_work_items_active.front;
+ if(nullptr != i->_internalworkh)
+ {
+ if(0 == i->_internalworkh_inuse)
+ {
+ i->_internalworkh_inuse = 1;
+ }
+ g.unlock();
+ WaitForThreadpoolWorkCallbacks((PTP_WORK) i->_internalworkh, true);
+ g.lock();
+ if(i->_internalworkh_inuse == 2)
+ {
+ if(nullptr != i->_internalworkh)
+ {
+ CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
+ i->_internalworkh = nullptr;
+ }
+ if(nullptr != i->_internaltimerh)
+ {
+ CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
+ i->_internaltimerh = nullptr;
+ }
+ }
+ i->_internalworkh_inuse = 0;
+ }
+ if(nullptr != i->_internaltimerh)
+ {
+ if(0 == i->_internalworkh_inuse)
+ {
+ i->_internalworkh_inuse = 1;
+ }
+ g.unlock();
+ WaitForThreadpoolTimerCallbacks((PTP_TIMER) i->_internaltimerh, true);
+ g.lock();
+ if(i->_internalworkh_inuse == 2)
+ {
+ if(nullptr != i->_internalworkh)
+ {
+ CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
+ i->_internalworkh = nullptr;
+ }
+ if(nullptr != i->_internaltimerh)
+ {
+ CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
+ i->_internaltimerh = nullptr;
+ }
+ }
+ i->_internalworkh_inuse = 0;
+ }
+ if(group->_work_items_active.count > 0 && group->_work_items_active.front == i)
+ {
+ // This item got cancelled before it started
+ _work_item_done(g, group->_work_items_active.front);
+ }
+ }
+ assert(!group->_stopping.load(std::memory_order_relaxed));
+ }
+ else if(!d)
+ {
+ while(group->_work_items_active.count > 0)
+ {
+ auto *i = group->_work_items_active.front;
+ if(nullptr != i->_internalworkh)
+ {
+ if(0 == i->_internalworkh_inuse)
+ {
+ i->_internalworkh_inuse = 1;
+ }
+ g.unlock();
+ WaitForThreadpoolWorkCallbacks((PTP_WORK) i->_internalworkh, false);
+ g.lock();
+ if(i->_internalworkh_inuse == 2)
+ {
+ if(nullptr != i->_internalworkh)
+ {
+ CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
+ i->_internalworkh = nullptr;
+ }
+ if(nullptr != i->_internaltimerh)
+ {
+ CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
+ i->_internaltimerh = nullptr;
+ }
+ }
+ i->_internalworkh_inuse = 0;
+ }
+ if(nullptr != i->_internaltimerh)
+ {
+ if(0 == i->_internalworkh_inuse)
+ {
+ i->_internalworkh_inuse = 1;
+ }
+ g.unlock();
+ WaitForThreadpoolTimerCallbacks((PTP_TIMER) i->_internaltimerh, false);
+ g.lock();
+ if(i->_internalworkh_inuse == 2)
+ {
+ if(nullptr != i->_internalworkh)
+ {
+ CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
+ i->_internalworkh = nullptr;
+ }
+ if(nullptr != i->_internaltimerh)
+ {
+ CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
+ i->_internaltimerh = nullptr;
+ }
+ }
+ i->_internalworkh_inuse = 0;
+ }
+ }
+ }
+ else
+ {
+ while(group->_work_items_active.count > 0)
+ {
+ LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d);
+ g.unlock();
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ g.lock();
+ }
+ }
+#else
+ while(group->_work_items_active.count > 0)
+ {
+ LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d);
+ g.unlock();
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ g.lock();
+ }
+#endif
+ }
+ if(group->_work_items_active.count > 0)
+ {
+ return errc::timed_out;
+ }
+ if(reap)
+ {
+ return std::move(group->_abnormal_completion_cause);
+ }
+ return success();
+ }
+
+ inline void global_dynamic_thread_pool_impl::_timerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type /*unused*/)
+ {
+ LLFIO_LOG_FUNCTION_CALL(this);
+ assert(workitem->_nextwork.load(std::memory_order_relaxed) != -1);
+ assert(workitem->_has_timer_set());
+ auto *parent = workitem->_parent.load(std::memory_order_relaxed);
+ // std::cout << "*** _timerthread " << workitem << std::endl;
+ if(parent->_stopping.load(std::memory_order_relaxed))
+ {
+ dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group
+ _work_item_done(g, workitem);
+ return;
+ }
+ if(workitem->_has_timer_set_relative())
+ {
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
+ auto now = std::chrono::steady_clock::now();
+ if(workitem->_timepoint1 - now > std::chrono::seconds(0))
+ {
+ // Timer fired short, so schedule it again
+ _submit_work_item(false, workitem, false);
+ return;
+ }
+#endif
+ workitem->_timepoint1 = {};
+ }
+ if(workitem->_has_timer_set_absolute())
+ {
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
+ auto now = std::chrono::system_clock::now();
+ if(workitem->_timepoint2 - now > std::chrono::seconds(0))
+ {
+ // Timer fired short, so schedule it again
+ _submit_work_item(false, workitem, false);
+ return;
+ }
+#endif
+ workitem->_timepoint2 = {};
+ }
+ assert(!workitem->_has_timer_set());
+ if(workitem->_nextwork.load(std::memory_order_acquire) == 0)
+ {
+ deadline d(std::chrono::seconds(0));
+ workitem->_nextwork.store(workitem->next(d), std::memory_order_release);
+ auto r2 = _prepare_work_item_delay(workitem, parent->_grouph, d);
+ if(!r2)
+ {
+ dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group
+ (void) stop(g, parent, std::move(r2));
+ _work_item_done(g, workitem);
+ return;
+ }
+ if(-1 == workitem->_nextwork.load(std::memory_order_relaxed))
+ {
+ dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group
+ _work_item_done(g, workitem);
+ return;
+ }
+ _submit_work_item(false, workitem, false);
+ return;
+ }
+ _submit_work_item(false, workitem, false);
+ }
+
+  // Executes one unit of work for a work item
+ inline void global_dynamic_thread_pool_impl::_workerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh)
+ {
+ LLFIO_LOG_FUNCTION_CALL(this);
+ //{
+ // _lock_guard g(parent->_lock);
+ // std::cout << "*** _workerthread " << workitem << " begins with work " << workitem->_nextwork.load(std::memory_order_relaxed) << std::endl;
+ //}
+ assert(workitem->_nextwork.load(std::memory_order_relaxed) != -1);
+ assert(workitem->_nextwork.load(std::memory_order_relaxed) != 0);
+ auto *parent = workitem->_parent.load(std::memory_order_relaxed);
+ if(parent->_stopping.load(std::memory_order_relaxed))
+ {
+ dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group
+ _work_item_done(g, workitem);
+ return;
+ }
+ auto &tls = detail::global_dynamic_thread_pool_thread_local_state();
+ auto old_thread_local_state = tls;
+ tls.workitem = workitem;
+ tls.current_callback_instance = selfthreadh;
+ tls.nesting_level = parent->_nesting_level + 1;
+ auto r = (*workitem)(workitem->_nextwork.load(std::memory_order_acquire));
+ workitem->_nextwork.store(0, std::memory_order_release); // call next() next time
+ tls = old_thread_local_state;
+ // std::cout << "*** _workerthread " << workitem << " ends with work " << workitem->_nextwork << std::endl;
+ if(!r)
+ {
+ dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group
+ (void) stop(g, parent, std::move(r));
+ _work_item_done(g, workitem);
+ workitem = nullptr;
+ }
+ else if(parent->_stopping.load(std::memory_order_relaxed))
+ {
+ dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group
+ _work_item_done(g, workitem);
+ }
+ else
+ {
+ deadline d(std::chrono::seconds(0));
+ workitem->_nextwork.store(workitem->next(d), std::memory_order_release);
+ auto r2 = _prepare_work_item_delay(workitem, parent->_grouph, d);
+ if(!r2)
+ {
+ dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group
+ (void) stop(g, parent, std::move(r2));
+ _work_item_done(g, workitem);
+ return;
+ }
+ if(-1 == workitem->_nextwork.load(std::memory_order_relaxed))
+ {
+ dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group
+ _work_item_done(g, workitem);
+ return;
+ }
+ _submit_work_item(false, workitem, false);
+ }
+ }
+} // namespace detail
+
+
+/****************************************** io_aware_work_item *********************************************/
+
+LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::io_aware_work_item::io_aware_work_item(span<io_handle_awareness> hs)
+ : _handles([](span<io_handle_awareness> hs) -> span<io_handle_awareness> {
+ float all = 0;
+ for(auto &i : hs)
+ {
+ all += i.reads + i.writes + i.barriers;
+ }
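+    // Normalise the supplied weights so they sum to 1 across all handles; if no weights were given at all, split each handle evenly between reads and writes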
+ for(auto &i : hs)
+ {
+ if(all == 0.0f)
+ {
+ i.reads = i.writes = 0.5f;
+ i.barriers = 0.0f;
+ }
+ else
+ {
+ i.reads /= all;
+ i.writes /= all;
+ i.barriers /= all;
+ }
+ }
+ auto &impl = detail::global_dynamic_thread_pool();
+ detail::global_dynamic_thread_pool_impl::io_aware_work_item_handles_guard g(impl.io_aware_work_item_handles_lock);
+ for(auto &h : hs)
+ {
+ if(!h.h->is_seekable())
+ {
+ throw std::runtime_error("Supplied handle is not seekable");
+ }
+ auto *fh = static_cast<file_handle *>(h.h);
+ auto unique_id = fh->unique_id();
+ auto it = impl.io_aware_work_item_handles.find(unique_id);
+ if(it == impl.io_aware_work_item_handles.end())
+ {
+ it = impl.io_aware_work_item_handles.emplace(unique_id, detail::global_dynamic_thread_pool_impl::io_aware_work_item_statfs{}).first;
+ auto r = it->second.statfs.fill(*fh, statfs_t::want::iosinprogress | statfs_t::want::iosbusytime);
+ if(!r || it->second.statfs.f_iosinprogress == (uint32_t) -1)
+ {
+ impl.io_aware_work_item_handles.erase(it);
+ if(!r)
+ {
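+        // throws the error held in r as an exception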
+ r.value();
+ }
+ throw std::runtime_error("statfs::f_iosinprogress unavailable for supplied handle");
+ }
+ it->second.last_updated = std::chrono::steady_clock::now();
+ }
+ it->second.refcount++;
+ h._internal = &*it;
+ }
+ return hs;
+ }(hs))
+{
+ LLFIO_LOG_FUNCTION_CALL(this);
+}
+
+LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::io_aware_work_item::~io_aware_work_item()
+{
+ LLFIO_LOG_FUNCTION_CALL(this);
+ auto &impl = detail::global_dynamic_thread_pool();
+ detail::global_dynamic_thread_pool_impl::io_aware_work_item_handles_guard g(impl.io_aware_work_item_handles_lock);
+ using value_type = decltype(impl.io_aware_work_item_handles)::value_type;
+ for(auto &h : _handles)
+ {
+ auto *i = (value_type *) h._internal;
+ if(0 == --i->second.refcount)
+ {
+ impl.io_aware_work_item_handles.erase(i->first);
+ }
+ }
+}
+
+LLFIO_HEADERS_ONLY_MEMFUNC_SPEC intptr_t dynamic_thread_pool_group::io_aware_work_item::next(deadline &d) noexcept
+{
+ LLFIO_LOG_FUNCTION_CALL(this);
+ {
+ auto &impl = detail::global_dynamic_thread_pool();
+ auto now = std::chrono::steady_clock::now();
+ detail::global_dynamic_thread_pool_impl::io_aware_work_item_handles_guard g(impl.io_aware_work_item_handles_lock);
+ using value_type = decltype(impl.io_aware_work_item_handles)::value_type;
+ for(auto &h : _handles)
+ {
+ auto *i = (value_type *) h._internal;
+ if(std::chrono::duration_cast<std::chrono::milliseconds>(now - i->second.last_updated) >= std::chrono::milliseconds(100))
+ {
+ // auto old_iosinprogress = i->second.statfs.f_iosinprogress;
+ auto elapsed = now - i->second.last_updated;
+ (void) i->second.statfs.fill(*h.h, statfs_t::want::iosinprogress | statfs_t::want::iosbusytime);
+ i->second.last_updated = now;
+
+ if(elapsed > std::chrono::seconds(5))
+ {
+ i->second.average_busy = i->second.statfs.f_iosbusytime;
+ i->second.average_queuedepth = (float) i->second.statfs.f_iosinprogress;
+ }
+ else
+ {
+ i->second.average_busy = (i->second.average_busy * 0.9f) + (i->second.statfs.f_iosbusytime * 0.1f);
+ i->second.average_queuedepth = (i->second.average_queuedepth * 0.9f) + (i->second.statfs.f_iosinprogress * 0.1f);
+ }
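+        // Pacing control: an idle-looking device has its delay removed; an over-deep queue grows the delay by ~1/16 per update; an under-deep queue shrinks it likewise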
+ if(i->second.average_busy < this->max_iosbusytime && i->second.average_queuedepth < this->min_iosinprogress)
+ {
+ i->second.default_deadline = std::chrono::seconds(0); // remove pacing
+ }
+ else if(i->second.average_queuedepth > this->max_iosinprogress)
+ {
+ if(0 == i->second.default_deadline.nsecs)
+ {
+ i->second.default_deadline = std::chrono::milliseconds(1); // start with 1ms, it'll reduce from there if needed
+ }
+ else if((i->second.default_deadline.nsecs >> 4) > 0)
+ {
+ i->second.default_deadline.nsecs += i->second.default_deadline.nsecs >> 4;
+ }
+ else
+ {
+ i->second.default_deadline.nsecs++;
+ }
+ }
+ else if(i->second.average_queuedepth < this->min_iosinprogress)
+ {
+ if(i->second.default_deadline.nsecs > (i->second.default_deadline.nsecs >> 4) && (i->second.default_deadline.nsecs >> 4) > 0)
+ {
+ i->second.default_deadline.nsecs -= i->second.default_deadline.nsecs >> 4;
+ }
+ else if(i->second.default_deadline.nsecs > 1)
+ {
+ i->second.default_deadline.nsecs--;
+ }
+ }
+ }
+ if(d.nsecs < i->second.default_deadline.nsecs)
+ {
+ d = i->second.default_deadline;
+ }
+ }
+ }
+ return io_aware_next(d);
+}
+
+
+LLFIO_V2_NAMESPACE_END
diff --git a/include/llfio/v2.0/detail/impl/posix/directory_handle.ipp b/include/llfio/v2.0/detail/impl/posix/directory_handle.ipp
index 9ea737c0..bd0050d6 100644
--- a/include/llfio/v2.0/detail/impl/posix/directory_handle.ipp
+++ b/include/llfio/v2.0/detail/impl/posix/directory_handle.ipp
@@ -39,7 +39,8 @@ http://www.boost.org/LICENSE_1_0.txt)
LLFIO_V2_NAMESPACE_BEGIN
-result<directory_handle> directory_handle::directory(const path_handle &base, path_view_type path, mode _mode, creation _creation, caching _caching, flag flags) noexcept
+result<directory_handle> directory_handle::directory(const path_handle &base, path_view_type path, mode _mode, creation _creation, caching _caching,
+ flag flags) noexcept
{
if(flags & flag::unlink_on_first_close)
{
@@ -314,9 +315,12 @@ result<directory_handle::buffers_type> directory_handle::read(io_request<buffers
req.buffers[0].stat.st_birthtim = to_timepoint(s.st_birthtim);
#endif
#endif
- req.buffers[0].stat.st_sparse = static_cast<unsigned int>((static_cast<handle::extent_type>(s.st_blocks) * 512) < static_cast<handle::extent_type>(s.st_size));
+ req.buffers[0].stat.st_sparse =
+ static_cast<unsigned int>((static_cast<handle::extent_type>(s.st_blocks) * 512) < static_cast<handle::extent_type>(s.st_size));
req.buffers._resize(1);
- static constexpr stat_t::want default_stat_contents = stat_t::want::dev | stat_t::want::ino | stat_t::want::type | stat_t::want::perms | stat_t::want::nlink | stat_t::want::uid | stat_t::want::gid | stat_t::want::rdev | stat_t::want::atim | stat_t::want::mtim | stat_t::want::ctim | stat_t::want::size |
+ static constexpr stat_t::want default_stat_contents = stat_t::want::dev | stat_t::want::ino | stat_t::want::type | stat_t::want::perms |
+ stat_t::want::nlink | stat_t::want::uid | stat_t::want::gid | stat_t::want::rdev |
+ stat_t::want::atim | stat_t::want::mtim | stat_t::want::ctim | stat_t::want::size |
stat_t::want::allocated | stat_t::want::blocks | stat_t::want::blksize
#ifdef HAVE_STAT_FLAGS
| stat_t::want::flags
@@ -360,8 +364,7 @@ result<directory_handle::buffers_type> directory_handle::read(io_request<buffers
}
stat_t::want default_stat_contents = stat_t::want::ino | stat_t::want::type;
dirent *buffer;
- size_t bytesavailable;
- int bytes;
+ size_t bytesavailable, bytes;
bool done = false;
do
{
@@ -383,27 +386,41 @@ result<directory_handle::buffers_type> directory_handle::read(io_request<buffers
if(-1 == ::lseek(_v.fd, 0, SEEK_SET))
return posix_error();
#endif
- bytes = getdents(_v.fd, reinterpret_cast<char *>(buffer), bytesavailable);
- if(req.kernelbuffer.empty() && bytes == -1 && EINVAL == errno)
+ bytes = 0;
+ int _bytes;
+ do
{
- size_t toallocate = req.buffers._kernel_buffer_size * 2;
- auto *mem = (char *) operator new[](toallocate, std::nothrow); // don't initialise
- if(mem == nullptr)
+ assert(bytes <= bytesavailable);
+ _bytes = getdents(_v.fd, reinterpret_cast<char *>(buffer) + bytes, bytesavailable - bytes);
+ if(_bytes == 0)
{
- return errc::not_enough_memory;
+ done = true;
+ break;
}
- req.buffers._kernel_buffer.reset();
- req.buffers._kernel_buffer = std::unique_ptr<char[]>(mem);
- req.buffers._kernel_buffer_size = toallocate;
- }
- else
- {
- if(bytes == -1)
+ if(req.kernelbuffer.empty() && _bytes == -1 && EINVAL == errno)
+ {
+ size_t toallocate = req.buffers._kernel_buffer_size * 2;
+ auto *mem = (char *) operator new[](toallocate, std::nothrow); // don't initialise
+ if(mem == nullptr)
+ {
+ return errc::not_enough_memory;
+ }
+ req.buffers._kernel_buffer.reset();
+ req.buffers._kernel_buffer = std::unique_ptr<char[]>(mem);
+ req.buffers._kernel_buffer_size = toallocate;
+        // We need to reset and do the whole thing again to ensure single shot atomicity
+ break;
+ }
+ else if(_bytes == -1)
{
return posix_error();
}
- done = true;
- }
+ else
+ {
+ assert(_bytes > 0);
+ bytes += _bytes;
+ }
+ } while(!done);
} while(!done);
if(bytes == 0)
{
diff --git a/include/llfio/v2.0/detail/impl/posix/statfs.ipp b/include/llfio/v2.0/detail/impl/posix/statfs.ipp
index a853a418..2e3b0859 100644
--- a/include/llfio/v2.0/detail/impl/posix/statfs.ipp
+++ b/include/llfio/v2.0/detail/impl/posix/statfs.ipp
@@ -1,5 +1,5 @@
/* Information about the volume storing a file
-(C) 2016-2017 Niall Douglas <http://www.nedproductions.biz/> (5 commits)
+(C) 2016-2020 Niall Douglas <http://www.nedproductions.biz/> (5 commits)
File Created: Jan 2016
@@ -25,10 +25,15 @@ Distributed under the Boost Software License, Version 1.0.
#include "../../../handle.hpp"
#include "../../../statfs.hpp"
+#include <chrono>
+#include <mutex>
+#include <vector>
+
#include <sys/mount.h>
#ifdef __linux__
#include <mntent.h>
#include <sys/statfs.h>
+#include <sys/sysmacros.h>
#endif
LLFIO_V2_NAMESPACE_BEGIN
@@ -37,186 +42,189 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<size_t> statfs_t::fill(const handle &h, s
{
size_t ret = 0;
#ifdef __linux__
- struct statfs64 s
- {
- };
- memset(&s, 0, sizeof(s));
- if(-1 == fstatfs64(h.native_handle().fd, &s))
- {
- return posix_error();
- }
- if(!!(wanted & want::bsize))
+ if(!!(wanted & ~(want::iosinprogress | want::iosbusytime)))
{
- f_bsize = s.f_bsize;
- ++ret;
- }
- if(!!(wanted & want::iosize))
- {
- f_iosize = s.f_frsize;
- ++ret;
- }
- if(!!(wanted & want::blocks))
- {
- f_blocks = s.f_blocks;
- ++ret;
- }
- if(!!(wanted & want::bfree))
- {
- f_bfree = s.f_bfree;
- ++ret;
- }
- if(!!(wanted & want::bavail))
- {
- f_bavail = s.f_bavail;
- ++ret;
- }
- if(!!(wanted & want::files))
- {
- f_files = s.f_files;
- ++ret;
- }
- if(!!(wanted & want::ffree))
- {
- f_ffree = s.f_ffree;
- ++ret;
- }
- if(!!(wanted & want::namemax))
- {
- f_namemax = s.f_namelen;
- ++ret;
- }
- // if(!!(wanted&&want::owner)) { f_owner =s.f_owner; ++ret; }
- if(!!(wanted & want::fsid))
- {
- f_fsid[0] = static_cast<unsigned>(s.f_fsid.__val[0]);
- f_fsid[1] = static_cast<unsigned>(s.f_fsid.__val[1]);
- ++ret;
- }
- if(!!(wanted & want::flags) || !!(wanted & want::fstypename) || !!(wanted & want::mntfromname) || !!(wanted & want::mntonname))
- {
- try
+ struct statfs64 s
{
- struct mountentry
- {
- std::string mnt_fsname, mnt_dir, mnt_type, mnt_opts;
- mountentry(const char *a, const char *b, const char *c, const char *d)
- : mnt_fsname(a)
- , mnt_dir(b)
- , mnt_type(c)
- , mnt_opts(d)
- {
- }
- };
- std::vector<std::pair<mountentry, struct statfs64>> mountentries;
+ };
+ memset(&s, 0, sizeof(s));
+ if(-1 == fstatfs64(h.native_handle().fd, &s))
+ {
+ return posix_error();
+ }
+ if(!!(wanted & want::bsize))
+ {
+ f_bsize = s.f_bsize;
+ ++ret;
+ }
+ if(!!(wanted & want::iosize))
+ {
+ f_iosize = s.f_frsize;
+ ++ret;
+ }
+ if(!!(wanted & want::blocks))
+ {
+ f_blocks = s.f_blocks;
+ ++ret;
+ }
+ if(!!(wanted & want::bfree))
+ {
+ f_bfree = s.f_bfree;
+ ++ret;
+ }
+ if(!!(wanted & want::bavail))
+ {
+ f_bavail = s.f_bavail;
+ ++ret;
+ }
+ if(!!(wanted & want::files))
+ {
+ f_files = s.f_files;
+ ++ret;
+ }
+ if(!!(wanted & want::ffree))
+ {
+ f_ffree = s.f_ffree;
+ ++ret;
+ }
+ if(!!(wanted & want::namemax))
+ {
+ f_namemax = s.f_namelen;
+ ++ret;
+ }
+ // if(!!(wanted&&want::owner)) { f_owner =s.f_owner; ++ret; }
+ if(!!(wanted & want::fsid))
+ {
+ f_fsid[0] = static_cast<unsigned>(s.f_fsid.__val[0]);
+ f_fsid[1] = static_cast<unsigned>(s.f_fsid.__val[1]);
+ ++ret;
+ }
+ if(!!(wanted & want::flags) || !!(wanted & want::fstypename) || !!(wanted & want::mntfromname) || !!(wanted & want::mntonname))
+ {
+ try
{
- // Need to parse mount options on Linux
- FILE *mtab = setmntent("/etc/mtab", "r");
- if(mtab == nullptr)
- {
- mtab = setmntent("/proc/mounts", "r");
- }
- if(mtab == nullptr)
- {
- return posix_error();
- }
- auto unmtab = make_scope_exit([mtab]() noexcept { endmntent(mtab); });
- struct mntent m
+ struct mountentry
{
+ std::string mnt_fsname, mnt_dir, mnt_type, mnt_opts;
+ mountentry(const char *a, const char *b, const char *c, const char *d)
+ : mnt_fsname(a)
+ , mnt_dir(b)
+ , mnt_type(c)
+ , mnt_opts(d)
+ {
+ }
};
- char buffer[32768];
- while(getmntent_r(mtab, &m, buffer, sizeof(buffer)) != nullptr)
+ std::vector<std::pair<mountentry, struct statfs64>> mountentries;
{
- struct statfs64 temp
+ // Need to parse mount options on Linux
+ FILE *mtab = setmntent("/etc/mtab", "r");
+ if(mtab == nullptr)
+ {
+ mtab = setmntent("/proc/mounts", "r");
+ }
+ if(mtab == nullptr)
+ {
+ return posix_error();
+ }
+ auto unmtab = make_scope_exit([mtab]() noexcept { endmntent(mtab); });
+ struct mntent m
{
};
- memset(&temp, 0, sizeof(temp));
- // std::cout << m.mnt_fsname << "," << m.mnt_dir << "," << m.mnt_type << "," << m.mnt_opts << std::endl;
- if(0 == statfs64(m.mnt_dir, &temp))
+ char buffer[32768];
+ while(getmntent_r(mtab, &m, buffer, sizeof(buffer)) != nullptr)
{
- // std::cout << " " << temp.f_fsid.__val[0] << temp.f_fsid.__val[1] << " =? " << s.f_fsid.__val[0] << s.f_fsid.__val[1] << std::endl;
- if(temp.f_type == s.f_type && (memcmp(&temp.f_fsid, &s.f_fsid, sizeof(s.f_fsid)) == 0))
+ struct statfs64 temp
+ {
+ };
+ memset(&temp, 0, sizeof(temp));
+ // std::cout << m.mnt_fsname << "," << m.mnt_dir << "," << m.mnt_type << "," << m.mnt_opts << std::endl;
+ if(0 == statfs64(m.mnt_dir, &temp))
{
- mountentries.emplace_back(mountentry(m.mnt_fsname, m.mnt_dir, m.mnt_type, m.mnt_opts), temp);
+ // std::cout << " " << temp.f_fsid.__val[0] << temp.f_fsid.__val[1] << " =? " << s.f_fsid.__val[0] << s.f_fsid.__val[1] << std::endl;
+ if(temp.f_type == s.f_type && (memcmp(&temp.f_fsid, &s.f_fsid, sizeof(s.f_fsid)) == 0))
+ {
+ mountentries.emplace_back(mountentry(m.mnt_fsname, m.mnt_dir, m.mnt_type, m.mnt_opts), temp);
+ }
}
}
}
- }
#ifndef LLFIO_COMPILING_FOR_GCOV
- if(mountentries.empty())
- {
- return errc::no_such_file_or_directory;
- }
- /* Choose the mount entry with the most closely matching statfs. You can't choose
- exclusively based on mount point because of bind mounts. Note that we have a
- particular problem in Docker:
+ if(mountentries.empty())
+ {
+ return errc::no_such_file_or_directory;
+ }
+ /* Choose the mount entry with the most closely matching statfs. You can't choose
+ exclusively based on mount point because of bind mounts. Note that we have a
+ particular problem in Docker:
- rootfs / rootfs rw 0 0
- overlay / overlay rw,relatime,lowerdir=... 0 0
- /dev/sda3 /etc xfs rw,relatime,... 0 0
- tmpfs /dev tmpfs rw,nosuid,... 0 0
- tmpfs /proc/acpi tmpfs rw,nosuid,... 0 0
- ...
+ rootfs / rootfs rw 0 0
+ overlay / overlay rw,relatime,lowerdir=... 0 0
+ /dev/sda3 /etc xfs rw,relatime,... 0 0
+ tmpfs /dev tmpfs rw,nosuid,... 0 0
+ tmpfs /proc/acpi tmpfs rw,nosuid,... 0 0
+ ...
- If f_type and f_fsid are identical for the statfs for the mount as for our file,
- then you will get multiple mountentries, and there is no obvious way of
- disambiguating them. What we do is match mount based on the longest match
- of the mount point with the current path of our file.
- */
- if(mountentries.size() > 1)
- {
- OUTCOME_TRY(auto &&currentfilepath_, h.current_path());
- string_view currentfilepath(currentfilepath_.native());
- std::vector<std::pair<size_t, size_t>> scores(mountentries.size());
- //std::cout << "*** For matching mount entries to file with path " << currentfilepath << ":\n";
- for(size_t n = 0; n < mountentries.size(); n++)
+ If f_type and f_fsid are identical for the statfs for the mount as for our file,
+ then you will get multiple mountentries, and there is no obvious way of
+ disambiguating them. What we do is match mount based on the longest match
+ of the mount point with the current path of our file.
+ */
+ if(mountentries.size() > 1)
{
- scores[n].first =
- (currentfilepath.substr(0, mountentries[n].first.mnt_dir.size()) == mountentries[n].first.mnt_dir) ? mountentries[n].first.mnt_dir.size() : 0;
- //std::cout << "*** Mount entry " << mountentries[n].first.mnt_dir << " get score " << scores[n].first << std::endl;
- scores[n].second = n;
+ OUTCOME_TRY(auto &&currentfilepath_, h.current_path());
+ string_view currentfilepath(currentfilepath_.native());
+ std::vector<std::pair<size_t, size_t>> scores(mountentries.size());
+ // std::cout << "*** For matching mount entries to file with path " << currentfilepath << ":\n";
+ for(size_t n = 0; n < mountentries.size(); n++)
+ {
+ scores[n].first =
+ (currentfilepath.substr(0, mountentries[n].first.mnt_dir.size()) == mountentries[n].first.mnt_dir) ? mountentries[n].first.mnt_dir.size() : 0;
+ // std::cout << "*** Mount entry " << mountentries[n].first.mnt_dir << " get score " << scores[n].first << std::endl;
+ scores[n].second = n;
+ }
+ std::sort(scores.begin(), scores.end());
+ // Choose the item whose mnt_dir most matched the current path for our file.
+ auto temp(std::move(mountentries[scores.back().second]));
+ mountentries.clear();
+ mountentries.push_back(std::move(temp));
}
- std::sort(scores.begin(), scores.end());
- // Choose the item whose mnt_dir most matched the current path for our file.
- auto temp(std::move(mountentries[scores.back().second]));
- mountentries.clear();
- mountentries.push_back(std::move(temp));
- }
#endif
- if(!!(wanted & want::flags))
- {
- f_flags.rdonly = static_cast<uint32_t>((s.f_flags & MS_RDONLY) != 0);
- f_flags.noexec = static_cast<uint32_t>((s.f_flags & MS_NOEXEC) != 0);
- f_flags.nosuid = static_cast<uint32_t>((s.f_flags & MS_NOSUID) != 0);
- f_flags.acls = static_cast<uint32_t>(std::string::npos != mountentries.front().first.mnt_opts.find("acl") &&
- std::string::npos == mountentries.front().first.mnt_opts.find("noacl"));
- f_flags.xattr = static_cast<uint32_t>(std::string::npos != mountentries.front().first.mnt_opts.find("xattr") &&
- std::string::npos == mountentries.front().first.mnt_opts.find("nouser_xattr"));
- // out.f_flags.compression=0;
- // Those filing systems supporting FALLOC_FL_PUNCH_HOLE
- f_flags.extents = static_cast<uint32_t>(mountentries.front().first.mnt_type == "btrfs" || mountentries.front().first.mnt_type == "ext4" ||
- mountentries.front().first.mnt_type == "xfs" || mountentries.front().first.mnt_type == "tmpfs");
- ++ret;
- }
- if(!!(wanted & want::fstypename))
- {
- f_fstypename = mountentries.front().first.mnt_type;
- ++ret;
- }
- if(!!(wanted & want::mntfromname))
- {
- f_mntfromname = mountentries.front().first.mnt_fsname;
- ++ret;
+ if(!!(wanted & want::flags))
+ {
+ f_flags.rdonly = static_cast<uint32_t>((s.f_flags & MS_RDONLY) != 0);
+ f_flags.noexec = static_cast<uint32_t>((s.f_flags & MS_NOEXEC) != 0);
+ f_flags.nosuid = static_cast<uint32_t>((s.f_flags & MS_NOSUID) != 0);
+ f_flags.acls = static_cast<uint32_t>(std::string::npos != mountentries.front().first.mnt_opts.find("acl") &&
+ std::string::npos == mountentries.front().first.mnt_opts.find("noacl"));
+ f_flags.xattr = static_cast<uint32_t>(std::string::npos != mountentries.front().first.mnt_opts.find("xattr") &&
+ std::string::npos == mountentries.front().first.mnt_opts.find("nouser_xattr"));
+ // out.f_flags.compression=0;
+ // Those filing systems supporting FALLOC_FL_PUNCH_HOLE
+ f_flags.extents = static_cast<uint32_t>(mountentries.front().first.mnt_type == "btrfs" || mountentries.front().first.mnt_type == "ext4" ||
+ mountentries.front().first.mnt_type == "xfs" || mountentries.front().first.mnt_type == "tmpfs");
+ ++ret;
+ }
+ if(!!(wanted & want::fstypename))
+ {
+ f_fstypename = mountentries.front().first.mnt_type;
+ ++ret;
+ }
+ if(!!(wanted & want::mntfromname))
+ {
+ f_mntfromname = mountentries.front().first.mnt_fsname;
+ ++ret;
+ }
+ if(!!(wanted & want::mntonname))
+ {
+ f_mntonname = mountentries.front().first.mnt_dir;
+ ++ret;
+ }
}
- if(!!(wanted & want::mntonname))
+ catch(...)
{
- f_mntonname = mountentries.front().first.mnt_dir;
- ++ret;
+ return error_from_exception();
}
}
- catch(...)
- {
- return error_from_exception();
- }
}
#else
struct statfs s;
@@ -311,7 +319,177 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<size_t> statfs_t::fill(const handle &h, s
++ret;
}
#endif
+ if(!!(wanted & want::iosinprogress) || !!(wanted & want::iosbusytime))
+ {
+ OUTCOME_TRY(auto &&ios, _fill_ios(h, f_mntfromname));
+ if(!!(wanted & want::iosinprogress))
+ {
+ f_iosinprogress = ios.first;
+ ++ret;
+ }
+ if(!!(wanted & want::iosbusytime))
+ {
+ f_iosbusytime = ios.second;
+ ++ret;
+ }
+ }
return ret;
}
+/******************************************* statfs_t ************************************************/
+
+LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<std::pair<uint32_t, float>> statfs_t::_fill_ios(const handle &h, const std::string & /*unused*/) noexcept
+{
+ (void) h;
+ try
+ {
+#ifdef __linux__
+ struct stat s
+ {
+ };
+ memset(&s, 0, sizeof(s));
+
+ if(-1 == ::fstat(h.native_handle().fd, &s))
+ {
+ if(!h.is_symlink() || EBADF != errno)
+ {
+ return posix_error();
+ }
+ // This is a hack, but symlink_handle includes this first so there is a chicken and egg dependency problem
+ OUTCOME_TRY(detail::stat_from_symlink(s, h));
+ }
+
+ static struct last_reading_t
+ {
+ struct item
+ {
+ dev_t st_dev;
+ size_t millis{0};
+ std::chrono::steady_clock::time_point last_updated;
+
+ uint32_t f_iosinprogress{0};
+ float f_iosbusytime{0};
+ };
+ std::mutex lock;
+ std::vector<item> items;
+ } last_reading;
+ auto now = std::chrono::steady_clock::now();
+ {
+ std::lock_guard<std::mutex> g(last_reading.lock);
+ for(auto &i : last_reading.items)
+ {
+ if(i.st_dev == s.st_dev)
+ {
+ if(std::chrono::duration_cast<std::chrono::milliseconds>(now - i.last_updated) < std::chrono::milliseconds(100))
+ {
+ return {i.f_iosinprogress, i.f_iosbusytime}; // exit with old readings
+ }
+ break;
+ }
+ }
+ }
+ try
+ {
+ int fd = ::open("/proc/diskstats", O_RDONLY);
+ if(fd >= 0)
+ {
+ std::string diskstats;
+ diskstats.resize(4096);
+ for(;;)
+ {
+ auto read = ::read(fd, (char *) diskstats.data(), diskstats.size());
+ if(read < 0)
+ {
+ return posix_error();
+ }
+ if(read < (ssize_t) diskstats.size())
+ {
+ ::close(fd);
+ diskstats.resize(read);
+ break;
+ }
+ try
+ {
+ diskstats.resize(diskstats.size() << 1);
+ }
+ catch(...)
+ {
+ ::close(fd);
+ throw;
+ }
+ }
+ /* Format is (https://www.kernel.org/doc/Documentation/iostats.txt):
+ <dev id major> <dev id minor> <device name> 01 02 03 04 05 06 07 08 09 10 ...
+
+ Field 9 is i/o's currently in progress.
+ Field 10 is milliseconds spent doing i/o (cumulative).
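+      An illustrative line: "8 0 sda 97834 1072 674246 39265 12409 8921 312312 19116 2 8580 23724", where field 9 is "2" and field 10 is "8580".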
+ */
+ auto match_line = [&](string_view sv) {
+ int major = 0, minor = 0;
+ sscanf(sv.data(), "%d %d", &major, &minor);
+ // printf("Does %d,%d match %d,%d?\n", major, minor, major(s.st_dev), minor(s.st_dev));
+ return (makedev(major, minor) == s.st_dev);
+ };
+ for(size_t is = 0, ie = diskstats.find(10); ie != diskstats.npos; is = ie + 1, ie = diskstats.find(10, is))
+ {
+ auto sv = string_view(diskstats).substr(is, ie - is);
+ if(match_line(sv))
+ {
+ int major = 0, minor = 0;
+ char devicename[64];
+ size_t fields[12];
+ sscanf(sv.data(), "%d %d %s %zu %zu %zu %zu %zu %zu %zu %zu %zu %zu", &major, &minor, devicename, fields + 0, fields + 1, fields + 2, fields + 3,
+ fields + 4, fields + 5, fields + 6, fields + 7, fields + 8, fields + 9);
+ std::lock_guard<std::mutex> g(last_reading.lock);
+ auto it = last_reading.items.begin();
+ for(; it != last_reading.items.end(); ++it)
+ {
+ if(it->st_dev == s.st_dev)
+ {
+ break;
+ }
+ }
+ if(it == last_reading.items.end())
+ {
+ last_reading.items.emplace_back();
+ it = --last_reading.items.end();
+ it->st_dev = s.st_dev;
+ it->millis = fields[9];
+ }
+ else
+ {
+ auto timediff = std::chrono::duration_cast<std::chrono::milliseconds>(now - it->last_updated);
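+                // Cumulative milliseconds of i/o divided by elapsed milliseconds approximates the busy fraction, capped at 1.0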
+ it->f_iosbusytime = std::min((float) ((double) (fields[9] - it->millis) / timediff.count()), 1.0f);
+ it->millis = fields[9];
+ }
+ it->f_iosinprogress = (uint32_t) fields[8];
+ it->last_updated = now;
+ return {it->f_iosinprogress, it->f_iosbusytime};
+ }
+ }
+ // It's totally possible that the dev_t reported by stat()
+      // does not appear in /proc/diskstats; if this occurs, then
+ // return all bits one to indicate soft failure.
+ }
+ }
+ catch(...)
+ {
+ return error_from_exception();
+ }
+#else
+ /* On FreeBSD, want::iosinprogress and want::iosbusytime could be implemented
+ using libdevstat. See https://www.freebsd.org/cgi/man.cgi?query=devstat&sektion=3.
+ Code donations welcome!
+
+   On Mac OS, getting the current i/o wait time appears to require elevated privileges.
+ */
+#endif
+ return {-1, detail::constexpr_float_allbits_set_nan()};
+ }
+ catch(...)
+ {
+ return error_from_exception();
+ }
+}
+
LLFIO_V2_NAMESPACE_END
diff --git a/include/llfio/v2.0/detail/impl/windows/directory_handle.ipp b/include/llfio/v2.0/detail/impl/windows/directory_handle.ipp
index 60e34e5e..4da58a91 100644
--- a/include/llfio/v2.0/detail/impl/windows/directory_handle.ipp
+++ b/include/llfio/v2.0/detail/impl/windows/directory_handle.ipp
@@ -28,6 +28,8 @@ http://www.boost.org/LICENSE_1_0.txt)
#include "import.hpp"
+#include "../../../file_handle.hpp"
+
LLFIO_V2_NAMESPACE_BEGIN
result<directory_handle> directory_handle::directory(const path_handle &base, path_view_type path, mode _mode, creation _creation, caching _caching, flag flags) noexcept
diff --git a/include/llfio/v2.0/detail/impl/windows/file_handle.ipp b/include/llfio/v2.0/detail/impl/windows/file_handle.ipp
index 610f20b1..5b36810f 100644
--- a/include/llfio/v2.0/detail/impl/windows/file_handle.ipp
+++ b/include/llfio/v2.0/detail/impl/windows/file_handle.ipp
@@ -25,6 +25,11 @@ Distributed under the Boost Software License, Version 1.0.
#include "../../../file_handle.hpp"
#include "import.hpp"
+#include "../../../statfs.hpp"
+
+#include <mutex>
+#include <vector>
+
LLFIO_V2_NAMESPACE_BEGIN
result<file_handle> file_handle::file(const path_handle &base, file_handle::path_view_type path, file_handle::mode _mode, file_handle::creation _creation,
@@ -821,7 +826,7 @@ result<file_handle::extent_pair> file_handle::clone_extents_to(file_handle::exte
}
done = true;
}
- //assert(done);
+ // assert(done);
dest_length = destoffset + extent.length;
truncate_back_on_failure = false;
LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d);
@@ -870,4 +875,108 @@ result<file_handle::extent_type> file_handle::zero(file_handle::extent_pair exte
return success();
}
+
+/******************************************* statfs_t ************************************************/
+
+LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<std::pair<uint32_t, float>> statfs_t::_fill_ios(const handle & /*unused*/, const std::string &mntfromname) noexcept
+{
+ try
+ {
+ alignas(8) wchar_t buffer[32769];
+ // Firstly open a handle to the volume
+ OUTCOME_TRY(auto &&volumeh, file_handle::file({}, mntfromname, handle::mode::none, handle::creation::open_existing, handle::caching::only_metadata));
+ // Now ask the volume what physical disks it spans
+ auto *vde = reinterpret_cast<VOLUME_DISK_EXTENTS *>(buffer);
+ OVERLAPPED ol{};
+ memset(&ol, 0, sizeof(ol));
+ ol.Internal = static_cast<ULONG_PTR>(-1);
+ if(DeviceIoControl(volumeh.native_handle().h, IOCTL_VOLUME_GET_VOLUME_DISK_EXTENTS, nullptr, 0, vde, sizeof(buffer), nullptr, &ol) == 0)
+ {
+ if(ERROR_IO_PENDING == GetLastError())
+ {
+ NTSTATUS ntstat = ntwait(volumeh.native_handle().h, ol, deadline());
+ if(ntstat != 0)
+ {
+ return ntkernel_error(ntstat);
+ }
+ }
+ if(ERROR_SUCCESS != GetLastError())
+ {
+ return win32_error();
+ }
+ }
+ static struct last_reading_t
+ {
+ struct item
+ {
+ int64_t ReadTime{0}, WriteTime{0}, IdleTime{0};
+ };
+ std::mutex lock;
+ std::vector<item> items;
+ } last_reading;
+
+ uint32_t iosinprogress = 0;
+ float iosbusytime = 0;
+ DWORD disk_extents = vde->NumberOfDiskExtents;
+ for(DWORD disk_extent = 0; disk_extent < disk_extents; disk_extent++)
+ {
+ alignas(8) wchar_t physicaldrivename[32] = L"\\\\.\\PhysicalDrive", *e = physicaldrivename + 17;
+ const auto DiskNumber = vde->Extents[disk_extent].DiskNumber;
+ if(DiskNumber >= 100)
+ {
+ *e++ = '0' + ((DiskNumber / 100) % 10);
+ }
+ if(DiskNumber >= 10)
+ {
+ *e++ = '0' + ((DiskNumber / 10) % 10);
+ }
+ *e++ = '0' + (DiskNumber % 10);
+ *e = 0;
+ OUTCOME_TRY(auto &&diskh, file_handle::file({}, path_view(physicaldrivename, e - physicaldrivename, path_view::zero_terminated), handle::mode::none,
+ handle::creation::open_existing, handle::caching::only_metadata));
+ ol.Internal = static_cast<ULONG_PTR>(-1);
+ auto *dp = reinterpret_cast<DISK_PERFORMANCE *>(buffer);
+ if(DeviceIoControl(diskh.native_handle().h, IOCTL_DISK_PERFORMANCE, nullptr, 0, dp, sizeof(buffer), nullptr, &ol) == 0)
+ {
+ if(ERROR_IO_PENDING == GetLastError())
+ {
+ NTSTATUS ntstat = ntwait(diskh.native_handle().h, ol, deadline());
+ if(ntstat != 0)
+ {
+ return ntkernel_error(ntstat);
+ }
+ }
+ if(ERROR_SUCCESS != GetLastError())
+ {
+ return win32_error();
+ }
+ }
+ //printf("%llu,%llu,%llu\n", dp->ReadTime.QuadPart, dp->WriteTime.QuadPart, dp->IdleTime.QuadPart);
+ iosinprogress += dp->QueueDepth;
+ std::lock_guard<std::mutex> g(last_reading.lock);
+ if(last_reading.items.size() < DiskNumber + 1)
+ {
+ last_reading.items.resize(DiskNumber + 1);
+ }
+ else
+ {
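+      // Busy fraction is 1 - idle_delta / (read_delta + write_delta + idle_delta) since the previous sample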
+ uint64_t rd = (uint64_t) dp->ReadTime.QuadPart - (uint64_t) last_reading.items[DiskNumber].ReadTime;
+ uint64_t wd = (uint64_t) dp->WriteTime.QuadPart - (uint64_t) last_reading.items[DiskNumber].WriteTime;
+ uint64_t id = (uint64_t) dp->IdleTime.QuadPart - (uint64_t) last_reading.items[DiskNumber].IdleTime;
+ iosbusytime += 1 - (float) ((double) id / (rd + wd + id));
+ }
+ last_reading.items[DiskNumber].ReadTime = dp->ReadTime.QuadPart;
+ last_reading.items[DiskNumber].WriteTime = dp->WriteTime.QuadPart;
+ last_reading.items[DiskNumber].IdleTime = dp->IdleTime.QuadPart;
+ }
+ iosinprogress /= disk_extents;
+ iosbusytime /= disk_extents;
+ return {iosinprogress, std::min(iosbusytime, 1.0f)};
+ }
+ catch(...)
+ {
+ return error_from_exception();
+ }
+}
+
LLFIO_V2_NAMESPACE_END
diff --git a/include/llfio/v2.0/detail/impl/windows/statfs.ipp b/include/llfio/v2.0/detail/impl/windows/statfs.ipp
index 8b23b4e1..a712433b 100644
--- a/include/llfio/v2.0/detail/impl/windows/statfs.ipp
+++ b/include/llfio/v2.0/detail/impl/windows/statfs.ipp
@@ -141,6 +141,13 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<size_t> statfs_t::fill(const handle &h, s
f_iosize = ffssi->PhysicalBytesPerSectorForPerformance;
++ret;
}
+ if(!!(wanted & want::iosinprogress) || !!(wanted & want::iosbusytime))
+ {
+ if(f_mntfromname.empty())
+ {
+ wanted |= want::mntfromname;
+ }
+ }
if((wanted & want::mntfromname) || (wanted & want::mntonname))
{
// Irrespective we need the path before figuring out the mounted device
@@ -240,6 +247,20 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<size_t> statfs_t::fill(const handle &h, s
break;
}
}
+ if(!!(wanted & want::iosinprogress) || !!(wanted & want::iosbusytime))
+ {
+ OUTCOME_TRY(auto &&ios, _fill_ios(h, f_mntfromname));
+ if(!!(wanted & want::iosinprogress))
+ {
+ f_iosinprogress = ios.first;
+ ++ret;
+ }
+ if(!!(wanted & want::iosbusytime))
+ {
+ f_iosbusytime = ios.second;
+ ++ret;
+ }
+ }
return ret;
}
diff --git a/include/llfio/v2.0/dynamic_thread_pool_group.hpp b/include/llfio/v2.0/dynamic_thread_pool_group.hpp
new file mode 100644
index 00000000..09828a6e
--- /dev/null
+++ b/include/llfio/v2.0/dynamic_thread_pool_group.hpp
@@ -0,0 +1,530 @@
+/* Dynamic thread pool group
+(C) 2020-2021 Niall Douglas <http://www.nedproductions.biz/> (9 commits)
+File Created: Dec 2020
+
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License in the accompanying file
+Licence.txt or at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+
+Distributed under the Boost Software License, Version 1.0.
+ (See accompanying file Licence.txt or copy at
+ http://www.boost.org/LICENSE_1_0.txt)
+*/
+
+#ifndef LLFIO_DYNAMIC_THREAD_POOL_GROUP_H
+#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_H
+
+#include "deadline.h"
+
+#include <memory> // for unique_ptr and shared_ptr
+
+#ifdef _MSC_VER
+#pragma warning(push)
+#pragma warning(disable : 4251) // dll interface
+#pragma warning(disable : 4275) // dll interface
+#endif
+
+LLFIO_V2_NAMESPACE_EXPORT_BEGIN
+
+class dynamic_thread_pool_group_impl;
+class io_handle;
+
+namespace detail
+{
+ struct global_dynamic_thread_pool_impl;
+ struct global_dynamic_thread_pool_impl_workqueue_item;
+ LLFIO_HEADERS_ONLY_FUNC_SPEC global_dynamic_thread_pool_impl &global_dynamic_thread_pool() noexcept;
+} // namespace detail
+
+/*! \class dynamic_thread_pool_group
+\brief Work group within the global dynamic thread pool.
+
+Some operating systems provide a per-process global kernel thread pool capable of
+dynamically adjusting its kernel thread count to how many of the threads in
+the pool are currently blocked. The platform chooses the exact strategy used,
+but as an example, one might keep creating new kernel threads so long as the
+total number of threads currently running and not blocked on page faults,
+i/o or syscalls is below the hardware concurrency. Similarly, if more threads
+are running unblocked than there is hardware concurrency, one might remove
+kernel threads from executing work. Such a strategy dynamically increases
+concurrency until all CPUs are busy, but reduces concurrency if more work is
+being done than there are CPUs available.
+
+Such dynamic kernel thread pools are excellent for CPU bound processing: you
+simply fire and forget work into them. However, for i/o bound processing you
+must be careful, as there are gotchas. For non-seekable i/o, it is quite
+possible that there could be 100k handles upon which we do i/o. Doing i/o on
+100k handles using a dynamic thread pool would in theory cause the creation
+of 100k kernel threads, which would not be wise. A much better solution is
+to use an `io_multiplexer` to await changes in large sets of i/o handles.
+
+For seekable i/o the same problem applies, but worse again: an i/o bound
+workload would cause a rapid increase in the number of kernel threads, which
+by definition makes the i/o even more congested, and the system descends into
+pathological performance loss. You must therefore never naively do
+i/o bound work (e.g. with memory mapped files) from within a dynamic thread
+pool without employing some mechanism to force concurrency downwards if
+the backing storage is congested.
+
+## Work groups
+
+Instances of this class contain zero or more work items. Each work item
+is asked for its next item of work, and if an item of work is available,
+that item of work is executed by the global kernel thread pool at a time
+of its choosing. It is NEVER possible for any one work item to be executed
+concurrently with itself; each work item is always executed sequentially with
+respect to itself. The only concurrency possible is *across* work items.
+Therefore, if you want to execute the same piece of code concurrently,
+you need to submit a separate work item for each unit of concurrency you
+want (e.g. `std::thread::hardware_concurrency()` of them).
+
+You can have as many or as few items of work as you like. You can
+dynamically submit additional work items at any time, except when a group
+is currently in the process of being stopped. The group of work items can
+be waited upon to complete, after which the work group becomes reset as
+if back to freshly constructed. You can also stop executing all the work
+items in the group, even if they have not fully completed. If any work
+item returns a failure, this is treated as if `stop()` had been called, and
+the next `wait()` will return that error.
+
+Work items may create sub work groups as part of their
+operation. If they do so, the work items from such nested work groups are
+scheduled preferentially. This ensures good forward progress, so if you
+have 100 work items, each of which does another 100 work items, you don't get
+10,000 pieces of slowly progressing work. Rather, the work items in the first
+set progress slowly, whereas the work items in the second set progress quickly.
+
+`work_item::next()` may optionally set a deadline to delay when that work
+item ought to be processed again. Deadlines can be relative or absolute.
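+
+For example, a minimal sketch (illustrative code only, not part of this
+header; `my_work` and `remaining` are hypothetical names) which drains a
+shared counter of one thousand pieces of work with hardware concurrency by
+submitting one work item per CPU:
+
+```c++
+struct my_work final : dynamic_thread_pool_group::work_item
+{
+  std::atomic<intptr_t> *remaining{nullptr};  // shared by every submitted item
+  explicit my_work(std::atomic<intptr_t> *r) : remaining(r) {}
+  // Hand out the next piece of work, or -1 once the counter is drained.
+  virtual intptr_t next(deadline & /*unused*/) noexcept override
+  {
+    const intptr_t n = remaining->fetch_sub(1);
+    return (n <= 0) ? -1 : n;
+  }
+  virtual result<void> operator()(intptr_t /*work*/) noexcept override
+  {
+    // ... process the piece of work here ...
+    return success();
+  }
+};
+
+std::atomic<intptr_t> remaining{1000};
+std::vector<my_work> items;
+items.reserve(std::thread::hardware_concurrency());
+for(size_t n = 0; n < std::thread::hardware_concurrency(); n++)
+{
+  items.emplace_back(&remaining);
+}
+auto tpg = make_dynamic_thread_pool_group().value();
+tpg->submit(span<my_work>(items)).value();  // uses the templated overload declared below
+tpg->wait().value();
+```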
+
+## C++ 23 Executors
+
+As with elsewhere in LLFIO, as a low level facility, we don't implement
+https://wg21.link/P0443 Executors, but it is trivially easy to implement
+a dynamic equivalent to `std::static_thread_pool` using this class.
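+
+For instance, a sketch (hypothetical code, not provided by LLFIO) of a
+`post()`-style executor wrapper over this class:
+
+```c++
+class simple_pool
+{
+  struct item final : dynamic_thread_pool_group::work_item
+  {
+    std::function<void()> f;
+    std::atomic<bool> consumed{false};
+    // One-shot: the first next() yields work, the second says we are done.
+    virtual intptr_t next(deadline & /*unused*/) noexcept override { return consumed.exchange(true) ? -1 : 1; }
+    virtual result<void> operator()(intptr_t /*unused*/) noexcept override
+    {
+      f();  // f must not throw, as this function is noexcept
+      return success();
+    }
+  };
+  dynamic_thread_pool_group_ptr _group{make_dynamic_thread_pool_group().value()};
+  std::list<item> _items;  // stable addresses: submitted work items must never relocate
+
+public:
+  // Not threadsafe with respect to itself; illustrative only.
+  void post(std::function<void()> f)
+  {
+    _items.emplace_back();
+    _items.back().f = std::move(f);
+    _group->submit(&_items.back()).value();
+  }
+  void join() { _group->wait().value(); }
+};
+```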
+
+## Implementation notes
+
+### Microsoft Windows
+
+On Microsoft Windows, the Win32 thread pool API is used (https://docs.microsoft.com/en-us/windows/win32/procthread/thread-pool-api).
+This is an IOCP-aware thread pool which will dynamically increase the number
+of kernel threads until none are blocked. If more kernel threads
+are running than twice the number of CPUs in the system, the number of kernel
+threads is dynamically reduced. The maximum number of kernel threads which
+will run simultaneously is 500. Note that the Win32 thread pool is shared
+across the process by multiple Windows facilities.
+
+Note that the Win32 thread pool has built in support for IOCP, so if you
+have a custom i/o multiplexer, you can use the global Win32 thread pool
+to execute i/o completion handling. See `CreateThreadpoolIo()` for more.
+
+No dynamic memory allocation is performed by this implementation outside
+of the initial `make_dynamic_thread_pool_group()`. The Win32 thread pool
+API may perform dynamic memory allocation internally, but that is outside
+our control.
+
+Overhead of LLFIO above the Win32 thread pool API is very low, statistically
+unmeasurable.
+
+### POSIX
+
+If not on Linux, you will need libdispatch, which is detected by LLFIO's cmake
+during configuration. libdispatch is better known as
+Grand Central Dispatch, originally a Mac OS technology but since ported
+to a high quality kernel based implementation on recent FreeBSDs, and to
+a lower quality userspace based implementation on Linux. Generally
+libdispatch should get automatically found on Mac OS without additional
+effort; on FreeBSD it may need installing from ports; on Linux you would
+need to explicitly install `libdispatch-dev` or the equivalent. You can
+force the use of libdispatch in cmake by setting the cmake variable
+`LLFIO_USE_LIBDISPATCH` to `On`.
+
+Overhead of LLFIO above the libdispatch API is very low, statistically
+unmeasurable.
+
+### Linux
+
+On Linux only, we have a custom userspace implementation with superior performance.
+A similar strategy to Microsoft Windows' approach is used: we
+dynamically increase the number of kernel threads until none are sleeping
+awaiting i/o. If the number of kernel threads running exceeds the number of
+CPUs in the system by more than three, the number of kernel threads is dynamically reduced.
+Note that **all** the kernel threads for the current process are considered,
+not just the kernel threads created by this thread pool implementation.
+Therefore, if you have alternative thread pool implementations (e.g. OpenMP,
+`std::async`), those are also included in the dynamic adjustment.
+
+As this is wholly implemented by this library, dynamic memory allocation
+occurs in the initial `make_dynamic_thread_pool_group()` and per thread
+creation, but otherwise the implementation does not perform dynamic memory
+allocations.
+
+After multiple rewrites, eventually I got this custom userspace implementation
+to have superior performance to both ASIO and libdispatch. For larger work
+items the difference between all three is meaningless; however, for smaller
+work items I benchmarked this custom userspace implementation as beating
+(non-dynamic) ASIO by approx 29% and Linux libdispatch by approx 52% (note
+that Linux libdispatch appears to have a scale-up bug when work items are
+small and few: it is often less than half the performance of LLFIO's custom
+implementation).
+*/
+class LLFIO_DECL dynamic_thread_pool_group
+{
+ friend class dynamic_thread_pool_group_impl;
+public:
+ //! An individual item of work within the work group.
+ class work_item
+ {
+ friend struct detail::global_dynamic_thread_pool_impl;
+ friend struct detail::global_dynamic_thread_pool_impl_workqueue_item;
+ friend class dynamic_thread_pool_group_impl;
+ std::atomic<dynamic_thread_pool_group_impl *> _parent{nullptr};
+ void *_internalworkh{nullptr};
+ void *_internaltimerh{nullptr}; // lazily created if next() ever returns a deadline
+ work_item *_prev{nullptr}, *_next{nullptr}, *_next_scheduled{nullptr};
+ std::atomic<intptr_t> _nextwork{-1};
+ std::chrono::steady_clock::time_point _timepoint1;
+ std::chrono::system_clock::time_point _timepoint2;
+ int _internalworkh_inuse{0};
+
+ protected:
+ constexpr bool _has_timer_set_relative() const noexcept { return _timepoint1 != std::chrono::steady_clock::time_point(); }
+ constexpr bool _has_timer_set_absolute() const noexcept { return _timepoint2 != std::chrono::system_clock::time_point(); }
+ constexpr bool _has_timer_set() const noexcept { return _has_timer_set_relative() || _has_timer_set_absolute(); }
+
+ constexpr work_item() {}
+ work_item(const work_item &o) = delete;
+ work_item(work_item &&o) noexcept
+ : _parent(o._parent.load(std::memory_order_relaxed))
+ , _internalworkh(o._internalworkh)
+ , _internaltimerh(o._internaltimerh)
+ , _prev(o._prev)
+ , _next(o._next)
+ , _next_scheduled(o._next_scheduled)
+ , _nextwork(o._nextwork.load(std::memory_order_relaxed))
+ , _timepoint1(o._timepoint1)
+ , _timepoint2(o._timepoint2)
+ , _internalworkh_inuse(o._internalworkh_inuse)
+ {
+ assert(o._parent.load(std::memory_order_relaxed) == nullptr);
+ assert(o._internalworkh == nullptr);
+ assert(o._internaltimerh == nullptr);
+ if(o._parent.load(std::memory_order_relaxed) != nullptr || o._internalworkh != nullptr)
+ {
+ LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item was relocated in memory during use!");
+ abort();
+ }
+ o._prev = o._next = o._next_scheduled = nullptr;
+ o._nextwork.store(-1, std::memory_order_relaxed);
+ o._internalworkh_inuse = 0;
+ }
+ work_item &operator=(const work_item &) = delete;
+ work_item &operator=(work_item &&) = delete;
+
+ public:
+ virtual ~work_item()
+ {
+ assert(_nextwork.load(std::memory_order_relaxed) == -1);
+ if(_nextwork.load(std::memory_order_relaxed) != -1)
+ {
+ LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item destroyed before all work was done!");
+ abort();
+ }
+ assert(_internalworkh == nullptr);
+ assert(_internaltimerh == nullptr);
+ assert(_parent == nullptr);
+ if(_internalworkh != nullptr || _parent != nullptr)
+ {
+ LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item destroyed before group_complete() was executed!");
+ abort();
+ }
+ }
+
+ //! Returns the parent work group from successful submission until just before `group_complete()` is invoked.
+ dynamic_thread_pool_group *parent() const noexcept { return reinterpret_cast<dynamic_thread_pool_group *>(_parent.load(std::memory_order_relaxed)); }
+
+ /*! Invoked by the i/o thread pool to determine if this work item
+ has more work to do.
+
+ \return If there is no work _currently_ available to do, but there
+ might be some later, you should return zero. You will be called again
+ later after other work has been done. If you return -1, you are
+ saying that no further work will be done, and the group need never
+ call you again. If you have more work you want to do, return any
+ other value.
+ \param d Optional delay before the next item of work ought to be
+ executed (return != 0), or `next()` ought to be called again to
+ determine the next item (return == 0). On entry `d` is set to no
+ delay, so if you don't modify it, the next item of work occurs
+ as soon as possible.
+
+ Note that this function is called from multiple kernel threads.
+ You must NOT do any significant work in this function.
+ In particular do NOT call any dynamic thread pool group function,
+ as you will experience deadlock.
+
+ `dynamic_thread_pool_group::current_work_item()` may have any
+ value during this call.
+ */
+ virtual intptr_t next(deadline &d) noexcept = 0;
+
+ /*! Invoked by the i/o thread pool to perform the next item of work.
+
+ \return Any failure causes all remaining work in this group to
+ be cancelled as soon as possible.
+ \param work The value returned by `next()`.
+
+ Note that this function is called from multiple kernel threads,
+ and may not be the kernel thread from which `next()`
+ was called.
+
+ `dynamic_thread_pool_group::current_work_item()` will always be
+ `this` during this call.
+ */
+ virtual result<void> operator()(intptr_t work) noexcept = 0;
+
+ /*! Invoked by the i/o thread pool when all work in this thread
+ pool group is complete.
+
+ `cancelled` indicates if this is an abnormal completion. If its
+ error compares equal to `errc::operation_canceled`, then `stop()`
+ was called.
+
+ Just before this is called for all work items submitted, the group
+ becomes reset to fresh, and `parent()` becomes null. You can resubmit
+ this work item, but do not submit other work items until their
+ `group_complete()` has been invoked.
+
+ Note that this function is called from multiple kernel threads.
+
+ `dynamic_thread_pool_group::current_work_item()` may have any
+ value during this call.
+ */
+ virtual void group_complete(const result<void> &cancelled) noexcept { (void) cancelled; }
+ };
+
+ /*! \class io_aware_work_item
+ \brief A work item which paces when it next executes according to i/o congestion.
+
+ Currently there is only a working implementation of this for the Microsoft Windows
+ and Linux platforms, due to lack of working `statfs_t::f_iosinprogress` on other
+ platforms. If retrieving that for a seekable handle does not work, the constructor
+ throws an exception.
+
+ For seekable handles, currently `reads`, `writes` and `barriers` are ignored. We
+ simply retrieve, periodically, `statfs_t::f_iosinprogress` and `statfs_t::f_iosbusytime`
+ for the storage devices backing the seekable handle. If the recently averaged i/o
+ busy time exceeds `max_iosbusytime` and the i/o in progress exceeds `max_iosinprogress`,
+ `next()` will start setting the default deadline passed to `io_aware_next()`.
+ Thereafter, every 1/10th of a second, if `statfs_t::f_iosinprogress`
+ is above `max_iosinprogress`, it will increase the deadline by 1/16th, whereas if it is
+ below `min_iosinprogress`, it will decrease the deadline by 1/16th. The default deadline
+ chosen is always the worst of all the storage devices of all the handles. This
+ reduces concurrency within the kernel thread pool in order to reduce congestion
+ on the storage devices. If at any point `statfs_t::f_iosbusytime` drops below
+ `max_iosbusytime` as averaged across one second, and `statfs_t::f_iosinprogress` drops
+ below `min_iosinprogress`, the additional throttling is completely removed.
+ `io_aware_next()` can ignore the default deadline passed into it, and can set
+ any other deadline.
+
+ For non-seekable handles, the handle must have an i/o multiplexer set upon it, and on
+ Microsoft Windows, that i/o multiplexer must be utilising the IOCP instance of the
+ global Win32 thread pool. For each `reads`, `writes` and `barriers` which is non-zero,
+ a corresponding zero length i/o is constructed and initiated. When the i/o completes,
+ and all readable handles in the work item's set have data waiting to be read, and all
+ writable handles in the work item's set have space to allow writes, only then is the
+ work item invoked with the next piece of work.
+
+ \note Non-seekable handle support is not implemented yet.
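+
+ A sketch (hypothetical subclass, illustrative only) which accepts the pacing
+ computed by this class unchanged:
+
+ ```c++
+ struct paced_work final : dynamic_thread_pool_group::io_aware_work_item
+ {
+   explicit paced_work(span<io_handle_awareness> hs)
+       : io_aware_work_item(hs)
+   {
+   }
+   // d arrives preloaded with the congestion-derived delay; returning any
+   // value other than 0 or -1 schedules operator()() after that delay.
+   virtual intptr_t io_aware_next(deadline &d) noexcept override
+   {
+     (void) d;  // accept the suggested pacing unchanged
+     return 1;  // endless stream of work until the group is stopped
+   }
+   virtual result<void> operator()(intptr_t /*unused*/) noexcept override
+   {
+     // ... do i/o to the handles registered at construction ...
+     return success();
+   }
+ };
+ ```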
+ */
+ class LLFIO_DECL io_aware_work_item : public work_item
+ {
+ public:
+ //! Maximum i/o busyness above which throttling is to begin.
+ float max_iosbusytime{0.95f};
+ //! Minimum i/o in progress to target if `iosbusytime` exceeded. The default of 16 suits SSDs; you want around 4 for spinning rust or NV-RAM.
+ uint32_t min_iosinprogress{16};
+ //! Maximum i/o in progress to target if `iosbusytime` exceeded. The default of 32 suits SSDs; you want around 8 for spinning rust or NV-RAM. On Windows the default is 1, as Windows appears to do a lot of i/o coalescing.
+#ifdef _WIN32
+ uint32_t max_iosinprogress{1}; // windows appears to do a lot of i/o coalescing
+#else
+ uint32_t max_iosinprogress{32};
+#endif
+ //! Information about an i/o handle this work item will use
+ struct io_handle_awareness
+ {
+ //! An i/o handle this work item will use
+ io_handle *h{nullptr};
+ //! The relative amount of reading done by this work item from the handle.
+ float reads{0};
+ //! The relative amount of writing done by this work item to the handle.
+ float writes{0};
+ //! The relative amount of write barriering done by this work item to the handle.
+ float barriers{0};
+
+ void *_internal{nullptr};
+ };
+
+ private:
+ const span<io_handle_awareness> _handles;
+
+ LLFIO_HEADERS_ONLY_VIRTUAL_SPEC intptr_t next(deadline &d) noexcept override final;
+
+ public:
+ constexpr io_aware_work_item() {}
+ /*! \brief Constructs a work item aware of i/o done to the handles in `hs`.
+
+ Note that the `reads`, `writes` and `barriers` are normalised to proportions
+ out of `1.0` by this constructor, so if for example you had `reads/writes/barriers = 200/100/0`,
+ after normalisation those become `0.66/0.33/0.0` such that the total is `1.0`.
+ If `reads/writes/barriers = 0/0/0` on entry, they are replaced with `0.5/0.5/0.0`.
+
+ Note that normalisation is across *all* i/o handles in the set, so three handles
+ each with `reads/writes/barriers = 200/100/0` on entry would have `0.22/0.11/0.0`
+ each after construction.
+ */
+ explicit LLFIO_HEADERS_ONLY_MEMFUNC_SPEC io_aware_work_item(span<io_handle_awareness> hs);
+ io_aware_work_item(io_aware_work_item &&o) noexcept
+ : work_item(std::move(o))
+ , _handles(o._handles)
+ {
+ }
+ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC ~io_aware_work_item();
+
+ //! The handles originally registered during construction.
+ span<io_handle_awareness> handles() const noexcept { return _handles; }
+
+ /*! \brief As for `work_item::next()`, but deadline may be extended to
+ reduce i/o congestion on the hardware devices to which the handles
+ refer.
+ */
+ virtual intptr_t io_aware_next(deadline &d) noexcept = 0;
+ };
+
+ virtual ~dynamic_thread_pool_group() {}
+
+ /*! \brief A textual description of the underlying implementation of
+ this dynamic thread pool group.
+
+ The current possible underlying implementations are:
+
+ - "Grand Central Dispatch" (Mac OS, FreeBSD, Linux)
+ - "Linux native" (Linux)
+ - "Win32 thread pool (Vista+)" (Windows)
+
+ Which one is chosen depends on what was detected at cmake configure time,
+ and possibly what the host OS running the program binary supports.
+ */
+ static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC const char *implementation_description() noexcept;
+
+ /*! \brief Threadsafe. Submit one or more work items for execution. Note that you can submit more later.
+
+ Note that if the group is currently stopping, you cannot submit more
+ work until the group has stopped. An error code comparing equal to
+ `errc::operation_canceled` is returned if you try.
+ */
+ virtual result<void> submit(span<work_item *> work) noexcept = 0;
+ //! \overload
+ result<void> submit(work_item *wi) noexcept { return submit(span<work_item *>(&wi, 1)); }
+ //! \overload
+ LLFIO_TEMPLATE(class T)
+ LLFIO_TREQUIRES(LLFIO_TPRED(!std::is_pointer<T>::value), LLFIO_TPRED(std::is_base_of<work_item, T>::value))
+ result<void> submit(span<T> wi) noexcept
+ {
+ auto *wis = (T **) alloca(sizeof(T *) * wi.size());
+ for(size_t n = 0; n < wi.size(); n++)
+ {
+ wis[n] = &wi[n];
+ }
+ return submit(span<work_item *>((work_item **) wis, wi.size()));
+ }
+
+ //! Threadsafe. Cancel any remaining work previously submitted, but without blocking (use `wait()` to block).
+ virtual result<void> stop() noexcept = 0;
+
+ /*! \brief Threadsafe. True if a work item reported an error, or
+ `stop()` was called, but work items are still running.
+ */
+ virtual bool stopping() const noexcept = 0;
+
+ //! Threadsafe. True if all the work previously submitted is complete.
+ virtual bool stopped() const noexcept = 0;
+
+ //! Threadsafe. Wait for work previously submitted to complete, returning any failures by any work item.
+ virtual result<void> wait(deadline d = {}) const noexcept = 0;
+ //! \overload
+ template <class Rep, class Period> result<bool> wait_for(const std::chrono::duration<Rep, Period> &duration) const noexcept
+ {
+ auto r = wait(duration);
+ if(!r && r.error() == errc::timed_out)
+ {
+ return false;
+ }
+ OUTCOME_TRY(std::move(r));
+ return true;
+ }
+ //! \overload
+ template <class Clock, class Duration> result<bool> wait_until(const std::chrono::time_point<Clock, Duration> &timeout) const noexcept
+ {
+ auto r = wait(timeout);
+ if(!r && r.error() == errc::timed_out)
+ {
+ return false;
+ }
+ OUTCOME_TRY(std::move(r));
+ return true;
+ }
+
+ //! Returns the work item nesting level which would be used if a new dynamic thread pool group were created within the current work item.
+ static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC size_t current_nesting_level() noexcept;
+ //! Returns the work item the calling thread is running within, if any.
+ static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC work_item *current_work_item() noexcept;
+ /*! \brief Returns the number of milliseconds that a thread is without work before it is shut down.
+ Note that this will be zero everywhere except on Linux when using our local
+ thread pool implementation, because the system controls this value on Windows,
+ Grand Central Dispatch etc.
+ */
+ static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC uint32_t ms_sleep_for_more_work() noexcept;
+ /*! \brief Sets the number of milliseconds that a thread is without work before it is shut down,
+ returning the value actually set.
+
+ Note that this will have no effect (and thus return zero) everywhere except on
+ Linux when using our local thread pool implementation, because the system controls
+ this value on Windows, Grand Central Dispatch etc.
+ */
+ static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC uint32_t ms_sleep_for_more_work(uint32_t v) noexcept;
+};
+//! A unique ptr to a work group within the global dynamic thread pool.
+using dynamic_thread_pool_group_ptr = std::unique_ptr<dynamic_thread_pool_group>;
+
+//! Creates a new work group within the global dynamic thread pool.
+LLFIO_HEADERS_ONLY_FUNC_SPEC result<dynamic_thread_pool_group_ptr> make_dynamic_thread_pool_group() noexcept;
+
+// BEGIN make_free_functions.py
+// END make_free_functions.py
+
+LLFIO_V2_NAMESPACE_END
+
+#ifdef _MSC_VER
+#pragma warning(pop)
+#endif
+
+#if LLFIO_HEADERS_ONLY == 1 && !defined(DOXYGEN_SHOULD_SKIP_THIS)
+#define LLFIO_INCLUDED_BY_HEADER 1
+#include "detail/impl/dynamic_thread_pool_group.ipp"
+#undef LLFIO_INCLUDED_BY_HEADER
+#endif
+
+#endif
diff --git a/include/llfio/v2.0/fs_handle.hpp b/include/llfio/v2.0/fs_handle.hpp
index 8700a87b..1be9bdaf 100644
--- a/include/llfio/v2.0/fs_handle.hpp
+++ b/include/llfio/v2.0/fs_handle.hpp
@@ -144,6 +144,8 @@ public:
using path_view_type = path_view;
//! The unique identifier type used by this handle
using unique_id_type = QUICKCPPLIB_NAMESPACE::integers128::uint128;
+ //! A hasher for the unique identifier type used by this handle
+ using unique_id_type_hasher = QUICKCPPLIB_NAMESPACE::integers128::uint128_hasher;
protected:
mutable dev_t _devid{0};
diff --git a/include/llfio/v2.0/llfio.hpp b/include/llfio/v2.0/llfio.hpp
index 88bfad2b..a7c23b27 100644
--- a/include/llfio/v2.0/llfio.hpp
+++ b/include/llfio/v2.0/llfio.hpp
@@ -63,13 +63,16 @@ import LLFIO_MODULE_NAME;
#include "utils.hpp"
#include "directory_handle.hpp"
+#ifndef LLFIO_EXCLUDE_DYNAMIC_THREAD_POOL_GROUP
+#include "dynamic_thread_pool_group.hpp"
+#endif
+#include "fast_random_file_handle.hpp"
#include "file_handle.hpp"
#include "process_handle.hpp"
#include "statfs.hpp"
#ifdef LLFIO_INCLUDE_STORAGE_PROFILE
#include "storage_profile.hpp"
#endif
-#include "fast_random_file_handle.hpp"
#include "symlink_handle.hpp"
#include "algorithm/clone.hpp"
diff --git a/include/llfio/v2.0/logging.hpp b/include/llfio/v2.0/logging.hpp
index 1dff759a..7ef30363 100644
--- a/include/llfio/v2.0/logging.hpp
+++ b/include/llfio/v2.0/logging.hpp
@@ -77,7 +77,7 @@ public:
{
reinterpret_cast<log_level &>(detail::thread_local_log_level()) = n;
}
- ~log_level_guard() {reinterpret_cast<log_level &>(detail::thread_local_log_level()) = _v; }
+ ~log_level_guard() { reinterpret_cast<log_level &>(detail::thread_local_log_level()) = _v; }
};
// Infrastructure for recording the current path for when failure occurs
@@ -262,8 +262,9 @@ namespace detail
return span<char>(buffer, length);
}
// Strips a __PRETTY_FUNCTION__ of all instances of ::LLFIO_V2_NAMESPACE:: and ::LLFIO_V2_NAMESPACE::
- inline void strip_pretty_function(char *out, size_t bytes, const char *in)
+ inline void strip_pretty_function(char *_out, size_t bytes, const char *in)
{
+ char *out = _out;
const span<char> remove1 = llfio_namespace_string();
const span<char> remove2 = outcome_namespace_string();
for(--bytes; bytes && *in; --bytes)
@@ -272,6 +273,32 @@ namespace detail
in += remove1.size();
if(!strncmp(in, remove2.data(), remove2.size()))
in += remove2.size();
+ // Elide the template parameters of any basic_result<...>, emitting just "basic_result<>"
+ if(!strncmp(in, "basic_result<", 13))
+ {
+ int count = 13;
+ for(--bytes; bytes && *in && count; --bytes, --count)
+ {
+ *out++ = *in++;
+ }
+ if(!*in || bytes == 0)
+ {
+ break;
+ }
+ // Skip the template parameters, tracking nested angle brackets
+ count = 1;
+ while(*in && count > 0)
+ {
+ if(*in == '<')
+ {
+ count++;
+ }
+ else if(*in == '>')
+ {
+ count--;
+ }
+ in++;
+ }
+ in--; // leave in pointing at the closing '>' so it is copied below
+ }
*out++ = *in++;
}
*out = 0;
diff --git a/include/llfio/v2.0/statfs.hpp b/include/llfio/v2.0/statfs.hpp
index 71732da5..c8af2fde 100644
--- a/include/llfio/v2.0/statfs.hpp
+++ b/include/llfio/v2.0/statfs.hpp
@@ -1,5 +1,5 @@
/* Information about the volume storing a file
-(C) 2015-2017 Niall Douglas <http://www.nedproductions.biz/> (8 commits)
+(C) 2015-2020 Niall Douglas <http://www.nedproductions.biz/> (8 commits)
File Created: Dec 2015
@@ -41,13 +41,40 @@ LLFIO_V2_NAMESPACE_EXPORT_BEGIN
class handle;
+namespace detail
+{
+ inline constexpr float constexpr_float_allbits_set_nan()
+ {
+#if defined(_MSC_VER) && !defined(__clang__)
+ // Not all bits 1, but I can't see how to do better whilst inside constexpr
+ return -NAN;
+#else
+ return -__builtin_nanf("0xffffff"); // all bits 1
+#endif
+ }
+} // namespace detail
+
/*! \struct statfs_t
\brief Metadata about a filing system. Unsupported entries are all bits set.
+
+Note also that for some fields, a soft failure to read the requested value manifests
+as all bits set. For example, `f_iosinprogress` might not be computable if the
+filing system for your handle reports a `dev_t` from `fstat()` which does not
+match anything in the system's disk hardware i/o stats. As this can be completely
+benign (e.g. your handle is a socket), this is treated as a soft failure.
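+
+For example, a sketch (illustrative only; `h` is some previously opened handle)
+of detecting such a soft failure for `f_iosbusytime`:
+
+```c++
+statfs_t s;
+s.fill(h, statfs_t::want::iosbusytime).value();
+if(std::isnan(s.f_iosbusytime))
+{
+  // Unsupported or soft-failed: the all-bits-one pattern is a NaN.
+}
+```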
+
+Note for `f_iosinprogress` and `f_iosbusytime` that support is not implemented yet
+outside Microsoft Windows and Linux. Note also that for Linux, filing systems
+spanning multiple hardware devices have undefined outcomes, whereas on Windows
+you are given the average of the values for all underlying hardware devices.
+Code donations improving the support for these items on Mac OS, FreeBSD and Linux
+would be welcomed.
*/
struct LLFIO_DECL statfs_t
{
static constexpr uint32_t _allbits1_32 = ~0U;
static constexpr uint64_t _allbits1_64 = ~0ULL;
+ static constexpr float _allbits1_float = detail::constexpr_float_allbits_set_nan();
struct f_flags_t
{
uint32_t rdonly : 1; //!< Filing system is read only (Windows, POSIX)
@@ -75,11 +102,31 @@ struct LLFIO_DECL statfs_t
std::string f_mntfromname; /*!< mounted filesystem (Windows, POSIX) */
filesystem::path f_mntonname; /*!< directory on which mounted (Windows, POSIX) */
+ uint32_t f_iosinprogress{_allbits1_32}; /*!< i/o's currently in progress (i.e. queue depth) (Windows, Linux) */
+ float f_iosbusytime{_allbits1_float}; /*!< percentage of time spent doing i/o (1.0 = 100%) (Windows, Linux) */
+
//! Used to indicate what metadata should be filled in
- QUICKCPPLIB_BITFIELD_BEGIN(want) { flags = 1 << 0, bsize = 1 << 1, iosize = 1 << 2, blocks = 1 << 3, bfree = 1 << 4, bavail = 1 << 5, files = 1 << 6, ffree = 1 << 7, namemax = 1 << 8, owner = 1 << 9, fsid = 1 << 10, fstypename = 1 << 11, mntfromname = 1 << 12, mntonname = 1 << 13, all = static_cast<unsigned>(-1) }
- QUICKCPPLIB_BITFIELD_END(want)
+ QUICKCPPLIB_BITFIELD_BEGIN(want){flags = 1 << 0,
+ bsize = 1 << 1,
+ iosize = 1 << 2,
+ blocks = 1 << 3,
+ bfree = 1 << 4,
+ bavail = 1 << 5,
+ files = 1 << 6,
+ ffree = 1 << 7,
+ namemax = 1 << 8,
+ owner = 1 << 9,
+ fsid = 1 << 10,
+ fstypename = 1 << 11,
+ mntfromname = 1 << 12,
+ mntonname = 1 << 13,
+ iosinprogress = 1 << 14,
+ iosbusytime = 1 << 15,
+ all = static_cast<unsigned>(-1)} QUICKCPPLIB_BITFIELD_END(want)
//! Constructs a default initialised instance (all bits set)
- statfs_t() {} // NOLINT Cannot be constexpr due to lack of constexpe string default constructor :(
+ statfs_t()
+ {
+ } // NOLINT Cannot be constexpr due to lack of constexpr string default constructor :(
#ifdef __cpp_exceptions
//! Constructs a filled instance, throwing as an exception any error which might occur
explicit statfs_t(const handle &h, want wanted = want::all)
@@ -94,6 +141,10 @@ struct LLFIO_DECL statfs_t
#endif
//! Fills in the structure with metadata, returning number of items filled in
LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<size_t> fill(const handle &h, want wanted = want::all) noexcept;
+
+private:
+ // Implemented in file_handle.ipp on Windows, otherwise in statfs.ipp
+ static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<std::pair<uint32_t, float>> _fill_ios(const handle &h, const std::string &mntfromname) noexcept;
};
LLFIO_V2_NAMESPACE_END
diff --git a/include/llfio/v2.0/status_code.hpp b/include/llfio/v2.0/status_code.hpp
index 80fd905d..03eb0f67 100644
--- a/include/llfio/v2.0/status_code.hpp
+++ b/include/llfio/v2.0/status_code.hpp
@@ -25,8 +25,6 @@ Distributed under the Boost Software License, Version 1.0.
#ifndef LLFIO_STATUS_CODE_HPP
#define LLFIO_STATUS_CODE_HPP
-#include "logging.hpp"
-
/* The SG14 status code implementation is quite profoundly different to the
error code implementation. In the error code implementation, std::error_code
is fixed by the standard library, so we wrap it with extra metadata into
@@ -68,6 +66,8 @@ as that (a) enables safe header only LLFIO on Windows (b) produces better codege
#error LLFIO needs Outcome v2.2 or higher
#endif
+#include "logging.hpp"
+
LLFIO_V2_NAMESPACE_BEGIN
#ifndef LLFIO_DISABLE_PATHS_IN_FAILURE_INFO
@@ -351,6 +351,8 @@ LLFIO_V2_NAMESPACE_END
#error LLFIO needs Outcome v2.2 or higher
#endif
+#include "logging.hpp"
+
LLFIO_V2_NAMESPACE_BEGIN
namespace detail
diff --git a/programs/CMakeLists.txt b/programs/CMakeLists.txt
index 07db5753..ad4bb919 100644
--- a/programs/CMakeLists.txt
+++ b/programs/CMakeLists.txt
@@ -11,8 +11,7 @@ find_quickcpplib_library(quickcpplib
)
find_quickcpplib_library(outcome
GIT_REPOSITORY "https://github.com/ned14/outcome.git"
-# GIT_TAG "develop"
- GIT_TAG "better_optimisation" ## future Outcome v2.2
+ GIT_TAG "master"
REQUIRED
IS_HEADER_ONLY
)
@@ -48,13 +47,15 @@ function(make_program program)
endfunction()
make_program(benchmark-async llfio::hl)
+make_program(benchmark-dynamic_thread_pool_group llfio::hl)
make_program(benchmark-iostreams llfio::hl)
make_program(benchmark-locking llfio::hl kerneltest::hl)
make_program(fs-probe llfio::hl)
make_program(illegal-codepoints llfio::hl)
make_program(key-value-store llfio::hl)
-target_include_directories(benchmark-async PRIVATE "benchmark-async/asio/asio/include")
+target_include_directories(benchmark-async PRIVATE "asio/asio/include")
+target_include_directories(benchmark-dynamic_thread_pool_group PRIVATE "asio/asio/include")
if(MSVC)
target_compile_options(illegal-codepoints PUBLIC /utf-8)
diff --git a/programs/benchmark-async/main.cpp b/programs/benchmark-async/main.cpp
index 1213dd4f..613f338d 100644
--- a/programs/benchmark-async/main.cpp
+++ b/programs/benchmark-async/main.cpp
@@ -377,13 +377,13 @@ completion i/o min 1300 max 1.40714e+06 mean 23037 stddev 15035.4
#include <typeinfo>
#include <vector>
-#if __has_include("asio/asio/include/asio.hpp")
+#if __has_include("../asio/asio/include/asio.hpp")
#define ENABLE_ASIO 1
#if defined(__clang__) && defined(_MSC_VER)
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wmicrosoft-include"
#endif
-#include "asio/asio/include/asio.hpp"
+#include "../asio/asio/include/asio.hpp"
#if defined(__clang__) && defined(_MSC_VER)
#pragma clang diagnostic pop
#endif
diff --git a/programs/benchmark-dynamic_thread_pool_group/main.cpp b/programs/benchmark-dynamic_thread_pool_group/main.cpp
new file mode 100644
index 00000000..4fc6c926
--- /dev/null
+++ b/programs/benchmark-dynamic_thread_pool_group/main.cpp
@@ -0,0 +1,232 @@
+/* Test the performance of dynamic thread pool group
+(C) 2021 Niall Douglas <http://www.nedproductions.biz/> (6 commits)
+File Created: Feb 2021
+
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License in the accompanying file
+Licence.txt or at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+
+Distributed under the Boost Software License, Version 1.0.
+ (See accompanying file Licence.txt or copy at
+ http://www.boost.org/LICENSE_1_0.txt)
+*/
+
+//! Seconds to run the benchmark
+static constexpr unsigned BENCHMARK_DURATION = 10;
+//! Maximum work items to create
+static constexpr unsigned MAX_WORK_ITEMS = 1024;
+// Size of buffer to SHA256
+static constexpr unsigned SHA256_BUFFER_SIZE = 4096;
+
+#include "../../include/llfio/llfio.hpp"
+
+#include "quickcpplib/algorithm/small_prng.hpp"
+
+#include <cfloat>
+#include <chrono>
+#include <cmath>
+#include <fstream>
+#include <iostream>
+#include <thread>
+#include <tuple>
+#include <vector>
+
+#if __has_include("../asio/asio/include/asio.hpp")
+#define ENABLE_ASIO 1
+#if defined(__clang__) && defined(_MSC_VER)
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wmicrosoft-include"
+#endif
+#include "../asio/asio/include/asio.hpp"
+#if defined(__clang__) && defined(_MSC_VER)
+#pragma clang diagnostic pop
+#endif
+#endif
+
+namespace llfio = LLFIO_V2_NAMESPACE;
+
+struct llfio_runner
+{
+ std::atomic<bool> cancel{false};
+ llfio::dynamic_thread_pool_group_ptr group = llfio::make_dynamic_thread_pool_group().value();
+ std::vector<llfio::dynamic_thread_pool_group::work_item *> workitems;
+
+ ~llfio_runner()
+ {
+ for(auto *p : workitems)
+ {
+ delete p;
+ }
+ }
+ template <class F> void add_workitem(F &&f)
+ {
+ struct workitem final : public llfio::dynamic_thread_pool_group::work_item
+ {
+ llfio_runner *parent;
+ F f;
+ workitem(llfio_runner *_parent, F &&_f)
+ : parent(_parent)
+ , f(std::move(_f))
+ {
+ }
+ virtual intptr_t next(llfio::deadline & /*unused*/) noexcept override { return parent->cancel.load(std::memory_order_relaxed) ? -1 : 1; }
+ virtual llfio::result<void> operator()(intptr_t /*unused*/) noexcept override
+ {
+ f();
+ return llfio::success();
+ }
+ };
+ workitems.push_back(new workitem(this, std::move(f)));
+ }
+ std::chrono::microseconds run(unsigned seconds)
+ {
+ group->submit(workitems).value();
+ auto begin = std::chrono::steady_clock::now();
+ std::this_thread::sleep_for(std::chrono::seconds(seconds));
+ cancel.store(true, std::memory_order_release);
+ group->wait().value();
+ auto end = std::chrono::steady_clock::now();
+ return std::chrono::duration_cast<std::chrono::microseconds>(end - begin);
+ }
+};
+
+
+#if ENABLE_ASIO
+struct asio_runner
+{
+ std::atomic<bool> cancel{false};
+ asio::io_context ctx;
+
+ template <class F> struct C
+ {
+ asio_runner *parent;
+ F f;
+ C(asio_runner *_parent, F &&_f)
+ : parent(_parent)
+ , f(std::move(_f))
+ {
+ }
+ void operator()() const
+ {
+ f();
+ if(!parent->cancel.load(std::memory_order_relaxed))
+ {
+ parent->ctx.post(*this);
+ }
+ }
+ };
+ template <class F> void add_workitem(F &&f) { ctx.post(C<F>(this, std::move(f))); }
+ std::chrono::microseconds run(unsigned seconds)
+ {
+ std::vector<std::thread> threads;
+ for(size_t n = 0; n < std::thread::hardware_concurrency() * 2; n++)
+ {
+ threads.emplace_back([&] { ctx.run(); });
+ }
+ auto begin = std::chrono::steady_clock::now();
+ std::this_thread::sleep_for(std::chrono::seconds(seconds));
+ cancel.store(true, std::memory_order_release);
+ for(auto &i : threads)
+ {
+ i.join();
+ }
+ auto end = std::chrono::steady_clock::now();
+ return std::chrono::duration_cast<std::chrono::microseconds>(end - begin);
+ }
+};
+#endif
+
+template <class Runner> void benchmark(const char *name)
+{
+ std::cout << "\nBenchmarking " << name << " ..." << std::endl;
+ struct shared_t
+ {
+ std::atomic<unsigned> concurrency{0};
+ std::atomic<unsigned> max_concurrency{0};
+ };
+ struct worker
+ {
+ shared_t *shared;
+ char buffer[SHA256_BUFFER_SIZE];
+ QUICKCPPLIB_NAMESPACE::algorithm::hash::sha256_hash::result_type hash;
+ uint64_t count{0};
+
+ void operator()()
+ {
+ auto concurrency = shared->concurrency.fetch_add(1, std::memory_order_relaxed) + 1;
+ if(concurrency > shared->max_concurrency.load(std::memory_order_relaxed))
+ {
+ shared->max_concurrency.store(concurrency, std::memory_order_relaxed);
+ }
+ hash = QUICKCPPLIB_NAMESPACE::algorithm::hash::sha256_hash::hash(buffer, sizeof(buffer));
+ count++;
+ shared->concurrency.fetch_sub(1, std::memory_order_relaxed);
+ }
+ explicit worker(shared_t *_shared)
+ : shared(_shared)
+ {
+ }
+ };
+ std::vector<worker> workers;
+ std::vector<std::tuple<size_t, double, unsigned>> results;
+ QUICKCPPLIB_NAMESPACE::algorithm::small_prng::small_prng rand;
+ for(size_t items = 1; items <= MAX_WORK_ITEMS; items <<= 1)
+ {
+ shared_t shared;
+ workers.clear();
+ for(size_t n = 0; n < items; n++)
+ {
+ workers.emplace_back(&shared);
+ for(size_t i = 0; i < sizeof(worker::buffer); i += 4)
+ {
+ auto *p = (uint32_t *) (workers.back().buffer + i);
+ *p = rand();
+ }
+ }
+ Runner runner;
+ for(auto &i : workers)
+ {
+ runner.add_workitem([&] { i(); });
+ }
+ auto duration = runner.run(BENCHMARK_DURATION);
+ uint64_t total = 0;
+ for(auto &i : workers)
+ {
+ total += i.count;
+ }
+ results.emplace_back(items, 1000000.0 * total / duration.count(), shared.max_concurrency);
+ std::cout << " For " << std::get<0>(results.back()) << " work items got " << std::get<1>(results.back()) << " SHA256 hashes/sec with "
+ << std::get<2>(results.back()) << " maximum concurrency." << std::endl;
+ }
+ std::ofstream out(std::string(name) + "_results.csv");
+ out << R"("Work items","SHA256 hashes/sec","Max concurrency")";
+ for(auto &i : results)
+ {
+ out << "\n" << std::get<0>(i) << "," << std::get<1>(i) << "," << std::get<2>(i);
+ }
+ out << std::endl;
+}
+
+int main(void)
+{
+ std::string llfio_name("llfio (");
+ llfio_name.append(llfio::dynamic_thread_pool_group::implementation_description());
+ llfio_name.push_back(')');
+ benchmark<llfio_runner>(llfio_name.c_str());
+
+#if ENABLE_ASIO
+ benchmark<asio_runner>("asio");
+#endif
+ return 0;
+}
\ No newline at end of file
diff --git a/test/tests/dynamic_thread_pool_group.cpp b/test/tests/dynamic_thread_pool_group.cpp
new file mode 100644
index 00000000..66a49425
--- /dev/null
+++ b/test/tests/dynamic_thread_pool_group.cpp
@@ -0,0 +1,450 @@
+/* Integration test kernel for dynamic_thread_pool_group
+(C) 2020 Niall Douglas <http://www.nedproductions.biz/> (2 commits)
+File Created: Dec 2020
+
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License in the accompanying file
+Licence.txt or at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+
+Distributed under the Boost Software License, Version 1.0.
+ (See accompanying file Licence.txt or copy at
+ http://www.boost.org/LICENSE_1_0.txt)
+*/
+
+//#define LLFIO_LOGGING_LEVEL 99
+
+#include "../test_kernel_decl.hpp"
+
+#include <cmath> // for sqrt
+
+static inline void TestDynamicThreadPoolGroupWorks()
+{
+ namespace llfio = LLFIO_V2_NAMESPACE;
+ // llfio::log_level_guard llg(llfio::log_level::all);
+ struct work_item;
+ struct shared_state_t
+ {
+ std::atomic<intptr_t> p{0};
+ std::atomic<size_t> concurrency{0}, max_concurrency{0}, group_completes{0};
+ std::vector<size_t> executed;
+ llfio::dynamic_thread_pool_group_ptr tpg{llfio::make_dynamic_thread_pool_group().value()};
+ std::atomic<bool> cancelling{false};
+ } shared_state;
+ struct work_item final : public llfio::dynamic_thread_pool_group::work_item
+ {
+ using _base = llfio::dynamic_thread_pool_group::work_item;
+ shared_state_t *shared{nullptr};
+ std::atomic<bool> within{false};
+
+ work_item() = default;
+ explicit work_item(shared_state_t *_shared)
+ : shared(_shared)
+ {
+ }
+ work_item(work_item &&o) noexcept
+ : _base(std::move(o))
+ , shared(o.shared)
+ {
+ }
+
+ virtual intptr_t next(llfio::deadline &d) noexcept override
+ {
+ bool expected = false;
+ BOOST_CHECK(within.compare_exchange_strong(expected, true));
+ (void) d;
+ BOOST_CHECK(parent() == shared->tpg.get());
+ auto ret = shared->p.fetch_sub(1);
+ if(ret < 0)
+ {
+ ret = -1;
+ }
+ // std::cout << " next() returns " << ret << std::endl;
+ expected = true;
+ BOOST_CHECK(within.compare_exchange_strong(expected, false));
+ return ret;
+ }
+ virtual llfio::result<void> operator()(intptr_t work) noexcept override
+ {
+ bool expected = false;
+ BOOST_CHECK(within.compare_exchange_strong(expected, true));
+ auto concurrency = shared->concurrency.fetch_add(1) + 1;
+ for(size_t expected_ = shared->max_concurrency; concurrency > expected_;)
+ {
+ shared->max_concurrency.compare_exchange_weak(expected_, concurrency);
+ }
+ BOOST_CHECK(parent() == shared->tpg.get());
+ BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 1);
+ BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == this);
+ // std::cout << " () executes " << work << std::endl;
+ std::this_thread::sleep_for(std::chrono::milliseconds(50));
+ auto *executed = (std::atomic<size_t> *) &shared->executed[work];
+ executed->fetch_add(1);
+ shared->concurrency.fetch_sub(1);
+ expected = true;
+ BOOST_CHECK(within.compare_exchange_strong(expected, false));
+ return llfio::success();
+ }
+ virtual void group_complete(const llfio::result<void> &cancelled) noexcept override
+ {
+ bool expected = false;
+ BOOST_CHECK(within.compare_exchange_strong(expected, true));
+ BOOST_CHECK(parent() == nullptr);
+ BOOST_CHECK(shared->cancelling == cancelled.has_error());
+ // std::cout << " group_complete()" << std::endl;
+ shared->group_completes.fetch_add(1);
+ expected = true;
+ BOOST_CHECK(within.compare_exchange_strong(expected, false));
+ }
+ };
+ std::vector<work_item> workitems;
+ auto reset = [&](size_t count) {
+ workitems.clear();
+ shared_state.executed.clear();
+ shared_state.executed.resize(count + 1);
+ for(size_t n = 0; n < count; n++)
+ {
+ workitems.emplace_back(&shared_state);
+ }
+ shared_state.p = (intptr_t) count;
+ shared_state.concurrency = 0;
+ shared_state.max_concurrency = 0;
+ shared_state.group_completes = 0;
+ };
+ auto submit = [&] {
+ auto **wis = (llfio::dynamic_thread_pool_group::work_item **) alloca(sizeof(work_item *) * workitems.size());
+ for(size_t n = 0; n < workitems.size(); n++)
+ {
+ wis[n] = &workitems[n];
+ }
+ BOOST_CHECK(!shared_state.tpg->stopping());
+ BOOST_CHECK(shared_state.tpg->stopped());
+ BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 0);
+ BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == nullptr);
+ for(size_t n = 0; n < workitems.size(); n++)
+ {
+ BOOST_CHECK(workitems[n].parent() == nullptr);
+ }
+ shared_state.tpg->submit({wis, workitems.size()}).value();
+ BOOST_CHECK(!shared_state.tpg->stopping());
+ BOOST_CHECK(!shared_state.tpg->stopped());
+ for(size_t n = 0; n < workitems.size(); n++)
+ {
+ BOOST_CHECK(workitems[n].parent() == shared_state.tpg.get());
+ }
+ BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 0);
+ BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == nullptr);
+ };
+ auto check = [&] {
+ auto r = shared_state.tpg->wait();
+ if(!r)
+ {
+ std::cerr << "ERROR: wait() reports failure " << r.error().message() << std::endl;
+ r.value();
+ }
+ BOOST_CHECK(!shared_state.tpg->stopping());
+ BOOST_CHECK(shared_state.tpg->stopped());
+ BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 0);
+ BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == nullptr);
+ for(size_t n = 0; n < workitems.size(); n++)
+ {
+ BOOST_CHECK(workitems[n].parent() == nullptr);
+ }
+ BOOST_CHECK(shared_state.group_completes == workitems.size());
+ BOOST_CHECK(shared_state.executed[0] == 0);
+ if(shared_state.cancelling)
+ {
+ size_t executed = 0, notexecuted = 0;
+ for(size_t n = 1; n <= workitems.size(); n++)
+ {
+ if(shared_state.executed[n] == 1)
+ {
+ executed++;
+ }
+ else
+ {
+ notexecuted++;
+ }
+ }
+ std::cout << "During cancellation, executed " << executed << " and did not execute " << notexecuted << std::endl;
+ }
+ else
+ {
+ for(size_t n = 1; n <= workitems.size(); n++)
+ {
+ BOOST_CHECK(shared_state.executed[n] == 1);
+ if(shared_state.executed[n] != 1)
+ {
+ std::cout << "shared_state.executed[" << n << "] = " << shared_state.executed[n] << std::endl;
+ }
+ }
+ }
+ std::cout << "Maximum concurrency achieved with " << workitems.size() << " work items = " << shared_state.max_concurrency << "\n" << std::endl;
+ };
+ auto print_exception_throw = llfio::make_scope_fail([]() noexcept { std::cout << "NOTE: Exception throw occurred!" << std::endl; });
+
+ // Test a single work item
+ reset(1);
+ submit();
+ check();
+
+ // Test 10 work items
+ reset(10);
+ submit();
+ check();
+
+ // Test 1000 work items
+ reset(1000);
+ submit();
+ check();
+
+ // Test 1000 work items with stop
+ reset(1000);
+ submit();
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ shared_state.cancelling = true;
+ shared_state.tpg->stop().value();
+ BOOST_CHECK(shared_state.tpg->stopping());
+ auto r = shared_state.tpg->wait();
+ BOOST_CHECK(!shared_state.tpg->stopping());
+ BOOST_REQUIRE(!r);
+ BOOST_CHECK(r.error() == llfio::errc::operation_canceled);
+ check();
+}
+
+static inline void TestDynamicThreadPoolGroupNestingWorks()
+{
+ if(std::thread::hardware_concurrency() < 4)
+ {
+ std::cout << "NOTE: Skipping TestDynamicThreadPoolGroupNestingWorks as hardware concurrency is below 4." << std::endl;
+ return;
+ }
+ namespace llfio = LLFIO_V2_NAMESPACE;
+ static constexpr size_t MAX_NESTING = 10;
+ static constexpr int COUNT_PER_WORK_ITEM = 1000;
+ struct shared_state_t
+ {
+ std::mutex lock;
+ std::unordered_map<uint64_t, size_t> time_bucket;
+ llfio::dynamic_thread_pool_group_ptr tpg;
+ double stddev{0};
+ void calc_stddev()
+ {
+ stddev = 0;
+ uint64_t mean = 0, count = 0;
+ for(auto &i : time_bucket)
+ {
+ mean += i.first * i.second;
+ count += i.second;
+ }
+ mean /= count;
+ for(auto &i : time_bucket)
+ {
+ double diff = std::abs((double) i.first - (double) mean);
+ stddev += diff * diff * i.second;
+ }
+ stddev /= count;
+ stddev = sqrt(stddev);
+ }
+ } shared_states[MAX_NESTING];
+ struct work_item final : public llfio::dynamic_thread_pool_group::work_item
+ {
+ using _base = llfio::dynamic_thread_pool_group::work_item;
+ const size_t nesting{0};
+ llfio::span<shared_state_t> shared_states;
+ std::atomic<int> count{COUNT_PER_WORK_ITEM};
+ std::unique_ptr<work_item> childwi;
+
+ work_item() = default;
+ explicit work_item(size_t _nesting, llfio::span<shared_state_t> _shared_states)
+ : nesting(_nesting)
+ , shared_states(_shared_states)
+ {
+ if(nesting + 1 < MAX_NESTING)
+ {
+ childwi = std::make_unique<work_item>(nesting + 1, shared_states);
+ }
+ }
+ work_item(work_item &&o) noexcept
+ : _base(std::move(o))
+ , nesting(o.nesting)
+ , shared_states(o.shared_states)
+ , childwi(std::move(o.childwi))
+ {
+ }
+
+ virtual intptr_t next(llfio::deadline & /*unused*/) noexcept override
+ {
+ auto ret = count.fetch_sub(1);
+ if(ret <= 0)
+ {
+ ret = -1;
+ }
+ return ret;
+ }
+ virtual llfio::result<void> operator()(intptr_t work) noexcept override
+ {
+ auto supposed_nesting_level = llfio::dynamic_thread_pool_group::current_nesting_level();
+ BOOST_CHECK(nesting + 1 == supposed_nesting_level);
+ if(nesting + 1 != supposed_nesting_level)
+ {
+ std::cerr << "current_nesting_level() reports " << supposed_nesting_level << " not " << (nesting + 1) << std::endl;
+ }
+ uint64_t idx = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
+ std::lock_guard<std::mutex> g(shared_states[nesting].lock);
+ // std::cout << "wi " << this << " nesting " << nesting << " work " << work << std::endl;
+ if(COUNT_PER_WORK_ITEM == work && childwi)
+ {
+ if(!shared_states[nesting].tpg)
+ {
+ shared_states[nesting].tpg = llfio::make_dynamic_thread_pool_group().value();
+ }
+ OUTCOME_TRY(shared_states[nesting].tpg->submit(childwi.get()));
+ }
+ shared_states[nesting].time_bucket[idx]++;
+ return llfio::success();
+ }
+ // virtual void group_complete(const llfio::result<void> &/*unused*/) noexcept override { }
+ };
+ std::vector<work_item> workitems;
+ for(size_t n = 0; n < 100; n++)
+ {
+ workitems.emplace_back(0, shared_states);
+ }
+ auto tpg = llfio::make_dynamic_thread_pool_group().value();
+ tpg->submit(llfio::span<work_item>(workitems)).value();
+ tpg->wait().value();
+ for(size_t n = 0; n < MAX_NESTING - 1; n++)
+ {
+ std::unique_lock<std::mutex> g(shared_states[n].lock);
+ while(!shared_states[n].tpg)
+ {
+ g.unlock();
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ g.lock();
+ }
+ g.unlock();
+ shared_states[n].tpg->wait().value();
+ }
+ for(size_t n = 0; n < MAX_NESTING; n++)
+ {
+ shared_states[n].calc_stddev();
+ std::cout << " Standard deviation for nesting level " << (n + 1) << " was " << shared_states[n].stddev << std::endl;
+ }
+ BOOST_CHECK(shared_states[MAX_NESTING - 1].stddev < shared_states[MAX_NESTING / 4].stddev * 3 / 4);
+}
+
+static inline void TestDynamicThreadPoolGroupIoAwareWorks()
+{
+ // statfs_t::iosinprogress not implemented for these yet
+#if defined(__APPLE__) || defined(__FreeBSD__)
+ return;
+#endif
+ namespace llfio = LLFIO_V2_NAMESPACE;
+ static constexpr size_t WORK_ITEMS = 1000;
+ static constexpr size_t IO_SIZE = 1 * 65536;
+ struct shared_state_t
+ {
+ llfio::file_handle h;
+ llfio::dynamic_thread_pool_group::io_aware_work_item::io_handle_awareness awareness;
+ std::atomic<size_t> concurrency{0}, max_concurrency{0};
+ std::atomic<uint64_t> current_pacing{0};
+ } shared_state;
+ struct work_item final : public llfio::dynamic_thread_pool_group::io_aware_work_item
+ {
+ using _base = llfio::dynamic_thread_pool_group::io_aware_work_item;
+ shared_state_t *shared_state;
+
+ work_item() = default;
+ explicit work_item(shared_state_t *_shared_state)
+ : _base({&_shared_state->awareness, 1})
+ , shared_state(_shared_state)
+ {
+ }
+ work_item(work_item &&o) noexcept
+ : _base(std::move(o))
+ , shared_state(o.shared_state)
+ {
+ }
+
+ virtual intptr_t io_aware_next(llfio::deadline &d) noexcept override
+ {
+ shared_state->current_pacing.store(d.nsecs, std::memory_order_relaxed);
+ return 1;
+ }
+ virtual llfio::result<void> operator()(intptr_t /*unused*/) noexcept override
+ {
+ auto concurrency = shared_state->concurrency.fetch_add(1, std::memory_order_relaxed) + 1;
+ for(size_t expected_ = shared_state->max_concurrency; concurrency > expected_;)
+ {
+ shared_state->max_concurrency.compare_exchange_weak(expected_, concurrency);
+ }
+ static thread_local std::vector<llfio::byte, llfio::utils::page_allocator<llfio::byte>> buffer(IO_SIZE);
+ OUTCOME_TRY(shared_state->h.read((concurrency - 1) * IO_SIZE, {{buffer}}));
+ shared_state->concurrency.fetch_sub(1, std::memory_order_relaxed);
+ return llfio::success();
+ }
+ // virtual void group_complete(const llfio::result<void> &/*unused*/) noexcept override { }
+ };
+ shared_state.h = llfio::file_handle::temp_file({}, llfio::file_handle::mode::write, llfio::file_handle::creation::only_if_not_exist,
+ llfio::file_handle::caching::only_metadata)
+ .value();
+ shared_state.awareness.h = &shared_state.h;
+ shared_state.h.truncate(WORK_ITEMS * IO_SIZE).value();
+ alignas(4096) llfio::byte buffer[IO_SIZE];
+ llfio::utils::random_fill((char *) buffer, sizeof(buffer));
+ std::vector<work_item> workitems;
+ for(size_t n = 0; n < WORK_ITEMS; n++)
+ {
+ workitems.emplace_back(&shared_state);
+ shared_state.h.write(n * IO_SIZE, {{buffer, sizeof(buffer)}}).value();
+ }
+ auto tpg = llfio::make_dynamic_thread_pool_group().value();
+ tpg->submit(llfio::span<work_item>(workitems)).value();
+ auto begin = std::chrono::steady_clock::now();
+ size_t paced = 0;
+ while(std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - begin) < std::chrono::seconds(60))
+ {
+ llfio::statfs_t statfs;
+ statfs.fill(shared_state.h, llfio::statfs_t::want::iosinprogress | llfio::statfs_t::want::iosbusytime | llfio::statfs_t::want::mntonname).value();
+ std::cout << "\nStorage device at " << statfs.f_mntonname << " is at " << (100.0f * statfs.f_iosbusytime) << "% utilisation and has an i/o queue depth of "
+ << statfs.f_iosinprogress << ". Current concurrency is " << shared_state.concurrency.load(std::memory_order_relaxed) << " and current pacing is "
+ << (shared_state.current_pacing.load(std::memory_order_relaxed) / 1000.0) << " microseconds." << std::endl;
+ if(shared_state.current_pacing.load(std::memory_order_relaxed) > 0)
+ {
+ paced++;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(250));
+ }
+ std::cout << "\nStopping ..." << std::endl;
+ tpg->stop().value();
+ while(!tpg->stopped())
+ {
+ std::cout << "\nCurrent concurrency is " << shared_state.concurrency.load(std::memory_order_relaxed) << " and current pacing is "
+ << (shared_state.current_pacing.load(std::memory_order_relaxed) / 1000.0) << " microseconds." << std::endl;
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ }
+ auto r = tpg->wait();
+ if(!r && r.error() != llfio::errc::operation_canceled)
+ {
+ r.value();
+ }
+ BOOST_CHECK(paced > 0);
+}
+
+KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, works, "Tests that llfio::dynamic_thread_pool_group works as expected",
+ TestDynamicThreadPoolGroupWorks())
+KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, nested, "Tests that nesting of llfio::dynamic_thread_pool_group works as expected",
+ TestDynamicThreadPoolGroupNestingWorks())
+// KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, io_aware_work_item,
+// "Tests that llfio::dynamic_thread_pool_group::io_aware_work_item works as expected", TestDynamicThreadPoolGroupIoAwareWorks())
diff --git a/test/tests/statfs.cpp b/test/tests/statfs.cpp
new file mode 100644
index 00000000..138fd4c1
--- /dev/null
+++ b/test/tests/statfs.cpp
@@ -0,0 +1,101 @@
+/* Integration test kernel for statfs
+(C) 2020 Niall Douglas <http://www.nedproductions.biz/> (2 commits)
+File Created: Dec 2020
+
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License in the accompanying file
+Licence.txt or at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+
+Distributed under the Boost Software License, Version 1.0.
+ (See accompanying file Licence.txt or copy at
+ http://www.boost.org/LICENSE_1_0.txt)
+*/
+
+#include "../test_kernel_decl.hpp"
+
+#include <future>
+#include <cmath>
+
+static inline void TestStatfsIosInProgress()
+{
+ namespace llfio = LLFIO_V2_NAMESPACE;
+ llfio::file_handle h1 = llfio::file_handle::uniquely_named_file({}, llfio::file_handle::mode::write, llfio::file_handle::caching::all,
+ llfio::file_handle::flag::unlink_on_first_close)
+ .value();
+ llfio::file_handle h2 = llfio::file_handle::temp_file().value();
+ // h1 is within our build directory's volume, h2 is within our temp volume.
+ auto print_statfs = [](const llfio::file_handle &h, const llfio::statfs_t &statfs) {
+ std::cout << "\nFor file " << h.current_path().value() << ":";
+ std::cout << "\n fundamental filesystem block size = " << statfs.f_bsize;
+ std::cout << "\n optimal transfer block size = " << statfs.f_iosize;
+ std::cout << "\n total data blocks in filesystem = " << statfs.f_blocks;
+ std::cout << "\n free blocks in filesystem = " << statfs.f_bfree;
+ std::cout << "\n free blocks avail to non-superuser = " << statfs.f_bavail;
+ std::cout << "\n total file nodes in filesystem = " << statfs.f_files;
+ std::cout << "\n free nodes avail to non-superuser = " << statfs.f_ffree;
+ std::cout << "\n maximum filename length = " << statfs.f_namemax;
+ std::cout << "\n filesystem type name = " << statfs.f_fstypename;
+ std::cout << "\n mounted filesystem = " << statfs.f_mntfromname;
+ std::cout << "\n directory on which mounted = " << statfs.f_mntonname;
+ std::cout << "\n i/o's currently in progress (i.e. queue depth) = " << statfs.f_iosinprogress;
+ std::cout << "\n percentage of time spent doing i/o (1.0 = 100%) = " << statfs.f_iosbusytime;
+ std::cout << std::endl;
+ };
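+ // Take baseline readings for both volumes before generating any load.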
+ llfio::statfs_t s1base, s2base;
+ s1base.fill(h1).value();
+ s2base.fill(h2).value();
+ print_statfs(h1, s1base);
+ print_statfs(h2, s2base);
+ std::atomic<bool> done{false};
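+ // Writer loop: repeatedly overwrite the front of the file and barrier it
+ // until told to stop, keeping the storage device busy.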
+ auto do_io_to = [&done](llfio::file_handle &fh) {
+ std::vector<llfio::byte> buffer;
+ buffer.resize(4096); // alternatively 1048576 to test with larger writes
+ llfio::utils::random_fill((char *) buffer.data(), buffer.size());
+ while(!done)
+ {
+ fh.write(0, {{buffer.data(), buffer.size()}}).value();
+ fh.barrier().value();
+ }
+ };
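+ // Load h1's volume for a second, then check that its i/o busy time rose
+ // relative to baseline (NaN means the platform cannot report busy time).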
+ {
+ auto f = std::async(std::launch::async, [&] { do_io_to(h1); });
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ llfio::statfs_t s1load, s2load;
+ s1load.fill(h1).value();
+ s2load.fill(h2).value();
+ done = true;
+ print_statfs(h1, s1load);
+ print_statfs(h2, s2load);
+ // BOOST_CHECK(s1load.f_iosinprogress > s1base.f_iosinprogress);
+ BOOST_CHECK(std::isnan(s1base.f_iosbusytime) || s1load.f_iosbusytime > s1base.f_iosbusytime);
+ f.get();
+ done = false;
+ }
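+ // Repeat with the load applied to h2's volume instead.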
+ {
+ auto f = std::async(std::launch::async, [&] { do_io_to(h2); });
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ llfio::statfs_t s1load, s2load;
+ s1load.fill(h1).value();
+ s2load.fill(h2).value();
+ done = true;
+ print_statfs(h1, s1load);
+ print_statfs(h2, s2load);
+ // BOOST_CHECK(s2load.f_iosinprogress > s2base.f_iosinprogress);
+ BOOST_CHECK(std::isnan(s2base.f_iosbusytime) || s2load.f_iosbusytime > s2base.f_iosbusytime);
+ f.get();
+ done = false;
+ }
+}
+
+KERNELTEST_TEST_KERNEL(integration, llfio, statfs, iosinprogress, "Tests that llfio::statfs_t::f_iosinprogress works as expected", TestStatfsIosInProgress())