From eef0d398dcfd3b4068d6d330d0b0287102cd26c5 Mon Sep 17 00:00:00 2001 From: "Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com)" Date: Fri, 22 Jan 2021 12:05:05 +0000 Subject: Dynamic scaling within native Linux implementation of dynamic_thread_pool_group is somewhat working now, but it's not working quite right yet. --- include/llfio/revision.hpp | 6 +- .../v2.0/detail/impl/dynamic_thread_pool_group.ipp | 406 +++++++++++++++------ .../v2.0/detail/impl/posix/directory_handle.ipp | 57 ++- 3 files changed, 333 insertions(+), 136 deletions(-) diff --git a/include/llfio/revision.hpp b/include/llfio/revision.hpp index 9e7774c5..3bf437f0 100644 --- a/include/llfio/revision.hpp +++ b/include/llfio/revision.hpp @@ -1,4 +1,4 @@ // Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time -#define LLFIO_PREVIOUS_COMMIT_REF ca8673c41b5a87fb834f9365c3dfbc6b668cd260 -#define LLFIO_PREVIOUS_COMMIT_DATE "2021-01-15 11:26:17 +00:00" -#define LLFIO_PREVIOUS_COMMIT_UNIQUE ca8673c4 +#define LLFIO_PREVIOUS_COMMIT_REF 0d62d125a2ee2404976155f104dbe109430ae0ba +#define LLFIO_PREVIOUS_COMMIT_DATE "2021-01-19 15:43:28 +00:00" +#define LLFIO_PREVIOUS_COMMIT_UNIQUE 0d62d125 diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp index e5c7fbb9..bc9b540c 100644 --- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp +++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp @@ -163,9 +163,10 @@ namespace detail size_t count{0}; threadmetrics_item *front{nullptr}, *back{nullptr}; uint32_t blocked{0}, running{0}; - } threadmetrics_queue; // items at end are least recently updated - std::vector threadmetrics_sorted; + } threadmetrics_queue; // items at front are least recently updated + std::vector threadmetrics_sorted; // sorted by threadid std::chrono::steady_clock::time_point threadmetrics_last_updated; + std::atomic update_threadmetrics_reentrancy{false}; #ifdef __linux__ int proc_self_task_fd{-1}; #endif @@ -239,20 +240,236 @@ namespace detail } #if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32) + inline void _execute_work(thread_t *self); + + void _add_thread(_lock_guard & /*unused*/) + { + thread_t *p = nullptr; + try + { + p = new thread_t; + _append_to_list(threadpool_active, p); + p->thread = std::thread([this, p] { _execute_work(p); }); + } + catch(...) + { + if(p != nullptr) + { + _remove_from_list(threadpool_active, p); + } + // drop failure + } + } + + bool _remove_thread(_lock_guard &g, threads_t &which) + { + if(which.count == 0) + { + return false; + } + // Threads which went to sleep the longest ago are at the front + auto *t = which.front; + if(t->state < 0) + { + // He's already exiting + return false; + } + assert(t->state == 0); + t->state--; +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << t << " is told to quit" << std::endl; +#endif + do + { + g.unlock(); + t->cond.notify_one(); + g.lock(); + } while(t->state >= -1); +#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING + std::cout << "*** DTP " << t << " has quit, deleting" << std::endl; +#endif + _remove_from_list(threadpool_active, t); + t->thread.join(); + delete t; + return true; + } + + ~global_dynamic_thread_pool_impl() + { + { + _lock_guard g(workqueue_lock); // lock global + while(threadpool_active.count > 0 || threadpool_sleeping.count > 0) + { + while(threadpool_sleeping.count > 0) + { + auto removed = _remove_thread(g, threadpool_sleeping); + assert(removed); + (void) removed; + } + if(threadpool_active.count > 0) + { + auto removed = _remove_thread(g, threadpool_active); + assert(removed); + (void) removed; + } + } + } + _lock_guard g(threadmetrics_lock); + for(auto *p : threadmetrics_sorted) + { + delete p; + } + threadmetrics_sorted.clear(); + threadmetrics_queue = {}; +#ifdef __linux__ + if(proc_self_task_fd > 0) + { + ::close(proc_self_task_fd); + proc_self_task_fd = -1; + } +#endif + } + #ifdef __linux__ - void populate_threadmetrics(std::chrono::steady_clock::time_point now) + bool update_threadmetrics(_lock_guard &&g, std::chrono::steady_clock::time_point now, threadmetrics_item *new_items) + { + auto update_item = [&](threadmetrics_item *item) { + char path[64] = "/proc/self/task/", *pend = path + 16, *tend = item->threadid.text; + while(*tend == '0' && (tend - item->threadid.text) < (ssize_t) sizeof(item->threadid.text)) + { + ++tend; + } + while((tend - item->threadid.text) < (ssize_t) sizeof(item->threadid.text)) + { + *pend++ = *tend++; + } + memcpy(pend, "/stat", 6); + int fd = ::open(path, O_RDONLY); + if(-1 == fd) + { + // Thread may have exited since we last populated + if(item->blocked_since == std::chrono::steady_clock::time_point()) + { + threadmetrics_queue.running--; + threadmetrics_queue.blocked++; + } + item->blocked_since = now; + item->last_updated = now; + return; + } + char buffer[1024]; + auto bytesread = ::read(fd, buffer, sizeof(buffer)); + ::close(fd); + buffer[std::max((size_t) bytesread, sizeof(buffer) - 1)] = 0; + char state = 0; + unsigned long majflt = 0, utime = 0, stime = 0; + sscanf(buffer, "%*d %*s %c %*d %*d %*d %*d %*d %*u %*u %*u %lu %*u %lu %lu", &state, &majflt, &utime, &stime); + if(item->utime != (uint32_t) -1 || item->stime != (uint32_t) -1) + { + if(item->utime == (uint32_t) utime && item->stime == (uint32_t) stime && state != 'R') + { + // This thread made no progress since last time + if(item->blocked_since == std::chrono::steady_clock::time_point()) + { + threadmetrics_queue.running--; + threadmetrics_queue.blocked++; + item->blocked_since = now; + } + } + else + { + if(item->blocked_since != std::chrono::steady_clock::time_point()) + { + threadmetrics_queue.running++; + threadmetrics_queue.blocked--; + item->blocked_since = std::chrono::steady_clock::time_point(); + } + } + } + std::cout << "Threadmetrics " << path << " " << state << " " << majflt << " " << utime << " " << stime << ". Previously " << item->diskfaults << " " + << item->utime << " " << item->stime << std::endl; + item->diskfaults = (uint32_t) majflt; + item->utime = (uint32_t) utime; + item->stime = (uint32_t) stime; + item->last_updated = now; + }; + if(new_items != nullptr) + { + for(; new_items != nullptr; new_items = new_items->_next) + { + update_item(new_items); + } + return false; + } + if(threadmetrics_queue.count == 0) + { + return false; + } + size_t updated = 0; + while(updated++ < 10 && now - threadmetrics_queue.front->last_updated >= std::chrono::milliseconds(100)) + { + auto *p = threadmetrics_queue.front; + update_item(p); + _remove_from_list(threadmetrics_queue, p); + _append_to_list(threadmetrics_queue, p); + } + static const auto min_hardware_concurrency = std::thread::hardware_concurrency(); + static const auto max_hardware_concurrency = min_hardware_concurrency + (min_hardware_concurrency >> 1); + auto toadd = std::max((ssize_t) 0, std::min((ssize_t) min_hardware_concurrency - (ssize_t) threadmetrics_queue.running, + (ssize_t) total_submitted_workitems.load(std::memory_order_relaxed) - + (ssize_t) threadpool_threads.load(std::memory_order_relaxed))); + auto toremove = std::max((ssize_t) 0, (ssize_t) threadmetrics_queue.running - (ssize_t) max_hardware_concurrency); + // std::cout << "Threadmetrics toadd = " << (toadd - (ssize_t) threadpool_sleeping.count) << " toremove = " << toremove + // << " running = " << threadmetrics_queue.running << " blocked = " << threadmetrics_queue.blocked << " total = " << threadmetrics_queue.count + // << ". Actual active = " << threadpool_active.count << " sleeping = " << threadpool_sleeping.count + // << ". Current working threads = " << threadpool_threads.load(std::memory_order_relaxed) + // << ". Current submitted work items = " << total_submitted_workitems.load(std::memory_order_relaxed) << std::endl; + if(toadd > 0 || toremove > 0) + { + if(update_threadmetrics_reentrancy.exchange(true, std::memory_order_relaxed)) + { + return false; + } + auto unupdate_threadmetrics_reentrancy = + make_scope_exit([this]() noexcept { update_threadmetrics_reentrancy.store(false, std::memory_order_relaxed); }); + g.unlock(); + _lock_guard gg(workqueue_lock); + toadd -= (ssize_t) threadpool_sleeping.count; + for(; toadd > 0; toadd--) + { + _add_thread(gg); + } + for(; toremove > 0 && threadpool_sleeping.count > 0; toremove--) + { + if(!_remove_thread(gg, threadpool_sleeping)) + { + break; + } + } + if(toremove > 0 && threadpool_active.count > 1) + { + // Kill myself, but not if I'm the final thread who needs to run timers + return true; + } + return false; + } + return false; + } + // Returns true if calling thread is to exit + bool populate_threadmetrics(std::chrono::steady_clock::time_point now) { static thread_local std::vector kernelbuffer(1024); static thread_local std::vector threadidsbuffer(1024 / sizeof(dirent)); using getdents64_t = int (*)(int, char *, unsigned int); static auto getdents = static_cast([](int fd, char *buf, unsigned count) -> int { return syscall(SYS_getdents64, fd, buf, count); }); using dirent = dirent64; - int bytes = 0; + size_t bytes = 0; { _lock_guard g(threadmetrics_lock); - if(now - threadmetrics_last_updated < std::chrono::milliseconds(100)) + if(now - threadmetrics_last_updated < std::chrono::milliseconds(100) && + threadmetrics_queue.running + threadmetrics_queue.blocked >= threadpool_threads.load(std::memory_order_relaxed)) { - return; + return update_threadmetrics(std::move(g), now, nullptr); } if(proc_self_task_fd < 0) { @@ -262,18 +479,33 @@ namespace detail posix_error().throw_exception(); } } - for(;;) + for(auto done = false; !done;) { if(-1 == ::lseek64(proc_self_task_fd, 0, SEEK_SET)) { posix_error().throw_exception(); } - bytes = getdents(proc_self_task_fd, kernelbuffer.data(), kernelbuffer.size()); - if(bytes < (int) kernelbuffer.size()) + bytes = 0; + for(;;) { - break; + int _bytes = getdents(proc_self_task_fd, kernelbuffer.data() + bytes, kernelbuffer.size() - bytes); + // std::cout << "getdents(" << (kernelbuffer.size()-bytes) << ") returns " << _bytes << std::endl; + if(_bytes == 0) + { + done = true; + break; + } + if(_bytes == -1 && errno == EINVAL) + { + kernelbuffer.resize(kernelbuffer.size() << 1); + continue; + } + if(_bytes < 0) + { + posix_error().throw_exception(); + } + bytes += _bytes; } - kernelbuffer.resize(kernelbuffer.size() << 1); } } threadidsbuffer.clear(); @@ -289,12 +521,33 @@ namespace detail break; } } + std::cout << "Parsed from /proc " << threadidsbuffer.size() << " entries." << std::endl; std::sort(threadidsbuffer.begin(), threadidsbuffer.end()); + threadmetrics_item *firstnewitem = nullptr; _lock_guard g(threadmetrics_lock); +#if 0 + { + auto d_it = threadmetrics_sorted.begin(); + auto s_it = threadidsbuffer.begin(); + for(; d_it != threadmetrics_sorted.end() && s_it != threadidsbuffer.end(); ++d_it, ++s_it) + { + std::cout << (*d_it)->threadid_name() << " " << string_view(s_it->text, 12) << "\n"; + } + for(; d_it != threadmetrics_sorted.end(); ++d_it) + { + std::cout << (*d_it)->threadid_name() << " XXXXXXXXXXXX\n"; + } + for(; s_it != threadidsbuffer.end(); ++s_it) + { + std::cout << "XXXXXXXXXXXX " << string_view(s_it->text, 12) << "\n"; + } + std::cout << std::flush; + } +#endif auto d_it = threadmetrics_sorted.begin(); auto s_it = threadidsbuffer.begin(); auto remove_item = [&] { - std::cout << "Removing thread " << (*d_it)->threadid_name() << std::endl; + // std::cout << "Removing thread metrics for " << (*d_it)->threadid_name() << std::endl; if((*d_it)->blocked_since != std::chrono::steady_clock::time_point()) { threadmetrics_queue.blocked--; @@ -307,14 +560,14 @@ namespace detail d_it = threadmetrics_sorted.erase(d_it); }; auto add_item = [&] { - auto p = std::make_unique(*s_it++); - if(d_it != threadmetrics_sorted.end()) - { - ++d_it; - } + auto p = std::make_unique(*s_it); d_it = threadmetrics_sorted.insert(d_it, p.get()); _append_to_list(threadmetrics_queue, p.get()); - std::cout << "Adding thread " << p->threadid_name() << std::endl; + // std::cout << "Adding thread metrics for " << p->threadid_name() << std::endl; + if(firstnewitem == nullptr) + { + firstnewitem = p.get(); + } p.release(); threadmetrics_queue.running++; }; @@ -346,106 +599,28 @@ namespace detail while(s_it != threadidsbuffer.end()) { add_item(); + ++d_it; + ++s_it; } -#if 0 - std::cout << "Threadmetrics:"; - for(auto *p : threadmetrics_sorted) - { - std::cout << "\n " << p->threadid_name(); - } - std::cout << std::endl; -#endif assert(threadmetrics_sorted.size() == threadidsbuffer.size()); - assert(std::is_sorted(threadmetrics_sorted.begin(), threadmetrics_sorted.end(), - [](threadmetrics_item *a, threadmetrics_item *b) { return a->threadid < b->threadid; })); - threadmetrics_last_updated = now; - } -#endif - - inline void _execute_work(thread_t *self); - - void _add_thread(_lock_guard & /*unused*/) - { - thread_t *p = nullptr; - try - { - p = new thread_t; - _append_to_list(threadpool_active, p); - p->thread = std::thread([this, p] { _execute_work(p); }); - } - catch(...) +#if 1 + if(!std::is_sorted(threadmetrics_sorted.begin(), threadmetrics_sorted.end(), + [](threadmetrics_item *a, threadmetrics_item *b) { return a->threadid < b->threadid; })) { - if(p != nullptr) + std::cout << "Threadmetrics:"; + for(auto *p : threadmetrics_sorted) { - _remove_from_list(threadpool_active, p); + std::cout << "\n " << p->threadid_name(); } - // drop failure - } - } - - bool _remove_thread(_lock_guard &g, threads_t &which) - { - if(which.count == 0) - { - return false; + std::cout << std::endl; + abort(); } - // Threads which went to sleep the longest ago are at the front - auto *t = which.front; - assert(t->state == 0); - t->state--; -#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING - std::cout << "*** DTP " << t << " is told to quit" << std::endl; -#endif - do - { - g.unlock(); - t->cond.notify_one(); - g.lock(); - } while(t->state >= -1); -#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING - std::cout << "*** DTP " << t << " has quit, deleting" << std::endl; #endif - _remove_from_list(threadpool_active, t); - t->thread.join(); - delete t; - return true; + assert(threadmetrics_queue.running + threadmetrics_queue.blocked == threadidsbuffer.size()); + threadmetrics_last_updated = now; + return update_threadmetrics(std::move(g), now, firstnewitem); } - - ~global_dynamic_thread_pool_impl() - { - { - _lock_guard g(workqueue_lock); // lock global - while(threadpool_active.count > 0 || threadpool_sleeping.count > 0) - { - while(threadpool_sleeping.count > 0) - { - auto removed = _remove_thread(g, threadpool_sleeping); - assert(removed); - (void) removed; - } - if(threadpool_active.count > 0) - { - auto removed = _remove_thread(g, threadpool_active); - assert(removed); - (void) removed; - } - } - } - _lock_guard g(threadmetrics_lock); - for(auto *p : threadmetrics_sorted) - { - delete p; - } - threadmetrics_sorted.clear(); - threadmetrics_queue = {}; -#ifdef __linux__ - if(proc_self_task_fd > 0) - { - ::close(proc_self_task_fd); - proc_self_task_fd = -1; - } #endif - } #endif result _prepare_work_item_delay(dynamic_thread_pool_group::work_item *workitem, grouph_type grouph, deadline d) @@ -890,6 +1065,7 @@ namespace detail else if(now - self->last_did_work >= std::chrono::minutes(1)) { _remove_from_list(threadpool_active, self); + threadpool_threads.fetch_sub(1, std::memory_order_release); self->thread.detach(); delete self; return; @@ -941,7 +1117,14 @@ namespace detail // workitem->_internalworkh should be null, however workitem may also no longer exist try { - populate_threadmetrics(now); + if(populate_threadmetrics(now)) + { + _remove_from_list(threadpool_active, self); + threadpool_threads.fetch_sub(1, std::memory_order_release); + self->thread.detach(); + delete self; + return; + } } catch(...) { @@ -1076,9 +1259,6 @@ namespace detail if(threadpool_active.count == 0 && threadpool_sleeping.count == 0) { _add_thread(gg); - _add_thread(gg); - _add_thread(gg); - _add_thread(gg); } else if(threadpool_sleeping.count > 0 && active_work_items > threadpool_active.count) { diff --git a/include/llfio/v2.0/detail/impl/posix/directory_handle.ipp b/include/llfio/v2.0/detail/impl/posix/directory_handle.ipp index 9ea737c0..bd0050d6 100644 --- a/include/llfio/v2.0/detail/impl/posix/directory_handle.ipp +++ b/include/llfio/v2.0/detail/impl/posix/directory_handle.ipp @@ -39,7 +39,8 @@ http://www.boost.org/LICENSE_1_0.txt) LLFIO_V2_NAMESPACE_BEGIN -result directory_handle::directory(const path_handle &base, path_view_type path, mode _mode, creation _creation, caching _caching, flag flags) noexcept +result directory_handle::directory(const path_handle &base, path_view_type path, mode _mode, creation _creation, caching _caching, + flag flags) noexcept { if(flags & flag::unlink_on_first_close) { @@ -314,9 +315,12 @@ result directory_handle::read(io_request((static_cast(s.st_blocks) * 512) < static_cast(s.st_size)); + req.buffers[0].stat.st_sparse = + static_cast((static_cast(s.st_blocks) * 512) < static_cast(s.st_size)); req.buffers._resize(1); - static constexpr stat_t::want default_stat_contents = stat_t::want::dev | stat_t::want::ino | stat_t::want::type | stat_t::want::perms | stat_t::want::nlink | stat_t::want::uid | stat_t::want::gid | stat_t::want::rdev | stat_t::want::atim | stat_t::want::mtim | stat_t::want::ctim | stat_t::want::size | + static constexpr stat_t::want default_stat_contents = stat_t::want::dev | stat_t::want::ino | stat_t::want::type | stat_t::want::perms | + stat_t::want::nlink | stat_t::want::uid | stat_t::want::gid | stat_t::want::rdev | + stat_t::want::atim | stat_t::want::mtim | stat_t::want::ctim | stat_t::want::size | stat_t::want::allocated | stat_t::want::blocks | stat_t::want::blksize #ifdef HAVE_STAT_FLAGS | stat_t::want::flags @@ -360,8 +364,7 @@ result directory_handle::read(io_request directory_handle::read(io_request(buffer), bytesavailable); - if(req.kernelbuffer.empty() && bytes == -1 && EINVAL == errno) + bytes = 0; + int _bytes; + do { - size_t toallocate = req.buffers._kernel_buffer_size * 2; - auto *mem = (char *) operator new[](toallocate, std::nothrow); // don't initialise - if(mem == nullptr) + assert(bytes <= bytesavailable); + _bytes = getdents(_v.fd, reinterpret_cast(buffer) + bytes, bytesavailable - bytes); + if(_bytes == 0) { - return errc::not_enough_memory; + done = true; + break; } - req.buffers._kernel_buffer.reset(); - req.buffers._kernel_buffer = std::unique_ptr(mem); - req.buffers._kernel_buffer_size = toallocate; - } - else - { - if(bytes == -1) + if(req.kernelbuffer.empty() && _bytes == -1 && EINVAL == errno) + { + size_t toallocate = req.buffers._kernel_buffer_size * 2; + auto *mem = (char *) operator new[](toallocate, std::nothrow); // don't initialise + if(mem == nullptr) + { + return errc::not_enough_memory; + } + req.buffers._kernel_buffer.reset(); + req.buffers._kernel_buffer = std::unique_ptr(mem); + req.buffers._kernel_buffer_size = toallocate; + // We need to reset and do the whole thing against to ensure single shot atomicity + break; + } + else if(_bytes == -1) { return posix_error(); } - done = true; - } + else + { + assert(_bytes > 0); + bytes += _bytes; + } + } while(!done); } while(!done); if(bytes == 0) { -- cgit v1.2.3