diff options
author | Victorien Le Couviour--Tuffet <victorien@videolan.org> | 2022-06-28 20:14:07 +0300 |
---|---|---|
committer | Victorien Le Couviour--Tuffet <victorien@videolan.org> | 2022-10-27 16:03:22 +0300 |
commit | 8f16314dba8bd1ec9d958bbb8bfba002967bd6a8 (patch) | |
tree | be21368dcb81d0e9de8cf7f73dd6121930a81499 /src/thread_task.c | |
parent | 8a4932ff035e2fd2b139caa73f6593c2a4754f67 (diff) |
threading: Add a pending list for async task insertion
Diffstat (limited to 'src/thread_task.c')
-rw-r--r-- | src/thread_task.c | 265 |
1 files changed, 182 insertions, 83 deletions
diff --git a/src/thread_task.c b/src/thread_task.c index 90244ec..ab2376c 100644 --- a/src/thread_task.c +++ b/src/thread_task.c @@ -173,6 +173,43 @@ static inline void insert_task(Dav1dFrameContext *const f, insert_tasks(f, t, t, cond_signal); } +static inline void add_pending(Dav1dFrameContext *const f, Dav1dTask *const t) { + pthread_mutex_lock(&f->task_thread.pending_tasks.lock); + t->next = NULL; + if (!f->task_thread.pending_tasks.head) + f->task_thread.pending_tasks.head = t; + else + f->task_thread.pending_tasks.tail->next = t; + f->task_thread.pending_tasks.tail = t; + atomic_store(&f->task_thread.pending_tasks.merge, 1); + pthread_mutex_unlock(&f->task_thread.pending_tasks.lock); +} + +static inline int merge_pending_frame(Dav1dFrameContext *const f) { + int const merge = atomic_load(&f->task_thread.pending_tasks.merge); + if (merge) { + pthread_mutex_lock(&f->task_thread.pending_tasks.lock); + Dav1dTask *t = f->task_thread.pending_tasks.head; + f->task_thread.pending_tasks.head = NULL; + f->task_thread.pending_tasks.tail = NULL; + atomic_store(&f->task_thread.pending_tasks.merge, 0); + pthread_mutex_unlock(&f->task_thread.pending_tasks.lock); + while (t) { + Dav1dTask *const tmp = t->next; + insert_task(f, t, 0); + t = tmp; + } + } + return merge; +} + +static inline int merge_pending(const Dav1dContext *const c) { + int res = 0; + for (unsigned i = 0; i < c->n_fc; i++) + res |= merge_pending_frame(&c->fc[i]); + return res; +} + static int create_filter_sbrow(Dav1dFrameContext *const f, const int pass, Dav1dTask **res_t) { @@ -234,16 +271,18 @@ int dav1d_task_create_tile_sbrow(Dav1dFrameContext *const f, const int pass, Dav1dTask *tasks = f->task_thread.tile_tasks[0]; const int uses_2pass = f->c->n_fc > 1; const int num_tasks = f->frame_hdr->tiling.cols * f->frame_hdr->tiling.rows; - int alloc_num_tasks = num_tasks * (1 + uses_2pass); - if (alloc_num_tasks > f->task_thread.num_tile_tasks) { - const size_t size = sizeof(Dav1dTask) * alloc_num_tasks; - tasks = realloc(f->task_thread.tile_tasks[0], size); - if (!tasks) return -1; - memset(tasks, 0, size); - f->task_thread.tile_tasks[0] = tasks; - f->task_thread.num_tile_tasks = alloc_num_tasks; + if (pass < 2) { + int alloc_num_tasks = num_tasks * (1 + uses_2pass); + if (alloc_num_tasks > f->task_thread.num_tile_tasks) { + const size_t size = sizeof(Dav1dTask) * alloc_num_tasks; + tasks = realloc(f->task_thread.tile_tasks[0], size); + if (!tasks) return -1; + memset(tasks, 0, size); + f->task_thread.tile_tasks[0] = tasks; + f->task_thread.num_tile_tasks = alloc_num_tasks; + } + f->task_thread.tile_tasks[1] = tasks + num_tasks; } - f->task_thread.tile_tasks[1] = tasks + num_tasks; tasks += num_tasks * (pass & 1); Dav1dTask *pf_t; @@ -273,8 +312,22 @@ int dav1d_task_create_tile_sbrow(Dav1dFrameContext *const f, const int pass, prev_t->next = pf_t; prev_t = pf_t; } - insert_tasks(f, &tasks[0], prev_t, cond_signal); - f->task_thread.done[pass & 1] = 0; + prev_t->next = NULL; + + atomic_store(&f->task_thread.done[pass & 1], 0); + + // XXX in theory this could be done locklessly, at this point they are no + // tasks in the frameQ, so no other runner should be using this lock, but + // we must add both passes at once + pthread_mutex_lock(&f->task_thread.pending_tasks.lock); + assert(f->task_thread.pending_tasks.head == NULL || pass == 2); + if (!f->task_thread.pending_tasks.head) + f->task_thread.pending_tasks.head = &tasks[0]; + else + f->task_thread.pending_tasks.tail->next = &tasks[0]; + f->task_thread.pending_tasks.tail = prev_t; + atomic_store(&f->task_thread.pending_tasks.merge, 1); + pthread_mutex_unlock(&f->task_thread.pending_tasks.lock); return 0; } @@ -282,7 +335,7 @@ int dav1d_task_create_tile_sbrow(Dav1dFrameContext *const f, const int pass, void dav1d_task_frame_init(Dav1dFrameContext *const f) { const Dav1dContext *const c = f->c; - f->task_thread.init_done = 0; + atomic_store(&f->task_thread.init_done, 0); // schedule init task, which will schedule the remaining tasks Dav1dTask *const t = &f->task_thread.init_task; t->type = DAV1D_TASK_TYPE_INIT; @@ -317,16 +370,12 @@ static inline int ensure_progress(struct TaskThreadData *const ttd, // so ensure that completed. if not, re-add to task-queue; else, fall-through int p1 = atomic_load(state); if (p1 < t->sby) { + t->type = type; + t->recon_progress = t->deblock_progress = 0; + *target = t->sby; + add_pending(f, t); pthread_mutex_lock(&ttd->lock); - p1 = atomic_load(state); - if (p1 < t->sby) { - t->type = type; - t->recon_progress = t->deblock_progress = 0; - *target = t->sby; - insert_task(f, t, 0); - return 1; - } - pthread_mutex_unlock(&ttd->lock); + return 1; } return 0; } @@ -379,11 +428,29 @@ static inline int check_tile(Dav1dTask *const t, Dav1dFrameContext *const f, return 0; } +static inline int get_frame_progress(const Dav1dContext *const c, + const Dav1dFrameContext *const f) +{ + unsigned frame_prog = c->n_fc > 1 ? atomic_load(&f->sr_cur.progress[1]) : 0; + if (frame_prog >= FRAME_ERROR) + return f->sbh - 1; + int idx = frame_prog >> (f->sb_shift + 7); + int prog; + do { + atomic_uint *state = &f->frame_thread.frame_progress[idx]; + const unsigned val = ~atomic_load(state); + prog = val ? ctz(val) : 32; + if (prog != 32) break; + prog = 0; + } while (++idx < f->frame_thread.prog_sz); + return ((idx << 5) | prog) - 1; +} + static inline void abort_frame(Dav1dFrameContext *const f, const int error) { atomic_store(&f->task_thread.error, error == DAV1D_ERR(EINVAL) ? 1 : -1); - f->task_thread.task_counter = 0; - f->task_thread.done[0] = 1; - f->task_thread.done[1] = 1; + atomic_store(&f->task_thread.task_counter, 0); + atomic_store(&f->task_thread.done[0], 1); + atomic_store(&f->task_thread.done[1], 1); atomic_store(&f->sr_cur.progress[0], FRAME_ERROR); atomic_store(&f->sr_cur.progress[1], FRAME_ERROR); dav1d_decode_frame_exit(f, error); @@ -488,6 +555,8 @@ void *dav1d_worker_task(void *data) { for (;;) { if (tc->task_thread.die) break; if (atomic_load(c->flush)) goto park; + + merge_pending(c); if (ttd->delayed_fg.exec) { // run delayed film grain first delayed_fg_task(c, ttd); continue; @@ -498,11 +567,18 @@ void *dav1d_worker_task(void *data) { for (unsigned i = 0; i < c->n_fc; i++) { const unsigned first = atomic_load(&ttd->first); f = &c->fc[(first + i) % c->n_fc]; - if (f->task_thread.init_done) continue; + if (atomic_load(&f->task_thread.init_done)) continue; t = f->task_thread.task_head; if (!t) continue; if (t->type == DAV1D_TASK_TYPE_INIT) goto found; if (t->type == DAV1D_TASK_TYPE_INIT_CDF) { + // XXX This can be a simple else, if adding tasks of both + // passes at once (in dav1d_task_create_tile_sbrow). + // Adding the tasks to the pending Q can result in a + // thread merging them before setting init_done. + // We will need to set init_done before adding to the + // pending Q, so maybe return the tasks, set init_done, + // and add to pending Q only then. const int p1 = f->in_cdf.progress ? atomic_load(f->in_cdf.progress) : 1; if (p1) { @@ -515,6 +591,7 @@ void *dav1d_worker_task(void *data) { while (ttd->cur < c->n_fc) { // run decoding tasks last const unsigned first = atomic_load(&ttd->first); f = &c->fc[(first + ttd->cur) % c->n_fc]; + merge_pending_frame(f); prev_t = f->task_thread.task_cur_prev; t = prev_t ? prev_t->next : f->task_thread.task_head; while (t) { @@ -529,11 +606,12 @@ void *dav1d_worker_task(void *data) { } else if (t->recon_progress) { const int p = t->type == DAV1D_TASK_TYPE_ENTROPY_PROGRESS; int error = atomic_load(&f->task_thread.error); - assert(!f->task_thread.done[p] || error); + assert(!atomic_load(&f->task_thread.done[p]) || error); const int tile_row_base = f->frame_hdr->tiling.cols * f->frame_thread.next_tile_row[p]; if (p) { - const int p1 = f->frame_thread.entropy_progress; + atomic_int *const prog = &f->frame_thread.entropy_progress; + const int p1 = atomic_load(prog); if (p1 < t->sby) goto next; atomic_fetch_or(&f->task_thread.error, p1 == TILE_ERROR); } @@ -577,6 +655,7 @@ void *dav1d_worker_task(void *data) { ttd->cur++; } if (reset_task_cur(c, ttd, UINT_MAX)) continue; + if (merge_pending(c)) continue; park: tc->task_thread.flushed = 1; pthread_cond_signal(&tc->task_thread.td.cond); @@ -594,6 +673,7 @@ void *dav1d_worker_task(void *data) { if (!t->next) f->task_thread.task_tail = prev_t; if (t->type > DAV1D_TASK_TYPE_INIT_CDF && !f->task_thread.task_head) ttd->cur++; + t->next = NULL; // we don't need to check cond_signaled here, since we found a task // after the last signal so we want to re-signal the next waiting thread // and again won't need to signal after that @@ -615,13 +695,13 @@ void *dav1d_worker_task(void *data) { if (res || p1 == TILE_ERROR) { pthread_mutex_lock(&ttd->lock); abort_frame(f, res ? res : DAV1D_ERR(EINVAL)); - } else if (!res) { + reset_task_cur(c, ttd, t->frame_idx); + } else { t->type = DAV1D_TASK_TYPE_INIT_CDF; if (p1) goto found_unlocked; + add_pending(f, t); pthread_mutex_lock(&ttd->lock); - insert_task(f, t, 0); } - reset_task_cur(c, ttd, t->frame_idx); continue; } case DAV1D_TASK_TYPE_INIT_CDF: { @@ -629,7 +709,6 @@ void *dav1d_worker_task(void *data) { int res = DAV1D_ERR(EINVAL); if (!atomic_load(&f->task_thread.error)) res = dav1d_decode_frame_init_cdf(f); - pthread_mutex_lock(&ttd->lock); if (f->frame_hdr->refresh_context && !f->task_thread.update_set) { atomic_store(f->out_cdf.progress, res < 0 ? TILE_ERROR : 1); } @@ -638,23 +717,34 @@ void *dav1d_worker_task(void *data) { for (int p = 1; p <= 2; p++) { const int res = dav1d_task_create_tile_sbrow(f, p, 0); if (res) { + pthread_mutex_lock(&ttd->lock); // memory allocation failed - f->task_thread.done[2 - p] = 1; + atomic_store(&f->task_thread.done[2 - p], 1); atomic_store(&f->task_thread.error, -1); - f->task_thread.task_counter -= f->sbh + - f->frame_hdr->tiling.cols * f->frame_hdr->tiling.rows; + atomic_fetch_sub(&f->task_thread.task_counter, + f->frame_hdr->tiling.cols * + f->frame_hdr->tiling.rows + f->sbh); atomic_store(&f->sr_cur.progress[p - 1], FRAME_ERROR); - if (p == 2 && f->task_thread.done[1]) { - assert(!f->task_thread.task_counter); + if (p == 2 && atomic_load(&f->task_thread.done[1])) { + assert(!atomic_load(&f->task_thread.task_counter)); dav1d_decode_frame_exit(f, DAV1D_ERR(ENOMEM)); f->n_tile_data = 0; pthread_cond_signal(&f->task_thread.cond); + atomic_store(&f->task_thread.init_done, 1); + continue; + } else { + pthread_mutex_unlock(&ttd->lock); } } } - } else abort_frame(f, res); - reset_task_cur(c, ttd, t->frame_idx); - f->task_thread.init_done = 1; + atomic_store(&f->task_thread.init_done, 1); + pthread_mutex_lock(&ttd->lock); + } else { + pthread_mutex_lock(&ttd->lock); + abort_frame(f, res); + reset_task_cur(c, ttd, t->frame_idx); + atomic_store(&f->task_thread.init_done, 1); + } continue; } case DAV1D_TASK_TYPE_TILE_ENTROPY: @@ -683,10 +773,9 @@ void *dav1d_worker_task(void *data) { pthread_cond_signal(&ttd->cond); goto found_unlocked; } - pthread_mutex_lock(&ttd->lock); atomic_store(&ts->progress[p], progress); - reset_task_cur(c, ttd, t->frame_idx); - insert_task(f, t, 0); + add_pending(f, t); + pthread_mutex_lock(&ttd->lock); } else { pthread_mutex_lock(&ttd->lock); atomic_store(&ts->progress[p], progress); @@ -702,15 +791,16 @@ void *dav1d_worker_task(void *data) { if (c->n_fc > 1) atomic_store(f->out_cdf.progress, error ? TILE_ERROR : 1); } - if (!--f->task_thread.task_counter && f->task_thread.done[0] && - (!uses_2pass || f->task_thread.done[1])) + if (atomic_fetch_sub(&f->task_thread.task_counter, 1) - 1 == 0 && + atomic_load(&f->task_thread.done[0]) && + (!uses_2pass || atomic_load(&f->task_thread.done[1]))) { dav1d_decode_frame_exit(f, error == 1 ? DAV1D_ERR(EINVAL) : error ? DAV1D_ERR(ENOMEM) : 0); f->n_tile_data = 0; pthread_cond_signal(&f->task_thread.cond); } - assert(f->task_thread.task_counter >= 0); + assert(atomic_load(&f->task_thread.task_counter) >= 0); if (!atomic_fetch_or(&ttd->cond_signaled, 1)) pthread_cond_signal(&ttd->cond); } @@ -744,15 +834,11 @@ void *dav1d_worker_task(void *data) { if (sby) { int prog = atomic_load(&f->frame_thread.copy_lpf_progress[(sby - 1) >> 5]); if (~prog & (1U << ((sby - 1) & 31))) { + t->type = DAV1D_TASK_TYPE_CDEF; + t->recon_progress = t->deblock_progress = 0; + add_pending(f, t); pthread_mutex_lock(&ttd->lock); - prog = atomic_load(&f->frame_thread.copy_lpf_progress[(sby - 1) >> 5]); - if (~prog & (1U << ((sby - 1) & 31))) { - t->type = DAV1D_TASK_TYPE_CDEF; - t->recon_progress = t->deblock_progress = 0; - insert_task(f, t, 0); - continue; - } - pthread_mutex_unlock(&ttd->lock); + continue; } } } @@ -786,40 +872,53 @@ void *dav1d_worker_task(void *data) { const int uses_2pass = c->n_fc > 1; const int sbh = f->sbh; const int sbsz = f->sb_step * 4; - const enum PlaneType progress_plane_type = - t->type == DAV1D_TASK_TYPE_ENTROPY_PROGRESS ? PLANE_TYPE_BLOCK : - c->n_fc > 1 ? PLANE_TYPE_Y : PLANE_TYPE_ALL; - if (t->type != DAV1D_TASK_TYPE_ENTROPY_PROGRESS) - atomic_fetch_or(&f->frame_thread.frame_progress[sby >> 5], - 1U << (sby & 31)); - pthread_mutex_lock(&ttd->lock); - if (t->type != DAV1D_TASK_TYPE_ENTROPY_PROGRESS) { - unsigned frame_prog = c->n_fc > 1 ? atomic_load(&f->sr_cur.progress[1]) : 0; - if (frame_prog < FRAME_ERROR) { - int idx = frame_prog >> (f->sb_shift + 7); - int prog; - do { - atomic_uint *state = &f->frame_thread.frame_progress[idx]; - const unsigned val = ~atomic_load(state); - prog = val ? ctz(val) : 32; - if (prog != 32) break; - prog = 0; - } while (++idx < f->frame_thread.prog_sz); - sby = ((idx << 5) | prog) - 1; - } else sby = sbh - 1; + if (t->type == DAV1D_TASK_TYPE_ENTROPY_PROGRESS) { + error = atomic_load(&f->task_thread.error); + const unsigned y = sby + 1 == sbh ? UINT_MAX : (unsigned)(sby + 1) * sbsz; + assert(c->n_fc > 1); + if (f->sr_cur.p.data[0] /* upon flush, this can be free'ed already */) + atomic_store(&f->sr_cur.progress[0], error ? FRAME_ERROR : y); + atomic_store(&f->frame_thread.entropy_progress, + error ? TILE_ERROR : sby + 1); + if (sby + 1 == sbh) + atomic_store(&f->task_thread.done[1], 1); + pthread_mutex_lock(&ttd->lock); + const int num_tasks = atomic_fetch_sub(&f->task_thread.task_counter, 1) - 1; + if (sby + 1 < sbh && num_tasks) { + reset_task_cur(c, ttd, t->frame_idx); + continue; + } + if (!num_tasks && atomic_load(&f->task_thread.done[0]) && + atomic_load(&f->task_thread.done[1])) + { + dav1d_decode_frame_exit(f, error == 1 ? DAV1D_ERR(EINVAL) : + error ? DAV1D_ERR(ENOMEM) : 0); + f->n_tile_data = 0; + pthread_cond_signal(&f->task_thread.cond); + } + reset_task_cur(c, ttd, t->frame_idx); + continue; } + // t->type != DAV1D_TASK_TYPE_ENTROPY_PROGRESS + atomic_fetch_or(&f->frame_thread.frame_progress[sby >> 5], + 1U << (sby & 31)); + pthread_mutex_lock(&f->task_thread.lock); + sby = get_frame_progress(c, f); error = atomic_load(&f->task_thread.error); const unsigned y = sby + 1 == sbh ? UINT_MAX : (unsigned)(sby + 1) * sbsz; - if (c->n_fc > 1 && f->sr_cur.p.data[0] /* upon flush, this can be free'ed already */) { - const int idx = t->type != DAV1D_TASK_TYPE_ENTROPY_PROGRESS; - atomic_store(&f->sr_cur.progress[idx], error ? FRAME_ERROR : y); - } - if (progress_plane_type == PLANE_TYPE_BLOCK) - f->frame_thread.entropy_progress = error ? TILE_ERROR : sby + 1; + if (c->n_fc > 1 && f->sr_cur.p.data[0] /* upon flush, this can be free'ed already */) + atomic_store(&f->sr_cur.progress[1], error ? FRAME_ERROR : y); + pthread_mutex_unlock(&f->task_thread.lock); if (sby + 1 == sbh) - f->task_thread.done[progress_plane_type == PLANE_TYPE_BLOCK] = 1; - if (!--f->task_thread.task_counter && - f->task_thread.done[0] && (!uses_2pass || f->task_thread.done[1])) + atomic_store(&f->task_thread.done[0], 1); + pthread_mutex_lock(&ttd->lock); + const int num_tasks = atomic_fetch_sub(&f->task_thread.task_counter, 1) - 1; + if (sby + 1 < sbh && num_tasks) { + reset_task_cur(c, ttd, t->frame_idx); + continue; + } + if (!num_tasks && atomic_load(&f->task_thread.done[0]) && + (!uses_2pass || atomic_load(&f->task_thread.done[1]))) { dav1d_decode_frame_exit(f, error == 1 ? DAV1D_ERR(EINVAL) : error ? DAV1D_ERR(ENOMEM) : 0); |