diff options
author | Patrick Pelissier <patrick.pelissier@gmail.com> | 2019-04-11 21:07:27 +0300 |
---|---|---|
committer | Patrick Pelissier <patrick.pelissier@gmail.com> | 2019-04-11 21:07:27 +0300 |
commit | 01ad221972cd1f722218550608071c6806680013 (patch) | |
tree | 4e645a9aec3f9b4ad2f8e92e68b093ea6f66c0fe /m-concurrent.h | |
parent | c269a36e8ba6c9877ab15224c03bea63bb420bc6 (diff) |
Rewrite RP so that writers cannot starve and avoid warnings with thread sanitizer.
Diffstat (limited to 'm-concurrent.h')
-rw-r--r-- | m-concurrent.h | 72 |
1 files changed, 56 insertions, 16 deletions
diff --git a/m-concurrent.h b/m-concurrent.h index ce0c78b..3ba7c14 100644 --- a/m-concurrent.h +++ b/m-concurrent.h @@ -29,7 +29,8 @@ #include "m-mutex.h" #include "m-atomic.h" -/* Define a protected concurrent container and its associated functions. +/* Define a protected concurrent container and its associated functions + based on the given container. USAGE: CONCURRENT_DEF(name, type [, oplist_of_the_type]) */ #define CONCURRENT_DEF(name, ...) \ CONCURRENTI_DEF(M_IF_NARGS_EQ1(__VA_ARGS__) \ @@ -45,13 +46,16 @@ (__VA_ARGS__ ))) -/* Define a protected concurrent container and its associated functions with Read Preference. +/* Define a protected concurrent container and its associated functions + based on its given container. Operations that perform only read of the container + can be done in parallel. USAGE: CONCURRENT_RP_DEF(name, type [, oplist_of_the_type]) */ #define CONCURRENT_RP_DEF(name, ...) \ CONCURRENTI_RP_DEF(M_IF_NARGS_EQ1(__VA_ARGS__) \ ((name, __VA_ARGS__, M_GLOBAL_OPLIST_OR_DEF(__VA_ARGS__)(), M_C(name,_t), M_C(name,_it_t) ), \ (name, __VA_ARGS__, M_C(name,_t), M_C(name,_it_t)))) + /********************************** INTERNAL ************************************/ // Deferred evaluation for the oplist definition. @@ -593,14 +597,15 @@ // Deferred evaluation for the concurrent definition. #define CONCURRENTI_RP_DEF(arg) CONCURRENTI_RP_DEF2 arg -// Internal definition. +// Internal definition of RP #define CONCURRENTI_RP_DEF2(name, type, oplist, concurrent_t, concurrent_it_t) \ \ typedef struct M_C(name, _s) { \ struct M_C(name, _s) *self; \ m_mutex_t lock; \ - m_mutex_t read_lock; \ + m_cond_t rw_done; \ size_t read_count; \ + bool writer_waiting; \ m_cond_t there_is_data; /* condition raised when there is data */ \ type data; \ } concurrent_t[1]; \ @@ -615,10 +620,11 @@ M_C(name, _internal_init)(concurrent_t out) \ { \ m_mutex_init(out->lock); \ - m_mutex_init(out->read_lock); \ + m_cond_init(out->rw_done); \ m_cond_init(out->there_is_data); \ out->self = out; \ out->read_count = 0; \ + out->writer_waiting = false; \ } \ \ static inline void \ @@ -626,7 +632,7 @@ { \ assert (out->self == out); \ m_mutex_clear(out->lock); \ - m_mutex_clear(out->read_lock); \ + m_cond_clear(out->rw_done); \ m_cond_clear(out->there_is_data); \ out->self = NULL; \ } \ @@ -636,12 +642,12 @@ { \ struct M_C(name, _s) *self = out->self; \ assert (self == out); \ - m_mutex_lock (self->read_lock); \ - self->read_count ++; \ - if (self->read_count == 1) { \ - m_mutex_lock (self->lock); \ + m_mutex_lock (self->lock); \ + while (self->writer_waiting == true) { \ + m_cond_wait(self->rw_done, self->lock); \ } \ - m_mutex_unlock (self->read_lock); \ + self->read_count ++; \ + m_mutex_unlock (self->lock); \ } \ \ static inline void \ @@ -649,23 +655,34 @@ { \ struct M_C(name, _s) *self = out->self; \ assert (self == out); \ - m_mutex_lock (self->read_lock); \ + m_mutex_lock (self->lock); \ self->read_count --; \ if (self->read_count == 0) { \ - m_mutex_unlock (self->lock); \ + m_cond_broadcast (self->rw_done); \ } \ - m_mutex_unlock (self->read_lock); \ + m_mutex_unlock (self->lock); \ } \ \ static inline void \ M_C(name, _write_lock)(concurrent_t out) \ { \ m_mutex_lock (out->lock); \ + while (out->writer_waiting == true) { \ + m_cond_wait(out->rw_done, out->lock); \ + } \ + out->writer_waiting = true; \ + while (out->read_count > 0) { \ + m_cond_wait(out->rw_done, out->lock); \ + } \ + m_mutex_unlock (out->lock); \ } \ \ static inline void \ M_C(name, _write_unlock)(concurrent_t out) \ { \ + m_mutex_lock (out->lock); \ + out->writer_waiting = false; \ + m_cond_broadcast (out->rw_done); \ m_mutex_unlock (out->lock); \ } \ \ @@ -674,19 +691,42 @@ { \ struct M_C(name, _s) *self = out->self; \ assert (self == out); \ + m_mutex_lock (out->self->lock); \ + self->read_count --; \ + if (self->read_count == 0) { \ + m_cond_broadcast (self->rw_done); \ + } \ m_cond_wait(self->there_is_data, self->lock); \ + while (self->writer_waiting == true) { \ + m_cond_wait(self->rw_done, self->lock); \ + } \ + self->read_count ++; \ + m_mutex_unlock (out->self->lock); \ } \ \ static inline void \ - M_C(name, _write_wait)(const concurrent_t out) \ + M_C(name, _write_wait)(concurrent_t out) \ { \ - m_cond_wait(out->self->there_is_data, out->self->lock); \ + m_mutex_lock (out->lock); \ + out->writer_waiting = false; \ + m_cond_broadcast (out->rw_done); \ + m_cond_wait(out->there_is_data, out->lock); \ + while (out->writer_waiting == true) { \ + m_cond_wait(out->rw_done, out->lock); \ + } \ + out->writer_waiting = true; \ + while (out->read_count > 0) { \ + m_cond_wait(out->rw_done, out->lock); \ + } \ + m_mutex_unlock (out->lock); \ } \ \ static inline void \ M_C(name, _write_signal)(concurrent_t out) \ { \ + m_mutex_lock (out->lock); \ m_cond_broadcast(out->there_is_data); \ + m_mutex_unlock (out->lock); \ } \ \ CONCURRENTI_DEF_FUNC(name, type, oplist, concurrent_t, concurrent_it_t) |