Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrançois Laignel <fengalin@free.fr>2022-08-06 19:10:43 +0300
committerFrançois Laignel <fengalin@free.fr>2022-08-10 21:02:53 +0300
commit8b54c3fed699fb8f47c30c531d58eec67f0079a9 (patch)
treed6b668b0f6fe3b8506d458ba648790943325f176 /generic/threadshare/src
parent07cbc2f0252d3288ff4e5c10705b3597a12af5e1 (diff)
ts/Task: split iterate into try_next and handle_item
Previous Task iteration model suffered from the following shortcomings: - When an iteration was engaged it could be cancelled at await points by Stop or Flush state transitions, which could lead to inconsistent states. - When an iteration was engaged it could not be cancelled by a Pause state transition so as to prevent data loss. This meant we couldn't block on the Pause request because the mechanism couldn't guarantee Paused would be reached in a timely manner. This commit split the Task iteration into: - `try_next`: this function returns a future that awaits for a new iteration to begin. The regular use case is to return an item to process. The item can be left to `()` if `try_next` acts as a tick generator. It can also return an error. This function can be cancelled at await points when a state transition request occurs. - `handle_item`: this function is called with the item returned by `try_next` and is guaranteed to run to completion even if a transition request is received. Note that this model plays well with the common Future cancellation pitfalls in Rust.
Diffstat (limited to 'generic/threadshare/src')
-rw-r--r--generic/threadshare/src/runtime/task.rs780
1 files changed, 368 insertions, 412 deletions
diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs
index 559086188..abf23e1b0 100644
--- a/generic/threadshare/src/runtime/task.rs
+++ b/generic/threadshare/src/runtime/task.rs
@@ -22,9 +22,8 @@
use futures::channel::mpsc as async_mpsc;
use futures::channel::oneshot;
-use futures::future::{self, abortable, AbortHandle, Aborted, BoxFuture};
+use futures::future::{self, BoxFuture};
use futures::prelude::*;
-use futures::stream::StreamExt;
use std::fmt;
use std::ops::Deref;
@@ -73,7 +72,7 @@ pub enum TransitionOk {
/// This is to prevent:
/// - A deadlock when executing a transition action.
/// - A potential infinite wait when pausing a running loop
- /// which could be awaiting for an `iterate` to complete.
+ /// which could be awaiting for an `nominal` to complete.
NotWaiting { trigger: Trigger, origin: TaskState },
/// Skipping triggering event due to current state.
Skipped { trigger: Trigger, state: TaskState },
@@ -319,6 +318,8 @@ impl fmt::Debug for TransitionStatus {
///
/// Defines implementations for state transition actions and error handlers.
pub trait TaskImpl: Send + 'static {
+ type Item: Send + 'static;
+
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
future::ok(()).boxed()
}
@@ -331,8 +332,31 @@ pub trait TaskImpl: Send + 'static {
future::ok(()).boxed()
}
- /// Executes an iteration in `TaskState::Started`.
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>>;
+ /// Tries to retrieve the next item to process.
+ ///
+ /// With [`Self::handle_item`], this is one of the two `Task` loop
+ /// functions. They are executed in a loop in the `Started` state.
+ ///
+ /// Function `try_next` is awaited at the beginning of each iteration,
+ /// and can be cancelled at `await` point if a state transition is requested.
+ ///
+ /// If `Ok(item)` is returned, the iteration calls [`Self::handle_item`]
+ /// with said `Item`.
+ ///
+ /// If `Err(..)` is returned, the iteration calls [`Self::handle_loop_error`].
+ fn try_next(&mut self) -> BoxFuture<'_, Result<Self::Item, gst::FlowError>>;
+
+ /// Does whatever needs to be done with the `item`.
+ ///
+ /// With [`Self::try_next`], this is one of the two `Task` loop
+ /// functions. They are executed in a loop in the `Started` state.
+ ///
+ /// Function `handle_item` asynchronously processes an `item` previously
+ /// retrieved by [`Self::try_next`]. Processing is guaranteed to run
+ /// to completion even if a state transition is requested.
+ ///
+ /// If `Err(..)` is returned, the iteration calls [`Self::handle_loop_error`].
+ fn handle_item(&mut self, _item: Self::Item) -> BoxFuture<'_, Result<(), gst::FlowError>>;
fn pause(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
future::ok(()).boxed()
@@ -350,12 +374,13 @@ pub trait TaskImpl: Send + 'static {
future::ok(()).boxed()
}
- /// Handles an error occuring during the execution of an iteration.
+ /// Handles an error occuring during the execution of the `Task` loop.
///
- /// This handler also catches errors returned by subtasks spawned by the iteration.
+ /// This include errors returned by [`Self::try_next`], [`Self::handle_item`]
+ /// as well as errors returned by subtasks drain at the end of an iteration.
///
- /// If the error is unrecoverable, implementations might use `gst::Element::post_error_message`
- /// and return `Trigger::Error`.
+ /// If the error is unrecoverable, implementations might use
+ /// `gst::Element::post_error_message` and return `Trigger::Error`.
///
/// Otherwise, handle the error and return the requested `Transition` to recover.
///
@@ -364,26 +389,22 @@ pub trait TaskImpl: Send + 'static {
/// - `FlowError::Flushing` -> `Trigger::FlushStart`.
/// - `FlowError::Eos` -> `Trigger::Stop`.
/// - Other `FlowError` -> `Trigger::Error`.
- fn handle_iterate_error(&mut self, err: gst::FlowError) -> BoxFuture<'_, Trigger> {
+ fn handle_loop_error(&mut self, err: gst::FlowError) -> BoxFuture<'_, Trigger> {
async move {
match err {
gst::FlowError::Flushing => {
gst::debug!(
RUNTIME_CAT,
- "TaskImpl iterate returned Flushing. Posting FlushStart"
+ "Task loop returned Flushing. Posting FlushStart"
);
Trigger::FlushStart
}
gst::FlowError::Eos => {
- gst::debug!(RUNTIME_CAT, "TaskImpl iterate returned Eos. Posting Stop");
+ gst::debug!(RUNTIME_CAT, "Task loop returned Eos. Posting Stop");
Trigger::Stop
}
other => {
- gst::error!(
- RUNTIME_CAT,
- "TaskImpl iterate returned {:?}. Posting Error",
- other
- );
+ gst::error!(RUNTIME_CAT, "Task loop returned {:?}. Posting Error", other);
Trigger::Error
}
}
@@ -497,7 +518,6 @@ impl StateMachineHandle {
struct TaskInner {
state: TaskState,
state_machine_handle: Option<StateMachineHandle>,
- loop_abort_handle: Option<AbortHandle>,
}
impl Default for TaskInner {
@@ -505,7 +525,6 @@ impl Default for TaskInner {
TaskInner {
state: TaskState::Unprepared,
state_machine_handle: None,
- loop_abort_handle: None,
}
}
}
@@ -568,12 +587,6 @@ impl TaskInner {
}
})
}
-
- fn abort_task_loop(&mut self) {
- if let Some(loop_abort_handle) = self.loop_abort_handle.take() {
- loop_abort_handle.abort();
- }
- }
}
impl Drop for TaskInner {
@@ -713,7 +726,6 @@ impl Task {
}
};
- inner.abort_task_loop();
let ack_rx = state_machine_handle.trigger(Trigger::Unprepare);
drop(inner);
@@ -735,11 +747,6 @@ impl Task {
pub fn start(&self) -> TransitionStatus {
let mut inner = self.0.lock().unwrap();
- let ack_rx = match inner.trigger(Trigger::Start) {
- Ok(ack_rx) => ack_rx,
- Err(err) => return err.into(),
- };
-
if let TaskState::Started = inner.state {
return TransitionOk::Skipped {
trigger: Trigger::Start,
@@ -748,6 +755,11 @@ impl Task {
.into();
}
+ let ack_rx = match inner.trigger(Trigger::Start) {
+ Ok(ack_rx) => ack_rx,
+ Err(err) => return err.into(),
+ };
+
let origin = inner.state;
drop(inner);
@@ -760,59 +772,29 @@ impl Task {
/// Requests the `Task` loop to pause.
///
- /// If an iteration is in progress, it will run to completion,
- /// then no more iteration will be executed before `start` is called again.
- /// Therefore, it is not guaranteed that `Paused` is reached when `pause` returns.
+ /// If an item handling is in progress, it will run to completion,
+ /// then no iterations will be executed before `start` is called again.
pub fn pause(&self) -> TransitionStatus {
- let mut inner = self.0.lock().unwrap();
-
- let ack_rx = match inner.trigger(Trigger::Pause) {
- Ok(ack_rx) => ack_rx,
- Err(err) => return err.into(),
- };
-
- if let TaskState::Started = inner.state {
- // FIXME this could be async when iterate is split into next_item / handle_item
- return TransitionOk::NotWaiting {
- trigger: Trigger::Pause,
- origin: TaskState::Started,
- }
- .into();
- }
-
- let origin = inner.state;
- drop(inner);
-
- TransitionStatus::Pending {
- trigger: Trigger::Pause,
- origin,
- res_fut: Box::pin(ack_rx.map(Result::unwrap)),
- }
+ self.push_pending(Trigger::Pause)
}
pub fn flush_start(&self) -> TransitionStatus {
- self.abort_push_await(Trigger::FlushStart)
+ self.push_pending(Trigger::FlushStart)
}
pub fn flush_stop(&self) -> TransitionStatus {
- self.abort_push_await(Trigger::FlushStop)
+ self.push_pending(Trigger::FlushStop)
}
/// Stops the `Started` `Task` and wait for it to finish.
pub fn stop(&self) -> TransitionStatus {
- self.abort_push_await(Trigger::Stop)
+ self.push_pending(Trigger::Stop)
}
- /// Pushes a [`Trigger`] which requires the iteration loop to abort ASAP.
- ///
- /// This function:
- /// - Aborts the iteration loop aborts.
- /// - Pushes the provided [`Trigger`].
- /// - Awaits for the expected transition as usual.
- fn abort_push_await(&self, trigger: Trigger) -> TransitionStatus {
+ /// Pushes a [`Trigger`] and returns TransitionStatus::Pending.
+ fn push_pending(&self, trigger: Trigger) -> TransitionStatus {
let mut inner = self.0.lock().unwrap();
- inner.abort_task_loop();
let ack_rx = match inner.trigger(trigger) {
Ok(ack_rx) => ack_rx,
Err(err) => return err.into(),
@@ -829,8 +811,8 @@ impl Task {
}
}
-struct StateMachine {
- task_impl: Box<dyn TaskImpl>,
+struct StateMachine<Item: Send + 'static> {
+ task_impl: Box<dyn TaskImpl<Item = Item>>,
triggering_evt_rx: async_mpsc::Receiver<TriggeringEvent>,
pending_triggering_evt: Option<TriggeringEvent>,
}
@@ -886,12 +868,12 @@ macro_rules! exec_action {
}};
}
-impl StateMachine {
+impl<Item: Send + 'static> StateMachine<Item> {
// Use dynamic dispatch for TaskImpl as it reduces memory usage compared to monomorphization
// without inducing any significant performance penalties.
fn spawn(
task_inner: Arc<Mutex<TaskInner>>,
- task_impl: Box<dyn TaskImpl>,
+ task_impl: Box<dyn TaskImpl<Item = Item>>,
context: Context,
) -> StateMachineHandle {
let (triggering_evt_tx, triggering_evt_rx) = async_mpsc::channel(4);
@@ -910,8 +892,6 @@ impl StateMachine {
}
async fn run(mut self, task_inner: Arc<Mutex<TaskInner>>) {
- let context = Context::current().expect("must be spawed on a Context");
-
let mut triggering_evt = self
.triggering_evt_rx
.next()
@@ -953,7 +933,6 @@ impl StateMachine {
.await
.expect("triggering_evt_rx dropped"),
};
-
gst::trace!(RUNTIME_CAT, "State machine popped {:?}", triggering_evt);
match triggering_evt.trigger {
@@ -990,7 +969,7 @@ impl StateMachine {
origin
};
- self = Self::start(self, triggering_evt, origin, &task_inner, &context).await;
+ self.start(triggering_evt, origin, &task_inner).await;
// next/pending triggering event handled in next iteration
}
Trigger::Pause => {
@@ -1109,8 +1088,7 @@ impl StateMachine {
.switch_to_state(TaskState::Paused, triggering_evt);
gst::trace!(RUNTIME_CAT, "Switched from PausedFlushing to Paused");
} else {
- self = Self::start(self, triggering_evt, origin, &task_inner, &context)
- .await;
+ self.start(triggering_evt, origin, &task_inner).await;
// next/pending triggering event handled in next iteration
}
}
@@ -1145,62 +1123,48 @@ impl StateMachine {
}
async fn start(
- mut self,
+ &mut self,
mut triggering_evt: TriggeringEvent,
origin: TaskState,
task_inner: &Arc<Mutex<TaskInner>>,
- context: &Context,
- ) -> Self {
- let triggering_evt = match exec_action!(self, start, triggering_evt, origin, &task_inner) {
- Ok(triggering_evt) => triggering_evt,
- Err(_) => return self,
- };
-
- let task_inner_cl = Arc::clone(task_inner);
- let loop_fut = async move {
- let (abortable_task_loop, loop_abort_handle) =
- abortable(self.run_loop(Arc::clone(&task_inner_cl)));
-
- {
- let mut task_inner = task_inner_cl.lock().unwrap();
- task_inner.loop_abort_handle = Some(loop_abort_handle);
+ ) {
+ match exec_action!(self, start, triggering_evt, origin, &task_inner) {
+ Ok(triggering_evt) => {
+ let mut task_inner = task_inner.lock().unwrap();
task_inner.switch_to_state(TaskState::Started, triggering_evt);
-
- gst::trace!(RUNTIME_CAT, "Starting task loop");
}
-
- match abortable_task_loop.await {
- Ok(Ok(())) => (),
- Ok(Err(err)) => {
- let next_trigger = self.task_impl.handle_iterate_error(err).await;
- let (triggering_evt, _) = TriggeringEvent::new(next_trigger);
- self.pending_triggering_evt = Some(triggering_evt);
- }
- Err(Aborted) => gst::trace!(RUNTIME_CAT, "Task loop aborted"),
+ Err(_) => {
+ // error handled by exec_action
+ return;
}
+ }
- self
- };
-
- context.spawn_and_unpark(loop_fut).await.unwrap()
+ match self.run_loop().await {
+ Ok(()) => (),
+ Err(err) => {
+ let next_trigger = self.task_impl.handle_loop_error(err).await;
+ let (triggering_evt, _) = TriggeringEvent::new(next_trigger);
+ self.pending_triggering_evt = Some(triggering_evt);
+ }
+ }
}
- async fn run_loop(&mut self, task_inner: Arc<Mutex<TaskInner>>) -> Result<(), gst::FlowError> {
+ async fn run_loop(&mut self) -> Result<(), gst::FlowError> {
gst::trace!(RUNTIME_CAT, "Task loop started");
+ let mut item;
loop {
- while let Ok(Some(triggering_evt)) = self.triggering_evt_rx.try_next() {
- gst::trace!(RUNTIME_CAT, "Task loop popped {:?}", triggering_evt);
-
- match triggering_evt.trigger {
- Trigger::Start => {
- task_inner
- .lock()
- .unwrap()
- .skip_triggering_evt(triggering_evt);
- gst::trace!(RUNTIME_CAT, "Skipped Start in state Started");
- }
- _ => {
+ item = {
+ // select_biased requires the selected futures to implement
+ // `FusedFuture`. Because async trait functions are not stable,
+ // we use `BoxFuture` for the `TaskImpl` function, including
+ // `try_next`. Since we need to get a new `BoxFuture` at
+ // each iteration, we can guarantee that the future is
+ // always valid for use in `select_biased`.
+ let mut try_next_fut = self.task_impl.try_next().fuse();
+ futures::select_biased! {
+ triggering_evt = self.triggering_evt_rx.next() => {
+ let triggering_evt = triggering_evt.expect("broken state machine channel");
gst::trace!(
RUNTIME_CAT,
"Task loop handing {:?} to state machine",
@@ -1209,19 +1173,22 @@ impl StateMachine {
self.pending_triggering_evt = Some(triggering_evt);
return Ok(());
}
+ try_next_res = try_next_fut => try_next_res.map_err(|err| {
+ gst::debug!(RUNTIME_CAT, "TaskImpl::try_next returned {:?}", err);
+ err
+ })?,
}
- }
+ };
- // Run the iteration function
- self.task_impl.iterate().await.map_err(|err| {
- gst::log!(RUNTIME_CAT, "Task loop iterate impl returned {:?}", err);
+ self.task_impl.handle_item(item).await.map_err(|err| {
+ gst::debug!(RUNTIME_CAT, "TaskImpl::handle_item returned {:?}", err);
err
})?;
while Context::current_has_sub_tasks() {
- gst::trace!(RUNTIME_CAT, "Draining subtasks for {}", stringify!($action));
+ gst::trace!(RUNTIME_CAT, "Draining subtasks after Task loop iteration");
Context::drain_sub_tasks().await.map_err(|err| {
- gst::log!(RUNTIME_CAT, "Task loop iterate subtask returned {:?}", err);
+ gst::debug!(RUNTIME_CAT, "Task iteration subtask returned {:?}", err);
err
})?;
}
@@ -1245,24 +1212,27 @@ mod tests {
}
#[test]
- fn iterate() {
+ fn nominal() {
gst::init().unwrap();
struct TaskTest {
prepared_sender: mpsc::Sender<()>,
started_sender: mpsc::Sender<()>,
- iterate_sender: mpsc::Sender<()>,
- complete_iterate_receiver: mpsc::Receiver<Result<(), gst::FlowError>>,
+ try_next_ready_sender: mpsc::Sender<()>,
+ try_next_receiver: mpsc::Receiver<()>,
+ handle_item_ready_sender: mpsc::Sender<()>,
+ handle_item_sender: mpsc::Sender<()>,
paused_sender: mpsc::Sender<()>,
stopped_sender: mpsc::Sender<()>,
- flush_start_sender: mpsc::Sender<()>,
unprepared_sender: mpsc::Sender<()>,
}
impl TaskImpl for TaskTest {
+ type Item = ();
+
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
- gst::debug!(RUNTIME_CAT, "iterate: prepared");
+ gst::debug!(RUNTIME_CAT, "nominal: prepared");
self.prepared_sender.send(()).await.unwrap();
Ok(())
}
@@ -1271,39 +1241,41 @@ mod tests {
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
- gst::debug!(RUNTIME_CAT, "iterate: started");
+ gst::debug!(RUNTIME_CAT, "nominal: started");
self.started_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
}
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
- gst::debug!(RUNTIME_CAT, "iterate: entering iterate");
- self.iterate_sender.send(()).await.unwrap();
+ gst::debug!(RUNTIME_CAT, "nominal: entering try_next");
+ self.try_next_ready_sender.send(()).await.unwrap();
+ gst::debug!(RUNTIME_CAT, "nominal: awaiting try_next");
+ self.try_next_receiver.next().await.unwrap();
+ Ok(())
+ }
+ .boxed()
+ }
- gst::debug!(RUNTIME_CAT, "iterate: awaiting complete_iterate_receiver");
+ fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ async move {
+ gst::debug!(RUNTIME_CAT, "nominal: entering handle_item");
+ self.handle_item_ready_sender.send(()).await.unwrap();
- let res = self.complete_iterate_receiver.next().await.unwrap();
- if res.is_ok() {
- gst::debug!(RUNTIME_CAT, "iterate: received Ok => keep looping");
- } else {
- gst::debug!(
- RUNTIME_CAT,
- "iterate: received {:?} => cancelling loop",
- res
- );
- }
+ gst::debug!(RUNTIME_CAT, "nominal: locked in handle_item");
+ self.handle_item_sender.send(()).await.unwrap();
+ gst::debug!(RUNTIME_CAT, "nominal: leaving handle_item");
- res
+ Ok(())
}
.boxed()
}
fn pause(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
- gst::debug!(RUNTIME_CAT, "iterate: paused");
+ gst::debug!(RUNTIME_CAT, "nominal: paused");
self.paused_sender.send(()).await.unwrap();
Ok(())
}
@@ -1312,56 +1284,49 @@ mod tests {
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
- gst::debug!(RUNTIME_CAT, "iterate: stopped");
+ gst::debug!(RUNTIME_CAT, "nominal: stopped");
self.stopped_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
}
- fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
- async move {
- gst::debug!(RUNTIME_CAT, "iterate: stopped");
- self.flush_start_sender.send(()).await.unwrap();
- Ok(())
- }
- .boxed()
- }
-
fn unprepare(&mut self) -> BoxFuture<'_, ()> {
async move {
- gst::debug!(RUNTIME_CAT, "iterate: unprepared");
+ gst::debug!(RUNTIME_CAT, "nominal: unprepared");
self.unprepared_sender.send(()).await.unwrap();
}
.boxed()
}
}
- let context = Context::acquire("iterate", Duration::from_millis(2)).unwrap();
+ let context = Context::acquire("nominal", Duration::from_millis(2)).unwrap();
let task = Task::default();
assert_eq!(task.state(), Unprepared);
- gst::debug!(RUNTIME_CAT, "iterate: preparing");
+ gst::debug!(RUNTIME_CAT, "nominal: preparing");
let (prepared_sender, mut prepared_receiver) = mpsc::channel(1);
let (started_sender, mut started_receiver) = mpsc::channel(1);
- let (iterate_sender, mut iterate_receiver) = mpsc::channel(1);
- let (mut complete_iterate_sender, complete_iterate_receiver) = mpsc::channel(1);
+ let (try_next_ready_sender, mut try_next_ready_receiver) = mpsc::channel(1);
+ let (mut try_next_sender, try_next_receiver) = mpsc::channel(1);
+ let (handle_item_ready_sender, mut handle_item_ready_receiver) = mpsc::channel(1);
+ let (handle_item_sender, mut handle_item_receiver) = mpsc::channel(0);
let (paused_sender, mut paused_receiver) = mpsc::channel(1);
let (stopped_sender, mut stopped_receiver) = mpsc::channel(1);
- let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
let (unprepared_sender, mut unprepared_receiver) = mpsc::channel(1);
let prepare_status = task.prepare(
TaskTest {
prepared_sender,
started_sender,
- iterate_sender,
- complete_iterate_receiver,
+ try_next_ready_sender,
+ try_next_receiver,
+ handle_item_ready_sender,
+ handle_item_sender,
paused_sender,
stopped_sender,
- flush_start_sender,
unprepared_sender,
},
context,
@@ -1377,18 +1342,10 @@ mod tests {
other => panic!("{:?}", other),
};
- gst::debug!(RUNTIME_CAT, "iterate: starting (async prepare)");
- // also tests await_maybe_on_context
- assert_eq!(
- task.start().await_maybe_on_context().unwrap(),
- Complete {
- origin: Prepared,
- target: Started,
- }
- );
- assert_eq!(task.state(), Started);
+ gst::debug!(RUNTIME_CAT, "nominal: starting (async prepare)");
+ let start_status = task.start().check().unwrap();
- // At this point, preparation must be complete
+ block_on(prepared_receiver.next()).unwrap();
// also tests await_maybe_on_context
assert_eq!(
prepare_status.await_maybe_on_context().unwrap(),
@@ -1397,17 +1354,24 @@ mod tests {
target: Prepared,
},
);
- block_on(prepared_receiver.next()).unwrap();
- // ... and start executed
- block_on(started_receiver.next()).unwrap();
+ block_on(started_receiver.next()).unwrap();
+ assert_eq!(
+ start_status.block_on().unwrap(),
+ Complete {
+ origin: Prepared,
+ target: Started,
+ }
+ );
assert_eq!(task.state(), Started);
// unlock task loop and keep looping
- block_on(iterate_receiver.next()).unwrap();
- block_on(complete_iterate_sender.send(Ok(()))).unwrap();
+ block_on(try_next_ready_receiver.next()).unwrap();
+ block_on(try_next_sender.send(())).unwrap();
+ block_on(handle_item_ready_receiver.next()).unwrap();
+ block_on(handle_item_receiver.next()).unwrap();
- gst::debug!(RUNTIME_CAT, "iterate: starting (redundant)");
+ gst::debug!(RUNTIME_CAT, "nominal: starting (redundant)");
// already started
assert_eq!(
task.start().block_on().unwrap(),
@@ -1428,82 +1392,96 @@ mod tests {
other => panic!("{:?}", other),
}
- gst::debug!(RUNTIME_CAT, "iterate: pause (initial)");
- let pause_status = task.pause();
- assert!(pause_status.is_ready());
- // also tests `check`
- match pause_status.check().unwrap() {
- Ready(Ok(NotWaiting {
- trigger: Pause,
- origin: Started,
- })) => (),
- other => panic!("{:?}", other),
- }
-
- // Pause transition is asynchronous FIXME
- while TaskState::Paused != task.state() {
- std::thread::sleep(Duration::from_millis(2));
+ gst::debug!(RUNTIME_CAT, "nominal: pause cancelling try_next");
+ block_on(try_next_ready_receiver.next()).unwrap();
- if let Ok(Some(())) = iterate_receiver.try_next() {
- // unlock iteration
- block_on(complete_iterate_sender.send(Ok(()))).unwrap();
- }
- }
-
- gst::debug!(RUNTIME_CAT, "iterate: awaiting pause ack");
+ let pause_status = task.pause().check().unwrap();
+ gst::debug!(RUNTIME_CAT, "nominal: awaiting pause ack");
block_on(paused_receiver.next()).unwrap();
+ assert_eq!(
+ pause_status.block_on().unwrap(),
+ Complete {
+ origin: Started,
+ target: Paused,
+ },
+ );
- gst::debug!(RUNTIME_CAT, "iterate: starting (after pause)");
+ // handle_item not reached
+ assert!(handle_item_ready_receiver.try_next().is_err());
+ // try_next not reached again
+ assert!(try_next_ready_receiver.try_next().is_err());
+
+ gst::debug!(
+ RUNTIME_CAT,
+ "nominal: starting (after pause cancelling try_next)"
+ );
+ let start_receiver = task.start().check().unwrap();
+ block_on(started_receiver.next()).unwrap();
assert_eq!(
- task.start().block_on().unwrap(),
+ start_receiver.block_on().unwrap(),
Complete {
origin: Paused,
target: Started,
},
);
-
assert_eq!(task.state(), Started);
- // Paused -> Started
- let _ = block_on(started_receiver.next());
- gst::debug!(RUNTIME_CAT, "iterate: stopping");
+ gst::debug!(RUNTIME_CAT, "nominal: pause // handle_item");
+ block_on(try_next_ready_receiver.next()).unwrap();
+ block_on(try_next_sender.send(())).unwrap();
+ // Make sure item is picked
+ block_on(handle_item_ready_receiver.next()).unwrap();
+
+ gst::debug!(RUNTIME_CAT, "nominal: requesting to pause");
+ let pause_status = task.pause().check().unwrap();
+
+ gst::debug!(RUNTIME_CAT, "nominal: unlocking item handling");
+ block_on(handle_item_receiver.next()).unwrap();
+
+ gst::debug!(RUNTIME_CAT, "nominal: awaiting pause ack");
+ block_on(paused_receiver.next()).unwrap();
assert_eq!(
- task.stop().block_on().unwrap(),
+ pause_status.block_on().unwrap(),
Complete {
origin: Started,
- target: Stopped,
+ target: Paused,
},
);
- assert_eq!(task.state(), Stopped);
- let _ = block_on(stopped_receiver.next());
+ // try_next not reached again
+ assert!(try_next_ready_receiver.try_next().is_err());
- // purge remaining iteration received before stop if any
- let _ = iterate_receiver.try_next();
-
- gst::debug!(RUNTIME_CAT, "iterate: starting (after stop)");
+ gst::debug!(
+ RUNTIME_CAT,
+ "nominal: starting (after pause // handle_item)"
+ );
+ let start_receiver = task.start().check().unwrap();
+ block_on(started_receiver.next()).unwrap();
assert_eq!(
- task.start().block_on().unwrap(),
+ start_receiver.block_on().unwrap(),
Complete {
- origin: Stopped,
+ origin: Paused,
target: Started,
},
);
- let _ = block_on(started_receiver.next());
+ assert_eq!(task.state(), Started);
- gst::debug!(RUNTIME_CAT, "iterate: req. iterate to return Eos");
- block_on(iterate_receiver.next()).unwrap();
- block_on(complete_iterate_sender.send(Err(gst::FlowError::Eos))).unwrap();
+ gst::debug!(RUNTIME_CAT, "nominal: stopping");
+ assert_eq!(
+ task.stop().block_on().unwrap(),
+ Complete {
+ origin: Started,
+ target: Stopped,
+ },
+ );
- gst::debug!(RUNTIME_CAT, "iterate: awaiting stop ack");
- block_on(stopped_receiver.next()).unwrap();
+ assert_eq!(task.state(), Stopped);
+ let _ = block_on(stopped_receiver.next());
- // Wait for state machine to reach Stopped
- while TaskState::Stopped != task.state() {
- std::thread::sleep(Duration::from_millis(2));
- }
+ // purge remaining iteration received before stop if any
+ let _ = try_next_ready_receiver.try_next();
- gst::debug!(RUNTIME_CAT, "iterate: starting (after stop)");
+ gst::debug!(RUNTIME_CAT, "nominal: starting (after stop)");
assert_eq!(
task.start().block_on().unwrap(),
Complete {
@@ -1513,51 +1491,19 @@ mod tests {
);
let _ = block_on(started_receiver.next());
- gst::debug!(RUNTIME_CAT, "iterate: req. iterate to return Flushing");
- block_on(iterate_receiver.next()).unwrap();
- block_on(complete_iterate_sender.send(Err(gst::FlowError::Flushing))).unwrap();
-
- gst::debug!(RUNTIME_CAT, "iterate: awaiting flush_start ack");
- block_on(flush_start_receiver.next()).unwrap();
-
- // Wait for state machine to reach Flushing
- while TaskState::Flushing != task.state() {
- std::thread::sleep(Duration::from_millis(2));
- }
-
- gst::debug!(RUNTIME_CAT, "iterate: stop flushing");
+ gst::debug!(RUNTIME_CAT, "nominal: stopping");
assert_eq!(
- task.flush_stop().block_on().unwrap(),
+ task.stop().block_on().unwrap(),
Complete {
- origin: Flushing,
- target: Started,
+ origin: Started,
+ target: Stopped,
},
);
- let _ = block_on(started_receiver.next());
-
- gst::debug!(RUNTIME_CAT, "iterate: req. iterate to return Error");
- block_on(iterate_receiver.next()).unwrap();
- block_on(complete_iterate_sender.send(Err(gst::FlowError::Error))).unwrap();
-
- // Wait for state machine to reach Error
- while TaskState::Error != task.state() {
- std::thread::sleep(Duration::from_millis(2));
- }
-
- gst::debug!(RUNTIME_CAT, "iterate: attempting to start (after Error)");
- match task.start().block_on().unwrap_err() {
- TransitionError {
- trigger: Start,
- state: TaskState::Error,
- ..
- } => (),
- other => panic!("{:?}", other),
- }
assert_eq!(
task.unprepare().block_on().unwrap(),
Complete {
- origin: TaskState::Error,
+ origin: Stopped,
target: Unprepared,
},
);
@@ -1575,6 +1521,8 @@ mod tests {
}
impl TaskImpl for TaskPrepareTest {
+ type Item = ();
+
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "prepare_error: prepare returning an error");
@@ -1609,8 +1557,12 @@ mod tests {
.boxed()
}
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
- future::ok(()).boxed()
+ fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ unreachable!("prepare_error: try_next");
+ }
+
+ fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ unreachable!("prepare_error: handle_item");
}
}
@@ -1671,6 +1623,8 @@ mod tests {
}
impl TaskImpl for TaskPrepareTest {
+ type Item = ();
+
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(
@@ -1701,9 +1655,13 @@ mod tests {
.boxed()
}
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::pending::<Result<(), gst::FlowError>>().boxed()
}
+
+ fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ unreachable!("prepare_start_ok: handle_item");
+ }
}
let context = Context::acquire("prepare_start_ok", Duration::from_millis(2)).unwrap();
@@ -1795,6 +1753,8 @@ mod tests {
}
impl TaskImpl for TaskPrepareTest {
+ type Item = ();
+
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(
@@ -1839,8 +1799,12 @@ mod tests {
unreachable!("prepare_start_error: start");
}
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
- unreachable!("prepare_start_error: iterate");
+ fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ unreachable!("prepare_start_error: try_next");
+ }
+
+ fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ unreachable!("prepare_start_error: handle_item");
}
}
@@ -1921,154 +1885,85 @@ mod tests {
}
#[test]
- fn pause_start() {
+ fn item_error() {
gst::init().unwrap();
- struct TaskPauseStartTest {
- iterate_sender: mpsc::Sender<()>,
- complete_receiver: mpsc::Receiver<()>,
- paused_sender: mpsc::Sender<()>,
+ struct TaskTest {
+ try_next_receiver: mpsc::Receiver<gst::FlowError>,
}
- impl TaskImpl for TaskPauseStartTest {
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
- async move {
- gst::debug!(RUNTIME_CAT, "pause_start: entering iteration");
- self.iterate_sender.send(()).await.unwrap();
-
- gst::debug!(RUNTIME_CAT, "pause_start: iteration awaiting completion");
- self.complete_receiver.next().await.unwrap();
- gst::debug!(RUNTIME_CAT, "pause_start: iteration complete");
+ impl TaskImpl for TaskTest {
+ type Item = gst::FlowError;
- Ok(())
+ fn try_next(&mut self) -> BoxFuture<'_, Result<gst::FlowError, gst::FlowError>> {
+ async move {
+ gst::debug!(RUNTIME_CAT, "item_error: awaiting try_next");
+ Ok(self.try_next_receiver.next().await.unwrap())
}
.boxed()
}
- fn pause(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ fn handle_item(
+ &mut self,
+ item: gst::FlowError,
+ ) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
- gst::debug!(RUNTIME_CAT, "pause_start: paused");
- self.paused_sender.send(()).await.unwrap();
- Ok(())
+ gst::debug!(RUNTIME_CAT, "item_error: handle_item received {:?}", item);
+ Err(item)
}
.boxed()
}
}
- let context = Context::acquire("pause_start", Duration::from_millis(2)).unwrap();
-
+ let context = Context::acquire("item_error", Duration::from_millis(2)).unwrap();
let task = Task::default();
+ gst::debug!(RUNTIME_CAT, "item_error: prepare and start");
+ let (mut try_next_sender, try_next_receiver) = mpsc::channel(1);
+ task.prepare(TaskTest { try_next_receiver }, context)
+ .block_on()
+ .unwrap();
+ task.start().block_on().unwrap();
- let (iterate_sender, mut iterate_receiver) = mpsc::channel(1);
- let (mut complete_sender, complete_receiver) = mpsc::channel(0);
- let (paused_sender, mut paused_receiver) = mpsc::channel(1);
- let _ = task.prepare(
- TaskPauseStartTest {
- iterate_sender,
- complete_receiver,
- paused_sender,
- },
- context,
- );
+ gst::debug!(RUNTIME_CAT, "item_error: req. handle_item to return Eos");
+ block_on(try_next_sender.send(gst::FlowError::Eos)).unwrap();
+ // Wait for state machine to reach Stopped
+ while Stopped != task.state() {
+ std::thread::sleep(Duration::from_millis(2));
+ }
- gst::debug!(RUNTIME_CAT, "pause_start: starting");
+ gst::debug!(RUNTIME_CAT, "item_error: starting (after stop)");
assert_eq!(
task.start().block_on().unwrap(),
Complete {
- origin: Prepared,
+ origin: Stopped,
target: Started,
},
);
- assert_eq!(task.state(), Started);
- gst::debug!(RUNTIME_CAT, "pause_start: awaiting 1st iteration");
- block_on(iterate_receiver.next()).unwrap();
-
- gst::debug!(RUNTIME_CAT, "pause_start: pausing (1)");
- match task.pause() {
- Ready(Ok(NotWaiting {
- trigger: Pause,
- origin: Started,
- })) => (),
- other => panic!("{:?}", other),
+ gst::debug!(RUNTIME_CAT, "item_error: req. handle_item to return Error");
+ block_on(try_next_sender.send(gst::FlowError::Error)).unwrap();
+ // Wait for state machine to reach Error
+ while TaskState::Error != task.state() {
+ std::thread::sleep(Duration::from_millis(2));
}
- gst::debug!(RUNTIME_CAT, "pause_start: sending 1st iteration completion");
- complete_sender.try_send(()).unwrap();
-
- // Pause transition is asynchronous FIXME
- while TaskState::Paused != task.state() {
- std::thread::sleep(Duration::from_millis(5));
+ gst::debug!(RUNTIME_CAT, "item_error: attempting to start (after Error)");
+ match task.start().block_on().unwrap_err() {
+ TransitionError {
+ trigger: Start,
+ state: TaskState::Error,
+ ..
+ } => (),
+ other => panic!("{:?}", other),
}
- gst::debug!(RUNTIME_CAT, "pause_start: awaiting paused");
- let _ = block_on(paused_receiver.next());
-
- // Loop held on due to Pause
- iterate_receiver.try_next().unwrap_err();
assert_eq!(
- task.start().block_on().unwrap(),
+ task.unprepare().block_on().unwrap(),
Complete {
- origin: Paused,
- target: Started,
+ origin: TaskState::Error,
+ target: Unprepared,
},
);
- assert_eq!(task.state(), Started);
-
- gst::debug!(RUNTIME_CAT, "pause_start: awaiting 2d iteration");
- block_on(iterate_receiver.next()).unwrap();
-
- gst::debug!(RUNTIME_CAT, "pause_start: sending 2d iteration completion");
- complete_sender.try_send(()).unwrap();
-
- stop_then_unprepare(task);
- }
-
- #[test]
- fn successive_pause_start() {
- // Purpose: check pause cancellation.
- gst::init().unwrap();
-
- struct TaskPauseStartTest {
- iterate_sender: mpsc::Sender<()>,
- }
-
- impl TaskImpl for TaskPauseStartTest {
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
- async move {
- gst::debug!(RUNTIME_CAT, "successive_pause_start: iteration");
- self.iterate_sender.send(()).await.unwrap();
-
- Ok(())
- }
- .boxed()
- }
- }
-
- let context = Context::acquire("successive_pause_start", Duration::from_millis(2)).unwrap();
-
- let task = Task::default();
-
- let (iterate_sender, mut iterate_receiver) = mpsc::channel(1);
- let _ = task.prepare(TaskPauseStartTest { iterate_sender }, context);
-
- gst::debug!(RUNTIME_CAT, "successive_pause_start: starting");
- block_on(task.start()).unwrap();
-
- gst::debug!(RUNTIME_CAT, "successive_pause_start: awaiting iteration 1");
- block_on(iterate_receiver.next()).unwrap();
-
- gst::debug!(RUNTIME_CAT, "successive_pause_start: pause and start");
- let _ = task.pause();
- block_on(task.start()).unwrap();
-
- assert_eq!(task.state(), Started);
-
- gst::debug!(RUNTIME_CAT, "successive_pause_start: awaiting iteration 2");
- block_on(iterate_receiver.next()).unwrap();
-
- gst::debug!(RUNTIME_CAT, "successive_pause_start: stopping");
- stop_then_unprepare(task);
}
#[test]
@@ -2081,10 +1976,16 @@ mod tests {
}
impl TaskImpl for TaskFlushTest {
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ type Item = ();
+
+ fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::pending::<Result<(), gst::FlowError>>().boxed()
}
+ fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ unreachable!("flush_regular_sync: handle_item");
+ }
+
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "flush_regular_sync: started flushing");
@@ -2160,10 +2061,16 @@ mod tests {
}
impl TaskImpl for TaskFlushTest {
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ type Item = ();
+
+ fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::pending::<Result<(), gst::FlowError>>().boxed()
}
+ fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ unreachable!("flush_regular_different_context: handle_item");
+ }
+
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(
@@ -2272,10 +2179,16 @@ mod tests {
}
impl TaskImpl for TaskFlushTest {
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ type Item = ();
+
+ fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::pending::<Result<(), gst::FlowError>>().boxed()
}
+ fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ unreachable!("flush_regular_same_context: handle_item");
+ }
+
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "flush_regular_same_context: started flushing");
@@ -2369,11 +2282,16 @@ mod tests {
}
impl TaskImpl for TaskFlushTest {
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ type Item = ();
+
+ fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ future::ok(()).boxed()
+ }
+
+ fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
- gst::debug!(RUNTIME_CAT, "flush_from_loop: flush_start from iteration");
- let flush_status = self.task.flush_start();
- match flush_status {
+ gst::debug!(RUNTIME_CAT, "flush_from_loop: flush_start from handle_item");
+ match self.task.flush_start() {
Pending {
trigger: FlushStart,
origin: Started,
@@ -2381,7 +2299,6 @@ mod tests {
} => (),
other => panic!("{:?}", other),
}
- flush_status.await.unwrap();
Ok(())
}
.boxed()
@@ -2440,18 +2357,25 @@ mod tests {
}
impl TaskImpl for TaskStartTest {
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ type Item = ();
+
+ fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ future::ok(()).boxed()
+ }
+
+ fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
- gst::debug!(RUNTIME_CAT, "pause_from_loop: entering iteration");
+ gst::debug!(RUNTIME_CAT, "pause_from_loop: entering handle_item");
crate::runtime::time::delay_for(Duration::from_millis(50)).await;
- gst::debug!(RUNTIME_CAT, "pause_from_loop: pause from iteration");
+ gst::debug!(RUNTIME_CAT, "pause_from_loop: pause from handle_item");
match self.task.pause() {
- Ready(Ok(TransitionOk::NotWaiting {
+ Pending {
trigger: Pause,
origin: Started,
- })) => (),
+ ..
+ } => (),
other => panic!("{:?}", other),
}
@@ -2502,10 +2426,16 @@ mod tests {
}
impl TaskImpl for TaskFlushTest {
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ type Item = ();
+
+ fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::pending::<Result<(), gst::FlowError>>().boxed()
}
+ fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ unreachable!("trigger_from_action: handle_item");
+ }
+
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(
@@ -2572,6 +2502,8 @@ mod tests {
}
impl TaskImpl for TaskFlushTest {
+ type Item = ();
+
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "pause_flush_start: started");
@@ -2581,10 +2513,14 @@ mod tests {
.boxed()
}
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::pending::<Result<(), gst::FlowError>>().boxed()
}
+ fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ unreachable!("pause_flush_start: handle_item");
+ }
+
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "pause_flush_start: started flushing");
@@ -2681,6 +2617,8 @@ mod tests {
}
impl TaskImpl for TaskFlushTest {
+ type Item = ();
+
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "pause_flushing_start: started");
@@ -2690,10 +2628,14 @@ mod tests {
.boxed()
}
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::pending::<Result<(), gst::FlowError>>().boxed()
}
+ fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ unreachable!("pause_flushing_start: handle_item");
+ }
+
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "pause_flushing_start: started flushing");
@@ -2779,10 +2721,16 @@ mod tests {
}
impl TaskImpl for TaskStartTest {
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ type Item = ();
+
+ fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::pending::<Result<(), gst::FlowError>>().boxed()
}
+ fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ unreachable!("flush_concurrent_start: handle_item");
+ }
+
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "flush_concurrent_start: started flushing");
@@ -2893,6 +2841,8 @@ mod tests {
}
impl TaskImpl for TaskTimerTest {
+ type Item = ();
+
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
self.timer = Some(crate::runtime::time::delay_for(Duration::from_millis(50)));
@@ -2902,12 +2852,18 @@ mod tests {
.boxed()
}
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
gst::debug!(RUNTIME_CAT, "start_timer: awaiting timer");
self.timer.take().unwrap().await;
- gst::debug!(RUNTIME_CAT, "start_timer: timer elapsed");
+ Ok(())
+ }
+ .boxed()
+ }
+ fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ async move {
+ gst::debug!(RUNTIME_CAT, "start_timer: timer elapsed");
if let Some(timer_elapsed_sender) = self.timer_elapsed_sender.take() {
timer_elapsed_sender.send(()).unwrap();
}