diff options
author | Kenneth Heafield <github@kheafield.com> | 2021-04-21 12:46:02 +0300 |
---|---|---|
committer | Kenneth Heafield <github@kheafield.com> | 2021-04-21 12:46:02 +0300 |
commit | 2ae8ab13e6dc114fdc5d3765a0898dcc3dcfede3 (patch) | |
tree | 2802f9aca1dcafd4089d7e64e4ce64453c91a355 | |
parent | 0c4dd4e8a29a9bcaf22d971a83f4974f1a16d6d9 (diff) |
Copy semaphore over from preprocess
-rw-r--r-- | util/pcqueue.hh | 181 |
1 files changed, 160 insertions, 21 deletions
diff --git a/util/pcqueue.hh b/util/pcqueue.hh index 05c868f..205d6b2 100644 --- a/util/pcqueue.hh +++ b/util/pcqueue.hh @@ -3,19 +3,22 @@ #include "util/exception.hh" -#include <boost/interprocess/sync/interprocess_semaphore.hpp> -#include <boost/scoped_array.hpp> -#include <boost/thread/mutex.hpp> -#include <boost/utility.hpp> - +#include <algorithm> #include <cerrno> +#include <iostream> +#include <memory> +#include <mutex> #ifdef __APPLE__ #include <mach/semaphore.h> #include <mach/task.h> #include <mach/mach_traps.h> #include <mach/mach.h> -#endif // __APPLE__ +#elif defined(__linux) +#include <semaphore.h> +#else +#include <boost/interprocess/sync/interprocess_semaphore.hpp> +#endif namespace util { @@ -24,24 +27,25 @@ namespace util { */ #ifdef __APPLE__ -#define MACH_CALL(call) UTIL_THROW_IF(KERN_SUCCESS != (call), Exception, "Mach call failure") - class Semaphore { public: explicit Semaphore(int value) : task_(mach_task_self()) { - MACH_CALL(semaphore_create(task_, &back_, SYNC_POLICY_FIFO, value)); + UTIL_THROW_IF(KERN_SUCCESS != semaphore_create(task_, &back_, SYNC_POLICY_FIFO, value), ErrnoException, "Could not create semaphore"); } ~Semaphore() { - MACH_CALL(semaphore_destroy(task_, back_)); + if (KERN_SUCCESS != semaphore_destroy(task_, back_)) { + std::cerr << "Could not destroy semaphore" << std::endl; + abort(); + } } void wait() { - MACH_CALL(semaphore_wait(back_)); + UTIL_THROW_IF(KERN_SUCCESS != semaphore_wait(back_), Exception, "Wait for semaphore failed"); } void post() { - MACH_CALL(semaphore_signal(back_)); + UTIL_THROW_IF(KERN_SUCCESS != semaphore_signal(back_), Exception, "Could not post to semaphore"); } private: @@ -53,6 +57,39 @@ inline void WaitSemaphore(Semaphore &semaphore) { semaphore.wait(); } +#elif defined(__linux) + +class Semaphore { + public: + explicit Semaphore(unsigned int value) { + UTIL_THROW_IF(sem_init(&sem_, 0, value), ErrnoException, "Could not create semaphore"); + } + + ~Semaphore() { + if (-1 == sem_destroy(&sem_)) { + std::cerr << "Could not destroy semaphore " << ErrnoException().what() << std::endl; + abort(); + } + } + + void wait() { + while (UTIL_UNLIKELY(-1 == sem_wait(&sem_))) { + UTIL_THROW_IF(errno != EINTR, ErrnoException, "Wait for semaphore failed"); + } + } + + void post() { + UTIL_THROW_IF(-1 == sem_post(&sem_), ErrnoException, "Could not post to semaphore"); + } + + private: + sem_t sem_; +}; + +inline void WaitSemaphore(Semaphore &semaphore) { + semaphore.wait(); +} + #else typedef boost::interprocess::interprocess_semaphore Semaphore; @@ -70,7 +107,7 @@ inline void WaitSemaphore (Semaphore &on) { } } -#endif // __APPLE__ +#endif // Apple /** * Producer consumer queue safe for multiple producers and multiple consumers. @@ -79,7 +116,7 @@ inline void WaitSemaphore (Semaphore &on) { * so larger objects should be passed via pointer. * Strong exception guarantee if operator= throws. Undefined if semaphores throw. */ -template <class T> class PCQueue : boost::noncopyable { +template <class T> class PCQueue { public: explicit PCQueue(size_t size) : empty_(size), used_(0), @@ -92,11 +129,26 @@ template <class T> class PCQueue : boost::noncopyable { void Produce(const T &val) { WaitSemaphore(empty_); { - boost::unique_lock<boost::mutex> produce_lock(produce_at_mutex_); + std::lock_guard<std::mutex> produce_lock(produce_at_mutex_); try { *produce_at_ = val; + } catch (...) { + empty_.post(); + throw; } - catch (...) { + if (++produce_at_ == end_) produce_at_ = storage_.get(); + } + used_.post(); + } + + // Add a value to the queue, but swap it into place. + void ProduceSwap(T &val) { + WaitSemaphore(empty_); + { + std::lock_guard<std::mutex> produce_lock(produce_at_mutex_); + try { + std::swap(*produce_at_, val); + } catch (...) { empty_.post(); throw; } @@ -105,15 +157,32 @@ template <class T> class PCQueue : boost::noncopyable { used_.post(); } + // Consume a value, assigning it to out. T& Consume(T &out) { WaitSemaphore(used_); { - boost::unique_lock<boost::mutex> consume_lock(consume_at_mutex_); + std::lock_guard<std::mutex> consume_lock(consume_at_mutex_); try { out = *consume_at_; + } catch (...) { + used_.post(); + throw; } - catch (...) { + if (++consume_at_ == end_) consume_at_ = storage_.get(); + } + empty_.post(); + return out; + } + + // Consume a value, swapping it to out. + T& ConsumeSwap(T &out) { + WaitSemaphore(used_); + { + std::lock_guard<std::mutex> consume_lock(consume_at_mutex_); + try { + std::swap(out, *consume_at_); + } catch (...) { used_.post(); throw; } @@ -123,6 +192,7 @@ template <class T> class PCQueue : boost::noncopyable { return out; } + // Convenience version of Consume that copies the value to return. // The other version is faster. T Consume() { @@ -137,18 +207,87 @@ template <class T> class PCQueue : boost::noncopyable { // Number of occupied spaces in storage_. Semaphore used_; - boost::scoped_array<T> storage_; + std::unique_ptr<T[]> storage_; T *const end_; // Index for next write in storage_. T *produce_at_; - boost::mutex produce_at_mutex_; + std::mutex produce_at_mutex_; // Index for next read from storage_. T *consume_at_; - boost::mutex consume_at_mutex_; + std::mutex consume_at_mutex_; +}; + +template <class T> struct UnboundedPage { + UnboundedPage() : next(nullptr) {} + UnboundedPage *next; + T entries[1023]; +}; + +template <class T> class UnboundedSingleQueue { + public: + UnboundedSingleQueue() : valid_(0) { + SetFilling(new UnboundedPage<T>()); + SetReading(filling_); + } + + void Produce(T &&val) { + if (filling_current_ == filling_end_) { + UnboundedPage<T> *next = new UnboundedPage<T>(); + filling_->next = next; + SetFilling(next); + } + *(filling_current_++) = std::move(val); + valid_.post(); + } + + void Produce(const T &val) { + Produce(T(val)); + } + + T& Consume(T &out) { + WaitSemaphore(valid_); + if (reading_current_ == reading_end_) { + SetReading(reading_->next); + } + out = std::move(*(reading_current_++)); + return out; + } + + // Warning: very much a no-guarantees race-condition-rich implementation! + // But sufficient for our specific purpose: The single thread that consumes + // is also the only one that checks Empty, and knows that it's racing. + bool Empty() const { + return reading_current_ == filling_current_; + } + + private: + void SetFilling(UnboundedPage<T> *to) { + filling_ = to; + filling_current_ = to->entries; + filling_end_ = filling_current_ + sizeof(to->entries) / sizeof(T); + } + void SetReading(UnboundedPage<T> *to) { + reading_.reset(to); + reading_current_ = to->entries; + reading_end_ = reading_current_ + sizeof(to->entries) / sizeof(T); + } + + Semaphore valid_; + + UnboundedPage<T> *filling_; + + std::unique_ptr<UnboundedPage<T> > reading_; + + T *filling_current_; + T *filling_end_; + T *reading_current_; + T *reading_end_; + UnboundedSingleQueue(const UnboundedSingleQueue &) = delete; + UnboundedSingleQueue &operator=(const UnboundedSingleQueue &) = delete; }; } // namespace util |