/* Dynamic thread pool group
(C) 2020-2021 Niall Douglas (9 commits)
File Created: Dec 2020


Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License in the accompanying file
Licence.txt or at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


Distributed under the Boost Software License, Version 1.0.
(See accompanying file Licence.txt or copy at
http://www.boost.org/LICENSE_1_0.txt)
*/

#ifndef LLFIO_DYNAMIC_THREAD_POOL_GROUP_H
#define LLFIO_DYNAMIC_THREAD_POOL_GROUP_H

#include "deadline.h"

#include <memory>  // for unique_ptr and shared_ptr

#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4251)  // dll interface
#pragma warning(disable : 4275)  // dll interface
#endif

LLFIO_V2_NAMESPACE_EXPORT_BEGIN

class dynamic_thread_pool_group_impl;
class io_handle;

namespace detail
{
  struct global_dynamic_thread_pool_impl;
  struct global_dynamic_thread_pool_impl_workqueue_item;
  LLFIO_HEADERS_ONLY_FUNC_SPEC global_dynamic_thread_pool_impl &global_dynamic_thread_pool() noexcept;
}  // namespace detail

/*! \class dynamic_thread_pool_group
\brief Work group within the global dynamic thread pool.

Some operating systems provide a per-process global kernel thread pool capable of
dynamically adjusting its kernel thread count to how many of the threads in the pool
are currently blocked. The platform will choose the exact strategy used, but as an
example of a strategy, one might keep creating new kernel threads so long as the
total threads currently running and not blocked on page faults, i/o or syscalls, is
below the hardware concurrency.
Similarly, if more threads are running and not blocked than hardware concurrency,
one might remove kernel threads from executing work. Such a strategy would
dynamically increase concurrency until all CPUs are busy, but reduce concurrency
if more work is being done than there are CPUs available.

Such dynamic kernel thread pools are excellent for CPU bound processing: you simply
fire and forget work into them. However, for i/o bound processing you must be
careful, as there are gotchas. For non-seekable i/o, it is very possible that there
could be 100k handles upon which we do i/o. Doing i/o on 100k handles using a
dynamic thread pool would in theory cause the creation of 100k kernel threads,
which would not be wise. A much better solution is to use an `io_multiplexer` to
await changes in large sets of i/o handles.

For seekable i/o, the same problem applies, but worse again: an i/o bound problem
would cause a rapid increase in the number of kernel threads, which by definition
makes i/o even more congested. Basically the system runs off into pathological
performance loss. You must therefore never naively do i/o bound work (e.g. with
memory mapped files) from within a dynamic thread pool without employing some
mechanism to force concurrency downwards if the backing storage is congested.

## Work groups

Instances of this class contain zero or more work items. Each work item is asked
for its next item of work, and if an item of work is available, that item of work
is executed by the global kernel thread pool at a time of its choosing. It is NEVER
possible for any one work item to be executed concurrently with itself; each work
item is always sequentially executed with respect to itself. The only concurrency
possible is *across* work items. Therefore, if you want to execute the same piece
of code concurrently, you need to submit a separate work item for each possible
amount of concurrency (e.g. `std::thread::hardware_concurrency()`).
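For example, a minimal sketch of achieving such concurrency (the work item type
`my_work` here is hypothetical, standing in for any implementation of the
`work_item` interface documented below):

```cpp
// Sketch only: run the same code on every CPU by submitting one work
// item per unit of hardware concurrency. my_work is a hypothetical
// work_item subclass; error handling is elided via .value().
std::vector<my_work> items(std::thread::hardware_concurrency());
dynamic_thread_pool_group_ptr tpg = make_dynamic_thread_pool_group().value();
tpg->submit(span<my_work>(items.data(), items.size())).value();  // span<T> overload
tpg->wait().value();  // block until every item reports no more work
```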
You can have as many or as few items of work as you like. You can dynamically
submit additional work items at any time, except when a group is currently in the
process of being stopped. The group of work items can be waited upon to complete,
after which the work group becomes reset as if back to freshly constructed. You
can also stop executing all the work items in the group, even if they have not
fully completed. If any work item returns a failure, this equals a `stop()`, and
the next `wait()` will return that error.

Work items may create sub work groups as part of their operation. If they do so,
the work items from such nested work groups are scheduled preferentially. This
ensures good forward progress, so if you have 100 work items each of which does
another 100 work items, you don't get 10,000 items of work all progressing slowly.
Rather, the work items in the first set progress slowly, whereas the work items in
the second set progress quickly.

`work_item::next()` may optionally set a deadline to delay when that work item
ought to be processed again. Deadlines can be relative or absolute.

## C++ 23 Executors

As with elsewhere in LLFIO, as a low level facility, we don't implement
https://wg21.link/P0443 Executors, but it is trivially easy to implement a dynamic
equivalent to `std::static_thread_pool` using this class.

## Implementation notes

### Microsoft Windows

On Microsoft Windows, the Win32 thread pool API is used
(https://docs.microsoft.com/en-us/windows/win32/procthread/thread-pool-api). This
is an IOCP-aware thread pool which will dynamically increase the number of kernel
threads until none are blocked. If more kernel threads are running than twice the
number of CPUs in the system, the number of kernel threads is dynamically reduced.
The maximum number of kernel threads which will run simultaneously is 500. Note
that the Win32 thread pool is shared across the process by multiple Windows
facilities.
Note that the Win32 thread pool has built in support for IOCP, so if you have a
custom i/o multiplexer, you can use the global Win32 thread pool to execute i/o
completion handling. See `CreateThreadpoolIo()` for more.

No dynamic memory allocation is performed by this implementation outside of the
initial `make_dynamic_thread_pool_group()`. The Win32 thread pool API may perform
dynamic memory allocation internally, but that is outside our control.

Overhead of LLFIO above the Win32 thread pool API is very low, statistically
unmeasurable.

### POSIX

If not on Linux, you will need libdispatch, which is detected by LLFIO cmake
during configuration. libdispatch is better known as Grand Central Dispatch,
originally a Mac OS technology but since ported to a high quality kernel based
implementation on recent FreeBSDs, and to a lower quality userspace based
implementation on Linux. Generally libdispatch should get automatically found on
Mac OS without additional effort; on FreeBSD it may need installing from ports;
on Linux you would need to explicitly install `libdispatch-dev` or the equivalent.
You can force the use in cmake of libdispatch by setting the cmake variable
`LLFIO_USE_LIBDISPATCH` to `On`.

Overhead of LLFIO above the libdispatch API is very low, statistically
unmeasurable.

### Linux

On Linux only, we have a custom userspace implementation with superior
performance. A similar strategy to Microsoft Windows' approach is used: we
dynamically increase the number of kernel threads until none are sleeping awaiting
i/o. If more kernel threads are running than three more than the number of CPUs in
the system, the number of kernel threads is dynamically reduced. Note that **all**
the kernel threads for the current process are considered, not just the kernel
threads created by this thread pool implementation. Therefore, if you have
alternative thread pool implementations (e.g. OpenMP, `std::async`), those are
also included in the dynamic adjustment.
As this is wholly implemented by this library, dynamic memory allocation occurs in
the initial `make_dynamic_thread_pool_group()` and per thread creation, but
otherwise the implementation does not perform dynamic memory allocations.

After multiple rewrites, eventually I got this custom userspace implementation to
have superior performance to both ASIO and libdispatch. For larger work items the
difference between all three is meaningless, however for smaller work items I
benchmarked this custom userspace implementation as beating (non-dynamic) ASIO by
approx 29% and Linux libdispatch by approx 52% (note that Linux libdispatch
appears to have a scale up bug when work items are small and few; it is often less
than half the performance of LLFIO's custom implementation).
*/
class LLFIO_DECL dynamic_thread_pool_group
{
  friend class dynamic_thread_pool_group_impl;

public:
  //! An individual item of work within the work group.
  class work_item
  {
    friend struct detail::global_dynamic_thread_pool_impl;
    friend struct detail::global_dynamic_thread_pool_impl_workqueue_item;
    friend class dynamic_thread_pool_group_impl;
    std::atomic<dynamic_thread_pool_group_impl *> _parent{nullptr};
    void *_internalworkh{nullptr};
    void *_internaltimerh{nullptr};  // lazily created if next() ever returns a deadline
    work_item *_prev{nullptr}, *_next{nullptr}, *_next_scheduled{nullptr};
    std::atomic<intptr_t> _nextwork{-1};
    std::chrono::steady_clock::time_point _timepoint1;
    std::chrono::system_clock::time_point _timepoint2;
    int _internalworkh_inuse{0};

  protected:
    constexpr bool _has_timer_set_relative() const noexcept { return _timepoint1 != std::chrono::steady_clock::time_point(); }
    constexpr bool _has_timer_set_absolute() const noexcept { return _timepoint2 != std::chrono::system_clock::time_point(); }
    constexpr bool _has_timer_set() const noexcept { return _has_timer_set_relative() || _has_timer_set_absolute(); }

    constexpr work_item() {}
    work_item(const work_item &o) = delete;
    work_item(work_item &&o) noexcept
        :
_parent(o._parent.load(std::memory_order_relaxed))
        , _internalworkh(o._internalworkh)
        , _internaltimerh(o._internaltimerh)
        , _prev(o._prev)
        , _next(o._next)
        , _next_scheduled(o._next_scheduled)
        , _nextwork(o._nextwork.load(std::memory_order_relaxed))
        , _timepoint1(o._timepoint1)
        , _timepoint2(o._timepoint2)
        , _internalworkh_inuse(o._internalworkh_inuse)
    {
      assert(o._parent.load(std::memory_order_relaxed) == nullptr);
      assert(o._internalworkh == nullptr);
      assert(o._internaltimerh == nullptr);
      if(o._parent.load(std::memory_order_relaxed) != nullptr || o._internalworkh != nullptr)
      {
        LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item was relocated in memory during use!");
        abort();
      }
      o._prev = o._next = o._next_scheduled = nullptr;
      o._nextwork.store(-1, std::memory_order_relaxed);
      o._internalworkh_inuse = 0;
    }
    work_item &operator=(const work_item &) = delete;
    work_item &operator=(work_item &&) = delete;

  public:
    virtual ~work_item()
    {
      assert(_nextwork.load(std::memory_order_relaxed) == -1);
      if(_nextwork.load(std::memory_order_relaxed) != -1)
      {
        LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item destroyed before all work was done!");
        abort();
      }
      assert(_internalworkh == nullptr);
      assert(_internaltimerh == nullptr);
      assert(_parent == nullptr);
      if(_internalworkh != nullptr || _parent != nullptr)
      {
        LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item destroyed before group_complete() was executed!");
        abort();
      }
    }

    //! Returns the parent work group between successful submission and just before `group_complete()`.
    dynamic_thread_pool_group *parent() const noexcept { return reinterpret_cast<dynamic_thread_pool_group *>(_parent.load(std::memory_order_relaxed)); }

    /*! Invoked by the i/o thread pool to determine if this work item has more work to do.

    \return If there is no work _currently_ available to do, but there might be
    some later, you should return zero. You will be called again later after other
    work has been done.
If you return -1, you are saying that no further work will be done, and the group
    need never call you again. If you have more work you want to do, return any
    other value.
    \param d Optional delay before the next item of work ought to be executed
    (return != 0), or `next()` ought to be called again to determine the next item
    (return == 0). On entry `d` is set to no delay, so if you don't modify it, the
    next item of work occurs as soon as possible.

    Note that this function is called from multiple kernel threads. You must NOT
    do any significant work in this function. In particular do NOT call any
    dynamic thread pool group function, as you will experience deadlock.

    `dynamic_thread_pool_group::current_work_item()` may have any value during
    this call.
    */
    virtual intptr_t next(deadline &d) noexcept = 0;

    /*! Invoked by the i/o thread pool to perform the next item of work.

    \return Any failure causes all remaining work in this group to be cancelled
    as soon as possible.
    \param work The value returned by `next()`.

    Note that this function is called from multiple kernel threads, and may not
    be the kernel thread from which `next()` was called.

    `dynamic_thread_pool_group::current_work_item()` will always be `this` during
    this call.
    */
    virtual result<void> operator()(intptr_t work) noexcept = 0;

    /*! Invoked by the i/o thread pool when all work in this thread pool group is
    complete.

    `cancelled` indicates if this is an abnormal completion. If its error compares
    equal to `errc::operation_cancelled`, then `stop()` was called.

    Just before this is called for all work items submitted, the group becomes
    reset to fresh, and `parent()` becomes null. You can resubmit this work item,
    but do not submit other work items until their `group_complete()` has been
    invoked.

    Note that this function is called from multiple kernel threads.

    `dynamic_thread_pool_group::current_work_item()` may have any value during
    this call.
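
    Putting the three virtual functions together, a minimal hypothetical work item
    might look like the following (the type `counting_work` and its behaviour are
    illustrative only, not part of this library):

    ```cpp
    // Hypothetical sketch: a work item which executes ten pieces of work,
    // pacing each piece at least 1ms apart via the optional deadline.
    struct counting_work final : public dynamic_thread_pool_group::work_item
    {
      std::atomic<intptr_t> remaining{10};

      // Called (possibly from many kernel threads) to fetch the next work id.
      intptr_t next(deadline &d) noexcept override
      {
        d = std::chrono::milliseconds(1);  // relative deadline: delay the next execution
        intptr_t n = remaining.fetch_sub(1, std::memory_order_relaxed);
        return (n > 0) ? n : -1;           // -1 == no more work, ever
      }

      // Called to actually perform the piece of work identified by next().
      result<void> operator()(intptr_t work) noexcept override
      {
        (void) work;  // ... do something useful here ...
        return success();
      }
    };
    ```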
*/
    virtual void group_complete(const result<void> &cancelled) noexcept { (void) cancelled; }
  };

  /*! \class io_aware_work_item
  \brief A work item which paces when it next executes according to i/o congestion.

  Currently there is only a working implementation of this for the Microsoft
  Windows and Linux platforms, due to lack of working
  `statfs_t::f_iosinprogress` on other platforms. If retrieving that for a
  seekable handle does not work, the constructor throws an exception.

  For seekable handles, currently `reads`, `writes` and `barriers` are ignored. We
  simply retrieve, periodically, `statfs_t::f_iosinprogress` and
  `statfs_t::f_iosbusytime` for the storage devices backing the seekable handle.
  If the recent averaged i/o wait time exceeds `max_iosbusytime` and the i/o in
  progress > `max_iosinprogress`, `next()` will start setting the default deadline
  passed to `io_aware_next()`. Thereafter, every 1/10th of a second, if
  `statfs_t::f_iosinprogress` is above `max_iosinprogress`, it will increase the
  deadline by 1/16th, whereas if it is below `min_iosinprogress`, it will decrease
  the deadline by 1/16th. The default deadline chosen is always the worst of all
  the storage devices of all the handles. This will reduce concurrency within the
  kernel thread pool in order to reduce congestion on the storage devices. If at
  any point `statfs_t::f_iosbusytime` drops below `max_iosbusytime` as averaged
  across one second, and `statfs_t::f_iosinprogress` drops below
  `min_iosinprogress`, the additional throttling is completely removed.
  `io_aware_next()` can ignore the default deadline passed into it, and can set
  any other deadline.

  For non-seekable handles, the handle must have an i/o multiplexer set upon it,
  and on Microsoft Windows, that i/o multiplexer must be utilising the IOCP
  instance of the global Win32 thread pool. For each of `reads`, `writes` and
  `barriers` which is non-zero, a corresponding zero length i/o is constructed and
  initiated.
When the i/o completes, and all readable handles in the work item's set have data
  waiting to be read, and all writable handles in the work item's set have space
  to allow writes, only then is the work item invoked with the next piece of work.

  \note Non-seekable handle support is not implemented yet.
  */
  class LLFIO_DECL io_aware_work_item : public work_item
  {
  public:
    //! Maximum i/o busyness above which throttling is to begin.
    float max_iosbusytime{0.95f};
    //! Minimum i/o in progress to target if `max_iosbusytime` is exceeded. The default of 16 suits SSDs; you want around 4 for spinning rust or NV-RAM.
    uint32_t min_iosinprogress{16};
    //! Maximum i/o in progress to target if `max_iosbusytime` is exceeded. The default of 32 suits SSDs; you want around 8 for spinning rust or NV-RAM.
#ifdef _WIN32
    uint32_t max_iosinprogress{1};  // windows appears to do a lot of i/o coalescing
#else
    uint32_t max_iosinprogress{32};
#endif

    //! Information about an i/o handle this work item will use
    struct io_handle_awareness
    {
      //! An i/o handle this work item will use
      io_handle *h{nullptr};
      //! The relative amount of reading done by this work item from the handle.
      float reads{0};
      //! The relative amount of writing done by this work item to the handle.
      float writes{0};
      //! The relative amount of write barriering done by this work item to the handle.
      float barriers{0};

      void *_internal{nullptr};
    };

  private:
    const span<io_handle_awareness> _handles;

    LLFIO_HEADERS_ONLY_VIRTUAL_SPEC intptr_t next(deadline &d) noexcept override final;

  public:
    constexpr io_aware_work_item() {}
    /*! \brief Constructs a work item aware of i/o done to the handles in `hs`.

    Note that the `reads`, `writes` and `barriers` are normalised to proportions
    out of `1.0` by this constructor, so if for example you had
    `reads/writes/barriers = 200/100/0`, after normalisation those become
    `0.66/0.33/0.0` such that the total is `1.0`. If
    `reads/writes/barriers = 0/0/0` on entry, they are replaced with
    `0.5/0.5/0.0`.
Note that normalisation is across *all* i/o handles in the set, so three handles
    each with `reads/writes/barriers = 200/100/0` on entry would have
    `0.22/0.11/0.0` each after construction.
    */
    explicit LLFIO_HEADERS_ONLY_MEMFUNC_SPEC io_aware_work_item(span<io_handle_awareness> hs);
    io_aware_work_item(io_aware_work_item &&o) noexcept
        : work_item(std::move(o))
        , _handles(o._handles)
    {
    }
    LLFIO_HEADERS_ONLY_MEMFUNC_SPEC ~io_aware_work_item();

    //! The handles originally registered during construction.
    span<io_handle_awareness> handles() const noexcept { return _handles; }

    /*! \brief As for `work_item::next()`, but the deadline may be extended to
    reduce i/o congestion on the hardware devices to which the handles refer.
    */
    virtual intptr_t io_aware_next(deadline &d) noexcept = 0;
  };

  virtual ~dynamic_thread_pool_group() {}

  /*! \brief A textual description of the underlying implementation of this
  dynamic thread pool group.

  The current possible underlying implementations are:

  - "Grand Central Dispatch" (Mac OS, FreeBSD, Linux)
  - "Linux native" (Linux)
  - "Win32 thread pool (Vista+)" (Windows)

  Which one is chosen depends on what was detected at cmake configure time, and
  possibly what the host OS running the program binary supports.
  */
  static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC const char *implementation_description() noexcept;

  /*! \brief Threadsafe. Submit one or more work items for execution. Note that
  you can submit more later.

  Note that if the group is currently stopping, you cannot submit more work until
  the group has stopped. An error code comparing equal to
  `errc::operation_canceled` is returned if you try.
  */
  virtual result<void> submit(span<work_item *> work) noexcept = 0;
  //! \overload
  result<void> submit(work_item *wi) noexcept { return submit(span<work_item *>(&wi, 1)); }
  //! \overload
  LLFIO_TEMPLATE(class T)
  LLFIO_TREQUIRES(LLFIO_TPRED(!std::is_pointer<T>::value), LLFIO_TPRED(std::is_base_of<work_item, T>::value))
  result<void> submit(span<T> wi) noexcept
  {
    auto *wis = (T **) alloca(sizeof(T *) * wi.size());
    for(size_t n = 0; n < wi.size(); n++)
    {
      wis[n] = &wi[n];
    }
    return submit(span<work_item *>((work_item **) wis, wi.size()));
  }

  //! Threadsafe. Cancel any remaining work previously submitted, but without blocking (use `wait()` to block).
  virtual result<void> stop() noexcept = 0;
  /*! \brief Threadsafe. True if a work item reported an error, or `stop()` was
  called, but work items are still running.
  */
  virtual bool stopping() const noexcept = 0;
  //! Threadsafe. True if all the work previously submitted is complete.
  virtual bool stopped() const noexcept = 0;
  //! Threadsafe. Wait for work previously submitted to complete, returning any failures by any work item.
  virtual result<void> wait(deadline d = {}) const noexcept = 0;
  //! \overload
  template <class Rep, class Period>
  result<bool> wait_for(const std::chrono::duration<Rep, Period> &duration) const noexcept
  {
    auto r = wait(duration);
    if(!r && r.error() == errc::timed_out)
    {
      return false;
    }
    OUTCOME_TRY(std::move(r));
    return true;
  }
  //! \overload
  template <class Clock, class Duration>
  result<bool> wait_until(const std::chrono::time_point<Clock, Duration> &timeout) const noexcept
  {
    auto r = wait(timeout);
    if(!r && r.error() == errc::timed_out)
    {
      return false;
    }
    OUTCOME_TRY(std::move(r));
    return true;
  }

  //! Returns the work item nesting level which would be used if a new dynamic thread pool group were created within the current work item.
  static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC size_t current_nesting_level() noexcept;
  //! Returns the work item the calling thread is running within, if any.
  static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC work_item *current_work_item() noexcept;

  /*! \brief Returns the number of milliseconds that a thread is without work
  before it is shut down. Note that this will be zero on all platforms except
  Linux when our native thread pool implementation is in use, because on Windows,
  Grand Central Dispatch etc. the system controls this value.
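
  As an illustrative sketch of how this knob might be used (a no-op on backends
  other than the Linux native implementation):

  ```cpp
  // Query the current idle period, then ask idle threads to linger for up
  // to ten seconds awaiting more work. On backends where the system controls
  // this value (Windows, Grand Central Dispatch), zero is returned instead.
  uint32_t before = dynamic_thread_pool_group::ms_sleep_for_more_work();
  uint32_t actually_set = dynamic_thread_pool_group::ms_sleep_for_more_work(10000);
  ```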
*/
  static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC uint32_t ms_sleep_for_more_work() noexcept;

  /*! \brief Sets the number of milliseconds that a thread is without work before
  it is shut down, returning the value actually set. Note that this will have no
  effect (and thus return zero) on all platforms except Linux when our native
  thread pool implementation is in use, because on Windows, Grand Central
  Dispatch etc. the system controls this value.
  */
  static LLFIO_HEADERS_ONLY_MEMFUNC_SPEC uint32_t ms_sleep_for_more_work(uint32_t v) noexcept;
};

//! A unique ptr to a work group within the global dynamic thread pool.
using dynamic_thread_pool_group_ptr = std::unique_ptr<dynamic_thread_pool_group>;

//! Creates a new work group within the global dynamic thread pool.
LLFIO_HEADERS_ONLY_FUNC_SPEC result<dynamic_thread_pool_group_ptr> make_dynamic_thread_pool_group() noexcept;

// BEGIN make_free_functions.py
// END make_free_functions.py

LLFIO_V2_NAMESPACE_END

#ifdef _MSC_VER
#pragma warning(pop)
#endif

#if LLFIO_HEADERS_ONLY == 1 && !defined(DOXYGEN_SHOULD_SKIP_THIS)
#define LLFIO_INCLUDED_BY_HEADER 1
#include "detail/impl/dynamic_thread_pool_group.ipp"
#undef LLFIO_INCLUDED_BY_HEADER
#endif

#endif