Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/P-p-H-d/mlib.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Pelissier <patrick.pelissier@gmail.com>2018-11-25 13:44:28 +0300
committerPatrick Pelissier <patrick.pelissier@gmail.com>2018-11-25 13:44:28 +0300
commitf07656757f4bf0e72a53061f4945eb7f4f667d4d (patch)
treef16d44ac7340eb8cc95af4ca7cf98fc1157fabce /m-concurrent.h
parenta1c22fb679d9605b88579c5b1a996ea3df01193a (diff)
Add another candidate for concurrent with parallel reader.
Diffstat (limited to 'm-concurrent.h')
-rw-r--r--m-concurrent.h158
1 files changed, 157 insertions, 1 deletions
diff --git a/m-concurrent.h b/m-concurrent.h
index 732e3af..759157f 100644
--- a/m-concurrent.h
+++ b/m-concurrent.h
@@ -27,6 +27,7 @@
#include "m-core.h"
#include "m-mutex.h"
+#include "m-atomic.h"
/* Define a protected concurrent container and its associated functions.
USAGE: CONCURRENT_DEF(name, type [, oplist_of_the_type]) */
@@ -51,6 +52,12 @@
((name, __VA_ARGS__, M_GLOBAL_OPLIST_OR_DEF(__VA_ARGS__)(), M_C(name,_t), M_C(name,_it_t) ), \
(name, __VA_ARGS__, M_C(name,_t), M_C(name,_it_t))))
+/* Define a protected concurrent container and its associated functions with Read Preference.
+ USAGE: CONCURRENT_RP2_DEF(name, type [, oplist_of_the_type]) */
+#define CONCURRENT_RP2_DEF(name, ...) \
+ CONCURRENTI_RP2_DEF(M_IF_NARGS_EQ1(__VA_ARGS__) \
+ ((name, __VA_ARGS__, M_GLOBAL_OPLIST_OR_DEF(__VA_ARGS__)(), M_C(name,_t), M_C(name,_it_t) ), \
+ (name, __VA_ARGS__, M_C(name,_t), M_C(name,_it_t))))
/********************************** INTERNAL ************************************/
@@ -606,7 +613,7 @@
\
typedef type M_C(name, _type_t); \
\
- /* Define the lock strategy (global & shared lock) */ \
+ /* Define the lock strategy (multi lock) */ \
static inline void \
M_C(name, _internal_init)(concurrent_t out) \
{ \
@@ -699,4 +706,153 @@
\
CONCURRENTI_DEF_FUNC(name, type, oplist, concurrent_t, concurrent_it_t)
+typedef enum {
+ CONCURRENTI_RINC = 0x100,
+ CONCURRENTI_WBITS = 0x3,
+ CONCURRENTI_PRES = 0x2,
+ CONCURRENTI_PHID = 0x1
+} concurrenti_low_byte_t;
+
+// Deferred evaluation for the concurrent definition.
+#define CONCURRENTI_RP2_DEF(arg) CONCURRENTI_RP2_DEF2 arg
+
+// Internal definition.
+#define CONCURRENTI_RP2_DEF2(name, type, oplist, concurrent_t, concurrent_it_t) \
+ \
+ typedef struct M_C(name, _s) { \
+ struct M_C(name, _s) *self; \
+ m_mutex_t lock; \
+ m_cond_t there_is_data; /* condition raised when there is data */ \
+ atomic_uint rin; \
+ atomic_uint rout; \
+ atomic_uint win; \
+ atomic_uint wout; \
+ atomic_uint rwait; \
+ atomic_uint wsignal; \
+ type data; \
+ } concurrent_t[1]; \
+ \
+ typedef struct M_C(name, _s) *M_C(name, _ptr); \
+ typedef const struct M_C(name, _s) *M_C(name, _srcptr); \
+ \
+ typedef type M_C(name, _type_t); \
+ \
+ /* Define the lock strategy (read oriented lock) */ \
+ /* See http://www.cs.unc.edu/~anderson/papers/ecrts09b.pdf */ \
+ static inline void \
+ M_C(name, _internal_init)(concurrent_t out) \
+ { \
+ m_mutex_init(out->lock); \
+ m_cond_init(out->there_is_data); \
+ out->self = out; \
+ atomic_init(&out->rin, 0); \
+ atomic_init(&out->rout, 0); \
+ atomic_init(&out->win, 0); \
+ atomic_init(&out->wout, 0); \
+ atomic_init(&out->rwait, 0); \
+ atomic_init(&out->wsignal, 0); \
+ } \
+ \
+ static inline void \
+ M_C(name, _internal_clear)(concurrent_t out) \
+ { \
+ assert (out->self == out); \
+ m_mutex_clear(out->lock); \
+ m_cond_clear(out->there_is_data); \
+ out->self = NULL; \
+ } \
+ \
+ static inline void \
+ M_C(name, _read_lock)(const concurrent_t out) \
+ { \
+ struct M_C(name, _s) *self = out->self; \
+ assert (self == out); \
+ unsigned int w = atomic_fetch_add(&self->rin, CONCURRENTI_RINC); \
+ w &= CONCURRENTI_WBITS; \
+ if (w == 0) return; \
+ unsigned int rin; \
+ do { \
+ rin = atomic_load(&self->rin); \
+ } while ( (rin & CONCURRENTI_WBITS) == w); \
+ } \
+ \
+ static inline void \
+ M_C(name, _read_unlock)(const concurrent_t out) \
+ { \
+ struct M_C(name, _s) *self = out->self; \
+ assert (self == out); \
+ atomic_fetch_add(&self->rout, CONCURRENTI_RINC); \
+ } \
+ \
+ static inline void \
+ M_C(name, _write_lock)(concurrent_t out) \
+ { \
+ unsigned int ticket = atomic_fetch_add(&out->win, 1); \
+ while (ticket != atomic_load(&out->wout)); \
+ unsigned int w = CONCURRENTI_PRES | (ticket & CONCURRENTI_PHID); \
+ ticket = atomic_fetch_add(&out->rin, w); \
+ while (ticket != atomic_load(&out->rout)); \
+ } \
+ \
+ static inline void \
+ M_C(name, _write_unlock)(concurrent_t out) \
+ { \
+ atomic_fetch_and(&out->rin, ~0xFFu); \
+ atomic_fetch_add(&out->wout, 1); \
+ } \
+ \
+ static inline void \
+ M_C(name, _read_wait)(int *p, const concurrent_t out) \
+ { \
+ (void) p; \
+ struct M_C(name, _s) *self = out->self; \
+ assert (self == out); \
+ unsigned int ticket = atomic_load(&self->wsignal); \
+ atomic_fetch_add(&self->rwait, 1); \
+ M_C(name, _read_unlock)(out); \
+ m_mutex_lock (self->lock); \
+ while (ticket == atomic_load(&out->self->wsignal)) { \
+ m_cond_wait(self->there_is_data, self->lock); \
+ } \
+ m_mutex_unlock (self->lock); \
+ M_C(name, _read_lock)(out); \
+ atomic_fetch_add(&self->rwait, -1); \
+ } \
+ \
+ static inline void \
+ M_C(name, _read_wait_unlock)(int *p, const concurrent_t out) \
+ { \
+ (void) p; \
+ M_C(name, _read_unlock)(out); \
+ } \
+ \
+ static inline void \
+ M_C(name, _write_wait)(concurrent_t out) \
+ { \
+ struct M_C(name, _s) *self = out; \
+ unsigned int ticket = atomic_load(&self->wsignal); \
+ atomic_fetch_add(&self->rwait, 1); \
+ M_C(name, _write_unlock)(out); \
+ m_mutex_lock (self->lock); \
+ while (ticket == atomic_load(&out->self->wsignal)) { \
+ m_cond_wait(self->there_is_data, self->lock); \
+ } \
+ m_mutex_unlock (self->lock); \
+ M_C(name, _write_lock)(out); \
+ atomic_fetch_add(&self->rwait, -1); \
+ } \
+ \
+ static inline void \
+ M_C(name, _write_signal)(concurrent_t out) \
+ { \
+ atomic_fetch_add(&out->wsignal, 1); \
+ if (atomic_load(&out->rwait) > 0) { \
+ m_mutex_lock (out->lock); \
+ m_cond_broadcast(out->there_is_data); \
+ m_mutex_unlock (out->lock); \
+ } \
+ } \
+ \
+ CONCURRENTI_DEF_FUNC(name, type, oplist, concurrent_t, concurrent_it_t)
+
#endif