diff options
Diffstat (limited to 'include/llfio/v2.0')
-rw-r--r-- | include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp | 2491 | ||||
-rw-r--r-- | include/llfio/v2.0/detail/impl/posix/directory_handle.ipp | 57 | ||||
-rw-r--r-- | include/llfio/v2.0/detail/impl/posix/statfs.ipp | 494 | ||||
-rw-r--r-- | include/llfio/v2.0/detail/impl/windows/directory_handle.ipp | 2 | ||||
-rw-r--r-- | include/llfio/v2.0/detail/impl/windows/file_handle.ipp | 111 | ||||
-rw-r--r-- | include/llfio/v2.0/detail/impl/windows/statfs.ipp | 21 | ||||
-rw-r--r-- | include/llfio/v2.0/dynamic_thread_pool_group.hpp | 530 | ||||
-rw-r--r-- | include/llfio/v2.0/fs_handle.hpp | 2 | ||||
-rw-r--r-- | include/llfio/v2.0/llfio.hpp | 5 | ||||
-rw-r--r-- | include/llfio/v2.0/logging.hpp | 31 | ||||
-rw-r--r-- | include/llfio/v2.0/statfs.hpp | 59 | ||||
-rw-r--r-- | include/llfio/v2.0/status_code.hpp | 6 |
12 files changed, 3621 insertions, 188 deletions
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp new file mode 100644 index 00000000..287683bf --- /dev/null +++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp @@ -0,0 +1,2491 @@ +/* Dynamic thread pool group +(C) 2020-2021 Niall Douglas <http://www.nedproductions.biz/> (9 commits) +File Created: Dec 2020 + + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License in the accompanying file +Licence.txt or at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + +Distributed under the Boost Software License, Version 1.0. 
+ (See accompanying file Licence.txt or copy at + http://www.boost.org/LICENSE_1_0.txt) +*/ + +#include "../../dynamic_thread_pool_group.hpp" + +#include "../../file_handle.hpp" +#include "../../statfs.hpp" + +#include "quickcpplib/spinlock.hpp" + +#include <atomic> +#include <memory> +#include <mutex> +#include <unordered_map> +#include <unordered_set> +#include <vector> + +#include <iostream> + +#ifndef LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD +#if LLFIO_FORCE_USE_LIBDISPATCH +#include <dispatch/dispatch.h> +#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD 1 +#else +#ifdef _WIN32 +#include "windows/import.hpp" +#include <threadpoolapiset.h> +#else +#if __has_include(<dispatch/dispatch.h>) +#include <dispatch/dispatch.h> +#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD 1 +#endif +#endif +#endif +#endif +#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32) +#if !defined(__linux__) +#error dynamic_thread_pool_group requires Grand Central Dispatch (libdispatch) on non-Linux POSIX. +#endif +#include <dirent.h> /* Defines DT_* constants */ +#include <sys/syscall.h> + +#include <condition_variable> +#include <thread> +#endif + +#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING 0 + +/* NOTE that the Linux results are from a VM on the same machine as the Windows results, +so they are not directly comparable. + +Linux 4Kb and 64Kb + +Benchmarking asio ... + For 1 work items got 38182.6 SHA256 hashes/sec with 1 maximum concurrency. + For 2 work items got 68664 SHA256 hashes/sec with 2 maximum concurrency. + For 4 work items got 87036.4 SHA256 hashes/sec with 4 maximum concurrency. + For 8 work items got 78702.2 SHA256 hashes/sec with 8 maximum concurrency. + For 16 work items got 51911.2 SHA256 hashes/sec with 16 maximum concurrency. + For 32 work items got 553964 SHA256 hashes/sec with 31 maximum concurrency. + For 64 work items got 713844 SHA256 hashes/sec with 36 maximum concurrency. 
+ For 128 work items got 700172 SHA256 hashes/sec with 37 maximum concurrency. + For 256 work items got 716099 SHA256 hashes/sec with 37 maximum concurrency. + For 512 work items got 703323 SHA256 hashes/sec with 37 maximum concurrency. + For 1024 work items got 722827 SHA256 hashes/sec with 38 maximum concurrency. + +Benchmarking asio ... + For 1 work items got 3917.88 SHA256 hashes/sec with 1 maximum concurrency. + For 2 work items got 7798.29 SHA256 hashes/sec with 2 maximum concurrency. + For 4 work items got 14395.2 SHA256 hashes/sec with 4 maximum concurrency. + For 8 work items got 23633.4 SHA256 hashes/sec with 8 maximum concurrency. + For 16 work items got 31771.1 SHA256 hashes/sec with 16 maximum concurrency. + For 32 work items got 57978 SHA256 hashes/sec with 32 maximum concurrency. + For 64 work items got 66200.6 SHA256 hashes/sec with 64 maximum concurrency. + For 128 work items got 65706.5 SHA256 hashes/sec with 64 maximum concurrency. + For 256 work items got 65717.5 SHA256 hashes/sec with 64 maximum concurrency. + For 512 work items got 65652.4 SHA256 hashes/sec with 64 maximum concurrency. + For 1024 work items got 65580.3 SHA256 hashes/sec with 64 maximum concurrency. + + +Windows 4Kb and 64kB + +Benchmarking asio ... + For 1 work items got 51216.7 SHA256 hashes/sec with 1 maximum concurrency. + For 2 work items got 97691 SHA256 hashes/sec with 2 maximum concurrency. + For 4 work items got 184381 SHA256 hashes/sec with 4 maximum concurrency. + For 8 work items got 305270 SHA256 hashes/sec with 8 maximum concurrency. + For 16 work items got 520728 SHA256 hashes/sec with 16 maximum concurrency. + For 32 work items got 482729 SHA256 hashes/sec with 32 maximum concurrency. + For 64 work items got 1.02629e+06 SHA256 hashes/sec with 64 maximum concurrency. + For 128 work items got 1.01816e+06 SHA256 hashes/sec with 64 maximum concurrency. + For 256 work items got 1.01672e+06 SHA256 hashes/sec with 64 maximum concurrency. 
+ For 512 work items got 1.01727e+06 SHA256 hashes/sec with 64 maximum concurrency. + For 1024 work items got 1.01477e+06 SHA256 hashes/sec with 64 maximum concurrency. + +Benchmarking asio ... + For 1 work items got 4069.92 SHA256 hashes/sec with 1 maximum concurrency. + For 2 work items got 8099.1 SHA256 hashes/sec with 2 maximum concurrency. + For 4 work items got 16021.7 SHA256 hashes/sec with 4 maximum concurrency. + For 8 work items got 30275.2 SHA256 hashes/sec with 8 maximum concurrency. + For 16 work items got 40972.5 SHA256 hashes/sec with 16 maximum concurrency. + For 32 work items got 70919.2 SHA256 hashes/sec with 32 maximum concurrency. + For 64 work items got 71917 SHA256 hashes/sec with 64 maximum concurrency. + For 128 work items got 71111.8 SHA256 hashes/sec with 64 maximum concurrency. + For 256 work items got 70963.5 SHA256 hashes/sec with 64 maximum concurrency. + For 512 work items got 70956.3 SHA256 hashes/sec with 64 maximum concurrency. + For 1024 work items got 70989.9 SHA256 hashes/sec with 64 maximum concurrency. +*/ + + +/* Linux 4Kb and 64Kb libdispatch + +Benchmarking llfio (Grand Central Dispatch) ... + For 1 work items got 33942.7 SHA256 hashes/sec with 1 maximum concurrency. + For 2 work items got 91275.8 SHA256 hashes/sec with 2 maximum concurrency. + For 4 work items got 191446 SHA256 hashes/sec with 4 maximum concurrency. + For 8 work items got 325776 SHA256 hashes/sec with 8 maximum concurrency. + For 16 work items got 405282 SHA256 hashes/sec with 16 maximum concurrency. + For 32 work items got 408015 SHA256 hashes/sec with 31 maximum concurrency. + For 64 work items got 412343 SHA256 hashes/sec with 32 maximum concurrency. + For 128 work items got 450024 SHA256 hashes/sec with 41 maximum concurrency. + For 256 work items got 477885 SHA256 hashes/sec with 46 maximum concurrency. + For 512 work items got 531752 SHA256 hashes/sec with 48 maximum concurrency. 
+ For 1024 work items got 608181 SHA256 hashes/sec with 44 maximum concurrency. + +Benchmarking llfio (Grand Central Dispatch) ... + For 1 work items got 3977.21 SHA256 hashes/sec with 1 maximum concurrency. + For 2 work items got 7980.09 SHA256 hashes/sec with 2 maximum concurrency. + For 4 work items got 15075.6 SHA256 hashes/sec with 4 maximum concurrency. + For 8 work items got 24427.3 SHA256 hashes/sec with 8 maximum concurrency. + For 16 work items got 41858.7 SHA256 hashes/sec with 16 maximum concurrency. + For 32 work items got 64896.4 SHA256 hashes/sec with 32 maximum concurrency. + For 64 work items got 65683.6 SHA256 hashes/sec with 34 maximum concurrency. + For 128 work items got 65476.1 SHA256 hashes/sec with 35 maximum concurrency. + For 256 work items got 65210.6 SHA256 hashes/sec with 36 maximum concurrency. + For 512 work items got 65241.1 SHA256 hashes/sec with 36 maximum concurrency. + For 1024 work items got 65205.3 SHA256 hashes/sec with 37 maximum concurrency. +*/ + +/* Linux 4Kb and 64Kb native + +Benchmarking llfio (Linux native) ... + For 1 work items got 65160.3 SHA256 hashes/sec with 1 maximum concurrency. + For 2 work items got 126586 SHA256 hashes/sec with 2 maximum concurrency. + For 4 work items got 246616 SHA256 hashes/sec with 4 maximum concurrency. + For 8 work items got 478938 SHA256 hashes/sec with 8 maximum concurrency. + For 16 work items got 529919 SHA256 hashes/sec with 15 maximum concurrency. + For 32 work items got 902885 SHA256 hashes/sec with 32 maximum concurrency. + For 64 work items got 919633 SHA256 hashes/sec with 34 maximum concurrency. + For 128 work items got 919695 SHA256 hashes/sec with 35 maximum concurrency. + For 256 work items got 923159 SHA256 hashes/sec with 36 maximum concurrency. + For 512 work items got 922961 SHA256 hashes/sec with 37 maximum concurrency. + For 1024 work items got 926624 SHA256 hashes/sec with 38 maximum concurrency. + +Benchmarking llfio (Linux native) ... 
+ For 1 work items got 4193.79 SHA256 hashes/sec with 1 maximum concurrency. + For 2 work items got 8422.44 SHA256 hashes/sec with 2 maximum concurrency. + For 4 work items got 12521.7 SHA256 hashes/sec with 3 maximum concurrency. + For 8 work items got 20028.4 SHA256 hashes/sec with 6 maximum concurrency. + For 16 work items got 30657.4 SHA256 hashes/sec with 10 maximum concurrency. + For 32 work items got 53217.4 SHA256 hashes/sec with 20 maximum concurrency. + For 64 work items got 65452.3 SHA256 hashes/sec with 32 maximum concurrency. + For 128 work items got 65396.3 SHA256 hashes/sec with 32 maximum concurrency. + For 256 work items got 65363.7 SHA256 hashes/sec with 32 maximum concurrency. + For 512 work items got 65198.2 SHA256 hashes/sec with 32 maximum concurrency. + For 1024 work items got 65003.9 SHA256 hashes/sec with 34 maximum concurrency. +*/ + + +/* Windows 4Kb and 64Kb Win32 thread pool + +Benchmarking llfio (Win32 thread pool (Vista+)) ... + For 1 work items got 57995.3 SHA256 hashes/sec with 1 maximum concurrency. + For 2 work items got 120267 SHA256 hashes/sec with 2 maximum concurrency. + For 4 work items got 238139 SHA256 hashes/sec with 4 maximum concurrency. + For 8 work items got 413488 SHA256 hashes/sec with 8 maximum concurrency. + For 16 work items got 575423 SHA256 hashes/sec with 16 maximum concurrency. + For 32 work items got 720938 SHA256 hashes/sec with 31 maximum concurrency. + For 64 work items got 703460 SHA256 hashes/sec with 30 maximum concurrency. + For 128 work items got 678257 SHA256 hashes/sec with 29 maximum concurrency. + For 256 work items got 678898 SHA256 hashes/sec with 29 maximum concurrency. + For 512 work items got 671729 SHA256 hashes/sec with 28 maximum concurrency. + For 1024 work items got 674433 SHA256 hashes/sec with 30 maximum concurrency. + +Benchmarking llfio (Win32 thread pool (Vista+)) ... + For 1 work items got 4132.18 SHA256 hashes/sec with 1 maximum concurrency. 
+ For 2 work items got 8197.21 SHA256 hashes/sec with 2 maximum concurrency. + For 4 work items got 16281.3 SHA256 hashes/sec with 4 maximum concurrency. + For 8 work items got 27447.5 SHA256 hashes/sec with 8 maximum concurrency. + For 16 work items got 42621.3 SHA256 hashes/sec with 16 maximum concurrency. + For 32 work items got 69857.7 SHA256 hashes/sec with 32 maximum concurrency. + For 64 work items got 68797.9 SHA256 hashes/sec with 33 maximum concurrency. + For 128 work items got 68980.4 SHA256 hashes/sec with 33 maximum concurrency. + For 256 work items got 70370.8 SHA256 hashes/sec with 33 maximum concurrency. + For 512 work items got 70365.8 SHA256 hashes/sec with 33 maximum concurrency. + For 1024 work items got 70794.6 SHA256 hashes/sec with 33 maximum concurrency. +*/ + + +LLFIO_V2_NAMESPACE_BEGIN + +namespace detail +{ + struct dynamic_thread_pool_group_impl_guard : std::unique_lock<std::mutex> + { + using std::unique_lock<std::mutex>::unique_lock; + }; +#if 0 + template <class T> class fake_atomic + { + T _v; + + public: + constexpr fake_atomic(T v) + : _v(v) + { + } + T load(std::memory_order /*unused*/) const { return _v; } + void store(T v, std::memory_order /*unused*/) { _v = v; } + T fetch_add(T v, std::memory_order /*unused*/) + { + _v += v; + return _v - v; + } + T fetch_sub(T v, std::memory_order /*unused*/) + { + _v -= v; + return _v + v; + } + }; +#endif + struct global_dynamic_thread_pool_impl_workqueue_item + { + const size_t nesting_level; + std::shared_ptr<global_dynamic_thread_pool_impl_workqueue_item> next; + std::unordered_set<dynamic_thread_pool_group_impl *> items; // Do NOT use without holding workqueue_lock + + explicit global_dynamic_thread_pool_impl_workqueue_item(size_t _nesting_level, std::shared_ptr<global_dynamic_thread_pool_impl_workqueue_item> &&preceding) + : nesting_level(_nesting_level) + , next(preceding) + { + } + +#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32) + static constexpr unsigned 
TOTAL_NEXTACTIVES = 1; + struct next_active_base_t + { + std::atomic<unsigned> count{0}; + QUICKCPPLIB_NAMESPACE::configurable_spinlock::spinlock<unsigned> lock; + dynamic_thread_pool_group::work_item *front{nullptr}, *back{nullptr}; + + next_active_base_t() = default; + next_active_base_t(const next_active_base_t &o) + : count(o.count.load(std::memory_order_relaxed)) + , front(o.front) + , back(o.back) + { + } + }; + struct alignas(64) next_active_work_t : next_active_base_t + { + char _padding[64 - sizeof(next_active_base_t)]; // 40 bytes? + } next_actives[TOTAL_NEXTACTIVES]; + static_assert(sizeof(next_active_work_t) == 64, "next_active_work_t is not a cacheline"); + next_active_base_t next_timer_relative, next_timer_absolute; + + dynamic_thread_pool_group::work_item *next_active(unsigned &count, size_t threadidx) + { + if(TOTAL_NEXTACTIVES > 1) + { + threadidx &= (TOTAL_NEXTACTIVES - 1); + const size_t original_threadidx = threadidx; + bool all_empty = true; + for(;;) + { + next_active_base_t &x = next_actives[threadidx]; + if(x.count.load(std::memory_order_relaxed) > 0) + { + all_empty = false; + if(x.lock.try_lock()) + { + auto *ret = x.front; + if(ret != nullptr) + { + x.front = ret->_next_scheduled; + count = x.count.fetch_sub(1, std::memory_order_relaxed); + if(x.front == nullptr) + { + assert(x.back == ret); + x.back = nullptr; + } + ret->_next_scheduled = nullptr; + x.lock.unlock(); + return ret; + } + x.lock.unlock(); + } + } + if(++threadidx >= TOTAL_NEXTACTIVES) + { + threadidx = 0; + } + if(threadidx == original_threadidx) + { + if(all_empty) + { + return nullptr; + } + all_empty = true; + } + } + } + else + { + next_active_base_t &x = next_actives[0]; + if(x.count.load(std::memory_order_relaxed) > 0) + { + x.lock.lock(); + auto *ret = x.front; + if(ret != nullptr) + { + x.front = ret->_next_scheduled; + count = x.count.fetch_sub(1, std::memory_order_relaxed); + if(x.front == nullptr) + { + assert(x.back == ret); + x.back = nullptr; + } + 
ret->_next_scheduled = nullptr; + x.lock.unlock(); + return ret; + } + x.lock.unlock(); + } + } + return nullptr; + } + + private: + next_active_base_t &_choose_next_active() + { + unsigned idx = (unsigned) -1, max_count = (unsigned) -1; + for(unsigned n = 0; n < TOTAL_NEXTACTIVES; n++) + { + auto c = next_actives[n].count.load(std::memory_order_relaxed); + if(c < max_count) + { + idx = n; + max_count = c; + } + } + for(;;) + { + if(next_actives[idx].lock.try_lock()) + { + return next_actives[idx]; + } + if(++idx >= TOTAL_NEXTACTIVES) + { + idx = 0; + } + } + } + + public: + void append_active(dynamic_thread_pool_group::work_item *p) + { + next_active_base_t &x = _choose_next_active(); + x.count.fetch_add(1, std::memory_order_relaxed); + if(x.back == nullptr) + { + assert(x.front == nullptr); + x.front = x.back = p; + x.lock.unlock(); + return; + } + p->_next_scheduled = nullptr; + x.back->_next_scheduled = p; + x.back = p; + x.lock.unlock(); + } + void prepend_active(dynamic_thread_pool_group::work_item *p) + { + next_active_base_t &x = _choose_next_active(); + x.count.fetch_add(1, std::memory_order_relaxed); + if(x.front == nullptr) + { + assert(x.back == nullptr); + x.front = x.back = p; + x.lock.unlock(); + return; + } + p->_next_scheduled = x.front; + x.front = p; + x.lock.unlock(); + } + + // x must be LOCKED on entry + template <int which> dynamic_thread_pool_group::work_item *next_timer() + { + if(which == 0) + { + return nullptr; + } + next_active_base_t &x = (which == 1) ? 
next_timer_relative : next_timer_absolute; + // x.lock.lock(); + auto *ret = x.front; + if(ret == nullptr) + { + assert(x.back == nullptr); + x.lock.unlock(); + return nullptr; + } + x.front = ret->_next_scheduled; + if(x.front == nullptr) + { + assert(x.back == ret); + x.back = nullptr; + } + ret->_next_scheduled = nullptr; + x.count.fetch_sub(1, std::memory_order_relaxed); + x.lock.unlock(); + return ret; + } + void append_timer(dynamic_thread_pool_group::work_item *i) + { + if(i->_timepoint1 != std::chrono::steady_clock::time_point()) + { + next_timer_relative.lock.lock(); + next_timer_relative.count.fetch_add(1, std::memory_order_relaxed); + if(next_timer_relative.front == nullptr) + { + next_timer_relative.front = next_timer_relative.back = i; + } + else + { + bool done = false; + for(dynamic_thread_pool_group::work_item *p = nullptr, *n = next_timer_relative.front; n != nullptr; p = n, n = n->_next_scheduled) + { + if(n->_timepoint1 <= i->_timepoint1) + { + if(p == nullptr) + { + i->_next_scheduled = n; + next_timer_relative.front = i; + } + else + { + i->_next_scheduled = n; + p->_next_scheduled = i; + } + done = true; + break; + } + } + if(!done) + { + next_timer_relative.back->_next_scheduled = i; + i->_next_scheduled = nullptr; + next_timer_relative.back = i; + } + } + next_timer_relative.lock.unlock(); + } + else + { + next_timer_absolute.lock.lock(); + next_timer_absolute.count.fetch_add(1, std::memory_order_relaxed); + if(next_timer_absolute.front == nullptr) + { + next_timer_absolute.front = next_timer_absolute.back = i; + } + else + { + bool done = false; + for(dynamic_thread_pool_group::work_item *p = nullptr, *n = next_timer_absolute.front; n != nullptr; p = n, n = n->_next_scheduled) + { + if(n->_timepoint2 <= i->_timepoint2) + { + if(p == nullptr) + { + i->_next_scheduled = n; + next_timer_absolute.front = i; + } + else + { + i->_next_scheduled = n; + p->_next_scheduled = i; + } + done = true; + break; + } + } + if(!done) + { + 
next_timer_absolute.back->_next_scheduled = i; + i->_next_scheduled = nullptr; + next_timer_absolute.back = i; + } + } + next_timer_absolute.lock.unlock(); + } + } +#endif + }; + struct global_dynamic_thread_pool_impl + { + using _spinlock_type = QUICKCPPLIB_NAMESPACE::configurable_spinlock::spinlock<unsigned>; + + _spinlock_type workqueue_lock; + struct workqueue_guard : std::unique_lock<_spinlock_type> + { + using std::unique_lock<_spinlock_type>::unique_lock; + }; + std::shared_ptr<global_dynamic_thread_pool_impl_workqueue_item> workqueue; +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + using threadh_type = void *; + using grouph_type = dispatch_group_t; + static void _gcd_dispatch_callback(void *arg) + { + auto *workitem = (dynamic_thread_pool_group::work_item *) arg; + global_dynamic_thread_pool()._workerthread(workitem, nullptr); + } + static void _gcd_timer_callback(void *arg) + { + auto *workitem = (dynamic_thread_pool_group::work_item *) arg; + global_dynamic_thread_pool()._timerthread(workitem, nullptr); + } +#elif defined(_WIN32) + using threadh_type = PTP_CALLBACK_INSTANCE; + using grouph_type = PTP_CALLBACK_ENVIRON; + static void CALLBACK _win32_worker_thread_callback(threadh_type threadh, PVOID Parameter, PTP_WORK /*unused*/) + { + auto *workitem = (dynamic_thread_pool_group::work_item *) Parameter; + global_dynamic_thread_pool()._workerthread(workitem, threadh); + } + static void CALLBACK _win32_timer_thread_callback(threadh_type threadh, PVOID Parameter, PTP_TIMER /*unused*/) + { + auto *workitem = (dynamic_thread_pool_group::work_item *) Parameter; + global_dynamic_thread_pool()._timerthread(workitem, threadh); + } +#else + global_dynamic_thread_pool_impl_workqueue_item first_execute{(size_t) -1, {}}; + using threadh_type = void *; + using grouph_type = void *; + std::mutex threadpool_lock; + struct threadpool_guard : std::unique_lock<std::mutex> + { + using std::unique_lock<std::mutex>::unique_lock; + }; + struct thread_t + { + thread_t 
*_prev{nullptr}, *_next{nullptr}; + std::thread thread; + std::condition_variable cond; + std::chrono::steady_clock::time_point last_did_work; + std::atomic<int> state{0}; // <0 = dead, 0 = sleeping/please die, 1 = busy + }; + struct threads_t + { + size_t count{0}; + thread_t *front{nullptr}, *back{nullptr}; + } threadpool_active, threadpool_sleeping; + std::atomic<size_t> total_submitted_workitems{0}, threadpool_threads{0}; + std::atomic<uint32_t> ms_sleep_for_more_work{20000}; + + std::mutex threadmetrics_lock; + struct threadmetrics_guard : std::unique_lock<std::mutex> + { + using std::unique_lock<std::mutex>::unique_lock; + }; + struct threadmetrics_threadid + { + char text[12]; // enough for a UINT32_MAX in decimal + constexpr threadmetrics_threadid() + : text{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} + { + } + threadmetrics_threadid(string_view sv) + { + memset(text, '0', sizeof(text)); + assert(sv.size() <= sizeof(text)); + if(sv.size() > sizeof(text)) + { + abort(); + } + memcpy(text + sizeof(text) - sv.size(), sv.data(), sv.size()); + } + int compare(const threadmetrics_threadid &o) const noexcept { return memcmp(text, o.text, sizeof(text)); } + bool operator<(const threadmetrics_threadid &o) const noexcept { return compare(o) < 0; } + bool operator==(const threadmetrics_threadid &o) const noexcept { return compare(o) == 0; } + }; + struct threadmetrics_item + { + threadmetrics_item *_prev{nullptr}, *_next{nullptr}; + std::chrono::steady_clock::time_point last_updated, blocked_since; // latter set if thread seen no time + threadmetrics_threadid threadid; + uint32_t diskfaults{(uint32_t) -1}, utime{(uint32_t) -1}, stime{(uint32_t) -1}; // culmulative ticks spent in user and system for this thread + + explicit threadmetrics_item(threadmetrics_threadid v) + : threadid(v) + { + } + string_view threadid_name() const noexcept { return string_view(threadid.text, sizeof(threadid.text)); } + }; + struct threadmetrics_t + { + size_t count{0}; + threadmetrics_item 
*front{nullptr}, *back{nullptr}; + uint32_t blocked{0}, running{0}; + } threadmetrics_queue; // items at front are least recently updated + std::vector<threadmetrics_item *> threadmetrics_sorted; // sorted by threadid + std::chrono::steady_clock::time_point threadmetrics_last_updated; + std::atomic<unsigned> populate_threadmetrics_reentrancy{0}; +#ifdef __linux__ + std::mutex proc_self_task_fd_lock; + int proc_self_task_fd{-1}; +#endif +#endif + + std::mutex io_aware_work_item_handles_lock; + struct io_aware_work_item_handles_guard : std::unique_lock<std::mutex> + { + using std::unique_lock<std::mutex>::unique_lock; + }; + struct io_aware_work_item_statfs + { + size_t refcount{0}; + deadline default_deadline; + float average_busy{0}, average_queuedepth{0}; + std::chrono::steady_clock::time_point last_updated; + statfs_t statfs; + }; + std::unordered_map<fs_handle::unique_id_type, io_aware_work_item_statfs, fs_handle::unique_id_type_hasher> io_aware_work_item_handles; + + global_dynamic_thread_pool_impl() + { +#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32) + populate_threadmetrics(std::chrono::steady_clock::now()); +#endif + } + + template <class T, class U> static void _append_to_list(T &what, U *v) + { + if(what.front == nullptr) + { + assert(what.back == nullptr); + v->_next = v->_prev = nullptr; + what.front = what.back = v; + } + else + { + v->_next = nullptr; + v->_prev = what.back; + what.back->_next = v; + what.back = v; + } + what.count++; + } + template <class T, class U> static void _prepend_to_list(T &what, U *v) + { + if(what.front == nullptr) + { + assert(what.back == nullptr); + v->_next = v->_prev = nullptr; + what.front = what.back = v; + } + else + { + v->_prev = nullptr; + v->_next = what.front; + what.front->_prev = v; + what.front = v; + } + what.count++; + } + template <class T, class U> static void _remove_from_list(T &what, U *v) + { + if(v->_prev == nullptr && v->_next == nullptr) + { + assert(what.front == v); + 
assert(what.back == v); + what.front = what.back = nullptr; + } + else if(v->_prev == nullptr) + { + assert(what.front == v); + v->_next->_prev = nullptr; + what.front = v->_next; + v->_next = v->_prev = nullptr; + } + else if(v->_next == nullptr) + { + assert(what.back == v); + v->_prev->_next = nullptr; + what.back = v->_prev; + v->_next = v->_prev = nullptr; + } + else + { + v->_next->_prev = v->_prev; + v->_prev->_next = v->_next; + v->_next = v->_prev = nullptr; + } + what.count--; + } + +#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32) + inline void _execute_work(thread_t *self); + + void _add_thread(threadpool_guard & /*unused*/) + { + thread_t *p = nullptr; + try + { + p = new thread_t; + _append_to_list(threadpool_active, p); + p->thread = std::thread([this, p] { _execute_work(p); }); + } + catch(...) + { + if(p != nullptr) + { + _remove_from_list(threadpool_active, p); + } + // drop failure + } + } + + bool _remove_thread(threadpool_guard &g, threads_t &which) + { + if(which.count == 0) + { + return false; + } + // Threads which went to sleep the longest ago are at the front + auto *t = which.front; + if(t->state.load(std::memory_order_acquire) < 0) + { + // He's already exiting + return false; + } + assert(t->state.load(std::memory_order_acquire) == 0); + t->state.fetch_sub(1, std::memory_order_release); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << t << " is told to quit" << std::endl; +#endif + do + { + g.unlock(); + t->cond.notify_one(); + g.lock(); + } while(t->state.load(std::memory_order_acquire) >= -1); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << t << " has quit, deleting" << std::endl; +#endif + _remove_from_list(threadpool_active, t); + t->thread.join(); + delete t; + return true; + } + + ~global_dynamic_thread_pool_impl() + { + { + threadpool_guard g(threadpool_lock); + while(threadpool_active.count > 0 || threadpool_sleeping.count > 0) + { + 
while(threadpool_sleeping.count > 0) + { + auto removed = _remove_thread(g, threadpool_sleeping); + assert(removed); + (void) removed; + } + if(threadpool_active.count > 0) + { + auto removed = _remove_thread(g, threadpool_active); + assert(removed); + (void) removed; + } + } + } + threadmetrics_guard g(threadmetrics_lock); + for(auto *p : threadmetrics_sorted) + { + delete p; + } + threadmetrics_sorted.clear(); + threadmetrics_queue = {}; +#ifdef __linux__ + if(proc_self_task_fd > 0) + { + ::close(proc_self_task_fd); + proc_self_task_fd = -1; + } +#endif + } + +#ifdef __linux__ + // You are guaranteed only one of these EVER executes at a time. Locking is probably overkill, but equally also probably harmless + bool update_threadmetrics(threadmetrics_guard &&g, std::chrono::steady_clock::time_point now, threadmetrics_item *new_items) + { + auto update_item = [&](threadmetrics_item *item) { + char path[64] = "/proc/self/task/", *pend = path + 16, *tend = item->threadid.text; + while(*tend == '0' && (tend - item->threadid.text) < (ssize_t) sizeof(item->threadid.text)) + { + ++tend; + } + while((tend - item->threadid.text) < (ssize_t) sizeof(item->threadid.text)) + { + *pend++ = *tend++; + } + memcpy(pend, "/stat", 6); + int fd = ::open(path, O_RDONLY); + if(-1 == fd) + { + // Thread may have exited since we last populated + if(item->blocked_since == std::chrono::steady_clock::time_point()) + { + threadmetrics_queue.running--; + threadmetrics_queue.blocked++; + } + item->blocked_since = now; + item->last_updated = now; + return; + } + char buffer[1024]; + auto bytesread = ::read(fd, buffer, sizeof(buffer)); + ::close(fd); + buffer[std::max((size_t) bytesread, sizeof(buffer) - 1)] = 0; + char state = 0; + unsigned long majflt = 0, utime = 0, stime = 0; + sscanf(buffer, "%*d %*s %c %*d %*d %*d %*d %*d %*u %*u %*u %lu %*u %lu %lu", &state, &majflt, &utime, &stime); + if(item->utime != (uint32_t) -1 || item->stime != (uint32_t) -1) + { + if(item->utime == (uint32_t) utime 
&& item->stime == (uint32_t) stime && state != 'R') + { + // This thread made no progress since last time + if(item->blocked_since == std::chrono::steady_clock::time_point()) + { + threadmetrics_queue.running--; + threadmetrics_queue.blocked++; + item->blocked_since = now; + } + } + else + { + if(item->blocked_since != std::chrono::steady_clock::time_point()) + { + threadmetrics_queue.running++; + threadmetrics_queue.blocked--; + item->blocked_since = std::chrono::steady_clock::time_point(); + } + } + } + // std::cout << "Threadmetrics " << path << " " << state << " " << majflt << " " << utime << " " << stime << ". Previously " << item->diskfaults << " " + // << item->utime << " " << item->stime << std::endl; + item->diskfaults = (uint32_t) majflt; + item->utime = (uint32_t) utime; + item->stime = (uint32_t) stime; + item->last_updated = now; + }; + if(new_items != nullptr) + { + for(; new_items != nullptr; new_items = new_items->_next) + { + update_item(new_items); + } + return false; + } + if(threadmetrics_queue.count == 0) + { + return false; + } + size_t updated = 0; + while(now - threadmetrics_queue.front->last_updated >= std::chrono::milliseconds(200) && updated++ < 4) + { + auto *p = threadmetrics_queue.front; + update_item(p); + _remove_from_list(threadmetrics_queue, p); + _append_to_list(threadmetrics_queue, p); + } + // if(updated > 0) + { + static const auto min_hardware_concurrency = std::thread::hardware_concurrency(); + static const auto max_hardware_concurrency = min_hardware_concurrency + 3; + auto threadmetrics_running = (ssize_t) threadmetrics_queue.running; + auto threadmetrics_blocked = (ssize_t) threadmetrics_queue.blocked; + g.unlock(); // drop threadmetrics_lock + + threadpool_guard gg(threadpool_lock); + // Adjust for the number of threads sleeping for more work + threadmetrics_running += threadpool_sleeping.count; + threadmetrics_blocked -= threadpool_sleeping.count; + if(threadmetrics_blocked < 0) + { + threadmetrics_blocked = 0; + } + 
const auto desired_concurrency = std::min((ssize_t) min_hardware_concurrency, (ssize_t) total_submitted_workitems.load(std::memory_order_relaxed)); + auto toadd = std::max((ssize_t) 0, std::min(desired_concurrency - threadmetrics_running, desired_concurrency - (ssize_t) threadpool_active.count)); + auto toremove = std::max((ssize_t) 0, (ssize_t) threadmetrics_running - (ssize_t) max_hardware_concurrency); + if(toadd > 0 || toremove > 0) + { + // std::cout << "total active = " << threadpool_active.count << " total idle = " << threadpool_sleeping.count + // << " threadmetrics_running = " << threadmetrics_running << " threadmetrics_blocked = " << threadmetrics_blocked << " toadd = " << toadd + // << " toremove = " << toremove << std::endl; + if(toadd > 0) + { + _add_thread(gg); + } + if(toremove > 0 && threadpool_active.count > 1) + { + // Kill myself, but not if I'm the final thread who might need to run timers + return true; + } + } + } + return false; + } + // Returns true if calling thread is to exit + bool populate_threadmetrics(std::chrono::steady_clock::time_point now) + { + if(populate_threadmetrics_reentrancy.exchange(1, std::memory_order_relaxed) == 1) + { + return false; + } + auto unpopulate_threadmetrics_reentrancy = make_scope_exit([this]() noexcept { populate_threadmetrics_reentrancy.store(0, std::memory_order_relaxed); }); + + static thread_local std::vector<char> kernelbuffer(1024); + static thread_local std::vector<threadmetrics_threadid> threadidsbuffer(1024 / sizeof(dirent)); + using getdents64_t = int (*)(int, char *, unsigned int); + static auto getdents = static_cast<getdents64_t>([](int fd, char *buf, unsigned count) -> int { return syscall(SYS_getdents64, fd, buf, count); }); + using dirent = dirent64; + size_t bytes = 0; + { + threadmetrics_guard g(threadmetrics_lock); + if(now - threadmetrics_last_updated < std::chrono::milliseconds(250) && + threadmetrics_queue.running + threadmetrics_queue.blocked >= 
threadpool_threads.load(std::memory_order_relaxed)) + { + if(now - threadmetrics_last_updated < std::chrono::milliseconds(100)) + { + return false; + } + return update_threadmetrics(std::move(g), now, nullptr); + } + threadmetrics_last_updated = now; + if(proc_self_task_fd < 0) + { + proc_self_task_fd = ::open("/proc/self/task", O_RDONLY | O_DIRECTORY | O_CLOEXEC); + if(proc_self_task_fd < 0) + { + posix_error().throw_exception(); + } + } + } + { + std::lock_guard<std::mutex> g(proc_self_task_fd_lock); + /* It turns out that /proc/self/task is quite racy in the Linux kernel, so keep + looping this until it stops telling obvious lies. + */ + for(auto done = false; !done;) + { + if(-1 == ::lseek64(proc_self_task_fd, 0, SEEK_SET)) + { + posix_error().throw_exception(); + } + auto _bytes = getdents(proc_self_task_fd, kernelbuffer.data(), kernelbuffer.size()); + // std::cout << "getdents(" << (kernelbuffer.size()-bytes) << ") returns " << _bytes << std::endl; + if(_bytes < 0 && errno != EINVAL) + { + posix_error().throw_exception(); + } + if(_bytes >= 0 && kernelbuffer.size() - (size_t) _bytes >= sizeof(dirent) + 16) + { + bytes = (size_t) _bytes; + threadidsbuffer.clear(); + for(auto *dent = (dirent *) kernelbuffer.data();; dent = reinterpret_cast<dirent *>(reinterpret_cast<uintptr_t>(dent) + dent->d_reclen)) + { + if(dent->d_ino != 0u && dent->d_type == DT_DIR && dent->d_name[0] != '.') + { + size_t length = strchr(dent->d_name, 0) - dent->d_name; + threadidsbuffer.push_back(string_view(dent->d_name, length)); + } + if((bytes -= dent->d_reclen) <= 0) + { + break; + } + } + auto mythreadcount = threadpool_threads.load(std::memory_order_relaxed); + if(threadidsbuffer.size() >= mythreadcount) + { + // std::cout << "Parsed from /proc " << threadidsbuffer.size() << " entries, should be at least " << mythreadcount << std::endl; + std::sort(threadidsbuffer.begin(), threadidsbuffer.end()); + done = true; + break; + } +#ifndef NDEBUG + std::cout << "NOTE: /proc returned " << 
threadidsbuffer.size() << " items when we know for a fact at least " << mythreadcount + << " threads exist, retrying!" << std::endl; +#endif + continue; + } + kernelbuffer.resize(kernelbuffer.size() << 1); + } + } + threadmetrics_item *firstnewitem = nullptr; + threadmetrics_guard g(threadmetrics_lock); +#if 0 + { + std::stringstream s; + s << "Parsed from /proc " << threadidsbuffer.size() << " entries (should be at least " << threadpool_threads.load(std::memory_order_relaxed) << "):"; + for(auto &i : threadidsbuffer) + { + s << " " << string_view(i.text, 12); + } + std::cout << s.str() << std::endl; + } +#endif +#if 0 + { + auto d_it = threadmetrics_sorted.begin(); + auto s_it = threadidsbuffer.begin(); + for(; d_it != threadmetrics_sorted.end() && s_it != threadidsbuffer.end(); ++d_it, ++s_it) + { + std::cout << (*d_it)->threadid_name() << " " << string_view(s_it->text, 12) << "\n"; + } + for(; d_it != threadmetrics_sorted.end(); ++d_it) + { + std::cout << (*d_it)->threadid_name() << " XXXXXXXXXXXX\n"; + } + for(; s_it != threadidsbuffer.end(); ++s_it) + { + std::cout << "XXXXXXXXXXXX " << string_view(s_it->text, 12) << "\n"; + } + std::cout << std::flush; + } +#endif + auto d_it = threadmetrics_sorted.begin(); + auto s_it = threadidsbuffer.begin(); + auto remove_item = [&] { + // std::cout << "Removing thread metrics for " << (*d_it)->threadid_name() << std::endl; + if((*d_it)->blocked_since != std::chrono::steady_clock::time_point()) + { + threadmetrics_queue.blocked--; + } + else + { + threadmetrics_queue.running--; + } + _remove_from_list(threadmetrics_queue, *d_it); + delete *d_it; + d_it = threadmetrics_sorted.erase(d_it); + }; + auto add_item = [&] { + auto p = std::make_unique<threadmetrics_item>(*s_it); + d_it = threadmetrics_sorted.insert(d_it, p.get()); + _append_to_list(threadmetrics_queue, p.get()); + // std::cout << "Adding thread metrics for " << p->threadid_name() << std::endl; + if(firstnewitem == nullptr) + { + firstnewitem = p.get(); + } + 
p.release(); + threadmetrics_queue.running++; + }; + // std::cout << "Compare" << std::endl; + for(; d_it != threadmetrics_sorted.end() && s_it != threadidsbuffer.end();) + { + auto c = (*d_it)->threadid.compare(*s_it); + // std::cout << "Comparing " << (*d_it)->threadid_name() << " with " << string_view(s_it->text, 12) << " = " << c << std::endl; + if(0 == c) + { + ++d_it; + ++s_it; + continue; + } + if(c < 0) + { + // d_it has gone away + remove_item(); + } + if(c > 0) + { + // s_it is a new entry + add_item(); + } + } + // std::cout << "Tail dest" << std::endl; + while(d_it != threadmetrics_sorted.end()) + { + remove_item(); + } + // std::cout << "Tail source" << std::endl; + while(s_it != threadidsbuffer.end()) + { + add_item(); + ++d_it; + ++s_it; + } + assert(threadmetrics_sorted.size() == threadidsbuffer.size()); +#if 0 + if(!std::is_sorted(threadmetrics_sorted.begin(), threadmetrics_sorted.end(), + [](threadmetrics_item *a, threadmetrics_item *b) { return a->threadid < b->threadid; })) + { + std::cout << "Threadmetrics:"; + for(auto *p : threadmetrics_sorted) + { + std::cout << "\n " << p->threadid_name(); + } + std::cout << std::endl; + abort(); + } +#endif + assert(threadmetrics_queue.running + threadmetrics_queue.blocked == threadidsbuffer.size()); + return update_threadmetrics(std::move(g), now, firstnewitem); + } +#endif +#endif + + result<void> _prepare_work_item_delay(dynamic_thread_pool_group::work_item *workitem, grouph_type grouph, deadline d) + { + (void) grouph; + if(!d) + { + return errc::invalid_argument; + } + if(workitem->_nextwork.load(std::memory_order_acquire) == 0 || d.nsecs > 0) + { + if(d.nsecs > 0) + { + if(d.steady) + { + workitem->_timepoint1 = std::chrono::steady_clock::now() + std::chrono::nanoseconds(d.nsecs); + workitem->_timepoint2 = {}; + } + else + { + workitem->_timepoint1 = {}; + workitem->_timepoint2 = d.to_time_point(); + } + } + else + { + workitem->_timepoint1 = 
std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1)); + workitem->_timepoint2 = {}; + } + assert(workitem->_has_timer_set()); +#if defined(_WIN32) + if(nullptr == workitem->_internaltimerh) + { + workitem->_internaltimerh = CreateThreadpoolTimer(_win32_timer_thread_callback, workitem, grouph); + if(nullptr == workitem->_internaltimerh) + { + return win32_error(); + } + } +#endif + } + else + { + if(workitem->_timepoint1 != std::chrono::steady_clock::time_point()) + { + workitem->_timepoint1 = {}; + } + if(workitem->_timepoint2 != std::chrono::system_clock::time_point()) + { + workitem->_timepoint2 = {}; + } + assert(!workitem->_has_timer_set()); + } + return success(); + } + + inline void _submit_work_item(bool submit_into_highest_priority, dynamic_thread_pool_group::work_item *workitem, bool defer_pool_wake); + + inline result<void> submit(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group_impl *group, + span<dynamic_thread_pool_group::work_item *> work) noexcept; + + inline void _work_item_done(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group::work_item *i) noexcept; + + inline result<void> stop(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group_impl *group, result<void> err) noexcept; + inline result<void> wait(dynamic_thread_pool_group_impl_guard &g, bool reap, dynamic_thread_pool_group_impl *group, deadline d) noexcept; + + inline void _timerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh); + inline void _workerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh); + }; + struct global_dynamic_thread_pool_impl_thread_local_state_t + { + dynamic_thread_pool_group::work_item *workitem{nullptr}; + global_dynamic_thread_pool_impl::threadh_type current_callback_instance{nullptr}; + size_t nesting_level{0}; + }; + LLFIO_HEADERS_ONLY_FUNC_SPEC global_dynamic_thread_pool_impl_thread_local_state_t 
&global_dynamic_thread_pool_thread_local_state() noexcept + { + static thread_local global_dynamic_thread_pool_impl_thread_local_state_t tls; + return tls; + } + + LLFIO_HEADERS_ONLY_FUNC_SPEC global_dynamic_thread_pool_impl &global_dynamic_thread_pool() noexcept + { + static global_dynamic_thread_pool_impl impl; + return impl; + } +} // namespace detail + + +LLFIO_HEADERS_ONLY_MEMFUNC_SPEC const char *dynamic_thread_pool_group::implementation_description() noexcept +{ +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + return "Grand Central Dispatch"; +#elif defined(_WIN32) + return "Win32 thread pool (Vista+)"; +#elif defined(__linux__) + return "Linux native"; +#else +#error Unknown platform +#endif +} + +class dynamic_thread_pool_group_impl final : public dynamic_thread_pool_group +{ + friend struct detail::global_dynamic_thread_pool_impl; + + mutable std::mutex _lock; + size_t _nesting_level{0}; + struct workitems_t + { + size_t count{0}; + dynamic_thread_pool_group::work_item *front{nullptr}, *back{nullptr}; + } _work_items_active, _work_items_done, _work_items_delayed; + std::atomic<bool> _stopping{false}, _stopped{true}, _completing{false}; + std::atomic<int> _waits{0}; + result<void> _abnormal_completion_cause{success()}; // The cause of any abnormal group completion + +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + dispatch_group_t _grouph; +#elif defined(_WIN32) + TP_CALLBACK_ENVIRON _callbackenviron; + PTP_CALLBACK_ENVIRON _grouph{&_callbackenviron}; +#else + void *_grouph{nullptr}; + size_t _newly_added_active_work_items{0}; + size_t _timer_work_items_remaining{0}; + size_t _active_work_items_remaining{0}; +#endif + +public: + result<void> init() + { + LLFIO_LOG_FUNCTION_CALL(this); + try + { + auto &impl = detail::global_dynamic_thread_pool(); + _nesting_level = detail::global_dynamic_thread_pool_thread_local_state().nesting_level; +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + _grouph = dispatch_group_create(); + if(_grouph == nullptr) + { + return 
errc::not_enough_memory; + } +#elif defined(_WIN32) + InitializeThreadpoolEnvironment(_grouph); +#endif + detail::global_dynamic_thread_pool_impl::workqueue_guard g(impl.workqueue_lock); + // Append this group to the global work queue at its nesting level + if(!impl.workqueue || impl.workqueue->nesting_level <= _nesting_level) + { + impl.workqueue = std::make_shared<detail::global_dynamic_thread_pool_impl_workqueue_item>(_nesting_level, std::move(impl.workqueue)); + } + impl.workqueue->items.insert(this); + return success(); + } + catch(...) + { + return error_from_exception(); + } + } + + virtual ~dynamic_thread_pool_group_impl() + { + LLFIO_LOG_FUNCTION_CALL(this); + (void) wait(); + auto &impl = detail::global_dynamic_thread_pool(); + // detail::dynamic_thread_pool_group_impl_guard g1(_lock); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + if(nullptr != _grouph) + { + dispatch_release(_grouph); + _grouph = nullptr; + } +#elif defined(_WIN32) + if(nullptr != _grouph) + { + DestroyThreadpoolEnvironment(_grouph); + _grouph = nullptr; + } +#endif + detail::global_dynamic_thread_pool_impl::workqueue_guard g2(impl.workqueue_lock); + assert(impl.workqueue->nesting_level >= _nesting_level); + for(auto *p = impl.workqueue.get(); p != nullptr; p = p->next.get()) + { + if(p->nesting_level == _nesting_level) + { + p->items.erase(this); + break; + } + } + while(impl.workqueue && impl.workqueue->items.empty()) + { + impl.workqueue = std::move(impl.workqueue->next); + } + } + + virtual result<void> submit(span<work_item *> work) noexcept override + { + LLFIO_LOG_FUNCTION_CALL(this); + if(_stopping.load(std::memory_order_relaxed)) + { + return errc::operation_canceled; + } + if(_completing.load(std::memory_order_relaxed)) + { + for(auto *i : work) + { + i->_parent.store(this, std::memory_order_release); + detail::global_dynamic_thread_pool_impl::_append_to_list(_work_items_delayed, i); + } + return success(); + } + _stopped.store(false, std::memory_order_release); + auto &impl 
= detail::global_dynamic_thread_pool(); + detail::dynamic_thread_pool_group_impl_guard g(_lock); // lock group + if(_work_items_active.count == 0 && _work_items_done.count == 0) + { + _abnormal_completion_cause = success(); + } + OUTCOME_TRY(impl.submit(g, this, work)); + if(_work_items_active.count == 0) + { + _stopped.store(true, std::memory_order_release); + } + return success(); + } + + virtual result<void> stop() noexcept override + { + LLFIO_LOG_FUNCTION_CALL(this); + if(_stopped.load(std::memory_order_relaxed)) + { + return success(); + } + auto &impl = detail::global_dynamic_thread_pool(); + detail::dynamic_thread_pool_group_impl_guard g(_lock); // lock group + return impl.stop(g, this, errc::operation_canceled); + } + + virtual bool stopping() const noexcept override { return _stopping.load(std::memory_order_relaxed); } + + virtual bool stopped() const noexcept override { return _stopped.load(std::memory_order_relaxed); } + + virtual result<void> wait(deadline d = {}) const noexcept override + { + LLFIO_LOG_FUNCTION_CALL(this); + if(_stopped.load(std::memory_order_relaxed)) + { + return success(); + } + auto &impl = detail::global_dynamic_thread_pool(); + detail::dynamic_thread_pool_group_impl_guard g(_lock); // lock group + return impl.wait(g, true, const_cast<dynamic_thread_pool_group_impl *>(this), d); + } +}; + +LLFIO_HEADERS_ONLY_MEMFUNC_SPEC size_t dynamic_thread_pool_group::current_nesting_level() noexcept +{ + return detail::global_dynamic_thread_pool_thread_local_state().nesting_level; +} + +LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::work_item *dynamic_thread_pool_group::current_work_item() noexcept +{ + return detail::global_dynamic_thread_pool_thread_local_state().workitem; +} + +LLFIO_HEADERS_ONLY_MEMFUNC_SPEC uint32_t dynamic_thread_pool_group::ms_sleep_for_more_work() noexcept +{ +#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32) + return 
detail::global_dynamic_thread_pool().ms_sleep_for_more_work.load(std::memory_order_relaxed);
+#else
+  return 0;
+#endif
+}
+
+// Setter for how long an idle native worker thread sleeps awaiting more work.
+// Only meaningful for the Linux native backend; the GCD and Win32 backends
+// manage their own pools, so there this is a no-op returning 0.
+LLFIO_HEADERS_ONLY_MEMFUNC_SPEC uint32_t dynamic_thread_pool_group::ms_sleep_for_more_work(uint32_t v) noexcept
+{
+#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+  // Zero is not permitted: clamp to 1ms so sleeping workers always wake.
+  if(0 == v)
+  {
+    v = 1;
+  }
+  detail::global_dynamic_thread_pool().ms_sleep_for_more_work.store(v, std::memory_order_relaxed);
+  return v;  // the value actually set, after clamping
+#else
+  (void) v;
+  return 0;
+#endif
+}
+
+// Factory: constructs a work item group and registers it with the global work
+// queue at the calling thread's current nesting level via init(). Any failure
+// from init() (or an exception) is returned as an error.
+LLFIO_HEADERS_ONLY_FUNC_SPEC result<dynamic_thread_pool_group_ptr> make_dynamic_thread_pool_group() noexcept
+{
+  try
+  {
+    auto ret = std::make_unique<dynamic_thread_pool_group_impl>();
+    OUTCOME_TRY(ret->init());
+    return dynamic_thread_pool_group_ptr(std::move(ret));
+  }
+  catch(...)
+  {
+    return error_from_exception();
+  }
+}
+
+namespace detail
+{
+#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+  // Main loop for a Linux native worker thread: repeatedly executes due
+  // timers and active work items, sleeping when idle and exiting when told
+  // to, or when idle for longer than ms_sleep_for_more_work.
+  inline void global_dynamic_thread_pool_impl::_execute_work(thread_t *self)
+  {
+    pthread_setname_np(pthread_self(), "LLFIO DYN TPG");
+    self->last_did_work = std::chrono::steady_clock::now();
+    self->state.fetch_add(1, std::memory_order_release);  // busy
+    const unsigned mythreadidx = threadpool_threads.fetch_add(1, std::memory_order_release);
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+    std::cout << "*** DTP " << self << " begins."
<< std::endl; +#endif + while(self->state.load(std::memory_order_relaxed) > 0) + { + dynamic_thread_pool_group::work_item *workitem = nullptr; + bool workitem_is_timer = false; + std::chrono::steady_clock::time_point now_steady, earliest_duration; + std::chrono::system_clock::time_point now_system, earliest_absolute; + // Start from highest priority work group, executing any timers due before selecting a work item + { + auto examine_wq = [&](global_dynamic_thread_pool_impl_workqueue_item &wq) -> dynamic_thread_pool_group::work_item * { + if(wq.next_timer_relative.count.load(std::memory_order_relaxed) > 0) + { + if(now_steady == std::chrono::steady_clock::time_point()) + { + now_steady = std::chrono::steady_clock::now(); + } + wq.next_timer_relative.lock.lock(); + if(wq.next_timer_relative.front != nullptr) + { + if(wq.next_timer_relative.front->_timepoint1 <= now_steady) + { + workitem = wq.next_timer<1>(); // unlocks wq.next_timer_relative.lock + workitem_is_timer = true; + return workitem; + } + if(earliest_duration == std::chrono::steady_clock::time_point() || wq.next_timer_relative.front->_timepoint1 < earliest_duration) + { + earliest_duration = wq.next_timer_relative.front->_timepoint1; + } + } + wq.next_timer_relative.lock.unlock(); + } + if(wq.next_timer_absolute.count.load(std::memory_order_relaxed) > 0) + { + if(now_system == std::chrono::system_clock::time_point()) + { + now_system = std::chrono::system_clock::now(); + } + wq.next_timer_absolute.lock.lock(); + if(wq.next_timer_absolute.front != nullptr) + { + if(wq.next_timer_absolute.front->_timepoint2 <= now_system) + { + workitem = wq.next_timer<2>(); // unlocks wq.next_timer_absolute.lock + workitem_is_timer = true; + return workitem; + } + if(earliest_absolute == std::chrono::system_clock::time_point() || wq.next_timer_absolute.front->_timepoint2 < earliest_absolute) + { + earliest_absolute = wq.next_timer_absolute.front->_timepoint2; + } + } + wq.next_timer_absolute.lock.unlock(); + } + unsigned 
count = 0; + return wq.next_active(count, mythreadidx); + }; + workitem = examine_wq(first_execute); + if(workitem == nullptr) + { + workqueue_lock.lock(); + auto lock_wq = workqueue; // take shared_ptr to highest priority collection of work groups + workqueue_lock.unlock(); + while(lock_wq) + { + workitem = examine_wq(*lock_wq); + if(workitem != nullptr) + { + // workqueue_lock.lock(); + // std::cout << "workitem = " << workitem << " nesting_level = " << wq.nesting_level << " count = " << count << std::endl; + // workqueue_lock.unlock(); + break; + } + workqueue_lock.lock(); + lock_wq = lock_wq->next; + workqueue_lock.unlock(); + } + } + } + if(now_steady == std::chrono::steady_clock::time_point()) + { + now_steady = std::chrono::steady_clock::now(); + } + // If there are no timers, and no work to do, time to either die or sleep + if(workitem == nullptr) + { + const std::chrono::steady_clock::duration max_sleep(std::chrono::milliseconds(ms_sleep_for_more_work.load(std::memory_order_relaxed))); + if(now_steady - self->last_did_work >= max_sleep) + { + threadpool_guard g(threadpool_lock); + // If there are any timers running, leave at least one worker thread + if(threadpool_active.count > 1 || + (earliest_duration == std::chrono::steady_clock::time_point() && earliest_absolute == std::chrono::system_clock::time_point())) + { + _remove_from_list(threadpool_active, self); + threadpool_threads.fetch_sub(1, std::memory_order_release); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " exits due to no new work for ms_sleep_for_more_work" << std::endl; +#endif + self->thread.detach(); + delete self; + return; + } + } + std::chrono::steady_clock::duration duration(max_sleep); + if(earliest_duration != std::chrono::steady_clock::time_point()) + { + if(now_steady - earliest_duration < duration) + { + duration = now_steady - earliest_duration; + } + } + else if(earliest_absolute != std::chrono::system_clock::time_point()) + { + if(now_system 
== std::chrono::system_clock::time_point()) + { + now_system = std::chrono::system_clock::now(); + } + auto diff = now_system - earliest_absolute; + if(diff > duration) + { + earliest_absolute = {}; + } + } + threadpool_guard g(threadpool_lock); + _remove_from_list(threadpool_active, self); + _append_to_list(threadpool_sleeping, self); + self->state.fetch_sub(1, std::memory_order_release); + if(earliest_absolute != std::chrono::system_clock::time_point()) + { +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " goes to sleep absolute" << std::endl; +#endif + self->cond.wait_until(g, earliest_absolute); + } + else + { +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " goes to sleep for " << std::chrono::duration_cast<std::chrono::milliseconds>(duration).count() << std::endl; +#endif + self->cond.wait_for(g, duration); + } + self->state.fetch_add(1, std::memory_order_release); + _remove_from_list(threadpool_sleeping, self); + _append_to_list(threadpool_active, self); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " wakes, state = " << self->state << std::endl; +#endif + g.unlock(); + try + { + populate_threadmetrics(now_steady); + } + catch(...) 
+ { + } + continue; + } + self->last_did_work = now_steady; +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " executes work item " << workitem << std::endl; +#endif + total_submitted_workitems.fetch_sub(1, std::memory_order_relaxed); + if(workitem_is_timer) + { + _timerthread(workitem, nullptr); + } + else + { + _workerthread(workitem, nullptr); + } + // workitem->_internalworkh should be null, however workitem may also no longer exist + try + { + if(populate_threadmetrics(now_steady)) + { + threadpool_guard g(threadpool_lock); + _remove_from_list(threadpool_active, self); + threadpool_threads.fetch_sub(1, std::memory_order_release); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " exits due to threadmetrics saying we exceed max concurrency" << std::endl; +#endif + self->thread.detach(); + delete self; + return; + } + } + catch(...) + { + } + } + self->state.fetch_sub(2, std::memory_order_release); // dead + threadpool_threads.fetch_sub(1, std::memory_order_release); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " exits due to state request, state = " << self->state << std::endl; +#endif + } +#endif + + inline void global_dynamic_thread_pool_impl::_submit_work_item(bool submit_into_highest_priority, dynamic_thread_pool_group::work_item *workitem, + bool defer_pool_wake) + { + (void) submit_into_highest_priority; + (void) defer_pool_wake; + const auto nextwork = workitem->_nextwork.load(std::memory_order_acquire); + if(nextwork != -1) + { + auto *parent = workitem->_parent.load(std::memory_order_relaxed); + // If no work item for now, or there is a delay, schedule a timer + if(nextwork == 0 || workitem->_has_timer_set()) + { +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + dispatch_time_t when; + if(workitem->_has_timer_set_relative()) + { + // Special constant for immediately rescheduled work items + if(workitem->_timepoint1 == 
std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1))) + { + when = dispatch_time(DISPATCH_TIME_NOW, 0); + } + else + { + auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(workitem->_timepoint1 - std::chrono::steady_clock::now()).count(); + if(duration > 1000000000LL) + { + // Because GCD has no way of cancelling timers, nor assigning them to a group, + // we clamp the timer to 1 second. Then if cancellation is ever done to the group, + // the worst possible wait is 1 second. _timerthread will reschedule the timer + // if it gets called short. + duration = 1000000000LL; + } + when = dispatch_time(DISPATCH_TIME_NOW, duration); + } + } + else if(workitem->_has_timer_set_absolute()) + { + deadline d(workitem->_timepoint2); + auto now = std::chrono::system_clock::now(); + if(workitem->_timepoint2 - now > std::chrono::seconds(1)) + { + d = now + std::chrono::seconds(1); + } + when = dispatch_walltime(&d.utc, 0); + } + else + { + when = dispatch_time(DISPATCH_TIME_NOW, 1); // smallest possible non immediate duration from now + } + // std::cout << "*** timer " << workitem << std::endl; + dispatch_after_f(when, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), workitem, _gcd_timer_callback); +#elif defined(_WIN32) + LARGE_INTEGER li; + DWORD slop = 1000; + if(workitem->_has_timer_set_relative()) + { + // Special constant for immediately rescheduled work items + if(workitem->_timepoint1 == std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1))) + { + li.QuadPart = -1; // smallest possible non immediate duration from now + } + else + { + li.QuadPart = std::chrono::duration_cast<std::chrono::nanoseconds>(workitem->_timepoint1 - std::chrono::steady_clock::now()).count() / 100; + if(li.QuadPart < 0) + { + li.QuadPart = 0; + } + if(li.QuadPart / 8 < (int64_t) slop) + { + slop = (DWORD)(li.QuadPart / 8); + } + li.QuadPart = -li.QuadPart; // negative is relative + } + } + else 
if(workitem->_has_timer_set_absolute()) + { + li = windows_nt_kernel::from_timepoint(workitem->_timepoint2); + } + else + { + li.QuadPart = -1; // smallest possible non immediate duration from now + } + FILETIME ft; + ft.dwHighDateTime = (DWORD) li.HighPart; + ft.dwLowDateTime = li.LowPart; + // std::cout << "*** timer " << workitem << std::endl; + SetThreadpoolTimer((PTP_TIMER) workitem->_internaltimerh, &ft, 0, slop); +#else + workqueue_guard gg(workqueue_lock); + for(auto *p = workqueue.get(); p != nullptr; p = p->next.get()) + { + if(p->nesting_level == parent->_nesting_level) + { + p->append_timer(workitem); + break; + } + } +#endif + } + else + { +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + intptr_t priority = DISPATCH_QUEUE_PRIORITY_LOW; + { + global_dynamic_thread_pool_impl::workqueue_guard gg(workqueue_lock); + if(workqueue->nesting_level == parent->_nesting_level) + { + priority = DISPATCH_QUEUE_PRIORITY_HIGH; + } + else if(workqueue->nesting_level == parent->_nesting_level + 1) + { + priority = DISPATCH_QUEUE_PRIORITY_DEFAULT; + } + } + // std::cout << "*** submit " << workitem << std::endl; + dispatch_group_async_f(parent->_grouph, dispatch_get_global_queue(priority, 0), workitem, _gcd_dispatch_callback); +#elif defined(_WIN32) + // Set the priority of the group according to distance from the top + TP_CALLBACK_PRIORITY priority = TP_CALLBACK_PRIORITY_LOW; + { + global_dynamic_thread_pool_impl::workqueue_guard gg(workqueue_lock); + if(workqueue->nesting_level == parent->_nesting_level) + { + priority = TP_CALLBACK_PRIORITY_HIGH; + } + else if(workqueue->nesting_level == parent->_nesting_level + 1) + { + priority = TP_CALLBACK_PRIORITY_NORMAL; + } + } + SetThreadpoolCallbackPriority(parent->_grouph, priority); + // std::cout << "*** submit " << workitem << std::endl; + SubmitThreadpoolWork((PTP_WORK) workitem->_internalworkh); +#else + global_dynamic_thread_pool_impl::workqueue_guard gg(workqueue_lock); + if(submit_into_highest_priority) + { + // 
TODO: It would be super nice if we prepended this instead if it came from a timer + first_execute.append_active(workitem); + // std::cout << "append_active _nesting_level = " << parent->_nesting_level << std::endl; + } + else + { + for(auto *p = workqueue.get(); p != nullptr; p = p->next.get()) + { + if(p->nesting_level == parent->_nesting_level) + { + // TODO: It would be super nice if we prepended this instead if it came from a timer + p->append_active(workitem); + // std::cout << "append_active _nesting_level = " << parent->_nesting_level << std::endl; + break; + } + } + } +#endif + } + +#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32) +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP submits work item " << workitem << std::endl; +#endif + const auto active_work_items = total_submitted_workitems.fetch_add(1, std::memory_order_relaxed) + 1; + if(!defer_pool_wake) + { + { + threadpool_guard gg(threadpool_lock); + if(threadpool_active.count == 0 && threadpool_sleeping.count == 0) + { + _add_thread(gg); + } + else if(threadpool_sleeping.count > 0 && active_work_items > threadpool_active.count) + { + // Try to wake the most recently slept first + auto *t = threadpool_sleeping.back; + auto now = std::chrono::steady_clock::now(); + for(size_t n = std::min(active_work_items - threadpool_active.count, threadpool_sleeping.count); n > 0; n--) + { + t->last_did_work = now; // prevent reap + t->cond.notify_one(); + t = t->_prev; + } + } + } + } +#endif + } + } + + inline result<void> global_dynamic_thread_pool_impl::submit(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group_impl *group, + span<dynamic_thread_pool_group::work_item *> work) noexcept + { + try + { + if(work.empty()) + { + return success(); + } + for(auto *i : work) + { + if(i->_parent.load(std::memory_order_relaxed) != nullptr) + { + return errc::address_in_use; + } + } + auto uninit = make_scope_exit([&]() noexcept { + for(auto *i : work) + { + 
_remove_from_list(group->_work_items_active, i); +#if defined(_WIN32) + if(nullptr != i->_internaltimerh) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } + if(nullptr != i->_internalworkh) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } +#endif + i->_parent.store(nullptr, std::memory_order_release); + } + }); + for(auto *i : work) + { + deadline d(std::chrono::seconds(0)); + i->_parent.store(group, std::memory_order_release); + i->_nextwork.store(i->next(d), std::memory_order_release); + if(-1 == i->_nextwork.load(std::memory_order_acquire)) + { + _append_to_list(group->_work_items_done, i); + } + else + { +#if defined(_WIN32) + i->_internalworkh = CreateThreadpoolWork(_win32_worker_thread_callback, i, group->_grouph); + if(nullptr == i->_internalworkh) + { + return win32_error(); + } +#endif + OUTCOME_TRY(_prepare_work_item_delay(i, group->_grouph, d)); + _prepend_to_list(group->_work_items_active, i); + } + } + uninit.release(); + g.unlock(); + { + for(auto *i : work) + { +#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32) + group->_newly_added_active_work_items++; + group->_active_work_items_remaining++; +#endif + _submit_work_item(true, i, i != work.back()); + } + } + g.lock(); + return success(); + } + catch(...) 
+ { + return error_from_exception(); + } + } + + inline void global_dynamic_thread_pool_impl::_work_item_done(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group::work_item *i) noexcept + { + (void) g; + // std::cout << "*** _work_item_done " << i << std::endl; + auto *parent = i->_parent.load(std::memory_order_relaxed); + _remove_from_list(parent->_work_items_active, i); + _append_to_list(parent->_work_items_done, i); +#if defined(_WIN32) + if(i->_internalworkh_inuse > 0) + { + i->_internalworkh_inuse = 2; + } + else + { + if(i->_internaltimerh != nullptr) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } + if(i->_internalworkh != nullptr) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } + } +#endif + if(parent->_work_items_active.count == 0) + { + i = nullptr; + auto *v = parent->_work_items_done.front, *n = v; + for(; v != nullptr; v = n) + { + v->_parent.store(nullptr, std::memory_order_release); + v->_nextwork.store(-1, std::memory_order_release); + n = v->_next; + } + n = v = parent->_work_items_done.front; + parent->_work_items_done.front = parent->_work_items_done.back = nullptr; + parent->_work_items_done.count = 0; + parent->_stopping.store(false, std::memory_order_release); + parent->_completing.store(true, std::memory_order_release); // cause submissions to enter _work_items_delayed +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP executes group_complete for group " << parent << std::endl; +#endif + for(; v != nullptr; v = n) + { + n = v->_next; + v->group_complete(parent->_abnormal_completion_cause); + } + parent->_stopped.store(true, std::memory_order_release); + parent->_completing.store(false, std::memory_order_release); // cease submitting to _work_items_delayed +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP group_complete done for group " << parent << ". 
_work_items_delayed.count = " << parent->_work_items_delayed.count << std::endl; +#endif + if(parent->_work_items_delayed.count > 0) + { + /* If there are waits on this group to complete, forward progress those now. + */ + while(parent->_waits.load(std::memory_order_relaxed) > 0) + { +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP group_complete blocks on waits for group " << parent << std::endl; +#endif + g.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + g.lock(); + } + // Now submit all delayed work + while(parent->_work_items_delayed.count > 0) + { + i = parent->_work_items_delayed.front; + _remove_from_list(parent->_work_items_delayed, i); + auto r = submit(g, parent, {&i, 1}); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP group_complete submits delayed work item " << i << " for group " << parent << " which saw error "; + if(r) + { + std::cout << "none" << std::endl; + } + else + { + std::cout << r.error().message() << std::endl; + } +#endif + if(!r) + { + parent->_work_items_delayed = {}; + (void) stop(g, parent, std::move(r)); + break; + } + } + } + } + } + + inline result<void> global_dynamic_thread_pool_impl::stop(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group_impl *group, + result<void> err) noexcept + { + (void) g; + if(group->_abnormal_completion_cause) + { + group->_abnormal_completion_cause = std::move(err); + } + group->_stopping.store(true, std::memory_order_release); + return success(); + } + + + inline result<void> global_dynamic_thread_pool_impl::wait(dynamic_thread_pool_group_impl_guard &g, bool reap, dynamic_thread_pool_group_impl *group, + deadline d) noexcept + { + LLFIO_DEADLINE_TO_SLEEP_INIT(d); + if(!d || d.nsecs > 0) + { + /* To ensure forward progress, we need to gate new waits during delayed work submission. + Otherwise waits may never exit if the window where _work_items_active.count == 0 is + missed. 
+ */ + while(group->_work_items_delayed.count > 0) + { + g.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + g.lock(); + } + group->_waits.fetch_add(1, std::memory_order_release); + auto unwaitcount = make_scope_exit([&]() noexcept { group->_waits.fetch_sub(1, std::memory_order_release); }); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + while(group->_work_items_active.count > 0) + { + LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d); + dispatch_time_t timeout = DISPATCH_TIME_FOREVER; + if(d) + { + std::chrono::nanoseconds duration; + LLFIO_DEADLINE_TO_PARTIAL_TIMEOUT(duration, d); + timeout = dispatch_time(DISPATCH_TIME_NOW, duration.count()); + } + g.unlock(); + dispatch_group_wait(group->_grouph, timeout); + g.lock(); + // if(1 == group->_work_items_active.count) + //{ + // std::cout << "*** wait item remaining is " << group->_work_items_active.front << std::endl; + // std::this_thread::sleep_for(std::chrono::seconds(1)); + //} + } +#elif defined(_WIN32) + auto &tls = detail::global_dynamic_thread_pool_thread_local_state(); + if(tls.current_callback_instance != nullptr) + { + // I am being called from within a thread worker. Tell + // the thread pool that I am not going to exit promptly. + CallbackMayRunLong(tls.current_callback_instance); + } + // Is this a cancellation? 
+ if(group->_stopping.load(std::memory_order_relaxed)) + { + while(group->_work_items_active.count > 0) + { + auto *i = group->_work_items_active.front; + if(nullptr != i->_internalworkh) + { + if(0 == i->_internalworkh_inuse) + { + i->_internalworkh_inuse = 1; + } + g.unlock(); + WaitForThreadpoolWorkCallbacks((PTP_WORK) i->_internalworkh, true); + g.lock(); + if(i->_internalworkh_inuse == 2) + { + if(nullptr != i->_internalworkh) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } + if(nullptr != i->_internaltimerh) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } + } + i->_internalworkh_inuse = 0; + } + if(nullptr != i->_internaltimerh) + { + if(0 == i->_internalworkh_inuse) + { + i->_internalworkh_inuse = 1; + } + g.unlock(); + WaitForThreadpoolTimerCallbacks((PTP_TIMER) i->_internaltimerh, true); + g.lock(); + if(i->_internalworkh_inuse == 2) + { + if(nullptr != i->_internalworkh) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } + if(nullptr != i->_internaltimerh) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } + } + i->_internalworkh_inuse = 0; + } + if(group->_work_items_active.count > 0 && group->_work_items_active.front == i) + { + // This item got cancelled before it started + _work_item_done(g, group->_work_items_active.front); + } + } + assert(!group->_stopping.load(std::memory_order_relaxed)); + } + else if(!d) + { + while(group->_work_items_active.count > 0) + { + auto *i = group->_work_items_active.front; + if(nullptr != i->_internalworkh) + { + if(0 == i->_internalworkh_inuse) + { + i->_internalworkh_inuse = 1; + } + g.unlock(); + WaitForThreadpoolWorkCallbacks((PTP_WORK) i->_internalworkh, false); + g.lock(); + if(i->_internalworkh_inuse == 2) + { + if(nullptr != i->_internalworkh) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } + 
if(nullptr != i->_internaltimerh) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } + } + i->_internalworkh_inuse = 0; + } + if(nullptr != i->_internaltimerh) + { + if(0 == i->_internalworkh_inuse) + { + i->_internalworkh_inuse = 1; + } + g.unlock(); + WaitForThreadpoolTimerCallbacks((PTP_TIMER) i->_internaltimerh, false); + g.lock(); + if(i->_internalworkh_inuse == 2) + { + if(nullptr != i->_internalworkh) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } + if(nullptr != i->_internaltimerh) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } + } + i->_internalworkh_inuse = 0; + } + } + } + else + { + while(group->_work_items_active.count > 0) + { + LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d); + g.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + g.lock(); + } + } +#else + while(group->_work_items_active.count > 0) + { + LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d); + g.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + g.lock(); + } +#endif + } + if(group->_work_items_active.count > 0) + { + return errc::timed_out; + } + if(reap) + { + return std::move(group->_abnormal_completion_cause); + } + return success(); + } + + inline void global_dynamic_thread_pool_impl::_timerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type /*unused*/) + { + LLFIO_LOG_FUNCTION_CALL(this); + assert(workitem->_nextwork.load(std::memory_order_relaxed) != -1); + assert(workitem->_has_timer_set()); + auto *parent = workitem->_parent.load(std::memory_order_relaxed); + // std::cout << "*** _timerthread " << workitem << std::endl; + if(parent->_stopping.load(std::memory_order_relaxed)) + { + dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group + _work_item_done(g, workitem); + return; + } + if(workitem->_has_timer_set_relative()) + { +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + auto now = 
std::chrono::steady_clock::now(); + if(workitem->_timepoint1 - now > std::chrono::seconds(0)) + { + // Timer fired short, so schedule it again + _submit_work_item(false, workitem, false); + return; + } +#endif + workitem->_timepoint1 = {}; + } + if(workitem->_has_timer_set_absolute()) + { +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + auto now = std::chrono::system_clock::now(); + if(workitem->_timepoint2 - now > std::chrono::seconds(0)) + { + // Timer fired short, so schedule it again + _submit_work_item(false, workitem, false); + return; + } +#endif + workitem->_timepoint2 = {}; + } + assert(!workitem->_has_timer_set()); + if(workitem->_nextwork.load(std::memory_order_acquire) == 0) + { + deadline d(std::chrono::seconds(0)); + workitem->_nextwork.store(workitem->next(d), std::memory_order_release); + auto r2 = _prepare_work_item_delay(workitem, parent->_grouph, d); + if(!r2) + { + dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group + (void) stop(g, parent, std::move(r2)); + _work_item_done(g, workitem); + return; + } + if(-1 == workitem->_nextwork.load(std::memory_order_relaxed)) + { + dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group + _work_item_done(g, workitem); + return; + } + _submit_work_item(false, workitem, false); + return; + } + _submit_work_item(false, workitem, false); + } + + // Worker thread entry point + inline void global_dynamic_thread_pool_impl::_workerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh) + { + LLFIO_LOG_FUNCTION_CALL(this); + //{ + // _lock_guard g(parent->_lock); + // std::cout << "*** _workerthread " << workitem << " begins with work " << workitem->_nextwork.load(std::memory_order_relaxed) << std::endl; + //} + assert(workitem->_nextwork.load(std::memory_order_relaxed) != -1); + assert(workitem->_nextwork.load(std::memory_order_relaxed) != 0); + auto *parent = workitem->_parent.load(std::memory_order_relaxed); + 
if(parent->_stopping.load(std::memory_order_relaxed)) + { + dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group + _work_item_done(g, workitem); + return; + } + auto &tls = detail::global_dynamic_thread_pool_thread_local_state(); + auto old_thread_local_state = tls; + tls.workitem = workitem; + tls.current_callback_instance = selfthreadh; + tls.nesting_level = parent->_nesting_level + 1; + auto r = (*workitem)(workitem->_nextwork.load(std::memory_order_acquire)); + workitem->_nextwork.store(0, std::memory_order_release); // call next() next time + tls = old_thread_local_state; + // std::cout << "*** _workerthread " << workitem << " ends with work " << workitem->_nextwork << std::endl; + if(!r) + { + dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group + (void) stop(g, parent, std::move(r)); + _work_item_done(g, workitem); + workitem = nullptr; + } + else if(parent->_stopping.load(std::memory_order_relaxed)) + { + dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group + _work_item_done(g, workitem); + } + else + { + deadline d(std::chrono::seconds(0)); + workitem->_nextwork.store(workitem->next(d), std::memory_order_release); + auto r2 = _prepare_work_item_delay(workitem, parent->_grouph, d); + if(!r2) + { + dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group + (void) stop(g, parent, std::move(r2)); + _work_item_done(g, workitem); + return; + } + if(-1 == workitem->_nextwork.load(std::memory_order_relaxed)) + { + dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group + _work_item_done(g, workitem); + return; + } + _submit_work_item(false, workitem, false); + } + } +} // namespace detail + + +/****************************************** io_aware_work_item *********************************************/ + +LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::io_aware_work_item::io_aware_work_item(span<io_handle_awareness> hs) + : _handles([](span<io_handle_awareness> hs) -> 
span<io_handle_awareness> { + float all = 0; + for(auto &i : hs) + { + all += i.reads + i.writes + i.barriers; + } + for(auto &i : hs) + { + if(all == 0.0f) + { + i.reads = i.writes = 0.5f; + i.barriers = 0.0f; + } + else + { + i.reads /= all; + i.writes /= all; + i.barriers /= all; + } + } + auto &impl = detail::global_dynamic_thread_pool(); + detail::global_dynamic_thread_pool_impl::io_aware_work_item_handles_guard g(impl.io_aware_work_item_handles_lock); + for(auto &h : hs) + { + if(!h.h->is_seekable()) + { + throw std::runtime_error("Supplied handle is not seekable"); + } + auto *fh = static_cast<file_handle *>(h.h); + auto unique_id = fh->unique_id(); + auto it = impl.io_aware_work_item_handles.find(unique_id); + if(it == impl.io_aware_work_item_handles.end()) + { + it = impl.io_aware_work_item_handles.emplace(unique_id, detail::global_dynamic_thread_pool_impl::io_aware_work_item_statfs{}).first; + auto r = it->second.statfs.fill(*fh, statfs_t::want::iosinprogress | statfs_t::want::iosbusytime); + if(!r || it->second.statfs.f_iosinprogress == (uint32_t) -1) + { + impl.io_aware_work_item_handles.erase(it); + if(!r) + { + r.value(); + } + throw std::runtime_error("statfs::f_iosinprogress unavailable for supplied handle"); + } + it->second.last_updated = std::chrono::steady_clock::now(); + } + it->second.refcount++; + h._internal = &*it; + } + return hs; + }(hs)) +{ + LLFIO_LOG_FUNCTION_CALL(this); +} + +LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::io_aware_work_item::~io_aware_work_item() +{ + LLFIO_LOG_FUNCTION_CALL(this); + auto &impl = detail::global_dynamic_thread_pool(); + detail::global_dynamic_thread_pool_impl::io_aware_work_item_handles_guard g(impl.io_aware_work_item_handles_lock); + using value_type = decltype(impl.io_aware_work_item_handles)::value_type; + for(auto &h : _handles) + { + auto *i = (value_type *) h._internal; + if(0 == --i->second.refcount) + { + impl.io_aware_work_item_handles.erase(i->first); + } + } +} + 
+LLFIO_HEADERS_ONLY_MEMFUNC_SPEC intptr_t dynamic_thread_pool_group::io_aware_work_item::next(deadline &d) noexcept +{ + LLFIO_LOG_FUNCTION_CALL(this); + { + auto &impl = detail::global_dynamic_thread_pool(); + auto now = std::chrono::steady_clock::now(); + detail::global_dynamic_thread_pool_impl::io_aware_work_item_handles_guard g(impl.io_aware_work_item_handles_lock); + using value_type = decltype(impl.io_aware_work_item_handles)::value_type; + for(auto &h : _handles) + { + auto *i = (value_type *) h._internal; + if(std::chrono::duration_cast<std::chrono::milliseconds>(now - i->second.last_updated) >= std::chrono::milliseconds(100)) + { + // auto old_iosinprogress = i->second.statfs.f_iosinprogress; + auto elapsed = now - i->second.last_updated; + (void) i->second.statfs.fill(*h.h, statfs_t::want::iosinprogress | statfs_t::want::iosbusytime); + i->second.last_updated = now; + + if(elapsed > std::chrono::seconds(5)) + { + i->second.average_busy = i->second.statfs.f_iosbusytime; + i->second.average_queuedepth = (float) i->second.statfs.f_iosinprogress; + } + else + { + i->second.average_busy = (i->second.average_busy * 0.9f) + (i->second.statfs.f_iosbusytime * 0.1f); + i->second.average_queuedepth = (i->second.average_queuedepth * 0.9f) + (i->second.statfs.f_iosinprogress * 0.1f); + } + if(i->second.average_busy < this->max_iosbusytime && i->second.average_queuedepth < this->min_iosinprogress) + { + i->second.default_deadline = std::chrono::seconds(0); // remove pacing + } + else if(i->second.average_queuedepth > this->max_iosinprogress) + { + if(0 == i->second.default_deadline.nsecs) + { + i->second.default_deadline = std::chrono::milliseconds(1); // start with 1ms, it'll reduce from there if needed + } + else if((i->second.default_deadline.nsecs >> 4) > 0) + { + i->second.default_deadline.nsecs += i->second.default_deadline.nsecs >> 4; + } + else + { + i->second.default_deadline.nsecs++; + } + } + else if(i->second.average_queuedepth < this->min_iosinprogress) + 
{ + if(i->second.default_deadline.nsecs > (i->second.default_deadline.nsecs >> 4) && (i->second.default_deadline.nsecs >> 4) > 0) + { + i->second.default_deadline.nsecs -= i->second.default_deadline.nsecs >> 4; + } + else if(i->second.default_deadline.nsecs > 1) + { + i->second.default_deadline.nsecs--; + } + } + } + if(d.nsecs < i->second.default_deadline.nsecs) + { + d = i->second.default_deadline; + } + } + } + return io_aware_next(d); +} + + +LLFIO_V2_NAMESPACE_END diff --git a/include/llfio/v2.0/detail/impl/posix/directory_handle.ipp b/include/llfio/v2.0/detail/impl/posix/directory_handle.ipp index 9ea737c0..bd0050d6 100644 --- a/include/llfio/v2.0/detail/impl/posix/directory_handle.ipp +++ b/include/llfio/v2.0/detail/impl/posix/directory_handle.ipp @@ -39,7 +39,8 @@ http://www.boost.org/LICENSE_1_0.txt) LLFIO_V2_NAMESPACE_BEGIN -result<directory_handle> directory_handle::directory(const path_handle &base, path_view_type path, mode _mode, creation _creation, caching _caching, flag flags) noexcept +result<directory_handle> directory_handle::directory(const path_handle &base, path_view_type path, mode _mode, creation _creation, caching _caching, + flag flags) noexcept { if(flags & flag::unlink_on_first_close) { @@ -314,9 +315,12 @@ result<directory_handle::buffers_type> directory_handle::read(io_request<buffers req.buffers[0].stat.st_birthtim = to_timepoint(s.st_birthtim); #endif #endif - req.buffers[0].stat.st_sparse = static_cast<unsigned int>((static_cast<handle::extent_type>(s.st_blocks) * 512) < static_cast<handle::extent_type>(s.st_size)); + req.buffers[0].stat.st_sparse = + static_cast<unsigned int>((static_cast<handle::extent_type>(s.st_blocks) * 512) < static_cast<handle::extent_type>(s.st_size)); req.buffers._resize(1); - static constexpr stat_t::want default_stat_contents = stat_t::want::dev | stat_t::want::ino | stat_t::want::type | stat_t::want::perms | stat_t::want::nlink | stat_t::want::uid | stat_t::want::gid | stat_t::want::rdev | 
stat_t::want::atim | stat_t::want::mtim | stat_t::want::ctim | stat_t::want::size | + static constexpr stat_t::want default_stat_contents = stat_t::want::dev | stat_t::want::ino | stat_t::want::type | stat_t::want::perms | + stat_t::want::nlink | stat_t::want::uid | stat_t::want::gid | stat_t::want::rdev | + stat_t::want::atim | stat_t::want::mtim | stat_t::want::ctim | stat_t::want::size | stat_t::want::allocated | stat_t::want::blocks | stat_t::want::blksize #ifdef HAVE_STAT_FLAGS | stat_t::want::flags @@ -360,8 +364,7 @@ result<directory_handle::buffers_type> directory_handle::read(io_request<buffers } stat_t::want default_stat_contents = stat_t::want::ino | stat_t::want::type; dirent *buffer; - size_t bytesavailable; - int bytes; + size_t bytesavailable, bytes; bool done = false; do { @@ -383,27 +386,41 @@ result<directory_handle::buffers_type> directory_handle::read(io_request<buffers if(-1 == ::lseek(_v.fd, 0, SEEK_SET)) return posix_error(); #endif - bytes = getdents(_v.fd, reinterpret_cast<char *>(buffer), bytesavailable); - if(req.kernelbuffer.empty() && bytes == -1 && EINVAL == errno) + bytes = 0; + int _bytes; + do { - size_t toallocate = req.buffers._kernel_buffer_size * 2; - auto *mem = (char *) operator new[](toallocate, std::nothrow); // don't initialise - if(mem == nullptr) + assert(bytes <= bytesavailable); + _bytes = getdents(_v.fd, reinterpret_cast<char *>(buffer) + bytes, bytesavailable - bytes); + if(_bytes == 0) { - return errc::not_enough_memory; + done = true; + break; } - req.buffers._kernel_buffer.reset(); - req.buffers._kernel_buffer = std::unique_ptr<char[]>(mem); - req.buffers._kernel_buffer_size = toallocate; - } - else - { - if(bytes == -1) + if(req.kernelbuffer.empty() && _bytes == -1 && EINVAL == errno) + { + size_t toallocate = req.buffers._kernel_buffer_size * 2; + auto *mem = (char *) operator new[](toallocate, std::nothrow); // don't initialise + if(mem == nullptr) + { + return errc::not_enough_memory; + } + 
req.buffers._kernel_buffer.reset(); + req.buffers._kernel_buffer = std::unique_ptr<char[]>(mem); + req.buffers._kernel_buffer_size = toallocate; + // We need to reset and do the whole thing against to ensure single shot atomicity + break; + } + else if(_bytes == -1) { return posix_error(); } - done = true; - } + else + { + assert(_bytes > 0); + bytes += _bytes; + } + } while(!done); } while(!done); if(bytes == 0) { diff --git a/include/llfio/v2.0/detail/impl/posix/statfs.ipp b/include/llfio/v2.0/detail/impl/posix/statfs.ipp index a853a418..2e3b0859 100644 --- a/include/llfio/v2.0/detail/impl/posix/statfs.ipp +++ b/include/llfio/v2.0/detail/impl/posix/statfs.ipp @@ -1,5 +1,5 @@ /* Information about the volume storing a file -(C) 2016-2017 Niall Douglas <http://www.nedproductions.biz/> (5 commits) +(C) 2016-2020 Niall Douglas <http://www.nedproductions.biz/> (5 commits) File Created: Jan 2016 @@ -25,10 +25,15 @@ Distributed under the Boost Software License, Version 1.0. #include "../../../handle.hpp" #include "../../../statfs.hpp" +#include <chrono> +#include <mutex> +#include <vector> + #include <sys/mount.h> #ifdef __linux__ #include <mntent.h> #include <sys/statfs.h> +#include <sys/sysmacros.h> #endif LLFIO_V2_NAMESPACE_BEGIN @@ -37,186 +42,189 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<size_t> statfs_t::fill(const handle &h, s { size_t ret = 0; #ifdef __linux__ - struct statfs64 s - { - }; - memset(&s, 0, sizeof(s)); - if(-1 == fstatfs64(h.native_handle().fd, &s)) - { - return posix_error(); - } - if(!!(wanted & want::bsize)) + if(!!(wanted & ~(want::iosinprogress | want::iosbusytime))) { - f_bsize = s.f_bsize; - ++ret; - } - if(!!(wanted & want::iosize)) - { - f_iosize = s.f_frsize; - ++ret; - } - if(!!(wanted & want::blocks)) - { - f_blocks = s.f_blocks; - ++ret; - } - if(!!(wanted & want::bfree)) - { - f_bfree = s.f_bfree; - ++ret; - } - if(!!(wanted & want::bavail)) - { - f_bavail = s.f_bavail; - ++ret; - } - if(!!(wanted & want::files)) - { - f_files = 
s.f_files; - ++ret; - } - if(!!(wanted & want::ffree)) - { - f_ffree = s.f_ffree; - ++ret; - } - if(!!(wanted & want::namemax)) - { - f_namemax = s.f_namelen; - ++ret; - } - // if(!!(wanted&&want::owner)) { f_owner =s.f_owner; ++ret; } - if(!!(wanted & want::fsid)) - { - f_fsid[0] = static_cast<unsigned>(s.f_fsid.__val[0]); - f_fsid[1] = static_cast<unsigned>(s.f_fsid.__val[1]); - ++ret; - } - if(!!(wanted & want::flags) || !!(wanted & want::fstypename) || !!(wanted & want::mntfromname) || !!(wanted & want::mntonname)) - { - try + struct statfs64 s { - struct mountentry - { - std::string mnt_fsname, mnt_dir, mnt_type, mnt_opts; - mountentry(const char *a, const char *b, const char *c, const char *d) - : mnt_fsname(a) - , mnt_dir(b) - , mnt_type(c) - , mnt_opts(d) - { - } - }; - std::vector<std::pair<mountentry, struct statfs64>> mountentries; + }; + memset(&s, 0, sizeof(s)); + if(-1 == fstatfs64(h.native_handle().fd, &s)) + { + return posix_error(); + } + if(!!(wanted & want::bsize)) + { + f_bsize = s.f_bsize; + ++ret; + } + if(!!(wanted & want::iosize)) + { + f_iosize = s.f_frsize; + ++ret; + } + if(!!(wanted & want::blocks)) + { + f_blocks = s.f_blocks; + ++ret; + } + if(!!(wanted & want::bfree)) + { + f_bfree = s.f_bfree; + ++ret; + } + if(!!(wanted & want::bavail)) + { + f_bavail = s.f_bavail; + ++ret; + } + if(!!(wanted & want::files)) + { + f_files = s.f_files; + ++ret; + } + if(!!(wanted & want::ffree)) + { + f_ffree = s.f_ffree; + ++ret; + } + if(!!(wanted & want::namemax)) + { + f_namemax = s.f_namelen; + ++ret; + } + // if(!!(wanted&&want::owner)) { f_owner =s.f_owner; ++ret; } + if(!!(wanted & want::fsid)) + { + f_fsid[0] = static_cast<unsigned>(s.f_fsid.__val[0]); + f_fsid[1] = static_cast<unsigned>(s.f_fsid.__val[1]); + ++ret; + } + if(!!(wanted & want::flags) || !!(wanted & want::fstypename) || !!(wanted & want::mntfromname) || !!(wanted & want::mntonname)) + { + try { - // Need to parse mount options on Linux - FILE *mtab = setmntent("/etc/mtab", 
"r"); - if(mtab == nullptr) - { - mtab = setmntent("/proc/mounts", "r"); - } - if(mtab == nullptr) - { - return posix_error(); - } - auto unmtab = make_scope_exit([mtab]() noexcept { endmntent(mtab); }); - struct mntent m + struct mountentry { + std::string mnt_fsname, mnt_dir, mnt_type, mnt_opts; + mountentry(const char *a, const char *b, const char *c, const char *d) + : mnt_fsname(a) + , mnt_dir(b) + , mnt_type(c) + , mnt_opts(d) + { + } }; - char buffer[32768]; - while(getmntent_r(mtab, &m, buffer, sizeof(buffer)) != nullptr) + std::vector<std::pair<mountentry, struct statfs64>> mountentries; { - struct statfs64 temp + // Need to parse mount options on Linux + FILE *mtab = setmntent("/etc/mtab", "r"); + if(mtab == nullptr) + { + mtab = setmntent("/proc/mounts", "r"); + } + if(mtab == nullptr) + { + return posix_error(); + } + auto unmtab = make_scope_exit([mtab]() noexcept { endmntent(mtab); }); + struct mntent m { }; - memset(&temp, 0, sizeof(temp)); - // std::cout << m.mnt_fsname << "," << m.mnt_dir << "," << m.mnt_type << "," << m.mnt_opts << std::endl; - if(0 == statfs64(m.mnt_dir, &temp)) + char buffer[32768]; + while(getmntent_r(mtab, &m, buffer, sizeof(buffer)) != nullptr) { - // std::cout << " " << temp.f_fsid.__val[0] << temp.f_fsid.__val[1] << " =? " << s.f_fsid.__val[0] << s.f_fsid.__val[1] << std::endl; - if(temp.f_type == s.f_type && (memcmp(&temp.f_fsid, &s.f_fsid, sizeof(s.f_fsid)) == 0)) + struct statfs64 temp + { + }; + memset(&temp, 0, sizeof(temp)); + // std::cout << m.mnt_fsname << "," << m.mnt_dir << "," << m.mnt_type << "," << m.mnt_opts << std::endl; + if(0 == statfs64(m.mnt_dir, &temp)) { - mountentries.emplace_back(mountentry(m.mnt_fsname, m.mnt_dir, m.mnt_type, m.mnt_opts), temp); + // std::cout << " " << temp.f_fsid.__val[0] << temp.f_fsid.__val[1] << " =? 
" << s.f_fsid.__val[0] << s.f_fsid.__val[1] << std::endl; + if(temp.f_type == s.f_type && (memcmp(&temp.f_fsid, &s.f_fsid, sizeof(s.f_fsid)) == 0)) + { + mountentries.emplace_back(mountentry(m.mnt_fsname, m.mnt_dir, m.mnt_type, m.mnt_opts), temp); + } } } } - } #ifndef LLFIO_COMPILING_FOR_GCOV - if(mountentries.empty()) - { - return errc::no_such_file_or_directory; - } - /* Choose the mount entry with the most closely matching statfs. You can't choose - exclusively based on mount point because of bind mounts. Note that we have a - particular problem in Docker: + if(mountentries.empty()) + { + return errc::no_such_file_or_directory; + } + /* Choose the mount entry with the most closely matching statfs. You can't choose + exclusively based on mount point because of bind mounts. Note that we have a + particular problem in Docker: - rootfs / rootfs rw 0 0 - overlay / overlay rw,relatime,lowerdir=... 0 0 - /dev/sda3 /etc xfs rw,relatime,... 0 0 - tmpfs /dev tmpfs rw,nosuid,... 0 0 - tmpfs /proc/acpi tmpfs rw,nosuid,... 0 0 - ... + rootfs / rootfs rw 0 0 + overlay / overlay rw,relatime,lowerdir=... 0 0 + /dev/sda3 /etc xfs rw,relatime,... 0 0 + tmpfs /dev tmpfs rw,nosuid,... 0 0 + tmpfs /proc/acpi tmpfs rw,nosuid,... 0 0 + ... - If f_type and f_fsid are identical for the statfs for the mount as for our file, - then you will get multiple mountentries, and there is no obvious way of - disambiguating them. What we do is match mount based on the longest match - of the mount point with the current path of our file. 
- */ - if(mountentries.size() > 1) - { - OUTCOME_TRY(auto &¤tfilepath_, h.current_path()); - string_view currentfilepath(currentfilepath_.native()); - std::vector<std::pair<size_t, size_t>> scores(mountentries.size()); - //std::cout << "*** For matching mount entries to file with path " << currentfilepath << ":\n"; - for(size_t n = 0; n < mountentries.size(); n++) + If f_type and f_fsid are identical for the statfs for the mount as for our file, + then you will get multiple mountentries, and there is no obvious way of + disambiguating them. What we do is match mount based on the longest match + of the mount point with the current path of our file. + */ + if(mountentries.size() > 1) { - scores[n].first = - (currentfilepath.substr(0, mountentries[n].first.mnt_dir.size()) == mountentries[n].first.mnt_dir) ? mountentries[n].first.mnt_dir.size() : 0; - //std::cout << "*** Mount entry " << mountentries[n].first.mnt_dir << " get score " << scores[n].first << std::endl; - scores[n].second = n; + OUTCOME_TRY(auto &¤tfilepath_, h.current_path()); + string_view currentfilepath(currentfilepath_.native()); + std::vector<std::pair<size_t, size_t>> scores(mountentries.size()); + // std::cout << "*** For matching mount entries to file with path " << currentfilepath << ":\n"; + for(size_t n = 0; n < mountentries.size(); n++) + { + scores[n].first = + (currentfilepath.substr(0, mountentries[n].first.mnt_dir.size()) == mountentries[n].first.mnt_dir) ? mountentries[n].first.mnt_dir.size() : 0; + // std::cout << "*** Mount entry " << mountentries[n].first.mnt_dir << " get score " << scores[n].first << std::endl; + scores[n].second = n; + } + std::sort(scores.begin(), scores.end()); + // Choose the item whose mnt_dir most matched the current path for our file. 
+ auto temp(std::move(mountentries[scores.back().second])); + mountentries.clear(); + mountentries.push_back(std::move(temp)); } - std::sort(scores.begin(), scores.end()); - // Choose the item whose mnt_dir most matched the current path for our file. - auto temp(std::move(mountentries[scores.back().second])); - mountentries.clear(); - mountentries.push_back(std::move(temp)); - } #endif - if(!!(wanted & want::flags)) - { - f_flags.rdonly = static_cast<uint32_t>((s.f_flags & MS_RDONLY) != 0); - f_flags.noexec = static_cast<uint32_t>((s.f_flags & MS_NOEXEC) != 0); - f_flags.nosuid = static_cast<uint32_t>((s.f_flags & MS_NOSUID) != 0); - f_flags.acls = static_cast<uint32_t>(std::string::npos != mountentries.front().first.mnt_opts.find("acl") && - std::string::npos == mountentries.front().first.mnt_opts.find("noacl")); - f_flags.xattr = static_cast<uint32_t>(std::string::npos != mountentries.front().first.mnt_opts.find("xattr") && - std::string::npos == mountentries.front().first.mnt_opts.find("nouser_xattr")); - // out.f_flags.compression=0; - // Those filing systems supporting FALLOC_FL_PUNCH_HOLE - f_flags.extents = static_cast<uint32_t>(mountentries.front().first.mnt_type == "btrfs" || mountentries.front().first.mnt_type == "ext4" || - mountentries.front().first.mnt_type == "xfs" || mountentries.front().first.mnt_type == "tmpfs"); - ++ret; - } - if(!!(wanted & want::fstypename)) - { - f_fstypename = mountentries.front().first.mnt_type; - ++ret; - } - if(!!(wanted & want::mntfromname)) - { - f_mntfromname = mountentries.front().first.mnt_fsname; - ++ret; + if(!!(wanted & want::flags)) + { + f_flags.rdonly = static_cast<uint32_t>((s.f_flags & MS_RDONLY) != 0); + f_flags.noexec = static_cast<uint32_t>((s.f_flags & MS_NOEXEC) != 0); + f_flags.nosuid = static_cast<uint32_t>((s.f_flags & MS_NOSUID) != 0); + f_flags.acls = static_cast<uint32_t>(std::string::npos != mountentries.front().first.mnt_opts.find("acl") && + std::string::npos == 
mountentries.front().first.mnt_opts.find("noacl")); + f_flags.xattr = static_cast<uint32_t>(std::string::npos != mountentries.front().first.mnt_opts.find("xattr") && + std::string::npos == mountentries.front().first.mnt_opts.find("nouser_xattr")); + // out.f_flags.compression=0; + // Those filing systems supporting FALLOC_FL_PUNCH_HOLE + f_flags.extents = static_cast<uint32_t>(mountentries.front().first.mnt_type == "btrfs" || mountentries.front().first.mnt_type == "ext4" || + mountentries.front().first.mnt_type == "xfs" || mountentries.front().first.mnt_type == "tmpfs"); + ++ret; + } + if(!!(wanted & want::fstypename)) + { + f_fstypename = mountentries.front().first.mnt_type; + ++ret; + } + if(!!(wanted & want::mntfromname)) + { + f_mntfromname = mountentries.front().first.mnt_fsname; + ++ret; + } + if(!!(wanted & want::mntonname)) + { + f_mntonname = mountentries.front().first.mnt_dir; + ++ret; + } } - if(!!(wanted & want::mntonname)) + catch(...) { - f_mntonname = mountentries.front().first.mnt_dir; - ++ret; + return error_from_exception(); } } - catch(...) 
- { - return error_from_exception(); - } } #else struct statfs s; @@ -311,7 +319,177 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<size_t> statfs_t::fill(const handle &h, s ++ret; } #endif + if(!!(wanted & want::iosinprogress) || !!(wanted & want::iosbusytime)) + { + OUTCOME_TRY(auto &&ios, _fill_ios(h, f_mntfromname)); + if(!!(wanted & want::iosinprogress)) + { + f_iosinprogress = ios.first; + ++ret; + } + if(!!(wanted & want::iosbusytime)) + { + f_iosbusytime = ios.second; + ++ret; + } + } return ret; } +/******************************************* statfs_t ************************************************/ + +LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<std::pair<uint32_t, float>> statfs_t::_fill_ios(const handle &h, const std::string & /*unused*/) noexcept +{ + (void) h; + try + { +#ifdef __linux__ + struct stat s + { + }; + memset(&s, 0, sizeof(s)); + + if(-1 == ::fstat(h.native_handle().fd, &s)) + { + if(!h.is_symlink() || EBADF != errno) + { + return posix_error(); + } + // This is a hack, but symlink_handle includes this first so there is a chicken and egg dependency problem + OUTCOME_TRY(detail::stat_from_symlink(s, h)); + } + + static struct last_reading_t + { + struct item + { + dev_t st_dev; + size_t millis{0}; + std::chrono::steady_clock::time_point last_updated; + + uint32_t f_iosinprogress{0}; + float f_iosbusytime{0}; + }; + std::mutex lock; + std::vector<item> items; + } last_reading; + auto now = std::chrono::steady_clock::now(); + { + std::lock_guard<std::mutex> g(last_reading.lock); + for(auto &i : last_reading.items) + { + if(i.st_dev == s.st_dev) + { + if(std::chrono::duration_cast<std::chrono::milliseconds>(now - i.last_updated) < std::chrono::milliseconds(100)) + { + return {i.f_iosinprogress, i.f_iosbusytime}; // exit with old readings + } + break; + } + } + } + try + { + int fd = ::open("/proc/diskstats", O_RDONLY); + if(fd >= 0) + { + std::string diskstats; + diskstats.resize(4096); + for(;;) + { + auto read = ::read(fd, (char *) diskstats.data(), 
diskstats.size()); + if(read < 0) + { + return posix_error(); + } + if(read < (ssize_t) diskstats.size()) + { + ::close(fd); + diskstats.resize(read); + break; + } + try + { + diskstats.resize(diskstats.size() << 1); + } + catch(...) + { + ::close(fd); + throw; + } + } + /* Format is (https://www.kernel.org/doc/Documentation/iostats.txt): + <dev id major> <dev id minor> <device name> 01 02 03 04 05 06 07 08 09 10 ... + + Field 9 is i/o's currently in progress. + Field 10 is milliseconds spent doing i/o (cumulative). + */ + auto match_line = [&](string_view sv) { + int major = 0, minor = 0; + sscanf(sv.data(), "%d %d", &major, &minor); + // printf("Does %d,%d match %d,%d?\n", major, minor, major(s.st_dev), minor(s.st_dev)); + return (makedev(major, minor) == s.st_dev); + }; + for(size_t is = 0, ie = diskstats.find(10); ie != diskstats.npos; is = ie + 1, ie = diskstats.find(10, is)) + { + auto sv = string_view(diskstats).substr(is, ie - is); + if(match_line(sv)) + { + int major = 0, minor = 0; + char devicename[64]; + size_t fields[12]; + sscanf(sv.data(), "%d %d %s %zu %zu %zu %zu %zu %zu %zu %zu %zu %zu", &major, &minor, devicename, fields + 0, fields + 1, fields + 2, fields + 3, + fields + 4, fields + 5, fields + 6, fields + 7, fields + 8, fields + 9); + std::lock_guard<std::mutex> g(last_reading.lock); + auto it = last_reading.items.begin(); + for(; it != last_reading.items.end(); ++it) + { + if(it->st_dev == s.st_dev) + { + break; + } + } + if(it == last_reading.items.end()) + { + last_reading.items.emplace_back(); + it = --last_reading.items.end(); + it->st_dev = s.st_dev; + it->millis = fields[9]; + } + else + { + auto timediff = std::chrono::duration_cast<std::chrono::milliseconds>(now - it->last_updated); + it->f_iosbusytime = std::min((float) ((double) (fields[9] - it->millis) / timediff.count()), 1.0f); + it->millis = fields[9]; + } + it->f_iosinprogress = (uint32_t) fields[8]; + it->last_updated = now; + return {it->f_iosinprogress, it->f_iosbusytime}; + 
} + } + // It's totally possible that the dev_t reported by stat() + // does not appear in /proc/diskstats, if this occurs then + // return all bits one to indicate soft failure. + } + } + catch(...) + { + return error_from_exception(); + } +#else + /* On FreeBSD, want::iosinprogress and want::iosbusytime could be implemented + using libdevstat. See https://www.freebsd.org/cgi/man.cgi?query=devstat&sektion=3. + Code donations welcome! + + On Mac OS, getting the current i/o wait time appears to be privileged only? + */ +#endif + return {-1, detail::constexpr_float_allbits_set_nan()}; + } + catch(...) + { + return error_from_exception(); + } +} + LLFIO_V2_NAMESPACE_END diff --git a/include/llfio/v2.0/detail/impl/windows/directory_handle.ipp b/include/llfio/v2.0/detail/impl/windows/directory_handle.ipp index 60e34e5e..4da58a91 100644 --- a/include/llfio/v2.0/detail/impl/windows/directory_handle.ipp +++ b/include/llfio/v2.0/detail/impl/windows/directory_handle.ipp @@ -28,6 +28,8 @@ http://www.boost.org/LICENSE_1_0.txt) #include "import.hpp" +#include "../../../file_handle.hpp" + LLFIO_V2_NAMESPACE_BEGIN result<directory_handle> directory_handle::directory(const path_handle &base, path_view_type path, mode _mode, creation _creation, caching _caching, flag flags) noexcept diff --git a/include/llfio/v2.0/detail/impl/windows/file_handle.ipp b/include/llfio/v2.0/detail/impl/windows/file_handle.ipp index 610f20b1..5b36810f 100644 --- a/include/llfio/v2.0/detail/impl/windows/file_handle.ipp +++ b/include/llfio/v2.0/detail/impl/windows/file_handle.ipp @@ -25,6 +25,11 @@ Distributed under the Boost Software License, Version 1.0. 
#include "../../../file_handle.hpp" #include "import.hpp" +#include "../../../statfs.hpp" + +#include <mutex> +#include <vector> + LLFIO_V2_NAMESPACE_BEGIN result<file_handle> file_handle::file(const path_handle &base, file_handle::path_view_type path, file_handle::mode _mode, file_handle::creation _creation, @@ -821,7 +826,7 @@ result<file_handle::extent_pair> file_handle::clone_extents_to(file_handle::exte } done = true; } - //assert(done); + // assert(done); dest_length = destoffset + extent.length; truncate_back_on_failure = false; LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d); @@ -870,4 +875,108 @@ result<file_handle::extent_type> file_handle::zero(file_handle::extent_pair exte return success(); } + +/******************************************* statfs_t ************************************************/ + +LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<std::pair<uint32_t, float>> statfs_t::_fill_ios(const handle & /*unused*/, const std::string &mntfromname) noexcept +{ + try + { + alignas(8) wchar_t buffer[32769]; + // Firstly open a handle to the volume + OUTCOME_TRY(auto &&volumeh, file_handle::file({}, mntfromname, handle::mode::none, handle::creation::open_existing, handle::caching::only_metadata)); + // Now ask the volume what physical disks it spans + auto *vde = reinterpret_cast<VOLUME_DISK_EXTENTS *>(buffer); + OVERLAPPED ol{}; + memset(&ol, 0, sizeof(ol)); + ol.Internal = static_cast<ULONG_PTR>(-1); + if(DeviceIoControl(volumeh.native_handle().h, IOCTL_VOLUME_GET_VOLUME_DISK_EXTENTS, nullptr, 0, vde, sizeof(buffer), nullptr, &ol) == 0) + { + if(ERROR_IO_PENDING == GetLastError()) + { + NTSTATUS ntstat = ntwait(volumeh.native_handle().h, ol, deadline()); + if(ntstat != 0) + { + return ntkernel_error(ntstat); + } + } + if(ERROR_SUCCESS != GetLastError()) + { + return win32_error(); + } + } + static struct last_reading_t + { + struct item + { + int64_t ReadTime{0}, WriteTime{0}, IdleTime{0}; + }; + std::mutex lock; + std::vector<item> items; + } last_reading; + + uint32_t 
iosinprogress = 0; + float iosbusytime = 0; + DWORD disk_extents = vde->NumberOfDiskExtents; + for(DWORD disk_extent = 0; disk_extent < disk_extents; disk_extent++) + { + alignas(8) wchar_t physicaldrivename[32] = L"\\\\.\\PhysicalDrive", *e = physicaldrivename + 17; + const auto DiskNumber = vde->Extents[disk_extent].DiskNumber; + if(DiskNumber >= 100) + { + *e++ = '0' + ((DiskNumber / 100) % 10); + } + if(DiskNumber >= 10) + { + *e++ = '0' + ((DiskNumber / 10) % 10); + } + *e++ = '0' + (DiskNumber % 10); + *e = 0; + OUTCOME_TRY(auto &&diskh, file_handle::file({}, path_view(physicaldrivename, e - physicaldrivename, path_view::zero_terminated), handle::mode::none, + handle::creation::open_existing, handle::caching::only_metadata)); + ol.Internal = static_cast<ULONG_PTR>(-1); + auto *dp = reinterpret_cast<DISK_PERFORMANCE *>(buffer); + if(DeviceIoControl(diskh.native_handle().h, IOCTL_DISK_PERFORMANCE, nullptr, 0, dp, sizeof(buffer), nullptr, &ol) == 0) + { + if(ERROR_IO_PENDING == GetLastError()) + { + NTSTATUS ntstat = ntwait(diskh.native_handle().h, ol, deadline()); + if(ntstat != 0) + { + return ntkernel_error(ntstat); + } + } + if(ERROR_SUCCESS != GetLastError()) + { + return win32_error(); + } + } + //printf("%llu,%llu,%llu\n", dp->ReadTime.QuadPart, dp->WriteTime.QuadPart, dp->IdleTime.QuadPart); + iosinprogress += dp->QueueDepth; + std::lock_guard<std::mutex> g(last_reading.lock); + if(last_reading.items.size() < DiskNumber + 1) + { + last_reading.items.resize(DiskNumber + 1); + } + else + { + uint64_t rd = (uint64_t) dp->ReadTime.QuadPart - (uint64_t) last_reading.items[DiskNumber].ReadTime; + uint64_t wd = (uint64_t) dp->WriteTime.QuadPart - (uint64_t) last_reading.items[DiskNumber].WriteTime; + uint64_t id = (uint64_t) dp->IdleTime.QuadPart - (uint64_t) last_reading.items[DiskNumber].IdleTime; + iosbusytime += 1 - (float) ((double) id / (rd + wd + id)); + } + last_reading.items[DiskNumber].ReadTime = dp->ReadTime.QuadPart; + 
last_reading.items[DiskNumber].WriteTime = dp->WriteTime.QuadPart; + last_reading.items[DiskNumber].IdleTime = dp->IdleTime.QuadPart; + } + iosinprogress /= disk_extents; + iosbusytime /= disk_extents; + return {iosinprogress, std::min(iosbusytime, 1.0f)}; + } + catch(...) + { + return error_from_exception(); + } +} + LLFIO_V2_NAMESPACE_END diff --git a/include/llfio/v2.0/detail/impl/windows/statfs.ipp b/include/llfio/v2.0/detail/impl/windows/statfs.ipp index 8b23b4e1..a712433b 100644 --- a/include/llfio/v2.0/detail/impl/windows/statfs.ipp +++ b/include/llfio/v2.0/detail/impl/windows/statfs.ipp @@ -141,6 +141,13 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<size_t> statfs_t::fill(const handle &h, s f_iosize = ffssi->PhysicalBytesPerSectorForPerformance; ++ret; } + if(!!(wanted & want::iosinprogress) || !!(wanted & want::iosbusytime)) + { + if(f_mntfromname.empty()) + { + wanted |= want::mntfromname; + } + } if((wanted & want::mntfromname) || (wanted & want::mntonname)) { // Irrespective we need the path before figuring out the mounted device @@ -240,6 +247,20 @@ LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<size_t> statfs_t::fill(const handle &h, s break; } } + if(!!(wanted & want::iosinprogress) || !!(wanted & want::iosbusytime)) + { + OUTCOME_TRY(auto &&ios, _fill_ios(h, f_mntfromname)); + if(!!(wanted & want::iosinprogress)) + { + f_iosinprogress = ios.first; + ++ret; + } + if(!!(wanted & want::iosbusytime)) + { + f_iosbusytime = ios.second; + ++ret; + } + } return ret; } diff --git a/include/llfio/v2.0/dynamic_thread_pool_group.hpp b/include/llfio/v2.0/dynamic_thread_pool_group.hpp new file mode 100644 index 00000000..09828a6e --- /dev/null +++ b/include/llfio/v2.0/dynamic_thread_pool_group.hpp @@ -0,0 +1,530 @@ +/* Dynamic thread pool group +(C) 2020-2021 Niall Douglas <http://www.nedproductions.biz/> (9 commits) +File Created: Dec 2020 + + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License in the accompanying file +Licence.txt or at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + +Distributed under the Boost Software License, Version 1.0. + (See accompanying file Licence.txt or copy at + http://www.boost.org/LICENSE_1_0.txt) +*/ + +#ifndef LLFIO_DYNAMIC_THREAD_POOL_GROUP_H +#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_H + +#include "deadline.h" + +#include <memory> // for unique_ptr and shared_ptr + +#ifdef _MSC_VER +#pragma warning(push) +#pragma warning(disable : 4251) // dll interface +#pragma warning(disable : 4275) // dll interface +#endif + +LLFIO_V2_NAMESPACE_EXPORT_BEGIN + +class dynamic_thread_pool_group_impl; +class io_handle; + +namespace detail +{ + struct global_dynamic_thread_pool_impl; + struct global_dynamic_thread_pool_impl_workqueue_item; + LLFIO_HEADERS_ONLY_FUNC_SPEC global_dynamic_thread_pool_impl &global_dynamic_thread_pool() noexcept; +} // namespace detail + +/*! \class dynamic_thread_pool_group +\brief Work group within the global dynamic thread pool. + +Some operating systems provide a per-process global kernel thread pool capable of +dynamically adjusting its kernel thread count to how many of the threads in +the pool are currently blocked. The platform will choose the exact strategy used, +but as an example of a strategy, one might keep creating new kernel threads +so long as the total threads currently running and not blocked on page faults, +i/o or syscalls, is below the hardware concurrency. Similarly, if more threads +are running and not blocked than hardware concurrency, one might remove kernel +threads from executing work. 
Such a strategy would dynamically increase +concurrency until all CPUs are busy, but reduce concurrency if more work is +being done than CPUs available. + +Such dynamic kernel thread pools are excellent for CPU bound processing, you +simply fire and forget work into them. However, for i/o bound processing, you +must be careful as there are gotchas. For non-seekable i/o, it is very possible +that there could be 100k handles upon which we do i/o. Doing i/o on +100k handles using a dynamic thread pool would in theory cause the creation +of 100k kernel threads, which would not be wise. A much better solution is +to use an `io_multiplexer` to await changes in large sets of i/o handles. + +For seekable i/o, the same problem applies, but worse again: an i/o bound problem +would cause a rapid increase in the number of kernel threads, which by +definition makes i/o even more congested. Basically the system runs off +into pathological performance loss. You must therefore never naively do +i/o bound work (e.g. with memory mapped files) from within a dynamic thread +pool without employing some mechanism to force concurrency downwards if +the backing storage is congested. + +## Work groups + +Instances of this class contain zero or more work items. Each work item +is asked for its next item of work, and if an item of work is available, +that item of work is executed by the global kernel thread pool at a time +of its choosing. It is NEVER possible that any one work item is concurrently +executed at a time, each work item is always sequentially executed with +respect to itself. The only concurrency possible is *across* work items. +Therefore, if you want to execute the same piece of code concurrently, +you need to submit a separate work item for each possible amount of +concurrency (e.g. `std::thread::hardware_concurrency()`). + +You can have as many or as few items of work as you like. 
You can +dynamically submit additional work items at any time, except when a group +is currently in the process of being stopped. The group of work items can +be waited upon to complete, after which the work group becomes reset as +if back to freshly constructed. You can also stop executing all the work +items in the group, even if they have not fully completed. If any work +item returns a failure, this equals a `stop()`, and the next `wait()` will +return that error. + +Work items may create sub work groups as part of their +operation. If they do so, the work items from such nested work groups are +scheduled preferentially. This ensures good forward progress, so if you +have 100 work items each of which do another 100 work items, you don't get +10,000 slowly progressing work. Rather, the work items in the first set +progress slowly, whereas the work items in the second set progress quickly. + +`work_item::next()` may optionally set a deadline to delay when that work +item ought to be processed again. Deadlines can be relative or absolute. + +## C++ 23 Executors + +As with elsewhere in LLFIO, as a low level facility, we don't implement +https://wg21.link/P0443 Executors, but it is trivially easy to implement +a dynamic equivalent to `std::static_thread_pool` using this class. + +## Implementation notes + +### Microsoft Windows + +On Microsoft Windows, the Win32 thread pool API is used (https://docs.microsoft.com/en-us/windows/win32/procthread/thread-pool-api). +This is an IOCP-aware thread pool which will dynamically increase the number +of kernel threads until none are blocked. If more kernel threads +are running than twice the number of CPUs in the system, the number of kernel +threads is dynamically reduced. The maximum number of kernel threads which +will run simultaneously is 500. Note that the Win32 thread pool is shared +across the process by multiple Windows facilities. 
+ +Note that the Win32 thread pool has built in support for IOCP, so if you +have a custom i/o multiplexer, you can use the global Win32 thread pool +to execute i/o completions handling. See `CreateThreadpoolIo()` for more. + +No dynamic memory allocation is performed by this implementation outside +of the initial `make_dynamic_thread_pool_group()`. The Win32 thread pool +API may perform dynamic memory allocation internally, but that is outside +our control. + +Overhead of LLFIO above the Win32 thread pool API is very low, statistically +unmeasurable. + +### POSIX + +If not on Linux, you will need libdispatch which is detected by LLFIO cmake +during configuration. libdispatch is better known as +Grand Central Dispatch, originally a Mac OS technology but since ported +to a high quality kernel based implementation on recent FreeBSDs, and to +a lower quality userspace based implementation on Linux. Generally +libdispatch should get automatically found on Mac OS without additional +effort; on FreeBSD it may need installing from ports; on Linux you would +need to explicitly install `libdispatch-dev` or the equivalent. You can +force the use in cmake of libdispatch by setting the cmake variable +`LLFIO_USE_LIBDISPATCH` to On. + +Overhead of LLFIO above the libdispatch API is very low, statistically +unmeasurable. + +### Linux + +On Linux only, we have a custom userspace implementation with superior performance. +A similar strategy to Microsoft Windows' approach is used. We +dynamically increase the number of kernel threads until none are sleeping +awaiting i/o. If more kernel threads are running than three more than the number of +CPUs in the system, the number of kernel threads is dynamically reduced. +Note that **all** the kernel threads for the current process are considered, +not just the kernel threads created by this thread pool implementation. +Therefore, if you have alternative thread pool implementations (e.g. 
OpenMP, +`std::async`), those are also included in the dynamic adjustment. + +As this is wholly implemented by this library, dynamic memory allocation +occurs in the initial `make_dynamic_thread_pool_group()` and per thread +creation, but otherwise the implementation does not perform dynamic memory +allocations. + +After multiple rewrites, eventually I got this custom userspace implementation +to have superior performance to both ASIO and libdispatch. For larger work +items the difference is meaningless between all three, however for smaller +work items I benchmarked this custom userspace implementation as beating +(non-dynamic) ASIO by approx 29% and Linux libdispatch by approx 52% (note +that Linux libdispatch appears to have a scale up bug when work items are +small and few, it is often less than half the performance of LLFIO's custom +implementation). +*/ +class LLFIO_DECL dynamic_thread_pool_group +{ + friend class dynamic_thread_pool_group_impl; +public: + //! An individual item of work within the work group. 
+ class work_item + { + friend struct detail::global_dynamic_thread_pool_impl; + friend struct detail::global_dynamic_thread_pool_impl_workqueue_item; + friend class dynamic_thread_pool_group_impl; + std::atomic<dynamic_thread_pool_group_impl *> _parent{nullptr}; + void *_internalworkh{nullptr}; + void *_internaltimerh{nullptr}; // lazily created if next() ever returns a deadline + work_item *_prev{nullptr}, *_next{nullptr}, *_next_scheduled{nullptr}; + std::atomic<intptr_t> _nextwork{-1}; + std::chrono::steady_clock::time_point _timepoint1; + std::chrono::system_clock::time_point _timepoint2; + int _internalworkh_inuse{0}; + + protected: + constexpr bool _has_timer_set_relative() const noexcept { return _timepoint1 != std::chrono::steady_clock::time_point(); } + constexpr bool _has_timer_set_absolute() const noexcept { return _timepoint2 != std::chrono::system_clock::time_point(); } + constexpr bool _has_timer_set() const noexcept { return _has_timer_set_relative() || _has_timer_set_absolute(); } + + constexpr work_item() {} + work_item(const work_item &o) = delete; + work_item(work_item &&o) noexcept + : _parent(o._parent.load(std::memory_order_relaxed)) + , _internalworkh(o._internalworkh) + , _internaltimerh(o._internaltimerh) + , _prev(o._prev) + , _next(o._next) + , _next_scheduled(o._next_scheduled) + , _nextwork(o._nextwork.load(std::memory_order_relaxed)) + , _timepoint1(o._timepoint1) + , _timepoint2(o._timepoint2) + , _internalworkh_inuse(o._internalworkh_inuse) + { + assert(o._parent.load(std::memory_order_relaxed) == nullptr); + assert(o._internalworkh == nullptr); + assert(o._internaltimerh == nullptr); + if(o._parent.load(std::memory_order_relaxed) != nullptr || o._internalworkh != nullptr) + { + LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item was relocated in memory during use!"); + abort(); + } + o._prev = o._next = o._next_scheduled = nullptr; + o._nextwork.store(-1, std::memory_order_relaxed); + o._internalworkh_inuse = 0; + } 
+ work_item &operator=(const work_item &) = delete; + work_item &operator=(work_item &&) = delete; + + public: + virtual ~work_item() + { + assert(_nextwork.load(std::memory_order_relaxed) == -1); + if(_nextwork.load(std::memory_order_relaxed) != -1) + { + LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item destroyed before all work was done!"); + abort(); + } + assert(_internalworkh == nullptr); + assert(_internaltimerh == nullptr); + assert(_parent == nullptr); + if(_internalworkh != nullptr || _parent != nullptr) + { + LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item destroyed before group_complete() was executed!"); + abort(); + } + } + + //! Returns the parent work group between successful submission and just before `group_complete()`. + dynamic_thread_pool_group *parent() const noexcept { return reinterpret_cast<dynamic_thread_pool_group *>(_parent.load(std::memory_order_relaxed)); } + + /*! Invoked by the i/o thread pool to determine if this work item + has more work to do. + + \return If there is no work _currently_ available to do, but there + might be some later, you should return zero. You will be called again + later after other work has been done. If you return -1, you are + saying that no further work will be done, and the group need never + call you again. If you have more work you want to do, return any + other value. + \param d Optional delay before the next item of work ought to be + executed (return != 0), or `next()` ought to be called again to + determine the next item (return == 0). On entry `d` is set to no + delay, so if you don't modify it, the next item of work occurs + as soon as possible. + + Note that this function is called from multiple kernel threads. + You must NOT do any significant work in this function. + In particular do NOT call any dynamic thread pool group function, + as you will experience deadlock. + + `dynamic_thread_pool_group::current_work_item()` may have any + value during this call. 
+ */ + virtual intptr_t next(deadline &d) noexcept = 0; + + /*! Invoked by the i/o thread pool to perform the next item of work. + + \return Any failure causes all remaining work in this group to + be cancelled as soon as possible. + \param work The value returned by `next()`. + + Note that this function is called from multiple kernel threads, + and may not be the kernel thread from which `next()` + was called. + + `dynamic_thread_pool_group::current_work_item()` will always be + `this` during this call. + */ + virtual result<void> operator()(intptr_t work) noexcept = 0; + + /*! Invoked by the i/o thread pool when all work in this thread + pool group is complete. + + `cancelled` indicates if this is an abnormal completion. If its + error compares equal to `errc::operation_cancelled`, then `stop()` + was called. + + Just before this is called for all work items submitted, the group + becomes reset to fresh, and `parent()` becomes null. You can resubmit + this work item, but do not submit other work items until their + `group_complete()` has been invoked. + + Note that this function is called from multiple kernel threads. + + `dynamic_thread_pool_group::current_work_item()` may have any + value during this call. + */ + virtual void group_complete(const result<void> &cancelled) noexcept { (void) cancelled; } + }; + + /*! \class io_aware_work_item + \brief A work item which paces when it next executes according to i/o congestion. + + Currently there is only a working implementation of this for the Microsoft Windows + and Linux platforms, due to lack of working `statfs_t::f_iosinprogress` on other + platforms. If retrieving that for a seekable handle does not work, the constructor + throws an exception. + + For seekable handles, currently `reads`, `writes` and `barriers` are ignored. We + simply retrieve, periodically, `statfs_t::f_iosinprogress` and `statfs_t::f_iosbusytime` + for the storage devices backing the seekable handle. 
If the recent averaged i/o wait time exceeds + `max_iosbusytime` and the i/o in progress > `max_iosinprogress`, `next()` will + start setting the default deadline passed to + `io_aware_next()`. Thereafter, every 1/10th of a second, if `statfs_t::f_iosinprogress` + is above `max_iosinprogress`, it will increase the deadline by 1/16th, whereas if it is + below `min_iosinprogress`, it will decrease the deadline by 1/16th. The default deadline + chosen is always the worst of all the + storage devices of all the handles. This will reduce concurrency within the kernel thread pool + in order to reduce congestion on the storage devices. If at any point `statfs_t::f_iosbusytime` + drops below `max_iosbusytime` as averaged across one second, and `statfs_t::f_iosinprogress` drops + below `min_iosinprogress`, the additional + throttling is completely removed. `io_aware_next()` can ignore the default deadline + passed into it, and can set any other deadline. + + For non-seekable handles, the handle must have an i/o multiplexer set upon it, and on + Microsoft Windows, that i/o multiplexer must be utilising the IOCP instance of the + global Win32 thread pool. For each `reads`, `writes` and `barriers` which is non-zero, + a corresponding zero length i/o is constructed and initiated. When the i/o completes, + and all readable handles in the work item's set have data waiting to be read, and all + writable handles in the work item's set have space to allow writes, only then is the + work item invoked with the next piece of work. + + \note Non-seekable handle support is not implemented yet. + */ + class LLFIO_DECL io_aware_work_item : public work_item + { + public: + //! Maximum i/o busyness above which throttling is to begin. + float max_iosbusytime{0.95f}; + //! Minimum i/o in progress to target if `iosbusytime` exceeded. The default of 16 suits SSDs, you want around 4 for spinning rust or NV-RAM. + uint32_t min_iosinprogress{16}; + //! 
Maximum i/o in progress to target if `iosbusytime` exceeded. The default of 32 suits SSDs, you want around 8 for spinning rust or NV-RAM. +#ifdef _WIN32 + uint32_t max_iosinprogress{1}; // windows appears to do a lot of i/o coalescing +#else + uint32_t max_iosinprogress{32}; +#endif + //! Information about an i/o handle this work item will use + struct io_handle_awareness + { + //! An i/o handle this work item will use + io_handle *h{nullptr}; + //! The relative amount of reading done by this work item from the handle. + float reads{0}; + //! The relative amount of writing done by this work item to the handle. + float writes{0}; + //! The relative amount of write barriering done by this work item to the handle. + float barriers{0}; + + void *_internal{nullptr}; + }; + + private: + const span<io_handle_awareness> _handles; + + LLFIO_HEADERS_ONLY_VIRTUAL_SPEC intptr_t next(deadline &d) noexcept override final; + + public: + constexpr io_aware_work_item() {} + /*! \brief Constructs a work item aware of i/o done to the handles in `hs`. + + Note that the `reads`, `writes` and `barriers` are normalised to proportions + out of `1.0` by this constructor, so if for example you had `reads/writes/barriers = 200/100/0`, + after normalisation those become `0.66/0.33/0.0` such that the total is `1.0`. + If `reads/writes/barriers = 0/0/0` on entry, they are replaced with `0.5/0.5/0.0`. + + Note that normalisation is across *all* i/o handles in the set, so three handles + each with `reads/writes/barriers = 200/100/0` on entry would have `0.22/0.11/0.0` + each after construction. + */ + explicit LLFIO_HEADERS_ONLY_MEMFUNC_SPEC io_aware_work_item(span<io_handle_awareness> hs); + io_aware_work_item(io_aware_work_item &&o) noexcept + : work_item(std::move(o)) + , _handles(o._handles) + { + } + LLFIO_HEADERS_ONLY_MEMFUNC_SPEC ~io_aware_work_item(); + + //! The handles originally registered during construction. 
+ span<io_handle_awareness> handles() const noexcept { return _handles; } + + /*! \brief As for `work_item::next()`, but deadline may be extended to + reduce i/o congestion on the hardware devices to which the handles + refer. + */ + virtual intptr_t io_aware_next(deadline &d) noexcept = 0; + }; + + virtual ~dynamic_thread_pool_group() {} + + /*! \brief A textual description of the underlying implementation of + this dynamic thread pool group. + + The current possible underlying implementations are: + + - "Grand Central Dispatch" (Mac OS, FreeBSD, Linux) + - "Linux native" (Linux) + - "Win32 thread pool (Vista+)" (Windows) + + Which one is chosen depends on what was detected at cmake configure time, + and possibly what the host OS running the program binary supports. + */ + static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC const char *implementation_description() noexcept; + + /*! \brief Threadsafe. Submit one or more work items for execution. Note that you can submit more later. + + Note that if the group is currently stopping, you cannot submit more + work until the group has stopped. An error code comparing equal to + `errc::operation_canceled` is returned if you try. + */ + virtual result<void> submit(span<work_item *> work) noexcept = 0; + //! \overload + result<void> submit(work_item *wi) noexcept { return submit(span<work_item *>(&wi, 1)); } + //! \overload + LLFIO_TEMPLATE(class T) + LLFIO_TREQUIRES(LLFIO_TPRED(!std::is_pointer<T>::value), LLFIO_TPRED(std::is_base_of<work_item, T>::value)) + result<void> submit(span<T> wi) noexcept + { + auto *wis = (T **) alloca(sizeof(T *) * wi.size()); + for(size_t n = 0; n < wi.size(); n++) + { + wis[n] = &wi[n]; + } + return submit(span<work_item *>((work_item **) wis, wi.size())); + } + + //! Threadsafe. Cancel any remaining work previously submitted, but without blocking (use `wait()` to block). + virtual result<void> stop() noexcept = 0; + + /*! \brief Threadsafe. 
True if a work item reported an error, or + `stop()` was called, but work items are still running. + */ + virtual bool stopping() const noexcept = 0; + + //! Threadsafe. True if all the work previously submitted is complete. + virtual bool stopped() const noexcept = 0; + + //! Threadsafe. Wait for work previously submitted to complete, returning any failures by any work item. + virtual result<void> wait(deadline d = {}) const noexcept = 0; + //! \overload + template <class Rep, class Period> result<bool> wait_for(const std::chrono::duration<Rep, Period> &duration) const noexcept + { + auto r = wait(duration); + if(!r && r.error() == errc::timed_out) + { + return false; + } + OUTCOME_TRY(std::move(r)); + return true; + } + //! \overload + template <class Clock, class Duration> result<bool> wait_until(const std::chrono::time_point<Clock, Duration> &timeout) const noexcept + { + auto r = wait(timeout); + if(!r && r.error() == errc::timed_out) + { + return false; + } + OUTCOME_TRY(std::move(r)); + return true; + } + + //! Returns the work item nesting level which would be used if a new dynamic thread pool group were created within the current work item. + static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC size_t current_nesting_level() noexcept; + //! Returns the work item the calling thread is running within, if any. + static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC work_item *current_work_item() noexcept; + /*! \brief Returns the number of milliseconds that a thread is without work before it is shut down. + Note that this will be zero on all but on Linux if using our local thread pool + implementation, because the system controls this value on Windows, Grand Central + Dispatch etc. + */ + static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC uint32_t ms_sleep_for_more_work() noexcept; + /*! \brief Sets the number of milliseconds that a thread is without work before it is shut down, + returning the value actually set. 
+ + Note that this will have no effect (and thus return zero) on all but on Linux if + using our local thread pool implementation, because the system controls this value + on Windows, Grand Central Dispatch etc. + */ + static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC uint32_t ms_sleep_for_more_work(uint32_t v) noexcept; +}; +//! A unique ptr to a work group within the global dynamic thread pool. +using dynamic_thread_pool_group_ptr = std::unique_ptr<dynamic_thread_pool_group>; + +//! Creates a new work group within the global dynamic thread pool. +LLFIO_HEADERS_ONLY_FUNC_SPEC result<dynamic_thread_pool_group_ptr> make_dynamic_thread_pool_group() noexcept; + +// BEGIN make_free_functions.py +// END make_free_functions.py + +LLFIO_V2_NAMESPACE_END + +#ifdef _MSC_VER +#pragma warning(pop) +#endif + +#if LLFIO_HEADERS_ONLY == 1 && !defined(DOXYGEN_SHOULD_SKIP_THIS) +#define LLFIO_INCLUDED_BY_HEADER 1 +#include "detail/impl/dynamic_thread_pool_group.ipp" +#undef LLFIO_INCLUDED_BY_HEADER +#endif + +#endif diff --git a/include/llfio/v2.0/fs_handle.hpp b/include/llfio/v2.0/fs_handle.hpp index 8700a87b..1be9bdaf 100644 --- a/include/llfio/v2.0/fs_handle.hpp +++ b/include/llfio/v2.0/fs_handle.hpp @@ -144,6 +144,8 @@ public: using path_view_type = path_view; //! The unique identifier type used by this handle using unique_id_type = QUICKCPPLIB_NAMESPACE::integers128::uint128; + //! 
A hasher for the unique identifier type used by this handle + using unique_id_type_hasher = QUICKCPPLIB_NAMESPACE::integers128::uint128_hasher; protected: mutable dev_t _devid{0}; diff --git a/include/llfio/v2.0/llfio.hpp b/include/llfio/v2.0/llfio.hpp index 88bfad2b..a7c23b27 100644 --- a/include/llfio/v2.0/llfio.hpp +++ b/include/llfio/v2.0/llfio.hpp @@ -63,13 +63,16 @@ import LLFIO_MODULE_NAME; #include "utils.hpp" #include "directory_handle.hpp" +#ifndef LLFIO_EXCLUDE_DYNAMIC_THREAD_POOL_GROUP +#include "dynamic_thread_pool_group.hpp" +#endif +#include "fast_random_file_handle.hpp" #include "file_handle.hpp" #include "process_handle.hpp" #include "statfs.hpp" #ifdef LLFIO_INCLUDE_STORAGE_PROFILE #include "storage_profile.hpp" #endif -#include "fast_random_file_handle.hpp" #include "symlink_handle.hpp" #include "algorithm/clone.hpp" diff --git a/include/llfio/v2.0/logging.hpp b/include/llfio/v2.0/logging.hpp index 1dff759a..7ef30363 100644 --- a/include/llfio/v2.0/logging.hpp +++ b/include/llfio/v2.0/logging.hpp @@ -77,7 +77,7 @@ public: { reinterpret_cast<log_level &>(detail::thread_local_log_level()) = n; } - ~log_level_guard() {reinterpret_cast<log_level &>(detail::thread_local_log_level()) = _v; } + ~log_level_guard() { reinterpret_cast<log_level &>(detail::thread_local_log_level()) = _v; } }; // Infrastructure for recording the current path for when failure occurs @@ -262,8 +262,9 @@ namespace detail return span<char>(buffer, length); } // Strips a __PRETTY_FUNCTION__ of all instances of ::LLFIO_V2_NAMESPACE:: and ::LLFIO_V2_NAMESPACE:: - inline void strip_pretty_function(char *out, size_t bytes, const char *in) + inline void strip_pretty_function(char *_out, size_t bytes, const char *in) { + char *out = _out; const span<char> remove1 = llfio_namespace_string(); const span<char> remove2 = outcome_namespace_string(); for(--bytes; bytes && *in; --bytes) @@ -272,6 +273,32 @@ namespace detail in += remove1.size(); if(!strncmp(in, remove2.data(), 
remove2.size())) in += remove2.size(); + if(!strncmp(in, "basic_result<", 13)) + { + int count = 13; + for(--bytes; bytes && *in && count; --bytes, --count) + { + *out++ = *in++; + } + if(!*in || bytes ==0) + { + break; + } + count = 1; + while(*in && count > 0) + { + if(*in == '<') + { + count++; + } + else if(*in == '>') + { + count--; + } + in++; + } + in--; + } *out++ = *in++; } *out = 0; diff --git a/include/llfio/v2.0/statfs.hpp b/include/llfio/v2.0/statfs.hpp index 71732da5..c8af2fde 100644 --- a/include/llfio/v2.0/statfs.hpp +++ b/include/llfio/v2.0/statfs.hpp @@ -1,5 +1,5 @@ /* Information about the volume storing a file -(C) 2015-2017 Niall Douglas <http://www.nedproductions.biz/> (8 commits) +(C) 2015-2020 Niall Douglas <http://www.nedproductions.biz/> (8 commits) File Created: Dec 2015 @@ -41,13 +41,40 @@ LLFIO_V2_NAMESPACE_EXPORT_BEGIN class handle; +namespace detail +{ + inline constexpr float constexpr_float_allbits_set_nan() + { +#if defined(_MSC_VER) && !defined(__clang__) + // Not all bits 1, but I can't see how to do better whilst inside constexpr + return -NAN; +#else + return -__builtin_nanf("0xffffff"); // all bits 1 +#endif + } +} // namespace detail + /*! \struct statfs_t \brief Metadata about a filing system. Unsupported entries are all bits set. + +Note also that for some fields, a soft failure to read the requested value manifests +as all bits set. For example, `f_iosinprogress` might not be computable if the +filing system for your handle reports a `dev_t` from `fstat()` which does not +match anything in the system's disk hardware i/o stats. As this can be completely +benign (e.g. your handle is a socket), this is treated as a soft failure. + +Note for `f_iosinprogress` and `f_iosbusytime` that support is not implemented yet +outside Microsoft Windows and Linux. 
Note also that for Linux, filing systems +spanning multiple hardware devices have undefined outcomes, whereas on Windows +you are given the average of the values for all underlying hardware devices. +Code donations improving the support for these items on Mac OS, FreeBSD and Linux +would be welcomed. */ struct LLFIO_DECL statfs_t { static constexpr uint32_t _allbits1_32 = ~0U; static constexpr uint64_t _allbits1_64 = ~0ULL; + static constexpr float _allbits1_float = detail::constexpr_float_allbits_set_nan(); struct f_flags_t { uint32_t rdonly : 1; //!< Filing system is read only (Windows, POSIX) @@ -75,11 +102,31 @@ struct LLFIO_DECL statfs_t std::string f_mntfromname; /*!< mounted filesystem (Windows, POSIX) */ filesystem::path f_mntonname; /*!< directory on which mounted (Windows, POSIX) */ + uint32_t f_iosinprogress{_allbits1_32}; /*!< i/o's currently in progress (i.e. queue depth) (Windows, Linux) */ + float f_iosbusytime{_allbits1_float}; /*!< percentage of time spent doing i/o (1.0 = 100%) (Windows, Linux) */ + //! Used to indicate what metadata should be filled in - QUICKCPPLIB_BITFIELD_BEGIN(want) { flags = 1 << 0, bsize = 1 << 1, iosize = 1 << 2, blocks = 1 << 3, bfree = 1 << 4, bavail = 1 << 5, files = 1 << 6, ffree = 1 << 7, namemax = 1 << 8, owner = 1 << 9, fsid = 1 << 10, fstypename = 1 << 11, mntfromname = 1 << 12, mntonname = 1 << 13, all = static_cast<unsigned>(-1) } - QUICKCPPLIB_BITFIELD_END(want) + QUICKCPPLIB_BITFIELD_BEGIN(want){flags = 1 << 0, + bsize = 1 << 1, + iosize = 1 << 2, + blocks = 1 << 3, + bfree = 1 << 4, + bavail = 1 << 5, + files = 1 << 6, + ffree = 1 << 7, + namemax = 1 << 8, + owner = 1 << 9, + fsid = 1 << 10, + fstypename = 1 << 11, + mntfromname = 1 << 12, + mntonname = 1 << 13, + iosinprogress = 1 << 14, + iosbusytime = 1 << 15, + all = static_cast<unsigned>(-1)} QUICKCPPLIB_BITFIELD_END(want) //! 
Constructs a default initialised instance (all bits set) - statfs_t() {} // NOLINT Cannot be constexpr due to lack of constexpe string default constructor :( + statfs_t() + { + } // NOLINT Cannot be constexpr due to lack of constexpe string default constructor :( #ifdef __cpp_exceptions //! Constructs a filled instance, throwing as an exception any error which might occur explicit statfs_t(const handle &h, want wanted = want::all) @@ -94,6 +141,10 @@ struct LLFIO_DECL statfs_t #endif //! Fills in the structure with metadata, returning number of items filled in LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<size_t> fill(const handle &h, want wanted = want::all) noexcept; + +private: + // Implemented in file_handle.ipp on Windows, otherwise in statfs.ipp + static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC result<std::pair<uint32_t, float>> _fill_ios(const handle &h, const std::string &mntfromname) noexcept; }; LLFIO_V2_NAMESPACE_END diff --git a/include/llfio/v2.0/status_code.hpp b/include/llfio/v2.0/status_code.hpp index 80fd905d..03eb0f67 100644 --- a/include/llfio/v2.0/status_code.hpp +++ b/include/llfio/v2.0/status_code.hpp @@ -25,8 +25,6 @@ Distributed under the Boost Software License, Version 1.0. #ifndef LLFIO_STATUS_CODE_HPP #define LLFIO_STATUS_CODE_HPP -#include "logging.hpp" - /* The SG14 status code implementation is quite profoundly different to the error code implementation. In the error code implementation, std::error_code is fixed by the standard library, so we wrap it with extra metadata into @@ -68,6 +66,8 @@ as that (a) enables safe header only LLFIO on Windows (b) produces better codege #error LLFIO needs Outcome v2.2 or higher #endif +#include "logging.hpp" + LLFIO_V2_NAMESPACE_BEGIN #ifndef LLFIO_DISABLE_PATHS_IN_FAILURE_INFO @@ -351,6 +351,8 @@ LLFIO_V2_NAMESPACE_END #error LLFIO needs Outcome v2.2 or higher #endif +#include "logging.hpp" + LLFIO_V2_NAMESPACE_BEGIN namespace detail |