diff options
Diffstat (limited to 'include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp')
-rw-r--r-- | include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp | 2491 |
1 files changed, 2491 insertions, 0 deletions
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp new file mode 100644 index 00000000..287683bf --- /dev/null +++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp @@ -0,0 +1,2491 @@ +/* Dynamic thread pool group +(C) 2020-2021 Niall Douglas <http://www.nedproductions.biz/> (9 commits) +File Created: Dec 2020 + + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License in the accompanying file +Licence.txt or at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + +Distributed under the Boost Software License, Version 1.0. 
+ (See accompanying file Licence.txt or copy at + http://www.boost.org/LICENSE_1_0.txt) +*/ + +#include "../../dynamic_thread_pool_group.hpp" + +#include "../../file_handle.hpp" +#include "../../statfs.hpp" + +#include "quickcpplib/spinlock.hpp" + +#include <atomic> +#include <memory> +#include <mutex> +#include <unordered_map> +#include <unordered_set> +#include <vector> + +#include <iostream> + +#ifndef LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD +#if LLFIO_FORCE_USE_LIBDISPATCH +#include <dispatch/dispatch.h> +#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD 1 +#else +#ifdef _WIN32 +#include "windows/import.hpp" +#include <threadpoolapiset.h> +#else +#if __has_include(<dispatch/dispatch.h>) +#include <dispatch/dispatch.h> +#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD 1 +#endif +#endif +#endif +#endif +#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32) +#if !defined(__linux__) +#error dynamic_thread_pool_group requires Grand Central Dispatch (libdispatch) on non-Linux POSIX. +#endif +#include <dirent.h> /* Defines DT_* constants */ +#include <sys/syscall.h> + +#include <condition_variable> +#include <thread> +#endif + +#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING 0 + +/* NOTE that the Linux results are from a VM on the same machine as the Windows results, +so they are not directly comparable. + +Linux 4Kb and 64Kb + +Benchmarking asio ... + For 1 work items got 38182.6 SHA256 hashes/sec with 1 maximum concurrency. + For 2 work items got 68664 SHA256 hashes/sec with 2 maximum concurrency. + For 4 work items got 87036.4 SHA256 hashes/sec with 4 maximum concurrency. + For 8 work items got 78702.2 SHA256 hashes/sec with 8 maximum concurrency. + For 16 work items got 51911.2 SHA256 hashes/sec with 16 maximum concurrency. + For 32 work items got 553964 SHA256 hashes/sec with 31 maximum concurrency. + For 64 work items got 713844 SHA256 hashes/sec with 36 maximum concurrency. 
+ For 128 work items got 700172 SHA256 hashes/sec with 37 maximum concurrency. + For 256 work items got 716099 SHA256 hashes/sec with 37 maximum concurrency. + For 512 work items got 703323 SHA256 hashes/sec with 37 maximum concurrency. + For 1024 work items got 722827 SHA256 hashes/sec with 38 maximum concurrency. + +Benchmarking asio ... + For 1 work items got 3917.88 SHA256 hashes/sec with 1 maximum concurrency. + For 2 work items got 7798.29 SHA256 hashes/sec with 2 maximum concurrency. + For 4 work items got 14395.2 SHA256 hashes/sec with 4 maximum concurrency. + For 8 work items got 23633.4 SHA256 hashes/sec with 8 maximum concurrency. + For 16 work items got 31771.1 SHA256 hashes/sec with 16 maximum concurrency. + For 32 work items got 57978 SHA256 hashes/sec with 32 maximum concurrency. + For 64 work items got 66200.6 SHA256 hashes/sec with 64 maximum concurrency. + For 128 work items got 65706.5 SHA256 hashes/sec with 64 maximum concurrency. + For 256 work items got 65717.5 SHA256 hashes/sec with 64 maximum concurrency. + For 512 work items got 65652.4 SHA256 hashes/sec with 64 maximum concurrency. + For 1024 work items got 65580.3 SHA256 hashes/sec with 64 maximum concurrency. + + +Windows 4Kb and 64kB + +Benchmarking asio ... + For 1 work items got 51216.7 SHA256 hashes/sec with 1 maximum concurrency. + For 2 work items got 97691 SHA256 hashes/sec with 2 maximum concurrency. + For 4 work items got 184381 SHA256 hashes/sec with 4 maximum concurrency. + For 8 work items got 305270 SHA256 hashes/sec with 8 maximum concurrency. + For 16 work items got 520728 SHA256 hashes/sec with 16 maximum concurrency. + For 32 work items got 482729 SHA256 hashes/sec with 32 maximum concurrency. + For 64 work items got 1.02629e+06 SHA256 hashes/sec with 64 maximum concurrency. + For 128 work items got 1.01816e+06 SHA256 hashes/sec with 64 maximum concurrency. + For 256 work items got 1.01672e+06 SHA256 hashes/sec with 64 maximum concurrency. 
+ For 512 work items got 1.01727e+06 SHA256 hashes/sec with 64 maximum concurrency. + For 1024 work items got 1.01477e+06 SHA256 hashes/sec with 64 maximum concurrency. + +Benchmarking asio ... + For 1 work items got 4069.92 SHA256 hashes/sec with 1 maximum concurrency. + For 2 work items got 8099.1 SHA256 hashes/sec with 2 maximum concurrency. + For 4 work items got 16021.7 SHA256 hashes/sec with 4 maximum concurrency. + For 8 work items got 30275.2 SHA256 hashes/sec with 8 maximum concurrency. + For 16 work items got 40972.5 SHA256 hashes/sec with 16 maximum concurrency. + For 32 work items got 70919.2 SHA256 hashes/sec with 32 maximum concurrency. + For 64 work items got 71917 SHA256 hashes/sec with 64 maximum concurrency. + For 128 work items got 71111.8 SHA256 hashes/sec with 64 maximum concurrency. + For 256 work items got 70963.5 SHA256 hashes/sec with 64 maximum concurrency. + For 512 work items got 70956.3 SHA256 hashes/sec with 64 maximum concurrency. + For 1024 work items got 70989.9 SHA256 hashes/sec with 64 maximum concurrency. +*/ + + +/* Linux 4Kb and 64Kb libdispatch + +Benchmarking llfio (Grand Central Dispatch) ... + For 1 work items got 33942.7 SHA256 hashes/sec with 1 maximum concurrency. + For 2 work items got 91275.8 SHA256 hashes/sec with 2 maximum concurrency. + For 4 work items got 191446 SHA256 hashes/sec with 4 maximum concurrency. + For 8 work items got 325776 SHA256 hashes/sec with 8 maximum concurrency. + For 16 work items got 405282 SHA256 hashes/sec with 16 maximum concurrency. + For 32 work items got 408015 SHA256 hashes/sec with 31 maximum concurrency. + For 64 work items got 412343 SHA256 hashes/sec with 32 maximum concurrency. + For 128 work items got 450024 SHA256 hashes/sec with 41 maximum concurrency. + For 256 work items got 477885 SHA256 hashes/sec with 46 maximum concurrency. + For 512 work items got 531752 SHA256 hashes/sec with 48 maximum concurrency. 
+ For 1024 work items got 608181 SHA256 hashes/sec with 44 maximum concurrency. + +Benchmarking llfio (Grand Central Dispatch) ... + For 1 work items got 3977.21 SHA256 hashes/sec with 1 maximum concurrency. + For 2 work items got 7980.09 SHA256 hashes/sec with 2 maximum concurrency. + For 4 work items got 15075.6 SHA256 hashes/sec with 4 maximum concurrency. + For 8 work items got 24427.3 SHA256 hashes/sec with 8 maximum concurrency. + For 16 work items got 41858.7 SHA256 hashes/sec with 16 maximum concurrency. + For 32 work items got 64896.4 SHA256 hashes/sec with 32 maximum concurrency. + For 64 work items got 65683.6 SHA256 hashes/sec with 34 maximum concurrency. + For 128 work items got 65476.1 SHA256 hashes/sec with 35 maximum concurrency. + For 256 work items got 65210.6 SHA256 hashes/sec with 36 maximum concurrency. + For 512 work items got 65241.1 SHA256 hashes/sec with 36 maximum concurrency. + For 1024 work items got 65205.3 SHA256 hashes/sec with 37 maximum concurrency. +*/ + +/* Linux 4Kb and 64Kb native + +Benchmarking llfio (Linux native) ... + For 1 work items got 65160.3 SHA256 hashes/sec with 1 maximum concurrency. + For 2 work items got 126586 SHA256 hashes/sec with 2 maximum concurrency. + For 4 work items got 246616 SHA256 hashes/sec with 4 maximum concurrency. + For 8 work items got 478938 SHA256 hashes/sec with 8 maximum concurrency. + For 16 work items got 529919 SHA256 hashes/sec with 15 maximum concurrency. + For 32 work items got 902885 SHA256 hashes/sec with 32 maximum concurrency. + For 64 work items got 919633 SHA256 hashes/sec with 34 maximum concurrency. + For 128 work items got 919695 SHA256 hashes/sec with 35 maximum concurrency. + For 256 work items got 923159 SHA256 hashes/sec with 36 maximum concurrency. + For 512 work items got 922961 SHA256 hashes/sec with 37 maximum concurrency. + For 1024 work items got 926624 SHA256 hashes/sec with 38 maximum concurrency. + +Benchmarking llfio (Linux native) ... 
+ For 1 work items got 4193.79 SHA256 hashes/sec with 1 maximum concurrency. + For 2 work items got 8422.44 SHA256 hashes/sec with 2 maximum concurrency. + For 4 work items got 12521.7 SHA256 hashes/sec with 3 maximum concurrency. + For 8 work items got 20028.4 SHA256 hashes/sec with 6 maximum concurrency. + For 16 work items got 30657.4 SHA256 hashes/sec with 10 maximum concurrency. + For 32 work items got 53217.4 SHA256 hashes/sec with 20 maximum concurrency. + For 64 work items got 65452.3 SHA256 hashes/sec with 32 maximum concurrency. + For 128 work items got 65396.3 SHA256 hashes/sec with 32 maximum concurrency. + For 256 work items got 65363.7 SHA256 hashes/sec with 32 maximum concurrency. + For 512 work items got 65198.2 SHA256 hashes/sec with 32 maximum concurrency. + For 1024 work items got 65003.9 SHA256 hashes/sec with 34 maximum concurrency. +*/ + + +/* Windows 4Kb and 64Kb Win32 thread pool + +Benchmarking llfio (Win32 thread pool (Vista+)) ... + For 1 work items got 57995.3 SHA256 hashes/sec with 1 maximum concurrency. + For 2 work items got 120267 SHA256 hashes/sec with 2 maximum concurrency. + For 4 work items got 238139 SHA256 hashes/sec with 4 maximum concurrency. + For 8 work items got 413488 SHA256 hashes/sec with 8 maximum concurrency. + For 16 work items got 575423 SHA256 hashes/sec with 16 maximum concurrency. + For 32 work items got 720938 SHA256 hashes/sec with 31 maximum concurrency. + For 64 work items got 703460 SHA256 hashes/sec with 30 maximum concurrency. + For 128 work items got 678257 SHA256 hashes/sec with 29 maximum concurrency. + For 256 work items got 678898 SHA256 hashes/sec with 29 maximum concurrency. + For 512 work items got 671729 SHA256 hashes/sec with 28 maximum concurrency. + For 1024 work items got 674433 SHA256 hashes/sec with 30 maximum concurrency. + +Benchmarking llfio (Win32 thread pool (Vista+)) ... + For 1 work items got 4132.18 SHA256 hashes/sec with 1 maximum concurrency. 
+ For 2 work items got 8197.21 SHA256 hashes/sec with 2 maximum concurrency. + For 4 work items got 16281.3 SHA256 hashes/sec with 4 maximum concurrency. + For 8 work items got 27447.5 SHA256 hashes/sec with 8 maximum concurrency. + For 16 work items got 42621.3 SHA256 hashes/sec with 16 maximum concurrency. + For 32 work items got 69857.7 SHA256 hashes/sec with 32 maximum concurrency. + For 64 work items got 68797.9 SHA256 hashes/sec with 33 maximum concurrency. + For 128 work items got 68980.4 SHA256 hashes/sec with 33 maximum concurrency. + For 256 work items got 70370.8 SHA256 hashes/sec with 33 maximum concurrency. + For 512 work items got 70365.8 SHA256 hashes/sec with 33 maximum concurrency. + For 1024 work items got 70794.6 SHA256 hashes/sec with 33 maximum concurrency. +*/ + + +LLFIO_V2_NAMESPACE_BEGIN + +namespace detail +{ + struct dynamic_thread_pool_group_impl_guard : std::unique_lock<std::mutex> + { + using std::unique_lock<std::mutex>::unique_lock; + }; +#if 0 + template <class T> class fake_atomic + { + T _v; + + public: + constexpr fake_atomic(T v) + : _v(v) + { + } + T load(std::memory_order /*unused*/) const { return _v; } + void store(T v, std::memory_order /*unused*/) { _v = v; } + T fetch_add(T v, std::memory_order /*unused*/) + { + _v += v; + return _v - v; + } + T fetch_sub(T v, std::memory_order /*unused*/) + { + _v -= v; + return _v + v; + } + }; +#endif + struct global_dynamic_thread_pool_impl_workqueue_item + { + const size_t nesting_level; + std::shared_ptr<global_dynamic_thread_pool_impl_workqueue_item> next; + std::unordered_set<dynamic_thread_pool_group_impl *> items; // Do NOT use without holding workqueue_lock + + explicit global_dynamic_thread_pool_impl_workqueue_item(size_t _nesting_level, std::shared_ptr<global_dynamic_thread_pool_impl_workqueue_item> &&preceding) + : nesting_level(_nesting_level) + , next(preceding) + { + } + +#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32) + static constexpr unsigned 
TOTAL_NEXTACTIVES = 1; + struct next_active_base_t + { + std::atomic<unsigned> count{0}; + QUICKCPPLIB_NAMESPACE::configurable_spinlock::spinlock<unsigned> lock; + dynamic_thread_pool_group::work_item *front{nullptr}, *back{nullptr}; + + next_active_base_t() = default; + next_active_base_t(const next_active_base_t &o) + : count(o.count.load(std::memory_order_relaxed)) + , front(o.front) + , back(o.back) + { + } + }; + struct alignas(64) next_active_work_t : next_active_base_t + { + char _padding[64 - sizeof(next_active_base_t)]; // 40 bytes? + } next_actives[TOTAL_NEXTACTIVES]; + static_assert(sizeof(next_active_work_t) == 64, "next_active_work_t is not a cacheline"); + next_active_base_t next_timer_relative, next_timer_absolute; + + dynamic_thread_pool_group::work_item *next_active(unsigned &count, size_t threadidx) + { + if(TOTAL_NEXTACTIVES > 1) + { + threadidx &= (TOTAL_NEXTACTIVES - 1); + const size_t original_threadidx = threadidx; + bool all_empty = true; + for(;;) + { + next_active_base_t &x = next_actives[threadidx]; + if(x.count.load(std::memory_order_relaxed) > 0) + { + all_empty = false; + if(x.lock.try_lock()) + { + auto *ret = x.front; + if(ret != nullptr) + { + x.front = ret->_next_scheduled; + count = x.count.fetch_sub(1, std::memory_order_relaxed); + if(x.front == nullptr) + { + assert(x.back == ret); + x.back = nullptr; + } + ret->_next_scheduled = nullptr; + x.lock.unlock(); + return ret; + } + x.lock.unlock(); + } + } + if(++threadidx >= TOTAL_NEXTACTIVES) + { + threadidx = 0; + } + if(threadidx == original_threadidx) + { + if(all_empty) + { + return nullptr; + } + all_empty = true; + } + } + } + else + { + next_active_base_t &x = next_actives[0]; + if(x.count.load(std::memory_order_relaxed) > 0) + { + x.lock.lock(); + auto *ret = x.front; + if(ret != nullptr) + { + x.front = ret->_next_scheduled; + count = x.count.fetch_sub(1, std::memory_order_relaxed); + if(x.front == nullptr) + { + assert(x.back == ret); + x.back = nullptr; + } + 
ret->_next_scheduled = nullptr; + x.lock.unlock(); + return ret; + } + x.lock.unlock(); + } + } + return nullptr; + } + + private: + next_active_base_t &_choose_next_active() + { + unsigned idx = (unsigned) -1, max_count = (unsigned) -1; + for(unsigned n = 0; n < TOTAL_NEXTACTIVES; n++) + { + auto c = next_actives[n].count.load(std::memory_order_relaxed); + if(c < max_count) + { + idx = n; + max_count = c; + } + } + for(;;) + { + if(next_actives[idx].lock.try_lock()) + { + return next_actives[idx]; + } + if(++idx >= TOTAL_NEXTACTIVES) + { + idx = 0; + } + } + } + + public: + void append_active(dynamic_thread_pool_group::work_item *p) + { + next_active_base_t &x = _choose_next_active(); + x.count.fetch_add(1, std::memory_order_relaxed); + if(x.back == nullptr) + { + assert(x.front == nullptr); + x.front = x.back = p; + x.lock.unlock(); + return; + } + p->_next_scheduled = nullptr; + x.back->_next_scheduled = p; + x.back = p; + x.lock.unlock(); + } + void prepend_active(dynamic_thread_pool_group::work_item *p) + { + next_active_base_t &x = _choose_next_active(); + x.count.fetch_add(1, std::memory_order_relaxed); + if(x.front == nullptr) + { + assert(x.back == nullptr); + x.front = x.back = p; + x.lock.unlock(); + return; + } + p->_next_scheduled = x.front; + x.front = p; + x.lock.unlock(); + } + + // x must be LOCKED on entry + template <int which> dynamic_thread_pool_group::work_item *next_timer() + { + if(which == 0) + { + return nullptr; + } + next_active_base_t &x = (which == 1) ? 
next_timer_relative : next_timer_absolute; + // x.lock.lock(); + auto *ret = x.front; + if(ret == nullptr) + { + assert(x.back == nullptr); + x.lock.unlock(); + return nullptr; + } + x.front = ret->_next_scheduled; + if(x.front == nullptr) + { + assert(x.back == ret); + x.back = nullptr; + } + ret->_next_scheduled = nullptr; + x.count.fetch_sub(1, std::memory_order_relaxed); + x.lock.unlock(); + return ret; + } + void append_timer(dynamic_thread_pool_group::work_item *i) + { + if(i->_timepoint1 != std::chrono::steady_clock::time_point()) + { + next_timer_relative.lock.lock(); + next_timer_relative.count.fetch_add(1, std::memory_order_relaxed); + if(next_timer_relative.front == nullptr) + { + next_timer_relative.front = next_timer_relative.back = i; + } + else + { + bool done = false; + for(dynamic_thread_pool_group::work_item *p = nullptr, *n = next_timer_relative.front; n != nullptr; p = n, n = n->_next_scheduled) + { + if(n->_timepoint1 <= i->_timepoint1) + { + if(p == nullptr) + { + i->_next_scheduled = n; + next_timer_relative.front = i; + } + else + { + i->_next_scheduled = n; + p->_next_scheduled = i; + } + done = true; + break; + } + } + if(!done) + { + next_timer_relative.back->_next_scheduled = i; + i->_next_scheduled = nullptr; + next_timer_relative.back = i; + } + } + next_timer_relative.lock.unlock(); + } + else + { + next_timer_absolute.lock.lock(); + next_timer_absolute.count.fetch_add(1, std::memory_order_relaxed); + if(next_timer_absolute.front == nullptr) + { + next_timer_absolute.front = next_timer_absolute.back = i; + } + else + { + bool done = false; + for(dynamic_thread_pool_group::work_item *p = nullptr, *n = next_timer_absolute.front; n != nullptr; p = n, n = n->_next_scheduled) + { + if(n->_timepoint2 <= i->_timepoint2) + { + if(p == nullptr) + { + i->_next_scheduled = n; + next_timer_absolute.front = i; + } + else + { + i->_next_scheduled = n; + p->_next_scheduled = i; + } + done = true; + break; + } + } + if(!done) + { + 
next_timer_absolute.back->_next_scheduled = i; + i->_next_scheduled = nullptr; + next_timer_absolute.back = i; + } + } + next_timer_absolute.lock.unlock(); + } + } +#endif + }; + struct global_dynamic_thread_pool_impl + { + using _spinlock_type = QUICKCPPLIB_NAMESPACE::configurable_spinlock::spinlock<unsigned>; + + _spinlock_type workqueue_lock; + struct workqueue_guard : std::unique_lock<_spinlock_type> + { + using std::unique_lock<_spinlock_type>::unique_lock; + }; + std::shared_ptr<global_dynamic_thread_pool_impl_workqueue_item> workqueue; +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + using threadh_type = void *; + using grouph_type = dispatch_group_t; + static void _gcd_dispatch_callback(void *arg) + { + auto *workitem = (dynamic_thread_pool_group::work_item *) arg; + global_dynamic_thread_pool()._workerthread(workitem, nullptr); + } + static void _gcd_timer_callback(void *arg) + { + auto *workitem = (dynamic_thread_pool_group::work_item *) arg; + global_dynamic_thread_pool()._timerthread(workitem, nullptr); + } +#elif defined(_WIN32) + using threadh_type = PTP_CALLBACK_INSTANCE; + using grouph_type = PTP_CALLBACK_ENVIRON; + static void CALLBACK _win32_worker_thread_callback(threadh_type threadh, PVOID Parameter, PTP_WORK /*unused*/) + { + auto *workitem = (dynamic_thread_pool_group::work_item *) Parameter; + global_dynamic_thread_pool()._workerthread(workitem, threadh); + } + static void CALLBACK _win32_timer_thread_callback(threadh_type threadh, PVOID Parameter, PTP_TIMER /*unused*/) + { + auto *workitem = (dynamic_thread_pool_group::work_item *) Parameter; + global_dynamic_thread_pool()._timerthread(workitem, threadh); + } +#else + global_dynamic_thread_pool_impl_workqueue_item first_execute{(size_t) -1, {}}; + using threadh_type = void *; + using grouph_type = void *; + std::mutex threadpool_lock; + struct threadpool_guard : std::unique_lock<std::mutex> + { + using std::unique_lock<std::mutex>::unique_lock; + }; + struct thread_t + { + thread_t 
*_prev{nullptr}, *_next{nullptr}; + std::thread thread; + std::condition_variable cond; + std::chrono::steady_clock::time_point last_did_work; + std::atomic<int> state{0}; // <0 = dead, 0 = sleeping/please die, 1 = busy + }; + struct threads_t + { + size_t count{0}; + thread_t *front{nullptr}, *back{nullptr}; + } threadpool_active, threadpool_sleeping; + std::atomic<size_t> total_submitted_workitems{0}, threadpool_threads{0}; + std::atomic<uint32_t> ms_sleep_for_more_work{20000}; + + std::mutex threadmetrics_lock; + struct threadmetrics_guard : std::unique_lock<std::mutex> + { + using std::unique_lock<std::mutex>::unique_lock; + }; + struct threadmetrics_threadid + { + char text[12]; // enough for a UINT32_MAX in decimal + constexpr threadmetrics_threadid() + : text{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} + { + } + threadmetrics_threadid(string_view sv) + { + memset(text, '0', sizeof(text)); + assert(sv.size() <= sizeof(text)); + if(sv.size() > sizeof(text)) + { + abort(); + } + memcpy(text + sizeof(text) - sv.size(), sv.data(), sv.size()); + } + int compare(const threadmetrics_threadid &o) const noexcept { return memcmp(text, o.text, sizeof(text)); } + bool operator<(const threadmetrics_threadid &o) const noexcept { return compare(o) < 0; } + bool operator==(const threadmetrics_threadid &o) const noexcept { return compare(o) == 0; } + }; + struct threadmetrics_item + { + threadmetrics_item *_prev{nullptr}, *_next{nullptr}; + std::chrono::steady_clock::time_point last_updated, blocked_since; // latter set if thread seen no time + threadmetrics_threadid threadid; + uint32_t diskfaults{(uint32_t) -1}, utime{(uint32_t) -1}, stime{(uint32_t) -1}; // culmulative ticks spent in user and system for this thread + + explicit threadmetrics_item(threadmetrics_threadid v) + : threadid(v) + { + } + string_view threadid_name() const noexcept { return string_view(threadid.text, sizeof(threadid.text)); } + }; + struct threadmetrics_t + { + size_t count{0}; + threadmetrics_item 
*front{nullptr}, *back{nullptr}; + uint32_t blocked{0}, running{0}; + } threadmetrics_queue; // items at front are least recently updated + std::vector<threadmetrics_item *> threadmetrics_sorted; // sorted by threadid + std::chrono::steady_clock::time_point threadmetrics_last_updated; + std::atomic<unsigned> populate_threadmetrics_reentrancy{0}; +#ifdef __linux__ + std::mutex proc_self_task_fd_lock; + int proc_self_task_fd{-1}; +#endif +#endif + + std::mutex io_aware_work_item_handles_lock; + struct io_aware_work_item_handles_guard : std::unique_lock<std::mutex> + { + using std::unique_lock<std::mutex>::unique_lock; + }; + struct io_aware_work_item_statfs + { + size_t refcount{0}; + deadline default_deadline; + float average_busy{0}, average_queuedepth{0}; + std::chrono::steady_clock::time_point last_updated; + statfs_t statfs; + }; + std::unordered_map<fs_handle::unique_id_type, io_aware_work_item_statfs, fs_handle::unique_id_type_hasher> io_aware_work_item_handles; + + global_dynamic_thread_pool_impl() + { +#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32) + populate_threadmetrics(std::chrono::steady_clock::now()); +#endif + } + + template <class T, class U> static void _append_to_list(T &what, U *v) + { + if(what.front == nullptr) + { + assert(what.back == nullptr); + v->_next = v->_prev = nullptr; + what.front = what.back = v; + } + else + { + v->_next = nullptr; + v->_prev = what.back; + what.back->_next = v; + what.back = v; + } + what.count++; + } + template <class T, class U> static void _prepend_to_list(T &what, U *v) + { + if(what.front == nullptr) + { + assert(what.back == nullptr); + v->_next = v->_prev = nullptr; + what.front = what.back = v; + } + else + { + v->_prev = nullptr; + v->_next = what.front; + what.front->_prev = v; + what.front = v; + } + what.count++; + } + template <class T, class U> static void _remove_from_list(T &what, U *v) + { + if(v->_prev == nullptr && v->_next == nullptr) + { + assert(what.front == v); + 
assert(what.back == v); + what.front = what.back = nullptr; + } + else if(v->_prev == nullptr) + { + assert(what.front == v); + v->_next->_prev = nullptr; + what.front = v->_next; + v->_next = v->_prev = nullptr; + } + else if(v->_next == nullptr) + { + assert(what.back == v); + v->_prev->_next = nullptr; + what.back = v->_prev; + v->_next = v->_prev = nullptr; + } + else + { + v->_next->_prev = v->_prev; + v->_prev->_next = v->_next; + v->_next = v->_prev = nullptr; + } + what.count--; + } + +#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32) + inline void _execute_work(thread_t *self); + + void _add_thread(threadpool_guard & /*unused*/) + { + thread_t *p = nullptr; + try + { + p = new thread_t; + _append_to_list(threadpool_active, p); + p->thread = std::thread([this, p] { _execute_work(p); }); + } + catch(...) + { + if(p != nullptr) + { + _remove_from_list(threadpool_active, p); + } + // drop failure + } + } + + bool _remove_thread(threadpool_guard &g, threads_t &which) + { + if(which.count == 0) + { + return false; + } + // Threads which went to sleep the longest ago are at the front + auto *t = which.front; + if(t->state.load(std::memory_order_acquire) < 0) + { + // He's already exiting + return false; + } + assert(t->state.load(std::memory_order_acquire) == 0); + t->state.fetch_sub(1, std::memory_order_release); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << t << " is told to quit" << std::endl; +#endif + do + { + g.unlock(); + t->cond.notify_one(); + g.lock(); + } while(t->state.load(std::memory_order_acquire) >= -1); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << t << " has quit, deleting" << std::endl; +#endif + _remove_from_list(threadpool_active, t); + t->thread.join(); + delete t; + return true; + } + + ~global_dynamic_thread_pool_impl() + { + { + threadpool_guard g(threadpool_lock); + while(threadpool_active.count > 0 || threadpool_sleeping.count > 0) + { + 
while(threadpool_sleeping.count > 0) + { + auto removed = _remove_thread(g, threadpool_sleeping); + assert(removed); + (void) removed; + } + if(threadpool_active.count > 0) + { + auto removed = _remove_thread(g, threadpool_active); + assert(removed); + (void) removed; + } + } + } + threadmetrics_guard g(threadmetrics_lock); + for(auto *p : threadmetrics_sorted) + { + delete p; + } + threadmetrics_sorted.clear(); + threadmetrics_queue = {}; +#ifdef __linux__ + if(proc_self_task_fd > 0) + { + ::close(proc_self_task_fd); + proc_self_task_fd = -1; + } +#endif + } + +#ifdef __linux__ + // You are guaranteed only one of these EVER executes at a time. Locking is probably overkill, but equally also probably harmless + bool update_threadmetrics(threadmetrics_guard &&g, std::chrono::steady_clock::time_point now, threadmetrics_item *new_items) + { + auto update_item = [&](threadmetrics_item *item) { + char path[64] = "/proc/self/task/", *pend = path + 16, *tend = item->threadid.text; + while(*tend == '0' && (tend - item->threadid.text) < (ssize_t) sizeof(item->threadid.text)) + { + ++tend; + } + while((tend - item->threadid.text) < (ssize_t) sizeof(item->threadid.text)) + { + *pend++ = *tend++; + } + memcpy(pend, "/stat", 6); + int fd = ::open(path, O_RDONLY); + if(-1 == fd) + { + // Thread may have exited since we last populated + if(item->blocked_since == std::chrono::steady_clock::time_point()) + { + threadmetrics_queue.running--; + threadmetrics_queue.blocked++; + } + item->blocked_since = now; + item->last_updated = now; + return; + } + char buffer[1024]; + auto bytesread = ::read(fd, buffer, sizeof(buffer)); + ::close(fd); + buffer[std::max((size_t) bytesread, sizeof(buffer) - 1)] = 0; + char state = 0; + unsigned long majflt = 0, utime = 0, stime = 0; + sscanf(buffer, "%*d %*s %c %*d %*d %*d %*d %*d %*u %*u %*u %lu %*u %lu %lu", &state, &majflt, &utime, &stime); + if(item->utime != (uint32_t) -1 || item->stime != (uint32_t) -1) + { + if(item->utime == (uint32_t) utime 
&& item->stime == (uint32_t) stime && state != 'R') + { + // This thread made no progress since last time + if(item->blocked_since == std::chrono::steady_clock::time_point()) + { + threadmetrics_queue.running--; + threadmetrics_queue.blocked++; + item->blocked_since = now; + } + } + else + { + if(item->blocked_since != std::chrono::steady_clock::time_point()) + { + threadmetrics_queue.running++; + threadmetrics_queue.blocked--; + item->blocked_since = std::chrono::steady_clock::time_point(); + } + } + } + // std::cout << "Threadmetrics " << path << " " << state << " " << majflt << " " << utime << " " << stime << ". Previously " << item->diskfaults << " " + // << item->utime << " " << item->stime << std::endl; + item->diskfaults = (uint32_t) majflt; + item->utime = (uint32_t) utime; + item->stime = (uint32_t) stime; + item->last_updated = now; + }; + if(new_items != nullptr) + { + for(; new_items != nullptr; new_items = new_items->_next) + { + update_item(new_items); + } + return false; + } + if(threadmetrics_queue.count == 0) + { + return false; + } + size_t updated = 0; + while(now - threadmetrics_queue.front->last_updated >= std::chrono::milliseconds(200) && updated++ < 4) + { + auto *p = threadmetrics_queue.front; + update_item(p); + _remove_from_list(threadmetrics_queue, p); + _append_to_list(threadmetrics_queue, p); + } + // if(updated > 0) + { + static const auto min_hardware_concurrency = std::thread::hardware_concurrency(); + static const auto max_hardware_concurrency = min_hardware_concurrency + 3; + auto threadmetrics_running = (ssize_t) threadmetrics_queue.running; + auto threadmetrics_blocked = (ssize_t) threadmetrics_queue.blocked; + g.unlock(); // drop threadmetrics_lock + + threadpool_guard gg(threadpool_lock); + // Adjust for the number of threads sleeping for more work + threadmetrics_running += threadpool_sleeping.count; + threadmetrics_blocked -= threadpool_sleeping.count; + if(threadmetrics_blocked < 0) + { + threadmetrics_blocked = 0; + } + 
const auto desired_concurrency = std::min((ssize_t) min_hardware_concurrency, (ssize_t) total_submitted_workitems.load(std::memory_order_relaxed)); + auto toadd = std::max((ssize_t) 0, std::min(desired_concurrency - threadmetrics_running, desired_concurrency - (ssize_t) threadpool_active.count)); + auto toremove = std::max((ssize_t) 0, (ssize_t) threadmetrics_running - (ssize_t) max_hardware_concurrency); + if(toadd > 0 || toremove > 0) + { + // std::cout << "total active = " << threadpool_active.count << " total idle = " << threadpool_sleeping.count + // << " threadmetrics_running = " << threadmetrics_running << " threadmetrics_blocked = " << threadmetrics_blocked << " toadd = " << toadd + // << " toremove = " << toremove << std::endl; + if(toadd > 0) + { + _add_thread(gg); + } + if(toremove > 0 && threadpool_active.count > 1) + { + // Kill myself, but not if I'm the final thread who might need to run timers + return true; + } + } + } + return false; + } + // Returns true if calling thread is to exit + bool populate_threadmetrics(std::chrono::steady_clock::time_point now) + { + if(populate_threadmetrics_reentrancy.exchange(1, std::memory_order_relaxed) == 1) + { + return false; + } + auto unpopulate_threadmetrics_reentrancy = make_scope_exit([this]() noexcept { populate_threadmetrics_reentrancy.store(0, std::memory_order_relaxed); }); + + static thread_local std::vector<char> kernelbuffer(1024); + static thread_local std::vector<threadmetrics_threadid> threadidsbuffer(1024 / sizeof(dirent)); + using getdents64_t = int (*)(int, char *, unsigned int); + static auto getdents = static_cast<getdents64_t>([](int fd, char *buf, unsigned count) -> int { return syscall(SYS_getdents64, fd, buf, count); }); + using dirent = dirent64; + size_t bytes = 0; + { + threadmetrics_guard g(threadmetrics_lock); + if(now - threadmetrics_last_updated < std::chrono::milliseconds(250) && + threadmetrics_queue.running + threadmetrics_queue.blocked >= 
threadpool_threads.load(std::memory_order_relaxed)) + { + if(now - threadmetrics_last_updated < std::chrono::milliseconds(100)) + { + return false; + } + return update_threadmetrics(std::move(g), now, nullptr); + } + threadmetrics_last_updated = now; + if(proc_self_task_fd < 0) + { + proc_self_task_fd = ::open("/proc/self/task", O_RDONLY | O_DIRECTORY | O_CLOEXEC); + if(proc_self_task_fd < 0) + { + posix_error().throw_exception(); + } + } + } + { + std::lock_guard<std::mutex> g(proc_self_task_fd_lock); + /* It turns out that /proc/self/task is quite racy in the Linux kernel, so keep + looping this until it stops telling obvious lies. + */ + for(auto done = false; !done;) + { + if(-1 == ::lseek64(proc_self_task_fd, 0, SEEK_SET)) + { + posix_error().throw_exception(); + } + auto _bytes = getdents(proc_self_task_fd, kernelbuffer.data(), kernelbuffer.size()); + // std::cout << "getdents(" << (kernelbuffer.size()-bytes) << ") returns " << _bytes << std::endl; + if(_bytes < 0 && errno != EINVAL) + { + posix_error().throw_exception(); + } + if(_bytes >= 0 && kernelbuffer.size() - (size_t) _bytes >= sizeof(dirent) + 16) + { + bytes = (size_t) _bytes; + threadidsbuffer.clear(); + for(auto *dent = (dirent *) kernelbuffer.data();; dent = reinterpret_cast<dirent *>(reinterpret_cast<uintptr_t>(dent) + dent->d_reclen)) + { + if(dent->d_ino != 0u && dent->d_type == DT_DIR && dent->d_name[0] != '.') + { + size_t length = strchr(dent->d_name, 0) - dent->d_name; + threadidsbuffer.push_back(string_view(dent->d_name, length)); + } + if((bytes -= dent->d_reclen) <= 0) + { + break; + } + } + auto mythreadcount = threadpool_threads.load(std::memory_order_relaxed); + if(threadidsbuffer.size() >= mythreadcount) + { + // std::cout << "Parsed from /proc " << threadidsbuffer.size() << " entries, should be at least " << mythreadcount << std::endl; + std::sort(threadidsbuffer.begin(), threadidsbuffer.end()); + done = true; + break; + } +#ifndef NDEBUG + std::cout << "NOTE: /proc returned " << 
threadidsbuffer.size() << " items when we know for a fact at least " << mythreadcount + << " threads exist, retrying!" << std::endl; +#endif + continue; + } + kernelbuffer.resize(kernelbuffer.size() << 1); + } + } + threadmetrics_item *firstnewitem = nullptr; + threadmetrics_guard g(threadmetrics_lock); +#if 0 + { + std::stringstream s; + s << "Parsed from /proc " << threadidsbuffer.size() << " entries (should be at least " << threadpool_threads.load(std::memory_order_relaxed) << "):"; + for(auto &i : threadidsbuffer) + { + s << " " << string_view(i.text, 12); + } + std::cout << s.str() << std::endl; + } +#endif +#if 0 + { + auto d_it = threadmetrics_sorted.begin(); + auto s_it = threadidsbuffer.begin(); + for(; d_it != threadmetrics_sorted.end() && s_it != threadidsbuffer.end(); ++d_it, ++s_it) + { + std::cout << (*d_it)->threadid_name() << " " << string_view(s_it->text, 12) << "\n"; + } + for(; d_it != threadmetrics_sorted.end(); ++d_it) + { + std::cout << (*d_it)->threadid_name() << " XXXXXXXXXXXX\n"; + } + for(; s_it != threadidsbuffer.end(); ++s_it) + { + std::cout << "XXXXXXXXXXXX " << string_view(s_it->text, 12) << "\n"; + } + std::cout << std::flush; + } +#endif + auto d_it = threadmetrics_sorted.begin(); + auto s_it = threadidsbuffer.begin(); + auto remove_item = [&] { + // std::cout << "Removing thread metrics for " << (*d_it)->threadid_name() << std::endl; + if((*d_it)->blocked_since != std::chrono::steady_clock::time_point()) + { + threadmetrics_queue.blocked--; + } + else + { + threadmetrics_queue.running--; + } + _remove_from_list(threadmetrics_queue, *d_it); + delete *d_it; + d_it = threadmetrics_sorted.erase(d_it); + }; + auto add_item = [&] { + auto p = std::make_unique<threadmetrics_item>(*s_it); + d_it = threadmetrics_sorted.insert(d_it, p.get()); + _append_to_list(threadmetrics_queue, p.get()); + // std::cout << "Adding thread metrics for " << p->threadid_name() << std::endl; + if(firstnewitem == nullptr) + { + firstnewitem = p.get(); + } + 
p.release(); + threadmetrics_queue.running++; + }; + // std::cout << "Compare" << std::endl; + for(; d_it != threadmetrics_sorted.end() && s_it != threadidsbuffer.end();) + { + auto c = (*d_it)->threadid.compare(*s_it); + // std::cout << "Comparing " << (*d_it)->threadid_name() << " with " << string_view(s_it->text, 12) << " = " << c << std::endl; + if(0 == c) + { + ++d_it; + ++s_it; + continue; + } + if(c < 0) + { + // d_it has gone away + remove_item(); + } + if(c > 0) + { + // s_it is a new entry + add_item(); + } + } + // std::cout << "Tail dest" << std::endl; + while(d_it != threadmetrics_sorted.end()) + { + remove_item(); + } + // std::cout << "Tail source" << std::endl; + while(s_it != threadidsbuffer.end()) + { + add_item(); + ++d_it; + ++s_it; + } + assert(threadmetrics_sorted.size() == threadidsbuffer.size()); +#if 0 + if(!std::is_sorted(threadmetrics_sorted.begin(), threadmetrics_sorted.end(), + [](threadmetrics_item *a, threadmetrics_item *b) { return a->threadid < b->threadid; })) + { + std::cout << "Threadmetrics:"; + for(auto *p : threadmetrics_sorted) + { + std::cout << "\n " << p->threadid_name(); + } + std::cout << std::endl; + abort(); + } +#endif + assert(threadmetrics_queue.running + threadmetrics_queue.blocked == threadidsbuffer.size()); + return update_threadmetrics(std::move(g), now, firstnewitem); + } +#endif +#endif + + result<void> _prepare_work_item_delay(dynamic_thread_pool_group::work_item *workitem, grouph_type grouph, deadline d) + { + (void) grouph; + if(!d) + { + return errc::invalid_argument; + } + if(workitem->_nextwork.load(std::memory_order_acquire) == 0 || d.nsecs > 0) + { + if(d.nsecs > 0) + { + if(d.steady) + { + workitem->_timepoint1 = std::chrono::steady_clock::now() + std::chrono::nanoseconds(d.nsecs); + workitem->_timepoint2 = {}; + } + else + { + workitem->_timepoint1 = {}; + workitem->_timepoint2 = d.to_time_point(); + } + } + else + { + workitem->_timepoint1 = 
std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1)); + workitem->_timepoint2 = {}; + } + assert(workitem->_has_timer_set()); +#if defined(_WIN32) + if(nullptr == workitem->_internaltimerh) + { + workitem->_internaltimerh = CreateThreadpoolTimer(_win32_timer_thread_callback, workitem, grouph); + if(nullptr == workitem->_internaltimerh) + { + return win32_error(); + } + } +#endif + } + else + { + if(workitem->_timepoint1 != std::chrono::steady_clock::time_point()) + { + workitem->_timepoint1 = {}; + } + if(workitem->_timepoint2 != std::chrono::system_clock::time_point()) + { + workitem->_timepoint2 = {}; + } + assert(!workitem->_has_timer_set()); + } + return success(); + } + + inline void _submit_work_item(bool submit_into_highest_priority, dynamic_thread_pool_group::work_item *workitem, bool defer_pool_wake); + + inline result<void> submit(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group_impl *group, + span<dynamic_thread_pool_group::work_item *> work) noexcept; + + inline void _work_item_done(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group::work_item *i) noexcept; + + inline result<void> stop(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group_impl *group, result<void> err) noexcept; + inline result<void> wait(dynamic_thread_pool_group_impl_guard &g, bool reap, dynamic_thread_pool_group_impl *group, deadline d) noexcept; + + inline void _timerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh); + inline void _workerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh); + }; + struct global_dynamic_thread_pool_impl_thread_local_state_t + { + dynamic_thread_pool_group::work_item *workitem{nullptr}; + global_dynamic_thread_pool_impl::threadh_type current_callback_instance{nullptr}; + size_t nesting_level{0}; + }; + LLFIO_HEADERS_ONLY_FUNC_SPEC global_dynamic_thread_pool_impl_thread_local_state_t 
// Concrete implementation of dynamic_thread_pool_group. Each instance
// registers itself with the global dynamic thread pool's work queue at the
// nesting level of the thread which created it, and forwards submit(), stop()
// and wait() to the global pool implementation while holding its own lock.
class dynamic_thread_pool_group_impl final : public dynamic_thread_pool_group
{
  friend struct detail::global_dynamic_thread_pool_impl;

