Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/windirstat/llfio.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spam@nowhere>2016-03-21 02:41:51 +0300
committerNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spam@nowhere>2016-03-21 02:41:51 +0300
commit758a934ab266ed660daa54b72e4606b78e374071 (patch)
tree6f2fe1c5d2b8331f9319549bc6f0c3390168eb6b /attic/example
AFIO v2: Relocate all the AFIO v2 files in fs_probe into the root hierarchy. AFIO v2 is now the master branch!
Diffstat (limited to 'attic/example')
-rw-r--r--attic/example/.clang-format57
-rw-r--r--attic/example/adopt_example.cpp57
-rw-r--r--attic/example/barrier_example.cpp67
-rw-r--r--attic/example/benchmark_asio.cpp47
-rw-r--r--attic/example/benchmark_atomic_log.cpp521
-rw-r--r--attic/example/benchmark_chained1.cpp48
-rw-r--r--attic/example/benchmark_chained2.cpp43
-rw-r--r--attic/example/benchmark_latency.cpp162
-rw-r--r--attic/example/benchmark_unchained1.cpp43
-rw-r--r--attic/example/benchmark_unchained2.cpp37
-rw-r--r--attic/example/call_example.cpp25
-rw-r--r--attic/example/closure_execution_afio_io_example.cpp61
-rw-r--r--attic/example/closure_execution_traditional_io_example.cpp53
-rw-r--r--attic/example/completion_example1.cpp58
-rw-r--r--attic/example/completion_example2.cpp77
-rw-r--r--attic/example/determine_legal_filenames.cpp35
-rw-r--r--attic/example/enumerate_example.cpp43
-rw-r--r--attic/example/filecopy_example.cpp185
-rw-r--r--attic/example/filedir_example.cpp74
-rw-r--r--attic/example/filter_example.cpp36
-rw-r--r--attic/example/find_in_files_afio.cpp351
-rw-r--r--attic/example/find_in_files_iostreams.cpp109
-rw-r--r--attic/example/readallof_example.cpp54
-rw-r--r--attic/example/readwrite_example.cpp78
-rw-r--r--attic/example/readwrite_example_traditional.cpp46
-rw-r--r--attic/example/statfs_example.cpp18
-rw-r--r--attic/example/workshop_atomic_updates_afio.cpp2
-rw-r--r--attic/example/workshop_atomic_updates_afio.ipp323
-rw-r--r--attic/example/workshop_benchmark.cpp151
-rw-r--r--attic/example/workshop_final_afio.cpp2
-rw-r--r--attic/example/workshop_final_afio.ipp665
-rw-r--r--attic/example/workshop_naive.cpp2
-rw-r--r--attic/example/workshop_naive.ipp126
-rw-r--r--attic/example/workshop_naive_afio.cpp2
-rw-r--r--attic/example/workshop_naive_afio.ipp269
-rw-r--r--attic/example/workshop_naive_async_afio.cpp2
-rw-r--r--attic/example/workshop_naive_async_afio.ipp272
37 files changed, 4201 insertions, 0 deletions
diff --git a/attic/example/.clang-format b/attic/example/.clang-format
new file mode 100644
index 00000000..975edaa5
--- /dev/null
+++ b/attic/example/.clang-format
@@ -0,0 +1,57 @@
+---
+Language: Cpp
+AccessModifierOffset: -2
+ConstructorInitializerIndentWidth: 4
+AlignEscapedNewlinesLeft: false
+AlignTrailingComments: true
+AllowAllParametersOfDeclarationOnNextLine: true
+AllowShortBlocksOnASingleLine: false
+AllowShortIfStatementsOnASingleLine: false
+AllowShortLoopsOnASingleLine: false
+AllowShortFunctionsOnASingleLine: Inline
+AlwaysBreakTemplateDeclarations: false
+AlwaysBreakBeforeMultilineStrings: false
+BreakBeforeBinaryOperators: None
+BreakBeforeBraces: Allman
+BreakBeforeTernaryOperators: false
+BreakConstructorInitializersBeforeComma: true
+BinPackParameters: true
+ColumnLimit: 85
+CommentPragmas: '^!<'
+ConstructorInitializerAllOnOneLineOrOnePerLine: false
+ContinuationIndentWidth: 0
+Cpp11BracedListStyle: true
+DerivePointerAlignment: false
+DisableFormat: false
+ExperimentalAutoDetectBinPacking: false
+ForEachMacros: [ foreach, Q_FOREACH, BOOST_FOREACH ]
+IndentCaseLabels: false
+IndentFunctionDeclarationAfterType: false
+IndentWidth: 2
+IndentWrappedFunctionNames: false
+MaxEmptyLinesToKeep: 2
+KeepEmptyLinesAtTheStartOfBlocks: true
+NamespaceIndentation: All
+ObjCSpaceAfterProperty: false
+ObjCSpaceBeforeProtocolList: false
+PenaltyBreakBeforeFirstCallParameter: 19
+PenaltyBreakComment: 300
+PenaltyBreakString: 1000
+PenaltyBreakFirstLessLess: 120
+PenaltyExcessCharacter: 1000000
+PenaltyReturnTypeOnItsOwnLine: 60
+PointerAlignment: Right
+Standard: Cpp11
+SpaceAfterCStyleCast: true
+SpaceBeforeAssignmentOperators: true
+SpaceBeforeParens: Never
+SpaceInEmptyParentheses: false
+SpacesBeforeTrailingComments: 2
+SpacesInParentheses: false
+SpacesInAngles: false
+SpacesInCStyleCastParentheses: false
+SpacesInContainerLiterals: true
+TabWidth: 8
+UseTab: Never
+...
+
diff --git a/attic/example/adopt_example.cpp b/attic/example/adopt_example.cpp
new file mode 100644
index 00000000..b18e9827
--- /dev/null
+++ b/attic/example/adopt_example.cpp
@@ -0,0 +1,57 @@
+#include "afio_pch.hpp"
+
+//[adopt_example
+struct test_handle : boost::afio::handle
+{
+ test_handle(boost::afio::dispatcher *parent) :
+ boost::afio::handle(parent,
+ boost::afio::file_flags::none) {}
+ virtual void close() override final
+ {
+ // Do nothing
+ }
+ virtual handle::open_states is_open() const override final
+ {
+ return handle::open_states::open;
+ }
+ virtual void *native_handle() const override final
+ {
+ return nullptr;
+ }
+ virtual boost::afio::path path(bool refresh=false) override final
+ {
+ return boost::afio::path();
+ }
+ virtual boost::afio::path path() const override final
+ {
+ return boost::afio::path();
+ }
+ virtual boost::afio::directory_entry direntry(boost::afio::metadata_flags
+ wanted=boost::afio::directory_entry::metadata_fastpath()) override final
+ {
+ return boost::afio::directory_entry();
+ }
+ virtual boost::afio::path target() override final
+ {
+ return boost::afio::path();
+ }
+ virtual void link(const boost::afio::path_req &req) override final
+ {
+ }
+ virtual void unlink() override final
+ {
+ }
+ virtual void atomic_relink(const boost::afio::path_req &req) override final
+ {
+ }
+};
+
+int main(void)
+{
+ using namespace BOOST_AFIO_V2_NAMESPACE;
+ auto dispatcher = boost::afio::make_dispatcher().get();
+ current_dispatcher_guard h(dispatcher);
+ auto foreignh=std::make_shared<test_handle>(dispatcher.get());
+ return 0;
+}
+//]
diff --git a/attic/example/barrier_example.cpp b/attic/example/barrier_example.cpp
new file mode 100644
index 00000000..46a739f2
--- /dev/null
+++ b/attic/example/barrier_example.cpp
@@ -0,0 +1,67 @@
+#include "afio_pch.hpp"
+
+int main(void)
+{
+ //[barrier_example
+ // Assume that groups is 10,000 items long with item.first being randomly
+ // between 1 and 500. This example is adapted from the barrier() unit test.
+ //
+ // What we're going to do is this: for each item in groups, schedule item.first
+ // parallel ops and a barrier which completes only when the last of that
+ // parallel group completes. Chain the next group to only execute after the
+ // preceding group's barrier completes. Repeat until all groups have been executed.
+ std::shared_ptr<boost::afio::dispatcher> dispatcher=
+ boost::afio::make_dispatcher().get();
+ std::vector<std::pair<size_t, int>> groups;
+ boost::afio::atomic<size_t> callcount[10000];
+ memset(&callcount, 0, sizeof(callcount));
+
+ // This lambda is what each parallel op in each group will do: increment an atomic
+ // for that group.
+ auto inccount = [](boost::afio::atomic<size_t> *count){ (*count)++; };
+
+ // This lambda is called after each barrier completes, and it checks that exactly
+ // the right number of inccount lambdas were executed.
+ auto verifybarrier = [](boost::afio::atomic<size_t> *count, size_t shouldbe)
+ {
+ if (*count != shouldbe)
+ throw std::runtime_error("Count was not what it should have been!");
+ return true;
+ };
+
+ // For each group, dispatch ops and a barrier for them
+ boost::afio::future<> next;
+ bool isfirst = true;
+ for(auto &run : groups)
+ {
+ // Create a vector of run.first size of bound inccount lambdas
+ // This will be the batch issued for this group
+ std::vector<std::function<void()>> thisgroupcalls(run.first, std::bind(inccount, &callcount[run.second]));
+ std::vector<boost::afio::future<>> thisgroupcallops;
+ // If this is the first item, schedule without precondition
+ if (isfirst)
+ {
+ thisgroupcallops = dispatcher->call(thisgroupcalls);
+ isfirst = false;
+ }
+ else
+ {
+ // Create a vector of run.first size of preconditions exactly
+ // matching the number in this batch. Note that the precondition
+ // for all of these is the preceding verify op
+ std::vector<boost::afio::future<>> dependency(run.first, next);
+ thisgroupcallops = dispatcher->call(dependency, thisgroupcalls);
+ }
+ // barrier() is very easy: its number of output ops exactly matches its input
+ // but none of the output will complete until the last of the input completes
+ auto thisgroupbarriered = dispatcher->barrier(thisgroupcallops);
+ // Schedule a call of the verify lambda once barrier completes. Here we choose
+ // the first item of the barrier's return, but in truth any of them are good.
+ auto verify = dispatcher->call(thisgroupbarriered.front(), std::function<bool()>(std::bind(verifybarrier, &callcount[run.second], run.first)));
+ // Set the dependency for the next batch to be the just scheduled verify op
+ next = verify;
+ }
+ // next was the last op scheduled, so waiting on it waits on everything
+ when_all_p(next).wait();
+ //]
+}
diff --git a/attic/example/benchmark_asio.cpp b/attic/example/benchmark_asio.cpp
new file mode 100644
index 00000000..45de6e45
--- /dev/null
+++ b/attic/example/benchmark_asio.cpp
@@ -0,0 +1,47 @@
+#include "afio_pch.hpp"
+
+/* My Intel Core i7 3770K running Windows 8 x64: 2591360 closures/sec
+ My Intel Core i7 3770K running Linux x64: 1611040 closures/sec (4 threads)
+*/
+
+static boost::afio::atomic<size_t> togo(0);
+static int callback()
+{
+#if 0
+ Sleep(0);
+#endif
+ --togo;
+ return 1;
+};
+int main(void)
+{
+ using namespace boost::afio;
+ typedef chrono::duration<double, ratio<1, 1>> secs_type;
+ auto threadpool=process_threadpool();
+ auto begin=chrono::high_resolution_clock::now();
+ while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now()-begin).count()<3);
+
+ atomic<size_t> threads(0);
+#if 0
+ std::cout << "Attach profiler now and hit Return" << std::endl;
+ getchar();
+#endif
+ begin=chrono::high_resolution_clock::now();
+#pragma omp parallel
+ {
+ ++threads;
+ for(size_t n=0; n<5000000; n++)
+ {
+ ++togo;
+ threadpool->enqueue(callback);
+ }
+ }
+ while(togo)
+ this_thread::sleep_for(chrono::milliseconds(1));
+ auto end=chrono::high_resolution_clock::now();
+ auto diff=chrono::duration_cast<secs_type>(end-begin);
+ std::cout << "It took " << diff.count() << " secs to execute " << (5000000*threads) << " closures which is " << (5000000*threads/diff.count()) << " closures/sec" << std::endl;
+ std::cout << "\nPress Return to exit ..." << std::endl;
+ getchar();
+ return 0;
+}
diff --git a/attic/example/benchmark_atomic_log.cpp b/attic/example/benchmark_atomic_log.cpp
new file mode 100644
index 00000000..ab046837
--- /dev/null
+++ b/attic/example/benchmark_atomic_log.cpp
@@ -0,0 +1,521 @@
+#include "afio_pch.hpp"
+
+/* On my Win8.1 x86 laptop Intel i5 540M @ 2.53Ghz on NTFS Samsung SSD:
+
+Benchmarking traditional file locks with 2 concurrent writers ...
+Waiting for threads to exit ...
+For 2 concurrent writers, achieved 1720.7 attempts per second with a success rate of 1335.24 writes per second which is a 77.5984% success rate.
+
+Benchmarking file locks via atomic append with 2 concurrent writers ...
+Waiting for threads to exit ...
+For 2 concurrent writers, achieved 2413.66 attempts per second with a success rate of 790.364 writes per second which is a 32.7455% success rate.
+Traditional locks were 1.6894 times faster.
+
+Without file_flags::temporary_file | file_flags::delete_on_close, traditional file locks were:
+
+Benchmarking traditional file locks with 2 concurrent writers ...
+Waiting for threads to exit ...
+For 2 concurrent writers, achieved 1266.4 attempts per second with a success rate of 809.013 writes per second which is a 63.883% success rate.
+
+*/
+
+//[benchmark_atomic_log
+int main(int argc, const char *argv[])
+{
+ using namespace BOOST_AFIO_V2_NAMESPACE;
+ using BOOST_AFIO_V2_NAMESPACE::off_t;
+ typedef chrono::duration<double, ratio<1, 1>> secs_type;
+ double traditional_locks=0, atomic_log_locks=0;
+ try { filesystem::remove_all("testdir"); } catch(...) {}
+
+ size_t totalwriters=2, writers=totalwriters;
+ if(argc>1)
+ writers=totalwriters=atoi(argv[1]);
+ {
+ auto dispatcher = make_dispatcher().get();
+ auto mkdir(dispatcher->dir(path_req("testdir", file_flags::create)));
+ auto mkdir1(dispatcher->dir(path_req::relative(mkdir, "1", file_flags::create)));
+ auto mkdir2(dispatcher->dir(path_req::relative(mkdir, "2", file_flags::create)));
+ auto mkdir3(dispatcher->dir(path_req::relative(mkdir, "3", file_flags::create)));
+ auto mkdir4(dispatcher->dir(path_req::relative(mkdir, "4", file_flags::create)));
+ auto mkdir5(dispatcher->dir(path_req::relative(mkdir, "5", file_flags::create)));
+ auto mkdir6(dispatcher->dir(path_req::relative(mkdir, "6", file_flags::create)));
+ auto mkdir7(dispatcher->dir(path_req::relative(mkdir, "7", file_flags::create)));
+ auto mkdir8(dispatcher->dir(path_req::relative(mkdir, "8", file_flags::create)));
+ auto statfs_(dispatcher->statfs(mkdir, fs_metadata_flags::All));
+ auto statfs(statfs_.get());
+ std::cout << "The filing system holding our test directory is " << statfs.f_fstypename << " and has features:" << std::endl;
+#define PRINT_FIELD(field, ...) \
+ std::cout << " f_flags." #field ": "; std::cout << statfs.f_flags.field __VA_ARGS__ << std::endl
+ PRINT_FIELD(rdonly);
+ PRINT_FIELD(noexec);
+ PRINT_FIELD(nosuid);
+ PRINT_FIELD(acls);
+ PRINT_FIELD(xattr);
+ PRINT_FIELD(compression);
+ PRINT_FIELD(extents);
+ PRINT_FIELD(filecompression);
+#undef PRINT_FIELD
+#define PRINT_FIELD(field, ...) \
+ std::cout << " f_" #field ": "; std::cout << statfs.f_##field __VA_ARGS__ << std::endl
+ PRINT_FIELD(bsize);
+ PRINT_FIELD(iosize);
+ PRINT_FIELD(blocks, << " (" << (statfs.f_blocks*statfs.f_bsize / 1024.0 / 1024.0 / 1024.0) << " Gb)");
+ PRINT_FIELD(bfree, << " (" << (statfs.f_bfree*statfs.f_bsize / 1024.0 / 1024.0 / 1024.0) << " Gb)");
+ PRINT_FIELD(bavail, << " (" << (statfs.f_bavail*statfs.f_bsize / 1024.0 / 1024.0 / 1024.0) << " Gb)");
+#undef PRINT_FIELD
+ }
+ if(1)
+ {
+ std::cout << "\nBenchmarking a single traditional lock file with " << writers << " concurrent writers ...\n";
+ std::vector<thread> threads;
+ atomic<bool> done(true);
+ atomic<size_t> attempts(0), successes(0);
+ for(size_t n=0; n<writers; n++)
+ {
+ threads.push_back(thread([&done, &attempts, &successes, n]{
+ try
+ {
+ // Create a dispatcher
+ auto dispatcher = make_dispatcher().get();
+ // Schedule opening the log file for writing log entries
+ auto logfile(dispatcher->file(path_req("testdir/log",
+ file_flags::create | file_flags::read_write)));
+ // Retrieve any errors which occurred
+ logfile.get();
+ // Wait until all threads are ready
+ while(done) { this_thread::yield(); }
+ while(!done)
+ {
+ // Traditional file locks are very simple: try to exclusively create the lock file.
+ // If you succeed, you have the lock.
+ auto lockfile(dispatcher->file(path_req("testdir/log.lock",
+ file_flags::create_only_if_not_exist | file_flags::write | file_flags::temporary_file | file_flags::delete_on_close)));
+ attempts.fetch_add(1, memory_order_relaxed);
+ // v1.4 of the AFIO engine will return error_code instead of exceptions for this
+ try { lockfile.get(); } catch(const system_error &e) { continue; }
+ std::string logentry("I am log writer "), mythreadid(to_string(n)), logentryend("!\n");
+ // Fetch the size
+ off_t where=logfile->lstat().st_size, entrysize=logentry.size()+mythreadid.size()+logentryend.size();
+ // Schedule extending the log file
+ auto extendlog(dispatcher->truncate(logfile, where+entrysize));
+ // Schedule writing the new entry
+ auto writetolog(dispatcher->write(make_io_req(extendlog, {logentry, mythreadid, logentryend}, where)));
+ writetolog.get();
+ extendlog.get();
+ successes.fetch_add(1, memory_order_relaxed);
+ }
+ }
+ catch(const system_error &e) { std::cerr << "ERROR: test exits via system_error code " << e.code().value() << "(" << e.what() << ")" << std::endl; abort(); }
+ catch(const std::exception &e) { std::cerr << "ERROR: test exits via exception (" << e.what() << ")" << std::endl; abort(); }
+ catch(...) { std::cerr << "ERROR: test exits via unknown exception" << std::endl; abort(); }
+ }));
+ }
+ auto begin=chrono::high_resolution_clock::now();
+ done=false;
+ std::this_thread::sleep_for(std::chrono::seconds(20));
+ done=true;
+ std::cout << "Waiting for threads to exit ..." << std::endl;
+ for(auto &i : threads)
+ i.join();
+ auto end=chrono::high_resolution_clock::now();
+ auto diff=chrono::duration_cast<secs_type>(end-begin);
+ std::cout << "For " << writers << " concurrent writers, achieved " << (attempts/diff.count()) << " attempts per second with a "
+ "success rate of " << (successes/diff.count()) << " writes per second which is a " << (100.0*successes/attempts) << "% success rate." << std::endl;
+ traditional_locks=successes/diff.count();
+ }
+
+ if(1)
+ {
+ std::cout << "\nBenchmarking eight traditional lock files with " << writers << " concurrent writers ...\n";
+ std::vector<thread> threads;
+ atomic<bool> done(true);
+ atomic<size_t> attempts(0), successes(0);
+ for(size_t n=0; n<writers; n++)
+ {
+ threads.push_back(thread([&done, &attempts, &successes, n]{
+ try
+ {
+ // Create a dispatcher
+ auto dispatcher = make_dispatcher().get();
+ // Schedule opening the log file for writing log entries
+ auto logfile(dispatcher->file(path_req("testdir/log",
+ file_flags::create | file_flags::read_write)));
+ // Retrieve any errors which occurred
+ logfile.get();
+ // Wait until all threads are ready
+ while(done) { this_thread::yield(); }
+ while(!done)
+ {
+ // Parallel try to exclusively create all eight lock files
+ std::vector<path_req> lockfiles; lockfiles.reserve(8);
+ lockfiles.push_back(path_req("testdir/1/log.lock",
+ file_flags::create_only_if_not_exist | file_flags::write | file_flags::temporary_file | file_flags::delete_on_close));
+ lockfiles.push_back(path_req("testdir/2/log.lock",
+ file_flags::create_only_if_not_exist | file_flags::write | file_flags::temporary_file | file_flags::delete_on_close));
+ lockfiles.push_back(path_req("testdir/3/log.lock",
+ file_flags::create_only_if_not_exist | file_flags::write | file_flags::temporary_file | file_flags::delete_on_close));
+ lockfiles.push_back(path_req("testdir/4/log.lock",
+ file_flags::create_only_if_not_exist | file_flags::write | file_flags::temporary_file | file_flags::delete_on_close));
+ lockfiles.push_back(path_req("testdir/5/log.lock",
+ file_flags::create_only_if_not_exist | file_flags::write | file_flags::temporary_file | file_flags::delete_on_close));
+ lockfiles.push_back(path_req("testdir/6/log.lock",
+ file_flags::create_only_if_not_exist | file_flags::write | file_flags::temporary_file | file_flags::delete_on_close));
+ lockfiles.push_back(path_req("testdir/7/log.lock",
+ file_flags::create_only_if_not_exist | file_flags::write | file_flags::temporary_file | file_flags::delete_on_close));
+ lockfiles.push_back(path_req("testdir/8/log.lock",
+ file_flags::create_only_if_not_exist | file_flags::write | file_flags::temporary_file | file_flags::delete_on_close));
+ auto lockfile(dispatcher->file(lockfiles));
+ attempts.fetch_add(1, memory_order_relaxed);
+#if 1
+ // v1.4 of the AFIO engine will return error_code instead of exceptions for this
+ try { lockfile[7].get(); } catch(const system_error &e) { continue; }
+ try { lockfile[6].get(); } catch(const system_error &e) { continue; }
+ try { lockfile[5].get(); } catch(const system_error &e) { continue; }
+ try { lockfile[4].get(); } catch(const system_error &e) { continue; }
+ try { lockfile[3].get(); } catch(const system_error &e) { continue; }
+ try { lockfile[2].get(); } catch(const system_error &e) { continue; }
+ try { lockfile[1].get(); } catch(const system_error &e) { continue; }
+ try { lockfile[0].get(); } catch(const system_error &e) { continue; }
+#else
+ try
+ {
+ auto barrier(dispatcher->barrier(lockfile));
+ // v1.4 of the AFIO engine will return error_code instead of exceptions for this
+ for(size_t n=0; n<8; n++)
+ barrier[n].get();
+ }
+ catch(const system_error &e) { continue; }
+#endif
+ std::string logentry("I am log writer "), mythreadid(to_string(n)), logentryend("!\n");
+ // Fetch the size
+ off_t where=logfile->lstat().st_size, entrysize=logentry.size()+mythreadid.size()+logentryend.size();
+ // Schedule extending the log file
+ auto extendlog(dispatcher->truncate(logfile, where+entrysize));
+ // Schedule writing the new entry
+ auto writetolog(dispatcher->write(make_io_req(extendlog, {logentry, mythreadid, logentryend}, where)));
+ // Fetch errors from the last operation first to avoid sleep-wake cycling
+ writetolog.get();
+ extendlog.get();
+ successes.fetch_add(1, memory_order_relaxed);
+ }
+ }
+ catch(const system_error &e) { std::cerr << "ERROR: test exits via system_error code " << e.code().value() << "(" << e.what() << ")" << std::endl; abort(); }
+ catch(const std::exception &e) { std::cerr << "ERROR: test exits via exception (" << e.what() << ")" << std::endl; abort(); }
+ catch(...) { std::cerr << "ERROR: test exits via unknown exception" << std::endl; abort(); }
+ }));
+ }
+ auto begin=chrono::high_resolution_clock::now();
+ done=false;
+ std::this_thread::sleep_for(std::chrono::seconds(20));
+ done=true;
+ std::cout << "Waiting for threads to exit ..." << std::endl;
+ for(auto &i : threads)
+ i.join();
+ auto end=chrono::high_resolution_clock::now();
+ auto diff=chrono::duration_cast<secs_type>(end-begin);
+ std::cout << "For " << writers << " concurrent writers, achieved " << (attempts/diff.count()) << " attempts per second with a "
+ "success rate of " << (successes/diff.count()) << " writes per second which is a " << (100.0*successes/attempts) << "% success rate." << std::endl;
+ }
+
+ // **** WARNING UNSUPPORTED UNDOCUMENTED API DO NOT USE IN YOUR CODE ****
+ if(1)
+ {
+ std::cout << "\nBenchmarking a ranged file lock with " << writers << " concurrent writers ...\n";
+ std::vector<thread> threads;
+ atomic<bool> done(true);
+ atomic<size_t> attempts(0), successes(0);
+ for(size_t n=0; n<writers; n++)
+ {
+ threads.push_back(thread([&done, &attempts, &successes, n]{
+ try
+ {
+ // Create a dispatcher
+ auto dispatcher = make_dispatcher().get();
+ // Schedule opening the log file for writing log entries
+ auto logfile(dispatcher->file(path_req("testdir/log",
+ file_flags::create | file_flags::read_write | file_flags::os_lockable)));
+ // Retrieve any errors which occurred
+ logfile.get();
+ // Wait until all threads are ready
+ while(done) { this_thread::yield(); }
+ while(!done)
+ {
+ attempts.fetch_add(1, memory_order_relaxed);
+ // **** WARNING UNSUPPORTED UNDOCUMENTED API DO NOT USE IN YOUR CODE ****
+ dispatcher->lock({logfile}).front().get();
+ std::string logentry("I am log writer "), mythreadid(to_string(n)), logentryend("!\n");
+ // Fetch the size
+ off_t where=logfile->lstat().st_size, entrysize=logentry.size()+mythreadid.size()+logentryend.size();
+ // Schedule extending the log file
+ auto extendlog(dispatcher->truncate(logfile, where+entrysize));
+ // Schedule writing the new entry
+ auto writetolog(dispatcher->write(make_io_req(extendlog, {logentry, mythreadid, logentryend}, where)));
+ writetolog.get();
+ extendlog.get();
+ successes.fetch_add(1, memory_order_relaxed);
+ dispatcher->lock({{logfile, nullptr}}).front().get();
+ }
+ }
+ catch(const system_error &e) { std::cerr << "ERROR: test exits via system_error code " << e.code().value() << "(" << e.what() << ")" << std::endl; abort(); }
+ catch(const std::exception &e) { std::cerr << "ERROR: test exits via exception (" << e.what() << ")" << std::endl; abort(); }
+ catch(...) { std::cerr << "ERROR: test exits via unknown exception" << std::endl; abort(); }
+ }));
+ }
+ auto begin=chrono::high_resolution_clock::now();
+ done=false;
+ std::this_thread::sleep_for(std::chrono::seconds(20));
+ done=true;
+ std::cout << "Waiting for threads to exit ..." << std::endl;
+ for(auto &i : threads)
+ i.join();
+ auto end=chrono::high_resolution_clock::now();
+ auto diff=chrono::duration_cast<secs_type>(end-begin);
+ std::cout << "For " << writers << " concurrent writers, achieved " << (attempts/diff.count()) << " attempts per second with a "
+ "success rate of " << (successes/diff.count()) << " writes per second which is a " << (100.0*successes/attempts) << "% success rate." << std::endl;
+ }
+
+ if(1)
+ {
+ std::cout << "\nBenchmarking file locks via atomic append with " << writers << " concurrent writers ...\n";
+ std::vector<thread> threads;
+ atomic<bool> done(true);
+ atomic<size_t> attempts(0), successes(0);
+ for(size_t thread=0; thread<writers; thread++)
+ {
+ threads.push_back(std::thread([&done, &attempts, &successes, thread]{
+ try
+ {
+ // Create a dispatcher
+ auto dispatcher = make_dispatcher().get();
+ // Schedule opening the log file for writing log entries
+ auto logfile(dispatcher->file(path_req("testdir/log",
+ file_flags::create | file_flags::read_write)));
+ // Schedule opening the lock file for scanning and hole punching
+ auto lockfilez(dispatcher->file(path_req("testdir/log.lock",
+ file_flags::create | file_flags::read_write)));
+ // Schedule opening the lock file for atomic appending
+ auto lockfilea(dispatcher->file(path_req("testdir/log.lock",
+ file_flags::create | file_flags::write | file_flags::append)));
+ // Retrieve any errors which occurred
+ lockfilea.get(); lockfilez.get(); logfile.get();
+ while(!done)
+ {
+ // Each lock log entry is 16 bytes in length.
+ enum class message_code_t : uint8_t
+ {
+ unlock=0,
+ havelock=1,
+ rescind=2,
+ interest=3,
+ nominate=5
+ };
+#pragma pack(push, 1)
+ union message_t
+ {
+ char bytes[16];
+ struct
+ {
+ message_code_t code;
+ char __padding1[3];
+ uint32_t timestamp; // time_t
+ uint64_t uniqueid;
+ };
+ };
+#pragma pack(pop)
+ static_assert(sizeof(message_t)==16, "message_t is not 16 bytes long!");
+ auto gettime=[]{ return (uint32_t)(std::time(nullptr)-1420070400UL/* 1st Jan 2015*/); };
+ message_t temp, buffers[256];
+ off_t buffersoffset;
+ uint32_t nowtimestamp=gettime();
+ // TODO FIXME: If multiple machines are all accessing the lock file, nowtimestamp
+ // ought to be corrected for drift
+
+ // Step 1: Register my interest
+ memset(temp.bytes, 0, sizeof(temp));
+ temp.code=message_code_t::interest;
+ temp.timestamp=nowtimestamp;
+ temp.uniqueid=thread; // TODO FIXME: Needs to be a VERY random number to prevent collision.
+ dispatcher->write(make_io_req(lockfilea, temp.bytes, sizeof(temp), 0)).get();
+
+ // Step 2: Wait until my interest message appears, also figure out what interests precede
+ // mine and where my start of interest begins, and if someone currently has the lock
+ off_t startofinterest=dispatcher->extents(lockfilez).get().front().first;
+ off_t myuniqueid=(off_t)-1;
+ bool findPreceding=true;
+ std::vector<std::pair<bool, off_t>> preceding;
+ std::pair<bool, off_t> lockid;
+ auto iterate=[&]{
+ size_t validPrecedingCount=0;
+ off_t lockfilesize=lockfilez->lstat(metadata_flags::size).st_size;
+ buffersoffset=lockfilesize>sizeof(buffers) ? lockfilesize-sizeof(buffers) : 0;
+ //buffersoffset-=buffersoffset % sizeof(buffers[0]);
+ for(; !validPrecedingCount && buffersoffset>=startofinterest && buffersoffset<lockfilesize; buffersoffset-=sizeof(buffers))
+ {
+ size_t amount=(size_t)(lockfilesize-buffersoffset);
+ if(amount>sizeof(buffers))
+ amount=sizeof(buffers);
+ dispatcher->read(make_io_req(lockfilez, (void *) buffers, amount, buffersoffset)).get();
+ for(size_t n=amount/sizeof(buffers[0])-1; !validPrecedingCount && n<amount/sizeof(buffers[0]); n--)
+ {
+ // Early exit if messages have become stale
+ if(!buffers[n].timestamp || (buffers[n].timestamp<nowtimestamp && nowtimestamp-buffers[n].timestamp>20))
+ {
+ startofinterest=buffersoffset+n*sizeof(buffers[0]);
+ break;
+ }
+ // Find if he is locked or unlocked
+ if(lockid.second==(off_t) -1)
+ {
+ if(buffers[n].code==message_code_t::unlock)
+ lockid=std::make_pair(false, buffers[n].uniqueid);
+ else if(buffers[n].code==message_code_t::havelock)
+ lockid=std::make_pair(true, buffers[n].uniqueid);
+ }
+ // Am I searching for my interest?
+ if(myuniqueid==(off_t)-1)
+ {
+ if(!memcmp(buffers+n, &temp, sizeof(temp)))
+ myuniqueid=buffersoffset+n*sizeof(buffers[0]);
+ }
+ else if(findPreceding && (buffers[n].uniqueid<myuniqueid || buffersoffset+n*sizeof(buffers[0])<myuniqueid))
+ {
+ // We are searching for preceding claims now
+ if(buffers[n].code==message_code_t::rescind || buffers[n].code==message_code_t::unlock)
+ preceding.push_back(std::make_pair(false, buffers[n].uniqueid));
+ else if(buffers[n].code==message_code_t::nominate || buffers[n].code==message_code_t::havelock)
+ {
+ if(buffers[n].uniqueid<myuniqueid && preceding.end()==std::find(preceding.begin(), preceding.end(), std::make_pair(false, (off_t) buffers[n].uniqueid)))
+ {
+ preceding.push_back(std::make_pair(true, buffers[n].uniqueid));
+ validPrecedingCount++;
+ }
+ }
+ else if(buffers[n].code==message_code_t::interest)
+ {
+ if(buffersoffset+n*sizeof(buffers[0])<myuniqueid && preceding.end()==std::find(preceding.begin(), preceding.end(), std::make_pair(false, buffersoffset+n*sizeof(buffers[0]))))
+ {
+ preceding.push_back(std::make_pair(true, buffersoffset+n*sizeof(buffers[0])));
+ validPrecedingCount++;
+ }
+ }
+ }
+ }
+ }
+#if 0
+ std::cout << thread << ": myuniqueid=" << myuniqueid << " startofinterest=" << startofinterest << " size=" << lockfilez->lstat(metadata_flags::size).st_size << " lockid=" << lockid.first << "," << lockid.second << " preceding=";
+ for(auto &i : preceding)
+ std::cout << i.first << "," << i.second << ";";
+ std::cout << std::endl;
+#endif
+ if(findPreceding)
+ {
+ // Remove all rescinded interests preceding ours
+ preceding.erase(std::remove_if(preceding.begin(), preceding.end(), [](const std::pair<bool, off_t> &i){ return !i.first; }), preceding.end());
+ std::sort(preceding.begin(), preceding.end());
+ findPreceding=false;
+ }
+ };
+ do
+ {
+ lockid=std::make_pair(false, (off_t)-1);
+ iterate();
+ // Didn't find it, so sleep and retry, maybe st_size will have updated by then
+ if(myuniqueid==(off_t)-1) this_thread::sleep_for(chrono::milliseconds(1));
+ } while(myuniqueid==(off_t)-1);
+
+ // Step 3: If there is no lock and no interest precedes mine, claim the mutex. Else issue a nominate for myself,
+ // once per ten seconds.
+ {
+ nowtimestamp=0;
+ mutex m;
+ condition_variable c;
+ unique_lock<decltype(m)> l(m);
+ atomic<bool> fileChanged(false);
+ for(;;)
+ {
+ attempts.fetch_add(1, memory_order_relaxed);
+ temp.timestamp=gettime();
+ temp.uniqueid=myuniqueid;
+ if(preceding.empty())
+ {
+ temp.code=message_code_t::havelock;
+ dispatcher->write(make_io_req(lockfilea, temp.bytes, sizeof(temp), 0)).get();
+ // Zero the range between startofinterest and myuniqueid
+ if(startofinterest<myuniqueid)
+ {
+ std::vector<std::pair<off_t, off_t>> range={{startofinterest, myuniqueid-startofinterest}};
+ dispatcher->zero(lockfilez, range).get();
+ // std::cout << thread << ": lock taken for myuniqueid=" << myuniqueid << ", zeroing " << range.front().first << ", " << range.front().second << std::endl;
+ }
+ break;
+ }
+ else
+ {
+ auto lockfilechanged=[&]{
+ fileChanged=true;
+ c.notify_all();
+ };
+ // TODO FIXME: Put a modify watch on the lockfile instead of spinning
+ if(temp.timestamp-nowtimestamp>=10)
+ {
+ temp.code=message_code_t::nominate;
+ dispatcher->write(make_io_req(lockfilea, temp.bytes, sizeof(temp), 0)).get();
+ nowtimestamp=temp.timestamp;
+ }
+ //c.wait_for(l, chrono::milliseconds(1), [&fileChanged]{ return fileChanged==true; });
+ fileChanged=false;
+ preceding.clear();
+ findPreceding=true;
+ lockid=std::make_pair(false, (off_t)-1);
+ iterate();
+ }
+ }
+ }
+
+ // Step 4: I now have the lock, so do my thing
+ std::string logentry("I am log writer "), mythreadid(to_string(thread)), logentryend("!\n");
+ // Fetch the size
+ off_t where=logfile->lstat().st_size, entrysize=logentry.size()+mythreadid.size()+logentryend.size();
+ // Schedule extending the log file
+ auto extendlog(dispatcher->truncate(logfile, where+entrysize));
+ // Schedule writing the new entry
+ auto writetolog(dispatcher->write(make_io_req(extendlog, {logentry, mythreadid, logentryend}, where)));
+ // Fetch errors from the last operation first to avoid sleep-wake cycling
+ writetolog.get();
+ extendlog.get();
+ successes.fetch_add(1, memory_order_relaxed);
+// std::cout << thread << ": doing work for myuniqueid=" << myuniqueid << std::endl;
+// this_thread::sleep_for(chrono::milliseconds(250));
+
+ // Step 5: Release the lock
+ temp.code=message_code_t::unlock;
+ temp.timestamp=gettime();
+ temp.uniqueid=myuniqueid;
+// std::cout << thread << ": lock released for myuniqueid=" << myuniqueid << std::endl;
+ dispatcher->write(make_io_req(lockfilea, temp.bytes, sizeof(temp), 0)).get();
+ }
+ }
+ catch(const system_error &e) { std::cerr << "ERROR: test exits via system_error code " << e.code().value() << "(" << e.what() << ")" << std::endl; abort(); }
+ catch(const std::exception &e) { std::cerr << "ERROR: test exits via exception (" << e.what() << ")" << std::endl; abort(); }
+ catch(...) { std::cerr << "ERROR: test exits via unknown exception" << std::endl; abort(); }
+ }));
+ }
+ auto begin=chrono::high_resolution_clock::now();
+ done=false;
+ std::this_thread::sleep_for(std::chrono::seconds(20));
+ done=true;
+ std::cout << "Waiting for threads to exit ..." << std::endl;
+ for(auto &i : threads)
+ i.join();
+ auto end=chrono::high_resolution_clock::now();
+ auto diff=chrono::duration_cast<secs_type>(end-begin);
+ std::cout << "For " << writers << " concurrent writers, achieved " << (attempts/diff.count()) << " attempts per second with a "
+ "success rate of " << (successes/diff.count()) << " writes per second which is a " << (100.0*successes/attempts) << "% success rate." << std::endl;
+ atomic_log_locks=successes/diff.count();
+ }
+ filesystem::remove_all("testdir");
+ std::cout << "Traditional locks were " << (traditional_locks/atomic_log_locks) << " times faster." << std::endl;
+ return 0;
+}
+//]
diff --git a/attic/example/benchmark_chained1.cpp b/attic/example/benchmark_chained1.cpp
new file mode 100644
index 00000000..f40bf1c9
--- /dev/null
+++ b/attic/example/benchmark_chained1.cpp
@@ -0,0 +1,48 @@
+#include "afio_pch.hpp"
+
+/* My Intel Core i7 3770K running Windows 8 x64: 726124 closures/sec
+ My Intel Core i7 3770K running Linux x64: 968005 closures/sec
+*/
+
+static std::pair<bool, std::shared_ptr<boost::afio::handle>> _callback(size_t, boost::afio::future<> op)
+{
+#if 0
+ // Simulate an i/o op with a context switch
+ Sleep(0);
+#endif
+ return std::make_pair(true, op.get_handle());
+};
+
// Benchmark: throughput of *chained* completion handlers.
// Each OpenMP thread enqueues 500,000 completions where every op uses the
// previously scheduled op as its precondition — one serial chain per thread.
int main(void)
{
  using namespace boost::afio;
  auto dispatcher=make_dispatcher().get();
  typedef chrono::duration<double, ratio<1, 1>> secs_type;
  auto begin=chrono::high_resolution_clock::now();
  // ~3 second busy-spin warm up so CPU clocks settle before measurement.
  while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now()-begin).count()<3);

  // Pair of (flags, completion routine) handed to dispatcher->completion().
  std::pair<async_op_flags, dispatcher::completion_t *> callback(async_op_flags::none, _callback);
  atomic<size_t> threads(0);  // counts how many OpenMP threads actually ran
