Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/windirstat/llfio.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2020-04-22 13:39:15 +0300
committerNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2020-04-22 13:39:15 +0300
commitd3afbce5fe3f63b47dba9b14057d0cbd018e513e (patch)
tree8a25a155e413113dd6a6d09f4a30f622921e6d43
parent181e7310b40b574ba39ab411b93f87ecac689ed9 (diff)
Port async pipe handle test over from resumable i/o branch
-rw-r--r--CMakeLists.txt4
-rw-r--r--include/llfio/revision.hpp6
-rw-r--r--include/llfio/v2.0/detail/impl/windows/test/iocp_multiplexer.ipp92
-rw-r--r--include/llfio/v2.0/io_multiplexer.hpp44
-rw-r--r--test/tests/pipe_handle.cpp284
5 files changed, 377 insertions, 53 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 4cbe6303..67ff41ba 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -291,7 +291,7 @@ int main() {
endif()
# Set any macros this library requires
-all_compile_definitions(PRIVATE LLFIO_INCLUDE_STORAGE_PROFILE=1)
+all_compile_definitions(PRIVATE LLFIO_INCLUDE_STORAGE_PROFILE=1 LLFIO_ENABLE_TEST_IO_MULTIPLEXERS=1)
foreach(target ${llfio_EXAMPLE_TARGETS})
target_compile_definitions(${target} PRIVATE LLFIO_INCLUDE_STORAGE_PROFILE=1)
endforeach()
@@ -336,7 +336,7 @@ if(NOT PROJECT_IS_DEPENDENCY)
include(QuickCppLibMakeStandardTests)
# For each test target, set definitions and linkage
foreach(target ${llfio_COMPILE_TEST_TARGETS} ${llfio_TEST_TARGETS})
- target_compile_definitions(${target} PRIVATE LLFIO_INCLUDE_STORAGE_PROFILE=1)
+ target_compile_definitions(${target} PRIVATE LLFIO_INCLUDE_STORAGE_PROFILE=1 $<$<PLATFORM_ID:Windows>:LLFIO_ENABLE_TEST_IO_MULTIPLEXERS=1>)
endforeach()
find_quickcpplib_library(kerneltest
GIT_REPOSITORY "https://github.com/ned14/kerneltest.git"
diff --git a/include/llfio/revision.hpp b/include/llfio/revision.hpp
index 6453d0d2..41911801 100644
--- a/include/llfio/revision.hpp
+++ b/include/llfio/revision.hpp
@@ -1,4 +1,4 @@
// Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time
-#define LLFIO_PREVIOUS_COMMIT_REF 4441633ac61b3a43e69888e3217ecbaea9961d9b
-#define LLFIO_PREVIOUS_COMMIT_DATE "2020-04-21 09:59:37 +00:00"
-#define LLFIO_PREVIOUS_COMMIT_UNIQUE 4441633a
+#define LLFIO_PREVIOUS_COMMIT_REF 181e7310b40b574ba39ab411b93f87ecac689ed9
+#define LLFIO_PREVIOUS_COMMIT_DATE "2020-04-22 08:27:07 +00:00"
+#define LLFIO_PREVIOUS_COMMIT_UNIQUE 181e7310
diff --git a/include/llfio/v2.0/detail/impl/windows/test/iocp_multiplexer.ipp b/include/llfio/v2.0/detail/impl/windows/test/iocp_multiplexer.ipp
index 83968430..488cde03 100644
--- a/include/llfio/v2.0/detail/impl/windows/test/iocp_multiplexer.ipp
+++ b/include/llfio/v2.0/detail/impl/windows/test/iocp_multiplexer.ipp
@@ -60,6 +60,8 @@ namespace test
// static_assert(sizeof(_iocp_operation_state) <= _iocp_operation_state_alignment, "_iocp_operation_state alignment is insufficiently large!");
using _state_lock_guard = typename _iocp_operation_state::_lock_guard;
+ bool _disable_immediate_completions{false};
+
public:
constexpr win_iocp_multiplexer() {}
win_iocp_multiplexer(const win_iocp_multiplexer &) = delete;
@@ -73,13 +75,14 @@ namespace test
(void) win_iocp_multiplexer::close();
}
}
- result<void> init(size_t threads)
+ result<void> init(size_t threads, bool disable_immediate_completions)
{
this->_v.h = CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, (DWORD) threads);
if(nullptr == this->_v.h)
{
return win32_error();
}
+ _disable_immediate_completions = disable_immediate_completions;
return success();
}
@@ -106,6 +109,10 @@ namespace test
{
return ntkernel_error(ntstat);
}
+ if(_disable_immediate_completions)
+ {
+ return success();
+ }
// If this works, we can avoid IOCP entirely for immediately completing i/o
// It'll set native_handle_type::disposition::_multiplexer_state_bit0 if
// we successfully executed this
@@ -210,13 +217,25 @@ namespace test
// LLFIO_HEADERS_ONLY_VIRTUAL_SPEC result<void> flush_inited_io_operations() noexcept { return success(); }
template <class U> io_operation_state_type _check_io_operation(_iocp_operation_state *state, U &&f) noexcept
{
- auto fill_io_result = [&](auto &ret, auto &params) {
+ auto fill_io_result = [&](auto &ret, auto &params) -> bool {
+ for(size_t n = 0; n < params.reqs.buffers.size(); n++)
+ {
+ if(state->_ols[n].Status == STATUS_PENDING)
+ {
+ f(state->_ols[n]);
+ break;
+ }
+ }
+ if(is_completed(state->state) || is_finished(state->state))
+ {
+ return false;
+ }
for(size_t n = 0; n < params.reqs.buffers.size(); n++)
{
assert(state->_ols[n].Status != -1);
if(state->_ols[n].Status == STATUS_PENDING)
{
- f();
+ return false;
}
if(state->_ols[n].Status < 0)
{
@@ -229,6 +248,7 @@ namespace test
ret = {params.reqs.buffers.data(), n + 1};
}
}
+ return true;
};
auto v = state->current_state();
switch(v)
@@ -237,43 +257,52 @@ namespace test
case io_operation_state_type::read_initiated:
{
io_result<buffers_type> ret = {state->payload.noncompleted.params.read.reqs.buffers.data(), 0};
- fill_io_result(ret, state->payload.noncompleted.params.read);
- state->read_completed(std::move(ret));
- if(state->h->native_handle().behaviour & native_handle_type::disposition::_multiplexer_state_bit0)
+ if(fill_io_result(ret, state->payload.noncompleted.params.read))
{
- // SetFileCompletionNotificationModes() above was successful, so we are done
- state->read_finished();
- return io_operation_state_type::read_finished;
+ state->read_completed(std::move(ret));
+ if(state->h->native_handle().behaviour & native_handle_type::disposition::_multiplexer_state_bit0)
+ {
+ // SetFileCompletionNotificationModes() above was successful, so we are done
+ state->read_finished();
+ return io_operation_state_type::read_finished;
+ }
+ return io_operation_state_type::read_completed;
}
- return io_operation_state_type::read_completed;
+ return v;
}
case io_operation_state_type::write_initialised:
case io_operation_state_type::write_initiated:
{
io_result<const_buffers_type> ret = {state->payload.noncompleted.params.write.reqs.buffers.data(), 0};
- fill_io_result(ret, state->payload.noncompleted.params.write);
- state->write_completed(std::move(ret));
- if(state->h->native_handle().behaviour & native_handle_type::disposition::_multiplexer_state_bit0)
+ if(fill_io_result(ret, state->payload.noncompleted.params.write))
{
- // SetFileCompletionNotificationModes() above was successful, so we are done
- state->write_or_barrier_finished();
- return io_operation_state_type::write_or_barrier_finished;
+ state->write_completed(std::move(ret));
+ if(state->h->native_handle().behaviour & native_handle_type::disposition::_multiplexer_state_bit0)
+ {
+ // SetFileCompletionNotificationModes() above was successful, so we are done
+ state->write_or_barrier_finished();
+ return io_operation_state_type::write_or_barrier_finished;
+ }
+ return io_operation_state_type::write_or_barrier_completed;
}
- return io_operation_state_type::write_or_barrier_completed;
+ return v;
}
case io_operation_state_type::barrier_initialised:
case io_operation_state_type::barrier_initiated:
{
io_result<const_buffers_type> ret = {state->payload.noncompleted.params.barrier.reqs.buffers.data(), 0};
- fill_io_result(ret, state->payload.noncompleted.params.barrier);
- state->barrier_completed(std::move(ret));
- if(state->h->native_handle().behaviour & native_handle_type::disposition::_multiplexer_state_bit0)
+ if(fill_io_result(ret, state->payload.noncompleted.params.barrier))
{
- // SetFileCompletionNotificationModes() above was successful, so we are done
- state->write_or_barrier_finished();
- return io_operation_state_type::write_or_barrier_finished;
+ state->barrier_completed(std::move(ret));
+ if(state->h->native_handle().behaviour & native_handle_type::disposition::_multiplexer_state_bit0)
+ {
+ // SetFileCompletionNotificationModes() above was successful, so we are done
+ state->write_or_barrier_finished();
+ return io_operation_state_type::write_or_barrier_finished;
+ }
+ return io_operation_state_type::write_or_barrier_completed;
}
- return io_operation_state_type::write_or_barrier_completed;
+ return v;
}
default:
break;
@@ -286,10 +315,15 @@ namespace test
LLFIO_LOG_FUNCTION_CALL(this);
auto *state = static_cast<_iocp_operation_state *>(_op);
// On Windows, one can update the STATUS_PENDING in an IO_STATUS_BLOCK by calling NtWaitForSingleObject() on it
- return _check_io_operation(state, [state] {
+ return _check_io_operation(state, [&](windows_nt_kernel::IO_STATUS_BLOCK &ol) {
LARGE_INTEGER timeout;
timeout.QuadPart = 0; // poll, don't wait
windows_nt_kernel::NtWaitForSingleObject(state->h->native_handle().h, false, &timeout);
+ if(ol.Status == STATUS_PENDING)
+ {
+ // If it still hasn't budged, try poking IOCP
+ (void) check_for_any_completed_io(std::chrono::seconds(0), 1);
+ }
});
}
LLFIO_HEADERS_ONLY_VIRTUAL_SPEC result<io_operation_state_type> cancel_io_operation(io_operation_state *_op, deadline d = {}) noexcept override
@@ -388,7 +422,7 @@ namespace test
// wake_check_for_any_completed_io() poke
continue;
}
- if(is_completed(_check_io_operation(state, [&] {})))
+ if(is_completed(_check_io_operation(state, [&](windows_nt_kernel::IO_STATUS_BLOCK &) {})))
{
switch(state->state)
{
@@ -420,18 +454,18 @@ namespace test
}
};
- LLFIO_HEADERS_ONLY_FUNC_SPEC result<io_multiplexer_ptr> multiplexer_win_iocp(size_t threads) noexcept
+ LLFIO_HEADERS_ONLY_FUNC_SPEC result<io_multiplexer_ptr> multiplexer_win_iocp(size_t threads, bool disable_immediate_completions) noexcept
{
try
{
if(1 == threads)
{
auto ret = std::make_unique<win_iocp_multiplexer<false>>();
- OUTCOME_TRY(ret->init(1));
+ OUTCOME_TRY(ret->init(1, disable_immediate_completions));
return ret;
}
auto ret = std::make_unique<win_iocp_multiplexer<true>>();
- OUTCOME_TRY(ret->init(threads));
+ OUTCOME_TRY(ret->init(threads, disable_immediate_completions));
return ret;
}
catch(...)
diff --git a/include/llfio/v2.0/io_multiplexer.hpp b/include/llfio/v2.0/io_multiplexer.hpp
index 9e5d62e0..a0e12917 100644
--- a/include/llfio/v2.0/io_multiplexer.hpp
+++ b/include/llfio/v2.0/io_multiplexer.hpp
@@ -527,21 +527,22 @@ public:
//! Used to retrieve the current state of the i/o operation
virtual io_operation_state_type current_state() const noexcept = 0;
- //! After an i/o operation has finished, used to retrieve the result. This can only be called once.
+ //! After an i/o operation has finished, can be used to retrieve the result if the visitor did not.
virtual io_result<buffers_type> get_completed_read() &&noexcept = 0;
- //! After an i/o operation has finished, used to retrieve the result. This can only be called once.
+ //! After an i/o operation has finished, can be used to retrieve the result if the visitor did not.
virtual io_result<const_buffers_type> get_completed_write_or_barrier() &&noexcept = 0;
};
+ //! \brief Called by an i/o operation state to inform you of state change. Note that the i/o operation state lock is HELD during these calls!
struct io_operation_state_visitor
{
virtual ~io_operation_state_visitor() {}
virtual void read_initiated(io_operation_state * /*state*/, io_operation_state_type /*former*/) {}
- virtual void read_completed(io_operation_state * /*state*/, io_operation_state_type /*former*/) {}
+ virtual void read_completed(io_operation_state * /*state*/, io_operation_state_type /*former*/, io_result<buffers_type> && /*res*/) {}
virtual void read_finished(io_operation_state * /*state*/, io_operation_state_type /*former*/) {}
virtual void write_initiated(io_operation_state * /*state*/, io_operation_state_type /*former*/) {}
- virtual void write_completed(io_operation_state * /*state*/, io_operation_state_type /*former*/) {}
+ virtual void write_completed(io_operation_state * /*state*/, io_operation_state_type /*former*/, io_result<const_buffers_type> && /*res*/) {}
virtual void barrier_initiated(io_operation_state * /*state*/, io_operation_state_type /*former*/) {}
- virtual void barrier_completed(io_operation_state * /*state*/, io_operation_state_type /*former*/) {}
+ virtual void barrier_completed(io_operation_state * /*state*/, io_operation_state_type /*former*/, io_result<const_buffers_type> && /*res*/) {}
virtual void write_or_barrier_finished(io_operation_state * /*state*/, io_operation_state_type /*former*/) {}
};
@@ -726,21 +727,11 @@ protected:
virtual io_operation_state_type current_state() const noexcept override { return state; }
virtual io_result<buffers_type> get_completed_read() && noexcept override
{
- assert(is_completed(state) || is_finished(state));
- if(!is_completed(state) && !is_finished(state))
- {
- abort();
- }
io_result<buffers_type> ret(std::move(payload.completed_read));
return ret;
}
virtual io_result<const_buffers_type> get_completed_write_or_barrier() && noexcept override
{
- assert(is_completed(state) || is_finished(state));
- if(!is_completed(state) && !is_finished(state))
- {
- abort();
- }
io_result<const_buffers_type> ret(std::move(payload.completed_write_or_barrier));
return ret;
}
@@ -764,7 +755,7 @@ protected:
new(&payload.completed_read) io_result<buffers_type>(std::move(res));
if(this->visitor != nullptr)
{
- this->visitor->read_completed(this, state);
+ this->visitor->read_completed(this, state, std::move(payload.completed_read));
}
state = io_operation_state_type::read_completed;
}
@@ -799,7 +790,7 @@ protected:
new(&payload.completed_write_or_barrier) io_result<const_buffers_type>(std::move(res));
if(this->visitor != nullptr)
{
- this->visitor->write_completed(this, state);
+ this->visitor->write_completed(this, state, std::move(payload.completed_write_or_barrier));
}
state = io_operation_state_type::write_or_barrier_completed;
}
@@ -823,7 +814,7 @@ protected:
new(&payload.completed_write_or_barrier) io_result<const_buffers_type>(std::move(res));
if(this->visitor != nullptr)
{
- this->visitor->barrier_completed(this, state);
+ this->visitor->barrier_completed(this, state, std::move(payload.completed_write_or_barrier));
}
state = io_operation_state_type::write_or_barrier_completed;
}
@@ -869,6 +860,11 @@ protected:
return std::move(*this)._unsynchronised_io_operation_state::get_completed_write_or_barrier();
}
+ virtual void read_initiated() override
+ {
+ _lock_guard g(this->_lock);
+ return _unsynchronised_io_operation_state::read_initiated();
+ }
virtual void read_completed(io_result<buffers_type> &&res) override
{
_lock_guard g(this->_lock);
@@ -879,11 +875,21 @@ protected:
_lock_guard g(this->_lock);
return _unsynchronised_io_operation_state::read_finished();
}
+ virtual void write_initiated() override
+ {
+ _lock_guard g(this->_lock);
+ return _unsynchronised_io_operation_state::write_initiated();
+ }
virtual void write_completed(io_result<const_buffers_type> &&res) override
{
_lock_guard g(this->_lock);
return _unsynchronised_io_operation_state::write_completed(std::move(res));
}
+ virtual void barrier_initiated() override
+ {
+ _lock_guard g(this->_lock);
+ return _unsynchronised_io_operation_state::barrier_initiated();
+ }
virtual void barrier_completed(io_result<const_buffers_type> &&res) override
{
_lock_guard g(this->_lock);
@@ -966,7 +972,7 @@ namespace test
The multiplexer returned by this function is only a partial implementation, used
only by the test suite. In particular it does not fully implement deadlined i/o.
*/
- LLFIO_HEADERS_ONLY_FUNC_SPEC result<io_multiplexer_ptr> multiplexer_win_iocp(size_t threads) noexcept;
+ LLFIO_HEADERS_ONLY_FUNC_SPEC result<io_multiplexer_ptr> multiplexer_win_iocp(size_t threads, bool disable_immediate_completions) noexcept;
#endif
} // namespace test
#endif
diff --git a/test/tests/pipe_handle.cpp b/test/tests/pipe_handle.cpp
index b266af10..0f514859 100644
--- a/test/tests/pipe_handle.cpp
+++ b/test/tests/pipe_handle.cpp
@@ -25,6 +25,7 @@ Distributed under the Boost Software License, Version 1.0.
#include "../test_kernel_decl.hpp"
#include <future>
+#include <unordered_set>
static inline void TestBlockingPipeHandle()
{
@@ -93,5 +94,288 @@ static inline void TestNonBlockingPipeHandle()
reader.close().value();
}
+#if LLFIO_ENABLE_TEST_IO_MULTIPLEXERS
+static inline void TestMultiplexedPipeHandle()
+{
+ static constexpr size_t MAX_PIPES = 64;
+ namespace llfio = LLFIO_V2_NAMESPACE;
+ auto test_multiplexer = [](llfio::io_multiplexer_ptr multiplexer) {
+ std::vector<llfio::pipe_handle> read_pipes, write_pipes;
+ std::vector<size_t> received_for(MAX_PIPES);
+ struct checking_receiver final : public llfio::io_multiplexer::io_operation_state_visitor
+ {
+ size_t myindex;
+ std::unique_ptr<llfio::byte[]> io_state_ptr;
+ std::vector<size_t> &received_for;
+ union {
+ llfio::byte _buffer[sizeof(size_t)];
+ size_t _index;
+ };
+ llfio::pipe_handle::buffer_type buffer;
+ llfio::io_multiplexer::io_operation_state *io_state{nullptr};
+
+ checking_receiver(size_t _myindex, llfio::io_multiplexer_ptr &multiplexer, std::vector<size_t> &r)
+ : myindex(_myindex)
+ , io_state_ptr(std::make_unique<llfio::byte[]>(multiplexer->io_state_requirements().first))
+ , received_for(r)
+ , buffer(_buffer, sizeof(_buffer))
+ {
+ memset(_buffer, 0, sizeof(_buffer));
+ }
+ checking_receiver(const checking_receiver &) = delete;
+ checking_receiver(checking_receiver &&o) = default;
+ checking_receiver &operator=(const checking_receiver &) = delete;
+ checking_receiver &operator=(checking_receiver &&) = default;
+ ~checking_receiver()
+ {
+ if(io_state != nullptr)
+ {
+ if(!is_finished(io_state->current_state()))
+ {
+ abort();
+ }
+ io_state->~io_operation_state();
+ io_state = nullptr;
+ }
+ }
+
+ // Initiated the read
+ llfio::result<void> read_begin(llfio::io_multiplexer_ptr &multiplexer, llfio::io_handle &h)
+ {
+ if(io_state != nullptr)
+ {
+ BOOST_REQUIRE(is_finished(io_state->current_state()));
+ io_state->~io_operation_state();
+ io_state = nullptr;
+ }
+ buffer = {_buffer, sizeof(_buffer)};
+ OUTCOME_TRY(s, multiplexer->init_io_operation({io_state_ptr.get(), 4096 /*lies*/}, &h, this, {}, {}, llfio::pipe_handle::io_request<llfio::pipe_handle::buffers_type>({&buffer, 1}, 0)));
+ io_state = s;
+ return llfio::success();
+ }
+
+ // Called if the read did not complete immediately
+ virtual void read_initiated(llfio::io_multiplexer::io_operation_state * /*state*/, llfio::io_operation_state_type /*former*/) override { std::cout << " Pipe " << myindex << " will complete read later" << std::endl; }
+
+ // Called when the read completes
+ virtual void read_completed(llfio::io_multiplexer::io_operation_state * /*state*/, llfio::io_operation_state_type former, llfio::pipe_handle::io_result<llfio::pipe_handle::buffers_type> &&res) override
+ {
+ if(is_initialised(former))
+ {
+ std::cout << " Pipe " << myindex << " read completes immediately" << std::endl;
+ }
+ else
+ {
+ std::cout << " Pipe " << myindex << " read completes asynchronously" << std::endl;
+ }
+ BOOST_CHECK(res.has_value());
+ if(res)
+ {
+ BOOST_REQUIRE(res.value().size() == 1);
+ BOOST_CHECK(res.value()[0].data() == _buffer);
+ BOOST_CHECK(res.value()[0].size() == sizeof(size_t));
+ BOOST_REQUIRE(_index < MAX_PIPES);
+ BOOST_CHECK(_index == myindex);
+ received_for[_index]++;
+ }
+ }
+
+ // Called when the state for the read can be disposed
+ virtual void read_finished(llfio::io_multiplexer::io_operation_state * /*state*/, llfio::io_operation_state_type former) override
+ {
+ std::cout << " Pipe " << myindex << " read finishes" << std::endl;
+ BOOST_REQUIRE(former == llfio::io_operation_state_type::read_completed);
+ }
+ };
+ std::vector<checking_receiver> async_reads;
+ for(size_t n = 0; n < MAX_PIPES; n++)
+ {
+ auto ret = llfio::pipe_handle::anonymous_pipe(llfio::pipe_handle::caching::reads, llfio::pipe_handle::flag::multiplexable).value();
+ ret.first.set_multiplexer(multiplexer.get()).value();
+ async_reads.push_back(checking_receiver(n, multiplexer, received_for));
+ read_pipes.push_back(std::move(ret.first));
+ write_pipes.push_back(std::move(ret.second));
+ }
+ auto writerthread = std::async([&] {
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ for(size_t n = MAX_PIPES - 1; n < MAX_PIPES; n--)
+ {
+ auto r = write_pipes[n].write(0, {{(llfio::byte *) &n, sizeof(n)}});
+ if(!r)
+ {
+ abort();
+ }
+ }
+ });
+ // Start the pipe reads. They cannot move in memory until complete
+ for(size_t n = 0; n < MAX_PIPES; n++)
+ {
+ async_reads[n].read_begin(multiplexer, read_pipes[n]).value();
+ }
+ // Wait for all reads to complete
+ for(size_t n = 0; n < MAX_PIPES; n++)
+ {
+ // Spin until this i/o completes
+ for(;;)
+ {
+ auto state = multiplexer->check_io_operation(async_reads[n].io_state);
+ if(is_completed(state) || is_finished(state))
+ {
+ break;
+ }
+ }
+ }
+ for(size_t n = 0; n < MAX_PIPES; n++)
+ {
+ BOOST_CHECK(received_for[n] == 1);
+ }
+ // Wait for all reads to finish
+ for(size_t n = 0; n < MAX_PIPES; n++)
+ {
+ llfio::io_operation_state_type state ;
+ while(!is_finished(state = async_reads[n].io_state->current_state()))
+ {
+ multiplexer->check_for_any_completed_io().value();
+ }
+ }
+ writerthread.get();
+ };
+#ifdef _WIN32
+ std::cout << "\nSingle threaded IOCP, immediate completions:\n";
+ test_multiplexer(llfio::test::multiplexer_win_iocp(1, false).value());
+ std::cout << "\nSingle threaded IOCP, reactor completions:\n";
+ test_multiplexer(llfio::test::multiplexer_win_iocp(1, true).value());
+ std::cout << "\nMultithreaded IOCP, immediate completions:\n";
+ test_multiplexer(llfio::test::multiplexer_win_iocp(2, false).value());
+ std::cout << "\nMultithreaded IOCP, reactor completions:\n";
+ test_multiplexer(llfio::test::multiplexer_win_iocp(2, true).value());
+#else
+#error Not implemented yet
+#endif
+}
+
+#if LLFIO_ENABLE_COROUTINES && 0
+static inline void TestCoroutinedPipeHandle()
+{
+ static constexpr size_t MAX_PIPES = 70;
+ namespace llfio = LLFIO_V2_NAMESPACE;
+ struct io_visitor
+ {
+ using container_type = std::unordered_set<llfio::io_awaitable<llfio::async_read, io_visitor> *>;
+ container_type &c;
+ void await_suspend(container_type::value_type i) { c.insert(i); }
+ void await_resume(container_type::value_type i) { c.erase(i); }
+ };
+ io_visitor::container_type io_pending;
+ struct coroutine
+ {
+ llfio::pipe_handle read_pipe, write_pipe;
+ size_t received_for{0};
+
+ explicit coroutine(llfio::pipe_handle &&r, llfio::pipe_handle &&w)
+ : read_pipe(std::move(r))
+ , write_pipe(std::move(w))
+ {
+ }
+ llfio::eager<llfio::result<void>> operator()(io_visitor::container_type &io_pending)
+ {
+ union {
+ llfio::byte _buffer[sizeof(size_t)];
+ size_t _index;
+ };
+ llfio::pipe_handle::buffer_type buffer;
+ for(;;)
+ {
+ buffer = {_buffer, sizeof(_buffer)};
+ // This will never return if the coroutine gets cancelled
+ auto r = co_await read_pipe.co_read(io_visitor{io_pending}, {{buffer}, 0});
+ if(!r)
+ {
+ co_return r.error();
+ }
+ BOOST_CHECK(r.value().size() == 1);
+ BOOST_CHECK(r.value()[0].size() == sizeof(_buffer));
+ ++received_for;
+ }
+ }
+ };
+ std::vector<coroutine> coroutines;
+ auto multiplexer = llfio::this_thread::multiplexer();
+ for(size_t n = 0; n < MAX_PIPES; n++)
+ {
+ auto ret = llfio::pipe_handle::anonymous_pipe(llfio::pipe_handle::caching::reads, llfio::pipe_handle::flag::multiplexable).value();
+ ret.first.set_multiplexer(multiplexer).value();
+ coroutines.push_back(coroutine(std::move(ret.first), std::move(ret.second)));
+ }
+ // Start the coroutines, all of whom will begin a read and then suspend
+ std::vector<llfio::optional<llfio::eager<llfio::result<void>>>> states(MAX_PIPES);
+ for(size_t n = 0; n < MAX_PIPES; n++)
+ {
+ states[n].emplace(coroutines[n](io_pending));
+ }
+ // Write to all the pipes, then pump coroutine resumption until all completions done
+ for(size_t i = 0; i < 10; i++)
+ {
+ for(size_t n = MAX_PIPES - 1; n < MAX_PIPES; n--)
+ {
+ coroutines[n].write_pipe.write(0, {{(llfio::byte *) &i, sizeof(i)}}).value();
+ }
+ // Take a copy of all pending i/o
+ std::vector<io_visitor::container_type::value_type> copy(io_pending.begin(), io_pending.end());
+ for(;;)
+ {
+ // Manually check if an i/o completion is ready, avoiding any syscalls
+ bool need_to_poll = true;
+ for(auto it = copy.begin(); it != copy.end();)
+ {
+ if((*it)->await_ready())
+ {
+ need_to_poll = false;
+ it = copy.erase(it);
+ std::cout << "Completed an i/o without syscall" << std::endl;
+ }
+ else
+ ++it;
+ }
+ if(need_to_poll)
+ {
+ // Have the kernel tell me when an i/o completion is ready
+ auto r = multiplexer->complete_io();
+ BOOST_CHECK(r.value() != 0);
+ if(r.value() < 0)
+ {
+ for(size_t n = 0; n < MAX_PIPES; n++)
+ {
+ BOOST_CHECK(coroutines[n].received_for == i + 1);
+ }
+ break;
+ }
+ }
+ }
+ }
+ // Rethrow any failures
+ for(size_t n = 0; n < MAX_PIPES; n++)
+ {
+ if(states[n]->await_ready())
+ {
+ states[n]->await_resume().value();
+ }
+ }
+ // Destruction of coroutines when they are suspended must work.
+ // This will cancel any pending i/o and immediately exit the
+ // coroutines
+ states.clear();
+ // Now clear all the coroutines
+ coroutines.clear();
+}
+#endif
+#endif
+
KERNELTEST_TEST_KERNEL(integration, llfio, pipe_handle, blocking, "Tests that blocking llfio::pipe_handle works as expected", TestBlockingPipeHandle())
KERNELTEST_TEST_KERNEL(integration, llfio, pipe_handle, nonblocking, "Tests that nonblocking llfio::pipe_handle works as expected", TestNonBlockingPipeHandle())
+#if LLFIO_ENABLE_TEST_IO_MULTIPLEXERS
+KERNELTEST_TEST_KERNEL(integration, llfio, pipe_handle, multiplexed, "Tests that multiplexed llfio::pipe_handle works as expected", TestMultiplexedPipeHandle())
+#if LLFIO_ENABLE_COROUTINES && 0
+KERNELTEST_TEST_KERNEL(integration, llfio, pipe_handle, coroutined, "Tests that coroutined llfio::pipe_handle works as expected", TestCoroutinedPipeHandle())
+#endif
+#endif