diff options
author | Robert Collins <rbtcollins@hotmail.com> | 2001-10-04 08:21:06 +0400 |
---|---|---|
committer | Robert Collins <rbtcollins@hotmail.com> | 2001-10-04 08:21:06 +0400 |
commit | 02baf7ff76fe2853ff0e94dbea52164d60d67acf (patch) | |
tree | 95640029a7c90b863664c2d2b0c1d32e6ccdcf2b /winsup | |
parent | 5bea3af68fa57e4dc68b4c0ea61696d4df1b114b (diff) |
Thu Oct 4 14:12:00 2001 Robert Collins <rbtcollins@hotmail.com>
* cygserver.cc (request_loop): Make static.
(main): Use new cache constructor syntax.
Start cache worker threads.
Cleanup the cache at shutdown.
* cygserver_process.cc: Run indent.
(process_cache::process_cache): Add a trigger to use when adding a process.
(process_cache::process): Move process_entry to process.
Insert at the end of the list.
Trigger the request loop when new process's inserted.
(process_cache::process_requests): Do it.
(process_cache::add): New method.
(process_cache::handle_snapshot): New method.
(process::process): Merge in the process_entry fields.
(process::handle): Make a stub function.
(process::exit_code): New method.
(process_request::process): New method.
(process_process_param::request_loop): New method.
* cygserver_shm.cc: New header dependency - threaded_queue.h.
* threaded_queue.cc (threaded_queue::cleanup): Clearer messages.
(queue_process_param::stop): Short spinlock on interruptible threads.
* threaded_queue.h (class threaded_queue): New constructor.
* include/cygwin/cygserver_process.h (process_request): New class.
(process_entry): Remove.
(process): Merge in process_entry.
(process_cache): Inherit from threaded_queue.
Diffstat (limited to 'winsup')
-rw-r--r-- | winsup/cygserver/threaded_queue.cc | 7 | ||||
-rw-r--r-- | winsup/cygwin/ChangeLog | 28 | ||||
-rwxr-xr-x | winsup/cygwin/cygserver.cc | 26 | ||||
-rwxr-xr-x | winsup/cygwin/cygserver_process.cc | 248 | ||||
-rwxr-xr-x | winsup/cygwin/cygserver_shm.cc | 1 | ||||
-rwxr-xr-x | winsup/cygwin/include/cygwin/cygserver_process.h | 56 | ||||
-rwxr-xr-x | winsup/cygwin/threaded_queue.cc | 7 | ||||
-rwxr-xr-x | winsup/cygwin/threaded_queue.h | 1 |
8 files changed, 319 insertions, 55 deletions
diff --git a/winsup/cygserver/threaded_queue.cc b/winsup/cygserver/threaded_queue.cc index 0d6b65876..5fb22b191 100644 --- a/winsup/cygserver/threaded_queue.cc +++ b/winsup/cygserver/threaded_queue.cc @@ -113,7 +113,7 @@ threaded_queue::cleanup () LeaveCriticalSection (&queuelock); if (!running) return; - printf ("Waiting for current connections to terminate\n"); + printf ("Waiting for current queue threads to terminate\n"); for (int n = running; n; n--) PulseEvent (event); while (running) @@ -179,7 +179,6 @@ interruptible queue_process_param::~queue_process_param () { - debug_printf ("aaaargh\n"); if (running) stop (); if (!interruptible) @@ -213,7 +212,9 @@ queue_process_param::stop () * scheduled again, we print an error and exit. We _should_ loop or * try resignalling. We don't want to hand here though... */ - if (WaitForSingleObject (hThread, 200) == WAIT_TIMEOUT) + int n = 5; + while (n-- && WaitForSingleObject (hThread, 1000) == WAIT_TIMEOUT); + if (!n) { printf ("Process thread didn't shutdown cleanly after 200ms!\n"); exit (1); diff --git a/winsup/cygwin/ChangeLog b/winsup/cygwin/ChangeLog index 3b99e473f..67ff204a7 100644 --- a/winsup/cygwin/ChangeLog +++ b/winsup/cygwin/ChangeLog @@ -1,3 +1,31 @@ +Thu Oct 4 14:12:00 2001 Robert Collins <rbtcollins@hotmail.com> + + * cygserver.cc (request_loop): Make static. + (main): Use new cache constructor syntax. + Start cache worker threads. + Cleanup the cache at shutdown. + * cygserver_process.cc: Run indent. + (process_cache::process_cache): Add a trigger to use when adding a process. + (process_cache::process): Move process_entry to process. + Insert at the end of the list. + Trigger the request loop when new process's inserted. + (process_cache::process_requests): Do it. + (process_cache::add): New method. + (process_cache::handle_snapshot): New method. + (process::process): Merge in the process_entry fields. + (process::handle): Make a stub function. + (process::exit_code): New method. + (process_request::process): New method. + (process_process_param::request_loop): New method. + * cygserver_shm.cc: New header dependency - threaded_queue.h. + * threaded_queue.cc (threaded_queue::cleanup): Clearer messages. + (queue_process_param::stop): Short spinlock on interruptible threads. + * threaded_queue.h (class threaded_queue): New constructor. + * include/cygwin/cygserver_process.h (process_request): New class. + (process_entry): Remove. + (process): Merge in process_entry. + (process_cache): Inherit from threaded_queue. + Tue Oct 2 23:24:00 2001 Robert Collins <rbtcollins@hotmail.com> * cygserver.cc (class server_process_param): Use new constructor syntax. diff --git a/winsup/cygwin/cygserver.cc b/winsup/cygwin/cygserver.cc index c633e0d3a..b0b815beb 100755 --- a/winsup/cygwin/cygserver.cc +++ b/winsup/cygwin/cygserver.cc @@ -26,12 +26,11 @@ #include "cygwin/cygserver_transport.h" #include "cygwin/cygserver_transport_pipes.h" #include "cygwin/cygserver_transport_sockets.h" +#include "threaded_queue.h" #include "cygwin/cygserver_process.h" #include "cygwin/cygserver.h" #include "cygserver_shm.h" -#include "threaded_queue.h" - /* for quieter operation, set to 0 */ #define DEBUG 0 #define debug_printf if (DEBUG) printf @@ -301,7 +300,7 @@ class server_request_queue : public threaded_queue }; class server_request_queue request_queue; -DWORD WINAPI +static DWORD WINAPI request_loop (LPVOID LpParam) { class server_process_param *params = (server_process_param *) LpParam; @@ -429,6 +428,8 @@ server_request_queue::add (transport_layer_base *conn) { conn->close (); delete conn; + LeaveCriticalSection (&queuelock); + return; } queue_request * listrequest = new server_request (conn, cache); threaded_queue::add (listrequest); @@ -516,12 +517,16 @@ main (int argc, char **argv) printf ("."); transport->listen (); printf ("."); - class process_cache cache; + class process_cache cache (2); request_queue.initial_workers = 10; request_queue.cache = &cache; request_queue.create_workers (); printf ("."); request_queue.process_requests (transport); + printf ("."); + cache.create_workers (); + printf ("."); + cache.process_requests (); printf (".complete\n"); /* TODO: wait on multiple objects - the thread handle for each request loop + * all the process handles. This should be done by querying the request_queue and @@ -531,12 +536,19 @@ main (int argc, char **argv) * and pipes simply by making a new transport, and then calling * request_queue.process_requests (transport2); */ + /* WaitForMultipleObjects abort && request_queue && process_queue && signal + -- if signal event then retrigger it + */ while (1 && request_queue.active) - sleep (1); - printf ("\nShutdown request recieved. No longer accepting requests.\n"); + { + sleep (1); + } + printf ("\nShutdown request recieved - new requests will be denied\n"); request_queue.cleanup (); printf ("All pending requests processed\n"); transport->close (); - printf ("request port closed\n"); + printf ("No longer accepting requests - cygwin will operate in daemonless mode\n"); + cache.cleanup (); + printf ("All outstanding process-cache activities completed\n"); printf ("daemon shutdown\n"); } diff --git a/winsup/cygwin/cygserver_process.cc b/winsup/cygwin/cygserver_process.cc index 27e7d3129..fe5b5b80b 100755 --- a/winsup/cygwin/cygserver_process.cc +++ b/winsup/cygwin/cygserver_process.cc @@ -13,12 +13,14 @@ #include <errno.h> #include <stdio.h> #include <unistd.h> +#include <stdlib.h> #include <windows.h> #include <sys/types.h> #include <sys/socket.h> #include <netdb.h> #include "wincap.h" #include <pthread.h> +#include <threaded_queue.h> #include <cygwin/cygserver_process.h> #define debug_printf if (DEBUG) printf @@ -29,10 +31,18 @@ */ /* process cache */ -process_cache::process_cache () : head (NULL) +process_cache::process_cache (unsigned int num_initial_workers): +head (NULL) { /* there can only be one */ InitializeCriticalSection (&cache_write_access); + if ((cache_add_trigger = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL) + { + printf ("Failed to create cache add trigger (%lu), terminating\n", + GetLastError ()); + exit (1); + } + initial_workers = num_initial_workers; } process_cache::~process_cache () @@ -42,23 +52,99 @@ process_cache::~process_cache () class process * process_cache::process (long pid) { - class process_entry *entry = head; + class process *entry = head; /* TODO: make this more granular, so a search doesn't involve the write lock */ EnterCriticalSection (&cache_write_access); - while (entry && entry->process.winpid != pid) - entry = entry->next; if (!entry) { - entry = new process_entry (pid); - entry->next = (process_entry *) InterlockedExchangePointer (&head, entry); + entry = new class process (pid); + entry->next = + (class process *) InterlockedExchangePointer (&head, entry); + PulseEvent (cache_add_trigger); + } + else + { + while (entry->winpid != pid && entry->next) + entry = entry->next; + if (entry->winpid != pid) + { + class process *new_entry = new class process (pid); + new_entry->next = + (class process *) InterlockedExchangePointer (&entry->next, + new_entry); + entry = new_entry; + PulseEvent (cache_add_trigger); + } } LeaveCriticalSection (&cache_write_access); - return &entry->process; + return entry; +} + +static DWORD WINAPI +request_loop (LPVOID LpParam) +{ + class process_process_param *params = (process_process_param *) LpParam; + return params->request_loop (); +} + +void +process_cache::process_requests () +{ + class process_process_param *params = new process_process_param; + threaded_queue::process_requests (params, request_loop); } -/* process cache entries */ -process_entry::process_entry (long pid) : next (NULL), process (pid) +void +process_cache::add () { + /* safe to not "Try" because workers don't hog this, they wait on the event + */ + /* every derived ::add must enter the section! */ + EnterCriticalSection (&queuelock); + queue_request *listrequest = new process_request; + threaded_queue::add (listrequest); + LeaveCriticalSection (&queuelock); +} + + +/* copy <= max_copy HANDLEs to dest[], starting at an offset into _our list_ of + * begin_at. (Ie begin_at = 5, the first copied handle is still written to dest[0] + * NOTE: Thread safe, but not thread guaranteed - a newly added process may be missed. + * Who cares - It'll get caught the next time. + */ +int +process_cache::handle_snapshot (HANDLE * hdest, class process ** edest, + ssize_t max_copy, int begin_at) +{ + /* TODO:? grab a delete-lock, to prevent deletes during this process ? */ + class process *entry = head; + int count = begin_at; + /* skip begin_at entries */ + while (entry && count) + { + if (entry->exit_code () == STILL_ACTIVE) + count--; + entry = entry->next; + } + /* hit the end of the list within begin_at entries */ + if (count) + return 0; + HANDLE *hto = hdest; + class process **eto = edest; + while (entry && count < max_copy) + { + /* hack */ + if (entry->exit_code () == STILL_ACTIVE) + { + *hto = entry->handle (); + *eto = entry; + count++; + hto++; + eto++; + } + entry = entry->next; + } + return count; } /* process's */ @@ -72,11 +158,17 @@ do_process_init (void) /* we don't have a cache shutdown capability today */ } -process::process (long pid) : winpid (pid) +process::process (long pid): +winpid (pid), next (NULL), _exit_status (STILL_ACTIVE) { - pthread_once(&process_init, do_process_init); + pthread_once (&process_init, do_process_init); EnterCriticalSection (&process_access); thehandle = OpenProcess (PROCESS_ALL_ACCESS, FALSE, pid); + if (!thehandle) + { + printf ("unable to obtain handle for new cache process %ld\n", pid); + thehandle = INVALID_HANDLE_VALUE; + } debug_printf ("Got handle %p for new cache process %ld\n", thehandle, pid); LeaveCriticalSection (&process_access); } @@ -84,21 +176,133 @@ process::process (long pid) : winpid (pid) HANDLE process::handle () { - unsigned long exitstate; - bool err = GetExitCodeProcess (thehandle, &exitstate); +// DWORD exitstate = exit_code (); +// if (exitstate == STILL_ACTIVE) + return thehandle; + + /* FIXME: call the cleanup list ? */ + +// CloseHandle (thehandle); +// debug_printf ("Process id %ld has terminated, attempting to open a new handle\n", +// winpid); +// thehandle = OpenProcess (PROCESS_ALL_ACCESS, FALSE, winpid); +// debug_printf ("Got handle %p when refreshing cache process %ld\n", thehandle, winpid); +// /* FIXME: what if OpenProcess fails ? */ +// if (thehandle) +// { +// _exit_status = STILL_ACTIVE; +// exit_code (); +// } +// else +// thehandle = INVALID_HANDLE_VALUE; +// return thehandle; +} + +DWORD process::exit_code () +{ + if (_exit_status != STILL_ACTIVE) + return _exit_status; + bool + err = GetExitCodeProcess (thehandle, &_exit_status); if (!err) { - /* FIXME: */ + debug_printf ("Failed to retrieve exit code (%ld)\n", GetLastError ()); thehandle = INVALID_HANDLE_VALUE; - return INVALID_HANDLE_VALUE; + return _exit_status; } - if (exitstate != STILL_ACTIVE) + else if (_exit_status == STILL_ACTIVE) + return _exit_status; + /* add new cleanup task etc etc ? */ + return _exit_status; +} + +/* process_request */ +void +process_request::process () +{ + printf ("processing...\n"); +} + +/* process_process_param */ +DWORD +process_process_param::request_loop () +{ + process_cache *cache = (process_cache *) queue; + /* always malloc one, so there is no special case in the loop */ + ssize_t HandlesSize = 2; + HANDLE *Handles = (HANDLE *) malloc (sizeof (HANDLE) * HandlesSize); + process **Entries = (process **) malloc (sizeof (LPVOID) * HandlesSize); + /* TODO: put [1] at the end as it will also get done if a process dies? */ + Handles[0] = interrupt; + Handles[1] = cache->cache_add_trigger; + while (cache->active && !shutdown) { - /* FIXME: call the cleanup list */ - CloseHandle (thehandle); - debug_printf ("Process id %ld has terminated, attempting to open a new handle\n", winpid); - thehandle = OpenProcess (PROCESS_ALL_ACCESS, FALSE, winpid); - /* FIXME: what if this fails */ + int copied; + copied = -1; + int offset; + offset = 1; + int count; + count = 2; + while ((copied == HandlesSize - 2 - offset) || copied < 0) + { + /* we need more storage to cope with all the HANDLES */ + if (copied == HandlesSize - 2 - offset) + { + HANDLE *temp = (HANDLE *) realloc (Handles, + sizeof (HANDLE) * + HandlesSize + 10); + if (!temp) + { + printf + ("cannot allocate more storage for the handle array!\n"); + exit (1); + } + Handles = temp; + process **ptemp = (process **) realloc (Entries, + sizeof (LPVOID) * + HandlesSize + 10); + if (!ptemp) + { + printf + ("cannot allocate more storage for the handle array!\n"); + exit (1); + } + Entries = ptemp; + HandlesSize += 10; + } + offset += copied; + copied = + cache->handle_snapshot (&Handles[2], &Entries[2], + HandlesSize - 2 - offset, offset); + count += copied; + } + debug_printf ("waiting on %u objects\n", count); + DWORD rc = WaitForMultipleObjects (count, Handles, FALSE, INFINITE); + if (rc == WAIT_FAILED) + { + printf ("Could not wait on the process handles (%ld)!\n", + GetLastError ()); + exit (1); + } + int objindex = rc - WAIT_OBJECT_0; + if (objindex > 1 && objindex < count) + { + debug_printf ("Process %ld has left the building\n", + Entries[objindex]->winpid); + /* fire off the termination routines */ + + } + else if (objindex >= 0 && objindex < 2) + { + /* 0 is shutdown - do nothing */ + /* 1 is a cache add event - just rebuild the object list */ + } + else + { + printf + ("unexpected return code from WaitForMultiple objects in process_process_param::request_loop\n"); + } } - return thehandle; + running = false; + return 0; } diff --git a/winsup/cygwin/cygserver_shm.cc b/winsup/cygwin/cygserver_shm.cc index 491b01744..134f810e6 100755 --- a/winsup/cygwin/cygserver_shm.cc +++ b/winsup/cygwin/cygserver_shm.cc @@ -45,6 +45,7 @@ #endif //#include "perprocess.h" #include "cygserver_shm.h" +#include <threaded_queue.h> #include <cygwin/cygserver_process.h> // FIXME IS THIS CORRECT diff --git a/winsup/cygwin/include/cygwin/cygserver_process.h b/winsup/cygwin/include/cygwin/cygserver_process.h index 0f60acead..75cae51a7 100755 --- a/winsup/cygwin/include/cygwin/cygserver_process.h +++ b/winsup/cygwin/include/cygwin/cygserver_process.h @@ -13,33 +13,49 @@ #ifndef _CYGSERVER_PROCESS_ #define _CYGSERVER_PROCESS_ -class process +/* needs threaded_queue.h */ + +class process_request:public queue_request +{ +public: + virtual void process (); +}; + +class process_process_param:public queue_process_param { - public: - HANDLE handle (); - long winpid; - process (long); - private: - HANDLE thehandle; + class process_cache *cache; +public: + DWORD request_loop (); + process_process_param ():queue_process_param (true) {}; }; -class process_entry +class process { - public: - class process_entry *next; - class process process; - process_entry (long); +public: + HANDLE handle (); + long winpid; + process (long); + DWORD exit_code (); + class process * next; +private: + HANDLE thehandle; + DWORD _exit_status; }; -class process_cache +class process_cache:public threaded_queue { - public: - process_cache (); - ~process_cache (); - class process *process (long); - private: - class process_entry *head; - CRITICAL_SECTION cache_write_access; +public: + process_cache (unsigned int initial_workers); + virtual ~ process_cache (); + class process *process (long); + int handle_snapshot (HANDLE *, class process **, ssize_t, int); + /* threaded_queue methods */ + void process_requests (); + HANDLE cache_add_trigger; +private: + virtual void add (); + class process *head; + CRITICAL_SECTION cache_write_access; }; #endif /* _CYGSERVER_PROCESS_ */ diff --git a/winsup/cygwin/threaded_queue.cc b/winsup/cygwin/threaded_queue.cc index 0d6b65876..5fb22b191 100755 --- a/winsup/cygwin/threaded_queue.cc +++ b/winsup/cygwin/threaded_queue.cc @@ -113,7 +113,7 @@ threaded_queue::cleanup () LeaveCriticalSection (&queuelock); if (!running) return; - printf ("Waiting for current connections to terminate\n"); + printf ("Waiting for current queue threads to terminate\n"); for (int n = running; n; n--) PulseEvent (event); while (running) @@ -179,7 +179,6 @@ interruptible queue_process_param::~queue_process_param () { - debug_printf ("aaaargh\n"); if (running) stop (); if (!interruptible) @@ -213,7 +212,9 @@ queue_process_param::stop () * scheduled again, we print an error and exit. We _should_ loop or * try resignalling. We don't want to hand here though... */ - if (WaitForSingleObject (hThread, 200) == WAIT_TIMEOUT) + int n = 5; + while (n-- && WaitForSingleObject (hThread, 1000) == WAIT_TIMEOUT); + if (!n) { printf ("Process thread didn't shutdown cleanly after 200ms!\n"); exit (1); diff --git a/winsup/cygwin/threaded_queue.h b/winsup/cygwin/threaded_queue.h index d76baf086..b330d96ac 100755 --- a/winsup/cygwin/threaded_queue.h +++ b/winsup/cygwin/threaded_queue.h @@ -59,6 +59,7 @@ class threaded_queue void cleanup (); void add (queue_request *); void process_requests (queue_process_param *, threaded_queue_thread_function *); + threaded_queue () : active (false), request (NULL), initial_workers (1), running (0), process_head (NULL) {}; private: queue_request *process_head; }; |