github.com/moses-smt/mosesdecoder.git
Diffstat (limited to 'moses/TranslationModel/UG/mm/ug_bitext_agenda.h')
-rw-r--r--  moses/TranslationModel/UG/mm/ug_bitext_agenda.h  201
1 file changed, 201 insertions(+), 0 deletions(-)
diff --git a/moses/TranslationModel/UG/mm/ug_bitext_agenda.h b/moses/TranslationModel/UG/mm/ug_bitext_agenda.h
new file mode 100644
index 000000000..a9632c056
--- /dev/null
+++ b/moses/TranslationModel/UG/mm/ug_bitext_agenda.h
@@ -0,0 +1,201 @@
+// -*- c++ -*-
+// to be included from ug_bitext.h
+
+// The agenda handles parallel sampling.
+// It maintains a queue of unfinished sampling jobs and
+// assigns them to a pool of workers.
+//
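+// Rough usage sketch (hypothetical call site; names as declared below,
+// and the worker loop itself lives in ug_bitext_agenda_worker.h):
+//
+//   agenda ag(bitext);         // bind the agenda to a Bitext
+//   ag.add_workers(4);         // spin up four worker threads
+//   sptr<pstats> stats = ag.add_job(&bitext, phrase, 1000, bias);
+//   // ... poll stats for the accumulating sampling results
+//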
+template<typename Token>
+class Bitext<Token>
+::agenda
+{
+public:
+  class job;
+  class worker;
+private:
+  boost::mutex lock;
+  std::list<sptr<job> > joblist;
+  std::vector<sptr<boost::thread> > workers;
+  bool shutdown;
+  size_t doomed;
+
+public:
+
+  Bitext<Token> const& bt;
+
+  agenda(Bitext<Token> const& bitext);
+  ~agenda();
+
+  void
+  add_workers(int n);
+
+  sptr<pstats>
+  add_job(Bitext<Token> const* const theBitext,
+          typename TSA<Token>::tree_iterator const& phrase,
+          size_t const max_samples, sptr<SamplingBias const> const& bias);
+
+  sptr<job>
+  get_job();
+};
+
+template<typename Token>
+class
+Bitext<Token>::agenda::
+worker
+{
+  agenda& ag;
+public:
+  worker(agenda& a) : ag(a) {}
+  void operator()();
+};
+
+#include "ug_bitext_agenda_worker.h"
+#include "ug_bitext_agenda_job.h"
+
+template<typename Token>
+void Bitext<Token>
+::agenda
+::add_workers(int n)
+{
+  static boost::posix_time::time_duration nodelay(0,0,0,0);
+  boost::lock_guard<boost::mutex> guard(this->lock);
+
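+  // Target pool size: n additional workers on top of the current pool,
+  // discounting workers already marked to quit; never fewer than one.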
+  int target = std::max(1, int(n + workers.size() - this->doomed));
+  // housekeeping: remove all workers that have finished
+  for (size_t i = 0; i < workers.size(); )
+    {
+      if (workers[i]->timed_join(nodelay))
+        {
+          if (i + 1 < workers.size())
+            workers[i].swap(workers.back());
+          workers.pop_back();
+        }
+      else ++i;
+    }
+  // cerr << workers.size() << "/" << target << " active" << endl;
+  if (int(workers.size()) > target)
+    this->doomed = workers.size() - target;
+  else
+    while (int(workers.size()) < target)
+      {
+        sptr<boost::thread> w(new boost::thread(worker(*this)));
+        workers.push_back(w);
+      }
+}
+
+
+template<typename Token>
+sptr<pstats> Bitext<Token>
+::agenda
+::add_job(Bitext<Token> const* const theBitext,
+          typename TSA<Token>::tree_iterator const& phrase,
+          size_t const max_samples, sptr<SamplingBias const> const& bias)
+{
+  boost::unique_lock<boost::mutex> lk(this->lock);
+  static boost::posix_time::time_duration nodelay(0,0,0,0);
+  bool fwd = phrase.root == bt.I1.get();
+  sptr<job> j(new job(theBitext, phrase, fwd ? bt.I1 : bt.I2,
+                      max_samples, fwd, bias));
+  j->stats->register_worker();
+
+  joblist.push_back(j);
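+  // If this is the only job in the queue, sweep the worker pool once:
+  // reap threads that have finished (honoring any pending reduction),
+  // or replace them with fresh ones so the new job gets picked up.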
+  if (joblist.size() == 1)
+    {
+      size_t i = 0;
+      while (i < workers.size())
+        {
+          if (workers[i]->timed_join(nodelay))
+            {
+              if (doomed)
+                {
+                  if (i+1 < workers.size())
+                    workers[i].swap(workers.back());
+                  workers.pop_back();
+                  --doomed;
+                }
+              else
+                workers[i++] = sptr<boost::thread>(new boost::thread(worker(*this)));
+            }
+          else ++i;
+        }
+    }
+  return j->stats;
+}
+
+template<typename Token>
+sptr<typename Bitext<Token>::agenda::job>
+Bitext<Token>
+::agenda
+::get_job()
+{
+  // cerr << workers.size() << " workers on record" << endl;
+  sptr<job> ret;
+  boost::unique_lock<boost::mutex> lock(this->lock);
+  if (this->shutdown) return ret; // check under the lock to avoid a race
+  if (this->doomed)
+    { // the number of workers has been reduced; tell the redundant ones to quit
+      --this->doomed;
+      return ret;
+    }
+
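+  // Scan the queue: drop jobs that are done, skip jobs that already
+  // have their full complement of workers, and stop at the first job
+  // that can still use another worker.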
+  typename std::list<sptr<job> >::iterator j = joblist.begin();
+  while (j != joblist.end())
+    {
+      if ((*j)->done())
+        {
+          (*j)->stats->release();
+          joblist.erase(j++);
+        }
+      else if ((*j)->workers >= 4) ++j; // no more than 4 workers per job
+      else break; // found one that can use another worker
+    }
+  if (joblist.size())
+    {
+      // if we've reached the end of the queue (all jobs have 4 workers
+      // on them), take the first job in the queue
+      ret = j == joblist.end() ? joblist.front() : *j;
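+      // count this worker against the chosen job, under the job's own lock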
+      boost::lock_guard<boost::mutex> jguard(ret->lock);
+      ++ret->workers;
+    }
+  return ret;
+}
+
+template<typename Token>
+Bitext<Token>::
+agenda::
+~agenda()
+{
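+  // raise the shutdown flag under the lock, then wait for every worker to exit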
+  {
+    boost::lock_guard<boost::mutex> guard(this->lock);
+    this->shutdown = true;
+  }
+  for (size_t i = 0; i < workers.size(); ++i)
+    workers[i]->join();
+}
+
+template<typename Token>
+Bitext<Token>::
+agenda::
+agenda(Bitext<Token> const& thebitext)
+  : shutdown(false), doomed(0), bt(thebitext)
+{ }
+
+