Welcome to mirror list, hosted at ThFree Co, Russian Federation.

cygwin.com/git/newlib-cygwin.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'winsup/cygserver/threaded_queue.cc')
-rw-r--r--winsup/cygserver/threaded_queue.cc503
1 files changed, 331 insertions, 172 deletions
diff --git a/winsup/cygserver/threaded_queue.cc b/winsup/cygserver/threaded_queue.cc
index 5fb22b191..ba0fe4178 100644
--- a/winsup/cygserver/threaded_queue.cc
+++ b/winsup/cygserver/threaded_queue.cc
@@ -1,249 +1,408 @@
/* threaded_queue.cc
- Copyright 2001 Red Hat Inc.
+ Copyright 2001, 2002 Red Hat Inc.
Written by Robert Collins <rbtcollins@hotmail.com>
- This file is part of Cygwin.
+This file is part of Cygwin.
- This software is a copyrighted work licensed under the terms of the
- Cygwin license. Please consult the file "CYGWIN_LICENSE" for
- details. */
+This software is a copyrighted work licensed under the terms of the
+Cygwin license. Please consult the file "CYGWIN_LICENSE" for
+details. */
+#include "woutsup.h"
+
+#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <unistd.h>
-#include <windows.h>
#include <sys/types.h>
-#include "wincap.h"
+#include <stdlib.h>
#include "threaded_queue.h"
-#define DEBUG 1
-#define debug_printf if (DEBUG) printf
+
+/*****************************************************************************/
+
+/* queue_request */
+
+queue_request::~queue_request ()
+{}
+
+/*****************************************************************************/
/* threaded_queue */
-DWORD WINAPI
-worker_function (LPVOID LpParam)
+threaded_queue::threaded_queue (const size_t initial_workers)
+ : _workers_count (0),
+ _running (false),
+ _submitters_head (NULL),
+ _requests_count (0),
+ _requests_head (NULL),
+ _requests_sem (NULL)
{
- class threaded_queue *queue = (class threaded_queue *) LpParam;
- class queue_request *request;
- /* FIXME use a threadsafe pop instead for speed? */
- while (queue->active)
+ InitializeCriticalSection (&_queue_lock);
+
+ // This semaphore's count is the number of requests on the queue.
+ // The maximum count (129792) is calculated as MAXIMUM_WAIT_OBJECTS
+ // multiplied by max. threads per process (2028?), which is (a few)
+ // more requests than could ever be pending with the current design.
+
+ _requests_sem = CreateSemaphore (NULL, // SECURITY_ATTRIBUTES
+ 0, // Initial count
+ 129792, // Maximum count
+ NULL); // Anonymous
+
+ if (!_requests_sem)
{
- EnterCriticalSection (&queue->queuelock);
- while (!queue->request && queue->active)
- {
- LeaveCriticalSection (&queue->queuelock);
- DWORD rc = WaitForSingleObject (queue->event, INFINITE);
- if (rc == WAIT_FAILED)
- {
- printf ("Wait for event failed\n");
- queue->running--;
- ExitThread (0);
- }
- EnterCriticalSection (&queue->queuelock);
- }
- if (!queue->active)
- {
- queue->running--;
- LeaveCriticalSection (&queue->queuelock);
- ExitThread (0);
- }
- /* not needed, but it is efficient */
- request =
- (class queue_request *) InterlockedExchangePointer (&queue->request,
- queue->request->
- next);
- LeaveCriticalSection (&queue->queuelock);
- request->process ();
- delete request;
+ system_printf (("failed to create the request queue semaphore, "
+ "error = %lu"),
+ GetLastError ());
+ abort ();
}
- queue->running--;
- ExitThread (0);
+
+ create_workers (initial_workers);
}
-void
-threaded_queue::create_workers ()
+threaded_queue::~threaded_queue ()
{
- InitializeCriticalSection (&queuelock);
- if ((event = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL)
+ if (_running)
+ stop ();
+
+ debug_printf ("deleting all pending queue requests");
+ queue_request *reqptr = _requests_head;
+ while (reqptr)
{
- printf ("Failed to create event queue (%lu), terminating\n",
- GetLastError ());
- exit (1);
+ queue_request *const ptr = reqptr;
+ reqptr = reqptr->_next;
+ safe_delete (ptr);
}
- active = true;
- /* FIXME: Use a stack pair and create threads on the fly whenever
- * we have to to service a request.
- */
- for (unsigned int i = 0; i < initial_workers; i++)
+ DeleteCriticalSection (&_queue_lock);
+ if (_requests_sem)
+ (void) CloseHandle (_requests_sem);
+}
+
+/* FIXME: return success or failure rather than quitting */
+void
+threaded_queue::add_submission_loop (queue_submission_loop *const submitter)
+{
+ assert (this);
+ assert (submitter);
+ assert (submitter->_queue == this);
+ assert (!submitter->_next);
+
+ submitter->_next =
+ TInterlockedExchangePointer (&_submitters_head, submitter);
+
+ if (_running)
+ submitter->start ();
+}
+
+bool
+threaded_queue::start ()
+{
+ EnterCriticalSection (&_queue_lock);
+ const bool was_running = _running;
+ _running = true;
+ queue_submission_loop *loopptr = _submitters_head;
+ LeaveCriticalSection (&_queue_lock);
+
+ if (!was_running)
{
- HANDLE hThread;
- DWORD tid;
- hThread = CreateThread (NULL, 0, worker_function, this, 0, &tid);
- if (hThread == NULL)
+ debug_printf ("starting all queue submission loops");
+
+ while (loopptr)
{
- printf ("Failed to create thread (%lu), terminating\n",
- GetLastError ());
- exit (1);
+ queue_submission_loop *const ptr = loopptr;
+ loopptr = loopptr->_next;
+ ptr->start ();
}
- CloseHandle (hThread);
- running++;
}
+
+ return was_running;
}
-void
-threaded_queue::cleanup ()
+bool
+threaded_queue::stop ()
{
- /* harvest the threads */
- active = false;
- /* kill the request processing loops */
- queue_process_param *reqloop;
- /* make sure we don't race with a incoming request creation */
- EnterCriticalSection (&queuelock);
- reqloop =
- (queue_process_param *) InterlockedExchangePointer (&process_head, NULL);
- while (reqloop)
+ EnterCriticalSection (&_queue_lock);
+ const bool was_running = _running;
+ _running = false;
+ queue_submission_loop *loopptr = _submitters_head;
+ LeaveCriticalSection (&_queue_lock);
+
+ if (was_running)
{
- queue_process_param *t = reqloop;
- reqloop = reqloop->next;
- delete t;
+ debug_printf ("stopping all queue submission loops");
+ while (loopptr)
+ {
+ queue_submission_loop *const ptr = loopptr;
+ loopptr = loopptr->_next;
+ ptr->stop ();
+ }
+
+ ReleaseSemaphore (_requests_sem, _workers_count, NULL);
+ while (_workers_count)
+ {
+ debug_printf (("waiting for worker threads to terminate: "
+ "%lu still running"),
+ _workers_count);
+ Sleep (1000);
+ }
+ debug_printf ("all worker threads have terminated");
}
- LeaveCriticalSection (&queuelock);
- if (!running)
- return;
- printf ("Waiting for current queue threads to terminate\n");
- for (int n = running; n; n--)
- PulseEvent (event);
- while (running)
- sleep (1);
- DeleteCriticalSection (&queuelock);
- CloseHandle (event);
+
+ return was_running;
}
/* FIXME: return success or failure */
void
-threaded_queue::add (queue_request * therequest)
+threaded_queue::add (queue_request *const therequest)
{
- /* safe to not "Try" because workers don't hog this, they wait on the event
- */
- EnterCriticalSection (&queuelock);
- if (!running)
+ assert (this);
+ assert (therequest);
+ assert (!therequest->_next);
+
+ if (!_workers_count)
{
- printf ("No worker threads to handle request!\n");
+ system_printf ("warning: no worker threads to handle request!");
+ // FIXME: And then what?
}
- if (!request)
- request = therequest;
+
+ EnterCriticalSection (&_queue_lock);
+ if (!_requests_head)
+ _requests_head = therequest;
else
{
- /* add to the queue end. */
- queue_request *listrequest = request;
- while (listrequest->next)
- listrequest = listrequest->next;
- listrequest->next = therequest;
+ /* Add to the queue end. */
+ queue_request *reqptr = _requests_head;
+ for (; reqptr->_next; reqptr = reqptr->_next)
+ {}
+ assert (reqptr);
+ assert (!reqptr->_next);
+ reqptr->_next = therequest;
}
- PulseEvent (event);
- LeaveCriticalSection (&queuelock);
+
+ _requests_count += 1;
+ assert (_requests_count > 0);
+ LeaveCriticalSection (&_queue_lock);
+
+ (void) ReleaseSemaphore (_requests_sem, 1, NULL);
}
-/* FIXME: return success or failure rather than quitting */
+/*static*/ DWORD WINAPI
+threaded_queue::start_routine (const LPVOID lpParam)
+{
+ class threaded_queue *const queue = (class threaded_queue *) lpParam;
+ assert (queue);
+
+ queue->worker_loop ();
+
+ const long count = InterlockedDecrement (&queue->_workers_count);
+ assert (count >= 0);
+
+ if (queue->_running)
+ debug_printf ("worker loop has exited; thread about to terminate");
+
+ return 0;
+}
+
+/* Called from the constructor: so no need to be thread-safe until the
+ * worker threads start to be created; thus the interlocked increment
+ * of the `_workers_count' field.
+ */
+
void
-threaded_queue::process_requests (queue_process_param * params,
- threaded_queue_thread_function *
- request_loop)
+threaded_queue::create_workers (const size_t initial_workers)
{
- if (params->start (request_loop, this) == false)
- exit (1);
- params->next =
- (queue_process_param *) InterlockedExchangePointer (&process_head,
- params);
+ assert (initial_workers > 0);
+
+ for (unsigned int i = 0; i != initial_workers; i++)
+ {
+ const long count = InterlockedIncrement (&_workers_count);
+ assert (count > 0);
+
+ DWORD tid;
+ const HANDLE hThread =
+ CreateThread (NULL, 0, start_routine, this, 0, &tid);
+
+ if (!hThread)
+ {
+ system_printf ("failed to create thread, error = %lu",
+ GetLastError ());
+ abort ();
+ }
+
+ (void) CloseHandle (hThread);
+ }
}
-/* queue_process_param */
-/* How does a constructor return an error? */
-queue_process_param::queue_process_param (bool ninterruptible):running (false), shutdown (false),
-interruptible
-(ninterruptible)
+void
+threaded_queue::worker_loop ()
{
- if (!interruptible)
- return;
- debug_printf ("creating an interruptible processing thread\n");
- if ((interrupt = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL)
+ while (true)
{
- printf ("Failed to create interrupt event (%lu), terminating\n",
- GetLastError ());
- exit (1);
+ const DWORD rc = WaitForSingleObject (_requests_sem, INFINITE);
+ if (rc == WAIT_FAILED)
+ {
+ system_printf ("wait for request semaphore failed, error = %lu",
+ GetLastError ());
+ return;
+ }
+ assert (rc == WAIT_OBJECT_0);
+
+ EnterCriticalSection (&_queue_lock);
+ if (!_running)
+ {
+ LeaveCriticalSection (&_queue_lock);
+ return;
+ }
+
+ assert (_requests_head);
+ queue_request *const reqptr = _requests_head;
+ _requests_head = reqptr->_next;
+
+ _requests_count -= 1;
+ assert (_requests_count >= 0);
+ LeaveCriticalSection (&_queue_lock);
+
+ assert (reqptr);
+ reqptr->process ();
+ safe_delete (reqptr);
+ }
+}
+
+/*****************************************************************************/
+
+/* queue_submission_loop */
+
+queue_submission_loop::queue_submission_loop (threaded_queue *const queue,
+ const bool ninterruptible)
+ : _running (false),
+ _interrupt_event (NULL),
+ _queue (queue),
+ _interruptible (ninterruptible),
+ _hThread (NULL),
+ _tid (0),
+ _next (NULL)
+{
+ if (_interruptible)
+ {
+ // verbose: debug_printf ("creating an interruptible processing thread");
+
+ _interrupt_event = CreateEvent (NULL, // SECURITY_ATTRIBUTES
+ FALSE, // Auto-reset
+ FALSE, // Initially non-signalled
+ NULL); // Anonymous
+
+ if (!_interrupt_event)
+ {
+ system_printf ("failed to create interrupt event, error = %lu",
+ GetLastError ());
+ abort ();
+ }
}
}
-queue_process_param::~queue_process_param ()
+queue_submission_loop::~queue_submission_loop ()
{
- if (running)
+ if (_running)
stop ();
- if (!interruptible)
- return;
- CloseHandle (interrupt);
+ if (_interrupt_event)
+ (void) CloseHandle (_interrupt_event);
+ if (_hThread)
+ (void) CloseHandle (_hThread);
}
bool
- queue_process_param::start (threaded_queue_thread_function * request_loop,
- threaded_queue * thequeue)
+queue_submission_loop::start ()
{
- queue = thequeue;
- hThread = CreateThread (NULL, 0, request_loop, this, 0, &tid);
- if (hThread)
+ assert (this);
+ assert (!_hThread);
+
+ const bool was_running = _running;
+
+ if (!was_running)
{
- running = true;
- return true;
+ _running = true;
+
+ _hThread = CreateThread (NULL, 0, start_routine, this, 0, &_tid);
+ if (!_hThread)
+ {
+ system_printf ("failed to create thread, error = %lu",
+ GetLastError ());
+ abort ();
+ }
}
- printf ("Failed to create thread (%lu), terminating\n", GetLastError ());
- return false;
+
+ return was_running;
}
-void
-queue_process_param::stop ()
+bool
+queue_submission_loop::stop ()
{
- if (interruptible)
+ assert (this);
+ assert (_hThread && _hThread != INVALID_HANDLE_VALUE);
+
+ const bool was_running = _running;
+
+ if (_running)
{
- InterlockedExchange (&shutdown, true);
- PulseEvent (interrupt);
- /* Wait up to 50 ms for the thread to exit. If it doesn't _and_ we get
- * scheduled again, we print an error and exit. We _should_ loop or
- * try resignalling. We don't want to hand here though...
- */
- int n = 5;
- while (n-- && WaitForSingleObject (hThread, 1000) == WAIT_TIMEOUT);
- if (!n)
+ _running = false;
+
+ if (_interruptible)
{
- printf ("Process thread didn't shutdown cleanly after 200ms!\n");
- exit (1);
+ assert (_interrupt_event
+ && _interrupt_event != INVALID_HANDLE_VALUE);
+
+ SetEvent (_interrupt_event);
+
+ if (WaitForSingleObject (_hThread, 1000) == WAIT_TIMEOUT)
+ {
+ system_printf (("request loop thread %lu failed to shutdown "
+ "when asked politely: about to get heavy"),
+ _tid);
+
+ if (!TerminateThread (_hThread, 0))
+ {
+ system_printf (("failed to kill request loop thread %lu"
+ ", error = %lu"),
+ _tid, GetLastError ());
+ abort ();
+ }
+ }
}
else
- running = false;
- }
- else
- {
- printf ("killing request loop thread %ld\n", tid);
- int rc;
- if (!(rc = TerminateThread (hThread, 0)))
{
- printf ("error shutting down request loop worker thread\n");
+ // FIXME: could wait to see if the request loop notices that
+ // the submission loop is no longer running and shuts down
+ // voluntarily.
+
+ debug_printf ("killing request loop thread %lu", _tid);
+
+ if (!TerminateThread (_hThread, 0))
+ system_printf (("failed to kill request loop thread %lu"
+ ", error = %lu"),
+ _tid, GetLastError ());
}
- running = false;
}
- CloseHandle (hThread);
-}
-/* queue_request */
-queue_request::queue_request ():next (NULL)
-{
+ return was_running;
}
-void
-queue_request::process (void)
+/*static*/ DWORD WINAPI
+queue_submission_loop::start_routine (const LPVOID lpParam)
{
- printf ("\n**********************************************\n"
- "Oh no! we've hit the base queue_request process() function, and this indicates a coding\n"
- "fault !!!\n" "***********************************************\n");
+ class queue_submission_loop *const submission_loop =
+ (class queue_submission_loop *) lpParam;
+ assert (submission_loop);
+
+ submission_loop->request_loop ();
+
+ debug_printf ("submission loop has exited; thread about to terminate");
+
+ submission_loop->stop ();
+
+ return 0;
}
+
+/*****************************************************************************/