github.com/videolan/dav1d.git
author    Victorien Le Couviour--Tuffet <victorien@videolan.org>  2022-06-28 20:14:07 +0300
committer Victorien Le Couviour--Tuffet <victorien@videolan.org>  2022-10-27 16:03:22 +0300
commit    8f16314dba8bd1ec9d958bbb8bfba002967bd6a8 (patch)
tree      be21368dcb81d0e9de8cf7f73dd6121930a81499
parent    8a4932ff035e2fd2b139caa73f6593c2a4754f67 (diff)
threading: Add a pending list for async task insertion
-rw-r--r--  src/decode.c      |   8
-rw-r--r--  src/internal.h    |  14
-rw-r--r--  src/lib.c         |  19
-rw-r--r--  src/thread_task.c | 265
4 files changed, 214 insertions(+), 92 deletions(-)
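The change in brief: before this patch, worker threads inserted new tasks directly into a frame's ordered task queue, which required holding the global TaskThreadData lock. The patch gives each frame context a small pending list (pending_tasks): producers append to it under a dedicated per-frame lock and raise an atomic merge flag, and a runner that already holds the main lock later splices the whole list into the ordered queue (merge_pending_frame / merge_pending). Below is a minimal sketch of the pattern with simplified types; it is not the exact dav1d code (see add_pending() and merge_pending_frame() in src/thread_task.c for the real implementation):

    #include <pthread.h>
    #include <stdatomic.h>
    #include <stddef.h>

    typedef struct Task { struct Task *next; } Task;

    typedef struct {
        atomic_int merge;     // raised when the list is non-empty
        pthread_mutex_t lock; // protects head/tail only, never nested
        Task *head, *tail;
    } PendingTasks;

    // Producer side: append a task without touching the main scheduler
    // lock, so progress updates never contend with task selection.
    static void add_pending(PendingTasks *const p, Task *const t) {
        pthread_mutex_lock(&p->lock);
        t->next = NULL;
        if (p->head) p->tail->next = t;
        else         p->head       = t;
        p->tail = t;
        atomic_store(&p->merge, 1);
        pthread_mutex_unlock(&p->lock);
    }

    // Consumer side: called with the main scheduler lock held. Detaches
    // the whole list in O(1), then inserts each task into the ordered
    // frame queue via the caller-supplied insert function.
    static int merge_pending(PendingTasks *const p, void (*insert)(Task *)) {
        if (!atomic_load(&p->merge)) return 0; // cheap fast path
        pthread_mutex_lock(&p->lock);
        Task *t = p->head;
        p->head = p->tail = NULL;
        atomic_store(&p->merge, 0);
        pthread_mutex_unlock(&p->lock);
        while (t) { // list is detached; insertion needs no pending lock
            Task *const next = t->next;
            insert(t);
            t = next;
        }
        return 1;
    }

(PendingTasks.lock and merge must of course be initialized first, as dav1d_open() does in the patch.) The merge flag makes the common "nothing pending" check a single atomic load instead of a lock acquisition, which is why workers can afford to call merge_pending(c) at the top of every scheduling loop iteration.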
diff --git a/src/decode.c b/src/decode.c
index be9320e..2ac190b 100644
--- a/src/decode.c
+++ b/src/decode.c
@@ -3459,11 +3459,12 @@ int dav1d_decode_frame(Dav1dFrameContext *const f) {
// wait until all threads have completed
if (!res) {
if (f->c->n_tc > 1) {
- pthread_mutex_lock(&f->task_thread.ttd->lock);
res = dav1d_task_create_tile_sbrow(f, 0, 1);
+ pthread_mutex_lock(&f->task_thread.ttd->lock);
+ pthread_cond_signal(&f->task_thread.ttd->cond);
if (!res) {
while (!f->task_thread.done[0] ||
- f->task_thread.task_counter > 0)
+ atomic_load(&f->task_thread.task_counter) > 0)
{
pthread_cond_wait(&f->task_thread.cond,
&f->task_thread.ttd->lock);
@@ -3726,7 +3727,8 @@ int dav1d_submit_frame(Dav1dContext *const c) {
const int uses_2pass = c->n_fc > 1;
const int cols = f->frame_hdr->tiling.cols;
const int rows = f->frame_hdr->tiling.rows;
- f->task_thread.task_counter = (cols * rows + f->sbh) << uses_2pass;
+ atomic_store(&f->task_thread.task_counter,
+ (cols * rows + f->sbh) << uses_2pass);
// ref_mvs
if (IS_INTER_OR_SWITCH(f->frame_hdr) || f->frame_hdr->allow_intrabc) {
diff --git a/src/internal.h b/src/internal.h
index eceda98..29d07b8 100644
--- a/src/internal.h
+++ b/src/internal.h
@@ -275,7 +275,7 @@ struct Dav1dFrameContext {
struct {
int next_tile_row[2 /* 0: reconstruction, 1: entropy */];
- int entropy_progress;
+ atomic_int entropy_progress;
atomic_int deblock_progress; // in sby units
atomic_uint *frame_progress, *copy_lpf_progress;
// indexed using t->by * f->b4_stride + t->bx
@@ -324,22 +324,28 @@ struct Dav1dFrameContext {
} lf;
struct {
+ pthread_mutex_t lock;
pthread_cond_t cond;
struct TaskThreadData *ttd;
struct Dav1dTask *tasks, *tile_tasks[2], init_task;
int num_tasks, num_tile_tasks;
- int init_done;
- int done[2];
+ atomic_int init_done;
+ atomic_int done[2];
int retval;
int update_set; // whether we need to update CDF reference
atomic_int error;
- int task_counter;
+ atomic_int task_counter;
struct Dav1dTask *task_head, *task_tail;
// Points to the task directly before the cur pointer in the queue.
// This cur pointer is theoretical here; we actually keep track of the
// "prev_t" variable. This is needed to not lose the tasks in
// [head;cur-1] when picking one for execution.
struct Dav1dTask *task_cur_prev;
+ struct { // async task insertion
+ atomic_int merge;
+ pthread_mutex_t lock;
+ Dav1dTask *head, *tail;
+ } pending_tasks;
} task_thread;
// threading (refer to tc[] for per-thread things)
diff --git a/src/lib.c b/src/lib.c
index f60b7f8..841b32d 100644
--- a/src/lib.c
+++ b/src/lib.c
@@ -235,8 +235,18 @@ COLD int dav1d_open(Dav1dContext **const c_out, const Dav1dSettings *const s) {
}
for (unsigned n = 0; n < c->n_fc; n++) {
Dav1dFrameContext *const f = &c->fc[n];
- if (c->n_tc > 1)
- if (pthread_cond_init(&f->task_thread.cond, NULL)) goto error;
+ if (c->n_tc > 1) {
+ if (pthread_mutex_init(&f->task_thread.lock, NULL)) goto error;
+ if (pthread_cond_init(&f->task_thread.cond, NULL)) {
+ pthread_mutex_destroy(&f->task_thread.lock);
+ goto error;
+ }
+ if (pthread_mutex_init(&f->task_thread.pending_tasks.lock, NULL)) {
+ pthread_cond_destroy(&f->task_thread.cond);
+ pthread_mutex_destroy(&f->task_thread.lock);
+ goto error;
+ }
+ }
f->c = c;
f->task_thread.ttd = &c->task_thread;
f->lf.last_sharpness = -1;
@@ -595,6 +605,9 @@ void dav1d_flush(Dav1dContext *const c) {
c->fc[i].task_thread.task_head = NULL;
c->fc[i].task_thread.task_tail = NULL;
c->fc[i].task_thread.task_cur_prev = NULL;
+ c->fc[i].task_thread.pending_tasks.head = NULL;
+ c->fc[i].task_thread.pending_tasks.tail = NULL;
+ atomic_init(&c->fc[i].task_thread.pending_tasks.merge, 0);
}
atomic_init(&c->task_thread.first, 0);
c->task_thread.cur = c->n_fc;
@@ -668,7 +681,9 @@ static COLD void close_internal(Dav1dContext **const c_out, int flush) {
freep(&f->frame_thread.cbi);
}
if (c->n_tc > 1) {
+ pthread_mutex_destroy(&f->task_thread.pending_tasks.lock);
pthread_cond_destroy(&f->task_thread.cond);
+ pthread_mutex_destroy(&f->task_thread.lock);
}
freep(&f->frame_thread.frame_progress);
freep(&f->task_thread.tasks);
diff --git a/src/thread_task.c b/src/thread_task.c
index 90244ec..ab2376c 100644
--- a/src/thread_task.c
+++ b/src/thread_task.c
@@ -173,6 +173,43 @@ static inline void insert_task(Dav1dFrameContext *const f,
insert_tasks(f, t, t, cond_signal);
}
+static inline void add_pending(Dav1dFrameContext *const f, Dav1dTask *const t) {
+ pthread_mutex_lock(&f->task_thread.pending_tasks.lock);
+ t->next = NULL;
+ if (!f->task_thread.pending_tasks.head)
+ f->task_thread.pending_tasks.head = t;
+ else
+ f->task_thread.pending_tasks.tail->next = t;
+ f->task_thread.pending_tasks.tail = t;
+ atomic_store(&f->task_thread.pending_tasks.merge, 1);
+ pthread_mutex_unlock(&f->task_thread.pending_tasks.lock);
+}
+
+static inline int merge_pending_frame(Dav1dFrameContext *const f) {
+ int const merge = atomic_load(&f->task_thread.pending_tasks.merge);
+ if (merge) {
+ pthread_mutex_lock(&f->task_thread.pending_tasks.lock);
+ Dav1dTask *t = f->task_thread.pending_tasks.head;
+ f->task_thread.pending_tasks.head = NULL;
+ f->task_thread.pending_tasks.tail = NULL;
+ atomic_store(&f->task_thread.pending_tasks.merge, 0);
+ pthread_mutex_unlock(&f->task_thread.pending_tasks.lock);
+ while (t) {
+ Dav1dTask *const tmp = t->next;
+ insert_task(f, t, 0);
+ t = tmp;
+ }
+ }
+ return merge;
+}
+
+static inline int merge_pending(const Dav1dContext *const c) {
+ int res = 0;
+ for (unsigned i = 0; i < c->n_fc; i++)
+ res |= merge_pending_frame(&c->fc[i]);
+ return res;
+}
+
static int create_filter_sbrow(Dav1dFrameContext *const f,
const int pass, Dav1dTask **res_t)
{
@@ -234,16 +271,18 @@ int dav1d_task_create_tile_sbrow(Dav1dFrameContext *const f, const int pass,
Dav1dTask *tasks = f->task_thread.tile_tasks[0];
const int uses_2pass = f->c->n_fc > 1;
const int num_tasks = f->frame_hdr->tiling.cols * f->frame_hdr->tiling.rows;
- int alloc_num_tasks = num_tasks * (1 + uses_2pass);
- if (alloc_num_tasks > f->task_thread.num_tile_tasks) {
- const size_t size = sizeof(Dav1dTask) * alloc_num_tasks;
- tasks = realloc(f->task_thread.tile_tasks[0], size);
- if (!tasks) return -1;
- memset(tasks, 0, size);
- f->task_thread.tile_tasks[0] = tasks;
- f->task_thread.num_tile_tasks = alloc_num_tasks;
+ if (pass < 2) {
+ int alloc_num_tasks = num_tasks * (1 + uses_2pass);
+ if (alloc_num_tasks > f->task_thread.num_tile_tasks) {
+ const size_t size = sizeof(Dav1dTask) * alloc_num_tasks;
+ tasks = realloc(f->task_thread.tile_tasks[0], size);
+ if (!tasks) return -1;
+ memset(tasks, 0, size);
+ f->task_thread.tile_tasks[0] = tasks;
+ f->task_thread.num_tile_tasks = alloc_num_tasks;
+ }
+ f->task_thread.tile_tasks[1] = tasks + num_tasks;
}
- f->task_thread.tile_tasks[1] = tasks + num_tasks;
tasks += num_tasks * (pass & 1);
Dav1dTask *pf_t;
@@ -273,8 +312,22 @@ int dav1d_task_create_tile_sbrow(Dav1dFrameContext *const f, const int pass,
prev_t->next = pf_t;
prev_t = pf_t;
}
- insert_tasks(f, &tasks[0], prev_t, cond_signal);
- f->task_thread.done[pass & 1] = 0;
+ prev_t->next = NULL;
+
+ atomic_store(&f->task_thread.done[pass & 1], 0);
+
+ // XXX in theory this could be done locklessly; at this point there are no
+ // tasks in the frameQ, so no other runner should be using this lock, but
+ // we must add both passes at once
+ pthread_mutex_lock(&f->task_thread.pending_tasks.lock);
+ assert(f->task_thread.pending_tasks.head == NULL || pass == 2);
+ if (!f->task_thread.pending_tasks.head)
+ f->task_thread.pending_tasks.head = &tasks[0];
+ else
+ f->task_thread.pending_tasks.tail->next = &tasks[0];
+ f->task_thread.pending_tasks.tail = prev_t;
+ atomic_store(&f->task_thread.pending_tasks.merge, 1);
+ pthread_mutex_unlock(&f->task_thread.pending_tasks.lock);
return 0;
}
@@ -282,7 +335,7 @@ int dav1d_task_create_tile_sbrow(Dav1dFrameContext *const f, const int pass,
void dav1d_task_frame_init(Dav1dFrameContext *const f) {
const Dav1dContext *const c = f->c;
- f->task_thread.init_done = 0;
+ atomic_store(&f->task_thread.init_done, 0);
// schedule init task, which will schedule the remaining tasks
Dav1dTask *const t = &f->task_thread.init_task;
t->type = DAV1D_TASK_TYPE_INIT;
@@ -317,16 +370,12 @@ static inline int ensure_progress(struct TaskThreadData *const ttd,
// so ensure that completed. if not, re-add to task-queue; else, fall-through
int p1 = atomic_load(state);
if (p1 < t->sby) {
+ t->type = type;
+ t->recon_progress = t->deblock_progress = 0;
+ *target = t->sby;
+ add_pending(f, t);
pthread_mutex_lock(&ttd->lock);
- p1 = atomic_load(state);
- if (p1 < t->sby) {
- t->type = type;
- t->recon_progress = t->deblock_progress = 0;
- *target = t->sby;
- insert_task(f, t, 0);
- return 1;
- }
- pthread_mutex_unlock(&ttd->lock);
+ return 1;
}
return 0;
}
@@ -379,11 +428,29 @@ static inline int check_tile(Dav1dTask *const t, Dav1dFrameContext *const f,
return 0;
}
+static inline int get_frame_progress(const Dav1dContext *const c,
+ const Dav1dFrameContext *const f)
+{
+ unsigned frame_prog = c->n_fc > 1 ? atomic_load(&f->sr_cur.progress[1]) : 0;
+ if (frame_prog >= FRAME_ERROR)
+ return f->sbh - 1;
+ int idx = frame_prog >> (f->sb_shift + 7);
+ int prog;
+ do {
+ atomic_uint *state = &f->frame_thread.frame_progress[idx];
+ const unsigned val = ~atomic_load(state);
+ prog = val ? ctz(val) : 32;
+ if (prog != 32) break;
+ prog = 0;
+ } while (++idx < f->frame_thread.prog_sz);
+ return ((idx << 5) | prog) - 1;
+}
+
static inline void abort_frame(Dav1dFrameContext *const f, const int error) {
atomic_store(&f->task_thread.error, error == DAV1D_ERR(EINVAL) ? 1 : -1);
- f->task_thread.task_counter = 0;
- f->task_thread.done[0] = 1;
- f->task_thread.done[1] = 1;
+ atomic_store(&f->task_thread.task_counter, 0);
+ atomic_store(&f->task_thread.done[0], 1);
+ atomic_store(&f->task_thread.done[1], 1);
atomic_store(&f->sr_cur.progress[0], FRAME_ERROR);
atomic_store(&f->sr_cur.progress[1], FRAME_ERROR);
dav1d_decode_frame_exit(f, error);
@@ -488,6 +555,8 @@ void *dav1d_worker_task(void *data) {
for (;;) {
if (tc->task_thread.die) break;
if (atomic_load(c->flush)) goto park;
+
+ merge_pending(c);
if (ttd->delayed_fg.exec) { // run delayed film grain first
delayed_fg_task(c, ttd);
continue;
@@ -498,11 +567,18 @@ void *dav1d_worker_task(void *data) {
for (unsigned i = 0; i < c->n_fc; i++) {
const unsigned first = atomic_load(&ttd->first);
f = &c->fc[(first + i) % c->n_fc];
- if (f->task_thread.init_done) continue;
+ if (atomic_load(&f->task_thread.init_done)) continue;
t = f->task_thread.task_head;
if (!t) continue;
if (t->type == DAV1D_TASK_TYPE_INIT) goto found;
if (t->type == DAV1D_TASK_TYPE_INIT_CDF) {
// XXX This could be a simple else if tasks of both
// passes were added at once (in dav1d_task_create_tile_sbrow).
+ // Adding the tasks to the pending Q can result in a
+ // thread merging them before setting init_done.
+ // We will need to set init_done before adding to the
+ // pending Q, so maybe return the tasks, set init_done,
+ // and add to pending Q only then.
const int p1 = f->in_cdf.progress ?
atomic_load(f->in_cdf.progress) : 1;
if (p1) {
@@ -515,6 +591,7 @@ void *dav1d_worker_task(void *data) {
while (ttd->cur < c->n_fc) { // run decoding tasks last
const unsigned first = atomic_load(&ttd->first);
f = &c->fc[(first + ttd->cur) % c->n_fc];
+ merge_pending_frame(f);
prev_t = f->task_thread.task_cur_prev;
t = prev_t ? prev_t->next : f->task_thread.task_head;
while (t) {
@@ -529,11 +606,12 @@ void *dav1d_worker_task(void *data) {
} else if (t->recon_progress) {
const int p = t->type == DAV1D_TASK_TYPE_ENTROPY_PROGRESS;
int error = atomic_load(&f->task_thread.error);
- assert(!f->task_thread.done[p] || error);
+ assert(!atomic_load(&f->task_thread.done[p]) || error);
const int tile_row_base = f->frame_hdr->tiling.cols *
f->frame_thread.next_tile_row[p];
if (p) {
- const int p1 = f->frame_thread.entropy_progress;
+ atomic_int *const prog = &f->frame_thread.entropy_progress;
+ const int p1 = atomic_load(prog);
if (p1 < t->sby) goto next;
atomic_fetch_or(&f->task_thread.error, p1 == TILE_ERROR);
}
@@ -577,6 +655,7 @@ void *dav1d_worker_task(void *data) {
ttd->cur++;
}
if (reset_task_cur(c, ttd, UINT_MAX)) continue;
+ if (merge_pending(c)) continue;
park:
tc->task_thread.flushed = 1;
pthread_cond_signal(&tc->task_thread.td.cond);
@@ -594,6 +673,7 @@ void *dav1d_worker_task(void *data) {
if (!t->next) f->task_thread.task_tail = prev_t;
if (t->type > DAV1D_TASK_TYPE_INIT_CDF && !f->task_thread.task_head)
ttd->cur++;
+ t->next = NULL;
// we don't need to check cond_signaled here, since we found a task
// after the last signal so we want to re-signal the next waiting thread
// and again won't need to signal after that
@@ -615,13 +695,13 @@ void *dav1d_worker_task(void *data) {
if (res || p1 == TILE_ERROR) {
pthread_mutex_lock(&ttd->lock);
abort_frame(f, res ? res : DAV1D_ERR(EINVAL));
- } else if (!res) {
+ reset_task_cur(c, ttd, t->frame_idx);
+ } else {
t->type = DAV1D_TASK_TYPE_INIT_CDF;
if (p1) goto found_unlocked;
+ add_pending(f, t);
pthread_mutex_lock(&ttd->lock);
- insert_task(f, t, 0);
}
- reset_task_cur(c, ttd, t->frame_idx);
continue;
}
case DAV1D_TASK_TYPE_INIT_CDF: {
@@ -629,7 +709,6 @@ void *dav1d_worker_task(void *data) {
int res = DAV1D_ERR(EINVAL);
if (!atomic_load(&f->task_thread.error))
res = dav1d_decode_frame_init_cdf(f);
- pthread_mutex_lock(&ttd->lock);
if (f->frame_hdr->refresh_context && !f->task_thread.update_set) {
atomic_store(f->out_cdf.progress, res < 0 ? TILE_ERROR : 1);
}
@@ -638,23 +717,34 @@ void *dav1d_worker_task(void *data) {
for (int p = 1; p <= 2; p++) {
const int res = dav1d_task_create_tile_sbrow(f, p, 0);
if (res) {
+ pthread_mutex_lock(&ttd->lock);
// memory allocation failed
- f->task_thread.done[2 - p] = 1;
+ atomic_store(&f->task_thread.done[2 - p], 1);
atomic_store(&f->task_thread.error, -1);
- f->task_thread.task_counter -= f->sbh +
- f->frame_hdr->tiling.cols * f->frame_hdr->tiling.rows;
+ atomic_fetch_sub(&f->task_thread.task_counter,
+ f->frame_hdr->tiling.cols *
+ f->frame_hdr->tiling.rows + f->sbh);
atomic_store(&f->sr_cur.progress[p - 1], FRAME_ERROR);
- if (p == 2 && f->task_thread.done[1]) {
- assert(!f->task_thread.task_counter);
+ if (p == 2 && atomic_load(&f->task_thread.done[1])) {
+ assert(!atomic_load(&f->task_thread.task_counter));
dav1d_decode_frame_exit(f, DAV1D_ERR(ENOMEM));
f->n_tile_data = 0;
pthread_cond_signal(&f->task_thread.cond);
+ atomic_store(&f->task_thread.init_done, 1);
+ continue;
+ } else {
+ pthread_mutex_unlock(&ttd->lock);
}
}
}
- } else abort_frame(f, res);
- reset_task_cur(c, ttd, t->frame_idx);
- f->task_thread.init_done = 1;
+ atomic_store(&f->task_thread.init_done, 1);
+ pthread_mutex_lock(&ttd->lock);
+ } else {
+ pthread_mutex_lock(&ttd->lock);
+ abort_frame(f, res);
+ reset_task_cur(c, ttd, t->frame_idx);
+ atomic_store(&f->task_thread.init_done, 1);
+ }
continue;
}
case DAV1D_TASK_TYPE_TILE_ENTROPY:
@@ -683,10 +773,9 @@ void *dav1d_worker_task(void *data) {
pthread_cond_signal(&ttd->cond);
goto found_unlocked;
}
- pthread_mutex_lock(&ttd->lock);
atomic_store(&ts->progress[p], progress);
- reset_task_cur(c, ttd, t->frame_idx);
- insert_task(f, t, 0);
+ add_pending(f, t);
+ pthread_mutex_lock(&ttd->lock);
} else {
pthread_mutex_lock(&ttd->lock);
atomic_store(&ts->progress[p], progress);
@@ -702,15 +791,16 @@ void *dav1d_worker_task(void *data) {
if (c->n_fc > 1)
atomic_store(f->out_cdf.progress, error ? TILE_ERROR : 1);
}
- if (!--f->task_thread.task_counter && f->task_thread.done[0] &&
- (!uses_2pass || f->task_thread.done[1]))
+ if (atomic_fetch_sub(&f->task_thread.task_counter, 1) - 1 == 0 &&
+ atomic_load(&f->task_thread.done[0]) &&
+ (!uses_2pass || atomic_load(&f->task_thread.done[1])))
{
dav1d_decode_frame_exit(f, error == 1 ? DAV1D_ERR(EINVAL) :
error ? DAV1D_ERR(ENOMEM) : 0);
f->n_tile_data = 0;
pthread_cond_signal(&f->task_thread.cond);
}
- assert(f->task_thread.task_counter >= 0);
+ assert(atomic_load(&f->task_thread.task_counter) >= 0);
if (!atomic_fetch_or(&ttd->cond_signaled, 1))
pthread_cond_signal(&ttd->cond);
}
@@ -744,15 +834,11 @@ void *dav1d_worker_task(void *data) {
if (sby) {
int prog = atomic_load(&f->frame_thread.copy_lpf_progress[(sby - 1) >> 5]);
if (~prog & (1U << ((sby - 1) & 31))) {
+ t->type = DAV1D_TASK_TYPE_CDEF;
+ t->recon_progress = t->deblock_progress = 0;
+ add_pending(f, t);
pthread_mutex_lock(&ttd->lock);
- prog = atomic_load(&f->frame_thread.copy_lpf_progress[(sby - 1) >> 5]);
- if (~prog & (1U << ((sby - 1) & 31))) {
- t->type = DAV1D_TASK_TYPE_CDEF;
- t->recon_progress = t->deblock_progress = 0;
- insert_task(f, t, 0);
- continue;
- }
- pthread_mutex_unlock(&ttd->lock);
+ continue;
}
}
}
@@ -786,40 +872,53 @@ void *dav1d_worker_task(void *data) {
const int uses_2pass = c->n_fc > 1;
const int sbh = f->sbh;
const int sbsz = f->sb_step * 4;
- const enum PlaneType progress_plane_type =
- t->type == DAV1D_TASK_TYPE_ENTROPY_PROGRESS ? PLANE_TYPE_BLOCK :
- c->n_fc > 1 ? PLANE_TYPE_Y : PLANE_TYPE_ALL;
- if (t->type != DAV1D_TASK_TYPE_ENTROPY_PROGRESS)
- atomic_fetch_or(&f->frame_thread.frame_progress[sby >> 5],
- 1U << (sby & 31));
- pthread_mutex_lock(&ttd->lock);
- if (t->type != DAV1D_TASK_TYPE_ENTROPY_PROGRESS) {
- unsigned frame_prog = c->n_fc > 1 ? atomic_load(&f->sr_cur.progress[1]) : 0;
- if (frame_prog < FRAME_ERROR) {
- int idx = frame_prog >> (f->sb_shift + 7);
- int prog;
- do {
- atomic_uint *state = &f->frame_thread.frame_progress[idx];
- const unsigned val = ~atomic_load(state);
- prog = val ? ctz(val) : 32;
- if (prog != 32) break;
- prog = 0;
- } while (++idx < f->frame_thread.prog_sz);
- sby = ((idx << 5) | prog) - 1;
- } else sby = sbh - 1;
+ if (t->type == DAV1D_TASK_TYPE_ENTROPY_PROGRESS) {
+ error = atomic_load(&f->task_thread.error);
+ const unsigned y = sby + 1 == sbh ? UINT_MAX : (unsigned)(sby + 1) * sbsz;
+ assert(c->n_fc > 1);
+ if (f->sr_cur.p.data[0] /* upon flush, this can be free'ed already */)
+ atomic_store(&f->sr_cur.progress[0], error ? FRAME_ERROR : y);
+ atomic_store(&f->frame_thread.entropy_progress,
+ error ? TILE_ERROR : sby + 1);
+ if (sby + 1 == sbh)
+ atomic_store(&f->task_thread.done[1], 1);
+ pthread_mutex_lock(&ttd->lock);
+ const int num_tasks = atomic_fetch_sub(&f->task_thread.task_counter, 1) - 1;
+ if (sby + 1 < sbh && num_tasks) {
+ reset_task_cur(c, ttd, t->frame_idx);
+ continue;
+ }
+ if (!num_tasks && atomic_load(&f->task_thread.done[0]) &&
+ atomic_load(&f->task_thread.done[1]))
+ {
+ dav1d_decode_frame_exit(f, error == 1 ? DAV1D_ERR(EINVAL) :
+ error ? DAV1D_ERR(ENOMEM) : 0);
+ f->n_tile_data = 0;
+ pthread_cond_signal(&f->task_thread.cond);
+ }
+ reset_task_cur(c, ttd, t->frame_idx);
+ continue;
}
+ // t->type != DAV1D_TASK_TYPE_ENTROPY_PROGRESS
+ atomic_fetch_or(&f->frame_thread.frame_progress[sby >> 5],
+ 1U << (sby & 31));
+ pthread_mutex_lock(&f->task_thread.lock);
+ sby = get_frame_progress(c, f);
error = atomic_load(&f->task_thread.error);
const unsigned y = sby + 1 == sbh ? UINT_MAX : (unsigned)(sby + 1) * sbsz;
- if (c->n_fc > 1 && f->sr_cur.p.data[0] /* upon flush, this can be free'ed already */) {
- const int idx = t->type != DAV1D_TASK_TYPE_ENTROPY_PROGRESS;
- atomic_store(&f->sr_cur.progress[idx], error ? FRAME_ERROR : y);
- }
- if (progress_plane_type == PLANE_TYPE_BLOCK)
- f->frame_thread.entropy_progress = error ? TILE_ERROR : sby + 1;
+ if (c->n_fc > 1 && f->sr_cur.p.data[0] /* upon flush, this can be free'ed already */)
+ atomic_store(&f->sr_cur.progress[1], error ? FRAME_ERROR : y);
+ pthread_mutex_unlock(&f->task_thread.lock);
if (sby + 1 == sbh)
- f->task_thread.done[progress_plane_type == PLANE_TYPE_BLOCK] = 1;
- if (!--f->task_thread.task_counter &&
- f->task_thread.done[0] && (!uses_2pass || f->task_thread.done[1]))
+ atomic_store(&f->task_thread.done[0], 1);
+ pthread_mutex_lock(&ttd->lock);
+ const int num_tasks = atomic_fetch_sub(&f->task_thread.task_counter, 1) - 1;
+ if (sby + 1 < sbh && num_tasks) {
+ reset_task_cur(c, ttd, t->frame_idx);
+ continue;
+ }
+ if (!num_tasks && atomic_load(&f->task_thread.done[0]) &&
+ (!uses_2pass || atomic_load(&f->task_thread.done[1])))
{
dav1d_decode_frame_exit(f, error == 1 ? DAV1D_ERR(EINVAL) :
error ? DAV1D_ERR(ENOMEM) : 0);