#if 0
  std::cout << "Attach profiler now and hit Return" << std::endl;
  getchar();
#endif
  begin=chrono::high_resolution_clock::now();
#pragma omp parallel
  {
    future<> last;  // default future == no precondition for the first op
    threads++;
    for(size_t n=0; n<500000; n++)
    {
      // Chain: each completion depends on the one scheduled just before it.
      last=dispatcher->completion(last, callback);
    }
  }
  // Poll until the dispatcher has drained its queue of scheduled ops.
  while(dispatcher->wait_queue_depth())
    this_thread::sleep_for(chrono::milliseconds(1));
  auto end=chrono::high_resolution_clock::now();
  auto diff=chrono::duration_cast<secs_type>(end-begin);
  std::cout << "It took " << diff.count() << " secs to execute " << (500000*threads) << " closures which is " << (500000*threads/diff.count()) << " chained closures/sec" << std::endl;
  std::cout << "\nPress Return to exit ..." << std::endl;
  getchar();
  return 0;
}
diff --git a/attic/example/benchmark_chained2.cpp b/attic/example/benchmark_chained2.cpp
new file mode 100644
index 00000000..d410076c
--- /dev/null
+++ b/attic/example/benchmark_chained2.cpp
@@ -0,0 +1,43 @@
+#include "afio_pch.hpp"
+
+/* My Intel Core i7 3770K running Windows 8 x64: 596318 closures/sec
+ My Intel Core i7 3770K running Linux x64: 794384 closures/sec
+*/
+
/// Trivial work item scheduled via dispatcher->call(); always returns 1.
static int callback()
{
#if 0
  // Simulate an i/o op with a context switch
  Sleep(0);
#endif
  return 1;
}
+
// Benchmark: throughput of chained closures scheduled via dispatcher->call().
// Each OpenMP thread runs a serial chain of 500,000 calls, each depending on
// the previous call through its precondition future.
int main(void)
{
  using namespace boost::afio;
  auto dispatcher=make_dispatcher().get();
  typedef chrono::duration<double, ratio<1, 1>> secs_type;
  auto begin=chrono::high_resolution_clock::now();
  // ~3 second busy-spin warm up so CPU clocks settle before measurement.
  while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now()-begin).count()<3);

  atomic<size_t> threads(0);  // counts how many OpenMP threads actually ran
  std::vector<std::function<int()>> callbacks(1, callback);  // one-entry batch
  begin=chrono::high_resolution_clock::now();
#pragma omp parallel
  {
    std::vector<future<>> preconditions(1);  // front() holds the chain's tail
    threads++;
    for(size_t n=0; n<500000; n++)
    {
      // Schedule the next call after the previous one, then keep its future
      // as the precondition for the following iteration.
      preconditions.front()=dispatcher->call(preconditions, callbacks).front();
    }
  }
  // Poll until the dispatcher has drained its queue of scheduled ops.
  while(dispatcher->wait_queue_depth())
    this_thread::sleep_for(chrono::milliseconds(1));
  auto end=chrono::high_resolution_clock::now();
  auto diff=chrono::duration_cast<secs_type>(end-begin);
  std::cout << "It took " << diff.count() << " secs to execute " << (500000*threads) << " closures which is " << (500000*threads/diff.count()) << " chained closures/sec" << std::endl;
  std::cout << "\nPress Return to exit ..." << std::endl;
  getchar();
  return 0;
}
diff --git a/attic/example/benchmark_latency.cpp b/attic/example/benchmark_latency.cpp
new file mode 100644
index 00000000..f11422fd
--- /dev/null
+++ b/attic/example/benchmark_latency.cpp
@@ -0,0 +1,162 @@
+#include "afio_pch.hpp"
+#include <thread>
+
+#define ITERATIONS 10000
+#define CONCURRENCY 32
+
+// Optional
+//#define MULTIPLIER 1000000 // output number of microseconds instead of seconds
+#define MULTIPLIER 3900000000ULL // output number of CPU clocks instead of seconds
+
+typedef decltype(boost::afio::chrono::high_resolution_clock::now()) time_point;
+size_t id_offset;
+static time_point points[100000];
+static time_point::duration overhead, timesliceoverhead, sleepoverhead;
+static std::pair<bool, std::shared_ptr<boost::afio::handle>> _callback(size_t id, boost::afio::future<> op)
+{
+ using namespace boost::afio;
+ points[id-id_offset]=chrono::high_resolution_clock::now();
+ return std::make_pair(true, op.get_handle());
+};
+
+int main(void)
+{
+ using namespace boost::afio;
+ auto dispatcher=make_dispatcher().get();
+ typedef chrono::duration<double, ratio<1, 1>> secs_type;
+ {
+ size_t total1=0, total2=0, total3=0;
+ auto begin=chrono::high_resolution_clock::now();
+ while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now()-begin).count()<1)
+ {
+ auto now2=chrono::high_resolution_clock::now();
+ this_thread::yield();
+ auto now3=chrono::high_resolution_clock::now();
+ timesliceoverhead+=now3-now2;
+ total1++;
+ }
+ while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now()-begin).count()<2)
+ {
+ auto now2=chrono::high_resolution_clock::now();
+ this_thread::sleep_for(chrono::nanoseconds(1));
+ auto now3=chrono::high_resolution_clock::now();
+ sleepoverhead+=now3-now2;
+ total3++;
+ }
+ while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now()-begin).count()<3)
+ {
+ auto now1=chrono::high_resolution_clock::now();
+ auto now2=chrono::high_resolution_clock::now();
+ overhead+=now2-now1;
+ total2++;
+ }
+ overhead=time_point::duration(overhead.count()/total2);
+ sleepoverhead=time_point::duration(sleepoverhead.count()/total3);
+ timesliceoverhead=time_point::duration(timesliceoverhead.count()/total1);
+ std::cout << "Timing overhead is calculated to be " << chrono::duration_cast<secs_type>(overhead).count() << " seconds." << std::endl;
+ std::cout << "OS context switch overhead is calculated to be " << chrono::duration_cast<secs_type>(timesliceoverhead).count() << " seconds." << std::endl;
+ std::cout << "OS sleep overhead is calculated to be " << chrono::duration_cast<secs_type>(sleepoverhead).count() << " seconds." << std::endl;
+ }
+
+ std::pair<async_op_flags, dispatcher::completion_t *> callback(async_op_flags::none, _callback);
+ std::ofstream csv("afio_latencies.csv");
+ csv << "Timing overhead is calculated to be," << chrono::duration_cast<secs_type>(overhead).count()
+#ifdef MULTIPLIER
+ * MULTIPLIER
+#endif
+ << std::endl;
+ csv << "OS context switch overhead is calculated to be," << chrono::duration_cast<secs_type>(timesliceoverhead).count()
+#ifdef MULTIPLIER
+ * MULTIPLIER
+#endif
+ << std::endl << std::endl;
+ csv << "OS sleep overhead is calculated to be," << chrono::duration_cast<secs_type>(sleepoverhead).count()
+#ifdef MULTIPLIER
+ * MULTIPLIER
+#endif
+ << std::endl << std::endl;
+ csv << "Concurrency,Handler Min,Handler Max,Handler Average,Handler Stddev,Complete Min,Complete Max,Complete Average,Complete Stddev" << std::endl;
+ for(size_t concurrency=0; concurrency<CONCURRENCY; concurrency++)
+ {
+ future<> last[CONCURRENCY];
+ time_point begin[CONCURRENCY], handled[CONCURRENCY], end[CONCURRENCY];
+ atomic<bool> waiter;
+ double handler[ITERATIONS], complete[ITERATIONS];
+ std::vector<std::thread> threads;
+ threads.reserve(CONCURRENCY);
+ size_t iterations=(concurrency>=8) ? ITERATIONS/10 : ITERATIONS;
+ std::cout << "Running " << iterations << " iterations of concurrency " << concurrency+1 << " ..." << std::endl;
+ for(size_t n=0; n<iterations; n++)
+ {
+ threads.clear();
+ waiter.store(true);
+ atomic<size_t> threads_ready(0);
+ for(size_t c=0; c<=concurrency; c++)
+ {
+ threads.push_back(std::thread([&, c]{
+ ++threads_ready;
+ while(waiter)
+#ifdef BOOST_SMT_PAUSE
+ BOOST_SMT_PAUSE
+#endif
+ ;
+ begin[c]=chrono::high_resolution_clock::now();
+ last[c]=dispatcher->completion(future<>(), callback);
+ last[c].get();
+ end[c]=chrono::high_resolution_clock::now();
+ handled[c]=points[last[c].id()-id_offset];
+ }));
+ }
+ while(threads_ready<=concurrency)
+#ifdef BOOST_SMT_PAUSE
+ BOOST_SMT_PAUSE
+#endif
+ ;
+ waiter.store(false);
+ for(auto &i: threads)
+ i.join();
+ for(size_t c=0; c<=concurrency; c++)
+ {
+ handler[n]=chrono::duration_cast<secs_type>(handled[c]-begin[c]-overhead).count();
+ complete[n]=chrono::duration_cast<secs_type>(end[c]-handled[c]-overhead).count();
+ }
+ }
+ for(size_t n=0; n<=concurrency; n++)
+ if(last[n].id()>id_offset) id_offset=last[n].id();
+ double minHandler=1<<30, maxHandler=0, totalHandler=0, minComplete=1<<30, maxComplete=0, totalComplete=0;
+ for(size_t n=0; n<iterations; n++)
+ {
+ if(handler[n]<minHandler) minHandler=handler[n];
+ if(handler[n]>maxHandler) maxHandler=handler[n];
+ if(complete[n]<minHandler) minComplete=complete[n];
+ if(complete[n]>maxHandler) maxComplete=complete[n];
+ totalHandler+=handler[n];
+ totalComplete+=complete[n];
+ }
+ totalHandler/=iterations;
+ totalComplete/=iterations;
+ double varianceHandler=0, varianceComplete=0;
+ for(size_t n=0; n<iterations; n++)
+ {
+ varianceHandler+=pow(handler[n]-totalHandler, 2);
+ varianceComplete+=pow(complete[n]-totalComplete, 2);
+ }
+ varianceHandler/=iterations;
+ varianceComplete/=iterations;
+ varianceHandler=sqrt(varianceHandler);
+ varianceComplete=sqrt(varianceComplete);
+#ifdef MULTIPLIER
+ minHandler*=MULTIPLIER;
+ maxHandler*=MULTIPLIER;
+ totalHandler*=MULTIPLIER;
+ varianceHandler*=MULTIPLIER;
+ minComplete*=MULTIPLIER;
+ maxComplete*=MULTIPLIER;
+ totalComplete*=MULTIPLIER;
+ varianceComplete*=MULTIPLIER;
+#endif
+ csv << concurrency+1 << "," << minHandler << "," << maxHandler << "," << totalHandler << "," << varianceHandler << ","
+ << minComplete << "," << maxComplete << "," << totalComplete << "," << varianceComplete << std::endl;
+ }
+ return 0;
+}
diff --git a/attic/example/benchmark_unchained1.cpp b/attic/example/benchmark_unchained1.cpp
new file mode 100644
index 00000000..2f9b6769
--- /dev/null
+++ b/attic/example/benchmark_unchained1.cpp
@@ -0,0 +1,43 @@
+#include "afio_pch.hpp"
+
+/* My Intel Core i7 3770K running Windows 8 x64: 1555990 closures/sec
+ My Intel Core i7 3770K running Linux x64: 1432810 closures/sec
+*/
+
+static std::pair<bool, std::shared_ptr<boost::afio::handle>> callback(size_t, boost::afio::future<> op)
+{
+#if 0
+ // Simulate an i/o op with a context switch
+ Sleep(0);
+#endif
+ return std::make_pair(true, op.get_handle());
+};
+
// Benchmark: throughput of *unchained* completion handlers — 5,000,000
// independent completions scheduled with no preconditions, spread across
// OpenMP threads.
int main(void)
{
  using namespace boost::afio;
  auto dispatcher=make_dispatcher().get();
  typedef chrono::duration<double, ratio<1, 1>> secs_type;
  auto begin=chrono::high_resolution_clock::now();
  // ~3 second busy-spin warm up so CPU clocks settle before measurement.
  while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now()-begin).count()<3);

  std::vector<future<>> preconditions;  // empty — ops have no dependencies
  std::vector<std::pair<async_op_flags, dispatcher::completion_t *>> callbacks(1,
    std::make_pair(async_op_flags::none, callback));
