diff options
author | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spam@nowhere> | 2016-03-21 02:41:51 +0300 |
---|---|---|
committer | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spam@nowhere> | 2016-03-21 02:41:51 +0300 |
commit | 758a934ab266ed660daa54b72e4606b78e374071 (patch) | |
tree | 6f2fe1c5d2b8331f9319549bc6f0c3390168eb6b /attic/example |
AFIO v2: Relocate all the AFIO v2 files in fs_probe into the root hierarchy. AFIO v2 is now the master branch!
Diffstat (limited to 'attic/example')
37 files changed, 4201 insertions, 0 deletions
diff --git a/attic/example/.clang-format b/attic/example/.clang-format new file mode 100644 index 00000000..975edaa5 --- /dev/null +++ b/attic/example/.clang-format @@ -0,0 +1,57 @@ +--- +Language: Cpp +AccessModifierOffset: -2 +ConstructorInitializerIndentWidth: 4 +AlignEscapedNewlinesLeft: false +AlignTrailingComments: true +AllowAllParametersOfDeclarationOnNextLine: true +AllowShortBlocksOnASingleLine: false +AllowShortIfStatementsOnASingleLine: false +AllowShortLoopsOnASingleLine: false +AllowShortFunctionsOnASingleLine: Inline +AlwaysBreakTemplateDeclarations: false +AlwaysBreakBeforeMultilineStrings: false +BreakBeforeBinaryOperators: None +BreakBeforeBraces: Allman +BreakBeforeTernaryOperators: false +BreakConstructorInitializersBeforeComma: true +BinPackParameters: true +ColumnLimit: 85 +CommentPragmas: '^!<' +ConstructorInitializerAllOnOneLineOrOnePerLine: false +ContinuationIndentWidth: 0 +Cpp11BracedListStyle: true +DerivePointerAlignment: false +DisableFormat: false +ExperimentalAutoDetectBinPacking: false +ForEachMacros: [ foreach, Q_FOREACH, BOOST_FOREACH ] +IndentCaseLabels: false +IndentFunctionDeclarationAfterType: false +IndentWidth: 2 +IndentWrappedFunctionNames: false +MaxEmptyLinesToKeep: 2 +KeepEmptyLinesAtTheStartOfBlocks: true +NamespaceIndentation: All +ObjCSpaceAfterProperty: false +ObjCSpaceBeforeProtocolList: false +PenaltyBreakBeforeFirstCallParameter: 19 +PenaltyBreakComment: 300 +PenaltyBreakString: 1000 +PenaltyBreakFirstLessLess: 120 +PenaltyExcessCharacter: 1000000 +PenaltyReturnTypeOnItsOwnLine: 60 +PointerAlignment: Right +Standard: Cpp11 +SpaceAfterCStyleCast: true +SpaceBeforeAssignmentOperators: true +SpaceBeforeParens: Never +SpaceInEmptyParentheses: false +SpacesBeforeTrailingComments: 2 +SpacesInParentheses: false +SpacesInAngles: false +SpacesInCStyleCastParentheses: false +SpacesInContainerLiterals: true +TabWidth: 8 +UseTab: Never +... + diff --git a/attic/example/adopt_example.cpp b/attic/example/adopt_example.cpp new file mode 100644 index 00000000..b18e9827 --- /dev/null +++ b/attic/example/adopt_example.cpp @@ -0,0 +1,57 @@ +#include "afio_pch.hpp" + +//[adopt_example +struct test_handle : boost::afio::handle +{ + test_handle(boost::afio::dispatcher *parent) : + boost::afio::handle(parent, + boost::afio::file_flags::none) {} + virtual void close() override final + { + // Do nothing + } + virtual handle::open_states is_open() const override final + { + return handle::open_states::open; + } + virtual void *native_handle() const override final + { + return nullptr; + } + virtual boost::afio::path path(bool refresh=false) override final + { + return boost::afio::path(); + } + virtual boost::afio::path path() const override final + { + return boost::afio::path(); + } + virtual boost::afio::directory_entry direntry(boost::afio::metadata_flags + wanted=boost::afio::directory_entry::metadata_fastpath()) override final + { + return boost::afio::directory_entry(); + } + virtual boost::afio::path target() override final + { + return boost::afio::path(); + } + virtual void link(const boost::afio::path_req &req) override final + { + } + virtual void unlink() override final + { + } + virtual void atomic_relink(const boost::afio::path_req &req) override final + { + } +}; + +int main(void) +{ + using namespace BOOST_AFIO_V2_NAMESPACE; + auto dispatcher = boost::afio::make_dispatcher().get(); + current_dispatcher_guard h(dispatcher); + auto foreignh=std::make_shared<test_handle>(dispatcher.get()); + return 0; +} +//] diff --git a/attic/example/barrier_example.cpp b/attic/example/barrier_example.cpp new file mode 100644 index 00000000..46a739f2 --- /dev/null +++ b/attic/example/barrier_example.cpp @@ -0,0 +1,67 @@ +#include "afio_pch.hpp" + +int main(void) +{ + //[barrier_example + // Assume that groups is 10,000 items long with item.first being randomly + // between 1 and 500. This example is adapted from the barrier() unit test. + // + // What we're going to do is this: for each item in groups, schedule item.first + // parallel ops and a barrier which completes only when the last of that + // parallel group completes. Chain the next group to only execute after the + // preceding group's barrier completes. Repeat until all groups have been executed. + std::shared_ptr<boost::afio::dispatcher> dispatcher= + boost::afio::make_dispatcher().get(); + std::vector<std::pair<size_t, int>> groups; + boost::afio::atomic<size_t> callcount[10000]; + memset(&callcount, 0, sizeof(callcount)); + + // This lambda is what each parallel op in each group will do: increment an atomic + // for that group. + auto inccount = [](boost::afio::atomic<size_t> *count){ (*count)++; }; + + // This lambda is called after each barrier completes, and it checks that exactly + // the right number of inccount lambdas were executed. + auto verifybarrier = [](boost::afio::atomic<size_t> *count, size_t shouldbe) + { + if (*count != shouldbe) + throw std::runtime_error("Count was not what it should have been!"); + return true; + }; + + // For each group, dispatch ops and a barrier for them + boost::afio::future<> next; + bool isfirst = true; + for(auto &run : groups) + { + // Create a vector of run.first size of bound inccount lambdas + // This will be the batch issued for this group + std::vector<std::function<void()>> thisgroupcalls(run.first, std::bind(inccount, &callcount[run.second])); + std::vector<boost::afio::future<>> thisgroupcallops; + // If this is the first item, schedule without precondition + if (isfirst) + { + thisgroupcallops = dispatcher->call(thisgroupcalls); + isfirst = false; + } + else + { + // Create a vector of run.first size of preconditions exactly + // matching the number in this batch. Note that the precondition + // for all of these is the preceding verify op + std::vector<boost::afio::future<>> dependency(run.first, next); + thisgroupcallops = dispatcher->call(dependency, thisgroupcalls); + } + // barrier() is very easy: its number of output ops exactly matches its input + // but none of the output will complete until the last of the input completes + auto thisgroupbarriered = dispatcher->barrier(thisgroupcallops); + // Schedule a call of the verify lambda once barrier completes. Here we choose + // the first item of the barrier's return, but in truth any of them are good. + auto verify = dispatcher->call(thisgroupbarriered.front(), std::function<bool()>(std::bind(verifybarrier, &callcount[run.second], run.first))); + // Set the dependency for the next batch to be the just scheduled verify op + next = verify; + } + // next was the last op scheduled, so waiting on it waits on everything + when_all_p(next).wait(); + //] +} diff --git a/attic/example/benchmark_asio.cpp b/attic/example/benchmark_asio.cpp new file mode 100644 index 00000000..45de6e45 --- /dev/null +++ b/attic/example/benchmark_asio.cpp @@ -0,0 +1,47 @@ +#include "afio_pch.hpp" + +/* My Intel Core i7 3770K running Windows 8 x64: 2591360 closures/sec + My Intel Core i7 3770K running Linux x64: 1611040 closures/sec (4 threads) +*/ + +static boost::afio::atomic<size_t> togo(0); +static int callback() +{ +#if 0 + Sleep(0); +#endif + --togo; + return 1; +}; +int main(void) +{ + using namespace boost::afio; + typedef chrono::duration<double, ratio<1, 1>> secs_type; + auto threadpool=process_threadpool(); + auto begin=chrono::high_resolution_clock::now(); + while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now()-begin).count()<3); + + atomic<size_t> threads(0); +#if 0 + std::cout << "Attach profiler now and hit Return" << std::endl; + getchar(); +#endif + begin=chrono::high_resolution_clock::now(); +#pragma omp parallel + { + ++threads; + for(size_t n=0; n<5000000; n++) + { + ++togo; + threadpool->enqueue(callback); + } + } + while(togo) + this_thread::sleep_for(chrono::milliseconds(1)); + auto end=chrono::high_resolution_clock::now(); + auto diff=chrono::duration_cast<secs_type>(end-begin); + std::cout << "It took " << diff.count() << " secs to execute " << (5000000*threads) << " closures which is " << (5000000*threads/diff.count()) << " closures/sec" << std::endl; + std::cout << "\nPress Return to exit ..." << std::endl; + getchar(); + return 0; +} diff --git a/attic/example/benchmark_atomic_log.cpp b/attic/example/benchmark_atomic_log.cpp new file mode 100644 index 00000000..ab046837 --- /dev/null +++ b/attic/example/benchmark_atomic_log.cpp @@ -0,0 +1,521 @@ +#include "afio_pch.hpp" + +/* On my Win8.1 x86 laptop Intel i5 540M @ 2.53Ghz on NTFS Samsung SSD: + +Benchmarking traditional file locks with 2 concurrent writers ... +Waiting for threads to exit ... +For 2 concurrent writers, achieved 1720.7 attempts per second with a success rate of 1335.24 writes per second which is a 77.5984% success rate. + +Benchmarking file locks via atomic append with 2 concurrent writers ... +Waiting for threads to exit ... +For 2 concurrent writers, achieved 2413.66 attempts per second with a success rate of 790.364 writes per second which is a 32.7455% success rate. +Traditional locks were 1.6894 times faster. + +Without file_flags::temporary_file | file_flags::delete_on_close, traditional file locks were: + +Benchmarking traditional file locks with 2 concurrent writers ... +Waiting for threads to exit ... +For 2 concurrent writers, achieved 1266.4 attempts per second with a success rate of 809.013 writes per second which is a 63.883% success rate. + +*/ + +//[benchmark_atomic_log +int main(int argc, const char *argv[]) +{ + using namespace BOOST_AFIO_V2_NAMESPACE; + using BOOST_AFIO_V2_NAMESPACE::off_t; + typedef chrono::duration<double, ratio<1, 1>> secs_type; + double traditional_locks=0, atomic_log_locks=0; + try { filesystem::remove_all("testdir"); } catch(...) {} + + size_t totalwriters=2, writers=totalwriters; + if(argc>1) + writers=totalwriters=atoi(argv[1]); + { + auto dispatcher = make_dispatcher().get(); + auto mkdir(dispatcher->dir(path_req("testdir", file_flags::create))); + auto mkdir1(dispatcher->dir(path_req::relative(mkdir, "1", file_flags::create))); + auto mkdir2(dispatcher->dir(path_req::relative(mkdir, "2", file_flags::create))); + auto mkdir3(dispatcher->dir(path_req::relative(mkdir, "3", file_flags::create))); + auto mkdir4(dispatcher->dir(path_req::relative(mkdir, "4", file_flags::create))); + auto mkdir5(dispatcher->dir(path_req::relative(mkdir, "5", file_flags::create))); + auto mkdir6(dispatcher->dir(path_req::relative(mkdir, "6", file_flags::create))); + auto mkdir7(dispatcher->dir(path_req::relative(mkdir, "7", file_flags::create))); + auto mkdir8(dispatcher->dir(path_req::relative(mkdir, "8", file_flags::create))); + auto statfs_(dispatcher->statfs(mkdir, fs_metadata_flags::All)); + auto statfs(statfs_.get()); + std::cout << "The filing system holding our test directory is " << statfs.f_fstypename << " and has features:" << std::endl; +#define PRINT_FIELD(field, ...) \ + std::cout << " f_flags." #field ": "; std::cout << statfs.f_flags.field __VA_ARGS__ << std::endl + PRINT_FIELD(rdonly); + PRINT_FIELD(noexec); + PRINT_FIELD(nosuid); + PRINT_FIELD(acls); + PRINT_FIELD(xattr); + PRINT_FIELD(compression); + PRINT_FIELD(extents); + PRINT_FIELD(filecompression); +#undef PRINT_FIELD +#define PRINT_FIELD(field, ...) \ + std::cout << " f_" #field ": "; std::cout << statfs.f_##field __VA_ARGS__ << std::endl + PRINT_FIELD(bsize); + PRINT_FIELD(iosize); + PRINT_FIELD(blocks, << " (" << (statfs.f_blocks*statfs.f_bsize / 1024.0 / 1024.0 / 1024.0) << " Gb)"); + PRINT_FIELD(bfree, << " (" << (statfs.f_bfree*statfs.f_bsize / 1024.0 / 1024.0 / 1024.0) << " Gb)"); + PRINT_FIELD(bavail, << " (" << (statfs.f_bavail*statfs.f_bsize / 1024.0 / 1024.0 / 1024.0) << " Gb)"); +#undef PRINT_FIELD + } + if(1) + { + std::cout << "\nBenchmarking a single traditional lock file with " << writers << " concurrent writers ...\n"; + std::vector<thread> threads; + atomic<bool> done(true); + atomic<size_t> attempts(0), successes(0); + for(size_t n=0; n<writers; n++) + { + threads.push_back(thread([&done, &attempts, &successes, n]{ + try + { + // Create a dispatcher + auto dispatcher = make_dispatcher().get(); + // Schedule opening the log file for writing log entries + auto logfile(dispatcher->file(path_req("testdir/log", + file_flags::create | file_flags::read_write))); + // Retrieve any errors which occurred + logfile.get(); + // Wait until all threads are ready + while(done) { this_thread::yield(); } + while(!done) + { + // Traditional file locks are very simple: try to exclusively create the lock file. + // If you succeed, you have the lock. + auto lockfile(dispatcher->file(path_req("testdir/log.lock", + file_flags::create_only_if_not_exist | file_flags::write | file_flags::temporary_file | file_flags::delete_on_close))); + attempts.fetch_add(1, memory_order_relaxed); + // v1.4 of the AFIO engine will return error_code instead of exceptions for this + try { lockfile.get(); } catch(const system_error &e) { continue; } + std::string logentry("I am log writer "), mythreadid(to_string(n)), logentryend("!\n"); + // Fetch the size + off_t where=logfile->lstat().st_size, entrysize=logentry.size()+mythreadid.size()+logentryend.size(); + // Schedule extending the log file + auto extendlog(dispatcher->truncate(logfile, where+entrysize)); + // Schedule writing the new entry + auto writetolog(dispatcher->write(make_io_req(extendlog, {logentry, mythreadid, logentryend}, where))); + writetolog.get(); + extendlog.get(); + successes.fetch_add(1, memory_order_relaxed); + } + } + catch(const system_error &e) { std::cerr << "ERROR: test exits via system_error code " << e.code().value() << "(" << e.what() << ")" << std::endl; abort(); } + catch(const std::exception &e) { std::cerr << "ERROR: test exits via exception (" << e.what() << ")" << std::endl; abort(); } + catch(...) { std::cerr << "ERROR: test exits via unknown exception" << std::endl; abort(); } + })); + } + auto begin=chrono::high_resolution_clock::now(); + done=false; + std::this_thread::sleep_for(std::chrono::seconds(20)); + done=true; + std::cout << "Waiting for threads to exit ..." << std::endl; + for(auto &i : threads) + i.join(); + auto end=chrono::high_resolution_clock::now(); + auto diff=chrono::duration_cast<secs_type>(end-begin); + std::cout << "For " << writers << " concurrent writers, achieved " << (attempts/diff.count()) << " attempts per second with a " + "success rate of " << (successes/diff.count()) << " writes per second which is a " << (100.0*successes/attempts) << "% success rate." << std::endl; + traditional_locks=successes/diff.count(); + } + + if(1) + { + std::cout << "\nBenchmarking eight traditional lock files with " << writers << " concurrent writers ...\n"; + std::vector<thread> threads; + atomic<bool> done(true); + atomic<size_t> attempts(0), successes(0); + for(size_t n=0; n<writers; n++) + { + threads.push_back(thread([&done, &attempts, &successes, n]{ + try + { + // Create a dispatcher + auto dispatcher = make_dispatcher().get(); + // Schedule opening the log file for writing log entries + auto logfile(dispatcher->file(path_req("testdir/log", + file_flags::create | file_flags::read_write))); + // Retrieve any errors which occurred + logfile.get(); + // Wait until all threads are ready + while(done) { this_thread::yield(); } + while(!done) + { + // Parallel try to exclusively create all eight lock files + std::vector<path_req> lockfiles; lockfiles.reserve(8); + lockfiles.push_back(path_req("testdir/1/log.lock", + file_flags::create_only_if_not_exist | file_flags::write | file_flags::temporary_file | file_flags::delete_on_close)); + lockfiles.push_back(path_req("testdir/2/log.lock", + file_flags::create_only_if_not_exist | file_flags::write | file_flags::temporary_file | file_flags::delete_on_close)); + lockfiles.push_back(path_req("testdir/3/log.lock", + file_flags::create_only_if_not_exist | file_flags::write | file_flags::temporary_file | file_flags::delete_on_close)); + lockfiles.push_back(path_req("testdir/4/log.lock", + file_flags::create_only_if_not_exist | file_flags::write | file_flags::temporary_file | file_flags::delete_on_close)); + lockfiles.push_back(path_req("testdir/5/log.lock", + file_flags::create_only_if_not_exist | file_flags::write | file_flags::temporary_file | file_flags::delete_on_close)); + lockfiles.push_back(path_req("testdir/6/log.lock", + file_flags::create_only_if_not_exist | file_flags::write | file_flags::temporary_file | file_flags::delete_on_close)); + lockfiles.push_back(path_req("testdir/7/log.lock", + file_flags::create_only_if_not_exist | file_flags::write | file_flags::temporary_file | file_flags::delete_on_close)); + lockfiles.push_back(path_req("testdir/8/log.lock", + file_flags::create_only_if_not_exist | file_flags::write | file_flags::temporary_file | file_flags::delete_on_close)); + auto lockfile(dispatcher->file(lockfiles)); + attempts.fetch_add(1, memory_order_relaxed); +#if 1 + // v1.4 of the AFIO engine will return error_code instead of exceptions for this + try { lockfile[7].get(); } catch(const system_error &e) { continue; } + try { lockfile[6].get(); } catch(const system_error &e) { continue; } + try { lockfile[5].get(); } catch(const system_error &e) { continue; } + try { lockfile[4].get(); } catch(const system_error &e) { continue; } + try { lockfile[3].get(); } catch(const system_error &e) { continue; } + try { lockfile[2].get(); } catch(const system_error &e) { continue; } + try { lockfile[1].get(); } catch(const system_error &e) { continue; } + try { lockfile[0].get(); } catch(const system_error &e) { continue; } +#else + try + { + auto barrier(dispatcher->barrier(lockfile)); + // v1.4 of the AFIO engine will return error_code instead of exceptions for this + for(size_t n=0; n<8; n++) + barrier[n].get(); + } + catch(const system_error &e) { continue; } +#endif + std::string logentry("I am log writer "), mythreadid(to_string(n)), logentryend("!\n"); + // Fetch the size + off_t where=logfile->lstat().st_size, entrysize=logentry.size()+mythreadid.size()+logentryend.size(); + // Schedule extending the log file + auto extendlog(dispatcher->truncate(logfile, where+entrysize)); + // Schedule writing the new entry + auto writetolog(dispatcher->write(make_io_req(extendlog, {logentry, mythreadid, logentryend}, where))); + // Fetch errors from the last operation first to avoid sleep-wake cycling + writetolog.get(); + extendlog.get(); + successes.fetch_add(1, memory_order_relaxed); + } + } + catch(const system_error &e) { std::cerr << "ERROR: test exits via system_error code " << e.code().value() << "(" << e.what() << ")" << std::endl; abort(); } + catch(const std::exception &e) { std::cerr << "ERROR: test exits via exception (" << e.what() << ")" << std::endl; abort(); } + catch(...) { std::cerr << "ERROR: test exits via unknown exception" << std::endl; abort(); } + })); + } + auto begin=chrono::high_resolution_clock::now(); + done=false; + std::this_thread::sleep_for(std::chrono::seconds(20)); + done=true; + std::cout << "Waiting for threads to exit ..." << std::endl; + for(auto &i : threads) + i.join(); + auto end=chrono::high_resolution_clock::now(); + auto diff=chrono::duration_cast<secs_type>(end-begin); + std::cout << "For " << writers << " concurrent writers, achieved " << (attempts/diff.count()) << " attempts per second with a " + "success rate of " << (successes/diff.count()) << " writes per second which is a " << (100.0*successes/attempts) << "% success rate." << std::endl; + } + + // **** WARNING UNSUPPORTED UNDOCUMENTED API DO NOT USE IN YOUR CODE **** + if(1) + { + std::cout << "\nBenchmarking a ranged file lock with " << writers << " concurrent writers ...\n"; + std::vector<thread> threads; + atomic<bool> done(true); + atomic<size_t> attempts(0), successes(0); + for(size_t n=0; n<writers; n++) + { + threads.push_back(thread([&done, &attempts, &successes, n]{ + try + { + // Create a dispatcher + auto dispatcher = make_dispatcher().get(); + // Schedule opening the log file for writing log entries + auto logfile(dispatcher->file(path_req("testdir/log", + file_flags::create | file_flags::read_write | file_flags::os_lockable))); + // Retrieve any errors which occurred + logfile.get(); + // Wait until all threads are ready + while(done) { this_thread::yield(); } + while(!done) + { + attempts.fetch_add(1, memory_order_relaxed); + // **** WARNING UNSUPPORTED UNDOCUMENTED API DO NOT USE IN YOUR CODE **** + dispatcher->lock({logfile}).front().get(); + std::string logentry("I am log writer "), mythreadid(to_string(n)), logentryend("!\n"); + // Fetch the size + off_t where=logfile->lstat().st_size, entrysize=logentry.size()+mythreadid.size()+logentryend.size(); + // Schedule extending the log file + auto extendlog(dispatcher->truncate(logfile, where+entrysize)); + // Schedule writing the new entry + auto writetolog(dispatcher->write(make_io_req(extendlog, {logentry, mythreadid, logentryend}, where))); + writetolog.get(); + extendlog.get(); + successes.fetch_add(1, memory_order_relaxed); + dispatcher->lock({{logfile, nullptr}}).front().get(); + } + } + catch(const system_error &e) { std::cerr << "ERROR: test exits via system_error code " << e.code().value() << "(" << e.what() << ")" << std::endl; abort(); } + catch(const std::exception &e) { std::cerr << "ERROR: test exits via exception (" << e.what() << ")" << std::endl; abort(); } + catch(...) { std::cerr << "ERROR: test exits via unknown exception" << std::endl; abort(); } + })); + } + auto begin=chrono::high_resolution_clock::now(); + done=false; + std::this_thread::sleep_for(std::chrono::seconds(20)); + done=true; + std::cout << "Waiting for threads to exit ..." << std::endl; + for(auto &i : threads) + i.join(); + auto end=chrono::high_resolution_clock::now(); + auto diff=chrono::duration_cast<secs_type>(end-begin); + std::cout << "For " << writers << " concurrent writers, achieved " << (attempts/diff.count()) << " attempts per second with a " + "success rate of " << (successes/diff.count()) << " writes per second which is a " << (100.0*successes/attempts) << "% success rate." << std::endl; + } + + if(1) + { + std::cout << "\nBenchmarking file locks via atomic append with " << writers << " concurrent writers ...\n"; + std::vector<thread> threads; + atomic<bool> done(true); + atomic<size_t> attempts(0), successes(0); + for(size_t thread=0; thread<writers; thread++) + { + threads.push_back(std::thread([&done, &attempts, &successes, thread]{ + try + { + // Create a dispatcher + auto dispatcher = make_dispatcher().get(); + // Schedule opening the log file for writing log entries + auto logfile(dispatcher->file(path_req("testdir/log", + file_flags::create | file_flags::read_write))); + // Schedule opening the lock file for scanning and hole punching + auto lockfilez(dispatcher->file(path_req("testdir/log.lock", + file_flags::create | file_flags::read_write))); + // Schedule opening the lock file for atomic appending + auto lockfilea(dispatcher->file(path_req("testdir/log.lock", + file_flags::create | file_flags::write | file_flags::append))); + // Retrieve any errors which occurred + lockfilea.get(); lockfilez.get(); logfile.get(); + while(!done) + { + // Each lock log entry is 16 bytes in length. + enum class message_code_t : uint8_t + { + unlock=0, + havelock=1, + rescind=2, + interest=3, + nominate=5 + }; +#pragma pack(push, 1) + union message_t + { + char bytes[16]; + struct + { + message_code_t code; + char __padding1[3]; + uint32_t timestamp; // time_t + uint64_t uniqueid; + }; + }; +#pragma pack(pop) + static_assert(sizeof(message_t)==16, "message_t is not 16 bytes long!"); + auto gettime=[]{ return (uint32_t)(std::time(nullptr)-1420070400UL/* 1st Jan 2015*/); }; + message_t temp, buffers[256]; + off_t buffersoffset; + uint32_t nowtimestamp=gettime(); + // TODO FIXME: If multiple machines are all accessing the lock file, nowtimestamp + // ought to be corrected for drift + + // Step 1: Register my interest + memset(temp.bytes, 0, sizeof(temp)); + temp.code=message_code_t::interest; + temp.timestamp=nowtimestamp; + temp.uniqueid=thread; // TODO FIXME: Needs to be a VERY random number to prevent collision. + dispatcher->write(make_io_req(lockfilea, temp.bytes, sizeof(temp), 0)).get(); + + // Step 2: Wait until my interest message appears, also figure out what interests precede + // mine and where my start of interest begins, and if someone currently has the lock + off_t startofinterest=dispatcher->extents(lockfilez).get().front().first; + off_t myuniqueid=(off_t)-1; + bool findPreceding=true; + std::vector<std::pair<bool, off_t>> preceding; + std::pair<bool, off_t> lockid; + auto iterate=[&]{ + size_t validPrecedingCount=0; + off_t lockfilesize=lockfilez->lstat(metadata_flags::size).st_size; + buffersoffset=lockfilesize>sizeof(buffers) ? lockfilesize-sizeof(buffers) : 0; + //buffersoffset-=buffersoffset % sizeof(buffers[0]); + for(; !validPrecedingCount && buffersoffset>=startofinterest && buffersoffset<lockfilesize; buffersoffset-=sizeof(buffers)) + { + size_t amount=(size_t)(lockfilesize-buffersoffset); + if(amount>sizeof(buffers)) + amount=sizeof(buffers); + dispatcher->read(make_io_req(lockfilez, (void *) buffers, amount, buffersoffset)).get(); + for(size_t n=amount/sizeof(buffers[0])-1; !validPrecedingCount && n<amount/sizeof(buffers[0]); n--) + { + // Early exit if messages have become stale + if(!buffers[n].timestamp || (buffers[n].timestamp<nowtimestamp && nowtimestamp-buffers[n].timestamp>20)) + { + startofinterest=buffersoffset+n*sizeof(buffers[0]); + break; + } + // Find if he is locked or unlocked + if(lockid.second==(off_t) -1) + { + if(buffers[n].code==message_code_t::unlock) + lockid=std::make_pair(false, buffers[n].uniqueid); + else if(buffers[n].code==message_code_t::havelock) + lockid=std::make_pair(true, buffers[n].uniqueid); + } + // Am I searching for my interest? + if(myuniqueid==(off_t)-1) + { + if(!memcmp(buffers+n, &temp, sizeof(temp))) + myuniqueid=buffersoffset+n*sizeof(buffers[0]); + } + else if(findPreceding && (buffers[n].uniqueid<myuniqueid || buffersoffset+n*sizeof(buffers[0])<myuniqueid)) + { + // We are searching for preceding claims now + if(buffers[n].code==message_code_t::rescind || buffers[n].code==message_code_t::unlock) + preceding.push_back(std::make_pair(false, buffers[n].uniqueid)); + else if(buffers[n].code==message_code_t::nominate || buffers[n].code==message_code_t::havelock) + { + if(buffers[n].uniqueid<myuniqueid && preceding.end()==std::find(preceding.begin(), preceding.end(), std::make_pair(false, (off_t) buffers[n].uniqueid))) + { + preceding.push_back(std::make_pair(true, buffers[n].uniqueid)); + validPrecedingCount++; + } + } + else if(buffers[n].code==message_code_t::interest) + { + if(buffersoffset+n*sizeof(buffers[0])<myuniqueid && preceding.end()==std::find(preceding.begin(), preceding.end(), std::make_pair(false, buffersoffset+n*sizeof(buffers[0])))) + { + preceding.push_back(std::make_pair(true, buffersoffset+n*sizeof(buffers[0]))); + validPrecedingCount++; + } + } + } + } + } +#if 0 + std::cout << thread << ": myuniqueid=" << myuniqueid << " startofinterest=" << startofinterest << " size=" << lockfilez->lstat(metadata_flags::size).st_size << " lockid=" << lockid.first << "," << lockid.second << " preceding="; + for(auto &i : preceding) + std::cout << i.first << "," << i.second << ";"; + std::cout << std::endl; +#endif + if(findPreceding) + { + // Remove all rescinded interests preceding ours + preceding.erase(std::remove_if(preceding.begin(), preceding.end(), [](const std::pair<bool, off_t> &i){ return !i.first; }), preceding.end()); + std::sort(preceding.begin(), preceding.end()); + findPreceding=false; + } + }; + do + { + lockid=std::make_pair(false, (off_t)-1); + iterate(); + // Didn't find it, so sleep and retry, maybe st_size will have updated by then + if(myuniqueid==(off_t)-1) this_thread::sleep_for(chrono::milliseconds(1)); + } while(myuniqueid==(off_t)-1); + + // Step 3: If there is no lock and no interest precedes mine, claim the mutex. Else issue a nominate for myself, + // once per ten seconds. + { + nowtimestamp=0; + mutex m; + condition_variable c; + unique_lock<decltype(m)> l(m); + atomic<bool> fileChanged(false); + for(;;) + { + attempts.fetch_add(1, memory_order_relaxed); + temp.timestamp=gettime(); + temp.uniqueid=myuniqueid; + if(preceding.empty()) + { + temp.code=message_code_t::havelock; + dispatcher->write(make_io_req(lockfilea, temp.bytes, sizeof(temp), 0)).get(); + // Zero the range between startofinterest and myuniqueid + if(startofinterest<myuniqueid) + { + std::vector<std::pair<off_t, off_t>> range={{startofinterest, myuniqueid-startofinterest}}; + dispatcher->zero(lockfilez, range).get(); + // std::cout << thread << ": lock taken for myuniqueid=" << myuniqueid << ", zeroing " << range.front().first << ", " << range.front().second << std::endl; + } + break; + } + else + { + auto lockfilechanged=[&]{ + fileChanged=true; + c.notify_all(); + }; + // TODO FIXME: Put a modify watch on the lockfile instead of spinning + if(temp.timestamp-nowtimestamp>=10) + { + temp.code=message_code_t::nominate; + dispatcher->write(make_io_req(lockfilea, temp.bytes, sizeof(temp), 0)).get(); + nowtimestamp=temp.timestamp; + } + //c.wait_for(l, chrono::milliseconds(1), [&fileChanged]{ return fileChanged==true; }); + fileChanged=false; + preceding.clear(); + findPreceding=true; + lockid=std::make_pair(false, (off_t)-1); + iterate(); + } + } + } + + // Step 4: I now have the lock, so do my thing + std::string logentry("I am log writer "), mythreadid(to_string(thread)), logentryend("!\n"); + // Fetch the size + off_t where=logfile->lstat().st_size, entrysize=logentry.size()+mythreadid.size()+logentryend.size(); + // Schedule extending the log file + auto extendlog(dispatcher->truncate(logfile, where+entrysize)); + // Schedule writing the new entry + auto writetolog(dispatcher->write(make_io_req(extendlog, {logentry, mythreadid, logentryend}, where))); + // Fetch errors from the last operation first to avoid sleep-wake cycling + writetolog.get(); + extendlog.get(); + successes.fetch_add(1, memory_order_relaxed); +// std::cout << thread << ": doing work for myuniqueid=" << myuniqueid << std::endl; +// this_thread::sleep_for(chrono::milliseconds(250)); + + // Step 5: Release the lock + temp.code=message_code_t::unlock; + temp.timestamp=gettime(); + temp.uniqueid=myuniqueid; +// std::cout << thread << ": lock released for myuniqueid=" << myuniqueid << std::endl; + dispatcher->write(make_io_req(lockfilea, temp.bytes, sizeof(temp), 0)).get(); + } + } + catch(const system_error &e) { std::cerr << "ERROR: test exits via system_error code " << e.code().value() << "(" << e.what() << ")" << std::endl; abort(); } + catch(const std::exception &e) { std::cerr << "ERROR: test exits via exception (" << e.what() << ")" << std::endl; abort(); } + catch(...) { std::cerr << "ERROR: test exits via unknown exception" << std::endl; abort(); } + })); + } + auto begin=chrono::high_resolution_clock::now(); + done=false; + std::this_thread::sleep_for(std::chrono::seconds(20)); + done=true; + std::cout << "Waiting for threads to exit ..." << std::endl; + for(auto &i : threads) + i.join(); + auto end=chrono::high_resolution_clock::now(); + auto diff=chrono::duration_cast<secs_type>(end-begin); + std::cout << "For " << writers << " concurrent writers, achieved " << (attempts/diff.count()) << " attempts per second with a " + "success rate of " << (successes/diff.count()) << " writes per second which is a " << (100.0*successes/attempts) << "% success rate." << std::endl; + atomic_log_locks=successes/diff.count(); + } + filesystem::remove_all("testdir"); + std::cout << "Traditional locks were " << (traditional_locks/atomic_log_locks) << " times faster." << std::endl; + return 0; +} +//] diff --git a/attic/example/benchmark_chained1.cpp b/attic/example/benchmark_chained1.cpp new file mode 100644 index 00000000..f40bf1c9 --- /dev/null +++ b/attic/example/benchmark_chained1.cpp @@ -0,0 +1,48 @@ +#include "afio_pch.hpp" + +/* My Intel Core i7 3770K running Windows 8 x64: 726124 closures/sec + My Intel Core i7 3770K running Linux x64: 968005 closures/sec +*/ + +static std::pair<bool, std::shared_ptr<boost::afio::handle>> _callback(size_t, boost::afio::future<> op) +{ +#if 0 + // Simulate an i/o op with a context switch + Sleep(0); +#endif + return std::make_pair(true, op.get_handle()); +}; + +int main(void) +{ + using namespace boost::afio; + auto dispatcher=make_dispatcher().get(); + typedef chrono::duration<double, ratio<1, 1>> secs_type; + auto begin=chrono::high_resolution_clock::now(); + while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now()-begin).count()<3); + + std::pair<async_op_flags, dispatcher::completion_t *> callback(async_op_flags::none, _callback); + atomic<size_t> threads(0); +#if 0 + std::cout << "Attach profiler now and hit Return" << std::endl; + getchar(); +#endif + begin=chrono::high_resolution_clock::now(); +#pragma omp parallel + { + future<> last; + threads++; + for(size_t n=0; n<500000; n++) + { + last=dispatcher->completion(last, callback); + } + } + while(dispatcher->wait_queue_depth()) + this_thread::sleep_for(chrono::milliseconds(1)); + auto end=chrono::high_resolution_clock::now(); + auto diff=chrono::duration_cast<secs_type>(end-begin); + std::cout << "It took " << diff.count() << " secs to execute " << (500000*threads) << " closures which is " << (500000*threads/diff.count()) << " chained closures/sec" << std::endl; + std::cout << "\nPress Return to exit ..." << std::endl; + getchar(); + return 0; +} diff --git a/attic/example/benchmark_chained2.cpp b/attic/example/benchmark_chained2.cpp new file mode 100644 index 00000000..d410076c --- /dev/null +++ b/attic/example/benchmark_chained2.cpp @@ -0,0 +1,43 @@ +#include "afio_pch.hpp" + +/* My Intel Core i7 3770K running Windows 8 x64: 596318 closures/sec + My Intel Core i7 3770K running Linux x64: 794384 closures/sec +*/ + +static int callback() +{ +#if 0 + Sleep(0); +#endif + return 1; +}; + +int main(void) +{ + using namespace boost::afio; + auto dispatcher=make_dispatcher().get(); + typedef chrono::duration<double, ratio<1, 1>> secs_type; + auto begin=chrono::high_resolution_clock::now(); + while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now()-begin).count()<3); + + atomic<size_t> threads(0); + std::vector<std::function<int()>> callbacks(1, callback); + begin=chrono::high_resolution_clock::now(); +#pragma omp parallel + { + std::vector<future<>> preconditions(1); + threads++; + for(size_t n=0; n<500000; n++) + { + preconditions.front()=dispatcher->call(preconditions, callbacks).front(); + } + } + while(dispatcher->wait_queue_depth()) + this_thread::sleep_for(chrono::milliseconds(1)); + auto end=chrono::high_resolution_clock::now(); + auto diff=chrono::duration_cast<secs_type>(end-begin); + std::cout << "It took " << diff.count() << " secs to execute " << (500000*threads) << " closures which is " << (500000*threads/diff.count()) << " chained closures/sec" << std::endl; + std::cout << "\nPress Return to exit ..." << std::endl; + getchar(); + return 0; +} diff --git a/attic/example/benchmark_latency.cpp b/attic/example/benchmark_latency.cpp new file mode 100644 index 00000000..f11422fd --- /dev/null +++ b/attic/example/benchmark_latency.cpp @@ -0,0 +1,162 @@ +#include "afio_pch.hpp" +#include <thread> + +#define ITERATIONS 10000 +#define CONCURRENCY 32 + +// Optional +//#define MULTIPLIER 1000000 // output number of microseconds instead of seconds +#define MULTIPLIER 3900000000ULL // output number of CPU clocks instead of seconds + +typedef decltype(boost::afio::chrono::high_resolution_clock::now()) time_point; +size_t id_offset; +static time_point points[100000]; +static time_point::duration overhead, timesliceoverhead, sleepoverhead; +static std::pair<bool, std::shared_ptr<boost::afio::handle>> _callback(size_t id, boost::afio::future<> op) +{ + using namespace boost::afio; + points[id-id_offset]=chrono::high_resolution_clock::now(); + return std::make_pair(true, op.get_handle()); +}; + +int main(void) +{ + using namespace boost::afio; + auto dispatcher=make_dispatcher().get(); + typedef chrono::duration<double, ratio<1, 1>> secs_type; + { + size_t total1=0, total2=0, total3=0; + auto begin=chrono::high_resolution_clock::now(); + while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now()-begin).count()<1) + { + auto now2=chrono::high_resolution_clock::now(); + this_thread::yield(); + auto now3=chrono::high_resolution_clock::now(); + timesliceoverhead+=now3-now2; + total1++; + } + while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now()-begin).count()<2) + { + auto now2=chrono::high_resolution_clock::now(); + this_thread::sleep_for(chrono::nanoseconds(1)); + auto now3=chrono::high_resolution_clock::now(); + sleepoverhead+=now3-now2; + total3++; + } + while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now()-begin).count()<3) + { + auto now1=chrono::high_resolution_clock::now(); + auto now2=chrono::high_resolution_clock::now(); + overhead+=now2-now1; + total2++; + } + overhead=time_point::duration(overhead.count()/total2); + sleepoverhead=time_point::duration(sleepoverhead.count()/total3); + timesliceoverhead=time_point::duration(timesliceoverhead.count()/total1); + std::cout << "Timing overhead is calculated to be " << chrono::duration_cast<secs_type>(overhead).count() << " seconds." << std::endl; + std::cout << "OS context switch overhead is calculated to be " << chrono::duration_cast<secs_type>(timesliceoverhead).count() << " seconds." << std::endl; + std::cout << "OS sleep overhead is calculated to be " << chrono::duration_cast<secs_type>(sleepoverhead).count() << " seconds." << std::endl; + } + + std::pair<async_op_flags, dispatcher::completion_t *> callback(async_op_flags::none, _callback); + std::ofstream csv("afio_latencies.csv"); + csv << "Timing overhead is calculated to be," << chrono::duration_cast<secs_type>(overhead).count() +#ifdef MULTIPLIER + * MULTIPLIER +#endif + << std::endl; + csv << "OS context switch overhead is calculated to be," << chrono::duration_cast<secs_type>(timesliceoverhead).count() +#ifdef MULTIPLIER + * MULTIPLIER +#endif + << std::endl << std::endl; + csv << "OS sleep overhead is calculated to be," << chrono::duration_cast<secs_type>(sleepoverhead).count() +#ifdef MULTIPLIER + * MULTIPLIER +#endif + << std::endl << std::endl; + csv << "Concurrency,Handler Min,Handler Max,Handler Average,Handler Stddev,Complete Min,Complete Max,Complete Average,Complete Stddev" << std::endl; + for(size_t concurrency=0; concurrency<CONCURRENCY; concurrency++) + { + future<> last[CONCURRENCY]; + time_point begin[CONCURRENCY], handled[CONCURRENCY], end[CONCURRENCY]; + atomic<bool> waiter; + double handler[ITERATIONS], complete[ITERATIONS]; + std::vector<std::thread> threads; + threads.reserve(CONCURRENCY); + size_t iterations=(concurrency>=8) ? ITERATIONS/10 : ITERATIONS; + std::cout << "Running " << iterations << " iterations of concurrency " << concurrency+1 << " ..." << std::endl; + for(size_t n=0; n<iterations; n++) + { + threads.clear(); + waiter.store(true); + atomic<size_t> threads_ready(0); + for(size_t c=0; c<=concurrency; c++) + { + threads.push_back(std::thread([&, c]{ + ++threads_ready; + while(waiter) +#ifdef BOOST_SMT_PAUSE + BOOST_SMT_PAUSE +#endif + ; + begin[c]=chrono::high_resolution_clock::now(); + last[c]=dispatcher->completion(future<>(), callback); + last[c].get(); + end[c]=chrono::high_resolution_clock::now(); + handled[c]=points[last[c].id()-id_offset]; + })); + } + while(threads_ready<=concurrency) +#ifdef BOOST_SMT_PAUSE + BOOST_SMT_PAUSE +#endif + ; + waiter.store(false); + for(auto &i: threads) + i.join(); + for(size_t c=0; c<=concurrency; c++) + { + handler[n]=chrono::duration_cast<secs_type>(handled[c]-begin[c]-overhead).count(); + complete[n]=chrono::duration_cast<secs_type>(end[c]-handled[c]-overhead).count(); + } + } + for(size_t n=0; n<=concurrency; n++) + if(last[n].id()>id_offset) id_offset=last[n].id(); + double minHandler=1<<30, maxHandler=0, totalHandler=0, minComplete=1<<30, maxComplete=0, totalComplete=0; + for(size_t n=0; n<iterations; n++) + { + if(handler[n]<minHandler) minHandler=handler[n]; + if(handler[n]>maxHandler) maxHandler=handler[n]; + if(complete[n]<minHandler) minComplete=complete[n]; + if(complete[n]>maxHandler) maxComplete=complete[n]; + totalHandler+=handler[n]; + totalComplete+=complete[n]; + } + totalHandler/=iterations; + totalComplete/=iterations; + double varianceHandler=0, varianceComplete=0; + for(size_t n=0; n<iterations; n++) + { + varianceHandler+=pow(handler[n]-totalHandler, 2); + varianceComplete+=pow(complete[n]-totalComplete, 2); + } + varianceHandler/=iterations; + varianceComplete/=iterations; + varianceHandler=sqrt(varianceHandler); + varianceComplete=sqrt(varianceComplete); +#ifdef MULTIPLIER + minHandler*=MULTIPLIER; + maxHandler*=MULTIPLIER; + totalHandler*=MULTIPLIER; + varianceHandler*=MULTIPLIER; + minComplete*=MULTIPLIER; + maxComplete*=MULTIPLIER; + totalComplete*=MULTIPLIER; + varianceComplete*=MULTIPLIER; +#endif + csv << concurrency+1 << "," << minHandler << "," << maxHandler << "," << totalHandler << "," << varianceHandler << "," + << minComplete << "," << maxComplete << "," << totalComplete << "," << varianceComplete << std::endl; + } + return 0; +} diff --git a/attic/example/benchmark_unchained1.cpp b/attic/example/benchmark_unchained1.cpp new file mode 100644 index 00000000..2f9b6769 --- /dev/null +++ b/attic/example/benchmark_unchained1.cpp @@ -0,0 +1,43 @@ +#include "afio_pch.hpp" + +/* My Intel Core i7 3770K running Windows 8 x64: 1555990 closures/sec + My Intel Core i7 3770K running Linux x64: 1432810 closures/sec +*/ + +static std::pair<bool, std::shared_ptr<boost::afio::handle>> callback(size_t, boost::afio::future<> op) +{ +#if 0 + // Simulate an i/o op with a context switch + Sleep(0); +#endif + return std::make_pair(true, op.get_handle()); +}; + +int main(void) +{ + using namespace boost::afio; + auto dispatcher=make_dispatcher().get(); + typedef chrono::duration<double, ratio<1, 1>> secs_type; + auto begin=chrono::high_resolution_clock::now(); + while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now()-begin).count()<3); + + std::vector<future<>> preconditions; + std::vector<std::pair<async_op_flags, dispatcher::completion_t *>> callbacks(1, + std::make_pair(async_op_flags::none, callback)); +#if 0 + std::cout << "Attach profiler now and hit Return" << std::endl; + getchar(); +#endif + begin=chrono::high_resolution_clock::now(); +#pragma omp parallel for + for(int n=0; n<5000000; n++) + dispatcher->completion(preconditions, callbacks); + while(dispatcher->wait_queue_depth()) + this_thread::sleep_for(chrono::milliseconds(1)); + auto end=chrono::high_resolution_clock::now(); + auto diff=chrono::duration_cast<secs_type>(end-begin); + std::cout << "It took " << diff.count() << " secs to execute 5,000,000 closures which is " << (5000000/diff.count()) << " unchained closures/sec" << std::endl; + std::cout << "\nPress Return to exit ..." << std::endl; + getchar(); + return 0; +} diff --git a/attic/example/benchmark_unchained2.cpp b/attic/example/benchmark_unchained2.cpp new file mode 100644 index 00000000..db9affe0 --- /dev/null +++ b/attic/example/benchmark_unchained2.cpp @@ -0,0 +1,37 @@ +#include "afio_pch.hpp" + +/* My Intel Core i7 3770K running Windows 8 x64: 911963 closures/sec + My Intel Core i7 3770K running Linux x64: 1094780 closures/sec +*/ + +static int callback() +{ +#if 0 + Sleep(0); +#endif + return 1; +}; + +int main(void) +{ + using namespace boost::afio; + auto dispatcher=make_dispatcher().get(); + typedef chrono::duration<double, ratio<1, 1>> secs_type; + auto begin=chrono::high_resolution_clock::now(); + while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now()-begin).count()<3); + + std::vector<future<>> preconditions; + std::vector<std::function<int()>> callbacks(1, callback); + begin=chrono::high_resolution_clock::now(); +#pragma omp parallel for + for(int n=0; n<5000000; n++) + dispatcher->call(preconditions, callbacks); + while(dispatcher->wait_queue_depth()) + this_thread::sleep_for(chrono::milliseconds(1)); + auto end=chrono::high_resolution_clock::now(); + auto diff=chrono::duration_cast<secs_type>(end-begin); + std::cout << "It took " << diff.count() << " secs to execute 5,000,000 closures which is " << (5000000/diff.count()) << " unchained closures/sec" << std::endl; + std::cout << "\nPress Return to exit ..." << std::endl; + getchar(); + return 0; +} diff --git a/attic/example/call_example.cpp b/attic/example/call_example.cpp new file mode 100644 index 00000000..d4a72642 --- /dev/null +++ b/attic/example/call_example.cpp @@ -0,0 +1,25 @@ +//#define BOOST_RESULT_OF_USE_DECLTYPE 1 +#include "afio_pch.hpp" + +int main(void) +{ + //[call_example + // Create a dispatcher instance + auto dispatcher=boost::afio::make_dispatcher().get(); + + // Schedule an asynchronous call of some function with some bound set of arguments + auto helloworld=dispatcher->call(boost::afio::future<>() /* no precondition */, [](std::string text) -> int { + std::cout << text << std::endl; + return 42; + }, std::string("Hello world")); + + // Schedule as asynchronous call of some function to occur only after helloworld completes + auto addtovalue=dispatcher->call(helloworld, [&helloworld]() -> int { + return helloworld.get()+1; + }); + + // Print the result returned by the future for the lambda, which will be 43 + std::cout << "addtovalue() returned " << addtovalue.get() << std::endl; + //] + return 0; +} diff --git a/attic/example/closure_execution_afio_io_example.cpp b/attic/example/closure_execution_afio_io_example.cpp new file mode 100644 index 00000000..96ac4808 --- /dev/null +++ b/attic/example/closure_execution_afio_io_example.cpp @@ -0,0 +1,61 @@ +#include "afio_pch.hpp" + +//[closure_execution_afio_example +#include <vector> + +int main() +{ + const int ary_size = 10; + + //set up a file to read from + std::ofstream out_file("somefile.dat", std::ios::binary); + for (int i = 0; i < ary_size; ++i) + { + out_file.write(reinterpret_cast<const char*>(&i), sizeof(i)); + } + out_file.close(); + + + //set up the afio dispatcher + auto dispatcher = boost::afio::make_dispatcher().get(); + + //set up an array to hold our integers + int ary[ary_size]; + + //schedule the file open + auto opened_file = dispatcher->file(boost::afio::path_req("somefile.dat", + boost::afio::file_flags::read)); + + //set up vectors for the individual read operations, and the work on each integer + std::vector<boost::afio::future<>> read_ops(ary_size); + std::vector<std::function<void()>> vec_func(ary_size); + for (int i = 0; i < ary_size; ++i) + { + read_ops[i] = dispatcher->read(boost::afio::io_req<int>(opened_file, + &ary[i], sizeof(int), i*sizeof(int))); + + vec_func[i] = std::bind([](int* a){ *a *= 2 ; }, &ary[i]); + } + + // schedule the work to be done after reading in an integer + auto work = dispatcher->call(read_ops, vec_func); + + //schedule the file to be closed after reads are finished + auto closed_file = dispatcher->close(dispatcher->barrier(read_ops).front()); + + // make sure work has completed before trying to print data from the array + boost::afio::when_all_p(work.begin(), work.end()).wait(); + + //verify the out put is as expected: "0, 2, 4, 6, 8, 10, 12, 14, 16, 18" + for (int i = 0; i < ary_size; ++i) + { + std::cout << ary[i]; + if(i == ary_size-1) + std::cout << std::endl; + else + std::cout << ", "; + } + + return 0; +} +//] diff --git a/attic/example/closure_execution_traditional_io_example.cpp b/attic/example/closure_execution_traditional_io_example.cpp new file mode 100644 index 00000000..62483edc --- /dev/null +++ b/attic/example/closure_execution_traditional_io_example.cpp @@ -0,0 +1,53 @@ +#include "afio_pch.hpp" + +//[closure_execution_traditional_example +#include <iostream> +#include <fstream> + +int main() +{ + + const int ary_size = 10; + + // set up a file to read from + std::ofstream out_file("somefile.dat", std::ios::binary); + for (int i = 0; i < ary_size; ++i) + { + out_file.write(reinterpret_cast<const char*>(&i), sizeof(i)); + } + out_file.close(); + + //setup an array of integers + int ary[ary_size]; + //file open + std::ifstream file("somefile.dat"); + + //read in ints to ary + if (file) + { + for (int i = 0; i < ary_size; ++i) + { + file.read((char*) &ary[i], sizeof(ary[i])); + } + //close file + file.close(); + + + //do some work with the array of ints + for (int i = 0; i < ary_size; ++i) + ary[i] *= 2; + } + + //verify the out put is as expected: "0, 2, 4, 6, 8, 10, 12, 14, 16, 18" + for (int i = 0; i < ary_size; ++i) + { + std::cout << ary[i]; + if(i == ary_size-1) + std::cout << std::endl; + else + std::cout << ", "; + } + + return 0; +} +//] diff --git a/attic/example/completion_example1.cpp b/attic/example/completion_example1.cpp new file mode 100644 index 00000000..706a5e44 --- /dev/null +++ b/attic/example/completion_example1.cpp @@ -0,0 +1,58 @@ +//#define BOOST_RESULT_OF_USE_DECLTYPE 1 +#include "afio_pch.hpp" + +int main(void) +{ + //[completion_example1 + // Create a dispatcher instance + std::shared_ptr<boost::afio::dispatcher> dispatcher= + boost::afio::make_dispatcher().get(); + + // Completion handlers are the lowest level completion routine available, and therefore the least + // overhead but at the cost of considerable extra programmer effort. You almost certainly want + // to use the call() member function instead. + + // First create some callable entity ... + auto completer=[]( + /* These are always the standard parameters */ + size_t id, boost::afio::future<> precondition, + /* From now on user defined parameters */ + std::string text) + /* This is always the return type */ + -> std::pair<bool, std::shared_ptr<boost::afio::handle>> + { + /* id is the unique, non-zero integer id of this op. + precondition is the op you supplied as precondition. As it will by definition + have completed by now, you can fetch from its h member variable a shared pointer + to the shared stl_future containing either the output handle or the error state. + */ + std::cout << text << std::endl; + + // Return whether this completion has completed now or is it deferred, + // along with the handle we pass onto any completions completing on this op + // Note that op.get() by default rethrows any exception contained by the op. + // Normally this is highly desirable. + return std::make_pair(true, precondition.get_handle()); + }; + + // Bind any user defined parameters to create a proper boost::afio::dispatcher::completion_t + std::function<boost::afio::dispatcher::completion_t> boundf= + std::bind(completer, + /* The standard parameters */ + std::placeholders::_1, std::placeholders::_2, + /* Any values for the user defined parameters. Remember ALWAYS to pass by value! */ + std::string("Hello world")); + + // Schedule an asynchronous call of the completion with some bound set of arguments + boost::afio::future<> helloworld= + dispatcher->completion(boost::afio::future<>() /* no precondition */, + std::make_pair(boost::afio::async_op_flags::none, boundf)); + + // Create a boost::stl_future<> representing the ops passed to when_all_p() + boost::afio::stl_future<std::vector<std::shared_ptr<boost::afio::handle>>> stl_future + =boost::afio::when_all_p(helloworld); + // ... and wait for it to complete + stl_future.wait(); + //] + return 0; +} diff --git a/attic/example/completion_example2.cpp b/attic/example/completion_example2.cpp new file mode 100644 index 00000000..513eea77 --- /dev/null +++ b/attic/example/completion_example2.cpp @@ -0,0 +1,77 @@ +//#define BOOST_RESULT_OF_USE_DECLTYPE 1 +#include "afio_pch.hpp" + +int main(void) +{ + //[completion_example2 + // Create a dispatcher instance + std::shared_ptr<boost::afio::dispatcher> dispatcher= + boost::afio::make_dispatcher().get(); + + // One thing direct programming of completion handlers can do which call() cannot is immediate + // completions. These run immediately after the precondition finishes by the thread worker + // which executed the precondition rather than being appended to the FIFO queue. This can + // be useful for ensuring data is still cache-local for example. + + // Create the completion, using the standard form + auto completion=[](std::shared_ptr<boost::afio::dispatcher> dispatcher, + /* These are always the standard parameters */ + size_t id, boost::afio::future<> precondition) + /* This is always the return type */ + -> std::pair<bool, std::shared_ptr<boost::afio::handle>> + { + std::cout << "I am completion" << std::endl; + + // Create some callable entity which will do the actual completion. It can be + // anything you like, but you need a minimum of its integer id. + auto completer=[](std::shared_ptr<boost::afio::dispatcher> dispatcher, + size_t id, boost::afio::future<> op) -> int + { + try + { + std::cout << "I am completer" << std::endl; + + // Do stuff, returning the handle you want passed onto dependencies. + // Note that op.get() rethrows any exception in op. Normally you want this. + dispatcher->complete_async_op(id, op.get_handle()); + } + catch(...) + { + // In non-deferred completions AFIO traps exceptions for you. Here, you must + // do it by hand and tell AFIO about what exception state to return. + boost::afio::exception_ptr e(boost::afio::current_exception()); + dispatcher->complete_async_op(id, e); + } + return 0; + }; + // Bind the id and handle to completer, and enqueue for later asynchronous execution. + std::async(std::launch::async, completer, dispatcher, id, precondition); + + // Indicate we are not done yet + return std::make_pair(false, precondition.get_handle()); + }; + + // Bind any user defined parameters to create a proper boost::afio::dispatcher::completion_t + std::function<boost::afio::dispatcher::completion_t> boundf= + std::bind(completion, dispatcher, + /* The standard parameters */ + std::placeholders::_1, std::placeholders::_2); + + // Schedule an asynchronous call of the completion + boost::afio::future<> op= + dispatcher->completion(boost::afio::future<>() /* no precondition */, + std::make_pair( + /* Complete boundf immediately after its precondition (in this + case as there is no precondition that means right now before + completion() returns) */ + boost::afio::async_op_flags::immediate, + boundf)); + + // Create a boost::stl_future<> representing the ops passed to when_all_p() + boost::afio::stl_future<std::vector<std::shared_ptr<boost::afio::handle>>> stl_future + =boost::afio::when_all_p(op); + // ... and wait for it to complete + stl_future.wait(); + //] + return 0; +} diff --git a/attic/example/determine_legal_filenames.cpp b/attic/example/determine_legal_filenames.cpp new file mode 100644 index 00000000..667b3f36 --- /dev/null +++ b/attic/example/determine_legal_filenames.cpp @@ -0,0 +1,35 @@ +#include "afio_pch.hpp" + +int main(void) +{ + using namespace boost::afio; + auto dispatcher=boost::afio::make_dispatcher().get(); + + auto mkdir(dispatcher->dir(path_req("testdir", file_flags::create))); + try + { + for(size_t n=1; n<256; n++) + { + char cs[2]={ (char) n, 0 }; + path p(cs); + try + { + auto mkfile(dispatcher->file(path_req::relative(mkdir, p, boost::afio::file_flags::create))); + mkfile.get(); + auto rmfile(dispatcher->close(dispatcher->rmfile(mkfile))); + std::cout << "Character " << n << " (" << p << ") is permitted on this operating system." << std::endl; + } + catch(...) + { + std::cout << "Character " << n << " (" << p << ") failed on this operating system." << std::endl; + } + } + } + catch(...) + { + std::cerr << boost::current_exception_diagnostic_information(true) << std::endl; + throw; + } + auto rmdir(dispatcher->close(dispatcher->rmdir(mkdir))); + rmdir.get(); +} diff --git a/attic/example/enumerate_example.cpp b/attic/example/enumerate_example.cpp new file mode 100644 index 00000000..35bac7a5 --- /dev/null +++ b/attic/example/enumerate_example.cpp @@ -0,0 +1,43 @@ +#include "afio_pch.hpp" + +int main(void) +{ + //[enumerate_example + boost::afio::current_dispatcher_guard h(boost::afio::make_dispatcher().get()); + + // Schedule an opening of the root directory + boost::afio::future<> rootdir(boost::afio::async_dir("/")); + + std::pair<std::vector<boost::afio::directory_entry>, bool> list; + // This is used to reset the enumeration to the start + bool restart=true; + do + { + // Schedule an enumeration of an open directory handle + boost::afio::future<std::pair<std::vector<boost::afio::directory_entry>, bool>> + enumeration(boost::afio::async_enumerate(rootdir, + /* This is the maximum entries to enumerate. Note + the use of compatibility_maximum() which is the + same value your libc uses. The problem with smaller + enumerations is that the directory contents can change + out from underneath you more frequently. */ + boost::afio::directory_entry::compatibility_maximum(), + /* True if to reset enumeration */ + restart)); + restart=false; + + // People using AFIO often forget that futures can be waited + // on normally without needing to wait on the op handle + list=enumeration.get(); + for(boost::afio::directory_entry &i : list.first) + { +#ifdef WIN32 + std::wcout << i.name(); +#else + std::cout << i.name(); +#endif + std::cout << " type " << static_cast<int>(i.st_type()) << std::endl; + } + } while(list.second); + //] +} diff --git a/attic/example/filecopy_example.cpp b/attic/example/filecopy_example.cpp new file mode 100644 index 00000000..e48a9c36 --- /dev/null +++ b/attic/example/filecopy_example.cpp @@ -0,0 +1,185 @@ +#include "afio_pch.hpp" + +static uint32_t crc32(const void* data, size_t length, uint32_t previousCrc32 = 0) +{ + const uint32_t Polynomial = 0xEDB88320; + uint32_t crc = ~previousCrc32; + unsigned char* current = (unsigned char*) data; + while (length--) + { + crc ^= *current++; + for (unsigned int j = 0; j < 8; j++) + crc = (crc >> 1) ^ (-int(crc & 1) & Polynomial); + } + return ~crc; // same as crc ^ 0xFFFFFFFF +} + +//[filecopy_example +namespace { + using namespace boost::afio; + using boost::afio::off_t; + + // Keep memory buffers around + // A special allocator of highly efficient file i/o memory + typedef std::vector<char, utils::page_allocator<char>> file_buffer_type; + static std::vector<std::unique_ptr<file_buffer_type>> buffers; + + // Parallel copy files in sources into dest, concatenating + stl_future<std::vector<handle_ptr>> async_concatenate_files( + atomic<off_t> &written, off_t &totalbytes, + dispatcher_ptr dispatcher, + boost::afio::filesystem::path dest, std::vector<boost::afio::filesystem::path> sources, + size_t chunk_size=1024*1024 /* 1Mb */) + { + // Schedule the opening of the output file for writing + auto oh = async_file(dest, file_flags::create | file_flags::write); + // Schedule the opening of all the input files for reading as a batch + std::vector<path_req> ihs_reqs; ihs_reqs.reserve(sources.size()); + for(auto &&source : sources) + ihs_reqs.push_back(path_req(source, file_flags::read + |file_flags::will_be_sequentially_accessed)); + auto ihs=dispatcher->file(ihs_reqs); + // Retrieve any error from opening the output + oh.get(); + // Wait for the input file handles to open so we can get their sizes + // (plus any failures to open) + when_all_p(ihs).get(); + + // Need to figure out the sizes of the sources so we can resize output + // correctly. We also need to allocate scratch buffers for each source. + std::vector<std::tuple<off_t, off_t>> offsets; + offsets.reserve(ihs.size()); + off_t offset=0, max_individual=0; + for(auto &ih : ihs) + { + // Get the file's size in bytes + off_t bytes=ih->direntry(metadata_flags::size).st_size(); + if(bytes>max_individual) max_individual=bytes; + //std::cout << "File " << ih->path() << " size " << bytes << " to offset " << offset << std::endl; + // Push the offset to write at, amount to write, and a scratch buffer + offsets.push_back(std::make_tuple(offset, bytes)); + buffers.push_back(detail::make_unique<file_buffer_type>(chunk_size)); + offset+=bytes; + } + // Schedule resizing output to correct size, retrieving errors + totalbytes=offset; + auto ohresize=async_truncate(oh, offset); + when_all_p(ohresize).get(); + + // Schedule the parallel processing of all input files, sequential per file, + // but only after the output file has been resized + std::vector<future<>> lasts; + for(auto &i : ihs) + lasts.push_back(dispatcher->depends(ohresize, i)); + for(off_t o=0; o<max_individual; o+=chunk_size) + { + for(size_t idx=0; idx<ihs_reqs.size(); idx++) + { + auto offset=std::get<0>(offsets[idx]), bytes=std::get<1>(offsets[idx]); + auto &buffer=buffers[idx]; + if(o<bytes) + { + off_t thischunk=bytes-o; + if(thischunk>chunk_size) thischunk=chunk_size; + //std::cout << "Writing " << thischunk << " from offset " << o << " in " << lasts[idx]->path() << std::endl; + // Schedule a filling of buffer from offset o after last has completed + auto readchunk = async_read(lasts[idx], buffer->data(), (size_t)thischunk, o); + // Schedule a writing of buffer to offset offset+o after readchunk is ready + // Note the call to dispatcher->depends() to make sure the write only occurs + // after the read completes + auto writechunk = async_write(depends(readchunk, ohresize), buffer->data(), (size_t)thischunk, offset + o); + // Schedule incrementing written after write has completed + auto incwritten = writechunk.then([&written, thischunk](future<> f) { + written += thischunk; + return f; + }); + // Don't do next read until written is incremented + lasts[idx]=dispatcher->depends(incwritten, readchunk); + } + } + } + // Having scheduled all the reads and write, return a stl_future which returns when + // they're done + return when_all_p(lasts); + } +} + +int main(int argc, const char *argv[]) +{ + using namespace boost::afio; + using boost::afio::off_t; + typedef chrono::duration<double, ratio<1, 1>> secs_type; + if(argc<3) + { + std::cerr << "ERROR: Need to specify destination path and source paths" + << std::endl; + return 1; + } + try + { + atomic<off_t> written(0); + off_t totalbytes=0; + std::shared_ptr<boost::afio::dispatcher> dispatcher= + boost::afio::make_dispatcher().get(); + // Set a dispatcher as current for this thread + boost::afio::current_dispatcher_guard guard(dispatcher); + + boost::afio::filesystem::path dest=argv[1]; + std::vector<boost::afio::filesystem::path> sources; + std::cout << "Concatenating into " << dest << " the files "; + for(int n=2; n<argc; ++n) + { + sources.push_back(argv[n]); + std::cout << sources.back(); + if(n<argc-1) std::cout << ", "; + } + std::cout << " ..." << std::endl; + + auto begin=chrono::steady_clock::now(); + auto h=async_concatenate_files(written, totalbytes, dispatcher, dest, sources); + // Print progress once a second until it's done + while(future_status::timeout==h.wait_for(boost::afio::chrono::seconds(1))) + { + std::cout << "\r" << (100*written)/totalbytes << "% complete (" << written + << " out of " << totalbytes << " @ " << (written/chrono::duration_cast<secs_type>( + chrono::steady_clock::now()-begin).count()/1024/1024) << "Mb/sec) ..." << std::flush; + } + // Retrieve any errors + h.get(); + std::cout << std::endl; + } + catch(...) + { + std::cerr << "ERROR: " << boost::current_exception_diagnostic_information(true) + << std::endl; + return 1; + } + // Make sure output really is input concatenated + std::cout << "CRC checking that the output exactly matches the inputs ..." << std::endl; + uint32_t crc1 = 0, crc2 = 0; + off_t offset = 0; + std::ifstream o(argv[1], std::ifstream::in); + for (int n = 2; n < argc; n++) + { + std::ifstream i(argv[n], std::ifstream::in); + i.seekg(0, i.end); + std::vector<char> buffer1((size_t)i.tellg()), buffer2((size_t)i.tellg()); + i.seekg(0, i.beg); + o.read(buffer1.data(), buffer1.size()); + i.read(buffer2.data(), buffer2.size()); + bool quiet = false; + for (size_t n = 0; n<buffer1.size(); n += 64) + { + crc1 = crc32(buffer1.data() + n, 64, crc1); + crc2 = crc32(buffer2.data() + n, 64, crc2); + if (crc1 != crc2 && !quiet) + { + std::cerr << "ERROR: Offset " << offset+n << " not copied correctly!" << std::endl; + quiet = true; + } + } + offset += buffer1.size(); + } + return 0; +} +//] diff --git a/attic/example/filedir_example.cpp b/attic/example/filedir_example.cpp new file mode 100644 index 00000000..45439ed5 --- /dev/null +++ b/attic/example/filedir_example.cpp @@ -0,0 +1,74 @@ +#include "afio_pch.hpp" + +int main(void) +{ + //[filedir_example + using namespace BOOST_AFIO_V2_NAMESPACE; + using BOOST_AFIO_V2_NAMESPACE::rmdir; + std::shared_ptr<boost::afio::dispatcher> dispatcher = + boost::afio::make_dispatcher().get(); + current_dispatcher_guard h(dispatcher); + + // Free function + try + { + // Schedule creating a directory called testdir + auto mkdir(async_dir("testdir", boost::afio::file_flags::create)); + // Schedule creating a file called testfile in testdir only when testdir has been created + auto mkfile(async_file(mkdir, "testfile", boost::afio::file_flags::create)); + // Schedule creating a symbolic link called linktodir to the item referred to by the precondition + // i.e. testdir. Note that on Windows you can only symbolic link directories. + auto mklink(async_symlink(mkdir, "linktodir", mkdir, boost::afio::file_flags::create)); + + // Schedule deleting the symbolic link only after when it has been created + auto rmlink(async_rmsymlink(mklink)); + // Schedule deleting the file only after when it has been created + auto rmfile(async_close(async_rmfile(mkfile))); + // Schedule waiting until both the preceding operations have finished + auto barrier(dispatcher->barrier({ rmlink, rmfile })); + // Schedule deleting the directory only after the barrier completes + auto rmdir(async_rmdir(depends(barrier.front(), mkdir))); + // Check ops for errors + boost::afio::when_all_p(mkdir, mkfile, mklink, rmlink, rmfile, rmdir).wait(); + } + catch (...) + { + std::cerr << boost::current_exception_diagnostic_information(true) << std::endl; + throw; + } + + // Batch + try + { + // Schedule creating a directory called testdir + auto mkdir(dispatcher->dir(std::vector<boost::afio::path_req>(1, + boost::afio::path_req("testdir", boost::afio::file_flags::create))).front()); + // Schedule creating a file called testfile in testdir only when testdir has been created + auto mkfile(dispatcher->file(std::vector<boost::afio::path_req>(1, + boost::afio::path_req::relative(mkdir, "testfile", + boost::afio::file_flags::create))).front()); + // Schedule creating a symbolic link called linktodir to the item referred to by the precondition + // i.e. testdir. Note that on Windows you can only symbolic link directories. Note that creating + // symlinks must *always* be as an absolute path, as that is how they are stored. + auto mklink(dispatcher->symlink(std::vector<boost::afio::path_req>(1, + boost::afio::path_req::absolute(mkdir, "testdir/linktodir", + boost::afio::file_flags::create))).front()); + + // Schedule deleting the symbolic link only after when it has been created + auto rmlink(dispatcher->close(std::vector<future<>>(1, dispatcher->rmsymlink(mklink))).front()); + // Schedule deleting the file only after when it has been created + auto rmfile(dispatcher->close(std::vector<future<>>(1, dispatcher->rmfile(mkfile))).front()); + // Schedule waiting until both the preceding operations have finished + auto barrier(dispatcher->barrier({rmlink, rmfile})); + // Schedule deleting the directory only after the barrier completes + auto rmdir(dispatcher->rmdir(std::vector<path_req>(1, dispatcher->depends(barrier.front(), mkdir))).front()); + // Check ops for errors + boost::afio::when_all_p(mkdir, mkfile, mklink, rmlink, rmfile, rmdir).wait(); + } + catch(...) + { + std::cerr << boost::current_exception_diagnostic_information(true) << std::endl; + throw; + } + //] +} diff --git a/attic/example/filter_example.cpp b/attic/example/filter_example.cpp new file mode 100644 index 00000000..464f47e8 --- /dev/null +++ b/attic/example/filter_example.cpp @@ -0,0 +1,36 @@ +#include "afio_pch.hpp" + +//[filter_example +using namespace boost::afio; + +// This function will be called for every file opened +static void open_file_filter(detail::OpType, future<> &op) noexcept +{ + std::cout << "File handle " << op->native_handle() << " opened!" << std::endl; +} + +// This function will be called for every read and write performed +static void readwrite_filter(detail::OpType optype, handle *h, + const detail::io_req_impl<true> &req, boost::afio::off_t offset, size_t buffer_idx, + size_t buffers, const boost::system::error_code &ec, size_t bytes_transferred) +{ + std::cout << "File handle " << h->native_handle() << (detail::OpType::read==optype ? + " read " : " wrote ") << bytes_transferred << " bytes at offset " << offset << std::endl; +} + +int main(void) +{ + dispatcher_ptr dispatcher= + make_dispatcher().get(); + + // Install filters BEFORE scheduling any ops as the filter APIs are NOT + // threadsafe. This filters all file opens. + dispatcher->post_op_filter({ std::make_pair(detail::OpType::file /* just file opens */, + std::function<dispatcher::filter_t>(open_file_filter)) }); + + // This filters all reads and writes + dispatcher->post_readwrite_filter({ std::make_pair(detail::OpType::Unknown /* all */, + std::function<dispatcher::filter_readwrite_t>(readwrite_filter)) }); + return 0; +} +//] diff --git a/attic/example/find_in_files_afio.cpp b/attic/example/find_in_files_afio.cpp new file mode 100644 index 00000000..a1427fd2 --- /dev/null +++ b/attic/example/find_in_files_afio.cpp @@ -0,0 +1,351 @@ +#include "afio_pch.hpp" +#include <deque> +#include <regex> +#include <chrono> +#include <mutex> +#include <future> +#include <initializer_list> +#include "boost/exception/diagnostic_information.hpp" +#include "boost/../libs/afio/test/Aligned_Allocator.hpp" + +/* My Intel Core i7 3770K running Windows 8 x64 with 7200rpm drive, using +Sysinternals RAMMap to clear disc cache (http://technet.microsoft.com/en-us/sysinternals/ff700229.aspx) + +Warm cache: +92 files matched out of 36734 files which was 7031545071 bytes. +The search took 2.5173 seconds which was 14592.6 files per second or 2663.89 Mb/sec. + +Cold cache: +91 files matched out of 36422 files which was 6108537728 bytes. +The search took 388.74 seconds which was 93.6925 files per second or 14.9857 Mb/sec. + +Warm cache mmaps: +92 files matched out of 36734 files which was 7031400686 bytes. +The search took 1.02974 seconds which was 35673.1 files per second or 6512.01 Mb/sec. + +Cold cache mmaps: +91 files matched out of 36422 files which was 6109258426 bytes. +The search took 242.76 seconds which was 150.033 files per second or 24 Mb/sec. +*/ + +#define USE_MMAPS + +//[find_in_files_afio +using namespace boost::afio; + +// Often it's easiest for a lot of nesting callbacks to carry state via a this pointer +class find_in_files +{ +public: + std::promise<int> finished; + std::regex regexpr; // The precompiled regular expression + dispatcher_ptr dispatcher; + recursive_mutex opslock; + std::deque<future<>> ops; // For exception gathering + std::atomic<size_t> bytesread, filesread, filesmatched, scheduled, completed; + std::vector<std::pair<boost::afio::filesystem::path, size_t>> filepaths; + + // Signals finish once all scheduled ops have completed + void docompleted(size_t inc) + { + size_t c=(completed+=inc); + if(c==scheduled) + finished.set_value(0); + }; + // Adds ops to the list of scheduled + void doscheduled(std::initializer_list<future<>> list) + { + scheduled+=list.size(); + //boost::lock_guard<decltype(opslock)> lock(opslock); + //ops.insert(ops.end(), list.begin(), list.end()); + } + void doscheduled(std::vector<future<>> list) + { + scheduled+=list.size(); + //boost::lock_guard<decltype(opslock)> lock(opslock); + //ops.insert(ops.end(), list.begin(), list.end()); + } + void dosearch(handle_ptr h, const char *buffer, size_t length) + { + // Search the buffer for the regular expression + if(std::regex_search(buffer, regexpr)) + { +#pragma omp critical + { + std::cout << h->path() << std::endl; + } + ++filesmatched; + } + ++filesread; + bytesread+=length; + } + // A file searching completion, called when each file read completes + std::pair<bool, handle_ptr> file_read(size_t id, + future<> op, std::shared_ptr<std::vector<char, + detail::aligned_allocator<char, 4096, false>>> _buffer, size_t length) + { + handle_ptr h(op.get_handle()); + //std::cout << "R " << h->path() << std::endl; + char *buffer=_buffer->data(); + buffer[length]=0; + dosearch(h, buffer, length); + docompleted(2); + // Throw away the buffer now rather than later to keep memory consumption down + _buffer->clear(); + return std::make_pair(true, h); + } + // A file reading completion, called when each file open completes + std::pair<bool, handle_ptr> file_opened(size_t id, + future<> op, size_t length) + { + handle_ptr h(op.get_handle()); + //std::cout << "F " << h->path() << std::endl; + if (length) + { +#ifdef USE_MMAPS + auto map(h->map_file()); + if (map) + { + dosearch(h, (const char *)map->addr, length); + } + else +#endif + { + // Allocate a sufficient 4Kb aligned buffer + size_t _length = (4095 + length)&~4095; + auto buffer = std::make_shared < std::vector<char, + detail::aligned_allocator<char, 4096, false >> >(_length + 1); + // Schedule a read of the file + auto read = dispatcher->read(make_io_req( + dispatcher->op_from_scheduled_id(id), buffer->data(), _length, 0)); + auto read_done = dispatcher->completion(read, + std::make_pair(async_op_flags::none/*regex search might be slow*/, + std::function<dispatcher::completion_t>( + std::bind(&find_in_files::file_read, this, std::placeholders::_1, + std::placeholders::_2, buffer, length)))); + doscheduled({ read, read_done }); + } + } + docompleted(2); + return std::make_pair(true, h); + } + // An enumeration parsing completion, called when each directory enumeration completes + std::pair<bool, handle_ptr> dir_enumerated(size_t id, + future<> op, + std::shared_ptr<future<std::pair<std::vector<directory_entry>, bool>>> listing) + { + handle_ptr h(op.get_handle()); + future<> lastdir, thisop(dispatcher->op_from_scheduled_id(id)); + // Get the entries from the ready stl_future + std::vector<directory_entry> entries(std::move(listing->get().first)); + //std::cout << "E " << h->path() << std::endl; + // For each of the directories schedule an open and enumeration +#if 0 + // Algorithm 1 + { + std::vector<path_req> dir_reqs; dir_reqs.reserve(entries.size()); + for(auto &entry : entries) + { + if((entry.st_type()&S_IFDIR)==S_IFDIR) + { + dir_reqs.push_back(path_req(thisop, h->path()/entry.name())); + } + } + if(!dir_reqs.empty()) + { + std::vector<std::pair<async_op_flags, std::function<dispatcher::completion_t>>> dir_openedfs(dir_reqs.size(), std::make_pair(async_op_flags::None, std::bind(&find_in_files::dir_opened, this, std::placeholders::_1, std::placeholders::_2))); + auto dir_opens=dispatcher->dir(dir_reqs); + doscheduled(dir_opens); + auto dir_openeds=dispatcher->completion(dir_opens, dir_openedfs); + doscheduled(dir_openeds); + // Hold back some of the concurrency + lastdir=dir_openeds.back(); + } + } +#else + // Algorithm 2 + // The Windows NT kernel filing system driver gets upset with too much concurrency + // when used with OSDirect so throttle directory enumerations to enforce some depth first traversal. + { + std::pair<async_op_flags, std::function<dispatcher::completion_t>> dir_openedf=std::make_pair(async_op_flags::none, std::bind(&find_in_files::dir_opened, this, + std::placeholders::_1, std::placeholders::_2)); + for(auto &entry : entries) + { + if(entry.st_type()== +#ifdef BOOST_AFIO_USE_LEGACY_FILESYSTEM_SEMANTICS + boost::afio::filesystem::file_type::directory_file) +#else + boost::afio::filesystem::file_type::directory) +#endif + { + auto dir_open=dispatcher->dir(path_req::absolute(lastdir, h->path()/entry.name())); + auto dir_opened=dispatcher->completion(dir_open, dir_openedf); + doscheduled({ dir_open, dir_opened }); + lastdir=dir_opened; + } + } + } +#endif + + // For each of the files schedule an open and search +#if 0 + // Algorithm 1 + { + std::vector<path_req> file_reqs; file_reqs.reserve(entries.size()); + std::vector<std::pair<async_op_flags, std::function<dispatcher::completion_t>>> file_openedfs; file_openedfs.reserve(entries.size()); + for(auto &entry : entries) + { + if((entry.st_type()&S_IFREG)==S_IFREG) + { + size_t length=(size_t)entry.st_size(); + if(length) + { + file_flags flags=file_flags::read; +#ifdef USE_MMAPS + if(length>16384) flags=flags|file_flags::os_mmap; +#endif + file_reqs.push_back(path_req(lastdir, h->path()/entry.name(), flags)); + file_openedfs.push_back(std::make_pair(async_op_flags::None, std::bind(&find_in_files::file_opened, this, std::placeholders::_1, std::placeholders::_2, length))); + } + } + } + auto file_opens=dispatcher->file(file_reqs); + doscheduled(file_opens); + auto file_openeds=dispatcher->completion(file_opens, file_openedfs); + doscheduled(file_openeds); + } +#else + // Algorithm 2 + { + for(auto &entry : entries) + { + if(entry.st_type()== +#ifdef BOOST_AFIO_USE_LEGACY_FILESYSTEM_SEMANTICS + boost::afio::filesystem::file_type::regular_file) +#else + boost::afio::filesystem::file_type::regular) +#endif + { + size_t length=(size_t)entry.st_size(); + if(length) + { + file_flags flags=file_flags::read; + auto file_open=dispatcher->file(path_req::absolute(lastdir, h->path()/entry.name(), flags)); + auto file_opened=dispatcher->completion(file_open, + std::make_pair(async_op_flags::none, + std::function<dispatcher::completion_t>( + std::bind(&find_in_files::file_opened, this, + std::placeholders::_1, std::placeholders::_2, length)))); + doscheduled({ file_open, file_opened }); + lastdir=file_opened; + } + } + } + } +#endif + docompleted(2); + return std::make_pair(true, h); + } + // A directory enumerating completion, called once per directory open in the tree + std::pair<bool, handle_ptr> dir_opened(size_t id, + future<> op) + { + handle_ptr h(op.get_handle()); + //std::cout << "D " << h->path() << std::endl; + // Now we have an open directory handle, schedule an enumeration + auto enumeration=dispatcher->enumerate(enumerate_req( + dispatcher->op_from_scheduled_id(id), metadata_flags::size, 1000)); + future<> enumeration_op(enumeration); + auto listing=std::make_shared<future<std::pair<std::vector<directory_entry>, + bool>>>(std::move(enumeration)); + auto enumeration_done=dispatcher->completion(enumeration_op, + make_pair(async_op_flags::none, + std::function<dispatcher::completion_t>( + std::bind(&find_in_files::dir_enumerated, this, + std::placeholders::_1, std::placeholders::_2, listing)))); + doscheduled({enumeration, enumeration_done}); + docompleted(2); + // Complete only if not the cur dir opened + return std::make_pair(true, h); + }; + void dowait() + { + // Prepare finished + auto finished_waiter=finished.get_future(); +#if 0 // Disabled to maximise performance + // Wait till done, retrieving any exceptions as they come in to keep memory consumption down + std::future_status status; + do + { + status=finished_waiter.wait_for(std::chrono::milliseconds(1000)); + std::cout << "\nDispatcher has " << dispatcher->count() << " fds open and " << dispatcher->wait_queue_depth() << " ops outstanding." << std::endl; + std::vector<future<>> batch; batch.reserve(10000); + { + boost::lock_guard<decltype(opslock)> lock(opslock); + while(status==std::future_status::timeout ? (ops.size()>5000) : !ops.empty()) + { + batch.push_back(std::move(ops.front())); + ops.pop_front(); + } + } + // Retrieve any exceptions + std::cout << "Processed " << batch.size() << " ops for exception state." << std::endl; + if(!batch.empty()) + when_all_p(batch.begin(), batch.end()).wait(); + } while(status==std::future_status::timeout); +#else + finished_waiter.wait(); +#endif + } + // Constructor, which starts the ball rolling + find_in_files(const char *_regexpr) : regexpr(_regexpr), + // Create an AFIO dispatcher that bypasses any filing system buffers + dispatcher(make_dispatcher("file:///", file_flags::will_be_sequentially_accessed/*|file_flags::os_direct*/).get()), + bytesread(0), filesread(0), filesmatched(0), scheduled(0), completed(0) + { + filepaths.reserve(50000); + + // Schedule the recursive enumeration of the current directory + std::cout << "\n\nStarting directory enumerations ..." << std::endl; + auto cur_dir=dispatcher->dir(path_req("")); + auto cur_dir_opened=dispatcher->completion(cur_dir, std::make_pair(async_op_flags::none, + std::function<dispatcher::completion_t>( + std::bind(&find_in_files::dir_opened, this, + std::placeholders::_1, std::placeholders::_2)))); + doscheduled({cur_dir, cur_dir_opened}); + dowait(); + } +}; + +int main(int argc, const char *argv[]) +{ + using std::placeholders::_1; using std::placeholders::_2; + using namespace boost::afio; + typedef chrono::duration<double, ratio<1, 1>> secs_type; + if(argc<2) + { + std::cerr << "ERROR: Specify a regular expression to search all files in the current directory." << std::endl; + return 1; + } + // Prime SpeedStep + auto begin=chrono::high_resolution_clock::now(); + while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now()-begin).count()<1); + try + { + begin=chrono::high_resolution_clock::now(); + find_in_files finder(argv[1]); + auto end=chrono::high_resolution_clock::now(); + auto diff=chrono::duration_cast<secs_type>(end-begin); + std::cout << "\n" << finder.filesmatched << " files matched out of " << finder.filesread + << " files which was " << finder.bytesread << " bytes." << std::endl; + std::cout << "The search took " << diff.count() << " seconds which was " << finder.filesread/diff.count() + << " files per second or " << (finder.bytesread/diff.count()/1024/1024) << " Mb/sec." << std::endl; + } + catch(...) + { + std::cerr << boost::current_exception_diagnostic_information(true) << std::endl; + return 1; + } + return 0; +} +//] diff --git a/attic/example/find_in_files_iostreams.cpp b/attic/example/find_in_files_iostreams.cpp new file mode 100644 index 00000000..0114ec57 --- /dev/null +++ b/attic/example/find_in_files_iostreams.cpp @@ -0,0 +1,109 @@ +#include "afio_pch.hpp" +#include <iostream> +#include <fstream> +#include <regex> +#include <chrono> +#if BOOST_AFIO_USE_BOOST_FILESYSTEM +#include "boost/filesystem/fstream.hpp" +#endif + +/* My Intel Core i7 3770K running Windows 8 x64 with 7200rpm drive, using +Sysinternals RAMMap to clear disc cache (http://technet.microsoft.com/en-us/sysinternals/ff700229.aspx) + +Single threaded, warm cache: +92 files matched out of 39279 files which was 7093894518 bytes. +The search took 8.32834 seconds which was 4716.3 files per second or 812.318 Mb/sec. + +Single threaded, cold cache: +91 files matched out of 38967 files which was 6170927489 bytes. +The search took 369.046 seconds which was 105.588 files per second or 15.9467 Mb/sec. + +OpenMP, warm cache: +92 files matched out of 38943 files which was 7092655881 bytes. +The search took 3.73611 seconds which was 10423.4 files per second or 1810.46 Mb/sec. + +OpenMP, cold cache: +91 files matched out of 38886 files which was 6170656567 bytes. +The search took 741.131 seconds which was 52.4684 files per second or 7.94029 Mb/sec. +*/ + +//[find_in_files_iostreams +int main(int argc, const char *argv[]) +{ + using namespace std; + namespace filesystem = boost::afio::filesystem; +#if BOOST_AFIO_USE_BOOST_FILESYSTEM + using boost::filesystem::ifstream; +#endif + typedef chrono::duration<double, ratio<1, 1>> secs_type; + if(argc<2) + { + cerr << "ERROR: Specify a regular expression to search all files in the current directory." << endl; + return 1; + } + // Prime SpeedStep + auto begin=chrono::high_resolution_clock::now(); + while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now()-begin).count()<1); + size_t bytesread=0, filesread=0, filesmatched=0; + try + { + begin=chrono::high_resolution_clock::now(); + + // Generate a list of all files here and below the current working directory + vector<filesystem::path> filepaths; + for(auto it=filesystem::recursive_directory_iterator("."); it!=filesystem::recursive_directory_iterator(); ++it) + { + if(it->status().type()!= +#ifdef BOOST_AFIO_USE_LEGACY_FILESYSTEM_SEMANTICS + filesystem::file_type::regular_file) +#else + filesystem::file_type::regular) +#endif + continue; + filepaths.push_back(it->path()); + } + + // Compile the regular expression, and have OpenMP parallelise the loop + regex regexpr(argv[1]); +#pragma omp parallel for schedule(dynamic) + for(int n=0; n<(int) filepaths.size(); n++) + { + // Open the file + ifstream s(filepaths[n], ifstream::binary); + s.exceptions(fstream::failbit | fstream::badbit); // Turn on exception throwing + // Get its length + s.seekg(0, ios::end); + size_t length=(size_t) s.tellg(); + s.seekg(0, ios::beg); + // Allocate a sufficient buffer, avoiding the byte clearing vector would do + unique_ptr<char[]> buffer(new char[length+1]); + // Read in the file, terminating with zero + s.read(buffer.get(), length); + buffer.get()[length]=0; + // Search the buffer for the regular expression + if(regex_search(buffer.get(), regexpr)) + { +#pragma omp critical + { + cout << filepaths[n] << endl; + } + filesmatched++; + } + filesread++; + bytesread+=length; + } + auto end=chrono::high_resolution_clock::now(); + auto diff=chrono::duration_cast<secs_type>(end-begin); + cout << "\n" << filesmatched << " files matched out of " << filesread << " files which was " + << bytesread << " bytes." << endl; + cout << "The search took " << diff.count() << " seconds which was " << filesread/diff.count() + << " files per second or " << (bytesread/diff.count()/1024/1024) << " Mb/sec." << endl; + } + catch(...) + { + cerr << boost::current_exception_diagnostic_information(true) << endl; + return 1; + } + return 0; +} +//] diff --git a/attic/example/readallof_example.cpp b/attic/example/readallof_example.cpp new file mode 100644 index 00000000..c7882c88 --- /dev/null +++ b/attic/example/readallof_example.cpp @@ -0,0 +1,54 @@ +#include "afio_pch.hpp" + +int main(void) +{ + using namespace boost::afio; + auto dispatcher=make_dispatcher().get(); + current_dispatcher_guard h(dispatcher); + +{ + //[readallof_example_bad + char input[1024]; + auto file_opened = file("foo.txt"); + read(file_opened, input, 0); + //] +} + +{ + //[readallof_example_many + char input[1024]; + // Schedule enumerating the containing directory, but only for foo.txt + auto dir_opened = async_dir(""); // "" means current directory in AFIO + auto file_enumed = async_enumerate(dir_opened, metadata_flags::size, + 2, true, "foo.txt"); + // Schedule in parallel opening the file + auto file_opened = async_file(dir_opened, "foo.txt"); + auto file_read = file_enumed.then( + [&input](future<std::pair<std::vector<directory_entry>, bool>> &f) { + // Get the directory_entry for the first result + directory_entry &de = f.get().first.front(); + // Schedule a file read once we know the file size + return async_read(f, (void *)input, + (size_t)de.st_size(), // won't block + 0); + }); + file_read.get(); + //] +} + +{ + //[readallof_example_single + char input[1024]; + // Open the file synchronously + auto file_opened = file("foo.txt"); + // Fetch ONLY the size metadata. Blocks because it's synchronous! + directory_entry de = file_opened->direntry(metadata_flags::size); + // Read now we know the file size + read(file_opened, + (void *) input, + (size_t) de.st_size(), // doesn't block, as size was fetched before. + 0); + //] +} + return 0; +} diff --git a/attic/example/readwrite_example.cpp b/attic/example/readwrite_example.cpp new file mode 100644 index 00000000..5a94c561 --- /dev/null +++ b/attic/example/readwrite_example.cpp @@ -0,0 +1,78 @@ +#include "afio_pch.hpp" + +int main(void) +{ + try + { + //[readwrite_example + namespace afio = BOOST_AFIO_V2_NAMESPACE; + namespace asio = BOOST_AFIO_V2_NAMESPACE::asio; + + // Set a dispatcher as current for this thread + afio::current_dispatcher_guard h(afio::make_dispatcher().get()); + + // Schedule an opening of a file called example_file.txt + afio::future<> openfile = afio::async_file("example_file.txt", + afio::file_flags::create | afio::file_flags::read_write); + + // Something a bit surprising for many people is that writing off + // the end of a file in AFIO does NOT extend the file and writes + // which go past the end will simply fail instead. Why not? + // Simple: that's the convention with async file i/o, because + // synchronising multiple processes concurrently adjusting a + // file's length has significant overhead which is wasted if you + // don't need that functionality. Luckily, there is an easy + // workaround: either open a file for append-only access, in which + // case all writes extend the file for you, or else you explicitly + // extend files before writing, like this: + afio::future<> resizedfile = afio::async_truncate(openfile, 12); + + // Config a write gather. You could do this of course as a batch + // of writes, but a write gather has optimised host OS support in most + // cases, so it's one syscall instead of many. + std::vector<asio::const_buffer> buffers; + buffers.push_back(asio::const_buffer("He", 2)); + buffers.push_back(asio::const_buffer("ll", 2)); + buffers.push_back(asio::const_buffer("o ", 2)); + buffers.push_back(asio::const_buffer("Wo", 2)); + buffers.push_back(asio::const_buffer("rl", 2)); + buffers.push_back(asio::const_buffer("d\n", 2)); + // Schedule the write gather to offset zero after the resize file + afio::future<> written(afio::async_write(resizedfile, buffers, 0)); + + // Have the compiler config the exact same write gather as earlier for you + // The compiler assembles an identical sequence of ASIO write gather + // buffers for you + std::vector<std::string> buffers2={ "He", "ll", "o ", "Wo", "rl", "d\n" }; + // Schedule this to occur after the previous write completes + afio::future<> written2(afio::async_write(written, buffers2, 0)); + + // Schedule making sure the previous batch has definitely reached physical storage + // This won't complete until the write is on disc + afio::future<> stored(afio::async_sync(written2)); + + // Schedule filling this array from the file. Note how convenient std::array + // is and completely replaces C style char buffer[bytes] + std::array<char, 12> buffer; + afio::future<> read(afio::async_read(stored, buffer, 0)); + + // Schedule the closing and deleting of example_file.txt after the contents read + afio::future<> deletedfile(afio::async_rmfile(afio::async_close(read))); + + // Wait until the buffer has been filled, checking all steps for errors + afio::when_all_p(openfile, resizedfile, written, written2, stored, read).get(); /*< waits for file open, resize, write, sync and read to complete, throwing any exceptions encountered >*/ + + // There is actually a io_req<std::string> specialisation you + // can use to skip this bit by reading directly into a string ... + std::string contents(buffer.begin(), buffer.end()); + std::cout << "Contents of file is '" << contents << "'" << std::endl; + + // Check remaining ops for errors + deletedfile.get(); + //] + } + catch(const BOOST_AFIO_V2_NAMESPACE::system_error &e) { std::cerr << "ERROR: program exits via system_error code " << e.code().value() << " (" << e.what() << ")" << std::endl; return 1; } + catch(const std::exception &e) { std::cerr << "ERROR: program exits via exception (" << e.what() << ")" << std::endl; return 1; } + catch(...) { std::cerr << "ERROR: program exits via " << boost::current_exception_diagnostic_information(true) << std::endl; return 1; } + return 0; +} diff --git a/attic/example/readwrite_example_traditional.cpp b/attic/example/readwrite_example_traditional.cpp new file mode 100644 index 00000000..56f82113 --- /dev/null +++ b/attic/example/readwrite_example_traditional.cpp @@ -0,0 +1,46 @@ +#include "afio_pch.hpp" + +int main(void) +{ + //[readwrite_example_traditional + + try + { + // Open a file called example_file.txt + std::fstream openfile("example_file.txt"); /*< opens file >*/ + // Turn on exception throwing + openfile.exceptions(std::fstream::failbit | std::fstream::badbit); + + // Do a write gather. STL iostreams will buffer the writes + // and coalesce them into a single syscall + openfile << "He"; /*< writes >*/ + openfile << "ll"; + openfile << "o "; + openfile << "Wo"; + openfile << "rl"; + openfile << "d\n"; + + // Make sure the previous batch has definitely reached physical storage + // This won't complete until the write is on disc + openfile.sync(); /*< syncs >*/ + + // Fill this array from the file + std::array<char, 12> buffer; + openfile.seekg(0, std::ios::beg); + openfile.read(buffer.data(), buffer.size()); /*< reads >*/ + + // Close the file and delete it + openfile.close(); /*< closes file >*/ + boost::afio::filesystem::remove("example_file.txt"); /*< deletes file >*/ + + // Convert the read array into a string + std::string contents(buffer.begin(), buffer.end()); + std::cout << "Contents of file is '" << contents << "'" << std::endl; + } + catch(...) + { + std::cerr << boost::current_exception_diagnostic_information(true) << std::endl; + throw; + } + //] +} diff --git a/attic/example/statfs_example.cpp b/attic/example/statfs_example.cpp new file mode 100644 index 00000000..122c07e0 --- /dev/null +++ b/attic/example/statfs_example.cpp @@ -0,0 +1,18 @@ +#include "afio_pch.hpp" + +int main(void) +{ + //[statfs_example + boost::afio::current_dispatcher_guard h(boost::afio::make_dispatcher().get()); + + // Open the root directory + boost::afio::handle_ptr rootdir(boost::afio::dir("/")); + + // Ask the filing system of the root directory how much free space there is + boost::afio::statfs_t statfs(boost::afio::statfs(rootdir, + boost::afio::fs_metadata_flags::bsize|boost::afio::fs_metadata_flags::bfree)); + + std::cout << "Your root filing system has " + << (statfs.f_bfree*statfs.f_bsize/1024.0/1024.0/1024.0) << " Gb free." << std::endl; + //] +} diff --git a/attic/example/workshop_atomic_updates_afio.cpp b/attic/example/workshop_atomic_updates_afio.cpp new file mode 100644 index 00000000..f36ea1c5 --- /dev/null +++ b/attic/example/workshop_atomic_updates_afio.cpp @@ -0,0 +1,2 @@ +#include "afio_pch.hpp" +#include "workshop_atomic_updates_afio.ipp" diff --git a/attic/example/workshop_atomic_updates_afio.ipp b/attic/example/workshop_atomic_updates_afio.ipp new file mode 100644 index 00000000..c4daafe4 --- /dev/null +++ b/attic/example/workshop_atomic_updates_afio.ipp @@ -0,0 +1,323 @@ +//[workshop_atomic_updates_afio_interface +namespace afio = BOOST_AFIO_V2_NAMESPACE; +namespace filesystem = BOOST_AFIO_V2_NAMESPACE::filesystem; +using BOOST_OUTCOME_V1_NAMESPACE::lightweight_futures::shared_future; + +class data_store +{ + afio::dispatcher_ptr _dispatcher; + afio::handle_ptr _store; +public: + // Type used for read streams + using istream = std::shared_ptr<std::istream>; + // Type used for write streams + using ostream = std::shared_ptr<std::ostream>; + // Type used for lookup + using lookup_result_type = shared_future<istream>; + // Type used for write + using write_result_type = shared_future<ostream>; + + // Disposition flags + static constexpr size_t writeable = (1<<0); + + // Open a data store at path + data_store(size_t flags = 0, afio::path path = "store"); + + // Look up item named name for reading, returning an istream for the item + shared_future<istream> lookup(std::string name) noexcept; + // Look up item named name for writing, returning an ostream for that item + shared_future<ostream> write(std::string name) noexcept; +}; +//] + +//[workshop_atomic_updates_afio3] +namespace asio = BOOST_AFIO_V2_NAMESPACE::asio; +using BOOST_OUTCOME_V1_NAMESPACE::empty; +using BOOST_AFIO_V2_NAMESPACE::error_code; +using BOOST_AFIO_V2_NAMESPACE::generic_category; + +// A special allocator of highly efficient file i/o memory +using file_buffer_type = std::vector<char, afio::utils::page_allocator<char>>; + +// An iostream which reads directly from a memory mapped AFIO file +struct idirectstream : public std::istream +{ + struct directstreambuf : public std::streambuf + { + afio::handle_ptr h; // Holds the file open + std::shared_ptr<file_buffer_type> buffer; + afio::handle::mapped_file_ptr mfp; + // From a mmap + directstreambuf(afio::handle_ptr _h, afio::handle::mapped_file_ptr _mfp, size_t length) : h(std::move(_h)), mfp(std::move(_mfp)) + { + // Set the get buffer this streambuf is to use + setg((char *) mfp->addr, (char *) mfp->addr, (char *) mfp->addr + length); + } + // From a malloc + directstreambuf(afio::handle_ptr _h, std::shared_ptr<file_buffer_type> _buffer, size_t length) : h(std::move(_h)), buffer(std::move(_buffer)) + { + // Set the get buffer this streambuf is to use + setg(buffer->data(), buffer->data(), buffer->data() + length); + } + }; + std::unique_ptr<directstreambuf> buf; + template<class U> idirectstream(afio::handle_ptr h, U &&buffer, size_t length) : std::istream(new directstreambuf(std::move(h), std::forward<U>(buffer), length)), buf(static_cast<directstreambuf *>(rdbuf())) + { + } + virtual ~idirectstream() override + { + // Reset the stream before deleting the buffer + rdbuf(nullptr); + } +}; + +// An iostream which writes to an AFIO file in 4Kb pages +struct odirectstream : public std::ostream +{ + struct directstreambuf : public std::streambuf + { + using int_type = std::streambuf::int_type; + using traits_type = std::streambuf::traits_type; + afio::future<> lastwrite; // the last async write performed + afio::off_t offset; // offset of next write + file_buffer_type buffer; // a page size on this machine + file_buffer_type lastbuffer; + directstreambuf(afio::handle_ptr _h) : lastwrite(std::move(_h)), offset(0), buffer(afio::utils::page_sizes().front()) + { + // Set the put buffer this streambuf is to use + setp(buffer.data(), buffer.data() + buffer.size()); + } + virtual ~directstreambuf() override + { + try + { + // Flush buffers and wait until last write completes + // Schedule an asynchronous write of the buffer to storage + size_t thisbuffer = pptr() - pbase(); + if(thisbuffer) + lastwrite = afio::async_write(afio::async_truncate(lastwrite, offset+thisbuffer), buffer.data(), thisbuffer, offset); + lastwrite.get(); + // TODO: On Windows do I need to close and reopen the file to flush metadata before + // the rename or does the rename do it for me? + // Get handle to the parent directory + auto dirh(lastwrite->container()); + // Atomically rename "tmpXXXXXXXXXXXXXXXX" to "0" + lastwrite->atomic_relink(afio::path_req::relative(dirh, "0")); +#ifdef __linux__ + // Journalled Linux filing systems don't need this, but if you enabled afio::file_flags::always_sync + // you might want to issue this too. + afio::sync(dirh); +#endif + } + catch(...) + { + } + } + virtual int_type overflow(int_type c) override + { + size_t thisbuffer=pptr()-pbase(); + if(thisbuffer>=buffer.size()) + sync(); + if(c!=traits_type::eof()) + { + *pptr()=(char)c; + pbump(1); + return traits_type::to_int_type(c); + } + return traits_type::eof(); + } + virtual int sync() override + { + // Wait for the last write to complete, propagating any exceptions + lastwrite.get(); + size_t thisbuffer=pptr()-pbase(); + if(thisbuffer > 0) + { + // Detach the current buffer and replace with a fresh one to allow the kernel to steal the page + lastbuffer=std::move(buffer); + buffer.resize(lastbuffer.size()); + setp(buffer.data(), buffer.data() + buffer.size()); + // Schedule an extension of physical storage by an extra page + lastwrite = afio::async_truncate(lastwrite, offset + thisbuffer); + // Schedule an asynchronous write of the buffer to storage + lastwrite=afio::async_write(lastwrite, lastbuffer.data(), thisbuffer, offset); + offset+=thisbuffer; + } + return 0; + } + }; + std::unique_ptr<directstreambuf> buf; + odirectstream(afio::handle_ptr h) : std::ostream(new directstreambuf(std::move(h))), buf(static_cast<directstreambuf *>(rdbuf())) + { + } + virtual ~odirectstream() override + { + // Reset the stream before deleting the buffer + rdbuf(nullptr); + } +}; +//] + +//[workshop_atomic_updates_afio1] +namespace asio = BOOST_AFIO_V2_NAMESPACE::asio; +using BOOST_OUTCOME_V1_NAMESPACE::empty; +using BOOST_AFIO_V2_NAMESPACE::error_code; +using BOOST_AFIO_V2_NAMESPACE::generic_category; + +static bool is_valid_name(const std::string &name) noexcept +{ + static const std::string banned("<>:\"/\\|?*\0", 10); + if(std::string::npos!=name.find_first_of(banned)) + return false; + // No leading period + return name[0]!='.'; +} + +// Keep a cache of crypto strong random names +static std::string random_name() +{ + static struct random_names_type + { + std::vector<std::string> names; + size_t idx; + random_names_type(size_t count) : names(count), idx(0) + { + for(size_t n=0; n<count; n++) + names[n]=afio::utils::random_string(16); // 128 bits + } + std::string get() + { + if(idx==names.size()) + idx=0; + return names[idx++]; + } + } random_names(10000); + return random_names.get(); +} + +data_store::data_store(size_t flags, afio::path path) +{ + // Make a dispatcher for the local filesystem URI, masking out write flags on all operations if not writeable + _dispatcher=afio::make_dispatcher("file:///", afio::file_flags::none, !(flags & writeable) ? afio::file_flags::write : afio::file_flags::none).get(); + // Set the dispatcher for this thread, and open a handle to the store directory + afio::current_dispatcher_guard h(_dispatcher); + _store=afio::dir(std::move(path), afio::file_flags::create); // throws if there was an error + // Precalculate the cache of random names + random_name(); +} + +shared_future<data_store::istream> data_store::lookup(std::string name) noexcept +{ + if(!is_valid_name(name)) + return error_code(EINVAL, generic_category()); + try + { + name.append("/0"); + // Schedule the opening of the file for reading + afio::future<> h(afio::async_file(_store, name, afio::file_flags::read)); + // When it completes, call this continuation + return h.then([](afio::future<> &_h) -> shared_future<data_store::istream> { + // If file didn't open, return the error or exception immediately + BOOST_OUTCOME_PROPAGATE(_h); + size_t length=(size_t) _h->lstat(afio::metadata_flags::size).st_size; + // Is a memory map more appropriate? + if(length>=128*1024) + { + afio::handle::mapped_file_ptr mfp; + if((mfp=_h->map_file())) + { + data_store::istream ret(std::make_shared<idirectstream>(_h.get_handle(), std::move(mfp), length)); + return ret; + } + } + // Schedule the reading of the file into a buffer + auto buffer=std::make_shared<file_buffer_type>(length); + afio::future<> h(afio::async_read(_h, buffer->data(), length, 0)); + // When the read completes call this continuation + return h.then([buffer, length](const afio::future<> &h) -> shared_future<data_store::istream> { + // If read failed, return the error or exception immediately + BOOST_OUTCOME_PROPAGATE(h); + data_store::istream ret(std::make_shared<idirectstream>(h.get_handle(), buffer, length)); + return ret; + }); + }); + } + catch(...) + { + return std::current_exception(); + } +} +//] + +//[workshop_atomic_updates_afio2] +shared_future<data_store::ostream> data_store::write(std::string name) noexcept +{ + if(!is_valid_name(name)) + return error_code(EINVAL, generic_category()); + try + { + // Schedule the opening of the directory + afio::future<> dirh(afio::async_dir(_store, name, afio::file_flags::create)); +#ifdef __linux__ + // Flush metadata on Linux only. This will be a noop unless we created a new directory + // above, and if we don't flush the new key directory it and its contents may not appear + // in the store directory after a suddenly power loss, even if it and its contents are + // all on physical storage. + dirh.then([this](const afio::future<> &h) { async_sync(_store); }); +#endif + // Make a crypto strong random file name + std::string randomname("tmp"); + randomname.append(random_name()); + // Schedule the opening of the file for writing + afio::future<> h(afio::async_file(dirh, randomname, afio::file_flags::create | afio::file_flags::write + | afio::file_flags::hold_parent_open // handle should keep a handle_ptr to its parent dir + /*| afio::file_flags::always_sync*/ // writes don't complete until upon physical storage + )); + // When it completes, call this continuation + return h.then([](const afio::future<> &h) -> shared_future<data_store::ostream> { + // If file didn't open, return the error or exception immediately + BOOST_OUTCOME_PROPAGATE(h); + // Create an ostream which directly uses the file. + data_store::ostream ret(std::make_shared<odirectstream>(h.get_handle())); + return std::move(ret); + }); + } + catch (...) + { + return std::current_exception(); + } +} +//] + +int main(void) +{ + // To write a key-value item called "dog" + { + data_store ds; + auto dogh = ds.write("dog").get(); + auto &dogs = *dogh; + dogs << "I am a dog"; + } + + // To retrieve a key-value item called "dog" + { + data_store ds; + auto dogh = ds.lookup("dog"); + if (dogh.empty()) + std::cerr << "No item called dog" << std::endl; + else if(dogh.has_error()) + std::cerr << "ERROR: Looking up dog returned error " << dogh.get_error().message() << std::endl; + else if (dogh.has_exception()) + { + std::cerr << "FATAL: Looking up dog threw exception" << std::endl; + std::terminate(); + } + else + { + std::string buffer; + *dogh.get() >> buffer; + std::cout << "Item dog has value " << buffer << std::endl; + } + } + return 0; +} diff --git a/attic/example/workshop_benchmark.cpp b/attic/example/workshop_benchmark.cpp new file mode 100644 index 00000000..e3f2d58d --- /dev/null +++ b/attic/example/workshop_benchmark.cpp @@ -0,0 +1,151 @@ +#include "afio_pch.hpp" + +#ifdef _DEBUG +#define ITEMS 64 +#else +#define ITEMS 65536 +#endif +#define PARALLELISM 256 // up to max fds on your platform + +namespace iostreams { +#include "workshop_naive.ipp" +} +namespace naive { +#include "workshop_naive_afio.ipp" +} +namespace atomic_updates { +#include "workshop_atomic_updates_afio.ipp" +} +namespace final { +#include "workshop_final_afio.ipp" +#include "../detail/SpookyV2.cpp" +} + + +namespace filesystem = BOOST_AFIO_V2_NAMESPACE::filesystem; +namespace chrono = BOOST_AFIO_V2_NAMESPACE::chrono; +using BOOST_AFIO_V2_NAMESPACE::ratio; + +// From http://burtleburtle.net/bob/rand/smallprng.html +typedef unsigned int u4; +typedef struct ranctx { u4 a; u4 b; u4 c; u4 d; } ranctx; + +#define rot(x,k) (((x)<<(k))|((x)>>(32-(k)))) +static u4 ranval(ranctx *x) { + u4 e = x->a - rot(x->b, 27); + x->a = x->b ^ rot(x->c, 17); + x->b = x->c + x->d; + x->c = x->d + e; + x->d = e + x->a; + return x->d; +} + +static void raninit(ranctx *x, u4 seed) { + u4 i; + x->a = 0xf1ea5eed, x->b = x->c = x->d = seed; + for (i = 0; i < 20; ++i) { + (void) ranval(x); + } +} + +static std::vector<std::pair<ranctx, unsigned>> items; + +void prepare() +{ + ranctx gen; + raninit(&gen, 0x78adbcff); + items.clear(); + items.reserve(ITEMS); + for (size_t n = 0; n < ITEMS; n++) + { + unsigned t=ranval(&gen); + items.push_back(std::make_pair(gen, t)); + } +} + +template<class data_store> void benchmark(const char *filename, const char *desc, bool parallel_writes) +{ + typedef chrono::duration<double, ratio<1, 1>> secs_type; + std::vector<std::tuple<size_t, double, double>> results; + prepare(); + for(size_t n=1; n<=ITEMS; n<<=1) + { + std::cout << "Benchmarking " << desc << " insertion of " << n << " records ... "; + if(filesystem::exists("store")) + filesystem::remove_all("store"); + auto begin=chrono::high_resolution_clock::now(); + while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now()-begin).count()<1); + data_store store(data_store::writeable); + size_t write_parallelism = parallel_writes ? PARALLELISM : 1; + std::vector<typename data_store::write_result_type> ops(write_parallelism); + begin=chrono::high_resolution_clock::now(); + for (size_t m = 0; m < n; m += write_parallelism) + { + int todo = (int)(n < write_parallelism ? n : write_parallelism); +#pragma omp parallel for + for (int o = 0; o < todo; o++) + ops[o] = store.write(std::to_string(m + (size_t)o)); +#pragma omp parallel for + for (int o = 0; o < todo; o++) + { + *ops[o].get() << items[m + (size_t)o].second; + ops[o].get()->flush(); + } + // Some dumping of the writes should have happened by now +#pragma omp parallel for + for (int o = 0; o < todo; o++) + ops[o]=typename data_store::write_result_type(); + } + auto end=chrono::high_resolution_clock::now(); + auto diff=chrono::duration_cast<secs_type>(end-begin).count(); + std::cout << (n/diff) << " items/sec" << std::endl; + results.push_back(std::make_tuple(n, n/diff, 0)); + } + auto resultsit = results.begin(); + for (size_t n = 1; n <= ITEMS; n <<= 1) + { + std::cout << "Benchmarking " << desc << " lookup of " << n << " records ... "; + data_store store; + std::vector<typename data_store::lookup_result_type> ops(PARALLELISM); + auto begin = chrono::high_resolution_clock::now(); + for (size_t m = 0; m < n; m += PARALLELISM) + { + int todo = n < PARALLELISM ? n : PARALLELISM; +#pragma omp parallel for + for (int o = 0; o < todo; o++) + ops[o] = store.lookup(std::to_string(m + (size_t) o)); + // Some readahead should have happened by now +#pragma omp parallel for + for (int o = 0; o < todo; o++) + { + auto s(ops[o].get()); + unsigned t; + *s >> t; + if (t != items[m+(size_t) o].second) + std::cerr << "ERROR: Item " << m << " has incorrect contents!" << std::endl; + ops[o]=typename data_store::lookup_result_type(); + } + } + auto end = chrono::high_resolution_clock::now(); + auto diff = chrono::duration_cast<secs_type>(end - begin).count(); + std::cout << (n / diff) << " items/sec" << std::endl; + std::get<2>(*resultsit++) = n / diff; + } + std::ofstream csvh(filename); + csvh << "Items,Inserts/sec,Lookups/sec" << std::endl; + for(auto &i : results) + csvh << std::get<0>(i) << "," << std::get<1>(i) << "," << std::get<2>(i) << std::endl; +} + +int main(void) +{ + typedef chrono::duration<double, ratio<1, 1>> secs_type; + auto begin=chrono::high_resolution_clock::now(); + while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now()-begin).count()<3); + + //benchmark<iostreams::data_store>("iostreams.csv", "STL iostreams", true); + //benchmark<naive::data_store>("afio_naive.csv", "AFIO naive", true); + //benchmark<atomic_updates::data_store>("afio_atomic.csv", "AFIO atomic update", true); + benchmark<final::data_store>("afio_final.csv", "AFIO single file", true); + return 0; +}
\ No newline at end of file diff --git a/attic/example/workshop_final_afio.cpp b/attic/example/workshop_final_afio.cpp new file mode 100644 index 00000000..c3fcceb9 --- /dev/null +++ b/attic/example/workshop_final_afio.cpp @@ -0,0 +1,2 @@ +#include "afio_pch.hpp" +#include "workshop_final_afio.ipp" diff --git a/attic/example/workshop_final_afio.ipp b/attic/example/workshop_final_afio.ipp new file mode 100644 index 00000000..655b8367 --- /dev/null +++ b/attic/example/workshop_final_afio.ipp @@ -0,0 +1,665 @@ +#include "../detail/SpookyV2.h" + +//[workshop_final_interface +namespace afio = BOOST_AFIO_V2_NAMESPACE; +namespace filesystem = BOOST_AFIO_V2_NAMESPACE::filesystem; +using BOOST_OUTCOME_V1_NAMESPACE::outcome; +using BOOST_OUTCOME_V1_NAMESPACE::lightweight_futures::shared_future; + +class data_store +{ + struct _ostream; + friend struct _ostream; + afio::dispatcher_ptr _dispatcher; + // The small blob store keeps non-memory mappable blobs at 32 byte alignments + afio::handle_ptr _small_blob_store, _small_blob_store_append, _small_blob_store_ecc; + // The large blob store keeps memory mappable blobs at 4Kb alignments + afio::handle_ptr _large_blob_store, _large_blob_store_append, _large_blob_store_ecc; + // The index is where we keep the map of keys to blobs + afio::handle_ptr _index_store, _index_store_append, _index_store_ecc; + struct index; + std::unique_ptr<index> _index; +public: + // Type used for read streams + using istream = std::shared_ptr<std::istream>; + // Type used for write streams + using ostream = std::shared_ptr<std::ostream>; + // Type used for lookup + using lookup_result_type = shared_future<istream>; + // Type used for write + using write_result_type = outcome<ostream>; + + // Disposition flags + static constexpr size_t writeable = (1<<0); + + // Open a data store at path + data_store(size_t flags = 0, afio::path path = "store"); + + // Look up item named name for reading, returning an istream for the item + shared_future<istream> lookup(std::string name) noexcept; + // Look up item named name for writing, returning an ostream for that item + outcome<ostream> write(std::string name) noexcept; +}; +//] + +//[workshop_final1] +namespace asio = BOOST_AFIO_V2_NAMESPACE::asio; +using BOOST_AFIO_V2_NAMESPACE::error_code; +using BOOST_AFIO_V2_NAMESPACE::generic_category; +using BOOST_OUTCOME_V1_NAMESPACE::outcome; + +// A special allocator of highly efficient file i/o memory +using file_buffer_type = std::vector<char, afio::utils::page_allocator<char>>; + +// Serialisation helper types +#pragma pack(push, 1) +struct ondisk_file_header // 20 bytes +{ + union + { + afio::off_t length; // Always 32 in byte order of whoever wrote this + char endian[8]; + }; + afio::off_t index_offset_begin; // Hint to the length of the store when the index was last written + unsigned int time_count; // Racy monotonically increasing count +}; +struct ondisk_record_header // 28 bytes - ALWAYS ALIGNED TO 32 BYTE FILE OFFSET +{ + afio::off_t magic : 16; // 0xad magic + afio::off_t kind : 2; // 0 for zeroed space, 1,2 for blob, 3 for index + afio::off_t _spare : 6; + afio::off_t length : 40; // Size of the object (including this preamble, regions, key values) (+8) + unsigned int age; // file header time_count when this was added (+12) + uint64 hash[2]; // 128-bit SpookyHash of the object (from below onwards) (+28) + // ondisk_index_regions + // ondisk_index_key_value (many until length) +}; +struct ondisk_index_regions // 12 + regions_size * 32 +{ + afio::off_t thisoffset; // this index only valid if equals this offset + unsigned int regions_size; // count of regions with their status (+28) + struct ondisk_index_region + { + afio::off_t offset; // offset to this region + ondisk_record_header r; // copy of the header at the offset to avoid a disk seek + } regions[1]; +}; +struct ondisk_index_key_value // 8 + sizeof(key) +{ + unsigned int region_index; // Index into regions + unsigned int name_size; // Length of key + char name[1]; // Key string (utf-8) +}; +#pragma pack(pop) + +struct data_store::index +{ + struct region + { + enum kind_type { zeroed=0, small_blob=1, large_blob=2, index=3 } kind; + afio::off_t offset, length; + uint64 hash[2]; + region(ondisk_index_regions::ondisk_index_region *r) : kind(static_cast<kind_type>(r->r.kind)), offset(r->offset), length(r->r.length) { memcpy(hash, r->r.hash, sizeof(hash)); } + bool operator<(const region &o) const noexcept { return offset<o.offset; } + bool operator==(const region &o) const noexcept { return offset==o.offset && length==o.length; } + }; + afio::off_t offset_loaded_from; // Offset this index was loaded from + unsigned int last_time_count; // Header time count + std::vector<region> regions; + std::unordered_map<std::string, size_t> key_to_region; + index() : offset_loaded_from(0) { } +//] +//[workshop_final2] + struct last_good_ondisk_index_info + { + afio::off_t offset; + std::unique_ptr<char[]> buffer; + size_t size; + last_good_ondisk_index_info() : offset(0), size(0) { } + }; + // Finds the last good index in the store + outcome<last_good_ondisk_index_info> find_last_good_ondisk_index(afio::handle_ptr h) noexcept + { + last_good_ondisk_index_info ret; + error_code ec; + try + { + // Read the front of the store file to get the index offset hint + ondisk_file_header header; + afio::read(h, header, 0); + afio::off_t offset=0; + if(header.length==32) + offset=header.index_offset_begin; + else if(header.endian[0]==32) // wrong endian + return error_code(ENOEXEC, generic_category()); + else // corrupted + return error_code(ENOTSUP, generic_category()); + last_time_count=header.time_count; + // Fetch the valid extents + auto valid_extents(afio::extents(h)); + auto valid_extents_it=valid_extents.begin(); + // Iterate the records starting from index offset hint, keeping track of last known good index + bool done=true; + do + { + afio::off_t linear_scanning=0; + ondisk_record_header record; + afio::off_t file_length=h->lstat(afio::metadata_flags::size).st_size; + for(; offset<file_length;) + { + // Round to 32 byte boundary + offset&=~31ULL; + // Find my valid extent + while(offset>=valid_extents_it->first+valid_extents_it->second) + { + if(valid_extents.end()==++valid_extents_it) + { + valid_extents=afio::extents(h); + valid_extents_it=valid_extents.begin(); + } + } + // Is this offset within a valid extent? If not, bump it. + if(offset<valid_extents_it->first) + offset=valid_extents_it->first; + afio::read(ec, h, record, offset); + if(ec) return ec; + // If this does not contain a valid record, linear scan + // until we find one + if(record.magic!=0xad || (record.length & 31)) + { +start_linear_scan: + if(!linear_scanning) + { + std::cerr << "WARNING: Corrupt record detected at " << offset << ", linear scanning ..." << std::endl; + linear_scanning=offset; + } + offset+=32; + continue; + } + // Is this the first good record after a corrupted section? + if(linear_scanning) + { + std::cerr << "NOTE: Found valid record after linear scan at " << offset << std::endl; + std::cerr << " Removing invalid data between " << linear_scanning << " and " << offset << std::endl; + // Rewrite a valid record to span the invalid section + ondisk_record_header temp={0}; + temp.magic=0xad; + temp.length=offset-linear_scanning; + temp.age=last_time_count; + afio::write(ec, h, temp, linear_scanning); + // Deallocate the physical storage for the invalid section + afio::zero(ec, h, {{linear_scanning+12, offset-linear_scanning-12}}); + linear_scanning=0; + } + // If not an index, skip entire record + if(record.kind!=3 /*index*/) + { + // If this record length is wrong, start a linear scan + if(record.length>file_length-offset) + offset+=32; + else + offset+=record.length; + continue; + } + std::unique_ptr<char[]> buffer; + // If record.length is corrupt, this will throw bad_alloc + try + { + buffer.reset(new char[(size_t) record.length-sizeof(header)]); + } + catch(...) + { + // Either we are out of memory, or it's a corrupted record + // TODO: Try a ECC heal pass here + // If this record length is wrong, start a linear scan + if(record.length>file_length-offset) + goto start_linear_scan; + else + offset+=record.length; + continue; + } + afio::read(ec, h, buffer.get(), (size_t) record.length-sizeof(header), offset+sizeof(header)); + if(ec) + return ec; + uint64 hash[2]={0, 0}; + SpookyHash::Hash128(buffer.get(), (size_t) record.length-sizeof(header), hash, hash+1); + // Is this record correct? + if(!memcmp(hash, record.hash, sizeof(hash))) + { + // Is this index not conflicted? If not, it's the new most recent index + ondisk_index_regions *r=(ondisk_index_regions *) buffer.get(); + if(r->thisoffset==offset) + { + ret.buffer=std::move(buffer); + ret.size=(size_t) record.length-sizeof(header); + ret.offset=offset; + } + } + offset+=record.length; + } + if(ret.offset) // we have a valid most recent index so we're done + done=true; + else if(header.index_offset_begin>sizeof(header)) + { + // Looks like the end of the store got trashed. + // Reparse from the very beginning + offset=32; + header.index_offset_begin=0; + } + else + { + // No viable records at all, or empty store. + done=true; + } + } while(!done); + return ret; + } + catch(...) + { + return std::current_exception(); + } + } +//] +//[workshop_final3] + // Loads the index from the store + outcome<void> load(afio::handle_ptr h) noexcept + { + // If find_last_good_ondisk_index() returns error or exception, return those, else + // initialise ondisk_index_info to monad.get() + BOOST_OUTCOME_AUTO(ondisk_index_info, find_last_good_ondisk_index(h)); + error_code ec; + try + { + offset_loaded_from=0; + regions.clear(); + key_to_region.clear(); + ondisk_index_regions *r=(ondisk_index_regions *) ondisk_index_info.buffer.get(); + regions.reserve(r->regions_size); + for(size_t n=0; n<r->regions_size; n++) + regions.push_back(region(r->regions+n)); + ondisk_index_key_value *k=(ondisk_index_key_value *)(r+r->regions_size), *end=(ondisk_index_key_value *)(ondisk_index_info.buffer.get()+ondisk_index_info.size); + while(k<end) + key_to_region.insert(std::make_pair(std::string(k->name, k->name_size), k->region_index)); + offset_loaded_from=ondisk_index_info.offset; + return {}; + } + catch(...) + { + return std::current_exception(); + } + } +//] +//[workshop_final4] + // Writes the index to the store + outcome<void> store(afio::handle_ptr rwh, afio::handle_ptr appendh) noexcept + { + error_code ec; + std::vector<ondisk_index_regions::ondisk_index_region> ondisk_regions; + ondisk_regions.reserve(65536); + for(auto &i : regions) + { + ondisk_index_regions::ondisk_index_region r; + r.offset=i.offset; + r.r.kind=i.kind; + r.r.length=i.length; + memcpy(r.r.hash, i.hash, sizeof(i.hash)); + ondisk_regions.push_back(r); + } + + size_t bytes=0; + for(auto &i : key_to_region) + bytes+=8+i.first.size(); + std::vector<char> ondisk_key_values(bytes); + ondisk_index_key_value *kv=(ondisk_index_key_value *) ondisk_key_values.data(); + for(auto &i : key_to_region) + { + kv->region_index=(unsigned int) i.second; + kv->name_size=(unsigned int) i.first.size(); + memcpy(kv->name, i.first.c_str(), i.first.size()); + kv=(ondisk_index_key_value *)(((char *) kv) + 8 + i.first.size()); + } + + struct + { + ondisk_record_header header; + ondisk_index_regions header2; + } h; + h.header.magic=0xad; + h.header.kind=3; // writing an index + h.header._spare=0; + h.header.length=sizeof(h.header)+sizeof(h.header2)+sizeof(ondisk_regions.front())*(regions.size()-1)+bytes; + h.header.age=last_time_count; + // hash calculated later + // thisoffset calculated later + h.header2.regions_size=(unsigned int) regions.size(); + // Pad zeros to 32 byte multiple + std::vector<char> zeros((h.header.length+31)&~31ULL); + h.header.length+=zeros.size(); + + // Create the gather buffer sequence + std::vector<asio::const_buffer> buffers(4); + buffers[0]=asio::const_buffer(&h, 36); + buffers[1]=asio::const_buffer(ondisk_regions.data(), sizeof(ondisk_regions.front())*ondisk_regions.size()); + buffers[2]=asio::const_buffer(ondisk_key_values.data(), ondisk_key_values.size()); + if(zeros.empty()) + buffers.pop_back(); + else + buffers[3]=asio::const_buffer(zeros.data(), zeros.size()); + file_buffer_type reread(sizeof(h)); + ondisk_record_header *reread_header=(ondisk_record_header *) reread.data(); + bool success=false; + do + { + // Is this index stale? + BOOST_OUTCOME_AUTO(ondisk_index_info, find_last_good_ondisk_index(rwh)); + if(ondisk_index_info.offset!=offset_loaded_from) + { + // A better conflict resolution strategy might check to see if deltas + // are compatible, but for the sake of brevity we always report conflict + return error_code(EDEADLK, generic_category()); + } + // Take the current length of the store file. Any index written will be at or after this. + h.header2.thisoffset=appendh->lstat(afio::metadata_flags::size).st_size; + memset(h.header.hash, 0, sizeof(h.header.hash)); + // Hash the end of the first gather buffer and all the remaining gather buffers + SpookyHash::Hash128(asio::buffer_cast<const char *>(buffers[0])+24, asio::buffer_size(buffers[0])-24, h.header.hash, h.header.hash+1); + SpookyHash::Hash128(asio::buffer_cast<const char *>(buffers[1]), asio::buffer_size(buffers[1]), h.header.hash, h.header.hash+1); + SpookyHash::Hash128(asio::buffer_cast<const char *>(buffers[2]), asio::buffer_size(buffers[2]), h.header.hash, h.header.hash+1); + if(buffers.size()>3) + SpookyHash::Hash128(asio::buffer_cast<const char *>(buffers[3]), asio::buffer_size(buffers[3]), h.header.hash, h.header.hash+1); + // Atomic append the record + afio::write(ec, appendh, buffers, 0); + if(ec) return ec; + // Reread the record + afio::read(ec, rwh, reread.data(), reread.size(), h.header2.thisoffset); + if(ec) return ec; + // If the record doesn't match it could be due to a lag in st_size between open handles, + // so retry until success or stale index + } while(memcmp(reread_header->hash, h.header.hash, sizeof(h.header.hash))); + + // New index has been successfully written. Update the hint at the front of the file. + // This update is racy of course, but as it's merely a hint we don't care. + ondisk_file_header file_header; + afio::read(ec, rwh, file_header, 0); + if(!ec && file_header.index_offset_begin<h.header2.thisoffset) + { + file_header.index_offset_begin=h.header2.thisoffset; + file_header.time_count++; + afio::write(ec, rwh, file_header, 0); + } + offset_loaded_from=h.header2.thisoffset; + last_time_count=file_header.time_count; + return {}; + } +//] +//[workshop_final5] + // Reloads the index if needed + outcome<void> refresh(afio::handle_ptr h) noexcept + { + static afio::off_t last_size; + error_code ec; + afio::off_t size=h->lstat(afio::metadata_flags::size).st_size; + // Has the size changed? If so, need to check the hint + if(size>last_size) + { + last_size=size; + ondisk_file_header header; + afio::read(ec, h, header, 0); + if(ec) return ec; + // If the hint is moved, we are stale + if(header.index_offset_begin>offset_loaded_from) + return load(h); + } + return {}; + } +}; +//] + +//[workshop_final6] +namespace asio = BOOST_AFIO_V2_NAMESPACE::asio; +using BOOST_AFIO_V2_NAMESPACE::error_code; +using BOOST_AFIO_V2_NAMESPACE::generic_category; + +// A special allocator of highly efficient file i/o memory +using file_buffer_type = std::vector<char, afio::utils::page_allocator<char>>; + +// An iostream which reads directly from a memory mapped AFIO file +struct idirectstream : public std::istream +{ + struct directstreambuf : public std::streambuf + { + afio::handle_ptr h; // Holds the file open + std::shared_ptr<file_buffer_type> buffer; + // From a mmap + directstreambuf(afio::handle_ptr _h, char *addr, size_t length) : h(std::move(_h)) + { + // Set the get buffer this streambuf is to use + setg(addr, addr, addr + length); + } + // From a malloc + directstreambuf(afio::handle_ptr _h, std::shared_ptr<file_buffer_type> _buffer, size_t length) : h(std::move(_h)), buffer(std::move(_buffer)) + { + // Set the get buffer this streambuf is to use + setg(buffer->data(), buffer->data(), buffer->data() + length); + } + }; + std::unique_ptr<directstreambuf> buf; + template<class U> idirectstream(afio::handle_ptr h, U &&buffer, size_t length) : std::istream(new directstreambuf(std::move(h), std::forward<U>(buffer), length)), buf(static_cast<directstreambuf *>(rdbuf())) + { + } + virtual ~idirectstream() override + { + // Reset the stream before deleting the buffer + rdbuf(nullptr); + } +}; +//] +//[workshop_final7] +// An iostream which buffers all the output, then commits on destruct +struct data_store::_ostream : public std::ostream +{ + struct ostreambuf : public std::streambuf + { + using int_type = std::streambuf::int_type; + using traits_type = std::streambuf::traits_type; + data_store *ds; + std::string name; + file_buffer_type buffer; + ostreambuf(data_store *_ds, std::string _name) : ds(_ds), name(std::move(_name)), buffer(afio::utils::page_sizes().front()) + { + // Set the put buffer this streambuf is to use + setp(buffer.data(), buffer.data() + buffer.size()); + } + virtual ~ostreambuf() override + { + try + { + ondisk_index_regions::ondisk_index_region r; + r.r.magic=0xad; + r.r.kind=1; // small blob + r.r.length=sizeof(r.r)+buffer.size(); + r.r.length=(r.r.length+31)&~31ULL; // pad to 32 byte multiple + r.r.age=ds->_index->last_time_count; + memset(r.r.hash, 0, sizeof(r.r.hash)); + SpookyHash::Hash128(buffer.data(), (size_t)(r.r.length-sizeof(r.r)), r.r.hash, r.r.hash+1); + + // Create the gather buffer sequence and atomic append the blob + std::vector<asio::const_buffer> buffers(2); + buffers[0]=asio::const_buffer((char *) &r.r, sizeof(r.r)); + buffers[1]=asio::const_buffer(buffer.data(), (size_t)(r.r.length-sizeof(r.r))); + error_code ec; + auto offset=ds->_small_blob_store_append->lstat(afio::metadata_flags::size).st_size; + afio::write(ec, ds->_small_blob_store_append, buffers, 0); + if(ec) + abort(); // should really do something better here + + // Find out where my blob ended up + ondisk_record_header header; + do + { + afio::read(ec, ds->_small_blob_store_append, header, offset); + if(ec) abort(); + if(header.kind==1 /*small blob*/ && !memcmp(header.hash, r.r.hash, sizeof(header.hash))) + { + r.offset=offset; + break; + } + offset+=header.length; + } while(offset<ds->_small_blob_store_append->lstat(afio::metadata_flags::size).st_size); + + for(;;) + { + // Add my blob to the regions + ds->_index->regions.push_back(&r); + // Add my key to the key index + ds->_index->key_to_region[name]=ds->_index->regions.size()-1; + // Commit the index, and if successful exit + if(!ds->_index->store(ds->_index_store, ds->_index_store_append)) + return; + // Reload the index and retry + if(ds->_index->load(ds->_index_store)) + abort(); + } + } + catch(...) + { + } + } + virtual int_type overflow(int_type c) override + { + size_t thisbuffer=pptr()-pbase(); + if(thisbuffer>=buffer.size()) + sync(); + if(c!=traits_type::eof()) + { + *pptr()=(char)c; + pbump(1); + return traits_type::to_int_type(c); + } + return traits_type::eof(); + } + virtual int sync() override + { + buffer.resize(buffer.size()*2); + setp(buffer.data() + buffer.size()/2, buffer.data() + buffer.size()); + return 0; + } + }; + std::unique_ptr<ostreambuf> buf; + _ostream(data_store *ds, std::string name) : std::ostream(new ostreambuf(ds, std::move(name))), buf(static_cast<ostreambuf *>(rdbuf())) + { + } + virtual ~_ostream() override + { + // Reset the stream before deleting the buffer + rdbuf(nullptr); + } +}; +//] + +//[workshop_final8] +data_store::data_store(size_t flags, afio::path path) +{ + // Make a dispatcher for the local filesystem URI, masking out write flags on all operations if not writeable + _dispatcher=afio::make_dispatcher("file:///", afio::file_flags::none, !(flags & writeable) ? afio::file_flags::write : afio::file_flags::none).get(); + // Set the dispatcher for this thread, and create/open a handle to the store directory + afio::current_dispatcher_guard h(_dispatcher); + auto dirh(afio::dir(std::move(path), afio::file_flags::create)); // throws if there was an error + + // The small blob store keeps non-memory mappable blobs at 32 byte alignments + _small_blob_store_append=afio::file(dirh, "small_blob_store", afio::file_flags::create | afio::file_flags::append); // throws if there was an error + _small_blob_store=afio::file(dirh, "small_blob_store", afio::file_flags::read_write); // throws if there was an error + _small_blob_store_ecc=afio::file(dirh, "small_blob_store.ecc", afio::file_flags::create | afio::file_flags::read_write); // throws if there was an error + + // The large blob store keeps memory mappable blobs at 4Kb alignments + // TODO + + // The index is where we keep the map of keys to blobs + _index_store_append=afio::file(dirh, "index", afio::file_flags::create | afio::file_flags::append); // throws if there was an error + _index_store=afio::file(dirh, "index", afio::file_flags::read_write); // throws if there was an error + _index_store_ecc=afio::file(dirh, "index.ecc", afio::file_flags::create | afio::file_flags::read_write); // throws if there was an error + // Is this store just created? + if(!_index_store_append->lstat(afio::metadata_flags::size).st_size) + { + ondisk_file_header header; + header.length=32; + header.index_offset_begin=32; + header.time_count=0; + // This is racy, but the file format doesn't care + afio::write(_index_store_append, header, 0); + } + _index.reset(new index); +} + +shared_future<data_store::istream> data_store::lookup(std::string name) noexcept +{ + try + { + BOOST_OUTCOME_PROPAGATE(_index->refresh(_index_store)); + auto it=_index->key_to_region.find(name); + if(_index->key_to_region.end()==it) + return error_code(ENOENT, generic_category()); // not found + auto &r=_index->regions[it->second]; + afio::off_t offset=r.offset+24, length=r.length-24; + // Schedule the reading of the file into a buffer + auto buffer=std::make_shared<file_buffer_type>((size_t) length); + afio::future<> h(afio::async_read(_small_blob_store, buffer->data(), (size_t) length, offset)); + // When the read completes call this continuation + return h.then([buffer, length](const afio::future<> &h) -> shared_future<data_store::istream> { + // If read failed, return the error or exception immediately + BOOST_OUTCOME_PROPAGATE(h); + data_store::istream ret(std::make_shared<idirectstream>(h.get_handle(), buffer, (size_t) length)); + return ret; + }); + } + catch(...) + { + return std::current_exception(); + } +} + +outcome<data_store::ostream> data_store::write(std::string name) noexcept +{ + try + { + return std::make_shared<_ostream>(this, std::move(name)); + } + catch (...) + { + return std::current_exception(); + } +} +//] + +int main(void) +{ + // To write a key-value item called "dog" + { + data_store ds; + auto dogh = ds.write("dog").get(); + auto &dogs = *dogh; + dogs << "I am a dog"; + } + + // To retrieve a key-value item called "dog" + { + data_store ds; + auto dogh = ds.lookup("dog"); + if (dogh.empty()) + std::cerr << "No item called dog" << std::endl; + else if(dogh.has_error()) + std::cerr << "ERROR: Looking up dog returned error " << dogh.get_error().message() << std::endl; + else if (dogh.has_exception()) + { + std::cerr << "FATAL: Looking up dog threw exception" << std::endl; + std::terminate(); + } + else + { + std::string buffer; + *dogh.get() >> buffer; + std::cout << "Item dog has value " << buffer << std::endl; + } + } + return 0; +} diff --git a/attic/example/workshop_naive.cpp b/attic/example/workshop_naive.cpp new file mode 100644 index 00000000..225d09b7 --- /dev/null +++ b/attic/example/workshop_naive.cpp @@ -0,0 +1,2 @@ +#include "afio_pch.hpp" +#include "workshop_naive.ipp" diff --git a/attic/example/workshop_naive.ipp b/attic/example/workshop_naive.ipp new file mode 100644 index 00000000..0739095b --- /dev/null +++ b/attic/example/workshop_naive.ipp @@ -0,0 +1,126 @@ +//[workshop_naive_interface +namespace filesystem = BOOST_AFIO_V2_NAMESPACE::filesystem; +using BOOST_OUTCOME_V1_NAMESPACE::outcome; + +class data_store +{ + filesystem::path _store_path; + bool _writeable; +public: + // Type used for read streams + using istream = std::shared_ptr<std::istream>; + // Type used for write streams + using ostream = std::shared_ptr<std::ostream>; + // Type used for lookup + using lookup_result_type = outcome<istream>; + // Type used for write + using write_result_type = outcome<ostream>; + + // Disposition flags + static constexpr size_t writeable = (1<<0); + + // Open a data store at path + data_store(size_t flags = 0, filesystem::path path = "store"); + + // Look up item named name for reading, returning a std::istream for the item if it exists + outcome<istream> lookup(std::string name) noexcept; + // Look up item named name for writing, returning an ostream for that item + outcome<ostream> write(std::string name) noexcept; +}; +//] + +//[workshop_naive] +using BOOST_OUTCOME_V1_NAMESPACE::empty; +using BOOST_AFIO_V2_NAMESPACE::error_code; +using BOOST_AFIO_V2_NAMESPACE::generic_category; + +static bool is_valid_name(const std::string &name) noexcept +{ + static const std::string banned("<>:\"/\\|?*\0", 10); + if(std::string::npos!=name.find_first_of(banned)) + return false; + // No leading period + return name[0]!='.'; +} + +static std::shared_ptr<std::fstream> name_to_fstream(const filesystem::path &store_path, std::string name, std::ios::openmode mode) +{ + auto to_lookup(store_path / name); + return std::make_shared<std::fstream>(to_lookup.native(), mode); +} + +data_store::data_store(size_t flags, filesystem::path path) : _store_path(std::move(path)), _writeable(flags & writeable) +{ +} + +outcome<data_store::istream> data_store::lookup(std::string name) noexcept +{ + if(!is_valid_name(name)) + return error_code(EINVAL, generic_category()); + try + { + istream ret(name_to_fstream(_store_path, std::move(name), std::ios::in | std::ios::binary)); + if(!ret->fail()) + return std::move(ret); + return empty; + } + catch(...) + { + return std::current_exception(); + } +} + +outcome<data_store::ostream> data_store::write(std::string name) noexcept +{ + if(!_writeable) + return error_code(EROFS, generic_category()); + if(!is_valid_name(name)) + return error_code(EINVAL, generic_category()); + try + { + if (!filesystem::exists(_store_path)) + filesystem::create_directory(_store_path); + ostream ret(name_to_fstream(_store_path, std::move(name), std::ios::out | std::ios::binary)); + return std::move(ret); + } + catch (...) + { + return std::current_exception(); + } +} +//] + +int main(void) +{ + //[workshop_use_naive] + // To write a key-value item called "dog" + { + data_store ds; + auto dogh = ds.write("dog").get(); + auto &dogs = *dogh; + dogs << "I am a dog"; + } + + // To retrieve a key-value item called "dog" + { + data_store ds; + auto dogh = ds.lookup("dog"); + if (dogh.empty()) + std::cerr << "No item called dog" << std::endl; + else if(dogh.has_error()) + std::cerr << "ERROR: Looking up dog returned error " << dogh.get_error().message() << std::endl; + else if (dogh.has_exception()) + { + std::cerr << "FATAL: Looking up dog threw exception" << std::endl; + std::terminate(); + } + else + { + std::string buffer; + *dogh.get() >> buffer; + std::cout << "Item dog has value " << buffer << std::endl; + } + } + //] + return 0; +} diff --git a/attic/example/workshop_naive_afio.cpp b/attic/example/workshop_naive_afio.cpp new file mode 100644 index 00000000..a807b6c8 --- /dev/null +++ b/attic/example/workshop_naive_afio.cpp @@ -0,0 +1,2 @@ +#include "afio_pch.hpp" +#include "workshop_naive_afio.ipp" diff --git a/attic/example/workshop_naive_afio.ipp b/attic/example/workshop_naive_afio.ipp new file mode 100644 index 00000000..6100ed96 --- /dev/null +++ b/attic/example/workshop_naive_afio.ipp @@ -0,0 +1,269 @@ +//[workshop_naive_afio_interface +namespace afio = BOOST_AFIO_V2_NAMESPACE; +namespace filesystem = BOOST_AFIO_V2_NAMESPACE::filesystem; +using BOOST_OUTCOME_V1_NAMESPACE::outcome; + +class data_store +{ + afio::dispatcher_ptr _dispatcher; + afio::handle_ptr _store; +public: + // Type used for read streams + using istream = std::shared_ptr<std::istream>; + // Type used for write streams + using ostream = std::shared_ptr<std::ostream>; + // Type used for lookup + using lookup_result_type = outcome<istream>; + // Type used for write + using write_result_type = outcome<ostream>; + + // Disposition flags + static constexpr size_t writeable = (1<<0); + + // Open a data store at path + data_store(size_t flags = 0, afio::path path = "store"); + + // Look up item named name for reading, returning a std::istream for the item if it exists + outcome<istream> lookup(std::string name) noexcept; + // Look up item named name for writing, returning an ostream for that item + outcome<ostream> write(std::string name) noexcept; +}; +//] + +//[workshop_naive_afio2] +namespace asio = BOOST_AFIO_V2_NAMESPACE::asio; +using BOOST_OUTCOME_V1_NAMESPACE::empty; +using BOOST_AFIO_V2_NAMESPACE::error_code; +using BOOST_AFIO_V2_NAMESPACE::generic_category; + +// A special allocator of highly efficient file i/o memory +using file_buffer_type = std::vector<char, afio::utils::page_allocator<char>>; + +// An iostream which reads directly from a memory mapped AFIO file +struct idirectstream : public std::istream +{ + struct directstreambuf : public std::streambuf + { + afio::handle_ptr h; // Holds the file open and therefore mapped + file_buffer_type buffer; + afio::handle::mapped_file_ptr mfp; + directstreambuf(error_code &ec, afio::handle_ptr _h) : h(std::move(_h)) + { + // Get the size of the file. If greater than 128Kb mmap it + size_t length=(size_t) h->lstat(afio::metadata_flags::size).st_size; + char *p=nullptr; + if(length>=128*1024) + { + if((mfp=h->map_file())) + p = (char *) mfp->addr; + } + if(!p) + { + buffer.resize(length); + afio::read(ec, h, buffer.data(), length, 0); + p=buffer.data(); + } + // Set the get buffer this streambuf is to use + setg(p, p, p + length); + } + }; + std::unique_ptr<directstreambuf> buf; + idirectstream(error_code &ec, afio::handle_ptr h) : std::istream(new directstreambuf(ec, std::move(h))), buf(static_cast<directstreambuf *>(rdbuf())) + { + } + virtual ~idirectstream() override + { + // Reset the stream before deleting the buffer + rdbuf(nullptr); + } +}; + +// An iostream which writes to an AFIO file in 4Kb pages +struct odirectstream : public std::ostream +{ + struct directstreambuf : public std::streambuf + { + using int_type = std::streambuf::int_type; + using traits_type = std::streambuf::traits_type; + afio::future<> lastwrite; // the last async write performed + afio::off_t offset; // offset of next write + file_buffer_type buffer; // a page size on this machine + file_buffer_type lastbuffer; + directstreambuf(afio::handle_ptr _h) : lastwrite(std::move(_h)), offset(0), buffer(afio::utils::page_sizes().front()) + { + // Set the put buffer this streambuf is to use + setp(buffer.data(), buffer.data() + buffer.size()); + } + virtual ~directstreambuf() override + { + try + { + // Flush buffers and wait until last write completes + // Schedule an asynchronous write of the buffer to storage + size_t thisbuffer = pptr() - pbase(); + if(thisbuffer) + lastwrite = afio::async_write(afio::async_truncate(lastwrite, offset+thisbuffer), buffer.data(), thisbuffer, offset); + lastwrite.get(); + } + catch(...) + { + } + } + virtual int_type overflow(int_type c) override + { + size_t thisbuffer=pptr()-pbase(); + if(thisbuffer>=buffer.size()) + sync(); + if(c!=traits_type::eof()) + { + *pptr()=(char)c; + pbump(1); + return traits_type::to_int_type(c); + } + return traits_type::eof(); + } + virtual int sync() override + { + // Wait for the last write to complete, propagating any exceptions + lastwrite.get(); + size_t thisbuffer=pptr()-pbase(); + if(thisbuffer > 0) + { + // Detach the current buffer and replace with a fresh one to allow the kernel to steal the page + lastbuffer=std::move(buffer); + buffer.resize(lastbuffer.size()); + setp(buffer.data(), buffer.data() + buffer.size()); + // Schedule an extension of physical storage by an extra page + lastwrite = afio::async_truncate(lastwrite, offset + thisbuffer); + // Schedule an asynchronous write of the buffer to storage + lastwrite=afio::async_write(lastwrite, lastbuffer.data(), thisbuffer, offset); + offset+=thisbuffer; + } + return 0; + } + }; + std::unique_ptr<directstreambuf> buf; + odirectstream(afio::handle_ptr h) : std::ostream(new directstreambuf(std::move(h))), buf(static_cast<directstreambuf *>(rdbuf())) + { + } + virtual ~odirectstream() override + { + // Reset the stream before deleting the buffer + rdbuf(nullptr); + } +}; +//] + +//[workshop_naive_afio1] +namespace asio = BOOST_AFIO_V2_NAMESPACE::asio; +using BOOST_OUTCOME_V1_NAMESPACE::empty; +using BOOST_AFIO_V2_NAMESPACE::error_code; +using BOOST_AFIO_V2_NAMESPACE::generic_category; + +static bool is_valid_name(const std::string &name) noexcept +{ + static const std::string banned("<>:\"/\\|?*\0", 10); + if(std::string::npos!=name.find_first_of(banned)) + return false; + // No leading period + return name[0]!='.'; +} + +data_store::data_store(size_t flags, afio::path path) +{ + // Make a dispatcher for the local filesystem URI, masking out write flags on all operations if not writeable + _dispatcher=afio::make_dispatcher("file:///", afio::file_flags::none, !(flags & writeable) ? afio::file_flags::write : afio::file_flags::none).get(); + // Set the dispatcher for this thread, and open a handle to the store directory + afio::current_dispatcher_guard h(_dispatcher); + _store=afio::dir(std::move(path), afio::file_flags::create); // throws if there was an error +} + +outcome<data_store::istream> data_store::lookup(std::string name) noexcept +{ + if(!is_valid_name(name)) + return error_code(EINVAL, generic_category()); + try + { + error_code ec; + // Open the file using the handle to the store directory as the base. + // The store directory can be freely renamed by any third party process + // and everything here will work perfectly. + afio::handle_ptr h(afio::file(ec, _store, name, afio::file_flags::read)); + if(ec) + { + // If the file was not found, return empty else the error + if(ec==error_code(ENOENT, generic_category())) + return empty; + return ec; + } + // Create an istream which directly uses the mapped file. + data_store::istream ret(std::make_shared<idirectstream>(ec, std::move(h))); + if(ec) + return ec; + return ret; + } + catch(...) + { + return std::current_exception(); + } +} + +outcome<data_store::ostream> data_store::write(std::string name) noexcept +{ + if(!is_valid_name(name)) + return error_code(EINVAL, generic_category()); + try + { + error_code ec; + // Open the file using the handle to the store directory as the base. + // The store directory can be freely renamed by any third party process + // and everything here will work perfectly. You could enable direct + // buffer writing - this sends 4Kb pages directly to the physical hardware + // bypassing the kernel file page cache, however this is not optimal if reads of + // the value are likely to occur soon. + afio::handle_ptr h(afio::file(ec, _store, name, afio::file_flags::create | afio::file_flags::write + /*| afio::file_flags::os_direct*/)); + if(ec) + return ec; + // Create an ostream which directly uses the mapped file. + return outcome<data_store::ostream>(std::make_shared<odirectstream>(std::move(h))); + } + catch (...) + { + return std::current_exception(); + } +} +//] + +int main(void) +{ + // To write a key-value item called "dog" + { + data_store ds; + auto dogh = ds.write("dog").get(); + auto &dogs = *dogh; + dogs << "I am a dog"; + } + + // To retrieve a key-value item called "dog" + { + data_store ds; + auto dogh = ds.lookup("dog"); + if (dogh.empty()) + std::cerr << "No item called dog" << std::endl; + else if(dogh.has_error()) + std::cerr << "ERROR: Looking up dog returned error " << dogh.get_error().message() << std::endl; + else if (dogh.has_exception()) + { + std::cerr << "FATAL: Looking up dog threw exception" << std::endl; + std::terminate(); + } + else + { + std::string buffer; + *dogh.get() >> buffer; + std::cout << "Item dog has value " << buffer << std::endl; + } + } + return 0; +} diff --git a/attic/example/workshop_naive_async_afio.cpp b/attic/example/workshop_naive_async_afio.cpp new file mode 100644 index 00000000..86cb8052 --- /dev/null +++ b/attic/example/workshop_naive_async_afio.cpp @@ -0,0 +1,2 @@ +#include "afio_pch.hpp" +#include "workshop_naive_async_afio.ipp" diff --git a/attic/example/workshop_naive_async_afio.ipp b/attic/example/workshop_naive_async_afio.ipp new file mode 100644 index 00000000..651d2a4e --- /dev/null +++ b/attic/example/workshop_naive_async_afio.ipp @@ -0,0 +1,272 @@ +//[workshop_naive_async_afio_interface +namespace afio = BOOST_AFIO_V2_NAMESPACE; +namespace filesystem = BOOST_AFIO_V2_NAMESPACE::filesystem; +using BOOST_OUTCOME_V1_NAMESPACE::lightweight_futures::shared_future; + +class data_store +{ + afio::dispatcher_ptr _dispatcher; + afio::handle_ptr _store; +public: + // Type used for read streams + using istream = std::shared_ptr<std::istream>; + // Type used for write streams + using ostream = std::shared_ptr<std::ostream>; + // Type used for lookup + using lookup_result_type = shared_future<istream>; + // Type used for write + using write_result_type = shared_future<ostream>; + + // Disposition flags + static constexpr size_t writeable = (1<<0); + + // Open a data store at path + data_store(size_t flags = 0, afio::path path = "store"); + + // Look up item named name for reading, returning an istream for the item + shared_future<istream> lookup(std::string name) noexcept; + // Look up item named name for writing, returning an ostream for that item + shared_future<ostream> write(std::string name) noexcept; +}; +//] + +//[workshop_naive_async_afio3] +namespace asio = BOOST_AFIO_V2_NAMESPACE::asio; +using BOOST_AFIO_V2_NAMESPACE::error_code; +using BOOST_AFIO_V2_NAMESPACE::generic_category; + +// A special allocator of highly efficient file i/o memory +using file_buffer_type = std::vector<char, afio::utils::page_allocator<char>>; + +// An iostream which reads directly from a memory mapped AFIO file +struct idirectstream : public std::istream +{ + struct directstreambuf : public std::streambuf + { + afio::handle_ptr h; // Holds the file open + std::shared_ptr<file_buffer_type> buffer; + afio::handle::mapped_file_ptr mfp; + // From a mmap + directstreambuf(afio::handle_ptr _h, afio::handle::mapped_file_ptr _mfp, size_t length) : h(std::move(_h)), mfp(std::move(_mfp)) + { + // Set the get buffer this streambuf is to use + setg((char *) mfp->addr, (char *) mfp->addr, (char *) mfp->addr + length); + } + // From a malloc + directstreambuf(afio::handle_ptr _h, std::shared_ptr<file_buffer_type> _buffer, size_t length) : h(std::move(_h)), buffer(std::move(_buffer)) + { + // Set the get buffer this streambuf is to use + setg(buffer->data(), buffer->data(), buffer->data() + length); + } + }; + std::unique_ptr<directstreambuf> buf; + template<class U> idirectstream(afio::handle_ptr h, U &&buffer, size_t length) : std::istream(new directstreambuf(std::move(h), std::forward<U>(buffer), length)), buf(static_cast<directstreambuf *>(rdbuf())) + { + } + virtual ~idirectstream() override + { + // Reset the stream before deleting the buffer + rdbuf(nullptr); + } +}; + +// An iostream which writes to an AFIO file in 4Kb pages +struct odirectstream : public std::ostream +{ + struct directstreambuf : public std::streambuf + { + using int_type = std::streambuf::int_type; + using traits_type = std::streambuf::traits_type; + afio::future<> lastwrite; // the last async write performed + afio::off_t offset; // offset of next write + file_buffer_type buffer; // a page size on this machine + file_buffer_type lastbuffer; + directstreambuf(afio::handle_ptr _h) : lastwrite(std::move(_h)), offset(0), buffer(afio::utils::page_sizes().front()) + { + // Set the put buffer this streambuf is to use + setp(buffer.data(), buffer.data() + buffer.size()); + } + virtual ~directstreambuf() override + { + try + { + // Flush buffers and wait until last write completes + // Schedule an asynchronous write of the buffer to storage + size_t thisbuffer = pptr() - pbase(); + if(thisbuffer) + lastwrite = afio::async_write(afio::async_truncate(lastwrite, offset+thisbuffer), buffer.data(), thisbuffer, offset); + lastwrite.get(); + } + catch(...) + { + } + } + virtual int_type overflow(int_type c) override + { + size_t thisbuffer=pptr()-pbase(); + if(thisbuffer>=buffer.size()) + sync(); + if(c!=traits_type::eof()) + { + *pptr()=(char)c; + pbump(1); + return traits_type::to_int_type(c); + } + return traits_type::eof(); + } + virtual int sync() override + { + // Wait for the last write to complete, propagating any exceptions + lastwrite.get(); + size_t thisbuffer=pptr()-pbase(); + if(thisbuffer > 0) + { + // Detach the current buffer and replace with a fresh one to allow the kernel to steal the page + lastbuffer=std::move(buffer); + buffer.resize(lastbuffer.size()); + setp(buffer.data(), buffer.data() + buffer.size()); + // Schedule an extension of physical storage by an extra page + lastwrite = afio::async_truncate(lastwrite, offset + thisbuffer); + // Schedule an asynchronous write of the buffer to storage + lastwrite=afio::async_write(lastwrite, lastbuffer.data(), thisbuffer, offset); + offset+=thisbuffer; + } + return 0; + } + }; + std::unique_ptr<directstreambuf> buf; + odirectstream(afio::handle_ptr h) : std::ostream(new directstreambuf(std::move(h))), buf(static_cast<directstreambuf *>(rdbuf())) + { + } + virtual ~odirectstream() override + { + // Reset the stream before deleting the buffer + rdbuf(nullptr); + } +}; +//] + +namespace asio = BOOST_AFIO_V2_NAMESPACE::asio; +using BOOST_AFIO_V2_NAMESPACE::error_code; +using BOOST_AFIO_V2_NAMESPACE::generic_category; + +static bool is_valid_name(const std::string &name) noexcept +{ + static const std::string banned("<>:\"/\\|?*\0", 10); + if(std::string::npos!=name.find_first_of(banned)) + return false; + // No leading period + return name[0]!='.'; +} + +data_store::data_store(size_t flags, afio::path path) +{ + // Make a dispatcher for the local filesystem URI, masking out write flags on all operations if not writeable + _dispatcher=afio::make_dispatcher("file:///", afio::file_flags::none, !(flags & writeable) ? afio::file_flags::write : afio::file_flags::none).get(); + // Set the dispatcher for this thread, and open a handle to the store directory + afio::current_dispatcher_guard h(_dispatcher); + _store=afio::dir(std::move(path), afio::file_flags::create); // throws if there was an error +} + +//[workshop_naive_async_afio2] +shared_future<data_store::istream> data_store::lookup(std::string name) noexcept +{ + if(!is_valid_name(name)) + return error_code(EINVAL, generic_category()); + try + { + // Schedule the opening of the file for reading + afio::future<> h(afio::async_file(_store, name, afio::file_flags::read)); + // When it completes, call this continuation + return h.then([](afio::future<> &_h) -> shared_future<data_store::istream> { + // If file didn't open, return the error or exception immediately + BOOST_OUTCOME_PROPAGATE(_h); + size_t length=(size_t) _h->lstat(afio::metadata_flags::size).st_size; + // Is a memory map more appropriate? + if(length>=128*1024) + { + afio::handle::mapped_file_ptr mfp; + if((mfp=_h->map_file())) + { + data_store::istream ret(std::make_shared<idirectstream>(_h.get_handle(), std::move(mfp), length)); + return ret; + } + } + // Schedule the reading of the file into a buffer + auto buffer=std::make_shared<file_buffer_type>(length); + afio::future<> h(afio::async_read(_h, buffer->data(), length, 0)); + // When the read completes call this continuation + return h.then([buffer, length](const afio::future<> &h) -> shared_future<data_store::istream> { + // If read failed, return the error or exception immediately + BOOST_OUTCOME_PROPAGATE(h); + data_store::istream ret(std::make_shared<idirectstream>(h.get_handle(), buffer, length)); + return ret; + }); + }); + } + catch(...) + { + // Boost.Monad futures are also monads, so this implies a make_ready_future() + return std::current_exception(); + } +} +//] + +//[workshop_naive_async_afio1] +shared_future<data_store::ostream> data_store::write(std::string name) noexcept +{ + if(!is_valid_name(name)) + return error_code(EINVAL, generic_category()); + try + { + // Schedule the opening of the file for writing + afio::future<> h(afio::async_file(_store, name, afio::file_flags::create | afio::file_flags::write)); + // When it completes, call this continuation + return h.then([](const afio::future<> &h) -> shared_future<data_store::ostream> { + // If file didn't open, return the error or exception immediately + BOOST_OUTCOME_PROPAGATE(h); + // Create an ostream which directly uses the file. + data_store::ostream ret(std::make_shared<odirectstream>(h.get_handle())); + return std::move(ret); + }); + } + catch (...) + { + // Boost.Monad futures are also monads, so this implies a make_ready_future() + return std::current_exception(); + } +} +//] + +int main(void) +{ + // To write a key-value item called "dog" + { + data_store ds; + auto dogh = ds.write("dog").get(); + auto &dogs = *dogh; + dogs << "I am a dog"; + } + + // To retrieve a key-value item called "dog" + { + data_store ds; + auto dogh = ds.lookup("dog"); + if (dogh.empty()) + std::cerr << "No item called dog" << std::endl; + else if(dogh.has_error()) + std::cerr << "ERROR: Looking up dog returned error " << dogh.get_error().message() << std::endl; + else if (dogh.has_exception()) + { + std::cerr << "FATAL: Looking up dog threw exception" << std::endl; + std::terminate(); + } + else + { + std::string buffer; + *dogh.get() >> buffer; + std::cout << "Item dog has value " << buffer << std::endl; + } + } + return 0; +} |