Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/windirstat/llfio.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2017-09-03 04:35:13 +0300
committerNiall Douglas (s [underscore] sourceforge {at} nedprod [dot] com) <spamtrap@nedprod.com>2017-09-03 04:35:13 +0300
commitf15b8cec3026ad3cae3e0aa2f8390d3300305a75 (patch)
treeaa2a0159d008efbea9a0c291d5dde00cc63cc58b /programs
parent2b8fdf95e2013295b799d7ee198a1cbc067d9e36 (diff)
Implemented key deletion into the toy key-value store.
Actually handle errors properly now, correctly rolling back and abandoning any transaction.
Diffstat (limited to 'programs')
-rw-r--r--programs/key-value-store/Readme.md29
-rw-r--r--programs/key-value-store/include/key_value_store.hpp344
-rw-r--r--programs/key-value-store/main.cpp105
3 files changed, 401 insertions, 77 deletions
diff --git a/programs/key-value-store/Readme.md b/programs/key-value-store/Readme.md
index 9fd91549..e10eecd5 100644
--- a/programs/key-value-store/Readme.md
+++ b/programs/key-value-store/Readme.md
@@ -5,3 +5,32 @@ update as an atomic transaction up to 65,535 key-values at once.
It is purely to test the feasibility of one approach to implementing such
a store, and to test AFIO's design. Nobody should use this store for
anything serious.
+
+## Todo:
+- [ ] Add sparse file creation on Windows to AFIO and see how the
+benchmarks fare.
+- [x] Add key-value deletion.
+- [ ] Atomic append should issue gather buffers of `IOV_MAX`
+- [ ] Optionally use mmaps to extend smallfile instead of atomic appends.
+Likely highly racy on Linux due to kernel bugs :)
+- [ ] Online free space consolidation (copy early still in use records
+into new small file, update index to use new small file)
+ - [ ] Per 1Mb free space consolidated, punch hole
+- [ ] Need some way of detecting and breaking sudden process exit during
+index update.
+
+## Benchmarks:
+- 1Kb values Windows with NTFS, no integrity, no durability:
+ ```
+ Inserting 1M key-value pairs ...
+ Inserted at 125596 items per sec
+ Retrieving 1M key-value pairs ...
+ Fetched at 524934 items per sec
+ ```
+- 64 byte values Windows with NTFS, no integrity, no durability:
+ ```
+ Inserting 1M key-value pairs ...
+ Inserted at 247402 items per sec
+ Retrieving 1M key-value pairs ...
+ Fetched at 700770 items per sec
+ ``` \ No newline at end of file
diff --git a/programs/key-value-store/include/key_value_store.hpp b/programs/key-value-store/include/key_value_store.hpp
index d964f17d..a5588e0a 100644
--- a/programs/key-value-store/include/key_value_store.hpp
+++ b/programs/key-value-store/include/key_value_store.hpp
@@ -36,6 +36,7 @@ namespace key_value_store
namespace afio = AFIO_V2_NAMESPACE;
template <class T> using optional = afio::optional<T>;
template <class T> using span = afio::span<T>;
+ using afio::undoer;
using uint128 = QUICKCPPLIB_NAMESPACE::integers128::uint128;
using key_type = uint128;
@@ -79,6 +80,14 @@ namespace key_value_store
{
}
};
+ class index_full : std::runtime_error
+ {
+ public:
+ index_full()
+ : std::runtime_error("Failed to insert new key due to too many collisions, use a bigger index!")
+ {
+ }
+ };
class transaction_aborted : std::runtime_error
{
key_type _key;
@@ -115,15 +124,19 @@ namespace key_value_store
- uint128 key 16 bytes
- value_history 104 bytes
*/
- using open_hash_index = basic_open_hash_index<atomic_linear_memory_policy<key_type, value_history, 1>, AFIO_V2_NAMESPACE::algorithm::mapped_view>;
+ using open_hash_index = basic_open_hash_index<atomic_linear_memory_policy<key_type, value_history, 0>, AFIO_V2_NAMESPACE::algorithm::mapped_view>;
static_assert(sizeof(open_hash_index::value_type) == 128, "open_hash_index::value_type is wrong size");
struct index
{
uint64_t magic; // versionmagic, currently "AFIOKV01" for valid, "DEADKV01" for requires repair
std::atomic<uint64_t> transaction_counter; // top 16 bits are number of keys changed this transaction, bottom 48 bits are monotonic counter
+ uint128 hash; // Optional hash of index file written on last close to guard against systems which don't write mmaps properly
std::atomic<unsigned> writes_occurring[48]; // Incremented just before an update, decremented after, per writer
std::atomic<bool> all_writes_synced; // Set if all writers since the first which has opened this store did so with `O_SYNC` on (i.e. safe during fsck to check small file tails only)
+
+ uint64_t contents_hashed : 1; // If records written are hashed and checked on fetch
+ uint64_t key_is_hash_of_value : 1; // On read, check hash of value equals key
};
struct value_tail
@@ -131,7 +144,7 @@ namespace key_value_store
uint128 hash; // 128 bit hash of contents
key_type key;
uint64_t transaction_counter; // transaction counter when this was updated
- uint64_t length;
+ uint64_t length; // (uint64_t)-1 means key was deleted
};
static_assert(sizeof(value_tail) == 48, "value_tail is wrong size");
}
@@ -216,8 +229,8 @@ namespace key_value_store
{
throw maximum_writers_reached();
}
- // Set up the index
- afio::section_handle sh = afio::section_handle::section(_indexfile).value();
+ // Set up the index, either r/w or read only with copy on write
+ afio::section_handle sh = afio::section_handle::section(_indexfile, 0, (mode == afio::file_handle::mode::write) ? afio::section_handle::flag::readwrite : (afio::section_handle::flag::read | afio::section_handle::flag::cow)).value();
afio::file_handle::extent_type len = sh.length();
len -= sizeof(index::index);
len /= sizeof(index::open_hash_index::value_type);
@@ -260,16 +273,20 @@ namespace key_value_store
/* TODO: Check consistency of index by checking that every item's transaction counter is within 2^47 of head's
and that no item has a transaction counter later than head's.
- Check that every smallfile's tail points to a value record which matches one in the history in the index
+ Check that every smallfile's tail points to a complete set of value records which matches the one in the history in the index
or that that key's latest value exists and is valid.
*/
//_openfiles(dir, writable);
+ if(_indexheader->contents_hashed)
+ {
+ }
}
// Reset writes_occurring and all_writes_synced
index::index i;
_indexfile.read(0, (char *) &i, sizeof(i)).value();
memset(i.writes_occurring, 0, sizeof(i.writes_occurring));
i.all_writes_synced = _indexfile.are_writes_durable();
+ memset(&i.hash, 0, sizeof(i.hash));
_indexfile.write(0, (char *) &i, sizeof(i)).value();
}
}
@@ -285,12 +302,9 @@ namespace key_value_store
}
// Open our smallfiles and map our index for shared usage
_openfiles(dir, mode, caching);
- if(mode == afio::file_handle::mode::write)
+ if(!_indexfile.are_writes_durable())
{
- if(!_indexfile.are_writes_durable())
- {
- _indexheader->all_writes_synced = false;
- }
+ _indexheader->all_writes_synced = false;
}
}
//! \overload
@@ -303,10 +317,27 @@ namespace key_value_store
: basic_key_value_store(afio::path_handle::path(dir).value(), 0, afio::file_handle::mode::read)
{
}
+ ~basic_key_value_store()
+ {
+ // Release my smallfile
+ _smallfileguard.unlock();
+ _mysmallfile.close().value();
+ // Try to lock the index exclusively
+ _indexfileguard.unlock();
+ auto indexfileguard = _indexfile.try_lock(_indexinuseoffset, 1, true);
+ if(indexfileguard)
+ {
+ // I am the last user
+ if(_indexheader->contents_hashed)
+ {
+ _indexheader->hash = QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash((char *) _indexheader, _indexfile.length().value());
+ }
+ }
+ }
//! Retrieve when keys were last updated by setting the second to the latest transaction counter.
- //! Note that counter will be `(uint64_t)-1` for any unknown keys
- void last_updated(span<std::pair<key_type, uint64_t>> keys)
+ //! Note that counter will be `(uint64_t)-1` for any unknown keys. Never throws exceptions.
+ void last_updated(span<std::pair<key_type, uint64_t>> keys) noexcept
{
for(auto &key : keys)
{
@@ -325,6 +356,7 @@ namespace key_value_store
struct keyvalue_info
{
friend class basic_key_value_store;
+ friend class transaction;
//! The key
key_type key;
//! The value
@@ -371,20 +403,28 @@ namespace key_value_store
char *_value_buffer{nullptr};
afio::algorithm::mapped_view<const char> _value_view;
};
- //! Retrieve the latest value for a key
+ //! Retrieve the latest value for a key. May throw `corrupted_store`
keyvalue_info find(key_type key, size_t revision = 0)
{
if(_indexheader->magic != goodmagic)
throw corrupted_store();
+ if(revision >= 4)
+ throw std::invalid_argument("valid revision is 0-3");
auto it = _index->find_shared(key);
if(it == _index->end())
{
+ // No value as no key
return keyvalue_info(key);
}
else
{
// TODO Depending on length, make a mapped_view instead
const auto &item = it->second.history[revision];
+ if(item.transaction_counter == 0)
+ {
+ // No value on the key at this revision
+ return keyvalue_info(key);
+ }
size_t length = item.length, smallfilelength = _pad_length(length);
char *buffer = (char *) malloc(smallfilelength);
if(!buffer)
@@ -393,23 +433,37 @@ namespace key_value_store
}
if(item.value_identifier >= _smallfiles.read.size())
{
- // FIXME: Open newly created smallfiles
+ // TODO: Open newly created smallfiles
abort();
}
_smallfiles.read[item.value_identifier].read(item.value_offset * 64 - smallfilelength, buffer, smallfilelength).value();
index::value_tail *vt = reinterpret_cast<index::value_tail *>(buffer + smallfilelength - sizeof(index::value_tail));
- // TODO: Check hash equals contents if hashing enabled
+ if(_indexheader->contents_hashed || _indexheader->key_is_hash_of_value)
+ {
+ QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash hasher;
+ uint128 tocheck = vt->hash;
+ memset(&vt->hash, 0, sizeof(vt->hash));
+ uint128 thishash = QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash(buffer, _indexheader->contents_hashed ? smallfilelength : length);
+ if(tocheck != thishash)
+ {
+ _indexheader->magic = badmagic;
+ throw corrupted_store();
+ }
+ }
if(vt->key != key)
{
- abort();
+ _indexheader->magic = badmagic;
+ throw corrupted_store();
}
if(vt->length != length)
{
- abort();
+ _indexheader->magic = badmagic;
+ throw corrupted_store();
}
if(vt->transaction_counter != item.transaction_counter)
{
- abort();
+ _indexheader->magic = badmagic;
+ throw corrupted_store();
}
return keyvalue_info(key, span<char>(buffer, length), item.transaction_counter);
}
@@ -426,8 +480,10 @@ namespace key_value_store
{
basic_key_value_store::keyvalue_info kvi; // the item's value when fetched
afio::optional<span<const char>> towrite; // the value to be written on commit
+ bool remove; // true if to remove
_item(basic_key_value_store::keyvalue_info &&_kvi)
: kvi(std::move(_kvi))
+ , remove(false)
{
}
};
@@ -461,7 +517,7 @@ namespace key_value_store
_items.push_back(_parent->find(key));
return _items.back().kvi.value;
}
- //! Set what a value will be updated to on commit
+ //! Set what a value will be updated to on commit. Requires the key to have been previously fetched to establish a base revision.
void update(key_type key, span<const char> towrite)
{
for(auto &i : _items)
@@ -469,37 +525,125 @@ namespace key_value_store
if(i.kvi.key == key)
{
i.towrite = towrite;
+ i.remove = false;
return;
}
}
throw bad_update();
}
+ //! Remove a key and its value on commit. Requires the key to have been previously fetched to establish a base revision.
+ void remove(key_type key)
+ {
+ for(auto &i : _items)
+ {
+ if(i.kvi.key == key)
+ {
+ i.towrite = {};
+ i.remove = true;
+ return;
+ }
+ }
+ throw bad_update();
+ }
+ //! Set what a value will be updated to on commit without establishing a base revision. Concurrent updates to this key may be lost!
+ void update_unsafe(key_type key, span<const char> towrite)
+ {
+ for(auto &i : _items)
+ {
+ if(i.kvi.key == key)
+ {
+ i.towrite = towrite;
+ i.remove = false;
+ return;
+ }
+ }
+ if(_items.size() == 65535)
+ {
+ throw transaction_limit_reached();
+ }
+ basic_key_value_store::keyvalue_info kvi(key);
+ _items.push_back(std::move(kvi));
+ _items.back().towrite = towrite;
+ }
+ //! Remove a key and its value on commit without establishing a base revision. Concurrent updates to this key may be lost!
+ void remove_unsafe(key_type key)
+ {
+ for(auto &i : _items)
+ {
+ if(i.kvi.key == key)
+ {
+ i.towrite = {};
+ i.remove = true;
+ return;
+ }
+ }
+ if(_items.size() == 65535)
+ {
+ throw transaction_limit_reached();
+ }
+ basic_key_value_store::keyvalue_info kvi(key);
+ _items.push_back(std::move(kvi));
+ _items.back().remove = true;
+ }
//! Commit the transaction, throwing `transaction_aborted` if a key's value was updated since it was fetched for this transaction.
void commit()
{
if(_parent->_indexheader->magic != _parent->goodmagic)
throw corrupted_store();
- // Firstly sort the list of keys we are to update into order. This ensures that all
- // writers always lock the keys in the same order, thus preventing deadlock.
+ // Firstly remove any items fetched but not used as a base for an update, and sort the remaining
+ // list of keys we are to update into order. This ensures that all writers always lock the keys
+ // in the same order, thus preventing deadlock.
+ _items.erase(std::remove_if(_items.begin(), _items.end(), [](const auto &item) { return !item.towrite.has_value() && !item.remove; }), _items.end());
std::sort(_items.begin(), _items.end(), [](const _item &a, const _item &b) { return a.kvi.key < b.kvi.key; });
- // Take out shared locks on all the items in my commit, early checking if we will abort
- std::vector<index::open_hash_index::const_iterator> its;
- its.reserve(_items.size());
+ // The update list, filled in as we progress
+ struct toupdate_type
+ {
+ const key_type key;
+ const uint64_t old_transaction_counter;
+ const bool insertion, update, removal;
+ index::value_history::item history_item{};
+ index::open_hash_index::iterator it{};
+ toupdate_type(key_type _key, uint64_t _old_transaction_counter, bool _insertion, bool _update, bool _removal)
+ : key(_key)
+ , old_transaction_counter(_old_transaction_counter)
+ , insertion(_insertion)
+ , update(_update)
+ , removal(_removal)
+ {
+ }
+ };
+ std::vector<toupdate_type> toupdate;
+ toupdate.reserve(_items.size());
+
+ // Take out shared locks on all the items in my commit with existing values, early checking if we will abort
+ std::vector<index::open_hash_index::const_iterator> shared_locks;
+ shared_locks.reserve(_items.size());
for(const auto &item : _items)
{
- if(item.towrite.has_value())
+ bool insertion = false, update = false, removal = false;
+ if(item.towrite.has_value() || item.remove)
{
- its.push_back(_parent->_index->find_shared(item.kvi.key));
- if(its.back() != _parent->_index->end())
+ auto it = _parent->_index->find_shared(item.kvi.key);
+ if(it != _parent->_index->end())
{
- if(its.back()->second.history[0].transaction_counter != item.kvi.transaction_counter)
+ // If item was fetched before update and it has since changed, abort
+ if(item.kvi.transaction_counter != (uint64_t) -1 && it->second.history[0].transaction_counter != item.kvi.transaction_counter)
{
throw transaction_aborted(item.kvi.key);
}
+ shared_locks.push_back(std::move(it));
+ removal = item.remove;
+ update = !item.remove;
+ }
+ else
+ {
+ insertion = true;
}
}
+ assert(insertion + update + removal == 1);
+ toupdate.emplace_back(item.kvi.key, item.kvi.transaction_counter, insertion, update, removal);
}
// Atomically increment the transaction counter to set this latest transaction
uint64_t old_transaction_counter;
@@ -524,66 +668,132 @@ namespace key_value_store
memset(buffer, 0, sizeof(buffer));
index::value_tail *vt = reinterpret_cast<index::value_tail *>(buffer + sizeof(buffer) - sizeof(index::value_tail));
afio::file_handle::extent_type value_offset = _parent->_mysmallfile.length().value();
- using toupdate_type = std::tuple<key_type, uint64_t, index::value_history::item, index::open_hash_index::iterator>;
- std::vector<toupdate_type> toupdate;
- toupdate.reserve(its.size());
- for(const auto &item : _items)
+ for(size_t n = 0; n < _items.size(); n++)
{
- if(item.towrite.has_value())
+ toupdate_type &thisupdate = toupdate[n];
+ if(thisupdate.insertion || thisupdate.update || thisupdate.removal)
{
- vt->key = item.kvi.key;
+ const transaction::_item &item = _items[n];
+ vt->key = thisupdate.key;
vt->transaction_counter = this_transaction_counter;
- vt->length = item.towrite->size();
- // TODO: Hash contents
- size_t totalwrite = _parent->_pad_length(vt->length);
- size_t tailbytes = totalwrite - vt->length;
- assert(tailbytes < sizeof(buffer));
- afio::file_handle::const_buffer_type reqs[] = {{item.towrite->data(), item.towrite->size()}, {buffer + sizeof(buffer) - tailbytes, tailbytes}};
- _parent->_mysmallfile.write({reqs, 0}).value();
- index::value_history::item history_item;
- history_item.transaction_counter = this_transaction_counter;
- history_item.value_offset = (value_offset + totalwrite) / 64;
- history_item.value_identifier = _parent->_mysmallfileidx;
- history_item.length = vt->length;
- toupdate.emplace_back(vt->key, item.kvi.transaction_counter, history_item, index::open_hash_index::iterator{});
+ size_t totalwrite = 0;
+ if(thisupdate.removal)
+ {
+ vt->length = (uint64_t) -1;
+ totalwrite = 64;
+ size_t tailbytes = 64;
+ afio::file_handle::const_buffer_type reqs[] = {{buffer + sizeof(buffer) - tailbytes, tailbytes}};
+ if(_parent->_indexheader->contents_hashed)
+ {
+ QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash hasher;
+ memset(&vt->hash, 0, sizeof(vt->hash));
+ hasher.add(reqs[0].data, reqs[0].len);
+ vt->hash = hasher.finalise();
+ }
+ _parent->_mysmallfile.write({reqs, 0}).value();
+ memset(&thisupdate.history_item, 0, sizeof(thisupdate.history_item));
+ }
+ else
+ {
+ vt->length = item.towrite->size();
+ totalwrite = _parent->_pad_length(item.towrite->size());
+ size_t tailbytes = totalwrite - item.towrite->size();
+ assert(tailbytes < sizeof(buffer));
+ afio::file_handle::const_buffer_type reqs[] = {{item.towrite->data(), item.towrite->size()}, {buffer + sizeof(buffer) - tailbytes, tailbytes}};
+ if(_parent->_indexheader->contents_hashed)
+ {
+ QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash hasher;
+ memset(&vt->hash, 0, sizeof(vt->hash));
+ hasher.add(reqs[0].data, reqs[0].len);
+ hasher.add(reqs[1].data, reqs[1].len);
+ vt->hash = hasher.finalise();
+ }
+ _parent->_mysmallfile.write({reqs, 0}).value();
+ index::value_history::item &history_item = thisupdate.history_item;
+ history_item.transaction_counter = this_transaction_counter;
+ history_item.value_offset = (value_offset + totalwrite) / 64;
+ history_item.value_identifier = _parent->_mysmallfileidx;
+ history_item.length = vt->length;
+ }
value_offset += totalwrite;
}
}
- // Release all the shared locks, and take exclusive locks
- its.clear();
- for(auto &item : toupdate)
+ // Release all the shared locks on the existing items we are about to update
+ shared_locks.clear();
+ // Bail out if store has become corrupted
+ if(_parent->_indexheader->magic != _parent->goodmagic)
+ throw corrupted_store();
+ // Remove any newly inserted keys if we abort
+ auto removeinserted = undoer([this, &toupdate] {
+ for(auto updit = toupdate.rend(); updit != toupdate.rbegin(); ++updit)
+ {
+ if(updit->insertion && updit->it != _parent->_index->end())
+ {
+ _parent->_index->erase(std::move(updit->it));
+ }
+ }
+ });
+ // Take exclusive locks on all items in this transaction, inserting new keys if necessary
+ for(toupdate_type &item : toupdate)
{
- auto it = _parent->_index->find_exclusive(std::get<0>(item));
+ auto it = _parent->_index->find_exclusive(item.key);
if(it != _parent->_index->end())
{
- if(it->second.history[0].transaction_counter != std::get<1>(item))
+ if(item.insertion || (item.update && it->second.history[0].transaction_counter != item.old_transaction_counter))
{
- throw transaction_aborted(std::get<0>(item));
+ // Item has changed since transaction begun
+ throw transaction_aborted(item.key);
}
}
- std::get<3>(item) = std::move(it);
+ else
+ {
+ if(item.update || item.removal)
+ {
+ // Item has changed since transaction begun
+ throw transaction_aborted(item.key);
+ }
+ // Insert a new key with empty history
+ index::value_history vh;
+ memset(&vh, 0, sizeof(vh));
+ it = _parent->_index->insert({item.key, std::move(vh)}).first;
+ if(it == _parent->_index->end())
+ {
+ throw index_full();
+ }
+ }
+ // Store the exclusive lock away for later
+ item.it = std::move(it);
}
- // Update all the values we are updating this transaction
if(_parent->_indexheader->magic != _parent->goodmagic)
throw corrupted_store();
+ // Finally actually perform the update as quickly as possible to reduce the
+ // possibility of a partially issued update which is expensive to repair.
+ // This can no longer abort, so dismiss the removeinserter
+ removeinserted.dismiss();
_parent->_indexheader->writes_occurring[_parent->_mysmallfileidx].fetch_add(1);
for(auto &item : toupdate)
{
- auto &it = std::get<3>(item);
- if(it == _parent->_index->end())
- {
- index::value_history vh;
- memset(&vh, 0, sizeof(vh));
- vh.history[0] = std::get<2>(item);
- _parent->_index->insert({std::get<0>(item), std::move(vh)});
- }
- else
+ // Update existing value's latest revision
+ index::value_history &value = item.it->second;
+ memmove(value.history + 1, value.history, sizeof(value.history) - sizeof(value.history[0]));
+ value.history[0] = item.history_item;
+ if(item.removal)
{
- auto &indexitem = it->second;
- memmove(indexitem.history + 1, indexitem.history, sizeof(indexitem.history) / sizeof(indexitem.history[0]) - 1);
- indexitem.history[0] = std::get<2>(item);
+ bool alldeleted = true;
+ for(const auto &h : value.history)
+ {
+ if(h.transaction_counter != 0)
+ {
+ alldeleted = false;
+ break;
+ }
+ }
+ if(alldeleted)
+ {
+ _parent->_index->erase(std::move(item.it));
+ }
}
}
_parent->_indexheader->writes_occurring[_parent->_mysmallfileidx].fetch_sub(1);
diff --git a/programs/key-value-store/main.cpp b/programs/key-value-store/main.cpp
index 56b026d2..02c21f59 100644
--- a/programs/key-value-store/main.cpp
+++ b/programs/key-value-store/main.cpp
@@ -26,22 +26,107 @@ Distributed under the Boost Software License, Version 1.0.
int main()
{
+#ifdef _WIN32
+ SetThreadAffinityMask(GetCurrentThread(), 1);
+#endif
try
{
- key_value_store::basic_key_value_store store("teststore", 10);
- auto kvi = store.find(78);
- if(kvi)
+ AFIO_V2_NAMESPACE::filesystem::remove_all("teststore");
+ key_value_store::basic_key_value_store store("teststore", 2000000);
{
- std::cout << "Key 78 has value " << kvi.value << " and it was last updated at " << kvi.transaction_counter << std::endl;
+ key_value_store::transaction tr(store);
+ tr.fetch(78);
+ tr.update(78, "niall");
+ tr.commit();
+ auto kvi = store.find(78);
+ if(kvi)
+ {
+ std::cout << "Key 78 has value " << kvi.value << " and it was last updated at " << kvi.transaction_counter << std::endl;
+ }
+ else
+ {
+ std::cerr << "FAILURE: Key 78 was not found!" << std::endl;
+ }
}
- else
{
- std::cout << "Key 78 was not found!" << std::endl;
+ key_value_store::transaction tr(store);
+ tr.fetch(79);
+ tr.update(79, "douglas");
+ tr.commit();
+ auto kvi = store.find(79);
+ if(kvi)
+ {
+ std::cout << "Key 79 has value " << kvi.value << " and it was last updated at " << kvi.transaction_counter << std::endl;
+ }
+ else
+ {
+ std::cerr << "FAILURE: Key 79 was not found!" << std::endl;
+ }
+ }
+ {
+ key_value_store::transaction tr(store);
+ tr.fetch(78);
+ tr.remove(78);
+ tr.commit();
+ auto kvi = store.find(78, 0);
+ if(kvi)
+ {
+ std::cerr << "FAILURE: Revision 0 of Key 78 has value " << kvi.value << " and it was last updated at " << kvi.transaction_counter << std::endl;
+ }
+ else
+ {
+ std::cout << "Revision 0 of key 78 was not found!" << std::endl;
+ }
+ kvi = store.find(78, 1);
+ if(kvi)
+ {
+ std::cout << "Revision 1 of Key 78 has value " << kvi.value << " and it was last updated at " << kvi.transaction_counter << std::endl;
+ }
+ else
+ {
+ std::cerr << "FAILURE: Revision 1Key 78 was not found!" << std::endl;
+ }
+ }
+
+ // Write 1M values and see how long it takes
+ std::vector<std::pair<uint64_t, std::string>> values;
+ std::cout << "\nGenerating 1M key-value pairs ..." << std::endl;
+ for(size_t n = 0; n < 1000000; n++)
+ {
+ std::string randomvalue = AFIO_V2_NAMESPACE::utils::random_string(1024);
+ values.push_back({100 + n, randomvalue});
+ }
+ std::cout << "Inserting 1M key-value pairs ..." << std::endl;
+ {
+ auto begin = std::chrono::high_resolution_clock::now();
+ for(size_t n = 0; n < values.size(); n += 1024)
+ {
+ key_value_store::transaction tr(store);
+ for(size_t m = 0; m < 1024; m++)
+ {
+ if(n + m >= values.size())
+ break;
+ auto &i = values[n + m];
+ tr.update_unsafe(i.first, i.second);
+ }
+ tr.commit();
+ }
+ auto end = std::chrono::high_resolution_clock::now();
+ auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - begin).count();
+ std::cout << "Inserted at " << (1000000000ULL / diff) << " items per sec" << std::endl;
+ }
+ std::cout << "Retrieving 1M key-value pairs ..." << std::endl;
+ {
+ auto begin = std::chrono::high_resolution_clock::now();
+ for(auto &i : values)
+ {
+ if(!store.find(i.first))
+ abort();
+ }
+ auto end = std::chrono::high_resolution_clock::now();
+ auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - begin).count();
+ std::cout << "Fetched at " << (1000000000ULL / diff) << " items per sec" << std::endl;
}
- key_value_store::transaction tr(store);
- tr.fetch(78);
- tr.update(78, "niall");
- tr.commit();
}
catch(const std::exception &e)
{