From 82fcea61c21b31ce325dc47401808dd45d4ef42c Mon Sep 17 00:00:00 2001 From: "Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com)" Date: Tue, 22 Dec 2020 19:34:56 +0000 Subject: wip dynamic_thread_pool_group, so far has Windows support only. Note that LLFIO now hard requires Windows 7 rather than Vista. --- CMakeLists.txt | 3 +- cmake/headers.cmake | 2 + cmake/tests.cmake | 1 + include/llfio/revision.hpp | 6 +- .../v2.0/detail/impl/dynamic_thread_pool_group.ipp | 728 +++++++++++++++++++++ include/llfio/v2.0/dynamic_thread_pool_group.hpp | 311 +++++++++ include/llfio/v2.0/llfio.hpp | 2 +- include/llfio/v2.0/logging.hpp | 31 +- include/llfio/v2.0/status_code.hpp | 6 +- 9 files changed, 1081 insertions(+), 9 deletions(-) create mode 100644 include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp create mode 100644 include/llfio/v2.0/dynamic_thread_pool_group.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index dec1ade8..ccf2387d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -293,7 +293,8 @@ if(LLFIO_USE_EXPERIMENTAL_SG14_STATUS_CODE) all_compile_definitions(PUBLIC LLFIO_EXPERIMENTAL_STATUS_CODE=1) endif() if(WIN32) - all_compile_definitions(PRIVATE _WIN32_WINNT=0x601) ## Target Win7 + all_compile_definitions(PRIVATE _WIN32_WINNT=0x601) ## Target Win7 + target_compile_definitions(llfio_hl INTERFACE _WIN32_WINNT=0x601) ## Target Win7 if(NOT LLFIO_USE_EXPERIMENTAL_SG14_STATUS_CODE) target_link_libraries(llfio_hl INTERFACE ntkernel-error-category::hl) target_link_libraries(llfio_dl PUBLIC ntkernel-error-category::dl) diff --git a/cmake/headers.cmake b/cmake/headers.cmake index af4511e6..88c7deb9 100644 --- a/cmake/headers.cmake +++ b/cmake/headers.cmake @@ -29,6 +29,7 @@ set(llfio_HEADERS "include/llfio/v2.0/detail/impl/cached_parent_handle_adapter.ipp" "include/llfio/v2.0/detail/impl/clone.ipp" "include/llfio/v2.0/detail/impl/config.ipp" + "include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp" 
"include/llfio/v2.0/detail/impl/fast_random_file_handle.ipp" "include/llfio/v2.0/detail/impl/io_multiplexer.ipp" "include/llfio/v2.0/detail/impl/path_discovery.ipp" @@ -77,6 +78,7 @@ set(llfio_HEADERS "include/llfio/v2.0/detail/impl/windows/test/iocp_multiplexer.ipp" "include/llfio/v2.0/detail/impl/windows/utils.ipp" "include/llfio/v2.0/directory_handle.hpp" + "include/llfio/v2.0/dynamic_thread_pool_group.hpp" "include/llfio/v2.0/fast_random_file_handle.hpp" "include/llfio/v2.0/file_handle.hpp" "include/llfio/v2.0/fs_handle.hpp" diff --git a/cmake/tests.cmake b/cmake/tests.cmake index 47e3ba9e..de7d54d4 100644 --- a/cmake/tests.cmake +++ b/cmake/tests.cmake @@ -7,6 +7,7 @@ set(llfio_TESTS "test/tests/directory_handle_create_close/runner.cpp" "test/tests/directory_handle_enumerate/kernel_directory_handle_enumerate.cpp.hpp" "test/tests/directory_handle_enumerate/runner.cpp" + "test/tests/dynamic_thread_pool_group.cpp" "test/tests/fast_random_file_handle.cpp" "test/tests/file_handle_create_close/kernel_file_handle.cpp.hpp" "test/tests/file_handle_create_close/runner.cpp" diff --git a/include/llfio/revision.hpp b/include/llfio/revision.hpp index 9e73339c..4fec5a76 100644 --- a/include/llfio/revision.hpp +++ b/include/llfio/revision.hpp @@ -1,4 +1,4 @@ // Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time -#define LLFIO_PREVIOUS_COMMIT_REF 540cff7aa6d51cdad25c50857a4c3f523368cd33 -#define LLFIO_PREVIOUS_COMMIT_DATE "2020-12-17 13:41:30 +00:00" -#define LLFIO_PREVIOUS_COMMIT_UNIQUE 540cff7a +#define LLFIO_PREVIOUS_COMMIT_REF 4fa0e9bf0835d9d4d9dd8b0f93e6c650650fc0af +#define LLFIO_PREVIOUS_COMMIT_DATE "2020-12-21 16:27:04 +00:00" +#define LLFIO_PREVIOUS_COMMIT_UNIQUE 4fa0e9bf diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp new file mode 100644 index 00000000..c44dcde4 --- /dev/null +++ 
b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp @@ -0,0 +1,728 @@ +/* Dynamic thread pool group +(C) 2020 Niall Douglas (9 commits) +File Created: Dec 2020 + + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License in the accompanying file +Licence.txt or at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + +Distributed under the Boost Software License, Version 1.0. + (See accompanying file Licence.txt or copy at + http://www.boost.org/LICENSE_1_0.txt) +*/ + +#include "../../dynamic_thread_pool_group.hpp" + +#include +#include +#include +#include + +#ifdef _WIN32 +#include "windows/import.hpp" +#include +#else +#include +#endif + +LLFIO_V2_NAMESPACE_BEGIN + +namespace detail +{ + struct global_dynamic_thread_pool_impl + { +#ifdef _WIN32 + using threadh_type = PTP_CALLBACK_INSTANCE; + using grouph_type = PTP_CALLBACK_ENVIRON; + static void CALLBACK _win32_worker_thread_callback(threadh_type threadh, PVOID Parameter, PTP_WORK /*unused*/) + { + auto *workitem = (dynamic_thread_pool_group::work_item *) Parameter; + global_dynamic_thread_pool()._workerthread(workitem, threadh); + } + static void CALLBACK _win32_timer_thread_callback(threadh_type threadh, PVOID Parameter, PTP_TIMER /*unused*/) + { + auto *workitem = (dynamic_thread_pool_group::work_item *) Parameter; + global_dynamic_thread_pool()._timerthread(workitem, threadh); + } +#else +#endif + + std::mutex workqueue_lock; + using _lock_guard = std::unique_lock; + struct workqueue_item + { + std::unordered_set items; + }; + std::vector workqueue; + + 
global_dynamic_thread_pool_impl() + { + workqueue.reserve(4); // preallocate 4 levels of nesting + } + + template static void _append_to_list(T &what, U *v) + { + if(what.front == nullptr) + { + assert(what.back == nullptr); + v->_next = v->_prev = nullptr; + what.front = what.back = v; + } + else + { + v->_next = nullptr; + v->_prev = what.back; + what.back->_next = v; + what.back = v; + } + what.count++; + } + template static void _remove_from_list(T &what, U *v) + { + if(v->_prev == nullptr && v->_next == nullptr) + { + assert(what.front == v); + assert(what.back == v); + what.front = what.back = nullptr; + } + else if(v->_prev == nullptr) + { + assert(what.front == v); + v->_next->_prev = nullptr; + what.front = v->_next; + v->_next = v->_prev = nullptr; + } + else if(v->_next == nullptr) + { + assert(what.back == v); + v->_prev->_next = nullptr; + what.back = v->_prev; + v->_next = v->_prev = nullptr; + } + else + { + v->_next->_prev = v->_prev; + v->_prev->_next = v->_next; + v->_next = v->_prev = nullptr; + } + what.count--; + } + + result _prepare_work_item_delay(dynamic_thread_pool_group::work_item *workitem, grouph_type grouph, deadline d) + { + if(!d) + { + return errc::invalid_argument; + } + if(workitem->_nextwork == 0 || d.nsecs > 0) + { + if(nullptr == workitem->_internaltimerh) + { + workitem->_internaltimerh = CreateThreadpoolTimer(_win32_timer_thread_callback, workitem, grouph); + if(nullptr == workitem->_internaltimerh) + { + return win32_error(); + } + } + if(d.nsecs > 0) + { + if(d.steady) + { + workitem->_timepoint1 = std::chrono::steady_clock::now() + std::chrono::nanoseconds(d.nsecs); + } + else + { + workitem->_timepoint2 = d.to_time_point(); + } + } + } + return success(); + } + + inline void _submit_work_item(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem); + + inline result submit(_lock_guard &g, dynamic_thread_pool_group_impl *group, span work) noexcept; + + inline void _work_item_done(_lock_guard &g, 
dynamic_thread_pool_group::work_item *i) noexcept; + inline void _work_item_next(_lock_guard &g, dynamic_thread_pool_group::work_item *i) noexcept; + + inline result stop(_lock_guard &g, dynamic_thread_pool_group_impl *group, result err) noexcept; + inline result wait(_lock_guard &g, bool reap, const dynamic_thread_pool_group_impl *group, deadline d) noexcept; + + inline void _timerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh); + inline void _workerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh); + }; + struct global_dynamic_thread_pool_impl_thread_local_state_t + { + dynamic_thread_pool_group::work_item *workitem{nullptr}; + global_dynamic_thread_pool_impl::threadh_type current_callback_instance{nullptr}; + size_t nesting_level{0}; + }; + LLFIO_HEADERS_ONLY_FUNC_SPEC global_dynamic_thread_pool_impl_thread_local_state_t &global_dynamic_thread_pool_thread_local_state() noexcept + { + static thread_local global_dynamic_thread_pool_impl_thread_local_state_t tls; + return tls; + } + + LLFIO_HEADERS_ONLY_FUNC_SPEC global_dynamic_thread_pool_impl &global_dynamic_thread_pool() noexcept + { + static global_dynamic_thread_pool_impl impl; + return impl; + } +} // namespace detail + + +class dynamic_thread_pool_group_impl final : public dynamic_thread_pool_group +{ + friend struct detail::global_dynamic_thread_pool_impl; + + mutable std::mutex _lock; + using _lock_guard = detail::global_dynamic_thread_pool_impl::_lock_guard; + size_t _nesting_level{0}; + struct workitems_t + { + size_t count{0}; + dynamic_thread_pool_group::work_item *front{nullptr}, *back{nullptr}; + } _work_items_active, _work_items_done, _work_items_delayed; + std::atomic _stopping{false}, _stopped{true}, _completing{false}; + result _abnormal_completion_cause{success()}; // The cause of any abnormal group completion + +#ifdef _WIN32 + TP_CALLBACK_ENVIRON _callbackenviron; + PTP_CALLBACK_ENVIRON _grouph{&_callbackenviron}; +#endif + 
+public: + result init() + { + LLFIO_LOG_FUNCTION_CALL(this); + try + { + auto &impl = detail::global_dynamic_thread_pool(); + _nesting_level = detail::global_dynamic_thread_pool_thread_local_state().nesting_level; +#ifdef _WIN32 + InitializeThreadpoolEnvironment(_grouph); +#else +#endif + detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock); + // Append this group to the global work queue at its nesting level + if(_nesting_level >= impl.workqueue.size()) + { + impl.workqueue.resize(_nesting_level + 1); + } + impl.workqueue[_nesting_level].items.insert(this); + return success(); + } + catch(...) + { + return error_from_exception(); + } + } + + virtual ~dynamic_thread_pool_group_impl() + { + LLFIO_LOG_FUNCTION_CALL(this); + (void) wait(); + auto &impl = detail::global_dynamic_thread_pool(); +#ifdef _WIN32 + if(nullptr != _grouph) + { + DestroyThreadpoolEnvironment(_grouph); + _grouph = nullptr; + } +#else +#endif + detail::global_dynamic_thread_pool_impl::_lock_guard g(impl.workqueue_lock); + assert(impl.workqueue.size() > _nesting_level); + impl.workqueue[_nesting_level].items.erase(this); + while(!impl.workqueue.empty() && impl.workqueue.back().items.empty()) + { + impl.workqueue.pop_back(); + } + } + + virtual result submit(span work) noexcept override + { + LLFIO_LOG_FUNCTION_CALL(this); + if(_stopping.load(std::memory_order_relaxed)) + { + return errc::operation_canceled; + } + _stopped.store(false, std::memory_order_release); + if(_completing.load(std::memory_order_relaxed)) + { + for(auto *i : work) + { + i->_parent = this; + detail::global_dynamic_thread_pool_impl::_append_to_list(_work_items_delayed, i); + } + return success(); + } + auto &impl = detail::global_dynamic_thread_pool(); + _lock_guard g(_lock); + if(_work_items_active.count == 0 && _work_items_done.count == 0) + { + _abnormal_completion_cause = success(); + } + OUTCOME_TRY(impl.submit(g, this, work)); + if(_work_items_active.count == 0) + { + _stopped.store(true, 
std::memory_order_release); + } + return success(); + } + + virtual result stop() noexcept override + { + LLFIO_LOG_FUNCTION_CALL(this); + if(_stopped.load(std::memory_order_relaxed)) + { + return success(); + } + auto &impl = detail::global_dynamic_thread_pool(); + _lock_guard g(_lock); + return impl.stop(g, this, errc::operation_canceled); + } + + virtual bool stopping() const noexcept override { return _stopping.load(std::memory_order_relaxed); } + + virtual bool stopped() const noexcept override { return _stopped.load(std::memory_order_relaxed); } + + virtual result wait(deadline d = {}) const noexcept override + { + LLFIO_LOG_FUNCTION_CALL(this); + if(_stopped.load(std::memory_order_relaxed)) + { + return success(); + } + auto &impl = detail::global_dynamic_thread_pool(); + _lock_guard g(_lock); + return impl.wait(g, true, this, d); + } +}; + +LLFIO_HEADERS_ONLY_MEMFUNC_SPEC size_t dynamic_thread_pool_group::current_nesting_level() noexcept +{ + return detail::global_dynamic_thread_pool_thread_local_state().nesting_level; +} + +LLFIO_HEADERS_ONLY_MEMFUNC_SPEC dynamic_thread_pool_group::work_item *dynamic_thread_pool_group::current_work_item() noexcept +{ + return detail::global_dynamic_thread_pool_thread_local_state().workitem; +} + +LLFIO_HEADERS_ONLY_FUNC_SPEC result make_dynamic_thread_pool_group() noexcept +{ + try + { + auto ret = std::make_unique(); + OUTCOME_TRY(ret->init()); + return dynamic_thread_pool_group_ptr(std::move(ret)); + } + catch(...) 
+ { + return error_from_exception(); + } +} + +namespace detail +{ + inline void global_dynamic_thread_pool_impl::_submit_work_item(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem) + { + (void) g; + if(workitem->_nextwork != -1 && !workitem->_parent->_stopping.load(std::memory_order_relaxed)) + { + // If no work item for now, or there is a delay, schedule a timer + if(workitem->_nextwork == 0 || workitem->_timepoint1 != std::chrono::steady_clock::time_point() || + workitem->_timepoint2 != std::chrono::system_clock::time_point()) + { + assert(workitem->_internaltimerh != nullptr); +#ifdef _WIN32 + LARGE_INTEGER li; + if(workitem->_timepoint1 != std::chrono::steady_clock::time_point()) + { + li.QuadPart = std::chrono::duration_cast(workitem->_timepoint1 - std::chrono::steady_clock::now()).count() / 100; + if(li.QuadPart < 0) + { + li.QuadPart = 0; + } + li.QuadPart = -li.QuadPart; // negative is relative + } + else if(workitem->_timepoint2 != std::chrono::system_clock::time_point()) + { + li = windows_nt_kernel::from_timepoint(workitem->_timepoint2); + } + else + { + li.QuadPart = -1; // smallest possible non immediate duration from now + } + FILETIME ft; + ft.dwHighDateTime = (DWORD) li.HighPart; + ft.dwLowDateTime = li.LowPart; + // std::cout << "*** timer " << workitem << std::endl; + SetThreadpoolTimer((PTP_TIMER) workitem->_internaltimerh, &ft, 0, 1000); +#else +#endif + } + else + { +#ifdef _WIN32 + // Set the priority of the group according to distance from the top + TP_CALLBACK_PRIORITY priority = TP_CALLBACK_PRIORITY_LOW; + if(workqueue.size() - workitem->_parent->_nesting_level == 1) + { + priority = TP_CALLBACK_PRIORITY_HIGH; + } + else if(workqueue.size() - workitem->_parent->_nesting_level == 2) + { + priority = TP_CALLBACK_PRIORITY_NORMAL; + } + SetThreadpoolCallbackPriority(workitem->_parent->_grouph, priority); + // std::cout << "*** submit " << workitem << std::endl; + SubmitThreadpoolWork((PTP_WORK) workitem->_internalworkh); +#else 
+#endif + } + } + } + + inline result global_dynamic_thread_pool_impl::submit(_lock_guard &g, dynamic_thread_pool_group_impl *group, + span work) noexcept + { + try + { + if(work.empty()) + { + return success(); + } + for(auto *i : work) + { + if(i->_parent != nullptr) + { + return errc::address_in_use; + } + } + auto uninit = make_scope_exit([&]() noexcept { + for(auto *i : work) + { + _remove_from_list(group->_work_items_active, i); +#ifdef _WIN32 + if(nullptr != i->_internaltimerh) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } + if(nullptr != i->_internalworkh) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } +#else +#endif + i->_parent = nullptr; + } + }); + for(auto *i : work) + { + deadline d(std::chrono::seconds(0)); + i->_parent = group; + i->_nextwork = i->next(d); + if(-1 == i->_nextwork) + { + _append_to_list(group->_work_items_done, i); + } + else + { +#ifdef _WIN32 + i->_internalworkh = CreateThreadpoolWork(_win32_worker_thread_callback, i, group->_grouph); + if(nullptr == i->_internalworkh) + { + return win32_error(); + } +#else +#endif + OUTCOME_TRY(_prepare_work_item_delay(i, group->_grouph, d)); + _append_to_list(group->_work_items_active, i); + } + } + uninit.release(); + { + for(auto *i : work) + { + _submit_work_item(g, i); + } + } + return success(); + } + catch(...) 
+ { + return error_from_exception(); + } + } + + inline void global_dynamic_thread_pool_impl::_work_item_done(_lock_guard &g, dynamic_thread_pool_group::work_item *i) noexcept + { + (void) g; +#ifdef _WIN32 + if(1 == i->_internalworkh_inuse) + { + i->_internalworkh_inuse = 2; + } + else + { + if(i->_internaltimerh != nullptr) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } + if(i->_internalworkh != nullptr) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } + } +#else +#endif + _remove_from_list(i->_parent->_work_items_active, i); + _append_to_list(i->_parent->_work_items_done, i); + if(i->_parent->_work_items_active.count == 0) + { + auto *parent = i->_parent; + i = nullptr; + auto *v = parent->_work_items_done.front, *n = v; + for(; v != nullptr; v = n) + { + v->_parent = nullptr; + n = v->_next; + } + n = v = parent->_work_items_done.front; + parent->_work_items_done.front = parent->_work_items_done.back = nullptr; + parent->_work_items_done.count = 0; + parent->_stopping.store(false, std::memory_order_release); + parent->_stopped.store(true, std::memory_order_release); + parent->_completing.store(true, std::memory_order_release); + for(; v != nullptr; v = n) + { + n = v->_next; + v->group_complete(parent->_abnormal_completion_cause); + } + parent->_completing.store(false, std::memory_order_release); + // Did at least one group_complete() submit more work to myself? 
+ while(parent->_work_items_delayed.count > 0) + { + i = parent->_work_items_delayed.front; + _remove_from_list(parent->_work_items_delayed, i); + auto r = submit(g, parent, {&i, 1}); + if(!r) + { + parent->_work_items_delayed = {}; + (void) stop(g, parent, std::move(r)); + break; + } + } + } + } + inline void global_dynamic_thread_pool_impl::_work_item_next(_lock_guard &g, dynamic_thread_pool_group::work_item *workitem) noexcept + { + assert(workitem->_nextwork != -1); + if(workitem->_nextwork == 0) + { + deadline d(std::chrono::seconds(0)); + workitem->_nextwork = workitem->next(d); + auto r = _prepare_work_item_delay(workitem, workitem->_parent->_grouph, d); + if(!r) + { + (void) stop(g, workitem->_parent, std::move(r)); + _work_item_done(g, workitem); + return; + } + } + if(-1 == workitem->_nextwork) + { + _work_item_done(g, workitem); + return; + } + _submit_work_item(g, workitem); + } + + inline result global_dynamic_thread_pool_impl::stop(_lock_guard &g, dynamic_thread_pool_group_impl *group, result err) noexcept + { + (void) g; + if(group->_abnormal_completion_cause) + { + group->_abnormal_completion_cause = std::move(err); + } + group->_stopping.store(true, std::memory_order_release); + return success(); + } + + inline result global_dynamic_thread_pool_impl::wait(_lock_guard &g, bool reap, const dynamic_thread_pool_group_impl *group, deadline d) noexcept + { + LLFIO_DEADLINE_TO_SLEEP_INIT(d); + if(!d || d.nsecs > 0) + { + auto &tls = detail::global_dynamic_thread_pool_thread_local_state(); +#ifdef _WIN32 + if(tls.current_callback_instance != nullptr) + { + // I am being called from within a thread worker. Tell + // the thread pool that I am not going to exit promptly. + CallbackMayRunLong(tls.current_callback_instance); + } +#endif + // Is this a cancellation? 
+ if(group->_stopping.load(std::memory_order_relaxed)) + { + while(group->_work_items_active.count > 0) + { +#ifdef _WIN32 + auto *i = group->_work_items_active.front; + if(nullptr != i->_internalworkh) + { + i->_internalworkh_inuse = 1; + g.unlock(); + WaitForThreadpoolWorkCallbacks((PTP_WORK) i->_internalworkh, true); + g.lock(); + if(i->_internalworkh_inuse == 2) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } + i->_internalworkh_inuse = 0; + } + if(nullptr != i->_internaltimerh) + { + i->_internalworkh_inuse = 1; + g.unlock(); + WaitForThreadpoolTimerCallbacks((PTP_TIMER) i->_internaltimerh, true); + g.lock(); + if(i->_internalworkh_inuse == 2) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } + i->_internalworkh_inuse = 0; + } + if(group->_work_items_active.count > 0 && group->_work_items_active.front == i) + { + // This item got cancelled before it started + _work_item_done(g, group->_work_items_active.front); + } +#else +#endif + } + assert(!group->_stopping.load(std::memory_order_relaxed)); + } + else if(!d) + { + while(group->_work_items_active.count > 0) + { +#ifdef _WIN32 + auto *i = group->_work_items_active.front; + if(nullptr != i->_internalworkh) + { + i->_internalworkh_inuse = 1; + g.unlock(); + WaitForThreadpoolWorkCallbacks((PTP_WORK) i->_internalworkh, false); + g.lock(); + if(i->_internalworkh_inuse == 2) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } + i->_internalworkh_inuse = 0; + } + if(nullptr != i->_internaltimerh) + { + i->_internalworkh_inuse = 1; + g.unlock(); + WaitForThreadpoolTimerCallbacks((PTP_TIMER) i->_internaltimerh, false); + g.lock(); + if(i->_internalworkh_inuse == 2) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } + i->_internalworkh_inuse = 0; + } +#else +#endif + } + } + else + { + while(group->_work_items_active.count > 0) + { + 
LLFIO_DEADLINE_TO_TIMEOUT_LOOP(d); + g.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + g.lock(); + } + } + } + if(group->_work_items_active.count > 0) + { + return errc::timed_out; + } + if(reap) + { + return std::move(group->_abnormal_completion_cause); + } + return success(); + } + + inline void global_dynamic_thread_pool_impl::_timerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type /*unused*/) + { + LLFIO_LOG_FUNCTION_CALL(this); + // std::cout << "*** _timerthread " << workitem << std::endl; + _lock_guard g(workitem->_parent->_lock); + _work_item_next(g, workitem); + } + + // Worker thread entry point + inline void global_dynamic_thread_pool_impl::_workerthread(dynamic_thread_pool_group::work_item *workitem, threadh_type selfthreadh) + { + LLFIO_LOG_FUNCTION_CALL(this); + // std::cout << "*** _workerthread " << workitem << " with work " << workitem->_nextwork << std::endl; + assert(workitem->_nextwork != -1); + assert(workitem->_nextwork != 0); + if(workitem->_parent->_stopped.load(std::memory_order_relaxed)) + { + _lock_guard g(workitem->_parent->_lock); + _work_item_done(g, workitem); + return; + } + auto &tls = detail::global_dynamic_thread_pool_thread_local_state(); + auto old_thread_local_state = tls; + tls.workitem = workitem; + tls.current_callback_instance = selfthreadh; + tls.nesting_level++; + auto r = (*workitem)(workitem->_nextwork); + workitem->_nextwork = 0; // call next() next time + tls = old_thread_local_state; + _lock_guard g(workitem->_parent->_lock); + if(!r) + { + (void) stop(g, workitem->_parent, std::move(r)); + _work_item_done(g, workitem); + workitem = nullptr; + } + else + { + _work_item_next(g, workitem); + } + } +} // namespace detail + +LLFIO_V2_NAMESPACE_END diff --git a/include/llfio/v2.0/dynamic_thread_pool_group.hpp b/include/llfio/v2.0/dynamic_thread_pool_group.hpp new file mode 100644 index 00000000..c216f023 --- /dev/null +++ b/include/llfio/v2.0/dynamic_thread_pool_group.hpp @@ -0,0 
+1,311 @@ +/* Dynamic thread pool group +(C) 2020 Niall Douglas (9 commits) +File Created: Dec 2020 + + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License in the accompanying file +Licence.txt or at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + +Distributed under the Boost Software License, Version 1.0. + (See accompanying file Licence.txt or copy at + http://www.boost.org/LICENSE_1_0.txt) +*/ + +#ifndef LLFIO_DYNAMIC_THREAD_POOL_GROUP_H +#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_H + +#include "deadline.h" + +#include // for unique_ptr and shared_ptr + +#ifdef _MSC_VER +#pragma warning(push) +#pragma warning(disable : 4251) // dll interface +#endif + +LLFIO_V2_NAMESPACE_EXPORT_BEGIN + +class dynamic_thread_pool_group_impl; + +namespace detail +{ + struct global_dynamic_thread_pool_impl; + LLFIO_HEADERS_ONLY_FUNC_SPEC global_dynamic_thread_pool_impl &global_dynamic_thread_pool() noexcept; +} // namespace detail + +/*! \class dynamic_thread_pool_group +\brief Work group within the global dynamic thread pool. + +Some operating systems provide a per-process global kernel thread pool capable of +dynamically adjusting its kernel thread count to how many of the threads in +the pool are currently blocked. The platform will choose the exact strategy used, +but as an example of a strategy, one might keep creating new kernel threads +so long as the total threads currently running and not blocked on page faults, +i/o or syscalls, is below the hardware concurrency. 
Similarly, if more threads +are running and not blocked than hardware concurrency, one might remove kernel +threads from executing work. Such a strategy would dynamically increase +concurrency until all CPUs are busy, but reduce concurrency if more work is +being done than CPUs available. + +Such dynamic kernel thread pools are excellent for CPU bound processing, you +simply fire and forget work into them. However, for i/o bound processing, you +must be careful as there are gotchas. For non-seekable i/o, it is very possible +that there could be 100k handles upon which we do i/o. Doing i/o on +100k handles using a dynamic thread pool would in theory cause the creation +of 100k kernel threads, which would not be wise. A much better solution is +to use an `io_multiplexer` to await changes in large sets of i/o handles. + +For seekable i/o, the same problem applies, but worse again: an i/o bound problem +would cause a rapid increase in the number of kernel threads, which by +definition makes i/o even more congested. Basically the system runs off +into pathological performance loss. You must therefore never naively do +i/o bound work (e.g. with memory mapped files) from within a dynamic thread +pool without employing some mechanism to force concurrency downwards if +the backing storage is congested. + +## Work groups + +Instances of this class contain zero or more work items. Each work item +is asked for its next item of work, and if an item of work is available, +that item of work is executed by the global kernel thread pool at a time +of its choosing. It is NEVER possible that any one work item is concurrently +executed at a time, each work item is always sequentially executed with +respect to itself. The only concurrency possible is *across* work items. +Therefore, if you want to execute the same piece of code concurrently, +you need to submit a separate work item for each possible amount of +concurrency (e.g. `std::thread::hardware_concurrency()`). 
+ +You can have as many or as few items of work as you like. You can +dynamically submit additional work items at any time. The group of work items can +be waited upon to complete, after which the work group becomes reset as +if back to freshly constructed. You can also stop executing all the work +items in the group, even if they have not fully completed. If any work +item returns a failure, this equals a `stop()`, and the next `wait()` will +return that error. + +Work items may create sub work groups as part of their +operation. If they do so, the work items from such nested work groups are +scheduled preferentially. This ensures good forward progress, so if you +have 100 work items each of which do another 100 work items, you don't get +10,000 slowly progressing work. Rather, the work items in the first set +progress slowly, whereas the work items in the second set progress quickly. + +`work_item::next()` may optionally set a deadline to delay when that work +item ought to be processed again. Deadlines can be relative or absolute. + +## C++ 23 Executors + +As with elsewhere in LLFIO, as a low level facility, we don't implement +https://wg21.link/P0443 Executors, but it is trivially easy to implement +a dynamic equivalent to `std::static_thread_pool` using this class. + +## Implementation notes + +### Microsoft Windows + +On Microsoft Windows, the Win32 thread pool API is used (https://docs.microsoft.com/en-us/windows/win32/procthread/thread-pool-api). +This is an IOCP-aware thread pool which will dynamically increase the number +of kernel threads until none are blocked. If more kernel threads +are running than twice the number of CPUs in the system, the number of kernel +threads is dynamically reduced. The maximum number of kernel threads which +will run simultaneously is 500. Note that the Win32 thread pool is shared +across the process by multiple Windows facilities. 
+ +Note that the Win32 thread pool has built in support for IOCP, so if you +have a custom i/o multiplexer, you can use the global Win32 thread pool +to execute i/o completions handling. See `CreateThreadpoolIo()` for more. + +No dynamic memory allocation is performed by this implementation outside +of the initial `make_dynamic_thread_pool_group()`. The Win32 thread pool +API may perform dynamic memory allocation internally, but that is outside +our control. + +### Linux + +On Linux, a similar strategy to Microsoft Windows' approach is used. We +dynamically increase the number of kernel threads until none are sleeping +awaiting i/o. If more kernel threads are running than 1.5x the number of +CPUs in the system, the number of kernel threads is dynamically reduced. +For portability, we also gate the maximum number of kernel threads to 500. +Note that **all** the kernel threads for the current process are considered, +not just the kernel threads created by this thread pool implementation. +Therefore, if you have alternative thread pool implementations (e.g. OpenMP, +`std::async`), those are also included in the dynamic adjustment. + +As this is wholly implemented by this library, dynamic memory allocation +occurs in the initial `make_dynamic_thread_pool_group()`, but otherwise +the implementation does not perform dynamic memory allocations. +*/ +class LLFIO_DECL dynamic_thread_pool_group +{ +public: + //! An individual item of work within the work group. 
+ class work_item + { + friend struct detail::global_dynamic_thread_pool_impl; + friend class dynamic_thread_pool_group_impl; + dynamic_thread_pool_group_impl *_parent{nullptr}; + void *_internalworkh{nullptr}, *_internaltimerh{nullptr}; + work_item *_prev{nullptr}, *_next{nullptr}; + intptr_t _nextwork{0}; + std::chrono::steady_clock::time_point _timepoint1; + std::chrono::system_clock::time_point _timepoint2; + int _internalworkh_inuse{0}; + + protected: + work_item() = default; + work_item(const work_item &) = default; + work_item(work_item &&) = default; + work_item &operator=(const work_item &) = default; + work_item &operator=(work_item &&) = default; + + public: + virtual ~work_item() {} + + //! Returns the parent work group between successful submission and just before `group_complete()`. + dynamic_thread_pool_group *parent() const noexcept { return reinterpret_cast(_parent); } + + /*! Invoked by the i/o thread pool to determine if this work item + has more work to do. + + \return If there is no work _currently_ available to do, but there + might be some later, you should return zero. You will be called again + later after other work has been done. If you return -1, you are + saying that no further work will be done, and the group need never + call you again. If you have more work you want to do, return any + other value. + \param d Optional delay before the next item of work ought to be + executed (return != 0), or `next()` ought to be called again to + determine the next item (return == 0). On entry `d` is set to no + delay, so if you don't modify it, the next item of work occurs + as soon as possible. + + Note that this function is called from multiple kernel threads. + You must NOT do any significant work in this function. + In particular do NOT call any dynamic thread pool group function, + as you will experience deadlock. + + `dynamic_thread_pool_group::current_work_item()` may have any + value during this call. 
+    */
+    virtual intptr_t next(deadline &d) noexcept = 0;
+
+    /*! Invoked by the i/o thread pool to perform the next item of work.
+
+    \return Any failure causes all remaining work in this group to
+    be cancelled as soon as possible.
+    \param work The value returned by `next()`.
+
+    Note that this function is called from multiple kernel threads,
+    and may not be the kernel thread from which `next()`
+    was called.
+
+    `dynamic_thread_pool_group::current_work_item()` will always be
+    `this` during this call.
+    */
+    virtual result<void> operator()(intptr_t work) noexcept = 0;
+
+    /*! Invoked by the i/o thread pool when all work in this thread
+    pool group is complete.
+
+    `cancelled` indicates if this is an abnormal completion. If its
+    error compares equal to `errc::operation_cancelled`, then `stop()`
+    was called.
+
+    Just before this is called for all work items submitted, the group
+    becomes reset to fresh, and `parent()` becomes null. You can resubmit
+    this work item, but do not submit other work items until their
+    `group_complete()` has been invoked.
+
+    Note that this function is called from multiple kernel threads.
+
+    `dynamic_thread_pool_group::current_work_item()` may have any
+    value during this call.
+    */
+    virtual void group_complete(const result<void> &cancelled) noexcept { (void) cancelled; }
+  };
+
+  virtual ~dynamic_thread_pool_group() {}
+
+  /*! \brief Threadsafe. Submit one or more work items for execution. Note that you can submit more later.
+
+  Note that if the group is currently stopping, you cannot submit more
+  work until the group has stopped. An error code comparing equal to
+  `errc::operation_canceled` is returned if you try.
+  */
+  virtual result<void> submit(span<work_item *> work) noexcept = 0;
+
+  //! Threadsafe. Cancel any remaining work previously submitted, but without blocking (use `wait()` to block).
+  virtual result<void> stop() noexcept = 0;
+
+  /*! \brief Threadsafe. True if a work item reported an error, or
+  `stop()` was called, but work items are still running.
+  */
+  virtual bool stopping() const noexcept = 0;
+
+  //! Threadsafe. True if all the work previously submitted is complete.
+  virtual bool stopped() const noexcept = 0;
+
+  //! Threadsafe. Wait for work previously submitted to complete, returning any failures by any work item.
+  virtual result<void> wait(deadline d = {}) const noexcept = 0;
+  //! \overload
+  template <class Rep, class Period> result<bool> wait_for(const std::chrono::duration<Rep, Period> &duration) const noexcept
+  {
+    auto r = wait(duration);
+    if(!r && r.error() == errc::timed_out)
+    {
+      return false;
+    }
+    OUTCOME_TRY(std::move(r));
+    return true;
+  }
+  //! \overload
+  template <class Clock, class Duration> result<bool> wait_until(const std::chrono::time_point<Clock, Duration> &timeout) const noexcept
+  {
+    auto r = wait(timeout);
+    if(!r && r.error() == errc::timed_out)
+    {
+      return false;
+    }
+    OUTCOME_TRY(std::move(r));
+    return true;
+  }
+
+  //! Returns the work item nesting level which would be used if a new dynamic thread pool group were created within the current work item.
+  static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC size_t current_nesting_level() noexcept;
+  //! Returns the work item the calling thread is running within, if any.
+  static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC work_item *current_work_item() noexcept;
+};
+//! A unique ptr to a work group within the global dynamic thread pool.
+using dynamic_thread_pool_group_ptr = std::unique_ptr<dynamic_thread_pool_group>;
+
+//! Creates a new work group within the global dynamic thread pool.
+LLFIO_HEADERS_ONLY_FUNC_SPEC result<dynamic_thread_pool_group_ptr> make_dynamic_thread_pool_group() noexcept;
+
+// BEGIN make_free_functions.py
+// END make_free_functions.py
+
+LLFIO_V2_NAMESPACE_END
+
+#ifdef _MSC_VER
+#pragma warning(pop)
+#endif
+
+#if LLFIO_HEADERS_ONLY == 1 && !defined(DOXYGEN_SHOULD_SKIP_THIS)
+#define LLFIO_INCLUDED_BY_HEADER 1
+#include "detail/impl/dynamic_thread_pool_group.ipp"
+#undef LLFIO_INCLUDED_BY_HEADER
+#endif
+
+#endif
diff --git a/include/llfio/v2.0/llfio.hpp b/include/llfio/v2.0/llfio.hpp
index 9cbe624d..5c0874de 100644
--- a/include/llfio/v2.0/llfio.hpp
+++ b/include/llfio/v2.0/llfio.hpp
@@ -63,9 +63,9 @@ import LLFIO_MODULE_NAME;
 #include "utils.hpp"
 
 #include "directory_handle.hpp"
