diff options
author | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> | 2017-09-25 05:38:34 +0300 |
---|---|---|
committer | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> | 2017-09-25 05:38:34 +0300 |
commit | 760a6d37f6ff42a948a1487ac6b1d7deb477195e (patch) | |
tree | 1dca5e85fca15b90748953d8c5f7e95a8e968e9f | |
parent | 8b00fcbe1ffff32adba517b28fe51e6052c75046 (diff) |
wip Replacing async_file_handle's i/o routines with ABI stable editions which
optionally don't allocate memory. Working on Windows, POSIX still to do.
-rw-r--r-- | cmake/tests.cmake | 1 | ||||
m--------- | doc/html | 8 | ||||
-rw-r--r-- | include/afio/revision.hpp | 6 | ||||
-rw-r--r-- | include/afio/v2.0/async_file_handle.hpp | 187 | ||||
-rw-r--r-- | include/afio/v2.0/detail/impl/windows/async_file_handle.ipp | 87 | ||||
-rw-r--r-- | include/afio/v2.0/detail/impl/windows/import.hpp | 4 | ||||
-rw-r--r-- | include/afio/v2.0/io_service.hpp | 23 | ||||
-rw-r--r-- | test/tests/async_io.cpp | 71 | ||||
-rw-r--r-- | test/tests/coroutines.cpp | 25 |
9 files changed, 271 insertions, 141 deletions
diff --git a/cmake/tests.cmake b/cmake/tests.cmake index 636119ef..11d087ac 100644 --- a/cmake/tests.cmake +++ b/cmake/tests.cmake @@ -7,6 +7,7 @@ set(afio_TESTS "test/tests/file_handle_create_close/kernel_file_handle.cpp.hpp" "test/tests/map_handle_create_close/kernel_map_handle.cpp.hpp" "test/tests/section_handle_create_close/kernel_section_handle.cpp.hpp" + "test/tests/async_io.cpp" "test/tests/coroutines.cpp" "test/tests/current_path.cpp" "test/tests/directory_handle_create_close/runner.cpp" diff --git a/doc/html b/doc/html -Subproject 3a7ffeb387ac3488c3faccbf4c9b2912a76d99f +Subproject bae3c1c2f02853d7e35e9fdc4d74c2b85fcc87d diff --git a/include/afio/revision.hpp b/include/afio/revision.hpp index cd3a5fdc..c3700b76 100644 --- a/include/afio/revision.hpp +++ b/include/afio/revision.hpp @@ -1,4 +1,4 @@ // Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time -#define AFIO_PREVIOUS_COMMIT_REF 9d0e4e204b7ab65d53fcbb847facd076502d8c63 -#define AFIO_PREVIOUS_COMMIT_DATE "2017-09-24 14:22:05 +00:00" -#define AFIO_PREVIOUS_COMMIT_UNIQUE 9d0e4e20 +#define AFIO_PREVIOUS_COMMIT_REF 8b00fcbe1ffff32adba517b28fe51e6052c75046 +#define AFIO_PREVIOUS_COMMIT_DATE "2017-09-24 18:37:01 +00:00" +#define AFIO_PREVIOUS_COMMIT_UNIQUE 8b00fcbe diff --git a/include/afio/v2.0/async_file_handle.hpp b/include/afio/v2.0/async_file_handle.hpp index 50cd486c..793b79c3 100644 --- a/include/afio/v2.0/async_file_handle.hpp +++ b/include/afio/v2.0/async_file_handle.hpp @@ -49,8 +49,6 @@ only use case where using async i/o makes sense given the other options below. created the owning `io_service` which MUST also be the same kernel thread as which runs the i/o service's `run()` function. -\todo Direct use of `calloc()` ought to be replaced with a user supplied STL allocator instance. - \snippet coroutines.cpp coroutines_example */ class AFIO_DECL async_file_handle : public file_handle @@ -233,32 +231,31 @@ protected: dsync }; // Holds state for an i/o in progress. Will be subclassed with platform specific state and how to implement completion. - // Note this is allocated using malloc not new to avoid memory zeroing, and therefore it has a custom deleter. struct _erased_io_state_type { + friend class io_service; async_file_handle *parent; operation_t operation; + bool must_deallocate_self; size_t items; shared_size_type items_to_go; - constexpr _erased_io_state_type(async_file_handle *_parent, operation_t _operation, size_t _items) + union result_storage { + io_result<buffers_type> read; + io_result<const_buffers_type> write; + constexpr result_storage() + : read(buffers_type()) + { + } + } result; + constexpr _erased_io_state_type(async_file_handle *_parent, operation_t _operation, bool _must_deallocate_self, size_t _items) : parent(_parent) , operation(_operation) + , must_deallocate_self(_must_deallocate_self) , items(_items) , items_to_go(0) + , result() { } - /* - For Windows: - - errcode: GetLastError() code - - bytes_transferred: obvious - - internal_state: LPOVERLAPPED for this op - - For POSIX AIO: - - errcode: errno code - - bytes_transferred: return from aio_return(), usually bytes transferred - - internal_state: address of pointer to struct aiocb in io_service's _aiocbsv - */ - virtual void operator()(long errcode, long bytes_transferred, void *internal_state) noexcept = 0; AFIO_HEADERS_ONLY_VIRTUAL_SPEC ~_erased_io_state_type() { // i/o still pending is very bad, this should never happen @@ -269,26 +266,32 @@ protected: abort(); } } - }; - // State for an i/o in progress, but with the per operation typing - template <class CompletionRoutine, class BuffersType> struct _io_state_type : public _erased_io_state_type - { - io_result<BuffersType> result; - CompletionRoutine completion; - constexpr _io_state_type(async_file_handle *_parent, operation_t _operation, CompletionRoutine &&f, size_t _items) - : _erased_io_state_type(_parent, _operation, _items) - , result(BuffersType()) - , completion(std::forward<CompletionRoutine>(f)) - { - } + + /* Called when an i/o is completed by the system, figures out whether to call invoke_completion. + + For Windows: + - errcode: GetLastError() code + - bytes_transferred: obvious + - internal_state: LPOVERLAPPED for this op + + For POSIX AIO: + - errcode: errno code + - bytes_transferred: return from aio_return(), usually bytes transferred + - internal_state: address of pointer to struct aiocb in io_service's _aiocbsv + */ + virtual void _system_io_completion(long errcode, long bytes_transferred, void *internal_state) noexcept = 0; }; struct _io_state_deleter { template <class U> void operator()(U *_ptr) const { + bool must_deallocate_self = _ptr->must_deallocate_self; _ptr->~U(); - char *ptr = (char *) _ptr; - ::free(ptr); + if(must_deallocate_self) + { + char *ptr = (char *) _ptr; + ::free(ptr); + } } }; @@ -296,51 +299,97 @@ public: /*! Smart pointer to state of an i/o in progress. Destroying this before an i/o has completed is <b>blocking</b> because the i/o must be cancelled before the destructor can safely exit. */ - using erased_io_state_ptr = std::unique_ptr<_erased_io_state_type, _io_state_deleter>; - /*! Smart pointer to state of an i/o in progress. Destroying this before an i/o has completed - is <b>blocking</b> because the i/o must be cancelled before the destructor can safely exit. - */ - template <class CompletionRoutine, class BuffersType> using io_state_ptr = std::unique_ptr<_io_state_type<CompletionRoutine, BuffersType>, _io_state_deleter>; - //! Erases the type of an io_state_ptr so it can be stored non-templated. - template <class CompletionRoutine, class BuffersType> static erased_io_state_ptr erase(io_state_ptr<CompletionRoutine, BuffersType> &&p) noexcept - { - _erased_io_state_type *_p = p.release(); - return erased_io_state_ptr(_p); - } + using io_state_ptr = std::unique_ptr<_erased_io_state_type, _io_state_deleter>; #if DOXYGEN_SHOULD_SKIP_THIS private: #else protected: #endif - template <class CompletionRoutine, class BuffersType, class IORoutine> result<io_state_ptr<CompletionRoutine, BuffersType>> _begin_io(operation_t operation, io_request<BuffersType> reqs, CompletionRoutine &&completion, IORoutine &&ioroutine) noexcept; + // Used to indirect copy and call of unknown completion handler + struct _erased_completion_handler + { + virtual ~_erased_completion_handler() {} + // Returns my size including completion handler + virtual size_t bytes() const noexcept = 0; + // Moves me and handler to some new location + virtual void move(_erased_completion_handler *dest) = 0; + // Invokes my completion handler + virtual void operator()(_erased_io_state_type *state) = 0; + }; + template <class BuffersType, class IORoutine> result<io_state_ptr> AFIO_HEADERS_ONLY_MEMFUNC_SPEC _begin_io(span<char> mem, operation_t operation, io_request<BuffersType> reqs, _erased_completion_handler &&completion, IORoutine &&ioroutine) noexcept; + AFIO_HEADERS_ONLY_MEMFUNC_SPEC result<io_state_ptr> _begin_io(span<char> mem, operation_t operation, io_request<const_buffers_type> reqs, _erased_completion_handler &&completion) noexcept; public: /*! \brief Schedule a read to occur asynchronously. \return Either an io_state_ptr to the i/o in progress, or an error code. \param reqs A scatter-gather and offset request. - \param completion A callable to call upon i/o completion. Spec is void(async_file_handle *, io_result<buffers_type> &). - Note that buffers returned may not be buffers input, see documentation for read(). - \errors As for read(), plus ENOMEM. - \mallocs One calloc, one free. The allocation is unavoidable due to the need to store a type - erased completion handler of unknown type. + \param completion A callable to call upon i/o completion. Spec is `void(async_file_handle *, io_result<buffers_type> &)`. + Note that buffers returned may not be buffers input, see documentation for `read()`. + \param mem Optional span of memory to use to avoid using `calloc()`. Note span MUST be all bits zero on entry. + \errors As for `read()`, plus `ENOMEM`. + \mallocs If mem is not set, one calloc, one free. The allocation is unavoidable due to the need to store a type + erased completion handler of unknown type and state per buffers input. */ AFIO_MAKE_FREE_FUNCTION - template <class CompletionRoutine> result<io_state_ptr<CompletionRoutine, buffers_type>> async_read(io_request<buffers_type> reqs, CompletionRoutine &&completion) noexcept; + template <class CompletionRoutine> // + AFIO_REQUIRES({ std::declval<CompletionRoutine>()(&std::declval<async_file_handle>(), std::declval<io_result<buffers_type>>()); }) // + result<io_state_ptr> async_read(io_request<buffers_type> reqs, CompletionRoutine &&completion, span<char> mem = {}) noexcept + { + AFIO_LOG_FUNCTION_CALL(this); + struct completion_handler : _erased_completion_handler + { + CompletionRoutine completion; + completion_handler(CompletionRoutine c) + : completion(std::move(c)) + { + } + virtual size_t bytes() const noexcept override final { return sizeof(*this); } + virtual void move(_erased_completion_handler *_dest) override final + { + completion_handler *dest = (completion_handler *) _dest; + new(dest) completion_handler(std::move(*this)); + } + virtual void operator()(_erased_io_state_type *state) override final { completion(state->parent, state->result.read); } + } ch{std::forward<CompletionRoutine>(completion)}; + return _begin_io(mem, operation_t::read, reinterpret_cast<io_request<const_buffers_type> &>(reqs), std::move(ch)); + } /*! \brief Schedule a write to occur asynchronously. \return Either an io_state_ptr to the i/o in progress, or an error code. \param reqs A scatter-gather and offset request. - \param completion A callable to call upon i/o completion. Spec is void(async_file_handle *, io_result<const_buffers_type> &). - Note that buffers returned may not be buffers input, see documentation for write(). - \errors As for write(), plus ENOMEM. - \mallocs One calloc, one free. The allocation is unavoidable due to the need to store a type - erased completion handler of unknown type. + \param completion A callable to call upon i/o completion. Spec is `void(async_file_handle *, io_result<const_buffers_type> &)`. + Note that buffers returned may not be buffers input, see documentation for `write()`. + \param mem Optional span of memory to use to avoid using `calloc()`. Note span MUST be all bits zero on entry. + \errors As for `write()`, plus `ENOMEM`. + \mallocs If mem in not set, one calloc, one free. The allocation is unavoidable due to the need to store a type + erased completion handler of unknown type and state per buffers input. */ AFIO_MAKE_FREE_FUNCTION - template <class CompletionRoutine> result<io_state_ptr<CompletionRoutine, const_buffers_type>> async_write(io_request<const_buffers_type> reqs, CompletionRoutine &&completion) noexcept; + template <class CompletionRoutine> // + AFIO_REQUIRES({ std::declval<CompletionRoutine>()(&std::declval<async_file_handle>(), std::declval<io_result<const_buffers_type>>()); }) // + result<io_state_ptr> async_write(io_request<const_buffers_type> reqs, CompletionRoutine &&completion, span<char> mem = {}) noexcept + { + AFIO_LOG_FUNCTION_CALL(this); + struct completion_handler : _erased_completion_handler + { + CompletionRoutine completion; + completion_handler(CompletionRoutine c) + : completion(std::move(c)) + { + } + virtual size_t bytes() const noexcept override final { return sizeof(*this); } + virtual void move(_erased_completion_handler *_dest) override final + { + completion_handler *dest = (completion_handler *) _dest; + new(dest) completion_handler(std::move(*this)); + } + virtual void operator()(_erased_io_state_type *state) override final { completion(state->parent, state->result.write); } + } ch{std::forward<CompletionRoutine>(completion)}; + return _begin_io(mem, operation_t::write, reqs, std::move(ch)); + } using file_handle::read; AFIO_HEADERS_ONLY_VIRTUAL_SPEC io_result<buffers_type> read(io_request<buffers_type> reqs, deadline d = deadline()) noexcept override; @@ -513,36 +562,6 @@ inline async_file_handle::io_result<async_file_handle::const_buffers_type> barri { return self.barrier(std::forward<decltype(reqs)>(reqs), std::forward<decltype(wait_for_device)>(wait_for_device), std::forward<decltype(and_metadata)>(and_metadata), std::forward<decltype(d)>(d)); } -/*! \brief Schedule a read to occur asynchronously. - -\return Either an io_state_ptr to the i/o in progress, or an error code. -\param self The object whose member function to call. -\param reqs A scatter-gather and offset request. -\param completion A callable to call upon i/o completion. Spec is void(async_file_handle *, io_result<buffers_type> &). -Note that buffers returned may not be buffers input, see documentation for read(). -\errors As for read(), plus ENOMEM. -\mallocs One calloc, one free. The allocation is unavoidable due to the need to store a type -erased completion handler of unknown type. -*/ -template <class CompletionRoutine> inline result<async_file_handle::io_state_ptr<CompletionRoutine, async_file_handle::buffers_type>> async_read(async_file_handle &self, async_file_handle::io_request<async_file_handle::buffers_type> reqs, CompletionRoutine &&completion) noexcept -{ - return self.async_read(std::forward<decltype(reqs)>(reqs), std::forward<decltype(completion)>(completion)); -} -/*! \brief Schedule a write to occur asynchronously. - -\return Either an io_state_ptr to the i/o in progress, or an error code. -\param self The object whose member function to call. -\param reqs A scatter-gather and offset request. -\param completion A callable to call upon i/o completion. Spec is void(async_file_handle *, io_result<const_buffers_type> &). -Note that buffers returned may not be buffers input, see documentation for write(). -\errors As for write(), plus ENOMEM. -\mallocs One calloc, one free. The allocation is unavoidable due to the need to store a type -erased completion handler of unknown type. -*/ -template <class CompletionRoutine> inline result<async_file_handle::io_state_ptr<CompletionRoutine, async_file_handle::const_buffers_type>> async_write(async_file_handle &self, async_file_handle::io_request<async_file_handle::const_buffers_type> reqs, CompletionRoutine &&completion) noexcept -{ - return self.async_write(std::forward<decltype(reqs)>(reqs), std::forward<decltype(completion)>(completion)); -} #if defined(__cpp_coroutines) || defined(DOXYGEN_IS_IN_THE_HOUSE) /*! \brief Schedule a read to occur asynchronously. diff --git a/include/afio/v2.0/detail/impl/windows/async_file_handle.ipp b/include/afio/v2.0/detail/impl/windows/async_file_handle.ipp index 59d4b397..42b75fc8 100644 --- a/include/afio/v2.0/detail/impl/windows/async_file_handle.ipp +++ b/include/afio/v2.0/detail/impl/windows/async_file_handle.ipp @@ -33,25 +33,27 @@ async_file_handle::io_result<async_file_handle::const_buffers_type> async_file_h return file_handle::barrier(std::move(reqs), wait_for_device, and_metadata, std::move(d)); } -template <class CompletionRoutine, class BuffersType, class IORoutine> -result<async_file_handle::io_state_ptr<CompletionRoutine, BuffersType>> async_file_handle::_begin_io(async_file_handle::operation_t operation, async_file_handle::io_request<BuffersType> reqs, CompletionRoutine &&completion, IORoutine &&ioroutine) noexcept +template <class BuffersType, class IORoutine> result<async_file_handle::io_state_ptr> async_file_handle::_begin_io(span<char> mem, async_file_handle::operation_t operation, async_file_handle::io_request<BuffersType> reqs, async_file_handle::_erased_completion_handler &&completion, IORoutine &&ioroutine) noexcept { // Need to keep a set of OVERLAPPED matching the scatter-gather buffers - struct state_type : public _io_state_type<CompletionRoutine, BuffersType> + struct state_type : public _erased_io_state_type { OVERLAPPED ols[1]; - state_type(async_file_handle *_parent, operation_t _operation, CompletionRoutine &&f, size_t _items) - : _io_state_type<CompletionRoutine, BuffersType>(_parent, _operation, std::forward<CompletionRoutine>(f), _items) + _erased_completion_handler *completion; + state_type(async_file_handle *_parent, operation_t _operation, bool must_deallocate_self, size_t _items) + : _erased_io_state_type(_parent, _operation, must_deallocate_self, _items) + , completion(nullptr) { } - AFIO_HEADERS_ONLY_VIRTUAL_SPEC void operator()(long errcode, long bytes_transferred, void *internal_state) noexcept override final + AFIO_HEADERS_ONLY_VIRTUAL_SPEC void _system_io_completion(long errcode, long bytes_transferred, void *internal_state) noexcept override final { LPOVERLAPPED ol = (LPOVERLAPPED) internal_state; ol->hEvent = nullptr; - if(this->result) + auto &result = this->result.write; + if(result) { if(errcode) - this->result = error_code{errcode, std::system_category()}; + result = error_code{errcode, std::system_category()}; else { // Figure out which i/o I am and update the buffer in question @@ -61,13 +63,13 @@ result<async_file_handle::io_state_ptr<CompletionRoutine, BuffersType>> async_fi AFIO_LOG_FATAL(0, "async_file_handle::io_state::operator() called with invalid index"); std::terminate(); } - this->result.value()[idx].len = bytes_transferred; + result.value()[idx].len = bytes_transferred; } } this->parent->service()->_work_done(); // Are we done? if(!--this->items_to_go) - this->completion(this); + (*completion)(this); } AFIO_HEADERS_ONLY_VIRTUAL_SPEC ~state_type() override final { @@ -99,20 +101,33 @@ result<async_file_handle::io_state_ptr<CompletionRoutine, BuffersType>> async_fi #endif } } + completion->~_erased_completion_handler(); } } * state; extent_type offset = reqs.offset; - size_t statelen = sizeof(state_type) + (reqs.buffers.size() - 1) * sizeof(OVERLAPPED), items(reqs.buffers.size()); - using return_type = io_state_ptr<CompletionRoutine, BuffersType>; + size_t statelen = sizeof(state_type) + (reqs.buffers.size() - 1) * sizeof(OVERLAPPED) + completion.bytes(); + if(!mem.empty() && statelen > mem.size()) + { + return std::errc::not_enough_memory; + } + size_t items(reqs.buffers.size()); // On Windows i/o must be scheduled on the same thread pumping completion if(GetCurrentThreadId() != service()->_threadid) return std::errc::operation_not_supported; - void *mem = ::calloc(1, statelen); - if(!mem) - return std::errc::not_enough_memory; - return_type _state((_io_state_type<CompletionRoutine, BuffersType> *) mem); - new((state = (state_type *) mem)) state_type(this, operation, std::forward<CompletionRoutine>(completion), items); + bool must_deallocate_self = false; + if(mem.empty()) + { + void *_mem = ::calloc(1, statelen); + if(!_mem) + return std::errc::not_enough_memory; + mem = {(char *) _mem, statelen}; + must_deallocate_self = true; + } + io_state_ptr _state((state_type *) mem.data()); + new((state = (state_type *) mem.data())) state_type(this, operation, must_deallocate_self, items); + state->completion = (_erased_completion_handler *) ((uintptr_t) state + sizeof(state_type) + (reqs.buffers.size() - 1) * sizeof(OVERLAPPED)); + completion.move(state->completion); // To be called once each buffer is read struct handle_completion @@ -120,11 +135,11 @@ result<async_file_handle::io_state_ptr<CompletionRoutine, BuffersType>> async_fi static VOID CALLBACK Do(DWORD errcode, DWORD bytes_transferred, LPOVERLAPPED ol) { state_type *state = (state_type *) ol->hEvent; - (*state)(errcode, bytes_transferred, ol); + state->_system_io_completion(errcode, bytes_transferred, ol); } }; // Noexcept move the buffers from req into result - BuffersType &out = state->result.value(); + auto &out = state->result.write.value(); out = std::move(reqs.buffers); for(size_t n = 0; n < items; n++) { @@ -154,13 +169,13 @@ result<async_file_handle::io_state_ptr<CompletionRoutine, BuffersType>> async_fi assert((out[n].len & 511) == 0); } #endif - if(!ioroutine(_v.h, out[n].data, (DWORD) out[n].len, ol, handle_completion::Do)) + if(!ioroutine(_v.h, (char *) out[n].data, (DWORD) out[n].len, ol, handle_completion::Do)) { --state->items_to_go; - state->result = {GetLastError(), std::system_category()}; + state->result.write = {GetLastError(), std::system_category()}; // Fire completion now if we didn't schedule anything if(!n) - state->completion(state); + (*state->completion)(state); return _state; } service()->_work_enqueued(); @@ -168,24 +183,27 @@ result<async_file_handle::io_state_ptr<CompletionRoutine, BuffersType>> async_fi return _state; } -template <class CompletionRoutine> result<async_file_handle::io_state_ptr<CompletionRoutine, async_file_handle::buffers_type>> async_file_handle::async_read(async_file_handle::io_request<async_file_handle::buffers_type> reqs, CompletionRoutine &&completion) noexcept -{ - AFIO_LOG_FUNCTION_CALL(this); - return _begin_io(operation_t::read, std::move(reqs), [completion = std::forward<CompletionRoutine>(completion)](auto *state) { completion(state->parent, state->result); }, ReadFileEx); -} - -template <class CompletionRoutine> result<async_file_handle::io_state_ptr<CompletionRoutine, async_file_handle::const_buffers_type>> async_file_handle::async_write(async_file_handle::io_request<async_file_handle::const_buffers_type> reqs, CompletionRoutine &&completion) noexcept +result<async_file_handle::io_state_ptr> async_file_handle::_begin_io(span<char> mem, async_file_handle::operation_t operation, io_request<const_buffers_type> reqs, async_file_handle::_erased_completion_handler &&completion) noexcept { - AFIO_LOG_FUNCTION_CALL(this); - return _begin_io(operation_t::write, std::move(reqs), [completion = std::forward<CompletionRoutine>(completion)](auto *state) { completion(state->parent, state->result); }, WriteFileEx); + switch(operation) + { + case operation_t::read: + return _begin_io(mem, operation, reqs, std::move(completion), ReadFileEx); + case operation_t::write: + return _begin_io(mem, operation, reqs, std::move(completion), WriteFileEx); + case operation_t::fsync: + case operation_t::dsync: + // TODO FIXME Implement these for Windows + return std::errc::operation_not_supported; + } + return std::errc::operation_not_supported; } async_file_handle::io_result<async_file_handle::buffers_type> async_file_handle::read(async_file_handle::io_request<async_file_handle::buffers_type> reqs, deadline d) noexcept { AFIO_LOG_FUNCTION_CALL(this); optional<io_result<buffers_type>> ret; - auto _io_state(_begin_io(operation_t::read, std::move(reqs), [&ret](auto *state) { ret = std::move(state->result); }, ReadFileEx)); - OUTCOME_TRY(io_state, _io_state); + OUTCOME_TRY(io_state, async_read(reqs, [&ret](async_file_handle *, io_result<buffers_type> &result) { ret = std::move(result); })); (void) io_state; // holds i/o open until it completes // While i/o is not done pump i/o completion @@ -210,8 +228,7 @@ async_file_handle::io_result<async_file_handle::const_buffers_type> async_file_h { AFIO_LOG_FUNCTION_CALL(this); optional<io_result<const_buffers_type>> ret; - auto _io_state(_begin_io(operation_t::write, std::move(reqs), [&ret](auto *state) { ret = std::move(state->result); }, WriteFileEx)); - OUTCOME_TRY(io_state, _io_state); + OUTCOME_TRY(io_state, async_write(reqs, [&ret](async_file_handle *, io_result<const_buffers_type> &result) { ret = std::move(result); })); (void) io_state; // holds i/o open until it completes // While i/o is not done pump i/o completion diff --git a/include/afio/v2.0/detail/impl/windows/import.hpp b/include/afio/v2.0/detail/impl/windows/import.hpp index b0f65cfb..f7b2fb5f 100644 --- a/include/afio/v2.0/detail/impl/windows/import.hpp +++ b/include/afio/v2.0/detail/impl/windows/import.hpp @@ -967,11 +967,13 @@ inline bool ntsleep(const deadline &d, bool return_on_alert = false) noexcept windows_nt_kernel::init(); using namespace windows_nt_kernel; AFIO_WIN_DEADLINE_TO_SLEEP_INIT(d); + alignas(8) LARGE_INTEGER infinity; + infinity.QuadPart = INT64_MIN; for(;;) { AFIO_WIN_DEADLINE_TO_SLEEP_LOOP(d); // Pump alerts and APCs - NTSTATUS ntstat = NtDelayExecution(true, timeout); + NTSTATUS ntstat = NtDelayExecution(true, timeout ? timeout : &infinity); (void) ntstat; if((d).steady) { diff --git a/include/afio/v2.0/io_service.hpp b/include/afio/v2.0/io_service.hpp index ff9747ae..05c9a09e 100644 --- a/include/afio/v2.0/io_service.hpp +++ b/include/afio/v2.0/io_service.hpp @@ -280,11 +280,20 @@ public: template <class U> void post(U &&f) { _post(detail::make_function_ptr<void(io_service *)>(std::forward<U>(f))); } #if defined(__cpp_coroutines) || defined(DOXYGEN_IS_IN_THE_HOUSE) -private: - struct _post_to_self_awaitable + /*! An awaitable suspending execution of this coroutine on the current kernel thread, + and resuming execution on the kernel thread running this i/o service. This is a + convenience wrapper for `post()`. + */ + struct awaitable_post_to_self { io_service *service; + //! Constructor, takes the i/o service whose kernel thread we are to reschedule onto + awaitable_post_to_self(io_service &_service) + : service(&_service) + { + } + bool await_ready() { return false; } void await_suspend(coroutine_handle<> co) { @@ -292,16 +301,6 @@ private: } void await_resume() {} }; - -public: - /*! Suspend execution of this coroutine on this kernel thread, and resume execution on - the kernel thread running this i/o service. This is a convenience wrapper for `post()`. - */ - void co_post_self_to_run() - { - // Suspend on this kernel thread, resume on run() thread - co_await _post_to_self_awaitable{this}; - } #endif }; diff --git a/test/tests/async_io.cpp b/test/tests/async_io.cpp new file mode 100644 index 00000000..a36c9a7f --- /dev/null +++ b/test/tests/async_io.cpp @@ -0,0 +1,71 @@ +/* Integration test kernel for async i/o +(C) 2017 Niall Douglas <http://www.nedproductions.biz/> (2 commits) +File Created: Sept 2016 + + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License in the accompanying file +Licence.txt or at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + +Distributed under the Boost Software License, Version 1.0. + (See accompanying file Licence.txt or copy at + http://www.boost.org/LICENSE_1_0.txt) +*/ + +#include "../../include/afio/afio.hpp" +#include "kerneltest/include/kerneltest.hpp" + +#include <future> + +static inline void TestAsyncFileHandle() +{ + namespace afio = AFIO_V2_NAMESPACE; + afio::io_service service; + afio::async_file_handle h = afio::async_file_handle::async_file(service, {}, "temp", afio::file_handle::mode::write, afio::file_handle::creation::if_needed, afio::file_handle::caching::only_metadata, afio::file_handle::flag::unlink_on_close).value(); + std::vector<std::pair<std::future<afio::async_file_handle::const_buffers_type>, afio::async_file_handle::io_state_ptr>> futures; + futures.reserve(1024); + h.truncate(1024 * 4096).value(); + alignas(4096) char buffer[4096]; + memset(buffer, (int) ('0'), 4096); + afio::async_file_handle::const_buffer_type bt{buffer, sizeof(buffer)}; + for(size_t n = 0; n < 1024; n++) + { + std::promise<afio::async_file_handle::const_buffers_type> p; + auto f(p.get_future()); + auto g(h + .async_write({bt, n * 4096}, [ p = std::move(p), n ](afio::async_file_handle *, afio::async_file_handle::io_result<afio::async_file_handle::const_buffers_type> & result) mutable { + try + { + p.set_value(std::move(result).value()); + // std::cout << "Written block " << n << " successfully" << std::endl; + } + catch(...) + { + p.set_exception(std::current_exception()); + // std::cout << "Written block " << n << " unsuccessfully" << std::endl; + } + }) + .value()); + futures.push_back({std::move(f), std::move(g)}); + } + // Pump the i/o until no more work remains. + while(service.run().value()) + ; + // Make sure nothing went wrong by fetching the futures. + for(auto &i : futures) + { + BOOST_CHECK(i.first.get().data()->len == 4096); + } +} + +KERNELTEST_TEST_KERNEL(integration, afio, works, async_file_handle, "Tests that afio::async_file_handle works as expected", TestAsyncFileHandle()) diff --git a/test/tests/coroutines.cpp b/test/tests/coroutines.cpp index fe23cef0..61aee5a1 100644 --- a/test/tests/coroutines.cpp +++ b/test/tests/coroutines.cpp @@ -33,9 +33,9 @@ static inline void TestAsyncFileHandleCoroutines() //! [coroutines_example] namespace afio = AFIO_V2_NAMESPACE; - // Create an i/o service for this thread + // Create an i/o service for this thread afio::io_service service; - + // Create an async file i/o handle attached to the i/o service for this thread afio::async_file_handle h = afio::async_file_handle::async_file(service, {}, "temp", afio::file_handle::mode::write, afio::file_handle::creation::if_needed, afio::file_handle::caching::only_metadata, afio::file_handle::flag::unlink_on_close).value(); @@ -87,4 +87,25 @@ static inline void TestAsyncFileHandleCoroutines() #endif } +static inline void TestPostSelfToRunCoroutines() +{ +#ifdef __cpp_coroutines + namespace afio = AFIO_V2_NAMESPACE; + afio::io_service service; + auto runthreadid = QUICKCPPLIB_NAMESPACE::utils::thread::this_thread_id(); + auto coroutine = [&]() -> std::future<void> { + auto thisthreadid = QUICKCPPLIB_NAMESPACE::utils::thread::this_thread_id(); + BOOST_CHECK(thisthreadid != runthreadid); + co_await afio::io_service::awaitable_post_to_self(service); + thisthreadid = QUICKCPPLIB_NAMESPACE::utils::thread::this_thread_id(); + BOOST_CHECK(thisthreadid == runthreadid); + }; + auto asynch = std::async(std::launch::async, coroutine); + while(!service.run()) + ; + asynch.get().get(); +#endif +} + KERNELTEST_TEST_KERNEL(integration, afio, coroutines, async_file_handle, "Tests that afio::async_file_handle works as expected with Coroutines", TestAsyncFileHandleCoroutines()) +KERNELTEST_TEST_KERNEL(integration, afio, coroutines, co_post_self_to_run, "Tests that afio::io_service::co_post_self_to_run() works as expected with Coroutines", TestPostSelfToRunCoroutines()) |