Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/moses-smt/mosesdecoder.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Heafield <github@kheafield.com>2013-01-18 19:58:54 +0400
committerKenneth Heafield <github@kheafield.com>2013-01-18 19:59:51 +0400
commitfc5868d0fff647c3879668af4bfe6e1bab9e83ab (patch)
treed9b5026f32cc5ae603e6f42b1fc7128aeee4ba55 /util/pcqueue.hh
parent5f7b91e702f809577d82a7570778d254d985ba93 (diff)
KenLM df5be22 lmplz for estimation
Diffstat (limited to 'util/pcqueue.hh')
-rw-r--r--util/pcqueue.hh105
1 files changed, 105 insertions, 0 deletions
diff --git a/util/pcqueue.hh b/util/pcqueue.hh
new file mode 100644
index 000000000..3df8749b1
--- /dev/null
+++ b/util/pcqueue.hh
@@ -0,0 +1,105 @@
+#ifndef UTIL_PCQUEUE__
+#define UTIL_PCQUEUE__
+
+#include <boost/interprocess/sync/interprocess_semaphore.hpp>
+#include <boost/scoped_array.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/utility.hpp>
+
+#include <errno.h>
+
+namespace util {
+
+inline void WaitSemaphore (boost::interprocess::interprocess_semaphore &on) {
+ while (1) {
+ try {
+ on.wait();
+ break;
+ }
+ catch (boost::interprocess::interprocess_exception &e) {
+ if (e.get_native_error() != EINTR) throw;
+ }
+ }
+}
+
+/* Producer consumer queue safe for multiple producers and multiple consumers.
+ * T must be default constructable and have operator=.
+ * The value is copied twice for Consume(T &out) or three times for Consume(),
+ * so larger objects should be passed via pointer.
+ * Strong exception guarantee if operator= throws. Undefined if semaphores throw.
+ */
+template <class T> class PCQueue : boost::noncopyable {
+ public:
+ explicit PCQueue(size_t size)
+ : empty_(size), used_(0),
+ storage_(new T[size]),
+ end_(storage_.get() + size),
+ produce_at_(storage_.get()),
+ consume_at_(storage_.get()) {}
+
+ // Add a value to the queue.
+ void Produce(const T &val) {
+ WaitSemaphore(empty_);
+ {
+ boost::unique_lock<boost::mutex> produce_lock(produce_at_mutex_);
+ try {
+ *produce_at_ = val;
+ }
+ catch (...) {
+ empty_.post();
+ throw;
+ }
+ if (++produce_at_ == end_) produce_at_ = storage_.get();
+ }
+ used_.post();
+ }
+
+ // Consume a value, assigning it to out.
+ T& Consume(T &out) {
+ WaitSemaphore(used_);
+ {
+ boost::unique_lock<boost::mutex> consume_lock(consume_at_mutex_);
+ try {
+ out = *consume_at_;
+ }
+ catch (...) {
+ used_.post();
+ throw;
+ }
+ if (++consume_at_ == end_) consume_at_ = storage_.get();
+ }
+ empty_.post();
+ return out;
+ }
+
+ // Convenience version of Consume that copies the value to return.
+ // The other version is faster.
+ T Consume() {
+ T ret;
+ Consume(ret);
+ return ret;
+ }
+
+ private:
+ // Number of empty spaces in storage_.
+ boost::interprocess::interprocess_semaphore empty_;
+ // Number of occupied spaces in storage_.
+ boost::interprocess::interprocess_semaphore used_;
+
+ boost::scoped_array<T> storage_;
+
+ T *const end_;
+
+ // Index for next write in storage_.
+ T *produce_at_;
+ boost::mutex produce_at_mutex_;
+
+ // Index for next read from storage_.
+ T *consume_at_;
+ boost::mutex consume_at_mutex_;
+
+};
+
+} // namespace util
+
+#endif // UTIL_PCQUEUE__