#if 0
  std::cout << "Attach profiler now and hit Return" << std::endl;
  getchar();
#endif
  begin=chrono::high_resolution_clock::now();
#pragma omp parallel for
  for(int n=0; n<5000000; n++)
    dispatcher->completion(preconditions, callbacks);
  // Poll until the dispatcher has drained its queue of scheduled ops.
  while(dispatcher->wait_queue_depth())
    this_thread::sleep_for(chrono::milliseconds(1));
  auto end=chrono::high_resolution_clock::now();
  auto diff=chrono::duration_cast<secs_type>(end-begin);
  std::cout << "It took " << diff.count() << " secs to execute 5,000,000 closures which is " << (5000000/diff.count()) << " unchained closures/sec" << std::endl;
  std::cout << "\nPress Return to exit ..." << std::endl;
  getchar();
  return 0;
}
diff --git a/attic/example/benchmark_unchained2.cpp b/attic/example/benchmark_unchained2.cpp
new file mode 100644
index 00000000..db9affe0
--- /dev/null
+++ b/attic/example/benchmark_unchained2.cpp
@@ -0,0 +1,37 @@
+#include "afio_pch.hpp"
+
+/* My Intel Core i7 3770K running Windows 8 x64: 911963 closures/sec
+ My Intel Core i7 3770K running Linux x64: 1094780 closures/sec
+*/
+
/// Trivial work item scheduled via dispatcher->call(); always returns 1.
static int callback()
{
#if 0
  // Simulate an i/o op with a context switch
  Sleep(0);
#endif
  return 1;
}
+
// Benchmark: throughput of *unchained* closures — 5,000,000 independent
// calls scheduled via dispatcher->call() with no preconditions, spread
// across OpenMP threads.
int main(void)
{
  using namespace boost::afio;
  auto dispatcher=make_dispatcher().get();
  typedef chrono::duration<double, ratio<1, 1>> secs_type;
  auto begin=chrono::high_resolution_clock::now();
  // ~3 second busy-spin warm up so CPU clocks settle before measurement.
  while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now()-begin).count()<3);

  std::vector<future<>> preconditions;  // empty — calls have no dependencies
  std::vector<std::function<int()>> callbacks(1, callback);  // one-entry batch
  begin=chrono::high_resolution_clock::now();
#pragma omp parallel for
  for(int n=0; n<5000000; n++)
    dispatcher->call(preconditions, callbacks);
  // Poll until the dispatcher has drained its queue of scheduled ops.
  while(dispatcher->wait_queue_depth())
    this_thread::sleep_for(chrono::milliseconds(1));
  auto end=chrono::high_resolution_clock::now();
  auto diff=chrono::duration_cast<secs_type>(end-begin);
  std::cout << "It took " << diff.count() << " secs to execute 5,000,000 closures which is " << (5000000/diff.count()) << " unchained closures/sec" << std::endl;
  std::cout << "\nPress Return to exit ..." << std::endl;
  getchar();
  return 0;
}
diff --git a/attic/example/call_example.cpp b/attic/example/call_example.cpp
new file mode 100644
index 00000000..d4a72642
--- /dev/null
+++ b/attic/example/call_example.cpp
@@ -0,0 +1,25 @@
+//#define BOOST_RESULT_OF_USE_DECLTYPE 1
+#include "afio_pch.hpp"
+
+int main(void)
+{
+ //[call_example
+ // Create a dispatcher instance
+ auto dispatcher=boost::afio::make_dispatcher().get();
+
+ // Schedule an asynchronous call of some function with some bound set of arguments
+ auto helloworld=dispatcher->call(boost::afio::future<>() /* no precondition */, [](std::string text) -> int {
+ std::cout << text << std::endl;
+ return 42;
+ }, std::string("Hello world"));
+
+ // Schedule as asynchronous call of some function to occur only after helloworld completes
+ auto addtovalue=dispatcher->call(helloworld, [&helloworld]() -> int {
+ return helloworld.get()+1;
+ });
+
+ // Print the result returned by the future for the lambda, which will be 43
+ std::cout << "addtovalue() returned " << addtovalue.get() << std::endl;
+ //]
+ return 0;
+}
diff --git a/attic/example/closure_execution_afio_io_example.cpp b/attic/example/closure_execution_afio_io_example.cpp
new file mode 100644
index 00000000..96ac4808
--- /dev/null
+++ b/attic/example/closure_execution_afio_io_example.cpp
@@ -0,0 +1,61 @@
+#include "afio_pch.hpp"
+
+//[closure_execution_afio_example
+#include <vector>
+
// Writes ten integers to a file, then uses AFIO to read them back in parallel,
// double each one as its read completes, and print the results.
int main()
{
    const int ary_size = 10;

    //set up a file to read from
    std::ofstream out_file("somefile.dat", std::ios::binary);
    for (int i = 0; i < ary_size; ++i)
    {
        out_file.write(reinterpret_cast<const char*>(&i), sizeof(i));
    }
    out_file.close();


    //set up the afio dispatcher
    auto dispatcher = boost::afio::make_dispatcher().get();

    //set up an array to hold our integers
    int ary[ary_size];

    //schedule the file open
    auto opened_file = dispatcher->file(boost::afio::path_req("somefile.dat",
        boost::afio::file_flags::read));

    //set up vectors for the individual read operations, and the work on each integer
    std::vector<boost::afio::future<>> read_ops(ary_size);
    std::vector<std::function<void()>> vec_func(ary_size);
    for (int i = 0; i < ary_size; ++i)
    {
        // Each read fetches one int at its own offset, all depending only on
        // the file-open op, so they can run in parallel.
        read_ops[i] = dispatcher->read(boost::afio::io_req<int>(opened_file,
            &ary[i], sizeof(int), i*sizeof(int)));

        // The work item doubles the int its paired read fetched.
        vec_func[i] = std::bind([](int* a){ *a *= 2 ; }, &ary[i]);
    }

    // schedule the work to be done after reading in an integer
    auto work = dispatcher->call(read_ops, vec_func);

    //schedule the file to be closed after reads are finished
    // NOTE(review): closed_file is scheduled but never waited on before main
    // returns — confirm the dispatcher drains it at shutdown.
    auto closed_file = dispatcher->close(dispatcher->barrier(read_ops).front());

    // make sure work has completed before trying to print data from the array
    boost::afio::when_all_p(work.begin(), work.end()).wait();

    //verify the out put is as expected: "0, 2, 4, 6, 8, 10, 12, 14, 16, 18"
    for (int i = 0; i < ary_size; ++i)
    {
        std::cout << ary[i];
        if(i == ary_size-1)
            std::cout << std::endl;
        else
            std::cout << ", ";
    }

    return 0;
}
+//]
diff --git a/attic/example/closure_execution_traditional_io_example.cpp b/attic/example/closure_execution_traditional_io_example.cpp
new file mode 100644
index 00000000..62483edc
--- /dev/null
+++ b/attic/example/closure_execution_traditional_io_example.cpp
@@ -0,0 +1,53 @@
+#include "afio_pch.hpp"
+
+//[closure_execution_traditional_example
+#include <iostream>
+#include <fstream>
+
// Baseline for the AFIO closure example: the same write / read / double /
// print workload done with plain blocking iostreams.
int main()
{

  const int ary_size = 10;

  // Produce a binary file holding the integers 0..9.
  {
    std::ofstream out_file("somefile.dat", std::ios::binary);
    for (int i = 0; i < ary_size; ++i)
      out_file.write(reinterpret_cast<const char*>(&i), sizeof(i));
  }

  // Destination array for the integers read back.
  int ary[ary_size];
  std::ifstream file("somefile.dat");

  if (file)
  {
    // Read each integer back into the array.
    for (int& value : ary)
      file.read(reinterpret_cast<char*>(&value), sizeof(value));
    file.close();

    // The "work": double every element.
    for (int& value : ary)
      value *= 2;
  }

  // Expected output: "0, 2, 4, 6, 8, 10, 12, 14, 16, 18"
  const char* separator = "";
  for (int i = 0; i < ary_size; ++i)
  {
    std::cout << separator << ary[i];
    separator = ", ";
  }
  std::cout << std::endl;

  return 0;
}
+//]
diff --git a/attic/example/completion_example1.cpp b/attic/example/completion_example1.cpp
new file mode 100644
index 00000000..706a5e44
--- /dev/null
+++ b/attic/example/completion_example1.cpp
@@ -0,0 +1,58 @@
+//#define BOOST_RESULT_OF_USE_DECLTYPE 1
+#include "afio_pch.hpp"
+
// Demonstrates the lowest-level AFIO scheduling primitive: a raw completion
// handler bound with std::bind and scheduled via dispatcher->completion().
int main(void)
{
    //[completion_example1
    // Create a dispatcher instance
    std::shared_ptr<boost::afio::dispatcher> dispatcher=
        boost::afio::make_dispatcher().get();

    // Completion handlers are the lowest level completion routine available, and therefore the least
    // overhead but at the cost of considerable extra programmer effort. You almost certainly want
    // to use the call() member function instead.

    // First create some callable entity ...
    auto completer=[](
        /* These are always the standard parameters */
        size_t id, boost::afio::future<> precondition,
        /* From now on user defined parameters */
        std::string text)
        /* This is always the return type */
        -> std::pair<bool, std::shared_ptr<boost::afio::handle>>
    {
        /* id is the unique, non-zero integer id of this op.
        precondition is the op you supplied as precondition. As it will by definition
        have completed by now, you can fetch from its h member variable a shared pointer
        to the shared stl_future containing either the output handle or the error state.
        */
        std::cout << text << std::endl;

        // Return whether this completion has completed now or is it deferred,
        // along with the handle we pass onto any completions completing on this op
        // Note that op.get() by default rethrows any exception contained by the op.
        // Normally this is highly desirable.
        return std::make_pair(true, precondition.get_handle());
    };

    // Bind any user defined parameters to create a proper boost::afio::dispatcher::completion_t
    // (completion_t's signature is just the two standard parameters above).
    std::function<boost::afio::dispatcher::completion_t> boundf=
        std::bind(completer,
        /* The standard parameters */
        std::placeholders::_1, std::placeholders::_2,
        /* Any values for the user defined parameters. Remember ALWAYS to pass by value! */
        std::string("Hello world"));

    // Schedule an asynchronous call of the completion with some bound set of arguments
    boost::afio::future<> helloworld=
        dispatcher->completion(boost::afio::future<>() /* no precondition */,
        std::make_pair(boost::afio::async_op_flags::none, boundf));

    // Create a boost::stl_future<> representing the ops passed to when_all_p()
    boost::afio::stl_future<std::vector<std::shared_ptr<boost::afio::handle>>> stl_future
        =boost::afio::when_all_p(helloworld);
    // ... and wait for it to complete
    stl_future.wait();
    //]
    return 0;
}
diff --git a/attic/example/completion_example2.cpp b/attic/example/completion_example2.cpp
new file mode 100644
index 00000000..513eea77
--- /dev/null
+++ b/attic/example/completion_example2.cpp
@@ -0,0 +1,77 @@
+//#define BOOST_RESULT_OF_USE_DECLTYPE 1
+#include "afio_pch.hpp"
+
// Demonstrates an *immediate* completion which defers its own completion to a
// separately executing task, signalled via dispatcher->complete_async_op().
int main(void)
{
    //[completion_example2
    // Create a dispatcher instance
    std::shared_ptr<boost::afio::dispatcher> dispatcher=
        boost::afio::make_dispatcher().get();

    // One thing direct programming of completion handlers can do which call() cannot is immediate
    // completions. These run immediately after the precondition finishes by the thread worker
    // which executed the precondition rather than being appended to the FIFO queue. This can
    // be useful for ensuring data is still cache-local for example.

    // Create the completion, using the standard form
    auto completion=[](std::shared_ptr<boost::afio::dispatcher> dispatcher,
        /* These are always the standard parameters */
        size_t id, boost::afio::future<> precondition)
        /* This is always the return type */
        -> std::pair<bool, std::shared_ptr<boost::afio::handle>>
    {
        std::cout << "I am completion" << std::endl;

        // Create some callable entity which will do the actual completion. It can be
        // anything you like, but you need a minimum of its integer id.
        auto completer=[](std::shared_ptr<boost::afio::dispatcher> dispatcher,
            size_t id, boost::afio::future<> op) -> int
        {
            try
            {
                std::cout << "I am completer" << std::endl;

                // Do stuff, returning the handle you want passed onto dependencies.
                // Note that op.get() rethrows any exception in op. Normally you want this.
                dispatcher->complete_async_op(id, op.get_handle());
            }
            catch(...)
            {
                // In non-deferred completions AFIO traps exceptions for you. Here, you must
                // do it by hand and tell AFIO about what exception state to return.
                boost::afio::exception_ptr e(boost::afio::current_exception());
                dispatcher->complete_async_op(id, e);
            }
            return 0;
        };
        // Bind the id and handle to completer, and enqueue for later asynchronous execution.
        // NOTE(review): the std::future returned by std::async is discarded, and a
        // discarded std::async future's destructor blocks until the task finishes —
        // so completer actually runs to completion right here. Confirm whether true
        // asynchrony was intended; if so, the future needs to be kept alive.
        std::async(std::launch::async, completer, dispatcher, id, precondition);

        // Indicate we are not done yet
        return std::make_pair(false, precondition.get_handle());
    };

    // Bind any user defined parameters to create a proper boost::afio::dispatcher::completion_t
    std::function<boost::afio::dispatcher::completion_t> boundf=
        std::bind(completion, dispatcher,
        /* The standard parameters */
        std::placeholders::_1, std::placeholders::_2);

    // Schedule an asynchronous call of the completion
    boost::afio::future<> op=
        dispatcher->completion(boost::afio::future<>() /* no precondition */,
        std::make_pair(
        /* Complete boundf immediately after its precondition (in this
        case as there is no precondition that means right now before
        completion() returns) */
        boost::afio::async_op_flags::immediate,
        boundf));

    // Create a boost::stl_future<> representing the ops passed to when_all_p()
    boost::afio::stl_future<std::vector<std::shared_ptr<boost::afio::handle>>> stl_future
        =boost::afio::when_all_p(op);
    // ... and wait for it to complete
    stl_future.wait();
    //]
    return 0;
}
diff --git a/attic/example/determine_legal_filenames.cpp b/attic/example/determine_legal_filenames.cpp
new file mode 100644
index 00000000..667b3f36
--- /dev/null
+++ b/attic/example/determine_legal_filenames.cpp
@@ -0,0 +1,35 @@
+#include "afio_pch.hpp"
+
// Probes which single-character filenames (character codes 1..255) the host
// OS/filesystem permits, by attempting to create then delete a file of that
// name inside a scratch directory "testdir".
int main(void)
{
    using namespace boost::afio;
    auto dispatcher=boost::afio::make_dispatcher().get();

    auto mkdir(dispatcher->dir(path_req("testdir", file_flags::create)));
    try
    {
        // Code 0 is the string terminator, so start from 1.
        for(size_t n=1; n<256; n++)
        {
            char cs[2]={ (char) n, 0 };  // candidate one-character filename
            path p(cs);
            try
            {
                // Create the file relative to testdir; .get() surfaces any error,
                // which is how an illegal character is detected.
                auto mkfile(dispatcher->file(path_req::relative(mkdir, p, boost::afio::file_flags::create)));
                mkfile.get();
                auto rmfile(dispatcher->close(dispatcher->rmfile(mkfile)));
                std::cout << "Character " << n << " (" << p << ") is permitted on this operating system." << std::endl;
            }
            catch(...)
            {
                std::cout << "Character " << n << " (" << p << ") failed on this operating system." << std::endl;
            }
        }
    }
    catch(...)
    {
        std::cerr << boost::current_exception_diagnostic_information(true) << std::endl;
        throw;
    }
    // Clean up the scratch directory.
    auto rmdir(dispatcher->close(dispatcher->rmdir(mkdir)));
    rmdir.get();
}
diff --git a/attic/example/enumerate_example.cpp b/attic/example/enumerate_example.cpp
new file mode 100644
index 00000000..35bac7a5
--- /dev/null
+++ b/attic/example/enumerate_example.cpp
@@ -0,0 +1,43 @@
+#include "afio_pch.hpp"
+
// Enumerates the filesystem root directory in batches, printing each entry's
// name and type until the directory is exhausted.
int main(void)
{
    //[enumerate_example
    boost::afio::current_dispatcher_guard h(boost::afio::make_dispatcher().get());

    // Schedule an opening of the root directory
    boost::afio::future<> rootdir(boost::afio::async_dir("/"));

    // .second is true while more entries remain to be enumerated.
    std::pair<std::vector<boost::afio::directory_entry>, bool> list;
    // This is used to reset the enumeration to the start
    bool restart=true;
    do
    {
        // Schedule an enumeration of an open directory handle
        boost::afio::future<std::pair<std::vector<boost::afio::directory_entry>, bool>>
            enumeration(boost::afio::async_enumerate(rootdir,
            /* This is the maximum entries to enumerate. Note
            the use of compatibility_maximum() which is the
            same value your libc uses. The problem with smaller
            enumerations is that the directory contents can change
            out from underneath you more frequently. */
            boost::afio::directory_entry::compatibility_maximum(),
            /* True if to reset enumeration */
            restart));
        restart=false;  // subsequent batches continue where the last stopped

        // People using AFIO often forget that futures can be waited
        // on normally without needing to wait on the op handle
        list=enumeration.get();
        for(boost::afio::directory_entry &i : list.first)
        {
#ifdef WIN32
            // Windows directory entry names are wide strings.
            std::wcout << i.name();
#else
            std::cout << i.name();
#endif
            std::cout << " type " << static_cast<int>(i.st_type()) << std::endl;
        }
    } while(list.second);
    //]
}
diff --git a/attic/example/filecopy_example.cpp b/attic/example/filecopy_example.cpp
new file mode 100644
index 00000000..e48a9c36
--- /dev/null
+++ b/attic/example/filecopy_example.cpp
@@ -0,0 +1,185 @@
+#include "afio_pch.hpp"
+
// Reflected CRC-32 (polynomial 0xEDB88320), computed bit-at-a-time.
// Pass a previous call's result via previousCrc32 to checksum data in chunks.
static uint32_t crc32(const void* data, size_t length, uint32_t previousCrc32 = 0)
{
    const uint32_t Polynomial = 0xEDB88320;
    const unsigned char* bytes = static_cast<const unsigned char*>(data);
    uint32_t crc = previousCrc32 ^ 0xFFFFFFFFu;  // same as ~previousCrc32
    for (size_t i = 0; i < length; ++i)
    {
        crc ^= bytes[i];
        for (int bit = 0; bit < 8; ++bit)
        {
            if (crc & 1u)
                crc = (crc >> 1) ^ Polynomial;
            else
                crc >>= 1;
        }
    }
    return crc ^ 0xFFFFFFFFu;  // final inversion
}
+
+//[filecopy_example
namespace {
    using namespace boost::afio;
    using boost::afio::off_t;

    // Keep memory buffers around
    // A special allocator of highly efficient file i/o memory
    typedef std::vector<char, utils::page_allocator<char>> file_buffer_type;
    // NOTE(review): lives until process exit by design — the scheduled async
    // ops keep reading/writing these buffers after the function below returns.
    static std::vector<std::unique_ptr<file_buffer_type>> buffers;

