diff options
author | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> | 2020-04-22 13:39:15 +0300 |
---|---|---|
committer | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> | 2020-04-22 13:39:15 +0300 |
commit | d3afbce5fe3f63b47dba9b14057d0cbd018e513e (patch) | |
tree | 8a25a155e413113dd6a6d09f4a30f622921e6d43 | |
parent | 181e7310b40b574ba39ab411b93f87ecac689ed9 (diff) |
Port async pipe handle test over from resumable i/o branch
-rw-r--r-- | CMakeLists.txt | 4 | ||||
-rw-r--r-- | include/llfio/revision.hpp | 6 | ||||
-rw-r--r-- | include/llfio/v2.0/detail/impl/windows/test/iocp_multiplexer.ipp | 92 | ||||
-rw-r--r-- | include/llfio/v2.0/io_multiplexer.hpp | 44 | ||||
-rw-r--r-- | test/tests/pipe_handle.cpp | 284 |
5 files changed, 377 insertions, 53 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 4cbe6303..67ff41ba 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -291,7 +291,7 @@ int main() { endif() # Set any macros this library requires -all_compile_definitions(PRIVATE LLFIO_INCLUDE_STORAGE_PROFILE=1) +all_compile_definitions(PRIVATE LLFIO_INCLUDE_STORAGE_PROFILE=1 LLFIO_ENABLE_TEST_IO_MULTIPLEXERS=1) foreach(target ${llfio_EXAMPLE_TARGETS}) target_compile_definitions(${target} PRIVATE LLFIO_INCLUDE_STORAGE_PROFILE=1) endforeach() @@ -336,7 +336,7 @@ if(NOT PROJECT_IS_DEPENDENCY) include(QuickCppLibMakeStandardTests) # For each test target, set definitions and linkage foreach(target ${llfio_COMPILE_TEST_TARGETS} ${llfio_TEST_TARGETS}) - target_compile_definitions(${target} PRIVATE LLFIO_INCLUDE_STORAGE_PROFILE=1) + target_compile_definitions(${target} PRIVATE LLFIO_INCLUDE_STORAGE_PROFILE=1 $<$<PLATFORM_ID:Windows>:LLFIO_ENABLE_TEST_IO_MULTIPLEXERS=1>) endforeach() find_quickcpplib_library(kerneltest GIT_REPOSITORY "https://github.com/ned14/kerneltest.git" diff --git a/include/llfio/revision.hpp b/include/llfio/revision.hpp index 6453d0d2..41911801 100644 --- a/include/llfio/revision.hpp +++ b/include/llfio/revision.hpp @@ -1,4 +1,4 @@ // Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time -#define LLFIO_PREVIOUS_COMMIT_REF 4441633ac61b3a43e69888e3217ecbaea9961d9b -#define LLFIO_PREVIOUS_COMMIT_DATE "2020-04-21 09:59:37 +00:00" -#define LLFIO_PREVIOUS_COMMIT_UNIQUE 4441633a +#define LLFIO_PREVIOUS_COMMIT_REF 181e7310b40b574ba39ab411b93f87ecac689ed9 +#define LLFIO_PREVIOUS_COMMIT_DATE "2020-04-22 08:27:07 +00:00" +#define LLFIO_PREVIOUS_COMMIT_UNIQUE 181e7310 diff --git a/include/llfio/v2.0/detail/impl/windows/test/iocp_multiplexer.ipp b/include/llfio/v2.0/detail/impl/windows/test/iocp_multiplexer.ipp index 83968430..488cde03 100644 --- a/include/llfio/v2.0/detail/impl/windows/test/iocp_multiplexer.ipp +++ b/include/llfio/v2.0/detail/impl/windows/test/iocp_multiplexer.ipp @@ -60,6 +60,8 @@ namespace test // static_assert(sizeof(_iocp_operation_state) <= _iocp_operation_state_alignment, "_iocp_operation_state alignment is insufficiently large!"); using _state_lock_guard = typename _iocp_operation_state::_lock_guard; + bool _disable_immediate_completions{false}; + public: constexpr win_iocp_multiplexer() {} win_iocp_multiplexer(const win_iocp_multiplexer &) = delete; @@ -73,13 +75,14 @@ namespace test (void) win_iocp_multiplexer::close(); } } - result<void> init(size_t threads) + result<void> init(size_t threads, bool disable_immediate_completions) { this->_v.h = CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, (DWORD) threads); if(nullptr == this->_v.h) { return win32_error(); } + _disable_immediate_completions = disable_immediate_completions; return success(); } @@ -106,6 +109,10 @@ namespace test { return ntkernel_error(ntstat); } + if(_disable_immediate_completions) + { + return success(); + } // If this works, we can avoid IOCP entirely for immediately completing i/o // It'll set native_handle_type::disposition::_multiplexer_state_bit0 if // we successfully executed this @@ -210,13 +217,25 @@ namespace test // LLFIO_HEADERS_ONLY_VIRTUAL_SPEC result<void> flush_inited_io_operations() noexcept { return success(); } template <class U> io_operation_state_type _check_io_operation(_iocp_operation_state *state, U &&f) noexcept { - auto fill_io_result = [&](auto &ret, auto ¶ms) { + auto fill_io_result = [&](auto &ret, auto ¶ms) -> bool { + for(size_t n = 0; n < params.reqs.buffers.size(); n++) + { + if(state->_ols[n].Status == STATUS_PENDING) + { + f(state->_ols[n]); + break; + } + } + if(is_completed(state->state) || is_finished(state->state)) + { + return false; + } for(size_t n = 0; n < params.reqs.buffers.size(); n++) { assert(state->_ols[n].Status != -1); if(state->_ols[n].Status == STATUS_PENDING) { - f(); + return false; } if(state->_ols[n].Status < 0) { @@ -229,6 +248,7 @@ namespace test ret = {params.reqs.buffers.data(), n + 1}; } } + return true; }; auto v = state->current_state(); switch(v) @@ -237,43 +257,52 @@ namespace test case io_operation_state_type::read_initiated: { io_result<buffers_type> ret = {state->payload.noncompleted.params.read.reqs.buffers.data(), 0}; - fill_io_result(ret, state->payload.noncompleted.params.read); - state->read_completed(std::move(ret)); - if(state->h->native_handle().behaviour & native_handle_type::disposition::_multiplexer_state_bit0) + if(fill_io_result(ret, state->payload.noncompleted.params.read)) { - // SetFileCompletionNotificationModes() above was successful, so we are done - state->read_finished(); - return io_operation_state_type::read_finished; + state->read_completed(std::move(ret)); + if(state->h->native_handle().behaviour & native_handle_type::disposition::_multiplexer_state_bit0) + { + // SetFileCompletionNotificationModes() above was successful, so we are done + state->read_finished(); + return io_operation_state_type::read_finished; + } + return io_operation_state_type::read_completed; } - return io_operation_state_type::read_completed; + return v; } case io_operation_state_type::write_initialised: case io_operation_state_type::write_initiated: { io_result<const_buffers_type> ret = {state->payload.noncompleted.params.write.reqs.buffers.data(), 0}; - fill_io_result(ret, state->payload.noncompleted.params.write); - state->write_completed(std::move(ret)); - if(state->h->native_handle().behaviour & native_handle_type::disposition::_multiplexer_state_bit0) + if(fill_io_result(ret, state->payload.noncompleted.params.write)) { - // SetFileCompletionNotificationModes() above was successful, so we are done - state->write_or_barrier_finished(); - return io_operation_state_type::write_or_barrier_finished; + state->write_completed(std::move(ret)); + if(state->h->native_handle().behaviour & native_handle_type::disposition::_multiplexer_state_bit0) + { + // SetFileCompletionNotificationModes() above was successful, so we are done + state->write_or_barrier_finished(); + return io_operation_state_type::write_or_barrier_finished; + } + return io_operation_state_type::write_or_barrier_completed; } - return io_operation_state_type::write_or_barrier_completed; + return v; } case io_operation_state_type::barrier_initialised: case io_operation_state_type::barrier_initiated: { io_result<const_buffers_type> ret = {state->payload.noncompleted.params.barrier.reqs.buffers.data(), 0}; - fill_io_result(ret, state->payload.noncompleted.params.barrier); - state->barrier_completed(std::move(ret)); - if(state->h->native_handle().behaviour & native_handle_type::disposition::_multiplexer_state_bit0) + if(fill_io_result(ret, state->payload.noncompleted.params.barrier)) { - // SetFileCompletionNotificationModes() above was successful, so we are done - state->write_or_barrier_finished(); - return io_operation_state_type::write_or_barrier_finished; + state->barrier_completed(std::move(ret)); + if(state->h->native_handle().behaviour & native_handle_type::disposition::_multiplexer_state_bit0) + { + // SetFileCompletionNotificationModes() above was successful, so we are done + state->write_or_barrier_finished(); + return io_operation_state_type::write_or_barrier_finished; + } + return io_operation_state_type::write_or_barrier_completed; } - return io_operation_state_type::write_or_barrier_completed; + return v; } default: break; @@ -286,10 +315,15 @@ namespace test LLFIO_LOG_FUNCTION_CALL(this); auto *state = static_cast<_iocp_operation_state *>(_op); // On Windows, one can update the STATUS_PENDING in an IO_STATUS_BLOCK by calling NtWaitForSingleObject() on it - return _check_io_operation(state, [state] { + return _check_io_operation(state, [&](windows_nt_kernel::IO_STATUS_BLOCK &ol) { LARGE_INTEGER timeout; timeout.QuadPart = 0; // poll, don't wait windows_nt_kernel::NtWaitForSingleObject(state->h->native_handle().h, false, &timeout); + if(ol.Status == STATUS_PENDING) + { + // If it still hasn't budged, try poking IOCP + (void) check_for_any_completed_io(std::chrono::seconds(0), 1); + } }); } LLFIO_HEADERS_ONLY_VIRTUAL_SPEC result<io_operation_state_type> cancel_io_operation(io_operation_state *_op, deadline d = {}) noexcept override @@ -388,7 +422,7 @@ namespace test // wake_check_for_any_completed_io() poke continue; } - if(is_completed(_check_io_operation(state, [&] {}))) + if(is_completed(_check_io_operation(state, [&](windows_nt_kernel::IO_STATUS_BLOCK &) {}))) { switch(state->state) { @@ -420,18 +454,18 @@ namespace test } }; - LLFIO_HEADERS_ONLY_FUNC_SPEC result<io_multiplexer_ptr> multiplexer_win_iocp(size_t threads) noexcept + LLFIO_HEADERS_ONLY_FUNC_SPEC result<io_multiplexer_ptr> multiplexer_win_iocp(size_t threads, bool disable_immediate_completions) noexcept { try { if(1 == threads) { auto ret = std::make_unique<win_iocp_multiplexer<false>>(); - OUTCOME_TRY(ret->init(1)); + OUTCOME_TRY(ret->init(1, disable_immediate_completions)); return ret; } auto ret = std::make_unique<win_iocp_multiplexer<true>>(); - OUTCOME_TRY(ret->init(threads)); + OUTCOME_TRY(ret->init(threads, disable_immediate_completions)); return ret; } catch(...) diff --git a/include/llfio/v2.0/io_multiplexer.hpp b/include/llfio/v2.0/io_multiplexer.hpp index 9e5d62e0..a0e12917 100644 --- a/include/llfio/v2.0/io_multiplexer.hpp +++ b/include/llfio/v2.0/io_multiplexer.hpp @@ -527,21 +527,22 @@ public: //! Used to retrieve the current state of the i/o operation virtual io_operation_state_type current_state() const noexcept = 0; - //! After an i/o operation has finished, used to retrieve the result. This can only be called once. + //! After an i/o operation has finished, can be used to retrieve the result if the visitor did not. virtual io_result<buffers_type> get_completed_read() &&noexcept = 0; - //! After an i/o operation has finished, used to retrieve the result. This can only be called once. + //! After an i/o operation has finished, can be used to retrieve the result if the visitor did not. virtual io_result<const_buffers_type> get_completed_write_or_barrier() &&noexcept = 0; }; + //! \brief Called by an i/o operation state to inform you of state change. Note that the i/o operation state lock is HELD during these calls! struct io_operation_state_visitor { virtual ~io_operation_state_visitor() {} virtual void read_initiated(io_operation_state * /*state*/, io_operation_state_type /*former*/) {} - virtual void read_completed(io_operation_state * /*state*/, io_operation_state_type /*former*/) {} + virtual void read_completed(io_operation_state * /*state*/, io_operation_state_type /*former*/, io_result<buffers_type> && /*res*/) {} virtual void read_finished(io_operation_state * /*state*/, io_operation_state_type /*former*/) {} virtual void write_initiated(io_operation_state * /*state*/, io_operation_state_type /*former*/) {} - virtual void write_completed(io_operation_state * /*state*/, io_operation_state_type /*former*/) {} + virtual void write_completed(io_operation_state * /*state*/, io_operation_state_type /*former*/, io_result<const_buffers_type> && /*res*/) {} virtual void barrier_initiated(io_operation_state * /*state*/, io_operation_state_type /*former*/) {} - virtual void barrier_completed(io_operation_state * /*state*/, io_operation_state_type /*former*/) {} + virtual void barrier_completed(io_operation_state * /*state*/, io_operation_state_type /*former*/, io_result<const_buffers_type> && /*res*/) {} virtual void write_or_barrier_finished(io_operation_state * /*state*/, io_operation_state_type /*former*/) {} }; @@ -726,21 +727,11 @@ protected: virtual io_operation_state_type current_state() const noexcept override { return state; } virtual io_result<buffers_type> get_completed_read() && noexcept override { - assert(is_completed(state) || is_finished(state)); - if(!is_completed(state) && !is_finished(state)) - { - abort(); - } io_result<buffers_type> ret(std::move(payload.completed_read)); return ret; } virtual io_result<const_buffers_type> get_completed_write_or_barrier() && noexcept override { - assert(is_completed(state) || is_finished(state)); - if(!is_completed(state) && !is_finished(state)) - { - abort(); - } io_result<const_buffers_type> ret(std::move(payload.completed_write_or_barrier)); return ret; } @@ -764,7 +755,7 @@ protected: new(&payload.completed_read) io_result<buffers_type>(std::move(res)); if(this->visitor != nullptr) { - this->visitor->read_completed(this, state); + this->visitor->read_completed(this, state, std::move(payload.completed_read)); } state = io_operation_state_type::read_completed; } @@ -799,7 +790,7 @@ protected: new(&payload.completed_write_or_barrier) io_result<const_buffers_type>(std::move(res)); if(this->visitor != nullptr) { - this->visitor->write_completed(this, state); + this->visitor->write_completed(this, state, std::move(payload.completed_write_or_barrier)); } state = io_operation_state_type::write_or_barrier_completed; } @@ -823,7 +814,7 @@ protected: new(&payload.completed_write_or_barrier) io_result<const_buffers_type>(std::move(res)); if(this->visitor != nullptr) { - this->visitor->barrier_completed(this, state); + this->visitor->barrier_completed(this, state, std::move(payload.completed_write_or_barrier)); } state = io_operation_state_type::write_or_barrier_completed; } @@ -869,6 +860,11 @@ protected: return std::move(*this)._unsynchronised_io_operation_state::get_completed_write_or_barrier(); } + virtual void read_initiated() override + { + _lock_guard g(this->_lock); + return _unsynchronised_io_operation_state::read_initiated(); + } virtual void read_completed(io_result<buffers_type> &&res) override { _lock_guard g(this->_lock); @@ -879,11 +875,21 @@ protected: _lock_guard g(this->_lock); return _unsynchronised_io_operation_state::read_finished(); } + virtual void write_initiated() override + { + _lock_guard g(this->_lock); + return _unsynchronised_io_operation_state::write_initiated(); + } virtual void write_completed(io_result<const_buffers_type> &&res) override { _lock_guard g(this->_lock); return _unsynchronised_io_operation_state::write_completed(std::move(res)); } + virtual void barrier_initiated() override + { + _lock_guard g(this->_lock); + return _unsynchronised_io_operation_state::barrier_initiated(); + } virtual void barrier_completed(io_result<const_buffers_type> &&res) override { _lock_guard g(this->_lock); @@ -966,7 +972,7 @@ namespace test The multiplexer returned by this function is only a partial implementation, used only by the test suite. In particular it does not fully implement deadlined i/o. */ - LLFIO_HEADERS_ONLY_FUNC_SPEC result<io_multiplexer_ptr> multiplexer_win_iocp(size_t threads) noexcept; + LLFIO_HEADERS_ONLY_FUNC_SPEC result<io_multiplexer_ptr> multiplexer_win_iocp(size_t threads, bool disable_immediate_completions) noexcept; #endif } // namespace test #endif diff --git a/test/tests/pipe_handle.cpp b/test/tests/pipe_handle.cpp index b266af10..0f514859 100644 --- a/test/tests/pipe_handle.cpp +++ b/test/tests/pipe_handle.cpp @@ -25,6 +25,7 @@ Distributed under the Boost Software License, Version 1.0. #include "../test_kernel_decl.hpp" #include <future> +#include <unordered_set> static inline void TestBlockingPipeHandle() { @@ -93,5 +94,288 @@ static inline void TestNonBlockingPipeHandle() reader.close().value(); } +#if LLFIO_ENABLE_TEST_IO_MULTIPLEXERS +static inline void TestMultiplexedPipeHandle() +{ + static constexpr size_t MAX_PIPES = 64; + namespace llfio = LLFIO_V2_NAMESPACE; + auto test_multiplexer = [](llfio::io_multiplexer_ptr multiplexer) { + std::vector<llfio::pipe_handle> read_pipes, write_pipes; + std::vector<size_t> received_for(MAX_PIPES); + struct checking_receiver final : public llfio::io_multiplexer::io_operation_state_visitor + { + size_t myindex; + std::unique_ptr<llfio::byte[]> io_state_ptr; + std::vector<size_t> &received_for; + union { + llfio::byte _buffer[sizeof(size_t)]; + size_t _index; + }; + llfio::pipe_handle::buffer_type buffer; + llfio::io_multiplexer::io_operation_state *io_state{nullptr}; + + checking_receiver(size_t _myindex, llfio::io_multiplexer_ptr &multiplexer, std::vector<size_t> &r) + : myindex(_myindex) + , io_state_ptr(std::make_unique<llfio::byte[]>(multiplexer->io_state_requirements().first)) + , received_for(r) + , buffer(_buffer, sizeof(_buffer)) + { + memset(_buffer, 0, sizeof(_buffer)); + } + checking_receiver(const checking_receiver &) = delete; + checking_receiver(checking_receiver &&o) = default; + checking_receiver &operator=(const checking_receiver &) = delete; + checking_receiver &operator=(checking_receiver &&) = default; + ~checking_receiver() + { + if(io_state != nullptr) + { + if(!is_finished(io_state->current_state())) + { + abort(); + } + io_state->~io_operation_state(); + io_state = nullptr; + } + } + + // Initiated the read + llfio::result<void> read_begin(llfio::io_multiplexer_ptr &multiplexer, llfio::io_handle &h) + { + if(io_state != nullptr) + { + BOOST_REQUIRE(is_finished(io_state->current_state())); + io_state->~io_operation_state(); + io_state = nullptr; + } + buffer = {_buffer, sizeof(_buffer)}; + OUTCOME_TRY(s, multiplexer->init_io_operation({io_state_ptr.get(), 4096 /*lies*/}, &h, this, {}, {}, llfio::pipe_handle::io_request<llfio::pipe_handle::buffers_type>({&buffer, 1}, 0))); + io_state = s; + return llfio::success(); + } + + // Called if the read did not complete immediately + virtual void read_initiated(llfio::io_multiplexer::io_operation_state * /*state*/, llfio::io_operation_state_type /*former*/) override { std::cout << " Pipe " << myindex << " will complete read later" << std::endl; } + + // Called when the read completes + virtual void read_completed(llfio::io_multiplexer::io_operation_state * /*state*/, llfio::io_operation_state_type former, llfio::pipe_handle::io_result<llfio::pipe_handle::buffers_type> &&res) override + { + if(is_initialised(former)) + { + std::cout << " Pipe " << myindex << " read completes immediately" << std::endl; + } + else + { + std::cout << " Pipe " << myindex << " read completes asynchronously" << std::endl; + } + BOOST_CHECK(res.has_value()); + if(res) + { + BOOST_REQUIRE(res.value().size() == 1); + BOOST_CHECK(res.value()[0].data() == _buffer); + BOOST_CHECK(res.value()[0].size() == sizeof(size_t)); + BOOST_REQUIRE(_index < MAX_PIPES); + BOOST_CHECK(_index == myindex); + received_for[_index]++; + } + } + + // Called when the state for the read can be disposed + virtual void read_finished(llfio::io_multiplexer::io_operation_state * /*state*/, llfio::io_operation_state_type former) override + { + std::cout << " Pipe " << myindex << " read finishes" << std::endl; + BOOST_REQUIRE(former == llfio::io_operation_state_type::read_completed); + } + }; + std::vector<checking_receiver> async_reads; + for(size_t n = 0; n < MAX_PIPES; n++) + { + auto ret = llfio::pipe_handle::anonymous_pipe(llfio::pipe_handle::caching::reads, llfio::pipe_handle::flag::multiplexable).value(); + ret.first.set_multiplexer(multiplexer.get()).value(); + async_reads.push_back(checking_receiver(n, multiplexer, received_for)); + read_pipes.push_back(std::move(ret.first)); + write_pipes.push_back(std::move(ret.second)); + } + auto writerthread = std::async([&] { + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + for(size_t n = MAX_PIPES - 1; n < MAX_PIPES; n--) + { + auto r = write_pipes[n].write(0, {{(llfio::byte *) &n, sizeof(n)}}); + if(!r) + { + abort(); + } + } + }); + // Start the pipe reads. They cannot move in memory until complete + for(size_t n = 0; n < MAX_PIPES; n++) + { + async_reads[n].read_begin(multiplexer, read_pipes[n]).value(); + } + // Wait for all reads to complete + for(size_t n = 0; n < MAX_PIPES; n++) + { + // Spin until this i/o completes + for(;;) + { + auto state = multiplexer->check_io_operation(async_reads[n].io_state); + if(is_completed(state) || is_finished(state)) + { + break; + } + } + } + for(size_t n = 0; n < MAX_PIPES; n++) + { + BOOST_CHECK(received_for[n] == 1); + } + // Wait for all reads to finish + for(size_t n = 0; n < MAX_PIPES; n++) + { + llfio::io_operation_state_type state ; + while(!is_finished(state = async_reads[n].io_state->current_state())) + { + multiplexer->check_for_any_completed_io().value(); + } + } + writerthread.get(); + }; +#ifdef _WIN32 + std::cout << "\nSingle threaded IOCP, immediate completions:\n"; + test_multiplexer(llfio::test::multiplexer_win_iocp(1, false).value()); + std::cout << "\nSingle threaded IOCP, reactor completions:\n"; + test_multiplexer(llfio::test::multiplexer_win_iocp(1, true).value()); + std::cout << "\nMultithreaded IOCP, immediate completions:\n"; + test_multiplexer(llfio::test::multiplexer_win_iocp(2, false).value()); + std::cout << "\nMultithreaded IOCP, reactor completions:\n"; + test_multiplexer(llfio::test::multiplexer_win_iocp(2, true).value()); +#else +#error Not implemented yet +#endif +} + +#if LLFIO_ENABLE_COROUTINES && 0 +static inline void TestCoroutinedPipeHandle() +{ + static constexpr size_t MAX_PIPES = 70; + namespace llfio = LLFIO_V2_NAMESPACE; + struct io_visitor + { + using container_type = std::unordered_set<llfio::io_awaitable<llfio::async_read, io_visitor> *>; + container_type &c; + void await_suspend(container_type::value_type i) { c.insert(i); } + void await_resume(container_type::value_type i) { c.erase(i); } + }; + io_visitor::container_type io_pending; + struct coroutine + { + llfio::pipe_handle read_pipe, write_pipe; + size_t received_for{0}; + + explicit coroutine(llfio::pipe_handle &&r, llfio::pipe_handle &&w) + : read_pipe(std::move(r)) + , write_pipe(std::move(w)) + { + } + llfio::eager<llfio::result<void>> operator()(io_visitor::container_type &io_pending) + { + union { + llfio::byte _buffer[sizeof(size_t)]; + size_t _index; + }; + llfio::pipe_handle::buffer_type buffer; + for(;;) + { + buffer = {_buffer, sizeof(_buffer)}; + // This will never return if the coroutine gets cancelled + auto r = co_await read_pipe.co_read(io_visitor{io_pending}, {{buffer}, 0}); + if(!r) + { + co_return r.error(); + } + BOOST_CHECK(r.value().size() == 1); + BOOST_CHECK(r.value()[0].size() == sizeof(_buffer)); + ++received_for; + } + } + }; + std::vector<coroutine> coroutines; + auto multiplexer = llfio::this_thread::multiplexer(); + for(size_t n = 0; n < MAX_PIPES; n++) + { + auto ret = llfio::pipe_handle::anonymous_pipe(llfio::pipe_handle::caching::reads, llfio::pipe_handle::flag::multiplexable).value(); + ret.first.set_multiplexer(multiplexer).value(); + coroutines.push_back(coroutine(std::move(ret.first), std::move(ret.second))); + } + // Start the coroutines, all of whom will begin a read and then suspend + std::vector<llfio::optional<llfio::eager<llfio::result<void>>>> states(MAX_PIPES); + for(size_t n = 0; n < MAX_PIPES; n++) + { + states[n].emplace(coroutines[n](io_pending)); + } + // Write to all the pipes, then pump coroutine resumption until all completions done + for(size_t i = 0; i < 10; i++) + { + for(size_t n = MAX_PIPES - 1; n < MAX_PIPES; n--) + { + coroutines[n].write_pipe.write(0, {{(llfio::byte *) &i, sizeof(i)}}).value(); + } + // Take a copy of all pending i/o + std::vector<io_visitor::container_type::value_type> copy(io_pending.begin(), io_pending.end()); + for(;;) + { + // Manually check if an i/o completion is ready, avoiding any syscalls + bool need_to_poll = true; + for(auto it = copy.begin(); it != copy.end();) + { + if((*it)->await_ready()) + { + need_to_poll = false; + it = copy.erase(it); + std::cout << "Completed an i/o without syscall" << std::endl; + } + else + ++it; + } + if(need_to_poll) + { + // Have the kernel tell me when an i/o completion is ready + auto r = multiplexer->complete_io(); + BOOST_CHECK(r.value() != 0); + if(r.value() < 0) + { + for(size_t n = 0; n < MAX_PIPES; n++) + { + BOOST_CHECK(coroutines[n].received_for == i + 1); + } + break; + } + } + } + } + // Rethrow any failures + for(size_t n = 0; n < MAX_PIPES; n++) + { + if(states[n]->await_ready()) + { + states[n]->await_resume().value(); + } + } + // Destruction of coroutines when they are suspended must work. + // This will cancel any pending i/o and immediately exit the + // coroutines + states.clear(); + // Now clear all the coroutines + coroutines.clear(); +} +#endif +#endif + KERNELTEST_TEST_KERNEL(integration, llfio, pipe_handle, blocking, "Tests that blocking llfio::pipe_handle works as expected", TestBlockingPipeHandle()) KERNELTEST_TEST_KERNEL(integration, llfio, pipe_handle, nonblocking, "Tests that nonblocking llfio::pipe_handle works as expected", TestNonBlockingPipeHandle()) +#if LLFIO_ENABLE_TEST_IO_MULTIPLEXERS +KERNELTEST_TEST_KERNEL(integration, llfio, pipe_handle, multiplexed, "Tests that multiplexed llfio::pipe_handle works as expected", TestMultiplexedPipeHandle()) +#if LLFIO_ENABLE_COROUTINES && 0 +KERNELTEST_TEST_KERNEL(integration, llfio, pipe_handle, coroutined, "Tests that coroutined llfio::pipe_handle works as expected", TestCoroutinedPipeHandle()) +#endif +#endif |