Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/moses-smt/mosesdecoder.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/moses
diff options
context:
space:
mode:
authorUlrich Germann <ugermann@inf.ed.ac.uk>2015-03-21 19:12:52 +0300
committerUlrich Germann <ugermann@inf.ed.ac.uk>2015-03-21 19:12:52 +0300
commit8ca11d941d99df42664b32c101020283cc83054e (patch)
treeeac36b128d9bfe22ff598458635ae66cdecbf57b /moses
parent85d2567b57af592a3a902209c7b0f3675576aac7 (diff)
1. Lifetime of tasks in ThreadPool is now managed via shared pointers.
2. Code cleanup in IOWrapper and a bit elsewhere.
Diffstat (limited to 'moses')
-rw-r--r--moses/ExportInterface.cpp64
-rw-r--r--moses/IOWrapper.cpp61
-rw-r--r--moses/IOWrapper.h17
-rw-r--r--moses/ThreadPool.cpp8
-rw-r--r--moses/ThreadPool.h6
-rw-r--r--moses/TranslationModel/CompactPT/BlockHashIndex.h6
-rw-r--r--moses/TranslationTask.cpp87
-rw-r--r--moses/TranslationTask.h50
8 files changed, 183 insertions, 116 deletions
diff --git a/moses/ExportInterface.cpp b/moses/ExportInterface.cpp
index dc7dab931..008878bf3 100644
--- a/moses/ExportInterface.cpp
+++ b/moses/ExportInterface.cpp
@@ -101,36 +101,31 @@ SimpleTranslationInterface::~SimpleTranslationInterface()
//the simplified version of string input/output translation
string SimpleTranslationInterface::translate(const string &inputString)
{
- Moses::IOWrapper ioWrapper;
- long lineCount = Moses::StaticData::Instance().GetStartTranslationId();
+ boost::shared_ptr<Moses::IOWrapper> ioWrapper(new IOWrapper);
// main loop over set of input sentences
- InputType* source = NULL;
size_t sentEnd = inputString.rfind('\n'); //find the last \n, the input stream has to be appended with \n to be translated
const string &newString = sentEnd != string::npos ? inputString : inputString + '\n';
istringstream inputStream(newString); //create the stream for the interface
- ioWrapper.SetInputStreamFromString(inputStream);
+ ioWrapper->SetInputStreamFromString(inputStream);
ostringstream outputStream;
- ioWrapper.SetOutputStream2SingleBestOutputCollector(&outputStream);
- ioWrapper.ReadInput(SimpleTranslationInterface::m_staticData.GetInputType(),source);
- if (source)
- source->SetTranslationId(lineCount);
- else
- return "Error: Source==null!!!";
- IFVERBOSE(1) {
- ResetUserTime();
- }
-
- FeatureFunction::CallChangeSource(source);
+ ioWrapper->SetOutputStream2SingleBestOutputCollector(&outputStream);
- // set up task of translating one sentence
- TranslationTask task = TranslationTask(source, ioWrapper);
- task.Run();
+ boost::shared_ptr<InputType> source = ioWrapper->ReadInput();
+ if (!source) return "Error: Source==null!!!";
+ IFVERBOSE(1) { ResetUserTime(); }
+
+ FeatureFunction::CallChangeSource(&*source);
- string output = outputStream.str();
- //now trim the end whitespace
- const string whitespace = " \t\f\v\n\r";
- size_t end = output.find_last_not_of(whitespace);
+ // set up task of translating one sentence
+ boost::shared_ptr<TranslationTask> task
+ = TranslationTask::create(source, ioWrapper);
+ task->Run();
+
+ string output = outputStream.str();
+ //now trim the end whitespace
+ const string whitespace = " \t\f\v\n\r";
+ size_t end = output.find_last_not_of(whitespace);
return output.erase(end + 1);
}
@@ -174,9 +169,9 @@ run_as_server()
else myAbyssServer.run();
std::cerr << "xmlrpc_c::serverAbyss.run() returned but should not." << std::endl;
-#pragma message("BUILDING MOSES WITH SERVER SUPPORT")
+ // #pragma message("BUILDING MOSES WITH SERVER SUPPORT")
#else
-#pragma message("BUILDING MOSES WITHOUT SERVER SUPPORT")
+ // #pragma message("BUILDING MOSES WITHOUT SERVER SUPPORT")
std::cerr << "Moses was compiled without server support." << endl;
#endif
return 1;
@@ -194,7 +189,8 @@ batch_run()
IFVERBOSE(1) PrintUserTime("Created input-output object");
- IOWrapper* ioWrapper = new IOWrapper(); // set up read/writing class
+ // set up read/writing class:
+ boost::shared_ptr<IOWrapper> ioWrapper(new IOWrapper);
UTIL_THROW_IF2(ioWrapper == NULL, "Error; Failed to create IO object"
<< " [" << HERE << "]");
@@ -212,17 +208,17 @@ batch_run()
#endif
// main loop over set of input sentences
- InputType* source = NULL;
- size_t lineCount = staticData.GetStartTranslationId();
- while(ioWrapper->ReadInput(staticData.GetInputType(), source))
+
+ boost::shared_ptr<InputType> source;
+ while ((source = ioWrapper->ReadInput()) != NULL)
{
- source->SetTranslationId(lineCount);
IFVERBOSE(1) ResetUserTime();
- FeatureFunction::CallChangeSource(source);
+ FeatureFunction::CallChangeSource(source.get());
// set up task of translating one sentence
- TranslationTask* task = new TranslationTask(source, *ioWrapper);
+ boost::shared_ptr<TranslationTask>
+ task = TranslationTask::create(source, ioWrapper);
// execute task
#ifdef WITH_THREADS
@@ -234,7 +230,6 @@ batch_run()
{
// simulated post-editing: always run single-threaded!
task->Run();
- delete task;
string src,trg,aln;
UTIL_THROW_IF2(!getline(*ioWrapper->spe_src,src), "[" << HERE << "] "
<< "missing update data for simulated post-editing.");
@@ -258,11 +253,7 @@ batch_run()
#endif
#else
task->Run();
- delete task;
#endif
-
- source = NULL; //make sure it doesn't get deleted
- ++lineCount;
}
// we are done, finishing up
@@ -270,7 +261,6 @@ batch_run()
pool.Stop(true); //flush remaining jobs
#endif
- delete ioWrapper;
FeatureFunction::Destroy();
IFVERBOSE(1) util::PrintUsage(std::cerr);
diff --git a/moses/IOWrapper.cpp b/moses/IOWrapper.cpp
index 99897e289..92994e234 100644
--- a/moses/IOWrapper.cpp
+++ b/moses/IOWrapper.cpp
@@ -87,6 +87,9 @@ IOWrapper::IOWrapper()
{
const StaticData &staticData = StaticData::Instance();
+ m_inputType = staticData.GetInputType();
+ m_currentLine = staticData.GetStartTranslationId();
+
m_inputFactorOrder = &staticData.GetInputFactorOrder();
size_t nBestSize = staticData.GetNBestSize();
@@ -223,48 +226,52 @@ IOWrapper::~IOWrapper()
delete m_latticeSamplesStream;
}
-InputType*
-IOWrapper::
-GetInput(InputType* inputType)
-{
- if(inputType->Read(*m_inputStream, *m_inputFactorOrder)) {
- return inputType;
- } else {
- delete inputType;
- return NULL;
- }
-}
-
-bool
-IOWrapper
-::ReadInput(InputTypeEnum inputType,
- InputType*& source,
- TranslationTask const* ttask)
+// InputType*
+// IOWrapper::
+// GetInput(InputType* inputType)
+// {
+// if(inputType->Read(*m_inputStream, *m_inputFactorOrder)) {
+// return inputType;
+// } else {
+// delete inputType;
+// return NULL;
+// }
+// }
+
+boost::shared_ptr<InputType>
+IOWrapper::ReadInput()
{
- delete source;
- switch(inputType) {
+ boost::shared_ptr<InputType> source;
+ switch(m_inputType) {
case SentenceInput:
- source = GetInput(new Sentence(ttask));
+ source.reset(new Sentence);
break;
case ConfusionNetworkInput:
- source = GetInput(new ConfusionNet(ttask));
+ source.reset(new ConfusionNet);
break;
case WordLatticeInput:
- source = GetInput(new WordLattice(ttask));
+ source.reset(new WordLattice);
break;
case TreeInputType:
- source = GetInput(new TreeInput(ttask));
+ source.reset(new TreeInput);
break;
case TabbedSentenceInput:
- source = GetInput(new TabbedSentence(ttask));
+ source.reset(new TabbedSentence);
break;
case ForestInputType:
- source = GetInput(new ForestInput(ttask));
+ source.reset(new ForestInput);
break;
default:
- TRACE_ERR("Unknown input type: " << inputType << "\n");
+ TRACE_ERR("Unknown input type: " << m_inputType << "\n");
}
- return (source ? true : false);
+#ifdef WITH_THREADS
+ boost::lock_guard<boost::mutex> lock(m_lock);
+#endif
+ if (source->Read(*m_inputStream, *m_inputFactorOrder))
+ source->SetTranslationId(m_currentLine++);
+ else
+ source.reset();
+ return source;
}
} // namespace
diff --git a/moses/IOWrapper.h b/moses/IOWrapper.h
index dd9c9d785..8ed9a02e5 100644
--- a/moses/IOWrapper.h
+++ b/moses/IOWrapper.h
@@ -35,6 +35,10 @@ POSSIBILITY OF SUCH DAMAGE.
#pragma once
+#ifdef WITH_THREADS
+#include <boost/thread.hpp>
+#endif
+
#include <cassert>
#include <fstream>
#include <ostream>
@@ -74,7 +78,6 @@ struct SHyperedge;
class IOWrapper
{
protected:
-
const std::vector<Moses::FactorType> *m_inputFactorOrder;
std::string m_inputFilePath;
Moses::InputFileStream *m_inputFile;
@@ -100,14 +103,20 @@ protected:
bool m_surpressSingleBestOutput;
+#ifdef WITH_THREADS
+ boost::mutex m_lock;
+#endif
+ size_t m_currentLine; /* line counter, initialized from static data at construction
+ * incremented with every call to ReadInput */
+
+ InputTypeEnum m_inputType; // initialized from StaticData at construction
public:
IOWrapper();
~IOWrapper();
- Moses::InputType* GetInput(Moses::InputType *inputType);
- bool ReadInput(Moses::InputTypeEnum inputType,
- Moses::InputType*& source, TranslationTask const* ttask=NULL);
+ // Moses::InputType* GetInput(Moses::InputType *inputType);
+ boost::shared_ptr<InputType> ReadInput();
Moses::OutputCollector *GetSingleBestOutputCollector() {
return m_singleBestOutputCollector.get();
diff --git a/moses/ThreadPool.cpp b/moses/ThreadPool.cpp
index 265c150c2..1113cef9d 100644
--- a/moses/ThreadPool.cpp
+++ b/moses/ThreadPool.cpp
@@ -41,7 +41,7 @@ ThreadPool::ThreadPool( size_t numThreads )
void ThreadPool::Execute()
{
do {
- Task* task = NULL;
+ boost::shared_ptr<Task> task;
{
// Find a job to perform
boost::mutex::scoped_lock lock(m_mutex);
@@ -59,15 +59,13 @@ void ThreadPool::Execute()
// race condition
bool del = task->DeleteAfterExecution();
task->Run();
- if (del) {
- delete task;
- }
+ // if (del) delete task; // not needed any more, since we use shared ptrs.
}
m_threadAvailable.notify_all();
} while (!m_stopped);
}
-void ThreadPool::Submit( Task* task )
+void ThreadPool::Submit(boost::shared_ptr<Task> task)
{
boost::mutex::scoped_lock lock(m_mutex);
if (m_stopping) {
diff --git a/moses/ThreadPool.h b/moses/ThreadPool.h
index bf981a2da..024d1c54d 100644
--- a/moses/ThreadPool.h
+++ b/moses/ThreadPool.h
@@ -26,6 +26,8 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#include <queue>
#include <vector>
+#include <boost/shared_ptr.hpp>
+
#ifdef WITH_THREADS
#include <boost/bind.hpp>
#include <boost/thread.hpp>
@@ -74,7 +76,7 @@ public:
/**
* Add a job to the threadpool.
**/
- void Submit(Task* task);
+ void Submit(boost::shared_ptr<Task> task);
/**
* Wait until all queued jobs have completed, and shut down
@@ -95,7 +97,7 @@ private:
**/
void Execute();
- std::queue<Task*> m_tasks;
+ std::queue<boost::shared_ptr<Task> > m_tasks;
boost::thread_group m_threads;
boost::mutex m_mutex;
boost::condition_variable m_threadNeeded;
diff --git a/moses/TranslationModel/CompactPT/BlockHashIndex.h b/moses/TranslationModel/CompactPT/BlockHashIndex.h
index 109044bb4..f8d526930 100644
--- a/moses/TranslationModel/CompactPT/BlockHashIndex.h
+++ b/moses/TranslationModel/CompactPT/BlockHashIndex.h
@@ -41,6 +41,8 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#include <time.h>
#endif
+#include <boost/shared_ptr.hpp>
+
namespace Moses
{
@@ -159,7 +161,9 @@ public:
}
#ifdef WITH_THREADS
- HashTask<Keys>* ht = new HashTask<Keys>(current, *this, keys);
+
+ boost::shared_ptr<HashTask<Keys> >
+ ht(new HashTask<Keys>(current, *this, keys));
m_threadPool.Submit(ht);
#else
CalcHash(current, keys);
diff --git a/moses/TranslationTask.cpp b/moses/TranslationTask.cpp
index be59e42d3..2b0d47baa 100644
--- a/moses/TranslationTask.cpp
+++ b/moses/TranslationTask.cpp
@@ -23,25 +23,36 @@ using namespace std;
namespace Moses
{
-TranslationTask::TranslationTask(InputType* source, Moses::IOWrapper &ioWrapper)
- : m_source(source)
- , m_ioWrapper(ioWrapper)
-{}
-
-TranslationTask::~TranslationTask()
+boost::shared_ptr<TranslationTask>
+TranslationTask
+::create(boost::shared_ptr<InputType> const& source,
+ boost::shared_ptr<IOWrapper> const& ioWrapper)
{
- delete m_source;
+ boost::shared_ptr<TranslationTask> ret(new TranslationTask(source, ioWrapper));
+ ret->m_self = ret;
+ return ret;
}
+TranslationTask
+::TranslationTask(boost::shared_ptr<InputType> const& source,
+ boost::shared_ptr<IOWrapper> const& ioWrapper)
+ : m_source(source) , m_ioWrapper(ioWrapper)
+{ }
+
+TranslationTask::~TranslationTask()
+{ }
+
void TranslationTask::Run()
{
+ UTIL_THROW_IF2(!m_source || !m_ioWrapper,
+ "Base Instances of TranslationTask must be initialized with"
+ << " input and iowrapper.");
+
+
// shorthand for "global data"
const StaticData &staticData = StaticData::Instance();
const size_t translationId = m_source->GetTranslationId();
- // input sentence
- Sentence sentence(this);
-
// report wall time spent on translation
Timer translationTime;
translationTime.start();
@@ -59,28 +70,28 @@ void TranslationTask::Run()
initTime.start();
// which manager
- BaseManager *manager;
+ boost::scoped_ptr<BaseManager> manager;
if (!staticData.IsSyntax()) {
// phrase-based
- manager = new Manager(*m_source);
+ manager.reset(new Manager(*m_source));
} else if (staticData.GetSearchAlgorithm() == SyntaxF2S ||
staticData.GetSearchAlgorithm() == SyntaxT2S) {
// STSG-based tree-to-string / forest-to-string decoding (ask Phil Williams)
typedef Syntax::F2S::RuleMatcherCallback Callback;
typedef Syntax::F2S::RuleMatcherHyperTree<Callback> RuleMatcher;
- manager = new Syntax::F2S::Manager<RuleMatcher>(*m_source);
+ manager.reset(new Syntax::F2S::Manager<RuleMatcher>(*m_source));
} else if (staticData.GetSearchAlgorithm() == SyntaxS2T) {
// new-style string-to-tree decoding (ask Phil Williams)
S2TParsingAlgorithm algorithm = staticData.GetS2TParsingAlgorithm();
if (algorithm == RecursiveCYKPlus) {
typedef Syntax::S2T::EagerParserCallback Callback;
typedef Syntax::S2T::RecursiveCYKPlusParser<Callback> Parser;
- manager = new Syntax::S2T::Manager<Parser>(*m_source);
+ manager.reset(new Syntax::S2T::Manager<Parser>(*m_source));
} else if (algorithm == Scope3) {
typedef Syntax::S2T::StandardParserCallback Callback;
typedef Syntax::S2T::Scope3Parser<Callback> Parser;
- manager = new Syntax::S2T::Manager<Parser>(*m_source);
+ manager.reset(new Syntax::S2T::Manager<Parser>(*m_source));
} else {
UTIL_THROW2("ERROR: unhandled S2T parsing algorithm");
}
@@ -88,64 +99,72 @@ void TranslationTask::Run()
// SCFG-based tree-to-string decoding (ask Phil Williams)
typedef Syntax::F2S::RuleMatcherCallback Callback;
typedef Syntax::T2S::RuleMatcherSCFG<Callback> RuleMatcher;
- manager = new Syntax::T2S::Manager<RuleMatcher>(*m_source);
+ manager.reset(new Syntax::T2S::Manager<RuleMatcher>(*m_source));
} else if (staticData.GetSearchAlgorithm() == ChartIncremental) {
// Ken's incremental decoding
- manager = new Incremental::Manager(*m_source);
+ manager.reset(new Incremental::Manager(*m_source));
} else {
// original SCFG manager
- manager = new ChartManager(*m_source);
+ manager.reset(new ChartManager(*m_source));
}
- VERBOSE(1, "Line " << translationId << ": Initialize search took " << initTime << " seconds total" << endl);
+ VERBOSE(1, "Line " << translationId << ": Initialize search took "
+ << initTime << " seconds total" << endl);
+
manager->Decode();
+ OutputCollector* ocoll;
// we are done with search, let's look what we got
Timer additionalReportingTime;
additionalReportingTime.start();
- manager->OutputBest(m_ioWrapper.GetSingleBestOutputCollector());
+ boost::shared_ptr<IOWrapper> const& io = m_ioWrapper;
+ manager->OutputBest(io->GetSingleBestOutputCollector());
// output word graph
- manager->OutputWordGraph(m_ioWrapper.GetWordGraphCollector());
+ manager->OutputWordGraph(io->GetWordGraphCollector());
// output search graph
- manager->OutputSearchGraph(m_ioWrapper.GetSearchGraphOutputCollector());
+ manager->OutputSearchGraph(io->GetSearchGraphOutputCollector());
+ // ???
manager->OutputSearchGraphSLF();
- // Output search graph in hypergraph format for Kenneth Heafield's lazy hypergraph decoder
- manager->OutputSearchGraphHypergraph();
+ // Output search graph in hypergraph format for Kenneth Heafield's
+ // lazy hypergraph decoder; writes to stderr
+ manager->OutputSearchGraphHypergraph();
additionalReportingTime.stop();
additionalReportingTime.start();
// output n-best list
- manager->OutputNBest(m_ioWrapper.GetNBestOutputCollector());
+ manager->OutputNBest(io->GetNBestOutputCollector());
//lattice samples
- manager->OutputLatticeSamples(m_ioWrapper.GetLatticeSamplesCollector());
+ manager->OutputLatticeSamples(io->GetLatticeSamplesCollector());
// detailed translation reporting
- manager->OutputDetailedTranslationReport(m_ioWrapper.GetDetailedTranslationCollector());
+ ocoll = io->GetDetailedTranslationCollector();
+ manager->OutputDetailedTranslationReport(ocoll);
- manager->OutputDetailedTreeFragmentsTranslationReport(m_ioWrapper.GetDetailTreeFragmentsOutputCollector());
+ ocoll = io->GetDetailTreeFragmentsOutputCollector();
+ manager->OutputDetailedTreeFragmentsTranslationReport(ocoll);
//list of unknown words
- manager->OutputUnknowns(m_ioWrapper.GetUnknownsCollector());
+ manager->OutputUnknowns(io->GetUnknownsCollector());
- manager->OutputAlignment(m_ioWrapper.GetAlignmentInfoCollector());
+ manager->OutputAlignment(io->GetAlignmentInfoCollector());
// report additional statistics
manager->CalcDecoderStatistics();
- VERBOSE(1, "Line " << translationId << ": Additional reporting took " << additionalReportingTime << " seconds total" << endl);
- VERBOSE(1, "Line " << translationId << ": Translation took " << translationTime << " seconds total" << endl);
+ VERBOSE(1, "Line " << translationId << ": Additional reporting took "
+ << additionalReportingTime << " seconds total" << endl);
+ VERBOSE(1, "Line " << translationId << ": Translation took "
+ << translationTime << " seconds total" << endl);
IFVERBOSE(2) {
PrintUserTime("Sentence Decoding Time:");
}
-
- delete manager;
}
}
diff --git a/moses/TranslationTask.h b/moses/TranslationTask.h
index b2a36840a..c9dccc0a5 100644
--- a/moses/TranslationTask.h
+++ b/moses/TranslationTask.h
@@ -1,3 +1,4 @@
+// -*- c++ -*-
#pragma once
#include <boost/smart_ptr/shared_ptr.hpp>
@@ -12,6 +13,10 @@
#include "moses/Syntax/S2T/Manager.h"
#include "moses/Syntax/T2S/Manager.h"
+#include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
+#include <boost/make_shared.hpp>
+
namespace Moses
{
class InputType;
@@ -25,21 +30,54 @@ class OutputCollector;
**/
class TranslationTask : public Moses::Task
{
+ // no copying, no assignment
+ TranslationTask(TranslationTask const& other) { }
+
+ TranslationTask const&
+ operator=(TranslationTask const& other) { return *this; }
+
+protected:
+ boost::weak_ptr<TranslationTask> m_self; // weak ptr to myself
+
+ TranslationTask() { } ;
+ TranslationTask(boost::shared_ptr<Moses::InputType> const& source,
+ boost::shared_ptr<Moses::IOWrapper> const& ioWrapper);
+ // Yes, the constructor is protected.
+ //
+ // TranslationTasks can only be created through the creator
+ // functions create(...). The creator functions set m_self to a
+ // weak_pointer s.t m_self.get() == this. The public member function
+ // self() can then be used to get a shared_ptr to the Task that
+ // guarantees the existence of the Task while that pointer is live.
+ // Depending on the use, case, that shared pointer can be kept alive
+ // or copied into a weak pointer that can then be used e.g. as a
+ // hash key for caching context-dependent information in feature
+ // functions. When it is time to clean up the cache, the feature
+ // function can determine (via a check on the weak pointer) if the
+ // task is still live or not, or maintain a shared_ptr to ensure the
+ // task stays alive till it's done with it.
public:
- TranslationTask(Moses::InputType* source, Moses::IOWrapper &ioWrapper);
+ virtual
+ boost::shared_ptr<TranslationTask>
+ self() const { return m_self.lock(); }
- ~TranslationTask();
+ // creator functions
+ static boost::shared_ptr<TranslationTask> create();
+ static boost::shared_ptr<TranslationTask>
+ create(boost::shared_ptr<Moses::InputType> const& source,
+ boost::shared_ptr<Moses::IOWrapper> const& ioWrapper);
+
+ ~TranslationTask();
/** Translate one sentence
* gets called by main function implemented at end of this source file */
- void Run();
-
+ virtual void Run();
private:
- Moses::InputType* m_source;
- Moses::IOWrapper &m_ioWrapper;
+ boost::shared_ptr<Moses::InputType> m_source;
+ boost::shared_ptr<Moses::IOWrapper> m_ioWrapper;
};