diff options
author | Kenneth Heafield <github@kheafield.com> | 2015-08-27 12:55:52 +0300 |
---|---|---|
committer | Kenneth Heafield <github@kheafield.com> | 2015-08-27 12:55:52 +0300 |
commit | 09ecd071f9571a3808fc43700186fd777564785b (patch) | |
tree | 98236edb11f43a01980fe2ebaf397bf3ac344b53 /util | |
parent | 380b5a5dfd99a7df4ea8f270abf72debe5e9cb6e (diff) |
KenLM 2a3e8fae3633c890cb3b342d461f9130c8e343fa excluding unfinished interpolation directory
Diffstat (limited to 'util')
-rw-r--r-- | util/CMakeLists.txt | 109 | ||||
-rw-r--r-- | util/double-conversion/CMakeLists.txt | 39 | ||||
-rw-r--r-- | util/file.cc | 13 | ||||
-rw-r--r-- | util/file.hh | 22 | ||||
-rw-r--r-- | util/fixed_array.hh | 35 | ||||
-rw-r--r-- | util/float_to_string.hh | 4 | ||||
-rw-r--r-- | util/probing_hash_table.hh | 3 | ||||
-rw-r--r-- | util/probing_hash_table_benchmark_main.cc | 14 | ||||
-rw-r--r-- | util/stream/CMakeLists.txt | 74 | ||||
-rw-r--r-- | util/stream/Jamfile | 8 | ||||
-rw-r--r-- | util/stream/chain.cc | 2 | ||||
-rw-r--r-- | util/stream/chain.hh | 15 | ||||
-rw-r--r-- | util/stream/count_records.cc | 12 | ||||
-rw-r--r-- | util/stream/count_records.hh | 20 | ||||
-rw-r--r-- | util/stream/multi_stream.hh | 17 | ||||
-rw-r--r-- | util/stream/rewindable_stream.cc | 157 | ||||
-rw-r--r-- | util/stream/rewindable_stream.hh | 46 | ||||
-rw-r--r-- | util/stream/rewindable_stream_test.cc | 2 |
18 files changed, 462 insertions, 130 deletions
diff --git a/util/CMakeLists.txt b/util/CMakeLists.txt new file mode 100644 index 000000000..c52cdbc06 --- /dev/null +++ b/util/CMakeLists.txt @@ -0,0 +1,109 @@ +cmake_minimum_required(VERSION 2.8.8) +# +# The KenLM cmake files make use of add_library(... OBJECTS ...) +# +# This syntax allows grouping of source files when compiling +# (effectively creating "fake" libraries based on source subdirs). +# +# This syntax was only added in cmake version 2.8.8 +# +# see http://www.cmake.org/Wiki/CMake/Tutorials/Object_Library + + +# This CMake file was created by Lane Schwartz <dowobeha@gmail.com> + + +# Explicitly list the source files for this subdirectory +# +# If you add any source files to this subdirectory +# that should be included in the kenlm library, +# (this excludes any unit test files) +# you should add them to the following list: +# +# Because we do not set PARENT_SCOPE in the following definition, +# CMake files in the parent directory won't be able to access this variable. +# +set(KENLM_UTIL_SOURCE + bit_packing.cc + ersatz_progress.cc + exception.cc + file.cc + file_piece.cc + float_to_string.cc + integer_to_string.cc + mmap.cc + murmur_hash.cc + parallel_read.cc + pool.cc + read_compressed.cc + scoped.cc + string_piece.cc + usage.cc + ) + +# This directory has children that need to be processed +add_subdirectory(double-conversion) +add_subdirectory(stream) + + +# Group these objects together for later use. +# +# Given add_library(foo OBJECT ${my_foo_sources}), +# refer to these objects as $<TARGET_OBJECTS:foo> +# +add_library(kenlm_util OBJECT ${KENLM_UTIL_DOUBLECONVERSION_SOURCE} ${KENLM_UTIL_STREAM_SOURCE} ${KENLM_UTIL_SOURCE}) + + + +# Only compile and run unit tests if tests should be run +if(BUILD_TESTING) + + # Explicitly list the Boost test files to be compiled + set(KENLM_BOOST_TESTS_LIST + bit_packing_test + file_piece_test + joint_sort_test + multi_intersection_test + probing_hash_table_test + read_compressed_test + sorted_uniform_test + tokenize_piece_test + ) + + # Iterate through the Boost tests list + foreach(test ${KENLM_BOOST_TESTS_LIST}) + + # Compile the executable, linking against the requisite dependent object files + add_executable(${test} ${test}.cc $<TARGET_OBJECTS:kenlm_util>) + + # Require the following compile flag + set_target_properties(${test} PROPERTIES COMPILE_FLAGS -DBOOST_TEST_DYN_LINK) + + # Link the executable against boost + target_link_libraries(${test} ${Boost_LIBRARIES}) + + # file_piece_test requires an extra command line parameter + if ("${test}" STREQUAL "file_piece_test") + set(test_params + ${CMAKE_CURRENT_SOURCE_DIR}/file_piece.cc + ) + else() + set(test_params + ) + endif() + + # Specify command arguments for how to run each unit test + # + # Assuming that foo was defined via add_executable(foo ...), + # the syntax $<TARGET_FILE:foo> gives the full path to the executable. + # + add_test(NAME ${test}_test + COMMAND $<TARGET_FILE:${test}> ${test_params}) + + # Group unit tests together + set_target_properties(${test} PROPERTIES FOLDER "unit_tests") + + # End for loop + endforeach(test) + +endif() diff --git a/util/double-conversion/CMakeLists.txt b/util/double-conversion/CMakeLists.txt new file mode 100644 index 000000000..e2cf02aa6 --- /dev/null +++ b/util/double-conversion/CMakeLists.txt @@ -0,0 +1,39 @@ +cmake_minimum_required(VERSION 2.8.8) +# +# The KenLM cmake files make use of add_library(... OBJECTS ...) +# +# This syntax allows grouping of source files when compiling +# (effectively creating "fake" libraries based on source subdirs). +# +# This syntax was only added in cmake version 2.8.8 +# +# see http://www.cmake.org/Wiki/CMake/Tutorials/Object_Library + + +# This CMake file was created by Lane Schwartz <dowobeha@gmail.com> + +# Explicitly list the source files for this subdirectory +# +# If you add any source files to this subdirectory +# that should be included in the kenlm library, +# (this excludes any unit test files) +# you should add them to the following list: +# +# In order to allow CMake files in the parent directory +# to see this variable definition, we set PARENT_SCOPE. +# +# In order to set correct paths to these files +# when this variable is referenced by CMake files in the parent directory, +# we prefix all files with ${CMAKE_CURRENT_SOURCE_DIR}. +# +set(KENLM_UTIL_DOUBLECONVERSION_SOURCE + ${CMAKE_CURRENT_SOURCE_DIR}/bignum-dtoa.cc + ${CMAKE_CURRENT_SOURCE_DIR}/bignum.cc + ${CMAKE_CURRENT_SOURCE_DIR}/cached-powers.cc + ${CMAKE_CURRENT_SOURCE_DIR}/diy-fp.cc + ${CMAKE_CURRENT_SOURCE_DIR}/double-conversion.cc + ${CMAKE_CURRENT_SOURCE_DIR}/fast-dtoa.cc + ${CMAKE_CURRENT_SOURCE_DIR}/fixed-dtoa.cc + ${CMAKE_CURRENT_SOURCE_DIR}/strtod.cc + PARENT_SCOPE) + diff --git a/util/file.cc b/util/file.cc index 046b9ff90..be272f9bc 100644 --- a/util/file.cc +++ b/util/file.cc @@ -60,6 +60,14 @@ EndOfFileException::EndOfFileException() throw() { } EndOfFileException::~EndOfFileException() throw() {} +bool InputFileIsStdin(StringPiece path) { + return path == "-" || path == "/dev/stdin"; +} + +bool OutputFileIsStdout(StringPiece path) { + return path == "-" || path == "/dev/stdout"; +} + int OpenReadOrThrow(const char *name) { int ret; #if defined(_WIN32) || defined(_WIN64) @@ -111,7 +119,10 @@ uint64_t SizeOrThrow(int fd) { } void ResizeOrThrow(int fd, uint64_t to) { -#if defined(_WIN32) || defined(_WIN64) +#if defined __MINGW32__ + // Does this handle 64-bit? + int ret = ftruncate +#elif defined(_WIN32) || defined(_WIN64) errno_t ret = _chsize_s #elif defined(OS_ANDROID) int ret = ftruncate64 diff --git a/util/file.hh b/util/file.hh index bd5873cbc..f7cb4d688 100644 --- a/util/file.hh +++ b/util/file.hh @@ -78,6 +78,28 @@ int OpenReadOrThrow(const char *name); // Create file if it doesn't exist, truncate if it does. Opened for write. int CreateOrThrow(const char *name); +/** Does the given input file path denote standard input? + * + * Returns true if, and only if, path is either "-" or "/dev/stdin". + * + * Opening standard input as a file may need some special treatment for + * portability. There's a convention that a dash ("-") in place of an input + * file path denotes standard input, but opening "/dev/stdin" may need to be + * special as well. + */ +bool InputPathIsStdin(StringPiece path); + +/** Does the given output file path denote standard output? + * + * Returns true if, and only if, path is either "-" or "/dev/stdout". + * + * Opening standard output as a file may need some special treatment for + * portability. There's a convention that a dash ("-") in place of an output + * file path denotes standard output, but opening "/dev/stdout" may need to be + * special as well. + */ +bool OutputPathIsStdout(StringPiece path); + // Return value for SizeFile when it can't size properly. const uint64_t kBadSize = (uint64_t)-1; uint64_t SizeFile(int fd); diff --git a/util/fixed_array.hh b/util/fixed_array.hh index 610cbdf12..9083d39ee 100644 --- a/util/fixed_array.hh +++ b/util/fixed_array.hh @@ -100,33 +100,56 @@ template <class T> class FixedArray { * * @param i Index of the object to reference */ - T &operator[](std::size_t i) { return begin()[i]; } + T &operator[](std::size_t i) { + assert(i < size()); + return begin()[i]; + } /** * Gets a const reference to the object with index i currently stored in this data structure. * * @param i Index of the object to reference */ - const T &operator[](std::size_t i) const { return begin()[i]; } + const T &operator[](std::size_t i) const { + assert(i < size()); + return begin()[i]; + } /** * Constructs a new object using the provided parameter, * and stores it in this data structure. * * The memory backing the constructed object is managed by this data structure. + * I miss C++11 variadic templates. */ + void push_back() { + new (end()) T(); + Constructed(); + } template <class C> void push_back(const C &c) { - new (end()) T(c); // use "placement new" syntax to initalize T in an already-allocated memory location + new (end()) T(c); + Constructed(); + } + template <class C> void push_back(C &c) { + new (end()) T(c); + Constructed(); + } + template <class C, class D> void push_back(const C &c, const D &d) { + new (end()) T(c, d); Constructed(); } + void pop_back() { + back().~T(); + --newed_end_; + } + /** * Removes all elements from this array. */ void clear() { - for (T *i = begin(); i != end(); ++i) - i->~T(); - newed_end_ = begin(); + while (newed_end_ != begin()) + pop_back(); } protected: diff --git a/util/float_to_string.hh b/util/float_to_string.hh index d1104e790..930532734 100644 --- a/util/float_to_string.hh +++ b/util/float_to_string.hh @@ -8,13 +8,13 @@ namespace util { template <> struct ToStringBuf<double> { // DoubleToStringConverter::kBase10MaximalLength + 1 for null paranoia. - static const unsigned kBytes = 18; + static const unsigned kBytes = 19; }; // Single wasn't documented in double conversion, so be conservative and // say the same as double. template <> struct ToStringBuf<float> { - static const unsigned kBytes = 18; + static const unsigned kBytes = 19; }; char *ToString(double value, char *to); diff --git a/util/probing_hash_table.hh b/util/probing_hash_table.hh index f4192577b..f32b64ea3 100644 --- a/util/probing_hash_table.hh +++ b/util/probing_hash_table.hh @@ -92,8 +92,7 @@ template <class EntryT, class HashT, class EqualT = std::equal_to<typename Entry Key got(i->GetKey()); if (equal_(got, t.GetKey())) { out = i; return true; } if (equal_(got, invalid_)) { - UTIL_THROW_IF(++entries_ >= buckets_, ProbingSizeException, - "Hash table with " << buckets_ << " buckets is full."); + UTIL_THROW_IF(++entries_ >= buckets_, ProbingSizeException, "Hash table with " << buckets_ << " buckets is full."); *i = t; out = i; return false; diff --git a/util/probing_hash_table_benchmark_main.cc b/util/probing_hash_table_benchmark_main.cc index 9deeaac2d..3e12290cf 100644 --- a/util/probing_hash_table_benchmark_main.cc +++ b/util/probing_hash_table_benchmark_main.cc @@ -1,18 +1,9 @@ - #include "util/probing_hash_table.hh" #include "util/scoped.hh" #include "util/usage.hh" #include <boost/random/mersenne_twister.hpp> -#include <boost/version.hpp> -#if BOOST_VERSION / 100000 < 1 -#error BOOST_LIB_VERSION is to old. Time to upgrade. -#elif BOOST_VERSION / 100000 > 1 || BOOST_VERSION / 100 % 1000 > 46 #include <boost/random/uniform_int_distribution.hpp> -#define have_uniform_int_distribution -#else -#include <boost/random/uniform_int.hpp> -#endif #include <iostream> @@ -31,13 +22,8 @@ void Test(uint64_t entries, uint64_t lookups, float multiplier = 1.5) { std::size_t size = Table::Size(entries, multiplier); scoped_malloc backing(util::CallocOrThrow(size)); Table table(backing.get(), size); -#ifdef have_uniform_int_distribution boost::random::mt19937 gen; boost::random::uniform_int_distribution<> dist(std::numeric_limits<uint64_t>::min(), std::numeric_limits<uint64_t>::max()); -#else - boost::mt19937 gen; - boost::uniform_int<> dist(std::numeric_limits<uint64_t>::min(), std::numeric_limits<uint64_t>::max()); -#endif double start = UserTime(); for (uint64_t i = 0; i < entries; ++i) { Entry entry; diff --git a/util/stream/CMakeLists.txt b/util/stream/CMakeLists.txt new file mode 100644 index 000000000..d3eddbe5c --- /dev/null +++ b/util/stream/CMakeLists.txt @@ -0,0 +1,74 @@ +cmake_minimum_required(VERSION 2.8.8) +# +# The KenLM cmake files make use of add_library(... OBJECTS ...) +# +# This syntax allows grouping of source files when compiling +# (effectively creating "fake" libraries based on source subdirs). +# +# This syntax was only added in cmake version 2.8.8 +# +# see http://www.cmake.org/Wiki/CMake/Tutorials/Object_Library + + +# This CMake file was created by Lane Schwartz <dowobeha@gmail.com> + +# Explicitly list the source files for this subdirectory +# +# If you add any source files to this subdirectory +# that should be included in the kenlm library, +# (this excludes any unit test files) +# you should add them to the following list: +# +# In order to allow CMake files in the parent directory +# to see this variable definition, we set PARENT_SCOPE. +# +# In order to set correct paths to these files +# when this variable is referenced by CMake files in the parent directory, +# we prefix all files with ${CMAKE_CURRENT_SOURCE_DIR}. +# +set(KENLM_UTIL_STREAM_SOURCE + ${CMAKE_CURRENT_SOURCE_DIR}/chain.cc + ${CMAKE_CURRENT_SOURCE_DIR}/io.cc + ${CMAKE_CURRENT_SOURCE_DIR}/line_input.cc + ${CMAKE_CURRENT_SOURCE_DIR}/multi_progress.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rewindable_stream.cc + PARENT_SCOPE) + + + +if(BUILD_TESTING) + + # Explicitly list the Boost test files to be compiled + set(KENLM_BOOST_TESTS_LIST + io_test + sort_test + stream_test + ) + + # Iterate through the Boost tests list + foreach(test ${KENLM_BOOST_TESTS_LIST}) + + # Compile the executable, linking against the requisite dependent object files + add_executable(${test} ${test}.cc $<TARGET_OBJECTS:kenlm_util>) + + # Require the following compile flag + set_target_properties(${test} PROPERTIES COMPILE_FLAGS -DBOOST_TEST_DYN_LINK) + + # Link the executable against boost + target_link_libraries(${test} ${Boost_LIBRARIES}) + + # Specify command arguments for how to run each unit test + # + # Assuming that foo was defined via add_executable(foo ...), + # the syntax $<TARGET_FILE:foo> gives the full path to the executable. + # + add_test(NAME ${test}_test + COMMAND $<TARGET_FILE:${test}>) + + # Group unit tests together + set_target_properties(${test} PROPERTIES FOLDER "unit_tests") + + # End for loop + endforeach(test) + +endif() diff --git a/util/stream/Jamfile b/util/stream/Jamfile index cde0247e7..de9d41c5f 100644 --- a/util/stream/Jamfile +++ b/util/stream/Jamfile @@ -1,10 +1,4 @@ -#if $(BOOST-VERSION) >= 104800 { -# timer-link = <library>/top//boost_timer ; -#} else { -# timer-link = ; -#} - -fakelib stream : chain.cc rewindable_stream.cc io.cc line_input.cc multi_progress.cc ..//kenutil /top//boost_thread : : : <library>/top//boost_thread ; +fakelib stream : [ glob *.cc : *_test.cc ] ..//kenutil /top//boost_thread : : : <library>/top//boost_thread ; import testing ; unit-test io_test : io_test.cc stream /top//boost_unit_test_framework ; diff --git a/util/stream/chain.cc b/util/stream/chain.cc index 39f2f3fbb..6bc000522 100644 --- a/util/stream/chain.cc +++ b/util/stream/chain.cc @@ -126,7 +126,7 @@ Link::~Link() { if (current_) { // Probably an exception unwinding. std::cerr << "Last input should have been poison." << std::endl; - // abort(); + abort(); } else { if (!poisoned_) { // Poison is a block whose memory pointer is NULL. diff --git a/util/stream/chain.hh b/util/stream/chain.hh index 8caa1afcb..296982260 100644 --- a/util/stream/chain.hh +++ b/util/stream/chain.hh @@ -74,11 +74,11 @@ class Thread { * This method is called automatically by this class's @ref Thread() "constructor". */ template <class Position, class Worker> void operator()(const Position &position, Worker &worker) { - try { +// try { worker.Run(position); - } catch (const std::exception &e) { - UnhandledException(e); - } +// } catch (const std::exception &e) { +// UnhandledException(e); +// } } private: @@ -158,6 +158,13 @@ class Chain { return block_size_; } + /** + * Number of blocks going through the Chain. + */ + std::size_t BlockCount() const { + return config_.block_count; + } + /** Two ways to add to the chain: Add() or operator>>. */ ChainPosition Add(); diff --git a/util/stream/count_records.cc b/util/stream/count_records.cc new file mode 100644 index 000000000..bdadad713 --- /dev/null +++ b/util/stream/count_records.cc @@ -0,0 +1,12 @@ +#include "util/stream/count_records.hh" +#include "util/stream/chain.hh" + +namespace util { namespace stream { + +void CountRecords::Run(const ChainPosition &position) { + for (Link link(position); link; ++link) { + *count_ += link->ValidSize() / position.GetChain().EntrySize(); + } +} + +}} // namespaces diff --git a/util/stream/count_records.hh b/util/stream/count_records.hh new file mode 100644 index 000000000..e3f7c94af --- /dev/null +++ b/util/stream/count_records.hh @@ -0,0 +1,20 @@ +#include <stdint.h> + +namespace util { namespace stream { + +class ChainPosition; + +class CountRecords { + public: + explicit CountRecords(uint64_t *out) + : count_(out) { + *count_ = 0; + } + + void Run(const ChainPosition &position); + + private: + uint64_t *count_; +}; + +}} // namespaces diff --git a/util/stream/multi_stream.hh b/util/stream/multi_stream.hh index b1461f964..6381fc2ed 100644 --- a/util/stream/multi_stream.hh +++ b/util/stream/multi_stream.hh @@ -20,6 +20,9 @@ class ChainPositions : public util::FixedArray<util::stream::ChainPosition> { public: ChainPositions() {} + explicit ChainPositions(std::size_t bound) : + util::FixedArray<util::stream::ChainPosition>(bound) {} + void Init(Chains &chains); explicit ChainPositions(Chains &chains) { @@ -88,16 +91,6 @@ template <class T> class GenericStreams : public util::FixedArray<T> { public: GenericStreams() {} - // This puts a dummy T at the beginning (useful to algorithms that need to reference something at the beginning). - void InitWithDummy(const ChainPositions &positions) { - P::Init(positions.size() + 1); - new (P::end()) T(); // use "placement new" syntax to initalize T in an already-allocated memory location - P::Constructed(); - for (const util::stream::ChainPosition *i = positions.begin(); i != positions.end(); ++i) { - P::push_back(*i); - } - } - // Limit restricts to positions[0,limit) void Init(const ChainPositions &positions, std::size_t limit) { P::Init(limit); @@ -112,6 +105,10 @@ template <class T> class GenericStreams : public util::FixedArray<T> { GenericStreams(const ChainPositions &positions) { Init(positions); } + + void Init(std::size_t amount) { + P::Init(amount); + } }; template <class T> inline Chains &operator>>(Chains &chains, GenericStreams<T> &streams) { diff --git a/util/stream/rewindable_stream.cc b/util/stream/rewindable_stream.cc index c7e39231b..2867bf8ab 100644 --- a/util/stream/rewindable_stream.cc +++ b/util/stream/rewindable_stream.cc @@ -13,105 +13,120 @@ void RewindableStream::Init(const ChainPosition &position) { UTIL_THROW_IF2(in_, "RewindableStream::Init twice"); in_ = position.in_; out_ = position.out_; + hit_poison_ = false; poisoned_ = false; progress_ = position.progress_; entry_size_ = position.GetChain().EntrySize(); block_size_ = position.GetChain().BlockSize(); - FetchBlock(); - current_bl_ = &second_bl_; - current_ = static_cast<uint8_t*>(current_bl_->Get()); - end_ = current_ + current_bl_->ValidSize(); -} - -const void *RewindableStream::Get() const { - return current_; -} - -void *RewindableStream::Get() { - return current_; + block_count_ = position.GetChain().BlockCount(); + blocks_it_ = 0; + marked_ = NULL; + UTIL_THROW_IF2(block_count_ < 2, "RewindableStream needs block_count at least two"); + AppendBlock(); } RewindableStream &RewindableStream::operator++() { assert(*this); - assert(current_ < end_); + assert(current_ < block_end_); + assert(current_); + assert(blocks_it_ < blocks_.size()); current_ += entry_size_; - if (current_ == end_) { - // two cases: either we need to fetch the next block, or we've already - // fetched it before. We can check this by looking at the current_bl_ - // pointer: if it's at the second_bl_, we need to flush and fetch a new - // block. Otherwise, we can just move over to the second block. - if (current_bl_ == &second_bl_) { - if (first_bl_) { - out_->Produce(first_bl_); - progress_ += first_bl_.ValidSize(); + if (UTIL_UNLIKELY(current_ == block_end_)) { + // Fetch another block if necessary. + if (++blocks_it_ == blocks_.size()) { + if (!marked_) { + Flush(blocks_.begin() + blocks_it_); + blocks_it_ = 0; } - first_bl_ = second_bl_; - FetchBlock(); + AppendBlock(); + assert(poisoned_ || (blocks_it_ == blocks_.size() - 1)); + if (poisoned_) return *this; } - current_bl_ = &second_bl_; - current_ = static_cast<uint8_t *>(second_bl_.Get()); - end_ = current_ + second_bl_.ValidSize(); - } - - if (!*current_bl_) - { - if (current_bl_ == &second_bl_ && first_bl_) - { - out_->Produce(first_bl_); - progress_ += first_bl_.ValidSize(); - } - out_->Produce(*current_bl_); - poisoned_ = true; + Block &cur_block = blocks_[blocks_it_]; + current_ = static_cast<uint8_t*>(cur_block.Get()); + block_end_ = current_ + cur_block.ValidSize(); } - + assert(current_); + assert(current_ >= static_cast<uint8_t*>(blocks_[blocks_it_].Get())); + assert(current_ < block_end_); + assert(block_end_ == blocks_[blocks_it_].ValidEnd()); return *this; } -void RewindableStream::FetchBlock() { - // The loop is needed since it is *feasible* that we're given 0 sized but - // valid blocks - do { - in_->Consume(second_bl_); - } while (second_bl_ && second_bl_.ValidSize() == 0); -} - void RewindableStream::Mark() { marked_ = current_; + Flush(blocks_.begin() + blocks_it_); + blocks_it_ = 0; } void RewindableStream::Rewind() { - if (marked_ >= first_bl_.Get() && marked_ < first_bl_.ValidEnd()) { - current_bl_ = &first_bl_; - current_ = marked_; - } else if (marked_ >= second_bl_.Get() && marked_ < second_bl_.ValidEnd()) { - current_bl_ = &second_bl_; - current_ = marked_; - } else { UTIL_THROW2("RewindableStream rewound too far"); } + if (current_ != marked_) { + poisoned_ = false; + } + blocks_it_ = 0; + current_ = marked_; + block_end_ = static_cast<const uint8_t*>(blocks_[blocks_it_].ValidEnd()); + + assert(current_); + assert(current_ >= static_cast<uint8_t*>(blocks_[blocks_it_].Get())); + assert(current_ < block_end_); + assert(block_end_ == blocks_[blocks_it_].ValidEnd()); } void RewindableStream::Poison() { - assert(!poisoned_); + if (blocks_.empty()) return; + assert(*this); + assert(blocks_it_ == blocks_.size() - 1); - // Three things: if we have a buffered first block, we need to produce it - // first. Then, produce the partial "current" block, and then send the - // poison down the chain + // Produce all buffered blocks. + blocks_.back().SetValidSize(current_ - static_cast<uint8_t*>(blocks_.back().Get())); + Flush(blocks_.end()); + blocks_it_ = 0; - // if we still have a buffered first block, produce it first - if (current_bl_ == &second_bl_ && first_bl_) { - out_->Produce(first_bl_); - progress_ += first_bl_.ValidSize(); + Block poison; + if (!hit_poison_) { + in_->Consume(poison); } + poison.SetToPoison(); + out_->Produce(poison); + hit_poison_ = true; + poisoned_ = true; +} - // send our partial block - current_bl_->SetValidSize(current_ - - static_cast<uint8_t *>(current_bl_->Get())); - out_->Produce(*current_bl_); - progress_ += current_bl_->ValidSize(); +void RewindableStream::AppendBlock() { + if (UTIL_UNLIKELY(blocks_.size() >= block_count_)) { + std::cerr << "RewindableStream trying to use more blocks than available" << std::endl; + abort(); + } + if (UTIL_UNLIKELY(hit_poison_)) { + poisoned_ = true; + return; + } + Block get; + // The loop is needed since it is *feasible* that we're given 0 sized but + // valid blocks + do { + in_->Consume(get); + if (UTIL_LIKELY(get)) { + blocks_.push_back(get); + } else { + hit_poison_ = true; + poisoned_ = true; + return; + } + } while (UTIL_UNLIKELY(get.ValidSize() == 0)); + current_ = static_cast<uint8_t*>(blocks_.back().Get()); + block_end_ = static_cast<const uint8_t*>(blocks_.back().ValidEnd()); + blocks_it_ = blocks_.size() - 1; +} - // send down the poison - current_bl_->SetToPoison(); - out_->Produce(*current_bl_); - poisoned_ = true; +void RewindableStream::Flush(std::deque<Block>::iterator to) { + for (std::deque<Block>::iterator i = blocks_.begin(); i != to; ++i) { + out_->Produce(*i); + progress_ += i->ValidSize(); + } + blocks_.erase(blocks_.begin(), to); } + } } diff --git a/util/stream/rewindable_stream.hh b/util/stream/rewindable_stream.hh index 9ee637c99..560825cde 100644 --- a/util/stream/rewindable_stream.hh +++ b/util/stream/rewindable_stream.hh @@ -5,6 +5,8 @@ #include <boost/noncopyable.hpp> +#include <deque> + namespace util { namespace stream { @@ -23,6 +25,10 @@ class RewindableStream : boost::noncopyable { */ RewindableStream(); + ~RewindableStream() { + Poison(); + } + /** * Initializes an existing RewindableStream at a specific position in * a Chain. @@ -38,21 +44,32 @@ class RewindableStream : boost::noncopyable { * * Equivalent to RewindableStream a(); a.Init(....); */ - explicit RewindableStream(const ChainPosition &position); + explicit RewindableStream(const ChainPosition &position) + : in_(NULL) { + Init(position); + } /** * Gets the record at the current stream position. Const version. */ - const void *Get() const; + const void *Get() const { + assert(!poisoned_); + assert(current_); + return current_; + } /** * Gets the record at the current stream position. */ - void *Get(); + void *Get() { + assert(!poisoned_); + assert(current_); + return current_; + } - operator bool() const { return current_; } + operator bool() const { return !poisoned_; } - bool operator!() const { return !(*this); } + bool operator!() const { return poisoned_; } /** * Marks the current position in the stream to be rewound to later. @@ -80,19 +97,26 @@ class RewindableStream : boost::noncopyable { void Poison(); private: - void FetchBlock(); + void AppendBlock(); + + void Flush(std::deque<Block>::iterator to); + + std::deque<Block> blocks_; + // current_ is in blocks_[blocks_it_] unless poisoned_. + std::size_t blocks_it_; std::size_t entry_size_; std::size_t block_size_; + std::size_t block_count_; - uint8_t *marked_, *current_, *end_; - - Block first_bl_; - Block second_bl_; - Block* current_bl_; + uint8_t *marked_, *current_; + const uint8_t *block_end_; PCQueue<Block> *in_, *out_; + // Have we hit poison at the end of the stream, even if rewinding? + bool hit_poison_; + // Is the curren position poison? bool poisoned_; WorkerProgress progress_; diff --git a/util/stream/rewindable_stream_test.cc b/util/stream/rewindable_stream_test.cc index 3ed87f372..f8924c3c7 100644 --- a/util/stream/rewindable_stream_test.cc +++ b/util/stream/rewindable_stream_test.cc @@ -22,8 +22,8 @@ BOOST_AUTO_TEST_CASE(RewindableStreamTest) { config.total_memory = 100; config.block_count = 6; - RewindableStream s; Chain chain(config); + RewindableStream s; chain >> Read(in.get()) >> s >> kRecycle; uint64_t i = 0; for (; s; ++s, ++i) { |