LLFIO  v2.00 late alpha
llfio_v2_xxx::algorithm::shared_fs_mutex::atomic_append Class Reference

Scalable many entity shared/exclusive file system based lock. More...

#include "atomic_append.hpp"

Inheritance diagram for llfio_v2_xxx::algorithm::shared_fs_mutex::atomic_append:
llfio_v2_xxx::algorithm::shared_fs_mutex::shared_fs_mutex

Public Types

using entity_type = shared_fs_mutex::entity_type
 The type of an entity id.
 
using entities_type = shared_fs_mutex::entities_type
 The type of a sequence of entities.
 

Public Member Functions

 atomic_append (const atomic_append &)=delete
 No copy construction.
 
atomic_appendoperator= (const atomic_append &)=delete
 No copy assignment.
 
 atomic_append (atomic_append &&o) noexcept
 Move constructor.
 
atomic_appendoperator= (atomic_append &&o) noexcept
 Move assign.
 
const file_handlehandle () const noexcept
 Return the handle to file being used for this lock.
 
virtual void unlock (entities_type entities, unsigned long long hint) noexcept final
 Unlock a previously locked sequence of entities.
 
entity_type entity_from_buffer (const char *buffer, size_t bytes, bool exclusive=true) noexcept
 Generates an entity id from a sequence of bytes.
 
template<typename T >
entity_type entity_from_string (const std::basic_string< T > &str, bool exclusive=true) noexcept
 Generates an entity id from a string.
 
entity_type random_entity (bool exclusive=true) noexcept
 Generates a cryptographically random entity id.
 
void fill_random_entities (span< entity_type > seq, bool exclusive=true) noexcept
 Fills a sequence of entity ids with cryptographic randomness. Much faster than calling random_entity() individually.
 
result< entities_guardlock (entities_type entities, deadline d=deadline(), bool spin_not_sleep=false) noexcept
 Lock all of a sequence of entities for exclusive or shared access.
 
result< entities_guardlock (entity_type entity, deadline d=deadline(), bool spin_not_sleep=false) noexcept
 Lock a single entity for exclusive or shared access.
 
result< entities_guardtry_lock (entities_type entities) noexcept
 Try to lock all of a sequence of entities for exclusive or shared access.
 
result< entities_guardtry_lock (entity_type entity) noexcept
 Try to lock a single entity for exclusive or shared access.
 

Static Public Member Functions

static result< atomic_appendfs_mutex_append (const path_handle &base, path_view lockfile, bool nfs_compatibility=false, bool skip_hashing=false) noexcept
 

Protected Member Functions

virtual result< void > _lock (entities_guard &out, deadline d, bool spin_not_sleep) noexcept final
 

Detailed Description

Scalable many entity shared/exclusive file system based lock.