    // Parallel copy files in sources into dest, concatenating.
    //   written    - running count of bytes written so far (progress reporting)
    //   totalbytes - set to the combined size of all sources before copying starts
    //   chunk_size - bytes per scheduled read/write pair
    // Returns a future which becomes ready once every scheduled chunk completes
    // (or holds the first error encountered).
    stl_future<std::vector<handle_ptr>> async_concatenate_files(
        atomic<off_t> &written, off_t &totalbytes,
        dispatcher_ptr dispatcher,
        boost::afio::filesystem::path dest, std::vector<boost::afio::filesystem::path> sources,
        size_t chunk_size=1024*1024 /* 1Mb */)
    {
        // Schedule the opening of the output file for writing
        auto oh = async_file(dest, file_flags::create | file_flags::write);
        // Schedule the opening of all the input files for reading as a batch
        std::vector<path_req> ihs_reqs; ihs_reqs.reserve(sources.size());
        for(auto &&source : sources)
            ihs_reqs.push_back(path_req(source, file_flags::read
                |file_flags::will_be_sequentially_accessed));
        auto ihs=dispatcher->file(ihs_reqs);
        // Retrieve any error from opening the output
        oh.get();
        // Wait for the input file handles to open so we can get their sizes
        // (plus any failures to open)
        when_all_p(ihs).get();

        // Need to figure out the sizes of the sources so we can resize output
        // correctly. We also need to allocate scratch buffers for each source.
        std::vector<std::tuple<off_t, off_t>> offsets;  // (output offset, length) per source
        offsets.reserve(ihs.size());
        off_t offset=0, max_individual=0;
        for(auto &ih : ihs)
        {
            // Get the file's size in bytes
            off_t bytes=ih->direntry(metadata_flags::size).st_size();
            if(bytes>max_individual) max_individual=bytes;
            //std::cout << "File " << ih->path() << " size " << bytes << " to offset " << offset << std::endl;
            // Push the offset to write at, amount to write, and a scratch buffer
            offsets.push_back(std::make_tuple(offset, bytes));
            buffers.push_back(detail::make_unique<file_buffer_type>(chunk_size));
            offset+=bytes;
        }
        // Schedule resizing output to correct size, retrieving errors
        totalbytes=offset;
        auto ohresize=async_truncate(oh, offset);
        when_all_p(ohresize).get();

        // Schedule the parallel processing of all input files, sequential per file,
        // but only after the output file has been resized
        std::vector<future<>> lasts;  // per-source tail of its dependency chain
        for(auto &i : ihs)
            lasts.push_back(dispatcher->depends(ohresize, i));
        // Outer loop walks chunk offsets; inner loop schedules that chunk for
        // every source still long enough to have data at offset o.
        for(off_t o=0; o<max_individual; o+=chunk_size)
        {
            for(size_t idx=0; idx<ihs_reqs.size(); idx++)
            {
                auto offset=std::get<0>(offsets[idx]), bytes=std::get<1>(offsets[idx]);
                auto &buffer=buffers[idx];
                if(o<bytes)
                {
                    off_t thischunk=bytes-o;
                    if(thischunk>chunk_size) thischunk=chunk_size;
                    //std::cout << "Writing " << thischunk << " from offset " << o << " in " << lasts[idx]->path() << std::endl;
                    // Schedule a filling of buffer from offset o after last has completed
                    auto readchunk = async_read(lasts[idx], buffer->data(), (size_t)thischunk, o);
                    // Schedule a writing of buffer to offset offset+o after readchunk is ready
                    // Note the call to dispatcher->depends() to make sure the write only occurs
                    // after the read completes
                    auto writechunk = async_write(depends(readchunk, ohresize), buffer->data(), (size_t)thischunk, offset + o);
                    // Schedule incrementing written after write has completed
                    auto incwritten = writechunk.then([&written, thischunk](future<> f) {
                        written += thischunk;
                        return f;
                    });
                    // Don't do next read until written is incremented
                    lasts[idx]=dispatcher->depends(incwritten, readchunk);
                }
            }
        }
        // Having scheduled all the reads and write, return a stl_future which returns when
        // they're done
        return when_all_p(lasts);
    }
}
+
+int main(int argc, const char *argv[])
+{
+ using namespace boost::afio;
+ using boost::afio::off_t;
+ typedef chrono::duration<double, ratio<1, 1>> secs_type;
+ if(argc<3)
+ {
+ std::cerr << "ERROR: Need to specify destination path and source paths"
+ << std::endl;
+ return 1;
+ }
+ try
+ {
+ atomic<off_t> written(0);
+ off_t totalbytes=0;
+ std::shared_ptr<boost::afio::dispatcher> dispatcher=
+ boost::afio::make_dispatcher().get();
+ // Set a dispatcher as current for this thread
+ boost::afio::current_dispatcher_guard guard(dispatcher);
+
+ boost::afio::filesystem::path dest=argv[1];
+ std::vector<boost::afio::filesystem::path> sources;
+ std::cout << "Concatenating into " << dest << " the files ";
+ for(int n=2; n<argc; ++n)
+ {
+ sources.push_back(argv[n]);
+ std::cout << sources.back();
+ if(n<argc-1) std::cout << ", ";
+ }
+ std::cout << " ..." << std::endl;
+
+ auto begin=chrono::steady_clock::now();
+ auto h=async_concatenate_files(written, totalbytes, dispatcher, dest, sources);
+ // Print progress once a second until it's done
+ while(future_status::timeout==h.wait_for(boost::afio::chrono::seconds(1)))
+ {
+ std::cout << "\r" << (100*written)/totalbytes << "% complete (" << written
+ << " out of " << totalbytes << " @ " << (written/chrono::duration_cast<secs_type>(
+ chrono::steady_clock::now()-begin).count()/1024/1024) << "Mb/sec) ..." << std::flush;
+ }
+ // Retrieve any errors
+ h.get();
+ std::cout << std::endl;
+ }
+ catch(...)
+ {
+ std::cerr << "ERROR: " << boost::current_exception_diagnostic_information(true)
+ << std::endl;
+ return 1;
+ }
+ // Make sure output really is input concatenated
+ std::cout << "CRC checking that the output exactly matches the inputs ..." << std::endl;
+ uint32_t crc1 = 0, crc2 = 0;
+ off_t offset = 0;
+ std::ifstream o(argv[1], std::ifstream::in);
+ for (int n = 2; n < argc; n++)
+ {
+ std::ifstream i(argv[n], std::ifstream::in);
+ i.seekg(0, i.end);
+ std::vector<char> buffer1((size_t)i.tellg()), buffer2((size_t)i.tellg());
+ i.seekg(0, i.beg);
+ o.read(buffer1.data(), buffer1.size());
+ i.read(buffer2.data(), buffer2.size());
+ bool quiet = false;
+ for (size_t n = 0; n<buffer1.size(); n += 64)
+ {
+ crc1 = crc32(buffer1.data() + n, 64, crc1);
+ crc2 = crc32(buffer2.data() + n, 64, crc2);
+ if (crc1 != crc2 && !quiet)
+ {
+ std::cerr << "ERROR: Offset " << offset+n << " not copied correctly!" << std::endl;
+ quiet = true;
+ }
+ }
+ offset += buffer1.size();
+ }
+ return 0;
+}
+//]
diff --git a/attic/example/filedir_example.cpp b/attic/example/filedir_example.cpp
new file mode 100644
index 00000000..45439ed5
--- /dev/null
+++ b/attic/example/filedir_example.cpp
@@ -0,0 +1,74 @@
+#include "afio_pch.hpp"
+
+int main(void)
+{
+ //[filedir_example
+ using namespace BOOST_AFIO_V2_NAMESPACE;
+ using BOOST_AFIO_V2_NAMESPACE::rmdir; // presumably disambiguates from POSIX ::rmdir — confirm
+ std::shared_ptr<boost::afio::dispatcher> dispatcher =
+ boost::afio::make_dispatcher().get();
+ current_dispatcher_guard h(dispatcher); // makes dispatcher current so the async_* free functions below find it
+
+ // Free function
+ try
+ {
+ // Schedule creating a directory called testdir
+ auto mkdir(async_dir("testdir", boost::afio::file_flags::create));
+ // Schedule creating a file called testfile in testdir only when testdir has been created
+ auto mkfile(async_file(mkdir, "testfile", boost::afio::file_flags::create));
+ // Schedule creating a symbolic link called linktodir to the item referred to by the precondition
+ // i.e. testdir. Note that on Windows you can only symbolic link directories.
+ auto mklink(async_symlink(mkdir, "linktodir", mkdir, boost::afio::file_flags::create));
+
+ // Schedule deleting the symbolic link only after when it has been created
+ auto rmlink(async_rmsymlink(mklink));
+ // Schedule deleting the file only after when it has been created
+ auto rmfile(async_close(async_rmfile(mkfile))); // delete is scheduled first, then the close of the handle
+ // Schedule waiting until both the preceding operations have finished
+ auto barrier(dispatcher->barrier({ rmlink, rmfile }));
+ // Schedule deleting the directory only after the barrier completes
+ auto rmdir(async_rmdir(depends(barrier.front(), mkdir))); // wait on the barrier, but act on mkdir's directory
+ // Check ops for errors
+ boost::afio::when_all_p(mkdir, mkfile, mklink, rmlink, rmfile, rmdir).wait();
+ }
+ catch (...)
+ {
+ std::cerr << boost::current_exception_diagnostic_information(true) << std::endl;
+ throw;
+ }
+
+ // Batch
+ try
+ {
+ // Schedule creating a directory called testdir
+ auto mkdir(dispatcher->dir(std::vector<boost::afio::path_req>(1,
+ boost::afio::path_req("testdir", boost::afio::file_flags::create))).front());
+ // Schedule creating a file called testfile in testdir only when testdir has been created
+ auto mkfile(dispatcher->file(std::vector<boost::afio::path_req>(1,
+ boost::afio::path_req::relative(mkdir, "testfile",
+ boost::afio::file_flags::create))).front());
+ // Schedule creating a symbolic link called linktodir to the item referred to by the precondition
+ // i.e. testdir. Note that on Windows you can only symbolic link directories. Note that creating
+ // symlinks must *always* be as an absolute path, as that is how they are stored.
+ auto mklink(dispatcher->symlink(std::vector<boost::afio::path_req>(1,
+ boost::afio::path_req::absolute(mkdir, "testdir/linktodir",
+ boost::afio::file_flags::create))).front());
+
+ // Schedule deleting the symbolic link only after when it has been created
+ auto rmlink(dispatcher->close(std::vector<future<>>(1, dispatcher->rmsymlink(mklink))).front());
+ // Schedule deleting the file only after when it has been created
+ auto rmfile(dispatcher->close(std::vector<future<>>(1, dispatcher->rmfile(mkfile))).front());
+ // Schedule waiting until both the preceding operations have finished
+ auto barrier(dispatcher->barrier({rmlink, rmfile}));
+ // Schedule deleting the directory only after the barrier completes
+ auto rmdir(dispatcher->rmdir(std::vector<path_req>(1, dispatcher->depends(barrier.front(), mkdir))).front());
+ // Check ops for errors
+ boost::afio::when_all_p(mkdir, mkfile, mklink, rmlink, rmfile, rmdir).wait();
+ }
+ catch(...)
+ {
+ std::cerr << boost::current_exception_diagnostic_information(true) << std::endl;
+ throw;
+ }
+ //]
+} // implicit return 0 from main
diff --git a/attic/example/filter_example.cpp b/attic/example/filter_example.cpp
new file mode 100644
index 00000000..464f47e8
--- /dev/null
+++ b/attic/example/filter_example.cpp
@@ -0,0 +1,36 @@
+#include "afio_pch.hpp"
+
+//[filter_example
+using namespace boost::afio;
+
+// This function will be called for every file opened
+static void open_file_filter(detail::OpType, future<> &op) noexcept
+{
+ std::cout << "File handle " << op->native_handle() << " opened!" << std::endl; // op-> forwards to the just-opened handle
+}
+
+// This function will be called for every read and write performed
+static void readwrite_filter(detail::OpType optype, handle *h,
+ const detail::io_req_impl<true> &req, boost::afio::off_t offset, size_t buffer_idx,
+ size_t buffers, const boost::system::error_code &ec, size_t bytes_transferred) // req/buffer_idx/buffers/ec unused in this example
+{
+ std::cout << "File handle " << h->native_handle() << (detail::OpType::read==optype ?
+ " read " : " wrote ") << bytes_transferred << " bytes at offset " << offset << std::endl;
+}
+
+int main(void)
+{
+ dispatcher_ptr dispatcher=
+ make_dispatcher().get();
+
+ // Install filters BEFORE scheduling any ops as the filter APIs are NOT
+ // threadsafe. This filters all file opens.
+ dispatcher->post_op_filter({ std::make_pair(detail::OpType::file /* just file opens */,
+ std::function<dispatcher::filter_t>(open_file_filter)) });
+
+ // This filters all reads and writes
+ dispatcher->post_readwrite_filter({ std::make_pair(detail::OpType::Unknown /* all */,
+ std::function<dispatcher::filter_readwrite_t>(readwrite_filter)) });
+ return 0; // no ops are actually scheduled in this example, so the filters never fire
+}
+//]
diff --git a/attic/example/find_in_files_afio.cpp b/attic/example/find_in_files_afio.cpp
new file mode 100644
index 00000000..a1427fd2
--- /dev/null
+++ b/attic/example/find_in_files_afio.cpp
@@ -0,0 +1,351 @@
+#include "afio_pch.hpp"
+#include <deque>
+#include <regex>
+#include <chrono>
+#include <mutex>
+#include <future>
+#include <initializer_list>
+#include "boost/exception/diagnostic_information.hpp"
+#include "boost/../libs/afio/test/Aligned_Allocator.hpp"
+
+/* My Intel Core i7 3770K running Windows 8 x64 with 7200rpm drive, using
+Sysinternals RAMMap to clear disc cache (http://technet.microsoft.com/en-us/sysinternals/ff700229.aspx)
+
+Warm cache:
+92 files matched out of 36734 files which was 7031545071 bytes.
+The search took 2.5173 seconds which was 14592.6 files per second or 2663.89 Mb/sec.
+
+Cold cache:
+91 files matched out of 36422 files which was 6108537728 bytes.
+The search took 388.74 seconds which was 93.6925 files per second or 14.9857 Mb/sec.
+
+Warm cache mmaps:
+92 files matched out of 36734 files which was 7031400686 bytes.
+The search took 1.02974 seconds which was 35673.1 files per second or 6512.01 Mb/sec.
+
+Cold cache mmaps:
+91 files matched out of 36422 files which was 6109258426 bytes.
+The search took 242.76 seconds which was 150.033 files per second or 24 Mb/sec.
+*/
+
+#define USE_MMAPS
+
+//[find_in_files_afio
+using namespace boost::afio;
+
+// Often it's easiest for a lot of nesting callbacks to carry state via a this pointer
+class find_in_files
+{
+public:
+ std::promise<int> finished; // fulfilled once every scheduled op has completed
+ std::regex regexpr; // The precompiled regular expression
+ dispatcher_ptr dispatcher;
+ recursive_mutex opslock; // guards ops below (locking code is currently commented out)
+ std::deque<future<>> ops; // For exception gathering
+ std::atomic<size_t> bytesread, filesread, filesmatched, scheduled, completed;
+ std::vector<std::pair<boost::afio::filesystem::path, size_t>> filepaths;
+
+ // Signals finish once all scheduled ops have completed
+ void docompleted(size_t inc)
+ {
+ size_t c=(completed+=inc); // atomic fetch-add, capturing the new total
+ if(c==scheduled)
+ finished.set_value(0);
+ }; // note: stray semicolon after function body, harmless
+ // Adds ops to the list of scheduled
+ void doscheduled(std::initializer_list<future<>> list)
+ {
+ scheduled+=list.size();
+ //boost::lock_guard<decltype(opslock)> lock(opslock);
+ //ops.insert(ops.end(), list.begin(), list.end());
+ }
+ void doscheduled(std::vector<future<>> list)
+ {
+ scheduled+=list.size();
+ //boost::lock_guard<decltype(opslock)> lock(opslock);
+ //ops.insert(ops.end(), list.begin(), list.end());
+ }
+ // Runs the regex over a file's contents and updates the match/read counters
+ void dosearch(handle_ptr h, const char *buffer, size_t length)
+ {
+ // Search the buffer for the regular expression
+ if(std::regex_search(buffer, regexpr))
+ {
+#pragma omp critical
+ {
+ std::cout << h->path() << std::endl;
+ }
+ ++filesmatched;
+ }
+ ++filesread;
+ bytesread+=length;
+ }
+ // A file searching completion, called when each file read completes
+ std::pair<bool, handle_ptr> file_read(size_t id,
+ future<> op, std::shared_ptr<std::vector<char,
+ detail::aligned_allocator<char, 4096, false>>> _buffer, size_t length)
+ {
+ handle_ptr h(op.get_handle());
+ //std::cout << "R " << h->path() << std::endl;
+ char *buffer=_buffer->data();
+ buffer[length]=0; // null-terminate so regex_search treats it as a C string
+ dosearch(h, buffer, length);
+ docompleted(2);
+ // Throw away the buffer now rather than later to keep memory consumption down
+ _buffer->clear();
+ return std::make_pair(true, h);
+ }
+ // A file reading completion, called when each file open completes
+ std::pair<bool, handle_ptr> file_opened(size_t id,
+ future<> op, size_t length)
+ {
+ handle_ptr h(op.get_handle());
+ //std::cout << "F " << h->path() << std::endl;
+ if (length)
+ {
+#ifdef USE_MMAPS
+ auto map(h->map_file()); // prefer searching a memory map when available
+ if (map)
+ {
+ dosearch(h, (const char *)map->addr, length);
+ }
+ else
+#endif
+ {
+ // Allocate a sufficient 4Kb aligned buffer
+ size_t _length = (4095 + length)&~4095; // round up to a 4096-byte multiple
+ auto buffer = std::make_shared < std::vector<char,
+ detail::aligned_allocator<char, 4096, false >> >(_length + 1);
+ // Schedule a read of the file
+ auto read = dispatcher->read(make_io_req(
+ dispatcher->op_from_scheduled_id(id), buffer->data(), _length, 0));
+ auto read_done = dispatcher->completion(read,
+ std::make_pair(async_op_flags::none/*regex search might be slow*/,
+ std::function<dispatcher::completion_t>(
+ std::bind(&find_in_files::file_read, this, std::placeholders::_1,
+ std::placeholders::_2, buffer, length))));
+ doscheduled({ read, read_done });
+ }
+ }
+ docompleted(2);
+ return std::make_pair(true, h);
+ }
+ // An enumeration parsing completion, called when each directory enumeration completes
+ std::pair<bool, handle_ptr> dir_enumerated(size_t id,
+ future<> op,
+ std::shared_ptr<future<std::pair<std::vector<directory_entry>, bool>>> listing)
+ {
+ handle_ptr h(op.get_handle());
+ future<> lastdir, thisop(dispatcher->op_from_scheduled_id(id));
+ // Get the entries from the ready stl_future
+ std::vector<directory_entry> entries(std::move(listing->get().first));
+ //std::cout << "E " << h->path() << std::endl;
+ // For each of the directories schedule an open and enumeration
+#if 0
+ // Algorithm 1
+ {
+ std::vector<path_req> dir_reqs; dir_reqs.reserve(entries.size());
+ for(auto &entry : entries)
+ {
+ if((entry.st_type()&S_IFDIR)==S_IFDIR)
+ {
+ dir_reqs.push_back(path_req(thisop, h->path()/entry.name()));
+ }
+ }
+ if(!dir_reqs.empty())
+ {
+ std::vector<std::pair<async_op_flags, std::function<dispatcher::completion_t>>> dir_openedfs(dir_reqs.size(), std::make_pair(async_op_flags::None, std::bind(&find_in_files::dir_opened, this, std::placeholders::_1, std::placeholders::_2)));
+ auto dir_opens=dispatcher->dir(dir_reqs);
+ doscheduled(dir_opens);
+ auto dir_openeds=dispatcher->completion(dir_opens, dir_openedfs);
+ doscheduled(dir_openeds);
+ // Hold back some of the concurrency
+ lastdir=dir_openeds.back();
+ }
+ }
+#else
+ // Algorithm 2
+ // The Windows NT kernel filing system driver gets upset with too much concurrency
+ // when used with OSDirect so throttle directory enumerations to enforce some depth first traversal.
+ {
+ std::pair<async_op_flags, std::function<dispatcher::completion_t>> dir_openedf=std::make_pair(async_op_flags::none, std::bind(&find_in_files::dir_opened, this,
+ std::placeholders::_1, std::placeholders::_2));
+ for(auto &entry : entries)
+ {
+ if(entry.st_type()==
+#ifdef BOOST_AFIO_USE_LEGACY_FILESYSTEM_SEMANTICS
+ boost::afio::filesystem::file_type::directory_file)
+#else
+ boost::afio::filesystem::file_type::directory)
+#endif
+ {
+ auto dir_open=dispatcher->dir(path_req::absolute(lastdir, h->path()/entry.name()));
+ auto dir_opened=dispatcher->completion(dir_open, dir_openedf);
+ doscheduled({ dir_open, dir_opened });
+ lastdir=dir_opened; // serialise sibling opens on each other (depth-first throttle)
+ }
+ }
+ }
+#endif
+
+ // For each of the files schedule an open and search
+#if 0
+ // Algorithm 1
+ {
+ std::vector<path_req> file_reqs; file_reqs.reserve(entries.size());
+ std::vector<std::pair<async_op_flags, std::function<dispatcher::completion_t>>> file_openedfs; file_openedfs.reserve(entries.size());
+ for(auto &entry : entries)
+ {
+ if((entry.st_type()&S_IFREG)==S_IFREG)
+ {
+ size_t length=(size_t)entry.st_size();
+ if(length)
+ {
+ file_flags flags=file_flags::read;
+#ifdef USE_MMAPS
+ if(length>16384) flags=flags|file_flags::os_mmap;
+#endif
+ file_reqs.push_back(path_req(lastdir, h->path()/entry.name(), flags));
+ file_openedfs.push_back(std::make_pair(async_op_flags::None, std::bind(&find_in_files::file_opened, this, std::placeholders::_1, std::placeholders::_2, length)));
+ }
+ }
+ }
+ auto file_opens=dispatcher->file(file_reqs);
+ doscheduled(file_opens);
+ auto file_openeds=dispatcher->completion(file_opens, file_openedfs);
+ doscheduled(file_openeds);
+ }
+#else
+ // Algorithm 2
+ {
+ for(auto &entry : entries)
+ {
+ if(entry.st_type()==
+#ifdef BOOST_AFIO_USE_LEGACY_FILESYSTEM_SEMANTICS
+ boost::afio::filesystem::file_type::regular_file)
+#else
+ boost::afio::filesystem::file_type::regular)
+#endif
+ {
+ size_t length=(size_t)entry.st_size(); // size known from enumeration metadata
+ if(length)
+ {
+ file_flags flags=file_flags::read;
+ auto file_open=dispatcher->file(path_req::absolute(lastdir, h->path()/entry.name(), flags));
+ auto file_opened=dispatcher->completion(file_open,
+ std::make_pair(async_op_flags::none,
+ std::function<dispatcher::completion_t>(
+ std::bind(&find_in_files::file_opened, this,
+ std::placeholders::_1, std::placeholders::_2, length))));
+ doscheduled({ file_open, file_opened });
+ lastdir=file_opened; // throttle: next open chained on this one
+ }
+ }
+ }
+ }
+#endif
+ docompleted(2);
+ return std::make_pair(true, h);
+ }
+ // A directory enumerating completion, called once per directory open in the tree
+ std::pair<bool, handle_ptr> dir_opened(size_t id,
+ future<> op)
+ {
+ handle_ptr h(op.get_handle());
+ //std::cout << "D " << h->path() << std::endl;
+ // Now we have an open directory handle, schedule an enumeration
+ auto enumeration=dispatcher->enumerate(enumerate_req(
+ dispatcher->op_from_scheduled_id(id), metadata_flags::size, 1000)); // 1000 presumably caps entries per batch — confirm enumerate_req semantics
+ future<> enumeration_op(enumeration);
+ auto listing=std::make_shared<future<std::pair<std::vector<directory_entry>,
+ bool>>>(std::move(enumeration));
+ auto enumeration_done=dispatcher->completion(enumeration_op,
+ make_pair(async_op_flags::none,
+ std::function<dispatcher::completion_t>(
+ std::bind(&find_in_files::dir_enumerated, this,
+ std::placeholders::_1, std::placeholders::_2, listing))));
+ doscheduled({enumeration, enumeration_done});
+ docompleted(2);
+ // Complete only if not the cur dir opened
+ return std::make_pair(true, h);
+ }; // note: stray semicolon after function body, harmless
+ // Blocks until docompleted() fulfils the finished promise
+ void dowait()
+ {
+ // Prepare finished
+ auto finished_waiter=finished.get_future();
+#if 0 // Disabled to maximise performance
+ // Wait till done, retrieving any exceptions as they come in to keep memory consumption down
+ std::future_status status;
+ do
+ {
+ status=finished_waiter.wait_for(std::chrono::milliseconds(1000));
+ std::cout << "\nDispatcher has " << dispatcher->count() << " fds open and " << dispatcher->wait_queue_depth() << " ops outstanding." << std::endl;
+ std::vector<future<>> batch; batch.reserve(10000);
+ {
+ boost::lock_guard<decltype(opslock)> lock(opslock);
+ while(status==std::future_status::timeout ? (ops.size()>5000) : !ops.empty())
+ {
+ batch.push_back(std::move(ops.front()));
+ ops.pop_front();
+ }
+ }
+ // Retrieve any exceptions
+ std::cout << "Processed " << batch.size() << " ops for exception state." << std::endl;
+ if(!batch.empty())
+ when_all_p(batch.begin(), batch.end()).wait();
+ } while(status==std::future_status::timeout);
+#else
+ finished_waiter.wait();
+#endif
+ }
+ // Constructor, which starts the ball rolling
+ find_in_files(const char *_regexpr) : regexpr(_regexpr),
+ // Create an AFIO dispatcher that bypasses any filing system buffers
+ dispatcher(make_dispatcher("file:///", file_flags::will_be_sequentially_accessed/*|file_flags::os_direct*/).get()),
+ bytesread(0), filesread(0), filesmatched(0), scheduled(0), completed(0)
+ {
+ filepaths.reserve(50000);
+
+ // Schedule the recursive enumeration of the current directory
+ std::cout << "\n\nStarting directory enumerations ..." << std::endl;
+ auto cur_dir=dispatcher->dir(path_req("")); // "" = current directory
+ auto cur_dir_opened=dispatcher->completion(cur_dir, std::make_pair(async_op_flags::none,
+ std::function<dispatcher::completion_t>(
+ std::bind(&find_in_files::dir_opened, this,
+ std::placeholders::_1, std::placeholders::_2))));
+ doscheduled({cur_dir, cur_dir_opened});
+ dowait(); // constructor does not return until the whole traversal completes
+ }
+};
+
+int main(int argc, const char *argv[])
+{
+ using std::placeholders::_1; using std::placeholders::_2;
+ using namespace boost::afio;
+ typedef chrono::duration<double, ratio<1, 1>> secs_type;
+ if(argc<2)
+ {
+ std::cerr << "ERROR: Specify a regular expression to search all files in the current directory." << std::endl;
+ return 1;
+ }
+ // Prime SpeedStep
+ auto begin=chrono::high_resolution_clock::now();
+ while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now()-begin).count()<1); // ~1s busy-spin to bring the CPU to full clock before timing
+ try
+ {
+ begin=chrono::high_resolution_clock::now();
+ find_in_files finder(argv[1]); // constructor runs the entire search
+ auto end=chrono::high_resolution_clock::now();
+ auto diff=chrono::duration_cast<secs_type>(end-begin);
+ std::cout << "\n" << finder.filesmatched << " files matched out of " << finder.filesread
+ << " files which was " << finder.bytesread << " bytes." << std::endl;
+ std::cout << "The search took " << diff.count() << " seconds which was " << finder.filesread/diff.count()
+ << " files per second or " << (finder.bytesread/diff.count()/1024/1024) << " Mb/sec." << std::endl;
+ }
+ catch(...)
+ {
+ std::cerr << boost::current_exception_diagnostic_information(true) << std::endl;
+ return 1;
+ }
+ return 0;
+}
+//]
diff --git a/attic/example/find_in_files_iostreams.cpp b/attic/example/find_in_files_iostreams.cpp
new file mode 100644
index 00000000..0114ec57
--- /dev/null
+++ b/attic/example/find_in_files_iostreams.cpp
@@ -0,0 +1,109 @@
+#include "afio_pch.hpp"
+#include <iostream>
+#include <fstream>
+#include <regex>
+#include <chrono>
+#if BOOST_AFIO_USE_BOOST_FILESYSTEM
+#include "boost/filesystem/fstream.hpp"
+#endif
+
+/* My Intel Core i7 3770K running Windows 8 x64 with 7200rpm drive, using
+Sysinternals RAMMap to clear disc cache (http://technet.microsoft.com/en-us/sysinternals/ff700229.aspx)
+
+Single threaded, warm cache:
+92 files matched out of 39279 files which was 7093894518 bytes.
+The search took 8.32834 seconds which was 4716.3 files per second or 812.318 Mb/sec.
+
+Single threaded, cold cache:
+91 files matched out of 38967 files which was 6170927489 bytes.
+The search took 369.046 seconds which was 105.588 files per second or 15.9467 Mb/sec.
+
+OpenMP, warm cache:
+92 files matched out of 38943 files which was 7092655881 bytes.
+The search took 3.73611 seconds which was 10423.4 files per second or 1810.46 Mb/sec.
+
+OpenMP, cold cache:
+91 files matched out of 38886 files which was 6170656567 bytes.
+The search took 741.131 seconds which was 52.4684 files per second or 7.94029 Mb/sec.
+*/
+
+//[find_in_files_iostreams
+int main(int argc, const char *argv[])
+{
+ using namespace std;
+ namespace filesystem = boost::afio::filesystem;
+#if BOOST_AFIO_USE_BOOST_FILESYSTEM
+ using boost::filesystem::ifstream;
+#endif
+ typedef chrono::duration<double, ratio<1, 1>> secs_type;
+ if(argc<2)
+ {
+ cerr << "ERROR: Specify a regular expression to search all files in the current directory." << endl;
+ return 1;
+ }
+ // Prime SpeedStep
+ auto begin=chrono::high_resolution_clock::now();
+ while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now()-begin).count()<1); // ~1s busy-spin to bring the CPU to full clock before timing
+ size_t bytesread=0, filesread=0, filesmatched=0;
+ try
+ {
+ begin=chrono::high_resolution_clock::now();
+
+ // Generate a list of all files here and below the current working directory
+ vector<filesystem::path> filepaths;
+ for(auto it=filesystem::recursive_directory_iterator("."); it!=filesystem::recursive_directory_iterator(); ++it)
+ {
+ if(it->status().type()!=
+#ifdef BOOST_AFIO_USE_LEGACY_FILESYSTEM_SEMANTICS
+ filesystem::file_type::regular_file)
+#else
+ filesystem::file_type::regular)
+#endif
+ continue; // skip directories, symlinks etc.
+ filepaths.push_back(it->path());
+ }
+
+ // Compile the regular expression, and have OpenMP parallelise the loop
+ regex regexpr(argv[1]);
+#pragma omp parallel for schedule(dynamic)
+ for(int n=0; n<(int) filepaths.size(); n++)
+ {
+ // Open the file
+ ifstream s(filepaths[n], ifstream::binary);
+ s.exceptions(fstream::failbit | fstream::badbit); // Turn on exception throwing
+ // Get its length
+ s.seekg(0, ios::end);
+ size_t length=(size_t) s.tellg();
+ s.seekg(0, ios::beg);
+ // Allocate a sufficient buffer, avoiding the byte clearing vector would do
+ unique_ptr<char[]> buffer(new char[length+1]); // +1 for the null terminator added below
+ // Read in the file, terminating with zero
+ s.read(buffer.get(), length);
+ buffer.get()[length]=0;
+ // Search the buffer for the regular expression
+ if(regex_search(buffer.get(), regexpr))
+ {
+#pragma omp critical
+ {
+ cout << filepaths[n] << endl;
+ }
+ filesmatched++; // NOTE(review): non-atomic update from parallel loop — racy under OpenMP; confirm intended
+ }
+ filesread++;
+ bytesread+=length;
+ }
+ auto end=chrono::high_resolution_clock::now();
+ auto diff=chrono::duration_cast<secs_type>(end-begin);
+ cout << "\n" << filesmatched << " files matched out of " << filesread << " files which was "
+ << bytesread << " bytes." << endl;
+ cout << "The search took " << diff.count() << " seconds which was " << filesread/diff.count()
+ << " files per second or " << (bytesread/diff.count()/1024/1024) << " Mb/sec." << endl;
+ }
+ catch(...)
+ {
+ cerr << boost::current_exception_diagnostic_information(true) << endl;
+ return 1;
+ }
+ return 0;
+}
+//]
diff --git a/attic/example/readallof_example.cpp b/attic/example/readallof_example.cpp
new file mode 100644
index 00000000..c7882c88
--- /dev/null
+++ b/attic/example/readallof_example.cpp
@@ -0,0 +1,54 @@
+#include "afio_pch.hpp"
+
+int main(void)
+{
+ using namespace boost::afio;
+ auto dispatcher=make_dispatcher().get();
+ current_dispatcher_guard h(dispatcher);
+
+{
+ //[readallof_example_bad
+ char input[1024];
+ auto file_opened = file("foo.txt");
+ read(file_opened, input, 0); // deliberately naive ("bad" snippet): assumes the file fits the 1024-byte array
+ //]
+}
+
+{
+ //[readallof_example_many
+ char input[1024];
+ // Schedule enumerating the containing directory, but only for foo.txt
+ auto dir_opened = async_dir(""); // "" means current directory in AFIO
+ auto file_enumed = async_enumerate(dir_opened, metadata_flags::size,
+ 2, true, "foo.txt");
+ // Schedule in parallel opening the file
+ auto file_opened = async_file(dir_opened, "foo.txt");
+ auto file_read = file_enumed.then(
+ [&input](future<std::pair<std::vector<directory_entry>, bool>> &f) {
+ // Get the directory_entry for the first result
+ directory_entry &de = f.get().first.front();
+ // Schedule a file read once we know the file size
+ return async_read(f, (void *)input,
+ (size_t)de.st_size(), // won't block
+ 0);
+ });
+ file_read.get(); // wait and rethrow any errors
+ //]
+}
+
+{
+ //[readallof_example_single
+ char input[1024];
+ // Open the file synchronously
+ auto file_opened = file("foo.txt");
+ // Fetch ONLY the size metadata. Blocks because it's synchronous!
+ directory_entry de = file_opened->direntry(metadata_flags::size);
+ // Read now we know the file size
+ read(file_opened,
+ (void *) input,
+ (size_t) de.st_size(), // doesn't block, as size was fetched before.
+ 0);
+ //]
+}
+ return 0;
+}
diff --git a/attic/example/readwrite_example.cpp b/attic/example/readwrite_example.cpp
new file mode 100644
index 00000000..5a94c561
--- /dev/null
+++ b/attic/example/readwrite_example.cpp
@@ -0,0 +1,78 @@
+#include "afio_pch.hpp"
+
+int main(void)
+{
+ try
+ {
+ //[readwrite_example
+ namespace afio = BOOST_AFIO_V2_NAMESPACE;
+ namespace asio = BOOST_AFIO_V2_NAMESPACE::asio;
+
+ // Set a dispatcher as current for this thread
+ afio::current_dispatcher_guard h(afio::make_dispatcher().get());
+
+ // Schedule an opening of a file called example_file.txt
+ afio::future<> openfile = afio::async_file("example_file.txt",
+ afio::file_flags::create | afio::file_flags::read_write);
+
+ // Something a bit surprising for many people is that writing off
+ // the end of a file in AFIO does NOT extend the file and writes
+ // which go past the end will simply fail instead. Why not?
+ // Simple: that's the convention with async file i/o, because
+ // synchronising multiple processes concurrently adjusting a
+ // file's length has significant overhead which is wasted if you
+ // don't need that functionality. Luckily, there is an easy
+ // workaround: either open a file for append-only access, in which
+ // case all writes extend the file for you, or else you explicitly
+ // extend files before writing, like this:
+ afio::future<> resizedfile = afio::async_truncate(openfile, 12); // 12 bytes == total of the "Hello World\n" gather below
+
+ // Config a write gather. You could do this of course as a batch
+ // of writes, but a write gather has optimised host OS support in most
+ // cases, so it's one syscall instead of many.
+ std::vector<asio::const_buffer> buffers;
+ buffers.push_back(asio::const_buffer("He", 2));
+ buffers.push_back(asio::const_buffer("ll", 2));
+ buffers.push_back(asio::const_buffer("o ", 2));
+ buffers.push_back(asio::const_buffer("Wo", 2));
+ buffers.push_back(asio::const_buffer("rl", 2));
+ buffers.push_back(asio::const_buffer("d\n", 2));
+ // Schedule the write gather to offset zero after the resize file
+ afio::future<> written(afio::async_write(resizedfile, buffers, 0));
+
+ // Have the compiler config the exact same write gather as earlier for you
+ // The compiler assembles an identical sequence of ASIO write gather
+ // buffers for you
+ std::vector<std::string> buffers2={ "He", "ll", "o ", "Wo", "rl", "d\n" };
+ // Schedule this to occur after the previous write completes
+ afio::future<> written2(afio::async_write(written, buffers2, 0));
+
+ // Schedule making sure the previous batch has definitely reached physical storage
+ // This won't complete until the write is on disc
+ afio::future<> stored(afio::async_sync(written2));
+
+ // Schedule filling this array from the file. Note how convenient std::array
+ // is and completely replaces C style char buffer[bytes]
+ std::array<char, 12> buffer;
+ afio::future<> read(afio::async_read(stored, buffer, 0));
+
+ // Schedule the closing and deleting of example_file.txt after the contents read
+ afio::future<> deletedfile(afio::async_rmfile(afio::async_close(read)));
+
+ // Wait until the buffer has been filled, checking all steps for errors
+ afio::when_all_p(openfile, resizedfile, written, written2, stored, read).get(); /*< waits for file open, resize, write, sync and read to complete, throwing any exceptions encountered >*/
+
+ // There is actually a io_req<std::string> specialisation you
+ // can use to skip this bit by reading directly into a string ...
+ std::string contents(buffer.begin(), buffer.end());
+ std::cout << "Contents of file is '" << contents << "'" << std::endl;
+
+ // Check remaining ops for errors
+ deletedfile.get();
+ //]
+ }
+ catch(const BOOST_AFIO_V2_NAMESPACE::system_error &e) { std::cerr << "ERROR: program exits via system_error code " << e.code().value() << " (" << e.what() << ")" << std::endl; return 1; }
+ catch(const std::exception &e) { std::cerr << "ERROR: program exits via exception (" << e.what() << ")" << std::endl; return 1; }
+ catch(...) { std::cerr << "ERROR: program exits via " << boost::current_exception_diagnostic_information(true) << std::endl; return 1; }
+ return 0;
+}
diff --git a/attic/example/readwrite_example_traditional.cpp b/attic/example/readwrite_example_traditional.cpp
new file mode 100644
index 00000000..56f82113
--- /dev/null
+++ b/attic/example/readwrite_example_traditional.cpp
@@ -0,0 +1,46 @@
+#include "afio_pch.hpp"
+
+int main(void)
+{
+ //[readwrite_example_traditional
+
+ try
+ {
+ // Open a file called example_file.txt
+ std::fstream openfile("example_file.txt"); /*< opens file >*/ // NOTE(review): default fstream mode (in|out) requires the file to already exist — confirm intended
+ // Turn on exception throwing
+ openfile.exceptions(std::fstream::failbit | std::fstream::badbit);
+
+ // Do a write gather. STL iostreams will buffer the writes
+ // and coalesce them into a single syscall
+ openfile << "He"; /*< writes >*/
+ openfile << "ll";
+ openfile << "o ";
+ openfile << "Wo";
+ openfile << "rl";
+ openfile << "d\n";
+
+ // Make sure the previous batch has definitely reached physical storage
+ // This won't complete until the write is on disc
+ openfile.sync(); /*< syncs >*/
+
+ // Fill this array from the file
+ std::array<char, 12> buffer; // 12 bytes == total written above
+ openfile.seekg(0, std::ios::beg);
+ openfile.read(buffer.data(), buffer.size()); /*< reads >*/
+
+ // Close the file and delete it
+ openfile.close(); /*< closes file >*/
+ boost::afio::filesystem::remove("example_file.txt"); /*< deletes file >*/
+
+ // Convert the read array into a string
+ std::string contents(buffer.begin(), buffer.end());
+ std::cout << "Contents of file is '" << contents << "'" << std::endl;
+ }
+ catch(...)
+ {
+ std::cerr << boost::current_exception_diagnostic_information(true) << std::endl;
+ throw;
+ }
+ //]
+}
diff --git a/attic/example/statfs_example.cpp b/attic/example/statfs_example.cpp
new file mode 100644
index 00000000..122c07e0
--- /dev/null
+++ b/attic/example/statfs_example.cpp
@@ -0,0 +1,18 @@
+#include "afio_pch.hpp"
+
+int main(void)
+{
+ //[statfs_example
+ boost::afio::current_dispatcher_guard h(boost::afio::make_dispatcher().get());
+
+ // Open the root directory
+ boost::afio::handle_ptr rootdir(boost::afio::dir("/"));
+
+ // Ask the filing system of the root directory how much free space there is
+ boost::afio::statfs_t statfs(boost::afio::statfs(rootdir,
+ boost::afio::fs_metadata_flags::bsize|boost::afio::fs_metadata_flags::bfree)); // fetch only block size and free-block count
+
+ std::cout << "Your root filing system has "
+ << (statfs.f_bfree*statfs.f_bsize/1024.0/1024.0/1024.0) << " Gb free." << std::endl; // f_bfree * f_bsize = free bytes
+ //]
+} // implicit return 0 from main
diff --git a/attic/example/workshop_atomic_updates_afio.cpp b/attic/example/workshop_atomic_updates_afio.cpp
new file mode 100644
index 00000000..f36ea1c5
--- /dev/null
+++ b/attic/example/workshop_atomic_updates_afio.cpp
@@ -0,0 +1,2 @@
+#include "afio_pch.hpp"
+#include "workshop_atomic_updates_afio.ipp"
diff --git a/attic/example/workshop_atomic_updates_afio.ipp b/attic/example/workshop_atomic_updates_afio.ipp
new file mode 100644
index 00000000..c4daafe4
--- /dev/null
+++ b/attic/example/workshop_atomic_updates_afio.ipp
@@ -0,0 +1,323 @@
+//[workshop_atomic_updates_afio_interface
+namespace afio = BOOST_AFIO_V2_NAMESPACE;
+namespace filesystem = BOOST_AFIO_V2_NAMESPACE::filesystem;
+using BOOST_OUTCOME_V1_NAMESPACE::lightweight_futures::shared_future;
+
// A simple key-value store: each key is a directory under the store directory,
// and the current value of the key lives in file "0" inside that directory.
class data_store
{
  afio::dispatcher_ptr _dispatcher;  // dispatcher used for all async file operations
  afio::handle_ptr _store;           // open handle to the store directory
public:
  // Type used for read streams
  using istream = std::shared_ptr<std::istream>;
  // Type used for write streams
  using ostream = std::shared_ptr<std::ostream>;
  // Type used for lookup
  using lookup_result_type = shared_future<istream>;
  // Type used for write
  using write_result_type = shared_future<ostream>;

