From 8f16314dba8bd1ec9d958bbb8bfba002967bd6a8 Mon Sep 17 00:00:00 2001
From: Victorien Le Couviour--Tuffet
Date: Tue, 28 Jun 2022 19:14:07 +0200
Subject: threading: Add a pending list for async task insertion

---
 src/decode.c      |   8 +-
 src/internal.h    |  14 ++-
 src/lib.c         |  19 +++-
 src/thread_task.c | 265 +++++++++++++++++++++++++++++++++++++-----------------
 4 files changed, 214 insertions(+), 92 deletions(-)

diff --git a/src/decode.c b/src/decode.c
index be9320e..2ac190b 100644
--- a/src/decode.c
+++ b/src/decode.c
@@ -3459,11 +3459,12 @@ int dav1d_decode_frame(Dav1dFrameContext *const f) {
     // wait until all threads have completed
     if (!res) {
         if (f->c->n_tc > 1) {
-            pthread_mutex_lock(&f->task_thread.ttd->lock);
             res = dav1d_task_create_tile_sbrow(f, 0, 1);
+            pthread_mutex_lock(&f->task_thread.ttd->lock);
+            pthread_cond_signal(&f->task_thread.ttd->cond);
             if (!res) {
                 while (!f->task_thread.done[0] ||
-                       f->task_thread.task_counter > 0)
+                       atomic_load(&f->task_thread.task_counter) > 0)
                 {
                     pthread_cond_wait(&f->task_thread.cond,
                                       &f->task_thread.ttd->lock);
@@ -3726,7 +3727,8 @@ int dav1d_submit_frame(Dav1dContext *const c) {
         const int uses_2pass = c->n_fc > 1;
         const int cols = f->frame_hdr->tiling.cols;
         const int rows = f->frame_hdr->tiling.rows;
-        f->task_thread.task_counter = (cols * rows + f->sbh) << uses_2pass;
+        atomic_store(&f->task_thread.task_counter,
+                     (cols * rows + f->sbh) << uses_2pass);
 
         // ref_mvs
         if (IS_INTER_OR_SWITCH(f->frame_hdr) || f->frame_hdr->allow_intrabc) {
diff --git a/src/internal.h b/src/internal.h
index eceda98..29d07b8 100644
--- a/src/internal.h
+++ b/src/internal.h
@@ -275,7 +275,7 @@ struct Dav1dFrameContext {
 
     struct {
         int next_tile_row[2 /* 0: reconstruction, 1: entropy */];
-        int entropy_progress;
+        atomic_int entropy_progress;
         atomic_int deblock_progress; // in sby units
         atomic_uint *frame_progress, *copy_lpf_progress;
         // indexed using t->by * f->b4_stride + t->bx
@@ -324,22 +324,28 @@ struct Dav1dFrameContext {
     } lf;
 
     struct {
+        pthread_mutex_t lock;
         pthread_cond_t cond;
         struct TaskThreadData *ttd;
         struct Dav1dTask *tasks, *tile_tasks[2], init_task;
         int num_tasks, num_tile_tasks;
-        int init_done;
-        int done[2];
+        atomic_int init_done;
+        atomic_int done[2];
         int retval;
         int update_set; // whether we need to update CDF reference
         atomic_int error;
-        int task_counter;
+        atomic_int task_counter;
         struct Dav1dTask *task_head, *task_tail;
         // Points to the task directly before the cur pointer in the queue.
         // This cur pointer is theoretical here, we actually keep track of the
         // "prev_t" variable. This is needed to not lose the tasks in
         // [head;cur-1] when picking one for execution.
         struct Dav1dTask *task_cur_prev;
+        struct { // async task insertion
+            atomic_int merge;
+            pthread_mutex_t lock;
+            Dav1dTask *head, *tail;
+        } pending_tasks;
     } task_thread;
 
     // threading (refer to tc[] for per-thread things)
diff --git a/src/lib.c b/src/lib.c
index f60b7f8..841b32d 100644
--- a/src/lib.c
+++ b/src/lib.c
@@ -235,8 +235,18 @@ COLD int dav1d_open(Dav1dContext **const c_out, const Dav1dSettings *const s) {
     }
     for (unsigned n = 0; n < c->n_fc; n++) {
         Dav1dFrameContext *const f = &c->fc[n];
-        if (c->n_tc > 1)
-            if (pthread_cond_init(&f->task_thread.cond, NULL)) goto error;
+        if (c->n_tc > 1) {
+            if (pthread_mutex_init(&f->task_thread.lock, NULL)) goto error;
+            if (pthread_cond_init(&f->task_thread.cond, NULL)) {
+                pthread_mutex_destroy(&f->task_thread.lock);
+                goto error;
+            }
+            if (pthread_mutex_init(&f->task_thread.pending_tasks.lock, NULL)) {
+                pthread_cond_destroy(&f->task_thread.cond);
+                pthread_mutex_destroy(&f->task_thread.lock);
+                goto error;
+            }
+        }
         f->c = c;
         f->task_thread.ttd = &c->task_thread;
         f->lf.last_sharpness = -1;
@@ -595,6 +605,9 @@ void dav1d_flush(Dav1dContext *const c) {
         c->fc[i].task_thread.task_head = NULL;
         c->fc[i].task_thread.task_tail = NULL;
         c->fc[i].task_thread.task_cur_prev = NULL;
+        c->fc[i].task_thread.pending_tasks.head = NULL;
+        c->fc[i].task_thread.pending_tasks.tail = NULL;
+        atomic_init(&c->fc[i].task_thread.pending_tasks.merge, 0);
     }
     atomic_init(&c->task_thread.first, 0);
     c->task_thread.cur = c->n_fc;
@@ -668,7 +681,9 @@ static COLD void close_internal(Dav1dContext **const c_out, int flush) {
                 freep(&f->frame_thread.cbi);
             }
             if (c->n_tc > 1) {
+                pthread_mutex_destroy(&f->task_thread.pending_tasks.lock);
                 pthread_cond_destroy(&f->task_thread.cond);
+                pthread_mutex_destroy(&f->task_thread.lock);
             }
             freep(&f->frame_thread.frame_progress);
             freep(&f->task_thread.tasks);
diff --git a/src/thread_task.c b/src/thread_task.c
index 90244ec..ab2376c 100644
--- a/src/thread_task.c
+++ b/src/thread_task.c
@@ -173,6 +173,43 @@ static inline void insert_task(Dav1dFrameContext *const f,
     insert_tasks(f, t, t, cond_signal);
 }
 
+static inline void add_pending(Dav1dFrameContext *const f, Dav1dTask *const t) {
+    pthread_mutex_lock(&f->task_thread.pending_tasks.lock);
+    t->next = NULL;
+    if (!f->task_thread.pending_tasks.head)
+        f->task_thread.pending_tasks.head = t;
+    else
+        f->task_thread.pending_tasks.tail->next = t;
+    f->task_thread.pending_tasks.tail = t;
+    atomic_store(&f->task_thread.pending_tasks.merge, 1);
+    pthread_mutex_unlock(&f->task_thread.pending_tasks.lock);
+}
+
+static inline int merge_pending_frame(Dav1dFrameContext *const f) {
+    int const merge = atomic_load(&f->task_thread.pending_tasks.merge);
+    if (merge) {
+        pthread_mutex_lock(&f->task_thread.pending_tasks.lock);
+        Dav1dTask *t = f->task_thread.pending_tasks.head;
+        f->task_thread.pending_tasks.head = NULL;
+        f->task_thread.pending_tasks.tail = NULL;
+        atomic_store(&f->task_thread.pending_tasks.merge, 0);
+        pthread_mutex_unlock(&f->task_thread.pending_tasks.lock);
+        while (t) {
+            Dav1dTask *const tmp = t->next;
+            insert_task(f, t, 0);
+            t = tmp;
+        }
+    }
+    return merge;
+}
+
+static inline int merge_pending(const Dav1dContext *const c) {
+    int res = 0;
+    for (unsigned i = 0; i < c->n_fc; i++)
+        res |= merge_pending_frame(&c->fc[i]);
+    return res;
+}
+
 static int create_filter_sbrow(Dav1dFrameContext *const f,
                                const int pass, Dav1dTask **res_t)
 {
@@ -234,16 +271,18 @@ int dav1d_task_create_tile_sbrow(Dav1dFrameContext *const f, const int pass,
     Dav1dTask *tasks = f->task_thread.tile_tasks[0];
     const int uses_2pass = f->c->n_fc > 1;
     const int num_tasks = f->frame_hdr->tiling.cols * f->frame_hdr->tiling.rows;
-    int alloc_num_tasks = num_tasks * (1 + uses_2pass);
-    if (alloc_num_tasks > f->task_thread.num_tile_tasks) {
-        const size_t size = sizeof(Dav1dTask) * alloc_num_tasks;
-        tasks = realloc(f->task_thread.tile_tasks[0], size);
-        if (!tasks) return -1;
-        memset(tasks, 0, size);
-        f->task_thread.tile_tasks[0] = tasks;
-        f->task_thread.num_tile_tasks = alloc_num_tasks;
+    if (pass < 2) {
+        int alloc_num_tasks = num_tasks * (1 + uses_2pass);
+        if (alloc_num_tasks > f->task_thread.num_tile_tasks) {
+            const size_t size = sizeof(Dav1dTask) * alloc_num_tasks;
+            tasks = realloc(f->task_thread.tile_tasks[0], size);
+            if (!tasks) return -1;
+            memset(tasks, 0, size);
+            f->task_thread.tile_tasks[0] = tasks;
+            f->task_thread.num_tile_tasks = alloc_num_tasks;
+        }
+        f->task_thread.tile_tasks[1] = tasks + num_tasks;
     }
-    f->task_thread.tile_tasks[1] = tasks + num_tasks;
     tasks += num_tasks * (pass & 1);
 
     Dav1dTask *pf_t;
@@ -273,8 +312,22 @@ int dav1d_task_create_tile_sbrow(Dav1dFrameContext *const f, const int pass,
         prev_t->next = pf_t;
         prev_t = pf_t;
     }
-    insert_tasks(f, &tasks[0], prev_t, cond_signal);
-    f->task_thread.done[pass & 1] = 0;
+    prev_t->next = NULL;
+
+    atomic_store(&f->task_thread.done[pass & 1], 0);
+
+    // XXX in theory this could be done locklessly, at this point there are no
+    // tasks in the frameQ, so no other runner should be using this lock, but
+    // we must add both passes at once
+    pthread_mutex_lock(&f->task_thread.pending_tasks.lock);
+    assert(f->task_thread.pending_tasks.head == NULL || pass == 2);
+    if (!f->task_thread.pending_tasks.head)
+        f->task_thread.pending_tasks.head = &tasks[0];
+    else
+        f->task_thread.pending_tasks.tail->next = &tasks[0];
+    f->task_thread.pending_tasks.tail = prev_t;
+    atomic_store(&f->task_thread.pending_tasks.merge, 1);
+    pthread_mutex_unlock(&f->task_thread.pending_tasks.lock);
 
     return 0;
 }
@@ -282,7 +335,7 @@ int dav1d_task_create_tile_sbrow(Dav1dFrameContext *const f, const int pass,
 void dav1d_task_frame_init(Dav1dFrameContext *const f) {
     const Dav1dContext *const c = f->c;
 
-    f->task_thread.init_done = 0;
+    atomic_store(&f->task_thread.init_done, 0);
     // schedule init task, which will schedule the remaining tasks
     Dav1dTask *const t = &f->task_thread.init_task;
     t->type = DAV1D_TASK_TYPE_INIT;
@@ -317,16 +370,12 @@ static inline int ensure_progress(struct TaskThreadData *const ttd,
     // so ensure that completed. if not, re-add to task-queue; else, fall-through
     int p1 = atomic_load(state);
     if (p1 < t->sby) {
+        t->type = type;
+        t->recon_progress = t->deblock_progress = 0;
+        *target = t->sby;
+        add_pending(f, t);
         pthread_mutex_lock(&ttd->lock);
-        p1 = atomic_load(state);
-        if (p1 < t->sby) {
-            t->type = type;
-            t->recon_progress = t->deblock_progress = 0;
-            *target = t->sby;
-            insert_task(f, t, 0);
-            return 1;
-        }
-        pthread_mutex_unlock(&ttd->lock);
+        return 1;
     }
     return 0;
 }
@@ -379,11 +428,29 @@ static inline int check_tile(Dav1dTask *const t, Dav1dFrameContext *const f,
     return 0;
 }
 
+static inline int get_frame_progress(const Dav1dContext *const c,
+                                     const Dav1dFrameContext *const f)
+{
+    unsigned frame_prog = c->n_fc > 1 ? atomic_load(&f->sr_cur.progress[1]) : 0;
+    if (frame_prog >= FRAME_ERROR)
+        return f->sbh - 1;
+    int idx = frame_prog >> (f->sb_shift + 7);
+    int prog;
+    do {
+        atomic_uint *state = &f->frame_thread.frame_progress[idx];
+        const unsigned val = ~atomic_load(state);
+        prog = val ? ctz(val) : 32;
+        if (prog != 32) break;
+        prog = 0;
+    } while (++idx < f->frame_thread.prog_sz);
+    return ((idx << 5) | prog) - 1;
+}
+
 static inline void abort_frame(Dav1dFrameContext *const f, const int error) {
     atomic_store(&f->task_thread.error, error == DAV1D_ERR(EINVAL) ? 1 : -1);
-    f->task_thread.task_counter = 0;
-    f->task_thread.done[0] = 1;
-    f->task_thread.done[1] = 1;
+    atomic_store(&f->task_thread.task_counter, 0);
+    atomic_store(&f->task_thread.done[0], 1);
+    atomic_store(&f->task_thread.done[1], 1);
     atomic_store(&f->sr_cur.progress[0], FRAME_ERROR);
     atomic_store(&f->sr_cur.progress[1], FRAME_ERROR);
     dav1d_decode_frame_exit(f, error);
@@ -488,6 +555,8 @@ void *dav1d_worker_task(void *data) {
     for (;;) {
        if (tc->task_thread.die) break;
        if (atomic_load(c->flush)) goto park;
+
+        merge_pending(c);
        if (ttd->delayed_fg.exec) { // run delayed film grain first
            delayed_fg_task(c, ttd);
            continue;
@@ -498,11 +567,18 @@ void *dav1d_worker_task(void *data) {
             for (unsigned i = 0; i < c->n_fc; i++) {
                 const unsigned first = atomic_load(&ttd->first);
                 f = &c->fc[(first + i) % c->n_fc];
-                if (f->task_thread.init_done) continue;
+                if (atomic_load(&f->task_thread.init_done)) continue;
                 t = f->task_thread.task_head;
                 if (!t) continue;
                 if (t->type == DAV1D_TASK_TYPE_INIT) goto found;
                 if (t->type == DAV1D_TASK_TYPE_INIT_CDF) {
+                    // XXX This can be a simple else, if adding tasks of both
+                    // passes at once (in dav1d_task_create_tile_sbrow).
+                    // Adding the tasks to the pending Q can result in a
+                    // thread merging them before setting init_done.
+                    // We will need to set init_done before adding to the
+                    // pending Q, so maybe return the tasks, set init_done,
+                    // and add to pending Q only then.
                     const int p1 = f->in_cdf.progress ?
                         atomic_load(f->in_cdf.progress) : 1;
                     if (p1) {
@@ -515,6 +591,7 @@ void *dav1d_worker_task(void *data) {
         while (ttd->cur < c->n_fc) { // run decoding tasks last
             const unsigned first = atomic_load(&ttd->first);
             f = &c->fc[(first + ttd->cur) % c->n_fc];
+            merge_pending_frame(f);
             prev_t = f->task_thread.task_cur_prev;
             t = prev_t ? prev_t->next : f->task_thread.task_head;
             while (t) {
@@ -529,11 +606,12 @@ void *dav1d_worker_task(void *data) {
                 } else if (t->recon_progress) {
                     const int p = t->type == DAV1D_TASK_TYPE_ENTROPY_PROGRESS;
                     int error = atomic_load(&f->task_thread.error);
-                    assert(!f->task_thread.done[p] || error);
+                    assert(!atomic_load(&f->task_thread.done[p]) || error);
                     const int tile_row_base = f->frame_hdr->tiling.cols *
                                               f->frame_thread.next_tile_row[p];
                     if (p) {
-                        const int p1 = f->frame_thread.entropy_progress;
+                        atomic_int *const prog = &f->frame_thread.entropy_progress;
+                        const int p1 = atomic_load(prog);
                         if (p1 < t->sby) goto next;
                         atomic_fetch_or(&f->task_thread.error, p1 == TILE_ERROR);
                     }
@@ -577,6 +655,7 @@ void *dav1d_worker_task(void *data) {
             ttd->cur++;
         }
         if (reset_task_cur(c, ttd, UINT_MAX)) continue;
+        if (merge_pending(c)) continue;
     park:
         tc->task_thread.flushed = 1;
         pthread_cond_signal(&tc->task_thread.td.cond);
@@ -594,6 +673,7 @@ void *dav1d_worker_task(void *data) {
         if (!t->next) f->task_thread.task_tail = prev_t;
         if (t->type > DAV1D_TASK_TYPE_INIT_CDF && !f->task_thread.task_head)
             ttd->cur++;
+        t->next = NULL;
         // we don't need to check cond_signaled here, since we found a task
         // after the last signal so we want to re-signal the next waiting thread
        // and again won't need to signal after that
@@ -615,13 +695,13 @@ void *dav1d_worker_task(void *data) {
             if (res || p1 == TILE_ERROR) {
                 pthread_mutex_lock(&ttd->lock);
                 abort_frame(f, res ? res : DAV1D_ERR(EINVAL));
-            } else if (!res) {
+                reset_task_cur(c, ttd, t->frame_idx);
+            } else {
                 t->type = DAV1D_TASK_TYPE_INIT_CDF;
                 if (p1) goto found_unlocked;
+                add_pending(f, t);
                 pthread_mutex_lock(&ttd->lock);
-                insert_task(f, t, 0);
             }
-            reset_task_cur(c, ttd, t->frame_idx);
             continue;
         }
         case DAV1D_TASK_TYPE_INIT_CDF: {
@@ -629,7 +709,6 @@ void *dav1d_worker_task(void *data) {
             int res = DAV1D_ERR(EINVAL);
             if (!atomic_load(&f->task_thread.error))
                 res = dav1d_decode_frame_init_cdf(f);
-            pthread_mutex_lock(&ttd->lock);
             if (f->frame_hdr->refresh_context && !f->task_thread.update_set) {
                 atomic_store(f->out_cdf.progress, res < 0 ? TILE_ERROR : 1);
             }
@@ -638,23 +717,34 @@ void *dav1d_worker_task(void *data) {
             if (!res) {
                 for (int p = 1; p <= 2; p++) {
                     const int res = dav1d_task_create_tile_sbrow(f, p, 0);
                     if (res) {
+                        pthread_mutex_lock(&ttd->lock);
                         // memory allocation failed
-                        f->task_thread.done[2 - p] = 1;
+                        atomic_store(&f->task_thread.done[2 - p], 1);
                         atomic_store(&f->task_thread.error, -1);
-                        f->task_thread.task_counter -= f->sbh +
-                            f->frame_hdr->tiling.cols * f->frame_hdr->tiling.rows;
+                        atomic_fetch_sub(&f->task_thread.task_counter,
+                                         f->frame_hdr->tiling.cols *
+                                         f->frame_hdr->tiling.rows + f->sbh);
                         atomic_store(&f->sr_cur.progress[p - 1], FRAME_ERROR);
-                        if (p == 2 && f->task_thread.done[1]) {
-                            assert(!f->task_thread.task_counter);
+                        if (p == 2 && atomic_load(&f->task_thread.done[1])) {
+                            assert(!atomic_load(&f->task_thread.task_counter));
                             dav1d_decode_frame_exit(f, DAV1D_ERR(ENOMEM));
                             f->n_tile_data = 0;
                             pthread_cond_signal(&f->task_thread.cond);
+                            atomic_store(&f->task_thread.init_done, 1);
+                            continue;
+                        } else {
+                            pthread_mutex_unlock(&ttd->lock);
                         }
                     }
                 }
-            } else abort_frame(f, res);
-            reset_task_cur(c, ttd, t->frame_idx);
-            f->task_thread.init_done = 1;
+                atomic_store(&f->task_thread.init_done, 1);
+                pthread_mutex_lock(&ttd->lock);
+            } else {
+                pthread_mutex_lock(&ttd->lock);
+                abort_frame(f, res);
+                reset_task_cur(c, ttd, t->frame_idx);
+                atomic_store(&f->task_thread.init_done, 1);
+            }
             continue;
         }
         case DAV1D_TASK_TYPE_TILE_ENTROPY:
@@ -683,10 +773,9 @@ void *dav1d_worker_task(void *data) {
                     pthread_cond_signal(&ttd->cond);
                     goto found_unlocked;
                 }
-                pthread_mutex_lock(&ttd->lock);
                 atomic_store(&ts->progress[p], progress);
-                reset_task_cur(c, ttd, t->frame_idx);
-                insert_task(f, t, 0);
+                add_pending(f, t);
+                pthread_mutex_lock(&ttd->lock);
             } else {
                 pthread_mutex_lock(&ttd->lock);
                 atomic_store(&ts->progress[p], progress);
@@ -702,15 +791,16 @@ void *dav1d_worker_task(void *data) {
                 if (c->n_fc > 1)
                     atomic_store(f->out_cdf.progress, error ? TILE_ERROR : 1);
             }
-            if (!--f->task_thread.task_counter && f->task_thread.done[0] &&
-                (!uses_2pass || f->task_thread.done[1]))
+            if (atomic_fetch_sub(&f->task_thread.task_counter, 1) - 1 == 0 &&
+                atomic_load(&f->task_thread.done[0]) &&
+                (!uses_2pass || atomic_load(&f->task_thread.done[1])))
             {
                 dav1d_decode_frame_exit(f, error == 1 ? DAV1D_ERR(EINVAL) :
                                         error ? DAV1D_ERR(ENOMEM) : 0);
                 f->n_tile_data = 0;
                 pthread_cond_signal(&f->task_thread.cond);
             }
-            assert(f->task_thread.task_counter >= 0);
+            assert(atomic_load(&f->task_thread.task_counter) >= 0);
             if (!atomic_fetch_or(&ttd->cond_signaled, 1))
                 pthread_cond_signal(&ttd->cond);
         }
@@ -744,15 +834,11 @@ void *dav1d_worker_task(void *data) {
             if (sby) {
                 int prog = atomic_load(&f->frame_thread.copy_lpf_progress[(sby - 1) >> 5]);
                 if (~prog & (1U << ((sby - 1) & 31))) {
+                    t->type = DAV1D_TASK_TYPE_CDEF;
+                    t->recon_progress = t->deblock_progress = 0;
+                    add_pending(f, t);
                     pthread_mutex_lock(&ttd->lock);
-                    prog = atomic_load(&f->frame_thread.copy_lpf_progress[(sby - 1) >> 5]);
-                    if (~prog & (1U << ((sby - 1) & 31))) {
-                        t->type = DAV1D_TASK_TYPE_CDEF;
-                        t->recon_progress = t->deblock_progress = 0;
-                        insert_task(f, t, 0);
-                        continue;
-                    }
-                    pthread_mutex_unlock(&ttd->lock);
+                    continue;
                 }
             }
         }
@@ -786,40 +872,53 @@ void *dav1d_worker_task(void *data) {
             const int uses_2pass = c->n_fc > 1;
             const int sbh = f->sbh;
             const int sbsz = f->sb_step * 4;
-            const enum PlaneType progress_plane_type =
-                t->type == DAV1D_TASK_TYPE_ENTROPY_PROGRESS ? PLANE_TYPE_BLOCK :
-                c->n_fc > 1 ? PLANE_TYPE_Y : PLANE_TYPE_ALL;
-            if (t->type != DAV1D_TASK_TYPE_ENTROPY_PROGRESS)
-                atomic_fetch_or(&f->frame_thread.frame_progress[sby >> 5],
-                                1U << (sby & 31));
-            pthread_mutex_lock(&ttd->lock);
-            if (t->type != DAV1D_TASK_TYPE_ENTROPY_PROGRESS) {
-                unsigned frame_prog = c->n_fc > 1 ? atomic_load(&f->sr_cur.progress[1]) : 0;
-                if (frame_prog < FRAME_ERROR) {
-                    int idx = frame_prog >> (f->sb_shift + 7);
-                    int prog;
-                    do {
-                        atomic_uint *state = &f->frame_thread.frame_progress[idx];
-                        const unsigned val = ~atomic_load(state);
-                        prog = val ? ctz(val) : 32;
-                        if (prog != 32) break;
-                        prog = 0;
-                    } while (++idx < f->frame_thread.prog_sz);
-                    sby = ((idx << 5) | prog) - 1;
-                } else sby = sbh - 1;
+            if (t->type == DAV1D_TASK_TYPE_ENTROPY_PROGRESS) {
+                error = atomic_load(&f->task_thread.error);
+                const unsigned y = sby + 1 == sbh ? UINT_MAX : (unsigned)(sby + 1) * sbsz;
+                assert(c->n_fc > 1);
+                if (f->sr_cur.p.data[0] /* upon flush, this can be free'ed already */)
+                    atomic_store(&f->sr_cur.progress[0], error ? FRAME_ERROR : y);
+                atomic_store(&f->frame_thread.entropy_progress,
+                             error ? TILE_ERROR : sby + 1);
+                if (sby + 1 == sbh)
+                    atomic_store(&f->task_thread.done[1], 1);
+                pthread_mutex_lock(&ttd->lock);
+                const int num_tasks = atomic_fetch_sub(&f->task_thread.task_counter, 1) - 1;
+                if (sby + 1 < sbh && num_tasks) {
+                    reset_task_cur(c, ttd, t->frame_idx);
+                    continue;
+                }
+                if (!num_tasks && atomic_load(&f->task_thread.done[0]) &&
+                    atomic_load(&f->task_thread.done[1]))
+                {
+                    dav1d_decode_frame_exit(f, error == 1 ? DAV1D_ERR(EINVAL) :
+                                            error ? DAV1D_ERR(ENOMEM) : 0);
+                    f->n_tile_data = 0;
+                    pthread_cond_signal(&f->task_thread.cond);
+                }
+                reset_task_cur(c, ttd, t->frame_idx);
+                continue;
             }
+            // t->type != DAV1D_TASK_TYPE_ENTROPY_PROGRESS
+            atomic_fetch_or(&f->frame_thread.frame_progress[sby >> 5],
+                            1U << (sby & 31));
+            pthread_mutex_lock(&f->task_thread.lock);
+            sby = get_frame_progress(c, f);
             error = atomic_load(&f->task_thread.error);
             const unsigned y = sby + 1 == sbh ? UINT_MAX : (unsigned)(sby + 1) * sbsz;
-            if (c->n_fc > 1 && f->sr_cur.p.data[0] /* upon flush, this can be free'ed already */) {
-                const int idx = t->type != DAV1D_TASK_TYPE_ENTROPY_PROGRESS;
-                atomic_store(&f->sr_cur.progress[idx], error ? FRAME_ERROR : y);
-            }
-            if (progress_plane_type == PLANE_TYPE_BLOCK)
-                f->frame_thread.entropy_progress = error ? TILE_ERROR : sby + 1;
+            if (c->n_fc > 1 && f->sr_cur.p.data[0] /* upon flush, this can be free'ed already */)
+                atomic_store(&f->sr_cur.progress[1], error ? FRAME_ERROR : y);
+            pthread_mutex_unlock(&f->task_thread.lock);
             if (sby + 1 == sbh)
-                f->task_thread.done[progress_plane_type == PLANE_TYPE_BLOCK] = 1;
-            if (!--f->task_thread.task_counter &&
-                f->task_thread.done[0] && (!uses_2pass || f->task_thread.done[1]))
+                atomic_store(&f->task_thread.done[0], 1);
+            pthread_mutex_lock(&ttd->lock);
+            const int num_tasks = atomic_fetch_sub(&f->task_thread.task_counter, 1) - 1;
+            if (sby + 1 < sbh && num_tasks) {
+                reset_task_cur(c, ttd, t->frame_idx);
+                continue;
+            }
+            if (!num_tasks && atomic_load(&f->task_thread.done[0]) &&
+                (!uses_2pass || atomic_load(&f->task_thread.done[1])))
             {
                 dav1d_decode_frame_exit(f, error == 1 ? DAV1D_ERR(EINVAL) :
                                         error ? DAV1D_ERR(ENOMEM) : 0);
--
cgit v1.2.3
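
Editor's note: below is a minimal, self-contained sketch (not part of the patch,
and not dav1d API) of the pending-list pattern the commit introduces. Producers
append tasks to a small mutex-protected list and raise an atomic "merge" flag,
so they no longer need the contended scheduler lock to queue work; a worker that
already holds the scheduler lock later detaches the whole list in O(1) and
splices it into the ordered task queue. All names here (Task, PendingQ,
pending_add, pending_merge, insert) are hypothetical stand-ins for the patch's
Dav1dTask, pending_tasks, add_pending(), merge_pending_frame() and
insert_task().

#include <pthread.h>
#include <stdatomic.h>
#include <stddef.h>

typedef struct Task { struct Task *next; int sby; } Task;

typedef struct {
    atomic_int merge;     /* cheap "anything pending?" check without the lock */
    pthread_mutex_t lock; /* protects head/tail only; held very briefly */
    Task *head, *tail;
} PendingQ;

/* Producer side: called without the scheduler lock held. */
static void pending_add(PendingQ *const q, Task *const t) {
    pthread_mutex_lock(&q->lock);
    t->next = NULL;
    if (!q->head) q->head = t;
    else          q->tail->next = t;
    q->tail = t;
    atomic_store(&q->merge, 1);
    pthread_mutex_unlock(&q->lock);
}

/* Consumer side: called by a worker that already holds the scheduler lock.
 * Detaches the whole list in O(1), then inserts task by task. Returns
 * non-zero if anything was merged, mirroring merge_pending_frame(). */
static int pending_merge(PendingQ *const q, void (*insert)(Task *)) {
    if (!atomic_load(&q->merge)) return 0; /* fast path: nothing pending */
    pthread_mutex_lock(&q->lock);
    Task *t = q->head;
    q->head = q->tail = NULL;
    atomic_store(&q->merge, 0);
    pthread_mutex_unlock(&q->lock);
    while (t) {
        Task *const next = t->next;
        insert(t); /* e.g. priority-ordered insertion into the main queue */
        t = next;
    }
    return 1;
}

The design choice this illustrates: the per-frame pending lock is never held
across the ordered insertion, so the cost of sorted queue maintenance is paid
only by workers under the scheduler lock, and the hot producer path reduces to
one short critical section plus an atomic flag.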