  // Lock for this group, taken by submit(), stop() and wait() below
  mutable std::mutex _lock;
  // Nesting level captured from the creating thread's thread local state in init()
  size_t _nesting_level{0};
  // Intrusive list header: item count plus front and back pointers
  struct workitems_t
  {
    size_t count{0};
    dynamic_thread_pool_group::work_item *front{nullptr}, *back{nullptr};
  } _work_items_active, _work_items_done, _work_items_delayed;
  // _stopped starts true: a group with nothing submitted counts as stopped
  std::atomic<bool> _stopping{false}, _stopped{true}, _completing{false};
  // Number of wait() calls currently in progress on this group
  std::atomic<int> _waits{0};
  result<void> _abnormal_completion_cause{success()};  // The cause of any abnormal group completion

#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
  dispatch_group_t _grouph;
#elif defined(_WIN32)
  TP_CALLBACK_ENVIRON _callbackenviron;
  PTP_CALLBACK_ENVIRON _grouph{&_callbackenviron};
#else
  void *_grouph{nullptr};
  size_t _newly_added_active_work_items{0};
  size_t _timer_work_items_remaining{0};
  size_t _active_work_items_remaining{0};
#endif

public:
  // Second phase construction: creates the platform group handle where one
  // is needed and inserts this group into the global work queue. Any
  // exception thrown is converted into a failed result.
  result<void> init()
  {
    LLFIO_LOG_FUNCTION_CALL(this);
    try
    {
      auto &impl = detail::global_dynamic_thread_pool();
      _nesting_level = detail::global_dynamic_thread_pool_thread_local_state().nesting_level;
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
      _grouph = dispatch_group_create();
      if(_grouph == nullptr)
      {
        return errc::not_enough_memory;
      }
#elif defined(_WIN32)
      InitializeThreadpoolEnvironment(_grouph);
#endif
      detail::global_dynamic_thread_pool_impl::workqueue_guard g(impl.workqueue_lock);
      // Append this group to the global work queue at its nesting level
      // NOTE(review): because the comparison is <=, a new queue item is also
      // pushed when the head already has this nesting level - confirm intended.
      if(!impl.workqueue || impl.workqueue->nesting_level <= _nesting_level)
      {
        impl.workqueue = std::make_shared<detail::global_dynamic_thread_pool_impl_workqueue_item>(_nesting_level, std::move(impl.workqueue));
      }
      impl.workqueue->items.insert(this);
      return success();
    }
    catch(...)
    {
      return error_from_exception();
    }
  }

