diff options
Diffstat (limited to 'winsup/cygwin/fhandler_mqueue.cc')
-rw-r--r-- | winsup/cygwin/fhandler_mqueue.cc | 436 |
1 files changed, 435 insertions, 1 deletions
diff --git a/winsup/cygwin/fhandler_mqueue.cc b/winsup/cygwin/fhandler_mqueue.cc index c450c0337..28aae314e 100644 --- a/winsup/cygwin/fhandler_mqueue.cc +++ b/winsup/cygwin/fhandler_mqueue.cc @@ -11,6 +11,7 @@ details. */ #include "path.h" #include "fhandler.h" #include "dtable.h" +#include "clock.h" #include <mqueue.h> #include <sys/param.h> @@ -137,7 +138,7 @@ fhandler_mqueue::_mqinfo (SIZE_T filesize, mode_t mode, int flags, get pagesize aligned, which breaks the next NtMapViewOfSection in fork. */ mqinfo ()->mqi_sectsize = filesize; mqinfo ()->mqi_mode = mode; - mqinfo ()->mqi_flags = flags; + set_nonblocking (flags & O_NONBLOCK); __small_swprintf (buf, L"mqueue/mtx%s", get_name ()); RtlInitUnicodeString (&uname, buf); @@ -426,3 +427,436 @@ fhandler_mqueue::close () __endtry return 0; } + +int +fhandler_mqueue::mutex_lock (HANDLE mtx, bool eintr) +{ + switch (cygwait (mtx, cw_infinite, cw_cancel | cw_cancel_self + | (eintr ? cw_sig_eintr : cw_sig_restart))) + { + case WAIT_OBJECT_0: + case WAIT_ABANDONED_0: + return 0; + case WAIT_SIGNALED: + set_errno (EINTR); + return 1; + default: + break; + } + return geterrno_from_win_error (); +} + +int +fhandler_mqueue::mutex_unlock (HANDLE mtx) +{ + return ReleaseMutex (mtx) ? 0 : geterrno_from_win_error (); +} + +int +fhandler_mqueue::cond_timedwait (HANDLE evt, HANDLE mtx, + const struct timespec *abstime) +{ + HANDLE w4[4] = { evt, }; + DWORD cnt = 2; + DWORD timer_idx = 0; + int ret = 0; + + wait_signal_arrived here (w4[1]); + if ((w4[cnt] = pthread::get_cancel_event ()) != NULL) + ++cnt; + if (abstime) + { + if (!valid_timespec (*abstime)) + return EINVAL; + + /* If a timeout is set, we create a waitable timer to wait for. + This is the easiest way to handle the absolute timeout value, given + that NtSetTimer also takes absolute times and given the double + dependency on evt *and* mtx, which requires to call WFMO twice. */ + NTSTATUS status; + LARGE_INTEGER duetime; + + timer_idx = cnt++; + status = NtCreateTimer (&w4[timer_idx], TIMER_ALL_ACCESS, NULL, + NotificationTimer); + if (!NT_SUCCESS (status)) + return geterrno_from_nt_status (status); + timespec_to_filetime (abstime, &duetime); + status = NtSetTimer (w4[timer_idx], &duetime, NULL, NULL, FALSE, 0, NULL); + if (!NT_SUCCESS (status)) + { + NtClose (w4[timer_idx]); + return geterrno_from_nt_status (status); + } + } + ResetEvent (evt); + if ((ret = mutex_unlock (mtx)) != 0) + return ret; + /* Everything's set up, so now wait for the event to be signalled. */ +restart1: + switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE)) + { + case WAIT_OBJECT_0: + break; + case WAIT_OBJECT_0 + 1: + if (_my_tls.call_signal_handler ()) + goto restart1; + ret = EINTR; + break; + case WAIT_OBJECT_0 + 2: + if (timer_idx != 2) + pthread::static_cancel_self (); + fallthrough; + case WAIT_OBJECT_0 + 3: + ret = ETIMEDOUT; + break; + default: + ret = geterrno_from_win_error (); + break; + } + if (ret == 0) + { + /* At this point we need to lock the mutex. The wait is practically + the same as before, just that we now wait on the mutex instead of the + event. */ + restart2: + w4[0] = mtx; + switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE)) + { + case WAIT_OBJECT_0: + case WAIT_ABANDONED_0: + break; + case WAIT_OBJECT_0 + 1: + if (_my_tls.call_signal_handler ()) + goto restart2; + ret = EINTR; + break; + case WAIT_OBJECT_0 + 2: + if (timer_idx != 2) + pthread_testcancel (); + fallthrough; + case WAIT_OBJECT_0 + 3: + ret = ETIMEDOUT; + break; + default: + ret = geterrno_from_win_error (); + break; + } + } + if (timer_idx) + { + if (ret != ETIMEDOUT) + NtCancelTimer (w4[timer_idx], NULL); + NtClose (w4[timer_idx]); + } + return ret; +} + +void +fhandler_mqueue::cond_signal (HANDLE evt) +{ + SetEvent (evt); +} + +int +fhandler_mqueue::mq_getattr (struct mq_attr *mqstat) +{ + int n; + struct mq_hdr *mqhdr; + struct mq_fattr *attr; + + __try + { + mqhdr = mqinfo ()->mqi_hdr; + attr = &mqhdr->mqh_attr; + if ((n = mutex_lock (mqinfo ()->mqi_lock, false)) != 0) + { + errno = n; + __leave; + } + mqstat->mq_flags = is_nonblocking () ? O_NONBLOCK : 0; /* per-open */ + mqstat->mq_maxmsg = attr->mq_maxmsg; /* remaining three per-queue */ + mqstat->mq_msgsize = attr->mq_msgsize; + mqstat->mq_curmsgs = attr->mq_curmsgs; + + mutex_unlock (mqinfo ()->mqi_lock); + return 0; + } + __except (EBADF) {} + __endtry + return -1; +} + +int +fhandler_mqueue::mq_setattr (const struct mq_attr *mqstat, + struct mq_attr *omqstat) +{ + int n; + struct mq_hdr *mqhdr; + struct mq_fattr *attr; + + __try + { + mqhdr = mqinfo ()->mqi_hdr; + attr = &mqhdr->mqh_attr; + if ((n = mutex_lock (mqinfo ()->mqi_lock, false)) != 0) + { + errno = n; + __leave; + } + + if (omqstat != NULL) + { + omqstat->mq_flags = is_nonblocking () ? O_NONBLOCK : 0; + omqstat->mq_maxmsg = attr->mq_maxmsg; + omqstat->mq_msgsize = attr->mq_msgsize; + omqstat->mq_curmsgs = attr->mq_curmsgs; /* and current status */ + } + + set_nonblocking (mqstat->mq_flags & O_NONBLOCK); + + mutex_unlock (mqinfo ()->mqi_lock); + return 0; + } + __except (EBADF) {} + __endtry + return -1; +} + +int +fhandler_mqueue::mq_notify (const struct sigevent *notification) +{ + int n; + pid_t pid; + struct mq_hdr *mqhdr; + + __try + { + mqhdr = mqinfo ()->mqi_hdr; + if ((n = mutex_lock (mqinfo ()->mqi_lock, false)) != 0) + { + errno = n; + __leave; + } + + pid = myself->pid; + if (!notification) + { + if (mqhdr->mqh_pid == pid) + mqhdr->mqh_pid = 0; /* unregister calling process */ + } + else + { + if (mqhdr->mqh_pid != 0) + { + if (kill (mqhdr->mqh_pid, 0) != -1 || errno != ESRCH) + { + set_errno (EBUSY); + mutex_unlock (mqinfo ()->mqi_lock); + __leave; + } + } + mqhdr->mqh_pid = pid; + mqhdr->mqh_event = *notification; + } + mutex_unlock (mqinfo ()->mqi_lock); + return 0; + } + __except (EBADF) {} + __endtry + return -1; +} + +int +fhandler_mqueue::mq_timedsend (const char *ptr, size_t len, unsigned int prio, + const struct timespec *abstime) +{ + int n; + long index, freeindex; + int8_t *mptr; + struct sigevent *sigev; + struct mq_hdr *mqhdr; + struct mq_fattr *attr; + struct msg_hdr *msghdr, *nmsghdr, *pmsghdr; + bool mutex_locked = false; + int ret = -1; + + pthread_testcancel (); + + __try + { + if (prio >= MQ_PRIO_MAX) + { + set_errno (EINVAL); + __leave; + } + + mqhdr = mqinfo ()->mqi_hdr; /* struct pointer */ + mptr = (int8_t *) mqhdr; /* byte pointer */ + attr = &mqhdr->mqh_attr; + if ((n = mutex_lock (mqinfo ()->mqi_lock, true)) != 0) + { + errno = n; + __leave; + } + mutex_locked = true; + if (len > (size_t) attr->mq_msgsize) + { + set_errno (EMSGSIZE); + __leave; + } + if (attr->mq_curmsgs == 0) + { + if (mqhdr->mqh_pid != 0 && mqhdr->mqh_nwait == 0) + { + sigev = &mqhdr->mqh_event; + if (sigev->sigev_notify == SIGEV_SIGNAL) + sigqueue (mqhdr->mqh_pid, sigev->sigev_signo, + sigev->sigev_value); + mqhdr->mqh_pid = 0; /* unregister */ + } + } + else if (attr->mq_curmsgs >= attr->mq_maxmsg) + { + /* Queue is full */ + if (is_nonblocking ()) + { + set_errno (EAGAIN); + __leave; + } + /* Wait for room for one message on the queue */ + while (attr->mq_curmsgs >= attr->mq_maxmsg) + { + int ret = cond_timedwait (mqinfo ()->mqi_waitsend, + mqinfo ()->mqi_lock, abstime); + if (ret != 0) + { + set_errno (ret); + __leave; + } + } + } + + /* nmsghdr will point to new message */ + if ((freeindex = mqhdr->mqh_free) == 0) + api_fatal ("mq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs); + + nmsghdr = (struct msg_hdr *) &mptr[freeindex]; + nmsghdr->msg_prio = prio; + nmsghdr->msg_len = len; + memcpy (nmsghdr + 1, ptr, len); /* copy message from caller */ + mqhdr->mqh_free = nmsghdr->msg_next; /* new freelist head */ + + /* Find right place for message in linked list */ + index = mqhdr->mqh_head; + pmsghdr = (struct msg_hdr *) &(mqhdr->mqh_head); + while (index) + { + msghdr = (struct msg_hdr *) &mptr[index]; + if (prio > msghdr->msg_prio) + { + nmsghdr->msg_next = index; + pmsghdr->msg_next = freeindex; + break; + } + index = msghdr->msg_next; + pmsghdr = msghdr; + } + if (index == 0) + { + /* Queue was empty or new goes at end of list */ + pmsghdr->msg_next = freeindex; + nmsghdr->msg_next = 0; + } + /* Wake up anyone blocked in mq_receive waiting for a message */ + if (attr->mq_curmsgs == 0) + cond_signal (mqinfo ()->mqi_waitrecv); + attr->mq_curmsgs++; + + ret = 0; + } + __except (EBADF) {} + __endtry + if (mutex_locked) + mutex_unlock (mqinfo ()->mqi_lock); + return ret; +} + +ssize_t +fhandler_mqueue::mq_timedrecv (char *ptr, size_t maxlen, unsigned int *priop, + const struct timespec *abstime) +{ + int n; + long index; + int8_t *mptr; + ssize_t len = -1; + struct mq_hdr *mqhdr; + struct mq_fattr *attr; + struct msg_hdr *msghdr; + bool mutex_locked = false; + + pthread_testcancel (); + + __try + { + mqhdr = mqinfo ()->mqi_hdr; /* struct pointer */ + mptr = (int8_t *) mqhdr; /* byte pointer */ + attr = &mqhdr->mqh_attr; + if ((n = mutex_lock (mqinfo ()->mqi_lock, true)) != 0) + { + errno = n; + __leave; + } + mutex_locked = true; + if (maxlen < (size_t) attr->mq_msgsize) + { + set_errno (EMSGSIZE); + __leave; + } + if (attr->mq_curmsgs == 0) /* queue is empty */ + { + if (is_nonblocking ()) + { + set_errno (EAGAIN); + __leave; + } + /* Wait for a message to be placed onto queue */ + mqhdr->mqh_nwait++; + while (attr->mq_curmsgs == 0) + { + int ret = cond_timedwait (mqinfo ()->mqi_waitrecv, + mqinfo ()->mqi_lock, abstime); + if (ret != 0) + { + set_errno (ret); + __leave; + } + } + mqhdr->mqh_nwait--; + } + + if ((index = mqhdr->mqh_head) == 0) + api_fatal ("mq_receive: curmsgs = %ld; head = 0", attr->mq_curmsgs); + + msghdr = (struct msg_hdr *) &mptr[index]; + mqhdr->mqh_head = msghdr->msg_next; /* new head of list */ + len = msghdr->msg_len; + memcpy(ptr, msghdr + 1, len); /* copy the message itself */ + if (priop != NULL) + *priop = msghdr->msg_prio; + + /* Just-read message goes to front of free list */ + msghdr->msg_next = mqhdr->mqh_free; + mqhdr->mqh_free = index; + + /* Wake up anyone blocked in mq_send waiting for room */ + if (attr->mq_curmsgs == attr->mq_maxmsg) + cond_signal (mqinfo ()->mqi_waitsend); + attr->mq_curmsgs--; + } + __except (EBADF) {} + __endtry + if (mutex_locked) + mutex_unlock (mqinfo ()->mqi_lock); + return len; +} |