Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrançois Laignel <fengalin@free.fr>2022-03-21 14:40:28 +0300
committerSebastian Dröge <slomo@coaxion.net>2022-03-28 11:47:32 +0300
commitc1615d01e6e55b5c65cf1f988c9f6af73e9ab103 (patch)
tree8ada26553efd80fc1bb66cecb7b7831a84f343aa /generic/threadshare/src/runtime
parent97985d6442cbff62f8c8ba3f2fc679ee8df4b487 (diff)
ts/rt/Task: awake the iteration loop when it needs to be aborted
When the iteration loop is throttling, the call to `abort` on the `loop_abort_handle` returns immediately, but the actual `Future` for the iteration loop is aborted only when the scheduler throttling completes. State transitions which requires the loop to be aborted & which are serialized at the pipeline level can incur long delays. This commit makes sure the Task Context's scheduler is awaken as soon as the task loop is aborted.
Diffstat (limited to 'generic/threadshare/src/runtime')
-rw-r--r--generic/threadshare/src/runtime/executor/context.rs11
-rw-r--r--generic/threadshare/src/runtime/executor/scheduler.rs4
-rw-r--r--generic/threadshare/src/runtime/task.rs61
3 files changed, 49 insertions, 27 deletions
diff --git a/generic/threadshare/src/runtime/executor/context.rs b/generic/threadshare/src/runtime/executor/context.rs
index fabd09370..17a8c840f 100644
--- a/generic/threadshare/src/runtime/executor/context.rs
+++ b/generic/threadshare/src/runtime/executor/context.rs
@@ -249,6 +249,17 @@ impl Context {
self.0.spawn_and_awake(future)
}
+ /// Forces the scheduler to wake up.
+ ///
+ /// This is not needed by elements implementors as they are
+ /// supposed to call [`Self::spawn_and_awake`] when needed.
+ /// However, it's useful for lower level implementations such as
+ /// `runtime::Task` so as to make sure the iteration loop yields
+ /// as soon as possible when a transition is requested.
+ pub(in crate::runtime) fn wake_up(&self) {
+ self.0.wake_up();
+ }
+
pub fn current_has_sub_tasks() -> bool {
let (ctx, task_id) = match Context::current_task() {
Some(task) => task,
diff --git a/generic/threadshare/src/runtime/executor/scheduler.rs b/generic/threadshare/src/runtime/executor/scheduler.rs
index dab53de02..d6d76be23 100644
--- a/generic/threadshare/src/runtime/executor/scheduler.rs
+++ b/generic/threadshare/src/runtime/executor/scheduler.rs
@@ -420,6 +420,10 @@ impl Handle {
JoinHandle::new(task_id, task, self)
}
+ pub(super) fn wake_up(&self) {
+ self.0.scheduler.wake_up();
+ }
+
pub fn remove_soure(&self, source: Arc<Source>) {
if self
.0
diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs
index e729a6fc4..8b7be397b 100644
--- a/generic/threadshare/src/runtime/task.rs
+++ b/generic/threadshare/src/runtime/task.rs
@@ -355,6 +355,25 @@ impl TaskInner {
Ok(ack_rx)
}
+
+ /// Aborts the task iteration loop ASAP.
+ ///
+ /// When the iteration loop is throttling, the call to `abort`
+ /// on the `loop_abort_handle` returns immediately, but the
+ /// actual `Future` for the iteration loop is aborted only when
+ /// the scheduler throttling completes.
+ ///
+ /// This function aborts the task iteration loop and awakes the
+ /// iteration scheduler.
+ fn abort_task_loop(&mut self) {
+ if let Some(loop_abort_handle) = self.loop_abort_handle.take() {
+ loop_abort_handle.abort();
+
+ if let Some(context) = self.context.as_ref() {
+ context.wake_up();
+ }
+ }
+ }
}
impl Drop for TaskInner {
@@ -489,9 +508,7 @@ impl Task {
inner.state = TaskState::Unpreparing;
- if let Some(loop_abort_handle) = inner.loop_abort_handle.take() {
- loop_abort_handle.abort();
- }
+ inner.abort_task_loop();
let _ = inner.trigger(Trigger::Unprepare).unwrap();
let triggering_evt_tx = inner.triggering_evt_tx.take().unwrap();
@@ -579,42 +596,32 @@ impl Task {
}
pub fn flush_start(&self) -> Result<TransitionStatus, TransitionError> {
- let mut inner = self.0.lock().unwrap();
-
- if let Some(loop_abort_handle) = inner.loop_abort_handle.take() {
- loop_abort_handle.abort();
- }
-
- Self::push_and_await_transition(inner, Trigger::FlushStart)
+ self.abort_push_wakeup_await(Trigger::FlushStart)
}
pub fn flush_stop(&self) -> Result<TransitionStatus, TransitionError> {
- let mut inner = self.0.lock().unwrap();
-
- if let Some(loop_abort_handle) = inner.loop_abort_handle.take() {
- loop_abort_handle.abort();
- }
-
- Self::push_and_await_transition(inner, Trigger::FlushStop)
+ self.abort_push_wakeup_await(Trigger::FlushStop)
}
/// Stops the `Started` `Task` and wait for it to finish.
pub fn stop(&self) -> Result<TransitionStatus, TransitionError> {
- let mut inner = self.0.lock().unwrap();
-
- if let Some(loop_abort_handle) = inner.loop_abort_handle.take() {
- loop_abort_handle.abort();
- }
-
- Self::push_and_await_transition(inner, Trigger::Stop)
+ self.abort_push_wakeup_await(Trigger::Stop)
}
- fn push_and_await_transition(
- mut inner: MutexGuard<TaskInner>,
+ /// Pushes a [`Trigger`] which requires the iteration loop to abort ASAP.
+ ///
+ /// This function:
+ /// - Makes sure the iteration loop aborts as soon as possible.
+ /// - Pushes the provided [`Trigger`].
+ /// - Awaits for the expected transition as usual.
+ fn abort_push_wakeup_await(
+ &self,
trigger: Trigger,
) -> Result<TransitionStatus, TransitionError> {
- let ack_rx = inner.trigger(trigger)?;
+ let mut inner = self.0.lock().unwrap();
+ inner.abort_task_loop();
+ let ack_rx = inner.trigger(trigger)?;
Self::await_ack(inner, ack_rx, trigger)
}