Welcome to mirror list, hosted at ThFree Co, Russian Federation.

threadpool.cpp « unrar « thirdparty « src - github.com/mpc-hc/mpc-hc.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 732dd75375beba6fb8133f190455e348e4295390 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
#include "rar.hpp"

#ifdef RAR_SMP
#include "threadmisc.cpp"

#ifdef _WIN_ALL
int ThreadPool::ThreadPriority=THREAD_PRIORITY_NORMAL;
#endif

ThreadPool::ThreadPool(uint MaxThreads)
{
  MaxAllowedThreads = MaxThreads;
  if (MaxAllowedThreads>MaxPoolThreads)
    MaxAllowedThreads=MaxPoolThreads;
  if (MaxAllowedThreads==0)
    MaxAllowedThreads=1;

  ThreadsCreatedCount=0;

  // If we have more threads than queue size, we'll hang on pool destroying,
  // not releasing all waiting threads.
  if (MaxAllowedThreads>ASIZE(TaskQueue))
    MaxAllowedThreads=ASIZE(TaskQueue);

  Closing=false;

  bool Success = CriticalSectionCreate(&CritSection);
#ifdef _WIN_ALL
  QueuedTasksCnt=CreateSemaphore(NULL,0,ASIZE(TaskQueue),NULL);
  NoneActive=CreateEvent(NULL,TRUE,TRUE,NULL);
  Success=Success && QueuedTasksCnt!=NULL && NoneActive!=NULL;
#elif defined(_UNIX)
  AnyActive = false;
  QueuedTasksCnt = 0;
  Success=Success && pthread_cond_init(&AnyActiveCond,NULL)==0 &&
          pthread_mutex_init(&AnyActiveMutex,NULL)==0 &&
          pthread_cond_init(&QueuedTasksCntCond,NULL)==0 &&
          pthread_mutex_init(&QueuedTasksCntMutex,NULL)==0;
#endif
  if (!Success)
  {
    ErrHandler.GeneralErrMsg(L"\nThread pool initialization failed.");
    ErrHandler.Exit(RARX_FATAL);
  }

  QueueTop = 0;
  QueueBottom = 0;
  ActiveThreads = 0;
}


ThreadPool::~ThreadPool()
{
  WaitDone();
  Closing=true;

#ifdef _WIN_ALL
  ReleaseSemaphore(QueuedTasksCnt,ASIZE(TaskQueue),NULL);
#elif defined(_UNIX)
  // Threads still can access QueuedTasksCnt for a short time after WaitDone(),
  // so lock is required. We would occassionally hang without it.
  pthread_mutex_lock(&QueuedTasksCntMutex);
  QueuedTasksCnt+=ASIZE(TaskQueue);
  pthread_mutex_unlock(&QueuedTasksCntMutex);

  pthread_cond_broadcast(&QueuedTasksCntCond);
#endif

  for(uint I=0;I<ThreadsCreatedCount;I++)
  {
#ifdef _WIN_ALL
    // Waiting until the thread terminates.
    CWaitForSingleObject(ThreadHandles[I]);
#endif
    // Close the thread handle. In Unix it results in pthread_join call,
    // which also waits for thread termination.
    ThreadClose(ThreadHandles[I]);
  }

  CriticalSectionDelete(&CritSection);
#ifdef _WIN_ALL
  CloseHandle(QueuedTasksCnt);
  CloseHandle(NoneActive);
#elif defined(_UNIX)
  pthread_cond_destroy(&AnyActiveCond);
  pthread_mutex_destroy(&AnyActiveMutex);
  pthread_cond_destroy(&QueuedTasksCntCond);
  pthread_mutex_destroy(&QueuedTasksCntMutex);
#endif
}


void ThreadPool::CreateThreads()
{
  for(uint I=0;I<MaxAllowedThreads;I++)
  {
    ThreadHandles[I] = ThreadCreate(PoolThread, this);
    ThreadsCreatedCount++;
#ifdef _WIN_ALL
    if (ThreadPool::ThreadPriority!=THREAD_PRIORITY_NORMAL)
      SetThreadPriority(ThreadHandles[I],ThreadPool::ThreadPriority);
#endif
  }
}


