Welcome to mirror list, hosted at ThFree Co, Russian Federation.

mq_open.c « linux « sys « libc « newlib - cygwin.com/git/newlib-cygwin.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 9b72e9073d9eb9ddd5293c15c40326238cf0a65c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
/* Copyright 2002, Red Hat Inc. */

#include <mqueue.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>
#include <stdarg.h>
#include <machine/weakalias.h>
#define _LIBC 1
#include <sys/lock.h>
#undef _LIBC

#include "mqlocal.h"

#define	NHASH	32	          /* Num of hash lists, must be a power of 2 */
#define	LOCHASH(i)	((i)&(NHASH-1))

static long	mq_index;	/* Index of next entry */
static struct	libc_mq *mq_hash[NHASH];   /* Hash list heads for mqopen_infos */

__LOCK_INIT(static, mq_hash_lock);

mqd_t
mq_open (const char *name, int oflag, ...)
{
  MSG *wrbuf = NULL;
  MSG *rdbuf = NULL;
  int msgqid = -1;
  int rc = -1;
  int fd = -1;
  int semid = -1;
  int created = 0;
  key_t key = (key_t)-1;
  struct mq_attr *attr = (struct mq_attr *)MAP_FAILED;
  struct sembuf sb = {0, 0, 0};
  mode_t mode = 0;
  int size;
  int i, index, saved_errno;
  char *real_name;
  char *ptr;
  struct mq_attr *user_attr = NULL;
  struct libc_mq *info;
  union semun arg;
  
  /* ignore opening slash if present */
  if (*name == '/')
    ++name;  
  size = strlen(name);

  if ((real_name = (char *)malloc (size + sizeof(MSGQ_PREFIX))) == NULL ||
      (info = (struct libc_mq *)malloc (sizeof(struct libc_mq))) == NULL)
    {
      errno = ENOSPC;
      if (real_name)
	free (real_name);
      return (mqd_t)-1;
    }
  
  /* use given name to create shared memory file name - we convert any
     slashes to underscores so we don't have to create directories */
  memcpy (real_name, MSGQ_PREFIX, sizeof(MSGQ_PREFIX) - 1);
  memcpy (real_name + sizeof(MSGQ_PREFIX) - 1, name, size + 1);
  ptr = real_name + sizeof(MSGQ_PREFIX) - 1;
  for (i = 0; i < size; ++i)
    {
      if (*ptr == '/')
	*ptr = '_';
      ++ptr;
    }

  /* open shared memory file based on msg queue open flags and then use memory
     file to create a unique key to use for semaphores, etc.. */
  if (oflag & O_CREAT)
    {
      va_list list;
      va_start (list, oflag);

      saved_errno = errno;
      mode = (mode_t)va_arg (list, int);
      user_attr = va_arg(list,struct mq_attr *);
      va_end (list);

      /* attempt to open the shared memory file for exclusive create so we know
	 whether we are the owners or not */
      fd = open (real_name, O_RDWR | O_CREAT | O_EXCL, mode);
      if (fd < 0 && (oflag & O_EXCL))
	{
	  /* we failed and the user wanted exclusive create */
	  free (real_name);
	  free (info);
	  return (mqd_t)-1;
	}
      errno = saved_errno;
      /* check if we created the file or not */
      if (fd >= 0)
        created = 1;
    }
	  
  if (fd < 0)
    fd = open (real_name, O_RDWR, 0);

  if (fd >= 0)
    key = ftok(real_name, 255);

  if (key != (key_t)-1)
    /* memory map the shared memory file so we have a global shared data area to use */
    attr = (struct mq_attr *)mmap (0, sizeof(struct mq_attr), PROT_READ | PROT_WRITE,
				   MAP_SHARED, fd, 0);
  
  if (attr != (struct mq_attr *)MAP_FAILED)
    {
      /* we need semaphores to prevent multi-process race conditions on the
	 shared storage which contains a shared structure.  The following
	 are the ones we need.
	 
	 0 = open semaphore
	 1 = number of opens
	 2 = number of writes left until queue is full
	 3 = number of reads available in queue
	 4 = notify semaphore 
	 5 = number of readers */
      arg.val = 0;
      /* make sure the creator of the shared memory file also is the creator of the
	 semaphores...this will ensure that it also creates the message queue */
      if (created)
	{
	  saved_errno = errno;
	  semid = semget (key, 6, IPC_CREAT | IPC_EXCL | mode);
	  errno = saved_errno;
	  /* now that we have created the semaphore, we should initialize it */
	  if (semid != -1)
	    semctl (semid, 0, SETVAL, arg);
	}
      else
	{
	  /* if we didn't create the shared memory file but have gotten to here, we want
	     to ensure we haven't gotten ahead of the creator temporarily so we will
	     loop until the semaphore exists.  This ensures that the creator will be the
	     one to create the message queue with the correct mode and we will be blocked
	     by the open semaphore 0.  We impose a time limit to ensure something terrible
	     hasn't gone wrong. */
	  struct timespec tms;
	  int i;

	  tms.tv_sec = 0;
	  tms.tv_nsec = 10000; /* 10 microseconds */
	  for (i = 0; i < 100; ++i)
	    {
	      if ((semid = semget (key, 6, 0)) != -1)
		break;
	      /* sleep in case we our a higher priority process */
	      nanosleep (&tms, NULL);
	    }
	}
    }

  if (semid != -1)
    {
      /* acquire main open semaphore if we didn't create it */
      if (!created)
	{
	  sb.sem_op = -1;
	  rc = semop (semid, &sb, 1);
	}
      else
	rc = 0; /* need this to continue below */
    }
      
  if (rc == 0)
    {
      if (created)
	{
	  /* the creator must get here first so the message queue will be created */
	  msgqid = msgget (key, IPC_CREAT | mode); 
	  if (msgqid >= 0)
	    {
	      /* we have created the message queue so check and set the attributes */
	      if ((wrbuf = (MSG *)malloc (user_attr->mq_msgsize + sizeof(int))) == NULL ||
		  (rdbuf = (MSG *)malloc (user_attr->mq_msgsize + sizeof(int))) == NULL ||
		  user_attr == NULL || user_attr->mq_msgsize <= 0 || user_attr->mq_maxmsg <= 0)
		{
		  /* we're out of space and we created the message queue so we should
		     try to remove it */
		  msgctl (msgqid, IPC_RMID, NULL);
		  msgqid = -1; /* allow clean up to occur below */
		  if (wrbuf && rdbuf)
		    errno = EINVAL;
		  else
		    errno = ENOSPC;
		}
	      else /* valid attributes */
		{
		  write (fd, user_attr, sizeof(struct mq_attr));
		  attr->mq_curmsgs = 0;
		  attr->mq_flags = oflag & O_NONBLOCK;
		  arg.val = 0;
		  semctl (semid, 1, SETVAL, arg); /* number of opens starts at 0 */
		  semctl (semid, 3, SETVAL, arg); /* number of reads available starts at 0 */
		  semctl (semid, 5, SETVAL, arg); /* number of readers starts at 0 */
		  arg.val = 1;
		  semctl (semid, 4, SETVAL, arg); /* notify semaphore */
		  arg.val = user_attr->mq_maxmsg;
		  semctl (semid, 2, SETVAL, arg); /* number of writes left starts at mq_maxmsg */
		}
	    }
	}
      else /* just open it */
        {
	  msgqid = msgget (key, 0);
	  wrbuf = (MSG *)malloc (attr->mq_msgsize + sizeof(int));
	  rdbuf = (MSG *)malloc (attr->mq_msgsize + sizeof(int));
        }
      
      /* release semaphore acquired earlier */
      sb.sem_op = 1;
      semop (semid, &sb, 1);
    }

  /* if we get here and we haven't got a message queue id, then we need to clean up 
     our mess and return failure */
  if (msgqid < 0)
    {
      if (fd >= 0)
	close (fd);
      if (attr != (struct mq_attr *)MAP_FAILED)
	munmap (attr, sizeof(struct mq_attr));
      if (created)
	{
	  unlink (real_name);
	  if (semid != -1)
	    semctl (semid, 0, IPC_RMID);
	}
      free (real_name);
      free (info);
      if (wrbuf)
	free (wrbuf);
      if (rdbuf)
	free (rdbuf);
      return (mqd_t)-1;
    }

  /* we are successful so register the message queue */

  /* up the count of msg queue opens */
  sb.sem_op = 1;
  sb.sem_num = 1;
  semop (semid, &sb, 1);

  /* success, translate into index into mq_info array */  
  __lock_acquire(mq_hash_lock);
  index = mq_index++;
  info->index = index;
  info->msgqid = msgqid;
  info->name = real_name;
  info->semid = semid;
  info->fd = fd;
  info->oflag = oflag;
  info->wrbuf = wrbuf;
  info->rdbuf = rdbuf;
  info->cleanup_notify = NULL;
  info->next = mq_hash[LOCHASH(index)];
  info->attr = attr;
  mq_hash[LOCHASH(index)] = info;
  __lock_release(mq_hash_lock);

  return (mqd_t)index;
}

