diff options
Diffstat (limited to '7zip/Compress/BZip2/BZip2Encoder.cpp')
-rwxr-xr-x | 7zip/Compress/BZip2/BZip2Encoder.cpp | 356 |
1 files changed, 298 insertions, 58 deletions
diff --git a/7zip/Compress/BZip2/BZip2Encoder.cpp b/7zip/Compress/BZip2/BZip2Encoder.cpp index 657fcfb2..dc08e7a8 100755 --- a/7zip/Compress/BZip2/BZip2Encoder.cpp +++ b/7zip/Compress/BZip2/BZip2Encoder.cpp @@ -2,9 +2,10 @@ #include "StdAfx.h" +#include "../../../Common/Alloc.h" + #include "BZip2Encoder.h" -#include "../../../Common/Alloc.h" #include "../BWT/Mtf8.h" #include "BZip2CRC.h" @@ -14,18 +15,160 @@ namespace NBZip2 { static const UInt32 kBufferSize = (1 << 17); static const int kNumHuffPasses = 4; -CEncoder::CEncoder(): - m_Block(0), - m_NeedHuffmanCreate(true), - m_NumPasses(1), +bool CThreadInfo::Create() +{ + if (!m_BlockSorter.Create(kBlockSizeMax)) + return false; + + if (m_Block == 0) + { + m_Block = (Byte *)::BigAlloc(kBlockSizeMax * 5 + kBlockSizeMax / 10 + (20 << 10)); + if (m_Block == 0) + return false; + m_MtfArray = m_Block + kBlockSizeMax; + m_TempArray = m_MtfArray + kBlockSizeMax * 2 + 2; + } + if (m_NeedHuffmanCreate) + { + for (int i = 0; i < kNumTablesMax; i++) + if (!m_HuffEncoders[i].Create(kMaxAlphaSize, 0, 0, kMaxHuffmanLen)) + return false; + m_NeedHuffmanCreate = false; + } + return true; +} + +void CThreadInfo::Free() +{ + m_BlockSorter.Free(); + ::BigFree(m_Block); + m_Block = 0; +} + +#ifdef COMPRESS_BZIP2_MT +void CThreadInfo::FinishStream() +{ + Encoder->StreamWasFinished = true; + StreamWasFinishedEvent.Set(); + Encoder->CS.Leave(); + Encoder->CanStartWaitingEvent.Lock(); + WaitingWasStartedEvent.Set(); +} + +DWORD CThreadInfo::ThreadFunc() +{ + while (true) + { + Encoder->CS.Enter(); + if (Encoder->CloseThreads) + { + Encoder->CS.Leave(); + return 0; + } + if (Encoder->StreamWasFinished) + { + FinishStream(); + continue; + } + HRESULT res = S_OK; + try + { + UInt32 blockSize = Encoder->ReadRleBlock(m_Block); + m_PackSize = Encoder->m_InStream.GetProcessedSize(); + m_BlockIndex = Encoder->NextBlockIndex; + if (++Encoder->NextBlockIndex == Encoder->NumThreads) + Encoder->NextBlockIndex = 0; + if (blockSize == 0) + { + FinishStream(); + continue; + } + Encoder->CS.Leave(); + res = EncodeBlock3(blockSize); + } + catch(const CInBufferException &e) { res = e.ErrorCode; } + catch(const COutBufferException &e) { res = e.ErrorCode; } + catch(...) { res = E_FAIL; } + if (res != S_OK) + { + Encoder->Result = res; + FinishStream(); + continue; + } + } +} + +static DWORD WINAPI MFThread(void *threadCoderInfo) +{ + return ((CThreadInfo *)threadCoderInfo)->ThreadFunc(); +} +#endif + +CEncoder::CEncoder(): + NumPasses(1), m_OptimizeNumTables(false), m_BlockSizeMult(kBlockSizeMultMax) -{} +{ + #ifdef COMPRESS_BZIP2_MT + ThreadsInfo = 0; + m_NumThreadsPrev = 0; + NumThreads = 1; + CS.Enter(); + #endif +} +#ifdef COMPRESS_BZIP2_MT CEncoder::~CEncoder() { - ::BigFree(m_Block); + Free(); +} + +bool CEncoder::Create() +{ + try + { + if (ThreadsInfo != 0 && m_NumThreadsPrev == NumThreads) + return true; + Free(); + MtMode = (NumThreads > 1); + m_NumThreadsPrev = NumThreads; + ThreadsInfo = new CThreadInfo[NumThreads]; + if (ThreadsInfo == 0) + return false; + for (UInt32 t = 0; t < NumThreads; t++) + { + CThreadInfo &ti = ThreadsInfo[t]; + ti.Encoder = this; + if (MtMode) + if (!ti.Thread.Create(MFThread, &ti)) + { + NumThreads = t; + Free(); + return false; + } + } + } + catch(...) { return false; } + return true; +} + +void CEncoder::Free() +{ + if (!ThreadsInfo) + return; + CloseThreads = true; + CS.Leave(); + for (UInt32 t = 0; t < NumThreads; t++) + { + CThreadInfo &ti = ThreadsInfo[t]; + if (MtMode) + ti.Thread.Wait(); + ti.Free(); + } + delete []ThreadsInfo; + ThreadsInfo = 0; } +#endif UInt32 CEncoder::ReadRleBlock(Byte *buffer) { @@ -66,11 +209,11 @@ UInt32 CEncoder::ReadRleBlock(Byte *buffer) return i; } -void CEncoder::WriteBits2(UInt32 value, UInt32 numBits) +void CThreadInfo::WriteBits2(UInt32 value, UInt32 numBits) { m_OutStreamCurrent->WriteBits(value, numBits); } -void CEncoder::WriteByte2(Byte b) { WriteBits2(b , 8); } -void CEncoder::WriteBit2(bool v) { WriteBits2((v ? 1 : 0), 1); } -void CEncoder::WriteCRC2(UInt32 v) +void CThreadInfo::WriteByte2(Byte b) { WriteBits2(b , 8); } +void CThreadInfo::WriteBit2(bool v) { WriteBits2((v ? 1 : 0), 1); } +void CThreadInfo::WriteCRC2(UInt32 v) { for (int i = 0; i < 4; i++) WriteByte2(((Byte)(v >> (24 - i * 8)))); @@ -88,7 +231,7 @@ void CEncoder::WriteCRC(UInt32 v) // blockSize > 0 -void CEncoder::EncodeBlock(Byte *block, UInt32 blockSize) +void CThreadInfo::EncodeBlock(Byte *block, UInt32 blockSize) { WriteBit2(false); // Randomised = false @@ -404,7 +547,7 @@ void CEncoder::EncodeBlock(Byte *block, UInt32 blockSize) } // blockSize > 0 -UInt32 CEncoder::EncodeBlockWithHeaders(Byte *block, UInt32 blockSize) +UInt32 CThreadInfo::EncodeBlockWithHeaders(Byte *block, UInt32 blockSize) { WriteByte2(kBlockSig0); WriteByte2(kBlockSig1); @@ -443,11 +586,10 @@ UInt32 CEncoder::EncodeBlockWithHeaders(Byte *block, UInt32 blockSize) return crcRes; } -void CEncoder::EncodeBlock2(CBZip2CombinedCRC &combinedCRC, - Byte *block, UInt32 blockSize, UInt32 numPasses) +void CThreadInfo::EncodeBlock2(Byte *block, UInt32 blockSize, UInt32 numPasses) { + UInt32 numCrcs = m_NumCrcs; bool needCompare = false; - CBZip2CombinedCRC specCombinedCRC = combinedCRC; UInt32 startBytePos = m_OutStreamCurrent->GetBytePos(); UInt32 startPos = m_OutStreamCurrent->GetPos(); @@ -462,9 +604,8 @@ void CEncoder::EncodeBlock2(CBZip2CombinedCRC &combinedCRC, blockSize0 < blockSize; blockSize0++); if (blockSize0 < blockSize) { - EncodeBlock2(specCombinedCRC, block, blockSize0, numPasses - 1); - EncodeBlock2(specCombinedCRC, block + blockSize0, blockSize - blockSize0, - numPasses - 1); + EncodeBlock2(block, blockSize0, numPasses - 1); + EncodeBlock2(block + blockSize0, blockSize - blockSize0, numPasses - 1); endPos = m_OutStreamCurrent->GetPos(); endCurByte = m_OutStreamCurrent->GetCurByte(); if ((endPos & 7) > 0) @@ -479,8 +620,6 @@ void CEncoder::EncodeBlock2(CBZip2CombinedCRC &combinedCRC, UInt32 crcVal = EncodeBlockWithHeaders(block, blockSize); UInt32 endPos2 = m_OutStreamCurrent->GetPos(); - combinedCRC.Update(crcVal); - if (needCompare) { UInt32 size2 = endPos2 - startPos2; @@ -491,59 +630,102 @@ void CEncoder::EncodeBlock2(CBZip2CombinedCRC &combinedCRC, for (UInt32 i = 0; i < numBytes; i++) buffer[startBytePos + i] = buffer[startBytePos2 + i]; m_OutStreamCurrent->SetPos(startPos + endPos2 - startPos2); + m_NumCrcs = numCrcs; + m_CRCs[m_NumCrcs++] = crcVal; } else { m_OutStreamCurrent->SetPos(endPos); m_OutStreamCurrent->SetCurState((endPos & 7), endCurByte); - combinedCRC = specCombinedCRC; } } + else + { + m_NumCrcs = numCrcs; + m_CRCs[m_NumCrcs++] = crcVal; + } } -void CEncoder::EncodeBlock3(CBZip2CombinedCRC &combinedCRC, UInt32 blockSize) +HRESULT CThreadInfo::EncodeBlock3(UInt32 blockSize) { CMsbfEncoderTemp outStreamTemp; outStreamTemp.SetStream(m_TempArray); outStreamTemp.Init(); m_OutStreamCurrent = &outStreamTemp; - EncodeBlock2(combinedCRC, m_Block, blockSize, m_NumPasses); + m_NumCrcs = 0; + + EncodeBlock2(m_Block, blockSize, Encoder->NumPasses); + + #ifdef COMPRESS_BZIP2_MT + if (Encoder->MtMode) + Encoder->ThreadsInfo[m_BlockIndex].CanWriteEvent.Lock(); + #endif + for (UInt32 i = 0; i < m_NumCrcs; i++) + Encoder->CombinedCRC.Update(m_CRCs[i]); + Encoder->WriteBytes(m_TempArray, outStreamTemp.GetPos(), outStreamTemp.GetCurByte()); + HRESULT res = S_OK; + #ifdef COMPRESS_BZIP2_MT + if (Encoder->MtMode) + { + UInt32 blockIndex = m_BlockIndex + 1; + if (blockIndex == Encoder->NumThreads) + blockIndex = 0; + + if (Encoder->Progress) + { + UInt64 unpackSize = Encoder->m_OutStream.GetProcessedSize(); + res = Encoder->Progress->SetRatioInfo(&m_PackSize, &unpackSize); + } + + Encoder->ThreadsInfo[blockIndex].CanWriteEvent.Set(); + } + #endif + return res; +} - UInt32 size = outStreamTemp.GetPos(); - UInt32 bytesSize = (size / 8); +void CEncoder::WriteBytes(const Byte *data, UInt32 sizeInBits, Byte lastByte) +{ + UInt32 bytesSize = (sizeInBits / 8); for (UInt32 i = 0; i < bytesSize; i++) - m_OutStream.WriteBits(m_TempArray[i], 8); - WriteBits(outStreamTemp.GetCurByte(), (size & 7)); + m_OutStream.WriteBits(data[i], 8); + WriteBits(lastByte, (sizeInBits & 7)); } + HRESULT CEncoder::CodeReal(ISequentialInStream *inStream, ISequentialOutStream *outStream, const UInt64 *inSize, const UInt64 *outSize, ICompressProgressInfo *progress) { - if (!m_BlockSorter.Create(kBlockSizeMax)) - return E_OUTOFMEMORY; - - if (m_Block == 0) + #ifdef COMPRESS_BZIP2_MT + Progress = progress; + if (!Create()) + return E_FAIL; + for (UInt32 t = 0; t < NumThreads; t++) + #endif { - m_Block = (Byte *)BigAlloc(kBlockSizeMax * 5 + kBlockSizeMax / 10 + (20 << 10)); - if (m_Block == 0) + #ifdef COMPRESS_BZIP2_MT + CThreadInfo &ti = ThreadsInfo[t]; + ti.StreamWasFinishedEvent.Reset(); + ti.WaitingWasStartedEvent.Reset(); + ti.CanWriteEvent.Reset(); + #else + CThreadInfo &ti = ThreadsInfo; + ti.Encoder = this; + #endif + + ti.m_OptimizeNumTables = m_OptimizeNumTables; + + if (!ti.Create()) return E_OUTOFMEMORY; - m_MtfArray = m_Block + kBlockSizeMax; - m_TempArray = m_MtfArray + kBlockSizeMax * 2 + 2; } + + if (!m_InStream.Create(kBufferSize)) return E_OUTOFMEMORY; if (!m_OutStream.Create(kBufferSize)) return E_OUTOFMEMORY; - if (m_NeedHuffmanCreate) - { - for (int i = 0; i < kNumTablesMax; i++) - if (!m_HuffEncoders[i].Create(kMaxAlphaSize, 0, 0, kMaxHuffmanLen)) - return E_OUTOFMEMORY; - m_NeedHuffmanCreate = false; - } m_InStream.SetStream(inStream); m_InStream.Init(); @@ -553,24 +735,57 @@ HRESULT CEncoder::CodeReal(ISequentialInStream *inStream, CFlusher flusher(this); - CBZip2CombinedCRC combinedCRC; + CombinedCRC.Init(); + #ifdef COMPRESS_BZIP2_MT + NextBlockIndex = 0; + StreamWasFinished = false; + CloseThreads = false; + CanStartWaitingEvent.Reset(); + #endif WriteByte(kArSig0); WriteByte(kArSig1); WriteByte(kArSig2); WriteByte((Byte)(kArSig3 + m_BlockSizeMult)); - while (true) + #ifdef COMPRESS_BZIP2_MT + + if (MtMode) { - UInt32 blockSize = ReadRleBlock(m_Block); - if (blockSize == 0) - break; - EncodeBlock3(combinedCRC, blockSize); - if (progress) + ThreadsInfo[0].CanWriteEvent.Set(); + Result = S_OK; + CS.Leave(); + UInt32 t; + for (t = 0; t < NumThreads; t++) + ThreadsInfo[t].StreamWasFinishedEvent.Lock(); + CS.Enter(); + CanStartWaitingEvent.Set(); + for (t = 0; t < NumThreads; t++) + ThreadsInfo[t].WaitingWasStartedEvent.Lock(); + CanStartWaitingEvent.Reset(); + RINOK(Result); + } + else + #endif + { + while (true) { - UInt64 packSize = m_InStream.GetProcessedSize(); - UInt64 unpackSize = m_OutStream.GetProcessedSize(); - RINOK(progress->SetRatioInfo(&packSize, &unpackSize)); + CThreadInfo &ti = + #ifdef COMPRESS_BZIP2_MT + ThreadsInfo[0]; + #else + ThreadsInfo; + #endif + UInt32 blockSize = ReadRleBlock(ti.m_Block); + if (blockSize == 0) + break; + RINOK(ti.EncodeBlock3(blockSize)); + if (progress) + { + UInt64 packSize = m_InStream.GetProcessedSize(); + UInt64 unpackSize = m_OutStream.GetProcessedSize(); + RINOK(progress->SetRatioInfo(&packSize, &unpackSize)); + } } } WriteByte(kFinSig0); @@ -580,7 +795,7 @@ HRESULT CEncoder::CodeReal(ISequentialInStream *inStream, WriteByte(kFinSig4); WriteByte(kFinSig5); - WriteCRC(combinedCRC.GetDigest()); + WriteCRC(CombinedCRC.GetDigest()); return S_OK; } @@ -589,6 +804,8 @@ STDMETHODIMP CEncoder::Code(ISequentialInStream *inStream, ICompressProgressInfo *progress) { try { return CodeReal(inStream, outStream, inSize, outSize, progress); } + catch(const CInBufferException &e) { return e.ErrorCode; } + catch(const COutBufferException &e) { return e.ErrorCode; } catch(...) { return S_FALSE; } } @@ -605,10 +822,12 @@ HRESULT CEncoder::SetCoderProperties(const PROPID *propIDs, if (property.vt != VT_UI4) return E_INVALIDARG; UInt32 numPasses = property.ulVal; - if(numPasses == 0 || numPasses > 10) - return E_INVALIDARG; - m_NumPasses = numPasses; - m_OptimizeNumTables = (m_NumPasses > 1); + if (numPasses == 0) + numPasses = 1; + if (numPasses > kNumPassesMax) + numPasses = kNumPassesMax; + NumPasses = numPasses; + m_OptimizeNumTables = (NumPasses > 1); break; } case NCoderPropID::kDictionarySize: @@ -623,6 +842,17 @@ HRESULT CEncoder::SetCoderProperties(const PROPID *propIDs, m_BlockSizeMult = dictionary; break; } + case NCoderPropID::kNumThreads: + { + #ifdef COMPRESS_BZIP2_MT + if (property.vt != VT_UI4) + return E_INVALIDARG; + NumThreads = property.ulVal; + if (NumThreads < 1) + NumThreads = 1; + #endif + break; + } default: return E_INVALIDARG; } @@ -630,4 +860,14 @@ HRESULT CEncoder::SetCoderProperties(const PROPID *propIDs, return S_OK; } +#ifdef COMPRESS_BZIP2_MT +STDMETHODIMP CEncoder::SetNumberOfThreads(UInt32 numThreads) +{ + NumThreads = numThreads; + if (NumThreads < 1) + NumThreads = 1; + return S_OK; +} +#endif + }} |