diff options
author | Kenneth Heafield <github@kheafield.com> | 2021-05-07 12:33:54 +0300 |
---|---|---|
committer | Kenneth Heafield <github@kheafield.com> | 2021-05-07 12:33:54 +0300 |
commit | 6a366eb9b80ca7de635caa849c89c976c51fa11d (patch) | |
tree | 6352367162d2baa2569dd14f7aa53e22970466f3 | |
parent | 7e50a594345e96c55f1467d059b744d274632e2d (diff) |
Copy pcqueue from preprocesssemaphore
-rw-r--r-- | util/pcqueue.hh | 99 |
1 files changed, 33 insertions, 66 deletions
diff --git a/util/pcqueue.hh b/util/pcqueue.hh index 3c13908..b7cfade 100644 --- a/util/pcqueue.hh +++ b/util/pcqueue.hh @@ -14,12 +14,10 @@ #include <mach/task.h> #include <mach/mach_traps.h> #include <mach/mach.h> -#elif defined(__linux) -#include <semaphore.h> #elif defined(_WIN32) || defined(_WIN64) #include <windows.h> #else -#include <boost/interprocess/sync/interprocess_semaphore.hpp> +#include <semaphore.h> #endif namespace util { @@ -55,100 +53,69 @@ class Semaphore { task_t task_; }; -inline void WaitSemaphore(Semaphore &semaphore) { - semaphore.wait(); -} - -#elif defined(__linux) +#elif defined(_WIN32) || defined(_WIN64) class Semaphore { public: - explicit Semaphore(unsigned int value) { - UTIL_THROW_IF(sem_init(&sem_, 0, value), ErrnoException, "Could not create semaphore"); + explicit Semaphore(LONG value) : sem_(CreateSemaphoreA(NULL, value, 2147483647, NULL)) { + UTIL_THROW_IF(!sem_, Exception, "Could not CreateSemaphore " << GetLastError()); } ~Semaphore() { - if (-1 == sem_destroy(&sem_)) { - std::cerr << "Could not destroy semaphore " << ErrnoException().what() << std::endl; - abort(); - } + CloseHandle(sem_); } + void wait() { - while (UTIL_UNLIKELY(-1 == sem_wait(&sem_))) { - UTIL_THROW_IF(errno != EINTR, ErrnoException, "Wait for semaphore failed"); + switch (WaitForSingleObject(sem_, INFINITE)) { + case WAIT_OBJECT_0: + return; + case WAIT_ABANDONED: + UTIL_THROW(Exception, "A semaphore can't be abandoned, confused by Windows"); + case WAIT_TIMEOUT: + UTIL_THROW(Exception, "Timeout on an infinite wait?!"); + case WAIT_FAILED: + UTIL_THROW(Exception, "Waiting on Semaphore failed " << GetLastError()); } } void post() { - UTIL_THROW_IF(-1 == sem_post(&sem_), ErrnoException, "Could not post to semaphore"); + UTIL_THROW_IF(!ReleaseSemaphore(sem_, 1, NULL), Exception, "Failed to release Semaphore " << GetLastError()); } private: - sem_t sem_; + HANDLE sem_; }; -inline void WaitSemaphore(Semaphore &semaphore) { - semaphore.wait(); -} - -#elif defined(_WIN32) || defined(_WIN64) +#else // Linux and BSD class Semaphore { public: - explicit Semaphore(LONG value) : sem_(CreateSemaphoreA(NULL, value, 2147483647, NULL)) { - UTIL_THROW_IF(!sem_, Exception, "Could not CreateSemaphore " << GetLastError()); + explicit Semaphore(unsigned int value) { + UTIL_THROW_IF(sem_init(&sem_, 0, value), ErrnoException, "Could not create semaphore"); } ~Semaphore() { - CloseHandle(sem_); + if (-1 == sem_destroy(&sem_)) { + std::cerr << "Could not destroy semaphore" << std::endl; + abort(); + } } - void wait() { - while (true) { - switch (WaitForSingleObject(sem_, 0L)) { - case WAIT_OBJECT_0: - return; - case WAIT_ABANDONED: - UTIL_THROW(Exception, "A semaphore can't be abandoned, confused by Windows"); - case WAIT_TIMEOUT: - continue; - case WAIT_FAILED: - UTIL_THROW(Exception, "Waiting on Semaphore failed " << GetLastError()); - } + while (-1 == sem_wait(&sem_)) { + UTIL_THROW_IF(errno != EINTR, ErrnoException, "Wait for semaphore failed"); } } void post() { - UTIL_THROW_IF(!ReleaseSemaphore(sem_, 1, NULL), Exception, "Failed to release Semaphore " << GetLastError()); + UTIL_THROW_IF(-1 == sem_post(&sem_), ErrnoException, "Could not post to semaphore"); } private: - HANDLE sem_; + sem_t sem_; }; -inline void WaitSemaphore(Semaphore &semaphore) { - semaphore.wait(); -} - -#else -typedef boost::interprocess::interprocess_semaphore Semaphore; - -inline void WaitSemaphore (Semaphore &on) { - while (1) { - try { - on.wait(); - break; - } - catch (boost::interprocess::interprocess_exception &e) { - if (e.get_native_error() != EINTR) { - throw; - } - } - } -} - #endif // Cases for semaphore support /** @@ -169,7 +136,7 @@ template <class T> class PCQueue { // Add a value to the queue. void Produce(const T &val) { - WaitSemaphore(empty_); + empty_.wait(); { std::lock_guard<std::mutex> produce_lock(produce_at_mutex_); try { @@ -185,7 +152,7 @@ template <class T> class PCQueue { // Add a value to the queue, but swap it into place. void ProduceSwap(T &val) { - WaitSemaphore(empty_); + empty_.wait(); { std::lock_guard<std::mutex> produce_lock(produce_at_mutex_); try { @@ -202,7 +169,7 @@ template <class T> class PCQueue { // Consume a value, assigning it to out. T& Consume(T &out) { - WaitSemaphore(used_); + used_.wait(); { std::lock_guard<std::mutex> consume_lock(consume_at_mutex_); try { @@ -219,7 +186,7 @@ template <class T> class PCQueue { // Consume a value, swapping it to out. T& ConsumeSwap(T &out) { - WaitSemaphore(used_); + used_.wait(); { std::lock_guard<std::mutex> consume_lock(consume_at_mutex_); try { @@ -290,7 +257,7 @@ template <class T> class UnboundedSingleQueue { } T& Consume(T &out) { - WaitSemaphore(valid_); + valid_.wait(); if (reading_current_ == reading_end_) { SetReading(reading_->next); } |