  // Waits for the group (the result of the wait is discarded), releases the
  // platform group handle, then removes this group from the global work
  // queue, dropping any queue items at the head which have become empty.
  virtual ~dynamic_thread_pool_group_impl()
  {
    LLFIO_LOG_FUNCTION_CALL(this);
    (void) wait();
    auto &impl = detail::global_dynamic_thread_pool();
    // detail::dynamic_thread_pool_group_impl_guard g1(_lock);
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
    if(nullptr != _grouph)
    {
      dispatch_release(_grouph);
      _grouph = nullptr;
    }
#elif defined(_WIN32)
    if(nullptr != _grouph)
    {
      DestroyThreadpoolEnvironment(_grouph);
      _grouph = nullptr;
    }
#endif
    detail::global_dynamic_thread_pool_impl::workqueue_guard g2(impl.workqueue_lock);
    assert(impl.workqueue->nesting_level >= _nesting_level);
    // Erase from the first queue item found at our nesting level only
    for(auto *p = impl.workqueue.get(); p != nullptr; p = p->next.get())
    {
      if(p->nesting_level == _nesting_level)
      {
        p->items.erase(this);
        break;
      }
    }
    while(impl.workqueue && impl.workqueue->items.empty())
    {
      impl.workqueue = std::move(impl.workqueue->next);
    }
  }

