Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/P-p-H-d/mlib.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Pelissier <patrick.pelissier@gmail.com>2019-04-11 21:07:27 +0300
committerPatrick Pelissier <patrick.pelissier@gmail.com>2019-04-11 21:07:27 +0300
commit01ad221972cd1f722218550608071c6806680013 (patch)
tree4e645a9aec3f9b4ad2f8e92e68b093ea6f66c0fe /m-concurrent.h
parentc269a36e8ba6c9877ab15224c03bea63bb420bc6 (diff)
Rewrite RP so that writers cannot starve and avoid warnings with thread sanitizer.
Diffstat (limited to 'm-concurrent.h')
-rw-r--r--m-concurrent.h72
1 files changed, 56 insertions, 16 deletions
diff --git a/m-concurrent.h b/m-concurrent.h
index ce0c78b..3ba7c14 100644
--- a/m-concurrent.h
+++ b/m-concurrent.h
@@ -29,7 +29,8 @@
#include "m-mutex.h"
#include "m-atomic.h"
-/* Define a protected concurrent container and its associated functions.
+/* Define a protected concurrent container and its associated functions
+ based on the given container.
USAGE: CONCURRENT_DEF(name, type [, oplist_of_the_type]) */
#define CONCURRENT_DEF(name, ...) \
CONCURRENTI_DEF(M_IF_NARGS_EQ1(__VA_ARGS__) \
@@ -45,13 +46,16 @@
(__VA_ARGS__ )))
-/* Define a protected concurrent container and its associated functions with Read Preference.
+/* Define a protected concurrent container and its associated functions
+ based on its given container. Operations that perform only read of the container
+ can be done in parallel.
USAGE: CONCURRENT_RP_DEF(name, type [, oplist_of_the_type]) */
#define CONCURRENT_RP_DEF(name, ...) \
CONCURRENTI_RP_DEF(M_IF_NARGS_EQ1(__VA_ARGS__) \
((name, __VA_ARGS__, M_GLOBAL_OPLIST_OR_DEF(__VA_ARGS__)(), M_C(name,_t), M_C(name,_it_t) ), \
(name, __VA_ARGS__, M_C(name,_t), M_C(name,_it_t))))
+
/********************************** INTERNAL ************************************/
// Deferred evaluation for the oplist definition.
@@ -593,14 +597,15 @@
// Deferred evaluation for the concurrent definition.
#define CONCURRENTI_RP_DEF(arg) CONCURRENTI_RP_DEF2 arg
-// Internal definition.
+// Internal definition of RP
#define CONCURRENTI_RP_DEF2(name, type, oplist, concurrent_t, concurrent_it_t) \
\
typedef struct M_C(name, _s) { \
struct M_C(name, _s) *self; \
m_mutex_t lock; \
- m_mutex_t read_lock; \
+ m_cond_t rw_done; \
size_t read_count; \
+ bool writer_waiting; \
m_cond_t there_is_data; /* condition raised when there is data */ \
type data; \
} concurrent_t[1]; \
@@ -615,10 +620,11 @@
M_C(name, _internal_init)(concurrent_t out) \
{ \
m_mutex_init(out->lock); \
- m_mutex_init(out->read_lock); \
+ m_cond_init(out->rw_done); \
m_cond_init(out->there_is_data); \
out->self = out; \
out->read_count = 0; \
+ out->writer_waiting = false; \
} \
\
static inline void \
@@ -626,7 +632,7 @@
{ \
assert (out->self == out); \
m_mutex_clear(out->lock); \
- m_mutex_clear(out->read_lock); \
+ m_cond_clear(out->rw_done); \
m_cond_clear(out->there_is_data); \
out->self = NULL; \
} \
@@ -636,12 +642,12 @@
{ \
struct M_C(name, _s) *self = out->self; \
assert (self == out); \
- m_mutex_lock (self->read_lock); \
- self->read_count ++; \
- if (self->read_count == 1) { \
- m_mutex_lock (self->lock); \
+ m_mutex_lock (self->lock); \
+ while (self->writer_waiting == true) { \
+ m_cond_wait(self->rw_done, self->lock); \
} \
- m_mutex_unlock (self->read_lock); \
+ self->read_count ++; \
+ m_mutex_unlock (self->lock); \
} \
\
static inline void \
@@ -649,23 +655,34 @@
{ \
struct M_C(name, _s) *self = out->self; \
assert (self == out); \
- m_mutex_lock (self->read_lock); \
+ m_mutex_lock (self->lock); \
self->read_count --; \
if (self->read_count == 0) { \
- m_mutex_unlock (self->lock); \
+ m_cond_broadcast (self->rw_done); \
} \
- m_mutex_unlock (self->read_lock); \
+ m_mutex_unlock (self->lock); \
} \
\
static inline void \
M_C(name, _write_lock)(concurrent_t out) \
{ \
m_mutex_lock (out->lock); \
+ while (out->writer_waiting == true) { \
+ m_cond_wait(out->rw_done, out->lock); \
+ } \
+ out->writer_waiting = true; \
+ while (out->read_count > 0) { \
+ m_cond_wait(out->rw_done, out->lock); \
+ } \
+ m_mutex_unlock (out->lock); \
} \
\
static inline void \
M_C(name, _write_unlock)(concurrent_t out) \
{ \
+ m_mutex_lock (out->lock); \
+ out->writer_waiting = false; \
+ m_cond_broadcast (out->rw_done); \
m_mutex_unlock (out->lock); \
} \
\
@@ -674,19 +691,42 @@
{ \
struct M_C(name, _s) *self = out->self; \
assert (self == out); \
+ m_mutex_lock (out->self->lock); \
+ self->read_count --; \
+ if (self->read_count == 0) { \
+ m_cond_broadcast (self->rw_done); \
+ } \
m_cond_wait(self->there_is_data, self->lock); \
+ while (self->writer_waiting == true) { \
+ m_cond_wait(self->rw_done, self->lock); \
+ } \
+ self->read_count ++; \
+ m_mutex_unlock (out->self->lock); \
} \
\
static inline void \
- M_C(name, _write_wait)(const concurrent_t out) \
+ M_C(name, _write_wait)(concurrent_t out) \
{ \
- m_cond_wait(out->self->there_is_data, out->self->lock); \
+ m_mutex_lock (out->lock); \
+ out->writer_waiting = false; \
+ m_cond_broadcast (out->rw_done); \
+ m_cond_wait(out->there_is_data, out->lock); \
+ while (out->writer_waiting == true) { \
+ m_cond_wait(out->rw_done, out->lock); \
+ } \
+ out->writer_waiting = true; \
+ while (out->read_count > 0) { \
+ m_cond_wait(out->rw_done, out->lock); \
+ } \
+ m_mutex_unlock (out->lock); \
} \
\
static inline void \
M_C(name, _write_signal)(concurrent_t out) \
{ \
+ m_mutex_lock (out->lock); \
m_cond_broadcast(out->there_is_data); \
+ m_mutex_unlock (out->lock); \
} \
\
CONCURRENTI_DEF_FUNC(name, type, oplist, concurrent_t, concurrent_it_t)