From 65693556c49ab446c25f5a9810f1a9d242790db6 Mon Sep 17 00:00:00 2001 From: "Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com)" Date: Tue, 28 Apr 2020 11:00:09 +0100 Subject: Coroutines pipe test has been ported from resumable_io branch. --- test/tests/pipe_handle.cpp | 184 ++++++++++++++++++++++----------------------- 1 file changed, 90 insertions(+), 94 deletions(-) (limited to 'test') diff --git a/test/tests/pipe_handle.cpp b/test/tests/pipe_handle.cpp index 69727ff2..a56c1025 100644 --- a/test/tests/pipe_handle.cpp +++ b/test/tests/pipe_handle.cpp @@ -154,10 +154,10 @@ static inline void TestMultiplexedPipeHandle() } // Called if the read did not complete immediately - virtual void read_initiated(llfio::io_multiplexer::io_operation_state * /*state*/, llfio::io_operation_state_type /*former*/) override { std::cout << " Pipe " << myindex << " will complete read later" << std::endl; } + virtual void read_initiated(llfio::io_multiplexer::io_operation_state::lock_guard & /*g*/, llfio::io_operation_state_type /*former*/) override { std::cout << " Pipe " << myindex << " will complete read later" << std::endl; } // Called when the read completes - virtual bool read_completed(llfio::io_multiplexer::io_operation_state * /*state*/, llfio::io_operation_state_type former, llfio::pipe_handle::io_result &&res) override + virtual bool read_completed(llfio::io_multiplexer::io_operation_state::lock_guard & /*g*/, llfio::io_operation_state_type former, llfio::pipe_handle::io_result &&res) override { if(is_initialised(former)) { @@ -181,7 +181,7 @@ static inline void TestMultiplexedPipeHandle() } // Called when the state for the read can be disposed - virtual void read_finished(llfio::io_multiplexer::io_operation_state * /*state*/, llfio::io_operation_state_type former) override + virtual void read_finished(llfio::io_multiplexer::io_operation_state::lock_guard & /*g*/, llfio::io_operation_state_type former) override { std::cout << " Pipe " << myindex << " read finishes" << std::endl; BOOST_REQUIRE(former == llfio::io_operation_state_type::read_completed); @@ -232,7 +232,7 @@ static inline void TestMultiplexedPipeHandle() // Wait for all reads to finish for(size_t n = 0; n < MAX_PIPES; n++) { - llfio::io_operation_state_type state ; + llfio::io_operation_state_type state; while(!is_finished(state = async_reads[n].io_state->current_state())) { multiplexer->check_for_any_completed_io().value(); @@ -254,119 +254,115 @@ static inline void TestMultiplexedPipeHandle() #endif } -#if LLFIO_ENABLE_COROUTINES && 0 +#if LLFIO_ENABLE_COROUTINES static inline void TestCoroutinedPipeHandle() { static constexpr size_t MAX_PIPES = 70; namespace llfio = LLFIO_V2_NAMESPACE; - struct io_visitor - { - using container_type = std::unordered_set *>; - container_type &c; - void await_suspend(container_type::value_type i) { c.insert(i); } - void await_resume(container_type::value_type i) { c.erase(i); } - }; - io_visitor::container_type io_pending; - struct coroutine - { - llfio::pipe_handle read_pipe, write_pipe; - size_t received_for{0}; - - explicit coroutine(llfio::pipe_handle &&r, llfio::pipe_handle &&w) - : read_pipe(std::move(r)) - , write_pipe(std::move(w)) - { - } - llfio::eager> operator()(io_visitor::container_type &io_pending) + auto test_multiplexer = [](llfio::io_multiplexer_ptr multiplexer) { + struct coroutine { - union { - llfio::byte _buffer[sizeof(size_t)]; - size_t _index; - }; - llfio::pipe_handle::buffer_type buffer; - for(;;) + llfio::pipe_handle read_pipe, write_pipe; + size_t received_for{0}; + + explicit coroutine(llfio::pipe_handle &&r, llfio::pipe_handle &&w) + : read_pipe(std::move(r)) + , write_pipe(std::move(w)) { - buffer = {_buffer, sizeof(_buffer)}; - // This will never return if the coroutine gets cancelled - auto r = co_await read_pipe.co_read(io_visitor{io_pending}, {{buffer}, 0}); - if(!r) + } + llfio::eager> operator()() + { + union { + llfio::byte _buffer[sizeof(size_t)]; + size_t _index; + }; + llfio::pipe_handle::buffer_type buffer; + for(;;) { - co_return r.error(); + buffer = {_buffer, sizeof(_buffer)}; + // This will never return if the coroutine gets cancelled + auto r = co_await read_pipe.co_read({{&buffer, 1}, 0}); + if(!r) + { + co_return r.error(); + } + BOOST_CHECK(r.value().size() == 1); + BOOST_CHECK(r.value()[0].size() == sizeof(_buffer)); + received_for+=_index; } - BOOST_CHECK(r.value().size() == 1); - BOOST_CHECK(r.value()[0].size() == sizeof(_buffer)); - ++received_for; } + }; + std::vector coroutines; + for(size_t n = 0; n < MAX_PIPES; n++) + { + auto ret = llfio::pipe_handle::anonymous_pipe(llfio::pipe_handle::caching::reads, llfio::pipe_handle::flag::multiplexable).value(); + ret.first.set_multiplexer(multiplexer.get()).value(); + coroutines.push_back(coroutine(std::move(ret.first), std::move(ret.second))); } - }; - std::vector coroutines; - auto multiplexer = llfio::this_thread::multiplexer(); - for(size_t n = 0; n < MAX_PIPES; n++) - { - auto ret = llfio::pipe_handle::anonymous_pipe(llfio::pipe_handle::caching::reads, llfio::pipe_handle::flag::multiplexable).value(); - ret.first.set_multiplexer(multiplexer).value(); - coroutines.push_back(coroutine(std::move(ret.first), std::move(ret.second))); - } - // Start the coroutines, all of whom will begin a read and then suspend - std::vector>>> states(MAX_PIPES); - for(size_t n = 0; n < MAX_PIPES; n++) - { - states[n].emplace(coroutines[n](io_pending)); - } - // Write to all the pipes, then pump coroutine resumption until all completions done - for(size_t i = 0; i < 10; i++) - { - for(size_t n = MAX_PIPES - 1; n < MAX_PIPES; n--) + // Start the coroutines, all of whom will begin a read and then suspend + std::vector>>> states(MAX_PIPES); + for(size_t n = 0; n < MAX_PIPES; n++) { - coroutines[n].write_pipe.write(0, {{(llfio::byte *) &i, sizeof(i)}}).value(); + states[n].emplace(coroutines[n]()); } - // Take a copy of all pending i/o - std::vector copy(io_pending.begin(), io_pending.end()); - for(;;) + // Write to all the pipes, then pump coroutine resumption until all completions done + size_t count = 0, failures = 0; + for(size_t i = 1; i <= 10; i++) { - // Manually check if an i/o completion is ready, avoiding any syscalls - bool need_to_poll = true; - for(auto it = copy.begin(); it != copy.end();) + std::cout << "\nWrite no " << i << std::endl; + for(size_t n = MAX_PIPES - 1; n < MAX_PIPES; n--) { - if((*it)->await_ready()) - { - need_to_poll = false; - it = copy.erase(it); - std::cout << "Completed an i/o without syscall" << std::endl; - } - else - ++it; + coroutines[n].write_pipe.write(0, {{(llfio::byte *) &i, sizeof(i)}}).value(); } - if(need_to_poll) + // Take a copy of all pending i/o + for(;;) { // Have the kernel tell me when an i/o completion is ready - auto r = multiplexer->complete_io(); - BOOST_CHECK(r.value() != 0); - if(r.value() < 0) + auto r = multiplexer->check_for_any_completed_io(); + if(r.value().initiated_ios_completed == 0) { - for(size_t n = 0; n < MAX_PIPES; n++) - { - BOOST_CHECK(coroutines[n].received_for == i + 1); - } break; } } + count += i; + for(size_t n = 0; n < MAX_PIPES; n++) + { + if(coroutines[n].received_for != count) + { + std::cout << "Coroutine " << n << " has count " << coroutines[n].received_for << " instead of " << count << std::endl; + failures++; + } + BOOST_CHECK(coroutines[n].received_for == count); + } + BOOST_REQUIRE(failures == 0); } - } - // Rethrow any failures - for(size_t n = 0; n < MAX_PIPES; n++) - { - if(states[n]->await_ready()) + // Rethrow any failures + for(size_t n = 0; n < MAX_PIPES; n++) { - states[n]->await_resume().value(); + if(states[n]->await_ready()) + { + states[n]->await_resume().value(); + } } - } - // Destruction of coroutines when they are suspended must work. - // This will cancel any pending i/o and immediately exit the - // coroutines - states.clear(); - // Now clear all the coroutines - coroutines.clear(); + // Destruction of coroutines when they are suspended must work. + // This will cancel any pending i/o and immediately exit the + // coroutines + states.clear(); + // Now clear all the coroutines + coroutines.clear(); + }; +#ifdef _WIN32 + std::cout << "\nSingle threaded IOCP, immediate completions:\n"; + test_multiplexer(llfio::test::multiplexer_win_iocp(1, false).value()); + std::cout << "\nSingle threaded IOCP, reactor completions:\n"; + test_multiplexer(llfio::test::multiplexer_win_iocp(1, true).value()); + std::cout << "\nMultithreaded IOCP, immediate completions:\n"; + test_multiplexer(llfio::test::multiplexer_win_iocp(2, false).value()); + std::cout << "\nMultithreaded IOCP, reactor completions:\n"; + test_multiplexer(llfio::test::multiplexer_win_iocp(2, true).value()); +#else +#error Not implemented yet +#endif } #endif #endif @@ -375,7 +371,7 @@ KERNELTEST_TEST_KERNEL(integration, llfio, pipe_handle, blocking, "Tests that bl KERNELTEST_TEST_KERNEL(integration, llfio, pipe_handle, nonblocking, "Tests that nonblocking llfio::pipe_handle works as expected", TestNonBlockingPipeHandle()) #if LLFIO_ENABLE_TEST_IO_MULTIPLEXERS KERNELTEST_TEST_KERNEL(integration, llfio, pipe_handle, multiplexed, "Tests that multiplexed llfio::pipe_handle works as expected", TestMultiplexedPipeHandle()) -#if LLFIO_ENABLE_COROUTINES && 0 +#if LLFIO_ENABLE_COROUTINES KERNELTEST_TEST_KERNEL(integration, llfio, pipe_handle, coroutined, "Tests that coroutined llfio::pipe_handle works as expected", TestCoroutinedPipeHandle()) #endif #endif -- cgit v1.2.3