Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/kpu/kenlm.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Heafield <github@kheafield.com>2021-04-21 12:46:02 +0300
committerKenneth Heafield <github@kheafield.com>2021-04-21 12:46:02 +0300
commit2ae8ab13e6dc114fdc5d3765a0898dcc3dcfede3 (patch)
tree2802f9aca1dcafd4089d7e64e4ce64453c91a355
parent0c4dd4e8a29a9bcaf22d971a83f4974f1a16d6d9 (diff)
Copy semaphore over from preprocess
-rw-r--r--util/pcqueue.hh181
1 files changed, 160 insertions, 21 deletions
diff --git a/util/pcqueue.hh b/util/pcqueue.hh
index 05c868f..205d6b2 100644
--- a/util/pcqueue.hh
+++ b/util/pcqueue.hh
@@ -3,19 +3,22 @@
#include "util/exception.hh"
-#include <boost/interprocess/sync/interprocess_semaphore.hpp>
-#include <boost/scoped_array.hpp>
-#include <boost/thread/mutex.hpp>
-#include <boost/utility.hpp>
-
+#include <algorithm>
#include <cerrno>
+#include <iostream>
+#include <memory>
+#include <mutex>
#ifdef __APPLE__
#include <mach/semaphore.h>
#include <mach/task.h>
#include <mach/mach_traps.h>
#include <mach/mach.h>
-#endif // __APPLE__
+#elif defined(__linux)
+#include <semaphore.h>
+#else
+#include <boost/interprocess/sync/interprocess_semaphore.hpp>
+#endif
namespace util {
@@ -24,24 +27,25 @@ namespace util {
*/
#ifdef __APPLE__
-#define MACH_CALL(call) UTIL_THROW_IF(KERN_SUCCESS != (call), Exception, "Mach call failure")
-
class Semaphore {
public:
explicit Semaphore(int value) : task_(mach_task_self()) {
- MACH_CALL(semaphore_create(task_, &back_, SYNC_POLICY_FIFO, value));
+ UTIL_THROW_IF(KERN_SUCCESS != semaphore_create(task_, &back_, SYNC_POLICY_FIFO, value), ErrnoException, "Could not create semaphore");
}
~Semaphore() {
- MACH_CALL(semaphore_destroy(task_, back_));
+ if (KERN_SUCCESS != semaphore_destroy(task_, back_)) {
+ std::cerr << "Could not destroy semaphore" << std::endl;
+ abort();
+ }
}
void wait() {
- MACH_CALL(semaphore_wait(back_));
+ UTIL_THROW_IF(KERN_SUCCESS != semaphore_wait(back_), Exception, "Wait for semaphore failed");
}
void post() {
- MACH_CALL(semaphore_signal(back_));
+ UTIL_THROW_IF(KERN_SUCCESS != semaphore_signal(back_), Exception, "Could not post to semaphore");
}
private:
@@ -53,6 +57,39 @@ inline void WaitSemaphore(Semaphore &semaphore) {
semaphore.wait();
}
+#elif defined(__linux)
+
+class Semaphore {
+ public:
+ explicit Semaphore(unsigned int value) {
+ UTIL_THROW_IF(sem_init(&sem_, 0, value), ErrnoException, "Could not create semaphore");
+ }
+
+ ~Semaphore() {
+ if (-1 == sem_destroy(&sem_)) {
+ std::cerr << "Could not destroy semaphore " << ErrnoException().what() << std::endl;
+ abort();
+ }
+ }
+
+ void wait() {
+ while (UTIL_UNLIKELY(-1 == sem_wait(&sem_))) {
+ UTIL_THROW_IF(errno != EINTR, ErrnoException, "Wait for semaphore failed");
+ }
+ }
+
+ void post() {
+ UTIL_THROW_IF(-1 == sem_post(&sem_), ErrnoException, "Could not post to semaphore");
+ }
+
+ private:
+ sem_t sem_;
+};
+
+inline void WaitSemaphore(Semaphore &semaphore) {
+ semaphore.wait();
+}
+
#else
typedef boost::interprocess::interprocess_semaphore Semaphore;
@@ -70,7 +107,7 @@ inline void WaitSemaphore (Semaphore &on) {
}
}
-#endif // __APPLE__
+#endif // Apple
/**
* Producer consumer queue safe for multiple producers and multiple consumers.
@@ -79,7 +116,7 @@ inline void WaitSemaphore (Semaphore &on) {
* so larger objects should be passed via pointer.
* Strong exception guarantee if operator= throws. Undefined if semaphores throw.
*/
-template <class T> class PCQueue : boost::noncopyable {
+template <class T> class PCQueue {
public:
explicit PCQueue(size_t size)
: empty_(size), used_(0),
@@ -92,11 +129,26 @@ template <class T> class PCQueue : boost::noncopyable {
void Produce(const T &val) {
WaitSemaphore(empty_);
{
- boost::unique_lock<boost::mutex> produce_lock(produce_at_mutex_);
+ std::lock_guard<std::mutex> produce_lock(produce_at_mutex_);
try {
*produce_at_ = val;
+ } catch (...) {
+ empty_.post();
+ throw;
}
- catch (...) {
+ if (++produce_at_ == end_) produce_at_ = storage_.get();
+ }
+ used_.post();
+ }
+
+ // Add a value to the queue, but swap it into place.
+ void ProduceSwap(T &val) {
+ WaitSemaphore(empty_);
+ {
+ std::lock_guard<std::mutex> produce_lock(produce_at_mutex_);
+ try {
+ std::swap(*produce_at_, val);
+ } catch (...) {
empty_.post();
throw;
}
@@ -105,15 +157,32 @@ template <class T> class PCQueue : boost::noncopyable {
used_.post();
}
+
// Consume a value, assigning it to out.
T& Consume(T &out) {
WaitSemaphore(used_);
{
- boost::unique_lock<boost::mutex> consume_lock(consume_at_mutex_);
+ std::lock_guard<std::mutex> consume_lock(consume_at_mutex_);
try {
out = *consume_at_;
+ } catch (...) {
+ used_.post();
+ throw;
}
- catch (...) {
+ if (++consume_at_ == end_) consume_at_ = storage_.get();
+ }
+ empty_.post();
+ return out;
+ }
+
+ // Consume a value, swapping it to out.
+ T& ConsumeSwap(T &out) {
+ WaitSemaphore(used_);
+ {
+ std::lock_guard<std::mutex> consume_lock(consume_at_mutex_);
+ try {
+ std::swap(out, *consume_at_);
+ } catch (...) {
used_.post();
throw;
}
@@ -123,6 +192,7 @@ template <class T> class PCQueue : boost::noncopyable {
return out;
}
+
// Convenience version of Consume that copies the value to return.
// The other version is faster.
T Consume() {
@@ -137,18 +207,87 @@ template <class T> class PCQueue : boost::noncopyable {
// Number of occupied spaces in storage_.
Semaphore used_;
- boost::scoped_array<T> storage_;
+ std::unique_ptr<T[]> storage_;
T *const end_;
// Index for next write in storage_.
T *produce_at_;
- boost::mutex produce_at_mutex_;
+ std::mutex produce_at_mutex_;
// Index for next read from storage_.
T *consume_at_;
- boost::mutex consume_at_mutex_;
+ std::mutex consume_at_mutex_;
+};
+
+template <class T> struct UnboundedPage {
+ UnboundedPage() : next(nullptr) {}
+ UnboundedPage *next;
+ T entries[1023];
+};
+
+template <class T> class UnboundedSingleQueue {
+ public:
+ UnboundedSingleQueue() : valid_(0) {
+ SetFilling(new UnboundedPage<T>());
+ SetReading(filling_);
+ }
+
+ void Produce(T &&val) {
+ if (filling_current_ == filling_end_) {
+ UnboundedPage<T> *next = new UnboundedPage<T>();
+ filling_->next = next;
+ SetFilling(next);
+ }
+ *(filling_current_++) = std::move(val);
+ valid_.post();
+ }
+
+ void Produce(const T &val) {
+ Produce(T(val));
+ }
+
+ T& Consume(T &out) {
+ WaitSemaphore(valid_);
+ if (reading_current_ == reading_end_) {
+ SetReading(reading_->next);
+ }
+ out = std::move(*(reading_current_++));
+ return out;
+ }
+
+ // Warning: very much a no-guarantees race-condition-rich implementation!
+ // But sufficient for our specific purpose: The single thread that consumes
+ // is also the only one that checks Empty, and knows that it's racing.
+ bool Empty() const {
+ return reading_current_ == filling_current_;
+ }
+
+ private:
+ void SetFilling(UnboundedPage<T> *to) {
+ filling_ = to;
+ filling_current_ = to->entries;
+ filling_end_ = filling_current_ + sizeof(to->entries) / sizeof(T);
+ }
+ void SetReading(UnboundedPage<T> *to) {
+ reading_.reset(to);
+ reading_current_ = to->entries;
+ reading_end_ = reading_current_ + sizeof(to->entries) / sizeof(T);
+ }
+
+ Semaphore valid_;
+
+ UnboundedPage<T> *filling_;
+
+ std::unique_ptr<UnboundedPage<T> > reading_;
+
+ T *filling_current_;
+ T *filling_end_;
+ T *reading_current_;
+ T *reading_end_;
+ UnboundedSingleQueue(const UnboundedSingleQueue &) = delete;
+ UnboundedSingleQueue &operator=(const UnboundedSingleQueue &) = delete;
};
} // namespace util