diff options
author | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> | 2020-12-24 19:05:07 +0300 |
---|---|---|
committer | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> | 2021-03-16 13:21:38 +0300 |
commit | dfa6771c4f8ba0635a252a39d13a48c7c5c75489 (patch) | |
tree | d3f1ef27a142fd77e850440766add50a97d1858a | |
parent | 82fcea61c21b31ce325dc47401808dd45d4ef42c (diff) |
Add unit test to ensure nested dynamic thread pool groups work as expected. The implementation is still Windows-only.
-rw-r--r-- | include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp | 73 | ||||
-rw-r--r-- | include/llfio/v2.0/dynamic_thread_pool_group.hpp | 62 | ||||
-rw-r--r-- | test/tests/dynamic_thread_pool_group.cpp | 335 |
3 files changed, 450 insertions, 20 deletions
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp index c44dcde4..7e6eb2ac 100644 --- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp +++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp @@ -475,7 +475,7 @@ namespace detail { (void) g; #ifdef _WIN32 - if(1 == i->_internalworkh_inuse) + if(i->_internalworkh_inuse > 0) { i->_internalworkh_inuse = 2; } @@ -504,6 +504,7 @@ namespace detail for(; v != nullptr; v = n) { v->_parent = nullptr; + v->_nextwork = -1; n = v->_next; } n = v = parent->_work_items_done.front; @@ -590,27 +591,49 @@ namespace detail auto *i = group->_work_items_active.front; if(nullptr != i->_internalworkh) { - i->_internalworkh_inuse = 1; + if(0 == i->_internalworkh_inuse) + { + i->_internalworkh_inuse = 1; + } g.unlock(); WaitForThreadpoolWorkCallbacks((PTP_WORK) i->_internalworkh, true); g.lock(); if(i->_internalworkh_inuse == 2) { - CloseThreadpoolWork((PTP_WORK) i->_internalworkh); - i->_internalworkh = nullptr; + if(nullptr != i->_internalworkh) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } + if(nullptr != i->_internaltimerh) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } } i->_internalworkh_inuse = 0; } if(nullptr != i->_internaltimerh) { - i->_internalworkh_inuse = 1; + if(0 == i->_internalworkh_inuse) + { + i->_internalworkh_inuse = 1; + } g.unlock(); WaitForThreadpoolTimerCallbacks((PTP_TIMER) i->_internaltimerh, true); g.lock(); if(i->_internalworkh_inuse == 2) { - CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); - i->_internalworkh = nullptr; + if(nullptr != i->_internalworkh) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } + if(nullptr != i->_internaltimerh) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } } i->_internalworkh_inuse = 
0; } @@ -632,27 +655,49 @@ namespace detail auto *i = group->_work_items_active.front; if(nullptr != i->_internalworkh) { - i->_internalworkh_inuse = 1; + if(0 == i->_internalworkh_inuse) + { + i->_internalworkh_inuse = 1; + } g.unlock(); WaitForThreadpoolWorkCallbacks((PTP_WORK) i->_internalworkh, false); g.lock(); if(i->_internalworkh_inuse == 2) { - CloseThreadpoolWork((PTP_WORK) i->_internalworkh); - i->_internalworkh = nullptr; + if(nullptr != i->_internalworkh) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } + if(nullptr != i->_internaltimerh) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } } i->_internalworkh_inuse = 0; } if(nullptr != i->_internaltimerh) { - i->_internalworkh_inuse = 1; + if(0 == i->_internalworkh_inuse) + { + i->_internalworkh_inuse = 1; + } g.unlock(); WaitForThreadpoolTimerCallbacks((PTP_TIMER) i->_internaltimerh, false); g.lock(); if(i->_internalworkh_inuse == 2) { - CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); - i->_internalworkh = nullptr; + if(nullptr != i->_internalworkh) + { + CloseThreadpoolWork((PTP_WORK) i->_internalworkh); + i->_internalworkh = nullptr; + } + if(nullptr != i->_internaltimerh) + { + CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh); + i->_internaltimerh = nullptr; + } } i->_internalworkh_inuse = 0; } @@ -707,7 +752,7 @@ namespace detail auto old_thread_local_state = tls; tls.workitem = workitem; tls.current_callback_instance = selfthreadh; - tls.nesting_level++; + tls.nesting_level = workitem->_parent->_nesting_level + 1; auto r = (*workitem)(workitem->_nextwork); workitem->_nextwork = 0; // call next() next time tls = old_thread_local_state; diff --git a/include/llfio/v2.0/dynamic_thread_pool_group.hpp b/include/llfio/v2.0/dynamic_thread_pool_group.hpp index c216f023..d3b6279d 100644 --- a/include/llfio/v2.0/dynamic_thread_pool_group.hpp +++ b/include/llfio/v2.0/dynamic_thread_pool_group.hpp @@ -158,20 
+158,56 @@ public: dynamic_thread_pool_group_impl *_parent{nullptr}; void *_internalworkh{nullptr}, *_internaltimerh{nullptr}; work_item *_prev{nullptr}, *_next{nullptr}; - intptr_t _nextwork{0}; + intptr_t _nextwork{-1}; std::chrono::steady_clock::time_point _timepoint1; std::chrono::system_clock::time_point _timepoint2; int _internalworkh_inuse{0}; protected: work_item() = default; - work_item(const work_item &) = default; - work_item(work_item &&) = default; - work_item &operator=(const work_item &) = default; - work_item &operator=(work_item &&) = default; + work_item(const work_item &o) = delete; + work_item(work_item &&o) noexcept + : _parent(o._parent) + , _internalworkh(o._internalworkh) + , _internaltimerh(o._internaltimerh) + , _prev(o._prev) + , _next(o._next) + , _nextwork(o._nextwork) + , _timepoint1(o._timepoint1) + , _timepoint2(o._timepoint2) + { + assert(o._parent == nullptr); + assert(o._internalworkh == nullptr); + assert(o._internaltimerh == nullptr); + if(o._parent != nullptr || o._internalworkh != nullptr) + { + LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item was relocated in memory during use!"); + abort(); + } + o._prev = o._next = nullptr; + o._nextwork = -1; + } + work_item &operator=(const work_item &) = delete; + work_item &operator=(work_item &&) = delete; public: - virtual ~work_item() {} + virtual ~work_item() + { + assert(_nextwork == -1); + if(_nextwork != -1) + { + LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item destroyed before all work was done!"); + abort(); + } + assert(_internalworkh == nullptr); + assert(_internaltimerh == nullptr); + assert(_parent == nullptr); + if(_internalworkh != nullptr || _parent != nullptr) + { + LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item destroyed before group_complete() was executed!"); + abort(); + } + } //! Returns the parent work group between successful submission and just before `group_complete()`. 
dynamic_thread_pool_group *parent() const noexcept { return reinterpret_cast<dynamic_thread_pool_group *>(_parent); } @@ -245,6 +281,20 @@ public: `errc::operation_canceled` is returned if you try. */ virtual result<void> submit(span<work_item *> work) noexcept = 0; + //! \overload + result<void> submit(work_item *wi) noexcept { return submit(span<work_item *>(&wi, 1)); } + //! \overload + LLFIO_TEMPLATE(class T) + LLFIO_TREQUIRES(LLFIO_TPRED(!std::is_pointer<T>::value), LLFIO_TPRED(std::is_base_of<work_item, T>::value)) + result<void> submit(span<T> wi) noexcept + { + auto *wis = (T **) alloca(sizeof(T *) * wi.size()); + for(size_t n = 0; n < wi.size(); n++) + { + wis[n] = &wi[n]; + } + return submit(span<work_item *>((work_item **) wis, wi.size())); + } //! Threadsafe. Cancel any remaining work previously submitted, but without blocking (use `wait()` to block). virtual result<void> stop() noexcept = 0; diff --git a/test/tests/dynamic_thread_pool_group.cpp b/test/tests/dynamic_thread_pool_group.cpp new file mode 100644 index 00000000..8e068583 --- /dev/null +++ b/test/tests/dynamic_thread_pool_group.cpp @@ -0,0 +1,335 @@ +/* Integration test kernel for dynamic_thread_pool_group +(C) 2020 Niall Douglas <http://www.nedproductions.biz/> (2 commits) +File Created: Dec 2020 + + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License in the accompanying file +Licence.txt or at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + +Distributed under the Boost Software License, Version 1.0. 
+ (See accompanying file Licence.txt or copy at + http://www.boost.org/LICENSE_1_0.txt) +*/ + +//#define LLFIO_LOGGING_LEVEL 99 + +#include "../test_kernel_decl.hpp" + +static inline void TestDynamicThreadPoolGroupWorks() +{ + namespace llfio = LLFIO_V2_NAMESPACE; + // llfio::log_level_guard llg(llfio::log_level::all); + struct work_item; + struct shared_state_t + { + std::atomic<intptr_t> p{0}; + std::atomic<size_t> concurrency{0}, max_concurrency{0}, group_completes{0}; + std::vector<size_t> executed; + llfio::dynamic_thread_pool_group_ptr tpg{llfio::make_dynamic_thread_pool_group().value()}; + std::atomic<bool> cancelling{false}; + } shared_state; + struct work_item final : public llfio::dynamic_thread_pool_group::work_item + { + using _base = llfio::dynamic_thread_pool_group::work_item; + shared_state_t *shared{nullptr}; + std::atomic<bool> within{false}; + + work_item() = default; + explicit work_item(shared_state_t *_shared) + : shared(_shared) + { + } + work_item(work_item &&o) noexcept + : _base(std::move(o)) + , shared(o.shared) + { + } + + virtual intptr_t next(llfio::deadline &d) noexcept override + { + bool expected = false; + BOOST_CHECK(within.compare_exchange_strong(expected, true)); + (void) d; + BOOST_CHECK(parent() == shared->tpg.get()); + auto ret = shared->p.fetch_sub(1); + if(ret < 0) + { + ret = -1; + } + // std::cout << " next() returns " << ret << std::endl; + expected = true; + BOOST_CHECK(within.compare_exchange_strong(expected, false)); + return ret; + } + virtual llfio::result<void> operator()(intptr_t work) noexcept override + { + bool expected = false; + BOOST_CHECK(within.compare_exchange_strong(expected, true)); + auto concurrency = shared->concurrency.fetch_add(1) + 1; + for(size_t expected_ = shared->max_concurrency; concurrency > expected_;) + { + shared->max_concurrency.compare_exchange_weak(expected_, concurrency); + } + BOOST_CHECK(parent() == shared->tpg.get()); + 
BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 1); + BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == this); + // std::cout << " () executes " << work << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + auto *executed = (std::atomic<size_t> *) &shared->executed[work]; + executed->fetch_add(1); + shared->concurrency.fetch_sub(1); + expected = true; + BOOST_CHECK(within.compare_exchange_strong(expected, false)); + return llfio::success(); + } + virtual void group_complete(const llfio::result<void> &cancelled) noexcept override + { + bool expected = false; + BOOST_CHECK(within.compare_exchange_strong(expected, true)); + BOOST_CHECK(parent() == nullptr); + BOOST_CHECK(shared->cancelling == cancelled.has_error()); + // std::cout << " group_complete()" << std::endl; + shared->group_completes.fetch_add(1); + expected = true; + BOOST_CHECK(within.compare_exchange_strong(expected, false)); + } + }; + std::vector<work_item> workitems; + auto reset = [&](size_t count) { + workitems.clear(); + shared_state.executed.clear(); + shared_state.executed.resize(count + 1); + for(size_t n = 0; n < count; n++) + { + workitems.emplace_back(&shared_state); + } + shared_state.p = (intptr_t) count; + shared_state.concurrency = 0; + shared_state.max_concurrency = 0; + shared_state.group_completes = 0; + }; + auto submit = [&] { + auto **wis = (llfio::dynamic_thread_pool_group::work_item **) alloca(sizeof(work_item *) * workitems.size()); + for(size_t n = 0; n < workitems.size(); n++) + { + wis[n] = &workitems[n]; + } + BOOST_CHECK(!shared_state.tpg->stopping()); + BOOST_CHECK(shared_state.tpg->stopped()); + BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 0); + BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == nullptr); + for(size_t n = 0; n < workitems.size(); n++) + { + BOOST_CHECK(workitems[n].parent() == nullptr); + } + shared_state.tpg->submit({wis, 
workitems.size()}).value(); + BOOST_CHECK(!shared_state.tpg->stopping()); + BOOST_CHECK(!shared_state.tpg->stopped()); + for(size_t n = 0; n < workitems.size(); n++) + { + BOOST_CHECK(workitems[n].parent() == shared_state.tpg.get()); + } + BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 0); + BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == nullptr); + }; + auto check = [&] { + shared_state.tpg->wait().value(); + BOOST_CHECK(!shared_state.tpg->stopping()); + BOOST_CHECK(shared_state.tpg->stopped()); + BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 0); + BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == nullptr); + for(size_t n = 0; n < workitems.size(); n++) + { + BOOST_CHECK(workitems[n].parent() == nullptr); + } + BOOST_CHECK(shared_state.group_completes == workitems.size()); + BOOST_CHECK(shared_state.executed[0] == 0); + if(shared_state.cancelling) + { + size_t executed = 0, notexecuted = 0; + for(size_t n = 1; n <= workitems.size(); n++) + { + if(shared_state.executed[n] == 1) + { + executed++; + } + else + { + notexecuted++; + } + } + std::cout << "During cancellation, executed " << executed << " and did not execute " << notexecuted << std::endl; + } + else + { + for(size_t n = 1; n <= workitems.size(); n++) + { + BOOST_CHECK(shared_state.executed[n] == 1); + if(shared_state.executed[n] != 1) + { + std::cout << "shared_state.executed[" << n << "] = " << shared_state.executed[n] << std::endl; + } + } + } + std::cout << "Maximum concurrency achieved with " << workitems.size() << " work items = " << shared_state.max_concurrency << "\n" << std::endl; + }; + + // Test a single work item + reset(1); + submit(); + check(); + + // Test 10 work items + reset(10); + submit(); + check(); + + // Test 1000 work items + reset(1000); + submit(); + check(); + + // Test 1000 work items with stop + reset(1000); + submit(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + 
shared_state.cancelling = true; + shared_state.tpg->stop().value(); + BOOST_CHECK(shared_state.tpg->stopping()); + auto r = shared_state.tpg->wait(); + BOOST_CHECK(!shared_state.tpg->stopping()); + BOOST_REQUIRE(!r); + BOOST_CHECK(r.error() == llfio::errc::operation_canceled); + check(); +} + +static inline void TestDynamicThreadPoolGroupNestingWorks() +{ + namespace llfio = LLFIO_V2_NAMESPACE; + static constexpr size_t MAX_NESTING = 10; + static constexpr int COUNT_PER_WORK_ITEM = 1000; + struct shared_state_t + { + std::mutex lock; + std::unordered_map<uint64_t, size_t> time_bucket; + llfio::dynamic_thread_pool_group_ptr tpg; + double stddev{0}; + void calc_stddev() + { + stddev = 0; + uint64_t mean = 0, count = 0; + for(auto &i : time_bucket) + { + mean += i.first*i.second; + count += i.second; + } + mean /= count; + for(auto &i : time_bucket) + { + double diff = (double) abs((int64_t) i.first - (int64_t) mean); + stddev += diff * diff * i.second; + } + stddev /= count; + stddev = sqrt(stddev); + } + } shared_states[MAX_NESTING]; + struct work_item final : public llfio::dynamic_thread_pool_group::work_item + { + using _base = llfio::dynamic_thread_pool_group::work_item; + const size_t nesting{0}; + llfio::span<shared_state_t> shared_states; + std::atomic<int> count{COUNT_PER_WORK_ITEM}; + std::unique_ptr<work_item> childwi; + + work_item() = default; + explicit work_item(size_t _nesting, llfio::span<shared_state_t> _shared_states) + : nesting(_nesting) + , shared_states(_shared_states) + { + if(nesting + 1 < MAX_NESTING) + { + childwi = std::make_unique<work_item>(nesting + 1, shared_states); + } + } + work_item(work_item &&o) noexcept + : nesting(o.nesting) + , shared_states(o.shared_states) + , childwi(std::move(o.childwi)) + { + } + + virtual intptr_t next(llfio::deadline & /*unused*/) noexcept override + { + auto ret = count.fetch_sub(1); + if(ret <= 0) + { + ret = -1; + } + return ret; + } + virtual llfio::result<void> operator()(intptr_t work) noexcept 
override + { + auto supposed_nesting_level = llfio::dynamic_thread_pool_group::current_nesting_level(); + BOOST_CHECK(nesting + 1 == supposed_nesting_level); + if(nesting + 1 != supposed_nesting_level) + { + std::cerr << "current_nesting_level() reports " << supposed_nesting_level << " not " << (nesting + 1) << std::endl; + } + uint64_t idx = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); + std::lock_guard<std::mutex> g(shared_states[nesting].lock); + if(COUNT_PER_WORK_ITEM == work && childwi) + { + if(!shared_states[nesting].tpg) + { + shared_states[nesting].tpg = llfio::make_dynamic_thread_pool_group().value(); + } + OUTCOME_TRY(shared_states[nesting].tpg->submit(childwi.get())); + } + shared_states[nesting].time_bucket[idx]++; + return llfio::success(); + } + // virtual void group_complete(const llfio::result<void> &/*unused*/) noexcept override { } + }; + std::vector<work_item> workitems; + for(size_t n = 0; n < 100; n++) + { + workitems.emplace_back(0, shared_states); + } + auto tpg = llfio::make_dynamic_thread_pool_group().value(); + tpg->submit(llfio::span<work_item>(workitems)).value(); + tpg->wait().value(); + for(size_t n = 0; n < MAX_NESTING - 1; n++) + { + std::unique_lock<std::mutex> g(shared_states[n].lock); + while(!shared_states[n].tpg) + { + g.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + g.lock(); + } + g.unlock(); + shared_states[n].tpg->wait().value(); + } + for(size_t n = 0; n < MAX_NESTING; n++) + { + shared_states[n].calc_stddev(); + std::cout << " Standard deviation for nesting level " << (n + 1) << " was " << shared_states[n].stddev << std::endl; + } + BOOST_CHECK(shared_states[MAX_NESTING - 1].stddev < shared_states[MAX_NESTING / 2].stddev / 2); +} + +KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, works, "Tests that llfio::dynamic_thread_pool_group works as expected", + TestDynamicThreadPoolGroupWorks()) 
+KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, nested, "Tests that nesting of llfio::dynamic_thread_pool_group works as expected", + TestDynamicThreadPoolGroupNestingWorks()) |