// -*- c++ -*- // to be included from ug_bitext.h // The agenda handles parallel sampling. // It maintains a queue of unfinished sampling jobs and // assigns them to a pool of workers. // template class Bitext ::agenda { public: class job; class worker; private: boost::mutex lock; std::list > joblist; std::vector > workers; bool shutdown; size_t doomed; public: Bitext const& bt; agenda(Bitext const& bitext); ~agenda(); void add_workers(int n); sptr add_job(Bitext const* const theBitext, typename TSA::tree_iterator const& phrase, size_t const max_samples, sptr const& bias); // add_job(Bitext const* const theBitext, // typename TSA::tree_iterator const& phrase, // size_t const max_samples, SamplingBias const* const bias); sptr get_job(); }; template class Bitext::agenda:: worker { agenda& ag; public: worker(agenda& a) : ag(a) {} void operator()(); }; #include "ug_bitext_agenda_worker.h" #include "ug_bitext_agenda_job.h" template void Bitext ::agenda ::add_workers(int n) { static boost::posix_time::time_duration nodelay(0,0,0,0); boost::lock_guard guard(this->lock); int target = max(1, int(n + workers.size() - this->doomed)); // house keeping: remove all workers that have finished for (size_t i = 0; i < workers.size(); ) { if (workers[i]->timed_join(nodelay)) { if (i + 1 < workers.size()) workers[i].swap(workers.back()); workers.pop_back(); } else ++i; } // cerr << workers.size() << "/" << target << " active" << endl; if (int(workers.size()) > target) this->doomed = workers.size() - target; else while (int(workers.size()) < target) { sptr w(new boost::thread(worker(*this))); workers.push_back(w); } } template sptr Bitext ::agenda ::add_job(Bitext const* const theBitext, typename TSA::tree_iterator const& phrase, size_t const max_samples, sptr const& bias) { boost::unique_lock lk(this->lock); static boost::posix_time::time_duration nodelay(0,0,0,0); bool fwd = phrase.root == bt.I1.get(); sptr j(new job(theBitext, phrase, fwd ? bt.I1 : bt.I2, max_samples, fwd, bias)); j->stats->register_worker(); joblist.push_back(j); if (joblist.size() == 1) { size_t i = 0; while (i < workers.size()) { if (workers[i]->timed_join(nodelay)) { if (doomed) { if (i+1 < workers.size()) workers[i].swap(workers.back()); workers.pop_back(); --doomed; } else workers[i++] = sptr(new boost::thread(worker(*this))); } else ++i; } } return j->stats; } template sptr::agenda::job> Bitext ::agenda ::get_job() { // cerr << workers.size() << " workers on record" << endl; sptr ret; if (this->shutdown) return ret; boost::unique_lock lock(this->lock); if (this->doomed) { // the number of workers has been reduced, tell the redundant once to quit --this->doomed; return ret; } typename list >::iterator j = joblist.begin(); while (j != joblist.end()) { if ((*j)->done()) { (*j)->stats->release(); joblist.erase(j++); } else if ((*j)->workers >= 4) ++j; // no more than 4 workers per job else break; // found one } if (joblist.size()) { ret = j == joblist.end() ? joblist.front() : *j; // if we've reached the end of the queue (all jobs have 4 workers on them), // take the first in the queue boost::lock_guard jguard(ret->lock); ++ret->workers; } return ret; } template Bitext:: agenda:: ~agenda() { this->lock.lock(); this->shutdown = true; this->lock.unlock(); for (size_t i = 0; i < workers.size(); ++i) workers[i]->join(); } template Bitext:: agenda:: agenda(Bitext const& thebitext) : shutdown(false), doomed(0), bt(thebitext) { }