  // Submits work items to this group.
  // Returns errc::operation_canceled if the group is currently stopping.
  // If the group is currently completing, the items are parked on
  // _work_items_delayed instead of being submitted now.
  virtual result<void> submit(span<work_item *> work) noexcept override
  {
    LLFIO_LOG_FUNCTION_CALL(this);
    if(_stopping.load(std::memory_order_relaxed))
    {
      return errc::operation_canceled;
    }
    if(_completing.load(std::memory_order_relaxed))
    {
      // NOTE(review): this path does not take _lock - presumably because it
      // is reached from group_complete(), which runs with the group lock
      // already held; confirm against callers.
      for(auto *i : work)
      {
        i->_parent.store(this, std::memory_order_release);
        detail::global_dynamic_thread_pool_impl::_append_to_list(_work_items_delayed, i);
      }
      return success();
    }
    _stopped.store(false, std::memory_order_release);
    auto &impl = detail::global_dynamic_thread_pool();
    detail::dynamic_thread_pool_group_impl_guard g(_lock);  // lock group
    if(_work_items_active.count == 0 && _work_items_done.count == 0)
    {
      // Nothing outstanding, so forget the outcome of any previous run
      _abnormal_completion_cause = success();
    }
    OUTCOME_TRY(impl.submit(g, this, work));
    if(_work_items_active.count == 0)
    {
      // Nothing ended up active, so the group is once again stopped
      _stopped.store(true, std::memory_order_release);
    }
    return success();
  }

