Diffstat (limited to 'src/core/Index/DocumentsWriter.cs')
-rw-r--r-- | src/core/Index/DocumentsWriter.cs | 2075
1 file changed, 2075 insertions(+), 0 deletions(-)
diff --git a/src/core/Index/DocumentsWriter.cs b/src/core/Index/DocumentsWriter.cs
new file mode 100644
index 0000000..6545d11
--- /dev/null
+++ b/src/core/Index/DocumentsWriter.cs
@@ -0,0 +1,2075 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using Lucene.Net.Support;
+using Analyzer = Lucene.Net.Analysis.Analyzer;
+using Document = Lucene.Net.Documents.Document;
+using AlreadyClosedException = Lucene.Net.Store.AlreadyClosedException;
+using Directory = Lucene.Net.Store.Directory;
+using ArrayUtil = Lucene.Net.Util.ArrayUtil;
+using Constants = Lucene.Net.Util.Constants;
+using IndexSearcher = Lucene.Net.Search.IndexSearcher;
+using Query = Lucene.Net.Search.Query;
+using Scorer = Lucene.Net.Search.Scorer;
+using Similarity = Lucene.Net.Search.Similarity;
+using Weight = Lucene.Net.Search.Weight;
+
+namespace Lucene.Net.Index
+{
+    /// <summary> This class accepts multiple added documents and directly
+    /// writes a single segment file. It does this more
+    /// efficiently than creating a single segment per document
+    /// (with DocumentWriter) and doing standard merges on those
+    /// segments.
+    ///
+    /// Each added document is passed to the <see cref="DocConsumer" />,
+    /// which in turn processes the document and interacts with
+    /// other consumers in the indexing chain. Certain
+    /// consumers, like <see cref="StoredFieldsWriter" /> and
+    /// <see cref="TermVectorsTermsWriter" />, digest a document and
+    /// immediately write bytes to the "doc store" files (i.e.,
+    /// they do not consume RAM per document, except while they
+    /// are processing the document).
+    ///
+    /// Other consumers, e.g. <see cref="FreqProxTermsWriter" /> and
+    /// <see cref="NormsWriter" />, buffer bytes in RAM and flush only
+    /// when a new segment is produced.
+    /// Once we have used our allowed RAM buffer, or the number
+    /// of added docs is large enough (in case we are flushing
+    /// by doc count instead of RAM usage), we create a real
+    /// segment and flush it to the Directory.
+    ///
+    /// Threads:
+    ///
+    /// Multiple threads are allowed into AddDocument at once.
+    /// There is an initial synchronized call to GetThreadState,
+    /// which allocates a ThreadState for this thread. The same
+    /// thread will get the same ThreadState over time (thread
+    /// affinity), so that if there are consistent patterns (for
+    /// example, each thread is indexing a different content
+    /// source) then we make better use of RAM. Then
+    /// ProcessDocument is called on that ThreadState without
+    /// synchronization (most of the "heavy lifting" is in this
+    /// call). Finally, the synchronized FinishDocument is
+    /// called to flush changes to the directory.
+    ///
+    /// When flush is called by IndexWriter, we forcefully idle
+    /// all threads and flush only once they are all idle. This
+    /// means you can call flush with a given thread even while
+    /// other threads are actively adding/deleting documents.
+    ///
+    /// Exceptions:
+    ///
+    /// Because this class directly updates in-memory posting
+    /// lists, and flushes stored fields and term vectors
+    /// directly to files in the directory, there are certain
+    /// limited times when an exception can corrupt this state.
+    /// For example, a disk full while flushing stored fields
+    /// leaves this file in a corrupt state. Or, an OOM
+    /// exception while appending to the in-memory posting lists
+    /// can corrupt that posting list. We call such exceptions
+    /// "aborting exceptions". In these cases we must call
+    /// Abort() to discard all docs added since the last flush.
+    ///
+    /// All other exceptions ("non-aborting exceptions") can
+    /// still partially update the index structures. These
+    /// updates are consistent, but they represent only part
+    /// of the document seen up until the exception was hit.
+    /// When this happens, we immediately mark the document as
+    /// deleted so that the document is always atomically ("all
+    /// or none") added to the index.
+    /// </summary>
+    public sealed class DocumentsWriter : IDisposable
+    {
+        internal class AnonymousClassIndexingChain : IndexingChain
+        {
+            internal override DocConsumer GetChain(DocumentsWriter documentsWriter)
+            {
+                /*
+                This is the current indexing chain:
+
+                DocConsumer / DocConsumerPerThread
+                  --> code: DocFieldProcessor / DocFieldProcessorPerThread
+                    --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField
+                      --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField
+                        --> code: DocInverter / DocInverterPerThread / DocInverterPerField
+                          --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
+                            --> code: TermsHash / TermsHashPerThread / TermsHashPerField
+                              --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField
+                                --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField
+                                --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField
+                          --> InvertedDocEndConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
+                            --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField
+                  --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField
+                */
+
+                // Build up indexing chain:
+
+                TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(documentsWriter);
+                TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();
+
+                InvertedDocConsumer termsHash = new TermsHash(documentsWriter, true, freqProxWriter,
+                    new TermsHash(documentsWriter, false, termVectorsWriter, null));
+                NormsWriter normsWriter = new NormsWriter();
+                DocInverter docInverter = new DocInverter(termsHash, normsWriter);
+                return new DocFieldProcessor(documentsWriter, docInverter);
+            }
+        }
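[Editor's note — illustration only, not part of the commit. The chain diagram above is the heart of this file. Below is a standalone sketch of the pattern it describes; every type here is a hypothetical stand-in, not a Lucene.Net API. Each consumer either digests the document itself or inverts it and forwards tokens to a downstream sink, the way DocInverter feeds TermsHash, which fans out to FreqProxTermsWriter and TermVectorsTermsWriter.]

    using System;
    using System.Collections.Generic;

    interface IDocConsumer
    {
        void ProcessDocument(IDictionary<string, string> fields);
    }

    // Inverts fields into tokens and hands each "posting" to a downstream
    // sink, mirroring how DocInverter feeds TermsHash in the real chain.
    sealed class Inverter : IDocConsumer
    {
        private readonly Action<string, string> postings;
        public Inverter(Action<string, string> postings) { this.postings = postings; }

        public void ProcessDocument(IDictionary<string, string> fields)
        {
            foreach (var field in fields)
                foreach (var term in field.Value.Split(' '))
                    postings(field.Key, term);   // one posting per token
        }
    }

    class ChainDemo
    {
        static void Main()
        {
            // One inverter feeding a frequency-counting sink, like
            // FreqProxTermsWriter hanging off TermsHash:
            var freqs = new Dictionary<string, int>();
            IDocConsumer chain = new Inverter((field, term) =>
            {
                var key = field + ":" + term;
                int n;
                freqs[key] = freqs.TryGetValue(key, out n) ? n + 1 : 1;
            });
            chain.ProcessDocument(new Dictionary<string, string> { { "body", "a b a" } });
            Console.WriteLine(freqs["body:a"]);  // prints 2
        }
    }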
+
+        private void InitBlock()
+        {
+            maxFieldLength = IndexWriter.DEFAULT_MAX_FIELD_LENGTH;
+            maxBufferedDeleteTerms = IndexWriter.DEFAULT_MAX_BUFFERED_DELETE_TERMS;
+            ramBufferSize = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB * 1024 * 1024);
+            waitQueuePauseBytes = (long) (ramBufferSize * 0.1);
+            waitQueueResumeBytes = (long) (ramBufferSize * 0.05);
+            freeTrigger = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB * 1024 * 1024 * 1.05);
+            freeLevel = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB * 1024 * 1024 * 0.95);
+            maxBufferedDocs = IndexWriter.DEFAULT_MAX_BUFFERED_DOCS;
+            skipDocWriter = new SkipDocWriter();
+            byteBlockAllocator = new ByteBlockAllocator(this, DocumentsWriter.BYTE_BLOCK_SIZE);
+            perDocAllocator = new ByteBlockAllocator(this, DocumentsWriter.PER_DOC_BLOCK_SIZE);
+            waitQueue = new WaitQueue(this);
+        }
+
+        internal IndexWriter writer;
+        internal Directory directory;
+
+        internal System.String segment;         // Current segment we are working on
+        private System.String docStoreSegment;  // Current doc-store segment we are writing
+        private int docStoreOffset;             // Current starting doc-store offset of current segment
+
+        private int nextDocID;                  // Next docID to be added
+        private int numDocsInRAM;               // # docs buffered in RAM
+        internal int numDocsInStore;            // # docs written to doc stores
+
+        // Max # ThreadState instances; if there are more threads
+        // than this they share ThreadStates
+        private const int MAX_THREAD_STATE = 5;
+        private DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];
+        private HashMap<ThreadClass, DocumentsWriterThreadState> threadBindings = new HashMap<ThreadClass, DocumentsWriterThreadState>();
+
+        private int pauseThreads;               // Non-zero when we need all threads to
+                                                // pause (eg to flush)
+        internal bool flushPending;             // True when a thread has decided to flush
+        internal bool bufferIsFull;             // True when it's time to write segment
+        private bool aborting;                  // True if an abort is pending
+
+        private DocFieldProcessor docFieldProcessor;
+
+        internal System.IO.StreamWriter infoStream;
+        internal int maxFieldLength;
+        internal Similarity similarity;
+
+        internal IList<string> newFiles;
+
+        internal class DocState
+        {
+            internal DocumentsWriter docWriter;
+            internal Analyzer analyzer;
+            internal int maxFieldLength;
+            internal System.IO.StreamWriter infoStream;
+            internal Similarity similarity;
+            internal int docID;
+            internal Document doc;
+            internal System.String maxTermPrefix;
+
+            // Only called by asserts
+            public bool TestPoint(System.String name)
+            {
+                return docWriter.writer.TestPoint(name);
+            }
+
+            public void Clear()
+            {
+                // don't hold onto doc nor analyzer, in case it is
+                // largish:
+                doc = null;
+                analyzer = null;
+            }
+        }
+
+        /// <summary>Consumer returns this on each doc. This holds any
+        /// state that must be flushed synchronized "in docID
+        /// order". We gather these and flush them in order.
+        /// </summary>
+        internal abstract class DocWriter
+        {
+            internal DocWriter next;
+            internal int docID;
+
+            public abstract void Finish();
+            public abstract void Abort();
+            public abstract long SizeInBytes();
+
+            internal void SetNext(DocWriter next)
+            {
+                this.next = next;
+            }
+        }
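[Editor's note — illustration only, not part of the commit. DocWriter is the contract that makes ordered flushing work: ProcessDocument returns one per document, and DocumentsWriter calls Finish() strictly in docID order (via the WaitQueue near the end of this file). A minimal sketch of an implementation follows; BufferedDocWriter and its fields are hypothetical, and only the three overrides come from the contract above.]

    // Hypothetical DocWriter that buffers this doc's bytes and appends them
    // to a shared output when its turn (docID order) arrives.
    internal class BufferedDocWriter : DocumentsWriter.DocWriter
    {
        private readonly byte[] pending;           // bytes produced for this doc
        private readonly System.IO.Stream output;  // shared, append-only output

        internal BufferedDocWriter(byte[] pending, System.IO.Stream output)
        {
            this.pending = pending;
            this.output = output;
        }

        public override void Finish()
        {
            // Safe to append: Finish() is invoked in strict docID order.
            output.Write(pending, 0, pending.Length);
        }

        public override void Abort()
        {
            // Nothing was written to the output yet, so nothing to undo.
        }

        public override long SizeInBytes()
        {
            return pending.Length;  // lets DocumentsWriter account for RAM
        }
    }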
+ */ + internal void Recycle() + { + lock (this) + { + if (buffers.Count > 0) + { + Length = 0; + + // Recycle the blocks + enclosingInstance.perDocAllocator.RecycleByteBlocks(buffers); + buffers.Clear(); + sizeInBytes = 0; + + System.Diagnostics.Debug.Assert(NumBuffers() == 0); + } + } + } + } + + /// <summary> The IndexingChain must define the <see cref="GetChain(DocumentsWriter)" /> method + /// which returns the DocConsumer that the DocumentsWriter calls to process the + /// documents. + /// </summary> + internal abstract class IndexingChain + { + internal abstract DocConsumer GetChain(DocumentsWriter documentsWriter); + } + + internal static readonly IndexingChain DefaultIndexingChain; + + internal DocConsumer consumer; + + // Deletes done after the last flush; these are discarded + // on abort + private BufferedDeletes deletesInRAM = new BufferedDeletes(false); + + // Deletes done before the last flush; these are still + // kept on abort + private BufferedDeletes deletesFlushed = new BufferedDeletes(true); + + // The max number of delete terms that can be buffered before + // they must be flushed to disk. + private int maxBufferedDeleteTerms; + + // How much RAM we can use before flushing. This is 0 if + // we are flushing by doc count instead. + private long ramBufferSize; + private long waitQueuePauseBytes; + private long waitQueueResumeBytes; + + // If we've allocated 5% over our RAM budget, we then + // free down to 95% + private long freeTrigger; + private long freeLevel; + + // Flush @ this number of docs. If ramBufferSize is + // non-zero we will flush by RAM usage instead. + private int maxBufferedDocs; + + private int flushedDocCount; // How many docs already flushed to index + + internal void UpdateFlushedDocCount(int n) + { + lock (this) + { + flushedDocCount += n; + } + } + internal int GetFlushedDocCount() + { + lock (this) + { + return flushedDocCount; + } + } + internal void SetFlushedDocCount(int n) + { + lock (this) + { + flushedDocCount = n; + } + } + + private bool closed; + + internal DocumentsWriter(Directory directory, IndexWriter writer, IndexingChain indexingChain) + { + InitBlock(); + this.directory = directory; + this.writer = writer; + this.similarity = writer.Similarity; + flushedDocCount = writer.MaxDoc(); + + consumer = indexingChain.GetChain(this); + if (consumer is DocFieldProcessor) + { + docFieldProcessor = (DocFieldProcessor) consumer; + } + } + + /// <summary>Returns true if any of the fields in the current + /// buffered docs have omitTermFreqAndPositions==false + /// </summary> + internal bool HasProx() + { + return (docFieldProcessor != null)?docFieldProcessor.fieldInfos.HasProx():true; + } + + /// <summary>If non-null, various details of indexing are printed + /// here. + /// </summary> + internal void SetInfoStream(System.IO.StreamWriter infoStream) + { + lock (this) + { + this.infoStream = infoStream; + for (int i = 0; i < threadStates.Length; i++) + threadStates[i].docState.infoStream = infoStream; + } + } + + internal void SetMaxFieldLength(int maxFieldLength) + { + lock (this) + { + this.maxFieldLength = maxFieldLength; + for (int i = 0; i < threadStates.Length; i++) + threadStates[i].docState.maxFieldLength = maxFieldLength; + } + } + + internal void SetSimilarity(Similarity similarity) + { + lock (this) + { + this.similarity = similarity; + for (int i = 0; i < threadStates.Length; i++) + threadStates[i].docState.similarity = similarity; + } + } + + /// <summary>Set how much RAM we can use before flushing. 
+
+        /// <summary>Set how much RAM we can use before flushing.</summary>
+        internal void SetRAMBufferSizeMB(double mb)
+        {
+            lock (this)
+            {
+                if (mb == IndexWriter.DISABLE_AUTO_FLUSH)
+                {
+                    ramBufferSize = IndexWriter.DISABLE_AUTO_FLUSH;
+                    waitQueuePauseBytes = 4 * 1024 * 1024;
+                    waitQueueResumeBytes = 2 * 1024 * 1024;
+                }
+                else
+                {
+                    ramBufferSize = (long) (mb * 1024 * 1024);
+                    waitQueuePauseBytes = (long) (ramBufferSize * 0.1);
+                    waitQueueResumeBytes = (long) (ramBufferSize * 0.05);
+                    freeTrigger = (long) (1.05 * ramBufferSize);
+                    freeLevel = (long) (0.95 * ramBufferSize);
+                }
+            }
+        }
+
+        internal double GetRAMBufferSizeMB()
+        {
+            lock (this)
+            {
+                if (ramBufferSize == IndexWriter.DISABLE_AUTO_FLUSH)
+                {
+                    return ramBufferSize;
+                }
+                else
+                {
+                    return ramBufferSize / 1024.0 / 1024.0;
+                }
+            }
+        }
+
+        /// <summary>Gets or sets max buffered docs, which means we will flush by
+        /// doc count instead of by RAM usage.
+        /// </summary>
+        internal int MaxBufferedDocs
+        {
+            get { return maxBufferedDocs; }
+            set { maxBufferedDocs = value; }
+        }
+
+        /// <summary>Get current segment name we are writing. </summary>
+        internal string Segment
+        {
+            get { return segment; }
+        }
+
+        /// <summary>Returns how many docs are currently buffered in RAM. </summary>
+        internal int NumDocsInRAM
+        {
+            get { return numDocsInRAM; }
+        }
+
+        /// <summary>Returns the current doc store segment we are writing
+        /// to.
+        /// </summary>
+        internal string DocStoreSegment
+        {
+            get
+            {
+                lock (this)
+                {
+                    return docStoreSegment;
+                }
+            }
+        }
+
+        /// <summary>Returns the doc offset into the shared doc store for
+        /// the current buffered docs.
+        /// </summary>
+        internal int DocStoreOffset
+        {
+            get { return docStoreOffset; }
+        }
+
+        /// <summary>Closes the currently open doc stores and returns the doc
+        /// store segment name. This returns null if there are
+        /// no buffered documents.
+        /// </summary>
+        internal System.String CloseDocStore()
+        {
+            lock (this)
+            {
+                System.Diagnostics.Debug.Assert(AllThreadsIdle());
+
+                if (infoStream != null)
+                    Message("closeDocStore: " + openFiles.Count + " files to flush to segment " + docStoreSegment + " numDocs=" + numDocsInStore);
+
+                bool success = false;
+
+                try
+                {
+                    InitFlushState(true);
+                    closedFiles.Clear();
+
+                    consumer.CloseDocStore(flushState);
+                    System.Diagnostics.Debug.Assert(0 == openFiles.Count);
+
+                    System.String s = docStoreSegment;
+                    docStoreSegment = null;
+                    docStoreOffset = 0;
+                    numDocsInStore = 0;
+                    success = true;
+                    return s;
+                }
+                finally
+                {
+                    if (!success)
+                    {
+                        Abort();
+                    }
+                }
+            }
+        }
+
+        private ICollection<string> abortedFiles; // List of files that were written before last abort()
+
+        private SegmentWriteState flushState;
+
+        internal ICollection<string> AbortedFiles()
+        {
+            return abortedFiles;
+        }
+
+        internal void Message(System.String message)
+        {
+            if (infoStream != null)
+                writer.Message("DW: " + message);
+        }
+
+        internal IList<string> openFiles = new List<string>();
+        internal IList<string> closedFiles = new List<string>();
+
+        /* Returns Collection of files in use by this instance,
+         * including any flushed segments.
*/ + internal IList<string> OpenFiles() + { + lock (this) + { + // ToArray returns a copy + return openFiles.ToArray(); + } + } + + internal IList<string> ClosedFiles() + { + lock (this) + { + // ToArray returns a copy + return closedFiles.ToArray(); + } + } + + internal void AddOpenFile(System.String name) + { + lock (this) + { + System.Diagnostics.Debug.Assert(!openFiles.Contains(name)); + openFiles.Add(name); + } + } + + internal void RemoveOpenFile(System.String name) + { + lock (this) + { + System.Diagnostics.Debug.Assert(openFiles.Contains(name)); + openFiles.Remove(name); + closedFiles.Add(name); + } + } + + internal void SetAborting() + { + lock (this) + { + aborting = true; + } + } + + /// <summary>Called if we hit an exception at a bad time (when + /// updating the index files) and must discard all + /// currently buffered docs. This resets our state, + /// discarding any docs added since last flush. + /// </summary> + internal void Abort() + { + lock (this) + { + try + { + if (infoStream != null) + { + Message("docWriter: now abort"); + } + + // Forcefully remove waiting ThreadStates from line + waitQueue.Abort(); + + // Wait for all other threads to finish with + // DocumentsWriter: + PauseAllThreads(); + + try + { + + System.Diagnostics.Debug.Assert(0 == waitQueue.numWaiting); + + waitQueue.waitingBytes = 0; + + try + { + abortedFiles = OpenFiles(); + } + catch (System.Exception) + { + abortedFiles = null; + } + + deletesInRAM.Clear(); + deletesFlushed.Clear(); + openFiles.Clear(); + + for (int i = 0; i < threadStates.Length; i++) + try + { + threadStates[i].consumer.Abort(); + } + catch (System.Exception) + { + } + + try + { + consumer.Abort(); + } + catch (System.Exception) + { + } + + docStoreSegment = null; + numDocsInStore = 0; + docStoreOffset = 0; + + // Reset all postings data + DoAfterFlush(); + } + finally + { + ResumeAllThreads(); + } + } + finally + { + aborting = false; + System.Threading.Monitor.PulseAll(this); + if (infoStream != null) + { + Message("docWriter: done abort; abortedFiles=" + abortedFiles); + } + } + } + } + + /// <summary>Reset after a flush </summary> + private void DoAfterFlush() + { + // All ThreadStates should be idle when we are called + System.Diagnostics.Debug.Assert(AllThreadsIdle()); + threadBindings.Clear(); + waitQueue.Reset(); + segment = null; + numDocsInRAM = 0; + nextDocID = 0; + bufferIsFull = false; + flushPending = false; + for (int i = 0; i < threadStates.Length; i++) + threadStates[i].DoAfterFlush(); + numBytesUsed = 0; + } + + // Returns true if an abort is in progress + internal bool PauseAllThreads() + { + lock (this) + { + pauseThreads++; + while (!AllThreadsIdle()) + { + System.Threading.Monitor.Wait(this); + } + + return aborting; + } + } + + internal void ResumeAllThreads() + { + lock (this) + { + pauseThreads--; + System.Diagnostics.Debug.Assert(pauseThreads >= 0); + if (0 == pauseThreads) + System.Threading.Monitor.PulseAll(this); + } + } + + private bool AllThreadsIdle() + { + lock (this) + { + for (int i = 0; i < threadStates.Length; i++) + if (!threadStates[i].isIdle) + return false; + return true; + } + } + + internal bool AnyChanges + { + get + { + lock (this) + { + return numDocsInRAM != 0 || deletesInRAM.numTerms != 0 || deletesInRAM.docIDs.Count != 0 || + deletesInRAM.queries.Count != 0; + } + } + } + + private void InitFlushState(bool onlyDocStore) + { + lock (this) + { + InitSegmentName(onlyDocStore); + flushState = new SegmentWriteState(this, directory, segment, docStoreSegment, numDocsInRAM, numDocsInStore, 
writer.TermIndexInterval); + } + } + + /// <summary>Flush all pending docs to a new segment </summary> + internal int Flush(bool closeDocStore) + { + lock (this) + { + + System.Diagnostics.Debug.Assert(AllThreadsIdle()); + + System.Diagnostics.Debug.Assert(numDocsInRAM > 0); + + System.Diagnostics.Debug.Assert(nextDocID == numDocsInRAM); + System.Diagnostics.Debug.Assert(waitQueue.numWaiting == 0); + System.Diagnostics.Debug.Assert(waitQueue.waitingBytes == 0); + + InitFlushState(false); + + docStoreOffset = numDocsInStore; + + if (infoStream != null) + Message("flush postings as segment " + flushState.segmentName + " numDocs=" + numDocsInRAM); + + bool success = false; + + try + { + + if (closeDocStore) + { + System.Diagnostics.Debug.Assert(flushState.docStoreSegmentName != null); + System.Diagnostics.Debug.Assert(flushState.docStoreSegmentName.Equals(flushState.segmentName)); + CloseDocStore(); + flushState.numDocsInStore = 0; + } + + ICollection<DocConsumerPerThread> threads = new HashSet<DocConsumerPerThread>(); + for (int i = 0; i < threadStates.Length; i++) + threads.Add(threadStates[i].consumer); + consumer.Flush(threads, flushState); + + if (infoStream != null) + { + SegmentInfo si = new SegmentInfo(flushState.segmentName, flushState.numDocs, directory); + long newSegmentSize = si.SizeInBytes(); + System.String message = System.String.Format(nf, " oldRAMSize={0:d} newFlushedSize={1:d} docs/MB={2:f} new/old={3:%}", + new System.Object[] { numBytesUsed, newSegmentSize, (numDocsInRAM / (newSegmentSize / 1024.0 / 1024.0)), (100.0 * newSegmentSize / numBytesUsed) }); + Message(message); + } + + flushedDocCount += flushState.numDocs; + + DoAfterFlush(); + + success = true; + } + finally + { + if (!success) + { + Abort(); + } + } + + System.Diagnostics.Debug.Assert(waitQueue.waitingBytes == 0); + + return flushState.numDocs; + } + } + + internal ICollection<string> GetFlushedFiles() + { + return flushState.flushedFiles; + } + + /// <summary>Build compound file for the segment we just flushed </summary> + internal void CreateCompoundFile(System.String segment) + { + + CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, segment + "." + IndexFileNames.COMPOUND_FILE_EXTENSION); + foreach(string flushedFile in flushState.flushedFiles) + { + cfsWriter.AddFile(flushedFile); + } + + // Perform the merge + cfsWriter.Close(); + } + + /// <summary>Set flushPending if it is not already set and returns + /// whether it was set. This is used by IndexWriter to + /// trigger a single flush even when multiple threads are + /// trying to do so. 
+ /// </summary> + internal bool SetFlushPending() + { + lock (this) + { + if (flushPending) + return false; + else + { + flushPending = true; + return true; + } + } + } + + internal void ClearFlushPending() + { + lock (this) + { + flushPending = false; + } + } + + internal void PushDeletes() + { + lock (this) + { + deletesFlushed.Update(deletesInRAM); + } + } + + public void Dispose() + { + // Move to protected method if class becomes unsealed + lock (this) + { + closed = true; + System.Threading.Monitor.PulseAll(this); + } + } + + internal void InitSegmentName(bool onlyDocStore) + { + lock (this) + { + if (segment == null && (!onlyDocStore || docStoreSegment == null)) + { + segment = writer.NewSegmentName(); + System.Diagnostics.Debug.Assert(numDocsInRAM == 0); + } + if (docStoreSegment == null) + { + docStoreSegment = segment; + System.Diagnostics.Debug.Assert(numDocsInStore == 0); + } + } + } + + /// <summary>Returns a free (idle) ThreadState that may be used for + /// indexing this one document. This call also pauses if a + /// flush is pending. If delTerm is non-null then we + /// buffer this deleted term after the thread state has + /// been acquired. + /// </summary> + internal DocumentsWriterThreadState GetThreadState(Document doc, Term delTerm) + { + lock (this) + { + + // First, find a thread state. If this thread already + // has affinity to a specific ThreadState, use that one + // again. + DocumentsWriterThreadState state = threadBindings[ThreadClass.Current()]; + if (state == null) + { + + // First time this thread has called us since last + // flush. Find the least loaded thread state: + DocumentsWriterThreadState minThreadState = null; + for (int i = 0; i < threadStates.Length; i++) + { + DocumentsWriterThreadState ts = threadStates[i]; + if (minThreadState == null || ts.numThreads < minThreadState.numThreads) + minThreadState = ts; + } + if (minThreadState != null && (minThreadState.numThreads == 0 || threadStates.Length >= MAX_THREAD_STATE)) + { + state = minThreadState; + state.numThreads++; + } + else + { + // Just create a new "private" thread state + DocumentsWriterThreadState[] newArray = new DocumentsWriterThreadState[1 + threadStates.Length]; + if (threadStates.Length > 0) + Array.Copy(threadStates, 0, newArray, 0, threadStates.Length); + state = newArray[threadStates.Length] = new DocumentsWriterThreadState(this); + threadStates = newArray; + } + threadBindings[ThreadClass.Current()] = state; + } + + // Next, wait until my thread state is idle (in case + // it's shared with other threads) and for threads to + // not be paused nor a flush pending: + WaitReady(state); + + // Allocate segment name if this is the first doc since + // last flush: + InitSegmentName(false); + + state.isIdle = false; + + bool success = false; + try + { + state.docState.docID = nextDocID; + + System.Diagnostics.Debug.Assert(writer.TestPoint("DocumentsWriter.ThreadState.init start")); + + if (delTerm != null) + { + AddDeleteTerm(delTerm, state.docState.docID); + state.doFlushAfter = TimeToFlushDeletes(); + } + + System.Diagnostics.Debug.Assert(writer.TestPoint("DocumentsWriter.ThreadState.init after delTerm")); + + nextDocID++; + numDocsInRAM++; + + // We must at this point commit to flushing to ensure we + // always get N docs when we flush by doc count, even if + // > 1 thread is adding documents: + if (!flushPending && maxBufferedDocs != IndexWriter.DISABLE_AUTO_FLUSH && numDocsInRAM >= maxBufferedDocs) + { + flushPending = true; + state.doFlushAfter = true; + } + + success = true; + } 
+ finally + { + if (!success) + { + // Forcefully idle this ThreadState: + state.isIdle = true; + System.Threading.Monitor.PulseAll(this); + if (state.doFlushAfter) + { + state.doFlushAfter = false; + flushPending = false; + } + } + } + + return state; + } + } + + /// <summary>Returns true if the caller (IndexWriter) should now + /// flush. + /// </summary> + internal bool AddDocument(Document doc, Analyzer analyzer) + { + return UpdateDocument(doc, analyzer, null); + } + + internal bool UpdateDocument(Term t, Document doc, Analyzer analyzer) + { + return UpdateDocument(doc, analyzer, t); + } + + internal bool UpdateDocument(Document doc, Analyzer analyzer, Term delTerm) + { + + // This call is synchronized but fast + DocumentsWriterThreadState state = GetThreadState(doc, delTerm); + + DocState docState = state.docState; + docState.doc = doc; + docState.analyzer = analyzer; + + bool doReturnFalse = false; // {{Aroush-2.9}} to handle return from finally clause + + bool success = false; + try + { + // This call is not synchronized and does all the + // work + DocWriter perDoc; + try + { + perDoc = state.consumer.ProcessDocument(); + } + finally + { + docState.Clear(); + } + // This call is synchronized but fast + FinishDocument(state, perDoc); + success = true; + } + finally + { + if (!success) + { + lock (this) + { + + if (aborting) + { + state.isIdle = true; + System.Threading.Monitor.PulseAll(this); + Abort(); + } + else + { + skipDocWriter.docID = docState.docID; + bool success2 = false; + try + { + waitQueue.Add(skipDocWriter); + success2 = true; + } + finally + { + if (!success2) + { + state.isIdle = true; + System.Threading.Monitor.PulseAll(this); + Abort(); + // return false; // {{Aroush-2.9}} this 'return false' is move to outside finally + doReturnFalse = true; + } + } + + if (!doReturnFalse) // {{Aroush-2.9}} added because of the above 'return false' removal + { + state.isIdle = true; + System.Threading.Monitor.PulseAll(this); + + // If this thread state had decided to flush, we + // must clear it so another thread can flush + if (state.doFlushAfter) + { + state.doFlushAfter = false; + flushPending = false; + System.Threading.Monitor.PulseAll(this); + } + + // Immediately mark this document as deleted + // since likely it was partially added. 
This + // keeps indexing as "all or none" (atomic) when + // adding a document: + AddDeleteDocID(state.docState.docID); + } + } + } + } + } + + if (doReturnFalse) // {{Aroush-2.9}} see comment abouve + { + return false; + } + + return state.doFlushAfter || TimeToFlushDeletes(); + } + + // for testing + internal int GetNumBufferedDeleteTerms() + { + lock (this) + { + return deletesInRAM.numTerms; + } + } + + // for testing + internal IDictionary<Term, BufferedDeletes.Num> GetBufferedDeleteTerms() + { + lock (this) + { + return deletesInRAM.terms; + } + } + + /// <summary>Called whenever a merge has completed and the merged segments had deletions </summary> + internal void RemapDeletes(SegmentInfos infos, int[][] docMaps, int[] delCounts, MergePolicy.OneMerge merge, int mergeDocCount) + { + lock (this) + { + if (docMaps == null) + // The merged segments had no deletes so docIDs did not change and we have nothing to do + return ; + MergeDocIDRemapper mapper = new MergeDocIDRemapper(infos, docMaps, delCounts, merge, mergeDocCount); + deletesInRAM.Remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount); + deletesFlushed.Remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount); + flushedDocCount -= mapper.docShift; + } + } + + private void WaitReady(DocumentsWriterThreadState state) + { + lock (this) + { + + while (!closed && ((state != null && !state.isIdle) || pauseThreads != 0 || flushPending || aborting)) + { + System.Threading.Monitor.Wait(this); + } + + if (closed) + throw new AlreadyClosedException("this IndexWriter is closed"); + } + } + + internal bool BufferDeleteTerms(Term[] terms) + { + lock (this) + { + WaitReady(null); + for (int i = 0; i < terms.Length; i++) + AddDeleteTerm(terms[i], numDocsInRAM); + return TimeToFlushDeletes(); + } + } + + internal bool BufferDeleteTerm(Term term) + { + lock (this) + { + WaitReady(null); + AddDeleteTerm(term, numDocsInRAM); + return TimeToFlushDeletes(); + } + } + + internal bool BufferDeleteQueries(Query[] queries) + { + lock (this) + { + WaitReady(null); + for (int i = 0; i < queries.Length; i++) + AddDeleteQuery(queries[i], numDocsInRAM); + return TimeToFlushDeletes(); + } + } + + internal bool BufferDeleteQuery(Query query) + { + lock (this) + { + WaitReady(null); + AddDeleteQuery(query, numDocsInRAM); + return TimeToFlushDeletes(); + } + } + + internal bool DeletesFull() + { + lock (this) + { + return (ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH && (deletesInRAM.bytesUsed + deletesFlushed.bytesUsed + numBytesUsed) >= ramBufferSize) || (maxBufferedDeleteTerms != IndexWriter.DISABLE_AUTO_FLUSH && ((deletesInRAM.Size() + deletesFlushed.Size()) >= maxBufferedDeleteTerms)); + } + } + + internal bool DoApplyDeletes() + { + lock (this) + { + // Very similar to deletesFull(), except we don't count + // numBytesAlloc, because we are checking whether + // deletes (alone) are consuming too many resources now + // and thus should be applied. We apply deletes if RAM + // usage is > 1/2 of our allowed RAM buffer, to prevent + // too-frequent flushing of a long tail of tiny segments + // when merges (which always apply deletes) are + // infrequent. 
+ return (ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH && (deletesInRAM.bytesUsed + deletesFlushed.bytesUsed) >= ramBufferSize / 2) || (maxBufferedDeleteTerms != IndexWriter.DISABLE_AUTO_FLUSH && ((deletesInRAM.Size() + deletesFlushed.Size()) >= maxBufferedDeleteTerms)); + } + } + + private bool TimeToFlushDeletes() + { + lock (this) + { + return (bufferIsFull || DeletesFull()) && SetFlushPending(); + } + } + + internal int MaxBufferedDeleteTerms + { + set { this.maxBufferedDeleteTerms = value; } + get { return maxBufferedDeleteTerms; } + } + + internal bool HasDeletes() + { + lock (this) + { + return deletesFlushed.Any(); + } + } + + internal bool ApplyDeletes(SegmentInfos infos) + { + lock (this) + { + if (!HasDeletes()) + return false; + + if (infoStream != null) + Message("apply " + deletesFlushed.numTerms + " buffered deleted terms and " + deletesFlushed.docIDs.Count + " deleted docIDs and " + deletesFlushed.queries.Count + " deleted queries on " + (+ infos.Count) + " segments."); + + int infosEnd = infos.Count; + + int docStart = 0; + bool any = false; + for (int i = 0; i < infosEnd; i++) + { + + // Make sure we never attempt to apply deletes to + // segment in external dir + System.Diagnostics.Debug.Assert(infos.Info(i).dir == directory); + + SegmentReader reader = writer.readerPool.Get(infos.Info(i), false); + try + { + any |= ApplyDeletes(reader, docStart); + docStart += reader.MaxDoc; + } + finally + { + writer.readerPool.Release(reader); + } + } + + deletesFlushed.Clear(); + + return any; + } + } + + // used only by assert + private Term lastDeleteTerm; + + // used only by assert + private bool CheckDeleteTerm(Term term) + { + if (term != null) { + System.Diagnostics.Debug.Assert(lastDeleteTerm == null || term.CompareTo(lastDeleteTerm) > 0, "lastTerm=" + lastDeleteTerm + " vs term=" + term); + } + lastDeleteTerm = term; + return true; + } + + // Apply buffered delete terms, queries and docIDs to the + // provided reader + private bool ApplyDeletes(IndexReader reader, int docIDStart) + { + lock (this) + { + int docEnd = docIDStart + reader.MaxDoc; + bool any = false; + + System.Diagnostics.Debug.Assert(CheckDeleteTerm(null)); + + // Delete by term + TermDocs docs = reader.TermDocs(); + try + { + foreach(KeyValuePair<Term, BufferedDeletes.Num> entry in deletesFlushed.terms) + { + Term term = entry.Key; + // LUCENE-2086: we should be iterating a TreeMap, + // here, so terms better be in order: + System.Diagnostics.Debug.Assert(CheckDeleteTerm(term)); + docs.Seek(term); + int limit = entry.Value.GetNum(); + while (docs.Next()) + { + int docID = docs.Doc; + if (docIDStart + docID >= limit) + break; + reader.DeleteDocument(docID); + any = true; + } + } + } + finally + { + docs.Close(); + } + + // Delete by docID + foreach(int docIdInt in deletesFlushed.docIDs) + { + int docID = docIdInt; + if (docID >= docIDStart && docID < docEnd) + { + reader.DeleteDocument(docID - docIDStart); + any = true; + } + } + + // Delete by query + IndexSearcher searcher = new IndexSearcher(reader); + foreach(KeyValuePair<Query, int> entry in deletesFlushed.queries) + { + Query query = (Query) entry.Key; + int limit = (int)entry.Value; + Weight weight = query.Weight(searcher); + Scorer scorer = weight.Scorer(reader, true, false); + if (scorer != null) + { + while (true) + { + int doc = scorer.NextDoc(); + if (((long) docIDStart) + doc >= limit) + break; + reader.DeleteDocument(doc); + any = true; + } + } + } + searcher.Close(); + return any; + } + } + + // Buffer a term in bufferedDeleteTerms, which 
records the + // current number of documents buffered in ram so that the + // delete term will be applied to those documents as well + // as the disk segments. + private void AddDeleteTerm(Term term, int docCount) + { + lock (this) + { + BufferedDeletes.Num num = deletesInRAM.terms[term]; + int docIDUpto = flushedDocCount + docCount; + if (num == null) + deletesInRAM.terms[term] = new BufferedDeletes.Num(docIDUpto); + else + num.SetNum(docIDUpto); + deletesInRAM.numTerms++; + + deletesInRAM.AddBytesUsed(BYTES_PER_DEL_TERM + term.Text.Length * CHAR_NUM_BYTE); + } + } + + // Buffer a specific docID for deletion. Currently only + // used when we hit a exception when adding a document + private void AddDeleteDocID(int docID) + { + lock (this) + { + deletesInRAM.docIDs.Add(flushedDocCount + docID); + deletesInRAM.AddBytesUsed(BYTES_PER_DEL_DOCID); + } + } + + private void AddDeleteQuery(Query query, int docID) + { + lock (this) + { + deletesInRAM.queries[query] = flushedDocCount + docID; + deletesInRAM.AddBytesUsed(BYTES_PER_DEL_QUERY); + } + } + + internal bool DoBalanceRAM() + { + lock (this) + { + return ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH && !bufferIsFull && (numBytesUsed + deletesInRAM.bytesUsed + deletesFlushed.bytesUsed >= ramBufferSize || numBytesAlloc >= freeTrigger); + } + } + + /// <summary>Does the synchronized work to finish/flush the + /// inverted document. + /// </summary> + private void FinishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter) + { + + if (DoBalanceRAM()) + // Must call this w/o holding synchronized(this) else + // we'll hit deadlock: + BalanceRAM(); + + lock (this) + { + + System.Diagnostics.Debug.Assert(docWriter == null || docWriter.docID == perThread.docState.docID); + + if (aborting) + { + + // We are currently aborting, and another thread is + // waiting for me to become idle. 
We just forcefully + // idle this threadState; it will be fully reset by + // abort() + if (docWriter != null) + try + { + docWriter.Abort(); + } + catch (System.Exception) + { + } + + perThread.isIdle = true; + System.Threading.Monitor.PulseAll(this); + return ; + } + + bool doPause; + + if (docWriter != null) + doPause = waitQueue.Add(docWriter); + else + { + skipDocWriter.docID = perThread.docState.docID; + doPause = waitQueue.Add(skipDocWriter); + } + + if (doPause) + WaitForWaitQueue(); + + if (bufferIsFull && !flushPending) + { + flushPending = true; + perThread.doFlushAfter = true; + } + + perThread.isIdle = true; + System.Threading.Monitor.PulseAll(this); + } + } + + internal void WaitForWaitQueue() + { + lock (this) + { + do + { + System.Threading.Monitor.Wait(this); + } + while (!waitQueue.DoResume()); + } + } + + internal class SkipDocWriter:DocWriter + { + public override void Finish() + { + } + public override void Abort() + { + } + public override long SizeInBytes() + { + return 0; + } + } + internal SkipDocWriter skipDocWriter; + + internal long GetRAMUsed() + { + return numBytesUsed + deletesInRAM.bytesUsed + deletesFlushed.bytesUsed; + } + + internal long numBytesAlloc; + internal long numBytesUsed; + + internal System.Globalization.NumberFormatInfo nf = System.Globalization.CultureInfo.CurrentCulture.NumberFormat; + + // Coarse estimates used to measure RAM usage of buffered deletes + internal const int OBJECT_HEADER_BYTES = 8; + internal static readonly int POINTER_NUM_BYTE; + internal const int INT_NUM_BYTE = 4; + internal const int CHAR_NUM_BYTE = 2; + + /* Rough logic: HashMap has an array[Entry] w/ varying + load factor (say 2 * POINTER). Entry is object w/ Term + key, BufferedDeletes.Num val, int hash, Entry next + (OBJ_HEADER + 3*POINTER + INT). Term is object w/ + String field and String text (OBJ_HEADER + 2*POINTER). + We don't count Term's field since it's interned. + Term's text is String (OBJ_HEADER + 4*INT + POINTER + + OBJ_HEADER + string.length*CHAR). BufferedDeletes.num is + OBJ_HEADER + INT. */ + + internal static readonly int BYTES_PER_DEL_TERM = 8 * POINTER_NUM_BYTE + 5 * OBJECT_HEADER_BYTES + 6 * INT_NUM_BYTE; + + /* Rough logic: del docIDs are List<Integer>. Say list + allocates ~2X size (2*POINTER). Integer is OBJ_HEADER + + int */ + internal static readonly int BYTES_PER_DEL_DOCID = 2 * POINTER_NUM_BYTE + OBJECT_HEADER_BYTES + INT_NUM_BYTE; + + /* Rough logic: HashMap has an array[Entry] w/ varying + load factor (say 2 * POINTER). Entry is object w/ + Query key, Integer val, int hash, Entry next + (OBJ_HEADER + 3*POINTER + INT). Query we often + undercount (say 24 bytes). Integer is OBJ_HEADER + INT. 
*/ + internal static readonly int BYTES_PER_DEL_QUERY = 5 * POINTER_NUM_BYTE + 2 * OBJECT_HEADER_BYTES + 2 * INT_NUM_BYTE + 24; + + /* Initial chunks size of the shared byte[] blocks used to + store postings data */ + internal const int BYTE_BLOCK_SHIFT = 15; + internal static readonly int BYTE_BLOCK_SIZE = 1 << BYTE_BLOCK_SHIFT; + internal static readonly int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1; + internal static readonly int BYTE_BLOCK_NOT_MASK = ~ BYTE_BLOCK_MASK; + + internal class ByteBlockAllocator : ByteBlockPool.Allocator + { + public ByteBlockAllocator(DocumentsWriter enclosingInstance, int blockSize) + { + this.blockSize = blockSize; + InitBlock(enclosingInstance); + } + private void InitBlock(DocumentsWriter enclosingInstance) + { + this.enclosingInstance = enclosingInstance; + } + private DocumentsWriter enclosingInstance; + public DocumentsWriter Enclosing_Instance + { + get + { + return enclosingInstance; + } + + } + + int blockSize; + internal List<byte[]> freeByteBlocks = new List<byte[]>(); + + /* Allocate another byte[] from the shared pool */ + public /*internal*/ override byte[] GetByteBlock(bool trackAllocations) + { + lock (Enclosing_Instance) + { + int size = freeByteBlocks.Count; + byte[] b; + if (0 == size) + { + // Always record a block allocated, even if + // trackAllocations is false. This is necessary + // because this block will be shared between + // things that don't track allocations (term + // vectors) and things that do (freq/prox + // postings). + Enclosing_Instance.numBytesAlloc += blockSize; + b = new byte[blockSize]; + } + else + { + b = freeByteBlocks[size - 1]; + freeByteBlocks.RemoveAt(size - 1); + } + if (trackAllocations) + Enclosing_Instance.numBytesUsed += blockSize; + System.Diagnostics.Debug.Assert(Enclosing_Instance.numBytesUsed <= Enclosing_Instance.numBytesAlloc); + return b; + } + } + + /* Return byte[]'s to the pool */ + public /*internal*/ override void RecycleByteBlocks(byte[][] blocks, int start, int end) + { + lock (Enclosing_Instance) + { + for (int i = start; i < end; i++) + { + freeByteBlocks.Add(blocks[i]); + blocks[i] = null; + } + } + } + + public /*internal*/ override void RecycleByteBlocks(IList<byte[]> blocks) + { + lock (Enclosing_Instance) + { + int size = blocks.Count; + for(int i=0;i<size;i++) + freeByteBlocks.Add(blocks[i]); + } + } + } + + /* Initial chunks size of the shared int[] blocks used to + store postings data */ + internal const int INT_BLOCK_SHIFT = 13; + internal static readonly int INT_BLOCK_SIZE = 1 << INT_BLOCK_SHIFT; + internal static readonly int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1; + + private List<int[]> freeIntBlocks = new List<int[]>(); + + /* Allocate another int[] from the shared pool */ + internal int[] GetIntBlock(bool trackAllocations) + { + lock (this) + { + int size = freeIntBlocks.Count; + int[] b; + if (0 == size) + { + // Always record a block allocated, even if + // trackAllocations is false. This is necessary + // because this block will be shared between + // things that don't track allocations (term + // vectors) and things that do (freq/prox + // postings). 
+ numBytesAlloc += INT_BLOCK_SIZE * INT_NUM_BYTE; + b = new int[INT_BLOCK_SIZE]; + } + else + { + b = freeIntBlocks[size - 1]; + freeIntBlocks.RemoveAt(size - 1); + } + if (trackAllocations) + numBytesUsed += INT_BLOCK_SIZE * INT_NUM_BYTE; + System.Diagnostics.Debug.Assert(numBytesUsed <= numBytesAlloc); + return b; + } + } + + internal void BytesAllocated(long numBytes) + { + lock (this) + { + numBytesAlloc += numBytes; + } + } + + internal void BytesUsed(long numBytes) + { + lock (this) + { + numBytesUsed += numBytes; + System.Diagnostics.Debug.Assert(numBytesUsed <= numBytesAlloc); + } + } + + /* Return int[]s to the pool */ + internal void RecycleIntBlocks(int[][] blocks, int start, int end) + { + lock (this) + { + for (int i = start; i < end; i++) + { + freeIntBlocks.Add(blocks[i]); + blocks[i] = null; + } + } + } + + internal ByteBlockAllocator byteBlockAllocator; + + internal static int PER_DOC_BLOCK_SIZE = 1024; + + ByteBlockAllocator perDocAllocator; + + /* Initial chunk size of the shared char[] blocks used to + store term text */ + internal const int CHAR_BLOCK_SHIFT = 14; + internal static readonly int CHAR_BLOCK_SIZE = 1 << CHAR_BLOCK_SHIFT; + internal static readonly int CHAR_BLOCK_MASK = CHAR_BLOCK_SIZE - 1; + + internal static readonly int MAX_TERM_LENGTH = CHAR_BLOCK_SIZE - 1; + + private List<char[]> freeCharBlocks = new List<char[]>(); + + /* Allocate another char[] from the shared pool */ + internal char[] GetCharBlock() + { + lock (this) + { + int size = freeCharBlocks.Count; + char[] c; + if (0 == size) + { + numBytesAlloc += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE; + c = new char[CHAR_BLOCK_SIZE]; + } + else + { + c = freeCharBlocks[size - 1]; + freeCharBlocks.RemoveAt(size - 1); + } + // We always track allocations of char blocks, for now, + // because nothing that skips allocation tracking + // (currently only term vectors) uses its own char + // blocks. + numBytesUsed += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE; + System.Diagnostics.Debug.Assert(numBytesUsed <= numBytesAlloc); + return c; + } + } + + /* Return char[]s to the pool */ + internal void RecycleCharBlocks(char[][] blocks, int numBlocks) + { + lock (this) + { + for (int i = 0; i < numBlocks; i++) + { + freeCharBlocks.Add(blocks[i]); + blocks[i] = null; + } + } + } + + internal System.String ToMB(long v) + { + return System.String.Format(nf, "{0:f}", new System.Object[] { (v / 1024F / 1024F) }); + } + + + /* We have four pools of RAM: Postings, byte blocks + * (holds freq/prox posting data), char blocks (holds + * characters in the term) and per-doc buffers (stored fields/term vectors). + * Different docs require varying amount of storage from + * these four classes. + * + * For example, docs with many unique single-occurrence + * short terms will use up the Postings RAM and hardly any + * of the other two. Whereas docs with very large terms + * will use alot of char blocks RAM and relatively less of + * the other two. This method just frees allocations from + * the pools once we are over-budget, which balances the + * pools to match the current docs. 
*/ + internal void BalanceRAM() + { + + // We flush when we've used our target usage + long flushTrigger = ramBufferSize; + + long deletesRAMUsed = deletesInRAM.bytesUsed + deletesFlushed.bytesUsed; + + if (numBytesAlloc + deletesRAMUsed > freeTrigger) + { + + if (infoStream != null) + Message( + " RAM: now balance allocations: usedMB=" + ToMB(numBytesUsed) + + " vs trigger=" + ToMB(flushTrigger) + + " allocMB=" + ToMB(numBytesAlloc) + + " deletesMB=" + ToMB(deletesRAMUsed) + + " vs trigger=" + ToMB(freeTrigger) + + " byteBlockFree=" + ToMB(byteBlockAllocator.freeByteBlocks.Count * BYTE_BLOCK_SIZE) + + " perDocFree=" + ToMB(perDocAllocator.freeByteBlocks.Count * PER_DOC_BLOCK_SIZE) + + " charBlockFree=" + ToMB(freeCharBlocks.Count * CHAR_BLOCK_SIZE * CHAR_NUM_BYTE)); + + long startBytesAlloc = numBytesAlloc + deletesRAMUsed; + + int iter = 0; + + // We free equally from each pool in 32 KB + // chunks until we are below our threshold + // (freeLevel) + + bool any = true; + + while (numBytesAlloc + deletesRAMUsed > freeLevel) + { + + lock (this) + { + if (0 == perDocAllocator.freeByteBlocks.Count + && 0 == byteBlockAllocator.freeByteBlocks.Count + && 0 == freeCharBlocks.Count + && 0 == freeIntBlocks.Count + && !any) + { + // Nothing else to free -- must flush now. + bufferIsFull = numBytesUsed + deletesRAMUsed > flushTrigger; + if (infoStream != null) + { + if (bufferIsFull) + Message(" nothing to free; now set bufferIsFull"); + else + Message(" nothing to free"); + } + System.Diagnostics.Debug.Assert(numBytesUsed <= numBytesAlloc); + break; + } + + if ((0 == iter % 5) && byteBlockAllocator.freeByteBlocks.Count > 0) + { + byteBlockAllocator.freeByteBlocks.RemoveAt(byteBlockAllocator.freeByteBlocks.Count - 1); + numBytesAlloc -= BYTE_BLOCK_SIZE; + } + + if ((1 == iter % 5) && freeCharBlocks.Count > 0) + { + freeCharBlocks.RemoveAt(freeCharBlocks.Count - 1); + numBytesAlloc -= CHAR_BLOCK_SIZE * CHAR_NUM_BYTE; + } + + if ((2 == iter % 5) && freeIntBlocks.Count > 0) + { + freeIntBlocks.RemoveAt(freeIntBlocks.Count - 1); + numBytesAlloc -= INT_BLOCK_SIZE * INT_NUM_BYTE; + } + + if ((3 == iter % 5) && perDocAllocator.freeByteBlocks.Count > 0) + { + // Remove upwards of 32 blocks (each block is 1K) + for (int i = 0; i < 32; ++i) + { + perDocAllocator.freeByteBlocks.RemoveAt(perDocAllocator.freeByteBlocks.Count - 1); + numBytesAlloc -= PER_DOC_BLOCK_SIZE; + if (perDocAllocator.freeByteBlocks.Count == 0) + { + break; + } + } + } + } + + if ((4 == iter % 5) && any) + // Ask consumer to free any recycled state + any = consumer.FreeRAM(); + + iter++; + } + + if (infoStream != null) + Message(System.String.Format(nf, " after free: freedMB={0:f} usedMB={1:f} allocMB={2:f}", + new System.Object[] { ((startBytesAlloc - numBytesAlloc) / 1024.0 / 1024.0), (numBytesUsed / 1024.0 / 1024.0), (numBytesAlloc / 1024.0 / 1024.0) })); + } + else + { + // If we have not crossed the 100% mark, but have + // crossed the 95% mark of RAM we are actually + // using, go ahead and flush. This prevents + // over-allocating and then freeing, with every + // flush. 
+ lock (this) + { + + if (numBytesUsed + deletesRAMUsed > flushTrigger) + { + if (infoStream != null) + Message(System.String.Format(nf, " RAM: now flush @ usedMB={0:f} allocMB={1:f} triggerMB={2:f}", + new object[] { (numBytesUsed / 1024.0 / 1024.0), (numBytesAlloc / 1024.0 / 1024.0), (flushTrigger / 1024.0 / 1024.0) })); + + bufferIsFull = true; + } + } + } + } + + internal WaitQueue waitQueue; + + internal class WaitQueue + { + private void InitBlock(DocumentsWriter enclosingInstance) + { + this.enclosingInstance = enclosingInstance; + } + private DocumentsWriter enclosingInstance; + public DocumentsWriter Enclosing_Instance + { + get + { + return enclosingInstance; + } + + } + internal DocWriter[] waiting; + internal int nextWriteDocID; + internal int nextWriteLoc; + internal int numWaiting; + internal long waitingBytes; + + public WaitQueue(DocumentsWriter enclosingInstance) + { + InitBlock(enclosingInstance); + waiting = new DocWriter[10]; + } + + internal void Reset() + { + lock (this) + { + // NOTE: nextWriteLoc doesn't need to be reset + System.Diagnostics.Debug.Assert(numWaiting == 0); + System.Diagnostics.Debug.Assert(waitingBytes == 0); + nextWriteDocID = 0; + } + } + + internal bool DoResume() + { + lock (this) + { + return waitingBytes <= Enclosing_Instance.waitQueueResumeBytes; + } + } + + internal bool DoPause() + { + lock (this) + { + return waitingBytes > Enclosing_Instance.waitQueuePauseBytes; + } + } + + internal void Abort() + { + lock (this) + { + int count = 0; + for (int i = 0; i < waiting.Length; i++) + { + DocWriter doc = waiting[i]; + if (doc != null) + { + doc.Abort(); + waiting[i] = null; + count++; + } + } + waitingBytes = 0; + System.Diagnostics.Debug.Assert(count == numWaiting); + numWaiting = 0; + } + } + + private void WriteDocument(DocWriter doc) + { + System.Diagnostics.Debug.Assert(doc == Enclosing_Instance.skipDocWriter || nextWriteDocID == doc.docID); + bool success = false; + try + { + doc.Finish(); + nextWriteDocID++; + Enclosing_Instance.numDocsInStore++; + nextWriteLoc++; + System.Diagnostics.Debug.Assert(nextWriteLoc <= waiting.Length); + if (nextWriteLoc == waiting.Length) + nextWriteLoc = 0; + success = true; + } + finally + { + if (!success) + Enclosing_Instance.SetAborting(); + } + } + + public bool Add(DocWriter doc) + { + lock (this) + { + + System.Diagnostics.Debug.Assert(doc.docID >= nextWriteDocID); + + if (doc.docID == nextWriteDocID) + { + WriteDocument(doc); + while (true) + { + doc = waiting[nextWriteLoc]; + if (doc != null) + { + numWaiting--; + waiting[nextWriteLoc] = null; + waitingBytes -= doc.SizeInBytes(); + WriteDocument(doc); + } + else + break; + } + } + else + { + + // I finished before documents that were added + // before me. This can easily happen when I am a + // small doc and the docs before me were large, or, + // just due to luck in the thread scheduling. 
Just + // add myself to the queue and when that large doc + // finishes, it will flush me: + int gap = doc.docID - nextWriteDocID; + if (gap >= waiting.Length) + { + // Grow queue + DocWriter[] newArray = new DocWriter[ArrayUtil.GetNextSize(gap)]; + System.Diagnostics.Debug.Assert(nextWriteLoc >= 0); + Array.Copy(waiting, nextWriteLoc, newArray, 0, waiting.Length - nextWriteLoc); + Array.Copy(waiting, 0, newArray, waiting.Length - nextWriteLoc, nextWriteLoc); + nextWriteLoc = 0; + waiting = newArray; + gap = doc.docID - nextWriteDocID; + } + + int loc = nextWriteLoc + gap; + if (loc >= waiting.Length) + loc -= waiting.Length; + + // We should only wrap one time + System.Diagnostics.Debug.Assert(loc < waiting.Length); + + // Nobody should be in my spot! + System.Diagnostics.Debug.Assert(waiting [loc] == null); + waiting[loc] = doc; + numWaiting++; + waitingBytes += doc.SizeInBytes(); + } + + return DoPause(); + } + } + } + static DocumentsWriter() + { + DefaultIndexingChain = new AnonymousClassIndexingChain(); + POINTER_NUM_BYTE = Constants.JRE_IS_64BIT?8:4; + } + + public static int BYTE_BLOCK_SIZE_ForNUnit + { + get { return BYTE_BLOCK_SIZE; } + } + + public static int CHAR_BLOCK_SIZE_ForNUnit + { + get { return CHAR_BLOCK_SIZE; } + } + } +}
\ No newline at end of file
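[Editor's note — usage sketch, not part of the commit. DocumentsWriter is internal; applications drive it through IndexWriter, which is where the two flush triggers described above are configured. The sketch assumes the IndexWriter API of this era (the constructor shown, SetRAMBufferSizeMB, and SetMaxBufferedDocs are believed to exist on this branch, but verify against the tree):]

    using Lucene.Net.Analysis.Standard;
    using Lucene.Net.Documents;
    using Lucene.Net.Index;
    using Lucene.Net.Store;

    class FlushTriggerDemo
    {
        static void Main()
        {
            Directory dir = FSDirectory.Open(new System.IO.DirectoryInfo("index"));
            var analyzer = new StandardAnalyzer(Lucene.Net.Util.Version.LUCENE_29);
            var writer = new IndexWriter(dir, analyzer, true, IndexWriter.MaxFieldLength.UNLIMITED);

            // Flush by RAM: DocumentsWriter compares numBytesUsed against this
            // budget (see ramBufferSize and BalanceRAM above).
            writer.SetRAMBufferSizeMB(32.0);

            // Or flush by doc count: GetThreadState sets flushPending once
            // numDocsInRAM >= maxBufferedDocs.
            // writer.SetMaxBufferedDocs(1000);

            var doc = new Document();
            doc.Add(new Field("body", "hello world", Field.Store.YES, Field.Index.ANALYZED));
            writer.AddDocument(doc);  // internally: DocumentsWriter.AddDocument -> UpdateDocument
            writer.Close();
        }
    }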