diff options
Diffstat (limited to 'winsup')
-rw-r--r-- | winsup/cygserver/threaded_queue.cc | 169 | ||||
-rw-r--r-- | winsup/cygwin/ChangeLog | 17 | ||||
-rwxr-xr-x | winsup/cygwin/cygserver.cc | 1 | ||||
-rwxr-xr-x | winsup/cygwin/cygserver_process.cc | 4 | ||||
-rwxr-xr-x | winsup/cygwin/include/cygwin/cygserver_process.h | 1 | ||||
-rwxr-xr-x | winsup/cygwin/threaded_queue.cc | 169 | ||||
-rwxr-xr-x | winsup/cygwin/threaded_queue.h | 12 |
7 files changed, 273 insertions, 100 deletions
diff --git a/winsup/cygserver/threaded_queue.cc b/winsup/cygserver/threaded_queue.cc index d01be1833..0d6b65876 100644 --- a/winsup/cygserver/threaded_queue.cc +++ b/winsup/cygserver/threaded_queue.cc @@ -17,11 +17,13 @@ #include <sys/types.h> #include "wincap.h" #include "threaded_queue.h" +#define DEBUG 1 +#define debug_printf if (DEBUG) printf /* threaded_queue */ DWORD WINAPI -worker_function( LPVOID LpParam ) +worker_function (LPVOID LpParam) { class threaded_queue *queue = (class threaded_queue *) LpParam; class queue_request *request; @@ -30,26 +32,28 @@ worker_function( LPVOID LpParam ) { EnterCriticalSection (&queue->queuelock); while (!queue->request && queue->active) - { - LeaveCriticalSection (&queue->queuelock); - DWORD rc = WaitForSingleObject (queue->event,INFINITE); - if (rc == WAIT_FAILED) - { - printf("Wait for event failed\n"); - queue->running--; - ExitThread (0); - } - EnterCriticalSection (&queue->queuelock); - } + { + LeaveCriticalSection (&queue->queuelock); + DWORD rc = WaitForSingleObject (queue->event, INFINITE); + if (rc == WAIT_FAILED) + { + printf ("Wait for event failed\n"); + queue->running--; + ExitThread (0); + } + EnterCriticalSection (&queue->queuelock); + } if (!queue->active) - { - queue->running--; - LeaveCriticalSection (&queue->queuelock); - ExitThread (0); - } + { + queue->running--; + LeaveCriticalSection (&queue->queuelock); + ExitThread (0); + } /* not needed, but it is efficient */ - request = (class queue_request *)InterlockedExchangePointer (&queue->request, queue -->request->next); + request = + (class queue_request *) InterlockedExchangePointer (&queue->request, + queue->request-> + next); LeaveCriticalSection (&queue->queuelock); request->process (); delete request; @@ -59,12 +63,13 @@ worker_function( LPVOID LpParam ) } void -threaded_queue::create_workers() +threaded_queue::create_workers () { InitializeCriticalSection (&queuelock); if ((event = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL) { - printf ("Failed to create event queue (%lu), terminating\n", GetLastError ()); + printf ("Failed to create event queue (%lu), terminating\n", + GetLastError ()); exit (1); } active = true; @@ -72,16 +77,17 @@ threaded_queue::create_workers() /* FIXME: Use a stack pair and create threads on the fly whenever * we have to to service a request. */ - for (unsigned int i=0; i<initial_workers; i++) + for (unsigned int i = 0; i < initial_workers; i++) { HANDLE hThread; DWORD tid; hThread = CreateThread (NULL, 0, worker_function, this, 0, &tid); if (hThread == NULL) - { - printf ("Failed to create thread (%lu), terminating\n", GetLastError ()); - exit (1); - } + { + printf ("Failed to create thread (%lu), terminating\n", + GetLastError ()); + exit (1); + } CloseHandle (hThread); running++; } @@ -93,19 +99,13 @@ threaded_queue::cleanup () /* harvest the threads */ active = false; /* kill the request processing loops */ - queue_process_param * reqloop; + queue_process_param *reqloop; /* make sure we don't race with a incoming request creation */ EnterCriticalSection (&queuelock); - reqloop = (queue_process_param *) InterlockedExchangePointer (&process_head, NULL); + reqloop = + (queue_process_param *) InterlockedExchangePointer (&process_head, NULL); while (reqloop) { - printf ("killing request loop thread %ld\n", reqloop->tid); - int rc; - if (!(rc = TerminateThread (reqloop->hThread, 0))) - { - printf ("error shutting down request loop worker thread\n"); - } - CloseHandle (reqloop->hThread); queue_process_param *t = reqloop; reqloop = reqloop->next; delete t; @@ -114,7 +114,7 @@ threaded_queue::cleanup () if (!running) return; printf ("Waiting for current connections to terminate\n"); - for (int n=running; n; n--) + for (int n = running; n; n--) PulseEvent (event); while (running) sleep (1); @@ -124,7 +124,7 @@ threaded_queue::cleanup () /* FIXME: return success or failure */ void -threaded_queue::add (queue_request *therequest) +threaded_queue::add (queue_request * therequest) { /* safe to not "Try" because workers don't hog this, they wait on the event */ @@ -134,13 +134,13 @@ threaded_queue::add (queue_request *therequest) printf ("No worker threads to handle request!\n"); } if (!request) - request = therequest; + request = therequest; else { /* add to the queue end. */ queue_request *listrequest = request; while (listrequest->next) - listrequest = listrequest->next; + listrequest = listrequest->next; listrequest->next = therequest; } PulseEvent (event); @@ -149,21 +149,93 @@ threaded_queue::add (queue_request *therequest) /* FIXME: return success or failure rather than quitting */ void -threaded_queue::process_requests (queue_process_param *params, threaded_queue_thread_function *request_loop) +threaded_queue::process_requests (queue_process_param * params, + threaded_queue_thread_function * + request_loop) +{ + if (params->start (request_loop, this) == false) + exit (1); + params->next = + (queue_process_param *) InterlockedExchangePointer (&process_head, + params); +} + +/* queue_process_param */ +/* How does a constructor return an error? */ +queue_process_param::queue_process_param (bool ninterruptible):running (false), shutdown (false), +interruptible +(ninterruptible) { - params->queue = this; - params->hThread = CreateThread (NULL, 0, request_loop, params, 0, ¶ms->tid); - if (params->hThread == NULL) + if (!interruptible) + return; + debug_printf ("creating an interruptible processing thread\n"); + if ((interrupt = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL) { - printf ("Failed to create thread (%lu), terminating\n", GetLastError ()); + printf ("Failed to create interrupt event (%lu), terminating\n", + GetLastError ()); exit (1); } - params->next = (queue_process_param *) InterlockedExchangePointer (&process_head, params -); +} + +queue_process_param::~queue_process_param () +{ + debug_printf ("aaaargh\n"); + if (running) + stop (); + if (!interruptible) + return; + CloseHandle (interrupt); +} + +bool + queue_process_param::start (threaded_queue_thread_function * request_loop, + threaded_queue * thequeue) +{ + queue = thequeue; + hThread = CreateThread (NULL, 0, request_loop, this, 0, &tid); + if (hThread) + { + running = true; + return true; + } + printf ("Failed to create thread (%lu), terminating\n", GetLastError ()); + return false; +} + +void +queue_process_param::stop () +{ + if (interruptible) + { + InterlockedExchange (&shutdown, true); + PulseEvent (interrupt); + /* Wait up to 50 ms for the thread to exit. If it doesn't _and_ we get + * scheduled again, we print an error and exit. We _should_ loop or + * try resignalling. We don't want to hand here though... + */ + if (WaitForSingleObject (hThread, 200) == WAIT_TIMEOUT) + { + printf ("Process thread didn't shutdown cleanly after 200ms!\n"); + exit (1); + } + else + running = false; + } + else + { + printf ("killing request loop thread %ld\n", tid); + int rc; + if (!(rc = TerminateThread (hThread, 0))) + { + printf ("error shutting down request loop worker thread\n"); + } + running = false; + } + CloseHandle (hThread); } /* queue_request */ -queue_request::queue_request () : next (NULL) +queue_request::queue_request ():next (NULL) { } @@ -172,6 +244,5 @@ queue_request::process (void) { printf ("\n**********************************************\n" "Oh no! we've hit the base queue_request process() function, and this indicates a coding\n" - "fault !!!\n" - "***********************************************\n"); + "fault !!!\n" "***********************************************\n"); } diff --git a/winsup/cygwin/ChangeLog b/winsup/cygwin/ChangeLog index 0a8f57db1..3b99e473f 100644 --- a/winsup/cygwin/ChangeLog +++ b/winsup/cygwin/ChangeLog @@ -1,5 +1,22 @@ Tue Oct 2 23:24:00 2001 Robert Collins <rbtcollins@hotmail.com> + * cygserver.cc (class server_process_param): Use new constructor syntax. + * cygserver_process.cc (process_cache::~process_cache): New function. + * threaded_queue.cc: Define condition debug_printf. + Run indent. + (threaded_queue::cleanup): Move queue_process_param guts to a method. + (threaded_queue::process_requests): Ditto. + (queue_process_param::queue_process_param): New method. + (queue_process_param::~queue_process_param): Ditto. + (queue_process_param::start): Ditto. + (queue_process_param::stop): Ditto. + * threaded_queue.h (class queue_process_param): Add support for + interruptible request loops. + * cygwin/include/cygwin/cygserver_process.h (class process_cache): Add + destructor. + +Tue Oct 2 23:24:00 2001 Robert Collins <rbtcollins@hotmail.com> + * cygserver_client.cc: New flag allow_daemon to disable the daemon completely. (cygserver_request): Check it. (cygserver_init): Ditto. diff --git a/winsup/cygwin/cygserver.cc b/winsup/cygwin/cygserver.cc index f4e2b852a..c633e0d3a 100755 --- a/winsup/cygwin/cygserver.cc +++ b/winsup/cygwin/cygserver.cc @@ -289,6 +289,7 @@ class server_process_param : public queue_process_param { public: transport_layer_base *transport; + server_process_param () : queue_process_param (false) {}; }; class server_request_queue : public threaded_queue diff --git a/winsup/cygwin/cygserver_process.cc b/winsup/cygwin/cygserver_process.cc index d02905b03..27e7d3129 100755 --- a/winsup/cygwin/cygserver_process.cc +++ b/winsup/cygwin/cygserver_process.cc @@ -35,6 +35,10 @@ process_cache::process_cache () : head (NULL) InitializeCriticalSection (&cache_write_access); } +process_cache::~process_cache () +{ +} + class process * process_cache::process (long pid) { diff --git a/winsup/cygwin/include/cygwin/cygserver_process.h b/winsup/cygwin/include/cygwin/cygserver_process.h index 35abf48bd..0f60acead 100755 --- a/winsup/cygwin/include/cygwin/cygserver_process.h +++ b/winsup/cygwin/include/cygwin/cygserver_process.h @@ -35,6 +35,7 @@ class process_cache { public: process_cache (); + ~process_cache (); class process *process (long); private: class process_entry *head; diff --git a/winsup/cygwin/threaded_queue.cc b/winsup/cygwin/threaded_queue.cc index d01be1833..0d6b65876 100755 --- a/winsup/cygwin/threaded_queue.cc +++ b/winsup/cygwin/threaded_queue.cc @@ -17,11 +17,13 @@ #include <sys/types.h> #include "wincap.h" #include "threaded_queue.h" +#define DEBUG 1 +#define debug_printf if (DEBUG) printf /* threaded_queue */ DWORD WINAPI -worker_function( LPVOID LpParam ) +worker_function (LPVOID LpParam) { class threaded_queue *queue = (class threaded_queue *) LpParam; class queue_request *request; @@ -30,26 +32,28 @@ worker_function( LPVOID LpParam ) { EnterCriticalSection (&queue->queuelock); while (!queue->request && queue->active) - { - LeaveCriticalSection (&queue->queuelock); - DWORD rc = WaitForSingleObject (queue->event,INFINITE); - if (rc == WAIT_FAILED) - { - printf("Wait for event failed\n"); - queue->running--; - ExitThread (0); - } - EnterCriticalSection (&queue->queuelock); - } + { + LeaveCriticalSection (&queue->queuelock); + DWORD rc = WaitForSingleObject (queue->event, INFINITE); + if (rc == WAIT_FAILED) + { + printf ("Wait for event failed\n"); + queue->running--; + ExitThread (0); + } + EnterCriticalSection (&queue->queuelock); + } if (!queue->active) - { - queue->running--; - LeaveCriticalSection (&queue->queuelock); - ExitThread (0); - } + { + queue->running--; + LeaveCriticalSection (&queue->queuelock); + ExitThread (0); + } /* not needed, but it is efficient */ - request = (class queue_request *)InterlockedExchangePointer (&queue->request, queue -->request->next); + request = + (class queue_request *) InterlockedExchangePointer (&queue->request, + queue->request-> + next); LeaveCriticalSection (&queue->queuelock); request->process (); delete request; @@ -59,12 +63,13 @@ worker_function( LPVOID LpParam ) } void -threaded_queue::create_workers() +threaded_queue::create_workers () { InitializeCriticalSection (&queuelock); if ((event = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL) { - printf ("Failed to create event queue (%lu), terminating\n", GetLastError ()); + printf ("Failed to create event queue (%lu), terminating\n", + GetLastError ()); exit (1); } active = true; @@ -72,16 +77,17 @@ threaded_queue::create_workers() /* FIXME: Use a stack pair and create threads on the fly whenever * we have to to service a request. */ - for (unsigned int i=0; i<initial_workers; i++) + for (unsigned int i = 0; i < initial_workers; i++) { HANDLE hThread; DWORD tid; hThread = CreateThread (NULL, 0, worker_function, this, 0, &tid); if (hThread == NULL) - { - printf ("Failed to create thread (%lu), terminating\n", GetLastError ()); - exit (1); - } + { + printf ("Failed to create thread (%lu), terminating\n", + GetLastError ()); + exit (1); + } CloseHandle (hThread); running++; } @@ -93,19 +99,13 @@ threaded_queue::cleanup () /* harvest the threads */ active = false; /* kill the request processing loops */ - queue_process_param * reqloop; + queue_process_param *reqloop; /* make sure we don't race with a incoming request creation */ EnterCriticalSection (&queuelock); - reqloop = (queue_process_param *) InterlockedExchangePointer (&process_head, NULL); + reqloop = + (queue_process_param *) InterlockedExchangePointer (&process_head, NULL); while (reqloop) { - printf ("killing request loop thread %ld\n", reqloop->tid); - int rc; - if (!(rc = TerminateThread (reqloop->hThread, 0))) - { - printf ("error shutting down request loop worker thread\n"); - } - CloseHandle (reqloop->hThread); queue_process_param *t = reqloop; reqloop = reqloop->next; delete t; @@ -114,7 +114,7 @@ threaded_queue::cleanup () if (!running) return; printf ("Waiting for current connections to terminate\n"); - for (int n=running; n; n--) + for (int n = running; n; n--) PulseEvent (event); while (running) sleep (1); @@ -124,7 +124,7 @@ threaded_queue::cleanup () /* FIXME: return success or failure */ void -threaded_queue::add (queue_request *therequest) +threaded_queue::add (queue_request * therequest) { /* safe to not "Try" because workers don't hog this, they wait on the event */ @@ -134,13 +134,13 @@ threaded_queue::add (queue_request *therequest) printf ("No worker threads to handle request!\n"); } if (!request) - request = therequest; + request = therequest; else { /* add to the queue end. */ queue_request *listrequest = request; while (listrequest->next) - listrequest = listrequest->next; + listrequest = listrequest->next; listrequest->next = therequest; } PulseEvent (event); @@ -149,21 +149,93 @@ threaded_queue::add (queue_request *therequest) /* FIXME: return success or failure rather than quitting */ void -threaded_queue::process_requests (queue_process_param *params, threaded_queue_thread_function *request_loop) +threaded_queue::process_requests (queue_process_param * params, + threaded_queue_thread_function * + request_loop) +{ + if (params->start (request_loop, this) == false) + exit (1); + params->next = + (queue_process_param *) InterlockedExchangePointer (&process_head, + params); +} + +/* queue_process_param */ +/* How does a constructor return an error? */ +queue_process_param::queue_process_param (bool ninterruptible):running (false), shutdown (false), +interruptible +(ninterruptible) { - params->queue = this; - params->hThread = CreateThread (NULL, 0, request_loop, params, 0, ¶ms->tid); - if (params->hThread == NULL) + if (!interruptible) + return; + debug_printf ("creating an interruptible processing thread\n"); + if ((interrupt = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL) { - printf ("Failed to create thread (%lu), terminating\n", GetLastError ()); + printf ("Failed to create interrupt event (%lu), terminating\n", + GetLastError ()); exit (1); } - params->next = (queue_process_param *) InterlockedExchangePointer (&process_head, params -); +} + +queue_process_param::~queue_process_param () +{ + debug_printf ("aaaargh\n"); + if (running) + stop (); + if (!interruptible) + return; + CloseHandle (interrupt); +} + +bool + queue_process_param::start (threaded_queue_thread_function * request_loop, + threaded_queue * thequeue) +{ + queue = thequeue; + hThread = CreateThread (NULL, 0, request_loop, this, 0, &tid); + if (hThread) + { + running = true; + return true; + } + printf ("Failed to create thread (%lu), terminating\n", GetLastError ()); + return false; +} + +void +queue_process_param::stop () +{ + if (interruptible) + { + InterlockedExchange (&shutdown, true); + PulseEvent (interrupt); + /* Wait up to 50 ms for the thread to exit. If it doesn't _and_ we get + * scheduled again, we print an error and exit. We _should_ loop or + * try resignalling. We don't want to hand here though... + */ + if (WaitForSingleObject (hThread, 200) == WAIT_TIMEOUT) + { + printf ("Process thread didn't shutdown cleanly after 200ms!\n"); + exit (1); + } + else + running = false; + } + else + { + printf ("killing request loop thread %ld\n", tid); + int rc; + if (!(rc = TerminateThread (hThread, 0))) + { + printf ("error shutting down request loop worker thread\n"); + } + running = false; + } + CloseHandle (hThread); } /* queue_request */ -queue_request::queue_request () : next (NULL) +queue_request::queue_request ():next (NULL) { } @@ -172,6 +244,5 @@ queue_request::process (void) { printf ("\n**********************************************\n" "Oh no! we've hit the base queue_request process() function, and this indicates a coding\n" - "fault !!!\n" - "***********************************************\n"); + "fault !!!\n" "***********************************************\n"); } diff --git a/winsup/cygwin/threaded_queue.h b/winsup/cygwin/threaded_queue.h index 432218d10..d76baf086 100755 --- a/winsup/cygwin/threaded_queue.h +++ b/winsup/cygwin/threaded_queue.h @@ -23,19 +23,27 @@ class queue_request queue_request(); }; + +typedef DWORD WINAPI threaded_queue_thread_function (LPVOID); /* parameters for a request finding and submitting loop */ class queue_process_param { public: + bool start (threaded_queue_thread_function *, class threaded_queue *); + void stop (); + bool running; + long int shutdown; class queue_process_param * next; class threaded_queue *queue; + queue_process_param (bool ninterruptible); + ~queue_process_param (); + bool interruptible; + HANDLE interrupt; HANDLE hThread; DWORD tid; }; -typedef DWORD WINAPI threaded_queue_thread_function (LPVOID); - /* a queue to allocate requests from n submission loops to x worker threads */ class threaded_queue |