From 3b1b08b521930a43210310065ddd33537f08010e Mon Sep 17 00:00:00 2001 From: "Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com)" Date: Mon, 25 Sep 2017 22:00:00 +0100 Subject: Implemented refactor of async_file_handle on POSIX. Reenabled Coroutines TS support which is now working very nicely. --- CMakeLists.txt | 2 +- include/afio/revision.hpp | 6 +- include/afio/v2.0/async_file_handle.hpp | 94 +++++++++++++++++++--- .../v2.0/detail/impl/posix/async_file_handle.ipp | 93 ++++++++++++--------- include/afio/v2.0/detail/impl/posix/io_service.ipp | 2 +- .../v2.0/detail/impl/windows/async_file_handle.ipp | 10 ++- include/afio/v2.0/io_service.hpp | 2 +- test/tests/coroutines.cpp | 15 +++- 8 files changed, 160 insertions(+), 64 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 49bff93c..37a29431 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -47,7 +47,7 @@ int main() { return g().get(); } set(CXX_HAS_COROUTINES${iter} ${CXX_HAS_COROUTINES${iter}} PARENT_SCOPE) endfunction() include(CheckCXXSourceCompiles) -if(0 AND MSVC) +if(MSVC) CheckCXXHasCoroutines(_MSVC "/await") if(CXX_HAS_COROUTINES_MSVC) all_compile_options(PUBLIC "/await") diff --git a/include/afio/revision.hpp b/include/afio/revision.hpp index c3700b76..35944ec3 100644 --- a/include/afio/revision.hpp +++ b/include/afio/revision.hpp @@ -1,4 +1,4 @@ // Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time -#define AFIO_PREVIOUS_COMMIT_REF 8b00fcbe1ffff32adba517b28fe51e6052c75046 -#define AFIO_PREVIOUS_COMMIT_DATE "2017-09-24 18:37:01 +00:00" -#define AFIO_PREVIOUS_COMMIT_UNIQUE 8b00fcbe +#define AFIO_PREVIOUS_COMMIT_REF 760a6d37f6ff42a948a1487ac6b1d7deb477195e +#define AFIO_PREVIOUS_COMMIT_DATE "2017-09-25 02:38:38 +00:00" +#define AFIO_PREVIOUS_COMMIT_UNIQUE 760a6d37 diff --git a/include/afio/v2.0/async_file_handle.hpp b/include/afio/v2.0/async_file_handle.hpp index 793b79c3..682f254f 100644 --- a/include/afio/v2.0/async_file_handle.hpp +++ b/include/afio/v2.0/async_file_handle.hpp @@ -32,6 +32,15 @@ Distributed under the Boost Software License, Version 1.0. AFIO_V2_NAMESPACE_EXPORT_BEGIN +namespace detail +{ +#if __cplusplus > 201700 + template using is_invocable_r = std::is_invocable_r; +#else + template using is_invocable_r = std::true_type; +#endif +} + /*! \class async_file_handle \brief An asynchronous handle to an open something. @@ -227,9 +236,12 @@ protected: { read, write, - fsync, - dsync + fsync_sync, + dsync_sync, + fsync_async, + dsync_async }; + struct _erased_completion_handler; // Holds state for an i/o in progress. Will be subclassed with platform specific state and how to implement completion. struct _erased_io_state_type { @@ -267,6 +279,8 @@ protected: } } + //! Retrieves a pointer to the copy of the completion handler held inside the i/o state. + virtual _erased_completion_handler *erased_completion_handler() noexcept = 0; /* Called when an i/o is completed by the system, figures out whether to call invoke_completion. For Windows: @@ -316,11 +330,65 @@ protected: virtual void move(_erased_completion_handler *dest) = 0; // Invokes my completion handler virtual void operator()(_erased_io_state_type *state) = 0; + // Returns a pointer to the completion handler + virtual void *address() noexcept = 0; }; template result AFIO_HEADERS_ONLY_MEMFUNC_SPEC _begin_io(span mem, operation_t operation, io_request reqs, _erased_completion_handler &&completion, IORoutine &&ioroutine) noexcept; AFIO_HEADERS_ONLY_MEMFUNC_SPEC result _begin_io(span mem, operation_t operation, io_request reqs, _erased_completion_handler &&completion) noexcept; public: + /*! \brief Schedule a barrier to occur asynchronously. + + \note All the caveats and exclusions which apply to `barrier()` also apply here. Note that Microsoft Windows + does not support asynchronously executed barriers, and this call will fail on that operating system. + + \return Either an io_state_ptr to the i/o in progress, or an error code. + \param reqs A scatter-gather and offset request for what range to barrier. May be ignored on some platforms + which always write barrier the entire file. Supplying a default initialised reqs write barriers the entire file. + \param completion A callable to call upon i/o completion. Spec is `void(async_file_handle *, io_result &)`. + Note that buffers returned may not be buffers input, see documentation for `barrier()`. + \param wait_for_device True if you want the call to wait until data reaches storage and that storage + has acknowledged the data is physically written. Slow. + \param and_metadata True if you want the call to sync the metadata for retrieving the writes before the + barrier after a sudden power loss event. Slow. + \errors As for `barrier()`, plus `ENOMEM`. + \mallocs If mem is not set, one calloc, one free. The allocation is unavoidable due to the need to store a type + erased completion handler of unknown type and state per buffers input. + */ + AFIO_MAKE_FREE_FUNCTION + template // + AFIO_REQUIRES(detail::is_invocable_r &>::value) // + result async_barrier(io_request reqs, CompletionRoutine &&completion, bool wait_for_device = false, bool and_metadata = false, span mem = {}) noexcept + { + AFIO_LOG_FUNCTION_CALL(this); + struct completion_handler : _erased_completion_handler + { + CompletionRoutine completion; + completion_handler(CompletionRoutine c) + : completion(std::move(c)) + { + } + virtual size_t bytes() const noexcept override final { return sizeof(*this); } + virtual void move(_erased_completion_handler *_dest) override final + { + completion_handler *dest = (completion_handler *) _dest; + new(dest) completion_handler(std::move(*this)); + } + virtual void operator()(_erased_io_state_type *state) override final { completion(state->parent, state->result.write); } + virtual void *address() noexcept override final { return &completion; } + } ch{std::forward(completion)}; + operation_t operation; + if(wait_for_device && and_metadata) + operation = operation_t::fsync_sync; + else if(!wait_for_device && and_metadata) + operation = operation_t::fsync_async; + else if(wait_for_device && !and_metadata) + operation = operation_t::dsync_sync; + else if(!wait_for_device && !and_metadata) + operation = operation_t::dsync_async; + return _begin_io(mem, operation, reinterpret_cast &>(reqs), std::move(ch)); + } + /*! \brief Schedule a read to occur asynchronously. \return Either an io_state_ptr to the i/o in progress, or an error code. @@ -333,8 +401,8 @@ public: erased completion handler of unknown type and state per buffers input. */ AFIO_MAKE_FREE_FUNCTION - template // - AFIO_REQUIRES({ std::declval()(&std::declval(), std::declval>()); }) // + template // + AFIO_REQUIRES(detail::is_invocable_r &>::value) // result async_read(io_request reqs, CompletionRoutine &&completion, span mem = {}) noexcept { AFIO_LOG_FUNCTION_CALL(this); @@ -352,6 +420,7 @@ public: new(dest) completion_handler(std::move(*this)); } virtual void operator()(_erased_io_state_type *state) override final { completion(state->parent, state->result.read); } + virtual void *address() noexcept override final { return &completion; } } ch{std::forward(completion)}; return _begin_io(mem, operation_t::read, reinterpret_cast &>(reqs), std::move(ch)); } @@ -368,8 +437,8 @@ public: erased completion handler of unknown type and state per buffers input. */ AFIO_MAKE_FREE_FUNCTION - template // - AFIO_REQUIRES({ std::declval()(&std::declval(), std::declval>()); }) // + template // + AFIO_REQUIRES(detail::is_invocable_r &>::value) // result async_write(io_request reqs, CompletionRoutine &&completion, span mem = {}) noexcept { AFIO_LOG_FUNCTION_CALL(this); @@ -387,6 +456,7 @@ public: new(dest) completion_handler(std::move(*this)); } virtual void operator()(_erased_io_state_type *state) override final { completion(state->parent, state->result.write); } + virtual void *address() noexcept override final { return &completion; } } ch{std::forward(completion)}; return _begin_io(mem, operation_t::write, reqs, std::move(ch)); } @@ -421,20 +491,22 @@ public: template class awaitable { friend class async_file_handle; - io_state_ptr, BuffersType> _state; + io_state_ptr _state; + awaitable_state *_astate; - awaitable(io_state_ptr, BuffersType> state) + awaitable(io_state_ptr state) : _state(std::move(state)) + , _astate((awaitable_state *) _state->erased_completion_handler()->address()) { } public: //! Called by `co_await` to determine whether to suspend the coroutine. - bool await_ready() { return _state->completion._result.has_value(); } + bool await_ready() { return _astate->_result.has_value(); } //! Called by `co_await` to suspend the coroutine. - void await_suspend(coroutine_handle<> co) { _state->completion._suspended = co; } + void await_suspend(coroutine_handle<> co) { _astate->_suspended = co; } //! Called by `co_await` after resuming the coroutine to return a value. - io_result await_resume() { return std::move(*_state->completion._result); } + io_result await_resume() { return std::move(*_astate->_result); } }; public: diff --git a/include/afio/v2.0/detail/impl/posix/async_file_handle.ipp b/include/afio/v2.0/detail/impl/posix/async_file_handle.ipp index 52e8222c..cf8e92b5 100644 --- a/include/afio/v2.0/detail/impl/posix/async_file_handle.ipp +++ b/include/afio/v2.0/detail/impl/posix/async_file_handle.ipp @@ -32,12 +32,11 @@ Distributed under the Boost Software License, Version 1.0. AFIO_V2_NAMESPACE_BEGIN -async_file_handle::io_result async_file_handle::barrier(async_file_handle::io_request reqs, bool /*wait_for_device*/, bool and_metadata, deadline d) noexcept +async_file_handle::io_result async_file_handle::barrier(async_file_handle::io_request reqs, bool wait_for_device, bool and_metadata, deadline d) noexcept { AFIO_LOG_FUNCTION_CALL(this); optional> ret; - auto _io_state(_begin_io(and_metadata ? operation_t::fsync : operation_t::dsync, std::move(reqs), [&ret](auto *state) { ret = std::move(state->result); }, nullptr)); - OUTCOME_TRY(io_state, _io_state); + OUTCOME_TRY(io_state, async_barrier(reqs, [&ret](async_file_handle *, io_result &result) { ret = std::move(result); }, wait_for_device, and_metadata)); (void) io_state; // While i/o is not done pump i/o completion @@ -58,22 +57,24 @@ async_file_handle::io_result async_file_h return *ret; } -template -result> async_file_handle::_begin_io(async_file_handle::operation_t operation, async_file_handle::io_request reqs, CompletionRoutine &&completion, IORoutine && /*ioroutine*/) noexcept +template result async_file_handle::_begin_io(span mem, async_file_handle::operation_t operation, async_file_handle::io_request reqs, async_file_handle::_erased_completion_handler &&completion, IORoutine && /*unused*/) noexcept { // Need to keep a set of aiocbs matching the scatter-gather buffers - struct state_type : public _io_state_type + struct state_type : public _erased_io_state_type { #if AFIO_USE_POSIX_AIO struct aiocb aiocbs[1]; #else #error todo #endif - state_type(async_file_handle *_parent, operation_t _operation, CompletionRoutine &&f, size_t _items) - : _io_state_type(_parent, _operation, std::forward(f), _items) + _erased_completion_handler *completion; + state_type(async_file_handle *_parent, operation_t _operation, bool must_deallocate_self, size_t _items) + : _erased_io_state_type(_parent, _operation, must_deallocate_self, _items) + , completion(nullptr) { } - AFIO_HEADERS_ONLY_VIRTUAL_SPEC void operator()(long errcode, long bytes_transferred, void *internal_state) noexcept override final + AFIO_HEADERS_ONLY_VIRTUAL_SPEC _erased_completion_handler *erased_completion_handler() noexcept override final { return completion; } + AFIO_HEADERS_ONLY_VIRTUAL_SPEC void _system_io_completion(long errcode, long bytes_transferred, void *internal_state) noexcept override final { #if AFIO_USE_POSIX_AIO struct aiocb **_paiocb = (struct aiocb **) internal_state; @@ -83,10 +84,11 @@ result> async_fi #else #error todo #endif - if(this->result) + auto &result = this->result.write; + if(result) { if(errcode) - this->result = error_code((int) errcode, std::system_category()); + result = error_code((int) errcode, std::system_category()); else { // Figure out which i/o I am and update the buffer in question @@ -100,13 +102,13 @@ result> async_fi AFIO_LOG_FATAL(0, "file_handle::io_state::operator() called with invalid index"); std::terminate(); } - this->result.value()[idx].len = bytes_transferred; + result.value()[idx].len = bytes_transferred; } } this->parent->service()->_work_done(); // Are we done? if(!--this->items_to_go) - this->completion(this); + (*completion)(this); } AFIO_HEADERS_ONLY_VIRTUAL_SPEC ~state_type() override final { @@ -155,22 +157,36 @@ result> async_fi #endif } } + completion->~_erased_completion_handler(); } } * state; extent_type offset = reqs.offset; - size_t statelen = sizeof(state_type) + (reqs.buffers.size() - 1) * sizeof(struct aiocb), items(reqs.buffers.size()); - using return_type = io_state_ptr; + size_t statelen = sizeof(state_type) + (reqs.buffers.size() - 1) * sizeof(struct aiocb) + completion.bytes(); + if(!mem.empty() && statelen > mem.size()) + { + return std::errc::not_enough_memory; + } + size_t items(reqs.buffers.size()); #if AFIO_USE_POSIX_AIO && defined(AIO_LISTIO_MAX) if(items > AIO_LISTIO_MAX) return std::errc::invalid_argument; #endif - void *mem = ::calloc(1, statelen); - if(!mem) - return std::errc::not_enough_memory; - return_type _state((_io_state_type *) mem); - new((state = (state_type *) mem)) state_type(this, operation, std::forward(completion), items); + bool must_deallocate_self = false; + if(mem.empty()) + { + void *_mem = ::calloc(1, statelen); + if(!_mem) + return std::errc::not_enough_memory; + mem = {(char *) _mem, statelen}; + must_deallocate_self = true; + } + io_state_ptr _state((state_type *) mem.data()); + new((state = (state_type *) mem.data())) state_type(this, operation, must_deallocate_self, items); + state->completion = (_erased_completion_handler *) ((uintptr_t) state + sizeof(state_type) + (reqs.buffers.size() - 1) * sizeof(struct aiocb)); + completion.move(state->completion); + // Noexcept move the buffers from req into result - BuffersType &out = state->result.value(); + BuffersType &out = state->result.write.value(); out = std::move(reqs.buffers); for(size_t n = 0; n < items; n++) { @@ -198,10 +214,12 @@ result> async_fi case operation_t::write: aiocb->aio_lio_opcode = LIO_WRITE; break; - case operation_t::fsync: + case operation_t::fsync_async: + case operation_t::fsync_sync: aiocb->aio_lio_opcode = LIO_NOP; break; - case operation_t::dsync: + case operation_t::dsync_async: + case operation_t::dsync_sync: aiocb->aio_lio_opcode = LIO_NOP; break; } @@ -237,15 +255,17 @@ result> async_fi case operation_t::write: ret = lio_listio(LIO_NOWAIT, thislist, items, nullptr); break; - case operation_t::fsync: - case operation_t::dsync: + case operation_t::fsync_async: + case operation_t::fsync_sync: + case operation_t::dsync_async: + case operation_t::dsync_sync: for(size_t n = 0; n < items; n++) { struct aiocb *aiocb = state->aiocbs + n; #if defined(__FreeBSD__) || defined(__APPLE__) // neither of these have fdatasync() ret = aio_fsync(O_SYNC, aiocb); #else - ret = aio_fsync(operation == operation_t::dsync ? O_DSYNC : O_SYNC, aiocb); + ret = aio_fsync((operation == operation_t::dsync_async || operation == operation_t::dsync_sync) ? O_DSYNC : O_SYNC, aiocb); #endif } break; @@ -258,29 +278,24 @@ result> async_fi { service()->_aiocbsv.resize(service()->_aiocbsv.size() - items); state->items_to_go = 0; - state->result = {errno, std::system_category()}; - state->completion(state); + state->result.write = {errno, std::system_category()}; + (*state->completion)(state); return success(std::move(_state)); } service()->_work_enqueued(items); return success(std::move(_state)); } -template result> async_file_handle::async_read(async_file_handle::io_request reqs, CompletionRoutine &&completion) noexcept -{ - return _begin_io(operation_t::read, std::move(reqs), [completion = std::forward(completion)](auto *state) { completion(state->parent, state->result); }, nullptr); -} - -template result> async_file_handle::async_write(async_file_handle::io_request reqs, CompletionRoutine &&completion) noexcept +result async_file_handle::_begin_io(span mem, async_file_handle::operation_t operation, io_request reqs, async_file_handle::_erased_completion_handler &&completion) noexcept { - return _begin_io(operation_t::write, std::move(reqs), [completion = std::forward(completion)](auto *state) { completion(state->parent, state->result); }, nullptr); + return _begin_io(mem, operation, reqs, std::move(completion), nullptr); } async_file_handle::io_result async_file_handle::read(async_file_handle::io_request reqs, deadline d) noexcept { + AFIO_LOG_FUNCTION_CALL(this); optional> ret; - auto _io_state(_begin_io(operation_t::read, std::move(reqs), [&ret](auto *state) { ret = std::move(state->result); }, nullptr)); - OUTCOME_TRY(io_state, _io_state); + OUTCOME_TRY(io_state, async_read(reqs, [&ret](async_file_handle *, io_result &result) { ret = std::move(result); })); (void) io_state; // While i/o is not done pump i/o completion @@ -303,9 +318,9 @@ async_file_handle::io_result async_file_handle: async_file_handle::io_result async_file_handle::write(async_file_handle::io_request reqs, deadline d) noexcept { + AFIO_LOG_FUNCTION_CALL(this); optional> ret; - auto _io_state(_begin_io(operation_t::write, std::move(reqs), [&ret](auto *state) { ret = std::move(state->result); }, nullptr)); - OUTCOME_TRY(io_state, _io_state); + OUTCOME_TRY(io_state, async_write(reqs, [&ret](async_file_handle *, io_result &result) { ret = std::move(result); })); (void) io_state; // While i/o is not done pump i/o completion diff --git a/include/afio/v2.0/detail/impl/posix/io_service.ipp b/include/afio/v2.0/detail/impl/posix/io_service.ipp index 93111482..eae67b4f 100644 --- a/include/afio/v2.0/detail/impl/posix/io_service.ipp +++ b/include/afio/v2.0/detail/impl/posix/io_service.ipp @@ -284,7 +284,7 @@ result io_service::run_until(deadline d) noexcept // The aiocb aio_sigevent.sigev_value.sival_ptr field will point to a file_handle::_io_state_type auto io_state = (async_file_handle::_erased_io_state_type *) aiocb->aio_sigevent.sigev_value.sival_ptr; assert(io_state); - (*io_state)(errcode, ioret, &aiocb); + io_state->_system_io_completion(errcode, ioret, &aiocb); } } // Eliminate any empty holes in the quick aiocbs vector diff --git a/include/afio/v2.0/detail/impl/windows/async_file_handle.ipp b/include/afio/v2.0/detail/impl/windows/async_file_handle.ipp index 42b75fc8..b99ca114 100644 --- a/include/afio/v2.0/detail/impl/windows/async_file_handle.ipp +++ b/include/afio/v2.0/detail/impl/windows/async_file_handle.ipp @@ -45,6 +45,7 @@ template result async_file_handle::_begin_io(span return _begin_io(mem, operation, reqs, std::move(completion), ReadFileEx); case operation_t::write: return _begin_io(mem, operation, reqs, std::move(completion), WriteFileEx); - case operation_t::fsync: - case operation_t::dsync: - // TODO FIXME Implement these for Windows - return std::errc::operation_not_supported; + case operation_t::fsync_async: + case operation_t::dsync_async: + case operation_t::fsync_sync: + case operation_t::dsync_sync: + break; } return std::errc::operation_not_supported; } diff --git a/include/afio/v2.0/io_service.hpp b/include/afio/v2.0/io_service.hpp index 05c9a09e..7a87462e 100644 --- a/include/afio/v2.0/io_service.hpp +++ b/include/afio/v2.0/io_service.hpp @@ -210,7 +210,7 @@ public: if(pi) abort(); _work_done(); - while(!_posts.front().service) + while(!_posts.empty() && !_posts.front().service) _posts.pop_front(); } void _post_done(post_info *pi) diff --git a/test/tests/coroutines.cpp b/test/tests/coroutines.cpp index 61aee5a1..3a570d51 100644 --- a/test/tests/coroutines.cpp +++ b/test/tests/coroutines.cpp @@ -44,9 +44,9 @@ static inline void TestAsyncFileHandleCoroutines() // Launch 8 coroutines, each writing 4Kb of chars 0-8 to every 32Kb block auto coroutine = [&h](size_t no) -> std::future { - alignas(4096) char buffer[4096]; - memset(buffer, (int) ('0' + no), 4096); - afio::async_file_handle::const_buffer_type bt{buffer}; + std::vector> buffer(4096); + memset(buffer.data(), (int) ('0' + no), 4096); + afio::async_file_handle::const_buffer_type bt{buffer.data(), buffer.size()}; for(size_t n = 0; n < 128; n++) { // This will initiate the i/o, and suspend the coroutine until completion. @@ -92,18 +92,25 @@ static inline void TestPostSelfToRunCoroutines() #ifdef __cpp_coroutines namespace afio = AFIO_V2_NAMESPACE; afio::io_service service; + std::atomic ready(false); auto runthreadid = QUICKCPPLIB_NAMESPACE::utils::thread::this_thread_id(); auto coroutine = [&]() -> std::future { auto thisthreadid = QUICKCPPLIB_NAMESPACE::utils::thread::this_thread_id(); BOOST_CHECK(thisthreadid != runthreadid); + ready = true; co_await afio::io_service::awaitable_post_to_self(service); thisthreadid = QUICKCPPLIB_NAMESPACE::utils::thread::this_thread_id(); BOOST_CHECK(thisthreadid == runthreadid); }; auto asynch = std::async(std::launch::async, coroutine); + while(!ready) + { + std::this_thread::yield(); + } while(!service.run()) ; - asynch.get().get(); + auto r = asynch.get(); + r.get(); #endif } -- cgit v1.2.3