NATIVE_THREAD_TYPE ThreadPool::PoolThread(void *Param)
{
  ((ThreadPool*)Param)->PoolThreadLoop();
  return 0;
}


void ThreadPool::PoolThreadLoop()
{
  QueueEntry Task;
  while (GetQueuedTask(&Task))
  {
    Task.Proc(Task.Param);
    
    CriticalSectionStart(&CritSection); 
    if (--ActiveThreads == 0)
    {
#ifdef _WIN_ALL
      SetEvent(NoneActive);
#elif defined(_UNIX)
      pthread_mutex_lock(&AnyActiveMutex);
      AnyActive=false;
      pthread_cond_signal(&AnyActiveCond);
      pthread_mutex_unlock(&AnyActiveMutex);
#endif
    }
    CriticalSectionEnd(&CritSection); 
  }
}


bool ThreadPool::GetQueuedTask(QueueEntry *Task)
{
#ifdef _WIN_ALL
  CWaitForSingleObject(QueuedTasksCnt);
#elif defined(_UNIX)
  pthread_mutex_lock(&QueuedTasksCntMutex);
  while (QueuedTasksCnt==0)
    cpthread_cond_wait(&QueuedTasksCntCond,&QueuedTasksCntMutex);
  QueuedTasksCnt--;
  pthread_mutex_unlock(&QueuedTasksCntMutex);
#endif

  if (Closing)
    return false;

  CriticalSectionStart(&CritSection); 

  *Task = TaskQueue[QueueBottom];
  QueueBottom = (QueueBottom + 1) % ASIZE(TaskQueue);

  CriticalSectionEnd(&CritSection); 

  return true;
}


// Add task to queue. We assume that it is always called from main thread,
// it allows to avoid any locks here. We process collected tasks only
// when WaitDone is called.
void ThreadPool::AddTask(PTHREAD_PROC Proc,void *Data)
{
  if (ThreadsCreatedCount == 0)
    CreateThreads();
  
  // If queue is full, wait until it is empty.
  if ((QueueTop + 1) % ASIZE(TaskQueue) == QueueBottom)
    WaitDone();

  TaskQueue[QueueTop].Proc = Proc;
  TaskQueue[QueueTop].Param = Data;
  QueueTop = (QueueTop + 1) % ASIZE(TaskQueue);
}


// Start queued tasks and wait until all threads are inactive.
// We assume that it is always called from main thread, when pool threads
// are sleeping yet.
void ThreadPool::WaitDone()
{
  // We add ASIZE(TaskQueue) for case if TaskQueue array size is not
  // a power of two. Negative numbers would not suit our purpose here.
  ActiveThreads=(QueueTop+ASIZE(TaskQueue)-QueueBottom) % ASIZE(TaskQueue);
  if (ActiveThreads==0)
    return;
#ifdef _WIN_ALL
  ResetEvent(NoneActive);
  ReleaseSemaphore(QueuedTasksCnt,ActiveThreads,NULL);
  CWaitForSingleObject(NoneActive);
#elif defined(_UNIX)
  AnyActive=true;

  // Threads reset AnyActive before accessing QueuedTasksCnt and even
  // preceding WaitDone() call does not guarantee that some slow thread
  // is not accessing QueuedTasksCnt now. So lock is necessary.
  pthread_mutex_lock(&QueuedTasksCntMutex);
  QueuedTasksCnt+=ActiveThreads;
  pthread_mutex_unlock(&QueuedTasksCntMutex);

  pthread_cond_broadcast(&QueuedTasksCntCond);

  pthread_mutex_lock(&AnyActiveMutex);
  while (AnyActive)
    cpthread_cond_wait(&AnyActiveCond,&AnyActiveMutex);
  pthread_mutex_unlock(&AnyActiveMutex);
#endif
}
#endif // RAR_SMP