Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/moses-smt/mosesdecoder.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'util/stream/rewindable_stream.cc')
-rw-r--r--util/stream/rewindable_stream.cc157
1 files changed, 86 insertions, 71 deletions
diff --git a/util/stream/rewindable_stream.cc b/util/stream/rewindable_stream.cc
index c7e39231b..2867bf8ab 100644
--- a/util/stream/rewindable_stream.cc
+++ b/util/stream/rewindable_stream.cc
@@ -13,105 +13,120 @@ void RewindableStream::Init(const ChainPosition &position) {
UTIL_THROW_IF2(in_, "RewindableStream::Init twice");
in_ = position.in_;
out_ = position.out_;
+ hit_poison_ = false;
poisoned_ = false;
progress_ = position.progress_;
entry_size_ = position.GetChain().EntrySize();
block_size_ = position.GetChain().BlockSize();
- FetchBlock();
- current_bl_ = &second_bl_;
- current_ = static_cast<uint8_t*>(current_bl_->Get());
- end_ = current_ + current_bl_->ValidSize();
-}
-
-const void *RewindableStream::Get() const {
- return current_;
-}
-
-void *RewindableStream::Get() {
- return current_;
+ block_count_ = position.GetChain().BlockCount();
+ blocks_it_ = 0;
+ marked_ = NULL;
+ UTIL_THROW_IF2(block_count_ < 2, "RewindableStream needs block_count at least two");
+ AppendBlock();
}
RewindableStream &RewindableStream::operator++() {
assert(*this);
- assert(current_ < end_);
+ assert(current_ < block_end_);
+ assert(current_);
+ assert(blocks_it_ < blocks_.size());
current_ += entry_size_;
- if (current_ == end_) {
- // two cases: either we need to fetch the next block, or we've already
- // fetched it before. We can check this by looking at the current_bl_
- // pointer: if it's at the second_bl_, we need to flush and fetch a new
- // block. Otherwise, we can just move over to the second block.
- if (current_bl_ == &second_bl_) {
- if (first_bl_) {
- out_->Produce(first_bl_);
- progress_ += first_bl_.ValidSize();
+ if (UTIL_UNLIKELY(current_ == block_end_)) {
+ // Fetch another block if necessary.
+ if (++blocks_it_ == blocks_.size()) {
+ if (!marked_) {
+ Flush(blocks_.begin() + blocks_it_);
+ blocks_it_ = 0;
}
- first_bl_ = second_bl_;
- FetchBlock();
+ AppendBlock();
+ assert(poisoned_ || (blocks_it_ == blocks_.size() - 1));
+ if (poisoned_) return *this;
}
- current_bl_ = &second_bl_;
- current_ = static_cast<uint8_t *>(second_bl_.Get());
- end_ = current_ + second_bl_.ValidSize();
- }
-
- if (!*current_bl_)
- {
- if (current_bl_ == &second_bl_ && first_bl_)
- {
- out_->Produce(first_bl_);
- progress_ += first_bl_.ValidSize();
- }
- out_->Produce(*current_bl_);
- poisoned_ = true;
+ Block &cur_block = blocks_[blocks_it_];
+ current_ = static_cast<uint8_t*>(cur_block.Get());
+ block_end_ = current_ + cur_block.ValidSize();
}
-
+ assert(current_);
+ assert(current_ >= static_cast<uint8_t*>(blocks_[blocks_it_].Get()));
+ assert(current_ < block_end_);
+ assert(block_end_ == blocks_[blocks_it_].ValidEnd());
return *this;
}
-void RewindableStream::FetchBlock() {
- // The loop is needed since it is *feasible* that we're given 0 sized but
- // valid blocks
- do {
- in_->Consume(second_bl_);
- } while (second_bl_ && second_bl_.ValidSize() == 0);
-}
-
void RewindableStream::Mark() {
marked_ = current_;
+ Flush(blocks_.begin() + blocks_it_);
+ blocks_it_ = 0;
}
void RewindableStream::Rewind() {
- if (marked_ >= first_bl_.Get() && marked_ < first_bl_.ValidEnd()) {
- current_bl_ = &first_bl_;
- current_ = marked_;
- } else if (marked_ >= second_bl_.Get() && marked_ < second_bl_.ValidEnd()) {
- current_bl_ = &second_bl_;
- current_ = marked_;
- } else { UTIL_THROW2("RewindableStream rewound too far"); }
+ if (current_ != marked_) {
+ poisoned_ = false;
+ }
+ blocks_it_ = 0;
+ current_ = marked_;
+ block_end_ = static_cast<const uint8_t*>(blocks_[blocks_it_].ValidEnd());
+
+ assert(current_);
+ assert(current_ >= static_cast<uint8_t*>(blocks_[blocks_it_].Get()));
+ assert(current_ < block_end_);
+ assert(block_end_ == blocks_[blocks_it_].ValidEnd());
}
void RewindableStream::Poison() {
- assert(!poisoned_);
+ if (blocks_.empty()) return;
+ assert(*this);
+ assert(blocks_it_ == blocks_.size() - 1);
- // Three things: if we have a buffered first block, we need to produce it
- // first. Then, produce the partial "current" block, and then send the
- // poison down the chain
+ // Produce all buffered blocks.
+ blocks_.back().SetValidSize(current_ - static_cast<uint8_t*>(blocks_.back().Get()));
+ Flush(blocks_.end());
+ blocks_it_ = 0;
- // if we still have a buffered first block, produce it first
- if (current_bl_ == &second_bl_ && first_bl_) {
- out_->Produce(first_bl_);
- progress_ += first_bl_.ValidSize();
+ Block poison;
+ if (!hit_poison_) {
+ in_->Consume(poison);
}
+ poison.SetToPoison();
+ out_->Produce(poison);
+ hit_poison_ = true;
+ poisoned_ = true;
+}
- // send our partial block
- current_bl_->SetValidSize(current_
- - static_cast<uint8_t *>(current_bl_->Get()));
- out_->Produce(*current_bl_);
- progress_ += current_bl_->ValidSize();
+void RewindableStream::AppendBlock() {
+ if (UTIL_UNLIKELY(blocks_.size() >= block_count_)) {
+ std::cerr << "RewindableStream trying to use more blocks than available" << std::endl;
+ abort();
+ }
+ if (UTIL_UNLIKELY(hit_poison_)) {
+ poisoned_ = true;
+ return;
+ }
+ Block get;
+ // The loop is needed since it is *feasible* that we're given 0 sized but
+ // valid blocks
+ do {
+ in_->Consume(get);
+ if (UTIL_LIKELY(get)) {
+ blocks_.push_back(get);
+ } else {
+ hit_poison_ = true;
+ poisoned_ = true;
+ return;
+ }
+ } while (UTIL_UNLIKELY(get.ValidSize() == 0));
+ current_ = static_cast<uint8_t*>(blocks_.back().Get());
+ block_end_ = static_cast<const uint8_t*>(blocks_.back().ValidEnd());
+ blocks_it_ = blocks_.size() - 1;
+}
- // send down the poison
- current_bl_->SetToPoison();
- out_->Produce(*current_bl_);
- poisoned_ = true;
+void RewindableStream::Flush(std::deque<Block>::iterator to) {
+ for (std::deque<Block>::iterator i = blocks_.begin(); i != to; ++i) {
+ out_->Produce(*i);
+ progress_ += i->ValidSize();
+ }
+ blocks_.erase(blocks_.begin(), to);
}
+
}
}