diff options
author | François Laignel <fengalin@free.fr> | 2022-08-19 15:50:40 +0300 |
---|---|---|
committer | Sebastian Dröge <slomo@coaxion.net> | 2022-09-13 10:29:50 +0300 |
commit | d39aabe05477b858adc6bd32f7da90c505894ad1 (patch) | |
tree | bf821a5639738286b5992682d92a06596a6c6649 /generic | |
parent | af12bce141f5f3ec03e0128ef34a203fa7df9691 (diff) |
ts/Task: don't drain sub tasks after state transition and iteration
Subtasks are used when current async processing needs to execute
a `Future` via a sync function (eg. a call to a C function).
In this case `Context::block_on` would block the whole `Context`,
leading to a deadlock.
The main use case for this is the `Pad{Src,Sink}` functions:
when we `PadSrc::push` and the peer pad is a `PadSink`, we want
`PadSrc::push` to complete after the async function on the
`PadSink` completes. In this case the `PadSink` async function
is added as a subtask of current scheduler task and
`PadSrc::push` only returns when the subtask is executed.
In `runtime::Task` (`Task` here is the execution Task with a
state machine, not a scheduler task), we used to spawn state
transition actions and iteration loop (leading to a new
scheduler Task). At the time, it seemed convenient for the user
to automatically drain sub tasks after a state transition action
or an iteration. User wouldn't have to worry about this, similarly
to the `Pad{Src,Sink}` case.
In current implementation, the `Task` state machine now operates
directly on the target `Context`. State transtions actions and
the iteration loop are no longer spawned. It seems now useless to
abstract the subtasks draining from the user. Either they
transitively use a mechanism such as `Pad{Src,Sink}` which already
handles this automatically, or they add substasks on purpose, in
which case they know better when subtasks must be drained.
Diffstat (limited to 'generic')
-rw-r--r-- | generic/threadshare/src/runtime/task.rs | 54 |
1 files changed, 17 insertions, 37 deletions
diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs index 7bbb35c4e..2277df941 100644 --- a/generic/threadshare/src/runtime/task.rs +++ b/generic/threadshare/src/runtime/task.rs @@ -28,7 +28,6 @@ use futures::prelude::*; use std::fmt; use std::ops::Deref; use std::pin::Pin; -use std::stringify; use std::sync::{Arc, Mutex, MutexGuard}; use std::task::Poll; @@ -376,8 +375,7 @@ pub trait TaskImpl: Send + 'static { /// Handles an error occuring during the execution of the `Task` loop. /// - /// This include errors returned by [`Self::try_next`], [`Self::handle_item`] - /// as well as errors returned by subtasks drain at the end of an iteration. + /// This include errors returned by [`Self::try_next`] & [`Self::handle_item`]. /// /// If the error is unrecoverable, implementations might use /// `gst::Element::post_error_message` and return `Trigger::Error`. @@ -817,27 +815,10 @@ struct StateMachine<Item: Send + 'static> { pending_triggering_evt: Option<TriggeringEvent>, } -// Make sure the Context doesn't throttle otherwise we end up with long delays executing -// transition actions in a pipeline with many elements. This is because pipeline serializes -// the transition actions and the Context's scheduler gets a chance to reach its throttling -// state between 2 elements. - macro_rules! exec_action { ($self:ident, $action:ident, $triggering_evt:expr, $origin:expr, $task_inner:expr) => {{ match $self.task_impl.$action().await { - Ok(()) => { - gst::trace!(RUNTIME_CAT, "Draining subtasks for {}", stringify!($action)); - if let Err(err) = Context::drain_sub_tasks().await { - gst::log!( - RUNTIME_CAT, - "{} subtask returned {:?}", - stringify!($action), - err - ); - } - - Ok($triggering_evt) - } + Ok(()) => Ok($triggering_evt), Err(err) => { // FIXME problem is that we loose the origin trigger in the // final TransitionStatus. @@ -1093,11 +1074,6 @@ impl<Item: Send + 'static> StateMachine<Item> { // Unprepare is not joined by an ack_rx but by joining the state machine handle self.task_impl.unprepare().await; - gst::trace!(RUNTIME_CAT, "Draining subtasks for unprepare"); - if let Err(err) = Context::drain_sub_tasks().await { - gst::log!(RUNTIME_CAT, "unprepare subtask returned {:?}", err); - } - task_inner .lock() .unwrap() @@ -1174,11 +1150,6 @@ impl<Item: Send + 'static> StateMachine<Item> { gst::debug!(RUNTIME_CAT, "TaskImpl::handle_item returned {:?}", err); err })?; - - gst::trace!(RUNTIME_CAT, "Draining subtasks after Task loop iteration"); - if let Err(err) = Context::drain_sub_tasks().await { - gst::debug!(RUNTIME_CAT, "Task iteration subtask returned {:?}", err); - } } } } @@ -1187,10 +1158,19 @@ impl<Item: Send + 'static> StateMachine<Item> { mod tests { use futures::channel::{mpsc, oneshot}; use futures::executor::block_on; + use futures::future::BoxFuture; + use futures::prelude::*; use std::time::Duration; - use super::{TaskState::*, TransitionOk::*, TransitionStatus::*, Trigger::*, *}; - use crate::runtime::Context; + use super::{ + Task, TaskImpl, + TaskState::{self, *}, + TransitionError, + TransitionOk::*, + TransitionStatus::*, + Trigger::{self, *}, + }; + use crate::runtime::{Context, RUNTIME_CAT}; #[track_caller] fn stop_then_unprepare(task: Task) { @@ -1534,7 +1514,7 @@ mod tests { err ); match (trigger, state) { - (Trigger::Prepare, TaskState::Unprepared) => { + (Prepare, Unprepared) => { self.prepare_error_sender.send(()).await.unwrap(); } other => unreachable!("{:?}", other), @@ -1772,7 +1752,7 @@ mod tests { err ); match (trigger, state) { - (Trigger::Prepare, TaskState::Unprepared) => { + (Prepare, Unprepared) => { self.prepare_error_sender.send(()).await.unwrap(); } other => panic!("action error for {:?}", other), @@ -2789,11 +2769,11 @@ mod tests { gst::debug!(RUNTIME_CAT, "flush_concurrent_start: // start"); match block_on(task.start()) { - Ok(TransitionOk::Complete { + Ok(Complete { origin: Paused, target: Started, }) => (), - Ok(TransitionOk::Complete { + Ok(Complete { origin: PausedFlushing, target: Flushing, }) => (), |