diff options
author | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> | 2020-12-24 19:05:07 +0300 |
---|---|---|
committer | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> | 2021-03-16 13:21:38 +0300 |
commit | dfa6771c4f8ba0635a252a39d13a48c7c5c75489 (patch) | |
tree | d3f1ef27a142fd77e850440766add50a97d1858a /include | |
parent | 82fcea61c21b31ce325dc47401808dd45d4ef42c (diff) |
Add unit test to ensure nested dynamic thread pool groups work as expected. Still Windows only implementation.
Diffstat (limited to 'include')
-rw-r--r-- | include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp | 73 | ||||
-rw-r--r-- | include/llfio/v2.0/dynamic_thread_pool_group.hpp | 62 |
2 files changed, 115 insertions, 20 deletions
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp index c44dcde4..7e6eb2ac 100644 --- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp +++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp @@ -475,7 +475,7 @@ namespace detail { (void) g; #ifdef _WIN32 - if(1 == i->_internalworkh_inuse) + if(i->_internalworkh_inuse > 0) { i->_internalworkh_inuse = 2; } @@ -504,6 +504,7 @@ namespace detail for(; v != nullptr; v = n) { v->_parent = nullptr; + v->_nextwork = -1; n = v->_next; } n = v = parent->_work_items_done.front; @@ -590,27 +591,49 @@ namespace detail auto *i = group->_work_items_active.front; if(nullptr != i->_internalworkh) { - i->_internalworkh_inuse = 1; + if(0 == i->_internalworkh_inuse) + { + i->_internalworkh_inuse = 1; + } g.unlock(); WaitForThreadpoolWorkCallbacks((PTP_WORK) i->_internalworkh, true); g.lock(); if(i->_internalworkh_inuse == 2) { - CloseThreadpoolWork((PTP_WORK) i->_internalworkh); - i->_internalworkh = nullptr; + if(nullptr != i->_internalworkh) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } + if(nullptr != i->_internaltimerh) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } } i->_internalworkh_inuse = 0; } if(nullptr != i->_internaltimerh) { - i->_internalworkh_inuse = 1; + if(0 == i->_internalworkh_inuse) + { + i->_internalworkh_inuse = 1; + } g.unlock(); WaitForThreadpoolTimerCallbacks((PTP_TIMER) i->_internaltimerh, true); g.lock(); if(i->_internalworkh_inuse == 2) { - CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); - i->_internalworkh = nullptr; + if(nullptr != i->_internalworkh) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } + if(nullptr != i->_internaltimerh) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } } i->_internalworkh_inuse = 0; } @@ -632,27 +655,49 @@ namespace detail auto *i = group->_work_items_active.front; if(nullptr != i->_internalworkh) { - i->_internalworkh_inuse = 1; + if(0 == i->_internalworkh_inuse) + { + i->_internalworkh_inuse = 1; + } g.unlock(); WaitForThreadpoolWorkCallbacks((PTP_WORK) i->_internalworkh, false); g.lock(); if(i->_internalworkh_inuse == 2) { - CloseThreadpoolWork((PTP_WORK) i->_internalworkh); - i->_internalworkh = nullptr; + if(nullptr != i->_internalworkh) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } + if(nullptr != i->_internaltimerh) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } } i->_internalworkh_inuse = 0; } if(nullptr != i->_internaltimerh) { - i->_internalworkh_inuse = 1; + if(0 == i->_internalworkh_inuse) + { + i->_internalworkh_inuse = 1; + } g.unlock(); WaitForThreadpoolTimerCallbacks((PTP_TIMER) i->_internaltimerh, false); g.lock(); if(i->_internalworkh_inuse == 2) { - CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); - i->_internalworkh = nullptr; + if(nullptr != i->_internalworkh) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } + if(nullptr != i->_internaltimerh) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } } i->_internalworkh_inuse = 0; } @@ -707,7 +752,7 @@ namespace detail auto old_thread_local_state = tls; tls.workitem = workitem; tls.current_callback_instance = selfthreadh; - tls.nesting_level++; + tls.nesting_level = workitem->_parent->_nesting_level + 1; auto r = (*workitem)(workitem->_nextwork); workitem->_nextwork = 0; // call next() next time tls = old_thread_local_state; diff --git a/include/llfio/v2.0/dynamic_thread_pool_group.hpp b/include/llfio/v2.0/dynamic_thread_pool_group.hpp index c216f023..d3b6279d 100644 --- a/include/llfio/v2.0/dynamic_thread_pool_group.hpp +++ b/include/llfio/v2.0/dynamic_thread_pool_group.hpp @@ -158,20 +158,56 @@ public: dynamic_thread_pool_group_impl *_parent{nullptr}; void *_internalworkh{nullptr}, *_internaltimerh{nullptr}; work_item *_prev{nullptr}, *_next{nullptr}; - intptr_t _nextwork{0}; + intptr_t _nextwork{-1}; std::chrono::steady_clock::time_point _timepoint1; std::chrono::system_clock::time_point _timepoint2; int _internalworkh_inuse{0}; protected: work_item() = default; - work_item(const work_item &) = default; - work_item(work_item &&) = default; - work_item &operator=(const work_item &) = default; - work_item &operator=(work_item &&) = default; + work_item(const work_item &o) = delete; + work_item(work_item &&o) noexcept + : _parent(o._parent) + , _internalworkh(o._internalworkh) + , _internaltimerh(o._internaltimerh) + , _prev(o._prev) + , _next(o._next) + , _nextwork(o._nextwork) + , _timepoint1(o._timepoint1) + , _timepoint2(o._timepoint2) + { + assert(o._parent == nullptr); + assert(o._internalworkh == nullptr); + assert(o._internaltimerh == nullptr); + if(o._parent != nullptr || o._internalworkh != nullptr) + { + LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item was relocated in memory during use!"); + abort(); + } + o._prev = o._next = nullptr; + o._nextwork = -1; + } + work_item &operator=(const work_item &) = delete; + work_item &operator=(work_item &&) = delete; public: - virtual ~work_item() {} + virtual ~work_item() + { + assert(_nextwork == -1); + if(_nextwork != -1) + { + LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item destroyed before all work was done!"); + abort(); + } + assert(_internalworkh == nullptr); + assert(_internaltimerh == nullptr); + assert(_parent == nullptr); + if(_internalworkh != nullptr || _parent != nullptr) + { + LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item destroyed before group_complete() was executed!"); + abort(); + } + } //! Returns the parent work group between successful submission and just before `group_complete()`. dynamic_thread_pool_group *parent() const noexcept { return reinterpret_cast<dynamic_thread_pool_group *>(_parent); } @@ -245,6 +281,20 @@ public: `errc::operation_canceled` is returned if you try. */ virtual result<void> submit(span<work_item *> work) noexcept = 0; + //! \overload + result<void> submit(work_item *wi) noexcept { return submit(span<work_item *>(&wi, 1)); } + //! \overload + LLFIO_TEMPLATE(class T) + LLFIO_TREQUIRES(LLFIO_TPRED(!std::is_pointer<T>::value), LLFIO_TPRED(std::is_base_of<work_item, T>::value)) + result<void> submit(span<T> wi) noexcept + { + auto *wis = (T **) alloca(sizeof(T *) * wi.size()); + for(size_t n = 0; n < wi.size(); n++) + { + wis[n] = &wi[n]; + } + return submit(span<work_item *>((work_item **) wis, wi.size())); + } //! Threadsafe. Cancel any remaining work previously submitted, but without blocking (use `wait()` to block). virtual result<void> stop() noexcept = 0; |