  // Disposition flags
  static constexpr size_t writeable = (1<<0);

  // Open a data store at path
  data_store(size_t flags = 0, afio::path path = "store");

  // Look up item named name for reading, returning an istream for the item
  shared_future<istream> lookup(std::string name) noexcept;
  // Look up item named name for writing, returning an ostream for that item
  shared_future<ostream> write(std::string name) noexcept;
};
+//]
+
+//[workshop_atomic_updates_afio3]
+namespace asio = BOOST_AFIO_V2_NAMESPACE::asio;
+using BOOST_OUTCOME_V1_NAMESPACE::empty;
+using BOOST_AFIO_V2_NAMESPACE::error_code;
+using BOOST_AFIO_V2_NAMESPACE::generic_category;
+
+// A special allocator of highly efficient file i/o memory
+using file_buffer_type = std::vector<char, afio::utils::page_allocator<char>>;
+
// An iostream which reads directly from a memory mapped AFIO file
struct idirectstream : public std::istream
{
  struct directstreambuf : public std::streambuf
  {
    afio::handle_ptr h; // Holds the file open
    std::shared_ptr<file_buffer_type> buffer; // set only by the malloc'd-buffer constructor
    afio::handle::mapped_file_ptr mfp;        // set only by the mmap constructor
    // From a mmap
    directstreambuf(afio::handle_ptr _h, afio::handle::mapped_file_ptr _mfp, size_t length) : h(std::move(_h)), mfp(std::move(_mfp))
    {
      // Set the get buffer this streambuf is to use
      setg((char *) mfp->addr, (char *) mfp->addr, (char *) mfp->addr + length);
    }
    // From a malloc
    directstreambuf(afio::handle_ptr _h, std::shared_ptr<file_buffer_type> _buffer, size_t length) : h(std::move(_h)), buffer(std::move(_buffer))
    {
      // Set the get buffer this streambuf is to use
      setg(buffer->data(), buffer->data(), buffer->data() + length);
    }
  };
  std::unique_ptr<directstreambuf> buf;
  // Takes ownership of the file handle plus either a mapped_file_ptr or a
  // shared buffer; U selects which directstreambuf constructor is used.
  template<class U> idirectstream(afio::handle_ptr h, U &&buffer, size_t length) : std::istream(new directstreambuf(std::move(h), std::forward<U>(buffer), length)), buf(static_cast<directstreambuf *>(rdbuf()))
  {
  }
  virtual ~idirectstream() override
  {
    // Reset the stream before deleting the buffer
    rdbuf(nullptr);
  }
};
+
// An iostream which writes to an AFIO file in 4Kb pages
// On destruction the final page is flushed, the last write is awaited, and the
// temporary file is atomically renamed to "0", making the new value visible.
struct odirectstream : public std::ostream
{
  struct directstreambuf : public std::streambuf
  {
    using int_type = std::streambuf::int_type;
    using traits_type = std::streambuf::traits_type;
    afio::future<> lastwrite; // the last async write performed
    afio::off_t offset; // offset of next write
    file_buffer_type buffer; // a page size on this machine
    file_buffer_type lastbuffer; // previous page, kept alive while its async write is in flight
    directstreambuf(afio::handle_ptr _h) : lastwrite(std::move(_h)), offset(0), buffer(afio::utils::page_sizes().front())
    {
      // Set the put buffer this streambuf is to use
      setp(buffer.data(), buffer.data() + buffer.size())
;
    }
    virtual ~directstreambuf() override
    {
      try
      {
        // Flush buffers and wait until last write completes
        // Schedule an asynchronous write of the buffer to storage
        size_t thisbuffer = pptr() - pbase();
        if(thisbuffer)
          lastwrite = afio::async_write(afio::async_truncate(lastwrite, offset+thisbuffer), buffer.data(), thisbuffer, offset);
        lastwrite.get();
        // TODO: On Windows do I need to close and reopen the file to flush metadata before
        // the rename or does the rename do it for me?
        // Get handle to the parent directory
        auto dirh(lastwrite->container());
        // Atomically rename "tmpXXXXXXXXXXXXXXXX" to "0"
        lastwrite->atomic_relink(afio::path_req::relative(dirh, "0"));
#ifdef __linux__
        // Journalled Linux filing systems don't need this, but if you enabled afio::file_flags::always_sync
        // you might want to issue this too.
        afio::sync(dirh);
#endif
      }
      catch(...)
      {
        // Destructors must not throw; any failure here silently loses the value.
      }
    }
    virtual int_type overflow(int_type c) override
    {
      // Flush the full put area before accepting the overflowing character
      size_t thisbuffer=pptr()-pbase();
      if(thisbuffer>=buffer.size())
        sync();
      if(c!=traits_type::eof())
      {
        *pptr()=(char)c;
        pbump(1);
        return traits_type::to_int_type(c);
      }
      return traits_type::eof();
    }
    virtual int sync() override
    {
      // Wait for the last write to complete, propagating any exceptions
      lastwrite.get();
      size_t thisbuffer=pptr()-pbase();
      if(thisbuffer > 0)
      {
        // Detach the current buffer and replace with a fresh one to allow the kernel to steal the page
        lastbuffer=std::move(buffer);
        buffer.resize(lastbuffer.size());
        setp(buffer.data(), buffer.data() + buffer.size());
        // Schedule an extension of physical storage by an extra page
        lastwrite = afio::async_truncate(lastwrite, offset + thisbuffer);
        // Schedule an asynchronous write of the buffer to storage
        lastwrite=afio::async_write(lastwrite, lastbuffer.data(), thisbuffer, offset);
        offset+=thisbuffer;
      }
      return 0;
    }
  };
  std::unique_ptr<directstreambuf> buf;
  odirectstream(afio::handle_ptr h) : std::ostream(new directstreambuf(std::move(h))), buf(static_cast<directstreambuf *>(rdbuf()))
  {
  }
  virtual ~odirectstream() override
  {
    // Reset the stream before deleting the buffer
    rdbuf(nullptr);
  }
};
+//]
+
+//[workshop_atomic_updates_afio1]
+namespace asio = BOOST_AFIO_V2_NAMESPACE::asio;
+using BOOST_OUTCOME_V1_NAMESPACE::empty;
+using BOOST_AFIO_V2_NAMESPACE::error_code;
+using BOOST_AFIO_V2_NAMESPACE::generic_category;
+
// Returns true if name is usable as a key: non-empty, contains none of the
// characters banned in filenames (including NUL), and has no leading period.
// Fix: previously an empty name was accepted because name[0] on an empty
// std::string yields '\0' (C++11) which is != '.'; an empty key cannot form
// a valid store directory name.
static bool is_valid_name(const std::string &name) noexcept
{
  static const std::string banned("<>:\"/\\|?*\0", 10);
  if(name.empty())
    return false;
  if(std::string::npos!=name.find_first_of(banned))
    return false;
  // No leading period
  return name[0]!='.';
}
+
// Keep a cache of crypto strong random names
// Precomputes 10000 16-byte random strings on first call and hands them out
// round robin; call once up front (as the data_store constructor does) to pay
// the generation cost early.
static std::string random_name()
{
  static struct random_names_type
  {
    std::vector<std::string> names;
    size_t idx; // next name to hand out; wraps to 0 at the end
    random_names_type(size_t count) : names(count), idx(0)
    {
      for(size_t n=0; n<count; n++)
        names[n]=afio::utils::random_string(16); // 128 bits
    }
    std::string get()
    {
      // NOTE(review): idx is read and written without synchronisation; the
      // benchmark calls write() (and hence this) from OpenMP parallel loops,
      // so concurrent callers may receive the same name — confirm collisions
      // are tolerable here.
      if(idx==names.size())
        idx=0;
      return names[idx++];
    }
  } random_names(10000);
  return random_names.get();
}
+
// Opens (creating if necessary) the store directory at path.
// flags: pass data_store::writeable to permit writes; otherwise write access
// is masked out of every operation made via this dispatcher.
data_store::data_store(size_t flags, afio::path path)
{
  // Make a dispatcher for the local filesystem URI, masking out write flags on all operations if not writeable
  _dispatcher=afio::make_dispatcher("file:///", afio::file_flags::none, !(flags & writeable) ? afio::file_flags::write : afio::file_flags::none).get();
  // Set the dispatcher for this thread, and open a handle to the store directory
  afio::current_dispatcher_guard h(_dispatcher);
  _store=afio::dir(std::move(path), afio::file_flags::create); // throws if there was an error
  // Precalculate the cache of random names
  random_name();
}
+
// Asynchronously opens key directory name's current value file "0" and returns
// a future istream over its contents: memory mapped for files >= 128Kb (when
// mapping succeeds), otherwise asynchronously read into a page-allocated buffer.
// Returns EINVAL immediately for names unusable as keys.
shared_future<data_store::istream> data_store::lookup(std::string name) noexcept
{
  if(!is_valid_name(name))
    return error_code(EINVAL, generic_category());
  try
  {
    // The current blob for a key always lives in file "0" inside the key's directory
    name.append("/0");
    // Schedule the opening of the file for reading
    afio::future<> h(afio::async_file(_store, name, afio::file_flags::read));
    // When it completes, call this continuation
    return h.then([](afio::future<> &_h) -> shared_future<data_store::istream> {
      // If file didn't open, return the error or exception immediately
      BOOST_OUTCOME_PROPAGATE(_h);
      size_t length=(size_t) _h->lstat(afio::metadata_flags::size).st_size;
      // Is a memory map more appropriate?
      if(length>=128*1024)
      {
        afio::handle::mapped_file_ptr mfp;
        if((mfp=_h->map_file()))
        {
          data_store::istream ret(std::make_shared<idirectstream>(_h.get_handle(), std::move(mfp), length));
          return ret;
        }
        // Fall through to the buffered read if mapping failed
      }
      // Schedule the reading of the file into a buffer
      afio::future<> h(afio::async_read(_h, buffer->data(), length, 0)); // note: this h shadows the outer h
      // When the read completes call this continuation
      return h.then([buffer, length](const afio::future<> &h) -> shared_future<data_store::istream> {
        // If read failed, return the error or exception immediately
        BOOST_OUTCOME_PROPAGATE(h);
        data_store::istream ret(std::make_shared<idirectstream>(h.get_handle(), buffer, length));
        return ret;
      });
    });
  }
  catch(...)
  {
    return std::current_exception();
  }
}
+//]
+
+//[workshop_atomic_updates_afio2]
// Asynchronously prepares writing a new value for key name: creates the key
// directory if needed, opens a uniquely named temporary file inside it, and
// returns a future ostream. The ostream's destructor atomically renames the
// temporary to "0" (see odirectstream), making the update atomic.
// Returns EINVAL immediately for names unusable as keys.
shared_future<data_store::ostream> data_store::write(std::string name) noexcept
{
  if(!is_valid_name(name))
    return error_code(EINVAL, generic_category());
  try
  {
    // Schedule the opening of the directory
    afio::future<> dirh(afio::async_dir(_store, name, afio::file_flags::create));
#ifdef __linux__
    // Flush metadata on Linux only. This will be a noop unless we created a new directory
    // above, and if we don't flush the new key directory it and its contents may not appear
    // in the store directory after a sudden power loss, even if it and its contents are
    // all on physical storage.
    // NOTE(review): the future returned by this continuation is deliberately
    // discarded (fire and forget).
    dirh.then([this](const afio::future<> &h) { async_sync(_store); });
#endif
    // Make a crypto strong random file name
    std::string randomname("tmp");
    randomname.append(random_name());
    // Schedule the opening of the file for writing
    afio::future<> h(afio::async_file(dirh, randomname, afio::file_flags::create | afio::file_flags::write
      | afio::file_flags::hold_parent_open    // handle should keep a handle_ptr to its parent dir
      /*| afio::file_flags::always_sync*/     // writes don't complete until upon physical storage
      ));
    // When it completes, call this continuation
    return h.then([](const afio::future<> &h) -> shared_future<data_store::ostream> {
      // If file didn't open, return the error or exception immediately
      BOOST_OUTCOME_PROPAGATE(h);
      // Create an ostream which directly uses the file.
      data_store::ostream ret(std::make_shared<odirectstream>(h.get_handle()));
      return std::move(ret);
    });
  }
  catch (...)
  {
    return std::current_exception();
  }
}
+//]
+
+int main(void)
+{
+ // To write a key-value item called "dog"
+ {
+ data_store ds;
+ auto dogh = ds.write("dog").get();
+ auto &dogs = *dogh;
+ dogs << "I am a dog";
+ }
+
+ // To retrieve a key-value item called "dog"
+ {
+ data_store ds;
+ auto dogh = ds.lookup("dog");
+ if (dogh.empty())
+ std::cerr << "No item called dog" << std::endl;
+ else if(dogh.has_error())
+ std::cerr << "ERROR: Looking up dog returned error " << dogh.get_error().message() << std::endl;
+ else if (dogh.has_exception())
+ {
+ std::cerr << "FATAL: Looking up dog threw exception" << std::endl;
+ std::terminate();
+ }
+ else
+ {
+ std::string buffer;
+ *dogh.get() >> buffer;
+ std::cout << "Item dog has value " << buffer << std::endl;
+ }
+ }
+ return 0;
+}
diff --git a/attic/example/workshop_benchmark.cpp b/attic/example/workshop_benchmark.cpp
new file mode 100644
index 00000000..e3f2d58d
--- /dev/null
+++ b/attic/example/workshop_benchmark.cpp
@@ -0,0 +1,151 @@
+#include "afio_pch.hpp"
+
+#ifdef _DEBUG
+#define ITEMS 64
+#else
+#define ITEMS 65536
+#endif
+#define PARALLELISM 256 // up to max fds on your platform
+
+namespace iostreams {
+#include "workshop_naive.ipp"
+}
+namespace naive {
+#include "workshop_naive_afio.ipp"
+}
+namespace atomic_updates {
+#include "workshop_atomic_updates_afio.ipp"
+}
+namespace final {
+#include "workshop_final_afio.ipp"
+#include "../detail/SpookyV2.cpp"
+}
+
+
+namespace filesystem = BOOST_AFIO_V2_NAMESPACE::filesystem;
+namespace chrono = BOOST_AFIO_V2_NAMESPACE::chrono;
+using BOOST_AFIO_V2_NAMESPACE::ratio;
+
+// From http://burtleburtle.net/bob/rand/smallprng.html
typedef unsigned int u4;
typedef struct ranctx { u4 a; u4 b; u4 c; u4 d; } ranctx;

