Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/kornelski/7z.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to '7zip/Compress/BZip2/BZip2Encoder.cpp')
-rwxr-xr-x7zip/Compress/BZip2/BZip2Encoder.cpp356
1 files changed, 298 insertions, 58 deletions
diff --git a/7zip/Compress/BZip2/BZip2Encoder.cpp b/7zip/Compress/BZip2/BZip2Encoder.cpp
index 657fcfb2..dc08e7a8 100755
--- a/7zip/Compress/BZip2/BZip2Encoder.cpp
+++ b/7zip/Compress/BZip2/BZip2Encoder.cpp
@@ -2,9 +2,10 @@
#include "StdAfx.h"
+#include "../../../Common/Alloc.h"
+
#include "BZip2Encoder.h"
-#include "../../../Common/Alloc.h"
#include "../BWT/Mtf8.h"
#include "BZip2CRC.h"
@@ -14,18 +15,160 @@ namespace NBZip2 {
static const UInt32 kBufferSize = (1 << 17);
static const int kNumHuffPasses = 4;
-CEncoder::CEncoder():
- m_Block(0),
- m_NeedHuffmanCreate(true),
- m_NumPasses(1),
+bool CThreadInfo::Create()
+{
+ if (!m_BlockSorter.Create(kBlockSizeMax))
+ return false;
+
+ if (m_Block == 0)
+ {
+ m_Block = (Byte *)::BigAlloc(kBlockSizeMax * 5 + kBlockSizeMax / 10 + (20 << 10));
+ if (m_Block == 0)
+ return false;
+ m_MtfArray = m_Block + kBlockSizeMax;
+ m_TempArray = m_MtfArray + kBlockSizeMax * 2 + 2;
+ }
+ if (m_NeedHuffmanCreate)
+ {
+ for (int i = 0; i < kNumTablesMax; i++)
+ if (!m_HuffEncoders[i].Create(kMaxAlphaSize, 0, 0, kMaxHuffmanLen))
+ return false;
+ m_NeedHuffmanCreate = false;
+ }
+ return true;
+}
+
+void CThreadInfo::Free()
+{
+ m_BlockSorter.Free();
+ ::BigFree(m_Block);
+ m_Block = 0;
+}
+
+#ifdef COMPRESS_BZIP2_MT
+void CThreadInfo::FinishStream()
+{
+ Encoder->StreamWasFinished = true;
+ StreamWasFinishedEvent.Set();
+ Encoder->CS.Leave();
+ Encoder->CanStartWaitingEvent.Lock();
+ WaitingWasStartedEvent.Set();
+}
+
+DWORD CThreadInfo::ThreadFunc()
+{
+ while (true)
+ {
+ Encoder->CS.Enter();
+ if (Encoder->CloseThreads)
+ {
+ Encoder->CS.Leave();
+ return 0;
+ }
+ if (Encoder->StreamWasFinished)
+ {
+ FinishStream();
+ continue;
+ }
+ HRESULT res = S_OK;
+ try
+ {
+ UInt32 blockSize = Encoder->ReadRleBlock(m_Block);
+ m_PackSize = Encoder->m_InStream.GetProcessedSize();
+ m_BlockIndex = Encoder->NextBlockIndex;
+ if (++Encoder->NextBlockIndex == Encoder->NumThreads)
+ Encoder->NextBlockIndex = 0;
+ if (blockSize == 0)
+ {
+ FinishStream();
+ continue;
+ }
+ Encoder->CS.Leave();
+ res = EncodeBlock3(blockSize);
+ }
+ catch(const CInBufferException &e) { res = e.ErrorCode; }
+ catch(const COutBufferException &e) { res = e.ErrorCode; }
+ catch(...) { res = E_FAIL; }
+ if (res != S_OK)
+ {
+ Encoder->Result = res;
+ FinishStream();
+ continue;
+ }
+ }
+}
+
+static DWORD WINAPI MFThread(void *threadCoderInfo)
+{
+ return ((CThreadInfo *)threadCoderInfo)->ThreadFunc();
+}
+#endif
+
+CEncoder::CEncoder():
+ NumPasses(1),
m_OptimizeNumTables(false),
m_BlockSizeMult(kBlockSizeMultMax)
-{}
+{
+ #ifdef COMPRESS_BZIP2_MT
+ ThreadsInfo = 0;
+ m_NumThreadsPrev = 0;
+ NumThreads = 1;
+ CS.Enter();
+ #endif
+}
+#ifdef COMPRESS_BZIP2_MT
CEncoder::~CEncoder()
{
- ::BigFree(m_Block);
+ Free();
+}
+
+bool CEncoder::Create()
+{
+ try
+ {
+ if (ThreadsInfo != 0 && m_NumThreadsPrev == NumThreads)
+ return true;
+ Free();
+ MtMode = (NumThreads > 1);
+ m_NumThreadsPrev = NumThreads;
+ ThreadsInfo = new CThreadInfo[NumThreads];
+ if (ThreadsInfo == 0)
+ return false;
+ for (UInt32 t = 0; t < NumThreads; t++)
+ {
+ CThreadInfo &ti = ThreadsInfo[t];
+ ti.Encoder = this;
+ if (MtMode)
+ if (!ti.Thread.Create(MFThread, &ti))
+ {
+ NumThreads = t;
+ Free();
+ return false;
+ }
+ }
+ }
+ catch(...) { return false; }
+ return true;
+}
+
+void CEncoder::Free()
+{
+ if (!ThreadsInfo)
+ return;
+ CloseThreads = true;
+ CS.Leave();
+ for (UInt32 t = 0; t < NumThreads; t++)
+ {
+ CThreadInfo &ti = ThreadsInfo[t];
+ if (MtMode)
+ ti.Thread.Wait();
+ ti.Free();
+ }
+ delete []ThreadsInfo;
+ ThreadsInfo = 0;
}
+#endif
UInt32 CEncoder::ReadRleBlock(Byte *buffer)
{
@@ -66,11 +209,11 @@ UInt32 CEncoder::ReadRleBlock(Byte *buffer)
return i;
}
-void CEncoder::WriteBits2(UInt32 value, UInt32 numBits)
+void CThreadInfo::WriteBits2(UInt32 value, UInt32 numBits)
{ m_OutStreamCurrent->WriteBits(value, numBits); }
-void CEncoder::WriteByte2(Byte b) { WriteBits2(b , 8); }
-void CEncoder::WriteBit2(bool v) { WriteBits2((v ? 1 : 0), 1); }
-void CEncoder::WriteCRC2(UInt32 v)
+void CThreadInfo::WriteByte2(Byte b) { WriteBits2(b , 8); }
+void CThreadInfo::WriteBit2(bool v) { WriteBits2((v ? 1 : 0), 1); }
+void CThreadInfo::WriteCRC2(UInt32 v)
{
for (int i = 0; i < 4; i++)
WriteByte2(((Byte)(v >> (24 - i * 8))));
@@ -88,7 +231,7 @@ void CEncoder::WriteCRC(UInt32 v)
// blockSize > 0
-void CEncoder::EncodeBlock(Byte *block, UInt32 blockSize)
+void CThreadInfo::EncodeBlock(Byte *block, UInt32 blockSize)
{
WriteBit2(false); // Randomised = false
@@ -404,7 +547,7 @@ void CEncoder::EncodeBlock(Byte *block, UInt32 blockSize)
}
// blockSize > 0
-UInt32 CEncoder::EncodeBlockWithHeaders(Byte *block, UInt32 blockSize)
+UInt32 CThreadInfo::EncodeBlockWithHeaders(Byte *block, UInt32 blockSize)
{
WriteByte2(kBlockSig0);
WriteByte2(kBlockSig1);
@@ -443,11 +586,10 @@ UInt32 CEncoder::EncodeBlockWithHeaders(Byte *block, UInt32 blockSize)
return crcRes;
}
-void CEncoder::EncodeBlock2(CBZip2CombinedCRC &combinedCRC,
- Byte *block, UInt32 blockSize, UInt32 numPasses)
+void CThreadInfo::EncodeBlock2(Byte *block, UInt32 blockSize, UInt32 numPasses)
{
+ UInt32 numCrcs = m_NumCrcs;
bool needCompare = false;
- CBZip2CombinedCRC specCombinedCRC = combinedCRC;
UInt32 startBytePos = m_OutStreamCurrent->GetBytePos();
UInt32 startPos = m_OutStreamCurrent->GetPos();
@@ -462,9 +604,8 @@ void CEncoder::EncodeBlock2(CBZip2CombinedCRC &combinedCRC,
blockSize0 < blockSize; blockSize0++);
if (blockSize0 < blockSize)
{
- EncodeBlock2(specCombinedCRC, block, blockSize0, numPasses - 1);
- EncodeBlock2(specCombinedCRC, block + blockSize0, blockSize - blockSize0,
- numPasses - 1);
+ EncodeBlock2(block, blockSize0, numPasses - 1);
+ EncodeBlock2(block + blockSize0, blockSize - blockSize0, numPasses - 1);
endPos = m_OutStreamCurrent->GetPos();
endCurByte = m_OutStreamCurrent->GetCurByte();
if ((endPos & 7) > 0)
@@ -479,8 +620,6 @@ void CEncoder::EncodeBlock2(CBZip2CombinedCRC &combinedCRC,
UInt32 crcVal = EncodeBlockWithHeaders(block, blockSize);
UInt32 endPos2 = m_OutStreamCurrent->GetPos();
- combinedCRC.Update(crcVal);
-
if (needCompare)
{
UInt32 size2 = endPos2 - startPos2;
@@ -491,59 +630,102 @@ void CEncoder::EncodeBlock2(CBZip2CombinedCRC &combinedCRC,
for (UInt32 i = 0; i < numBytes; i++)
buffer[startBytePos + i] = buffer[startBytePos2 + i];
m_OutStreamCurrent->SetPos(startPos + endPos2 - startPos2);
+ m_NumCrcs = numCrcs;
+ m_CRCs[m_NumCrcs++] = crcVal;
}
else
{
m_OutStreamCurrent->SetPos(endPos);
m_OutStreamCurrent->SetCurState((endPos & 7), endCurByte);
- combinedCRC = specCombinedCRC;
}
}
+ else
+ {
+ m_NumCrcs = numCrcs;
+ m_CRCs[m_NumCrcs++] = crcVal;
+ }
}
-void CEncoder::EncodeBlock3(CBZip2CombinedCRC &combinedCRC, UInt32 blockSize)
+HRESULT CThreadInfo::EncodeBlock3(UInt32 blockSize)
{
CMsbfEncoderTemp outStreamTemp;
outStreamTemp.SetStream(m_TempArray);
outStreamTemp.Init();
m_OutStreamCurrent = &outStreamTemp;
- EncodeBlock2(combinedCRC, m_Block, blockSize, m_NumPasses);
+ m_NumCrcs = 0;
+
+ EncodeBlock2(m_Block, blockSize, Encoder->NumPasses);
+
+ #ifdef COMPRESS_BZIP2_MT
+ if (Encoder->MtMode)
+ Encoder->ThreadsInfo[m_BlockIndex].CanWriteEvent.Lock();
+ #endif
+ for (UInt32 i = 0; i < m_NumCrcs; i++)
+ Encoder->CombinedCRC.Update(m_CRCs[i]);
+ Encoder->WriteBytes(m_TempArray, outStreamTemp.GetPos(), outStreamTemp.GetCurByte());
+ HRESULT res = S_OK;
+ #ifdef COMPRESS_BZIP2_MT
+ if (Encoder->MtMode)
+ {
+ UInt32 blockIndex = m_BlockIndex + 1;
+ if (blockIndex == Encoder->NumThreads)
+ blockIndex = 0;
+
+ if (Encoder->Progress)
+ {
+ UInt64 unpackSize = Encoder->m_OutStream.GetProcessedSize();
+ res = Encoder->Progress->SetRatioInfo(&m_PackSize, &unpackSize);
+ }
+
+ Encoder->ThreadsInfo[blockIndex].CanWriteEvent.Set();
+ }
+ #endif
+ return res;
+}
- UInt32 size = outStreamTemp.GetPos();
- UInt32 bytesSize = (size / 8);
+void CEncoder::WriteBytes(const Byte *data, UInt32 sizeInBits, Byte lastByte)
+{
+ UInt32 bytesSize = (sizeInBits / 8);
for (UInt32 i = 0; i < bytesSize; i++)
- m_OutStream.WriteBits(m_TempArray[i], 8);
- WriteBits(outStreamTemp.GetCurByte(), (size & 7));
+ m_OutStream.WriteBits(data[i], 8);
+ WriteBits(lastByte, (sizeInBits & 7));
}
+
HRESULT CEncoder::CodeReal(ISequentialInStream *inStream,
ISequentialOutStream *outStream, const UInt64 *inSize, const UInt64 *outSize,
ICompressProgressInfo *progress)
{
- if (!m_BlockSorter.Create(kBlockSizeMax))
- return E_OUTOFMEMORY;
-
- if (m_Block == 0)
+ #ifdef COMPRESS_BZIP2_MT
+ Progress = progress;
+ if (!Create())
+ return E_FAIL;
+ for (UInt32 t = 0; t < NumThreads; t++)
+ #endif
{
- m_Block = (Byte *)BigAlloc(kBlockSizeMax * 5 + kBlockSizeMax / 10 + (20 << 10));
- if (m_Block == 0)
+ #ifdef COMPRESS_BZIP2_MT
+ CThreadInfo &ti = ThreadsInfo[t];
+ ti.StreamWasFinishedEvent.Reset();
+ ti.WaitingWasStartedEvent.Reset();
+ ti.CanWriteEvent.Reset();
+ #else
+ CThreadInfo &ti = ThreadsInfo;
+ ti.Encoder = this;
+ #endif
+
+ ti.m_OptimizeNumTables = m_OptimizeNumTables;
+
+ if (!ti.Create())
return E_OUTOFMEMORY;
- m_MtfArray = m_Block + kBlockSizeMax;
- m_TempArray = m_MtfArray + kBlockSizeMax * 2 + 2;
}
+
+
if (!m_InStream.Create(kBufferSize))
return E_OUTOFMEMORY;
if (!m_OutStream.Create(kBufferSize))
return E_OUTOFMEMORY;
- if (m_NeedHuffmanCreate)
- {
- for (int i = 0; i < kNumTablesMax; i++)
- if (!m_HuffEncoders[i].Create(kMaxAlphaSize, 0, 0, kMaxHuffmanLen))
- return E_OUTOFMEMORY;
- m_NeedHuffmanCreate = false;
- }
m_InStream.SetStream(inStream);
m_InStream.Init();
@@ -553,24 +735,57 @@ HRESULT CEncoder::CodeReal(ISequentialInStream *inStream,
CFlusher flusher(this);
- CBZip2CombinedCRC combinedCRC;
+ CombinedCRC.Init();
+ #ifdef COMPRESS_BZIP2_MT
+ NextBlockIndex = 0;
+ StreamWasFinished = false;
+ CloseThreads = false;
+ CanStartWaitingEvent.Reset();
+ #endif
WriteByte(kArSig0);
WriteByte(kArSig1);
WriteByte(kArSig2);
WriteByte((Byte)(kArSig3 + m_BlockSizeMult));
- while (true)
+ #ifdef COMPRESS_BZIP2_MT
+
+ if (MtMode)
{
- UInt32 blockSize = ReadRleBlock(m_Block);
- if (blockSize == 0)
- break;
- EncodeBlock3(combinedCRC, blockSize);
- if (progress)
+ ThreadsInfo[0].CanWriteEvent.Set();
+ Result = S_OK;
+ CS.Leave();
+ UInt32 t;
+ for (t = 0; t < NumThreads; t++)
+ ThreadsInfo[t].StreamWasFinishedEvent.Lock();
+ CS.Enter();
+ CanStartWaitingEvent.Set();
+ for (t = 0; t < NumThreads; t++)
+ ThreadsInfo[t].WaitingWasStartedEvent.Lock();
+ CanStartWaitingEvent.Reset();
+ RINOK(Result);
+ }
+ else
+ #endif
+ {
+ while (true)
{
- UInt64 packSize = m_InStream.GetProcessedSize();
- UInt64 unpackSize = m_OutStream.GetProcessedSize();
- RINOK(progress->SetRatioInfo(&packSize, &unpackSize));
+ CThreadInfo &ti =
+ #ifdef COMPRESS_BZIP2_MT
+ ThreadsInfo[0];
+ #else
+ ThreadsInfo;
+ #endif
+ UInt32 blockSize = ReadRleBlock(ti.m_Block);
+ if (blockSize == 0)
+ break;
+ RINOK(ti.EncodeBlock3(blockSize));
+ if (progress)
+ {
+ UInt64 packSize = m_InStream.GetProcessedSize();
+ UInt64 unpackSize = m_OutStream.GetProcessedSize();
+ RINOK(progress->SetRatioInfo(&packSize, &unpackSize));
+ }
}
}
WriteByte(kFinSig0);
@@ -580,7 +795,7 @@ HRESULT CEncoder::CodeReal(ISequentialInStream *inStream,
WriteByte(kFinSig4);
WriteByte(kFinSig5);
- WriteCRC(combinedCRC.GetDigest());
+ WriteCRC(CombinedCRC.GetDigest());
return S_OK;
}
@@ -589,6 +804,8 @@ STDMETHODIMP CEncoder::Code(ISequentialInStream *inStream,
ICompressProgressInfo *progress)
{
try { return CodeReal(inStream, outStream, inSize, outSize, progress); }
+ catch(const CInBufferException &e) { return e.ErrorCode; }
+ catch(const COutBufferException &e) { return e.ErrorCode; }
catch(...) { return S_FALSE; }
}
@@ -605,10 +822,12 @@ HRESULT CEncoder::SetCoderProperties(const PROPID *propIDs,
if (property.vt != VT_UI4)
return E_INVALIDARG;
UInt32 numPasses = property.ulVal;
- if(numPasses == 0 || numPasses > 10)
- return E_INVALIDARG;
- m_NumPasses = numPasses;
- m_OptimizeNumTables = (m_NumPasses > 1);
+ if (numPasses == 0)
+ numPasses = 1;
+ if (numPasses > kNumPassesMax)
+ numPasses = kNumPassesMax;
+ NumPasses = numPasses;
+ m_OptimizeNumTables = (NumPasses > 1);
break;
}
case NCoderPropID::kDictionarySize:
@@ -623,6 +842,17 @@ HRESULT CEncoder::SetCoderProperties(const PROPID *propIDs,
m_BlockSizeMult = dictionary;
break;
}
+ case NCoderPropID::kNumThreads:
+ {
+ #ifdef COMPRESS_BZIP2_MT
+ if (property.vt != VT_UI4)
+ return E_INVALIDARG;
+ NumThreads = property.ulVal;
+ if (NumThreads < 1)
+ NumThreads = 1;
+ #endif
+ break;
+ }
default:
return E_INVALIDARG;
}
@@ -630,4 +860,14 @@ HRESULT CEncoder::SetCoderProperties(const PROPID *propIDs,
return S_OK;
}
+#ifdef COMPRESS_BZIP2_MT
+STDMETHODIMP CEncoder::SetNumberOfThreads(UInt32 numThreads)
+{
+ NumThreads = numThreads;
+ if (NumThreads < 1)
+ NumThreads = 1;
+ return S_OK;
+}
+#endif
+
}}