
github.com/nodejs/node.git
Diffstat (limited to 'deps/uv/src/threadpool.c')
-rw-r--r--  deps/uv/src/threadpool.c  86
1 file changed, 73 insertions, 13 deletions
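
The diff below is the libuv threadpool patch as vendored into Node.js. In short: submitted work now carries a kind (UV__WORK_CPU and UV__WORK_SLOW_IO both appear below), slow I/O requests are parked on a dedicated pending queue (slow_io_pending_wq), and a single marker node (run_slow_work_message) in the main work queue wakes a worker to drain them, with the number of threads concurrently running slow I/O capped at roughly half the pool.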
diff --git a/deps/uv/src/threadpool.c b/deps/uv/src/threadpool.c
index 413d1c204c2..4875f27beb9 100644
--- a/deps/uv/src/threadpool.c
+++ b/deps/uv/src/threadpool.c
@@ -33,12 +33,18 @@ static uv_once_t once = UV_ONCE_INIT;
 static uv_cond_t cond;
 static uv_mutex_t mutex;
 static unsigned int idle_threads;
+static unsigned int slow_io_work_running;
 static unsigned int nthreads;
 static uv_thread_t* threads;
 static uv_thread_t default_threads[4];
 static QUEUE exit_message;
 static QUEUE wq;
+static QUEUE run_slow_work_message;
+static QUEUE slow_io_pending_wq;
+
+static unsigned int slow_work_thread_threshold(void) {
+  return (nthreads + 1) / 2;
+}
 
 static void uv__cancelled(struct uv__work* w) {
   abort();
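
The new slow_work_thread_threshold() caps how many threads may run slow I/O at once: half the pool, rounded up. A minimal standalone sketch of that arithmetic (not part of the patch), which compiles and runs on its own:

    #include <stdio.h>

    /* Same arithmetic as slow_work_thread_threshold() above. */
    static unsigned int threshold(unsigned int nthreads) {
      return (nthreads + 1) / 2;
    }

    int main(void) {
      unsigned int n;
      for (n = 1; n <= 8; n++)  /* the default pool of 4 threads -> cap of 2 */
        printf("nthreads=%u -> slow I/O cap=%u\n", n, threshold(n));
      return 0;
    }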
@@ -51,6 +57,7 @@ static void uv__cancelled(struct uv__work* w) {
 static void worker(void* arg) {
   struct uv__work* w;
   QUEUE* q;
+  int is_slow_work;
 
   uv_sem_post((uv_sem_t*) arg);
   arg = NULL;
@@ -58,31 +65,65 @@ static void worker(void* arg) {
   for (;;) {
     uv_mutex_lock(&mutex);
 
-    while (QUEUE_EMPTY(&wq)) {
+  wait_for_work:
+    /* Keep waiting while either no work is present or only slow I/O
+       and we're at the threshold for that. */
+    while (QUEUE_EMPTY(&wq) ||
+           (QUEUE_HEAD(&wq) == &run_slow_work_message &&
+            QUEUE_NEXT(&run_slow_work_message) == &wq &&
+            slow_io_work_running >= slow_work_thread_threshold())) {
       idle_threads += 1;
       uv_cond_wait(&cond, &mutex);
       idle_threads -= 1;
     }
 
     q = QUEUE_HEAD(&wq);
-
-    if (q == &exit_message)
+    if (q == &exit_message) {
       uv_cond_signal(&cond);
-    else {
+      uv_mutex_unlock(&mutex);
+      break;
+    }
+
+    QUEUE_REMOVE(q);
+    QUEUE_INIT(q);  /* Signal uv_cancel() that the work req is executing. */
+
+    is_slow_work = 0;
+    if (q == &run_slow_work_message) {
+      /* If we're at the slow I/O threshold, re-schedule until after all
+         other work in the queue is done. */
+      if (slow_io_work_running >= slow_work_thread_threshold()) {
+        QUEUE_INSERT_TAIL(&wq, q);
+        goto wait_for_work;
+      }
+
+      /* If we encountered a request to run slow I/O work but there is none
+         to run, that means it's cancelled => Start over. */
+      if (QUEUE_EMPTY(&slow_io_pending_wq))
+        goto wait_for_work;
+
+      is_slow_work = 1;
+      slow_io_work_running++;
+
+      q = QUEUE_HEAD(&slow_io_pending_wq);
       QUEUE_REMOVE(q);
-      QUEUE_INIT(q);  /* Signal uv_cancel() that the work req is
-                             executing. */
+      QUEUE_INIT(q);
+
+      /* If there is more slow I/O work, schedule it to be run as well. */
+      if (!QUEUE_EMPTY(&slow_io_pending_wq)) {
+        QUEUE_INSERT_TAIL(&wq, &run_slow_work_message);
+        if (idle_threads > 0)
+          uv_cond_signal(&cond);
+      }
     }
 
     uv_mutex_unlock(&mutex);
 
-    if (q == &exit_message)
-      break;
-
     w = QUEUE_DATA(q, struct uv__work, wq);
     w->work(w);
 
     uv_mutex_lock(&w->loop->wq_mutex);
+    if (is_slow_work)
+      slow_io_work_running--;
     w->work = NULL;  /* Signal uv_cancel() that the work req is done
                         executing. */
     QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
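
A note on the new wait condition at the top of the loop: a worker goes (or stays) idle not only when the queue is empty, but also when the only queued item is the run_slow_work_message marker while the slow I/O cap is already reached. The pair of pointer checks, QUEUE_HEAD(&wq) == &run_slow_work_message together with QUEUE_NEXT(&run_slow_work_message) == &wq, is how a circular doubly-linked QUEUE expresses "the marker is the sole entry". A toy model of that test, reimplementing just enough of the list (this is not libuv's queue.h):

    #include <assert.h>

    struct node { struct node* next; struct node* prev; };

    static void list_init(struct node* q) { q->next = q; q->prev = q; }

    static void insert_tail(struct node* h, struct node* q) {
      q->next = h; q->prev = h->prev;
      h->prev->next = q; h->prev = q;
    }

    int main(void) {
      struct node wq, marker, other;
      list_init(&wq);
      insert_tail(&wq, &marker);
      /* Only the marker is queued: both halves of the test hold. */
      assert(wq.next == &marker && marker.next == &wq);
      insert_tail(&wq, &other);
      /* A second item breaks the second half, so a worker may not sleep. */
      assert(!(wq.next == &marker && marker.next == &wq));
      return 0;
    }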
@@ -92,8 +133,20 @@ static void worker(void* arg) {
   }
 }
 
-static void post(QUEUE* q) {
+static void post(QUEUE* q, enum uv__work_kind kind) {
   uv_mutex_lock(&mutex);
+  if (kind == UV__WORK_SLOW_IO) {
+    /* Insert into a separate queue. */
+    QUEUE_INSERT_TAIL(&slow_io_pending_wq, q);
+    if (!QUEUE_EMPTY(&run_slow_work_message)) {
+      /* Running slow I/O tasks is already scheduled => Nothing to do here.
+         The worker that runs said other task will schedule this one as well. */
+      uv_mutex_unlock(&mutex);
+      return;
+    }
+    q = &run_slow_work_message;
+  }
+
   QUEUE_INSERT_TAIL(&wq, q);
   if (idle_threads > 0)
     uv_cond_signal(&cond);
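
post() avoids enqueuing the marker twice with a small trick: a node that has been QUEUE_INIT'ed points at itself, so QUEUE_EMPTY(&run_slow_work_message) is true exactly when the marker is not currently linked into wq (the worker re-inits it after removal at the QUEUE_REMOVE/QUEUE_INIT step above). A self-contained sketch of that membership test, using the same toy list idea as before:

    #include <assert.h>

    struct node { struct node* next; struct node* prev; };

    static void node_init(struct node* q) { q->next = q; q->prev = q; }

    static int node_detached(const struct node* q) {
      return q->next == q;  /* ~ QUEUE_EMPTY applied to a node */
    }

    static void insert_tail(struct node* h, struct node* q) {
      q->next = h; q->prev = h->prev;
      h->prev->next = q; h->prev = q;
    }

    int main(void) {
      struct node wq, marker;
      node_init(&wq);
      node_init(&marker);
      assert(node_detached(&marker));   /* not scheduled: post() must enqueue it */
      insert_tail(&wq, &marker);
      assert(!node_detached(&marker));  /* already scheduled: post() returns early */
      return 0;
    }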
@@ -108,7 +161,7 @@ UV_DESTRUCTOR(static void cleanup(void)) {
   if (nthreads == 0)
     return;
 
-  post(&exit_message);
+  post(&exit_message, UV__WORK_CPU);
 
   for (i = 0; i < nthreads; i++)
     if (uv_thread_join(threads + i))
@@ -156,6 +209,8 @@ static void init_threads(void) {
     abort();
 
   QUEUE_INIT(&wq);
+  QUEUE_INIT(&slow_io_pending_wq);
+  QUEUE_INIT(&run_slow_work_message);
 
   if (uv_sem_init(&sem, 0))
     abort();
@@ -194,13 +249,14 @@ static void init_once(void) {
 void uv__work_submit(uv_loop_t* loop,
                      struct uv__work* w,
+                     enum uv__work_kind kind,
                      void (*work)(struct uv__work* w),
                      void (*done)(struct uv__work* w, int status)) {
   uv_once(&once, init_once);
   w->loop = loop;
   w->work = work;
   w->done = done;
-  post(&w->wq);
+  post(&w->wq, kind);
 }
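
uv__work_submit() now takes the work kind at submission time. The enum itself is declared elsewhere in this commit and is not part of this diff; only UV__WORK_CPU and UV__WORK_SLOW_IO appear here, so the following shape is an assumption:

    /* Assumed declaration (the full commit keeps it in a shared internal
       header); only the CPU and slow-I/O constants are used in this diff. */
    enum uv__work_kind {
      UV__WORK_CPU,
      UV__WORK_FAST_IO,
      UV__WORK_SLOW_IO
    };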
@@ -284,7 +340,11 @@ int uv_queue_work(uv_loop_t* loop,
   req->loop = loop;
   req->work_cb = work_cb;
   req->after_work_cb = after_work_cb;
-  uv__work_submit(loop, &req->work_req, uv__queue_work, uv__queue_done);
+  uv__work_submit(loop,
+                  &req->work_req,
+                  UV__WORK_CPU,
+                  uv__queue_work,
+                  uv__queue_done);
   return 0;
 }
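
For context, here is a minimal caller of the public API (a sketch, not part of the patch; build with something like cc demo.c -luv). Per the hunk above, uv_queue_work() always submits UV__WORK_CPU, so user-submitted jobs never count against the slow I/O threshold:

    #include <stdio.h>
    #include <uv.h>

    static void work_cb(uv_work_t* req) {
      printf("running on a threadpool thread\n");  /* submitted as UV__WORK_CPU */
    }

    static void after_work_cb(uv_work_t* req, int status) {
      printf("back on the loop thread, status=%d\n", status);
    }

    int main(void) {
      uv_work_t req;
      uv_queue_work(uv_default_loop(), &req, work_cb, after_work_cb);
      return uv_run(uv_default_loop(), UV_RUN_DEFAULT);
    }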