/* Dynamic thread pool group
(C) 2020-2021 Niall Douglas <http://www.nedproductions.biz/> (9 commits)
File Created: Dec 2020


Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License in the accompanying file
Licence.txt or at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


Distributed under the Boost Software License, Version 1.0.
(See accompanying file Licence.txt or copy at
http://www.boost.org/LICENSE_1_0.txt)
*/

#include "../../dynamic_thread_pool_group.hpp"

#include "../../file_handle.hpp"
#include "../../statfs.hpp"

#include <atomic>
#include <iostream>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#ifndef LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
#if LLFIO_FORCE_USE_LIBDISPATCH
#include <dispatch/dispatch.h>
#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD 1
#else
#ifdef _WIN32
#include "windows/import.hpp"
#include <threadpoolapiset.h>
#else
#if __has_include(<dispatch/dispatch.h>)
#include <dispatch/dispatch.h>
#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD 1
#endif
#endif
#endif
#endif

#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
#if !defined(__linux__)
#error dynamic_thread_pool_group requires Grand Central Dispatch (libdispatch) on non-Linux POSIX.
#endif
#include <dirent.h> /* Defines DT_* constants */
#include <sys/syscall.h>

#include <condition_variable>
#include <thread>
#endif

#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING 0

/* NOTE that the Linux results are from a VM with half the CPUs of the Windows results,
so they are not directly comparable.

Linux 4Kb and 64Kb

Benchmarking asio ...
   For 1 work items got 33635.5 SHA256 hashes/sec with 1 maximum concurrency.
   For 2 work items got 59420.4 SHA256 hashes/sec with 2 maximum concurrency.
   For 4 work items got 65653.8 SHA256 hashes/sec with 4 maximum concurrency.
   For 8 work items got 42216.3 SHA256 hashes/sec with 8 maximum concurrency.
   For 16 work items got 458911 SHA256 hashes/sec with 16 maximum concurrency.
   For 32 work items got 578462 SHA256 hashes/sec with 27 maximum concurrency.
   For 64 work items got 572456 SHA256 hashes/sec with 27 maximum concurrency.
   For 128 work items got 572326 SHA256 hashes/sec with 26 maximum concurrency.
   For 256 work items got 568558 SHA256 hashes/sec with 25 maximum concurrency.
   For 512 work items got 570342 SHA256 hashes/sec with 26 maximum concurrency.
   For 1024 work items got 567351 SHA256 hashes/sec with 26 maximum concurrency.

Benchmarking asio ...
   For 1 work items got 3768.07 SHA256 hashes/sec with 1 maximum concurrency.
   For 2 work items got 7672.47 SHA256 hashes/sec with 2 maximum concurrency.
   For 4 work items got 14169.5 SHA256 hashes/sec with 4 maximum concurrency.
   For 8 work items got 21785.9 SHA256 hashes/sec with 8 maximum concurrency.
   For 16 work items got 30875 SHA256 hashes/sec with 16 maximum concurrency.
   For 32 work items got 43614.4 SHA256 hashes/sec with 32 maximum concurrency.
   For 64 work items got 46075.4 SHA256 hashes/sec with 32 maximum concurrency.
   For 128 work items got 47111.6 SHA256 hashes/sec with 32 maximum concurrency.
   For 256 work items got 45926.6 SHA256 hashes/sec with 32 maximum concurrency.
   For 512 work items got 45923.9 SHA256 hashes/sec with 32 maximum concurrency.
   For 1024 work items got 46250.9 SHA256 hashes/sec with 32 maximum concurrency.

Windows 4Kb and 64Kb

Benchmarking asio ...
   For 1 work items got 49443.6 SHA256 hashes/sec with 1 maximum concurrency.
   For 2 work items got 97189 SHA256 hashes/sec with 2 maximum concurrency.
   For 4 work items got 185187 SHA256 hashes/sec with 4 maximum concurrency.
   For 8 work items got 328105 SHA256 hashes/sec with 8 maximum concurrency.
   For 16 work items got 513294 SHA256 hashes/sec with 16 maximum concurrency.
   For 32 work items got 493040 SHA256 hashes/sec with 32 maximum concurrency.
   For 64 work items got 1.00736e+06 SHA256 hashes/sec with 64 maximum concurrency.
   For 128 work items got 996193 SHA256 hashes/sec with 64 maximum concurrency.
   For 256 work items got 993805 SHA256 hashes/sec with 64 maximum concurrency.
   For 512 work items got 998211 SHA256 hashes/sec with 64 maximum concurrency.
   For 1024 work items got 990231 SHA256 hashes/sec with 64 maximum concurrency.

Benchmarking asio ...
   For 1 work items got 3797.05 SHA256 hashes/sec with 1 maximum concurrency.
   For 2 work items got 7869.94 SHA256 hashes/sec with 2 maximum concurrency.
   For 4 work items got 15612 SHA256 hashes/sec with 4 maximum concurrency.
   For 8 work items got 28481.1 SHA256 hashes/sec with 8 maximum concurrency.
   For 16 work items got 41255.2 SHA256 hashes/sec with 16 maximum concurrency.
   For 32 work items got 66182.4 SHA256 hashes/sec with 32 maximum concurrency.
   For 64 work items got 67230.5 SHA256 hashes/sec with 64 maximum concurrency.
   For 128 work items got 66988.5 SHA256 hashes/sec with 64 maximum concurrency.
   For 256 work items got 66926.1 SHA256 hashes/sec with 64 maximum concurrency.
   For 512 work items got 66964.7 SHA256 hashes/sec with 64 maximum concurrency.
   For 1024 work items got 66911 SHA256 hashes/sec with 64 maximum concurrency.
*/

/* Linux 4Kb and 64Kb libdispatch

Benchmarking llfio (Grand Central Dispatch) ...
   For 1 work items got 32058.2 SHA256 hashes/sec with 1 maximum concurrency.
   For 2 work items got 26084.1 SHA256 hashes/sec with 2 maximum concurrency.
   For 4 work items got 24906.8 SHA256 hashes/sec with 4 maximum concurrency.
   For 8 work items got 24729.5 SHA256 hashes/sec with 8 maximum concurrency.
   For 16 work items got 73749.1 SHA256 hashes/sec with 16 maximum concurrency.
   For 32 work items got 526656 SHA256 hashes/sec with 21 maximum concurrency.
   For 64 work items got 535043 SHA256 hashes/sec with 27 maximum concurrency.
   For 128 work items got 541809 SHA256 hashes/sec with 30 maximum concurrency.
   For 256 work items got 543568 SHA256 hashes/sec with 33 maximum concurrency.
   For 512 work items got 545540 SHA256 hashes/sec with 37 maximum concurrency.
   For 1024 work items got 542017 SHA256 hashes/sec with 41 maximum concurrency.

Benchmarking llfio (Grand Central Dispatch) ...
   For 1 work items got 3857.82 SHA256 hashes/sec with 1 maximum concurrency.
   For 2 work items got 7666.2 SHA256 hashes/sec with 2 maximum concurrency.
   For 4 work items got 14993.6 SHA256 hashes/sec with 4 maximum concurrency.
   For 8 work items got 25160 SHA256 hashes/sec with 8 maximum concurrency.
   For 16 work items got 39015.5 SHA256 hashes/sec with 16 maximum concurrency.
   For 32 work items got 43494.4 SHA256 hashes/sec with 16 maximum concurrency.
   For 64 work items got 42874.5 SHA256 hashes/sec with 16 maximum concurrency.
   For 128 work items got 42678.7 SHA256 hashes/sec with 16 maximum concurrency.
   For 256 work items got 42661.7 SHA256 hashes/sec with 16 maximum concurrency.
   For 512 work items got 42670.9 SHA256 hashes/sec with 16 maximum concurrency.
   For 1024 work items got 44609.5 SHA256 hashes/sec with 16 maximum concurrency.
*/
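/* Reading the tables above and below: for each platform the first
"Benchmarking ..." run appears to be the 4Kb block size and the second the
64Kb block size, matching the "4Kb and 64Kb" headings; "maximum concurrency"
is the peak number of concurrent threads observed servicing the work items. */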
/* Windows 4Kb and 64Kb Win32 thread pool

Benchmarking llfio (Win32 thread pool (Vista+)) ...
   For 1 work items got 56553.8 SHA256 hashes/sec with 1 maximum concurrency.
   For 2 work items got 110711 SHA256 hashes/sec with 2 maximum concurrency.
   For 4 work items got 207273 SHA256 hashes/sec with 4 maximum concurrency.
   For 8 work items got 269391 SHA256 hashes/sec with 8 maximum concurrency.
   For 16 work items got 245053 SHA256 hashes/sec with 16 maximum concurrency.
   For 32 work items got 260854 SHA256 hashes/sec with 21 maximum concurrency.
   For 64 work items got 402240 SHA256 hashes/sec with 19 maximum concurrency.
   For 128 work items got 400192 SHA256 hashes/sec with 19 maximum concurrency.
   For 256 work items got 405973 SHA256 hashes/sec with 20 maximum concurrency.
   For 512 work items got 406156 SHA256 hashes/sec with 22 maximum concurrency.
   For 1024 work items got 405901 SHA256 hashes/sec with 23 maximum concurrency.

Benchmarking llfio (Win32 thread pool (Vista+)) ...
   For 1 work items got 4020.88 SHA256 hashes/sec with 1 maximum concurrency.
   For 2 work items got 8028.79 SHA256 hashes/sec with 2 maximum concurrency.
   For 4 work items got 15813 SHA256 hashes/sec with 4 maximum concurrency.
   For 8 work items got 25539.4 SHA256 hashes/sec with 8 maximum concurrency.
   For 16 work items got 40522.3 SHA256 hashes/sec with 16 maximum concurrency.
   For 32 work items got 65182 SHA256 hashes/sec with 32 maximum concurrency.
   For 64 work items got 65572.9 SHA256 hashes/sec with 33 maximum concurrency.
   For 128 work items got 66462.3 SHA256 hashes/sec with 33 maximum concurrency.
   For 256 work items got 66315.3 SHA256 hashes/sec with 33 maximum concurrency.
   For 512 work items got 66341.5 SHA256 hashes/sec with 33 maximum concurrency.
   For 1024 work items got 66416.2 SHA256 hashes/sec with 33 maximum concurrency.
*/

LLFIO_V2_NAMESPACE_BEGIN

namespace detail
{
  struct global_dynamic_thread_pool_impl_workqueue_item
  {
    std::unordered_set<dynamic_thread_pool_group_impl *> items;
#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
    dynamic_thread_pool_group::work_item *_next_active_front{nullptr}, *_next_timer_relative_front{nullptr}, *_next_timer_absolute_front{nullptr};
    dynamic_thread_pool_group::work_item *_next_active_back{nullptr}, *_next_timer_relative_back{nullptr}, *_next_timer_absolute_back{nullptr};

    dynamic_thread_pool_group::work_item *next_active()
    {
      auto *ret = _next_active_front;
      if(ret == nullptr)
      {
        assert(_next_active_back == nullptr);
        return nullptr;
      }
      _next_active_front = ret->_next_scheduled;
      if(_next_active_front == nullptr)
      {
        assert(_next_active_back == ret);
        _next_active_back = nullptr;
      }
      ret->_next_scheduled = nullptr;
      return ret;
    }
    void append_active(dynamic_thread_pool_group::work_item *p)
    {
      if(_next_active_back == nullptr)
      {
        assert(_next_active_front == nullptr);
        _next_active_front = _next_active_back = p;
        return;
      }
      p->_next_scheduled = nullptr;
      _next_active_back->_next_scheduled = p;
      _next_active_back = p;
    }
    void prepend_active(dynamic_thread_pool_group::work_item *p)
    {
      if(_next_active_front == nullptr)
      {
        assert(_next_active_back == nullptr);
        _next_active_front = _next_active_back = p;
        return;
      }
      p->_next_scheduled = _next_active_front;
      _next_active_front = p;
    }
    // which = 1 pops the relative (steady clock) timer queue, which = 2 the absolute (system clock) one
    dynamic_thread_pool_group::work_item *next_timer(int which)
    {
      if(which == 0) { return nullptr; }
      auto *&front = (which == 1) ? _next_timer_relative_front : _next_timer_absolute_front;
      auto *&back = (which == 1) ? _next_timer_relative_back : _next_timer_absolute_back;
      auto *ret = front;
      if(ret == nullptr)
      {
        assert(back == nullptr);
        return nullptr;
      }
      front = ret->_next_scheduled;
      if(front == nullptr)
      {
        assert(back == ret);
        back = nullptr;
      }
      ret->_next_scheduled = nullptr;
      return ret;
    }
    void append_timer(dynamic_thread_pool_group::work_item *i)
    {
      if(i->_timepoint1 != std::chrono::steady_clock::time_point())
      {
        if(_next_timer_relative_front == nullptr)
        {
          _next_timer_relative_front = _next_timer_relative_back = i;
        }
        else
        {
          // Keep the queue sorted soonest-first, as next_timer() pops the front
          bool done = false;
          for(dynamic_thread_pool_group::work_item *p = nullptr, *n = _next_timer_relative_front; n != nullptr; p = n, n = n->_next_scheduled)
          {
            if(n->_timepoint1 > i->_timepoint1)
            {
              i->_next_scheduled = n;
              if(p == nullptr) { _next_timer_relative_front = i; }
              else { p->_next_scheduled = i; }
              done = true;
              break;
            }
          }
          if(!done)
          {
            _next_timer_relative_back->_next_scheduled = i;
            i->_next_scheduled = nullptr;
            _next_timer_relative_back = i;
          }
        }
      }
      else
      {
        if(_next_timer_absolute_front == nullptr)
        {
          _next_timer_absolute_front = _next_timer_absolute_back = i;
        }
        else
        {
          bool done = false;
          for(dynamic_thread_pool_group::work_item *p = nullptr, *n = _next_timer_absolute_front; n != nullptr; p = n, n = n->_next_scheduled)
          {
            if(n->_timepoint2 > i->_timepoint2)
            {
              i->_next_scheduled = n;
              if(p == nullptr) { _next_timer_absolute_front = i; }
              else { p->_next_scheduled = i; }
              done = true;
              break;
            }
          }
          if(!done)
          {
            _next_timer_absolute_back->_next_scheduled = i;
            i->_next_scheduled = nullptr;
            _next_timer_absolute_back = i;
          }
        }
      }
    }
#endif
  };

  struct global_dynamic_thread_pool_impl
  {
    std::mutex workqueue_lock;
    std::vector<global_dynamic_thread_pool_impl_workqueue_item> workqueue;
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
    using _lock_guard = std::unique_lock<std::mutex>;
    using threadh_type = void *;
    using grouph_type = dispatch_group_t;
    static void _gcd_dispatch_callback(void *arg)
    {
      auto *workitem = (dynamic_thread_pool_group::work_item *) arg;
      global_dynamic_thread_pool()._workerthread(workitem, nullptr);
    }
    static void _gcd_timer_callback(void *arg)
    {
      auto *workitem = (dynamic_thread_pool_group::work_item *) arg;
      global_dynamic_thread_pool()._timerthread(workitem, nullptr);
    }
#elif defined(_WIN32)
    using _lock_guard = std::unique_lock<std::mutex>;
    using threadh_type = PTP_CALLBACK_INSTANCE;
    using grouph_type = PTP_CALLBACK_ENVIRON;
    static void CALLBACK _win32_worker_thread_callback(threadh_type threadh, PVOID Parameter, PTP_WORK /*unused*/)
    {
      auto *workitem = (dynamic_thread_pool_group::work_item *) Parameter;
      global_dynamic_thread_pool()._workerthread(workitem, threadh);
    }
    static void CALLBACK _win32_timer_thread_callback(threadh_type threadh, PVOID Parameter, PTP_TIMER /*unused*/)
    {
      auto *workitem = (dynamic_thread_pool_group::work_item *) Parameter;
      global_dynamic_thread_pool()._timerthread(workitem, threadh);
    }
#else
    using _lock_guard = std::unique_lock<std::mutex>;
#if 0
    class _lock_guard
    {
      std::mutex &_l;
      bool _is_locked{false};

      _lock_guard(std::mutex &l)
          : _l(l)
      {
        lock();
      }
      _lock_guard(const _lock_guard &) = delete;
      _lock_guard(_lock_guard &&) = delete;
      _lock_guard &operator=(const _lock_guard &) = delete;
      _lock_guard &operator=(_lock_guard &&) = delete;
      ~_lock_guard()
      {
        if(_is_locked) { unlock(); }
      }
      void lock()
      {
        assert(!_is_locked);
        global_dynamic_thread_pool_threads_awaiting_mutex.fetch_add(1, std::memory_order_relaxed);
        _l.lock();
        _is_locked = true;
      }
      void unlock()
      {
        assert(_is_locked);
        global_dynamic_thread_pool_threads_awaiting_mutex.fetch_sub(1, std::memory_order_relaxed);
        _l.unlock();
        _is_locked = false;
      }
    };
#endif
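    /* The native (non-GCD, non-Win32) backend keeps two intrusive lists of
    worker threads: threadpool_active (busy or recently busy) and
    threadpool_sleeping (parked on their condition variables), plus per-thread
    metrics harvested from /proc on Linux which drive the add/remove-thread
    decisions in update_threadmetrics() further below. */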
    using threadh_type = void *;
    using grouph_type = void *;
    struct thread_t
    {
      thread_t *_prev{nullptr}, *_next{nullptr};
      std::thread thread;
      std::condition_variable cond;
      std::chrono::steady_clock::time_point last_did_work;
      int state{0};  // <0 = dead, 0 = sleeping/please die, 1 = busy
    };
    struct threads_t
    {
      size_t count{0};
      thread_t *front{nullptr}, *back{nullptr};
    } threadpool_active, threadpool_sleeping;
    std::atomic<size_t> total_submitted_workitems{0}, threadpool_threads{0};
    std::atomic<uint32_t> ms_sleep_for_more_work{20000};

    std::mutex threadmetrics_lock;
    struct threadmetrics_threadid
    {
      char text[12];  // enough for a UINT32_MAX in decimal
      constexpr threadmetrics_threadid()
          : text{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
      {
      }
      threadmetrics_threadid(string_view sv)
      {
        memset(text, '0', sizeof(text));
        assert(sv.size() <= sizeof(text));
        if(sv.size() > sizeof(text)) { abort(); }
        memcpy(text + sizeof(text) - sv.size(), sv.data(), sv.size());
      }
      int compare(const threadmetrics_threadid &o) const noexcept { return memcmp(text, o.text, sizeof(text)); }
      bool operator<(const threadmetrics_threadid &o) const noexcept { return compare(o) < 0; }
      bool operator==(const threadmetrics_threadid &o) const noexcept { return compare(o) == 0; }
    };
    struct threadmetrics_item
    {
      threadmetrics_item *_prev{nullptr}, *_next{nullptr};
      std::chrono::steady_clock::time_point last_updated, blocked_since;  // latter set if thread seen no time
      threadmetrics_threadid threadid;
      uint32_t diskfaults{(uint32_t) -1}, utime{(uint32_t) -1}, stime{(uint32_t) -1};  // cumulative ticks spent in user and system for this thread

      explicit threadmetrics_item(threadmetrics_threadid v)
          : threadid(v)
      {
      }
      string_view threadid_name() const noexcept { return string_view(threadid.text, sizeof(threadid.text)); }
    };
    struct threadmetrics_t
    {
      size_t count{0};
      threadmetrics_item *front{nullptr}, *back{nullptr};
      uint32_t blocked{0}, running{0};
    } threadmetrics_queue;                                    // items at front are least recently updated
    std::vector<threadmetrics_item *> threadmetrics_sorted;   // sorted by threadid
    std::chrono::steady_clock::time_point threadmetrics_last_updated;
    std::atomic<unsigned> populate_threadmetrics_reentrancy{0};
#ifdef __linux__
    std::mutex proc_self_task_fd_lock;
    int proc_self_task_fd{-1};
#endif
#endif

    std::mutex io_aware_work_item_handles_lock;
    struct io_aware_work_item_statfs
    {
      size_t refcount{0};
      deadline default_deadline;
      float average_busy{0}, average_queuedepth{0};
      std::chrono::steady_clock::time_point last_updated;
      statfs_t statfs;
    };
    std::unordered_map<fs_handle::unique_id_type, io_aware_work_item_statfs, fs_handle::unique_id_type_hasher> io_aware_work_item_handles;

    global_dynamic_thread_pool_impl()
    {
      workqueue.reserve(4);  // preallocate 4 levels of nesting
#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
      populate_threadmetrics(std::chrono::steady_clock::now());
#endif
    }

    template <class T, class U> static void _append_to_list(T &what, U *v)
    {
      if(what.front == nullptr)
      {
        assert(what.back == nullptr);
        v->_next = v->_prev = nullptr;
        what.front = what.back = v;
      }
      else
      {
        v->_next = nullptr;
        v->_prev = what.back;
        what.back->_next = v;
        what.back = v;
      }
      what.count++;
    }
    template <class T, class U> static void _prepend_to_list(T &what, U *v)
    {
      if(what.front == nullptr)
      {
        assert(what.back == nullptr);
        v->_next = v->_prev = nullptr;
        what.front = what.back = v;
      }
      else
      {
        v->_prev = nullptr;
        v->_next = what.front;
        what.front->_prev = v;
        what.front = v;
      }
      what.count++;
    }
    template <class T, class U> static void _remove_from_list(T &what, U *v)
    {
      if(v->_prev == nullptr && v->_next == nullptr)
      {
        assert(what.front == v);
        assert(what.back == v);
        what.front = what.back = nullptr;
      }
      else if(v->_prev == nullptr)
      {
        assert(what.front == v);
        v->_next->_prev = nullptr;
        what.front = v->_next;
        v->_next = v->_prev = nullptr;
      }
      else if(v->_next == nullptr)
      {
        assert(what.back == v);
        v->_prev->_next = nullptr;
        what.back = v->_prev;
        v->_next = v->_prev = nullptr;
      }
      else
      {
        v->_next->_prev = v->_prev;
        v->_prev->_next = v->_next;
        v->_next = v->_prev = nullptr;
      }
      what.count--;
    }

#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
    inline void _execute_work(thread_t *self);

    void _add_thread(_lock_guard & /*unused*/)
    {
      thread_t *p = nullptr;
      try
      {
        p = new thread_t;
        _append_to_list(threadpool_active, p);
        p->thread = std::thread([this, p] { _execute_work(p); });
      }
      catch(...)
      {
        if(p != nullptr) { _remove_from_list(threadpool_active, p); }
        // drop failure
      }
    }

    bool _remove_thread(_lock_guard &g, threads_t &which)
    {
      if(which.count == 0) { return false; }
      // Threads which went to sleep the longest ago are at the front
      auto *t = which.front;
      if(t->state < 0)
      {
        // He's already exiting
        return false;
      }
      assert(t->state == 0);
      t->state--;
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
      std::cout << "*** DTP " << t << " is told to quit" << std::endl;
#endif
      do
      {
        g.unlock();
        t->cond.notify_one();
        g.lock();
      } while(t->state >= -1);
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
      std::cout << "*** DTP " << t << " has quit, deleting" << std::endl;
#endif
      // A woken thread moves itself back onto threadpool_active before dying,
      // so it is always removed from there
      _remove_from_list(threadpool_active, t);
      t->thread.join();
      delete t;
      return true;
    }

    ~global_dynamic_thread_pool_impl()
    {
      {
        _lock_guard g(workqueue_lock);  // lock global
        while(threadpool_active.count > 0 || threadpool_sleeping.count > 0)
        {
          while(threadpool_sleeping.count > 0)
          {
            auto removed = _remove_thread(g, threadpool_sleeping);
            assert(removed);
            (void) removed;
          }
          if(threadpool_active.count > 0)
          {
            auto removed = _remove_thread(g, threadpool_active);
            assert(removed);
            (void) removed;
          }
        }
      }
      _lock_guard g(threadmetrics_lock);
      for(auto *p : threadmetrics_sorted) { delete p; }
      threadmetrics_sorted.clear();
      threadmetrics_queue = {};
#ifdef __linux__
      if(proc_self_task_fd > 0)
      {
        ::close(proc_self_task_fd);
        proc_self_task_fd = -1;
      }
#endif
    }
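    /* For reference when reading update_threadmetrics() below: the sscanf
    format skips to field 3 of /proc/<pid>/task/<tid>/stat (the state
    character), then extracts field 12 (majflt), field 14 (utime) and field 15
    (stime), per proc(5). */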
#ifdef __linux__
    // You are guaranteed only one of these EVER executes at a time.
    // Locking is probably overkill, but equally also probably harmless
    bool update_threadmetrics(_lock_guard &&g, std::chrono::steady_clock::time_point now, threadmetrics_item *new_items)
    {
      auto update_item = [&](threadmetrics_item *item) {
        char path[64] = "/proc/self/task/", *pend = path + 16, *tend = item->threadid.text;
        while(*tend == '0' && (tend - item->threadid.text) < (ssize_t) sizeof(item->threadid.text)) { ++tend; }
        while((tend - item->threadid.text) < (ssize_t) sizeof(item->threadid.text)) { *pend++ = *tend++; }
        memcpy(pend, "/stat", 6);
        int fd = ::open(path, O_RDONLY);
        if(-1 == fd)
        {
          // Thread may have exited since we last populated
          if(item->blocked_since == std::chrono::steady_clock::time_point())
          {
            threadmetrics_queue.running--;
            threadmetrics_queue.blocked++;
          }
          item->blocked_since = now;
          item->last_updated = now;
          return;
        }
        char buffer[1024];
        auto bytesread = ::read(fd, buffer, sizeof(buffer));
        ::close(fd);
        buffer[std::min((size_t) bytesread, sizeof(buffer) - 1)] = 0;
        char state = 0;
        unsigned long majflt = 0, utime = 0, stime = 0;
        sscanf(buffer, "%*d %*s %c %*d %*d %*d %*d %*d %*u %*u %*u %lu %*u %lu %lu", &state, &majflt, &utime, &stime);
        if(item->utime != (uint32_t) -1 || item->stime != (uint32_t) -1)
        {
          if(item->utime == (uint32_t) utime && item->stime == (uint32_t) stime && state != 'R')
          {
            // This thread made no progress since last time
            if(item->blocked_since == std::chrono::steady_clock::time_point())
            {
              threadmetrics_queue.running--;
              threadmetrics_queue.blocked++;
              item->blocked_since = now;
            }
          }
          else
          {
            if(item->blocked_since != std::chrono::steady_clock::time_point())
            {
              threadmetrics_queue.running++;
              threadmetrics_queue.blocked--;
              item->blocked_since = std::chrono::steady_clock::time_point();
            }
          }
        }
        // std::cout << "Threadmetrics " << path << " " << state << " " << majflt << " " << utime << " " << stime << ". Previously " << item->diskfaults << " "
        //           << item->utime << " " << item->stime << std::endl;
        item->diskfaults = (uint32_t) majflt;
        item->utime = (uint32_t) utime;
        item->stime = (uint32_t) stime;
        item->last_updated = now;
      };
      if(new_items != nullptr)
      {
        for(; new_items != nullptr; new_items = new_items->_next) { update_item(new_items); }
        return false;
      }
      if(threadmetrics_queue.count == 0) { return false; }
      size_t updated = 0;
      while(now - threadmetrics_queue.front->last_updated >= std::chrono::milliseconds(200) && updated++ < 4)
      {
        auto *p = threadmetrics_queue.front;
        update_item(p);
        _remove_from_list(threadmetrics_queue, p);
        _append_to_list(threadmetrics_queue, p);
      }
      // if(updated > 0)
      {
        static const auto min_hardware_concurrency = std::thread::hardware_concurrency();
        static const auto max_hardware_concurrency = min_hardware_concurrency + 3;
        auto threadmetrics_running = (ssize_t) threadmetrics_queue.running;
        auto threadmetrics_blocked = (ssize_t) threadmetrics_queue.blocked;
        g.unlock();  // drop threadmetrics_lock

        _lock_guard gg(workqueue_lock);
        // Adjust for the number of threads sleeping for more work
        threadmetrics_running += threadpool_sleeping.count;
        threadmetrics_blocked -= threadpool_sleeping.count;
        if(threadmetrics_blocked < 0) { threadmetrics_blocked = 0; }
        const auto desired_concurrency = std::min((ssize_t) min_hardware_concurrency, (ssize_t) total_submitted_workitems.load(std::memory_order_relaxed));
        auto toadd = std::max((ssize_t) 0, std::min(desired_concurrency - threadmetrics_running, desired_concurrency - (ssize_t) threadpool_active.count));
        auto toremove = std::max((ssize_t) 0, (ssize_t) threadmetrics_running - (ssize_t) max_hardware_concurrency);
        if(toadd > 0 || toremove > 0)
        {
          // std::cout << "total active = " << threadpool_active.count << " total idle = " << threadpool_sleeping.count
          //           << " threadmetrics_running = " << threadmetrics_running << " threadmetrics_blocked = " << threadmetrics_blocked << " toadd = " << toadd
          //           << " toremove = " << toremove << std::endl;
          if(toadd > 0) { _add_thread(gg); }
          if(toremove > 0 && threadpool_active.count > 1)
          {
            // Kill myself, but not if I'm the final thread who might need to run timers
            return true;
          }
        }
      }
      return false;
    }

    // Returns true if calling thread is to exit
    bool populate_threadmetrics(std::chrono::steady_clock::time_point now)
    {
      if(populate_threadmetrics_reentrancy.exchange(1, std::memory_order_relaxed) == 1) { return false; }
      auto unpopulate_threadmetrics_reentrancy = make_scope_exit([this]() noexcept { populate_threadmetrics_reentrancy.store(0, std::memory_order_relaxed); });

      static thread_local std::vector<char> kernelbuffer(1024);
      static thread_local std::vector<threadmetrics_threadid> threadidsbuffer(1024 / sizeof(dirent));
      using getdents64_t = int (*)(int, char *, unsigned int);
      static auto getdents = static_cast<getdents64_t>([](int fd, char *buf, unsigned count) -> int { return syscall(SYS_getdents64, fd, buf, count); });
      using dirent = dirent64;
      size_t bytes = 0;
      {
        _lock_guard g(threadmetrics_lock);
        if(now - threadmetrics_last_updated < std::chrono::milliseconds(250) &&
           threadmetrics_queue.running + threadmetrics_queue.blocked >= threadpool_threads.load(std::memory_order_relaxed))
        {
          if(now - threadmetrics_last_updated < std::chrono::milliseconds(100)) { return false; }
          return update_threadmetrics(std::move(g), now, nullptr);
        }
        threadmetrics_last_updated = now;
        if(proc_self_task_fd < 0)
        {
          proc_self_task_fd = ::open("/proc/self/task", O_RDONLY | O_DIRECTORY | O_CLOEXEC);
          if(proc_self_task_fd < 0) { posix_error().throw_exception(); }
        }
      }
      {
        _lock_guard g(proc_self_task_fd_lock);
        /* It turns out that /proc/self/task is quite racy in the Linux kernel, so keep
        looping this until it stops telling obvious lies. */
        for(auto done = false; !done;)
        {
          if(-1 == ::lseek64(proc_self_task_fd, 0, SEEK_SET)) { posix_error().throw_exception(); }
          auto _bytes = getdents(proc_self_task_fd, kernelbuffer.data(), (unsigned) kernelbuffer.size());
          // std::cout << "getdents(" << (kernelbuffer.size()-bytes) << ") returns " << _bytes << std::endl;
          if(_bytes < 0 && errno != EINVAL) { posix_error().throw_exception(); }
          if(_bytes >= 0 && kernelbuffer.size() - (size_t) _bytes >= sizeof(dirent) + 16)
          {
            bytes = (size_t) _bytes;
            threadidsbuffer.clear();
            for(auto *dent = (dirent *) kernelbuffer.data();; dent = reinterpret_cast<dirent *>(reinterpret_cast<uintptr_t>(dent) + dent->d_reclen))
            {
              if(dent->d_ino != 0u && dent->d_type == DT_DIR && dent->d_name[0] != '.')
              {
                size_t length = strchr(dent->d_name, 0) - dent->d_name;
                threadidsbuffer.push_back(string_view(dent->d_name, length));
              }
              if((bytes -= dent->d_reclen) <= 0) { break; }
            }
            auto mythreadcount = threadpool_threads.load(std::memory_order_relaxed);
            if(threadidsbuffer.size() >= mythreadcount)
            {
              // std::cout << "Parsed from /proc " << threadidsbuffer.size() << " entries, should be at least " << mythreadcount << std::endl;
              std::sort(threadidsbuffer.begin(), threadidsbuffer.end());
              done = true;
              break;
            }
#ifndef NDEBUG
            std::cout << "NOTE: /proc returned " << threadidsbuffer.size() << " items when we know for a fact at least " << mythreadcount
                      << " threads exist, retrying!" << std::endl;
#endif
            continue;
          }
          kernelbuffer.resize(kernelbuffer.size() << 1);
        }
      }
      threadmetrics_item *firstnewitem = nullptr;
      _lock_guard g(threadmetrics_lock);
#if 0
      {
        std::stringstream s;
        s << "Parsed from /proc " << threadidsbuffer.size() << " entries (should be at least " << threadpool_threads.load(std::memory_order_relaxed) << "):";
        for(auto &i : threadidsbuffer) { s << " " << string_view(i.text, 12); }
        std::cout << s.str() << std::endl;
      }
#endif
#if 0
      {
        auto d_it = threadmetrics_sorted.begin();
        auto s_it = threadidsbuffer.begin();
        for(; d_it != threadmetrics_sorted.end() && s_it != threadidsbuffer.end(); ++d_it, ++s_it)
        {
          std::cout << (*d_it)->threadid_name() << " " << string_view(s_it->text, 12) << "\n";
        }
        for(; d_it != threadmetrics_sorted.end(); ++d_it) { std::cout << (*d_it)->threadid_name() << " XXXXXXXXXXXX\n"; }
        for(; s_it != threadidsbuffer.end(); ++s_it) { std::cout << "XXXXXXXXXXXX " << string_view(s_it->text, 12) << "\n"; }
        std::cout << std::flush;
      }
#endif
      auto d_it = threadmetrics_sorted.begin();
      auto s_it = threadidsbuffer.begin();
      auto remove_item = [&] {
        // std::cout << "Removing thread metrics for " << (*d_it)->threadid_name() << std::endl;
        if((*d_it)->blocked_since != std::chrono::steady_clock::time_point()) { threadmetrics_queue.blocked--; }
        else { threadmetrics_queue.running--; }
        _remove_from_list(threadmetrics_queue, *d_it);
        delete *d_it;
        d_it = threadmetrics_sorted.erase(d_it);
      };
      auto add_item = [&] {
        auto p = std::make_unique<threadmetrics_item>(*s_it);
        d_it = threadmetrics_sorted.insert(d_it, p.get());
        _append_to_list(threadmetrics_queue, p.get());
        // std::cout << "Adding thread metrics for " << p->threadid_name() << std::endl;
        if(firstnewitem == nullptr) { firstnewitem = p.get(); }
        p.release();
        threadmetrics_queue.running++;
      };
      // std::cout << "Compare" << std::endl;
      for(; d_it != threadmetrics_sorted.end() && s_it != threadidsbuffer.end();)
      {
        auto c = (*d_it)->threadid.compare(*s_it);
        // std::cout << "Comparing " << (*d_it)->threadid_name() << " with " << string_view(s_it->text, 12) << " = " << c << std::endl;
        if(0 == c)
        {
          ++d_it;
          ++s_it;
          continue;
        }
        if(c < 0)
        {
          // d_it has gone away
          remove_item();
        }
        if(c > 0)
        {
          // s_it is a new entry
          add_item();
        }
      }
      // std::cout << "Tail dest" << std::endl;
      while(d_it != threadmetrics_sorted.end()) { remove_item(); }
      // std::cout << "Tail source" << std::endl;
      while(s_it != threadidsbuffer.end())
      {
        add_item();
        ++d_it;
        ++s_it;
      }
      assert(threadmetrics_sorted.size() == threadidsbuffer.size());
#if 0
      if(!std::is_sorted(threadmetrics_sorted.begin(), threadmetrics_sorted.end(),
                         [](threadmetrics_item *a, threadmetrics_item *b) { return a->threadid < b->threadid; }))
      {
        std::cout << "Threadmetrics:";
        for(auto *p : threadmetrics_sorted) { std::cout << "\n   " << p->threadid_name(); }
        std::cout << std::endl;
        abort();
      }
#endif
      assert(threadmetrics_queue.running + threadmetrics_queue.blocked == threadidsbuffer.size());
      return update_threadmetrics(std::move(g), now, firstnewitem);
    }
#endif
#endif

    result<void> _prepare_work_item_delay(dynamic_thread_pool_group::work_item *workitem, grouph_type grouph, deadline d)
    {
      if(!d) { return errc::invalid_argument; }
      workitem->_timepoint1 = {};
      workitem->_timepoint2 = {};
      assert(!workitem->_has_timer_set());
      if(workitem->_nextwork == 0 || d.nsecs > 0)
      {
        if(d.nsecs > 0)
        {
          if(d.steady) { workitem->_timepoint1 = std::chrono::steady_clock::now() + std::chrono::nanoseconds(d.nsecs); }
          else { workitem->_timepoint2 = d.to_time_point(); }
        }
        else
        {
          workitem->_timepoint1 = std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1));
        }
        assert(workitem->_has_timer_set());
        if(nullptr == workitem->_internaltimerh)
        {
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
          (void) grouph;
          workitem->_internaltimerh = (void *) (uintptr_t) -1;
#elif defined(_WIN32)
          workitem->_internaltimerh = CreateThreadpoolTimer(_win32_timer_thread_callback, workitem, grouph);
          if(nullptr == workitem->_internaltimerh) { return win32_error(); }
#else
          (void) grouph;
          workitem->_internaltimerh = (void *) (uintptr_t) -1;
#endif
        }
      }
      return success();
    }

    inline void _submit_work_item(_lock_guard &g, bool submit_into_highest_priority, dynamic_thread_pool_group::work_item *workitem, bool defer_pool_wake);

    inline result<void> submit(_lock_guard &g, dynamic_thread_pool_group_impl *group, span<dynamic_thread_pool_group::work_item *> work) noexcept;

    inline void _work_item_done(_lock_guard &g, dynamic_thread_pool_group::work_item *i) noexcept;

    inline result<void> stop(_lock_guard &g, dynamic_thread_pool_group_impl *group, result<void> err) noexcept;
    inline result<void> wait(_lock_guard &g, bool reap, dynamic_thread_pool_group_impl *group, deadline d) noexcept;

    inline void _timerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh);
    inline void _workerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh);
  };

  struct global_dynamic_thread_pool_impl_thread_local_state_t
  {
    dynamic_thread_pool_group::work_item *workitem{nullptr};
    global_dynamic_thread_pool_impl::threadh_type current_callback_instance{nullptr};
    size_t nesting_level{0};
  };

  LLFIO_HEADERS_ONLY_FUNC_SPEC global_dynamic_thread_pool_impl_thread_local_state_t &global_dynamic_thread_pool_thread_local_state() noexcept
  {
    static thread_local global_dynamic_thread_pool_impl_thread_local_state_t tls;
    return tls;
  }

  LLFIO_HEADERS_ONLY_FUNC_SPEC global_dynamic_thread_pool_impl &global_dynamic_thread_pool() noexcept
  {
    static global_dynamic_thread_pool_impl impl;
    return impl;
  }
}  // namespace detail
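/* Reports which of the three backends the preprocessor logic at the top of
this file selected: libdispatch/GCD (forced, or auto-detected on non-Linux
POSIX), the Vista+ Win32 thread pool on Windows, else the native Linux
implementation in this file. */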
LLFIO_HEADERS_ONLY_MEMFUNC_SPEC const char *dynamic_thread_pool_group::implementation_description() noexcept
{
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
  return "Grand Central Dispatch";
#elif defined(_WIN32)
  return "Win32 thread pool (Vista+)";
#elif defined(__linux__)
  return "Linux native";
#else
#error Unknown platform
#endif
}

class dynamic_thread_pool_group_impl final : public dynamic_thread_pool_group
{
  friend struct detail::global_dynamic_thread_pool_impl;

  mutable std::mutex _lock;
  using _lock_guard = detail::global_dynamic_thread_pool_impl::_lock_guard;
  size_t _nesting_level{0};
  struct workitems_t
  {
    size_t count{0};
    dynamic_thread_pool_group::work_item *front{nullptr}, *back{nullptr};
  } _work_items_active, _work_items_done, _work_items_delayed;
  std::atomic<bool> _stopping{false}, _stopped{true}, _completing{false};
  std::atomic<unsigned> _waits{0};
  result<void> _abnormal_completion_cause{success()};  // The cause of any abnormal group completion

#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
  dispatch_group_t _grouph;
#elif defined(_WIN32)
  TP_CALLBACK_ENVIRON _callbackenviron;
  PTP_CALLBACK_ENVIRON _grouph{&_callbackenviron};
#else
  void *_grouph{nullptr};
  size_t _newly_added_active_work_items{0};
  size_t _timer_work_items_remaining{0};
  size_t _active_work_items_remaining{0};
#endif

public:
  result<void> init()
  {
    LLFIO_LOG_FUNCTION_CALL(this);
    try
    {
      auto &impl = detail::global_dynamic_thread_pool();
      _nesting_level = detail::global_dynamic_thread_pool_thread_local_state().nesting_level;
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
      _grouph = dispatch_group_create();
      if(_grouph == nullptr) { return errc::not_enough_memory; }
#elif defined(_WIN32)
      InitializeThreadpoolEnvironment(_grouph);
#endif
      detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock);  // lock global
      // Append this group to the global work queue at its nesting level
      if(_nesting_level >= impl.workqueue.size()) { impl.workqueue.resize(_nesting_level + 1); }
      auto &wq = impl.workqueue[_nesting_level];
      wq.items.insert(this);
      return success();
    }
    catch(...)
    {
      return error_from_exception();
    }
  }

  virtual ~dynamic_thread_pool_group_impl()
  {
    LLFIO_LOG_FUNCTION_CALL(this);
    (void) wait();
    auto &impl = detail::global_dynamic_thread_pool();
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
    if(nullptr != _grouph)
    {
      dispatch_release(_grouph);
      _grouph = nullptr;
    }
#elif defined(_WIN32)
    if(nullptr != _grouph)
    {
      DestroyThreadpoolEnvironment(_grouph);
      _grouph = nullptr;
    }
#endif
    detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock);  // lock global
    assert(impl.workqueue.size() > _nesting_level);
    auto &wq = impl.workqueue[_nesting_level];
    wq.items.erase(this);
    while(!impl.workqueue.empty() && impl.workqueue.back().items.empty()) { impl.workqueue.pop_back(); }
  }

  virtual result<void> submit(span<work_item *> work) noexcept override
  {
    LLFIO_LOG_FUNCTION_CALL(this);
    if(_stopping.load(std::memory_order_relaxed)) { return errc::operation_canceled; }
    if(_completing.load(std::memory_order_relaxed))
    {
      for(auto *i : work)
      {
        i->_parent.store(this, std::memory_order_release);
        detail::global_dynamic_thread_pool_impl::_append_to_list(_work_items_delayed, i);
      }
      return success();
    }
    _stopped.store(false, std::memory_order_release);
    auto &impl = detail::global_dynamic_thread_pool();
    _lock_guard g(_lock);  // lock group
    if(_work_items_active.count == 0 && _work_items_done.count == 0) { _abnormal_completion_cause = success(); }
    OUTCOME_TRY(impl.submit(g, this, work));
    if(_work_items_active.count == 0) { _stopped.store(true, std::memory_order_release); }
    return success();
  }

  virtual result<void> stop() noexcept override
  {
    LLFIO_LOG_FUNCTION_CALL(this);
    if(_stopped.load(std::memory_order_relaxed)) { return success(); }
    auto &impl = detail::global_dynamic_thread_pool();
    _lock_guard g(_lock);  // lock group
    return impl.stop(g, this, errc::operation_canceled);
  }

  virtual bool stopping() const noexcept override { return _stopping.load(std::memory_order_relaxed); }

  virtual bool stopped() const noexcept override { return _stopped.load(std::memory_order_relaxed); }

  virtual result<void> wait(deadline d = {}) const noexcept override
  {
    LLFIO_LOG_FUNCTION_CALL(this);
    if(_stopped.load(std::memory_order_relaxed)) { return success(); }
    auto &impl = detail::global_dynamic_thread_pool();
    _lock_guard g(_lock);  // lock group
    return impl.wait(g, true, const_cast<dynamic_thread_pool_group_impl *>(this), d);
  }
};

LLFIO_HEADERS_ONLY_MEMFUNC_SPEC size_t dynamic_thread_pool_group::current_nesting_level() noexcept
{
  return detail::global_dynamic_thread_pool_thread_local_state().nesting_level;
}

LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::work_item *dynamic_thread_pool_group::current_work_item() noexcept
{
  return detail::global_dynamic_thread_pool_thread_local_state().workitem;
}

LLFIO_HEADERS_ONLY_MEMFUNC_SPEC uint32_t dynamic_thread_pool_group::ms_sleep_for_more_work() noexcept
{
#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
  return detail::global_dynamic_thread_pool().ms_sleep_for_more_work.load(std::memory_order_relaxed);
#else
  return 0;
#endif
}

LLFIO_HEADERS_ONLY_MEMFUNC_SPEC uint32_t dynamic_thread_pool_group::ms_sleep_for_more_work(uint32_t v) noexcept
{
#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
  if(0 == v) { v = 1; }
  detail::global_dynamic_thread_pool().ms_sleep_for_more_work.store(v, std::memory_order_relaxed);
  return v;
#else
  (void) v;
  return 0;
#endif
}

LLFIO_HEADERS_ONLY_FUNC_SPEC result<dynamic_thread_pool_group_ptr> make_dynamic_thread_pool_group() noexcept
{
  try
  {
    auto ret = std::make_unique<dynamic_thread_pool_group_impl>();
    OUTCOME_TRY(ret->init());
    return dynamic_thread_pool_group_ptr(std::move(ret));
  }
  catch(...)
  {
    return error_from_exception();
  }
}
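/* A minimal usage sketch of the public API implemented in this file, for
orientation only and kept disabled; `hashing_work_item` and the counts are
hypothetical, see dynamic_thread_pool_group.hpp for the exact interface.
Each work item hands out units of work via next(), executes them in
operator()(), and is notified once the whole group completes. */
#if 0
struct hashing_work_item final : dynamic_thread_pool_group::work_item
{
  std::atomic<intptr_t> remaining{16};
  virtual intptr_t next(deadline &d) noexcept override
  {
    d = deadline(std::chrono::seconds(0));  // no delay before the next invocation
    auto n = remaining.fetch_sub(1, std::memory_order_relaxed);
    return (n > 0) ? n : -1;  // -1 = no more work, this item becomes done
  }
  virtual result<void> operator()(intptr_t /*work*/) noexcept override
  {
    // ... perform one unit of work; returning failure stops the whole group ...
    return success();
  }
  virtual void group_complete(const result<void> &cancelled) noexcept override { (void) cancelled; }
};

void example()
{
  auto group = make_dynamic_thread_pool_group().value();
  hashing_work_item item;
  dynamic_thread_pool_group::work_item *items[] = {&item};
  group->submit(items).value();  // span<work_item *> constructed from the array
  group->wait().value();         // rethrows any abnormal completion cause
}
#endif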
namespace detail
{
#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
  inline void global_dynamic_thread_pool_impl::_execute_work(thread_t *self)
  {
    pthread_setname_np(pthread_self(), "LLFIO DYN TPG");
    self->last_did_work = std::chrono::steady_clock::now();
    _lock_guard g(workqueue_lock);  // lock global
    self->state++;  // busy
    threadpool_threads.fetch_add(1, std::memory_order_release);
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
    std::cout << "*** DTP " << self << " begins." << std::endl;
#endif
    while(self->state > 0)
    {
      dynamic_thread_pool_group::work_item *workitem = nullptr;
      bool workitem_is_timer = false;
      std::chrono::steady_clock::time_point now_steady, earliest_duration;
      std::chrono::system_clock::time_point now_system, earliest_absolute;
      // Start from highest priority work group, executing any timers due before selecting a work item
      for(auto it = workqueue.rbegin(); it != workqueue.rend() && workitem == nullptr; ++it)
      {
        auto &wq = *it;
        if(wq._next_timer_relative_front != nullptr)
        {
          if(now_steady == std::chrono::steady_clock::time_point()) { now_steady = std::chrono::steady_clock::now(); }
          if(wq._next_timer_relative_front->_timepoint1 <= now_steady)
          {
            workitem = wq.next_timer(1);
            workitem_is_timer = true;
            break;
          }
          if(earliest_duration == std::chrono::steady_clock::time_point() || wq._next_timer_relative_front->_timepoint1 < earliest_duration)
          {
            earliest_duration = wq._next_timer_relative_front->_timepoint1;
          }
        }
        if(wq._next_timer_absolute_front != nullptr)
        {
          if(now_system == std::chrono::system_clock::time_point()) { now_system = std::chrono::system_clock::now(); }
          if(wq._next_timer_absolute_front->_timepoint2 <= now_system)
          {
            workitem = wq.next_timer(2);
            workitem_is_timer = true;
            break;
          }
          if(earliest_absolute == std::chrono::system_clock::time_point() || wq._next_timer_absolute_front->_timepoint2 < earliest_absolute)
          {
            earliest_absolute = wq._next_timer_absolute_front->_timepoint2;
          }
        }
        workitem = wq.next_active();
      }
      if(now_steady == std::chrono::steady_clock::time_point()) { now_steady = std::chrono::steady_clock::now(); }
      // If there are no timers, and no work to do, time to either die or sleep
      if(workitem == nullptr)
      {
        const std::chrono::steady_clock::duration max_sleep(std::chrono::milliseconds(ms_sleep_for_more_work.load(std::memory_order_relaxed)));
        if(now_steady - self->last_did_work >= max_sleep)
        {
          // If there are any timers running, leave at least one worker thread
          if(threadpool_active.count > 1 ||
             (earliest_duration == std::chrono::steady_clock::time_point() && earliest_absolute == std::chrono::system_clock::time_point()))
          {
            _remove_from_list(threadpool_active, self);
            threadpool_threads.fetch_sub(1, std::memory_order_release);
#if 1  // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
            std::cout << "*** DTP " << self << " exits due to no new work for ms_sleep_for_more_work" << std::endl;
#endif
            self->thread.detach();
            delete self;
            return;
          }
        }
        std::chrono::steady_clock::duration duration(max_sleep);
        if(earliest_duration != std::chrono::steady_clock::time_point())
        {
          if(earliest_duration - now_steady < duration) { duration = earliest_duration - now_steady; }
        }
        else if(earliest_absolute != std::chrono::system_clock::time_point())
        {
          if(now_system == std::chrono::system_clock::time_point()) { now_system = std::chrono::system_clock::now(); }
          auto diff = earliest_absolute - now_system;
          if(diff > duration) { earliest_absolute = {}; }
        }
        _remove_from_list(threadpool_active, self);
        _append_to_list(threadpool_sleeping, self);
        self->state--;
        if(earliest_absolute != std::chrono::system_clock::time_point())
        {
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
          std::cout << "*** DTP " << self << " goes to sleep absolute" << std::endl;
#endif
          self->cond.wait_until(g, earliest_absolute);
        }
        else
        {
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
          std::cout << "*** DTP " << self << " goes to sleep for " << std::chrono::duration_cast<std::chrono::milliseconds>(duration).count() << std::endl;
#endif
          self->cond.wait_for(g, duration);
        }
        self->state++;
        _remove_from_list(threadpool_sleeping, self);
        _append_to_list(threadpool_active, self);
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
        std::cout << "*** DTP " << self << " wakes, state = " << self->state << std::endl;
#endif
        g.unlock();
        try
        {
          populate_threadmetrics(now_steady);
        }
        catch(...)
        {
        }
        g.lock();
        continue;
      }
      self->last_did_work = now_steady;
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
      std::cout << "*** DTP " << self << " executes work item " << workitem << std::endl;
#endif
      total_submitted_workitems.fetch_sub(1, std::memory_order_relaxed);
      g.unlock();
      if(workitem_is_timer) { _timerthread(workitem, nullptr); }
      else { _workerthread(workitem, nullptr); }
      // workitem->_internalworkh should be null, however workitem may also no longer exist
      try
      {
        if(populate_threadmetrics(now_steady))
        {
          g.lock();  // reacquire the global lock before removing ourselves
          _remove_from_list(threadpool_active, self);
          threadpool_threads.fetch_sub(1, std::memory_order_release);
#if 1  // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
          std::cout << "*** DTP " << self << " exits due to threadmetrics saying we exceed max concurrency" << std::endl;
#endif
          self->thread.detach();
          delete self;
          return;
        }
      }
      catch(...)
      {
      }
      g.lock();
    }
    self->state -= 2;  // dead
    threadpool_threads.fetch_sub(1, std::memory_order_release);
#if 1  // LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
    std::cout << "*** DTP " << self << " exits due to state request, state = " << self->state << std::endl;
#endif
  }
#endif

  inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, bool submit_into_highest_priority,
                                                                 dynamic_thread_pool_group::work_item *workitem, bool defer_pool_wake)
  {
    (void) g;
    (void) submit_into_highest_priority;
    (void) defer_pool_wake;
    if(workitem->_nextwork != -1)
    {
      auto *parent = workitem->_parent.load(std::memory_order_relaxed);
      // If no work item for now, or there is a delay, schedule a timer
      if(workitem->_nextwork == 0 || workitem->_has_timer_set())
      {
        assert(workitem->_internaltimerh != nullptr);
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
        dispatch_time_t when;
        if(workitem->_has_timer_set_relative())
        {
          // Special constant for immediately rescheduled work items
          if(workitem->_timepoint1 == std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1)))
          {
            when = dispatch_time(DISPATCH_TIME_NOW, 0);
          }
          else
          {
            auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(workitem->_timepoint1 - std::chrono::steady_clock::now()).count();
            if(duration > 1000000000LL)
            {
              // Because GCD has no way of cancelling timers, nor assigning them to a group,
              // we clamp the timer to 1 second. Then if cancellation is ever done to the group,
              // the worst possible wait is 1 second. _timerthread will reschedule the timer
              // if it gets called short.
              duration = 1000000000LL;
            }
            when = dispatch_time(DISPATCH_TIME_NOW, duration);
          }
        }
        else if(workitem->_has_timer_set_absolute())
        {
          deadline d(workitem->_timepoint2);
          auto now = std::chrono::system_clock::now();
          if(workitem->_timepoint2 - now > std::chrono::seconds(1)) { d = now + std::chrono::seconds(1); }
          when = dispatch_walltime(&d.utc, 0);
        }
        else
        {
          when = dispatch_time(DISPATCH_TIME_NOW, 1);  // smallest possible non immediate duration from now
        }
        // std::cout << "*** timer " << workitem << std::endl;
        dispatch_after_f(when, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), workitem, _gcd_timer_callback);
#elif defined(_WIN32)
        LARGE_INTEGER li;
        DWORD slop = 1000;
        if(workitem->_has_timer_set_relative())
        {
          // Special constant for immediately rescheduled work items
          if(workitem->_timepoint1 == std::chrono::steady_clock::time_point(std::chrono::steady_clock::duration(1)))
          {
            li.QuadPart = -1;  // smallest possible non immediate duration from now
          }
          else
          {
            li.QuadPart = std::chrono::duration_cast<std::chrono::nanoseconds>(workitem->_timepoint1 - std::chrono::steady_clock::now()).count() / 100;
            if(li.QuadPart < 0) { li.QuadPart = 0; }
            if(li.QuadPart / 8 < (int64_t) slop) { slop = (DWORD)(li.QuadPart / 8); }
            li.QuadPart = -li.QuadPart;  // negative is relative
          }
        }
        else if(workitem->_has_timer_set_absolute())
        {
          li = windows_nt_kernel::from_timepoint(workitem->_timepoint2);
        }
        else
        {
          li.QuadPart = -1;  // smallest possible non immediate duration from now
        }
        FILETIME ft;
        ft.dwHighDateTime = (DWORD) li.HighPart;
        ft.dwLowDateTime = li.LowPart;
        // std::cout << "*** timer " << workitem << std::endl;
        SetThreadpoolTimer((PTP_TIMER) workitem->_internaltimerh, &ft, 0, slop);
#else
        _lock_guard gg(workqueue_lock);
        auto *wq = &workqueue[submit_into_highest_priority ? (workqueue.size() - 1) : parent->_nesting_level];
        wq->append_timer(workitem);
#endif
      }
      else
      {
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
        intptr_t priority = DISPATCH_QUEUE_PRIORITY_LOW;
        if(workqueue.size() - parent->_nesting_level == 1) { priority = DISPATCH_QUEUE_PRIORITY_HIGH; }
        else if(workqueue.size() - parent->_nesting_level == 2) { priority = DISPATCH_QUEUE_PRIORITY_DEFAULT; }
        // std::cout << "*** submit " << workitem << std::endl;
        dispatch_group_async_f(parent->_grouph, dispatch_get_global_queue(priority, 0), workitem, _gcd_dispatch_callback);
#elif defined(_WIN32)
        // Set the priority of the group according to distance from the top
        TP_CALLBACK_PRIORITY priority = TP_CALLBACK_PRIORITY_LOW;
        if(workqueue.size() - parent->_nesting_level == 1) { priority = TP_CALLBACK_PRIORITY_HIGH; }
        else if(workqueue.size() - parent->_nesting_level == 2) { priority = TP_CALLBACK_PRIORITY_NORMAL; }
        SetThreadpoolCallbackPriority(parent->_grouph, priority);
        // std::cout << "*** submit " << workitem << std::endl;
        SubmitThreadpoolWork((PTP_WORK) workitem->_internalworkh);
#else
        _lock_guard gg(workqueue_lock);
        auto *wq = &workqueue[submit_into_highest_priority ? (workqueue.size() - 1) : parent->_nesting_level];
        // TODO: It would be super nice if we prepended this instead if it came from a timer
        wq->append_active(workitem);
#endif
      }

#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
      // Indicate that I can be executed again
      workitem->_internalworkh = nullptr;
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
      std::cout << "*** DTP submits work item " << workitem << std::endl;
#endif
      const auto active_work_items = total_submitted_workitems.fetch_add(1, std::memory_order_relaxed) + 1;
      if(!defer_pool_wake)
      {
        g.unlock();  // unlock group
        {
          _lock_guard gg(workqueue_lock);  // lock global
          if(threadpool_active.count == 0 && threadpool_sleeping.count == 0) { _add_thread(gg); }
          else if(threadpool_sleeping.count > 0 && active_work_items > threadpool_active.count)
          {
            // Try to wake the most recently slept first
            auto *t = threadpool_sleeping.back;
            auto now = std::chrono::steady_clock::now();
            for(size_t n = std::min(active_work_items - threadpool_active.count, threadpool_sleeping.count); n > 0; n--)
            {
              t->last_did_work = now;  // prevent reap
              t->cond.notify_one();
              t = t->_prev;
            }
          }
        }
        g.lock();  // lock group
      }
#endif
    }
  }

  inline result<void> global_dynamic_thread_pool_impl::submit(_lock_guard &g, dynamic_thread_pool_group_impl *group,
                                                              span<dynamic_thread_pool_group::work_item *> work) noexcept
  {
    try
    {
      if(work.empty()) { return success(); }
      for(auto *i : work)
      {
        if(i->_parent.load(std::memory_order_relaxed) != nullptr) { return errc::address_in_use; }
      }
      auto uninit = make_scope_exit([&]() noexcept {
        for(auto *i : work)
        {
          _remove_from_list(group->_work_items_active, i);
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
          i->_internalworkh = nullptr;
          i->_internaltimerh = nullptr;
#elif defined(_WIN32)
          if(nullptr != i->_internaltimerh)
          {
            CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
            i->_internaltimerh = nullptr;
          }
          if(nullptr != i->_internalworkh)
          {
            CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
            i->_internalworkh = nullptr;
          }
#endif
          i->_parent.store(nullptr, std::memory_order_release);
        }
      });
      for(auto *i : work)
      {
        deadline d(std::chrono::seconds(0));
        i->_parent.store(group, std::memory_order_release);
        i->_nextwork = i->next(d);
        if(-1 == i->_nextwork) { _append_to_list(group->_work_items_done, i); }
        else
        {
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
          i->_internalworkh = (void *) (uintptr_t) -1;
#elif defined(_WIN32)
          i->_internalworkh = CreateThreadpoolWork(_win32_worker_thread_callback, i, group->_grouph);
          if(nullptr == i->_internalworkh) { return win32_error(); }
#endif
          OUTCOME_TRY(_prepare_work_item_delay(i, group->_grouph, d));
          _prepend_to_list(group->_work_items_active, i);
        }
      }
      uninit.release();
      {
        for(auto *i : work)
        {
#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
          group->_newly_added_active_work_items++;
          group->_active_work_items_remaining++;
#endif
          _submit_work_item(g, true, i, i != work.back());
        }
      }
      return success();
    }
    catch(...)
    {
      return error_from_exception();
    }
  }
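  /* Work item lifecycle, for orientation: _nextwork carries the unit of work
  handed out by next(). A value > 0 is work waiting to be executed, 0 means
  "ask next() again", and -1 means the item is exhausted and _work_item_done()
  below retires it. When the last active item of a group retires, the group is
  marked stopped/completed and every done item receives group_complete(). */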
  inline void global_dynamic_thread_pool_impl::_work_item_done(_lock_guard &g, dynamic_thread_pool_group::work_item *i) noexcept
  {
    (void) g;
    // std::cout << "*** _work_item_done " << i << std::endl;
    auto *parent = i->_parent.load(std::memory_order_relaxed);
    _remove_from_list(parent->_work_items_active, i);
    _append_to_list(parent->_work_items_done, i);
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
    i->_internaltimerh = nullptr;
    i->_internalworkh = nullptr;
#elif defined(_WIN32)
    if(i->_internalworkh_inuse > 0) { i->_internalworkh_inuse = 2; }
    else
    {
      if(i->_internaltimerh != nullptr)
      {
        CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
        i->_internaltimerh = nullptr;
      }
      if(i->_internalworkh != nullptr)
      {
        CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
        i->_internalworkh = nullptr;
      }
    }
#else
    i->_internaltimerh = nullptr;
    i->_internalworkh = nullptr;
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
    std::cout << "*** DTP sets done work item " << i << std::endl;
#endif
#endif
    if(parent->_work_items_active.count == 0)
    {
      i = nullptr;
      auto *v = parent->_work_items_done.front, *n = v;
      for(; v != nullptr; v = n)
      {
        v->_parent.store(nullptr, std::memory_order_release);
        v->_nextwork = -1;
        n = v->_next;
      }
      n = v = parent->_work_items_done.front;
      parent->_work_items_done.front = parent->_work_items_done.back = nullptr;
      parent->_work_items_done.count = 0;
      parent->_stopping.store(false, std::memory_order_release);
      parent->_stopped.store(true, std::memory_order_release);
      parent->_completing.store(true, std::memory_order_release);  // cause submissions to enter _work_items_delayed
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
      std::cout << "*** DTP executes group_complete for group " << parent << std::endl;
#endif
      for(; v != nullptr; v = n)
      {
        n = v->_next;
        v->group_complete(parent->_abnormal_completion_cause);
      }
      parent->_completing.store(false, std::memory_order_release);  // cease submitting to _work_items_delayed
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
      std::cout << "*** DTP group_complete done for group " << parent << ". _work_items_delayed.count = " << parent->_work_items_delayed.count << std::endl;
#endif
      if(parent->_work_items_delayed.count > 0)
      {
        /* If there are waits on this group to complete, forward progress those now. */
        while(parent->_waits.load(std::memory_order_relaxed) > 0)
        {
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
          std::cout << "*** DTP group_complete blocks on waits for group " << parent << std::endl;
#endif
          g.unlock();
          std::this_thread::sleep_for(std::chrono::milliseconds(1));
          g.lock();
        }
        // Now submit all delayed work
        while(parent->_work_items_delayed.count > 0)
        {
          i = parent->_work_items_delayed.front;
          _remove_from_list(parent->_work_items_delayed, i);
          auto r = submit(g, parent, {&i, 1});
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
          std::cout << "*** DTP group_complete submits delayed work item " << i << " for group " << parent << " which saw error ";
          if(r) { std::cout << "none" << std::endl; }
          else { std::cout << r.error().message() << std::endl; }
#endif
          if(!r)
          {
            parent->_work_items_delayed = {};
            (void) stop(g, parent, std::move(r));
            break;
          }
        }
      }
    }
  }

  inline result<void> global_dynamic_thread_pool_impl::stop(_lock_guard &g, dynamic_thread_pool_group_impl *group, result<void> err) noexcept
  {
    (void) g;
    if(group->_abnormal_completion_cause) { group->_abnormal_completion_cause = std::move(err); }
    group->_stopping.store(true, std::memory_order_release);
    return success();
  }

  inline result<void> global_dynamic_thread_pool_impl::wait(_lock_guard &g, bool reap, dynamic_thread_pool_group_impl *group, deadline d) noexcept
  {
    LLFIO_DEADLINE_TO_SLEEP_INIT(d);
    if(!d || d.nsecs > 0)
    {
      /* To ensure forward progress, we need to gate new waits during delayed work submission.
      Otherwise waits may never exit if the window where _work_items_active.count == 0 is missed. */
      while(group->_work_items_delayed.count > 0)
      {
        g.unlock();
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        g.lock();
      }
      group->_waits.fetch_add(1, std::memory_order_release);
      auto unwaitcount = make_scope_exit([&]() noexcept { group->_waits.fetch_sub(1, std::memory_order_release); });
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
      while(group->_work_items_active.count > 0)
      {
        LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d);
        dispatch_time_t timeout = DISPATCH_TIME_FOREVER;
        if(d)
        {
          std::chrono::nanoseconds duration;
          LLFIO_DEADLINE_TO_PARTIAL_TIMEOUT(duration, d);
          timeout = dispatch_time(DISPATCH_TIME_NOW, duration.count());
        }
        g.unlock();
        dispatch_group_wait(group->_grouph, timeout);
        g.lock();
        // if(1 == group->_work_items_active.count)
        //{
        //  std::cout << "*** wait item remaining is " << group->_work_items_active.front << std::endl;
        //  std::this_thread::sleep_for(std::chrono::seconds(1));
        //}
      }
#elif defined(_WIN32)
      auto &tls = detail::global_dynamic_thread_pool_thread_local_state();
      if(tls.current_callback_instance != nullptr)
      {
        // I am being called from within a thread worker. Tell
        // the thread pool that I am not going to exit promptly.
        CallbackMayRunLong(tls.current_callback_instance);
      }
      // Is this a cancellation?
      if(group->_stopping.load(std::memory_order_relaxed))
      {
        while(group->_work_items_active.count > 0)
        {
          auto *i = group->_work_items_active.front;
          if(nullptr != i->_internalworkh)
          {
            if(0 == i->_internalworkh_inuse) { i->_internalworkh_inuse = 1; }
            g.unlock();
            WaitForThreadpoolWorkCallbacks((PTP_WORK) i->_internalworkh, true);
            g.lock();
            if(i->_internalworkh_inuse == 2)
            {
              if(nullptr != i->_internalworkh)
              {
                CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
                i->_internalworkh = nullptr;
              }
              if(nullptr != i->_internaltimerh)
              {
                CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
                i->_internaltimerh = nullptr;
              }
            }
            i->_internalworkh_inuse = 0;
          }
          if(nullptr != i->_internaltimerh)
          {
            if(0 == i->_internalworkh_inuse) { i->_internalworkh_inuse = 1; }
            g.unlock();
            WaitForThreadpoolTimerCallbacks((PTP_TIMER) i->_internaltimerh, true);
            g.lock();
            if(i->_internalworkh_inuse == 2)
            {
              if(nullptr != i->_internalworkh)
              {
                CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
                i->_internalworkh = nullptr;
              }
              if(nullptr != i->_internaltimerh)
              {
                CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
                i->_internaltimerh = nullptr;
              }
            }
            i->_internalworkh_inuse = 0;
          }
          if(group->_work_items_active.count > 0 && group->_work_items_active.front == i)
          {
            // This item got cancelled before it started
            _work_item_done(g, group->_work_items_active.front);
          }
        }
        assert(!group->_stopping.load(std::memory_order_relaxed));
      }
      else if(!d)
      {
        while(group->_work_items_active.count > 0)
        {
          auto *i = group->_work_items_active.front;
          if(nullptr != i->_internalworkh)
          {
            if(0 == i->_internalworkh_inuse) { i->_internalworkh_inuse = 1; }
            g.unlock();
            WaitForThreadpoolWorkCallbacks((PTP_WORK) i->_internalworkh, false);
            g.lock();
            if(i->_internalworkh_inuse == 2)
            {
              if(nullptr != i->_internalworkh)
              {
                CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
                i->_internalworkh = nullptr;
              }
              if(nullptr != i->_internaltimerh)
              {
                CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
                i->_internaltimerh = nullptr;
              }
            }
            i->_internalworkh_inuse = 0;
          }
          if(nullptr != i->_internaltimerh)
          {
            if(0 == i->_internalworkh_inuse) { i->_internalworkh_inuse = 1; }
            g.unlock();
            WaitForThreadpoolTimerCallbacks((PTP_TIMER) i->_internaltimerh, false);
            g.lock();
            if(i->_internalworkh_inuse == 2)
            {
              if(nullptr != i->_internalworkh)
              {
                CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
                i->_internalworkh = nullptr;
              }
              if(nullptr != i->_internaltimerh)
              {
                CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
                i->_internaltimerh = nullptr;
              }
            }
            i->_internalworkh_inuse = 0;
          }
        }
      }
      else
      {
        while(group->_work_items_active.count > 0)
        {
          LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d);
          g.unlock();
          std::this_thread::sleep_for(std::chrono::milliseconds(1));
          g.lock();
        }
      }
#else
#if 0
      if(group->_stopping.load(std::memory_order_relaxed))
      {
        // Kill all work items not currently being executed immediately
        for(bool done = false; !done;)
        {
          done = true;
          for(auto *p = group->_work_items_active.front; p != nullptr; p = p->_next)
          {
            if(p->_internalworkh == nullptr)
            {
              _remove_from_list(group->_work_items_active, p);
              _append_to_list(group->_work_items_done, p);
              done = false;
              break;
            }
          }
        }
      }
#endif
      while(group->_work_items_active.count > 0)
      {
        LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d);
        g.unlock();
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        g.lock();
      }
#endif
    }
    if(group->_work_items_active.count > 0) { return errc::timed_out; }
    if(reap) { return std::move(group->_abnormal_completion_cause); }
    return success();
  }
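  /* Because GCD timers cannot be cancelled nor assigned to a group,
  _submit_work_item above clamps them to at most one second. _timerthread
  below therefore re-checks the real deadline on entry, and resubmits the
  work item if its timer fired short. */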
  inline void global_dynamic_thread_pool_impl::_timerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type /*unused*/)
  {
    LLFIO_LOG_FUNCTION_CALL(this);
    assert(workitem->_nextwork != -1);
    assert(workitem->_has_timer_set());
    auto *parent = workitem->_parent.load(std::memory_order_relaxed);
    _lock_guard g(parent->_lock);  // lock group
    // std::cout << "*** _timerthread " << workitem << std::endl;
    if(parent->_stopping.load(std::memory_order_relaxed))
    {
      _work_item_done(g, workitem);
      return;
    }
    if(workitem->_has_timer_set_relative())
    {
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
      auto now = std::chrono::steady_clock::now();
      if(workitem->_timepoint1 - now > std::chrono::seconds(0))
      {
        // Timer fired short, so schedule it again
        _submit_work_item(g, false, workitem, false);
        return;
      }
#endif
      workitem->_timepoint1 = {};
    }
    if(workitem->_has_timer_set_absolute())
    {
#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD
      auto now = std::chrono::system_clock::now();
      if(workitem->_timepoint2 - now > std::chrono::seconds(0))
      {
        // Timer fired short, so schedule it again
        _submit_work_item(g, false, workitem, false);
        return;
      }
#endif
      workitem->_timepoint2 = {};
    }
    assert(!workitem->_has_timer_set());
    if(workitem->_nextwork == 0)
    {
      deadline d(std::chrono::seconds(0));
      workitem->_nextwork = workitem->next(d);
      auto r2 = _prepare_work_item_delay(workitem, parent->_grouph, d);
      if(!r2)
      {
        (void) stop(g, parent, std::move(r2));
        _work_item_done(g, workitem);
        return;
      }
      if(-1 == workitem->_nextwork)
      {
        _work_item_done(g, workitem);
        return;
      }
      _submit_work_item(g, false, workitem, false);
      return;
    }
    _submit_work_item(g, false, workitem, false);
  }

  // Worker thread entry point
  inline void global_dynamic_thread_pool_impl::_workerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh)
  {
    LLFIO_LOG_FUNCTION_CALL(this);
    //{
    //  _lock_guard g(parent->_lock);
    //  std::cout << "*** _workerthread " << workitem << " begins with work " << workitem->_nextwork << std::endl;
    //}
    assert(workitem->_nextwork != -1);
    assert(workitem->_nextwork != 0);
    auto *parent = workitem->_parent.load(std::memory_order_relaxed);
    if(parent->_stopping.load(std::memory_order_relaxed))
    {
      _lock_guard g(parent->_lock);  // lock group
      _work_item_done(g, workitem);
      return;
    }
    auto &tls = detail::global_dynamic_thread_pool_thread_local_state();
    auto old_thread_local_state = tls;
    tls.workitem = workitem;
    tls.current_callback_instance = selfthreadh;
    tls.nesting_level = parent->_nesting_level + 1;
    auto r = (*workitem)(workitem->_nextwork);
    workitem->_nextwork = 0;  // call next() next time
    tls = old_thread_local_state;
    _lock_guard g(parent->_lock);  // lock group
    // std::cout << "*** _workerthread " << workitem << " ends with work " << workitem->_nextwork << std::endl;
    if(!r)
    {
      (void) stop(g, parent, std::move(r));
      _work_item_done(g, workitem);
      workitem = nullptr;
    }
    else if(parent->_stopping.load(std::memory_order_relaxed))
    {
      _work_item_done(g, workitem);
    }
    else
    {
      deadline d(std::chrono::seconds(0));
      workitem->_nextwork = workitem->next(d);
      auto r2 = _prepare_work_item_delay(workitem, parent->_grouph, d);
      if(!r2)
      {
        (void) stop(g, parent, std::move(r2));
        _work_item_done(g, workitem);
        return;
      }
      if(-1 == workitem->_nextwork)
      {
        _work_item_done(g, workitem);
        return;
      }
      _submit_work_item(g, false, workitem, false);
    }
  }
}  // namespace detail


/****************************************** io_aware_work_item *********************************************/
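/* Illustrative sketch only: io_aware_work_item is intended to be subclassed,
overriding io_aware_next() instead of next(), so that each iteration is paced
against the i/o load of the supplied handles (next() below preloads the
deadline before delegating). The names my_item and do_some_io_remaining() are
hypothetical, not part of LLFIO:

    struct my_item final : public dynamic_thread_pool_group::io_aware_work_item
    {
      using io_aware_work_item::io_aware_work_item;

      // d arrives preloaded with the pacing deadline chosen by next().
      virtual intptr_t io_aware_next(deadline &d) noexcept override
      {
        (void) d;
        return do_some_io_remaining() ? 1 : -1;  // -1 means no more work
      }
      virtual result<void> operator()(intptr_t work) noexcept override
      {
        (void) work;
        return success();  // perform one unit of i/o here
      }
    };
*/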
LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::io_aware_work_item::io_aware_work_item(span<io_handle_awareness> hs)
    : _handles([](span<io_handle_awareness> hs) -> span<io_handle_awareness> {
        float all = 0;
        for(auto &i : hs)
        {
          all += i.reads + i.writes + i.barriers;
        }
        for(auto &i : hs)
        {
          if(all == 0.0f)
          {
            i.reads = i.writes = 0.5f;
            i.barriers = 0.0f;
          }
          else
          {
            i.reads /= all;
            i.writes /= all;
            i.barriers /= all;
          }
        }
        auto &impl = detail::global_dynamic_thread_pool();
        detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.io_aware_work_item_handles_lock);
        for(auto &h : hs)
        {
          if(!h.h->is_seekable())
          {
            throw std::runtime_error("Supplied handle is not seekable");
          }
          auto *fh = static_cast<file_handle *>(h.h);
          auto unique_id = fh->unique_id();
          auto it = impl.io_aware_work_item_handles.find(unique_id);
          if(it == impl.io_aware_work_item_handles.end())
          {
            it = impl.io_aware_work_item_handles.emplace(unique_id, detail::global_dynamic_thread_pool_impl::io_aware_work_item_statfs{}).first;
            auto r = it->second.statfs.fill(*fh, statfs_t::want::iosinprogress | statfs_t::want::iosbusytime);
            if(!r || it->second.statfs.f_iosinprogress == (uint32_t) -1)
            {
              impl.io_aware_work_item_handles.erase(it);
              if(!r)
              {
                r.value();
              }
              throw std::runtime_error("statfs::f_iosinprogress unavailable for supplied handle");
            }
            it->second.last_updated = std::chrono::steady_clock::now();
          }
          it->second.refcount++;
          h._internal = &*it;
        }
        return hs;
      }(hs))
{
  LLFIO_LOG_FUNCTION_CALL(this);
}

LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::io_aware_work_item::~io_aware_work_item()
{
  LLFIO_LOG_FUNCTION_CALL(this);
  auto &impl = detail::global_dynamic_thread_pool();
  detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.io_aware_work_item_handles_lock);
  using value_type = decltype(impl.io_aware_work_item_handles)::value_type;
  for(auto &h : _handles)
  {
    auto *i = (value_type *) h._internal;
    if(0 == --i->second.refcount)
    {
      impl.io_aware_work_item_handles.erase(i->first);
    }
  }
}
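/* The pacing logic in next() below refreshes statfs for each handle at most
once per 100ms, keeping exponentially weighted moving averages (0.9 old, 0.1
new) of i/o busy time and queue depth; averages staler than five seconds are
reset from the fresh sample. When the averaged queue depth exceeds
max_iosinprogress, the suggested deadline grows by one sixteenth per sample
(so 1ms compounds to roughly 1.8ms after ten adjustments); when it falls
below min_iosinprogress, it shrinks by one sixteenth; and when the device is
nearly idle, pacing is removed entirely. */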
LLFIO_HEADERS_ONLY_MEMFUNC_SPEC intptr_t dynamic_thread_pool_group::io_aware_work_item::next(deadline &d) noexcept
{
  LLFIO_LOG_FUNCTION_CALL(this);
  {
    auto &impl = detail::global_dynamic_thread_pool();
    auto now = std::chrono::steady_clock::now();
    detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.io_aware_work_item_handles_lock);
    using value_type = decltype(impl.io_aware_work_item_handles)::value_type;
    for(auto &h : _handles)
    {
      auto *i = (value_type *) h._internal;
      if(std::chrono::duration_cast<std::chrono::milliseconds>(now - i->second.last_updated) >= std::chrono::milliseconds(100))
      {
        // auto old_iosinprogress = i->second.statfs.f_iosinprogress;
        auto elapsed = now - i->second.last_updated;
        (void) i->second.statfs.fill(*h.h, statfs_t::want::iosinprogress | statfs_t::want::iosbusytime);
        i->second.last_updated = now;
        if(elapsed > std::chrono::seconds(5))
        {
          i->second.average_busy = i->second.statfs.f_iosbusytime;
          i->second.average_queuedepth = (float) i->second.statfs.f_iosinprogress;
        }
        else
        {
          i->second.average_busy = (i->second.average_busy * 0.9f) + (i->second.statfs.f_iosbusytime * 0.1f);
          i->second.average_queuedepth = (i->second.average_queuedepth * 0.9f) + (i->second.statfs.f_iosinprogress * 0.1f);
        }
        if(i->second.average_busy < this->max_iosbusytime && i->second.average_queuedepth < this->min_iosinprogress)
        {
          i->second.default_deadline = std::chrono::seconds(0);  // remove pacing
        }
        else if(i->second.average_queuedepth > this->max_iosinprogress)
        {
          if(0 == i->second.default_deadline.nsecs)
          {
            i->second.default_deadline = std::chrono::milliseconds(1);  // start with 1ms, it'll reduce from there if needed
          }
          else if((i->second.default_deadline.nsecs >> 4) > 0)
          {
            i->second.default_deadline.nsecs += i->second.default_deadline.nsecs >> 4;
          }
          else
          {
            i->second.default_deadline.nsecs++;
          }
        }
        else if(i->second.average_queuedepth < this->min_iosinprogress)
        {
          if(i->second.default_deadline.nsecs > (i->second.default_deadline.nsecs >> 4) && (i->second.default_deadline.nsecs >> 4) > 0)
          {
            i->second.default_deadline.nsecs -= i->second.default_deadline.nsecs >> 4;
          }
          else if(i->second.default_deadline.nsecs > 1)
          {
            i->second.default_deadline.nsecs--;
          }
        }
      }
      if(d.nsecs < i->second.default_deadline.nsecs)
      {
        d = i->second.default_deadline;
      }
    }
  }
  return io_aware_next(d);
}

LLFIO_V2_NAMESPACE_END
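/* End-to-end usage: an illustrative sketch only, not compiled as part of this
translation unit. It assumes the public make_dynamic_thread_pool_group()
factory, the hypothetical my_item subclass sketched further above, and an
io_handle_awareness aggregate of the form {handle, reads, writes, barriers};
exact spellings may differ:

    namespace llfio = LLFIO_V2_NAMESPACE;

    llfio::file_handle fh = llfio::file_handle::file({}, "db.bin").value();
    auto tpg = llfio::make_dynamic_thread_pool_group().value();

    // Pace this work item against reads on fh.
    llfio::dynamic_thread_pool_group::io_aware_work_item::io_handle_awareness
      awareness[] = {{&fh, 1.0f, 0.0f, 0.0f}};  // reads, writes, barriers
    my_item item(awareness);

    llfio::dynamic_thread_pool_group::work_item *items[] = {&item};
    tpg->submit(items).value();  // begin executing the work item
    tpg->wait().value();         // block until the group completes
*/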