From dfa6771c4f8ba0635a252a39d13a48c7c5c75489 Mon Sep 17 00:00:00 2001 From: "Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com)" Date: Thu, 24 Dec 2020 16:05:07 +0000 Subject: Add unit test to ensure nested dynamic thread pool groups work as expected. Still Windows only implementation. --- test/tests/dynamic_thread_pool_group.cpp | 335 +++++++++++++++++++++++++++++++ 1 file changed, 335 insertions(+) create mode 100644 test/tests/dynamic_thread_pool_group.cpp (limited to 'test') diff --git a/test/tests/dynamic_thread_pool_group.cpp b/test/tests/dynamic_thread_pool_group.cpp new file mode 100644 index 00000000..8e068583 --- /dev/null +++ b/test/tests/dynamic_thread_pool_group.cpp @@ -0,0 +1,335 @@ +/* Integration test kernel for dynamic_thread_pool_group +(C) 2020 Niall Douglas (2 commits) +File Created: Dec 2020 + + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License in the accompanying file +Licence.txt or at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + +Distributed under the Boost Software License, Version 1.0. + (See accompanying file Licence.txt or copy at + http://www.boost.org/LICENSE_1_0.txt) +*/ + +//#define LLFIO_LOGGING_LEVEL 99 + +#include "../test_kernel_decl.hpp" + +static inline void TestDynamicThreadPoolGroupWorks() +{ + namespace llfio = LLFIO_V2_NAMESPACE; + // llfio::log_level_guard llg(llfio::log_level::all); + struct work_item; + struct shared_state_t + { + std::atomic p{0}; + std::atomic concurrency{0}, max_concurrency{0}, group_completes{0}; + std::vector executed; + llfio::dynamic_thread_pool_group_ptr tpg{llfio::make_dynamic_thread_pool_group().value()}; + std::atomic cancelling{false}; + } shared_state; + struct work_item final : public llfio::dynamic_thread_pool_group::work_item + { + using _base = llfio::dynamic_thread_pool_group::work_item; + shared_state_t *shared{nullptr}; + std::atomic within{false}; + + work_item() = default; + explicit work_item(shared_state_t *_shared) + : shared(_shared) + { + } + work_item(work_item &&o) noexcept + : _base(std::move(o)) + , shared(o.shared) + { + } + + virtual intptr_t next(llfio::deadline &d) noexcept override + { + bool expected = false; + BOOST_CHECK(within.compare_exchange_strong(expected, true)); + (void) d; + BOOST_CHECK(parent() == shared->tpg.get()); + auto ret = shared->p.fetch_sub(1); + if(ret < 0) + { + ret = -1; + } + // std::cout << " next() returns " << ret << std::endl; + expected = true; + BOOST_CHECK(within.compare_exchange_strong(expected, false)); + return ret; + } + virtual llfio::result operator()(intptr_t work) noexcept override + { + bool expected = false; + BOOST_CHECK(within.compare_exchange_strong(expected, true)); + auto concurrency = shared->concurrency.fetch_add(1) + 1; + for(size_t expected_ = shared->max_concurrency; concurrency > expected_;) + { + shared->max_concurrency.compare_exchange_weak(expected_, concurrency); + } + BOOST_CHECK(parent() == shared->tpg.get()); + BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 1); + BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == this); + // std::cout << " () executes " << work << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + auto *executed = (std::atomic *) &shared->executed[work]; + executed->fetch_add(1); + shared->concurrency.fetch_sub(1); + expected = true; + BOOST_CHECK(within.compare_exchange_strong(expected, false)); + return llfio::success(); + } + virtual void group_complete(const llfio::result &cancelled) noexcept override + { + bool expected = false; + BOOST_CHECK(within.compare_exchange_strong(expected, true)); + BOOST_CHECK(parent() == nullptr); + BOOST_CHECK(shared->cancelling == cancelled.has_error()); + // std::cout << " group_complete()" << std::endl; + shared->group_completes.fetch_add(1); + expected = true; + BOOST_CHECK(within.compare_exchange_strong(expected, false)); + } + }; + std::vector workitems; + auto reset = [&](size_t count) { + workitems.clear(); + shared_state.executed.clear(); + shared_state.executed.resize(count + 1); + for(size_t n = 0; n < count; n++) + { + workitems.emplace_back(&shared_state); + } + shared_state.p = (intptr_t) count; + shared_state.concurrency = 0; + shared_state.max_concurrency = 0; + shared_state.group_completes = 0; + }; + auto submit = [&] { + auto **wis = (llfio::dynamic_thread_pool_group::work_item **) alloca(sizeof(work_item *) * workitems.size()); + for(size_t n = 0; n < workitems.size(); n++) + { + wis[n] = &workitems[n]; + } + BOOST_CHECK(!shared_state.tpg->stopping()); + BOOST_CHECK(shared_state.tpg->stopped()); + BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 0); + BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == nullptr); + for(size_t n = 0; n < workitems.size(); n++) + { + BOOST_CHECK(workitems[n].parent() == nullptr); + } + shared_state.tpg->submit({wis, workitems.size()}).value(); + BOOST_CHECK(!shared_state.tpg->stopping()); + BOOST_CHECK(!shared_state.tpg->stopped()); + for(size_t n = 0; n < workitems.size(); n++) + { + BOOST_CHECK(workitems[n].parent() == shared_state.tpg.get()); + } + BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 0); + BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == nullptr); + }; + auto check = [&] { + shared_state.tpg->wait().value(); + BOOST_CHECK(!shared_state.tpg->stopping()); + BOOST_CHECK(shared_state.tpg->stopped()); + BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 0); + BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == nullptr); + for(size_t n = 0; n < workitems.size(); n++) + { + BOOST_CHECK(workitems[n].parent() == nullptr); + } + BOOST_CHECK(shared_state.group_completes == workitems.size()); + BOOST_CHECK(shared_state.executed[0] == 0); + if(shared_state.cancelling) + { + size_t executed = 0, notexecuted = 0; + for(size_t n = 1; n <= workitems.size(); n++) + { + if(shared_state.executed[n] == 1) + { + executed++; + } + else + { + notexecuted++; + } + } + std::cout << "During cancellation, executed " << executed << " and did not execute " << notexecuted << std::endl; + } + else + { + for(size_t n = 1; n <= workitems.size(); n++) + { + BOOST_CHECK(shared_state.executed[n] == 1); + if(shared_state.executed[n] != 1) + { + std::cout << "shared_state.executed[" << n << "] = " << shared_state.executed[n] << std::endl; + } + } + } + std::cout << "Maximum concurrency achieved with " << workitems.size() << " work items = " << shared_state.max_concurrency << "\n" << std::endl; + }; + + // Test a single work item + reset(1); + submit(); + check(); + + // Test 10 work items + reset(10); + submit(); + check(); + + // Test 1000 work items + reset(1000); + submit(); + check(); + + // Test 1000 work items with stop + reset(1000); + submit(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + shared_state.cancelling = true; + shared_state.tpg->stop().value(); + BOOST_CHECK(shared_state.tpg->stopping()); + auto r = shared_state.tpg->wait(); + BOOST_CHECK(!shared_state.tpg->stopping()); + BOOST_REQUIRE(!r); + BOOST_CHECK(r.error() == llfio::errc::operation_canceled); + check(); +} + +static inline void TestDynamicThreadPoolGroupNestingWorks() +{ + namespace llfio = LLFIO_V2_NAMESPACE; + static constexpr size_t MAX_NESTING = 10; + static constexpr int COUNT_PER_WORK_ITEM = 1000; + struct shared_state_t + { + std::mutex lock; + std::unordered_map time_bucket; + llfio::dynamic_thread_pool_group_ptr tpg; + double stddev{0}; + void calc_stddev() + { + stddev = 0; + uint64_t mean = 0, count = 0; + for(auto &i : time_bucket) + { + mean += i.first*i.second; + count += i.second; + } + mean /= count; + for(auto &i : time_bucket) + { + double diff = (double) abs((int64_t) i.first - (int64_t) mean); + stddev += diff * diff * i.second; + } + stddev /= count; + stddev = sqrt(stddev); + } + } shared_states[MAX_NESTING]; + struct work_item final : public llfio::dynamic_thread_pool_group::work_item + { + using _base = llfio::dynamic_thread_pool_group::work_item; + const size_t nesting{0}; + llfio::span shared_states; + std::atomic count{COUNT_PER_WORK_ITEM}; + std::unique_ptr childwi; + + work_item() = default; + explicit work_item(size_t _nesting, llfio::span _shared_states) + : nesting(_nesting) + , shared_states(_shared_states) + { + if(nesting + 1 < MAX_NESTING) + { + childwi = std::make_unique(nesting + 1, shared_states); + } + } + work_item(work_item &&o) noexcept + : nesting(o.nesting) + , shared_states(o.shared_states) + , childwi(std::move(o.childwi)) + { + } + + virtual intptr_t next(llfio::deadline & /*unused*/) noexcept override + { + auto ret = count.fetch_sub(1); + if(ret <= 0) + { + ret = -1; + } + return ret; + } + virtual llfio::result operator()(intptr_t work) noexcept override + { + auto supposed_nesting_level = llfio::dynamic_thread_pool_group::current_nesting_level(); + BOOST_CHECK(nesting + 1 == supposed_nesting_level); + if(nesting + 1 != supposed_nesting_level) + { + std::cerr << "current_nesting_level() reports " << supposed_nesting_level << " not " << (nesting + 1) << std::endl; + } + uint64_t idx = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); + std::lock_guard g(shared_states[nesting].lock); + if(COUNT_PER_WORK_ITEM == work && childwi) + { + if(!shared_states[nesting].tpg) + { + shared_states[nesting].tpg = llfio::make_dynamic_thread_pool_group().value(); + } + OUTCOME_TRY(shared_states[nesting].tpg->submit(childwi.get())); + } + shared_states[nesting].time_bucket[idx]++; + return llfio::success(); + } + // virtual void group_complete(const llfio::result &/*unused*/) noexcept override { } + }; + std::vector workitems; + for(size_t n = 0; n < 100; n++) + { + workitems.emplace_back(0, shared_states); + } + auto tpg = llfio::make_dynamic_thread_pool_group().value(); + tpg->submit(llfio::span(workitems)).value(); + tpg->wait().value(); + for(size_t n = 0; n < MAX_NESTING - 1; n++) + { + std::unique_lock g(shared_states[n].lock); + while(!shared_states[n].tpg) + { + g.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + g.lock(); + } + g.unlock(); + shared_states[n].tpg->wait().value(); + } + for(size_t n = 0; n < MAX_NESTING; n++) + { + shared_states[n].calc_stddev(); + std::cout << " Standard deviation for nesting level " << (n + 1) << " was " << shared_states[n].stddev << std::endl; + } + BOOST_CHECK(shared_states[MAX_NESTING - 1].stddev < shared_states[MAX_NESTING / 2].stddev / 2); +} + +KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, works, "Tests that llfio::dynamic_thread_pool_group works as expected", + TestDynamicThreadPoolGroupWorks()) +KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, nested, "Tests that nesting of llfio::dynamic_thread_pool_group works as expected", + TestDynamicThreadPoolGroupNestingWorks()) -- cgit v1.2.3