#define rot(x,k) (((x)<<(k))|((x)>>(32-(k))))

// Advance Bob Jenkins' small PRNG by one step and return the next 32-bit value.
static u4 ranval(ranctx *x) {
  const u4 e = x->a - rot(x->b, 27);
  x->a = x->b ^ rot(x->c, 17);
  x->b = x->c + x->d;
  x->c = x->d + e;
  x->d = e + x->a;
  return x->d;
}

// Seed the generator, then discard twenty outputs to thoroughly mix the state.
static void raninit(ranctx *x, u4 seed) {
  x->a = 0xf1ea5eed;
  x->b = seed;
  x->c = seed;
  x->d = seed;
  for (u4 round = 0; round < 20; ++round) {
    (void) ranval(x);
  }
}
+
+static std::vector<std::pair<ranctx, unsigned>> items;
+
+void prepare()
+{
+ ranctx gen;
+ raninit(&gen, 0x78adbcff);
+ items.clear();
+ items.reserve(ITEMS);
+ for (size_t n = 0; n < ITEMS; n++)
+ {
+ unsigned t=ranval(&gen);
+ items.push_back(std::make_pair(gen, t));
+ }
+}
+
// Benchmarks data_store: first inserts 1,2,4,...,ITEMS records (recreating the
// store each round), then looks them all back up, and writes a CSV of
// items vs inserts/sec vs lookups/sec to filename. desc labels console output;
// parallel_writes issues up to PARALLELISM writes concurrently via OpenMP.
template<class data_store> void benchmark(const char *filename, const char *desc, bool parallel_writes)
{
  typedef chrono::duration<double, ratio<1, 1>> secs_type;
  std::vector<std::tuple<size_t, double, double>> results;
  prepare();
  for(size_t n=1; n<=ITEMS; n<<=1)
  {
    std::cout << "Benchmarking " << desc << " insertion of " << n << " records ... ";
    if(filesystem::exists("store"))
      filesystem::remove_all("store");
    auto begin=chrono::high_resolution_clock::now();
    // Busy wait one second — presumably to let the filesystem settle after remove_all
    while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now()-begin).count()<1);
    data_store store(data_store::writeable);
    size_t write_parallelism = parallel_writes ? PARALLELISM : 1;
    std::vector<typename data_store::write_result_type> ops(write_parallelism);
    begin=chrono::high_resolution_clock::now();
    for (size_t m = 0; m < n; m += write_parallelism)
    {
      int todo = (int)(n < write_parallelism ? n : write_parallelism);
#pragma omp parallel for
      for (int o = 0; o < todo; o++)
        ops[o] = store.write(std::to_string(m + (size_t)o));
#pragma omp parallel for
      for (int o = 0; o < todo; o++)
      {
        *ops[o].get() << items[m + (size_t)o].second;
        ops[o].get()->flush();
      }
      // Some dumping of the writes should have happened by now
#pragma omp parallel for
      for (int o = 0; o < todo; o++)
        ops[o]=typename data_store::write_result_type();  // release the stream, committing the write
    }
    auto end=chrono::high_resolution_clock::now();
    auto diff=chrono::duration_cast<secs_type>(end-begin).count();
    std::cout << (n/diff) << " items/sec" << std::endl;
    results.push_back(std::make_tuple(n, n/diff, 0));
  }
  // Lookup phase: fills in the third column of each row recorded above
  auto resultsit = results.begin();
  for (size_t n = 1; n <= ITEMS; n <<= 1)
  {
    std::cout << "Benchmarking " << desc << " lookup of " << n << " records ... ";
    data_store store;
    std::vector<typename data_store::lookup_result_type> ops(PARALLELISM);
    auto begin = chrono::high_resolution_clock::now();
    for (size_t m = 0; m < n; m += PARALLELISM)
    {
      int todo = n < PARALLELISM ? n : PARALLELISM;
#pragma omp parallel for
      for (int o = 0; o < todo; o++)
        ops[o] = store.lookup(std::to_string(m + (size_t) o));
      // Some readahead should have happened by now
#pragma omp parallel for
      for (int o = 0; o < todo; o++)
      {
        auto s(ops[o].get());
        unsigned t;
        *s >> t;
        // Verify contents against the value recorded by prepare()
        if (t != items[m+(size_t) o].second)
          std::cerr << "ERROR: Item " << m << " has incorrect contents!" << std::endl;
        ops[o]=typename data_store::lookup_result_type();
      }
    }
    auto end = chrono::high_resolution_clock::now();
    auto diff = chrono::duration_cast<secs_type>(end - begin).count();
    std::cout << (n / diff) << " items/sec" << std::endl;
    std::get<2>(*resultsit++) = n / diff;
  }
  std::ofstream csvh(filename);
  csvh << "Items,Inserts/sec,Lookups/sec" << std::endl;
  for(auto &i : results)
    csvh << std::get<0>(i) << "," << std::get<1>(i) << "," << std::get<2>(i) << std::endl;
}
+
int main(void)
{
  typedef chrono::duration<double, ratio<1, 1>> secs_type;
  // Busy wait three seconds before starting — presumably to let the system quiesce
  auto begin=chrono::high_resolution_clock::now();
  while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now()-begin).count()<3);

  // Only the single file design is benchmarked by default; uncomment to compare designs
  //benchmark<iostreams::data_store>("iostreams.csv", "STL iostreams", true);
  //benchmark<naive::data_store>("afio_naive.csv", "AFIO naive", true);
  //benchmark<atomic_updates::data_store>("afio_atomic.csv", "AFIO atomic update", true);
  benchmark<final::data_store>("afio_final.csv", "AFIO single file", true);
  return 0;
}
diff --git a/attic/example/workshop_final_afio.cpp b/attic/example/workshop_final_afio.cpp
new file mode 100644
index 00000000..c3fcceb9
--- /dev/null
+++ b/attic/example/workshop_final_afio.cpp
@@ -0,0 +1,2 @@
+#include "afio_pch.hpp"
+#include "workshop_final_afio.ipp"
diff --git a/attic/example/workshop_final_afio.ipp b/attic/example/workshop_final_afio.ipp
new file mode 100644
index 00000000..655b8367
--- /dev/null
+++ b/attic/example/workshop_final_afio.ipp
@@ -0,0 +1,665 @@
+#include "../detail/SpookyV2.h"
+
+//[workshop_final_interface
+namespace afio = BOOST_AFIO_V2_NAMESPACE;
+namespace filesystem = BOOST_AFIO_V2_NAMESPACE::filesystem;
+using BOOST_OUTCOME_V1_NAMESPACE::outcome;
+using BOOST_OUTCOME_V1_NAMESPACE::lightweight_futures::shared_future;
+
// Single-file key-value store: blobs and the key index are appended to shared
// store files, with an in-memory index mapping keys to blob regions.
class data_store
{
  struct _ostream;
  friend struct _ostream;
  afio::dispatcher_ptr _dispatcher; // dispatcher used for all async file operations
  // The small blob store keeps non-memory mappable blobs at 32 byte alignments
  afio::handle_ptr _small_blob_store, _small_blob_store_append, _small_blob_store_ecc;
  // The large blob store keeps memory mappable blobs at 4Kb alignments
  afio::handle_ptr _large_blob_store, _large_blob_store_append, _large_blob_store_ecc;
  // The index is where we keep the map of keys to blobs
  afio::handle_ptr _index_store, _index_store_append, _index_store_ecc;
  struct index;
  std::unique_ptr<index> _index; // in-memory view of the on-disk index
public:
  // Type used for read streams
  using istream = std::shared_ptr<std::istream>;
  // Type used for write streams
  using ostream = std::shared_ptr<std::ostream>;
  // Type used for lookup
  using lookup_result_type = shared_future<istream>;
  // Type used for write
  using write_result_type = outcome<ostream>;

  // Disposition flags
  static constexpr size_t writeable = (1<<0);

  // Open a data store at path
  data_store(size_t flags = 0, afio::path path = "store");

