Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/windirstat/llfio.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2020-12-24 19:05:07 +0300
committerNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2021-03-16 13:21:38 +0300
commitdfa6771c4f8ba0635a252a39d13a48c7c5c75489 (patch)
treed3f1ef27a142fd77e850440766add50a97d1858a /test
parent82fcea61c21b31ce325dc47401808dd45d4ef42c (diff)
Add unit test to ensure nested dynamic thread pool groups work as expected. Still Windows only implementation.
Diffstat (limited to 'test')
-rw-r--r--test/tests/dynamic_thread_pool_group.cpp335
1 files changed, 335 insertions, 0 deletions
diff --git a/test/tests/dynamic_thread_pool_group.cpp b/test/tests/dynamic_thread_pool_group.cpp
new file mode 100644
index 00000000..8e068583
--- /dev/null
+++ b/test/tests/dynamic_thread_pool_group.cpp
@@ -0,0 +1,335 @@
+/* Integration test kernel for dynamic_thread_pool_group
+(C) 2020 Niall Douglas <http://www.nedproductions.biz/> (2 commits)
+File Created: Dec 2020
+
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License in the accompanying file
+Licence.txt or at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+
+Distributed under the Boost Software License, Version 1.0.
+ (See accompanying file Licence.txt or copy at
+ http://www.boost.org/LICENSE_1_0.txt)
+*/
+
+//#define LLFIO_LOGGING_LEVEL 99
+
+#include "../test_kernel_decl.hpp"
+
+static inline void TestDynamicThreadPoolGroupWorks()
+{
+ namespace llfio = LLFIO_V2_NAMESPACE;
+ // llfio::log_level_guard llg(llfio::log_level::all);
+ struct work_item;
+ struct shared_state_t
+ {
+ std::atomic<intptr_t> p{0};
+ std::atomic<size_t> concurrency{0}, max_concurrency{0}, group_completes{0};
+ std::vector<size_t> executed;
+ llfio::dynamic_thread_pool_group_ptr tpg{llfio::make_dynamic_thread_pool_group().value()};
+ std::atomic<bool> cancelling{false};
+ } shared_state;
+ struct work_item final : public llfio::dynamic_thread_pool_group::work_item
+ {
+ using _base = llfio::dynamic_thread_pool_group::work_item;
+ shared_state_t *shared{nullptr};
+ std::atomic<bool> within{false};
+
+ work_item() = default;
+ explicit work_item(shared_state_t *_shared)
+ : shared(_shared)
+ {
+ }
+ work_item(work_item &&o) noexcept
+ : _base(std::move(o))
+ , shared(o.shared)
+ {
+ }
+
+ virtual intptr_t next(llfio::deadline &d) noexcept override
+ {
+ bool expected = false;
+ BOOST_CHECK(within.compare_exchange_strong(expected, true));
+ (void) d;
+ BOOST_CHECK(parent() == shared->tpg.get());
+ auto ret = shared->p.fetch_sub(1);
+ if(ret < 0)
+ {
+ ret = -1;
+ }
+ // std::cout << " next() returns " << ret << std::endl;
+ expected = true;
+ BOOST_CHECK(within.compare_exchange_strong(expected, false));
+ return ret;
+ }
+ virtual llfio::result<void> operator()(intptr_t work) noexcept override
+ {
+ bool expected = false;
+ BOOST_CHECK(within.compare_exchange_strong(expected, true));
+ auto concurrency = shared->concurrency.fetch_add(1) + 1;
+ for(size_t expected_ = shared->max_concurrency; concurrency > expected_;)
+ {
+ shared->max_concurrency.compare_exchange_weak(expected_, concurrency);
+ }
+ BOOST_CHECK(parent() == shared->tpg.get());
+ BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 1);
+ BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == this);
+ // std::cout << " () executes " << work << std::endl;
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ auto *executed = (std::atomic<size_t> *) &shared->executed[work];
+ executed->fetch_add(1);
+ shared->concurrency.fetch_sub(1);
+ expected = true;
+ BOOST_CHECK(within.compare_exchange_strong(expected, false));
+ return llfio::success();
+ }
+ virtual void group_complete(const llfio::result<void> &cancelled) noexcept override
+ {
+ bool expected = false;
+ BOOST_CHECK(within.compare_exchange_strong(expected, true));
+ BOOST_CHECK(parent() == nullptr);
+ BOOST_CHECK(shared->cancelling == cancelled.has_error());
+ // std::cout << " group_complete()" << std::endl;
+ shared->group_completes.fetch_add(1);
+ expected = true;
+ BOOST_CHECK(within.compare_exchange_strong(expected, false));
+ }
+ };
+ std::vector<work_item> workitems;
+ auto reset = [&](size_t count) {
+ workitems.clear();
+ shared_state.executed.clear();
+ shared_state.executed.resize(count + 1);
+ for(size_t n = 0; n < count; n++)
+ {
+ workitems.emplace_back(&shared_state);
+ }
+ shared_state.p = (intptr_t) count;
+ shared_state.concurrency = 0;
+ shared_state.max_concurrency = 0;
+ shared_state.group_completes = 0;
+ };
+ auto submit = [&] {
+ auto **wis = (llfio::dynamic_thread_pool_group::work_item **) alloca(sizeof(work_item *) * workitems.size());
+ for(size_t n = 0; n < workitems.size(); n++)
+ {
+ wis[n] = &workitems[n];
+ }
+ BOOST_CHECK(!shared_state.tpg->stopping());
+ BOOST_CHECK(shared_state.tpg->stopped());
+ BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 0);
+ BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == nullptr);
+ for(size_t n = 0; n < workitems.size(); n++)
+ {
+ BOOST_CHECK(workitems[n].parent() == nullptr);
+ }
+ shared_state.tpg->submit({wis, workitems.size()}).value();
+ BOOST_CHECK(!shared_state.tpg->stopping());
+ BOOST_CHECK(!shared_state.tpg->stopped());
+ for(size_t n = 0; n < workitems.size(); n++)
+ {
+ BOOST_CHECK(workitems[n].parent() == shared_state.tpg.get());
+ }
+ BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 0);
+ BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == nullptr);
+ };
+ auto check = [&] {
+ shared_state.tpg->wait().value();
+ BOOST_CHECK(!shared_state.tpg->stopping());
+ BOOST_CHECK(shared_state.tpg->stopped());
+ BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 0);
+ BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == nullptr);
+ for(size_t n = 0; n < workitems.size(); n++)
+ {
+ BOOST_CHECK(workitems[n].parent() == nullptr);
+ }
+ BOOST_CHECK(shared_state.group_completes == workitems.size());
+ BOOST_CHECK(shared_state.executed[0] == 0);
+ if(shared_state.cancelling)
+ {
+ size_t executed = 0, notexecuted = 0;
+ for(size_t n = 1; n <= workitems.size(); n++)
+ {
+ if(shared_state.executed[n] == 1)
+ {
+ executed++;
+ }
+ else
+ {
+ notexecuted++;
+ }
+ }
+ std::cout << "During cancellation, executed " << executed << " and did not execute " << notexecuted << std::endl;
+ }
+ else
+ {
+ for(size_t n = 1; n <= workitems.size(); n++)
+ {
+ BOOST_CHECK(shared_state.executed[n] == 1);
+ if(shared_state.executed[n] != 1)
+ {
+ std::cout << "shared_state.executed[" << n << "] = " << shared_state.executed[n] << std::endl;
+ }
+ }
+ }
+ std::cout << "Maximum concurrency achieved with " << workitems.size() << " work items = " << shared_state.max_concurrency << "\n" << std::endl;
+ };
+
+ // Test a single work item
+ reset(1);
+ submit();
+ check();
+
+ // Test 10 work items
+ reset(10);
+ submit();
+ check();
+
+ // Test 1000 work items
+ reset(1000);
+ submit();
+ check();
+
+ // Test 1000 work items with stop
+ reset(1000);
+ submit();
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ shared_state.cancelling = true;
+ shared_state.tpg->stop().value();
+ BOOST_CHECK(shared_state.tpg->stopping());
+ auto r = shared_state.tpg->wait();
+ BOOST_CHECK(!shared_state.tpg->stopping());
+ BOOST_REQUIRE(!r);
+ BOOST_CHECK(r.error() == llfio::errc::operation_canceled);
+ check();
+}
+
+static inline void TestDynamicThreadPoolGroupNestingWorks()
+{
+ namespace llfio = LLFIO_V2_NAMESPACE;
+ static constexpr size_t MAX_NESTING = 10;
+ static constexpr int COUNT_PER_WORK_ITEM = 1000;
+ struct shared_state_t
+ {
+ std::mutex lock;
+ std::unordered_map<uint64_t, size_t> time_bucket;
+ llfio::dynamic_thread_pool_group_ptr tpg;
+ double stddev{0};
+ void calc_stddev()
+ {
+ stddev = 0;
+ uint64_t mean = 0, count = 0;
+ for(auto &i : time_bucket)
+ {
+ mean += i.first*i.second;
+ count += i.second;
+ }
+ mean /= count;
+ for(auto &i : time_bucket)
+ {
+ double diff = (double) abs((int64_t) i.first - (int64_t) mean);
+ stddev += diff * diff * i.second;
+ }
+ stddev /= count;
+ stddev = sqrt(stddev);
+ }
+ } shared_states[MAX_NESTING];
+ struct work_item final : public llfio::dynamic_thread_pool_group::work_item
+ {
+ using _base = llfio::dynamic_thread_pool_group::work_item;
+ const size_t nesting{0};
+ llfio::span<shared_state_t> shared_states;
+ std::atomic<int> count{COUNT_PER_WORK_ITEM};
+ std::unique_ptr<work_item> childwi;
+
+ work_item() = default;
+ explicit work_item(size_t _nesting, llfio::span<shared_state_t> _shared_states)
+ : nesting(_nesting)
+ , shared_states(_shared_states)
+ {
+ if(nesting + 1 < MAX_NESTING)
+ {
+ childwi = std::make_unique<work_item>(nesting + 1, shared_states);
+ }
+ }
+ work_item(work_item &&o) noexcept
+ : nesting(o.nesting)
+ , shared_states(o.shared_states)
+ , childwi(std::move(o.childwi))
+ {
+ }
+
+ virtual intptr_t next(llfio::deadline & /*unused*/) noexcept override
+ {
+ auto ret = count.fetch_sub(1);
+ if(ret <= 0)
+ {
+ ret = -1;
+ }
+ return ret;
+ }
+ virtual llfio::result<void> operator()(intptr_t work) noexcept override
+ {
+ auto supposed_nesting_level = llfio::dynamic_thread_pool_group::current_nesting_level();
+ BOOST_CHECK(nesting + 1 == supposed_nesting_level);
+ if(nesting + 1 != supposed_nesting_level)
+ {
+ std::cerr << "current_nesting_level() reports " << supposed_nesting_level << " not " << (nesting + 1) << std::endl;
+ }
+ uint64_t idx = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
+ std::lock_guard<std::mutex> g(shared_states[nesting].lock);
+ if(COUNT_PER_WORK_ITEM == work && childwi)
+ {
+ if(!shared_states[nesting].tpg)
+ {
+ shared_states[nesting].tpg = llfio::make_dynamic_thread_pool_group().value();
+ }
+ OUTCOME_TRY(shared_states[nesting].tpg->submit(childwi.get()));
+ }
+ shared_states[nesting].time_bucket[idx]++;
+ return llfio::success();
+ }
+ // virtual void group_complete(const llfio::result<void> &/*unused*/) noexcept override { }
+ };
+ std::vector<work_item> workitems;
+ for(size_t n = 0; n < 100; n++)
+ {
+ workitems.emplace_back(0, shared_states);
+ }
+ auto tpg = llfio::make_dynamic_thread_pool_group().value();
+ tpg->submit(llfio::span<work_item>(workitems)).value();
+ tpg->wait().value();
+ for(size_t n = 0; n < MAX_NESTING - 1; n++)
+ {
+ std::unique_lock<std::mutex> g(shared_states[n].lock);
+ while(!shared_states[n].tpg)
+ {
+ g.unlock();
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ g.lock();
+ }
+ g.unlock();
+ shared_states[n].tpg->wait().value();
+ }
+ for(size_t n = 0; n < MAX_NESTING; n++)
+ {
+ shared_states[n].calc_stddev();
+ std::cout << " Standard deviation for nesting level " << (n + 1) << " was " << shared_states[n].stddev << std::endl;
+ }
+ BOOST_CHECK(shared_states[MAX_NESTING - 1].stddev < shared_states[MAX_NESTING / 2].stddev / 2);
+}
+
+KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, works, "Tests that llfio::dynamic_thread_pool_group works as expected",
+ TestDynamicThreadPoolGroupWorks())
+KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, nested, "Tests that nesting of llfio::dynamic_thread_pool_group works as expected",
+ TestDynamicThreadPoolGroupNestingWorks())