Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/windirstat/llfio.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2020-12-24 19:05:07 +0300
committerNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2021-03-16 13:21:38 +0300
commitdfa6771c4f8ba0635a252a39d13a48c7c5c75489 (patch)
treed3f1ef27a142fd77e850440766add50a97d1858a /include
parent82fcea61c21b31ce325dc47401808dd45d4ef42c (diff)
Add unit test to ensure nested dynamic thread pool groups work as expected. Still Windows only implementation.
Diffstat (limited to 'include')
-rw-r--r--include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp73
-rw-r--r--include/llfio/v2.0/dynamic_thread_pool_group.hpp62
2 files changed, 115 insertions, 20 deletions
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
index c44dcde4..7e6eb2ac 100644
--- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
+++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
@@ -475,7 +475,7 @@ namespace detail
{
(void) g;
#ifdef _WIN32
- if(1 == i->_internalworkh_inuse)
+ if(i->_internalworkh_inuse > 0)
{
i->_internalworkh_inuse = 2;
}
@@ -504,6 +504,7 @@ namespace detail
for(; v != nullptr; v = n)
{
v->_parent = nullptr;
+ v->_nextwork = -1;
n = v->_next;
}
n = v = parent->_work_items_done.front;
@@ -590,27 +591,49 @@ namespace detail
auto *i = group->_work_items_active.front;
if(nullptr != i->_internalworkh)
{
- i->_internalworkh_inuse = 1;
+ if(0 == i->_internalworkh_inuse)
+ {
+ i->_internalworkh_inuse = 1;
+ }
g.unlock();
WaitForThreadpoolWorkCallbacks((PTP_WORK) i->_internalworkh, true);
g.lock();
if(i->_internalworkh_inuse == 2)
{
- CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
- i->_internalworkh = nullptr;
+ if(nullptr != i->_internalworkh)
+ {
+ CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
+ i->_internalworkh = nullptr;
+ }
+ if(nullptr != i->_internaltimerh)
+ {
+ CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
+ i->_internaltimerh = nullptr;
+ }
}
i->_internalworkh_inuse = 0;
}
if(nullptr != i->_internaltimerh)
{
- i->_internalworkh_inuse = 1;
+ if(0 == i->_internalworkh_inuse)
+ {
+ i->_internalworkh_inuse = 1;
+ }
g.unlock();
WaitForThreadpoolTimerCallbacks((PTP_TIMER) i->_internaltimerh, true);
g.lock();
if(i->_internalworkh_inuse == 2)
{
- CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
- i->_internalworkh = nullptr;
+ if(nullptr != i->_internalworkh)
+ {
+ CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
+ i->_internalworkh = nullptr;
+ }
+ if(nullptr != i->_internaltimerh)
+ {
+ CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
+ i->_internaltimerh = nullptr;
+ }
}
i->_internalworkh_inuse = 0;
}
@@ -632,27 +655,49 @@ namespace detail
auto *i = group->_work_items_active.front;
if(nullptr != i->_internalworkh)
{
- i->_internalworkh_inuse = 1;
+ if(0 == i->_internalworkh_inuse)
+ {
+ i->_internalworkh_inuse = 1;
+ }
g.unlock();
WaitForThreadpoolWorkCallbacks((PTP_WORK) i->_internalworkh, false);
g.lock();
if(i->_internalworkh_inuse == 2)
{
- CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
- i->_internalworkh = nullptr;
+ if(nullptr != i->_internalworkh)
+ {
+ CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
+ i->_internalworkh = nullptr;
+ }
+ if(nullptr != i->_internaltimerh)
+ {
+ CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
+ i->_internaltimerh = nullptr;
+ }
}
i->_internalworkh_inuse = 0;
}
if(nullptr != i->_internaltimerh)
{
- i->_internalworkh_inuse = 1;
+ if(0 == i->_internalworkh_inuse)
+ {
+ i->_internalworkh_inuse = 1;
+ }
g.unlock();
WaitForThreadpoolTimerCallbacks((PTP_TIMER) i->_internaltimerh, false);
g.lock();
if(i->_internalworkh_inuse == 2)
{
- CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
- i->_internalworkh = nullptr;
+ if(nullptr != i->_internalworkh)
+ {
+ CloseThreadpoolWork((PTP_WORK) i->_internalworkh);
+ i->_internalworkh = nullptr;
+ }
+ if(nullptr != i->_internaltimerh)
+ {
+ CloseThreadpoolTimer((PTP_TIMER) i->_internaltimerh);
+ i->_internaltimerh = nullptr;
+ }
}
i->_internalworkh_inuse = 0;
}
@@ -707,7 +752,7 @@ namespace detail
auto old_thread_local_state = tls;
tls.workitem = workitem;
tls.current_callback_instance = selfthreadh;
- tls.nesting_level++;
+ tls.nesting_level = workitem->_parent->_nesting_level + 1;
auto r = (*workitem)(workitem->_nextwork);
workitem->_nextwork = 0; // call next() next time
tls = old_thread_local_state;
diff --git a/include/llfio/v2.0/dynamic_thread_pool_group.hpp b/include/llfio/v2.0/dynamic_thread_pool_group.hpp
index c216f023..d3b6279d 100644
--- a/include/llfio/v2.0/dynamic_thread_pool_group.hpp
+++ b/include/llfio/v2.0/dynamic_thread_pool_group.hpp
@@ -158,20 +158,56 @@ public:
dynamic_thread_pool_group_impl *_parent{nullptr};
void *_internalworkh{nullptr}, *_internaltimerh{nullptr};
work_item *_prev{nullptr}, *_next{nullptr};
- intptr_t _nextwork{0};
+ intptr_t _nextwork{-1};
std::chrono::steady_clock::time_point _timepoint1;
std::chrono::system_clock::time_point _timepoint2;
int _internalworkh_inuse{0};
protected:
work_item() = default;
- work_item(const work_item &) = default;
- work_item(work_item &&) = default;
- work_item &operator=(const work_item &) = default;
- work_item &operator=(work_item &&) = default;
+ work_item(const work_item &o) = delete;
+ work_item(work_item &&o) noexcept
+ : _parent(o._parent)
+ , _internalworkh(o._internalworkh)
+ , _internaltimerh(o._internaltimerh)
+ , _prev(o._prev)
+ , _next(o._next)
+ , _nextwork(o._nextwork)
+ , _timepoint1(o._timepoint1)
+ , _timepoint2(o._timepoint2)
+ {
+ assert(o._parent == nullptr);
+ assert(o._internalworkh == nullptr);
+ assert(o._internaltimerh == nullptr);
+ if(o._parent != nullptr || o._internalworkh != nullptr)
+ {
+ LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item was relocated in memory during use!");
+ abort();
+ }
+ o._prev = o._next = nullptr;
+ o._nextwork = -1;
+ }
+ work_item &operator=(const work_item &) = delete;
+ work_item &operator=(work_item &&) = delete;
public:
- virtual ~work_item() {}
+ virtual ~work_item()
+ {
+ assert(_nextwork == -1);
+ if(_nextwork != -1)
+ {
+ LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item destroyed before all work was done!");
+ abort();
+ }
+ assert(_internalworkh == nullptr);
+ assert(_internaltimerh == nullptr);
+ assert(_parent == nullptr);
+ if(_internalworkh != nullptr || _parent != nullptr)
+ {
+ LLFIO_LOG_FATAL(this, "FATAL: dynamic_thread_pool_group::work_item destroyed before group_complete() was executed!");
+ abort();
+ }
+ }
//! Returns the parent work group between successful submission and just before `group_complete()`.
dynamic_thread_pool_group *parent() const noexcept { return reinterpret_cast<dynamic_thread_pool_group *>(_parent); }
@@ -245,6 +281,20 @@ public:
`errc::operation_canceled` is returned if you try.
*/
virtual result<void> submit(span<work_item *> work) noexcept = 0;
+ //! \overload
+ result<void> submit(work_item *wi) noexcept { return submit(span<work_item *>(&wi, 1)); }
+ //! \overload
+ LLFIO_TEMPLATE(class T)
+ LLFIO_TREQUIRES(LLFIO_TPRED(!std::is_pointer<T>::value), LLFIO_TPRED(std::is_base_of<work_item, T>::value))
+ result<void> submit(span<T> wi) noexcept
+ {
+ auto *wis = (T **) alloca(sizeof(T *) * wi.size());
+ for(size_t n = 0; n < wi.size(); n++)
+ {
+ wis[n] = &wi[n];
+ }
+ return submit(span<work_item *>((work_item **) wis, wi.size()));
+ }
//! Threadsafe. Cancel any remaining work previously submitted, but without blocking (use `wait()` to block).
virtual result<void> stop() noexcept = 0;