Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrançois Laignel <fengalin@free.fr>2022-08-03 18:07:28 +0300
committerFrançois Laignel <fengalin@free.fr>2022-08-09 20:48:06 +0300
commitd4061774a46f52523c42f718a0ebcf7b85910d29 (patch)
tree7a0774a0378dbfab0b8152cf791ec205393394e6 /generic/threadshare/src
parent625fce3934470698abd7facfe4dba1804a84fa3b (diff)
ts/Task: return a future for state transitions
State transitions request functions hid the synchronization details to the caller: - If the transition was requested from a Context, a subtask was added or the transition ack was not awaited if the new transition was requested from a running transition or an iteration function. - If the transition was not requested from a Context, current thread was blocked until the ack was received. This strategy facilitated code in elements, but it suffered from the following shortcomings: - The `prepare` transition request didn't comply with the above strategy and would always return an `Async` form. This was designed to accomodate the `Prepare` function for elements such as `ts-tcpclientsrc` which takes times due to the TCP socket connection delays. The idea was that the actual transition result would be available after calling `start`. This was a disadvantage for elements which would prefer to error immediately in the event of a preparation failure. - Hidding the transition request synchronization to the caller meant that they had no options but relying on the internal mechanism. E.g.: it was not possible to `start` from another async runtime without blocking. Also it was not possible to request a transition and choose not to await for the ack. This commit introduces a more flexible API for state transitions requests: - The transition request function now return a `TransitionStatus`, which is a Future. - When an error occurs immediately (e.g. the transition request is not autorized due to current state of the Task), the `TransitionStatus` is resolved immediately and can be `check`ed for errors. This is useful for functions such as `pepare` in the case of `ts-tcpclientsrc` (see above). This is also useful for `pause`, because in current design, the transition is always async. Note however, that `pause` is forseen to adhere to the same behaviour as the other transition requests in the near future [1]. - If the caller chooses to await for the ack and they don't know if they are running on a ts Context (e.g. in `Pad{Src,Sink}` handlers), they can call `await_maybe_on_context`. This is mostly the same behaviour as the one that used to be performed internaly. - If the caller knows for sure they can't possibly block an async executor, they can call `block_on` which is more explicite, but will nonetheless make sure no ts Context is being blocked. This last check was introduced as it was considered low overhead while it should ease preventing missues in cases where the above functions should be used. [1]: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/793#note_1464400
Diffstat (limited to 'generic/threadshare/src')
-rw-r--r--generic/threadshare/src/runtime/task.rs1443
1 files changed, 796 insertions, 647 deletions
diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs
index d3fdf39f8..559086188 100644
--- a/generic/threadshare/src/runtime/task.rs
+++ b/generic/threadshare/src/runtime/task.rs
@@ -1,4 +1,4 @@
-// Copyright (C) 2019-2020 François Laignel <fengalin@free.fr>
+// Copyright (C) 2019-2022 François Laignel <fengalin@free.fr>
// Copyright (C) 2020 Sebastian Dröge <sebastian@centricular.com>
//
// This library is free software; you can redistribute it and/or
@@ -28,8 +28,10 @@ use futures::stream::StreamExt;
use std::fmt;
use std::ops::Deref;
+use std::pin::Pin;
use std::stringify;
use std::sync::{Arc, Mutex, MutexGuard};
+use std::task::Poll;
use super::{Context, JoinHandle, RUNTIME_CAT};
@@ -58,6 +60,25 @@ pub enum Trigger {
Unprepare,
}
+/// Transition success details.
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+pub enum TransitionOk {
+ /// Transition completed successfully.
+ Complete {
+ origin: TaskState,
+ target: TaskState,
+ },
+ /// Not waiting for transition result.
+ ///
+ /// This is to prevent:
+ /// - A deadlock when executing a transition action.
+ /// - A potential infinite wait when pausing a running loop
+ /// which could be awaiting for an `iterate` to complete.
+ NotWaiting { trigger: Trigger, origin: TaskState },
+ /// Skipping triggering event due to current state.
+ Skipped { trigger: Trigger, state: TaskState },
+}
+
/// TriggeringEvent error details.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TransitionError {
@@ -84,35 +105,214 @@ impl From<TransitionError> for gst::ErrorMessage {
}
}
-// FIXME impl Future so that we can await without matching on the variant
-/// Transition details.
+/// Transition status.
///
/// A state transition occurs as a result of a triggering event.
-#[derive(Debug)]
+/// The triggering event is asynchronously handled by a state machine
+/// running on a [`Context`].
+#[must_use = "This `TransitionStatus` may be `Pending`. In most cases it should be awaited. See `await_maybe_on_context`"]
pub enum TransitionStatus {
- /// Transition completed successfully.
- Complete {
- origin: TaskState,
- target: TaskState,
- },
- /// Asynchronously awaiting for transition completion.
- ///
- /// This occurs when the event is triggered from a `Context`.
- Async {
+ /// Transition result is ready.
+ Ready(Result<TransitionOk, TransitionError>),
+ /// Transition is pending.
+ Pending {
trigger: Trigger,
origin: TaskState,
- ack_handle: JoinHandle<Result<TransitionStatus, TransitionError>>,
+ res_fut: Pin<Box<dyn Future<Output = Result<TransitionOk, TransitionError>> + Send>>,
},
- // FIXME remove or edit doc
- /// Not waiting for transition completion.
+}
+
+impl TransitionStatus {
+ pub fn is_ready(&self) -> bool {
+ matches!(self, TransitionStatus::Ready { .. })
+ }
+
+ pub fn is_pending(&self) -> bool {
+ matches!(self, TransitionStatus::Pending { .. })
+ }
+
+ /// Converts the `TransitionStatus` into a `Result`.
///
- /// This is to prevent:
- /// - A deadlock when executing a transition action.
- /// - A potential infinite wait when pausing a running loop
- /// which could be awaiting for an `iterate` to complete.
- NotWaiting { trigger: Trigger, origin: TaskState },
- /// Skipping triggering event due to current state.
- Skipped { trigger: Trigger, state: TaskState },
+ /// This function allows getting the `TransitionError` when
+ /// the transition result is ready without `await`ing nor blocking.
+ ///
+ /// See also [`Self::await_maybe_on_context`].
+ // FIXME once stabilized, this could use https://github.com/rust-lang/rust/issues/84277
+ pub fn check(self) -> Result<TransitionStatus, TransitionError> {
+ match self {
+ TransitionStatus::Ready(Err(err)) => Err(err),
+ other => Ok(other),
+ }
+ }
+
+ /// Awaits for this transition to complete, possibly while running on a [`Context`].
+ ///
+ /// Notes:
+ ///
+ /// - When running in an `async` block within a running transition or
+ /// task iteration, don't await for the transition as it would deadlock.
+ /// Use [`Self::check`] to make sure the state transition is valid.
+ /// - When running in an `async` block out of a running transition or
+ /// task iteration, just `.await` normally. E.g.:
+ ///
+ /// ```
+ /// # use gstthreadshare::runtime::task::{Task, TransitionOk, TransitionError};
+ /// # async fn async_fn() -> Result<TransitionOk, TransitionError> {
+ /// # let task = Task::default();
+ /// let flush_ok = task.flush_start().await?;
+ /// # Ok(flush_ok)
+ /// # }
+ /// ```
+ ///
+ /// This function makes sure the transition completes successfully or
+ /// produces an error. It must be used in situations where we don't know
+ /// whether we are running on a [`Context`] or not. This is the case for
+ /// functions in [`PadSrc`] and [`PadSink`] as well as the synchronous
+ /// functions transitively called from them.
+ ///
+ /// As an example, a `PadSrc::src_event` function which handles a
+ /// `FlushStart` could call:
+ ///
+ /// ```
+ /// # fn src_event() -> bool {
+ /// # let task = gstthreadshare::runtime::Task::default();
+ /// return task
+ /// .flush_start()
+ /// .await_maybe_on_context()
+ /// .is_ok();
+ /// # }
+ /// ```
+ ///
+ /// If the transition is already complete, the result is returned immediately.
+ ///
+ /// If we are NOT running on a [`Context`], the transition result is awaited
+ /// by blocking on current thread and the result is returned.
+ ///
+ /// If we are running on a [`Context`], the transition result is awaited
+ /// in a sub task for current [`Context`]'s Scheduler task. As a consequence,
+ /// the sub task will be awaited in usual [`Context::drain_sub_tasks`]
+ /// rendezvous, ensuring some kind of synchronization. To avoid deadlocks,
+ /// `Ok(TransitionOk::NotWaiting { .. })` is immediately returned.
+ ///
+ /// [`PadSrc`]: ../pad/struct.PadSrc.html
+ /// [`PadSink`]: ../pad/struct.PadSink.html
+ pub fn await_maybe_on_context(self) -> Result<TransitionOk, TransitionError> {
+ use TransitionStatus::*;
+ match self {
+ Pending {
+ trigger,
+ origin,
+ res_fut,
+ } => {
+ if let Some(cur_ctx) = Context::current() {
+ gst::debug!(
+ RUNTIME_CAT,
+ "Awaiting for {:?} ack in a subtask on context {}",
+ trigger,
+ cur_ctx.name()
+ );
+ let _ = Context::add_sub_task(async move {
+ let res = res_fut.await;
+ if res.is_ok() {
+ gst::log!(RUNTIME_CAT, "Received ack {:?} for {:?}", res, trigger);
+ } else {
+ gst::error!(RUNTIME_CAT, "Received ack {:?} for {:?}", res, trigger);
+ }
+
+ Ok(())
+ });
+
+ Ok(TransitionOk::NotWaiting { trigger, origin })
+ } else {
+ gst::debug!(
+ RUNTIME_CAT,
+ "Awaiting for {:?} ack on current thread",
+ trigger,
+ );
+ futures::executor::block_on(res_fut)
+ }
+ }
+ Ready(res) => res,
+ }
+ }
+
+ /// Awaits for this transition to complete by blocking current thread.
+ ///
+ /// This function blocks until the transition completes successfully or
+ /// produces an error.
+ ///
+ /// In situations where we don't know whether we are running on a [`Context`]
+ /// or not, use [`Self::await_maybe_on_context`] instead.
+ ///
+ /// # Panics
+ ///
+ /// Panics if current thread is a [`Context`] thread.
+ pub fn block_on(self) -> Result<TransitionOk, TransitionError> {
+ assert!(!Context::is_context_thread());
+ use TransitionStatus::*;
+ match self {
+ Pending {
+ trigger, res_fut, ..
+ } => {
+ gst::debug!(
+ RUNTIME_CAT,
+ "Awaiting for {:?} ack on current thread",
+ trigger,
+ );
+ futures::executor::block_on(res_fut)
+ }
+ Ready(res) => res,
+ }
+ }
+}
+
+impl Future for TransitionStatus {
+ type Output = Result<TransitionOk, TransitionError>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
+ use TransitionStatus::*;
+
+ match &mut *self {
+ Ready(res) => Poll::Ready(res.clone()),
+ Pending { res_fut, .. } => match Pin::new(res_fut).poll(cx) {
+ Poll::Pending => Poll::Pending,
+ Poll::Ready(res) => {
+ *self = Ready(res.clone());
+
+ Poll::Ready(res)
+ }
+ },
+ }
+ }
+}
+
+impl From<TransitionOk> for TransitionStatus {
+ fn from(ok: TransitionOk) -> Self {
+ Self::Ready(Ok(ok))
+ }
+}
+
+impl From<TransitionError> for TransitionStatus {
+ fn from(err: TransitionError) -> Self {
+ Self::Ready(Err(err))
+ }
+}
+
+// Explicit impl due to `res_fut` not implementing `Debug`.
+impl fmt::Debug for TransitionStatus {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ use TransitionStatus::*;
+ match self {
+ Ready(res) => f.debug_tuple("Ready").field(res).finish(),
+ Pending {
+ trigger, origin, ..
+ } => f
+ .debug_struct("Pending")
+ .field("trigger", trigger)
+ .field("origin", origin)
+ .finish(),
+ }
+ }
}
/// Implementation trait for `Task`s.
@@ -222,8 +422,8 @@ pub trait TaskImpl: Send + 'static {
}
}
-type AckSender = oneshot::Sender<Result<TransitionStatus, TransitionError>>;
-type AckReceiver = oneshot::Receiver<Result<TransitionStatus, TransitionError>>;
+type AckSender = oneshot::Sender<Result<TransitionOk, TransitionError>>;
+type AckReceiver = oneshot::Receiver<Result<TransitionOk, TransitionError>>;
struct TriggeringEvent {
trigger: Trigger,
@@ -238,7 +438,7 @@ impl TriggeringEvent {
(req, ack_rx)
}
- fn send_ack(self, res: Result<TransitionStatus, TransitionError>) {
+ fn send_ack(self, res: Result<TransitionOk, TransitionError>) {
let _ = self.ack_tx.send(res);
}
@@ -279,12 +479,7 @@ impl StateMachineHandle {
let (triggering_evt, ack_rx) = TriggeringEvent::new(trigger);
gst::log!(RUNTIME_CAT, "Pushing {:?}", triggering_evt);
-
- self.triggering_evt_tx
- .try_send(triggering_evt)
- .unwrap_or_else(|err| {
- panic!("trigger channel failure: {}", err);
- });
+ self.triggering_evt_tx.try_send(triggering_evt).unwrap();
self.context.unpark();
@@ -317,7 +512,7 @@ impl Default for TaskInner {
impl TaskInner {
fn switch_to_state(&mut self, target_state: TaskState, triggering_evt: TriggeringEvent) {
- let res = Ok(TransitionStatus::Complete {
+ let res = Ok(TransitionOk::Complete {
origin: self.state,
target: target_state,
});
@@ -345,7 +540,7 @@ impl TaskInner {
}
fn skip_triggering_evt(&mut self, triggering_evt: TriggeringEvent) {
- let res = Ok(TransitionStatus::Skipped {
+ let res = Ok(TransitionOk::Skipped {
trigger: triggering_evt.trigger,
state: self.state,
});
@@ -424,11 +619,7 @@ impl Task {
TaskStateGuard(self.0.lock().unwrap())
}
- pub fn prepare(
- &self,
- task_impl: impl TaskImpl,
- context: Context,
- ) -> Result<TransitionStatus, TransitionError> {
+ pub fn prepare(&self, task_impl: impl TaskImpl, context: Context) -> TransitionStatus {
let mut inner = self.0.lock().unwrap();
let origin = inner.state;
@@ -436,21 +627,23 @@ impl Task {
TaskState::Unprepared => (),
TaskState::Prepared | TaskState::Preparing => {
gst::debug!(RUNTIME_CAT, "Task already {:?}", origin);
- return Ok(TransitionStatus::Skipped {
+ return TransitionOk::Skipped {
trigger: Trigger::Prepare,
state: origin,
- });
+ }
+ .into();
}
state => {
gst::warning!(RUNTIME_CAT, "Attempt to prepare Task in state {:?}", state);
- return Err(TransitionError {
+ return TransitionError {
trigger: Trigger::Prepare,
state: inner.state,
err_msg: gst::error_msg!(
gst::CoreError::StateChange,
["Attempt to prepare Task in state {:?}", state]
),
- });
+ }
+ .into();
}
}
@@ -462,33 +655,23 @@ impl Task {
inner.state_machine_handle = Some(StateMachine::spawn(
self.0.clone(),
Box::new(task_impl),
- context.clone(),
+ context,
));
- let ack_rx = inner.trigger(Trigger::Prepare)?;
- drop(inner);
-
- let ack_await_fut = async move {
- gst::trace!(RUNTIME_CAT, "Awaiting ack for Prepare");
-
- let res = ack_rx.await.unwrap();
- if res.is_ok() {
- gst::log!(RUNTIME_CAT, "Received ack {:?} for Prepare", res);
- } else {
- gst::error!(RUNTIME_CAT, "Received ack {:?} for Prepare", res);
- }
-
- res
+ let ack_rx = match inner.trigger(Trigger::Prepare) {
+ Ok(ack_rx) => ack_rx,
+ Err(err) => return err.into(),
};
+ drop(inner);
- Ok(TransitionStatus::Async {
+ TransitionStatus::Pending {
trigger: Trigger::Prepare,
origin: TaskState::Unprepared,
- ack_handle: context.spawn_and_unpark(ack_await_fut),
- })
+ res_fut: Box::pin(ack_rx.map(Result::unwrap)),
+ }
}
- pub fn unprepare(&self) -> Result<TransitionStatus, TransitionError> {
+ pub fn unprepare(&self) -> TransitionStatus {
let mut inner = self.0.lock().unwrap();
let origin = inner.state;
@@ -505,10 +688,11 @@ impl Task {
}
None => {
gst::debug!(RUNTIME_CAT, "Task already unpreparing");
- return Ok(TransitionStatus::Skipped {
+ return TransitionOk::Skipped {
trigger: Trigger::Unprepare,
state: origin,
- });
+ }
+ .into();
}
},
state => {
@@ -517,14 +701,15 @@ impl Task {
"Attempt to unprepare Task in state {:?}",
state
);
- return Err(TransitionError {
+ return TransitionError {
trigger: Trigger::Unprepare,
state: inner.state,
err_msg: gst::error_msg!(
gst::CoreError::StateChange,
["Attempt to unprepare Task in state {:?}", state]
),
- });
+ }
+ .into();
}
};
@@ -534,62 +719,43 @@ impl Task {
let state_machine_end_fut = async {
state_machine_handle.join().await;
-
- let res = ack_rx.await.unwrap();
- if res.is_ok() {
- gst::log!(RUNTIME_CAT, "Received ack {:?} for Unprepare", res);
- } else {
- gst::error!(RUNTIME_CAT, "Received ack {:?} for Unprepare", res);
- }
-
- res
+ ack_rx.await.unwrap()
};
- if let Some(cur_context) = Context::current() {
- let ack_handle = cur_context.spawn_and_unpark(state_machine_end_fut);
- gst::log!(
- RUNTIME_CAT,
- "Will wait for state machine termination completion in {:?} on context {}",
- ack_handle.task_id(),
- cur_context.name()
- );
-
- return Ok(TransitionStatus::Async {
- trigger: Trigger::Unprepare,
- origin,
- ack_handle,
- });
- } else {
- gst::log!(
- RUNTIME_CAT,
- "Waiting for state machine termination on current thread"
- );
- // Use a light-weight executor, no timer nor async io.
- futures::executor::block_on(state_machine_end_fut.map(drop))
- }
-
- Ok(TransitionStatus::Complete {
+ TransitionStatus::Pending {
+ trigger: Trigger::Unprepare,
origin,
- target: TaskState::Unprepared,
- })
+ res_fut: Box::pin(state_machine_end_fut),
+ }
}
/// Starts the `Task`.
///
/// The execution occurs on the `Task` context.
- pub fn start(&self) -> Result<TransitionStatus, TransitionError> {
+ pub fn start(&self) -> TransitionStatus {
let mut inner = self.0.lock().unwrap();
- let ack_rx = inner.trigger(Trigger::Start)?;
+ let ack_rx = match inner.trigger(Trigger::Start) {
+ Ok(ack_rx) => ack_rx,
+ Err(err) => return err.into(),
+ };
if let TaskState::Started = inner.state {
- return Ok(TransitionStatus::Skipped {
+ return TransitionOk::Skipped {
trigger: Trigger::Start,
state: TaskState::Started,
- });
+ }
+ .into();
}
- Self::await_ack(inner, ack_rx, Trigger::Start)
+ let origin = inner.state;
+ drop(inner);
+
+ TransitionStatus::Pending {
+ trigger: Trigger::Start,
+ origin,
+ res_fut: Box::pin(ack_rx.map(Result::unwrap)),
+ }
}
/// Requests the `Task` loop to pause.
@@ -597,32 +763,43 @@ impl Task {
/// If an iteration is in progress, it will run to completion,
/// then no more iteration will be executed before `start` is called again.
/// Therefore, it is not guaranteed that `Paused` is reached when `pause` returns.
- pub fn pause(&self) -> Result<TransitionStatus, TransitionError> {
+ pub fn pause(&self) -> TransitionStatus {
let mut inner = self.0.lock().unwrap();
- let ack_rx = inner.trigger(Trigger::Pause)?;
+ let ack_rx = match inner.trigger(Trigger::Pause) {
+ Ok(ack_rx) => ack_rx,
+ Err(err) => return err.into(),
+ };
if let TaskState::Started = inner.state {
- // FIXME this could be async
- return Ok(TransitionStatus::NotWaiting {
+ // FIXME this could be async when iterate is split into next_item / handle_item
+ return TransitionOk::NotWaiting {
trigger: Trigger::Pause,
origin: TaskState::Started,
- });
+ }
+ .into();
}
- Self::await_ack(inner, ack_rx, Trigger::Pause)
+ let origin = inner.state;
+ drop(inner);
+
+ TransitionStatus::Pending {
+ trigger: Trigger::Pause,
+ origin,
+ res_fut: Box::pin(ack_rx.map(Result::unwrap)),
+ }
}
- pub fn flush_start(&self) -> Result<TransitionStatus, TransitionError> {
+ pub fn flush_start(&self) -> TransitionStatus {
self.abort_push_await(Trigger::FlushStart)
}
- pub fn flush_stop(&self) -> Result<TransitionStatus, TransitionError> {
+ pub fn flush_stop(&self) -> TransitionStatus {
self.abort_push_await(Trigger::FlushStop)
}
/// Stops the `Started` `Task` and wait for it to finish.
- pub fn stop(&self) -> Result<TransitionStatus, TransitionError> {
+ pub fn stop(&self) -> TransitionStatus {
self.abort_push_await(Trigger::Stop)
}
@@ -632,59 +809,22 @@ impl Task {
/// - Aborts the iteration loop aborts.
/// - Pushes the provided [`Trigger`].
/// - Awaits for the expected transition as usual.
- fn abort_push_await(&self, trigger: Trigger) -> Result<TransitionStatus, TransitionError> {
+ fn abort_push_await(&self, trigger: Trigger) -> TransitionStatus {
let mut inner = self.0.lock().unwrap();
inner.abort_task_loop();
- let ack_rx = inner.trigger(trigger)?;
- Self::await_ack(inner, ack_rx, trigger)
- }
+ let ack_rx = match inner.trigger(trigger) {
+ Ok(ack_rx) => ack_rx,
+ Err(err) => return err.into(),
+ };
- fn await_ack(
- inner: MutexGuard<TaskInner>,
- ack_rx: oneshot::Receiver<Result<TransitionStatus, TransitionError>>,
- trigger: Trigger,
- ) -> Result<TransitionStatus, TransitionError> {
let origin = inner.state;
drop(inner);
- let ack_await_fut = async move {
- gst::trace!(RUNTIME_CAT, "Awaiting ack for {:?}", trigger);
-
- let res = ack_rx.await.unwrap();
- if res.is_ok() {
- gst::log!(RUNTIME_CAT, "Received ack {:?} for {:?}", res, trigger);
- } else {
- gst::error!(RUNTIME_CAT, "Received ack {:?} for {:?}", res, trigger);
- }
-
- res
- };
-
- if let Some(cur_context) = Context::current() {
- let ack_handle = cur_context.spawn(ack_await_fut);
-
- gst::log!(
- RUNTIME_CAT,
- "Awaiting ack for {:?} in {:?} on context {}",
- trigger,
- ack_handle.task_id(),
- cur_context.name()
- );
-
- Ok(TransitionStatus::Async {
- trigger,
- origin,
- ack_handle,
- })
- } else {
- gst::log!(
- RUNTIME_CAT,
- "Awaiting ack for {:?} on current thread",
- trigger
- );
- // Use a light-weight executor, no timer nor async io.
- futures::executor::block_on(ack_await_fut)
+ TransitionStatus::Pending {
+ trigger,
+ origin,
+ res_fut: Box::pin(ack_rx.map(Result::unwrap)),
}
}
}
@@ -790,7 +930,7 @@ impl StateMachine {
);
if let Ok(triggering_evt) = res {
let mut task_inner = task_inner.lock().unwrap();
- let res = Ok(TransitionStatus::Complete {
+ let res = Ok(TransitionOk::Complete {
origin: TaskState::Unprepared,
target: TaskState::Prepared,
});
@@ -1095,9 +1235,14 @@ mod tests {
use futures::executor::block_on;
use std::time::Duration;
+ use super::{TaskState::*, TransitionOk::*, TransitionStatus::*, Trigger::*, *};
use crate::runtime::Context;
- use super::*;
+ #[track_caller]
+ fn stop_then_unprepare(task: Task) {
+ task.stop().block_on().unwrap();
+ task.unprepare().block_on().unwrap();
+ }
#[test]
fn iterate() {
@@ -1117,7 +1262,7 @@ mod tests {
impl TaskImpl for TaskTest {
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
- gst::debug!(RUNTIME_CAT, "task_iterate: prepared");
+ gst::debug!(RUNTIME_CAT, "iterate: prepared");
self.prepared_sender.send(()).await.unwrap();
Ok(())
}
@@ -1126,7 +1271,7 @@ mod tests {
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
- gst::debug!(RUNTIME_CAT, "task_iterate: started");
+ gst::debug!(RUNTIME_CAT, "iterate: started");
self.started_sender.send(()).await.unwrap();
Ok(())
}
@@ -1135,21 +1280,18 @@ mod tests {
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
- gst::debug!(RUNTIME_CAT, "task_iterate: entering iterate");
+ gst::debug!(RUNTIME_CAT, "iterate: entering iterate");
self.iterate_sender.send(()).await.unwrap();
- gst::debug!(
- RUNTIME_CAT,
- "task_iterate: awaiting complete_iterate_receiver"
- );
+ gst::debug!(RUNTIME_CAT, "iterate: awaiting complete_iterate_receiver");
let res = self.complete_iterate_receiver.next().await.unwrap();
if res.is_ok() {
- gst::debug!(RUNTIME_CAT, "task_iterate: received Ok => keep looping");
+ gst::debug!(RUNTIME_CAT, "iterate: received Ok => keep looping");
} else {
gst::debug!(
RUNTIME_CAT,
- "task_iterate: received {:?} => cancelling loop",
+ "iterate: received {:?} => cancelling loop",
res
);
}
@@ -1161,7 +1303,7 @@ mod tests {
fn pause(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
- gst::debug!(RUNTIME_CAT, "task_iterate: paused");
+ gst::debug!(RUNTIME_CAT, "iterate: paused");
self.paused_sender.send(()).await.unwrap();
Ok(())
}
@@ -1170,7 +1312,7 @@ mod tests {
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
- gst::debug!(RUNTIME_CAT, "task_iterate: stopped");
+ gst::debug!(RUNTIME_CAT, "iterate: stopped");
self.stopped_sender.send(()).await.unwrap();
Ok(())
}
@@ -1179,7 +1321,7 @@ mod tests {
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
- gst::debug!(RUNTIME_CAT, "task_iterate: stopped");
+ gst::debug!(RUNTIME_CAT, "iterate: stopped");
self.flush_start_sender.send(()).await.unwrap();
Ok(())
}
@@ -1188,20 +1330,20 @@ mod tests {
fn unprepare(&mut self) -> BoxFuture<'_, ()> {
async move {
- gst::debug!(RUNTIME_CAT, "task_iterate: unprepared");
+ gst::debug!(RUNTIME_CAT, "iterate: unprepared");
self.unprepared_sender.send(()).await.unwrap();
}
.boxed()
}
}
- let context = Context::acquire("task_iterate", Duration::from_millis(2)).unwrap();
+ let context = Context::acquire("iterate", Duration::from_millis(2)).unwrap();
let task = Task::default();
- assert_eq!(task.state(), TaskState::Unprepared);
+ assert_eq!(task.state(), Unprepared);
- gst::debug!(RUNTIME_CAT, "task_iterate: preparing");
+ gst::debug!(RUNTIME_CAT, "iterate: preparing");
let (prepared_sender, mut prepared_receiver) = mpsc::channel(1);
let (started_sender, mut started_receiver) = mpsc::channel(1);
@@ -1211,7 +1353,7 @@ mod tests {
let (stopped_sender, mut stopped_receiver) = mpsc::channel(1);
let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
let (unprepared_sender, mut unprepared_receiver) = mpsc::channel(1);
- let res = task.prepare(
+ let prepare_status = task.prepare(
TaskTest {
prepared_sender,
started_sender,
@@ -1225,65 +1367,80 @@ mod tests {
context,
);
- let prepare_ack_handle = match res {
- Ok(TransitionStatus::Async {
- trigger: Trigger::Prepare,
- origin: TaskState::Unprepared,
- ack_handle,
- }) => ack_handle,
- other => panic!("unexpected {:?}", other),
+ assert!(prepare_status.is_pending());
+ match prepare_status {
+ Pending {
+ trigger: Prepare,
+ origin: Unprepared,
+ ..
+ } => (),
+ other => panic!("{:?}", other),
};
- gst::debug!(RUNTIME_CAT, "task_iterate: starting (initial)");
-
- match task.start() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::Prepared,
- target: TaskState::Started,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
+ gst::debug!(RUNTIME_CAT, "iterate: starting (async prepare)");
+ // also tests await_maybe_on_context
+ assert_eq!(
+ task.start().await_maybe_on_context().unwrap(),
+ Complete {
+ origin: Prepared,
+ target: Started,
+ }
+ );
+ assert_eq!(task.state(), Started);
- assert_eq!(task.state(), TaskState::Started);
// At this point, preparation must be complete
- match block_on(prepare_ack_handle).unwrap() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::Unprepared,
- target: TaskState::Prepared,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
+ // also tests await_maybe_on_context
+ assert_eq!(
+ prepare_status.await_maybe_on_context().unwrap(),
+ Complete {
+ origin: Unprepared,
+ target: Prepared,
+ },
+ );
block_on(prepared_receiver.next()).unwrap();
// ... and start executed
block_on(started_receiver.next()).unwrap();
- assert_eq!(task.state(), TaskState::Started);
+ assert_eq!(task.state(), Started);
// unlock task loop and keep looping
block_on(iterate_receiver.next()).unwrap();
block_on(complete_iterate_sender.send(Ok(()))).unwrap();
- gst::debug!(RUNTIME_CAT, "task_iterate: starting (redundant)");
+ gst::debug!(RUNTIME_CAT, "iterate: starting (redundant)");
// already started
- match task.start() {
- Ok(TransitionStatus::Skipped {
- trigger: Trigger::Start,
- state: TaskState::Started,
- }) => (),
- other => panic!("unexpected {:?}", other),
+ assert_eq!(
+ task.start().block_on().unwrap(),
+ Skipped {
+ trigger: Start,
+ state: Started,
+ },
+ );
+ assert_eq!(task.state(), Started);
+
+ // Attempt to prepare Task in state Started (also tests check)
+ match task.unprepare().check().unwrap_err() {
+ TransitionError {
+ trigger: Unprepare,
+ state: Started,
+ ..
+ } => (),
+ other => panic!("{:?}", other),
}
- assert_eq!(task.state(), TaskState::Started);
- gst::debug!(RUNTIME_CAT, "task_iterate: pause (initial)");
- match task.pause() {
- Ok(TransitionStatus::NotWaiting {
- trigger: Trigger::Pause,
- origin: TaskState::Started,
- }) => (),
- other => panic!("unexpected {:?}", other),
+ gst::debug!(RUNTIME_CAT, "iterate: pause (initial)");
+ let pause_status = task.pause();
+ assert!(pause_status.is_ready());
+ // also tests `check`
+ match pause_status.check().unwrap() {
+ Ready(Ok(NotWaiting {
+ trigger: Pause,
+ origin: Started,
+ })) => (),
+ other => panic!("{:?}", other),
}
- // Pause transition is asynchronous
+ // Pause transition is asynchronous FIXME
while TaskState::Paused != task.state() {
std::thread::sleep(Duration::from_millis(2));
@@ -1293,52 +1450,52 @@ mod tests {
}
}
- gst::debug!(RUNTIME_CAT, "task_iterate: awaiting pause ack");
+ gst::debug!(RUNTIME_CAT, "iterate: awaiting pause ack");
block_on(paused_receiver.next()).unwrap();
- gst::debug!(RUNTIME_CAT, "task_iterate: starting (after pause)");
- match task.start() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::Paused,
- target: TaskState::Started,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
+ gst::debug!(RUNTIME_CAT, "iterate: starting (after pause)");
+ assert_eq!(
+ task.start().block_on().unwrap(),
+ Complete {
+ origin: Paused,
+ target: Started,
+ },
+ );
- assert_eq!(task.state(), TaskState::Started);
+ assert_eq!(task.state(), Started);
// Paused -> Started
let _ = block_on(started_receiver.next());
- gst::debug!(RUNTIME_CAT, "task_iterate: stopping");
- match task.stop() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::Started,
- target: TaskState::Stopped,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
+ gst::debug!(RUNTIME_CAT, "iterate: stopping");
+ assert_eq!(
+ task.stop().block_on().unwrap(),
+ Complete {
+ origin: Started,
+ target: Stopped,
+ },
+ );
- assert_eq!(task.state(), TaskState::Stopped);
+ assert_eq!(task.state(), Stopped);
let _ = block_on(stopped_receiver.next());
// purge remaining iteration received before stop if any
let _ = iterate_receiver.try_next();
- gst::debug!(RUNTIME_CAT, "task_iterate: starting (after stop)");
- match task.start() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::Stopped,
- target: TaskState::Started,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
+ gst::debug!(RUNTIME_CAT, "iterate: starting (after stop)");
+ assert_eq!(
+ task.start().block_on().unwrap(),
+ Complete {
+ origin: Stopped,
+ target: Started,
+ },
+ );
let _ = block_on(started_receiver.next());
- gst::debug!(RUNTIME_CAT, "task_iterate: req. iterate to return Eos");
+ gst::debug!(RUNTIME_CAT, "iterate: req. iterate to return Eos");
block_on(iterate_receiver.next()).unwrap();
block_on(complete_iterate_sender.send(Err(gst::FlowError::Eos))).unwrap();
- gst::debug!(RUNTIME_CAT, "task_iterate: awaiting stop ack");
+ gst::debug!(RUNTIME_CAT, "iterate: awaiting stop ack");
block_on(stopped_receiver.next()).unwrap();
// Wait for state machine to reach Stopped
@@ -1346,21 +1503,21 @@ mod tests {
std::thread::sleep(Duration::from_millis(2));
}
- gst::debug!(RUNTIME_CAT, "task_iterate: starting (after stop)");
- match task.start() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::Stopped,
- target: TaskState::Started,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
+ gst::debug!(RUNTIME_CAT, "iterate: starting (after stop)");
+ assert_eq!(
+ task.start().block_on().unwrap(),
+ Complete {
+ origin: Stopped,
+ target: Started,
+ },
+ );
let _ = block_on(started_receiver.next());
- gst::debug!(RUNTIME_CAT, "task_iterate: req. iterate to return Flushing");
+ gst::debug!(RUNTIME_CAT, "iterate: req. iterate to return Flushing");
block_on(iterate_receiver.next()).unwrap();
block_on(complete_iterate_sender.send(Err(gst::FlowError::Flushing))).unwrap();
- gst::debug!(RUNTIME_CAT, "task_iterate: awaiting flush_start ack");
+ gst::debug!(RUNTIME_CAT, "iterate: awaiting flush_start ack");
block_on(flush_start_receiver.next()).unwrap();
// Wait for state machine to reach Flushing
@@ -1368,17 +1525,17 @@ mod tests {
std::thread::sleep(Duration::from_millis(2));
}
- gst::debug!(RUNTIME_CAT, "task_iterate: stop flushing");
- match task.flush_stop() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::Flushing,
- target: TaskState::Started,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
+ gst::debug!(RUNTIME_CAT, "iterate: stop flushing");
+ assert_eq!(
+ task.flush_stop().block_on().unwrap(),
+ Complete {
+ origin: Flushing,
+ target: Started,
+ },
+ );
let _ = block_on(started_receiver.next());
- gst::debug!(RUNTIME_CAT, "task_iterate: req. iterate to return Error");
+ gst::debug!(RUNTIME_CAT, "iterate: req. iterate to return Error");
block_on(iterate_receiver.next()).unwrap();
block_on(complete_iterate_sender.send(Err(gst::FlowError::Error))).unwrap();
@@ -1387,28 +1544,25 @@ mod tests {
std::thread::sleep(Duration::from_millis(2));
}
- gst::debug!(
- RUNTIME_CAT,
- "task_iterate: attempting to start (after Error)"
- );
- match task.start() {
- Err(TransitionError {
- trigger: Trigger::Start,
+ gst::debug!(RUNTIME_CAT, "iterate: attempting to start (after Error)");
+ match task.start().block_on().unwrap_err() {
+ TransitionError {
+ trigger: Start,
state: TaskState::Error,
..
- }) => (),
- _ => unreachable!(),
+ } => (),
+ other => panic!("{:?}", other),
}
- match task.unprepare() {
- Ok(TransitionStatus::Complete {
+ assert_eq!(
+ task.unprepare().block_on().unwrap(),
+ Complete {
origin: TaskState::Error,
- target: TaskState::Unprepared,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
+ target: Unprepared,
+ },
+ );
- assert_eq!(task.state(), TaskState::Unprepared);
+ assert_eq!(task.state(), Unprepared);
let _ = block_on(unprepared_receiver.next());
}
@@ -1464,16 +1618,15 @@ mod tests {
let task = Task::default();
- assert_eq!(task.state(), TaskState::Unprepared);
+ assert_eq!(task.state(), Unprepared);
let (prepare_error_sender, mut prepare_error_receiver) = mpsc::channel(1);
- task.prepare(
+ let prepare_status = task.prepare(
TaskPrepareTest {
prepare_error_sender,
},
context,
- )
- .unwrap();
+ );
gst::debug!(
RUNTIME_CAT,
@@ -1481,21 +1634,30 @@ mod tests {
);
block_on(prepare_error_receiver.next()).unwrap();
+ match prepare_status.block_on().unwrap_err() {
+ TransitionError {
+ trigger: Trigger::Error,
+ state: Preparing,
+ ..
+ } => (),
+ other => panic!("{:?}", other),
+ }
+
// Wait for state machine to reach Error
while TaskState::Error != task.state() {
std::thread::sleep(Duration::from_millis(2));
}
- match task.start() {
- Err(TransitionError {
- trigger: Trigger::Start,
+ match task.start().block_on().unwrap_err() {
+ TransitionError {
+ trigger: Start,
state: TaskState::Error,
..
- }) => (),
- other => unreachable!("{:?}", other),
+ } => (),
+ other => panic!("{:?}", other),
}
- task.unprepare().unwrap();
+ block_on(task.unprepare()).unwrap();
}
#[test]
@@ -1549,65 +1711,67 @@ mod tests {
let task = Task::default();
let (mut prepare_sender, prepare_receiver) = mpsc::channel(1);
- task.prepare(TaskPrepareTest { prepare_receiver }, context)
- .unwrap();
+ let _ = task.prepare(TaskPrepareTest { prepare_receiver }, context);
let start_ctx = Context::acquire("prepare_start_ok_requester", Duration::ZERO).unwrap();
let (ready_sender, ready_receiver) = oneshot::channel();
let start_handle = start_ctx.spawn(async move {
- assert_eq!(task.state(), TaskState::Preparing);
+ assert_eq!(task.state(), Preparing);
gst::debug!(RUNTIME_CAT, "prepare_start_ok: starting");
- let ack_handle = match task.start() {
- Ok(TransitionStatus::Async {
- trigger: Trigger::Start,
- origin: TaskState::Preparing,
- ack_handle,
- }) => ack_handle,
- other => panic!("unexpected {:?}", other),
- };
- ready_sender.send(()).unwrap();
- match ack_handle.await.unwrap() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::Prepared,
- target: TaskState::Started,
- }) => (),
- other => panic!("unexpected {:?}", other),
+ let start_status = task.start();
+ match start_status {
+ Pending {
+ trigger: Start,
+ origin: Preparing,
+ ..
+ } => (),
+ other => panic!("{:?}", other),
}
- assert_eq!(task.state(), TaskState::Started);
+ ready_sender.send(()).unwrap();
+ assert_eq!(
+ start_status.await.unwrap(),
+ Complete {
+ origin: Prepared,
+ target: Started,
+ },
+ );
+ assert_eq!(task.state(), Started);
- let ack_handle = match task.stop() {
- Ok(TransitionStatus::Async {
- trigger: Trigger::Stop,
- origin: TaskState::Started,
- ack_handle,
- }) => ack_handle,
- other => panic!("unexpected {:?}", other),
- };
- match ack_handle.await.unwrap() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::Started,
- target: TaskState::Stopped,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
- assert_eq!(task.state(), TaskState::Stopped);
+ let stop_status = task.stop();
+ match stop_status {
+ Pending {
+ trigger: Stop,
+ origin: Started,
+ ..
+ } => (),
+ other => panic!("{:?}", other),
+ }
+ assert_eq!(
+ stop_status.await.unwrap(),
+ Complete {
+ origin: Started,
+ target: Stopped,
+ },
+ );
+ assert_eq!(task.state(), Stopped);
- let ack_handle = match task.unprepare() {
- Ok(TransitionStatus::Async {
- trigger: Trigger::Unprepare,
- origin: TaskState::Stopped,
- ack_handle,
- }) => ack_handle,
- other => panic!("unexpected {:?}", other),
+ let unprepare_status = task.unprepare();
+ match unprepare_status {
+ Pending {
+ trigger: Unprepare,
+ origin: Stopped,
+ ..
+ } => (),
+ other => panic!("{:?}", other),
};
- match ack_handle.await.unwrap() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::Stopped,
- target: TaskState::Unprepared,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
- assert_eq!(task.state(), TaskState::Unprepared);
+ assert_eq!(
+ unprepare_status.await.unwrap(),
+ Complete {
+ origin: Stopped,
+ target: Unprepared,
+ },
+ );
+ assert_eq!(task.state(), Unprepared);
});
gst::debug!(RUNTIME_CAT, "prepare_start_ok: awaiting for start_ctx");
@@ -1686,55 +1850,56 @@ mod tests {
let (mut prepare_sender, prepare_receiver) = mpsc::channel(1);
let (prepare_error_sender, mut prepare_error_receiver) = mpsc::channel(1);
- let res = task.prepare(
+ let prepare_status = task.prepare(
TaskPrepareTest {
prepare_receiver,
prepare_error_sender,
},
context,
);
- let prepare_ack = match res {
- Ok(TransitionStatus::Async {
- trigger: Trigger::Prepare,
- origin: TaskState::Unprepared,
- ack_handle,
- }) => ack_handle,
- other => panic!("unexpected {:?}", other),
+ match prepare_status {
+ Pending {
+ trigger: Prepare,
+ origin: Unprepared,
+ ..
+ } => (),
+ other => panic!("{:?}", other),
};
let start_ctx = Context::acquire("prepare_start_error_requester", Duration::ZERO).unwrap();
let (ready_sender, ready_receiver) = oneshot::channel();
let start_handle = start_ctx.spawn(async move {
gst::debug!(RUNTIME_CAT, "prepare_start_error: starting (Err)");
- task.start().unwrap();
+ let _ = task.start();
ready_sender.send(()).unwrap();
// FIXME we loose the origin Trigger (Start)
// and only get the Trigger returned by handle_action_error
// see also: comment in exec_action!
- match prepare_ack.await.unwrap() {
+ match prepare_status.await {
Err(TransitionError {
trigger: Trigger::Error,
- state: TaskState::Preparing,
+ state: Preparing,
..
}) => (),
- other => panic!("unexpected transition res {:?}", other),
+ other => panic!("{:?}", other),
}
- let ack_handle = match task.unprepare().unwrap() {
- TransitionStatus::Async {
- trigger: Trigger::Unprepare,
+ let unprepare_status = task.unprepare();
+ match unprepare_status {
+ Pending {
+ trigger: Unprepare,
origin: TaskState::Error,
- ack_handle,
- } => ack_handle,
- other => panic!("unexpected {:?}", other),
+ ..
+ } => (),
+ other => panic!("{:?}", other),
};
- match ack_handle.await.unwrap() {
- Ok(TransitionStatus::Complete {
+ assert_eq!(
+ unprepare_status.await.unwrap(),
+ Complete {
origin: TaskState::Error,
- target: TaskState::Unprepared,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
+ target: Unprepared,
+ },
+ );
});
gst::debug!(RUNTIME_CAT, "prepare_start_error: awaiting for start_ctx");
@@ -1797,42 +1962,41 @@ mod tests {
let (iterate_sender, mut iterate_receiver) = mpsc::channel(1);
let (mut complete_sender, complete_receiver) = mpsc::channel(0);
let (paused_sender, mut paused_receiver) = mpsc::channel(1);
- task.prepare(
+ let _ = task.prepare(
TaskPauseStartTest {
iterate_sender,
complete_receiver,
paused_sender,
},
context,
- )
- .unwrap();
+ );
gst::debug!(RUNTIME_CAT, "pause_start: starting");
- match task.start() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::Prepared,
- target: TaskState::Started,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
- assert_eq!(task.state(), TaskState::Started);
+ assert_eq!(
+ task.start().block_on().unwrap(),
+ Complete {
+ origin: Prepared,
+ target: Started,
+ },
+ );
+ assert_eq!(task.state(), Started);
gst::debug!(RUNTIME_CAT, "pause_start: awaiting 1st iteration");
block_on(iterate_receiver.next()).unwrap();
gst::debug!(RUNTIME_CAT, "pause_start: pausing (1)");
match task.pause() {
- Ok(TransitionStatus::NotWaiting {
- trigger: Trigger::Pause,
- origin: TaskState::Started,
- }) => (),
- other => panic!("unexpected {:?}", other),
+ Ready(Ok(NotWaiting {
+ trigger: Pause,
+ origin: Started,
+ })) => (),
+ other => panic!("{:?}", other),
}
gst::debug!(RUNTIME_CAT, "pause_start: sending 1st iteration completion");
complete_sender.try_send(()).unwrap();
- // Pause transition is asynchronous
+ // Pause transition is asynchronous FIXME
while TaskState::Paused != task.state() {
std::thread::sleep(Duration::from_millis(5));
}
@@ -1842,14 +2006,14 @@ mod tests {
// Loop held on due to Pause
iterate_receiver.try_next().unwrap_err();
- match task.start() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::Paused,
- target: TaskState::Started,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
- assert_eq!(task.state(), TaskState::Started);
+ assert_eq!(
+ task.start().block_on().unwrap(),
+ Complete {
+ origin: Paused,
+ target: Started,
+ },
+ );
+ assert_eq!(task.state(), Started);
gst::debug!(RUNTIME_CAT, "pause_start: awaiting 2d iteration");
block_on(iterate_receiver.next()).unwrap();
@@ -1857,8 +2021,7 @@ mod tests {
gst::debug!(RUNTIME_CAT, "pause_start: sending 2d iteration completion");
complete_sender.try_send(()).unwrap();
- task.stop().unwrap();
- task.unprepare().unwrap();
+ stop_then_unprepare(task);
}
#[test]
@@ -1887,27 +2050,25 @@ mod tests {
let task = Task::default();
let (iterate_sender, mut iterate_receiver) = mpsc::channel(1);
- task.prepare(TaskPauseStartTest { iterate_sender }, context)
- .unwrap();
+ let _ = task.prepare(TaskPauseStartTest { iterate_sender }, context);
gst::debug!(RUNTIME_CAT, "successive_pause_start: starting");
- task.start().unwrap();
+ block_on(task.start()).unwrap();
gst::debug!(RUNTIME_CAT, "successive_pause_start: awaiting iteration 1");
block_on(iterate_receiver.next()).unwrap();
gst::debug!(RUNTIME_CAT, "successive_pause_start: pause and start");
- task.pause().unwrap();
- task.start().unwrap();
+ let _ = task.pause();
+ block_on(task.start()).unwrap();
- assert_eq!(task.state(), TaskState::Started);
+ assert_eq!(task.state(), Started);
gst::debug!(RUNTIME_CAT, "successive_pause_start: awaiting iteration 2");
block_on(iterate_receiver.next()).unwrap();
gst::debug!(RUNTIME_CAT, "successive_pause_start: stopping");
- task.stop().unwrap();
- task.unprepare().unwrap();
+ stop_then_unprepare(task);
}
#[test]
@@ -1949,45 +2110,43 @@ mod tests {
let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
- task.prepare(
+ let _ = task.prepare(
TaskFlushTest {
flush_start_sender,
flush_stop_sender,
},
context,
- )
- .unwrap();
+ );
gst::debug!(RUNTIME_CAT, "flush_regular_sync: start");
- task.start().unwrap();
+ block_on(task.start()).unwrap();
gst::debug!(RUNTIME_CAT, "flush_regular_sync: starting flush");
- match task.flush_start() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::Started,
- target: TaskState::Flushing,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
- assert_eq!(task.state(), TaskState::Flushing);
+ assert_eq!(
+ task.flush_start().block_on().unwrap(),
+ Complete {
+ origin: Started,
+ target: Flushing,
+ },
+ );
+ assert_eq!(task.state(), Flushing);
block_on(flush_start_receiver.next()).unwrap();
gst::debug!(RUNTIME_CAT, "flush_regular_sync: stopping flush");
- match task.flush_stop() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::Flushing,
- target: TaskState::Started,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
- assert_eq!(task.state(), TaskState::Started);
+ assert_eq!(
+ task.flush_stop().await_maybe_on_context().unwrap(),
+ Complete {
+ origin: Flushing,
+ target: Started,
+ },
+ );
+ assert_eq!(task.state(), Started);
block_on(flush_stop_receiver.next()).unwrap();
- task.pause().unwrap();
- task.stop().unwrap();
- task.unprepare().unwrap();
+ let _ = task.pause();
+ stop_then_unprepare(task);
}
#[test]
@@ -2037,17 +2196,16 @@ mod tests {
let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
- task.prepare(
+ let _ = task.prepare(
TaskFlushTest {
flush_start_sender,
flush_stop_sender,
},
context,
- )
- .unwrap();
+ );
gst::debug!(RUNTIME_CAT, "flush_regular_different_context: start");
- task.start().unwrap();
+ task.start().block_on().unwrap();
let oob_context = Context::acquire(
"flush_regular_different_context_oob",
@@ -2056,48 +2214,51 @@ mod tests {
.unwrap();
let task_clone = task.clone();
- let flush_handle = oob_context.spawn(async move {
- let flush_ack_handle = match task_clone.flush_start() {
- Ok(TransitionStatus::Async {
- trigger: Trigger::FlushStart,
- origin: TaskState::Started,
- ack_handle,
- }) => ack_handle,
- other => panic!("unexpected {:?}", other),
+ let flush_res_fut = oob_context.spawn(async move {
+ let flush_start_status = task_clone.flush_start();
+ match flush_start_status {
+ Pending {
+ trigger: FlushStart,
+ origin: Started,
+ ..
+ } => (),
+ other => panic!("{:?}", other),
};
- match flush_ack_handle.await.unwrap() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::Started,
- target: TaskState::Flushing,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
- assert_eq!(task_clone.state(), TaskState::Flushing);
+ assert_eq!(
+ flush_start_status.await.unwrap(),
+ Complete {
+ origin: Started,
+ target: Flushing,
+ },
+ );
+ assert_eq!(task_clone.state(), Flushing);
flush_start_receiver.next().await.unwrap();
- let flush_stop_ack_handle = match task_clone.flush_stop() {
- Ok(TransitionStatus::Async {
- trigger: Trigger::FlushStop,
- origin: TaskState::Flushing,
- ack_handle,
- }) => ack_handle,
- other => panic!("unexpected {:?}", other),
+ let flush_stop_status = task_clone.flush_stop();
+ match flush_stop_status {
+ Pending {
+ trigger: FlushStop,
+ origin: Flushing,
+ ..
+ } => (),
+ other => panic!("{:?}", other),
};
- match flush_stop_ack_handle.await.unwrap() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::Flushing,
- target: TaskState::Started,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
- assert_eq!(task_clone.state(), TaskState::Started);
+ assert_eq!(
+ flush_stop_status.await_maybe_on_context().unwrap(),
+ NotWaiting {
+ trigger: FlushStop,
+ origin: Flushing,
+ },
+ );
+
+ Context::drain_sub_tasks().await.unwrap();
+ assert_eq!(task_clone.state(), Started);
});
- block_on(flush_handle).unwrap();
+ block_on(flush_res_fut).unwrap();
block_on(flush_stop_receiver.next()).unwrap();
- task.stop().unwrap();
- task.unprepare().unwrap();
+ stop_then_unprepare(task);
}
#[test]
@@ -2141,60 +2302,60 @@ mod tests {
let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
- task.prepare(
+ let _ = task.prepare(
TaskFlushTest {
flush_start_sender,
flush_stop_sender,
},
context.clone(),
- )
- .unwrap();
+ );
- task.start().unwrap();
+ block_on(task.start()).unwrap();
let task_clone = task.clone();
let flush_handle = context.spawn(async move {
- let flush_ack_handle = match task_clone.flush_start() {
- Ok(TransitionStatus::Async {
- trigger: Trigger::FlushStart,
- origin: TaskState::Started,
- ack_handle,
- }) => ack_handle,
- other => panic!("unexpected {:?}", other),
+ let flush_start_status = task_clone.flush_start();
+ match flush_start_status {
+ Pending {
+ trigger: FlushStart,
+ origin: Started,
+ ..
+ } => (),
+ other => panic!("{:?}", other),
};
- match flush_ack_handle.await.unwrap() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::Started,
- target: TaskState::Flushing,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
- assert_eq!(task_clone.state(), TaskState::Flushing);
+ assert_eq!(
+ flush_start_status.await.unwrap(),
+ Complete {
+ origin: Started,
+ target: Flushing,
+ },
+ );
+ assert_eq!(task_clone.state(), Flushing);
flush_start_receiver.next().await.unwrap();
- let flush_stop_ack_handle = match task_clone.flush_stop() {
- Ok(TransitionStatus::Async {
- trigger: Trigger::FlushStop,
- origin: TaskState::Flushing,
- ack_handle,
- }) => ack_handle,
- other => panic!("unexpected {:?}", other),
+ let flush_stop_status = task_clone.flush_stop();
+ match flush_stop_status {
+ Pending {
+ trigger: FlushStop,
+ origin: Flushing,
+ ..
+ } => (),
+ other => panic!("{:?}", other),
};
- match flush_stop_ack_handle.await.unwrap() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::Flushing,
- target: TaskState::Started,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
- assert_eq!(task_clone.state(), TaskState::Started);
+ assert_eq!(
+ flush_stop_status.await.unwrap(),
+ Complete {
+ origin: Flushing,
+ target: Started,
+ },
+ );
+ assert_eq!(task_clone.state(), Started);
});
block_on(flush_handle).unwrap();
block_on(flush_stop_receiver.next()).unwrap();
- task.stop().unwrap();
- task.unprepare().unwrap();
+ stop_then_unprepare(task);
}
#[test]
@@ -2211,14 +2372,16 @@ mod tests {
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
gst::debug!(RUNTIME_CAT, "flush_from_loop: flush_start from iteration");
- match self.task.flush_start() {
- Ok(TransitionStatus::Async {
- trigger: Trigger::FlushStart,
- origin: TaskState::Started,
+ let flush_status = self.task.flush_start();
+ match flush_status {
+ Pending {
+ trigger: FlushStart,
+ origin: Started,
..
- }) => (),
- other => panic!("unexpected {:?}", other),
+ } => (),
+ other => panic!("{:?}", other),
}
+ flush_status.await.unwrap();
Ok(())
}
.boxed()
@@ -2239,16 +2402,15 @@ mod tests {
let task = Task::default();
let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
- task.prepare(
+ let _ = task.prepare(
TaskFlushTest {
task: task.clone(),
flush_start_sender,
},
context,
- )
- .unwrap();
+ );
- task.start().unwrap();
+ let _ = task.start();
gst::debug!(
RUNTIME_CAT,
@@ -2256,14 +2418,14 @@ mod tests {
);
block_on(flush_start_receiver.next()).unwrap();
- match task.stop() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::Flushing,
- target: TaskState::Stopped,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
- task.unprepare().unwrap();
+ assert_eq!(
+ task.stop().block_on().unwrap(),
+ Complete {
+ origin: Flushing,
+ target: Stopped,
+ },
+ );
+ task.unprepare().block_on().unwrap();
}
#[test]
@@ -2286,11 +2448,11 @@ mod tests {
gst::debug!(RUNTIME_CAT, "pause_from_loop: pause from iteration");
match self.task.pause() {
- Ok(TransitionStatus::NotWaiting {
- trigger: Trigger::Pause,
- origin: TaskState::Started,
- }) => (),
- other => panic!("unexpected {:?}", other),
+ Ready(Ok(TransitionOk::NotWaiting {
+ trigger: Pause,
+ origin: Started,
+ })) => (),
+ other => panic!("{:?}", other),
}
Ok(())
@@ -2313,22 +2475,20 @@ mod tests {
let task = Task::default();
let (pause_sender, mut pause_receiver) = mpsc::channel(1);
- task.prepare(
+ let _ = task.prepare(
TaskStartTest {
task: task.clone(),
pause_sender,
},
context,
- )
- .unwrap();
+ );
- task.start().unwrap();
+ let _ = task.start();
gst::debug!(RUNTIME_CAT, "pause_from_loop: awaiting pause notification");
block_on(pause_receiver.next()).unwrap();
- task.stop().unwrap();
- task.unprepare().unwrap();
+ stop_then_unprepare(task);
}
#[test]
@@ -2353,12 +2513,12 @@ mod tests {
"trigger_from_action: flush_start triggering flush_stop"
);
match self.task.flush_stop() {
- Ok(TransitionStatus::Async {
- trigger: Trigger::FlushStop,
- origin: TaskState::Started,
+ Pending {
+ trigger: FlushStop,
+ origin: Started,
..
- }) => (),
- other => panic!("unexpected {:?}", other),
+ } => (),
+ other => panic!("{:?}", other),
}
Ok(())
@@ -2381,17 +2541,16 @@ mod tests {
let task = Task::default();
let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
- task.prepare(
+ let _ = task.prepare(
TaskFlushTest {
task: task.clone(),
flush_stop_sender,
},
context,
- )
- .unwrap();
+ );
- task.start().unwrap();
- task.flush_start().unwrap();
+ task.start().block_on().unwrap();
+ let _ = task.flush_start();
gst::debug!(
RUNTIME_CAT,
@@ -2399,8 +2558,7 @@ mod tests {
);
block_on(flush_stop_receiver.next()).unwrap();
- task.stop().unwrap();
- task.unprepare().unwrap();
+ stop_then_unprepare(task);
}
#[test]
@@ -2453,65 +2611,63 @@ mod tests {
let (started_sender, mut started_receiver) = mpsc::channel(1);
let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
- task.prepare(
+ let _ = task.prepare(
TaskFlushTest {
started_sender,
flush_start_sender,
flush_stop_sender,
},
context,
- )
- .unwrap();
+ );
// Pause, FlushStart, FlushStop, Start
gst::debug!(RUNTIME_CAT, "pause_flush_start: pausing");
- match task.pause() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::Prepared,
- target: TaskState::Paused,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
+ assert_eq!(
+ task.pause().block_on().unwrap(),
+ Complete {
+ origin: Prepared,
+ target: Paused,
+ },
+ );
gst::debug!(RUNTIME_CAT, "pause_flush_start: starting flush");
- match task.flush_start() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::Paused,
- target: TaskState::PausedFlushing,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
- assert_eq!(task.state(), TaskState::PausedFlushing);
+ assert_eq!(
+ task.flush_start().block_on().unwrap(),
+ Complete {
+ origin: Paused,
+ target: PausedFlushing,
+ },
+ );
+ assert_eq!(task.state(), PausedFlushing);
block_on(flush_start_receiver.next());
gst::debug!(RUNTIME_CAT, "pause_flush_start: stopping flush");
- match task.flush_stop() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::PausedFlushing,
- target: TaskState::Paused,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
- assert_eq!(task.state(), TaskState::Paused);
+ assert_eq!(
+ task.flush_stop().block_on().unwrap(),
+ Complete {
+ origin: PausedFlushing,
+ target: Paused,
+ },
+ );
+ assert_eq!(task.state(), Paused);
block_on(flush_stop_receiver.next());
// start action not executed
started_receiver.try_next().unwrap_err();
gst::debug!(RUNTIME_CAT, "pause_flush_start: starting after flushing");
- match task.start() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::Paused,
- target: TaskState::Started,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
- assert_eq!(task.state(), TaskState::Started);
+ assert_eq!(
+ task.start().block_on().unwrap(),
+ Complete {
+ origin: Paused,
+ target: Started,
+ },
+ );
+ assert_eq!(task.state(), Started);
block_on(started_receiver.next());
- task.stop().unwrap();
- task.unprepare().unwrap();
+ stop_then_unprepare(task);
}
#[test]
@@ -2564,53 +2720,51 @@ mod tests {
let (started_sender, mut started_receiver) = mpsc::channel(1);
let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
- task.prepare(
+ let _ = task.prepare(
TaskFlushTest {
started_sender,
flush_start_sender,
flush_stop_sender,
},
context,
- )
- .unwrap();
+ );
// Pause, FlushStart, Start, FlushStop
gst::debug!(RUNTIME_CAT, "pause_flushing_start: pausing");
- task.pause().unwrap();
+ let _ = task.pause();
gst::debug!(RUNTIME_CAT, "pause_flushing_start: starting flush");
- task.flush_start().unwrap();
- assert_eq!(task.state(), TaskState::PausedFlushing);
+ block_on(task.flush_start()).unwrap();
+ assert_eq!(task.state(), PausedFlushing);
block_on(flush_start_receiver.next());
gst::debug!(RUNTIME_CAT, "pause_flushing_start: starting while flushing");
- match task.start() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::PausedFlushing,
- target: TaskState::Flushing,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
- assert_eq!(task.state(), TaskState::Flushing);
+ assert_eq!(
+ task.start().block_on().unwrap(),
+ Complete {
+ origin: PausedFlushing,
+ target: Flushing,
+ },
+ );
+ assert_eq!(task.state(), Flushing);
// start action not executed
started_receiver.try_next().unwrap_err();
gst::debug!(RUNTIME_CAT, "pause_flushing_start: stopping flush");
- match task.flush_stop() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::Flushing,
- target: TaskState::Started,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
- assert_eq!(task.state(), TaskState::Started);
+ assert_eq!(
+ task.flush_stop().block_on().unwrap(),
+ Complete {
+ origin: Flushing,
+ target: Started,
+ },
+ );
+ assert_eq!(task.state(), Started);
block_on(flush_stop_receiver.next());
block_on(started_receiver.next());
- task.stop().unwrap();
- task.unprepare().unwrap();
+ stop_then_unprepare(task);
}
#[test]
@@ -2654,20 +2808,19 @@ mod tests {
let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
- task.prepare(
+ let _ = task.prepare(
TaskStartTest {
flush_start_sender,
flush_stop_sender,
},
context,
- )
- .unwrap();
+ );
let oob_context =
Context::acquire("flush_concurrent_start_oob", Duration::from_millis(2)).unwrap();
let task_clone = task.clone();
- task.pause().unwrap();
+ block_on(task.pause()).unwrap();
// Launch flush_start // start
let (ready_sender, ready_receiver) = oneshot::channel();
@@ -2675,21 +2828,21 @@ mod tests {
let flush_start_handle = oob_context.spawn(async move {
gst::debug!(RUNTIME_CAT, "flush_concurrent_start: // flush_start");
ready_sender.send(()).unwrap();
- let res = task_clone.flush_start().unwrap();
- let ack_handle = match res {
- TransitionStatus::Async {
- trigger: Trigger::FlushStart,
- origin: TaskState::Paused,
- ack_handle,
- } => ack_handle,
- TransitionStatus::Async {
- trigger: Trigger::FlushStart,
- origin: TaskState::Started,
- ack_handle,
- } => ack_handle,
- other => panic!("unexpected {:?}", other),
+ let status = task_clone.flush_start();
+ match status {
+ Pending {
+ trigger: FlushStart,
+ origin: Paused,
+ ..
+ } => (),
+ Pending {
+ trigger: FlushStart,
+ origin: Started,
+ ..
+ } => (),
+ other => panic!("{:?}", other),
};
- ack_handle.await.unwrap().unwrap();
+ status.await.unwrap();
flush_start_receiver.next().await.unwrap();
});
@@ -2700,34 +2853,32 @@ mod tests {
block_on(ready_receiver).unwrap();
gst::debug!(RUNTIME_CAT, "flush_concurrent_start: // start");
- let res = task.start().unwrap();
- match res {
- TransitionStatus::Complete {
- origin: TaskState::Paused,
- target: TaskState::Started,
- } => (),
- TransitionStatus::Complete {
- origin: TaskState::PausedFlushing,
- target: TaskState::Flushing,
- } => (),
- other => panic!("unexpected {:?}", other),
+ match block_on(task.start()) {
+ Ok(TransitionOk::Complete {
+ origin: Paused,
+ target: Started,
+ }) => (),
+ Ok(TransitionOk::Complete {
+ origin: PausedFlushing,
+ target: Flushing,
+ }) => (),
+ other => panic!("{:?}", other),
}
block_on(flush_start_handle).unwrap();
gst::debug!(RUNTIME_CAT, "flush_concurrent_start: requesting flush_stop");
- match task.flush_stop() {
- Ok(TransitionStatus::Complete {
- origin: TaskState::Flushing,
- target: TaskState::Started,
- }) => (),
- other => panic!("unexpected {:?}", other),
- }
- assert_eq!(task.state(), TaskState::Started);
+ assert_eq!(
+ task.flush_stop().block_on().unwrap(),
+ Complete {
+ origin: Flushing,
+ target: Started,
+ },
+ );
+ assert_eq!(task.state(), Started);
block_on(flush_stop_receiver.next());
- task.stop().unwrap();
- task.unprepare().unwrap();
+ stop_then_unprepare(task);
}
#[test]
@@ -2772,22 +2923,20 @@ mod tests {
let task = Task::default();
let (timer_elapsed_sender, timer_elapsed_receiver) = oneshot::channel();
- task.prepare(
+ let _ = task.prepare(
TaskTimerTest {
timer: None,
timer_elapsed_sender: Some(timer_elapsed_sender),
},
context,
- )
- .unwrap();
+ );
gst::debug!(RUNTIME_CAT, "start_timer: start");
- task.start().unwrap();
+ let _ = task.start();
block_on(timer_elapsed_receiver).unwrap();
gst::debug!(RUNTIME_CAT, "start_timer: timer elapsed received");
- task.stop().unwrap();
- task.unprepare().unwrap();
+ stop_then_unprepare(task);
}
}