+#include "dynamic_thread_pool_group.hpp"
 #include "fast_random_file_handle.hpp"
 #include "file_handle.hpp"
-//#include "io_thread_pool_group.hpp"
 #include "process_handle.hpp"
 #include "statfs.hpp"
 #ifdef LLFIO_INCLUDE_STORAGE_PROFILE
diff --git a/include/llfio/v2.0/logging.hpp b/include/llfio/v2.0/logging.hpp
index 1dff759a..7ef30363 100644
--- a/include/llfio/v2.0/logging.hpp
+++ b/include/llfio/v2.0/logging.hpp
@@ -77,7 +77,7 @@ public:
   {
     reinterpret_cast<log_level &>(detail::thread_local_log_level()) = n;
   }
-  ~log_level_guard() {reinterpret_cast<log_level &>(detail::thread_local_log_level()) = _v; }
+  ~log_level_guard() { reinterpret_cast<log_level &>(detail::thread_local_log_level()) = _v; }
 };
 
 // Infrastructure for recording the current path for when failure occurs
@@ -262,8 +262,9 @@ namespace detail
     return span<char>(buffer, length);
   }
   // Strips a __PRETTY_FUNCTION__ of all instances of ::LLFIO_V2_NAMESPACE:: and ::LLFIO_V2_NAMESPACE::
