Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/moses-smt/mosesdecoder.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
path: root/util
diff options
context:
space:
mode:
author: Kenneth Heafield <github@kheafield.com> 2015-08-27 12:55:52 +0300
committer: Kenneth Heafield <github@kheafield.com> 2015-08-27 12:55:52 +0300
commit: 09ecd071f9571a3808fc43700186fd777564785b (patch)
tree: 98236edb11f43a01980fe2ebaf397bf3ac344b53 /util
parent: 380b5a5dfd99a7df4ea8f270abf72debe5e9cb6e (diff)
KenLM 2a3e8fae3633c890cb3b342d461f9130c8e343fa excluding unfinished interpolation directory
Diffstat (limited to 'util')
-rw-r--r-- util/CMakeLists.txt | 109
-rw-r--r-- util/double-conversion/CMakeLists.txt | 39
-rw-r--r-- util/file.cc | 13
-rw-r--r-- util/file.hh | 22
-rw-r--r-- util/fixed_array.hh | 35
-rw-r--r-- util/float_to_string.hh | 4
-rw-r--r-- util/probing_hash_table.hh | 3
-rw-r--r-- util/probing_hash_table_benchmark_main.cc | 14
-rw-r--r-- util/stream/CMakeLists.txt | 74
-rw-r--r-- util/stream/Jamfile | 8
-rw-r--r-- util/stream/chain.cc | 2
-rw-r--r-- util/stream/chain.hh | 15
-rw-r--r-- util/stream/count_records.cc | 12
-rw-r--r-- util/stream/count_records.hh | 20
-rw-r--r-- util/stream/multi_stream.hh | 17
-rw-r--r-- util/stream/rewindable_stream.cc | 157
-rw-r--r-- util/stream/rewindable_stream.hh | 46
-rw-r--r-- util/stream/rewindable_stream_test.cc | 2
18 files changed, 462 insertions, 130 deletions
diff --git a/util/CMakeLists.txt b/util/CMakeLists.txt
new file mode 100644
index 000000000..c52cdbc06
--- /dev/null
+++ b/util/CMakeLists.txt
@@ -0,0 +1,109 @@
+cmake_minimum_required(VERSION 2.8.8)
+#
+# The KenLM cmake files make use of add_library(... OBJECT ...)
+#
+# This syntax allows grouping of source files when compiling
+# (effectively creating "fake" libraries based on source subdirs).
+#
+# This syntax was only added in cmake version 2.8.8
+#
+# see http://www.cmake.org/Wiki/CMake/Tutorials/Object_Library
+
+
+# This CMake file was created by Lane Schwartz <dowobeha@gmail.com>
+
+
+# Explicitly list the source files for this subdirectory
+#
+# If you add any source files to this subdirectory
+# that should be included in the kenlm library,
+# (this excludes any unit test files)
+# you should add them to the following list:
+#
+# Because we do not set PARENT_SCOPE in the following definition,
+# CMake files in the parent directory won't be able to access this variable.
+#
+set(KENLM_UTIL_SOURCE
+ bit_packing.cc
+ ersatz_progress.cc
+ exception.cc
+ file.cc
+ file_piece.cc
+ float_to_string.cc
+ integer_to_string.cc
+ mmap.cc
+ murmur_hash.cc
+ parallel_read.cc
+ pool.cc
+ read_compressed.cc
+ scoped.cc
+ string_piece.cc
+ usage.cc
+ )
+
+# This directory has children that need to be processed
+add_subdirectory(double-conversion)
+add_subdirectory(stream)
+
+
+# Group these objects together for later use.
+#
+# Given add_library(foo OBJECT ${my_foo_sources}),
+# refer to these objects as $<TARGET_OBJECTS:foo>
+#
+add_library(kenlm_util OBJECT ${KENLM_UTIL_DOUBLECONVERSION_SOURCE} ${KENLM_UTIL_STREAM_SOURCE} ${KENLM_UTIL_SOURCE})
+
+
+
+# Only compile and run unit tests if tests should be run
+if(BUILD_TESTING)
+
+ # Explicitly list the Boost test files to be compiled
+ set(KENLM_BOOST_TESTS_LIST
+ bit_packing_test
+ file_piece_test
+ joint_sort_test
+ multi_intersection_test
+ probing_hash_table_test
+ read_compressed_test
+ sorted_uniform_test
+ tokenize_piece_test
+ )
+
+ # Iterate through the Boost tests list
+ foreach(test ${KENLM_BOOST_TESTS_LIST})
+
+ # Compile the executable, linking against the requisite dependent object files
+ add_executable(${test} ${test}.cc $<TARGET_OBJECTS:kenlm_util>)
+
+ # Require the following compile flag
+ set_target_properties(${test} PROPERTIES COMPILE_FLAGS -DBOOST_TEST_DYN_LINK)
+
+ # Link the executable against boost
+ target_link_libraries(${test} ${Boost_LIBRARIES})
+
+ # file_piece_test requires an extra command line parameter
+ if ("${test}" STREQUAL "file_piece_test")
+ set(test_params
+ ${CMAKE_CURRENT_SOURCE_DIR}/file_piece.cc
+ )
+ else()
+ set(test_params
+ )
+ endif()
+
+ # Specify command arguments for how to run each unit test
+ #
+ # Assuming that foo was defined via add_executable(foo ...),
+ # the syntax $<TARGET_FILE:foo> gives the full path to the executable.
+ #
+ add_test(NAME ${test}_test
+ COMMAND $<TARGET_FILE:${test}> ${test_params})
+
+ # Group unit tests together
+ set_target_properties(${test} PROPERTIES FOLDER "unit_tests")
+
+ # End for loop
+ endforeach(test)
+
+endif()
diff --git a/util/double-conversion/CMakeLists.txt b/util/double-conversion/CMakeLists.txt
new file mode 100644
index 000000000..e2cf02aa6
--- /dev/null
+++ b/util/double-conversion/CMakeLists.txt
@@ -0,0 +1,39 @@
+cmake_minimum_required(VERSION 2.8.8)
+#
+# The KenLM cmake files make use of add_library(... OBJECT ...)
+#
+# This syntax allows grouping of source files when compiling
+# (effectively creating "fake" libraries based on source subdirs).
+#
+# This syntax was only added in cmake version 2.8.8
+#
+# see http://www.cmake.org/Wiki/CMake/Tutorials/Object_Library
+
+
+# This CMake file was created by Lane Schwartz <dowobeha@gmail.com>
+
+# Explicitly list the source files for this subdirectory
+#
+# If you add any source files to this subdirectory
+# that should be included in the kenlm library,
+# (this excludes any unit test files)
+# you should add them to the following list:
+#
+# In order to allow CMake files in the parent directory
+# to see this variable definition, we set PARENT_SCOPE.
+#
+# In order to set correct paths to these files
+# when this variable is referenced by CMake files in the parent directory,
+# we prefix all files with ${CMAKE_CURRENT_SOURCE_DIR}.
+#
+set(KENLM_UTIL_DOUBLECONVERSION_SOURCE
+ ${CMAKE_CURRENT_SOURCE_DIR}/bignum-dtoa.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/bignum.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/cached-powers.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/diy-fp.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/double-conversion.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/fast-dtoa.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/fixed-dtoa.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/strtod.cc
+ PARENT_SCOPE)
+
diff --git a/util/file.cc b/util/file.cc
index 046b9ff90..be272f9bc 100644
--- a/util/file.cc
+++ b/util/file.cc
@@ -60,6 +60,14 @@ EndOfFileException::EndOfFileException() throw() {
}
EndOfFileException::~EndOfFileException() throw() {}
+bool InputFileIsStdin(StringPiece path) {
+ return path == "-" || path == "/dev/stdin";
+}
+
+bool OutputFileIsStdout(StringPiece path) {
+ return path == "-" || path == "/dev/stdout";
+}
+
int OpenReadOrThrow(const char *name) {
int ret;
#if defined(_WIN32) || defined(_WIN64)
@@ -111,7 +119,10 @@ uint64_t SizeOrThrow(int fd) {
}
void ResizeOrThrow(int fd, uint64_t to) {
-#if defined(_WIN32) || defined(_WIN64)
+#if defined __MINGW32__
+ // Does this handle 64-bit?
+ int ret = ftruncate
+#elif defined(_WIN32) || defined(_WIN64)
errno_t ret = _chsize_s
#elif defined(OS_ANDROID)
int ret = ftruncate64
diff --git a/util/file.hh b/util/file.hh
index bd5873cbc..f7cb4d688 100644
--- a/util/file.hh
+++ b/util/file.hh
@@ -78,6 +78,28 @@ int OpenReadOrThrow(const char *name);
// Create file if it doesn't exist, truncate if it does. Opened for write.
int CreateOrThrow(const char *name);
+/** Does the given input file path denote standard input?
+ *
+ * Returns true if, and only if, path is either "-" or "/dev/stdin".
+ *
+ * Opening standard input as a file may need some special treatment for
+ * portability. There's a convention that a dash ("-") in place of an input
+ * file path denotes standard input, but opening "/dev/stdin" may need to be
+ * special as well.
+ */
+bool InputPathIsStdin(StringPiece path);
+
+/** Does the given output file path denote standard output?
+ *
+ * Returns true if, and only if, path is either "-" or "/dev/stdout".
+ *
+ * Opening standard output as a file may need some special treatment for
+ * portability. There's a convention that a dash ("-") in place of an output
+ * file path denotes standard output, but opening "/dev/stdout" may need to be
+ * special as well.
+ */
+bool OutputPathIsStdout(StringPiece path);
+
// Return value for SizeFile when it can't size properly.
const uint64_t kBadSize = (uint64_t)-1;
uint64_t SizeFile(int fd);
diff --git a/util/fixed_array.hh b/util/fixed_array.hh
index 610cbdf12..9083d39ee 100644
--- a/util/fixed_array.hh
+++ b/util/fixed_array.hh
@@ -100,33 +100,56 @@ template <class T> class FixedArray {
*
* @param i Index of the object to reference
*/
- T &operator[](std::size_t i) { return begin()[i]; }
+ T &operator[](std::size_t i) {
+ assert(i < size());
+ return begin()[i];
+ }
/**
* Gets a const reference to the object with index i currently stored in this data structure.
*
* @param i Index of the object to reference
*/
- const T &operator[](std::size_t i) const { return begin()[i]; }
+ const T &operator[](std::size_t i) const {
+ assert(i < size());
+ return begin()[i];
+ }
/**
* Constructs a new object using the provided parameter,
* and stores it in this data structure.
*
* The memory backing the constructed object is managed by this data structure.
+ * I miss C++11 variadic templates.
*/
+ void push_back() {
+ new (end()) T();
+ Constructed();
+ }
template <class C> void push_back(const C &c) {
- new (end()) T(c); // use "placement new" syntax to initalize T in an already-allocated memory location
+ new (end()) T(c);
+ Constructed();
+ }
+ template <class C> void push_back(C &c) {
+ new (end()) T(c);
+ Constructed();
+ }
+ template <class C, class D> void push_back(const C &c, const D &d) {
+ new (end()) T(c, d);
Constructed();
}
+ void pop_back() {
+ back().~T();
+ --newed_end_;
+ }
+
/**
* Removes all elements from this array.
*/
void clear() {
- for (T *i = begin(); i != end(); ++i)
- i->~T();
- newed_end_ = begin();
+ while (newed_end_ != begin())
+ pop_back();
}
protected:
diff --git a/util/float_to_string.hh b/util/float_to_string.hh
index d1104e790..930532734 100644
--- a/util/float_to_string.hh
+++ b/util/float_to_string.hh
@@ -8,13 +8,13 @@ namespace util {
template <> struct ToStringBuf<double> {
// DoubleToStringConverter::kBase10MaximalLength + 1 for null paranoia.
- static const unsigned kBytes = 18;
+ static const unsigned kBytes = 19;
};
// Single wasn't documented in double conversion, so be conservative and
// say the same as double.
template <> struct ToStringBuf<float> {
- static const unsigned kBytes = 18;
+ static const unsigned kBytes = 19;
};
char *ToString(double value, char *to);
diff --git a/util/probing_hash_table.hh b/util/probing_hash_table.hh
index f4192577b..f32b64ea3 100644
--- a/util/probing_hash_table.hh
+++ b/util/probing_hash_table.hh
@@ -92,8 +92,7 @@ template <class EntryT, class HashT, class EqualT = std::equal_to<typename Entry
Key got(i->GetKey());
if (equal_(got, t.GetKey())) { out = i; return true; }
if (equal_(got, invalid_)) {
- UTIL_THROW_IF(++entries_ >= buckets_, ProbingSizeException,
- "Hash table with " << buckets_ << " buckets is full.");
+ UTIL_THROW_IF(++entries_ >= buckets_, ProbingSizeException, "Hash table with " << buckets_ << " buckets is full.");
*i = t;
out = i;
return false;
diff --git a/util/probing_hash_table_benchmark_main.cc b/util/probing_hash_table_benchmark_main.cc
index 9deeaac2d..3e12290cf 100644
--- a/util/probing_hash_table_benchmark_main.cc
+++ b/util/probing_hash_table_benchmark_main.cc
@@ -1,18 +1,9 @@
-
#include "util/probing_hash_table.hh"
#include "util/scoped.hh"
#include "util/usage.hh"
#include <boost/random/mersenne_twister.hpp>
-#include <boost/version.hpp>
-#if BOOST_VERSION / 100000 < 1
-#error BOOST_LIB_VERSION is to old. Time to upgrade.
-#elif BOOST_VERSION / 100000 > 1 || BOOST_VERSION / 100 % 1000 > 46
#include <boost/random/uniform_int_distribution.hpp>
-#define have_uniform_int_distribution
-#else
-#include <boost/random/uniform_int.hpp>
-#endif
#include <iostream>
@@ -31,13 +22,8 @@ void Test(uint64_t entries, uint64_t lookups, float multiplier = 1.5) {
std::size_t size = Table::Size(entries, multiplier);
scoped_malloc backing(util::CallocOrThrow(size));
Table table(backing.get(), size);
-#ifdef have_uniform_int_distribution
boost::random::mt19937 gen;
boost::random::uniform_int_distribution<> dist(std::numeric_limits<uint64_t>::min(), std::numeric_limits<uint64_t>::max());
-#else
- boost::mt19937 gen;
- boost::uniform_int<> dist(std::numeric_limits<uint64_t>::min(), std::numeric_limits<uint64_t>::max());
-#endif
double start = UserTime();
for (uint64_t i = 0; i < entries; ++i) {
Entry entry;
diff --git a/util/stream/CMakeLists.txt b/util/stream/CMakeLists.txt
new file mode 100644
index 000000000..d3eddbe5c
--- /dev/null
+++ b/util/stream/CMakeLists.txt
@@ -0,0 +1,74 @@
+cmake_minimum_required(VERSION 2.8.8)
+#
+# The KenLM cmake files make use of add_library(... OBJECT ...)
+#
+# This syntax allows grouping of source files when compiling
+# (effectively creating "fake" libraries based on source subdirs).
+#
+# This syntax was only added in cmake version 2.8.8
+#
+# see http://www.cmake.org/Wiki/CMake/Tutorials/Object_Library
+
+
+# This CMake file was created by Lane Schwartz <dowobeha@gmail.com>
+
+# Explicitly list the source files for this subdirectory
+#
+# If you add any source files to this subdirectory
+# that should be included in the kenlm library,
+# (this excludes any unit test files)
+# you should add them to the following list:
+#
+# In order to allow CMake files in the parent directory
+# to see this variable definition, we set PARENT_SCOPE.
+#
+# In order to set correct paths to these files
+# when this variable is referenced by CMake files in the parent directory,
+# we prefix all files with ${CMAKE_CURRENT_SOURCE_DIR}.
+#
+set(KENLM_UTIL_STREAM_SOURCE
+ ${CMAKE_CURRENT_SOURCE_DIR}/chain.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/io.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/line_input.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/multi_progress.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rewindable_stream.cc
+ PARENT_SCOPE)
+
+
+
+if(BUILD_TESTING)
+
+ # Explicitly list the Boost test files to be compiled
+ set(KENLM_BOOST_TESTS_LIST
+ io_test
+ sort_test
+ stream_test
+ )
+
+ # Iterate through the Boost tests list
+ foreach(test ${KENLM_BOOST_TESTS_LIST})
+
+ # Compile the executable, linking against the requisite dependent object files
+ add_executable(${test} ${test}.cc $<TARGET_OBJECTS:kenlm_util>)
+
+ # Require the following compile flag
+ set_target_properties(${test} PROPERTIES COMPILE_FLAGS -DBOOST_TEST_DYN_LINK)
+
+ # Link the executable against boost
+ target_link_libraries(${test} ${Boost_LIBRARIES})
+
+ # Specify command arguments for how to run each unit test
+ #
+ # Assuming that foo was defined via add_executable(foo ...),
+ # the syntax $<TARGET_FILE:foo> gives the full path to the executable.
+ #
+ add_test(NAME ${test}_test
+ COMMAND $<TARGET_FILE:${test}>)
+
+ # Group unit tests together
+ set_target_properties(${test} PROPERTIES FOLDER "unit_tests")
+
+ # End for loop
+ endforeach(test)
+
+endif()
diff --git a/util/stream/Jamfile b/util/stream/Jamfile
index cde0247e7..de9d41c5f 100644
--- a/util/stream/Jamfile
+++ b/util/stream/Jamfile
@@ -1,10 +1,4 @@
-#if $(BOOST-VERSION) >= 104800 {
-# timer-link = <library>/top//boost_timer ;
-#} else {
-# timer-link = ;
-#}
-
-fakelib stream : chain.cc rewindable_stream.cc io.cc line_input.cc multi_progress.cc ..//kenutil /top//boost_thread : : : <library>/top//boost_thread ;
+fakelib stream : [ glob *.cc : *_test.cc ] ..//kenutil /top//boost_thread : : : <library>/top//boost_thread ;
import testing ;
unit-test io_test : io_test.cc stream /top//boost_unit_test_framework ;
diff --git a/util/stream/chain.cc b/util/stream/chain.cc
index 39f2f3fbb..6bc000522 100644
--- a/util/stream/chain.cc
+++ b/util/stream/chain.cc
@@ -126,7 +126,7 @@ Link::~Link() {
if (current_) {
// Probably an exception unwinding.
std::cerr << "Last input should have been poison." << std::endl;
- // abort();
+ abort();
} else {
if (!poisoned_) {
// Poison is a block whose memory pointer is NULL.
diff --git a/util/stream/chain.hh b/util/stream/chain.hh
index 8caa1afcb..296982260 100644
--- a/util/stream/chain.hh
+++ b/util/stream/chain.hh
@@ -74,11 +74,11 @@ class Thread {
* This method is called automatically by this class's @ref Thread() "constructor".
*/
template <class Position, class Worker> void operator()(const Position &position, Worker &worker) {
- try {
+// try {
worker.Run(position);
- } catch (const std::exception &e) {
- UnhandledException(e);
- }
+// } catch (const std::exception &e) {
+// UnhandledException(e);
+// }
}
private:
@@ -158,6 +158,13 @@ class Chain {
return block_size_;
}
+ /**
+ * Number of blocks going through the Chain.
+ */
+ std::size_t BlockCount() const {
+ return config_.block_count;
+ }
+
/** Two ways to add to the chain: Add() or operator>>. */
ChainPosition Add();
diff --git a/util/stream/count_records.cc b/util/stream/count_records.cc
new file mode 100644
index 000000000..bdadad713
--- /dev/null
+++ b/util/stream/count_records.cc
@@ -0,0 +1,12 @@
+#include "util/stream/count_records.hh"
+#include "util/stream/chain.hh"
+
+namespace util { namespace stream {
+
+void CountRecords::Run(const ChainPosition &position) {
+ for (Link link(position); link; ++link) {
+ *count_ += link->ValidSize() / position.GetChain().EntrySize();
+ }
+}
+
+}} // namespaces
diff --git a/util/stream/count_records.hh b/util/stream/count_records.hh
new file mode 100644
index 000000000..e3f7c94af
--- /dev/null
+++ b/util/stream/count_records.hh
@@ -0,0 +1,20 @@
+#include <stdint.h>
+
+namespace util { namespace stream {
+
+class ChainPosition;
+
+class CountRecords {
+ public:
+ explicit CountRecords(uint64_t *out)
+ : count_(out) {
+ *count_ = 0;
+ }
+
+ void Run(const ChainPosition &position);
+
+ private:
+ uint64_t *count_;
+};
+
+}} // namespaces
diff --git a/util/stream/multi_stream.hh b/util/stream/multi_stream.hh
index b1461f964..6381fc2ed 100644
--- a/util/stream/multi_stream.hh
+++ b/util/stream/multi_stream.hh
@@ -20,6 +20,9 @@ class ChainPositions : public util::FixedArray<util::stream::ChainPosition> {
public:
ChainPositions() {}
+ explicit ChainPositions(std::size_t bound) :
+ util::FixedArray<util::stream::ChainPosition>(bound) {}
+
void Init(Chains &chains);
explicit ChainPositions(Chains &chains) {
@@ -88,16 +91,6 @@ template <class T> class GenericStreams : public util::FixedArray<T> {
public:
GenericStreams() {}
- // This puts a dummy T at the beginning (useful to algorithms that need to reference something at the beginning).
- void InitWithDummy(const ChainPositions &positions) {
- P::Init(positions.size() + 1);
- new (P::end()) T(); // use "placement new" syntax to initalize T in an already-allocated memory location
- P::Constructed();
- for (const util::stream::ChainPosition *i = positions.begin(); i != positions.end(); ++i) {
- P::push_back(*i);
- }
- }
-
// Limit restricts to positions[0,limit)
void Init(const ChainPositions &positions, std::size_t limit) {
P::Init(limit);
@@ -112,6 +105,10 @@ template <class T> class GenericStreams : public util::FixedArray<T> {
GenericStreams(const ChainPositions &positions) {
Init(positions);
}
+
+ void Init(std::size_t amount) {
+ P::Init(amount);
+ }
};
template <class T> inline Chains &operator>>(Chains &chains, GenericStreams<T> &streams) {
diff --git a/util/stream/rewindable_stream.cc b/util/stream/rewindable_stream.cc
index c7e39231b..2867bf8ab 100644
--- a/util/stream/rewindable_stream.cc
+++ b/util/stream/rewindable_stream.cc
@@ -13,105 +13,120 @@ void RewindableStream::Init(const ChainPosition &position) {
UTIL_THROW_IF2(in_, "RewindableStream::Init twice");
in_ = position.in_;
out_ = position.out_;
+ hit_poison_ = false;
poisoned_ = false;
progress_ = position.progress_;
entry_size_ = position.GetChain().EntrySize();
block_size_ = position.GetChain().BlockSize();
- FetchBlock();
- current_bl_ = &second_bl_;
- current_ = static_cast<uint8_t*>(current_bl_->Get());
- end_ = current_ + current_bl_->ValidSize();
-}
-
-const void *RewindableStream::Get() const {
- return current_;
-}
-
-void *RewindableStream::Get() {
- return current_;
+ block_count_ = position.GetChain().BlockCount();
+ blocks_it_ = 0;
+ marked_ = NULL;
+ UTIL_THROW_IF2(block_count_ < 2, "RewindableStream needs block_count at least two");
+ AppendBlock();
}
RewindableStream &RewindableStream::operator++() {
assert(*this);
- assert(current_ < end_);
+ assert(current_ < block_end_);
+ assert(current_);
+ assert(blocks_it_ < blocks_.size());
current_ += entry_size_;
- if (current_ == end_) {
- // two cases: either we need to fetch the next block, or we've already
- // fetched it before. We can check this by looking at the current_bl_
- // pointer: if it's at the second_bl_, we need to flush and fetch a new
- // block. Otherwise, we can just move over to the second block.
- if (current_bl_ == &second_bl_) {
- if (first_bl_) {
- out_->Produce(first_bl_);
- progress_ += first_bl_.ValidSize();
+ if (UTIL_UNLIKELY(current_ == block_end_)) {
+ // Fetch another block if necessary.
+ if (++blocks_it_ == blocks_.size()) {
+ if (!marked_) {
+ Flush(blocks_.begin() + blocks_it_);
+ blocks_it_ = 0;
}
- first_bl_ = second_bl_;
- FetchBlock();
+ AppendBlock();
+ assert(poisoned_ || (blocks_it_ == blocks_.size() - 1));
+ if (poisoned_) return *this;
}
- current_bl_ = &second_bl_;
- current_ = static_cast<uint8_t *>(second_bl_.Get());
- end_ = current_ + second_bl_.ValidSize();
- }
-
- if (!*current_bl_)
- {
- if (current_bl_ == &second_bl_ && first_bl_)
- {
- out_->Produce(first_bl_);
- progress_ += first_bl_.ValidSize();
- }
- out_->Produce(*current_bl_);
- poisoned_ = true;
+ Block &cur_block = blocks_[blocks_it_];
+ current_ = static_cast<uint8_t*>(cur_block.Get());
+ block_end_ = current_ + cur_block.ValidSize();
}
-
+ assert(current_);
+ assert(current_ >= static_cast<uint8_t*>(blocks_[blocks_it_].Get()));
+ assert(current_ < block_end_);
+ assert(block_end_ == blocks_[blocks_it_].ValidEnd());
return *this;
}
-void RewindableStream::FetchBlock() {
- // The loop is needed since it is *feasible* that we're given 0 sized but
- // valid blocks
- do {
- in_->Consume(second_bl_);
- } while (second_bl_ && second_bl_.ValidSize() == 0);
-}
-
void RewindableStream::Mark() {
marked_ = current_;
+ Flush(blocks_.begin() + blocks_it_);
+ blocks_it_ = 0;
}
void RewindableStream::Rewind() {
- if (marked_ >= first_bl_.Get() && marked_ < first_bl_.ValidEnd()) {
- current_bl_ = &first_bl_;
- current_ = marked_;
- } else if (marked_ >= second_bl_.Get() && marked_ < second_bl_.ValidEnd()) {
- current_bl_ = &second_bl_;
- current_ = marked_;
- } else { UTIL_THROW2("RewindableStream rewound too far"); }
+ if (current_ != marked_) {
+ poisoned_ = false;
+ }
+ blocks_it_ = 0;
+ current_ = marked_;
+ block_end_ = static_cast<const uint8_t*>(blocks_[blocks_it_].ValidEnd());
+
+ assert(current_);
+ assert(current_ >= static_cast<uint8_t*>(blocks_[blocks_it_].Get()));
+ assert(current_ < block_end_);
+ assert(block_end_ == blocks_[blocks_it_].ValidEnd());
}
void RewindableStream::Poison() {
- assert(!poisoned_);
+ if (blocks_.empty()) return;
+ assert(*this);
+ assert(blocks_it_ == blocks_.size() - 1);
- // Three things: if we have a buffered first block, we need to produce it
- // first. Then, produce the partial "current" block, and then send the
- // poison down the chain
+ // Produce all buffered blocks.
+ blocks_.back().SetValidSize(current_ - static_cast<uint8_t*>(blocks_.back().Get()));
+ Flush(blocks_.end());
+ blocks_it_ = 0;
- // if we still have a buffered first block, produce it first
- if (current_bl_ == &second_bl_ && first_bl_) {
- out_->Produce(first_bl_);
- progress_ += first_bl_.ValidSize();
+ Block poison;
+ if (!hit_poison_) {
+ in_->Consume(poison);
}
+ poison.SetToPoison();
+ out_->Produce(poison);
+ hit_poison_ = true;
+ poisoned_ = true;
+}
- // send our partial block
- current_bl_->SetValidSize(current_
- - static_cast<uint8_t *>(current_bl_->Get()));
- out_->Produce(*current_bl_);
- progress_ += current_bl_->ValidSize();
+void RewindableStream::AppendBlock() {
+ if (UTIL_UNLIKELY(blocks_.size() >= block_count_)) {
+ std::cerr << "RewindableStream trying to use more blocks than available" << std::endl;
+ abort();
+ }
+ if (UTIL_UNLIKELY(hit_poison_)) {
+ poisoned_ = true;
+ return;
+ }
+ Block get;
+ // The loop is needed since it is *feasible* that we're given 0 sized but
+ // valid blocks
+ do {
+ in_->Consume(get);
+ if (UTIL_LIKELY(get)) {
+ blocks_.push_back(get);
+ } else {
+ hit_poison_ = true;
+ poisoned_ = true;
+ return;
+ }
+ } while (UTIL_UNLIKELY(get.ValidSize() == 0));
+ current_ = static_cast<uint8_t*>(blocks_.back().Get());
+ block_end_ = static_cast<const uint8_t*>(blocks_.back().ValidEnd());
+ blocks_it_ = blocks_.size() - 1;
+}
- // send down the poison
- current_bl_->SetToPoison();
- out_->Produce(*current_bl_);
- poisoned_ = true;
+void RewindableStream::Flush(std::deque<Block>::iterator to) {
+ for (std::deque<Block>::iterator i = blocks_.begin(); i != to; ++i) {
+ out_->Produce(*i);
+ progress_ += i->ValidSize();
+ }
+ blocks_.erase(blocks_.begin(), to);
}
+
}
}
diff --git a/util/stream/rewindable_stream.hh b/util/stream/rewindable_stream.hh
index 9ee637c99..560825cde 100644
--- a/util/stream/rewindable_stream.hh
+++ b/util/stream/rewindable_stream.hh
@@ -5,6 +5,8 @@
#include <boost/noncopyable.hpp>
+#include <deque>
+
namespace util {
namespace stream {
@@ -23,6 +25,10 @@ class RewindableStream : boost::noncopyable {
*/
RewindableStream();
+ ~RewindableStream() {
+ Poison();
+ }
+
/**
* Initializes an existing RewindableStream at a specific position in
* a Chain.
@@ -38,21 +44,32 @@ class RewindableStream : boost::noncopyable {
*
* Equivalent to RewindableStream a(); a.Init(....);
*/
- explicit RewindableStream(const ChainPosition &position);
+ explicit RewindableStream(const ChainPosition &position)
+ : in_(NULL) {
+ Init(position);
+ }
/**
* Gets the record at the current stream position. Const version.
*/
- const void *Get() const;
+ const void *Get() const {
+ assert(!poisoned_);
+ assert(current_);
+ return current_;
+ }
/**
* Gets the record at the current stream position.
*/
- void *Get();
+ void *Get() {
+ assert(!poisoned_);
+ assert(current_);
+ return current_;
+ }
- operator bool() const { return current_; }
+ operator bool() const { return !poisoned_; }
- bool operator!() const { return !(*this); }
+ bool operator!() const { return poisoned_; }
/**
* Marks the current position in the stream to be rewound to later.
@@ -80,19 +97,26 @@ class RewindableStream : boost::noncopyable {
void Poison();
private:
- void FetchBlock();
+ void AppendBlock();
+
+ void Flush(std::deque<Block>::iterator to);
+
+ std::deque<Block> blocks_;
+ // current_ is in blocks_[blocks_it_] unless poisoned_.
+ std::size_t blocks_it_;
std::size_t entry_size_;
std::size_t block_size_;
+ std::size_t block_count_;
- uint8_t *marked_, *current_, *end_;
-
- Block first_bl_;
- Block second_bl_;
- Block* current_bl_;
+ uint8_t *marked_, *current_;
+ const uint8_t *block_end_;
PCQueue<Block> *in_, *out_;
+ // Have we hit poison at the end of the stream, even if rewinding?
+ bool hit_poison_;
+ // Is the current position poison?
bool poisoned_;
WorkerProgress progress_;
diff --git a/util/stream/rewindable_stream_test.cc b/util/stream/rewindable_stream_test.cc
index 3ed87f372..f8924c3c7 100644
--- a/util/stream/rewindable_stream_test.cc
+++ b/util/stream/rewindable_stream_test.cc
@@ -22,8 +22,8 @@ BOOST_AUTO_TEST_CASE(RewindableStreamTest) {
config.total_memory = 100;
config.block_count = 6;
- RewindableStream s;
Chain chain(config);
+ RewindableStream s;
chain >> Read(in.get()) >> s >> kRecycle;
uint64_t i = 0;
for (; s; ++s, ++i) {