Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/windirstat/llfio.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2017-09-25 05:38:34 +0300
committerNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2017-09-25 05:38:34 +0300
commit760a6d37f6ff42a948a1487ac6b1d7deb477195e (patch)
tree1dca5e85fca15b90748953d8c5f7e95a8e968e9f
parent8b00fcbe1ffff32adba517b28fe51e6052c75046 (diff)
wip Replacing async_file_handle's i/o routines with ABI stable editions which
optionally don't allocate memory. Working on Windows, POSIX still to do.
-rw-r--r--cmake/tests.cmake1
m---------doc/html8
-rw-r--r--include/afio/revision.hpp6
-rw-r--r--include/afio/v2.0/async_file_handle.hpp187
-rw-r--r--include/afio/v2.0/detail/impl/windows/async_file_handle.ipp87
-rw-r--r--include/afio/v2.0/detail/impl/windows/import.hpp4
-rw-r--r--include/afio/v2.0/io_service.hpp23
-rw-r--r--test/tests/async_io.cpp71
-rw-r--r--test/tests/coroutines.cpp25
9 files changed, 271 insertions, 141 deletions
diff --git a/cmake/tests.cmake b/cmake/tests.cmake
index 636119ef..11d087ac 100644
--- a/cmake/tests.cmake
+++ b/cmake/tests.cmake
@@ -7,6 +7,7 @@ set(afio_TESTS
"test/tests/file_handle_create_close/kernel_file_handle.cpp.hpp"
"test/tests/map_handle_create_close/kernel_map_handle.cpp.hpp"
"test/tests/section_handle_create_close/kernel_section_handle.cpp.hpp"
+ "test/tests/async_io.cpp"
"test/tests/coroutines.cpp"
"test/tests/current_path.cpp"
"test/tests/directory_handle_create_close/runner.cpp"
diff --git a/doc/html b/doc/html
-Subproject 3a7ffeb387ac3488c3faccbf4c9b2912a76d99f
+Subproject bae3c1c2f02853d7e35e9fdc4d74c2b85fcc87d
diff --git a/include/afio/revision.hpp b/include/afio/revision.hpp
index cd3a5fdc..c3700b76 100644
--- a/include/afio/revision.hpp
+++ b/include/afio/revision.hpp
@@ -1,4 +1,4 @@
// Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time
-#define AFIO_PREVIOUS_COMMIT_REF 9d0e4e204b7ab65d53fcbb847facd076502d8c63
-#define AFIO_PREVIOUS_COMMIT_DATE "2017-09-24 14:22:05 +00:00"
-#define AFIO_PREVIOUS_COMMIT_UNIQUE 9d0e4e20
+#define AFIO_PREVIOUS_COMMIT_REF 8b00fcbe1ffff32adba517b28fe51e6052c75046
+#define AFIO_PREVIOUS_COMMIT_DATE "2017-09-24 18:37:01 +00:00"
+#define AFIO_PREVIOUS_COMMIT_UNIQUE 8b00fcbe
diff --git a/include/afio/v2.0/async_file_handle.hpp b/include/afio/v2.0/async_file_handle.hpp
index 50cd486c..793b79c3 100644
--- a/include/afio/v2.0/async_file_handle.hpp
+++ b/include/afio/v2.0/async_file_handle.hpp
@@ -49,8 +49,6 @@ only use case where using async i/o makes sense given the other options below.
created the owning `io_service` which MUST also be the same kernel thread as which
runs the i/o service's `run()` function.
-\todo Direct use of `calloc()` ought to be replaced with a user supplied STL allocator instance.
-
\snippet coroutines.cpp coroutines_example
*/
class AFIO_DECL async_file_handle : public file_handle
@@ -233,32 +231,31 @@ protected:
dsync
};
// Holds state for an i/o in progress. Will be subclassed with platform specific state and how to implement completion.
- // Note this is allocated using malloc not new to avoid memory zeroing, and therefore it has a custom deleter.
struct _erased_io_state_type
{
+ friend class io_service;
async_file_handle *parent;
operation_t operation;
+ bool must_deallocate_self;
size_t items;
shared_size_type items_to_go;
- constexpr _erased_io_state_type(async_file_handle *_parent, operation_t _operation, size_t _items)
+ union result_storage {
+ io_result<buffers_type> read;
+ io_result<const_buffers_type> write;
+ constexpr result_storage()
+ : read(buffers_type())
+ {
+ }
+ } result;
+ constexpr _erased_io_state_type(async_file_handle *_parent, operation_t _operation, bool _must_deallocate_self, size_t _items)
: parent(_parent)
, operation(_operation)
+ , must_deallocate_self(_must_deallocate_self)
, items(_items)
, items_to_go(0)
+ , result()
{
}
- /*
- For Windows:
- - errcode: GetLastError() code
- - bytes_transferred: obvious
- - internal_state: LPOVERLAPPED for this op
-
- For POSIX AIO:
- - errcode: errno code
- - bytes_transferred: return from aio_return(), usually bytes transferred
- - internal_state: address of pointer to struct aiocb in io_service's _aiocbsv
- */
- virtual void operator()(long errcode, long bytes_transferred, void *internal_state) noexcept = 0;
AFIO_HEADERS_ONLY_VIRTUAL_SPEC ~_erased_io_state_type()
{
// i/o still pending is very bad, this should never happen
@@ -269,26 +266,32 @@ protected:
abort();
}
}
- };
- // State for an i/o in progress, but with the per operation typing
- template <class CompletionRoutine, class BuffersType> struct _io_state_type : public _erased_io_state_type
- {
- io_result<BuffersType> result;
- CompletionRoutine completion;
- constexpr _io_state_type(async_file_handle *_parent, operation_t _operation, CompletionRoutine &&f, size_t _items)
- : _erased_io_state_type(_parent, _operation, _items)
- , result(BuffersType())
- , completion(std::forward<CompletionRoutine>(f))
- {
- }
+
+ /* Called when an i/o is completed by the system, figures out whether to call invoke_completion.
+
+ For Windows:
+ - errcode: GetLastError() code
+ - bytes_transferred: obvious
+ - internal_state: LPOVERLAPPED for this op
+
+ For POSIX AIO:
+ - errcode: errno code
+ - bytes_transferred: return from aio_return(), usually bytes transferred
+ - internal_state: address of pointer to struct aiocb in io_service's _aiocbsv
+ */
+ virtual void _system_io_completion(long errcode, long bytes_transferred, void *internal_state) noexcept = 0;
};
struct _io_state_deleter
{
template <class U> void operator()(U *_ptr) const
{
+ bool must_deallocate_self = _ptr->must_deallocate_self;
_ptr->~U();
- char *ptr = (char *) _ptr;
- ::free(ptr);
+ if(must_deallocate_self)
+ {
+ char *ptr = (char *) _ptr;
+ ::free(ptr);
+ }
}
};
@@ -296,51 +299,97 @@ public:
/*! Smart pointer to state of an i/o in progress. Destroying this before an i/o has completed
is <b>blocking</b> because the i/o must be cancelled before the destructor can safely exit.
*/
- using erased_io_state_ptr = std::unique_ptr<_erased_io_state_type, _io_state_deleter>;
- /*! Smart pointer to state of an i/o in progress. Destroying this before an i/o has completed
- is <b>blocking</b> because the i/o must be cancelled before the destructor can safely exit.
- */
- template <class CompletionRoutine, class BuffersType> using io_state_ptr = std::unique_ptr<_io_state_type<CompletionRoutine, BuffersType>, _io_state_deleter>;
- //! Erases the type of an io_state_ptr so it can be stored non-templated.
- template <class CompletionRoutine, class BuffersType> static erased_io_state_ptr erase(io_state_ptr<CompletionRoutine, BuffersType> &&p) noexcept
- {
- _erased_io_state_type *_p = p.release();
- return erased_io_state_ptr(_p);
- }
+ using io_state_ptr = std::unique_ptr<_erased_io_state_type, _io_state_deleter>;
#if DOXYGEN_SHOULD_SKIP_THIS
private:
#else
protected:
#endif
- template <class CompletionRoutine, class BuffersType, class IORoutine> result<io_state_ptr<CompletionRoutine, BuffersType>> _begin_io(operation_t operation, io_request<BuffersType> reqs, CompletionRoutine &&completion, IORoutine &&ioroutine) noexcept;
+ // Used to indirect copy and call of unknown completion handler
+ struct _erased_completion_handler
+ {
+ virtual ~_erased_completion_handler() {}
+ // Returns my size including completion handler
+ virtual size_t bytes() const noexcept = 0;
+ // Moves me and handler to some new location
+ virtual void move(_erased_completion_handler *dest) = 0;
+ // Invokes my completion handler
+ virtual void operator()(_erased_io_state_type *state) = 0;
+ };
+ template <class BuffersType, class IORoutine> result<io_state_ptr> AFIO_HEADERS_ONLY_MEMFUNC_SPEC _begin_io(span<char> mem, operation_t operation, io_request<BuffersType> reqs, _erased_completion_handler &&completion, IORoutine &&ioroutine) noexcept;
+ AFIO_HEADERS_ONLY_MEMFUNC_SPEC result<io_state_ptr> _begin_io(span<char> mem, operation_t operation, io_request<const_buffers_type> reqs, _erased_completion_handler &&completion) noexcept;
public:
/*! \brief Schedule a read to occur asynchronously.
\return Either an io_state_ptr to the i/o in progress, or an error code.
\param reqs A scatter-gather and offset request.
- \param completion A callable to call upon i/o completion. Spec is void(async_file_handle *, io_result<buffers_type> &).
- Note that buffers returned may not be buffers input, see documentation for read().
- \errors As for read(), plus ENOMEM.
- \mallocs One calloc, one free. The allocation is unavoidable due to the need to store a type
- erased completion handler of unknown type.
+ \param completion A callable to call upon i/o completion. Spec is `void(async_file_handle *, io_result<buffers_type> &)`.
+ Note that buffers returned may not be buffers input, see documentation for `read()`.
+ \param mem Optional span of memory to use to avoid using `calloc()`. Note span MUST be all bits zero on entry.
+ \errors As for `read()`, plus `ENOMEM`.
+ \mallocs If mem is not set, one calloc, one free. The allocation is unavoidable due to the need to store a type
+ erased completion handler of unknown type and state per buffers input.
*/
AFIO_MAKE_FREE_FUNCTION
- template <class CompletionRoutine> result<io_state_ptr<CompletionRoutine, buffers_type>> async_read(io_request<buffers_type> reqs, CompletionRoutine &&completion) noexcept;
+ template <class CompletionRoutine> //
+ AFIO_REQUIRES({ std::declval<CompletionRoutine>()(&std::declval<async_file_handle>(), std::declval<io_result<buffers_type>>()); }) //
+ result<io_state_ptr> async_read(io_request<buffers_type> reqs, CompletionRoutine &&completion, span<char> mem = {}) noexcept
+ {
+ AFIO_LOG_FUNCTION_CALL(this);
+ struct completion_handler : _erased_completion_handler
+ {
+ CompletionRoutine completion;
+ completion_handler(CompletionRoutine c)
+ : completion(std::move(c))
+ {
+ }
+ virtual size_t bytes() const noexcept override final { return sizeof(*this); }
+ virtual void move(_erased_completion_handler *_dest) override final
+ {
+ completion_handler *dest = (completion_handler *) _dest;
+ new(dest) completion_handler(std::move(*this));
+ }
+ virtual void operator()(_erased_io_state_type *state) override final { completion(state->parent, state->result.read); }
+ } ch{std::forward<CompletionRoutine>(completion)};
+ return _begin_io(mem, operation_t::read, reinterpret_cast<io_request<const_buffers_type> &>(reqs), std::move(ch));
+ }
/*! \brief Schedule a write to occur asynchronously.
\return Either an io_state_ptr to the i/o in progress, or an error code.
\param reqs A scatter-gather and offset request.
- \param completion A callable to call upon i/o completion. Spec is void(async_file_handle *, io_result<const_buffers_type> &).
- Note that buffers returned may not be buffers input, see documentation for write().
- \errors As for write(), plus ENOMEM.
- \mallocs One calloc, one free. The allocation is unavoidable due to the need to store a type
- erased completion handler of unknown type.
+ \param completion A callable to call upon i/o completion. Spec is `void(async_file_handle *, io_result<const_buffers_type> &)`.
+ Note that buffers returned may not be buffers input, see documentation for `write()`.
+ \param mem Optional span of memory to use to avoid using `calloc()`. Note span MUST be all bits zero on entry.
+ \errors As for `write()`, plus `ENOMEM`.
+ \mallocs If mem in not set, one calloc, one free. The allocation is unavoidable due to the need to store a type
+ erased completion handler of unknown type and state per buffers input.
*/
AFIO_MAKE_FREE_FUNCTION
- template <class CompletionRoutine> result<io_state_ptr<CompletionRoutine, const_buffers_type>> async_write(io_request<const_buffers_type> reqs, CompletionRoutine &&completion) noexcept;
+ template <class CompletionRoutine> //
+ AFIO_REQUIRES({ std::declval<CompletionRoutine>()(&std::declval<async_file_handle>(), std::declval<io_result<const_buffers_type>>()); }) //
+ result<io_state_ptr> async_write(io_request<const_buffers_type> reqs, CompletionRoutine &&completion, span<char> mem = {}) noexcept
+ {
+ AFIO_LOG_FUNCTION_CALL(this);
+ struct completion_handler : _erased_completion_handler
+ {
+ CompletionRoutine completion;
+ completion_handler(CompletionRoutine c)
+ : completion(std::move(c))
+ {
+ }
+ virtual size_t bytes() const noexcept override final { return sizeof(*this); }
+ virtual void move(_erased_completion_handler *_dest) override final
+ {
+ completion_handler *dest = (completion_handler *) _dest;
+ new(dest) completion_handler(std::move(*this));
+ }
+ virtual void operator()(_erased_io_state_type *state) override final { completion(state->parent, state->result.write); }
+ } ch{std::forward<CompletionRoutine>(completion)};
+ return _begin_io(mem, operation_t::write, reqs, std::move(ch));
+ }
using file_handle::read;
AFIO_HEADERS_ONLY_VIRTUAL_SPEC io_result<buffers_type> read(io_request<buffers_type> reqs, deadline d = deadline()) noexcept override;
@@ -513,36 +562,6 @@ inline async_file_handle::io_result<async_file_handle::const_buffers_type> barri
{
return self.barrier(std::forward<decltype(reqs)>(reqs), std::forward<decltype(wait_for_device)>(wait_for_device), std::forward<decltype(and_metadata)>(and_metadata), std::forward<decltype(d)>(d));
}
-/*! \brief Schedule a read to occur asynchronously.
-
-\return Either an io_state_ptr to the i/o in progress, or an error code.
-\param self The object whose member function to call.
-\param reqs A scatter-gather and offset request.
-\param completion A callable to call upon i/o completion. Spec is void(async_file_handle *, io_result<buffers_type> &).
-Note that buffers returned may not be buffers input, see documentation for read().
-\errors As for read(), plus ENOMEM.
-\mallocs One calloc, one free. The allocation is unavoidable due to the need to store a type
-erased completion handler of unknown type.
-*/
-template <class CompletionRoutine> inline result<async_file_handle::io_state_ptr<CompletionRoutine, async_file_handle::buffers_type>> async_read(async_file_handle &self, async_file_handle::io_request<async_file_handle::buffers_type> reqs, CompletionRoutine &&completion) noexcept
-{
- return self.async_read(std::forward<decltype(reqs)>(reqs), std::forward<decltype(completion)>(completion));
-}
-/*! \brief Schedule a write to occur asynchronously.
-
-\return Either an io_state_ptr to the i/o in progress, or an error code.
-\param self The object whose member function to call.
-\param reqs A scatter-gather and offset request.
-\param completion A callable to call upon i/o completion. Spec is void(async_file_handle *, io_result<const_buffers_type> &).
-Note that buffers returned may not be buffers input, see documentation for write().
-\errors As for write(), plus ENOMEM.
-\mallocs One calloc, one free. The allocation is unavoidable due to the need to store a type
-erased completion handler of unknown type.
-*/
-template <class CompletionRoutine> inline result<async_file_handle::io_state_ptr<CompletionRoutine, async_file_handle::const_buffers_type>> async_write(async_file_handle &self, async_file_handle::io_request<async_file_handle::const_buffers_type> reqs, CompletionRoutine &&completion) noexcept
-{
- return self.async_write(std::forward<decltype(reqs)>(reqs), std::forward<decltype(completion)>(completion));
-}
#if defined(__cpp_coroutines) || defined(DOXYGEN_IS_IN_THE_HOUSE)
/*! \brief Schedule a read to occur asynchronously.
diff --git a/include/afio/v2.0/detail/impl/windows/async_file_handle.ipp b/include/afio/v2.0/detail/impl/windows/async_file_handle.ipp
index 59d4b397..42b75fc8 100644
--- a/include/afio/v2.0/detail/impl/windows/async_file_handle.ipp
+++ b/include/afio/v2.0/detail/impl/windows/async_file_handle.ipp
@@ -33,25 +33,27 @@ async_file_handle::io_result<async_file_handle::const_buffers_type> async_file_h
return file_handle::barrier(std::move(reqs), wait_for_device, and_metadata, std::move(d));
}
-template <class CompletionRoutine, class BuffersType, class IORoutine>
-result<async_file_handle::io_state_ptr<CompletionRoutine, BuffersType>> async_file_handle::_begin_io(async_file_handle::operation_t operation, async_file_handle::io_request<BuffersType> reqs, CompletionRoutine &&completion, IORoutine &&ioroutine) noexcept
+template <class BuffersType, class IORoutine> result<async_file_handle::io_state_ptr> async_file_handle::_begin_io(span<char> mem, async_file_handle::operation_t operation, async_file_handle::io_request<BuffersType> reqs, async_file_handle::_erased_completion_handler &&completion, IORoutine &&ioroutine) noexcept
{
// Need to keep a set of OVERLAPPED matching the scatter-gather buffers
- struct state_type : public _io_state_type<CompletionRoutine, BuffersType>
+ struct state_type : public _erased_io_state_type
{
OVERLAPPED ols[1];
- state_type(async_file_handle *_parent, operation_t _operation, CompletionRoutine &&f, size_t _items)
- : _io_state_type<CompletionRoutine, BuffersType>(_parent, _operation, std::forward<CompletionRoutine>(f), _items)
+ _erased_completion_handler *completion;
+ state_type(async_file_handle *_parent, operation_t _operation, bool must_deallocate_self, size_t _items)
+ : _erased_io_state_type(_parent, _operation, must_deallocate_self, _items)
+ , completion(nullptr)
{
}
- AFIO_HEADERS_ONLY_VIRTUAL_SPEC void operator()(long errcode, long bytes_transferred, void *internal_state) noexcept override final
+ AFIO_HEADERS_ONLY_VIRTUAL_SPEC void _system_io_completion(long errcode, long bytes_transferred, void *internal_state) noexcept override final
{
LPOVERLAPPED ol = (LPOVERLAPPED) internal_state;
ol->hEvent = nullptr;
- if(this->result)
+ auto &result = this->result.write;
+ if(result)
{
if(errcode)
- this->result = error_code{errcode, std::system_category()};
+ result = error_code{errcode, std::system_category()};
else
{
// Figure out which i/o I am and update the buffer in question
@@ -61,13 +63,13 @@ result<async_file_handle::io_state_ptr<CompletionRoutine, BuffersType>> async_fi
AFIO_LOG_FATAL(0, "async_file_handle::io_state::operator() called with invalid index");
std::terminate();
}
- this->result.value()[idx].len = bytes_transferred;
+ result.value()[idx].len = bytes_transferred;
}
}
this->parent->service()->_work_done();
// Are we done?
if(!--this->items_to_go)
- this->completion(this);
+ (*completion)(this);
}
AFIO_HEADERS_ONLY_VIRTUAL_SPEC ~state_type() override final
{
@@ -99,20 +101,33 @@ result<async_file_handle::io_state_ptr<CompletionRoutine, BuffersType>> async_fi
#endif
}
}
+ completion->~_erased_completion_handler();
}
} * state;
extent_type offset = reqs.offset;
- size_t statelen = sizeof(state_type) + (reqs.buffers.size() - 1) * sizeof(OVERLAPPED), items(reqs.buffers.size());
- using return_type = io_state_ptr<CompletionRoutine, BuffersType>;
+ size_t statelen = sizeof(state_type) + (reqs.buffers.size() - 1) * sizeof(OVERLAPPED) + completion.bytes();
+ if(!mem.empty() && statelen > mem.size())
+ {
+ return std::errc::not_enough_memory;
+ }
+ size_t items(reqs.buffers.size());
// On Windows i/o must be scheduled on the same thread pumping completion
if(GetCurrentThreadId() != service()->_threadid)
return std::errc::operation_not_supported;
- void *mem = ::calloc(1, statelen);
- if(!mem)
- return std::errc::not_enough_memory;
- return_type _state((_io_state_type<CompletionRoutine, BuffersType> *) mem);
- new((state = (state_type *) mem)) state_type(this, operation, std::forward<CompletionRoutine>(completion), items);
+ bool must_deallocate_self = false;
+ if(mem.empty())
+ {
+ void *_mem = ::calloc(1, statelen);
+ if(!_mem)
+ return std::errc::not_enough_memory;
+ mem = {(char *) _mem, statelen};
+ must_deallocate_self = true;
+ }
+ io_state_ptr _state((state_type *) mem.data());
+ new((state = (state_type *) mem.data())) state_type(this, operation, must_deallocate_self, items);
+ state->completion = (_erased_completion_handler *) ((uintptr_t) state + sizeof(state_type) + (reqs.buffers.size() - 1) * sizeof(OVERLAPPED));
+ completion.move(state->completion);
// To be called once each buffer is read
struct handle_completion
@@ -120,11 +135,11 @@ result<async_file_handle::io_state_ptr<CompletionRoutine, BuffersType>> async_fi
static VOID CALLBACK Do(DWORD errcode, DWORD bytes_transferred, LPOVERLAPPED ol)
{
state_type *state = (state_type *) ol->hEvent;
- (*state)(errcode, bytes_transferred, ol);
+ state->_system_io_completion(errcode, bytes_transferred, ol);
}
};
// Noexcept move the buffers from req into result
- BuffersType &out = state->result.value();
+ auto &out = state->result.write.value();
out = std::move(reqs.buffers);
for(size_t n = 0; n < items; n++)
{
@@ -154,13 +169,13 @@ result<async_file_handle::io_state_ptr<CompletionRoutine, BuffersType>> async_fi
assert((out[n].len & 511) == 0);
}
#endif
- if(!ioroutine(_v.h, out[n].data, (DWORD) out[n].len, ol, handle_completion::Do))
+ if(!ioroutine(_v.h, (char *) out[n].data, (DWORD) out[n].len, ol, handle_completion::Do))
{
--state->items_to_go;
- state->result = {GetLastError(), std::system_category()};
+ state->result.write = {GetLastError(), std::system_category()};
// Fire completion now if we didn't schedule anything
if(!n)
- state->completion(state);
+ (*state->completion)(state);
return _state;
}
service()->_work_enqueued();
@@ -168,24 +183,27 @@ result<async_file_handle::io_state_ptr<CompletionRoutine, BuffersType>> async_fi
return _state;
}
-template <class CompletionRoutine> result<async_file_handle::io_state_ptr<CompletionRoutine, async_file_handle::buffers_type>> async_file_handle::async_read(async_file_handle::io_request<async_file_handle::buffers_type> reqs, CompletionRoutine &&completion) noexcept
-{
- AFIO_LOG_FUNCTION_CALL(this);
- return _begin_io(operation_t::read, std::move(reqs), [completion = std::forward<CompletionRoutine>(completion)](auto *state) { completion(state->parent, state->result); }, ReadFileEx);
-}
-
-template <class CompletionRoutine> result<async_file_handle::io_state_ptr<CompletionRoutine, async_file_handle::const_buffers_type>> async_file_handle::async_write(async_file_handle::io_request<async_file_handle::const_buffers_type> reqs, CompletionRoutine &&completion) noexcept
+result<async_file_handle::io_state_ptr> async_file_handle::_begin_io(span<char> mem, async_file_handle::operation_t operation, io_request<const_buffers_type> reqs, async_file_handle::_erased_completion_handler &&completion) noexcept
{
- AFIO_LOG_FUNCTION_CALL(this);
- return _begin_io(operation_t::write, std::move(reqs), [completion = std::forward<CompletionRoutine>(completion)](auto *state) { completion(state->parent, state->result); }, WriteFileEx);
+ switch(operation)
+ {
+ case operation_t::read:
+ return _begin_io(mem, operation, reqs, std::move(completion), ReadFileEx);
+ case operation_t::write:
+ return _begin_io(mem, operation, reqs, std::move(completion), WriteFileEx);
+ case operation_t::fsync:
+ case operation_t::dsync:
+ // TODO FIXME Implement these for Windows
+ return std::errc::operation_not_supported;
+ }
+ return std::errc::operation_not_supported;
}
async_file_handle::io_result<async_file_handle::buffers_type> async_file_handle::read(async_file_handle::io_request<async_file_handle::buffers_type> reqs, deadline d) noexcept
{
AFIO_LOG_FUNCTION_CALL(this);
optional<io_result<buffers_type>> ret;
- auto _io_state(_begin_io(operation_t::read, std::move(reqs), [&ret](auto *state) { ret = std::move(state->result); }, ReadFileEx));
- OUTCOME_TRY(io_state, _io_state);
+ OUTCOME_TRY(io_state, async_read(reqs, [&ret](async_file_handle *, io_result<buffers_type> &result) { ret = std::move(result); }));
(void) io_state; // holds i/o open until it completes
// While i/o is not done pump i/o completion
@@ -210,8 +228,7 @@ async_file_handle::io_result<async_file_handle::const_buffers_type> async_file_h
{
AFIO_LOG_FUNCTION_CALL(this);
optional<io_result<const_buffers_type>> ret;
- auto _io_state(_begin_io(operation_t::write, std::move(reqs), [&ret](auto *state) { ret = std::move(state->result); }, WriteFileEx));
- OUTCOME_TRY(io_state, _io_state);
+ OUTCOME_TRY(io_state, async_write(reqs, [&ret](async_file_handle *, io_result<const_buffers_type> &result) { ret = std::move(result); }));
(void) io_state; // holds i/o open until it completes
// While i/o is not done pump i/o completion
diff --git a/include/afio/v2.0/detail/impl/windows/import.hpp b/include/afio/v2.0/detail/impl/windows/import.hpp
index b0f65cfb..f7b2fb5f 100644
--- a/include/afio/v2.0/detail/impl/windows/import.hpp
+++ b/include/afio/v2.0/detail/impl/windows/import.hpp
@@ -967,11 +967,13 @@ inline bool ntsleep(const deadline &d, bool return_on_alert = false) noexcept
windows_nt_kernel::init();
using namespace windows_nt_kernel;
AFIO_WIN_DEADLINE_TO_SLEEP_INIT(d);
+ alignas(8) LARGE_INTEGER infinity;
+ infinity.QuadPart = INT64_MIN;
for(;;)
{
AFIO_WIN_DEADLINE_TO_SLEEP_LOOP(d);
// Pump alerts and APCs
- NTSTATUS ntstat = NtDelayExecution(true, timeout);
+ NTSTATUS ntstat = NtDelayExecution(true, timeout ? timeout : &infinity);
(void) ntstat;
if((d).steady)
{
diff --git a/include/afio/v2.0/io_service.hpp b/include/afio/v2.0/io_service.hpp
index ff9747ae..05c9a09e 100644
--- a/include/afio/v2.0/io_service.hpp
+++ b/include/afio/v2.0/io_service.hpp
@@ -280,11 +280,20 @@ public:
template <class U> void post(U &&f) { _post(detail::make_function_ptr<void(io_service *)>(std::forward<U>(f))); }
#if defined(__cpp_coroutines) || defined(DOXYGEN_IS_IN_THE_HOUSE)
-private:
- struct _post_to_self_awaitable
+ /*! An awaitable suspending execution of this coroutine on the current kernel thread,
+ and resuming execution on the kernel thread running this i/o service. This is a
+ convenience wrapper for `post()`.
+ */
+ struct awaitable_post_to_self
{
io_service *service;
+ //! Constructor, takes the i/o service whose kernel thread we are to reschedule onto
+ awaitable_post_to_self(io_service &_service)
+ : service(&_service)
+ {
+ }
+
bool await_ready() { return false; }
void await_suspend(coroutine_handle<> co)
{
@@ -292,16 +301,6 @@ private:
}
void await_resume() {}
};
-
-public:
- /*! Suspend execution of this coroutine on this kernel thread, and resume execution on
- the kernel thread running this i/o service. This is a convenience wrapper for `post()`.
- */
- void co_post_self_to_run()
- {
- // Suspend on this kernel thread, resume on run() thread
- co_await _post_to_self_awaitable{this};
- }
#endif
};
diff --git a/test/tests/async_io.cpp b/test/tests/async_io.cpp
new file mode 100644
index 00000000..a36c9a7f
--- /dev/null
+++ b/test/tests/async_io.cpp
@@ -0,0 +1,71 @@
+/* Integration test kernel for async i/o
+(C) 2017 Niall Douglas <http://www.nedproductions.biz/> (2 commits)
+File Created: Sept 2016
+
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License in the accompanying file
+Licence.txt or at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+
+Distributed under the Boost Software License, Version 1.0.
+ (See accompanying file Licence.txt or copy at
+ http://www.boost.org/LICENSE_1_0.txt)
+*/
+
+#include "../../include/afio/afio.hpp"
+#include "kerneltest/include/kerneltest.hpp"
+
+#include <future>
+
+static inline void TestAsyncFileHandle()
+{
+ namespace afio = AFIO_V2_NAMESPACE;
+ afio::io_service service;
+ afio::async_file_handle h = afio::async_file_handle::async_file(service, {}, "temp", afio::file_handle::mode::write, afio::file_handle::creation::if_needed, afio::file_handle::caching::only_metadata, afio::file_handle::flag::unlink_on_close).value();
+ std::vector<std::pair<std::future<afio::async_file_handle::const_buffers_type>, afio::async_file_handle::io_state_ptr>> futures;
+ futures.reserve(1024);
+ h.truncate(1024 * 4096).value();
+ alignas(4096) char buffer[4096];
+ memset(buffer, (int) ('0'), 4096);
+ afio::async_file_handle::const_buffer_type bt{buffer, sizeof(buffer)};
+ for(size_t n = 0; n < 1024; n++)
+ {
+ std::promise<afio::async_file_handle::const_buffers_type> p;
+ auto f(p.get_future());
+ auto g(h
+ .async_write({bt, n * 4096}, [ p = std::move(p), n ](afio::async_file_handle *, afio::async_file_handle::io_result<afio::async_file_handle::const_buffers_type> & result) mutable {
+ try
+ {
+ p.set_value(std::move(result).value());
+ // std::cout << "Written block " << n << " successfully" << std::endl;
+ }
+ catch(...)
+ {
+ p.set_exception(std::current_exception());
+ // std::cout << "Written block " << n << " unsuccessfully" << std::endl;
+ }
+ })
+ .value());
+ futures.push_back({std::move(f), std::move(g)});
+ }
+ // Pump the i/o until no more work remains.
+ while(service.run().value())
+ ;
+ // Make sure nothing went wrong by fetching the futures.
+ for(auto &i : futures)
+ {
+ BOOST_CHECK(i.first.get().data()->len == 4096);
+ }
+}
+
+KERNELTEST_TEST_KERNEL(integration, afio, works, async_file_handle, "Tests that afio::async_file_handle works as expected", TestAsyncFileHandle())
diff --git a/test/tests/coroutines.cpp b/test/tests/coroutines.cpp
index fe23cef0..61aee5a1 100644
--- a/test/tests/coroutines.cpp
+++ b/test/tests/coroutines.cpp
@@ -33,9 +33,9 @@ static inline void TestAsyncFileHandleCoroutines()
//! [coroutines_example]
namespace afio = AFIO_V2_NAMESPACE;
- // Create an i/o service for this thread
+ // Create an i/o service for this thread
afio::io_service service;
-
+
// Create an async file i/o handle attached to the i/o service for this thread
afio::async_file_handle h = afio::async_file_handle::async_file(service, {}, "temp", afio::file_handle::mode::write, afio::file_handle::creation::if_needed, afio::file_handle::caching::only_metadata, afio::file_handle::flag::unlink_on_close).value();
@@ -87,4 +87,25 @@ static inline void TestAsyncFileHandleCoroutines()
#endif
}
+static inline void TestPostSelfToRunCoroutines()
+{
+#ifdef __cpp_coroutines
+ namespace afio = AFIO_V2_NAMESPACE;
+ afio::io_service service;
+ auto runthreadid = QUICKCPPLIB_NAMESPACE::utils::thread::this_thread_id();
+ auto coroutine = [&]() -> std::future<void> {
+ auto thisthreadid = QUICKCPPLIB_NAMESPACE::utils::thread::this_thread_id();
+ BOOST_CHECK(thisthreadid != runthreadid);
+ co_await afio::io_service::awaitable_post_to_self(service);
+ thisthreadid = QUICKCPPLIB_NAMESPACE::utils::thread::this_thread_id();
+ BOOST_CHECK(thisthreadid == runthreadid);
+ };
+ auto asynch = std::async(std::launch::async, coroutine);
+ while(!service.run())
+ ;
+ asynch.get().get();
+#endif
+}
+
KERNELTEST_TEST_KERNEL(integration, afio, coroutines, async_file_handle, "Tests that afio::async_file_handle works as expected with Coroutines", TestAsyncFileHandleCoroutines())
+KERNELTEST_TEST_KERNEL(integration, afio, coroutines, co_post_self_to_run, "Tests that afio::io_service::co_post_self_to_run() works as expected with Coroutines", TestPostSelfToRunCoroutines())