Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/moses-smt/mosesdecoder.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'moses-cmd/src/ThreadPool.cpp')
-rw-r--r--moses-cmd/src/ThreadPool.cpp97
1 files changed, 97 insertions, 0 deletions
diff --git a/moses-cmd/src/ThreadPool.cpp b/moses-cmd/src/ThreadPool.cpp
new file mode 100644
index 000000000..cbed7c4d9
--- /dev/null
+++ b/moses-cmd/src/ThreadPool.cpp
@@ -0,0 +1,97 @@
+// $Id: $
+
+/***********************************************************************
+Moses - factored phrase-based language decoder
+Copyright (C) 2009 University of Edinburgh
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+***********************************************************************/
+
+
+#include "ThreadPool.h"
+
+using namespace std;
+using namespace Moses;
+
+Moses::ThreadPool::ThreadPool( size_t numThreads )
+ : m_stopped(false), m_stopping(false)
+{
+ for (size_t i = 0; i < numThreads; ++i) {
+ m_threads.create_thread(boost::bind(&ThreadPool::Execute,this));
+ }
+}
+
+void Moses::ThreadPool::Execute()
+{
+ do {
+ Task* task = NULL;
+ { // Find a job to perform
+ boost::mutex::scoped_lock lock(m_mutex);
+ if (m_tasks.empty() && !m_stopped) {
+ m_threadNeeded.wait(lock);
+ }
+ if (!m_stopped && !m_tasks.empty()) {
+ task = m_tasks.front();
+ m_tasks.pop();
+ }
+ }
+ //Execute job
+ if (task) {
+ task->Run();
+ delete task;
+ }
+ m_threadAvailable.notify_all();
+ } while (!m_stopped);
+#if defined(BOOST_HAS_PTHREADS)
+ TRACE_ERR("Thread " << (int)pthread_self() << " exiting" << endl);
+#endif
+}
+
+void Moses::ThreadPool::Submit( Task* task )
+{
+ boost::mutex::scoped_lock lock(m_mutex);
+ if (m_stopping) {
+ throw runtime_error("ThreadPool stopping - unable to accept new jobs");
+ }
+ m_tasks.push(task);
+ m_threadNeeded.notify_all();
+
+}
+
+void Moses::ThreadPool::Stop(bool processRemainingJobs)
+{
+ {
+ //prevent more jobs from being added to the queue
+ boost::mutex::scoped_lock lock(m_mutex);
+ if (m_stopped) return;
+ m_stopping = true;
+ }
+ if (processRemainingJobs) {
+ boost::mutex::scoped_lock lock(m_mutex);
+ //wait for queue to drain.
+ while (!m_tasks.empty() && !m_stopped) {
+ m_threadAvailable.wait(lock);
+ }
+ }
+ //tell all threads to stop
+ {
+ boost::mutex::scoped_lock lock(m_mutex);
+ m_stopped = true;
+ }
+ m_threadNeeded.notify_all();
+
+ cerr << m_threads.size() << endl;
+ m_threads.join_all();
+}