struct libc_mq *
__find_mq (mqd_t mq)
{
  struct libc_mq *ptr;

  __lock_acquire(mq_hash_lock);

  ptr = mq_hash[LOCHASH((int)mq)];

  while (ptr)
    {
      if (ptr->index == (int)mq)
        break;
      ptr = ptr->next;
    }

  __lock_release(mq_hash_lock);

  return ptr;
}
      
void
__cleanup_mq (mqd_t mq)
{
  struct libc_mq *ptr;
  struct libc_mq *prev;
  int semid;
  struct sembuf sb = {0, 0, 0};

  __lock_acquire(mq_hash_lock);

  ptr = mq_hash[LOCHASH((int)mq)];
  prev = NULL;

  while (ptr)
    {
      if (ptr->index == (int)mq)
        break;
      prev = ptr;
      ptr = ptr->next;
    }

  if (ptr != NULL)
    {
      if (ptr->cleanup_notify != NULL)
	ptr->cleanup_notify (ptr);
      if (prev != NULL)
	prev->next = ptr->next;
      else
	mq_hash[LOCHASH((int)mq)] = NULL;
      munmap (ptr->attr, sizeof(struct mq_attr));
      close (ptr->fd);
      free (ptr->name);
      free (ptr->wrbuf);
      free (ptr->rdbuf);
      semid = ptr->semid;
      free (ptr);
      /* lower the count of msg queue opens */
      sb.sem_op = -1;
      sb.sem_num = 1;
      sb.sem_flg = IPC_NOWAIT;
      semop (semid, &sb, 1);
    }

  __lock_release(mq_hash_lock);
}