diff options
Diffstat (limited to 'programs/key-value-store/include/key_value_store.hpp')
-rw-r--r-- | programs/key-value-store/include/key_value_store.hpp | 127 |
1 files changed, 46 insertions, 81 deletions
diff --git a/programs/key-value-store/include/key_value_store.hpp b/programs/key-value-store/include/key_value_store.hpp index 20c54635..2035fd48 100644 --- a/programs/key-value-store/include/key_value_store.hpp +++ b/programs/key-value-store/include/key_value_store.hpp @@ -171,7 +171,6 @@ namespace key_value_store optional<index::open_hash_index> _index; index::index *_indexheader{nullptr}; std::mutex _commitlock; - bool _use_mmaps_for_commit{false}; size_t _mmap_over_extension{0}; static constexpr afio::file_handle::extent_type _indexinuseoffset = INT64_MAX; @@ -362,42 +361,14 @@ namespace key_value_store } } - /*! \brief Sets whether to use mmaps for writing small objects during commit. - - Normally, this implementation atomic-appends small objects to the smallfile via gather write. - This is simple, safe, and with reasonable performance most of the time. - - However in certain circumstances e.g. with synchronous i/o enabled, atomic-append can cause lots of - read-modify-write cycles on the storage device. This can become unusably slow if the values you - write are small, or your OS doesn't implement gather writes for file i/o (e.g. Windows). For - these circumstances, one can instead use a memory map of the end of the smallfile to append - the small objects. This can cause the kernel to not flush the map to storage until the map is - destroyed, but it also avoids the read-modify-write cycle with synchronous i/o. - */ - void use_mmaps_for_commit(bool v) - { - if(_use_mmaps_for_commit == v) - { - return; - } - if(v && !_use_mmaps_for_commit) - { - _mysmallfile.set_append_only(false).value(); - } - else - { - _mysmallfile.set_append_only(true).value(); - } - _use_mmaps_for_commit = v; - } - /*! \brief Sets whether to use mmaps for fetches. + /*! \brief Sets whether to use mmaps for fetches and appends. Requires lots of virtual address space as the entire of all the small files is mapped into memory with additional `overextension`. Also requires a kernel page cache implementation which correctly updates appends to the smallfile into the mapped view without `msync(MS_INVALIDATE)`. */ - void use_mmaps_for_fetch(size_t overextension = 1024ULL * 1024 * 1024) + void use_mmaps(size_t overextension = 1024ULL * 1024 * 1024) { if(_mmap_over_extension != 0) return; @@ -766,7 +737,7 @@ namespace key_value_store } bool items_written = false; - if(_parent->_use_mmaps_for_commit) + if(!_parent->_smallfiles.mapped.empty()) { afio::file_handle::extent_type original_length = _parent->_mysmallfile.length().value(); // How big does this map need to be? @@ -779,59 +750,53 @@ namespace key_value_store } if(totalcommitsize >= 4096) { + auto &mfh = _parent->_smallfiles.mapped[_parent->_mysmallfileidx]; + afio::file_handle::extent_type new_length = original_length + totalcommitsize; + if(new_length > mfh.capacity()) { -#ifdef _WIN32 - afio::file_handle::extent_type mapbegin = original_length & ~65535; -#else - afio::file_handle::extent_type mapbegin = afio::utils::round_down_to_page_size(original_length); -#endif - afio::file_handle::extent_type mapend = afio::utils::round_up_to_page_size(original_length + totalcommitsize); - _parent->_mysmallfile.truncate(mapend).value(); - auto sh = afio::section_handle::section(_parent->_mysmallfile, mapend).value(); - auto mh = afio::map_handle::map(sh, mapend - mapbegin, mapbegin).value(); - char *value = mh.address() + (original_length - mapbegin); - afio::file_handle::extent_type value_offset = original_length; - for(size_t n = 0; n < _items.size(); n++) + mfh.reserve(new_length + _parent->_mmap_over_extension).value(); + } + mfh.truncate(new_length).value(); + char *value = mfh.address() + original_length; + afio::file_handle::extent_type value_offset = original_length; + for(size_t n = 0; n < _items.size(); n++) + { + toupdate_type &thisupdate = toupdate[n]; + const transaction::_item &item = _items[n]; + size_t totalwrite = 0; + if(thisupdate.removal) { - toupdate_type &thisupdate = toupdate[n]; - const transaction::_item &item = _items[n]; - size_t totalwrite = 0; - if(thisupdate.removal) - { - totalwrite = 64; - } - else - { - memcpy(value, item.towrite->data(), item.towrite->size()); - totalwrite = _parent->_pad_length(item.towrite->size()); - } - index::value_tail *vt = reinterpret_cast<index::value_tail *>(value + totalwrite - sizeof(index::value_tail)); - vt->key = thisupdate.key; - vt->transaction_counter = this_transaction_counter; - if(thisupdate.removal) - { - vt->length = (uint64_t) -1; // this key is being deleted - memset(&thisupdate.history_item, 0, sizeof(thisupdate.history_item)); - } - else - { - vt->length = item.towrite->size(); - index::value_history::item &history_item = thisupdate.history_item; - history_item.transaction_counter = this_transaction_counter; - history_item.value_offset = (value_offset + totalwrite) / 64; - history_item.value_identifier = _parent->_mysmallfileidx; - history_item.length = vt->length; - } - if(_parent->_indexheader->contents_hashed) - { - vt->hash = QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash(value, totalwrite); - } - value += totalwrite; - value_offset += totalwrite; + totalwrite = 64; + } + else + { + memcpy(value, item.towrite->data(), item.towrite->size()); + totalwrite = _parent->_pad_length(item.towrite->size()); + } + index::value_tail *vt = reinterpret_cast<index::value_tail *>(value + totalwrite - sizeof(index::value_tail)); + vt->key = thisupdate.key; + vt->transaction_counter = this_transaction_counter; + if(thisupdate.removal) + { + vt->length = (uint64_t) -1; // this key is being deleted + memset(&thisupdate.history_item, 0, sizeof(thisupdate.history_item)); + } + else + { + vt->length = item.towrite->size(); + index::value_history::item &history_item = thisupdate.history_item; + history_item.transaction_counter = this_transaction_counter; + history_item.value_offset = (value_offset + totalwrite) / 64; + history_item.value_identifier = _parent->_mysmallfileidx; + history_item.length = vt->length; + } + if(_parent->_indexheader->contents_hashed) + { + vt->hash = QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash(value, totalwrite); } + value += totalwrite; + value_offset += totalwrite; } - // Place maximum extent back where it is supposed to be - _parent->_mysmallfile.truncate(original_length + totalcommitsize).value(); items_written = true; } } |