  // Look up item named name for reading, returning an istream for the item
  shared_future<istream> lookup(std::string name) noexcept;
  // Look up item named name for writing, returning an ostream for that item
  outcome<ostream> write(std::string name) noexcept;
};
+//]
+
+//[workshop_final1]
+namespace asio = BOOST_AFIO_V2_NAMESPACE::asio;
+using BOOST_AFIO_V2_NAMESPACE::error_code;
+using BOOST_AFIO_V2_NAMESPACE::generic_category;
+using BOOST_OUTCOME_V1_NAMESPACE::outcome;
+
+// A special allocator of highly efficient file i/o memory
+using file_buffer_type = std::vector<char, afio::utils::page_allocator<char>>;
+
// Serialisation helper types
// These are the exact on-disk layouts: packed to 1 byte so sizeof matches the
// byte counts in the comments. uint64 comes from SpookyV2.h.
#pragma pack(push, 1)
struct ondisk_file_header // 20 bytes
{
  union
  {
    afio::off_t length; // Always 32 in byte order of whoever wrote this
    char endian[8];     // endian[0]==32 detects a same-endian writer
  };
  afio::off_t index_offset_begin; // Hint to the length of the store when the index was last written
  unsigned int time_count; // Racy monotonically increasing count
};
struct ondisk_record_header // 28 bytes - ALWAYS ALIGNED TO 32 BYTE FILE OFFSET
{
  afio::off_t magic : 16; // 0xad magic
  afio::off_t kind : 2; // 0 for zeroed space, 1,2 for blob, 3 for index
  afio::off_t _spare : 6;
  afio::off_t length : 40; // Size of the object (including this preamble, regions, key values) (+8)
  unsigned int age; // file header time_count when this was added (+12)
  uint64 hash[2]; // 128-bit SpookyHash of the object (from below onwards) (+28)
  // ondisk_index_regions
  // ondisk_index_key_value (many until length)
};
struct ondisk_index_regions // 12 + regions_size * 32
{
  afio::off_t thisoffset; // this index only valid if equals this offset
  unsigned int regions_size; // count of regions with their status (+28)
  struct ondisk_index_region
  {
    afio::off_t offset; // offset to this region
    ondisk_record_header r; // copy of the header at the offset to avoid a disk seek
  } regions[1]; // actually regions_size entries, followed by the key-values
};
struct ondisk_index_key_value // 8 + sizeof(key)
{
  unsigned int region_index; // Index into regions
  unsigned int name_size; // Length of key
  char name[1]; // Key string (utf-8), actually name_size bytes
};
#pragma pack(pop)
+
// In-memory representation of the most recently loaded on-disk index.
struct data_store::index
{
  // One record in the store file, deserialised from ondisk_index_region.
  struct region
  {
    enum kind_type { zeroed=0, small_blob=1, large_blob=2, index=3 } kind;
    afio::off_t offset, length; // where the record lives and how long it is
    uint64 hash[2]; // 128 bit SpookyHash copied from the record header
    region(ondisk_index_regions::ondisk_index_region *r) : kind(static_cast<kind_type>(r->r.kind)), offset(r->offset), length(r->r.length) { memcpy(hash, r->r.hash, sizeof(hash)); }
    bool operator<(const region &o) const noexcept { return offset<o.offset; }
    bool operator==(const region &o) const noexcept { return offset==o.offset && length==o.length; }
  };
  afio::off_t offset_loaded_from; // Offset this index was loaded from
  unsigned int last_time_count; // Header time count
  std::vector<region> regions; // every record in the store
  std::unordered_map<std::string, size_t> key_to_region; // key name -> index into regions
  index() : offset_loaded_from(0) { }
+//]
+//[workshop_final2]
+ struct last_good_ondisk_index_info
+ {
+ afio::off_t offset;
+ std::unique_ptr<char[]> buffer;
+ size_t size;
+ last_good_ondisk_index_info() : offset(0), size(0) { }
+ };
+ // Finds the last good index in the store
+ outcome<last_good_ondisk_index_info> find_last_good_ondisk_index(afio::handle_ptr h) noexcept
+ {
+ last_good_ondisk_index_info ret;
+ error_code ec;
+ try
+ {
+ // Read the front of the store file to get the index offset hint
+ ondisk_file_header header;
+ afio::read(h, header, 0);
+ afio::off_t offset=0;
+ if(header.length==32)
+ offset=header.index_offset_begin;
+ else if(header.endian[0]==32) // wrong endian
+ return error_code(ENOEXEC, generic_category());
+ else // corrupted
+ return error_code(ENOTSUP, generic_category());
+ last_time_count=header.time_count;
+ // Fetch the valid extents
+ auto valid_extents(afio::extents(h));
+ auto valid_extents_it=valid_extents.begin();
+ // Iterate the records starting from index offset hint, keeping track of last known good index
+ bool done=true;
+ do
+ {
+ afio::off_t linear_scanning=0;
+ ondisk_record_header record;
+ afio::off_t file_length=h->lstat(afio::metadata_flags::size).st_size;
+ for(; offset<file_length;)
+ {
+ // Round to 32 byte boundary
+ offset&=~31ULL;
+ // Find my valid extent
+ while(offset>=valid_extents_it->first+valid_extents_it->second)
+ {
+ if(valid_extents.end()==++valid_extents_it)
+ {
+ valid_extents=afio::extents(h);
+ valid_extents_it=valid_extents.begin();
+ }
+ }
+ // Is this offset within a valid extent? If not, bump it.
+ if(offset<valid_extents_it->first)
+ offset=valid_extents_it->first;
+ afio::read(ec, h, record, offset);
+ if(ec) return ec;
+ // If this does not contain a valid record, linear scan
+ // until we find one
+ if(record.magic!=0xad || (record.length & 31))
+ {
+start_linear_scan:
+ if(!linear_scanning)
+ {
+ std::cerr << "WARNING: Corrupt record detected at " << offset << ", linear scanning ..." << std::endl;
+ linear_scanning=offset;
+ }
+ offset+=32;
+ continue;
+ }
+ // Is this the first good record after a corrupted section?
+ if(linear_scanning)
+ {
+ std::cerr << "NOTE: Found valid record after linear scan at " << offset << std::endl;
+ std::cerr << " Removing invalid data between " << linear_scanning << " and " << offset << std::endl;
+ // Rewrite a valid record to span the invalid section
+ ondisk_record_header temp={0};
+ temp.magic=0xad;
+ temp.length=offset-linear_scanning;
+ temp.age=last_time_count;
+ afio::write(ec, h, temp, linear_scanning);
+ // Deallocate the physical storage for the invalid section
+ afio::zero(ec, h, {{linear_scanning+12, offset-linear_scanning-12}});
+ linear_scanning=0;
+ }
+ // If not an index, skip entire record
+ if(record.kind!=3 /*index*/)
+ {
+ // If this record length is wrong, start a linear scan
+ if(record.length>file_length-offset)
+ offset+=32;
+ else
+ offset+=record.length;
+ continue;
+ }
+ std::unique_ptr<char[]> buffer;
+ // If record.length is corrupt, this will throw bad_alloc
+ try
+ {
+ buffer.reset(new char[(size_t) record.length-sizeof(header)]);
+ }
+ catch(...)
+ {
+ // Either we are out of memory, or it's a corrupted record
+ // TODO: Try a ECC heal pass here
+ // If this record length is wrong, start a linear scan
+ if(record.length>file_length-offset)
+ goto start_linear_scan;
+ else
+ offset+=record.length;
+ continue;
+ }
+ afio::read(ec, h, buffer.get(), (size_t) record.length-sizeof(header), offset+sizeof(header));
+ if(ec)
+ return ec;
+ uint64 hash[2]={0, 0};
+ SpookyHash::Hash128(buffer.get(), (size_t) record.length-sizeof(header), hash, hash+1);
+ // Is this record correct?
+ if(!memcmp(hash, record.hash, sizeof(hash)))
+ {
+ // Is this index not conflicted? If not, it's the new most recent index
+ ondisk_index_regions *r=(ondisk_index_regions *) buffer.get();
+ if(r->thisoffset==offset)
+ {
+ ret.buffer=std::move(buffer);
+ ret.size=(size_t) record.length-sizeof(header);
+ ret.offset=offset;
+ }
+ }
+ offset+=record.length;
+ }
+ if(ret.offset) // we have a valid most recent index so we're done
+ done=true;
+ else if(header.index_offset_begin>sizeof(header))
+ {
+ // Looks like the end of the store got trashed.
+ // Reparse from the very beginning
+ offset=32;
+ header.index_offset_begin=0;
+ }
+ else
+ {
+ // No viable records at all, or empty store.
+ done=true;
+ }
+ } while(!done);
+ return ret;
+ }
+ catch(...)
+ {
+ return std::current_exception();
+ }
+ }
+//]
+//[workshop_final3]
+ // Loads the index from the store
+ outcome<void> load(afio::handle_ptr h) noexcept
+ {
+ // If find_last_good_ondisk_index() returns error or exception, return those, else
+ // initialise ondisk_index_info to monad.get()
+ BOOST_OUTCOME_AUTO(ondisk_index_info, find_last_good_ondisk_index(h));
+ error_code ec;
+ try
+ {
+ offset_loaded_from=0;
+ regions.clear();
+ key_to_region.clear();
+ ondisk_index_regions *r=(ondisk_index_regions *) ondisk_index_info.buffer.get();
+ regions.reserve(r->regions_size);
+ for(size_t n=0; n<r->regions_size; n++)
+ regions.push_back(region(r->regions+n));
+ ondisk_index_key_value *k=(ondisk_index_key_value *)(r+r->regions_size), *end=(ondisk_index_key_value *)(ondisk_index_info.buffer.get()+ondisk_index_info.size);
+ while(k<end)
+ key_to_region.insert(std::make_pair(std::string(k->name, k->name_size), k->region_index));
+ offset_loaded_from=ondisk_index_info.offset;
+ return {};
+ }
+ catch(...)
+ {
+ return std::current_exception();
+ }
+ }
+//]
+//[workshop_final4]
  // Writes the index to the store
  // rwh is a read/write handle used for reads and the hint update; appendh is
  // an append-only handle so that concurrent writers each atomically append.
  // Returns EDEADLK if another writer published a newer index since we loaded.
  outcome<void> store(afio::handle_ptr rwh, afio::handle_ptr appendh) noexcept
  {
    error_code ec;
    // Serialise the region list into its on-disk form
    std::vector<ondisk_index_regions::ondisk_index_region> ondisk_regions;
    ondisk_regions.reserve(65536);
    for(auto &i : regions)
    {
      ondisk_index_regions::ondisk_index_region r;
      r.offset=i.offset;
      r.r.kind=i.kind;
      r.r.length=i.length;
      memcpy(r.r.hash, i.hash, sizeof(i.hash));
      ondisk_regions.push_back(r);
    }

    // Serialise the key map: 8 bytes of fixed header plus the key per entry
    size_t bytes=0;
    for(auto &i : key_to_region)
      bytes+=8+i.first.size();
    std::vector<char> ondisk_key_values(bytes);
    ondisk_index_key_value *kv=(ondisk_index_key_value *) ondisk_key_values.data();
    for(auto &i : key_to_region)
    {
      kv->region_index=(unsigned int) i.second;
      kv->name_size=(unsigned int) i.first.size();
      memcpy(kv->name, i.first.c_str(), i.first.size());
      kv=(ondisk_index_key_value *)(((char *) kv) + 8 + i.first.size());
    }

    // Record header followed immediately by the index regions header
    struct
    {
      ondisk_record_header header;
      ondisk_index_regions header2;
    } h;
    h.header.magic=0xad;
    h.header.kind=3; // writing an index
    h.header._spare=0;
    h.header.length=sizeof(h.header)+sizeof(h.header2)+sizeof(ondisk_regions.front())*(regions.size()-1)+bytes;
    h.header.age=last_time_count;
    // hash calculated later
    // thisoffset calculated later
    h.header2.regions_size=(unsigned int) regions.size();
    // Pad zeros to 32 byte multiple
    // NOTE(review): zeros is sized to the whole rounded-up length rather than
    // just the shortfall to the next 32 byte boundary — confirm intended.
    std::vector<char> zeros((h.header.length+31)&~31ULL);
    h.header.length+=zeros.size();

    // Create the gather buffer sequence
    std::vector<asio::const_buffer> buffers(4);
    buffers[0]=asio::const_buffer(&h, 36);
    buffers[1]=asio::const_buffer(ondisk_regions.data(), sizeof(ondisk_regions.front())*ondisk_regions.size());
    buffers[2]=asio::const_buffer(ondisk_key_values.data(), ondisk_key_values.size());
    if(zeros.empty())
      buffers.pop_back();
    else
      buffers[3]=asio::const_buffer(zeros.data(), zeros.size());
    file_buffer_type reread(sizeof(h));
    ondisk_record_header *reread_header=(ondisk_record_header *) reread.data();
    bool success=false; // NOTE(review): never used
    do
    {
      // Is this index stale?
      BOOST_OUTCOME_AUTO(ondisk_index_info, find_last_good_ondisk_index(rwh));
      if(ondisk_index_info.offset!=offset_loaded_from)
      {
        // A better conflict resolution strategy might check to see if deltas
        // are compatible, but for the sake of brevity we always report conflict
        return error_code(EDEADLK, generic_category());
      }
      // Take the current length of the store file. Any index written will be at or after this.
      h.header2.thisoffset=appendh->lstat(afio::metadata_flags::size).st_size;
      memset(h.header.hash, 0, sizeof(h.header.hash));
      // Hash the end of the first gather buffer and all the remaining gather buffers
      SpookyHash::Hash128(asio::buffer_cast<const char *>(buffers[0])+24, asio::buffer_size(buffers[0])-24, h.header.hash, h.header.hash+1);
      SpookyHash::Hash128(asio::buffer_cast<const char *>(buffers[1]), asio::buffer_size(buffers[1]), h.header.hash, h.header.hash+1);
      SpookyHash::Hash128(asio::buffer_cast<const char *>(buffers[2]), asio::buffer_size(buffers[2]), h.header.hash, h.header.hash+1);
      if(buffers.size()>3)
        SpookyHash::Hash128(asio::buffer_cast<const char *>(buffers[3]), asio::buffer_size(buffers[3]), h.header.hash, h.header.hash+1);
      // Atomic append the record
      afio::write(ec, appendh, buffers, 0);
      if(ec) return ec;
      // Reread the record
      afio::read(ec, rwh, reread.data(), reread.size(), h.header2.thisoffset);
      if(ec) return ec;
      // If the record doesn't match it could be due to a lag in st_size between open handles,
      // so retry until success or stale index
    } while(memcmp(reread_header->hash, h.header.hash, sizeof(h.header.hash)));

    // New index has been successfully written. Update the hint at the front of the file.
    // This update is racy of course, but as it's merely a hint we don't care.
    ondisk_file_header file_header;
    afio::read(ec, rwh, file_header, 0);
    if(!ec && file_header.index_offset_begin<h.header2.thisoffset)
    {
      file_header.index_offset_begin=h.header2.thisoffset;
      file_header.time_count++;
      afio::write(ec, rwh, file_header, 0);
    }
    offset_loaded_from=h.header2.thisoffset;
    last_time_count=file_header.time_count;
    return {};
  }
+//]
+//[workshop_final5]
  // Reloads the index if needed
  // Cheap staleness check: only reread the file header when the file has
  // grown, and only reload the full index when the header's hint has moved
  // past the offset we loaded from.
  outcome<void> refresh(afio::handle_ptr h) noexcept
  {
    // NOTE(review): function-local static means last_size is shared by every
    // data_store instance and is unsynchronised — confirm single-instance,
    // single-threaded use is intended here.
    static afio::off_t last_size;
    error_code ec;
    afio::off_t size=h->lstat(afio::metadata_flags::size).st_size;
    // Has the size changed? If so, need to check the hint
    if(size>last_size)
    {
      last_size=size;
      ondisk_file_header header;
      afio::read(ec, h, header, 0);
      if(ec) return ec;
      // If the hint is moved, we are stale
      if(header.index_offset_begin>offset_loaded_from)
        return load(h);
    }
    return {};
  }
+};
+//]
+
+//[workshop_final6]
+namespace asio = BOOST_AFIO_V2_NAMESPACE::asio;
+using BOOST_AFIO_V2_NAMESPACE::error_code;
+using BOOST_AFIO_V2_NAMESPACE::generic_category;
+
+// A special allocator of highly efficient file i/o memory
+using file_buffer_type = std::vector<char, afio::utils::page_allocator<char>>;
+
// An iostream which reads directly from a memory mapped AFIO file
struct idirectstream : public std::istream
{
  struct directstreambuf : public std::streambuf
  {
    afio::handle_ptr h; // Holds the file open
    std::shared_ptr<file_buffer_type> buffer; // set only by the malloc'd-buffer constructor
    // From a mmap
    // NOTE(review): unlike the earlier example, only a raw addr is taken here —
    // confirm h alone keeps the mapping behind addr alive for this streambuf's lifetime.
    directstreambuf(afio::handle_ptr _h, char *addr, size_t length) : h(std::move(_h))
    {
      // Set the get buffer this streambuf is to use
      setg(addr, addr, addr + length);
    }
    // From a malloc
    directstreambuf(afio::handle_ptr _h, std::shared_ptr<file_buffer_type> _buffer, size_t length) : h(std::move(_h)), buffer(std::move(_buffer))
    {
      // Set the get buffer this streambuf is to use
      setg(buffer->data(), buffer->data(), buffer->data() + length);
    }
  };
  std::unique_ptr<directstreambuf> buf;
  // Takes ownership of the file handle plus either a mapped address or a
  // shared buffer; U selects which directstreambuf constructor is used.
  template<class U> idirectstream(afio::handle_ptr h, U &&buffer, size_t length) : std::istream(new directstreambuf(std::move(h), std::forward<U>(buffer), length)), buf(static_cast<directstreambuf *>(rdbuf()))
  {
  }
  virtual ~idirectstream() override
  {
    // Reset the stream before deleting the buffer
    rdbuf(nullptr);
  }
};
+//]
+//[workshop_final7]
// An iostream which buffers all the output, then commits on destruct.
// Nothing reaches storage until the destructor runs: sync() merely grows the
// in-memory buffer, and the destructor atomically appends the whole value as a
// "small blob" record and then commits an updated index.
struct data_store::_ostream : public std::ostream
{
  struct ostreambuf : public std::streambuf
  {
    using int_type = std::streambuf::int_type;
    using traits_type = std::streambuf::traits_type;
    data_store *ds;          // owning store (non-owning back pointer)
    std::string name;        // key this value will be committed under
    file_buffer_type buffer; // accumulates the entire value in memory
    ostreambuf(data_store *_ds, std::string _name) : ds(_ds), name(std::move(_name)), buffer(afio::utils::page_sizes().front())
    {
      // Set the put buffer this streambuf is to use
      setp(buffer.data(), buffer.data() + buffer.size());
    }
    // Commits the buffered value: append blob, locate it, update index.
    virtual ~ostreambuf() override
    {
      try
      {
        ondisk_index_regions::ondisk_index_region r;
        r.r.magic=0xad;
        r.r.kind=1; // small blob
        // NOTE(review): buffer.size() is the buffer's capacity, not the number
        // of bytes actually written (pptr()-pbase() of the final put area), so
        // short values are committed padded with the unwritten tail — confirm
        // the format intends zero-padded fixed-size commits.
        r.r.length=sizeof(r.r)+buffer.size();
        r.r.length=(r.r.length+31)&~31ULL; // pad to 32 byte multiple
        r.r.age=ds->_index->last_time_count;
        memset(r.r.hash, 0, sizeof(r.r.hash));
        // NOTE(review): after rounding up, r.r.length-sizeof(r.r) can exceed
        // buffer.size() by up to 31 bytes unless sizeof(r.r) is a multiple of
        // 32 — if so, this hash (and the gather write below) reads past the
        // end of buffer. Confirm sizeof(ondisk_index_region::r).
        SpookyHash::Hash128(buffer.data(), (size_t)(r.r.length-sizeof(r.r)), r.r.hash, r.r.hash+1);

        // Create the gather buffer sequence and atomic append the blob
        std::vector<asio::const_buffer> buffers(2);
        buffers[0]=asio::const_buffer((char *) &r.r, sizeof(r.r));
        buffers[1]=asio::const_buffer(buffer.data(), (size_t)(r.r.length-sizeof(r.r)));
        error_code ec;
        // Snapshot EOF just before the append; the handle is opened with
        // file_flags::append so the offset passed to write() is ignored.
        auto offset=ds->_small_blob_store_append->lstat(afio::metadata_flags::size).st_size;
        afio::write(ec, ds->_small_blob_store_append, buffers, 0);
        if(ec)
          abort(); // should really do something better here

        // Find out where my blob ended up: other writers may have appended
        // between the size snapshot and our write, so scan forward from the
        // snapshot matching on the content hash.
        ondisk_record_header header;
        do
        {
          afio::read(ec, ds->_small_blob_store_append, header, offset);
          if(ec) abort();
          if(header.kind==1 /*small blob*/ && !memcmp(header.hash, r.r.hash, sizeof(header.hash)))
          {
            r.offset=offset;
            break;
          }
          offset+=header.length;
        } while(offset<ds->_small_blob_store_append->lstat(afio::metadata_flags::size).st_size);

        // Optimistic-concurrency commit loop: retry until our index update wins.
        for(;;)
        {
          // Add my blob to the regions
          ds->_index->regions.push_back(&r);
          // Add my key to the key index
          ds->_index->key_to_region[name]=ds->_index->regions.size()-1;
          // Commit the index, and if successful exit
          if(!ds->_index->store(ds->_index_store, ds->_index_store_append))
            return;
          // Reload the index and retry
          if(ds->_index->load(ds->_index_store))
            abort();
        }
      }
      catch(...)
      {
        // Destructors must not throw; the commit is silently abandoned.
      }
    }
    virtual int_type overflow(int_type c) override
    {
      size_t thisbuffer=pptr()-pbase();
      if(thisbuffer>=buffer.size())
        sync(); // grows the buffer and resets the put area
      if(c!=traits_type::eof())
      {
        *pptr()=(char)c;
        pbump(1);
        return traits_type::to_int_type(c);
      }
      return traits_type::eof();
    }
    // Unlike a conventional streambuf, sync() does not flush to storage: it
    // doubles the in-memory buffer and points the put area at the new half.
    virtual int sync() override
    {
      buffer.resize(buffer.size()*2);
      setp(buffer.data() + buffer.size()/2, buffer.data() + buffer.size());
      return 0;
    }
  };
  std::unique_ptr<ostreambuf> buf;
  _ostream(data_store *ds, std::string name) : std::ostream(new ostreambuf(ds, std::move(name))), buf(static_cast<ostreambuf *>(rdbuf()))
  {
  }
  virtual ~_ostream() override
  {
    // Reset the stream before deleting the buffer
    rdbuf(nullptr);
  }
};
+//]
+
+//[workshop_final8]
// Open (creating if necessary) the store directory and the files making up
// the store, and bootstrap a brand new store with an initial file header.
data_store::data_store(size_t flags, afio::path path)
{
  // Make a dispatcher for the local filesystem URI, masking out write flags on all operations if not writeable
  _dispatcher=afio::make_dispatcher("file:///", afio::file_flags::none, !(flags & writeable) ? afio::file_flags::write : afio::file_flags::none).get();
  // Set the dispatcher for this thread, and create/open a handle to the store directory
  afio::current_dispatcher_guard h(_dispatcher);
  auto dirh(afio::dir(std::move(path), afio::file_flags::create)); // throws if there was an error

  // The small blob store keeps non-memory mappable blobs at 32 byte alignments.
  // Two handles to the same file: one append-only for atomic appends, one
  // read-write for everything else.
  _small_blob_store_append=afio::file(dirh, "small_blob_store", afio::file_flags::create | afio::file_flags::append); // throws if there was an error
  _small_blob_store=afio::file(dirh, "small_blob_store", afio::file_flags::read_write); // throws if there was an error
  _small_blob_store_ecc=afio::file(dirh, "small_blob_store.ecc", afio::file_flags::create | afio::file_flags::read_write); // throws if there was an error

  // The large blob store keeps memory mappable blobs at 4Kb alignments
  // TODO

  // The index is where we keep the map of keys to blobs
  _index_store_append=afio::file(dirh, "index", afio::file_flags::create | afio::file_flags::append); // throws if there was an error
  _index_store=afio::file(dirh, "index", afio::file_flags::read_write); // throws if there was an error
  _index_store_ecc=afio::file(dirh, "index.ecc", afio::file_flags::create | afio::file_flags::read_write); // throws if there was an error
  // Is this store just created? If the index file is empty, write an initial
  // file header whose index hint points just past itself (offset 32).
  if(!_index_store_append->lstat(afio::metadata_flags::size).st_size)
  {
    ondisk_file_header header;
    header.length=32;
    header.index_offset_begin=32;
    header.time_count=0;
    // This is racy, but the file format doesn't care.
    // NOTE(review): any other fields of ondisk_file_header are left
    // default-initialised here — confirm the struct zero-initialises itself.
    afio::write(_index_store_append, header, 0);
  }
  _index.reset(new index);
}
+
// Look up the item called name, returning a future std::istream onto its
// value. The in-memory index is refreshed first if the on-disk index moved.
shared_future<data_store::istream> data_store::lookup(std::string name) noexcept
{
  try
  {
    // Returns early with the error if refreshing the index failed
    BOOST_OUTCOME_PROPAGATE(_index->refresh(_index_store));
    auto it=_index->key_to_region.find(name);
    if(_index->key_to_region.end()==it)
      return error_code(ENOENT, generic_category()); // not found
    auto &r=_index->regions[it->second];
    // Skip the 24 byte on-disk record header to reach the value itself.
    // NOTE(review): 24 is presumably sizeof(ondisk_record_header) — confirm,
    // and consider sizeof() instead of a magic number.
    afio::off_t offset=r.offset+24, length=r.length-24;
    // Schedule the reading of the file into a buffer
    auto buffer=std::make_shared<file_buffer_type>((size_t) length);
    afio::future<> h(afio::async_read(_small_blob_store, buffer->data(), (size_t) length, offset));
    // When the read completes call this continuation
    return h.then([buffer, length](const afio::future<> &h) -> shared_future<data_store::istream> {
      // If read failed, return the error or exception immediately
      BOOST_OUTCOME_PROPAGATE(h);
      data_store::istream ret(std::make_shared<idirectstream>(h.get_handle(), buffer, (size_t) length));
      return ret;
    });
  }
  catch(...)
  {
    return std::current_exception();
  }
}
+
+outcome<data_store::ostream> data_store::write(std::string name) noexcept
+{
+ try
+ {
+ return std::make_shared<_ostream>(this, std::move(name));
+ }
+ catch (...)
+ {
+ return std::current_exception();
+ }
+}
+//]
+
+int main(void)
+{
+ // To write a key-value item called "dog"
+ {
+ data_store ds;
+ auto dogh = ds.write("dog").get();
+ auto &dogs = *dogh;
+ dogs << "I am a dog";
+ }
+
+ // To retrieve a key-value item called "dog"
+ {
+ data_store ds;
+ auto dogh = ds.lookup("dog");
+ if (dogh.empty())
+ std::cerr << "No item called dog" << std::endl;
+ else if(dogh.has_error())
+ std::cerr << "ERROR: Looking up dog returned error " << dogh.get_error().message() << std::endl;
+ else if (dogh.has_exception())
+ {
+ std::cerr << "FATAL: Looking up dog threw exception" << std::endl;
+ std::terminate();
+ }
+ else
+ {
+ std::string buffer;
+ *dogh.get() >> buffer;
+ std::cout << "Item dog has value " << buffer << std::endl;
+ }
+ }
+ return 0;
+}
diff --git a/attic/example/workshop_naive.cpp b/attic/example/workshop_naive.cpp
new file mode 100644
index 00000000..225d09b7
--- /dev/null
+++ b/attic/example/workshop_naive.cpp
@@ -0,0 +1,2 @@
+#include "afio_pch.hpp"
+#include "workshop_naive.ipp"
diff --git a/attic/example/workshop_naive.ipp b/attic/example/workshop_naive.ipp
new file mode 100644
index 00000000..0739095b
--- /dev/null
+++ b/attic/example/workshop_naive.ipp
@@ -0,0 +1,126 @@
+//[workshop_naive_interface
+namespace filesystem = BOOST_AFIO_V2_NAMESPACE::filesystem;
+using BOOST_OUTCOME_V1_NAMESPACE::outcome;
+
// A simple key-value store holding each item as one file inside a single
// directory, with streamed reads and writes.
class data_store
{
  filesystem::path _store_path; // directory holding one file per item
  bool _writeable;              // set from the writeable flag at construction
public:
  // Type used for read streams
  using istream = std::shared_ptr<std::istream>;
  // Type used for write streams
  using ostream = std::shared_ptr<std::ostream>;
  // Type used for lookup
  using lookup_result_type = outcome<istream>;
  // Type used for write
  using write_result_type = outcome<ostream>;

  // Disposition flags
  static constexpr size_t writeable = (1<<0);

  // Open a data store at path
  data_store(size_t flags = 0, filesystem::path path = "store");

