Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrançois Laignel <fengalin@free.fr>2022-07-26 10:23:52 +0300
committerFrançois Laignel <fengalin@free.fr>2022-08-09 20:48:06 +0300
commit625fce3934470698abd7facfe4dba1804a84fa3b (patch)
tree6a6734ddc8547b74460758287aa91047fbdd03f3 /generic/threadshare/src
parent0858dfedb4bcc77c412c88e679b5dd1941fb7dcb (diff)
ts/Task: spawn StateMachine on ts Context
Task state machines used to execute in an executor from the Futures crate. State transitions actions and iteration functions were then spawned on the target threadshare Context. This commit directly spawns the task state machine on the threadshare Context. This simplifies code a bit and paves the way for the changes described in [1]. Also introduces struct `StateMachineHandle`, which gather together fields to communicate and synchronize with the StateMachine. Renamed `StateMachine::run` as `spawn` and return `StateMachineHandle`. [1]: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/793#note_1464400
Diffstat (limited to 'generic/threadshare/src')
-rw-r--r--generic/threadshare/src/runtime/task.rs1124
1 files changed, 562 insertions, 562 deletions
diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs
index 37ca756ce..d3fdf39f8 100644
--- a/generic/threadshare/src/runtime/task.rs
+++ b/generic/threadshare/src/runtime/task.rs
@@ -22,7 +22,7 @@
use futures::channel::mpsc as async_mpsc;
use futures::channel::oneshot;
-use futures::future::{self, abortable, AbortHandle, Aborted, BoxFuture, RemoteHandle};
+use futures::future::{self, abortable, AbortHandle, Aborted, BoxFuture};
use futures::prelude::*;
use futures::stream::StreamExt;
@@ -31,8 +31,7 @@ use std::ops::Deref;
use std::stringify;
use std::sync::{Arc, Mutex, MutexGuard};
-use super::executor::TaskId;
-use super::{Context, RUNTIME_CAT};
+use super::{Context, JoinHandle, RUNTIME_CAT};
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)]
pub enum TaskState {
@@ -45,7 +44,6 @@ pub enum TaskState {
Started,
Stopped,
Unprepared,
- Unpreparing,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
@@ -86,20 +84,26 @@ impl From<TransitionError> for gst::ErrorMessage {
}
}
+// FIXME impl Future so that we can await without matching on the variant
/// Transition details.
///
/// A state transition occurs as a result of a triggering event.
-#[derive(Clone, Debug, Eq, PartialEq)]
+#[derive(Debug)]
pub enum TransitionStatus {
/// Transition completed successfully.
Complete {
origin: TaskState,
target: TaskState,
},
- /// Asynchronously awaiting for transition completion in a subtask.
+ /// Asynchronously awaiting for transition completion.
///
/// This occurs when the event is triggered from a `Context`.
- Async { trigger: Trigger, origin: TaskState },
+ Async {
+ trigger: Trigger,
+ origin: TaskState,
+ ack_handle: JoinHandle<Result<TransitionStatus, TransitionError>>,
+ },
+ // FIXME remove or edit doc
/// Not waiting for transition completion.
///
/// This is to prevent:
@@ -218,18 +222,16 @@ pub trait TaskImpl: Send + 'static {
}
}
+type AckSender = oneshot::Sender<Result<TransitionStatus, TransitionError>>;
+type AckReceiver = oneshot::Receiver<Result<TransitionStatus, TransitionError>>;
+
struct TriggeringEvent {
trigger: Trigger,
- ack_tx: oneshot::Sender<Result<TransitionStatus, TransitionError>>,
+ ack_tx: AckSender,
}
impl TriggeringEvent {
- fn new(
- trigger: Trigger,
- ) -> (
- Self,
- oneshot::Receiver<Result<TransitionStatus, TransitionError>>,
- ) {
+ fn new(trigger: Trigger) -> (Self, AckReceiver) {
let (ack_tx, ack_rx) = oneshot::channel();
let req = TriggeringEvent { trigger, ack_tx };
@@ -266,26 +268,49 @@ impl fmt::Debug for TriggeringEvent {
}
#[derive(Debug)]
+struct StateMachineHandle {
+ join_handle: JoinHandle<()>,
+ triggering_evt_tx: async_mpsc::Sender<TriggeringEvent>,
+ context: Context,
+}
+
+impl StateMachineHandle {
+ fn trigger(&mut self, trigger: Trigger) -> AckReceiver {
+ let (triggering_evt, ack_rx) = TriggeringEvent::new(trigger);
+
+ gst::log!(RUNTIME_CAT, "Pushing {:?}", triggering_evt);
+
+ self.triggering_evt_tx
+ .try_send(triggering_evt)
+ .unwrap_or_else(|err| {
+ panic!("trigger channel failure: {}", err);
+ });
+
+ self.context.unpark();
+
+ ack_rx
+ }
+
+ async fn join(self) {
+ self.join_handle
+ .await
+ .expect("state machine shouldn't have been cancelled");
+ }
+}
+
+#[derive(Debug)]
struct TaskInner {
- context: Option<Context>,
state: TaskState,
- state_machine_handle: Option<RemoteHandle<()>>,
- triggering_evt_tx: Option<async_mpsc::Sender<TriggeringEvent>>,
- prepare_abort_handle: Option<AbortHandle>,
+ state_machine_handle: Option<StateMachineHandle>,
loop_abort_handle: Option<AbortHandle>,
- spawned_task_id: Option<TaskId>,
}
impl Default for TaskInner {
fn default() -> Self {
TaskInner {
- context: None,
state: TaskState::Unprepared,
state_machine_handle: None,
- triggering_evt_tx: None,
- prepare_abort_handle: None,
loop_abort_handle: None,
- spawned_task_id: None,
}
}
}
@@ -328,34 +353,25 @@ impl TaskInner {
triggering_evt.send_ack(res);
}
- fn trigger(
- &mut self,
- trigger: Trigger,
- ) -> Result<oneshot::Receiver<Result<TransitionStatus, TransitionError>>, TransitionError> {
- let triggering_evt_tx = self.triggering_evt_tx.as_mut().unwrap();
-
- let (triggering_evt, ack_rx) = TriggeringEvent::new(trigger);
-
- gst::log!(RUNTIME_CAT, "Pushing {:?}", triggering_evt);
-
- triggering_evt_tx.try_send(triggering_evt).map_err(|err| {
- let resource_err = if err.is_full() {
- gst::ResourceError::NoSpaceLeft
- } else {
- gst::ResourceError::Close
- };
-
- gst::warning!(RUNTIME_CAT, "Unable to send {:?}: {:?}", trigger, err);
- TransitionError {
- trigger,
- state: self.state,
- err_msg: gst::error_msg!(resource_err, ["Unable to send {:?}: {:?}", trigger, err]),
- }
- })?;
-
- self.context.as_ref().unwrap().unpark();
-
- Ok(ack_rx)
+ fn trigger(&mut self, trigger: Trigger) -> Result<AckReceiver, TransitionError> {
+ self.state_machine_handle
+ .as_mut()
+ .map(|state_machine| state_machine.trigger(trigger))
+ .ok_or_else(|| {
+ gst::warning!(
+ RUNTIME_CAT,
+ "Unable to send {:?}: no state machine",
+ trigger
+ );
+ TransitionError {
+ trigger,
+ state: TaskState::Unprepared,
+ err_msg: gst::error_msg!(
+ gst::ResourceError::NotFound,
+ ["Unable to send {:?}: no state machine", trigger]
+ ),
+ }
+ })
}
fn abort_task_loop(&mut self) {
@@ -408,10 +424,6 @@ impl Task {
TaskStateGuard(self.0.lock().unwrap())
}
- pub fn context(&self) -> Option<Context> {
- self.0.lock().unwrap().context.as_ref().cloned()
- }
-
pub fn prepare(
&self,
task_impl: impl TaskImpl,
@@ -447,19 +459,32 @@ impl Task {
inner.state = TaskState::Preparing;
gst::log!(RUNTIME_CAT, "Spawning task state machine");
+ inner.state_machine_handle = Some(StateMachine::spawn(
+ self.0.clone(),
+ Box::new(task_impl),
+ context.clone(),
+ ));
- // FIXME allow configuration of the channel buffer size,
- // this determines the contention on the Task.
- let (triggering_evt_tx, triggering_evt_rx) = async_mpsc::channel(4);
- let state_machine = StateMachine::new(Box::new(task_impl), triggering_evt_rx);
- inner.state_machine_handle = Some(self.spawn_state_machine(state_machine, &context));
+ let ack_rx = inner.trigger(Trigger::Prepare)?;
+ drop(inner);
+
+ let ack_await_fut = async move {
+ gst::trace!(RUNTIME_CAT, "Awaiting ack for Prepare");
- inner.triggering_evt_tx = Some(triggering_evt_tx);
- inner.context = Some(context);
+ let res = ack_rx.await.unwrap();
+ if res.is_ok() {
+ gst::log!(RUNTIME_CAT, "Received ack {:?} for Prepare", res);
+ } else {
+ gst::error!(RUNTIME_CAT, "Received ack {:?} for Prepare", res);
+ }
+
+ res
+ };
Ok(TransitionStatus::Async {
trigger: Trigger::Prepare,
- origin,
+ origin: TaskState::Unprepared,
+ ack_handle: context.spawn_and_unpark(ack_await_fut),
})
}
@@ -467,17 +492,25 @@ impl Task {
let mut inner = self.0.lock().unwrap();
let origin = inner.state;
- match origin {
- TaskState::Stopped | TaskState::Error | TaskState::Prepared | TaskState::Preparing => {
- gst::debug!(RUNTIME_CAT, "Unpreparing task");
- }
- TaskState::Unprepared | TaskState::Unpreparing => {
- gst::debug!(RUNTIME_CAT, "Task already {:?}", origin);
- return Ok(TransitionStatus::Skipped {
- trigger: Trigger::Unprepare,
- state: origin,
- });
- }
+ let mut state_machine_handle = match origin {
+ TaskState::Stopped
+ | TaskState::Error
+ | TaskState::Prepared
+ | TaskState::Preparing
+ | TaskState::Unprepared => match inner.state_machine_handle.take() {
+ Some(state_machine_handle) => {
+ gst::debug!(RUNTIME_CAT, "Unpreparing task");
+
+ state_machine_handle
+ }
+ None => {
+ gst::debug!(RUNTIME_CAT, "Task already unpreparing");
+ return Ok(TransitionStatus::Skipped {
+ trigger: Trigger::Unprepare,
+ state: origin,
+ });
+ }
+ },
state => {
gst::warning!(
RUNTIME_CAT,
@@ -493,60 +526,46 @@ impl Task {
),
});
}
- }
-
- inner.state = TaskState::Unpreparing;
-
- if let Some(prepare_abort_handle) = inner.prepare_abort_handle.take() {
- prepare_abort_handle.abort();
- }
+ };
inner.abort_task_loop();
-
- let _ = inner.trigger(Trigger::Unprepare).unwrap();
- let triggering_evt_tx = inner.triggering_evt_tx.take().unwrap();
-
- let state_machine_handle = inner.state_machine_handle.take();
- let context = inner.context.take().unwrap();
+ let ack_rx = state_machine_handle.trigger(Trigger::Unprepare);
drop(inner);
- match state_machine_handle {
- Some(state_machine_handle) => {
- let state_machine_end_fut = async {
- state_machine_handle.await;
+ let state_machine_end_fut = async {
+ state_machine_handle.join().await;
- drop(triggering_evt_tx);
- drop(context);
+ let res = ack_rx.await.unwrap();
+ if res.is_ok() {
+ gst::log!(RUNTIME_CAT, "Received ack {:?} for Unprepare", res);
+ } else {
+ gst::error!(RUNTIME_CAT, "Received ack {:?} for Unprepare", res);
+ }
- gst::debug!(RUNTIME_CAT, "Task unprepared");
- };
+ res
+ };
- if let Some((cur_context, cur_task_id)) = Context::current_task() {
- gst::log!(
- RUNTIME_CAT,
- "Will wait for state machine termination completion in subtask to task {:?} on context {}",
- cur_task_id,
- cur_context.name()
- );
- let _ = Context::add_sub_task(state_machine_end_fut.map(|_| Ok(())));
+ if let Some(cur_context) = Context::current() {
+ let ack_handle = cur_context.spawn_and_unpark(state_machine_end_fut);
+ gst::log!(
+ RUNTIME_CAT,
+ "Will wait for state machine termination completion in {:?} on context {}",
+ ack_handle.task_id(),
+ cur_context.name()
+ );
- return Ok(TransitionStatus::Async {
- trigger: Trigger::Unprepare,
- origin,
- });
- } else {
- gst::log!(
- RUNTIME_CAT,
- "Waiting for state machine termination on current thread"
- );
- // Use a light-weight executor, no timer nor async io.
- futures::executor::block_on(state_machine_end_fut)
- }
- }
- None => {
- drop(triggering_evt_tx);
- drop(context);
- }
+ return Ok(TransitionStatus::Async {
+ trigger: Trigger::Unprepare,
+ origin,
+ ack_handle,
+ });
+ } else {
+ gst::log!(
+ RUNTIME_CAT,
+ "Waiting for state machine termination on current thread"
+ );
+ // Use a light-weight executor, no timer nor async io.
+ futures::executor::block_on(state_machine_end_fut.map(drop))
}
Ok(TransitionStatus::Complete {
@@ -564,9 +583,9 @@ impl Task {
let ack_rx = inner.trigger(Trigger::Start)?;
if let TaskState::Started = inner.state {
- return Ok(TransitionStatus::NotWaiting {
+ return Ok(TransitionStatus::Skipped {
trigger: Trigger::Start,
- origin: TaskState::Started,
+ state: TaskState::Started,
});
}
@@ -584,6 +603,7 @@ impl Task {
let ack_rx = inner.trigger(Trigger::Pause)?;
if let TaskState::Started = inner.state {
+ // FIXME this could be async
return Ok(TransitionStatus::NotWaiting {
trigger: Trigger::Pause,
origin: TaskState::Started,
@@ -626,25 +646,6 @@ impl Task {
trigger: Trigger,
) -> Result<TransitionStatus, TransitionError> {
let origin = inner.state;
-
- // Since triggering events handling is serialized by the state machine and
- // we hold a lock on TaskInner, we can verify if current spawned loop / tansition action
- // task_id matches the task_id of current subtask, if any.
- if let Some(spawned_task_id) = inner.spawned_task_id {
- if let Some((cur_context, cur_task_id)) = Context::current_task() {
- if cur_task_id == spawned_task_id && &cur_context == inner.context.as_ref().unwrap()
- {
- // Don't block as this would deadlock
- gst::log!(
- RUNTIME_CAT,
- "Requested {:?} from loop or transition action, not waiting",
- trigger,
- );
- return Ok(TransitionStatus::NotWaiting { trigger, origin });
- }
- }
- }
-
drop(inner);
let ack_await_fut = async move {
@@ -660,17 +661,22 @@ impl Task {
res
};
- if let Some((cur_context, cur_task_id)) = Context::current_task() {
+ if let Some(cur_context) = Context::current() {
+ let ack_handle = cur_context.spawn(ack_await_fut);
+
gst::log!(
RUNTIME_CAT,
- "Will await ack for {:?} in subtask to task {:?} on context {}",
+ "Awaiting ack for {:?} in {:?} on context {}",
trigger,
- cur_task_id,
+ ack_handle.task_id(),
cur_context.name()
);
- let _ = Context::add_sub_task(ack_await_fut.map(|_| Ok(())));
- Ok(TransitionStatus::Async { trigger, origin })
+ Ok(TransitionStatus::Async {
+ trigger,
+ origin,
+ ack_handle,
+ })
} else {
gst::log!(
RUNTIME_CAT,
@@ -681,22 +687,6 @@ impl Task {
futures::executor::block_on(ack_await_fut)
}
}
-
- fn spawn_state_machine(
- &self,
- state_machine: StateMachine,
- context: &Context,
- ) -> RemoteHandle<()> {
- use futures::executor::ThreadPool;
- use futures::task::SpawnExt;
- use once_cell::sync::OnceCell;
-
- static EXECUTOR: OnceCell<ThreadPool> = OnceCell::new();
- EXECUTOR
- .get_or_init(|| ThreadPool::builder().pool_size(1).create().unwrap())
- .spawn_with_handle(state_machine.run(Arc::clone(&self.0), context.clone()))
- .unwrap()
- }
}
struct StateMachine {
@@ -711,13 +701,10 @@ struct StateMachine {
// state between 2 elements.
macro_rules! exec_action {
- ($self:ident, $action:ident, $triggering_evt:expr, $origin:expr, $task_inner:expr, $context:expr) => {{
- let trigger = $triggering_evt.trigger;
-
- let action_fut = async move {
- let mut res = $self.task_impl.$action().await;
-
- if res.is_ok() {
+ ($self:ident, $action:ident, $triggering_evt:expr, $origin:expr, $task_inner:expr) => {{
+ match $self.task_impl.$action().await {
+ Ok(()) => {
+ let mut res;
while Context::current_has_sub_tasks() {
gst::trace!(RUNTIME_CAT, "Draining subtasks for {}", stringify!($action));
res = Context::drain_sub_tasks().await.map_err(|err| {
@@ -730,36 +717,18 @@ macro_rules! exec_action {
break;
}
}
- }
-
- let res = match res {
- Ok(()) => Ok(()),
- Err(err) => {
- let next_triggering_evt = $self
- .task_impl
- .handle_action_error(trigger, $origin, err)
- .await;
- Err(next_triggering_evt)
- }
- };
-
- ($self, res)
- };
-
- let join_handle = {
- let mut task_inner = $task_inner.lock().unwrap();
- let join_handle = $context.spawn_and_unpark(action_fut);
- task_inner.spawned_task_id = Some(join_handle.task_id());
- join_handle
- };
+ Ok($triggering_evt)
+ }
+ Err(err) => {
+ // FIXME problem is that we loose the origin trigger in the
+ // final TransitionStatus.
- let (this, res) = join_handle.await.unwrap();
- $self = this;
+ let next_trigger = $self
+ .task_impl
+ .handle_action_error($triggering_evt.trigger, $origin, err)
+ .await;
- match res {
- Ok(()) => Ok($triggering_evt),
- Err(next_trigger) => {
// Convert triggering event according to the error handler's decision
gst::trace!(
RUNTIME_CAT,
@@ -780,41 +749,63 @@ macro_rules! exec_action {
impl StateMachine {
// Use dynamic dispatch for TaskImpl as it reduces memory usage compared to monomorphization
// without inducing any significant performance penalties.
- fn new(
+ fn spawn(
+ task_inner: Arc<Mutex<TaskInner>>,
task_impl: Box<dyn TaskImpl>,
- triggering_evt_rx: async_mpsc::Receiver<TriggeringEvent>,
- ) -> Self {
- StateMachine {
+ context: Context,
+ ) -> StateMachineHandle {
+ let (triggering_evt_tx, triggering_evt_rx) = async_mpsc::channel(4);
+
+ let state_machine = StateMachine {
task_impl,
triggering_evt_rx,
pending_triggering_evt: None,
+ };
+
+ StateMachineHandle {
+ join_handle: context.spawn_and_unpark(state_machine.run(task_inner)),
+ triggering_evt_tx,
+ context,
}
}
- async fn run(mut self, task_inner: Arc<Mutex<TaskInner>>, context: Context) {
- gst::trace!(RUNTIME_CAT, "Preparing task");
+ async fn run(mut self, task_inner: Arc<Mutex<TaskInner>>) {
+ let context = Context::current().expect("must be spawed on a Context");
+
+ let mut triggering_evt = self
+ .triggering_evt_rx
+ .next()
+ .await
+ .expect("triggering_evt_rx dropped");
+
+ if let Trigger::Prepare = triggering_evt.trigger {
+ gst::trace!(RUNTIME_CAT, "Preparing task");
- {
- let (mut triggering_evt, _) = TriggeringEvent::new(Trigger::Prepare);
let res = exec_action!(
self,
prepare,
triggering_evt,
- TaskState::Preparing,
- &task_inner,
- &context
+ TaskState::Unprepared,
+ &task_inner
);
if let Ok(triggering_evt) = res {
- task_inner
- .lock()
- .unwrap()
- .switch_to_state(TaskState::Prepared, triggering_evt);
+ let mut task_inner = task_inner.lock().unwrap();
+ let res = Ok(TransitionStatus::Complete {
+ origin: TaskState::Unprepared,
+ target: TaskState::Prepared,
+ });
+
+ task_inner.state = TaskState::Prepared;
+ triggering_evt.send_ack(res);
+
gst::trace!(RUNTIME_CAT, "Task Prepared");
}
+ } else {
+ panic!("Unexpected initial trigger {:?}", triggering_evt.trigger);
}
loop {
- let mut triggering_evt = match self.pending_triggering_evt.take() {
+ triggering_evt = match self.pending_triggering_evt.take() {
Some(pending_triggering_evt) => pending_triggering_evt,
None => self
.triggering_evt_rx
@@ -859,8 +850,7 @@ impl StateMachine {
origin
};
- self =
- Self::spawn_loop(self, triggering_evt, origin, &task_inner, &context).await;
+ self = Self::start(self, triggering_evt, origin, &task_inner, &context).await;
// next/pending triggering event handled in next iteration
}
Trigger::Pause => {
@@ -884,8 +874,7 @@ impl StateMachine {
}
};
- let res =
- exec_action!(self, pause, triggering_evt, origin, &task_inner, &context);
+ let res = exec_action!(self, pause, triggering_evt, origin, &task_inner);
if let Ok(triggering_evt) = res {
task_inner
.lock()
@@ -915,8 +904,7 @@ impl StateMachine {
}
};
- let res =
- exec_action!(self, stop, triggering_evt, origin, &task_inner, &context);
+ let res = exec_action!(self, stop, triggering_evt, origin, &task_inner);
if let Ok(triggering_evt) = res {
task_inner
.lock()
@@ -944,14 +932,7 @@ impl StateMachine {
}
};
- let res = exec_action!(
- self,
- flush_start,
- triggering_evt,
- origin,
- &task_inner,
- &context
- );
+ let res = exec_action!(self, flush_start, triggering_evt, origin, &task_inner);
if let Ok(triggering_evt) = res {
task_inner
.lock()
@@ -979,14 +960,7 @@ impl StateMachine {
}
};
- let res = exec_action!(
- self,
- flush_stop,
- triggering_evt,
- origin,
- &task_inner,
- &context
- );
+ let res = exec_action!(self, flush_stop, triggering_evt, origin, &task_inner);
if let Ok(triggering_evt) = res {
if is_paused {
task_inner
@@ -995,38 +969,26 @@ impl StateMachine {
.switch_to_state(TaskState::Paused, triggering_evt);
gst::trace!(RUNTIME_CAT, "Switched from PausedFlushing to Paused");
} else {
- self = Self::spawn_loop(
- self,
- triggering_evt,
- origin,
- &task_inner,
- &context,
- )
- .await;
+ self = Self::start(self, triggering_evt, origin, &task_inner, &context)
+ .await;
// next/pending triggering event handled in next iteration
}
}
}
Trigger::Unprepare => {
- // Unprepare is not joined by an ack_rx but by joining the state machine
- // handle, so we don't need to keep track of the spwaned_task_id
- context
- .spawn_and_unpark(async move {
- self.task_impl.unprepare().await;
-
- while Context::current_has_sub_tasks() {
- gst::trace!(RUNTIME_CAT, "Draining subtasks for unprepare");
- let res = Context::drain_sub_tasks().await.map_err(|err| {
- gst::log!(RUNTIME_CAT, "unprepare subtask returned {:?}", err);
- err
- });
- if res.is_err() {
- break;
- }
- }
- })
- .await
- .unwrap();
+ // Unprepare is not joined by an ack_rx but by joining the state machine handle
+ self.task_impl.unprepare().await;
+
+ while Context::current_has_sub_tasks() {
+ gst::trace!(RUNTIME_CAT, "Draining subtasks for unprepare");
+ let res = Context::drain_sub_tasks().await.map_err(|err| {
+ gst::log!(RUNTIME_CAT, "unprepare subtask returned {:?}", err);
+ err
+ });
+ if res.is_err() {
+ break;
+ }
+ }
task_inner
.lock()
@@ -1042,87 +1004,45 @@ impl StateMachine {
gst::trace!(RUNTIME_CAT, "Task state machine terminated");
}
- async fn spawn_loop(
+ async fn start(
mut self,
mut triggering_evt: TriggeringEvent,
origin: TaskState,
task_inner: &Arc<Mutex<TaskInner>>,
context: &Context,
) -> Self {
- let task_inner_clone = Arc::clone(task_inner);
- let loop_fut = async move {
- let mut res = self.task_impl.start().await;
- if res.is_ok() {
- while Context::current_has_sub_tasks() {
- gst::trace!(RUNTIME_CAT, "Draining subtasks for start");
- res = Context::drain_sub_tasks().await.map_err(|err| {
- let msg = format!("start subtask returned {:?}", err);
- gst::log!(RUNTIME_CAT, "{}", &msg);
- gst::error_msg!(gst::CoreError::StateChange, ["{}", &msg])
- });
-
- if res.is_err() {
- break;
- }
- }
- }
-
- match res {
- Ok(()) => {
- let abortable_task_loop = {
- let (abortable_task_loop, loop_abort_handle) =
- abortable(self.run_loop(Arc::clone(&task_inner_clone)));
+ let triggering_evt = match exec_action!(self, start, triggering_evt, origin, &task_inner) {
+ Ok(triggering_evt) => triggering_evt,
+ Err(_) => return self,
+ };
- let mut task_inner = task_inner_clone.lock().unwrap();
- task_inner.loop_abort_handle = Some(loop_abort_handle);
- task_inner.switch_to_state(TaskState::Started, triggering_evt);
+ let task_inner_cl = Arc::clone(task_inner);
+ let loop_fut = async move {
+ let (abortable_task_loop, loop_abort_handle) =
+ abortable(self.run_loop(Arc::clone(&task_inner_cl)));
- abortable_task_loop
- };
+ {
+ let mut task_inner = task_inner_cl.lock().unwrap();
+ task_inner.loop_abort_handle = Some(loop_abort_handle);
+ task_inner.switch_to_state(TaskState::Started, triggering_evt);
- gst::trace!(RUNTIME_CAT, "Starting task loop");
- match abortable_task_loop.await {
- Ok(Ok(())) => (),
- Ok(Err(err)) => {
- let next_trigger = self.task_impl.handle_iterate_error(err).await;
- let (triggering_evt, _) = TriggeringEvent::new(next_trigger);
- self.pending_triggering_evt = Some(triggering_evt);
- }
- Err(Aborted) => gst::trace!(RUNTIME_CAT, "Task loop aborted"),
- }
- }
- Err(err) => {
- // Error while executing start transition action
- let next_trigger = self
- .task_impl
- .handle_action_error(triggering_evt.trigger, origin, err)
- .await;
-
- gst::log!(
- RUNTIME_CAT,
- "TaskImpl transition action error: converting Start to {:?}",
- next_trigger,
- );
+ gst::trace!(RUNTIME_CAT, "Starting task loop");
+ }
- triggering_evt.trigger = next_trigger;
+ match abortable_task_loop.await {
+ Ok(Ok(())) => (),
+ Ok(Err(err)) => {
+ let next_trigger = self.task_impl.handle_iterate_error(err).await;
+ let (triggering_evt, _) = TriggeringEvent::new(next_trigger);
self.pending_triggering_evt = Some(triggering_evt);
}
+ Err(Aborted) => gst::trace!(RUNTIME_CAT, "Task loop aborted"),
}
- // next/pending triggering event handled in state machine loop
-
self
};
- let join_handle = {
- let mut task_inner = task_inner.lock().unwrap();
- let join_handle = context.spawn_and_unpark(loop_fut);
- task_inner.spawned_task_id = Some(join_handle.task_id());
-
- join_handle
- };
-
- join_handle.await.unwrap()
+ context.spawn_and_unpark(loop_fut).await.unwrap()
}
async fn run_loop(&mut self, task_inner: Arc<Mutex<TaskInner>>) -> Result<(), gst::FlowError> {
@@ -1291,40 +1211,48 @@ mod tests {
let (stopped_sender, mut stopped_receiver) = mpsc::channel(1);
let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
let (unprepared_sender, mut unprepared_receiver) = mpsc::channel(1);
- let res = task
- .prepare(
- TaskTest {
- prepared_sender,
- started_sender,
- iterate_sender,
- complete_iterate_receiver,
- paused_sender,
- stopped_sender,
- flush_start_sender,
- unprepared_sender,
- },
- context,
- )
- .unwrap();
- assert_eq!(
- res,
- TransitionStatus::Async {
+ let res = task.prepare(
+ TaskTest {
+ prepared_sender,
+ started_sender,
+ iterate_sender,
+ complete_iterate_receiver,
+ paused_sender,
+ stopped_sender,
+ flush_start_sender,
+ unprepared_sender,
+ },
+ context,
+ );
+
+ let prepare_ack_handle = match res {
+ Ok(TransitionStatus::Async {
trigger: Trigger::Prepare,
origin: TaskState::Unprepared,
- }
- );
+ ack_handle,
+ }) => ack_handle,
+ other => panic!("unexpected {:?}", other),
+ };
gst::debug!(RUNTIME_CAT, "task_iterate: starting (initial)");
- assert_eq!(
- task.start().unwrap(),
- TransitionStatus::Complete {
+
+ match task.start() {
+ Ok(TransitionStatus::Complete {
origin: TaskState::Prepared,
- target: TaskState::Started
- },
- );
+ target: TaskState::Started,
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
assert_eq!(task.state(), TaskState::Started);
- // At this point, prepared must be completed
+ // At this point, preparation must be complete
+ match block_on(prepare_ack_handle).unwrap() {
+ Ok(TransitionStatus::Complete {
+ origin: TaskState::Unprepared,
+ target: TaskState::Prepared,
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
block_on(prepared_receiver.next()).unwrap();
// ... and start executed
block_on(started_receiver.next()).unwrap();
@@ -1336,24 +1264,24 @@ mod tests {
block_on(complete_iterate_sender.send(Ok(()))).unwrap();
gst::debug!(RUNTIME_CAT, "task_iterate: starting (redundant)");
- // start will return immediately
- assert_eq!(
- task.start().unwrap(),
- TransitionStatus::NotWaiting {
+ // already started
+ match task.start() {
+ Ok(TransitionStatus::Skipped {
trigger: Trigger::Start,
- origin: TaskState::Started,
- },
- );
+ state: TaskState::Started,
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
assert_eq!(task.state(), TaskState::Started);
gst::debug!(RUNTIME_CAT, "task_iterate: pause (initial)");
- assert_eq!(
- task.pause().unwrap(),
- TransitionStatus::NotWaiting {
+ match task.pause() {
+ Ok(TransitionStatus::NotWaiting {
trigger: Trigger::Pause,
origin: TaskState::Started,
- },
- );
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
// Pause transition is asynchronous
while TaskState::Paused != task.state() {
@@ -1369,26 +1297,26 @@ mod tests {
block_on(paused_receiver.next()).unwrap();
gst::debug!(RUNTIME_CAT, "task_iterate: starting (after pause)");
- assert_eq!(
- task.start().unwrap(),
- TransitionStatus::Complete {
+ match task.start() {
+ Ok(TransitionStatus::Complete {
origin: TaskState::Paused,
- target: TaskState::Started
- },
- );
+ target: TaskState::Started,
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
assert_eq!(task.state(), TaskState::Started);
// Paused -> Started
let _ = block_on(started_receiver.next());
gst::debug!(RUNTIME_CAT, "task_iterate: stopping");
- assert_eq!(
- task.stop().unwrap(),
- TransitionStatus::Complete {
+ match task.stop() {
+ Ok(TransitionStatus::Complete {
origin: TaskState::Started,
- target: TaskState::Stopped
- },
- );
+ target: TaskState::Stopped,
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
assert_eq!(task.state(), TaskState::Stopped);
let _ = block_on(stopped_receiver.next());
@@ -1397,13 +1325,13 @@ mod tests {
let _ = iterate_receiver.try_next();
gst::debug!(RUNTIME_CAT, "task_iterate: starting (after stop)");
- assert_eq!(
- task.start().unwrap(),
- TransitionStatus::Complete {
+ match task.start() {
+ Ok(TransitionStatus::Complete {
origin: TaskState::Stopped,
- target: TaskState::Started
- },
- );
+ target: TaskState::Started,
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
let _ = block_on(started_receiver.next());
gst::debug!(RUNTIME_CAT, "task_iterate: req. iterate to return Eos");
@@ -1419,13 +1347,13 @@ mod tests {
}
gst::debug!(RUNTIME_CAT, "task_iterate: starting (after stop)");
- assert_eq!(
- task.start().unwrap(),
- TransitionStatus::Complete {
+ match task.start() {
+ Ok(TransitionStatus::Complete {
origin: TaskState::Stopped,
- target: TaskState::Started
- },
- );
+ target: TaskState::Started,
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
let _ = block_on(started_receiver.next());
gst::debug!(RUNTIME_CAT, "task_iterate: req. iterate to return Flushing");
@@ -1441,13 +1369,13 @@ mod tests {
}
gst::debug!(RUNTIME_CAT, "task_iterate: stop flushing");
- assert_eq!(
- task.flush_stop().unwrap(),
- TransitionStatus::Complete {
+ match task.flush_stop() {
+ Ok(TransitionStatus::Complete {
origin: TaskState::Flushing,
- target: TaskState::Started
- },
- );
+ target: TaskState::Started,
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
let _ = block_on(started_receiver.next());
gst::debug!(RUNTIME_CAT, "task_iterate: req. iterate to return Error");
@@ -1463,23 +1391,22 @@ mod tests {
RUNTIME_CAT,
"task_iterate: attempting to start (after Error)"
);
- let err = task.start().unwrap_err();
- match err {
- TransitionError {
+ match task.start() {
+ Err(TransitionError {
trigger: Trigger::Start,
state: TaskState::Error,
..
- } => (),
+ }) => (),
_ => unreachable!(),
}
- assert_eq!(
- task.unprepare().unwrap(),
- TransitionStatus::Complete {
+ match task.unprepare() {
+ Ok(TransitionStatus::Complete {
origin: TaskState::Error,
- target: TaskState::Unprepared
- },
- );
+ target: TaskState::Unprepared,
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
assert_eq!(task.state(), TaskState::Unprepared);
let _ = block_on(unprepared_receiver.next());
@@ -1518,7 +1445,7 @@ mod tests {
err
);
match (trigger, state) {
- (Trigger::Prepare, TaskState::Preparing) => {
+ (Trigger::Prepare, TaskState::Unprepared) => {
self.prepare_error_sender.send(()).await.unwrap();
}
other => unreachable!("{:?}", other),
@@ -1559,13 +1486,12 @@ mod tests {
std::thread::sleep(Duration::from_millis(2));
}
- let res = task.start().unwrap_err();
- match res {
- TransitionError {
+ match task.start() {
+ Err(TransitionError {
trigger: Trigger::Start,
state: TaskState::Error,
..
- } => (),
+ }) => (),
other => unreachable!("{:?}", other),
}
@@ -1627,41 +1553,61 @@ mod tests {
.unwrap();
let start_ctx = Context::acquire("prepare_start_ok_requester", Duration::ZERO).unwrap();
- let task_clone = task.clone();
let (ready_sender, ready_receiver) = oneshot::channel();
let start_handle = start_ctx.spawn(async move {
- assert_eq!(task_clone.state(), TaskState::Preparing);
+ assert_eq!(task.state(), TaskState::Preparing);
gst::debug!(RUNTIME_CAT, "prepare_start_ok: starting");
- assert_eq!(
- task_clone.start().unwrap(),
- TransitionStatus::Async {
+ let ack_handle = match task.start() {
+ Ok(TransitionStatus::Async {
trigger: Trigger::Start,
origin: TaskState::Preparing,
- }
- );
+ ack_handle,
+ }) => ack_handle,
+ other => panic!("unexpected {:?}", other),
+ };
ready_sender.send(()).unwrap();
- Context::drain_sub_tasks().await.unwrap();
- assert_eq!(task_clone.state(), TaskState::Started);
+ match ack_handle.await.unwrap() {
+ Ok(TransitionStatus::Complete {
+ origin: TaskState::Prepared,
+ target: TaskState::Started,
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
+ assert_eq!(task.state(), TaskState::Started);
- assert_eq!(
- task.stop().unwrap(),
- TransitionStatus::Async {
+ let ack_handle = match task.stop() {
+ Ok(TransitionStatus::Async {
trigger: Trigger::Stop,
origin: TaskState::Started,
- },
- );
- Context::drain_sub_tasks().await.unwrap();
- assert_eq!(task_clone.state(), TaskState::Stopped);
+ ack_handle,
+ }) => ack_handle,
+ other => panic!("unexpected {:?}", other),
+ };
+ match ack_handle.await.unwrap() {
+ Ok(TransitionStatus::Complete {
+ origin: TaskState::Started,
+ target: TaskState::Stopped,
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
+ assert_eq!(task.state(), TaskState::Stopped);
- assert_eq!(
- task.unprepare().unwrap(),
- TransitionStatus::Async {
+ let ack_handle = match task.unprepare() {
+ Ok(TransitionStatus::Async {
trigger: Trigger::Unprepare,
origin: TaskState::Stopped,
- },
- );
- Context::drain_sub_tasks().await.unwrap();
- assert_eq!(task_clone.state(), TaskState::Unprepared);
+ ack_handle,
+ }) => ack_handle,
+ other => panic!("unexpected {:?}", other),
+ };
+ match ack_handle.await.unwrap() {
+ Ok(TransitionStatus::Complete {
+ origin: TaskState::Stopped,
+ target: TaskState::Unprepared,
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
+ assert_eq!(task.state(), TaskState::Unprepared);
});
gst::debug!(RUNTIME_CAT, "prepare_start_ok: awaiting for start_ctx");
@@ -1715,10 +1661,10 @@ mod tests {
err
);
match (trigger, state) {
- (Trigger::Prepare, TaskState::Preparing) => {
+ (Trigger::Prepare, TaskState::Unprepared) => {
self.prepare_error_sender.send(()).await.unwrap();
}
- other => unreachable!("action error for {:?}", other),
+ other => panic!("action error for {:?}", other),
}
Trigger::Error
}
@@ -1740,33 +1686,55 @@ mod tests {
let (mut prepare_sender, prepare_receiver) = mpsc::channel(1);
let (prepare_error_sender, mut prepare_error_receiver) = mpsc::channel(1);
- task.prepare(
+ let res = task.prepare(
TaskPrepareTest {
prepare_receiver,
prepare_error_sender,
},
context,
- )
- .unwrap();
+ );
+ let prepare_ack = match res {
+ Ok(TransitionStatus::Async {
+ trigger: Trigger::Prepare,
+ origin: TaskState::Unprepared,
+ ack_handle,
+ }) => ack_handle,
+ other => panic!("unexpected {:?}", other),
+ };
let start_ctx = Context::acquire("prepare_start_error_requester", Duration::ZERO).unwrap();
- let task_clone = task.clone();
let (ready_sender, ready_receiver) = oneshot::channel();
let start_handle = start_ctx.spawn(async move {
- assert_eq!(task_clone.state(), TaskState::Preparing);
gst::debug!(RUNTIME_CAT, "prepare_start_error: starting (Err)");
- task_clone.start().unwrap();
+ task.start().unwrap();
ready_sender.send(()).unwrap();
- Context::drain_sub_tasks().await.unwrap();
-
- assert_eq!(
- task.unprepare().unwrap(),
+ // FIXME we loose the origin Trigger (Start)
+ // and only get the Trigger returned by handle_action_error
+ // see also: comment in exec_action!
+ match prepare_ack.await.unwrap() {
+ Err(TransitionError {
+ trigger: Trigger::Error,
+ state: TaskState::Preparing,
+ ..
+ }) => (),
+ other => panic!("unexpected transition res {:?}", other),
+ }
+
+ let ack_handle = match task.unprepare().unwrap() {
TransitionStatus::Async {
trigger: Trigger::Unprepare,
origin: TaskState::Error,
- },
- );
- Context::drain_sub_tasks().await.unwrap();
+ ack_handle,
+ } => ack_handle,
+ other => panic!("unexpected {:?}", other),
+ };
+ match ack_handle.await.unwrap() {
+ Ok(TransitionStatus::Complete {
+ origin: TaskState::Error,
+ target: TaskState::Unprepared,
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
});
gst::debug!(RUNTIME_CAT, "prepare_start_error: awaiting for start_ctx");
@@ -1840,26 +1808,26 @@ mod tests {
.unwrap();
gst::debug!(RUNTIME_CAT, "pause_start: starting");
- assert_eq!(
- task.start().unwrap(),
- TransitionStatus::Complete {
+ match task.start() {
+ Ok(TransitionStatus::Complete {
origin: TaskState::Prepared,
target: TaskState::Started,
- },
- );
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
assert_eq!(task.state(), TaskState::Started);
gst::debug!(RUNTIME_CAT, "pause_start: awaiting 1st iteration");
block_on(iterate_receiver.next()).unwrap();
gst::debug!(RUNTIME_CAT, "pause_start: pausing (1)");
- assert_eq!(
- task.pause().unwrap(),
- TransitionStatus::NotWaiting {
+ match task.pause() {
+ Ok(TransitionStatus::NotWaiting {
trigger: Trigger::Pause,
origin: TaskState::Started,
- },
- );
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
gst::debug!(RUNTIME_CAT, "pause_start: sending 1st iteration completion");
complete_sender.try_send(()).unwrap();
@@ -1874,14 +1842,13 @@ mod tests {
// Loop held on due to Pause
iterate_receiver.try_next().unwrap_err();
-
- assert_eq!(
- task.start().unwrap(),
- TransitionStatus::Complete {
+ match task.start() {
+ Ok(TransitionStatus::Complete {
origin: TaskState::Paused,
target: TaskState::Started,
- },
- );
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
assert_eq!(task.state(), TaskState::Started);
gst::debug!(RUNTIME_CAT, "pause_start: awaiting 2d iteration");
@@ -1995,25 +1962,25 @@ mod tests {
task.start().unwrap();
gst::debug!(RUNTIME_CAT, "flush_regular_sync: starting flush");
- assert_eq!(
- task.flush_start().unwrap(),
- TransitionStatus::Complete {
+ match task.flush_start() {
+ Ok(TransitionStatus::Complete {
origin: TaskState::Started,
target: TaskState::Flushing,
- },
- );
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
assert_eq!(task.state(), TaskState::Flushing);
block_on(flush_start_receiver.next()).unwrap();
gst::debug!(RUNTIME_CAT, "flush_regular_sync: stopping flush");
- assert_eq!(
- task.flush_stop().unwrap(),
- TransitionStatus::Complete {
+ match task.flush_stop() {
+ Ok(TransitionStatus::Complete {
origin: TaskState::Flushing,
target: TaskState::Started,
- },
- );
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
assert_eq!(task.state(), TaskState::Started);
block_on(flush_stop_receiver.next()).unwrap();
@@ -2090,25 +2057,39 @@ mod tests {
let task_clone = task.clone();
let flush_handle = oob_context.spawn(async move {
- assert_eq!(
- task_clone.flush_start().unwrap(),
- TransitionStatus::Async {
+ let flush_ack_handle = match task_clone.flush_start() {
+ Ok(TransitionStatus::Async {
trigger: Trigger::FlushStart,
origin: TaskState::Started,
- },
- );
- Context::drain_sub_tasks().await.unwrap();
+ ack_handle,
+ }) => ack_handle,
+ other => panic!("unexpected {:?}", other),
+ };
+ match flush_ack_handle.await.unwrap() {
+ Ok(TransitionStatus::Complete {
+ origin: TaskState::Started,
+ target: TaskState::Flushing,
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
assert_eq!(task_clone.state(), TaskState::Flushing);
flush_start_receiver.next().await.unwrap();
- assert_eq!(
- task_clone.flush_stop().unwrap(),
- TransitionStatus::Async {
+ let flush_stop_ack_handle = match task_clone.flush_stop() {
+ Ok(TransitionStatus::Async {
trigger: Trigger::FlushStop,
origin: TaskState::Flushing,
- },
- );
- Context::drain_sub_tasks().await.unwrap();
+ ack_handle,
+ }) => ack_handle,
+ other => panic!("unexpected {:?}", other),
+ };
+ match flush_stop_ack_handle.await.unwrap() {
+ Ok(TransitionStatus::Complete {
+ origin: TaskState::Flushing,
+ target: TaskState::Started,
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
assert_eq!(task_clone.state(), TaskState::Started);
});
@@ -2165,33 +2146,47 @@ mod tests {
flush_start_sender,
flush_stop_sender,
},
- context,
+ context.clone(),
)
.unwrap();
task.start().unwrap();
let task_clone = task.clone();
- let flush_handle = task.context().as_ref().unwrap().spawn(async move {
- assert_eq!(
- task_clone.flush_start().unwrap(),
- TransitionStatus::Async {
+ let flush_handle = context.spawn(async move {
+ let flush_ack_handle = match task_clone.flush_start() {
+ Ok(TransitionStatus::Async {
trigger: Trigger::FlushStart,
origin: TaskState::Started,
- },
- );
- Context::drain_sub_tasks().await.unwrap();
+ ack_handle,
+ }) => ack_handle,
+ other => panic!("unexpected {:?}", other),
+ };
+ match flush_ack_handle.await.unwrap() {
+ Ok(TransitionStatus::Complete {
+ origin: TaskState::Started,
+ target: TaskState::Flushing,
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
assert_eq!(task_clone.state(), TaskState::Flushing);
flush_start_receiver.next().await.unwrap();
- assert_eq!(
- task_clone.flush_stop().unwrap(),
- TransitionStatus::Async {
+ let flush_stop_ack_handle = match task_clone.flush_stop() {
+ Ok(TransitionStatus::Async {
trigger: Trigger::FlushStop,
origin: TaskState::Flushing,
- },
- );
- Context::drain_sub_tasks().await.unwrap();
+ ack_handle,
+ }) => ack_handle,
+ other => panic!("unexpected {:?}", other),
+ };
+ match flush_stop_ack_handle.await.unwrap() {
+ Ok(TransitionStatus::Complete {
+ origin: TaskState::Flushing,
+ target: TaskState::Started,
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
assert_eq!(task_clone.state(), TaskState::Started);
});
@@ -2216,13 +2211,14 @@ mod tests {
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
gst::debug!(RUNTIME_CAT, "flush_from_loop: flush_start from iteration");
- assert_eq!(
- self.task.flush_start().unwrap(),
- TransitionStatus::NotWaiting {
+ match self.task.flush_start() {
+ Ok(TransitionStatus::Async {
trigger: Trigger::FlushStart,
origin: TaskState::Started,
- },
- );
+ ..
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
Ok(())
}
.boxed()
@@ -2260,13 +2256,13 @@ mod tests {
);
block_on(flush_start_receiver.next()).unwrap();
- assert_eq!(
- task.stop().unwrap(),
- TransitionStatus::Complete {
+ match task.stop() {
+ Ok(TransitionStatus::Complete {
origin: TaskState::Flushing,
target: TaskState::Stopped,
- },
- );
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
task.unprepare().unwrap();
}
@@ -2289,13 +2285,14 @@ mod tests {
crate::runtime::time::delay_for(Duration::from_millis(50)).await;
gst::debug!(RUNTIME_CAT, "pause_from_loop: pause from iteration");
- assert_eq!(
- self.task.pause().unwrap(),
- TransitionStatus::NotWaiting {
+ match self.task.pause() {
+ Ok(TransitionStatus::NotWaiting {
trigger: Trigger::Pause,
origin: TaskState::Started,
- },
- );
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
+
Ok(())
}
.boxed()
@@ -2355,13 +2352,15 @@ mod tests {
RUNTIME_CAT,
"trigger_from_action: flush_start triggering flush_stop"
);
- assert_eq!(
- self.task.flush_stop().unwrap(),
- TransitionStatus::NotWaiting {
+ match self.task.flush_stop() {
+ Ok(TransitionStatus::Async {
trigger: Trigger::FlushStop,
origin: TaskState::Started,
- },
- );
+ ..
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
+
Ok(())
}
.boxed()
@@ -2467,33 +2466,33 @@ mod tests {
// Pause, FlushStart, FlushStop, Start
gst::debug!(RUNTIME_CAT, "pause_flush_start: pausing");
- assert_eq!(
- task.pause().unwrap(),
- TransitionStatus::Complete {
+ match task.pause() {
+ Ok(TransitionStatus::Complete {
origin: TaskState::Prepared,
target: TaskState::Paused,
- },
- );
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
gst::debug!(RUNTIME_CAT, "pause_flush_start: starting flush");
- assert_eq!(
- task.flush_start().unwrap(),
- TransitionStatus::Complete {
+ match task.flush_start() {
+ Ok(TransitionStatus::Complete {
origin: TaskState::Paused,
target: TaskState::PausedFlushing,
- },
- );
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
assert_eq!(task.state(), TaskState::PausedFlushing);
block_on(flush_start_receiver.next());
gst::debug!(RUNTIME_CAT, "pause_flush_start: stopping flush");
- assert_eq!(
- task.flush_stop().unwrap(),
- TransitionStatus::Complete {
+ match task.flush_stop() {
+ Ok(TransitionStatus::Complete {
origin: TaskState::PausedFlushing,
target: TaskState::Paused,
- },
- );
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
assert_eq!(task.state(), TaskState::Paused);
block_on(flush_stop_receiver.next());
@@ -2501,13 +2500,13 @@ mod tests {
started_receiver.try_next().unwrap_err();
gst::debug!(RUNTIME_CAT, "pause_flush_start: starting after flushing");
- assert_eq!(
- task.start().unwrap(),
- TransitionStatus::Complete {
+ match task.start() {
+ Ok(TransitionStatus::Complete {
origin: TaskState::Paused,
target: TaskState::Started,
- },
- );
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
assert_eq!(task.state(), TaskState::Started);
block_on(started_receiver.next());
@@ -2586,26 +2585,26 @@ mod tests {
block_on(flush_start_receiver.next());
gst::debug!(RUNTIME_CAT, "pause_flushing_start: starting while flushing");
- assert_eq!(
- task.start().unwrap(),
- TransitionStatus::Complete {
+ match task.start() {
+ Ok(TransitionStatus::Complete {
origin: TaskState::PausedFlushing,
target: TaskState::Flushing,
- },
- );
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
assert_eq!(task.state(), TaskState::Flushing);
// start action not executed
started_receiver.try_next().unwrap_err();
gst::debug!(RUNTIME_CAT, "pause_flushing_start: stopping flush");
- assert_eq!(
- task.flush_stop().unwrap(),
- TransitionStatus::Complete {
+ match task.flush_stop() {
+ Ok(TransitionStatus::Complete {
origin: TaskState::Flushing,
target: TaskState::Started,
- },
- );
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
assert_eq!(task.state(), TaskState::Started);
block_on(flush_stop_receiver.next());
block_on(started_receiver.next());
@@ -2677,18 +2676,20 @@ mod tests {
gst::debug!(RUNTIME_CAT, "flush_concurrent_start: // flush_start");
ready_sender.send(()).unwrap();
let res = task_clone.flush_start().unwrap();
- match res {
+ let ack_handle = match res {
TransitionStatus::Async {
trigger: Trigger::FlushStart,
origin: TaskState::Paused,
- } => (),
+ ack_handle,
+ } => ack_handle,
TransitionStatus::Async {
trigger: Trigger::FlushStart,
origin: TaskState::Started,
- } => (),
- other => unreachable!("{:?}", other),
- }
- Context::drain_sub_tasks().await.unwrap();
+ ack_handle,
+ } => ack_handle,
+ other => panic!("unexpected {:?}", other),
+ };
+ ack_handle.await.unwrap().unwrap();
flush_start_receiver.next().await.unwrap();
});
@@ -2709,20 +2710,19 @@ mod tests {
origin: TaskState::PausedFlushing,
target: TaskState::Flushing,
} => (),
- other => unreachable!("{:?}", other),
+ other => panic!("unexpected {:?}", other),
}
block_on(flush_start_handle).unwrap();
gst::debug!(RUNTIME_CAT, "flush_concurrent_start: requesting flush_stop");
- assert_eq!(
- task.flush_stop().unwrap(),
- TransitionStatus::Complete {
+ match task.flush_stop() {
+ Ok(TransitionStatus::Complete {
origin: TaskState::Flushing,
target: TaskState::Started,
- },
- );
-
+ }) => (),
+ other => panic!("unexpected {:?}", other),
+ }
assert_eq!(task.state(), TaskState::Started);
block_on(flush_stop_receiver.next());