Lock files and byte ranges scale poorly to the number of items being concurrently locked with typically an exponential drop off in performance as the number of items being concurrently locked rises. This file system algorithm solves this problem using IPC via a shared append-only lock file.

  • Compatible with networked file systems (NFS too if the special nfs_compatibility flag is true. Note turning this on is not free of cost if you don't need NFS compatibility).
  • Nearly constant time to number of entities being locked.
  • Nearly constant time to number of processes concurrently using the lock (i.e. number of waiters).
  • Can sleep until a lock becomes free in a power-efficient manner.
  • Sudden power loss during use is recovered from.

Caveats:

  • Much slower than byte_ranges for few waiters or small number of entities.
  • Sudden process exit with locks held will deadlock all other users.
  • Maximum of twelve entities may be locked concurrently.
  • Wasteful of disk space if used on a non-extents based filing system (e.g. FAT32, ext3). It is best used in /tmp if possible (file_handle::temp_file()). If you really must use a non-extents based filing system, destroy and recreate the object instance periodically to force resetting the lock file's length to zero.
  • Similarly older operating systems (e.g. Linux < 3.0) do not implement extent hole punching and therefore will also see excessive disk space consumption. Note at the time of writing OS X doesn't implement hole punching at all.
  • If your OS doesn't have sane byte range locks (OS X, BSD, older Linuxes) and multiple objects in your process use the same lock file, misoperation will occur. Use lock_files instead.
Todo:

Implement hole punching once I port that code from LLFIO v1.

Decide on some resolution mechanism for sudden process exit.

There is a 1 out of 2^64-2 chance of unique id collision. It would be nice if we actually formally checked that our chosen unique id is actually unique.

Member Function Documentation

◆ _lock()

virtual result<void> llfio_v2_xxx::algorithm::shared_fs_mutex::atomic_append::_lock ( entities_guard out,
deadline  d,
bool  spin_not_sleep 
)
inlinefinalprotectedvirtualnoexcept
Todo:
Read from header.last_known_good immediately if possible in order to avoid a duplicate read later

Implements llfio_v2_xxx::algorithm::shared_fs_mutex::shared_fs_mutex.

249  {
250  LLFIO_LOG_FUNCTION_CALL(this);
251  atomic_append_detail::lock_request lock_request;
252  if(out.entities.size() > sizeof(lock_request.entities) / sizeof(lock_request.entities[0]))
253  {
254  return errc::argument_list_too_long;
255  }
256 
257  std::chrono::steady_clock::time_point began_steady;
258  std::chrono::system_clock::time_point end_utc;
259  if(d)
260  {
261  if((d).steady)
262  {
263  began_steady = std::chrono::steady_clock::now();
264  }
265  else
266  {
267  end_utc = (d).to_time_point();
268  }
269  }
270  // Fire this if an error occurs
271  auto disableunlock = undoer([&] { out.release(); });
272 
273  // Write my lock request immediately
274  memset(&lock_request, 0, sizeof(lock_request));
275  lock_request.unique_id = _unique_id;
276  auto count = std::chrono::system_clock::now() - std::chrono::system_clock::from_time_t(_header.time_offset);
277  lock_request.us_count = std::chrono::duration_cast<std::chrono::microseconds>(count).count();
278  lock_request.items = out.entities.size();
279  memcpy(lock_request.entities, out.entities.data(), sizeof(lock_request.entities[0]) * out.entities.size());
280  if(!_skip_hashing)
281  {
282  lock_request.hash = QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash((reinterpret_cast<char *>(&lock_request)) + 16, sizeof(lock_request) - 16);
283  }
284  // My lock request will be the file's current length or higher
285  OUTCOME_TRY(my_lock_request_offset, _h.maximum_extent());
286  {
287  OUTCOME_TRYV(_h.set_append_only(true));
288  auto undo = undoer([this] { (void) _h.set_append_only(false); });
289  file_handle::extent_guard append_guard;
290  if(_nfs_compatibility)
291  {
292  auto lastbyte = static_cast<file_handle::extent_type>(-1);
293  // Lock up to the beginning of the shadow lock space
294  lastbyte &= ~(1ULL << 63U);
295  OUTCOME_TRY(append_guard_, _h.lock(my_lock_request_offset, lastbyte, true));
296  append_guard = std::move(append_guard_);
297  }
298  OUTCOME_TRYV(_h.write(0, {{reinterpret_cast<byte *>(&lock_request), sizeof(lock_request)}}));
299  }
300 
301  // Find the record I just wrote
302  alignas(64) byte _buffer[4096 + 2048]; // 6Kb cache line aligned buffer
303  // Read onwards from length as reported before I wrote my lock request
304  // until I find my lock request. This loop should never actually iterate
305  // except under extreme load conditions.
306  //! \todo Read from header.last_known_good immediately if possible in order
307  //! to avoid a duplicate read later
308  for(;;)
309  {
310  file_handle::buffer_type req{_buffer, sizeof(_buffer)};
311  file_handle::io_result<file_handle::buffers_type> readoutcome = _h.read({req, my_lock_request_offset});
312  // Should never happen :)
313  if(readoutcome.has_error())
314  {
315  LLFIO_LOG_FATAL(this, "atomic_append::lock() saw an error when searching for just written data");
316  std::terminate();
317  }
318  const atomic_append_detail::lock_request *record, *lastrecord;
319  for(record = reinterpret_cast<const atomic_append_detail::lock_request *>(readoutcome.value()[0].data()), lastrecord = reinterpret_cast<const atomic_append_detail::lock_request *>(readoutcome.value()[0].data() + readoutcome.value()[0].size()); record < lastrecord && record->hash != lock_request.hash;
320  ++record)
321  {
322  my_lock_request_offset += sizeof(atomic_append_detail::lock_request);
323  }
324  if(record->hash == lock_request.hash)
325  {
326  break;
327  }
328  }
329 
330  // extent_guard is now valid and will be unlocked on error
331  out.hint = my_lock_request_offset;
332  disableunlock.dismiss();
333 
334  // Lock my request for writing so others can sleep on me
335  file_handle::extent_guard my_request_guard;
336  if(!spin_not_sleep)
337  {
338  auto lock_offset = my_lock_request_offset;
339  // Set the top bit to use the shadow lock space on Windows
340  lock_offset |= (1ULL << 63U);
341  OUTCOME_TRY(my_request_guard_, _h.lock(lock_offset, sizeof(lock_request), true));
342  my_request_guard = std::move(my_request_guard_);
343  }
344 
345  // Read every record preceding mine until header.first_known_good inclusive
346  auto record_offset = my_lock_request_offset - sizeof(atomic_append_detail::lock_request);
347  do
348  {
349  reload:
350  // Refresh the header and load a snapshot of everything between record_offset
351  // and first_known_good or -6Kb, whichever the sooner
352  OUTCOME_TRYV(_read_header());
353  // If there are no preceding records, we're done
354  if(record_offset < _header.first_known_good)
355  {
356  break;
357  }
358  auto start_offset = record_offset;
359  if(start_offset > sizeof(_buffer) - sizeof(atomic_append_detail::lock_request))
360  {
361  start_offset -= sizeof(_buffer) - sizeof(atomic_append_detail::lock_request);
362  }
363  else
364  {
365  start_offset = sizeof(atomic_append_detail::lock_request);
366  }
367  if(start_offset < _header.first_known_good)
368  {
369  start_offset = _header.first_known_good;
370  }
371  assert(record_offset >= start_offset);
372  assert(record_offset - start_offset <= sizeof(_buffer));
373  file_handle::buffer_type req{_buffer, (size_t)(record_offset - start_offset) + sizeof(atomic_append_detail::lock_request)};
374  OUTCOME_TRY(batchread, _h.read({req, start_offset}));
375  assert(batchread[0].size() == record_offset - start_offset + sizeof(atomic_append_detail::lock_request));
376  const atomic_append_detail::lock_request *record = reinterpret_cast<atomic_append_detail::lock_request *>(batchread[0].data() + batchread[0].size() - sizeof(atomic_append_detail::lock_request));
377  const atomic_append_detail::lock_request *firstrecord = reinterpret_cast<atomic_append_detail::lock_request *>(batchread[0].data());
378 
379  // Skip all completed lock requests or not mentioning any of my entities
380  for(; record >= firstrecord; record_offset -= sizeof(atomic_append_detail::lock_request), --record)
381  {
382  // If a completed lock request, skip
383  if(!record->hash && (record->unique_id == 0u))
384  {
385  continue;
386  }
387  // If record hash doesn't match contents it's a torn read, reload
388  if(!_skip_hashing)
389  {
390  if(record->hash != QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash((reinterpret_cast<const char *>(record)) + 16, sizeof(atomic_append_detail::lock_request) - 16))
391  {
392  goto reload;
393  }
394  }
395 
396  // Does this record lock anything I am locking?
397  for(const auto &entity : out.entities)
398  {
399  for(size_t n = 0; n < record->items; n++)
400  {
401  if(record->entities[n].value == entity.value)
402  {
403  // Is the lock I want exclusive or the lock he wants exclusive?
404  // If so, need to block
405  if((record->entities[n].exclusive != 0u) || (entity.exclusive != 0u))
406  {
407  goto beginwait;
408  }
409  }
410  }
411  }
412  }
413  // None of this batch of records has anything to do with my request, so keep going
414  continue;
415 
416  beginwait:
417  // Sleep until this record is freed using a shared lock
418  // on the record in our way. Note there is a race here
419  // between when the lock requester writes the lock
420  // request and when he takes an exclusive lock on it,
421  // so if our shared lock succeeds we need to immediately
422  // unlock and retry based on the data.
423  std::this_thread::yield();
424  if(!spin_not_sleep)
425  {
426  deadline nd;
427  if(d)
428  {
429  if((d).steady)
430  {
431  std::chrono::nanoseconds ns = std::chrono::duration_cast<std::chrono::nanoseconds>((began_steady + std::chrono::nanoseconds((d).nsecs)) - std::chrono::steady_clock::now());
432  if(ns.count() < 0)
433  {
434  (nd).nsecs = 0;
435  }
436  else
437  {
438  (nd).nsecs = ns.count();
439  }
440  }
441  else
442  {
443  (nd) = (d);
444  }
445  }
446  auto lock_offset = record_offset;
447  // Set the top bit to use the shadow lock space on Windows
448  lock_offset |= (1ULL << 63U);
449  OUTCOME_TRYV(_h.lock(lock_offset, sizeof(*record), false, nd));
450  }
451  // Make sure we haven't timed out during this wait
452  if(d)
453  {
454  if((d).steady)
455  {
456  if(std::chrono::steady_clock::now() >= (began_steady + std::chrono::nanoseconds((d).nsecs)))
457  {
458  return errc::timed_out;
459  }
460  }
461  else
462  {
463  if(std::chrono::system_clock::now() >= end_utc)
464  {
465  return errc::timed_out;
466  }
467  }
468  }
469  } while(record_offset >= _header.first_known_good);
470  return success();
471  }
virtual io_result< const_buffers_type > write(io_request< const_buffers_type > reqs, deadline d=deadline()) noexcept
Write data to the open handle.
io_result< size_type > read(extent_type offset, std::initializer_list< buffer_type > lst, deadline d=deadline()) noexcept
Convenience initialiser list based overload for read()
Definition: file_handle.hpp:244
virtual result< extent_guard > lock(extent_type offset, extent_type bytes, bool exclusive=true, deadline d=deadline()) noexcept
Tries to lock the range of bytes specified for shared or exclusive access. Be aware this passes throu...
virtual result< void > set_append_only(bool enable) noexcept
virtual result< extent_type > maximum_extent() const noexcept

◆ fs_mutex_append()

static result<atomic_append> llfio_v2_xxx::algorithm::shared_fs_mutex::atomic_append::fs_mutex_append ( const path_handle base,
path_view  lockfile,
bool  nfs_compatibility = false,
bool  skip_hashing = false 
)
inlinestaticnoexcept

Initialises a shared filing system mutex using the file at lockfile

Returns
An implementation of shared_fs_mutex using the atomic_append algorithm.
Parameters
baseOptional base for the path to the file.
lockfileThe path to the file to use for IPC.
nfs_compatibilityMake this true if the lockfile could be accessed by NFS.
skip_hashingSome filing systems (typically the copy on write ones e.g. ZFS, btrfs) guarantee atomicity of updates and therefore torn writes are never observed by readers. For these, hashing can be safely disabled.
Todo:
fs_mutex_append needs to check if file still exists after lock is granted, awaiting path fetching.
204  {
205  LLFIO_LOG_FUNCTION_CALL(0);
207  atomic_append_detail::header header;
208  // Lock the entire header for exclusive access
209  auto lockresult = ret.try_lock(0, sizeof(header), true);
210  //! \todo fs_mutex_append needs to check if file still exists after lock is granted, awaiting path fetching.
211  if(lockresult.has_error())
212  {
213  if(lockresult.error() != errc::timed_out)
214  {
215  return std::move(lockresult).error();
216  }
217  // Somebody else is also using this file
218  }
219  else
220  {
221  // I am the first person to be using this (stale?) file, so write a new header and truncate
222  OUTCOME_TRYV(ret.truncate(sizeof(header)));
223  memset(&header, 0, sizeof(header));
224  header.time_offset = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
225  header.first_known_good = sizeof(header);
226  header.first_after_hole_punch = sizeof(header);
227  if(!skip_hashing)
228  {
229  header.hash = QUICKCPPLIB_NAMESPACE::algorithm::hash::fast_hash::hash((reinterpret_cast<char *>(&header)) + 16, sizeof(header) - 16);
230  }
231  OUTCOME_TRYV(ret.write(0, {{reinterpret_cast<byte *>(&header), sizeof(header)}}));
232  }
233  // Open a shared lock on last byte in header to prevent other users zomping the file
234  OUTCOME_TRY(guard, ret.lock(sizeof(header) - 1, 1, false));
235  // Unlock any exclusive lock I gained earlier now
236  if(lockresult)
237  {
238  lockresult.value().unlock();
239  }
240  // The constructor will read and cache the header
241  return atomic_append(std::move(ret), std::move(guard), nfs_compatibility, skip_hashing);
242  }
static result< file_handle > file(const path_handle &base, path_view_type path, mode _mode=mode::read, creation _creation=creation::open_existing, caching _caching=caching::all, flag flags=flag::none) noexcept
Ability to read and write (READ_CONTROL|FILE_READ_DATA|FILE_READ_ATTRIBUTES|FILE_READ_EA|FILE_WRITE_D...
Cache reads and writes of data and metadata so they complete immediately, only sending any updates to...
If filesystem entry exists that is used, else one is created.

The documentation for this class was generated from the following file: