diff options
author | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> | 2017-09-05 01:08:12 +0300 |
---|---|---|
committer | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> | 2017-09-05 01:08:12 +0300 |
commit | 0faf4c163d4126d2a2a68729f200e3dc77e2572a (patch) | |
tree | 6d2cef0a22e5ca0a2255f55b8125c6c5413265fb /programs | |
parent | cf5baa47479e00b0034caf5d863b8b4c5f25124e (diff) |
Implemented mmap based small file append. Insertion is now equally fast as fetching on Windows, suggesting that fetching is overwhelmingly slow now.
Diffstat (limited to 'programs')
-rw-r--r-- | programs/key-value-store/Readme.md | 9 | ||||
-rw-r--r-- | programs/key-value-store/include/key_value_store.hpp | 97 | ||||
-rw-r--r-- | programs/key-value-store/main.cpp | 34 |
3 files changed, 128 insertions, 12 deletions
diff --git a/programs/key-value-store/Readme.md b/programs/key-value-store/Readme.md index a52a5436..0bc82639 100644 --- a/programs/key-value-store/Readme.md +++ b/programs/key-value-store/Readme.md @@ -7,12 +7,17 @@ a store, and to test AFIO's design. Nobody should use this store for anything serious. ## Todo: -- [ ] Add sparse file creation on Windows to AFIO and see how the +- [x] Add sparse file creation on Windows to AFIO and see how the benchmarks fare. - [x] Add key-value deletion. - [x] Atomic append should issue gather buffers of `IOV_MAX` -- [ ] Optionally use mmaps to extend smallfile instead of atomic appends. +- [x] Optionally use mmaps to extend smallfile instead of atomic appends. Likely highly racy on Linux due to kernel bugs :) +- [ ] Use mmaps for all smallfiles + - Windows x64 provides 128Tb of address space + - Linux x64 provides 128Tb of address space + - On adoption of smallfile, would need to parse backwards from end to + find last used record. - [ ] Online free space consolidation (copy early still in use records into new small file, update index to use new small file) - [ ] Per 1Mb free space consolidated, punch hole diff --git a/programs/key-value-store/include/key_value_store.hpp b/programs/key-value-store/include/key_value_store.hpp index 3498f33f..6df08cef 100644 --- a/programs/key-value-store/include/key_value_store.hpp +++ b/programs/key-value-store/include/key_value_store.hpp @@ -210,7 +210,8 @@ namespace key_value_store auto smallfileclaimed = fh.value().try_lock(_indexinuseoffset, 1, true); if(smallfileclaimed) { - _mysmallfile = afio::file_handle::file(dir, name, afio::file_handle::mode::append, afio::file_handle::creation::open_existing, caching).value(); + _mysmallfile = afio::file_handle::file(dir, name, afio::file_handle::mode::write, afio::file_handle::creation::open_existing, caching).value(); + _mysmallfile.set_append_only(true).value(); _smallfileguard = std::move(smallfileclaimed).value(); _mysmallfileidx = n; _smallfiles.read.push_back(std::move(fh).value()); @@ -263,7 +264,7 @@ namespace key_value_store basic_key_value_store &operator=(const basic_key_value_store &) = delete; basic_key_value_store &operator=(basic_key_value_store &&) = delete; - basic_key_value_store(const afio::path_handle &dir, size_t hashtableentries, bool enable_integrity = false, afio::file_handle::mode mode = afio::file_handle::mode::write, afio::file_handle::caching caching = afio::file_handle::caching::temporary) + basic_key_value_store(const afio::path_handle &dir, size_t hashtableentries, bool enable_integrity = false, afio::file_handle::mode mode = afio::file_handle::mode::write, afio::file_handle::caching caching = afio::file_handle::caching::all) : _indexfile(afio::file_handle::file(dir, "index", mode, (mode == afio::file_handle::mode::write) ? afio::file_handle::creation::if_needed : afio::file_handle::creation::open_existing, caching).value()) { if(mode == afio::file_handle::mode::write) @@ -327,7 +328,7 @@ namespace key_value_store } } //! \overload - basic_key_value_store(const afio::path_view &dir, size_t hashtableentries, bool enable_integrity = false, afio::file_handle::mode mode = afio::file_handle::mode::write, afio::file_handle::caching caching = afio::file_handle::caching::temporary) + basic_key_value_store(const afio::path_view &dir, size_t hashtableentries, bool enable_integrity = false, afio::file_handle::mode mode = afio::file_handle::mode::write, afio::file_handle::caching caching = afio::file_handle::caching::all) : basic_key_value_store(afio::directory_handle::directory({}, dir, afio::directory_handle::mode::write, afio::directory_handle::creation::if_needed).value(), hashtableentries, enable_integrity, mode, caching) { } @@ -371,7 +372,22 @@ namespace key_value_store data corruption, denial of service and root privilege exploits have been found from the unexpected interactions between memory maps and a moving end of file. */ - void use_mmaps_for_commit(bool v) noexcept { _use_mmaps_for_commit = v; } + void use_mmaps_for_commit(bool v) + { + if(_use_mmaps_for_commit == v) + { + return; + } + if(v && !_use_mmaps_for_commit) + { + _mysmallfile.set_append_only(false).value(); + } + else + { + _mysmallfile.set_append_only(true).value(); + } + _use_mmaps_for_commit = v; + } //! Retrieve when keys were last updated by setting the second to the latest transaction counter. //! Note that counter will be `(uint64_t)-1` for any unknown keys. Never throws exceptions. @@ -707,8 +723,79 @@ namespace key_value_store this_transaction_counter = _.this_transaction_counter; } - // Gather append write all my items to my smallfile + bool items_written = false; + if(_parent->_use_mmaps_for_commit) + { + afio::file_handle::extent_type original_length = _parent->_mysmallfile.length().value(); + // How big does this map need to be? + size_t totalcommitsize = 0; + for(size_t n = 0; n < _items.size(); n++) + { + toupdate_type &thisupdate = toupdate[n]; + const transaction::_item &item = _items[n]; + totalcommitsize += thisupdate.removal ? 64 : _parent->_pad_length(item.towrite->size()); + } + if(totalcommitsize >= 4096) + { + { +#ifdef _WIN32 + afio::file_handle::extent_type mapbegin = original_length & ~65535; +#else + afio::file_handle::extent_type mapbegin = afio::utils::round_down_to_page_size(original_length); +#endif + afio::file_handle::extent_type mapend = afio::utils::round_up_to_page_size(original_length + totalcommitsize); + _parent->_mysmallfile.truncate(mapend).value(); + auto sh = afio::section_handle::section(_parent->_mysmallfile, mapend).value(); + auto mh = afio::map_handle::map(sh, mapend - mapbegin, mapbegin).value(); + char *value = mh.address() + (original_length - mapbegin); + afio::file_handle::extent_type value_offset = original_length; + for(size_t n = 0; n < _items.size(); n++) + { + toupdate_type &thisupdate = toupdate[n]; + const transaction::_item &item = _items[n]; + size_t totalwrite = 0; + if(thisupdate.removal) + { + totalwrite = 64; + } + else + { + memcpy(value, item.towrite->data(), item.towrite->size()); + totalwrite = _parent->_pad_length(item.towrite->size()); + } + index::value_tail *vt = reinterpret_cast<index::value_tail *>(value + totalwrite - sizeof(index::value_tail)); + vt->key = thisupdate.key; + vt->transaction_counter = this_transaction_counter; + if(thisupdate.removal) + { + vt->length = (uint64_t) -1; // this key is being deleted + memset(&thisupdate.history_item, 0, sizeof(thisupdate.history_item)); + } + else + { + vt->length = item.towrite->size(); + index::value_history::item &history_item = thisupdate.history_item; + history_item.transaction_counter = this_transaction_counter; + history_item.value_offset = (value_offset + totalwrite) / 64; + history_item.value_identifier = _parent->_mysmallfileidx; + history_item.length = vt->length; + } + if(_parent->_indexheader->contents_hashed) + { + vt->hash = QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash(value, totalwrite); + } + value += totalwrite; + value_offset += totalwrite; + } + } + // Place maximum extent back where it is supposed to be + _parent->_mysmallfile.truncate(original_length + totalcommitsize).value(); + items_written = true; + } + } + if(!items_written) { + // Gather append write all my items to my smallfile afio::file_handle::extent_type value_offset = _parent->_mysmallfile.length().value(); assert((value_offset % 64) == 0); // POSIX guarantees that at least 16 gather buffers can be written in a single shot diff --git a/programs/key-value-store/main.cpp b/programs/key-value-store/main.cpp index a03059a3..aea2f8f5 100644 --- a/programs/key-value-store/main.cpp +++ b/programs/key-value-store/main.cpp @@ -57,6 +57,13 @@ void benchmark(key_value_store::basic_key_value_store &store, const char *desc) auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - begin).count(); std::cout << " Inserted at " << (1000000000ULL / diff) << " items per sec" << std::endl; } +#if 0 + { + auto begin = std::chrono::high_resolution_clock::now(); + while(std::chrono::duration_cast<std::chrono::seconds>(std::chrono::high_resolution_clock::now() - begin).count() < 5) + ; + } +#endif std::cout << " Retrieving 1M key-value pairs ..." << std::endl; { auto begin = std::chrono::high_resolution_clock::now(); @@ -158,7 +165,7 @@ int main() } { key_value_store::basic_key_value_store store("teststore", 2000000); - benchmark(store, "no integrity, no durability"); + benchmark(store, "no integrity, no durability, commit appends"); } { std::error_code ec; @@ -166,18 +173,35 @@ int main() } { key_value_store::basic_key_value_store store("teststore", 2000000, true); - benchmark(store, "integrity, no durability"); + benchmark(store, "integrity, no durability, commit appends"); + } + { + std::error_code ec; + AFIO_V2_NAMESPACE::filesystem::remove_all("teststore", ec); + } + { + key_value_store::basic_key_value_store store("teststore", 2000000); + store.use_mmaps_for_commit(true); + benchmark(store, "no integrity, no durability, commit mmaps"); + } + { + std::error_code ec; + AFIO_V2_NAMESPACE::filesystem::remove_all("teststore", ec); + } + { + key_value_store::basic_key_value_store store("teststore", 2000000, true); + store.use_mmaps_for_commit(true); + benchmark(store, "integrity, no durability, commit mmaps"); } -#if 0 { std::error_code ec; AFIO_V2_NAMESPACE::filesystem::remove_all("teststore", ec); } { key_value_store::basic_key_value_store store("teststore", 2000000, true, AFIO_V2_NAMESPACE::file_handle::mode::write, AFIO_V2_NAMESPACE::file_handle::caching::reads); - benchmark(store, "integrity, durability"); + store.use_mmaps_for_commit(true); + benchmark(store, "integrity, durability, commit mmaps"); } -#endif } catch(const std::exception &e) { |