github.com/windirstat/llfio.git
author     Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>  2020-12-24 19:05:07 +0300
committer  Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>  2021-03-16 13:21:38 +0300
commit     dfa6771c4f8ba0635a252a39d13a48c7c5c75489 (patch)
tree       d3f1ef27a142fd77e850440766add50a97d1858a
parent     82fcea61c21b31ce325dc47401808dd45d4ef42c (diff)
Add unit test to ensure nested dynamic thread pool groups work as expected. Still a Windows-only implementation.
-rw-r--r--  include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp   73
-rw-r--r--  include/llfio/v2.0/dynamic_thread_pool_group.hpp                62
-rw-r--r--  test/tests/dynamic_thread_pool_group.cpp                       335
3 files changed, 450 insertions(+), 20 deletions(-)
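For orientation, here is a minimal sketch of the work item pattern these changes implement and test; the my_work class, its count of three work units, and the variable names are illustrative assumptions, not code from this commit. A work item hands out work indices via next() until it returns -1, executes each unit in operator()(), and receives exactly one group_complete() call; the commit below also adds the single-pointer submit() overload used here, and changes a work item's nesting level to derive from its submitting group's nesting level rather than a thread-local increment.

    namespace llfio = LLFIO_V2_NAMESPACE;

    struct my_work final : public llfio::dynamic_thread_pool_group::work_item
    {
      std::atomic<intptr_t> remaining{3};  // illustrative: three units of work

      // Hand out work indices 3, 2, 1; returning -1 tells the pool this item is done.
      virtual intptr_t next(llfio::deadline & /*unused*/) noexcept override
      {
        auto ret = remaining.fetch_sub(1);
        return (ret <= 0) ? -1 : ret;
      }
      virtual llfio::result<void> operator()(intptr_t /*work*/) noexcept override
      {
        // Execute one unit of work; may run concurrently with other work items.
        return llfio::success();
      }
      virtual void group_complete(const llfio::result<void> & /*cancelled*/) noexcept override
      {
        // Called exactly once, after all work completes or the group is stopped.
      }
    };

    auto tpg = llfio::make_dynamic_thread_pool_group().value();
    my_work wi;
    tpg->submit(&wi).value();  // single-pointer overload added in this commit
    tpg->wait().value();       // blocks until group_complete() has run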
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
index c44dcde4..7e6eb2ac 100644
--- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
+++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
@@ -475,7 +475,7 @@ namespace detail
{
(void) g;
#ifdef _WIN32
- if(1 == i->_internalworkh_inuse)
+ if(i->_internalworkh_inuse > 0)
{
i->_internalworkh_inuse = 2;
}
@@ -504,6 +504,7 @@ namespace detail
for(; v != nullptr; v = n)
{
v->_parent = nullptr;
+ v->_nextwork = -1;
n = v->_next;
}
n = v = parent->_work_items_done.front;
@@ -590,27 +591,49 @@ namespace detail
auto *i = group->_work_items_active.front;
if(nullptr != i->_internalworkh)
{
- i->_internalworkh_inuse = 1;
+ if(0 == i->_internalworkh_inuse)
+ {
+ i->_internalworkh_inuse = 1;
+ }
g.unlock();
WaitForThreadpoolWorkCallbacks((PTP_WORK) i->_internalworkh, true);
g.lock();
if(i->_internalworkh_inuse == 2)
{
- CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
- i->_internalworkh = nullptr;
+ if(nullptr != i->_internalworkh)
+ {
+ CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
+ i->_internalworkh = nullptr;
+ }
+ if(nullptr != i->_internaltimerh)
+ {
+ CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
+ i->_internaltimerh = nullptr;
+ }
}
i->_internalworkh_inuse = 0;
}
if(nullptr != i->_internaltimerh)
{
- i->_internalworkh_inuse = 1;
+ if(0 == i->_internalworkh_inuse)
+ {
+ i->_internalworkh_inuse = 1;
+ }
g.unlock();
WaitForThreadpoolTimerCallbacks((PTP_TIMER) i->_internaltimerh, true);
g.lock();
if(i->_internalworkh_inuse == 2)
{
- CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
- i->_internalworkh = nullptr;
+ if(nullptr != i->_internalworkh)
+ {
+ CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
+ i->_internalworkh = nullptr;
+ }
+ if(nullptr != i->_internaltimerh)
+ {
+ CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
+ i->_internaltimerh = nullptr;
+ }
}
i->_internalworkh_inuse = 0;
}
@@ -632,27 +655,49 @@ namespace detail
auto *i = group->_work_items_active.front;
if(nullptr != i->_internalworkh)
{
- i->_internalworkh_inuse = 1;
+ if(0 == i->_internalworkh_inuse)
+ {
+ i->_internalworkh_inuse = 1;
+ }
g.unlock();
WaitForThreadpoolWorkCallbacks((PTP_WORK) i->_internalworkh, false);
g.lock();
if(i->_internalworkh_inuse == 2)
{
- CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
- i->_internalworkh = nullptr;
+ if(nullptr != i->_internalworkh)
+ {
+ CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
+ i->_internalworkh = nullptr;
+ }
+ if(nullptr != i->_internaltimerh)
+ {
+ CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
+ i->_internaltimerh = nullptr;
+ }
}
i->_internalworkh_inuse = 0;
}
if(nullptr != i->_internaltimerh)
{
- i->_internalworkh_inuse = 1;
+ if(0 == i->_internalworkh_inuse)
+ {
+ i->_internalworkh_inuse = 1;
+ }
g.unlock();
WaitForThreadpoolTimerCallbacks((PTP_TIMER) i->_internaltimerh, false);
g.lock();
if(i->_internalworkh_inuse == 2)
{
- CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
- i->_internalworkh = nullptr;
+ if(nullptr != i->_internalworkh)
+ {
+ CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
+ i->_internalworkh = nullptr;
+ }
+ if(nullptr != i->_internaltimerh)
+ {
+ CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
+ i->_internaltimerh = nullptr;
+ }
}
i->_internalworkh_inuse = 0;
}
@@ -707,7 +752,7 @@ namespace detail
auto old_thread_local_state = tls;
tls.workitem = workitem;
tls.current_callback_instance = selfthreadh;
- tls.nesting_level++;
+ tls.nesting_level = workitem->_parent->_nesting_level + 1;
auto r = (*workitem)(workitem->_nextwork);
workitem->_nextwork = 0; // call next() next time
tls = old_thread_local_state;
diff --git a/include/llfio/v2.0/dynamic_thread_pool_group.hpp b/include/llfio/v2.0/dynamic_thread_pool_group.hpp
index c216f023..d3b6279d 100644
--- a/include/llfio/v2.0/dynamic_thread_pool_group.hpp
+++ b/include/llfio/v2.0/dynamic_thread_pool_group.hpp
@@ -158,20 +158,56 @@ public:
dynamic_thread_pool_group_impl *_parent{nullptr};
void *_internalworkh{nullptr}, *_internaltimerh{nullptr};
work_item *_prev{nullptr}, *_next{nullptr};
- intptr_t _nextwork{0};
+ intptr_t _nextwork{-1};
std::chrono::steady_clock::time_point _timepoint1;
std::chrono::system_clock::time_point _timepoint2;
int _internalworkh_inuse{0};
protected:
work_item() = default;
- work_item(const work_item &) = default;
- work_item(work_item &&) = default;
- work_item &operator=(const work_item &) = default;
- work_item &operator=(work_item &&) = default;
+ work_item(const work_item &o) = delete;
+ work_item(work_item &&o) noexcept
+ : _parent(o._parent)
+ , _internalworkh(o._internalworkh)
+ , _internaltimerh(o._internaltimerh)
+ , _prev(o._prev)
+ , _next(o._next)
+ , _nextwork(o._nextwork)
+ , _timepoint1(o._timepoint1)
+ , _timepoint2(o._timepoint2)
+ {
+ assert(o._parent == nullptr);
+ assert(o._internalworkh == nullptr);
+ assert(o._internaltimerh == nullptr);
+ if(o._parent != nullptr || o._internalworkh != nullptr)
+ {
+ LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item was relocated in memory during use!");
+ abort();
+ }
+ o._prev = o._next = nullptr;
+ o._nextwork = -1;
+ }
+ work_item &operator=(const work_item &) = delete;
+ work_item &operator=(work_item &&) = delete;
public:
- virtual ~work_item() {}
+ virtual ~work_item()
+ {
+ assert(_nextwork == -1);
+ if(_nextwork != -1)
+ {
+ LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item destroyed before all work was done!");
+ abort();
+ }
+ assert(_internalworkh == nullptr);
+ assert(_internaltimerh == nullptr);
+ assert(_parent == nullptr);
+ if(_internalworkh != nullptr || _parent != nullptr)
+ {
+ LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item destroyed before group_complete() was executed!");
+ abort();
+ }
+ }
//! Returns the parent work group between successful submission and just before `group_complete()`.
dynamic_thread_pool_group *parent() const noexcept { return reinterpret_cast<dynamic_thread_pool_group *>(_parent); }
@@ -245,6 +281,20 @@ public:
`errc::operation_canceled` is returned if you try.
*/
virtual result<void> submit(span<work_item *> work) noexcept = 0;
+ //! \overload
+ result<void> submit(work_item *wi) noexcept { return submit(span<work_item *>(&wi, 1)); }
+ //! \overload
+ LLFIO_TEMPLATE(class T)
+ LLFIO_TREQUIRES(LLFIO_TPRED(!std::is_pointer<T>::value), LLFIO_TPRED(std::is_base_of<work_item, T>::value))
+ result<void> submit(span<T> wi) noexcept
+ {
+ auto *wis = (T **) alloca(sizeof(T *) * wi.size());
+ for(size_t n = 0; n < wi.size(); n++)
+ {
+ wis[n] = &wi[n];
+ }
+ return submit(span<work_item *>((work_item **) wis, wi.size()));
+ }
//! Threadsafe. Cancel any remaining work previously submitted, but without blocking (use `wait()` to block).
virtual result<void> stop() noexcept = 0;
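The templated span<T> overload added just above lets callers submit a span of concrete work item objects directly, rather than first assembling an array of base-class pointers (as the test code below still does by hand). A sketch of assumed usage, reusing the illustrative my_work type and tpg from the earlier sketch:

    std::vector<my_work> items(16);                    // my_work derives from work_item
    tpg->submit(llfio::span<my_work>(items)).value();  // base pointers gathered internally via alloca
    tpg->wait().value();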
diff --git a/test/tests/dynamic_thread_pool_group.cpp b/test/tests/dynamic_thread_pool_group.cpp
new file mode 100644
index 00000000..8e068583
--- /dev/null
+++ b/test/tests/dynamic_thread_pool_group.cpp
@@ -0,0 +1,335 @@
+/* Integration test kernel for dynamic_thread_pool_group
+(C) 2020 Niall Douglas <http://www.nedproductions.biz/> (2 commits)
+File Created: Dec 2020
+
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License in the accompanying file
+Licence.txt or at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+
+Distributed under the Boost Software License, Version 1.0.
+ (See accompanying file Licence.txt or copy at
+ http://www.boost.org/LICENSE_1_0.txt)
+*/
+
+//#define LLFIO_LOGGING_LEVEL 99
+
+#include "../test_kernel_decl.hpp"
+
+static inline void TestDynamicThreadPoolGroupWorks()
+{
+ namespace llfio = LLFIO_V2_NAMESPACE;
+ // llfio::log_level_guard llg(llfio::log_level::all);
+ struct work_item;
+ struct shared_state_t
+ {
+ std::atomic<intptr_t> p{0};
+ std::atomic<size_t> concurrency{0}, max_concurrency{0}, group_completes{0};
+ std::vector<size_t> executed;
+ llfio::dynamic_thread_pool_group_ptr tpg{llfio::make_dynamic_thread_pool_group().value()};
+ std::atomic<bool> cancelling{false};
+ } shared_state;
+ struct work_item final : public llfio::dynamic_thread_pool_group::work_item
+ {
+ using _base = llfio::dynamic_thread_pool_group::work_item;
+ shared_state_t *shared{nullptr};
+ std::atomic<bool> within{false};
+
+ work_item() = default;
+ explicit work_item(shared_state_t *_shared)
+ : shared(_shared)
+ {
+ }
+ work_item(work_item &&o) noexcept
+ : _base(std::move(o))
+ , shared(o.shared)
+ {
+ }
+
+ virtual intptr_t next(llfio::deadline &d) noexcept override
+ {
+ bool expected = false;
+ BOOST_CHECK(within.compare_exchange_strong(expected, true));
+ (void) d;
+ BOOST_CHECK(parent() == shared->tpg.get());
+ auto ret = shared->p.fetch_sub(1);
+ if(ret < 0)
+ {
+ ret = -1;
+ }
+ // std::cout << " next() returns " << ret << std::endl;
+ expected = true;
+ BOOST_CHECK(within.compare_exchange_strong(expected, false));
+ return ret;
+ }
+ virtual llfio::result<void> operator()(intptr_t work) noexcept override
+ {
+ bool expected = false;
+ BOOST_CHECK(within.compare_exchange_strong(expected, true));
+ auto concurrency = shared->concurrency.fetch_add(1) + 1;
+ for(size_t expected_ = shared->max_concurrency; concurrency > expected_;)
+ {
+ shared->max_concurrency.compare_exchange_weak(expected_, concurrency);
+ }
+ BOOST_CHECK(parent() == shared->tpg.get());
+ BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 1);
+ BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == this);
+ // std::cout << " () executes " << work << std::endl;
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ auto *executed = (std::atomic<size_t> *) &shared->executed[work];
+ executed->fetch_add(1);
+ shared->concurrency.fetch_sub(1);
+ expected = true;
+ BOOST_CHECK(within.compare_exchange_strong(expected, false));
+ return llfio::success();
+ }
+ virtual void group_complete(const llfio::result<void> &cancelled) noexcept override
+ {
+ bool expected = false;
+ BOOST_CHECK(within.compare_exchange_strong(expected, true));
+ BOOST_CHECK(parent() == nullptr);
+ BOOST_CHECK(shared->cancelling == cancelled.has_error());
+ // std::cout << " group_complete()" << std::endl;
+ shared->group_completes.fetch_add(1);
+ expected = true;
+ BOOST_CHECK(within.compare_exchange_strong(expected, false));
+ }
+ };
+ std::vector<work_item> workitems;
+ auto reset = [&](size_t count) {
+ workitems.clear();
+ shared_state.executed.clear();
+ shared_state.executed.resize(count + 1);
+ for(size_t n = 0; n < count; n++)
+ {
+ workitems.emplace_back(&shared_state);
+ }
+ shared_state.p = (intptr_t) count;
+ shared_state.concurrency = 0;
+ shared_state.max_concurrency = 0;
+ shared_state.group_completes = 0;
+ };
+ auto submit = [&] {
+ auto **wis = (llfio::dynamic_thread_pool_group::work_item **) alloca(sizeof(work_item *) * workitems.size());
+ for(size_t n = 0; n < workitems.size(); n++)
+ {
+ wis[n] = &workitems[n];
+ }
+ BOOST_CHECK(!shared_state.tpg->stopping());
+ BOOST_CHECK(shared_state.tpg->stopped());
+ BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 0);
+ BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == nullptr);
+ for(size_t n = 0; n < workitems.size(); n++)
+ {
+ BOOST_CHECK(workitems[n].parent() == nullptr);
+ }
+ shared_state.tpg->submit({wis, workitems.size()}).value();
+ BOOST_CHECK(!shared_state.tpg->stopping());
+ BOOST_CHECK(!shared_state.tpg->stopped());
+ for(size_t n = 0; n < workitems.size(); n++)
+ {
+ BOOST_CHECK(workitems[n].parent() == shared_state.tpg.get());
+ }
+ BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 0);
+ BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == nullptr);
+ };
+ auto check = [&] {
+ shared_state.tpg->wait().value();
+ BOOST_CHECK(!shared_state.tpg->stopping());
+ BOOST_CHECK(shared_state.tpg->stopped());
+ BOOST_CHECK(llfio::dynamic_thread_pool_group::current_nesting_level() == 0);
+ BOOST_CHECK(llfio::dynamic_thread_pool_group::current_work_item() == nullptr);
+ for(size_t n = 0; n < workitems.size(); n++)
+ {
+ BOOST_CHECK(workitems[n].parent() == nullptr);
+ }
+ BOOST_CHECK(shared_state.group_completes == workitems.size());
+ BOOST_CHECK(shared_state.executed[0] == 0);
+ if(shared_state.cancelling)
+ {
+ size_t executed = 0, notexecuted = 0;
+ for(size_t n = 1; n <= workitems.size(); n++)
+ {
+ if(shared_state.executed[n] == 1)
+ {
+ executed++;
+ }
+ else
+ {
+ notexecuted++;
+ }
+ }
+ std::cout << "During cancellation, executed " << executed << " and did not execute " << notexecuted << std::endl;
+ }
+ else
+ {
+ for(size_t n = 1; n <= workitems.size(); n++)
+ {
+ BOOST_CHECK(shared_state.executed[n] == 1);
+ if(shared_state.executed[n] != 1)
+ {
+ std::cout << "shared_state.executed[" << n << "] = " << shared_state.executed[n] << std::endl;
+ }
+ }
+ }
+ std::cout << "Maximum concurrency achieved with " << workitems.size() << " work items = " << shared_state.max_concurrency << "\n" << std::endl;
+ };
+
+ // Test a single work item
+ reset(1);
+ submit();
+ check();
+
+ // Test 10 work items
+ reset(10);
+ submit();
+ check();
+
+ // Test 1000 work items
+ reset(1000);
+ submit();
+ check();
+
+ // Test 1000 work items with stop
+ reset(1000);
+ submit();
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ shared_state.cancelling = true;
+ shared_state.tpg->stop().value();
+ BOOST_CHECK(shared_state.tpg->stopping());
+ auto r = shared_state.tpg->wait();
+ BOOST_CHECK(!shared_state.tpg->stopping());
+ BOOST_REQUIRE(!r);
+ BOOST_CHECK(r.error() == llfio::errc::operation_canceled);
+ check();
+}
+
+static inline void TestDynamicThreadPoolGroupNestingWorks()
+{
+ namespace llfio = LLFIO_V2_NAMESPACE;
+ static constexpr size_t MAX_NESTING = 10;
+ static constexpr int COUNT_PER_WORK_ITEM = 1000;
+ struct shared_state_t
+ {
+ std::mutex lock;
+ std::unordered_map<uint64_t, size_t> time_bucket;
+ llfio::dynamic_thread_pool_group_ptr tpg;
+ double stddev{0};
+ void calc_stddev()
+ {
+ stddev = 0;
+ uint64_t mean = 0, count = 0;
+ for(auto &i : time_bucket)
+ {
+ mean += i.first*i.second;
+ count += i.second;
+ }
+ mean /= count;
+ for(auto &i : time_bucket)
+ {
+ double diff = (double) abs((int64_t) i.first - (int64_t) mean);
+ stddev += diff * diff * i.second;
+ }
+ stddev /= count;
+ stddev = sqrt(stddev);
+ }
+ } shared_states[MAX_NESTING];
+ struct work_item final : public llfio::dynamic_thread_pool_group::work_item
+ {
+ using _base = llfio::dynamic_thread_pool_group::work_item;
+ const size_t nesting{0};
+ llfio::span<shared_state_t> shared_states;
+ std::atomic<int> count{COUNT_PER_WORK_ITEM};
+ std::unique_ptr<work_item> childwi;
+
+ work_item() = default;
+ explicit work_item(size_t _nesting, llfio::span<shared_state_t> _shared_states)
+ : nesting(_nesting)
+ , shared_states(_shared_states)
+ {
+ if(nesting + 1 < MAX_NESTING)
+ {
+ childwi = std::make_unique<work_item>(nesting + 1, shared_states);
+ }
+ }
+ work_item(work_item &&o) noexcept
+ : nesting(o.nesting)
+ , shared_states(o.shared_states)
+ , childwi(std::move(o.childwi))
+ {
+ }
+
+ virtual intptr_t next(llfio::deadline & /*unused*/) noexcept override
+ {
+ auto ret = count.fetch_sub(1);
+ if(ret <= 0)
+ {
+ ret = -1;
+ }
+ return ret;
+ }
+ virtual llfio::result<void> operator()(intptr_t work) noexcept override
+ {
+ auto supposed_nesting_level = llfio::dynamic_thread_pool_group::current_nesting_level();
+ BOOST_CHECK(nesting + 1 == supposed_nesting_level);
+ if(nesting + 1 != supposed_nesting_level)
+ {
+ std::cerr << "current_nesting_level() reports " << supposed_nesting_level << " not " << (nesting + 1) << std::endl;
+ }
+ uint64_t idx = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
+ std::lock_guard<std::mutex> g(shared_states[nesting].lock);
+ if(COUNT_PER_WORK_ITEM == work && childwi)
+ {
+ if(!shared_states[nesting].tpg)
+ {
+ shared_states[nesting].tpg = llfio::make_dynamic_thread_pool_group().value();
+ }
+ OUTCOME_TRY(shared_states[nesting].tpg->submit(childwi.get()));
+ }
+ shared_states[nesting].time_bucket[idx]++;
+ return llfio::success();
+ }
+ // virtual void group_complete(const llfio::result<void> &/*unused*/) noexcept override { }
+ };
+ std::vector<work_item> workitems;
+ for(size_t n = 0; n < 100; n++)
+ {
+ workitems.emplace_back(0, shared_states);
+ }
+ auto tpg = llfio::make_dynamic_thread_pool_group().value();
+ tpg->submit(llfio::span<work_item>(workitems)).value();
+ tpg->wait().value();
+ for(size_t n = 0; n < MAX_NESTING - 1; n++)
+ {
+ std::unique_lock<std::mutex> g(shared_states[n].lock);
+ while(!shared_states[n].tpg)
+ {
+ g.unlock();
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ g.lock();
+ }
+ g.unlock();
+ shared_states[n].tpg->wait().value();
+ }
+ for(size_t n = 0; n < MAX_NESTING; n++)
+ {
+ shared_states[n].calc_stddev();
+ std::cout << " Standard deviation for nesting level " << (n + 1) << " was " << shared_states[n].stddev << std::endl;
+ }
+ BOOST_CHECK(shared_states[MAX_NESTING - 1].stddev < shared_states[MAX_NESTING / 2].stddev / 2);
+}
+
+KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, works, "Tests that llfio::dynamic_thread_pool_group works as expected",
+ TestDynamicThreadPoolGroupWorks())
+KERNELTEST_TEST_KERNEL(integration, llfio, dynamic_thread_pool_group, nested, "Tests that nesting of llfio::dynamic_thread_pool_group works as expected",
+ TestDynamicThreadPoolGroupNestingWorks())
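A closing note on the nesting test's final assertion: calc_stddev() computes a frequency-weighted population standard deviation over millisecond time buckets,

    mu     = ( sum_k n_k * t_k ) / ( sum_k n_k )
    stddev = sqrt( ( sum_k n_k * (t_k - mu)^2 ) / ( sum_k n_k ) )

where t_k is a millisecond timestamp bucket and n_k is the number of work item executions landing in that bucket (the code truncates mu to an integer before the second pass). The test then requires the deepest nesting level's stddev to be less than half that of the middle nesting level, i.e. execution times at deeper nesting levels must cluster markedly more tightly.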