Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrançois Laignel <fengalin@free.fr>2022-09-11 20:57:36 +0300
committerSebastian Dröge <slomo@coaxion.net>2022-09-13 10:29:50 +0300
commit1be30b8ecc94167f2a0cf4c05408311fdc1ab3b2 (patch)
tree13b4e7779e6a7faa253b2958723df5441e38b9c0 /generic
parentab327be9af44e73dd9572e9c30e1368a41929013 (diff)
ts/scheduler: fix shutdown
A strong handle reference was held in the `block_on_priv` `Result` handler in the thread for the `Scheduler::start` code path, which lead to the `Handler` strong count not dropping to 0 when it should, leading to the shutdown request not being triggered. Use an Arc<AtomicBool> instead of a oneshot channel for shutdown. The main Future is always polled and never relies on a waker, a `poll_fn` is cheap and does the job. Unpark the scheduler after posting a request to shutdown.
Diffstat (limited to 'generic')
-rw-r--r--generic/threadshare/src/runtime/executor/scheduler.rs116
1 files changed, 54 insertions, 62 deletions
diff --git a/generic/threadshare/src/runtime/executor/scheduler.rs b/generic/threadshare/src/runtime/executor/scheduler.rs
index 4ce1c008e..bf7c65c47 100644
--- a/generic/threadshare/src/runtime/executor/scheduler.rs
+++ b/generic/threadshare/src/runtime/executor/scheduler.rs
@@ -3,7 +3,7 @@
//
// Take a look at the license at the top of the repository in the LICENSE file.
-use futures::channel::oneshot;
+use futures::future::poll_fn;
use futures::pin_mut;
use gio::glib::clone::Downgrade;
@@ -12,7 +12,8 @@ use std::cell::RefCell;
use std::future::Future;
use std::panic;
#[cfg(feature = "tuning")]
-use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::atomic::AtomicU64;
+use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc as sync_mpsc;
use std::sync::{Arc, Condvar, Mutex, Weak};
use std::task::Poll;
@@ -49,7 +50,6 @@ impl Scheduler {
let thread = thread::Builder::new().name(context_name.to_string());
let (handle_sender, handle_receiver) = sync_mpsc::channel();
- let (shutdown_sender, shutdown_receiver) = oneshot::channel();
let context_name = Arc::from(context_name);
let thread_ctx_name = Arc::clone(&context_name);
let join = thread
@@ -62,9 +62,20 @@ impl Scheduler {
let handle = Scheduler::init(Arc::clone(&thread_ctx_name), max_throttling);
let this = Arc::clone(&handle.0.scheduler);
- handle_sender.send(handle.clone()).unwrap();
+ let must_shutdown = handle.0.must_shutdown.clone();
+ let handle_weak = handle.downgrade();
+ handle_sender.send(handle).unwrap();
+
+ let shutdown_fut = poll_fn(move |_| {
+ if must_shutdown.load(Ordering::SeqCst) {
+ Poll::Ready(())
+ } else {
+ Poll::Pending
+ }
+ });
- match this.block_on_priv(shutdown_receiver) {
+ // Blocking on `shutdown_fut` which is cheap to `poll`.
+ match this.block_on_priv(shutdown_fut) {
Ok(_) => {
gst::debug!(
RUNTIME_CAT,
@@ -79,9 +90,8 @@ impl Scheduler {
thread_ctx_name
);
- // We are shutting down on our own initiative
- if let Ok(mut shutdown) = handle.0.shutdown.lock() {
- shutdown.clear();
+ if let Some(handle) = handle_weak.upgrade() {
+ handle.self_shutdown();
}
panic::resume_unwind(e);
@@ -91,7 +101,7 @@ impl Scheduler {
.expect("Failed to spawn Scheduler thread");
let handle = handle_receiver.recv().expect("Context thread init failed");
- handle.set_shutdown(shutdown_sender, join);
+ handle.set_join_handle(join);
handle
}
@@ -132,9 +142,11 @@ impl Scheduler {
!Scheduler::is_scheduler_thread(),
"Attempt to block within an existing Scheduler thread."
);
+
let handle = Scheduler::init(Scheduler::DUMMY_NAME.into(), Duration::ZERO);
let this = Arc::clone(&handle.0.scheduler);
+ // Move the (only) handle for this scheduler in the main task.
let (task_id, task) = this.tasks.add(async move {
let res = future.await;
@@ -154,6 +166,7 @@ impl Scheduler {
);
});
+ // Blocking on `task` which is cheap to `poll`.
match this.block_on_priv(task) {
Ok(res) => res,
Err(e) => {
@@ -168,14 +181,21 @@ impl Scheduler {
}
}
- fn block_on_priv<F>(&self, future: F) -> std::thread::Result<F::Output>
+ // Important: the `termination_future` MUST be cheap to poll.
+ //
+ // Examples of appropriate `termination_future` are:
+ //
+ // - an `executor::Task` returned by `self.tasks.add(..)`.
+ // - a `JoinHandle` returned by `Handle::spawn`.
+ // - a custom future with few cycles (ex. checking an `AtomicBool`).
+ fn block_on_priv<F>(&self, termination_future: F) -> std::thread::Result<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let waker = waker_fn(|| ());
let cx = &mut std::task::Context::from_waker(&waker);
- pin_mut!(future);
+ pin_mut!(termination_future);
let _guard = CallOnDrop::new(|| Scheduler::close(Arc::clone(&self.context_name)));
@@ -191,7 +211,7 @@ impl Scheduler {
Reactor::with_mut(|reactor| reactor.react(now).ok());
}
- if let Poll::Ready(t) = future.as_mut().poll(cx) {
+ if let Poll::Ready(t) = termination_future.as_mut().poll(cx) {
return Ok(t);
}
@@ -210,15 +230,6 @@ impl Scheduler {
tasks_checked += 1;
} else {
- // No more ready tasks.
- if tasks_checked > 0 {
- // Check if the main future is ready before parking.
- if let Poll::Ready(t) = future.as_mut().poll(cx) {
- return Ok(t);
- }
- }
- // else: main future has just been checked.
-
let mut must_unpark = self.must_unpark.lock().unwrap();
loop {
if *must_unpark {
@@ -297,51 +308,38 @@ impl Scheduler {
}
}
-impl Drop for Scheduler {
- fn drop(&mut self) {
- gst::debug!(
- RUNTIME_CAT,
- "Terminated: Scheduler for Context {}",
- self.context_name
- );
- }
-}
-
#[derive(Debug)]
-struct SchedulerShutdown {
+struct HandleInner {
scheduler: Arc<Scheduler>,
- sender: Option<oneshot::Sender<()>>,
- join: Option<thread::JoinHandle<()>>,
+ must_shutdown: Arc<AtomicBool>,
+ join: Mutex<Option<thread::JoinHandle<()>>>,
}
-impl SchedulerShutdown {
+impl HandleInner {
fn new(scheduler: Arc<Scheduler>) -> Self {
- SchedulerShutdown {
+ HandleInner {
scheduler,
- sender: None,
- join: None,
+ must_shutdown: Default::default(),
+ join: Default::default(),
}
}
-
- fn clear(&mut self) {
- self.sender = None;
- self.join = None;
- }
}
-impl Drop for SchedulerShutdown {
+impl Drop for HandleInner {
fn drop(&mut self) {
- if let Some(sender) = self.sender.take() {
- gst::debug!(
+ if !self.must_shutdown.fetch_or(true, Ordering::SeqCst) {
+ // Was not already shutting down.
+ self.scheduler.unpark();
+
+ gst::trace!(
RUNTIME_CAT,
"Shutting down Scheduler thread for Context {}",
self.scheduler.context_name
);
- drop(sender);
// Don't block shutting down itself
if !self.scheduler.is_current() {
- if let Some(join_handler) = self.join.take() {
+ if let Some(join_handler) = self.join.lock().unwrap().take() {
gst::trace!(
RUNTIME_CAT,
"Waiting for Scheduler thread to shutdown for Context {}",
@@ -355,12 +353,6 @@ impl Drop for SchedulerShutdown {
}
}
-#[derive(Debug)]
-struct HandleInner {
- scheduler: Arc<Scheduler>,
- shutdown: Mutex<SchedulerShutdown>,
-}
-
#[derive(Clone, Debug)]
pub(super) struct HandleWeak(Weak<HandleInner>);
@@ -375,16 +367,16 @@ pub(super) struct Handle(Arc<HandleInner>);
impl Handle {
fn new(scheduler: Arc<Scheduler>) -> Self {
- Handle(Arc::new(HandleInner {
- shutdown: Mutex::new(SchedulerShutdown::new(Arc::clone(&scheduler))),
- scheduler,
- }))
+ Handle(Arc::new(HandleInner::new(scheduler)))
+ }
+
+ fn set_join_handle(&self, join: thread::JoinHandle<()>) {
+ *self.0.join.lock().unwrap() = Some(join);
}
- fn set_shutdown(&self, sender: oneshot::Sender<()>, join: thread::JoinHandle<()>) {
- let mut shutdown = self.0.shutdown.lock().unwrap();
- shutdown.sender = Some(sender);
- shutdown.join = Some(join);
+ fn self_shutdown(self) {
+ self.0.must_shutdown.store(true, Ordering::SeqCst);
+ *self.0.join.lock().unwrap() = None;
}
pub fn context_name(&self) -> &str {