  // Requests that the group stop, recording errc::operation_canceled as the
  // cause. Does nothing if the group is already stopped.
  virtual result<void> stop() noexcept override
  {
    LLFIO_LOG_FUNCTION_CALL(this);
    if(_stopped.load(std::memory_order_relaxed))
    {
      return success();
    }
    auto &impl = detail::global_dynamic_thread_pool();
    detail::dynamic_thread_pool_group_impl_guard g(_lock);  // lock group
    return impl.stop(g, this, errc::operation_canceled);
  }

  // True if a stop has been requested and has not yet been cleared
  virtual bool stopping() const noexcept override { return _stopping.load(std::memory_order_relaxed); }

  // True if the group is stopped
  virtual bool stopped() const noexcept override { return _stopped.load(std::memory_order_relaxed); }

  // Waits on the group, subject to deadline d. Returns success immediately
  // if the group is already stopped.
  virtual result<void> wait(deadline d = {}) const noexcept override
  {
    LLFIO_LOG_FUNCTION_CALL(this);
    if(_stopped.load(std::memory_order_relaxed))
    {
      return success();
    }
    auto &impl = detail::global_dynamic_thread_pool();
    detail::dynamic_thread_pool_group_impl_guard g(_lock);  // lock group
    // The implementation takes a non-const group, hence the const_cast
    return impl.wait(g, true, const_cast<dynamic_thread_pool_group_impl *>(this), d);
  }
};
detail::global_dynamic_thread_pool().ms_sleep_for_more_work.load(std::memory_order_relaxed); +#else + return 0; +#endif +} + +LLFIO_HEADERS_ONLY_MEMFUNC_SPEC uint32_t dynamic_thread_pool_group::ms_sleep_for_more_work(uint32_t v) noexcept +{ +#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32) + if(0 == v) + { + v = 1; + } + detail::global_dynamic_thread_pool().ms_sleep_for_more_work.store(v, std::memory_order_relaxed); + return v; +#else + (void) v; + return 0; +#endif +} + +LLFIO_HEADERS_ONLY_FUNC_SPEC result<dynamic_thread_pool_group_ptr> make_dynamic_thread_pool_group() noexcept +{ + try + { + auto ret = std::make_unique<dynamic_thread_pool_group_impl>(); + OUTCOME_TRY(ret->init()); + return dynamic_thread_pool_group_ptr(std::move(ret)); + } + catch(...) + { + return error_from_exception(); + } +} + +namespace detail +{ +#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32) + inline void global_dynamic_thread_pool_impl::_execute_work(thread_t *self) + { + pthread_setname_np(pthread_self(), "LLFIO DYN TPG"); + self->last_did_work = std::chrono::steady_clock::now(); + self->state.fetch_add(1, std::memory_order_release); // busy + const unsigned mythreadidx = threadpool_threads.fetch_add(1, std::memory_order_release); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " begins." 
<< std::endl; +#endif + while(self->state.load(std::memory_order_relaxed) > 0) + { + dynamic_thread_pool_group::work_item *workitem = nullptr; + bool workitem_is_timer = false; + std::chrono::steady_clock::time_point now_steady, earliest_duration; + std::chrono::system_clock::time_point now_system, earliest_absolute; + // Start from highest priority work group, executing any timers due before selecting a work item + { + auto examine_wq = [&](global_dynamic_thread_pool_impl_workqueue_item &wq) -> dynamic_thread_pool_group::work_item * { + if(wq.next_timer_relative.count.load(std::memory_order_relaxed) > 0) + { + if(now_steady == std::chrono::steady_clock::time_point()) + { + now_steady = std::chrono::steady_clock::now(); + } + wq.next_timer_relative.lock.lock(); + if(wq.next_timer_relative.front != nullptr) + { + if(wq.next_timer_relative.front->_timepoint1 <= now_steady) + { + workitem = wq.next_timer<1>(); // unlocks wq.next_timer_relative.lock + workitem_is_timer = true; + return workitem; + } + if(earliest_duration == std::chrono::steady_clock::time_point() || wq.next_timer_relative.front->_timepoint1 < earliest_duration) + { + earliest_duration = wq.next_timer_relative.front->_timepoint1; + } + } + wq.next_timer_relative.lock.unlock(); + } + if(wq.next_timer_absolute.count.load(std::memory_order_relaxed) > 0) + { + if(now_system == std::chrono::system_clock::time_point()) + { + now_system = std::chrono::system_clock::now(); + } + wq.next_timer_absolute.lock.lock(); + if(wq.next_timer_absolute.front != nullptr) + { + if(wq.next_timer_absolute.front->_timepoint2 <= now_system) + { + workitem = wq.next_timer<2>(); // unlocks wq.next_timer_absolute.lock + workitem_is_timer = true; + return workitem; + } + if(earliest_absolute == std::chrono::system_clock::time_point() || wq.next_timer_absolute.front->_timepoint2 < earliest_absolute) + { + earliest_absolute = wq.next_timer_absolute.front->_timepoint2; + } + } + wq.next_timer_absolute.lock.unlock(); + } + unsigned 
count = 0; + return wq.next_active(count, mythreadidx); + }; + workitem = examine_wq(first_execute); + if(workitem == nullptr) + { + workqueue_lock.lock(); + auto lock_wq = workqueue; // take shared_ptr to highest priority collection of work groups + workqueue_lock.unlock(); + while(lock_wq) + { + workitem = examine_wq(*lock_wq); + if(workitem != nullptr) + { + // workqueue_lock.lock(); + // std::cout << "workitem = " << workitem << " nesting_level = " << wq.nesting_level << " count = " << count << std::endl; + // workqueue_lock.unlock(); + break; + } + workqueue_lock.lock(); + lock_wq = lock_wq->next; + workqueue_lock.unlock(); + } + } + } + if(now_steady == std::chrono::steady_clock::time_point()) + { + now_steady = std::chrono::steady_clock::now(); + } + // If there are no timers, and no work to do, time to either die or sleep + if(workitem == nullptr) + { + const std::chrono::steady_clock::duration max_sleep(std::chrono::milliseconds(ms_sleep_for_more_work.load(std::memory_order_relaxed))); + if(now_steady - self->last_did_work >= max_sleep) + { + threadpool_guard g(threadpool_lock); + // If there are any timers running, leave at least one worker thread + if(threadpool_active.count > 1 || + (earliest_duration == std::chrono::steady_clock::time_point() && earliest_absolute == std::chrono::system_clock::time_point())) + { + _remove_from_list(threadpool_active, self); + threadpool_threads.fetch_sub(1, std::memory_order_release); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " exits due to no new work for ms_sleep_for_more_work" << std::endl; +#endif + self->thread.detach(); + delete self; + return; + } + } + std::chrono::steady_clock::duration duration(max_sleep); + if(earliest_duration != std::chrono::steady_clock::time_point()) + { + if(now_steady - earliest_duration < duration) + { + duration = now_steady - earliest_duration; + } + } + else if(earliest_absolute != std::chrono::system_clock::time_point()) + { + if(now_system 
== std::chrono::system_clock::time_point()) + { + now_system = std::chrono::system_clock::now(); + } + auto diff = now_system - earliest_absolute; + if(diff > duration) + { + earliest_absolute = {}; + } + } + threadpool_guard g(threadpool_lock); + _remove_from_list(threadpool_active, self); + _append_to_list(threadpool_sleeping, self); + self->state.fetch_sub(1, std::memory_order_release); + if(earliest_absolute != std::chrono::system_clock::time_point()) + { +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " goes to sleep absolute" << std::endl; +#endif + self->cond.wait_until(g, earliest_absolute); + } + else + { +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " goes to sleep for " << std::chrono::duration_cast<std::chrono::milliseconds>(duration).count() << std::endl; +#endif + self->cond.wait_for(g, duration); + } + self->state.fetch_add(1, std::memory_order_release); + _remove_from_list(threadpool_sleeping, self); + _append_to_list(threadpool_active, self); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " wakes, state = " << self->state << std::endl; +#endif + g.unlock(); + try + { + populate_threadmetrics(now_steady); + } + catch(...) 
+ { + } + continue; + } + self->last_did_work = now_steady; +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " executes work item " << workitem << std::endl; +#endif + total_submitted_workitems.fetch_sub(1, std::memory_order_relaxed); + if(workitem_is_timer) + { + _timerthread(workitem, nullptr); + } + else + { + _workerthread(workitem, nullptr); + } + // workitem->_internalworkh should be null, however workitem may also no longer exist + try + { + if(populate_threadmetrics(now_steady)) + { + threadpool_guard g(threadpool_lock); + _remove_from_list(threadpool_active, self); + threadpool_threads.fetch_sub(1, std::memory_order_release); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " exits due to threadmetrics saying we exceed max concurrency" << std::endl; +#endif + self->thread.detach(); + delete self; + return; + } + } + catch(...) + { + } + } + self->state.fetch_sub(2, std::memory_order_release); // dead + threadpool_threads.fetch_sub(1, std::memory_order_release); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << self << " exits due to state request, state = " << self->state << std::endl; +#endif + } +#endif + + inline void global_dynamic_thread_pool_impl::_submit_work_item(bool submit_into_highest_priority, dynamic_thread_pool_group::work_item *workitem, + bool defer_pool_wake) + { + (void) submit_into_highest_priority; + (void) defer_pool_wake; + const auto nextwork = workitem->_nextwork.load(std::memory_order_acquire); + if(nextwork != -1) + { + auto *parent = workitem->_parent.load(std::memory_order_relaxed); + // If no work item for now, or there is a delay, schedule a timer + if(nextwork == 0 || workitem->_has_timer_set()) + { +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + dispatch_time_t when; + if(workitem->_has_timer_set_relative()) + { + // Special constant for immediately rescheduled work items + if(workitem->_timepoint1 == 
std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1))) + { + when = dispatch_time(DISPATCH_TIME_NOW, 0); + } + else + { + auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(workitem->_timepoint1 - std::chrono::steady_clock::now()).count(); + if(duration > 1000000000LL) + { + // Because GCD has no way of cancelling timers, nor assigning them to a group, + // we clamp the timer to 1 second. Then if cancellation is ever done to the group, + // the worst possible wait is 1 second. _timerthread will reschedule the timer + // if it gets called short. + duration = 1000000000LL; + } + when = dispatch_time(DISPATCH_TIME_NOW, duration); + } + } + else if(workitem->_has_timer_set_absolute()) + { + deadline d(workitem->_timepoint2); + auto now = std::chrono::system_clock::now(); + if(workitem->_timepoint2 - now > std::chrono::seconds(1)) + { + d = now + std::chrono::seconds(1); + } + when = dispatch_walltime(&d.utc, 0); + } + else + { + when = dispatch_time(DISPATCH_TIME_NOW, 1); // smallest possible non immediate duration from now + } + // std::cout << "*** timer " << workitem << std::endl; + dispatch_after_f(when, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), workitem, _gcd_timer_callback); +#elif defined(_WIN32) + LARGE_INTEGER li; + DWORD slop = 1000; + if(workitem->_has_timer_set_relative()) + { + // Special constant for immediately rescheduled work items + if(workitem->_timepoint1 == std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1))) + { + li.QuadPart = -1; // smallest possible non immediate duration from now + } + else + { + li.QuadPart = std::chrono::duration_cast<std::chrono::nanoseconds>(workitem->_timepoint1 - std::chrono::steady_clock::now()).count() / 100; + if(li.QuadPart < 0) + { + li.QuadPart = 0; + } + if(li.QuadPart / 8 < (int64_t) slop) + { + slop = (DWORD)(li.QuadPart / 8); + } + li.QuadPart = -li.QuadPart; // negative is relative + } + } + else 
if(workitem->_has_timer_set_absolute()) + { + li = windows_nt_kernel::from_timepoint(workitem->_timepoint2); + } + else + { + li.QuadPart = -1; // smallest possible non immediate duration from now + } + FILETIME ft; + ft.dwHighDateTime = (DWORD) li.HighPart; + ft.dwLowDateTime = li.LowPart; + // std::cout << "*** timer " << workitem << std::endl; + SetThreadpoolTimer((PTP_TIMER) workitem->_internaltimerh, &ft, 0, slop); +#else + workqueue_guard gg(workqueue_lock); + for(auto *p = workqueue.get(); p != nullptr; p = p->next.get()) + { + if(p->nesting_level == parent->_nesting_level) + { + p->append_timer(workitem); + break; + } + } +#endif + } + else + { +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + intptr_t priority = DISPATCH_QUEUE_PRIORITY_LOW; + { + global_dynamic_thread_pool_impl::workqueue_guard gg(workqueue_lock); + if(workqueue->nesting_level == parent->_nesting_level) + { + priority = DISPATCH_QUEUE_PRIORITY_HIGH; + } + else if(workqueue->nesting_level == parent->_nesting_level + 1) + { + priority = DISPATCH_QUEUE_PRIORITY_DEFAULT; + } + } + // std::cout << "*** submit " << workitem << std::endl; + dispatch_group_async_f(parent->_grouph, dispatch_get_global_queue(priority, 0), workitem, _gcd_dispatch_callback); +#elif defined(_WIN32) + // Set the priority of the group according to distance from the top + TP_CALLBACK_PRIORITY priority = TP_CALLBACK_PRIORITY_LOW; + { + global_dynamic_thread_pool_impl::workqueue_guard gg(workqueue_lock); + if(workqueue->nesting_level == parent->_nesting_level) + { + priority = TP_CALLBACK_PRIORITY_HIGH; + } + else if(workqueue->nesting_level == parent->_nesting_level + 1) + { + priority = TP_CALLBACK_PRIORITY_NORMAL; + } + } + SetThreadpoolCallbackPriority(parent->_grouph, priority); + // std::cout << "*** submit " << workitem << std::endl; + SubmitThreadpoolWork((PTP_WORK) workitem->_internalworkh); +#else + global_dynamic_thread_pool_impl::workqueue_guard gg(workqueue_lock); + if(submit_into_highest_priority) + { + // 
// Submits a span of work items to a group.
//
// \param g The guard holding the group's lock. It is unlocked while the items
// are handed to _submit_work_item() and relocked before a successful return.
// \param group The group to which the work items are being submitted.
// \param work The work items to submit. Every item must currently have a null
// parent, otherwise errc::address_in_use is returned before anything changes.
// \return success, or the failure which stopped preparation. Exceptions are
// converted into a failed result.
inline result<void> global_dynamic_thread_pool_impl::submit(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group_impl *group,
                                                            span<dynamic_thread_pool_group::work_item *> work) noexcept
{
  try
  {
    if(work.empty())
    {
      return success();
    }
    // Refuse the whole batch if any item already belongs to a group
    for(auto *i : work)
    {
      if(i->_parent.load(std::memory_order_relaxed) != nullptr)
      {
        return errc::address_in_use;
      }
    }
    // Rollback which runs if we leave before uninit.release() below.
    // NOTE(review): this removes EVERY item in work from _work_items_active,
    // including any which were appended to _work_items_done or which had not
    // been reached yet when the failure occurred - confirm _remove_from_list
    // tolerates items which are not on that list.
    auto uninit = make_scope_exit([&]() noexcept {
      for(auto *i : work)
      {
        _remove_from_list(group->_work_items_active, i);
#if defined(_WIN32)
        if(nullptr != i->_internaltimerh)
        {
          CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
          i->_internaltimerh = nullptr;
        }
        if(nullptr != i->_internalworkh)
        {
          CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
          i->_internalworkh = nullptr;
        }
#endif
        i->_parent.store(nullptr, std::memory_order_release);
      }
    });
    for(auto *i : work)
    {
      // Ask each item for its first piece of work; next() may adjust d
      deadline d(std::chrono::seconds(0));
      i->_parent.store(group, std::memory_order_release);
      i->_nextwork.store(i->next(d), std::memory_order_release);
      if(-1 == i->_nextwork.load(std::memory_order_acquire))
      {
        // -1 means the item has no work at all, so it goes straight to done
        _append_to_list(group->_work_items_done, i);
      }
      else
      {
#if defined(_WIN32)
        i->_internalworkh = CreateThreadpoolWork(_win32_worker_thread_callback, i, group->_grouph);
        if(nullptr == i->_internalworkh)
        {
          return win32_error();
        }
#endif
        OUTCOME_TRY(_prepare_work_item_delay(i, group->_grouph, d));
        _prepend_to_list(group->_work_items_active, i);
      }
    }
    // Preparation succeeded for every item, so cancel the rollback
    uninit.release();
    g.unlock();
    {
      for(auto *i : work)
      {
#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
        // NOTE(review): these counters are modified after g.unlock() above - confirm
        // that is safe with respect to concurrent users of the group.
        group->_newly_added_active_work_items++;
        group->_active_work_items_remaining++;
#endif
        // Pool wake is deferred for every item except the last of the batch
        _submit_work_item(true, i, i != work.back());
      }
    }
    g.lock();
    return success();
  }
  catch(...)
  {
    return error_from_exception();
  }
}
_work_items_delayed.count = " << parent->_work_items_delayed.count << std::endl; +#endif + if(parent->_work_items_delayed.count > 0) + { + /* If there are waits on this group to complete, forward progress those now. + */ + while(parent->_waits.load(std::memory_order_relaxed) > 0) + { +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP group_complete blocks on waits for group " << parent << std::endl; +#endif + g.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + g.lock(); + } + // Now submit all delayed work + while(parent->_work_items_delayed.count > 0) + { + i = parent->_work_items_delayed.front; + _remove_from_list(parent->_work_items_delayed, i); + auto r = submit(g, parent, {&i, 1}); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP group_complete submits delayed work item " << i << " for group " << parent << " which saw error "; + if(r) + { + std::cout << "none" << std::endl; + } + else + { + std::cout << r.error().message() << std::endl; + } +#endif + if(!r) + { + parent->_work_items_delayed = {}; + (void) stop(g, parent, std::move(r)); + break; + } + } + } + } + } + + inline result<void> global_dynamic_thread_pool_impl::stop(dynamic_thread_pool_group_impl_guard &g, dynamic_thread_pool_group_impl *group, + result<void> err) noexcept + { + (void) g; + if(group->_abnormal_completion_cause) + { + group->_abnormal_completion_cause = std::move(err); + } + group->_stopping.store(true, std::memory_order_release); + return success(); + } + + + inline result<void> global_dynamic_thread_pool_impl::wait(dynamic_thread_pool_group_impl_guard &g, bool reap, dynamic_thread_pool_group_impl *group, + deadline d) noexcept + { + LLFIO_DEADLINE_TO_SLEEP_INIT(d); + if(!d || d.nsecs > 0) + { + /* To ensure forward progress, we need to gate new waits during delayed work submission. + Otherwise waits may never exit if the window where _work_items_active.count == 0 is + missed. 
+ */ + while(group->_work_items_delayed.count > 0) + { + g.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + g.lock(); + } + group->_waits.fetch_add(1, std::memory_order_release); + auto unwaitcount = make_scope_exit([&]() noexcept { group->_waits.fetch_sub(1, std::memory_order_release); }); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + while(group->_work_items_active.count > 0) + { + LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d); + dispatch_time_t timeout = DISPATCH_TIME_FOREVER; + if(d) + { + std::chrono::nanoseconds duration; + LLFIO_DEADLINE_TO_PARTIAL_TIMEOUT(duration, d); + timeout = dispatch_time(DISPATCH_TIME_NOW, duration.count()); + } + g.unlock(); + dispatch_group_wait(group->_grouph, timeout); + g.lock(); + // if(1 == group->_work_items_active.count) + //{ + // std::cout << "*** wait item remaining is " << group->_work_items_active.front << std::endl; + // std::this_thread::sleep_for(std::chrono::seconds(1)); + //} + } +#elif defined(_WIN32) + auto &tls = detail::global_dynamic_thread_pool_thread_local_state(); + if(tls.current_callback_instance != nullptr) + { + // I am being called from within a thread worker. Tell + // the thread pool that I am not going to exit promptly. + CallbackMayRunLong(tls.current_callback_instance); + } + // Is this a cancellation? 
+ if(group->_stopping.load(std::memory_order_relaxed)) + { + while(group->_work_items_active.count > 0) + { + auto *i = group->_work_items_active.front; + if(nullptr != i->_internalworkh) + { + if(0 == i->_internalworkh_inuse) + { + i->_internalworkh_inuse = 1; + } + g.unlock(); + WaitForThreadpoolWorkCallbacks((PTP_WORK) i->_internalworkh, true); + g.lock(); + if(i->_internalworkh_inuse == 2) + { + if(nullptr != i->_internalworkh) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } + if(nullptr != i->_internaltimerh) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } + } + i->_internalworkh_inuse = 0; + } + if(nullptr != i->_internaltimerh) + { + if(0 == i->_internalworkh_inuse) + { + i->_internalworkh_inuse = 1; + } + g.unlock(); + WaitForThreadpoolTimerCallbacks((PTP_TIMER) i->_internaltimerh, true); + g.lock(); + if(i->_internalworkh_inuse == 2) + { + if(nullptr != i->_internalworkh) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } + if(nullptr != i->_internaltimerh) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } + } + i->_internalworkh_inuse = 0; + } + if(group->_work_items_active.count > 0 && group->_work_items_active.front == i) + { + // This item got cancelled before it started + _work_item_done(g, group->_work_items_active.front); + } + } + assert(!group->_stopping.load(std::memory_order_relaxed)); + } + else if(!d) + { + while(group->_work_items_active.count > 0) + { + auto *i = group->_work_items_active.front; + if(nullptr != i->_internalworkh) + { + if(0 == i->_internalworkh_inuse) + { + i->_internalworkh_inuse = 1; + } + g.unlock(); + WaitForThreadpoolWorkCallbacks((PTP_WORK) i->_internalworkh, false); + g.lock(); + if(i->_internalworkh_inuse == 2) + { + if(nullptr != i->_internalworkh) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } + 
if(nullptr != i->_internaltimerh) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } + } + i->_internalworkh_inuse = 0; + } + if(nullptr != i->_internaltimerh) + { + if(0 == i->_internalworkh_inuse) + { + i->_internalworkh_inuse = 1; + } + g.unlock(); + WaitForThreadpoolTimerCallbacks((PTP_TIMER) i->_internaltimerh, false); + g.lock(); + if(i->_internalworkh_inuse == 2) + { + if(nullptr != i->_internalworkh) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } + if(nullptr != i->_internaltimerh) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } + } + i->_internalworkh_inuse = 0; + } + } + } + else + { + while(group->_work_items_active.count > 0) + { + LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d); + g.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + g.lock(); + } + } +#else + while(group->_work_items_active.count > 0) + { + LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d); + g.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + g.lock(); + } +#endif + } + if(group->_work_items_active.count > 0) + { + return errc::timed_out; + } + if(reap) + { + return std::move(group->_abnormal_completion_cause); + } + return success(); + } + + inline void global_dynamic_thread_pool_impl::_timerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type /*unused*/) + { + LLFIO_LOG_FUNCTION_CALL(this); + assert(workitem->_nextwork.load(std::memory_order_relaxed) != -1); + assert(workitem->_has_timer_set()); + auto *parent = workitem->_parent.load(std::memory_order_relaxed); + // std::cout << "*** _timerthread " << workitem << std::endl; + if(parent->_stopping.load(std::memory_order_relaxed)) + { + dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group + _work_item_done(g, workitem); + return; + } + if(workitem->_has_timer_set_relative()) + { +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + auto now = 
std::chrono::steady_clock::now(); + if(workitem->_timepoint1 - now > std::chrono::seconds(0)) + { + // Timer fired short, so schedule it again + _submit_work_item(false, workitem, false); + return; + } +#endif + workitem->_timepoint1 = {}; + } + if(workitem->_has_timer_set_absolute()) + { +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD + auto now = std::chrono::system_clock::now(); + if(workitem->_timepoint2 - now > std::chrono::seconds(0)) + { + // Timer fired short, so schedule it again + _submit_work_item(false, workitem, false); + return; + } +#endif + workitem->_timepoint2 = {}; + } + assert(!workitem->_has_timer_set()); + if(workitem->_nextwork.load(std::memory_order_acquire) == 0) + { + deadline d(std::chrono::seconds(0)); + workitem->_nextwork.store(workitem->next(d), std::memory_order_release); + auto r2 = _prepare_work_item_delay(workitem, parent->_grouph, d); + if(!r2) + { + dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group + (void) stop(g, parent, std::move(r2)); + _work_item_done(g, workitem); + return; + } + if(-1 == workitem->_nextwork.load(std::memory_order_relaxed)) + { + dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group + _work_item_done(g, workitem); + return; + } + _submit_work_item(false, workitem, false); + return; + } + _submit_work_item(false, workitem, false); + } + + // Worker thread entry point + inline void global_dynamic_thread_pool_impl::_workerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh) + { + LLFIO_LOG_FUNCTION_CALL(this); + //{ + // _lock_guard g(parent->_lock); + // std::cout << "*** _workerthread " << workitem << " begins with work " << workitem->_nextwork.load(std::memory_order_relaxed) << std::endl; + //} + assert(workitem->_nextwork.load(std::memory_order_relaxed) != -1); + assert(workitem->_nextwork.load(std::memory_order_relaxed) != 0); + auto *parent = workitem->_parent.load(std::memory_order_relaxed); + 
if(parent->_stopping.load(std::memory_order_relaxed)) + { + dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group + _work_item_done(g, workitem); + return; + } + auto &tls = detail::global_dynamic_thread_pool_thread_local_state(); + auto old_thread_local_state = tls; + tls.workitem = workitem; + tls.current_callback_instance = selfthreadh; + tls.nesting_level = parent->_nesting_level + 1; + auto r = (*workitem)(workitem->_nextwork.load(std::memory_order_acquire)); + workitem->_nextwork.store(0, std::memory_order_release); // call next() next time + tls = old_thread_local_state; + // std::cout << "*** _workerthread " << workitem << " ends with work " << workitem->_nextwork << std::endl; + if(!r) + { + dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group + (void) stop(g, parent, std::move(r)); + _work_item_done(g, workitem); + workitem = nullptr; + } + else if(parent->_stopping.load(std::memory_order_relaxed)) + { + dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group + _work_item_done(g, workitem); + } + else + { + deadline d(std::chrono::seconds(0)); + workitem->_nextwork.store(workitem->next(d), std::memory_order_release); + auto r2 = _prepare_work_item_delay(workitem, parent->_grouph, d); + if(!r2) + { + dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group + (void) stop(g, parent, std::move(r2)); + _work_item_done(g, workitem); + return; + } + if(-1 == workitem->_nextwork.load(std::memory_order_relaxed)) + { + dynamic_thread_pool_group_impl_guard g(parent->_lock); // lock group + _work_item_done(g, workitem); + return; + } + _submit_work_item(false, workitem, false); + } + } +} // namespace detail + + +/****************************************** io_aware_work_item *********************************************/ + +LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::io_aware_work_item::io_aware_work_item(span<io_handle_awareness> hs) + : _handles([](span<io_handle_awareness> hs) -> 
span<io_handle_awareness> { + float all = 0; + for(auto &i : hs) + { + all += i.reads + i.writes + i.barriers; + } + for(auto &i : hs) + { + if(all == 0.0f) + { + i.reads = i.writes = 0.5f; + i.barriers = 0.0f; + } + else + { + i.reads /= all; + i.writes /= all; + i.barriers /= all; + } + } + auto &impl = detail::global_dynamic_thread_pool(); + detail::global_dynamic_thread_pool_impl::io_aware_work_item_handles_guard g(impl.io_aware_work_item_handles_lock); + for(auto &h : hs) + { + if(!h.h->is_seekable()) + { + throw std::runtime_error("Supplied handle is not seekable"); + } + auto *fh = static_cast<file_handle *>(h.h); + auto unique_id = fh->unique_id(); + auto it = impl.io_aware_work_item_handles.find(unique_id); + if(it == impl.io_aware_work_item_handles.end()) + { + it = impl.io_aware_work_item_handles.emplace(unique_id, detail::global_dynamic_thread_pool_impl::io_aware_work_item_statfs{}).first; + auto r = it->second.statfs.fill(*fh, statfs_t::want::iosinprogress | statfs_t::want::iosbusytime); + if(!r || it->second.statfs.f_iosinprogress == (uint32_t) -1) + { + impl.io_aware_work_item_handles.erase(it); + if(!r) + { + r.value(); + } + throw std::runtime_error("statfs::f_iosinprogress unavailable for supplied handle"); + } + it->second.last_updated = std::chrono::steady_clock::now(); + } + it->second.refcount++; + h._internal = &*it; + } + return hs; + }(hs)) +{ + LLFIO_LOG_FUNCTION_CALL(this); +} + +LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::io_aware_work_item::~io_aware_work_item() +{ + LLFIO_LOG_FUNCTION_CALL(this); + auto &impl = detail::global_dynamic_thread_pool(); + detail::global_dynamic_thread_pool_impl::io_aware_work_item_handles_guard g(impl.io_aware_work_item_handles_lock); + using value_type = decltype(impl.io_aware_work_item_handles)::value_type; + for(auto &h : _handles) + { + auto *i = (value_type *) h._internal; + if(0 == --i->second.refcount) + { + impl.io_aware_work_item_handles.erase(i->first); + } + } +} + 
+LLFIO_HEADERS_ONLY_MEMFUNC_SPEC intptr_t dynamic_thread_pool_group::io_aware_work_item::next(deadline &d) noexcept +{ + LLFIO_LOG_FUNCTION_CALL(this); + { + auto &impl = detail::global_dynamic_thread_pool(); + auto now = std::chrono::steady_clock::now(); + detail::global_dynamic_thread_pool_impl::io_aware_work_item_handles_guard g(impl.io_aware_work_item_handles_lock); + using value_type = decltype(impl.io_aware_work_item_handles)::value_type; + for(auto &h : _handles) + { + auto *i = (value_type *) h._internal; + if(std::chrono::duration_cast<std::chrono::milliseconds>(now - i->second.last_updated) >= std::chrono::milliseconds(100)) + { + // auto old_iosinprogress = i->second.statfs.f_iosinprogress; + auto elapsed = now - i->second.last_updated; + (void) i->second.statfs.fill(*h.h, statfs_t::want::iosinprogress | statfs_t::want::iosbusytime); + i->second.last_updated = now; + + if(elapsed > std::chrono::seconds(5)) + { + i->second.average_busy = i->second.statfs.f_iosbusytime; + i->second.average_queuedepth = (float) i->second.statfs.f_iosinprogress; + } + else + { + i->second.average_busy = (i->second.average_busy * 0.9f) + (i->second.statfs.f_iosbusytime * 0.1f); + i->second.average_queuedepth = (i->second.average_queuedepth * 0.9f) + (i->second.statfs.f_iosinprogress * 0.1f); + } + if(i->second.average_busy < this->max_iosbusytime && i->second.average_queuedepth < this->min_iosinprogress) + { + i->second.default_deadline = std::chrono::seconds(0); // remove pacing + } + else if(i->second.average_queuedepth > this->max_iosinprogress) + { + if(0 == i->second.default_deadline.nsecs) + { + i->second.default_deadline = std::chrono::milliseconds(1); // start with 1ms, it'll reduce from there if needed + } + else if((i->second.default_deadline.nsecs >> 4) > 0) + { + i->second.default_deadline.nsecs += i->second.default_deadline.nsecs >> 4; + } + else + { + i->second.default_deadline.nsecs++; + } + } + else if(i->second.average_queuedepth < this->min_iosinprogress) + 
{ + if(i->second.default_deadline.nsecs > (i->second.default_deadline.nsecs >> 4) && (i->second.default_deadline.nsecs >> 4) > 0) + { + i->second.default_deadline.nsecs -= i->second.default_deadline.nsecs >> 4; + } + else if(i->second.default_deadline.nsecs > 1) + { + i->second.default_deadline.nsecs--; + } + } + } + if(d.nsecs < i->second.default_deadline.nsecs) + { + d = i->second.default_deadline; + } + } + } + return io_aware_next(d); +} + + +LLFIO_V2_NAMESPACE_END |