github.com/windirstat/llfio.git
author     Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>  2021-01-22 15:05:05 +0300
committer  Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>  2021-03-16 13:21:41 +0300
commit     eef0d398dcfd3b4068d6d330d0b0287102cd26c5 (patch)
tree       a773544ac48eaa67237e4de4ede665b7056a5256
parent     c9f1aea5f393dca136de2c2cf8d20759e234cf8a (diff)
Dynamic scaling within the native Linux implementation of dynamic_thread_pool_group is somewhat working now, but it is not quite right yet.
-rw-r--r--  include/llfio/revision.hpp                                       6
-rw-r--r--  include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp    406
-rw-r--r--  include/llfio/v2.0/detail/impl/posix/directory_handle.ipp        57
3 files changed, 333 insertions, 136 deletions
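
The scaling heuristic this commit experiments with can be summarised as: add worker threads while fewer pool threads are unblocked than there are CPUs and there is more submitted work than pool threads, and retire threads once the unblocked threads exceed roughly 1.5x the CPU count. Below is a minimal, self-contained sketch of that decision only; it is not LLFIO code, and all names are invented for illustration.

#include <algorithm>
#include <cstddef>
#include <thread>

struct scaling_decision
{
  std::ptrdiff_t add{0}, remove{0};
};

// running_threads: pool threads currently unblocked (making progress per /proc sampling)
// pool_threads:    total threads currently in the pool
// submitted_work:  work items submitted and not yet completed
scaling_decision decide(std::size_t running_threads, std::size_t pool_threads, std::size_t submitted_work)
{
  const auto min_concurrency = (std::ptrdiff_t) std::thread::hardware_concurrency();
  const auto max_concurrency = min_concurrency + (min_concurrency >> 1);  // about 1.5x the CPUs
  scaling_decision d;
  // Grow only while CPUs are idle AND there is more outstanding work than threads.
  d.add = std::max<std::ptrdiff_t>(0, std::min(min_concurrency - (std::ptrdiff_t) running_threads,
                                               (std::ptrdiff_t) submitted_work - (std::ptrdiff_t) pool_threads));
  // Shrink once unblocked threads oversubscribe the CPUs by more than 50%.
  d.remove = std::max<std::ptrdiff_t>(0, (std::ptrdiff_t) running_threads - max_concurrency);
  return d;
}

In the patch itself the running/blocked counts come from the thread metrics queue populated out of /proc/self/task, and the add count is further reduced by the number of already-sleeping pool threads before any new thread is spawned.
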
diff --git a/include/llfio/revision.hpp b/include/llfio/revision.hpp
index 9e7774c5..3bf437f0 100644
--- a/include/llfio/revision.hpp
+++ b/include/llfio/revision.hpp
@@ -1,4 +1,4 @@
// Note the second line of this file must ALWAYS be the git SHA, third line ALWAYS the git SHA update time
-#define LLFIO_PREVIOUS_COMMIT_REF ca8673c41b5a87fb834f9365c3dfbc6b668cd260
-#define LLFIO_PREVIOUS_COMMIT_DATE "2021-01-15 11:26:17 +00:00"
-#define LLFIO_PREVIOUS_COMMIT_UNIQUE ca8673c4
+#define LLFIO_PREVIOUS_COMMIT_REF 0d62d125a2ee2404976155f104dbe109430ae0ba
+#define LLFIO_PREVIOUS_COMMIT_DATE "2021-01-19 15:43:28 +00:00"
+#define LLFIO_PREVIOUS_COMMIT_UNIQUE 0d62d125
diff --git a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
index e5c7fbb9..bc9b540c 100644
--- a/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
+++ b/include/llfio/v2.0/detail/impl/dynamic_thread_pool_group.ipp
@@ -163,9 +163,10 @@ namespace detail
size_t count{0};
threadmetrics_item *front{nullptr}, *back{nullptr};
uint32_t blocked{0}, running{0};
- } threadmetrics_queue; // items at end are least recently updated
- std::vector<threadmetrics_item *> threadmetrics_sorted;
+ } threadmetrics_queue; // items at front are least recently updated
+ std::vector<threadmetrics_item *> threadmetrics_sorted; // sorted by threadid
std::chrono::steady_clock::time_point threadmetrics_last_updated;
+ std::atomic<bool> update_threadmetrics_reentrancy{false};
#ifdef __linux__
int proc_self_task_fd{-1};
#endif
@@ -239,20 +240,236 @@ namespace detail
}
#if !LLFIO_DYNAMIC_THREAD_POOL_GROUP_USING_GCD && !defined(_WIN32)
+ inline void _execute_work(thread_t *self);
+
+ void _add_thread(_lock_guard & /*unused*/)
+ {
+ thread_t *p = nullptr;
+ try
+ {
+ p = new thread_t;
+ _append_to_list(threadpool_active, p);
+ p->thread = std::thread([this, p] { _execute_work(p); });
+ }
+ catch(...)
+ {
+ if(p != nullptr)
+ {
+ _remove_from_list(threadpool_active, p);
+ }
+ // drop failure
+ }
+ }
+
+ bool _remove_thread(_lock_guard &g, threads_t &which)
+ {
+ if(which.count == 0)
+ {
+ return false;
+ }
+ // Threads which went to sleep the longest ago are at the front
+ auto *t = which.front;
+ if(t->state < 0)
+ {
+ // He's already exiting
+ return false;
+ }
+ assert(t->state == 0);
+ t->state--;
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << t << " is told to quit" << std::endl;
+#endif
+ do
+ {
+ g.unlock();
+ t->cond.notify_one();
+ g.lock();
+ } while(t->state >= -1);
+#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
+ std::cout << "*** DTP " << t << " has quit, deleting" << std::endl;
+#endif
+ _remove_from_list(threadpool_active, t);
+ t->thread.join();
+ delete t;
+ return true;
+ }
+
+ ~global_dynamic_thread_pool_impl()
+ {
+ {
+ _lock_guard g(workqueue_lock); // lock global
+ while(threadpool_active.count > 0 || threadpool_sleeping.count > 0)
+ {
+ while(threadpool_sleeping.count > 0)
+ {
+ auto removed = _remove_thread(g, threadpool_sleeping);
+ assert(removed);
+ (void) removed;
+ }
+ if(threadpool_active.count > 0)
+ {
+ auto removed = _remove_thread(g, threadpool_active);
+ assert(removed);
+ (void) removed;
+ }
+ }
+ }
+ _lock_guard g(threadmetrics_lock);
+ for(auto *p : threadmetrics_sorted)
+ {
+ delete p;
+ }
+ threadmetrics_sorted.clear();
+ threadmetrics_queue = {};
+#ifdef __linux__
+ if(proc_self_task_fd > 0)
+ {
+ ::close(proc_self_task_fd);
+ proc_self_task_fd = -1;
+ }
+#endif
+ }
+
#ifdef __linux__
- void populate_threadmetrics(std::chrono::steady_clock::time_point now)
+ bool update_threadmetrics(_lock_guard &&g, std::chrono::steady_clock::time_point now, threadmetrics_item *new_items)
+ {
+ auto update_item = [&](threadmetrics_item *item) {
+ char path[64] = "/proc/self/task/", *pend = path + 16, *tend = item->threadid.text;
+ while(*tend == '0' && (tend - item->threadid.text) < (ssize_t) sizeof(item->threadid.text))
+ {
+ ++tend;
+ }
+ while((tend - item->threadid.text) < (ssize_t) sizeof(item->threadid.text))
+ {
+ *pend++ = *tend++;
+ }
+ memcpy(pend, "/stat", 6);
+ int fd = ::open(path, O_RDONLY);
+ if(-1 == fd)
+ {
+ // Thread may have exited since we last populated
+ if(item->blocked_since == std::chrono::steady_clock::time_point())
+ {
+ threadmetrics_queue.running--;
+ threadmetrics_queue.blocked++;
+ }
+ item->blocked_since = now;
+ item->last_updated = now;
+ return;
+ }
+ char buffer[1024];
+ auto bytesread = ::read(fd, buffer, sizeof(buffer));
+ ::close(fd);
+ buffer[std::min((size_t) bytesread, sizeof(buffer) - 1)] = 0;
+ char state = 0;
+ unsigned long majflt = 0, utime = 0, stime = 0;
+ sscanf(buffer, "%*d %*s %c %*d %*d %*d %*d %*d %*u %*u %*u %lu %*u %lu %lu", &state, &majflt, &utime, &stime);
+ if(item->utime != (uint32_t) -1 || item->stime != (uint32_t) -1)
+ {
+ if(item->utime == (uint32_t) utime && item->stime == (uint32_t) stime && state != 'R')
+ {
+ // This thread made no progress since last time
+ if(item->blocked_since == std::chrono::steady_clock::time_point())
+ {
+ threadmetrics_queue.running--;
+ threadmetrics_queue.blocked++;
+ item->blocked_since = now;
+ }
+ }
+ else
+ {
+ if(item->blocked_since != std::chrono::steady_clock::time_point())
+ {
+ threadmetrics_queue.running++;
+ threadmetrics_queue.blocked--;
+ item->blocked_since = std::chrono::steady_clock::time_point();
+ }
+ }
+ }
+ std::cout << "Threadmetrics " << path << " " << state << " " << majflt << " " << utime << " " << stime << ". Previously " << item->diskfaults << " "
+ << item->utime << " " << item->stime << std::endl;
+ item->diskfaults = (uint32_t) majflt;
+ item->utime = (uint32_t) utime;
+ item->stime = (uint32_t) stime;
+ item->last_updated = now;
+ };
+ if(new_items != nullptr)
+ {
+ for(; new_items != nullptr; new_items = new_items->_next)
+ {
+ update_item(new_items);
+ }
+ return false;
+ }
+ if(threadmetrics_queue.count == 0)
+ {
+ return false;
+ }
+ size_t updated = 0;
+ while(updated++ < 10 && now - threadmetrics_queue.front->last_updated >= std::chrono::milliseconds(100))
+ {
+ auto *p = threadmetrics_queue.front;
+ update_item(p);
+ _remove_from_list(threadmetrics_queue, p);
+ _append_to_list(threadmetrics_queue, p);
+ }
+ static const auto min_hardware_concurrency = std::thread::hardware_concurrency();
+ static const auto max_hardware_concurrency = min_hardware_concurrency + (min_hardware_concurrency >> 1);
+ auto toadd = std::max((ssize_t) 0, std::min((ssize_t) min_hardware_concurrency - (ssize_t) threadmetrics_queue.running,
+ (ssize_t) total_submitted_workitems.load(std::memory_order_relaxed) -
+ (ssize_t) threadpool_threads.load(std::memory_order_relaxed)));
+ auto toremove = std::max((ssize_t) 0, (ssize_t) threadmetrics_queue.running - (ssize_t) max_hardware_concurrency);
+ // std::cout << "Threadmetrics toadd = " << (toadd - (ssize_t) threadpool_sleeping.count) << " toremove = " << toremove
+ // << " running = " << threadmetrics_queue.running << " blocked = " << threadmetrics_queue.blocked << " total = " << threadmetrics_queue.count
+ // << ". Actual active = " << threadpool_active.count << " sleeping = " << threadpool_sleeping.count
+ // << ". Current working threads = " << threadpool_threads.load(std::memory_order_relaxed)
+ // << ". Current submitted work items = " << total_submitted_workitems.load(std::memory_order_relaxed) << std::endl;
+ if(toadd > 0 || toremove > 0)
+ {
+ if(update_threadmetrics_reentrancy.exchange(true, std::memory_order_relaxed))
+ {
+ return false;
+ }
+ auto unupdate_threadmetrics_reentrancy =
+ make_scope_exit([this]() noexcept { update_threadmetrics_reentrancy.store(false, std::memory_order_relaxed); });
+ g.unlock();
+ _lock_guard gg(workqueue_lock);
+ toadd -= (ssize_t) threadpool_sleeping.count;
+ for(; toadd > 0; toadd--)
+ {
+ _add_thread(gg);
+ }
+ for(; toremove > 0 && threadpool_sleeping.count > 0; toremove--)
+ {
+ if(!_remove_thread(gg, threadpool_sleeping))
+ {
+ break;
+ }
+ }
+ if(toremove > 0 && threadpool_active.count > 1)
+ {
+ // Kill myself, but not if I'm the final thread who needs to run timers
+ return true;
+ }
+ return false;
+ }
+ return false;
+ }
+ // Returns true if calling thread is to exit
+ bool populate_threadmetrics(std::chrono::steady_clock::time_point now)
{
static thread_local std::vector<char> kernelbuffer(1024);
static thread_local std::vector<threadmetrics_threadid> threadidsbuffer(1024 / sizeof(dirent));
using getdents64_t = int (*)(int, char *, unsigned int);
static auto getdents = static_cast<getdents64_t>([](int fd, char *buf, unsigned count) -> int { return syscall(SYS_getdents64, fd, buf, count); });
using dirent = dirent64;
- int bytes = 0;
+ size_t bytes = 0;
{
_lock_guard g(threadmetrics_lock);
- if(now - threadmetrics_last_updated < std::chrono::milliseconds(100))
+ if(now - threadmetrics_last_updated < std::chrono::milliseconds(100) &&
+ threadmetrics_queue.running + threadmetrics_queue.blocked >= threadpool_threads.load(std::memory_order_relaxed))
{
- return;
+ return update_threadmetrics(std::move(g), now, nullptr);
}
if(proc_self_task_fd < 0)
{
@@ -262,18 +479,33 @@ namespace detail
posix_error().throw_exception();
}
}
- for(;;)
+ for(auto done = false; !done;)
{
if(-1 == ::lseek64(proc_self_task_fd, 0, SEEK_SET))
{
posix_error().throw_exception();
}
- bytes = getdents(proc_self_task_fd, kernelbuffer.data(), kernelbuffer.size());
- if(bytes < (int) kernelbuffer.size())
+ bytes = 0;
+ for(;;)
{
- break;
+ int _bytes = getdents(proc_self_task_fd, kernelbuffer.data() + bytes, kernelbuffer.size() - bytes);
+ // std::cout << "getdents(" << (kernelbuffer.size()-bytes) << ") returns " << _bytes << std::endl;
+ if(_bytes == 0)
+ {
+ done = true;
+ break;
+ }
+ if(_bytes == -1 && errno == EINVAL)
+ {
+ kernelbuffer.resize(kernelbuffer.size() << 1);
+ continue;
+ }
+ if(_bytes < 0)
+ {
+ posix_error().throw_exception();
+ }
+ bytes += _bytes;
}
- kernelbuffer.resize(kernelbuffer.size() << 1);
}
}
threadidsbuffer.clear();
@@ -289,12 +521,33 @@ namespace detail
break;
}
}
+ std::cout << "Parsed from /proc " << threadidsbuffer.size() << " entries." << std::endl;
std::sort(threadidsbuffer.begin(), threadidsbuffer.end());
+ threadmetrics_item *firstnewitem = nullptr;
_lock_guard g(threadmetrics_lock);
+#if 0
+ {
+ auto d_it = threadmetrics_sorted.begin();
+ auto s_it = threadidsbuffer.begin();
+ for(; d_it != threadmetrics_sorted.end() && s_it != threadidsbuffer.end(); ++d_it, ++s_it)
+ {
+ std::cout << (*d_it)->threadid_name() << " " << string_view(s_it->text, 12) << "\n";
+ }
+ for(; d_it != threadmetrics_sorted.end(); ++d_it)
+ {
+ std::cout << (*d_it)->threadid_name() << " XXXXXXXXXXXX\n";
+ }
+ for(; s_it != threadidsbuffer.end(); ++s_it)
+ {
+ std::cout << "XXXXXXXXXXXX " << string_view(s_it->text, 12) << "\n";
+ }
+ std::cout << std::flush;
+ }
+#endif
auto d_it = threadmetrics_sorted.begin();
auto s_it = threadidsbuffer.begin();
auto remove_item = [&] {
- std::cout << "Removing thread " << (*d_it)->threadid_name() << std::endl;
+ // std::cout << "Removing thread metrics for " << (*d_it)->threadid_name() << std::endl;
if((*d_it)->blocked_since != std::chrono::steady_clock::time_point())
{
threadmetrics_queue.blocked--;
@@ -307,14 +560,14 @@ namespace detail
d_it = threadmetrics_sorted.erase(d_it);
};
auto add_item = [&] {
- auto p = std::make_unique<threadmetrics_item>(*s_it++);
- if(d_it != threadmetrics_sorted.end())
- {
- ++d_it;
- }
+ auto p = std::make_unique<threadmetrics_item>(*s_it);
d_it = threadmetrics_sorted.insert(d_it, p.get());
_append_to_list(threadmetrics_queue, p.get());
- std::cout << "Adding thread " << p->threadid_name() << std::endl;
+ // std::cout << "Adding thread metrics for " << p->threadid_name() << std::endl;
+ if(firstnewitem == nullptr)
+ {
+ firstnewitem = p.get();
+ }
p.release();
threadmetrics_queue.running++;
};
@@ -346,106 +599,28 @@ namespace detail
while(s_it != threadidsbuffer.end())
{
add_item();
+ ++d_it;
+ ++s_it;
}
-#if 0
- std::cout << "Threadmetrics:";
- for(auto *p : threadmetrics_sorted)
- {
- std::cout << "\n " << p->threadid_name();
- }
- std::cout << std::endl;
-#endif
assert(threadmetrics_sorted.size() == threadidsbuffer.size());
- assert(std::is_sorted(threadmetrics_sorted.begin(), threadmetrics_sorted.end(),
- [](threadmetrics_item *a, threadmetrics_item *b) { return a->threadid < b->threadid; }));
- threadmetrics_last_updated = now;
- }
-#endif
-
- inline void _execute_work(thread_t *self);
-
- void _add_thread(_lock_guard & /*unused*/)
- {
- thread_t *p = nullptr;
- try
- {
- p = new thread_t;
- _append_to_list(threadpool_active, p);
- p->thread = std::thread([this, p] { _execute_work(p); });
- }
- catch(...)
+#if 1
+ if(!std::is_sorted(threadmetrics_sorted.begin(), threadmetrics_sorted.end(),
+ [](threadmetrics_item *a, threadmetrics_item *b) { return a->threadid < b->threadid; }))
{
- if(p != nullptr)
+ std::cout << "Threadmetrics:";
+ for(auto *p : threadmetrics_sorted)
{
- _remove_from_list(threadpool_active, p);
+ std::cout << "\n " << p->threadid_name();
}
- // drop failure
- }
- }
-
- bool _remove_thread(_lock_guard &g, threads_t &which)
- {
- if(which.count == 0)
- {
- return false;
+ std::cout << std::endl;
+ abort();
}
- // Threads which went to sleep the longest ago are at the front
- auto *t = which.front;
- assert(t->state == 0);
- t->state--;
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
- std::cout << "*** DTP " << t << " is told to quit" << std::endl;
-#endif
- do
- {
- g.unlock();
- t->cond.notify_one();
- g.lock();
- } while(t->state >= -1);
-#if LLFIO_DYNAMIC_THREAD_POOL_GROUP_PRINTING
- std::cout << "*** DTP " << t << " has quit, deleting" << std::endl;
#endif
- _remove_from_list(threadpool_active, t);
- t->thread.join();
- delete t;
- return true;
+ assert(threadmetrics_queue.running + threadmetrics_queue.blocked == threadidsbuffer.size());
+ threadmetrics_last_updated = now;
+ return update_threadmetrics(std::move(g), now, firstnewitem);
}
-
- ~global_dynamic_thread_pool_impl()
- {
- {
- _lock_guard g(workqueue_lock); // lock global
- while(threadpool_active.count > 0 || threadpool_sleeping.count > 0)
- {
- while(threadpool_sleeping.count > 0)
- {
- auto removed = _remove_thread(g, threadpool_sleeping);
- assert(removed);
- (void) removed;
- }
- if(threadpool_active.count > 0)
- {
- auto removed = _remove_thread(g, threadpool_active);
- assert(removed);
- (void) removed;
- }
- }
- }
- _lock_guard g(threadmetrics_lock);
- for(auto *p : threadmetrics_sorted)
- {
- delete p;
- }
- threadmetrics_sorted.clear();
- threadmetrics_queue = {};
-#ifdef __linux__
- if(proc_self_task_fd > 0)
- {
- ::close(proc_self_task_fd);
- proc_self_task_fd = -1;
- }
#endif
- }
#endif
result<void> _prepare_work_item_delay(dynamic_thread_pool_group::work_item *workitem, grouph_type grouph, deadline d)
@@ -890,6 +1065,7 @@ namespace detail
else if(now - self->last_did_work >= std::chrono::minutes(1))
{
_remove_from_list(threadpool_active, self);
+ threadpool_threads.fetch_sub(1, std::memory_order_release);
self->thread.detach();
delete self;
return;
@@ -941,7 +1117,14 @@ namespace detail
// workitem->_internalworkh should be null, however workitem may also no longer exist
try
{
- populate_threadmetrics(now);
+ if(populate_threadmetrics(now))
+ {
+ _remove_from_list(threadpool_active, self);
+ threadpool_threads.fetch_sub(1, std::memory_order_release);
+ self->thread.detach();
+ delete self;
+ return;
+ }
}
catch(...)
{
@@ -1076,9 +1259,6 @@ namespace detail
if(threadpool_active.count == 0 && threadpool_sleeping.count == 0)
{
_add_thread(gg);
- _add_thread(gg);
- _add_thread(gg);
- _add_thread(gg);
}
else if(threadpool_sleeping.count > 0 && active_work_items > threadpool_active.count)
{
diff --git a/include/llfio/v2.0/detail/impl/posix/directory_handle.ipp b/include/llfio/v2.0/detail/impl/posix/directory_handle.ipp
index 9ea737c0..bd0050d6 100644
--- a/include/llfio/v2.0/detail/impl/posix/directory_handle.ipp
+++ b/include/llfio/v2.0/detail/impl/posix/directory_handle.ipp
@@ -39,7 +39,8 @@ http://www.boost.org/LICENSE_1_0.txt)
LLFIO_V2_NAMESPACE_BEGIN
-result<directory_handle> directory_handle::directory(const path_handle &base, path_view_type path, mode _mode, creation _creation, caching _caching, flag flags) noexcept
+result<directory_handle> directory_handle::directory(const path_handle &base, path_view_type path, mode _mode, creation _creation, caching _caching,
+ flag flags) noexcept
{
if(flags & flag::unlink_on_first_close)
{
@@ -314,9 +315,12 @@ result<directory_handle::buffers_type> directory_handle::read(io_request<buffers
req.buffers[0].stat.st_birthtim = to_timepoint(s.st_birthtim);
#endif
#endif
- req.buffers[0].stat.st_sparse = static_cast<unsigned int>((static_cast<handle::extent_type>(s.st_blocks) * 512) < static_cast<handle::extent_type>(s.st_size));
+ req.buffers[0].stat.st_sparse =
+ static_cast<unsigned int>((static_cast<handle::extent_type>(s.st_blocks) * 512) < static_cast<handle::extent_type>(s.st_size));
req.buffers._resize(1);
- static constexpr stat_t::want default_stat_contents = stat_t::want::dev | stat_t::want::ino | stat_t::want::type | stat_t::want::perms | stat_t::want::nlink | stat_t::want::uid | stat_t::want::gid | stat_t::want::rdev | stat_t::want::atim | stat_t::want::mtim | stat_t::want::ctim | stat_t::want::size |
+ static constexpr stat_t::want default_stat_contents = stat_t::want::dev | stat_t::want::ino | stat_t::want::type | stat_t::want::perms |
+ stat_t::want::nlink | stat_t::want::uid | stat_t::want::gid | stat_t::want::rdev |
+ stat_t::want::atim | stat_t::want::mtim | stat_t::want::ctim | stat_t::want::size |
stat_t::want::allocated | stat_t::want::blocks | stat_t::want::blksize
#ifdef HAVE_STAT_FLAGS
| stat_t::want::flags
@@ -360,8 +364,7 @@ result<directory_handle::buffers_type> directory_handle::read(io_request<buffers
}
stat_t::want default_stat_contents = stat_t::want::ino | stat_t::want::type;
dirent *buffer;
- size_t bytesavailable;
- int bytes;
+ size_t bytesavailable, bytes;
bool done = false;
do
{
@@ -383,27 +386,41 @@ result<directory_handle::buffers_type> directory_handle::read(io_request<buffers
if(-1 == ::lseek(_v.fd, 0, SEEK_SET))
return posix_error();
#endif
- bytes = getdents(_v.fd, reinterpret_cast<char *>(buffer), bytesavailable);
- if(req.kernelbuffer.empty() && bytes == -1 && EINVAL == errno)
+ bytes = 0;
+ int _bytes;
+ do
{
- size_t toallocate = req.buffers._kernel_buffer_size * 2;
- auto *mem = (char *) operator new[](toallocate, std::nothrow); // don't initialise
- if(mem == nullptr)
+ assert(bytes <= bytesavailable);
+ _bytes = getdents(_v.fd, reinterpret_cast<char *>(buffer) + bytes, bytesavailable - bytes);
+ if(_bytes == 0)
{
- return errc::not_enough_memory;
+ done = true;
+ break;
}
- req.buffers._kernel_buffer.reset();
- req.buffers._kernel_buffer = std::unique_ptr<char[]>(mem);
- req.buffers._kernel_buffer_size = toallocate;
- }
- else
- {
- if(bytes == -1)
+ if(req.kernelbuffer.empty() && _bytes == -1 && EINVAL == errno)
+ {
+ size_t toallocate = req.buffers._kernel_buffer_size * 2;
+ auto *mem = (char *) operator new[](toallocate, std::nothrow); // don't initialise
+ if(mem == nullptr)
+ {
+ return errc::not_enough_memory;
+ }
+ req.buffers._kernel_buffer.reset();
+ req.buffers._kernel_buffer = std::unique_ptr<char[]>(mem);
+ req.buffers._kernel_buffer_size = toallocate;
+ // We need to reset and do the whole thing again to ensure single-shot atomicity
+ break;
+ }
+ else if(_bytes == -1)
{
return posix_error();
}
- done = true;
- }
+ else
+ {
+ assert(_bytes > 0);
+ bytes += _bytes;
+ }
+ } while(!done);
} while(!done);
if(bytes == 0)
{