diff options
author | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> | 2017-09-03 04:35:13 +0300 |
---|---|---|
committer | Niall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com> | 2017-09-03 04:35:13 +0300 |
commit | f15b8cec3026ad3cae3e0aa2f8390d3300305a75 (patch) | |
tree | aa2a0159d008efbea9a0c291d5dde00cc63cc58b /programs | |
parent | 2b8fdf95e2013295b799d7ee198a1cbc067d9e36 (diff) |
Implemented key deletion into the toy key-value store.
Actually handle errors properly now, correctly rolling back and abandoning any transaction.
Diffstat (limited to 'programs')
-rw-r--r-- | programs/key-value-store/Readme.md | 29 | ||||
-rw-r--r-- | programs/key-value-store/include/key_value_store.hpp | 344 | ||||
-rw-r--r-- | programs/key-value-store/main.cpp | 105 |
3 files changed, 401 insertions, 77 deletions
diff --git a/programs/key-value-store/Readme.md b/programs/key-value-store/Readme.md index 9fd91549..e10eecd5 100644 --- a/programs/key-value-store/Readme.md +++ b/programs/key-value-store/Readme.md @@ -5,3 +5,32 @@ update as an atomic transaction up to 65,535 key-values at once. It is purely to test the feasibility of one approach to implementing such a store, and to test AFIO's design. Nobody should use this store for anything serious. + +## Todo: +- [ ] Add sparse file creation on Windows to AFIO and see how the +benchmarks fare. +- [x] Add key-value deletion. +- [ ] Atomic append should issue gather buffers of `IOV_MAX` +- [ ] Optionally use mmaps to extend smallfile instead of atomic appends. +Likely highly racy on Linux due to kernel bugs :) +- [ ] Online free space consolidation (copy early still in use records +into new small file, update index to use new small file) + - [ ] Per 1Mb free space consolidated, punch hole +- [ ] Need some way of detecting and breaking sudden process exit during +index update. + +## Benchmarks: +- 1Kb values Windows with NTFS, no integrity, no durability: + ``` + Inserting 1M key-value pairs ... + Inserted at 125596 items per sec + Retrieving 1M key-value pairs ... + Fetched at 524934 items per sec + ``` +- 64 byte values Windows with NTFS, no integrity, no durability: + ``` + Inserting 1M key-value pairs ... + Inserted at 247402 items per sec + Retrieving 1M key-value pairs ... + Fetched at 700770 items per sec + ```
\ No newline at end of file diff --git a/programs/key-value-store/include/key_value_store.hpp b/programs/key-value-store/include/key_value_store.hpp index d964f17d..a5588e0a 100644 --- a/programs/key-value-store/include/key_value_store.hpp +++ b/programs/key-value-store/include/key_value_store.hpp @@ -36,6 +36,7 @@ namespace key_value_store namespace afio = AFIO_V2_NAMESPACE; template <class T> using optional = afio::optional<T>; template <class T> using span = afio::span<T>; + using afio::undoer; using uint128 = QUICKCPPLIB_NAMESPACE::integers128::uint128; using key_type = uint128; @@ -79,6 +80,14 @@ namespace key_value_store { } }; + class index_full : std::runtime_error + { + public: + index_full() + : std::runtime_error("Failed to insert new key due to too many collisions, use a bigger index!") + { + } + }; class transaction_aborted : std::runtime_error { key_type _key; @@ -115,15 +124,19 @@ namespace key_value_store - uint128 key 16 bytes - value_history 104 bytes */ - using open_hash_index = basic_open_hash_index<atomic_linear_memory_policy<key_type, value_history, 1>, AFIO_V2_NAMESPACE::algorithm::mapped_view>; + using open_hash_index = basic_open_hash_index<atomic_linear_memory_policy<key_type, value_history, 0>, AFIO_V2_NAMESPACE::algorithm::mapped_view>; static_assert(sizeof(open_hash_index::value_type) == 128, "open_hash_index::value_type is wrong size"); struct index { uint64_t magic; // versionmagic, currently "AFIOKV01" for valid, "DEADKV01" for requires repair std::atomic<uint64_t> transaction_counter; // top 16 bits are number of keys changed this transaction, bottom 48 bits are monotonic counter + uint128 hash; // Optional hash of index file written on last close to guard against systems which don't write mmaps properly std::atomic<unsigned> writes_occurring[48]; // Incremented just before an update, decremented after, per writer std::atomic<bool> all_writes_synced; // Set if all writers since the first which has opened this store did so with `O_SYNC` on (i.e. safe during fsck to check small file tails only) + + uint64_t contents_hashed : 1; // If records written are hashed and checked on fetch + uint64_t key_is_hash_of_value : 1; // On read, check hash of value equals key }; struct value_tail @@ -131,7 +144,7 @@ namespace key_value_store uint128 hash; // 128 bit hash of contents key_type key; uint64_t transaction_counter; // transaction counter when this was updated - uint64_t length; + uint64_t length; // (uint64_t)-1 means key was deleted }; static_assert(sizeof(value_tail) == 48, "value_tail is wrong size"); } @@ -216,8 +229,8 @@ namespace key_value_store { throw maximum_writers_reached(); } - // Set up the index - afio::section_handle sh = afio::section_handle::section(_indexfile).value(); + // Set up the index, either r/w or read only with copy on write + afio::section_handle sh = afio::section_handle::section(_indexfile, 0, (mode == afio::file_handle::mode::write) ? afio::section_handle::flag::readwrite : (afio::section_handle::flag::read | afio::section_handle::flag::cow)).value(); afio::file_handle::extent_type len = sh.length(); len -= sizeof(index::index); len /= sizeof(index::open_hash_index::value_type); @@ -260,16 +273,20 @@ namespace key_value_store /* TODO: Check consistency of index by checking that every item's transaction counter is within 2^47 of head's and that no item has a transaction counter later than head's. - Check that every smallfile's tail points to a value record which matches one in the history in the index + Check that every smallfile's tail points to a complete set of value records which matches the one in the history in the index or that that key's latest value exists and is valid. */ //_openfiles(dir, writable); + if(_indexheader->contents_hashed) + { + } } // Reset writes_occurring and all_writes_synced index::index i; _indexfile.read(0, (char *) &i, sizeof(i)).value(); memset(i.writes_occurring, 0, sizeof(i.writes_occurring)); i.all_writes_synced = _indexfile.are_writes_durable(); + memset(&i.hash, 0, sizeof(i.hash)); _indexfile.write(0, (char *) &i, sizeof(i)).value(); } } @@ -285,12 +302,9 @@ namespace key_value_store } // Open our smallfiles and map our index for shared usage _openfiles(dir, mode, caching); - if(mode == afio::file_handle::mode::write) + if(!_indexfile.are_writes_durable()) { - if(!_indexfile.are_writes_durable()) - { - _indexheader->all_writes_synced = false; - } + _indexheader->all_writes_synced = false; } } //! \overload @@ -303,10 +317,27 @@ namespace key_value_store : basic_key_value_store(afio::path_handle::path(dir).value(), 0, afio::file_handle::mode::read) { } + ~basic_key_value_store() + { + // Release my smallfile + _smallfileguard.unlock(); + _mysmallfile.close().value(); + // Try to lock the index exclusively + _indexfileguard.unlock(); + auto indexfileguard = _indexfile.try_lock(_indexinuseoffset, 1, true); + if(indexfileguard) + { + // I am the last user + if(_indexheader->contents_hashed) + { + _indexheader->hash = QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash((char *) _indexheader, _indexfile.length().value()); + } + } + } //! Retrieve when keys were last updated by setting the second to the latest transaction counter. - //! Note that counter will be `(uint64_t)-1` for any unknown keys - void last_updated(span<std::pair<key_type, uint64_t>> keys) + //! Note that counter will be `(uint64_t)-1` for any unknown keys. Never throws exceptions. + void last_updated(span<std::pair<key_type, uint64_t>> keys) noexcept { for(auto &key : keys) { @@ -325,6 +356,7 @@ namespace key_value_store struct keyvalue_info { friend class basic_key_value_store; + friend class transaction; //! The key key_type key; //! The value @@ -371,20 +403,28 @@ namespace key_value_store char *_value_buffer{nullptr}; afio::algorithm::mapped_view<const char> _value_view; }; - //! Retrieve the latest value for a key + //! Retrieve the latest value for a key. May throw `corrupted_store` keyvalue_info find(key_type key, size_t revision = 0) { if(_indexheader->magic != goodmagic) throw corrupted_store(); + if(revision >= 4) + throw std::invalid_argument("valid revision is 0-3"); auto it = _index->find_shared(key); if(it == _index->end()) { + // No value as no key return keyvalue_info(key); } else { // TODO Depending on length, make a mapped_view instead const auto &item = it->second.history[revision]; + if(item.transaction_counter == 0) + { + // No value on the key at this revision + return keyvalue_info(key); + } size_t length = item.length, smallfilelength = _pad_length(length); char *buffer = (char *) malloc(smallfilelength); if(!buffer) @@ -393,23 +433,37 @@ namespace key_value_store } if(item.value_identifier >= _smallfiles.read.size()) { - // FIXME: Open newly created smallfiles + // TODO: Open newly created smallfiles abort(); } _smallfiles.read[item.value_identifier].read(item.value_offset * 64 - smallfilelength, buffer, smallfilelength).value(); index::value_tail *vt = reinterpret_cast<index::value_tail *>(buffer + smallfilelength - sizeof(index::value_tail)); - // TODO: Check hash equals contents if hashing enabled + if(_indexheader->contents_hashed || _indexheader->key_is_hash_of_value) + { + QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash hasher; + uint128 tocheck = vt->hash; + memset(&vt->hash, 0, sizeof(vt->hash)); + uint128 thishash = QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash(buffer, _indexheader->contents_hashed ? smallfilelength : length); + if(tocheck != thishash) + { + _indexheader->magic = badmagic; + throw corrupted_store(); + } + } if(vt->key != key) { - abort(); + _indexheader->magic = badmagic; + throw corrupted_store(); } if(vt->length != length) { - abort(); + _indexheader->magic = badmagic; + throw corrupted_store(); } if(vt->transaction_counter != item.transaction_counter) { - abort(); + _indexheader->magic = badmagic; + throw corrupted_store(); } return keyvalue_info(key, span<char>(buffer, length), item.transaction_counter); } @@ -426,8 +480,10 @@ namespace key_value_store { basic_key_value_store::keyvalue_info kvi; // the item's value when fetched afio::optional<span<const char>> towrite; // the value to be written on commit + bool remove; // true if to remove _item(basic_key_value_store::keyvalue_info &&_kvi) : kvi(std::move(_kvi)) + , remove(false) { } }; @@ -461,7 +517,7 @@ namespace key_value_store _items.push_back(_parent->find(key)); return _items.back().kvi.value; } - //! Set what a value will be updated to on commit + //! Set what a value will be updated to on commit. Requires the key to have been previously fetched to establish a base revision. void update(key_type key, span<const char> towrite) { for(auto &i : _items) @@ -469,37 +525,125 @@ namespace key_value_store if(i.kvi.key == key) { i.towrite = towrite; + i.remove = false; return; } } throw bad_update(); } + //! Remove a key and its value on commit. Requires the key to have been previously fetched to establish a base revision. + void remove(key_type key) + { + for(auto &i : _items) + { + if(i.kvi.key == key) + { + i.towrite = {}; + i.remove = true; + return; + } + } + throw bad_update(); + } + //! Set what a value will be updated to on commit without establishing a base revision. Concurrent updates to this key may be lost! + void update_unsafe(key_type key, span<const char> towrite) + { + for(auto &i : _items) + { + if(i.kvi.key == key) + { + i.towrite = towrite; + i.remove = false; + return; + } + } + if(_items.size() == 65535) + { + throw transaction_limit_reached(); + } + basic_key_value_store::keyvalue_info kvi(key); + _items.push_back(std::move(kvi)); + _items.back().towrite = towrite; + } + //! Remove a key and its value on commit without establishing a base revision. Concurrent updates to this key may be lost! + void remove_unsafe(key_type key) + { + for(auto &i : _items) + { + if(i.kvi.key == key) + { + i.towrite = {}; + i.remove = true; + return; + } + } + if(_items.size() == 65535) + { + throw transaction_limit_reached(); + } + basic_key_value_store::keyvalue_info kvi(key); + _items.push_back(std::move(kvi)); + _items.back().remove = true; + } //! Commit the transaction, throwing `transaction_aborted` if a key's value was updated since it was fetched for this transaction. void commit() { if(_parent->_indexheader->magic != _parent->goodmagic) throw corrupted_store(); - // Firstly sort the list of keys we are to update into order. This ensures that all - // writers always lock the keys in the same order, thus preventing deadlock. + // Firstly remove any items fetched but not used as a base for an update, and sort the remaining + // list of keys we are to update into order. This ensures that all writers always lock the keys + // in the same order, thus preventing deadlock. + _items.erase(std::remove_if(_items.begin(), _items.end(), [](const auto &item) { return !item.towrite.has_value() && !item.remove; }), _items.end()); std::sort(_items.begin(), _items.end(), [](const _item &a, const _item &b) { return a.kvi.key < b.kvi.key; }); - // Take out shared locks on all the items in my commit, early checking if we will abort - std::vector<index::open_hash_index::const_iterator> its; - its.reserve(_items.size()); + // The update list, filled in as we progress + struct toupdate_type + { + const key_type key; + const uint64_t old_transaction_counter; + const bool insertion, update, removal; + index::value_history::item history_item{}; + index::open_hash_index::iterator it{}; + toupdate_type(key_type _key, uint64_t _old_transaction_counter, bool _insertion, bool _update, bool _removal) + : key(_key) + , old_transaction_counter(_old_transaction_counter) + , insertion(_insertion) + , update(_update) + , removal(_removal) + { + } + }; + std::vector<toupdate_type> toupdate; + toupdate.reserve(_items.size()); + + // Take out shared locks on all the items in my commit with existing values, early checking if we will abort + std::vector<index::open_hash_index::const_iterator> shared_locks; + shared_locks.reserve(_items.size()); for(const auto &item : _items) { - if(item.towrite.has_value()) + bool insertion = false, update = false, removal = false; + if(item.towrite.has_value() || item.remove) { - its.push_back(_parent->_index->find_shared(item.kvi.key)); - if(its.back() != _parent->_index->end()) + auto it = _parent->_index->find_shared(item.kvi.key); + if(it != _parent->_index->end()) { - if(its.back()->second.history[0].transaction_counter != item.kvi.transaction_counter) + // If item was fetched before update and it has since changed, abort + if(item.kvi.transaction_counter != (uint64_t) -1 && it->second.history[0].transaction_counter != item.kvi.transaction_counter) { throw transaction_aborted(item.kvi.key); } + shared_locks.push_back(std::move(it)); + removal = item.remove; + update = !item.remove; + } + else + { + insertion = true; } } + assert(insertion + update + removal == 1); + toupdate.emplace_back(item.kvi.key, item.kvi.transaction_counter, insertion, update, removal); } // Atomically increment the transaction counter to set this latest transaction uint64_t old_transaction_counter; @@ -524,66 +668,132 @@ namespace key_value_store memset(buffer, 0, sizeof(buffer)); index::value_tail *vt = reinterpret_cast<index::value_tail *>(buffer + sizeof(buffer) - sizeof(index::value_tail)); afio::file_handle::extent_type value_offset = _parent->_mysmallfile.length().value(); - using toupdate_type = std::tuple<key_type, uint64_t, index::value_history::item, index::open_hash_index::iterator>; - std::vector<toupdate_type> toupdate; - toupdate.reserve(its.size()); - for(const auto &item : _items) + for(size_t n = 0; n < _items.size(); n++) { - if(item.towrite.has_value()) + toupdate_type &thisupdate = toupdate[n]; + if(thisupdate.insertion || thisupdate.update || thisupdate.removal) { - vt->key = item.kvi.key; + const transaction::_item &item = _items[n]; + vt->key = thisupdate.key; vt->transaction_counter = this_transaction_counter; - vt->length = item.towrite->size(); - // TODO: Hash contents - size_t totalwrite = _parent->_pad_length(vt->length); - size_t tailbytes = totalwrite - vt->length; - assert(tailbytes < sizeof(buffer)); - afio::file_handle::const_buffer_type reqs[] = {{item.towrite->data(), item.towrite->size()}, {buffer + sizeof(buffer) - tailbytes, tailbytes}}; - _parent->_mysmallfile.write({reqs, 0}).value(); - index::value_history::item history_item; - history_item.transaction_counter = this_transaction_counter; - history_item.value_offset = (value_offset + totalwrite) / 64; - history_item.value_identifier = _parent->_mysmallfileidx; - history_item.length = vt->length; - toupdate.emplace_back(vt->key, item.kvi.transaction_counter, history_item, index::open_hash_index::iterator{}); + size_t totalwrite = 0; + if(thisupdate.removal) + { + vt->length = (uint64_t) -1; + totalwrite = 64; + size_t tailbytes = 64; + afio::file_handle::const_buffer_type reqs[] = {{buffer + sizeof(buffer) - tailbytes, tailbytes}}; + if(_parent->_indexheader->contents_hashed) + { + QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash hasher; + memset(&vt->hash, 0, sizeof(vt->hash)); + hasher.add(reqs[0].data, reqs[0].len); + vt->hash = hasher.finalise(); + } + _parent->_mysmallfile.write({reqs, 0}).value(); + memset(&thisupdate.history_item, 0, sizeof(thisupdate.history_item)); + } + else + { + vt->length = item.towrite->size(); + totalwrite = _parent->_pad_length(item.towrite->size()); + size_t tailbytes = totalwrite - item.towrite->size(); + assert(tailbytes < sizeof(buffer)); + afio::file_handle::const_buffer_type reqs[] = {{item.towrite->data(), item.towrite->size()}, {buffer + sizeof(buffer) - tailbytes, tailbytes}}; + if(_parent->_indexheader->contents_hashed) + { + QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash hasher; + memset(&vt->hash, 0, sizeof(vt->hash)); + hasher.add(reqs[0].data, reqs[0].len); + hasher.add(reqs[1].data, reqs[1].len); + vt->hash = hasher.finalise(); + } + _parent->_mysmallfile.write({reqs, 0}).value(); + index::value_history::item &history_item = thisupdate.history_item; + history_item.transaction_counter = this_transaction_counter; + history_item.value_offset = (value_offset + totalwrite) / 64; + history_item.value_identifier = _parent->_mysmallfileidx; + history_item.length = vt->length; + } value_offset += totalwrite; } } - // Release all the shared locks, and take exclusive locks - its.clear(); - for(auto &item : toupdate) + // Release all the shared locks on the existing items we are about to update + shared_locks.clear(); + // Bail out if store has become corrupted + if(_parent->_indexheader->magic != _parent->goodmagic) + throw corrupted_store(); + // Remove any newly inserted keys if we abort + auto removeinserted = undoer([this, &toupdate] { + for(auto updit = toupdate.rend(); updit != toupdate.rbegin(); ++updit) + { + if(updit->insertion && updit->it != _parent->_index->end()) + { + _parent->_index->erase(std::move(updit->it)); + } + } + }); + // Take exclusive locks on all items in this transaction, inserting new keys if necessary + for(toupdate_type &item : toupdate) { - auto it = _parent->_index->find_exclusive(std::get<0>(item)); + auto it = _parent->_index->find_exclusive(item.key); if(it != _parent->_index->end()) { - if(it->second.history[0].transaction_counter != std::get<1>(item)) + if(item.insertion || (item.update && it->second.history[0].transaction_counter != item.old_transaction_counter)) { - throw transaction_aborted(std::get<0>(item)); + // Item has changed since transaction begun + throw transaction_aborted(item.key); } } - std::get<3>(item) = std::move(it); + else + { + if(item.update || item.removal) + { + // Item has changed since transaction begun + throw transaction_aborted(item.key); + } + // Insert a new key with empty history + index::value_history vh; + memset(&vh, 0, sizeof(vh)); + it = _parent->_index->insert({item.key, std::move(vh)}).first; + if(it == _parent->_index->end()) + { + throw index_full(); + } + } + // Store the exclusive lock away for later + item.it = std::move(it); } - // Update all the values we are updating this transaction if(_parent->_indexheader->magic != _parent->goodmagic) throw corrupted_store(); + // Finally actually perform the update as quickly as possible to reduce the + // possibility of a partially issued update which is expensive to repair. + // This can no longer abort, so dismiss the removeinserter + removeinserted.dismiss(); _parent->_indexheader->writes_occurring[_parent->_mysmallfileidx].fetch_add(1); for(auto &item : toupdate) { - auto &it = std::get<3>(item); - if(it == _parent->_index->end()) - { - index::value_history vh; - memset(&vh, 0, sizeof(vh)); - vh.history[0] = std::get<2>(item); - _parent->_index->insert({std::get<0>(item), std::move(vh)}); - } - else + // Update existing value's latest revision + index::value_history &value = item.it->second; + memmove(value.history + 1, value.history, sizeof(value.history) - sizeof(value.history[0])); + value.history[0] = item.history_item; + if(item.removal) { - auto &indexitem = it->second; - memmove(indexitem.history + 1, indexitem.history, sizeof(indexitem.history) / sizeof(indexitem.history[0]) - 1); - indexitem.history[0] = std::get<2>(item); + bool alldeleted = true; + for(const auto &h : value.history) + { + if(h.transaction_counter != 0) + { + alldeleted = false; + break; + } + } + if(alldeleted) + { + _parent->_index->erase(std::move(item.it)); + } } } _parent->_indexheader->writes_occurring[_parent->_mysmallfileidx].fetch_sub(1); diff --git a/programs/key-value-store/main.cpp b/programs/key-value-store/main.cpp index 56b026d2..02c21f59 100644 --- a/programs/key-value-store/main.cpp +++ b/programs/key-value-store/main.cpp @@ -26,22 +26,107 @@ Distributed under the Boost Software License, Version 1.0. int main() { +#ifdef _WIN32 + SetThreadAffinityMask(GetCurrentThread(), 1); +#endif try { - key_value_store::basic_key_value_store store("teststore", 10); - auto kvi = store.find(78); - if(kvi) + AFIO_V2_NAMESPACE::filesystem::remove_all("teststore"); + key_value_store::basic_key_value_store store("teststore", 2000000); { - std::cout << "Key 78 has value " << kvi.value << " and it was last updated at " << kvi.transaction_counter << std::endl; + key_value_store::transaction tr(store); + tr.fetch(78); + tr.update(78, "niall"); + tr.commit(); + auto kvi = store.find(78); + if(kvi) + { + std::cout << "Key 78 has value " << kvi.value << " and it was last updated at " << kvi.transaction_counter << std::endl; + } + else + { + std::cerr << "FAILURE: Key 78 was not found!" << std::endl; + } } - else { - std::cout << "Key 78 was not found!" << std::endl; + key_value_store::transaction tr(store); + tr.fetch(79); + tr.update(79, "douglas"); + tr.commit(); + auto kvi = store.find(79); + if(kvi) + { + std::cout << "Key 79 has value " << kvi.value << " and it was last updated at " << kvi.transaction_counter << std::endl; + } + else + { + std::cerr << "FAILURE: Key 79 was not found!" << std::endl; + } + } + { + key_value_store::transaction tr(store); + tr.fetch(78); + tr.remove(78); + tr.commit(); + auto kvi = store.find(78, 0); + if(kvi) + { + std::cerr << "FAILURE: Revision 0 of Key 78 has value " << kvi.value << " and it was last updated at " << kvi.transaction_counter << std::endl; + } + else + { + std::cout << "Revision 0 of key 78 was not found!" << std::endl; + } + kvi = store.find(78, 1); + if(kvi) + { + std::cout << "Revision 1 of Key 78 has value " << kvi.value << " and it was last updated at " << kvi.transaction_counter << std::endl; + } + else + { + std::cerr << "FAILURE: Revision 1Key 78 was not found!" << std::endl; + } + } + + // Write 1M values and see how long it takes + std::vector<std::pair<uint64_t, std::string>> values; + std::cout << "\nGenerating 1M key-value pairs ..." << std::endl; + for(size_t n = 0; n < 1000000; n++) + { + std::string randomvalue = AFIO_V2_NAMESPACE::utils::random_string(1024); + values.push_back({100 + n, randomvalue}); + } + std::cout << "Inserting 1M key-value pairs ..." << std::endl; + { + auto begin = std::chrono::high_resolution_clock::now(); + for(size_t n = 0; n < values.size(); n += 1024) + { + key_value_store::transaction tr(store); + for(size_t m = 0; m < 1024; m++) + { + if(n + m >= values.size()) + break; + auto &i = values[n + m]; + tr.update_unsafe(i.first, i.second); + } + tr.commit(); + } + auto end = std::chrono::high_resolution_clock::now(); + auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - begin).count(); + std::cout << "Inserted at " << (1000000000ULL / diff) << " items per sec" << std::endl; + } + std::cout << "Retrieving 1M key-value pairs ..." << std::endl; + { + auto begin = std::chrono::high_resolution_clock::now(); + for(auto &i : values) + { + if(!store.find(i.first)) + abort(); + } + auto end = std::chrono::high_resolution_clock::now(); + auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - begin).count(); + std::cout << "Fetched at " << (1000000000ULL / diff) << " items per sec" << std::endl; } - key_value_store::transaction tr(store); - tr.fetch(78); - tr.update(78, "niall"); - tr.commit(); } catch(const std::exception &e) { |