diff options
author | Patrick Pelissier <patrick.pelissier@gmail.com> | 2018-11-25 13:44:28 +0300 |
---|---|---|
committer | Patrick Pelissier <patrick.pelissier@gmail.com> | 2018-11-25 13:44:28 +0300 |
commit | f07656757f4bf0e72a53061f4945eb7f4f667d4d (patch) | |
tree | f16d44ac7340eb8cc95af4ca7cf98fc1157fabce /m-concurrent.h | |
parent | a1c22fb679d9605b88579c5b1a996ea3df01193a (diff) |
Add another candidate for concurrent with parallel reader.
Diffstat (limited to 'm-concurrent.h')
-rw-r--r-- | m-concurrent.h | 158 |
1 files changed, 157 insertions, 1 deletions
diff --git a/m-concurrent.h b/m-concurrent.h index 732e3af..759157f 100644 --- a/m-concurrent.h +++ b/m-concurrent.h @@ -27,6 +27,7 @@ #include "m-core.h" #include "m-mutex.h" +#include "m-atomic.h" /* Define a protected concurrent container and its associated functions. USAGE: CONCURRENT_DEF(name, type [, oplist_of_the_type]) */ @@ -51,6 +52,12 @@ ((name, __VA_ARGS__, M_GLOBAL_OPLIST_OR_DEF(__VA_ARGS__)(), M_C(name,_t), M_C(name,_it_t) ), \ (name, __VA_ARGS__, M_C(name,_t), M_C(name,_it_t)))) +/* Define a protected concurrent container and its associated functions with Read Preference. + USAGE: CONCURRENT_RP2_DEF(name, type [, oplist_of_the_type]) */ +#define CONCURRENT_RP2_DEF(name, ...) \ + CONCURRENTI_RP2_DEF(M_IF_NARGS_EQ1(__VA_ARGS__) \ + ((name, __VA_ARGS__, M_GLOBAL_OPLIST_OR_DEF(__VA_ARGS__)(), M_C(name,_t), M_C(name,_it_t) ), \ + (name, __VA_ARGS__, M_C(name,_t), M_C(name,_it_t)))) /********************************** INTERNAL ************************************/ @@ -606,7 +613,7 @@ \ typedef type M_C(name, _type_t); \ \ - /* Define the lock strategy (global & shared lock) */ \ + /* Define the lock strategy (multi lock) */ \ static inline void \ M_C(name, _internal_init)(concurrent_t out) \ { \ @@ -699,4 +706,153 @@ \ CONCURRENTI_DEF_FUNC(name, type, oplist, concurrent_t, concurrent_it_t) +typedef enum { + CONCURRENTI_RINC = 0x100, + CONCURRENTI_WBITS = 0x3, + CONCURRENTI_PRES = 0x2, + CONCURRENTI_PHID = 0x1 +} concurrenti_low_byte_t; + +// Deferred evaluation for the concurrent definition. +#define CONCURRENTI_RP2_DEF(arg) CONCURRENTI_RP2_DEF2 arg + +// Internal definition. +#define CONCURRENTI_RP2_DEF2(name, type, oplist, concurrent_t, concurrent_it_t) \ + \ + typedef struct M_C(name, _s) { \ + struct M_C(name, _s) *self; \ + m_mutex_t lock; \ + m_cond_t there_is_data; /* condition raised when there is data */ \ + atomic_uint rin; \ + atomic_uint rout; \ + atomic_uint win; \ + atomic_uint wout; \ + atomic_uint rwait; \ + atomic_uint wsignal; \ + type data; \ + } concurrent_t[1]; \ + \ + typedef struct M_C(name, _s) *M_C(name, _ptr); \ + typedef const struct M_C(name, _s) *M_C(name, _srcptr); \ + \ + typedef type M_C(name, _type_t); \ + \ + /* Define the lock strategy (read oriented lock) */ \ + /* See http://www.cs.unc.edu/~anderson/papers/ecrts09b.pdf */ \ + static inline void \ + M_C(name, _internal_init)(concurrent_t out) \ + { \ + m_mutex_init(out->lock); \ + m_cond_init(out->there_is_data); \ + out->self = out; \ + atomic_init(&out->rin, 0); \ + atomic_init(&out->rout, 0); \ + atomic_init(&out->win, 0); \ + atomic_init(&out->wout, 0); \ + atomic_init(&out->rwait, 0); \ + atomic_init(&out->wsignal, 0); \ + } \ + \ + static inline void \ + M_C(name, _internal_clear)(concurrent_t out) \ + { \ + assert (out->self == out); \ + m_mutex_clear(out->lock); \ + m_cond_clear(out->there_is_data); \ + out->self = NULL; \ + } \ + \ + static inline void \ + M_C(name, _read_lock)(const concurrent_t out) \ + { \ + struct M_C(name, _s) *self = out->self; \ + assert (self == out); \ + unsigned int w = atomic_fetch_add(&self->rin, CONCURRENTI_RINC); \ + w &= CONCURRENTI_WBITS; \ + if (w == 0) return; \ + unsigned int rin; \ + do { \ + rin = atomic_load(&self->rin); \ + } while ( (rin & CONCURRENTI_WBITS) == w); \ + } \ + \ + static inline void \ + M_C(name, _read_unlock)(const concurrent_t out) \ + { \ + struct M_C(name, _s) *self = out->self; \ + assert (self == out); \ + atomic_fetch_add(&self->rout, CONCURRENTI_RINC); \ + } \ + \ + static inline void \ + M_C(name, _write_lock)(concurrent_t out) \ + { \ + unsigned int ticket = atomic_fetch_add(&out->win, 1); \ + while (ticket != atomic_load(&out->wout)); \ + unsigned int w = CONCURRENTI_PRES | (ticket & CONCURRENTI_PHID); \ + ticket = atomic_fetch_add(&out->rin, w); \ + while (ticket != atomic_load(&out->rout)); \ + } \ + \ + static inline void \ + M_C(name, _write_unlock)(concurrent_t out) \ + { \ + atomic_fetch_and(&out->rin, ~0xFFu); \ + atomic_fetch_add(&out->wout, 1); \ + } \ + \ + static inline void \ + M_C(name, _read_wait)(int *p, const concurrent_t out) \ + { \ + (void) p; \ + struct M_C(name, _s) *self = out->self; \ + assert (self == out); \ + unsigned int ticket = atomic_load(&self->wsignal); \ + atomic_fetch_add(&self->rwait, 1); \ + M_C(name, _read_unlock)(out); \ + m_mutex_lock (self->lock); \ + while (ticket == atomic_load(&out->self->wsignal)) { \ + m_cond_wait(self->there_is_data, self->lock); \ + } \ + m_mutex_unlock (self->lock); \ + M_C(name, _read_lock)(out); \ + atomic_fetch_add(&self->rwait, -1); \ + } \ + \ + static inline void \ + M_C(name, _read_wait_unlock)(int *p, const concurrent_t out) \ + { \ + (void) p; \ + M_C(name, _read_unlock)(out); \ + } \ + \ + static inline void \ + M_C(name, _write_wait)(concurrent_t out) \ + { \ + struct M_C(name, _s) *self = out; \ + unsigned int ticket = atomic_load(&self->wsignal); \ + atomic_fetch_add(&self->rwait, 1); \ + M_C(name, _write_unlock)(out); \ + m_mutex_lock (self->lock); \ + while (ticket == atomic_load(&out->self->wsignal)) { \ + m_cond_wait(self->there_is_data, self->lock); \ + } \ + m_mutex_unlock (self->lock); \ + M_C(name, _write_lock)(out); \ + atomic_fetch_add(&self->rwait, -1); \ + } \ + \ + static inline void \ + M_C(name, _write_signal)(concurrent_t out) \ + { \ + atomic_fetch_add(&out->wsignal, 1); \ + if (atomic_load(&out->rwait) > 0) { \ + m_mutex_lock (out->lock); \ + m_cond_broadcast(out->there_is_data); \ + m_mutex_unlock (out->lock); \ + } \ + } \ + \ + CONCURRENTI_DEF_FUNC(name, type, oplist, concurrent_t, concurrent_it_t) + #endif |