diff options
author | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> | 2017-09-08 03:00:21 +0300 |
---|---|---|
committer | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> | 2017-09-08 03:00:21 +0300 |
commit | ce87856169cbbb29482cdc6e0e48d9cdbb04a6b8 (patch) | |
tree | 236f624afcff29dadc1046e0b445de5a334c21b9 /programs | |
parent | fec616c9c6f53b77b53b8852c51bbe5af11b82b8 (diff) |
Made a start on mapped_file_handle
Diffstat (limited to 'programs')
-rw-r--r-- | programs/key-value-store/include/key_value_store.hpp | 87 | ||||
-rw-r--r-- | programs/key-value-store/main.cpp | 21 |
2 files changed, 94 insertions, 14 deletions
diff --git a/programs/key-value-store/include/key_value_store.hpp b/programs/key-value-store/include/key_value_store.hpp index a4283b05..0150bab3 100644 --- a/programs/key-value-store/include/key_value_store.hpp +++ b/programs/key-value-store/include/key_value_store.hpp @@ -161,18 +161,22 @@ namespace key_value_store friend class transaction; afio::file_handle _indexfile; afio::file_handle _mysmallfile; // append only + afio::map_handle _mysmallfilemapped; afio::file_handle::extent_guard _indexfileguard, _smallfileguard; size_t _mysmallfileidx{(size_t) -1}; struct { std::vector<afio::file_handle> read; + std::vector<afio::section_handle> section; + std::vector<afio::map_handle> map; } _smallfiles; optional<index::open_hash_index> _index; index::index *_indexheader{nullptr}; std::mutex _commitlock; bool _use_mmaps_for_commit{false}; + size_t _mmap_over_extension{0}; - static constexpr afio::file_handle::extent_type _indexinuseoffset = (afio::file_handle::extent_type) -1; + static constexpr afio::file_handle::extent_type _indexinuseoffset = INT64_MAX; static constexpr uint64_t _goodmagic = 0x3130564b4f494641; // "AFIOKV01" static constexpr uint64_t _badmagic = 0x3130564b44414544; // "DEADKV01" @@ -221,6 +225,10 @@ namespace key_value_store } if(!claimed) { +#ifndef _WIN32 + // We really need this to only have read only perms, otherwise any mmaps will extend the file ludicrously + fh = afio::file_handle::file(dir, name, afio::file_handle::mode::read, afio::file_handle::creation::open_existing, afio::file_handle::caching::all, afio::file_handle::flag::disable_prefetching); +#endif _smallfiles.read.push_back(std::move(fh).value()); } continue; @@ -228,9 +236,10 @@ namespace key_value_store else if(mode == afio::file_handle::mode::write && !_mysmallfile.is_valid()) { // Going to need a new smallfile - fh = afio::file_handle::file(dir, name, smallfilemode, afio::file_handle::creation::only_if_not_exist, caching); + fh = afio::file_handle::file(dir, name, afio::file_handle::mode::write, afio::file_handle::creation::only_if_not_exist, caching); if(fh) { + fh.value().truncate(64).value(); goto retry; } continue; @@ -360,7 +369,7 @@ namespace key_value_store Normally, this implementation atomic-appends small objects to the smallfile via gather write. This is simple, safe, and with reasonable performance most of the time. - However in certain circumstances e.g. with synchronous i/o enabled, atomic-append forces lots of + However in certain circumstances e.g. with synchronous i/o enabled, atomic-append can cause lots of read-modify-write cycles on the storage device. This can become unusably slow if the values you write are small, or your OS doesn't implement gather writes for file i/o (e.g. Windows). For these circumstances, one can instead use a memory map of the end of the smallfile to append @@ -388,6 +397,37 @@ namespace key_value_store } _use_mmaps_for_commit = v; } + /*! \brief Sets whether to use mmaps for fetches. + + Requires lots of virtual address space as the entire + of all the small files is mapped into memory with additional `overextension`. Also requires a kernel + page cache implementation which correctly updates appends to the smallfile into the mapped view + without `msync(MS_INVALIDATE)`. + */ + void use_mmaps_for_fetch(size_t overextension = 1024ULL * 1024 * 1024) + { + if(_mmap_over_extension != 0) + return; + _smallfiles.section.reserve(_smallfiles.read.size()); + _smallfiles.map.reserve(_smallfiles.read.size()); + for(size_t n = 0; n < _smallfiles.read.size(); n++) + { + auto currentlength = _smallfiles.read[n].length().value(); + _smallfiles.section.push_back(afio::section_handle::section(_smallfiles.read[n], currentlength, +#ifdef _WIN32 + // Yes this is confusing. But for some reason, Windows won't permit overextended views on read only sections. + // And somehow or other, Windows permits read/write sections on read only files. Which makes zero sense. + afio::section_handle::flag::readwrite +#else + afio::section_handle::flag::read +#endif + ) + .value()); + // The nocommit allows us to reserve all the address space now, and to fill in mapped data later as the file extends + _smallfiles.map.push_back(afio::map_handle::map(_smallfiles.section.back(), currentlength + overextension, 0, afio::section_handle::flag::nocommit | afio::section_handle::flag::read).value()); + } + _mmap_over_extension = overextension; + } //! Retrieve when keys were last updated by setting the second to the latest transaction counter. //! Note that counter will be `(uint64_t)-1` for any unknown keys. Never throws exceptions. @@ -418,7 +458,7 @@ namespace key_value_store //! When this value was last modified uint64_t transaction_counter; - keyvalue_info(keyvalue_info &&o) noexcept : key(std::move(o.key)), value(std::move(o.value)), transaction_counter(std::move(o.transaction_counter)), _value_buffer(std::move(o._value_buffer)), _value_view(std::move(o._value_view)) { o._value_buffer = nullptr; } + keyvalue_info(keyvalue_info &&o) noexcept : key(std::move(o.key)), value(std::move(o.value)), transaction_counter(std::move(o.transaction_counter)), _value_buffer(std::move(o._value_buffer)) { o._value_buffer = nullptr; } keyvalue_info &operator=(keyvalue_info &&o) noexcept { this->~keyvalue_info(); @@ -447,15 +487,14 @@ namespace key_value_store , transaction_counter((uint64_t) -1) { } - keyvalue_info(key_type _key, span<char> buffer, uint64_t tc) + keyvalue_info(key_type _key, span<char> buffer, bool free_on_destruct, uint64_t tc) : key(_key) , value(buffer) , transaction_counter(tc) - , _value_buffer(buffer.data()) + , _value_buffer(free_on_destruct ? buffer.data() : nullptr) { } char *_value_buffer{nullptr}; - afio::algorithm::mapped_view<const char> _value_view; }; //! Retrieve the latest value for a key. May throw `corrupted_store` keyvalue_info find(key_type key, size_t revision = 0) @@ -480,17 +519,37 @@ namespace key_value_store return keyvalue_info(key); } size_t length = item.length, smallfilelength = _pad_length(length); - char *buffer = (char *) malloc(smallfilelength); - if(!buffer) - { - throw std::bad_alloc(); - } if(item.value_identifier >= _smallfiles.read.size()) { // TODO: Open newly created smallfiles abort(); } - _smallfiles.read[item.value_identifier].read(item.value_offset * 64 - smallfilelength, buffer, smallfilelength).value(); + char *buffer; + bool free_on_destruct = _smallfiles.map.empty() || !_smallfiles.map[item.value_identifier].is_valid(); + if(!free_on_destruct) + { + if(item.value_offset * 64 > _smallfiles.section[item.value_identifier].length()) + { + auto oldsize = _smallfiles.section[item.value_identifier].length(); + auto newsize = _smallfiles.read[item.value_identifier].length().value(); + // Resize the memory section to the current size of the file + _smallfiles.section[item.value_identifier].truncate(newsize).value(); + // Commit the newly mapped pages + afio::map_handle::buffer_type bt{_smallfiles.map[item.value_identifier].address() + oldsize, newsize - oldsize}; + bt.data = afio::utils::round_up_to_page_size(bt.data); + _smallfiles.map[item.value_identifier].commit(bt, afio::section_handle::flag::read).value(); + } + buffer = _smallfiles.map[item.value_identifier].address() + item.value_offset * 64 - smallfilelength; + } + else + { + buffer = (char *) malloc(smallfilelength); + if(!buffer) + { + throw std::bad_alloc(); + } + _smallfiles.read[item.value_identifier].read(item.value_offset * 64 - smallfilelength, buffer, smallfilelength).value(); + } index::value_tail *vt = reinterpret_cast<index::value_tail *>(buffer + smallfilelength - sizeof(index::value_tail)); if(_indexheader->contents_hashed || _indexheader->key_is_hash_of_value) { @@ -519,7 +578,7 @@ namespace key_value_store _indexheader->magic = _badmagic; throw corrupted_store(); } - return keyvalue_info(key, span<char>(buffer, length), item.transaction_counter); + return keyvalue_info(key, span<char>(buffer, length), free_on_destruct, item.transaction_counter); } } }; diff --git a/programs/key-value-store/main.cpp b/programs/key-value-store/main.cpp index a6349453..26eec4bc 100644 --- a/programs/key-value-store/main.cpp +++ b/programs/key-value-store/main.cpp @@ -227,6 +227,7 @@ int main() } { key_value_store::basic_key_value_store store("teststore", 2000000); + store.use_mmaps_for_fetch(); benchmark(store, "no integrity, no durability, commit appends"); } { @@ -260,6 +261,26 @@ int main() AFIO_V2_NAMESPACE::filesystem::remove_all("teststore", ec); } { + key_value_store::basic_key_value_store store("teststore", 2000000); + store.use_mmaps_for_commit(true); + store.use_mmaps_for_fetch(); + benchmark(store, "no integrity, no durability, commit mmaps, fetch mmaps"); + } + { + std::error_code ec; + AFIO_V2_NAMESPACE::filesystem::remove_all("teststore", ec); + } + { + key_value_store::basic_key_value_store store("teststore", 2000000, true); + store.use_mmaps_for_commit(true); + store.use_mmaps_for_fetch(); + benchmark(store, "integrity, no durability, commit mmaps, fetch mmaps"); + } + { + std::error_code ec; + AFIO_V2_NAMESPACE::filesystem::remove_all("teststore", ec); + } + { key_value_store::basic_key_value_store store("teststore", 2000000, true, AFIO_V2_NAMESPACE::file_handle::mode::write, AFIO_V2_NAMESPACE::file_handle::caching::reads); store.use_mmaps_for_commit(true); benchmark(store, "integrity, durability, commit mmaps"); |