From 3e7886db54d0cb3ce32909c71ad2a8c9d9eab223 Mon Sep 17 00:00:00 2001
From: Victorien Le Couviour--Tuffet
Date: Thu, 6 Oct 2022 11:14:06 +0200
Subject: threading: Fix a race around frame completion (frame-mt)

The completion of the first frame to decode while an async reset request
on that same frame is pending will render that request stale. The
processing of such a stale request is likely to result in a hang.

One reason this happens is the skip condition at the beginning of
reset_task_cur().
=> Consume the async request before that check.

Another reason is several threads producing async reset requests in
parallel: an async request for the first frame could cascade through the
other threads (other frames) during completion of that frame, meaning it
would not be caught by the last synchronous reset_task_cur() after
signaling the main thread and before releasing the lock.
=> To solve this we need to add protections at the racy locations. That
means after we increase `first`, before returning from
reset_task_cur_async(), and after consuming the async request.
--- src/decode.c | 5 ++++- src/lib.c | 5 ++++- src/obu.c | 5 ++++- src/thread_task.c | 13 +++++++++++-- 4 files changed, 23 insertions(+), 5 deletions(-) diff --git a/src/decode.c b/src/decode.c index 90cb0e4..be9320e 100644 --- a/src/decode.c +++ b/src/decode.c @@ -3508,10 +3508,13 @@ int dav1d_submit_frame(Dav1dContext *const c) { &c->task_thread.lock); out_delayed = &c->frame_thread.out_delayed[next]; if (out_delayed->p.data[0] || atomic_load(&f->task_thread.error)) { - if (atomic_load(&c->task_thread.first) + 1U < c->n_fc) + unsigned first = atomic_load(&c->task_thread.first); + if (first + 1U < c->n_fc) atomic_fetch_add(&c->task_thread.first, 1U); else atomic_store(&c->task_thread.first, 0); + atomic_compare_exchange_strong(&c->task_thread.reset_task_cur, + &first, UINT_MAX); if (c->task_thread.cur && c->task_thread.cur < c->n_fc) c->task_thread.cur--; } diff --git a/src/lib.c b/src/lib.c index 8f0b75a..f60b7f8 100644 --- a/src/lib.c +++ b/src/lib.c @@ -393,10 +393,13 @@ static int drain_picture(Dav1dContext *const c, Dav1dPicture *const out) { Dav1dThreadPicture *const out_delayed = &c->frame_thread.out_delayed[next]; if (out_delayed->p.data[0] || atomic_load(&f->task_thread.error)) { - if (atomic_load(&c->task_thread.first) + 1U < c->n_fc) + unsigned first = atomic_load(&c->task_thread.first); + if (first + 1U < c->n_fc) atomic_fetch_add(&c->task_thread.first, 1U); else atomic_store(&c->task_thread.first, 0); + atomic_compare_exchange_strong(&c->task_thread.reset_task_cur, + &first, UINT_MAX); if (c->task_thread.cur && c->task_thread.cur < c->n_fc) c->task_thread.cur--; } diff --git a/src/obu.c b/src/obu.c index 2b04da3..bd11794 100644 --- a/src/obu.c +++ b/src/obu.c @@ -1581,10 +1581,13 @@ int dav1d_parse_obus(Dav1dContext *const c, Dav1dData *const in, const int globa Dav1dThreadPicture *const out_delayed = &c->frame_thread.out_delayed[next]; if (out_delayed->p.data[0] || atomic_load(&f->task_thread.error)) { - if (atomic_load(&c->task_thread.first) + 1U < 
c->n_fc) + unsigned first = atomic_load(&c->task_thread.first); + if (first + 1U < c->n_fc) atomic_fetch_add(&c->task_thread.first, 1U); else atomic_store(&c->task_thread.first, 0); + atomic_compare_exchange_strong(&c->task_thread.reset_task_cur, + &first, UINT_MAX); if (c->task_thread.cur && c->task_thread.cur < c->n_fc) c->task_thread.cur--; } diff --git a/src/thread_task.c b/src/thread_task.c index 5ddc05c..90244ec 100644 --- a/src/thread_task.c +++ b/src/thread_task.c @@ -49,9 +49,13 @@ static inline int reset_task_cur(const Dav1dContext *const c, unsigned frame_idx) { const unsigned first = atomic_load(&ttd->first); + unsigned reset_frame_idx = atomic_exchange(&ttd->reset_task_cur, UINT_MAX); + if (reset_frame_idx < first) { + if (frame_idx == UINT_MAX) return 0; + reset_frame_idx = UINT_MAX; + } if (!ttd->cur && c->fc[first].task_thread.task_cur_prev == NULL) return 0; - unsigned reset_frame_idx = atomic_exchange(&ttd->reset_task_cur, UINT_MAX); if (reset_frame_idx != UINT_MAX) { if (frame_idx == UINT_MAX) { if (reset_frame_idx > first + ttd->cur) @@ -78,12 +82,17 @@ cur_found: static inline void reset_task_cur_async(struct TaskThreadData *const ttd, unsigned frame_idx, unsigned n_frames) { - if (frame_idx < (unsigned)atomic_load(&ttd->first)) frame_idx += n_frames; + const unsigned first = atomic_load(&ttd->first); + if (frame_idx < first) frame_idx += n_frames; unsigned last_idx = frame_idx; do { frame_idx = last_idx; last_idx = atomic_exchange(&ttd->reset_task_cur, frame_idx); } while (last_idx < frame_idx); + if (frame_idx == first && atomic_load(&ttd->first) != first) { + unsigned expected = frame_idx; + atomic_compare_exchange_strong(&ttd->reset_task_cur, &expected, UINT_MAX); + } } static void insert_tasks_between(Dav1dFrameContext *const f, -- cgit v1.2.3