  // Look up item named name for reading, returning a std::istream for the item if it exists
  outcome<istream> lookup(std::string name) noexcept;
  // Look up item named name for writing, returning an ostream for that item
  outcome<ostream> write(std::string name) noexcept;
};
+//]
+
+//[workshop_naive]
+using BOOST_OUTCOME_V1_NAMESPACE::empty;
+using BOOST_AFIO_V2_NAMESPACE::error_code;
+using BOOST_AFIO_V2_NAMESPACE::generic_category;
+
// Returns true if name is usable as a store key: non-empty, free of
// characters that are not portable in filenames (and NUL), and without a
// leading period (which also bans "." and "..").
static bool is_valid_name(const std::string &name) noexcept
{
  static const std::string banned("<>:\"/\\|?*\0", 10);
  // Reject the empty string: it cannot name a filesystem entry. (Previously
  // it slipped through because name[0] on an empty std::string yields '\0',
  // which is not '.')
  if(name.empty())
    return false;
  if(std::string::npos!=name.find_first_of(banned))
    return false;
  // No leading period
  return name[0]!='.';
}
+
+static std::shared_ptr<std::fstream> name_to_fstream(const filesystem::path &store_path, std::string name, std::ios::openmode mode)
+{
+ auto to_lookup(store_path / name);
+ return std::make_shared<std::fstream>(to_lookup.native(), mode);
+}
+
// Remember where the store lives and whether writes are permitted; no
// filesystem work happens until lookup()/write() are called.
data_store::data_store(size_t flags, filesystem::path path) : _store_path(std::move(path)), _writeable(flags & writeable)
{
}
+
+outcome<data_store::istream> data_store::lookup(std::string name) noexcept
+{
+ if(!is_valid_name(name))
+ return error_code(EINVAL, generic_category());
+ try
+ {
+ istream ret(name_to_fstream(_store_path, std::move(name), std::ios::in | std::ios::binary));
+ if(!ret->fail())
+ return std::move(ret);
+ return empty;
+ }
+ catch(...)
+ {
+ return std::current_exception();
+ }
+}
+
+outcome<data_store::ostream> data_store::write(std::string name) noexcept
+{
+ if(!_writeable)
+ return error_code(EROFS, generic_category());
+ if(!is_valid_name(name))
+ return error_code(EINVAL, generic_category());
+ try
+ {
+ if (!filesystem::exists(_store_path))
+ filesystem::create_directory(_store_path);
+ ostream ret(name_to_fstream(_store_path, std::move(name), std::ios::out | std::ios::binary));
+ return std::move(ret);
+ }
+ catch (...)
+ {
+ return std::current_exception();
+ }
+}
+//]
+
+int main(void)
+{
+ //[workshop_use_naive]
+ // To write a key-value item called "dog"
+ {
+ data_store ds;
+ auto dogh = ds.write("dog").get();
+ auto &dogs = *dogh;
+ dogs << "I am a dog";
+ }
+
+ // To retrieve a key-value item called "dog"
+ {
+ data_store ds;
+ auto dogh = ds.lookup("dog");
+ if (dogh.empty())
+ std::cerr << "No item called dog" << std::endl;
+ else if(dogh.has_error())
+ std::cerr << "ERROR: Looking up dog returned error " << dogh.get_error().message() << std::endl;
+ else if (dogh.has_exception())
+ {
+ std::cerr << "FATAL: Looking up dog threw exception" << std::endl;
+ std::terminate();
+ }
+ else
+ {
+ std::string buffer;
+ *dogh.get() >> buffer;
+ std::cout << "Item dog has value " << buffer << std::endl;
+ }
+ }
+ //]
+ return 0;
+}
diff --git a/attic/example/workshop_naive_afio.cpp b/attic/example/workshop_naive_afio.cpp
new file mode 100644
index 00000000..a807b6c8
--- /dev/null
+++ b/attic/example/workshop_naive_afio.cpp
@@ -0,0 +1,2 @@
+#include "afio_pch.hpp"
+#include "workshop_naive_afio.ipp"
diff --git a/attic/example/workshop_naive_afio.ipp b/attic/example/workshop_naive_afio.ipp
new file mode 100644
index 00000000..6100ed96
--- /dev/null
+++ b/attic/example/workshop_naive_afio.ipp
@@ -0,0 +1,269 @@
+//[workshop_naive_afio_interface
+namespace afio = BOOST_AFIO_V2_NAMESPACE;
+namespace filesystem = BOOST_AFIO_V2_NAMESPACE::filesystem;
+using BOOST_OUTCOME_V1_NAMESPACE::outcome;
+
// A key-value store holding each item as one file inside a single directory,
// accessed through an AFIO dispatcher so the store directory may be renamed
// by third parties while in use.
class data_store
{
  afio::dispatcher_ptr _dispatcher; // dispatcher all store i/o runs through
  afio::handle_ptr _store;          // open handle to the store directory
public:
  // Type used for read streams
  using istream = std::shared_ptr<std::istream>;
  // Type used for write streams
  using ostream = std::shared_ptr<std::ostream>;
  // Type used for lookup
  using lookup_result_type = outcome<istream>;
  // Type used for write
  using write_result_type = outcome<ostream>;

  // Disposition flags
  static constexpr size_t writeable = (1<<0);

  // Open a data store at path
  data_store(size_t flags = 0, afio::path path = "store");

  // Look up item named name for reading, returning a std::istream for the item if it exists
  outcome<istream> lookup(std::string name) noexcept;
  // Look up item named name for writing, returning an ostream for that item
  outcome<ostream> write(std::string name) noexcept;
};
+//]
+
+//[workshop_naive_afio2]
+namespace asio = BOOST_AFIO_V2_NAMESPACE::asio;
+using BOOST_OUTCOME_V1_NAMESPACE::empty;
+using BOOST_AFIO_V2_NAMESPACE::error_code;
+using BOOST_AFIO_V2_NAMESPACE::generic_category;
+
+// A special allocator of highly efficient file i/o memory
+using file_buffer_type = std::vector<char, afio::utils::page_allocator<char>>;
+
+// An iostream which reads directly from a memory mapped AFIO file
+struct idirectstream : public std::istream
+{
+ struct directstreambuf : public std::streambuf
+ {
+ afio::handle_ptr h; // Holds the file open and therefore mapped
+ file_buffer_type buffer;
+ afio::handle::mapped_file_ptr mfp;
+ directstreambuf(error_code &ec, afio::handle_ptr _h) : h(std::move(_h))
+ {
+ // Get the size of the file. If greater than 128Kb mmap it
+ size_t length=(size_t) h->lstat(afio::metadata_flags::size).st_size;
+ char *p=nullptr;
+ if(length>=128*1024)
+ {
+ if((mfp=h->map_file()))
+ p = (char *) mfp->addr;
+ }
+ if(!p)
+ {
+ buffer.resize(length);
+ afio::read(ec, h, buffer.data(), length, 0);
+ p=buffer.data();
+ }
+ // Set the get buffer this streambuf is to use
+ setg(p, p, p + length);
+ }
+ };
+ std::unique_ptr<directstreambuf> buf;
+ idirectstream(error_code &ec, afio::handle_ptr h) : std::istream(new directstreambuf(ec, std::move(h))), buf(static_cast<directstreambuf *>(rdbuf()))
+ {
+ }
+ virtual ~idirectstream() override
+ {
+ // Reset the stream before deleting the buffer
+ rdbuf(nullptr);
+ }
+};
+
// An iostream which writes to an AFIO file in 4Kb pages.
// Writes are pipelined: each full page is handed to an async write while the
// caller keeps filling a fresh page; the destructor flushes and waits.
struct odirectstream : public std::ostream
{
  struct directstreambuf : public std::streambuf
  {
    using int_type = std::streambuf::int_type;
    using traits_type = std::streambuf::traits_type;
    afio::future<> lastwrite; // the last async write performed (also keeps the handle)
    afio::off_t offset; // offset of next write
    file_buffer_type buffer; // a page size on this machine
    file_buffer_type lastbuffer; // keeps the previous page alive while its write is in flight
    directstreambuf(afio::handle_ptr _h) : lastwrite(std::move(_h)), offset(0), buffer(afio::utils::page_sizes().front())
    {
      // Set the put buffer this streambuf is to use
      setp(buffer.data(), buffer.data() + buffer.size());
    }
    virtual ~directstreambuf() override
    {
      try
      {
        // Flush buffers and wait until last write completes
        // Schedule an asynchronous write of the buffer to storage
        size_t thisbuffer = pptr() - pbase();
        if(thisbuffer)
          lastwrite = afio::async_write(afio::async_truncate(lastwrite, offset+thisbuffer), buffer.data(), thisbuffer, offset);
        lastwrite.get();
      }
      catch(...)
      {
        // Destructors must not throw; pending-write errors are dropped here.
      }
    }
    // Called by the ostream machinery when the put area is full.
    virtual int_type overflow(int_type c) override
    {
      size_t thisbuffer=pptr()-pbase();
      if(thisbuffer>=buffer.size())
        sync(); // flush the full page; sync() installs a fresh put area
      if(c!=traits_type::eof())
      {
        *pptr()=(char)c;
        pbump(1);
        return traits_type::to_int_type(c);
      }
      return traits_type::eof();
    }
    virtual int sync() override
    {
      // Wait for the last write to complete, propagating any exceptions
      lastwrite.get();
      size_t thisbuffer=pptr()-pbase();
      if(thisbuffer > 0)
      {
        // Detach the current buffer and replace with a fresh one to allow the kernel to steal the page
        lastbuffer=std::move(buffer);
        buffer.resize(lastbuffer.size());
        setp(buffer.data(), buffer.data() + buffer.size());
        // Schedule an extension of physical storage by an extra page
        lastwrite = afio::async_truncate(lastwrite, offset + thisbuffer);
        // Schedule an asynchronous write of the buffer to storage
        lastwrite=afio::async_write(lastwrite, lastbuffer.data(), thisbuffer, offset);
        offset+=thisbuffer;
      }
      return 0;
    }
  };
  std::unique_ptr<directstreambuf> buf;
  odirectstream(afio::handle_ptr h) : std::ostream(new directstreambuf(std::move(h))), buf(static_cast<directstreambuf *>(rdbuf()))
  {
  }
  virtual ~odirectstream() override
  {
    // Reset the stream before deleting the buffer
    rdbuf(nullptr);
  }
};
+//]
+
+//[workshop_naive_afio1]
+namespace asio = BOOST_AFIO_V2_NAMESPACE::asio;
+using BOOST_OUTCOME_V1_NAMESPACE::empty;
+using BOOST_AFIO_V2_NAMESPACE::error_code;
+using BOOST_AFIO_V2_NAMESPACE::generic_category;
+
// Returns true if name is usable as a store key: non-empty, free of
// characters that are not portable in filenames (and NUL), and without a
// leading period (which also bans "." and "..").
static bool is_valid_name(const std::string &name) noexcept
{
  static const std::string banned("<>:\"/\\|?*\0", 10);
  // Reject the empty string: it cannot name a filesystem entry. (Previously
  // it slipped through because name[0] on an empty std::string yields '\0',
  // which is not '.')
  if(name.empty())
    return false;
  if(std::string::npos!=name.find_first_of(banned))
    return false;
  // No leading period
  return name[0]!='.';
}
+
// Construct a dispatcher for local filesystem i/o and open (creating if
// necessary) the store directory. When the store is not writeable, write
// flags are masked out of every operation at the dispatcher level.
data_store::data_store(size_t flags, afio::path path)
{
  // Make a dispatcher for the local filesystem URI, masking out write flags on all operations if not writeable
  _dispatcher=afio::make_dispatcher("file:///", afio::file_flags::none, !(flags & writeable) ? afio::file_flags::write : afio::file_flags::none).get();
  // Set the dispatcher for this thread, and open a handle to the store directory
  afio::current_dispatcher_guard h(_dispatcher);
  _store=afio::dir(std::move(path), afio::file_flags::create); // throws if there was an error
}
+
// Open the item called name for reading. Returns an empty outcome when the
// item does not exist, an error code on failure, or an istream on success.
outcome<data_store::istream> data_store::lookup(std::string name) noexcept
{
  if(!is_valid_name(name))
    return error_code(EINVAL, generic_category());
  try
  {
    error_code ec;
    // Open the file using the handle to the store directory as the base.
    // The store directory can be freely renamed by any third party process
    // and everything here will work perfectly.
    afio::handle_ptr h(afio::file(ec, _store, name, afio::file_flags::read));
    if(ec)
    {
      // If the file was not found, return empty else the error
      if(ec==error_code(ENOENT, generic_category()))
        return empty;
      return ec;
    }
    // Create an istream which directly uses the mapped file.
    // ec is written by the streambuf constructor if its read of the file fails.
    data_store::istream ret(std::make_shared<idirectstream>(ec, std::move(h)));
    if(ec)
      return ec;
    return ret;
  }
  catch(...)
  {
    return std::current_exception();
  }
}
+
// Open (creating if necessary) the item called name for writing, returning
// an ostream which streams 4Kb pages to the file asynchronously.
outcome<data_store::ostream> data_store::write(std::string name) noexcept
{
  if(!is_valid_name(name))
    return error_code(EINVAL, generic_category());
  try
  {
    error_code ec;
    // Open the file using the handle to the store directory as the base.
    // The store directory can be freely renamed by any third party process
    // and everything here will work perfectly. You could enable direct
    // buffer writing - this sends 4Kb pages directly to the physical hardware
    // bypassing the kernel file page cache, however this is not optimal if reads of
    // the value are likely to occur soon.
    afio::handle_ptr h(afio::file(ec, _store, name, afio::file_flags::create | afio::file_flags::write
                                   /*| afio::file_flags::os_direct*/));
    if(ec)
      return ec;
    // Create an ostream which writes pages directly to the file.
    return outcome<data_store::ostream>(std::make_shared<odirectstream>(std::move(h)));
  }
  catch (...)
  {
    return std::current_exception();
  }
}
+//]
+
+int main(void)
+{
+ // To write a key-value item called "dog"
+ {
+ data_store ds;
+ auto dogh = ds.write("dog").get();
+ auto &dogs = *dogh;
+ dogs << "I am a dog";
+ }
+
+ // To retrieve a key-value item called "dog"
+ {
+ data_store ds;
+ auto dogh = ds.lookup("dog");
+ if (dogh.empty())
+ std::cerr << "No item called dog" << std::endl;
+ else if(dogh.has_error())
+ std::cerr << "ERROR: Looking up dog returned error " << dogh.get_error().message() << std::endl;
+ else if (dogh.has_exception())
+ {
+ std::cerr << "FATAL: Looking up dog threw exception" << std::endl;
+ std::terminate();
+ }
+ else
+ {
+ std::string buffer;
+ *dogh.get() >> buffer;
+ std::cout << "Item dog has value " << buffer << std::endl;
+ }
+ }
+ return 0;
+}
diff --git a/attic/example/workshop_naive_async_afio.cpp b/attic/example/workshop_naive_async_afio.cpp
new file mode 100644
index 00000000..86cb8052
--- /dev/null
+++ b/attic/example/workshop_naive_async_afio.cpp
@@ -0,0 +1,2 @@
+#include "afio_pch.hpp"
+#include "workshop_naive_async_afio.ipp"
diff --git a/attic/example/workshop_naive_async_afio.ipp b/attic/example/workshop_naive_async_afio.ipp
new file mode 100644
index 00000000..651d2a4e
--- /dev/null
+++ b/attic/example/workshop_naive_async_afio.ipp
@@ -0,0 +1,272 @@
+//[workshop_naive_async_afio_interface
+namespace afio = BOOST_AFIO_V2_NAMESPACE;
+namespace filesystem = BOOST_AFIO_V2_NAMESPACE::filesystem;
+using BOOST_OUTCOME_V1_NAMESPACE::lightweight_futures::shared_future;
+
// A key-value store like the synchronous AFIO variant, but with fully
// asynchronous lookup and write: both return lightweight shared_futures.
class data_store
{
  afio::dispatcher_ptr _dispatcher; // dispatcher all store i/o runs through
  afio::handle_ptr _store;          // open handle to the store directory
public:
  // Type used for read streams
  using istream = std::shared_ptr<std::istream>;
  // Type used for write streams
  using ostream = std::shared_ptr<std::ostream>;
  // Type used for lookup
  using lookup_result_type = shared_future<istream>;
  // Type used for write
  using write_result_type = shared_future<ostream>;

  // Disposition flags
  static constexpr size_t writeable = (1<<0);

  // Open a data store at path
  data_store(size_t flags = 0, afio::path path = "store");

  // Look up item named name for reading, returning an istream for the item
  shared_future<istream> lookup(std::string name) noexcept;
  // Look up item named name for writing, returning an ostream for that item
  shared_future<ostream> write(std::string name) noexcept;
};
+//]
+
+//[workshop_naive_async_afio3]
+namespace asio = BOOST_AFIO_V2_NAMESPACE::asio;
+using BOOST_AFIO_V2_NAMESPACE::error_code;
+using BOOST_AFIO_V2_NAMESPACE::generic_category;
+
+// A special allocator of highly efficient file i/o memory
+using file_buffer_type = std::vector<char, afio::utils::page_allocator<char>>;
+
+// An iostream which reads directly from a memory mapped AFIO file
+struct idirectstream : public std::istream
+{
+ struct directstreambuf : public std::streambuf
+ {
+ afio::handle_ptr h; // Holds the file open
+ std::shared_ptr<file_buffer_type> buffer;
+ afio::handle::mapped_file_ptr mfp;
+ // From a mmap
+ directstreambuf(afio::handle_ptr _h, afio::handle::mapped_file_ptr _mfp, size_t length) : h(std::move(_h)), mfp(std::move(_mfp))
+ {
+ // Set the get buffer this streambuf is to use
+ setg((char *) mfp->addr, (char *) mfp->addr, (char *) mfp->addr + length);
+ }
+ // From a malloc
+ directstreambuf(afio::handle_ptr _h, std::shared_ptr<file_buffer_type> _buffer, size_t length) : h(std::move(_h)), buffer(std::move(_buffer))
+ {
+ // Set the get buffer this streambuf is to use
+ setg(buffer->data(), buffer->data(), buffer->data() + length);
+ }
+ };
+ std::unique_ptr<directstreambuf> buf;
+ template<class U> idirectstream(afio::handle_ptr h, U &&buffer, size_t length) : std::istream(new directstreambuf(std::move(h), std::forward<U>(buffer), length)), buf(static_cast<directstreambuf *>(rdbuf()))
+ {
+ }
+ virtual ~idirectstream() override
+ {
+ // Reset the stream before deleting the buffer
+ rdbuf(nullptr);
+ }
+};
+
// An iostream which writes to an AFIO file in 4Kb pages.
// Writes are pipelined: each full page is handed to an async write while the
// caller keeps filling a fresh page; the destructor flushes and waits.
struct odirectstream : public std::ostream
{
  struct directstreambuf : public std::streambuf
  {
    using int_type = std::streambuf::int_type;
    using traits_type = std::streambuf::traits_type;
    afio::future<> lastwrite; // the last async write performed (also keeps the handle)
    afio::off_t offset; // offset of next write
    file_buffer_type buffer; // a page size on this machine
    file_buffer_type lastbuffer; // keeps the previous page alive while its write is in flight
    directstreambuf(afio::handle_ptr _h) : lastwrite(std::move(_h)), offset(0), buffer(afio::utils::page_sizes().front())
    {
      // Set the put buffer this streambuf is to use
      setp(buffer.data(), buffer.data() + buffer.size());
    }
    virtual ~directstreambuf() override
    {
      try
      {
        // Flush buffers and wait until last write completes
        // Schedule an asynchronous write of the buffer to storage
        size_t thisbuffer = pptr() - pbase();
        if(thisbuffer)
          lastwrite = afio::async_write(afio::async_truncate(lastwrite, offset+thisbuffer), buffer.data(), thisbuffer, offset);
        lastwrite.get();
      }
      catch(...)
      {
        // Destructors must not throw; pending-write errors are dropped here.
      }
    }
    // Called by the ostream machinery when the put area is full.
    virtual int_type overflow(int_type c) override
    {
      size_t thisbuffer=pptr()-pbase();
      if(thisbuffer>=buffer.size())
        sync(); // flush the full page; sync() installs a fresh put area
      if(c!=traits_type::eof())
      {
        *pptr()=(char)c;
        pbump(1);
        return traits_type::to_int_type(c);
      }
      return traits_type::eof();
    }
    virtual int sync() override
    {
      // Wait for the last write to complete, propagating any exceptions
      lastwrite.get();
      size_t thisbuffer=pptr()-pbase();
      if(thisbuffer > 0)
      {
        // Detach the current buffer and replace with a fresh one to allow the kernel to steal the page
        lastbuffer=std::move(buffer);
        buffer.resize(lastbuffer.size());
        setp(buffer.data(), buffer.data() + buffer.size());
        // Schedule an extension of physical storage by an extra page
        lastwrite = afio::async_truncate(lastwrite, offset + thisbuffer);
        // Schedule an asynchronous write of the buffer to storage
        lastwrite=afio::async_write(lastwrite, lastbuffer.data(), thisbuffer, offset);
        offset+=thisbuffer;
      }
      return 0;
    }
  };
  std::unique_ptr<directstreambuf> buf;
  odirectstream(afio::handle_ptr h) : std::ostream(new directstreambuf(std::move(h))), buf(static_cast<directstreambuf *>(rdbuf()))
  {
  }
  virtual ~odirectstream() override
  {
    // Reset the stream before deleting the buffer
    rdbuf(nullptr);
  }
};
+//]
+
+namespace asio = BOOST_AFIO_V2_NAMESPACE::asio;
+using BOOST_AFIO_V2_NAMESPACE::error_code;
+using BOOST_AFIO_V2_NAMESPACE::generic_category;
+
// Returns true if name is usable as a store key: non-empty, free of
// characters that are not portable in filenames (and NUL), and without a
// leading period (which also bans "." and "..").
static bool is_valid_name(const std::string &name) noexcept
{
  static const std::string banned("<>:\"/\\|?*\0", 10);
  // Reject the empty string: it cannot name a filesystem entry. (Previously
  // it slipped through because name[0] on an empty std::string yields '\0',
  // which is not '.')
  if(name.empty())
    return false;
  if(std::string::npos!=name.find_first_of(banned))
    return false;
  // No leading period
  return name[0]!='.';
}
+
// Construct a dispatcher for local filesystem i/o and open (creating if
// necessary) the store directory. When the store is not writeable, write
// flags are masked out of every operation at the dispatcher level.
data_store::data_store(size_t flags, afio::path path)
{
  // Make a dispatcher for the local filesystem URI, masking out write flags on all operations if not writeable
  _dispatcher=afio::make_dispatcher("file:///", afio::file_flags::none, !(flags & writeable) ? afio::file_flags::write : afio::file_flags::none).get();
  // Set the dispatcher for this thread, and open a handle to the store directory
  afio::current_dispatcher_guard h(_dispatcher);
  _store=afio::dir(std::move(path), afio::file_flags::create); // throws if there was an error
}
+
+//[workshop_naive_async_afio2]
// Asynchronously open the item called name for reading. The returned future
// becomes an istream over either a memory map (large files) or an in-memory
// copy of the file (small files).
shared_future<data_store::istream> data_store::lookup(std::string name) noexcept
{
  if(!is_valid_name(name))
    return error_code(EINVAL, generic_category());
  try
  {
    // Schedule the opening of the file for reading
    afio::future<> h(afio::async_file(_store, name, afio::file_flags::read));
    // When it completes, call this continuation
    return h.then([](afio::future<> &_h) -> shared_future<data_store::istream> {
      // If file didn't open, return the error or exception immediately
      BOOST_OUTCOME_PROPAGATE(_h);
      size_t length=(size_t) _h->lstat(afio::metadata_flags::size).st_size;
      // Is a memory map more appropriate?
      if(length>=128*1024)
      {
        afio::handle::mapped_file_ptr mfp;
        if((mfp=_h->map_file()))
        {
          data_store::istream ret(std::make_shared<idirectstream>(_h.get_handle(), std::move(mfp), length));
          return ret;
        }
        // Mapping failed: fall through to the buffered read below
      }
      // Schedule the reading of the file into a buffer.
      // (The lambda's capture list is empty, so this h does not shadow the
      // outer future of the same name.)
      auto buffer=std::make_shared<file_buffer_type>(length);
      afio::future<> h(afio::async_read(_h, buffer->data(), length, 0));
      // When the read completes call this continuation
      return h.then([buffer, length](const afio::future<> &h) -> shared_future<data_store::istream> {
        // If read failed, return the error or exception immediately
        BOOST_OUTCOME_PROPAGATE(h);
        data_store::istream ret(std::make_shared<idirectstream>(h.get_handle(), buffer, length));
        return ret;
      });
    });
  }
  catch(...)
  {
    // Boost.Monad futures are also monads, so this implies a make_ready_future()
    return std::current_exception();
  }
}
+//]
+
+//[workshop_naive_async_afio1]
// Asynchronously open (creating if necessary) the item called name for
// writing. The returned future becomes an ostream which streams 4Kb pages
// to the file asynchronously.
shared_future<data_store::ostream> data_store::write(std::string name) noexcept
{
  if(!is_valid_name(name))
    return error_code(EINVAL, generic_category());
  try
  {
    // Schedule the opening of the file for writing
    afio::future<> h(afio::async_file(_store, name, afio::file_flags::create | afio::file_flags::write));
    // When it completes, call this continuation
    return h.then([](const afio::future<> &h) -> shared_future<data_store::ostream> {
      // If file didn't open, return the error or exception immediately
      BOOST_OUTCOME_PROPAGATE(h);
      // Create an ostream which directly uses the file.
      data_store::ostream ret(std::make_shared<odirectstream>(h.get_handle()));
      return std::move(ret);
    });
  }
  catch (...)
  {
    // Boost.Monad futures are also monads, so this implies a make_ready_future()
    return std::current_exception();
  }
}
+//]
+
+int main(void)
+{
+ // To write a key-value item called "dog"
+ {
+ data_store ds;
+ auto dogh = ds.write("dog").get();
+ auto &dogs = *dogh;
+ dogs << "I am a dog";
+ }
+
+ // To retrieve a key-value item called "dog"
+ {
+ data_store ds;
+ auto dogh = ds.lookup("dog");
+ if (dogh.empty())
+ std::cerr << "No item called dog" << std::endl;
+ else if(dogh.has_error())
+ std::cerr << "ERROR: Looking up dog returned error " << dogh.get_error().message() << std::endl;
+ else if (dogh.has_exception())
+ {
+ std::cerr << "FATAL: Looking up dog threw exception" << std::endl;
+ std::terminate();
+ }
+ else
+ {
+ std::string buffer;
+ *dogh.get() >> buffer;
+ std::cout << "Item dog has value " << buffer << std::endl;
+ }
+ }
+ return 0;
+}