From dd03f9fb69da82a37f1846d25dcc8d8781956e96 Mon Sep 17 00:00:00 2001 From: Kenneth Heafield Date: Mon, 2 Jun 2014 10:28:02 -0700 Subject: KenLM 5a7efd8fe1db88ee0a9f7e9479b24ac3ca348221 with Hieu's patch to exception.hh --- util/stream/block.hh | 57 ++++++++++++++- util/stream/chain.cc | 12 ++- util/stream/chain.hh | 165 +++++++++++++++++++++++++++++++++++++++--- util/stream/config.hh | 43 +++++++++-- util/stream/io.cc | 14 +++- util/stream/io.hh | 21 ++++-- util/stream/line_input.hh | 6 +- util/stream/multi_progress.hh | 6 +- util/stream/multi_stream.hh | 127 ++++++++++++++++++++++++++++++++ util/stream/sort.hh | 14 ++-- util/stream/stream.hh | 9 ++- util/stream/timer.hh | 6 +- 12 files changed, 432 insertions(+), 48 deletions(-) create mode 100644 util/stream/multi_stream.hh (limited to 'util/stream') diff --git a/util/stream/block.hh b/util/stream/block.hh index 11aa991e4..aa7e28bb1 100644 --- a/util/stream/block.hh +++ b/util/stream/block.hh @@ -1,5 +1,5 @@ -#ifndef UTIL_STREAM_BLOCK__ -#define UTIL_STREAM_BLOCK__ +#ifndef UTIL_STREAM_BLOCK_H +#define UTIL_STREAM_BLOCK_H #include #include @@ -7,28 +7,77 @@ namespace util { namespace stream { +/** + * Encapsulates a block of memory. + */ class Block { public: + + /** + * Constructs an empty block. + */ Block() : mem_(NULL), valid_size_(0) {} + /** + * Constructs a block that encapsulates a segment of memory. + * + * @param[in] mem The segment of memory to encapsulate + * @param[in] size The size of the memory segment in bytes + */ Block(void *mem, std::size_t size) : mem_(mem), valid_size_(size) {} + /** + * Set the number of bytes in this block that should be interpreted as valid. + * + * @param[in] to Number of bytes + */ void SetValidSize(std::size_t to) { valid_size_ = to; } - // Read might fill in less than Allocated at EOF. + + /** + * Gets the number of bytes in this block that should be interpreted as valid. + * This is important because read might fill in less than Allocated at EOF. 
+ */ std::size_t ValidSize() const { return valid_size_; } + /** Gets a void pointer to the memory underlying this block. */ void *Get() { return mem_; } + + /** Gets a const void pointer to the memory underlying this block. */ const void *Get() const { return mem_; } + + /** + * Gets a const void pointer to the end of the valid section of memory + * encapsulated by this block. + */ const void *ValidEnd() const { return reinterpret_cast(mem_) + valid_size_; } + /** + * Returns true if this block encapsulates a valid (non-NULL) block of memory. + * + * This method is a user-defined implicit conversion function to boolean; + * among other things, this method enables bare instances of this class + * to be used as the condition of an if statement. + */ operator bool() const { return mem_ != NULL; } + + /** + * Returns true if this block is empty. + * + * In other words, if Get()==NULL, this method will return true. + */ bool operator!() const { return mem_ == NULL; } private: friend class Link; + + /** + * Points this block's memory at NULL. + * + * This class defines poison as a block whose memory pointer is NULL. + */ void SetToPoison() { mem_ = NULL; } @@ -40,4 +89,4 @@ class Block { } // namespace stream } // namespace util -#endif // UTIL_STREAM_BLOCK__ +#endif // UTIL_STREAM_BLOCK_H diff --git a/util/stream/chain.cc b/util/stream/chain.cc index 46708c601..4596af7ae 100644 --- a/util/stream/chain.cc +++ b/util/stream/chain.cc @@ -59,6 +59,11 @@ Chain &Chain::operator>>(const WriteAndRecycle &writer) { return *this; } +Chain &Chain::operator>>(const PWriteAndRecycle &writer) { + threads_.push_back(new Thread(Complete(), writer)); + return *this; +} + void Chain::Wait(bool release_memory) { if (queues_.empty()) { assert(threads_.empty()); @@ -126,7 +131,12 @@ Link::~Link() { // abort(); } else { if (!poisoned_) { - // Pass the poison! + // Poison is a block whose memory pointer is NULL. 
+ // + // Because we're in the else block, + // we know that the memory pointer of current_ is NULL. + // + // Pass the current (poison) block! out_->Produce(current_); } } diff --git a/util/stream/chain.hh b/util/stream/chain.hh index 0cc83a852..508650860 100644 --- a/util/stream/chain.hh +++ b/util/stream/chain.hh @@ -1,5 +1,5 @@ -#ifndef UTIL_STREAM_CHAIN__ -#define UTIL_STREAM_CHAIN__ +#ifndef UTIL_STREAM_CHAIN_H +#define UTIL_STREAM_CHAIN_H #include "util/stream/block.hh" #include "util/stream/config.hh" @@ -24,7 +24,12 @@ class ChainConfigException : public Exception { }; class Chain; -// Specifies position in chain for Link constructor. + +/** + * Encapsulates a @ref PCQueue "producer queue" and a @ref PCQueue "consumer queue" within a @ref Chain "chain". + * + * Specifies position in chain for Link constructor. + */ class ChainPosition { public: const Chain &GetChain() const { return *chain_; } @@ -41,14 +46,32 @@ class ChainPosition { WorkerProgress progress_; }; -// Position is usually ChainPosition but if there are multiple streams involved, this can be ChainPositions. + +/** + * Encapsulates a worker thread processing data at a given position in the chain. + * + * Each instance of this class owns one boost thread in which the worker is Run(). + */ class Thread { public: + + /** + * Constructs a new Thread in which the provided Worker is Run(). + * + * Position is usually ChainPosition but if there are multiple streams involved, this can be ChainPositions. + * + * After a call to this constructor, the provided worker will be running within a boost thread owned by the newly constructed Thread object. + */ template Thread(const Position &position, const Worker &worker) : thread_(boost::ref(*this), position, worker) {} ~Thread(); + /** + * Launches the provided worker in this object's boost thread. + * + * This method is called automatically by this class's @ref Thread() "constructor". 
+ */ template void operator()(const Position &position, Worker &worker) { try { worker.Run(position); @@ -63,14 +86,27 @@ class Thread { boost::thread thread_; }; +/** + * This resets blocks to full valid size. Used to close the loop in Chain by recycling blocks. + */ class Recycler { public: + /** + * Resets the blocks in the chain such that the blocks' respective valid sizes match the chain's block size. + * + * @see Block::SetValidSize() + * @see Chain::BlockSize() + */ void Run(const ChainPosition &position); }; extern const Recycler kRecycle; class WriteAndRecycle; - +class PWriteAndRecycle; + +/** + * Represents a sequence of workers, through which @ref Block "blocks" can pass. + */ class Chain { private: template struct CheckForRun { @@ -78,8 +114,20 @@ class Chain { }; public: + + /** + * Constructs a configured Chain. + * + * @param config Specifies how to configure the Chain. + */ explicit Chain(const ChainConfig &config); + /** + * Destructs a Chain. + * + * This method waits for the chain's threads to complete, + * and frees the memory held by this chain. + */ ~Chain(); void ActivateProgress() { @@ -91,24 +139,49 @@ class Chain { progress_.SetTarget(target); } + /** + * Gets the number of bytes in each record of a Block. + * + * @see ChainConfig::entry_size + */ std::size_t EntrySize() const { return config_.entry_size; } + + /** + * Gets the initial @ref Block::ValidSize "valid size" for @ref Block "blocks" in this chain. + * + * @see Block::ValidSize + */ std::size_t BlockSize() const { return block_size_; } - // Two ways to add to the chain: Add() or operator>>. + /** Two ways to add to the chain: Add() or operator>>. */ ChainPosition Add(); - // This is for adding threaded workers with a Run method. + /** + * Adds a new worker to this chain, + * and runs that worker in a new Thread owned by this chain. + * + * The worker must have a Run method that accepts a position argument.
+ * + * @see Thread::operator()() + */ template typename CheckForRun::type &operator>>(const Worker &worker) { assert(!complete_called_); threads_.push_back(new Thread(Add(), worker)); return *this; } - // Avoid copying the worker. + /** + * Adds a new worker to this chain (but avoids copying that worker), + * and runs that worker in a new Thread owned by this chain. + * + * The worker must have a Run method that accepts a position argument. + * + * @see Thread::operator()() + */ template typename CheckForRun::type &operator>>(const boost::reference_wrapper &worker) { assert(!complete_called_); threads_.push_back(new Thread(Add(), worker)); @@ -122,12 +195,21 @@ class Chain { threads_.push_back(new Thread(Complete(), kRecycle)); } + /** + * Adds a Recycler worker to this chain, + * and runs that worker in a new Thread owned by this chain. + */ Chain &operator>>(const Recycler &) { CompleteLoop(); return *this; } + /** + * Adds a WriteAndRecycle worker to this chain, + * and runs that worker in a new Thread owned by this chain. + */ Chain &operator>>(const WriteAndRecycle &writer); + Chain &operator>>(const PWriteAndRecycle &writer); // Chains are reusable. Call Wait to wait for everything to finish and free memory. void Wait(bool release_memory = true); @@ -156,28 +238,87 @@ class Chain { }; // Create the link in the worker thread using the position token. +/** + * Represents a C++ style iterator over @ref Block "blocks". + */ class Link { public: + // Either default construct and Init or just construct all at once. + + /** + * Constructs an @ref Init "initialized" link. + * + * @see Init + */ + explicit Link(const ChainPosition &position); + + /** + * Constructs a link that must subsequently be @ref Init "initialized". + * + * @see Init + */ Link(); + + /** + * Initializes the link with the input @ref PCQueue "consumer queue" and output @ref PCQueue "producer queue" at a given @ref ChainPosition "position" in the @ref Chain "chain". 
+ * + * @see Link() + */ void Init(const ChainPosition &position); - explicit Link(const ChainPosition &position); - + /** + * Destructs the link object. + * + * If necessary, this method will pass a poison block + * to this link's output @ref PCQueue "producer queue". + * + * @see Block::SetToPoison() + */ ~Link(); + /** + * Gets a reference to the @ref Block "block" at this link. + */ Block &operator*() { return current_; } + + /** + * Gets a const reference to the @ref Block "block" at this link. + */ const Block &operator*() const { return current_; } + /** + * Gets a pointer to the @ref Block "block" at this link. + */ Block *operator->() { return &current_; } + + /** + * Gets a const pointer to the @ref Block "block" at this link. + */ const Block *operator->() const { return &current_; } + /** + * Gets the link at the next @ref ChainPosition "position" in the @ref Chain "chain". + */ Link &operator++(); + /** + * Returns true if the @ref Block "block" at this link encapsulates a valid (non-NULL) block of memory. + * + * This method is a user-defined implicit conversion function to boolean; + * among other things, this method enables bare instances of this class + * to be used as the condition of an if statement. + */ operator bool() const { return current_; } + /** + * @ref Block::SetToPoison() "Poisons" the @ref Block "block" at this link, + * and passes this now-poisoned block to this link's output @ref PCQueue "producer queue".
+ * + * @see Block::SetToPoison() + */ void Poison(); - + private: Block current_; PCQueue *in_, *out_; @@ -195,4 +336,4 @@ inline Chain &operator>>(Chain &chain, Link &link) { } // namespace stream } // namespace util -#endif // UTIL_STREAM_CHAIN__ +#endif // UTIL_STREAM_CHAIN_H diff --git a/util/stream/config.hh b/util/stream/config.hh index 1eeb3a8a1..6bad36bc5 100644 --- a/util/stream/config.hh +++ b/util/stream/config.hh @@ -1,32 +1,63 @@ -#ifndef UTIL_STREAM_CONFIG__ -#define UTIL_STREAM_CONFIG__ +#ifndef UTIL_STREAM_CONFIG_H +#define UTIL_STREAM_CONFIG_H #include #include namespace util { namespace stream { +/** + * Represents how a chain should be configured. + */ struct ChainConfig { + + /** Constructs a configuration with underspecified (or default) parameters. */ ChainConfig() {} + /** + * Constructs a chain configuration object. + * + * @param [in] in_entry_size Number of bytes in each record. + * @param [in] in_block_count Number of blocks in the chain. + * @param [in] in_total_memory Total number of bytes available to the chain. + * This value will be divided amongst the blocks in the chain. + */ ChainConfig(std::size_t in_entry_size, std::size_t in_block_count, std::size_t in_total_memory) : entry_size(in_entry_size), block_count(in_block_count), total_memory(in_total_memory) {} + /** + * Number of bytes in each record. + */ std::size_t entry_size; + + /** + * Number of blocks in the chain. + */ std::size_t block_count; - // Chain's constructor will make this a multiple of entry_size. + + /** + * Total number of bytes available to the chain. + * This value will be divided amongst the blocks in the chain. + * Chain's constructor will make this a multiple of entry_size. + */ std::size_t total_memory; }; + +/** + * Represents how a sorter should be configured. + */ struct SortConfig { + + /** Filename prefix where temporary files should be placed. */ std::string temp_prefix; - // Size of each input/output buffer. + /** Size of each input/output buffer.
*/ std::size_t buffer_size; - // Total memory to use when running alone. + /** Total memory to use when running alone. */ std::size_t total_memory; }; }} // namespaces -#endif // UTIL_STREAM_CONFIG__ +#endif // UTIL_STREAM_CONFIG_H diff --git a/util/stream/io.cc b/util/stream/io.cc index 0459f7069..c64004c0b 100644 --- a/util/stream/io.cc +++ b/util/stream/io.cc @@ -36,12 +36,12 @@ void PRead::Run(const ChainPosition &position) { Link link(position); uint64_t offset = 0; for (; offset + block_size64 < size; offset += block_size64, ++link) { - PReadOrThrow(file_, link->Get(), block_size, offset); + ErsatzPRead(file_, link->Get(), block_size, offset); link->SetValidSize(block_size); } // size - offset is <= block_size, so it casts to 32-bit fine. if (size - offset) { - PReadOrThrow(file_, link->Get(), size - offset, offset); + ErsatzPRead(file_, link->Get(), size - offset, offset); link->SetValidSize(size - offset); ++link; } @@ -62,5 +62,15 @@ void WriteAndRecycle::Run(const ChainPosition &position) { } } +void PWriteAndRecycle::Run(const ChainPosition &position) { + const std::size_t block_size = position.GetChain().BlockSize(); + uint64_t offset = 0; + for (Link link(position); link; ++link) { + ErsatzPWrite(file_, link->Get(), link->ValidSize(), offset); + offset += link->ValidSize(); + link->SetValidSize(block_size); + } +} + } // namespace stream } // namespace util diff --git a/util/stream/io.hh b/util/stream/io.hh index 934b6b3fe..8dae2cbff 100644 --- a/util/stream/io.hh +++ b/util/stream/io.hh @@ -1,5 +1,5 @@ -#ifndef UTIL_STREAM_IO__ -#define UTIL_STREAM_IO__ +#ifndef UTIL_STREAM_IO_H +#define UTIL_STREAM_IO_H #include "util/exception.hh" #include "util/file.hh" @@ -41,6 +41,8 @@ class Write { int file_; }; +// It's a common case that stuff is written and then recycled. So rather than +// spawn another thread to Recycle, this combines the two roles. 
class WriteAndRecycle { public: explicit WriteAndRecycle(int fd) : file_(fd) {} @@ -49,14 +51,23 @@ class WriteAndRecycle { int file_; }; +class PWriteAndRecycle { + public: + explicit PWriteAndRecycle(int fd) : file_(fd) {} + void Run(const ChainPosition &position); + private: + int file_; +}; + + // Reuse the same file over and over again to buffer output. class FileBuffer { public: explicit FileBuffer(int fd) : file_(fd) {} - WriteAndRecycle Sink() const { + PWriteAndRecycle Sink() const { util::SeekOrThrow(file_.get(), 0); - return WriteAndRecycle(file_.get()); + return PWriteAndRecycle(file_.get()); } PRead Source() const { @@ -73,4 +84,4 @@ class FileBuffer { } // namespace stream } // namespace util -#endif // UTIL_STREAM_IO__ +#endif // UTIL_STREAM_IO_H diff --git a/util/stream/line_input.hh b/util/stream/line_input.hh index 86db1dd06..a870a6648 100644 --- a/util/stream/line_input.hh +++ b/util/stream/line_input.hh @@ -1,5 +1,5 @@ -#ifndef UTIL_STREAM_LINE_INPUT__ -#define UTIL_STREAM_LINE_INPUT__ +#ifndef UTIL_STREAM_LINE_INPUT_H +#define UTIL_STREAM_LINE_INPUT_H namespace util {namespace stream { class ChainPosition; @@ -19,4 +19,4 @@ class LineInput { }; }} // namespaces -#endif // UTIL_STREAM_LINE_INPUT__ +#endif // UTIL_STREAM_LINE_INPUT_H diff --git a/util/stream/multi_progress.hh b/util/stream/multi_progress.hh index c4dd45a9b..82e698a59 100644 --- a/util/stream/multi_progress.hh +++ b/util/stream/multi_progress.hh @@ -1,6 +1,6 @@ /* Progress bar suitable for chains of workers */ -#ifndef UTIL_MULTI_PROGRESS__ -#define UTIL_MULTI_PROGRESS__ +#ifndef UTIL_STREAM_MULTI_PROGRESS_H +#define UTIL_STREAM_MULTI_PROGRESS_H #include @@ -87,4 +87,4 @@ class WorkerProgress { }} // namespaces -#endif // UTIL_MULTI_PROGRESS__ +#endif // UTIL_STREAM_MULTI_PROGRESS_H diff --git a/util/stream/multi_stream.hh b/util/stream/multi_stream.hh new file mode 100644 index 000000000..0ee7fab6f --- /dev/null +++ b/util/stream/multi_stream.hh @@ -0,0 +1,127 @@ +#ifndef 
UTIL_STREAM_MULTI_STREAM_H +#define UTIL_STREAM_MULTI_STREAM_H + +#include "util/fixed_array.hh" +#include "util/scoped.hh" +#include "util/stream/chain.hh" +#include "util/stream/stream.hh" + +#include +#include + +#include +#include + +namespace util { namespace stream { + +class Chains; + +class ChainPositions : public util::FixedArray { + public: + ChainPositions() {} + + void Init(Chains &chains); + + explicit ChainPositions(Chains &chains) { + Init(chains); + } +}; + +class Chains : public util::FixedArray { + private: + template struct CheckForRun { + typedef Chains type; + }; + + public: + // Must call Init. + Chains() {} + + explicit Chains(std::size_t limit) : util::FixedArray(limit) {} + + template typename CheckForRun::type &operator>>(const Worker &worker) { + threads_.push_back(new util::stream::Thread(ChainPositions(*this), worker)); + return *this; + } + + template typename CheckForRun::type &operator>>(const boost::reference_wrapper &worker) { + threads_.push_back(new util::stream::Thread(ChainPositions(*this), worker)); + return *this; + } + + Chains &operator>>(const util::stream::Recycler &recycler) { + for (util::stream::Chain *i = begin(); i != end(); ++i) + *i >> recycler; + return *this; + } + + void Wait(bool release_memory = true) { + threads_.clear(); + for (util::stream::Chain *i = begin(); i != end(); ++i) { + i->Wait(release_memory); + } + } + + private: + boost::ptr_vector threads_; + + Chains(const Chains &); + void operator=(const Chains &); +}; + +inline void ChainPositions::Init(Chains &chains) { + util::FixedArray::Init(chains.size()); + for (util::stream::Chain *i = chains.begin(); i != chains.end(); ++i) { + // use "placement new" syntax to initialize ChainPosition in an already-allocated memory location + new (end()) util::stream::ChainPosition(i->Add()); Constructed(); + } +} + +inline Chains &operator>>(Chains &chains, ChainPositions &positions) { + positions.Init(chains); + return chains; +} + +template class GenericStreams
: public util::FixedArray { + private: + typedef util::FixedArray P; + public: + GenericStreams() {} + + // This puts a dummy T at the beginning (useful to algorithms that need to reference something at the beginning). + void InitWithDummy(const ChainPositions &positions) { + P::Init(positions.size() + 1); + new (P::end()) T(); // use "placement new" syntax to initialize T in an already-allocated memory location + P::Constructed(); + for (const util::stream::ChainPosition *i = positions.begin(); i != positions.end(); ++i) { + P::push_back(*i); + } + } + + // Limit restricts to positions[0,limit) + void Init(const ChainPositions &positions, std::size_t limit) { + P::Init(limit); + for (const util::stream::ChainPosition *i = positions.begin(); i != positions.begin() + limit; ++i) { + P::push_back(*i); + } + } + void Init(const ChainPositions &positions) { + Init(positions, positions.size()); + } + + GenericStreams(const ChainPositions &positions) { + Init(positions); + } +}; + +template inline Chains &operator>>(Chains &chains, GenericStreams &streams) { + ChainPositions positions; + chains >> positions; + streams.Init(positions); + return chains; +} + +typedef GenericStreams Streams; + +}} // namespaces +#endif // UTIL_STREAM_MULTI_STREAM_H diff --git a/util/stream/sort.hh b/util/stream/sort.hh index 16aa6a036..9082cfdde 100644 --- a/util/stream/sort.hh +++ b/util/stream/sort.hh @@ -15,8 +15,8 @@ * sort. Use a hash table for that.
*/ -#ifndef UTIL_STREAM_SORT__ -#define UTIL_STREAM_SORT__ +#ifndef UTIL_STREAM_SORT_H +#define UTIL_STREAM_SORT_H #include "util/stream/chain.hh" #include "util/stream/config.hh" @@ -182,7 +182,7 @@ template class MergeQueue { amount = remaining_; buffer_end_ = current_ + remaining_; } - PReadOrThrow(fd, current_, amount, offset_); + ErsatzPRead(fd, current_, amount, offset_); offset_ += amount; assert(current_ <= buffer_end_); remaining_ -= amount; @@ -307,10 +307,10 @@ template class MergingReader { const uint64_t block_size = position.GetChain().BlockSize(); Link l(position); for (; offset + block_size < end; ++l, offset += block_size) { - PReadOrThrow(in_, l->Get(), block_size, offset); + ErsatzPRead(in_, l->Get(), block_size, offset); l->SetValidSize(block_size); } - PReadOrThrow(in_, l->Get(), end - offset, offset); + ErsatzPRead(in_, l->Get(), end - offset, offset); l->SetValidSize(end - offset); (++l).Poison(); return; @@ -388,8 +388,10 @@ class BadSortConfig : public Exception { ~BadSortConfig() throw() {} }; +/** Sort */ template class Sort { public: + /** Constructs an object capable of sorting */ Sort(Chain &in, const SortConfig &config, const Compare &compare = Compare(), const Combine &combine = Combine()) : config_(config), data_(MakeTemp(config.temp_prefix)), @@ -545,4 +547,4 @@ template uint64_t BlockingSort(Chain &chain, cons } // namespace stream } // namespace util -#endif // UTIL_STREAM_SORT__ +#endif // UTIL_STREAM_SORT_H diff --git a/util/stream/stream.hh b/util/stream/stream.hh index 6ff45b820..7ea1c9f70 100644 --- a/util/stream/stream.hh +++ b/util/stream/stream.hh @@ -1,5 +1,5 @@ -#ifndef UTIL_STREAM_STREAM__ -#define UTIL_STREAM_STREAM__ +#ifndef UTIL_STREAM_STREAM_H +#define UTIL_STREAM_STREAM_H #include "util/stream/chain.hh" @@ -56,6 +56,9 @@ class Stream : boost::noncopyable { end_ = current_ + block_it_->ValidSize(); } + // The following are pointers to raw memory + // current_ is the current record + // end_ is the end of the block 
(so we know when to move to the next block) uint8_t *current_, *end_; std::size_t entry_size_; @@ -71,4 +74,4 @@ inline Chain &operator>>(Chain &chain, Stream &stream) { } // namespace stream } // namespace util -#endif // UTIL_STREAM_STREAM__ +#endif // UTIL_STREAM_STREAM_H diff --git a/util/stream/timer.hh b/util/stream/timer.hh index 7e1a5885d..06488a17e 100644 --- a/util/stream/timer.hh +++ b/util/stream/timer.hh @@ -1,5 +1,5 @@ -#ifndef UTIL_STREAM_TIMER__ -#define UTIL_STREAM_TIMER__ +#ifndef UTIL_STREAM_TIMER_H +#define UTIL_STREAM_TIMER_H // Sorry Jon, this was adding library dependencies in Moses and people complained. @@ -13,4 +13,4 @@ #define UTIL_TIMER(str) //#endif -#endif // UTIL_STREAM_TIMER__ +#endif // UTIL_STREAM_TIMER_H -- cgit v1.2.3