-  inline void strip_pretty_function(char *out, size_t bytes, const char *in)
+  inline void strip_pretty_function(char *_out, size_t bytes, const char *in)
   {
+    char *out = _out;
     const span<char> remove1 = llfio_namespace_string();
     const span<char> remove2 = outcome_namespace_string();
     for(--bytes; bytes && *in; --bytes)
@@ -272,6 +273,32 @@
       in += remove1.size();
if(!strncmp(in, remove2.data(), remove2.size())) in += remove2.size(); + if(!strncmp(in, "basic_result<", 13)) + { + int count = 13; + for(--bytes; bytes && *in && count; --bytes, --count) + { + *out++ = *in++; + } + if(!*in || bytes ==0) + { + break; + } + count = 1; + while(*in && count > 0) + { + if(*in == '<') + { + count++; + } + else if(*in == '>') + { + count--; + } + in++; + } + in--; + } *out++ = *in++; } *out = 0; diff --git a/include/llfio/v2.0/status_code.hpp b/include/llfio/v2.0/status_code.hpp index 80fd905d..03eb0f67 100644 --- a/include/llfio/v2.0/status_code.hpp +++ b/include/llfio/v2.0/status_code.hpp @@ -25,8 +25,6 @@ Distributed under the Boost Software License, Version 1.0. #ifndef LLFIO_STATUS_CODE_HPP #define LLFIO_STATUS_CODE_HPP -#include "logging.hpp" - /* The SG14 status code implementation is quite profoundly different to the error code implementation. In the error code implementation, std::error_code is fixed by the standard library, so we wrap it with extra metadata into @@ -68,6 +66,8 @@ as that (a) enables safe header only LLFIO on Windows (b) produces better codege #error LLFIO needs Outcome v2.2 or higher #endif +#include "logging.hpp" + LLFIO_V2_NAMESPACE_BEGIN #ifndef LLFIO_DISABLE_PATHS_IN_FAILURE_INFO @@ -351,6 +351,8 @@ LLFIO_V2_NAMESPACE_END #error LLFIO needs Outcome v2.2 or higher #endif +#include "logging.hpp" + LLFIO_V2_NAMESPACE_BEGIN namespace detail -- cgit v1.2.3