Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrançois Laignel <fengalin@free.fr>2020-05-15 20:38:54 +0300
committerFrançois Laignel <fengalin@free.fr>2020-05-25 19:31:49 +0300
commitf0793587f68a19ad4d0401ce98cacc8cb7b1009a (patch)
treee3678c55cc2ba42b760a65d5aa5be79aa3b25436 /generic
parent5c9bbc6818e5059f0c94b0be7c6c170f53e23bbc (diff)
threadshare/TaskImpl: allow transition hooks to fail...
... and add error handlers for iterate and transitions hooks with default implementation.
Diffstat (limited to 'generic')
-rw-r--r--generic/threadshare/src/appsrc.rs41
-rw-r--r--generic/threadshare/src/jitterbuffer/jitterbuffer.rs38
-rw-r--r--generic/threadshare/src/proxy.rs43
-rw-r--r--generic/threadshare/src/queue.rs45
-rw-r--r--generic/threadshare/src/runtime/task.rs1711
-rw-r--r--generic/threadshare/src/tcpclientsrc.rs61
-rw-r--r--generic/threadshare/src/udpsink.rs21
-rw-r--r--generic/threadshare/src/udpsrc.rs42
-rw-r--r--generic/threadshare/tests/pad.rs22
9 files changed, 1321 insertions, 703 deletions
diff --git a/generic/threadshare/src/appsrc.rs b/generic/threadshare/src/appsrc.rs
index 28688ddb3..8bc923029 100644
--- a/generic/threadshare/src/appsrc.rs
+++ b/generic/threadshare/src/appsrc.rs
@@ -252,14 +252,8 @@ impl PadSrcHandler for AppSrcPadHandler {
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let ret = match event.view() {
- EventView::FlushStart(..) => {
- appsrc.task.flush_start();
- true
- }
- EventView::FlushStop(..) => {
- appsrc.task.flush_stop();
- true
- }
+ EventView::FlushStart(..) => appsrc.task.flush_start().is_ok(),
+ EventView::FlushStop(..) => appsrc.task.flush_stop().is_ok(),
EventView::Reconfigure(..) => true,
EventView::Latency(..) => true,
_ => false,
@@ -355,9 +349,7 @@ impl AppSrcTask {
impl TaskImpl for AppSrcTask {
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
- let item = self.receiver.next().await;
-
- let item = match item {
+ let item = match self.receiver.next().await {
Some(item) => item,
None => {
gst_error!(CAT, obj: &self.element, "SrcPad channel aborted");
@@ -404,7 +396,7 @@ impl TaskImpl for AppSrcTask {
.boxed()
}
- fn stop(&mut self) -> BoxFuture<'_, ()> {
+ fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Stopping task");
@@ -412,11 +404,12 @@ impl TaskImpl for AppSrcTask {
self.src_pad_handler.reset_state().await;
gst_log!(CAT, obj: &self.element, "Task stopped");
+ Ok(())
}
.boxed()
}
- fn flush_start(&mut self) -> BoxFuture<'_, ()> {
+ fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Starting task flush");
@@ -424,6 +417,7 @@ impl TaskImpl for AppSrcTask {
self.src_pad_handler.set_need_segment().await;
gst_log!(CAT, obj: &self.element, "Task flush started");
+ Ok(())
}
.boxed()
}
@@ -544,22 +538,25 @@ impl AppSrc {
gst_debug!(CAT, obj: element, "Unprepared");
}
- fn stop(&self, element: &gst::Element) {
+ fn stop(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Stopping");
- self.task.stop();
+ self.task.stop()?;
gst_debug!(CAT, obj: element, "Stopped");
+ Ok(())
}
- fn start(&self, element: &gst::Element) {
+ fn start(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Starting");
- self.task.start();
+ self.task.start()?;
gst_debug!(CAT, obj: element, "Started");
+ Ok(())
}
- fn pause(&self, element: &gst::Element) {
+ fn pause(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Pausing");
- self.task.pause();
+ self.task.pause()?;
gst_debug!(CAT, obj: element, "Paused");
+ Ok(())
}
}
@@ -714,7 +711,7 @@ impl ElementImpl for AppSrc {
})?;
}
gst::StateChange::PlayingToPaused => {
- self.pause(element);
+ self.pause(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
self.unprepare(element);
@@ -729,13 +726,13 @@ impl ElementImpl for AppSrc {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToPlaying => {
- self.start(element);
+ self.start(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
- self.stop(element);
+ self.stop(element).map_err(|_| gst::StateChangeError)?;
}
_ => (),
}
diff --git a/generic/threadshare/src/jitterbuffer/jitterbuffer.rs b/generic/threadshare/src/jitterbuffer/jitterbuffer.rs
index b5327f607..fb7c03d07 100644
--- a/generic/threadshare/src/jitterbuffer/jitterbuffer.rs
+++ b/generic/threadshare/src/jitterbuffer/jitterbuffer.rs
@@ -662,7 +662,9 @@ impl PadSinkHandler for SinkHandler {
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
if let EventView::FlushStart(..) = event.view() {
- jb.task.flush_start();
+ if let Err(err) = jb.task.flush_start() {
+ gst_error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
+ }
}
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
@@ -699,7 +701,10 @@ impl PadSinkHandler for SinkHandler {
.unwrap();
}
EventView::FlushStop(..) => {
- jb.task.flush_stop();
+ if let Err(err) = jb.task.flush_stop() {
+ // FIXME we should probably return false if that one fails
+ gst_error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
+ }
}
EventView::Eos(..) => {
let mut state = jb.state.lock().unwrap();
@@ -975,10 +980,15 @@ impl PadSrcHandler for SrcHandler {
match event.view() {
EventView::FlushStart(..) => {
- jb.task.flush_start();
+ if let Err(err) = jb.task.flush_start() {
+ gst_error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
+ }
}
EventView::FlushStop(..) => {
- jb.task.flush_stop();
+ if let Err(err) = jb.task.flush_stop() {
+ // FIXME we should probably return false if that one fails
+ gst_error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
+ }
}
_ => (),
}
@@ -1125,7 +1135,7 @@ impl JitterBufferTask {
}
impl TaskImpl for JitterBufferTask {
- fn start(&mut self) -> BoxFuture<'_, ()> {
+ fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Starting task");
@@ -1136,6 +1146,7 @@ impl TaskImpl for JitterBufferTask {
*jb.state.lock().unwrap() = State::default();
gst_log!(CAT, obj: &self.element, "Task started");
+ Ok(())
}
.boxed()
}
@@ -1279,7 +1290,7 @@ impl TaskImpl for JitterBufferTask {
.boxed()
}
- fn stop(&mut self) -> BoxFuture<'_, ()> {
+ fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Stopping task");
@@ -1296,6 +1307,7 @@ impl TaskImpl for JitterBufferTask {
*jb_state = State::default();
gst_log!(CAT, obj: &self.element, "Task stopped");
+ Ok(())
}
.boxed()
}
@@ -1359,16 +1371,18 @@ impl JitterBuffer {
gst_debug!(CAT, obj: element, "Unprepared");
}
- fn start(&self, element: &gst::Element) {
+ fn start(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Starting");
- self.task.start();
+ self.task.start()?;
gst_debug!(CAT, obj: element, "Started");
+ Ok(())
}
- fn stop(&self, element: &gst::Element) {
+ fn stop(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Stopping");
- self.task.stop();
+ self.task.stop()?;
gst_debug!(CAT, obj: element, "Stopped");
+ Ok(())
}
}
@@ -1571,7 +1585,7 @@ impl ElementImpl for JitterBuffer {
})?;
}
gst::StateChange::PausedToReady => {
- self.stop(element);
+ self.stop(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
self.unprepare(element);
@@ -1583,7 +1597,7 @@ impl ElementImpl for JitterBuffer {
match transition {
gst::StateChange::ReadyToPaused => {
- self.start(element);
+ self.start(element).map_err(|_| gst::StateChangeError)?;
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PlayingToPaused => {
diff --git a/generic/threadshare/src/proxy.rs b/generic/threadshare/src/proxy.rs
index 9b56a6ccb..9f2fc4a68 100644
--- a/generic/threadshare/src/proxy.rs
+++ b/generic/threadshare/src/proxy.rs
@@ -825,8 +825,17 @@ impl PadSrcHandler for ProxySrcPadHandler {
};
match event.view() {
- EventView::FlushStart(..) => proxysrc.task.flush_start(),
- EventView::FlushStop(..) => proxysrc.task.flush_stop(),
+ EventView::FlushStart(..) => {
+ if let Err(err) = proxysrc.task.flush_start() {
+ gst_error!(SRC_CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
+ }
+ }
+ EventView::FlushStop(..) => {
+ if let Err(err) = proxysrc.task.flush_stop() {
+ // FIXME we should probably return false if that one fails
+ gst_error!(SRC_CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
+ }
+ }
_ => (),
}
@@ -905,7 +914,7 @@ impl ProxySrcTask {
}
impl TaskImpl for ProxySrcTask {
- fn start(&mut self) -> BoxFuture<'_, ()> {
+ fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(SRC_CAT, obj: &self.element, "Starting task");
@@ -922,6 +931,7 @@ impl TaskImpl for ProxySrcTask {
self.dataqueue.start();
gst_log!(SRC_CAT, obj: &self.element, "Task started");
+ Ok(())
}
.boxed()
}
@@ -979,7 +989,7 @@ impl TaskImpl for ProxySrcTask {
.boxed()
}
- fn stop(&mut self) -> BoxFuture<'_, ()> {
+ fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(SRC_CAT, obj: &self.element, "Stopping task");
@@ -997,11 +1007,12 @@ impl TaskImpl for ProxySrcTask {
}
gst_log!(SRC_CAT, obj: &self.element, "Task stopped");
+ Ok(())
}
.boxed()
}
- fn flush_start(&mut self) -> BoxFuture<'_, ()> {
+ fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(SRC_CAT, obj: &self.element, "Starting task flush");
@@ -1014,6 +1025,7 @@ impl TaskImpl for ProxySrcTask {
shared_ctx.last_res = Err(gst::FlowError::Flushing);
gst_log!(SRC_CAT, obj: &self.element, "Task flush started");
+ Ok(())
}
.boxed()
}
@@ -1120,22 +1132,25 @@ impl ProxySrc {
gst_debug!(SRC_CAT, obj: element, "Unprepared");
}
- fn stop(&self, element: &gst::Element) {
+ fn stop(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(SRC_CAT, obj: element, "Stopping");
- self.task.stop();
+ self.task.stop()?;
gst_debug!(SRC_CAT, obj: element, "Stopped");
+ Ok(())
}
- fn start(&self, element: &gst::Element) {
+ fn start(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(SRC_CAT, obj: element, "Starting");
- self.task.start();
+ self.task.start()?;
gst_debug!(SRC_CAT, obj: element, "Started");
+ Ok(())
}
- fn pause(&self, element: &gst::Element) {
+ fn pause(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(SRC_CAT, obj: element, "Pausing");
- self.task.pause();
+ self.task.pause()?;
gst_debug!(SRC_CAT, obj: element, "Paused");
+ Ok(())
}
}
@@ -1264,7 +1279,7 @@ impl ElementImpl for ProxySrc {
})?;
}
gst::StateChange::PlayingToPaused => {
- self.pause(element);
+ self.pause(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
self.unprepare(element);
@@ -1279,13 +1294,13 @@ impl ElementImpl for ProxySrc {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToPlaying => {
- self.start(element);
+ self.start(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
- self.stop(element);
+ self.stop(element).map_err(|_| gst::StateChangeError)?;
}
_ => (),
}
diff --git a/generic/threadshare/src/queue.rs b/generic/threadshare/src/queue.rs
index 3c9841836..c8a9a2f89 100644
--- a/generic/threadshare/src/queue.rs
+++ b/generic/threadshare/src/queue.rs
@@ -193,7 +193,9 @@ impl PadSinkHandler for QueuePadSinkHandler {
gst_debug!(CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event);
if let EventView::FlushStart(..) = event.view() {
- queue.task.flush_start();
+ if let Err(err) = queue.task.flush_start() {
+ gst_error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
+ }
}
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event);
@@ -218,7 +220,10 @@ impl PadSinkHandler for QueuePadSinkHandler {
let queue = Queue::from_instance(&element);
if let EventView::FlushStop(..) = event.view() {
- queue.task.flush_stop();
+ if let Err(err) = queue.task.flush_stop() {
+ // FIXME we should probably return false if that one fails
+ gst_error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
+ }
}
gst_log!(CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event);
@@ -296,8 +301,17 @@ impl PadSrcHandler for QueuePadSrcHandler {
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
match event.view() {
- EventView::FlushStart(..) => queue.task.flush_start(),
- EventView::FlushStop(..) => queue.task.flush_stop(),
+ EventView::FlushStart(..) => {
+ if let Err(err) = queue.task.flush_start() {
+ gst_error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
+ }
+ }
+ EventView::FlushStop(..) => {
+ if let Err(err) = queue.task.flush_stop() {
+ // FIXME we should probably return false if that one fails
+ gst_error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
+ }
+ }
_ => (),
}
@@ -362,7 +376,7 @@ impl QueueTask {
}
impl TaskImpl for QueueTask {
- fn start(&mut self) -> BoxFuture<'_, ()> {
+ fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Starting task");
@@ -374,6 +388,7 @@ impl TaskImpl for QueueTask {
*last_res = Ok(gst::FlowSuccess::Ok);
gst_log!(CAT, obj: &self.element, "Task started");
+ Ok(())
}
.boxed()
}
@@ -425,7 +440,7 @@ impl TaskImpl for QueueTask {
.boxed()
}
- fn stop(&mut self) -> BoxFuture<'_, ()> {
+ fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Stopping task");
@@ -442,11 +457,12 @@ impl TaskImpl for QueueTask {
*last_res = Err(gst::FlowError::Flushing);
gst_log!(CAT, obj: &self.element, "Task stopped");
+ Ok(())
}
.boxed()
}
- fn flush_start(&mut self) -> BoxFuture<'_, ()> {
+ fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Starting task flush");
@@ -462,6 +478,7 @@ impl TaskImpl for QueueTask {
*last_res = Err(gst::FlowError::Flushing);
gst_log!(CAT, obj: &self.element, "Task flush started");
+ Ok(())
}
.boxed()
}
@@ -699,16 +716,18 @@ impl Queue {
gst_debug!(CAT, obj: element, "Unprepared");
}
- fn stop(&self, element: &gst::Element) {
+ fn stop(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Stopping");
- self.task.stop();
+ self.task.stop()?;
gst_debug!(CAT, obj: element, "Stopped");
+ Ok(())
}
- fn start(&self, element: &gst::Element) {
+ fn start(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Starting");
- self.task.start();
+ self.task.start()?;
gst_debug!(CAT, obj: element, "Started");
+ Ok(())
}
}
@@ -839,7 +858,7 @@ impl ElementImpl for Queue {
})?;
}
gst::StateChange::PausedToReady => {
- self.stop(element);
+ self.stop(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
self.unprepare(element);
@@ -850,7 +869,7 @@ impl ElementImpl for Queue {
let success = self.parent_change_state(element, transition)?;
if transition == gst::StateChange::ReadyToPaused {
- self.start(element);
+ self.start(element).map_err(|_| gst::StateChangeError)?;
}
Ok(success)
diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs
index ee2eb0ead..be71c6d2f 100644
--- a/generic/threadshare/src/runtime/task.rs
+++ b/generic/threadshare/src/runtime/task.rs
@@ -28,6 +28,7 @@ use gst::{gst_debug, gst_error, gst_error_msg, gst_fixme, gst_log, gst_trace, gs
use std::fmt;
use std::ops::Deref;
+use std::stringify;
use std::sync::{Arc, Mutex, MutexGuard};
use super::executor::{block_on_or_add_sub_task, TaskId};
@@ -35,10 +36,10 @@ use super::{Context, JoinHandle, RUNTIME_CAT};
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)]
pub enum TaskState {
+ Error,
Flushing,
Paused,
PausedFlushing,
- PrepareFailed,
Prepared,
Preparing,
Started,
@@ -47,22 +48,75 @@ pub enum TaskState {
Unpreparing,
}
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+pub enum Transition {
+ Error,
+ FlushStart,
+ FlushStop,
+ Pause,
+ Prepare,
+ Start,
+ Stop,
+ Unprepare,
+}
+
+/// TransitionRequest error details.
#[derive(Clone, Debug, Eq, PartialEq)]
-pub enum TaskError {
- ActiveTask,
- InactiveTask,
+pub struct TransitionError {
+ pub transition: Transition,
+ pub state: TaskState,
+ pub err_msg: gst::ErrorMessage,
}
-impl fmt::Display for TaskError {
+impl fmt::Display for TransitionError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- match self {
- TaskError::ActiveTask => write!(f, "Task is active"),
- TaskError::InactiveTask => write!(f, "Task is not active"),
- }
+ write!(
+ f,
+ "{:?} from state {:?}: {:?}",
+ self.transition, self.state, self.err_msg
+ )
+ }
+}
+
+impl std::error::Error for TransitionError {}
+
+impl From<TransitionError> for gst::ErrorMessage {
+ fn from(err: TransitionError) -> Self {
+ err.err_msg
}
}
-impl std::error::Error for TaskError {}
+/// Transition request handling details.
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum TransitionStatus {
+ /// Transition completed successfully.
+ Complete {
+ origin: TaskState,
+ target: TaskState,
+ },
+ /// The transition acknowledgement was spawned in a subtask.
+ ///
+ /// This occurs when the transition is requested from a `Context`.
+ Async {
+ transition: Transition,
+ origin: TaskState,
+ },
+ /// Not waiting for transition completion.
+ ///
+ /// This is to prevent:
+ /// - A deadlock when executing from a `TaskImpl` hook.
+ /// - A potential infinite wait when pausing a running loop
+ /// which could be awaiting for an `iterate` to complete.
+ NotWaiting {
+ transition: Transition,
+ origin: TaskState,
+ },
+ /// Skipping transition due to current state.
+ Skipped {
+ transition: Transition,
+ state: TaskState,
+ },
+}
/// Implementation trait for `Task`s.
///
@@ -75,87 +129,147 @@ pub trait TaskImpl: Send + 'static {
future::ok(()).boxed()
}
- /// Handles an error happening during prepare.
- ///
- /// This handler also catches errors returned by subtasks spawned by `prepare`.
- ///
- /// Implementation might use `gst::Element::post_error_message`.
- fn handle_prepare_error(&mut self, err: gst::ErrorMessage) -> BoxFuture<'_, ()> {
- async move {
- gst_error!(
- RUNTIME_CAT,
- "TaskImpl default handle_prepare_error received {:?}",
- err
- );
- }
- .boxed()
- }
-
fn unprepare(&mut self) -> BoxFuture<'_, ()> {
future::ready(()).boxed()
}
- fn start(&mut self) -> BoxFuture<'_, ()> {
- future::ready(()).boxed()
+ fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ future::ok(()).boxed()
}
/// Executes an iteration in `TaskState::Started`.
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>>;
- fn pause(&mut self) -> BoxFuture<'_, ()> {
- future::ready(()).boxed()
+ fn pause(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ future::ok(()).boxed()
}
- fn flush_start(&mut self) -> BoxFuture<'_, ()> {
- future::ready(()).boxed()
+ fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ future::ok(()).boxed()
}
- fn flush_stop(&mut self) -> BoxFuture<'_, ()> {
- future::ready(()).boxed()
+ fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ future::ok(()).boxed()
}
- fn stop(&mut self) -> BoxFuture<'_, ()> {
- future::ready(()).boxed()
+ fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ future::ok(()).boxed()
}
-}
-#[derive(Clone, Copy, Debug)]
-enum TransitionKind {
- FlushStart,
- FlushStop,
- Pause,
- Prepare,
- Start,
- Stop,
- Unprepare,
+ /// Handles an error occuring during the execution of an iteration.
+ ///
+ /// This handler also catches errors returned by subtasks spawned by the iteration.
+ ///
+ /// If the error is unrecoverable, implementations might use `gst::Element::post_error_message`
+ /// and return `Transition::Error`.
+ ///
+ /// Otherwise, handle the error and return the requested `Transition` to recover.
+ ///
+ /// Default behaviour depends on the `err`:
+ ///
+ /// - `FlowError::Flushing` -> `Transition::FlushStart`.
+ /// - `FlowError::Eos` -> `Transition::Stop`.
+ /// - Other `FlowError` -> `Transition::Error`.
+ fn handle_iterate_error(&mut self, err: gst::FlowError) -> BoxFuture<'_, Transition> {
+ async move {
+ match err {
+ gst::FlowError::Flushing => {
+ gst_debug!(
+ RUNTIME_CAT,
+ "TaskImpl iterate returned Flushing. Posting FlushStart"
+ );
+ Transition::FlushStart
+ }
+ gst::FlowError::Eos => {
+ gst_debug!(RUNTIME_CAT, "TaskImpl iterate returned Eos. Posting Stop");
+ Transition::Stop
+ }
+ other => {
+ gst_error!(
+ RUNTIME_CAT,
+ "TaskImpl iterate returned {:?}. Posting Error",
+ other
+ );
+ Transition::Error
+ }
+ }
+ }
+ .boxed()
+ }
+
+ /// Handles an error occuring during the execution of a transition hook.
+ ///
+ /// This handler also catches errors returned by subtasks spawned by the transition hook.
+ ///
+ /// If the error is unrecoverable, implementations might use `gst::Element::post_error_message`
+ /// and return `Transition::Error`.
+ ///
+ /// Otherwise, handle the error and return the requested `Transition` to recover.
+ ///
+ /// Default is to `gst_error` log and return `Transition::Error`.
+ fn handle_hook_error(
+ &mut self,
+ transition: Transition,
+ state: TaskState,
+ err: gst::ErrorMessage,
+ ) -> BoxFuture<'_, Transition> {
+ async move {
+ gst_error!(
+ RUNTIME_CAT,
+ "TaskImpl hook error during {:?} from {:?}: {:?}. Posting Transition::Error",
+ transition,
+ state,
+ err,
+ );
+
+ Transition::Error
+ }
+ .boxed()
+ }
}
-struct Transition {
- kind: TransitionKind,
- ack_tx: Option<oneshot::Sender<Transition>>,
+struct TransitionRequest {
+ kind: Transition,
+ ack_tx: oneshot::Sender<Result<TransitionStatus, TransitionError>>,
}
-impl Transition {
- fn new(kind: TransitionKind) -> (Self, oneshot::Receiver<Transition>) {
+impl TransitionRequest {
+ fn new(
+ kind: Transition,
+ ) -> (
+ Self,
+ oneshot::Receiver<Result<TransitionStatus, TransitionError>>,
+ ) {
let (ack_tx, ack_rx) = oneshot::channel();
- let req = Transition {
- kind,
- ack_tx: Some(ack_tx),
- };
+ let req = TransitionRequest { kind, ack_tx };
(req, ack_rx)
}
- fn send_ack(mut self) {
- if let Some(ack_tx) = self.ack_tx.take() {
- let _ = ack_tx.send(self);
- }
+ fn send_ack(self, res: Result<TransitionStatus, TransitionError>) {
+ let _ = self.ack_tx.send(res);
+ }
+
+ fn send_err_ack(self) {
+ let res = Err(TransitionError {
+ transition: self.kind,
+ state: TaskState::Error,
+ err_msg: gst_error_msg!(
+ gst::CoreError::StateChange,
+ [
+ "Transition {:?} failed due to a previous unrecoverable error",
+ self.kind,
+ ]
+ ),
+ });
+
+ self.send_ack(res);
}
}
-impl fmt::Debug for Transition {
+impl fmt::Debug for TransitionRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("Transition")
+ f.debug_struct("TransitionRequest")
.field("kind", &self.kind)
.finish()
}
@@ -175,7 +289,7 @@ struct TaskInner {
state_machine_handle: Option<JoinHandle<()>>,
// The transition channel allows serializing transitions handling,
// preventing race conditions when transitions are run in //.
- transition_tx: Option<async_mpsc::Sender<Transition>>,
+ transition_tx: Option<async_mpsc::Sender<TransitionRequest>>,
prepare_abort_handle: Option<AbortHandle>,
loop_abort_handle: Option<AbortHandle>,
spawned_task_id: Option<TaskId>,
@@ -197,32 +311,67 @@ impl Default for TaskInner {
}
impl TaskInner {
- fn switch_to_state(&mut self, state: TaskState, transition: Transition) {
- self.state = state;
- transition.send_ack();
+ fn switch_to_state(&mut self, target_state: TaskState, transition_req: TransitionRequest) {
+ let res = Ok(TransitionStatus::Complete {
+ origin: self.state,
+ target: target_state,
+ });
+
+ self.state = target_state;
+ transition_req.send_ack(res);
}
- fn skip_transition(&mut self, transition: Transition) {
- transition.send_ack();
+ fn switch_to_err(&mut self, transition_req: TransitionRequest) {
+ let res = Err(TransitionError {
+ transition: transition_req.kind,
+ state: self.state,
+ err_msg: gst_error_msg!(
+ gst::CoreError::StateChange,
+ [
+ "Unrecoverable error for {:?} from state {:?}",
+ transition_req,
+ self.state,
+ ]
+ ),
+ });
+
+ self.state = TaskState::Error;
+ transition_req.send_ack(res);
}
- fn push_transition(
+ fn skip_transition(&mut self, transition_req: TransitionRequest) {
+ let res = Ok(TransitionStatus::Skipped {
+ transition: transition_req.kind,
+ state: self.state,
+ });
+
+ transition_req.send_ack(res);
+ }
+
+ fn request_transition(
&mut self,
- kind: TransitionKind,
- ) -> Result<oneshot::Receiver<Transition>, TaskError> {
- if self.transition_tx.is_none() {
- return Err(TaskError::InactiveTask);
- }
+ kind: Transition,
+ ) -> Result<oneshot::Receiver<Result<TransitionStatus, TransitionError>>, TransitionError> {
+ let transition_tx = self.transition_tx.as_mut().unwrap();
- let (transition, ack_rx) = Transition::new(kind);
+ let (transition_req, ack_rx) = TransitionRequest::new(kind);
- gst_log!(RUNTIME_CAT, "Requesting {:?}", transition);
+ gst_log!(RUNTIME_CAT, "Pushing {:?}", transition_req);
- self.transition_tx
- .as_mut()
- .unwrap()
- .try_send(transition)
- .or(Err(TaskError::InactiveTask))?;
+ transition_tx.try_send(transition_req).or_else(|err| {
+ let resource_err = if err.is_full() {
+ gst::ResourceError::NoSpaceLeft
+ } else {
+ gst::ResourceError::Close
+ };
+
+ gst_warning!(RUNTIME_CAT, "Unable to send {:?}: {:?}", kind, err);
+ Err(TransitionError {
+ transition: kind,
+ state: self.state,
+ err_msg: gst_error_msg!(resource_err, ["Unable to send {:?}: {:?}", kind, err]),
+ })
+ })?;
Ok(ack_rx)
}
@@ -275,30 +424,33 @@ impl Task {
self.0.lock().unwrap().context.as_ref().cloned()
}
- pub fn prepare(&self, task_impl: impl TaskImpl, context: Context) -> Result<(), TaskError> {
+ pub fn prepare(
+ &self,
+ task_impl: impl TaskImpl,
+ context: Context,
+ ) -> Result<TransitionStatus, TransitionError> {
let mut inner = self.0.lock().unwrap();
- match inner.state {
+ let origin = inner.state;
+ match origin {
TaskState::Unprepared => (),
- TaskState::Prepared => {
- gst_debug!(RUNTIME_CAT, "Task already prepared");
- return Err(TaskError::ActiveTask);
- }
- TaskState::Preparing => {
- gst_warning!(RUNTIME_CAT, "Task already preparing");
- return Err(TaskError::ActiveTask);
- }
- TaskState::Unpreparing => {
- gst_warning!(RUNTIME_CAT, "Task unpreparing");
- return Err(TaskError::InactiveTask);
+ TaskState::Prepared | TaskState::Preparing => {
+ gst_debug!(RUNTIME_CAT, "Task already {:?}", origin);
+ return Ok(TransitionStatus::Skipped {
+ transition: Transition::Prepare,
+ state: origin,
+ });
}
state => {
- gst_warning!(
- RUNTIME_CAT,
- "Attempt to prepare a task in state {:?}",
- state
- );
- return Err(TaskError::ActiveTask);
+ gst_warning!(RUNTIME_CAT, "Attempt to prepare Task in state {:?}", state);
+ return Err(TransitionError {
+ transition: Transition::Prepare,
+ state: inner.state,
+ err_msg: gst_error_msg!(
+ gst::CoreError::StateChange,
+ ["Attempt to prepare Task in state {:?}", state]
+ ),
+ });
}
}
@@ -312,11 +464,11 @@ impl Task {
// this determines the contention on the Task.
let (transition_tx, transition_rx) = async_mpsc::channel(4);
let state_machine = StateMachine::new(Box::new(task_impl), transition_rx);
- let (transition, _) = Transition::new(TransitionKind::Prepare);
+ let (transition_req, _) = TransitionRequest::new(Transition::Prepare);
inner.state_machine_handle = Some(inner.state_machine_context.spawn(state_machine.run(
Arc::clone(&self.0),
context.clone(),
- transition,
+ transition_req,
)));
inner.transition_tx = Some(transition_tx);
@@ -324,44 +476,51 @@ impl Task {
gst_log!(RUNTIME_CAT, "Task state machine started");
- Ok(())
+ Ok(TransitionStatus::Async {
+ transition: Transition::Prepare,
+ origin,
+ })
}
- pub fn unprepare(&self) -> Result<(), TaskError> {
+ pub fn unprepare(&self) -> Result<TransitionStatus, TransitionError> {
let mut inner = self.0.lock().unwrap();
- match inner.state {
- TaskState::Stopped
- | TaskState::Prepared
- | TaskState::PrepareFailed
- | TaskState::Preparing => (),
- TaskState::Unprepared => {
- gst_debug!(RUNTIME_CAT, "Task already unprepared");
- return Err(TaskError::InactiveTask);
+ let origin = inner.state;
+ match origin {
+ TaskState::Stopped | TaskState::Error | TaskState::Prepared | TaskState::Preparing => {
+ gst_debug!(RUNTIME_CAT, "Unpreparing task");
}
- TaskState::Unpreparing => {
- gst_debug!(RUNTIME_CAT, "Task already unpreparing");
- return Err(TaskError::InactiveTask);
+ TaskState::Unprepared | TaskState::Unpreparing => {
+ gst_debug!(RUNTIME_CAT, "Task already {:?}", origin);
+ return Ok(TransitionStatus::Skipped {
+ transition: Transition::Unprepare,
+ state: origin,
+ });
}
state => {
gst_warning!(
RUNTIME_CAT,
- "Attempt to unprepare a task in state {:?}",
+ "Attempt to unprepare Task in state {:?}",
state
);
- return Err(TaskError::ActiveTask);
+ return Err(TransitionError {
+ transition: Transition::Unprepare,
+ state: inner.state,
+ err_msg: gst_error_msg!(
+ gst::CoreError::StateChange,
+ ["Attempt to unprepare Task in state {:?}", state]
+ ),
+ });
}
}
- gst_debug!(RUNTIME_CAT, "Unpreparing task");
-
inner.state = TaskState::Unpreparing;
if let Some(loop_abort_handle) = inner.loop_abort_handle.take() {
loop_abort_handle.abort();
}
- let _ = inner.push_transition(TransitionKind::Unprepare).unwrap();
+ let _ = inner.request_transition(Transition::Unprepare).unwrap();
let transition_tx = inner.transition_tx.take().unwrap();
let state_machine_handle = inner.state_machine_handle.take();
@@ -373,45 +532,57 @@ impl Task {
drop(inner);
- if let Some(state_machine_handle) = state_machine_handle {
- // We can block on the state_machine_handle since `unprepare` is never called
- // from the state machine Context, but from a spawned Task Context subtask.
- gst_log!(
- RUNTIME_CAT,
- "Synchronously waiting for the state machine {:?}",
- state_machine_handle,
- );
- let _ = block_on_or_add_sub_task(state_machine_handle);
- }
+ match state_machine_handle {
+ Some(state_machine_handle) => {
+ gst_log!(
+ RUNTIME_CAT,
+ "Synchronously waiting for the state machine {:?}",
+ state_machine_handle,
+ );
+ let join_fut = block_on_or_add_sub_task(async {
+ state_machine_handle.await.unwrap();
+
+ drop(transition_tx);
+ drop(context);
- drop(transition_tx);
- drop(context);
+ gst_debug!(RUNTIME_CAT, "Task unprepared");
+ });
- gst_debug!(RUNTIME_CAT, "Task unprepared");
+ if join_fut.is_none() {
+ return Ok(TransitionStatus::Async {
+ transition: Transition::Unprepare,
+ origin,
+ });
+ }
+ }
+ None => {
+ drop(transition_tx);
+ drop(context);
+ }
+ }
- Ok(())
+ Ok(TransitionStatus::Complete {
+ origin,
+ target: TaskState::Unprepared,
+ })
}
- /// `Starts` the `Task`.
+ /// Starts the `Task`.
///
/// The execution occurs on the `Task` context.
- pub fn start(&self) {
+ pub fn start(&self) -> Result<TransitionStatus, TransitionError> {
let mut inner = self.0.lock().unwrap();
- let ack_rx = match inner.push_transition(TransitionKind::Start) {
- Ok(ack_rx) => ack_rx,
- Err(err) => {
- gst_warning!(RUNTIME_CAT, "Error attempting to Start task: {:?}", err);
- return;
- }
- };
+ let ack_rx = inner.request_transition(Transition::Start)?;
if let TaskState::Started = inner.state {
- // Don't await ack_rx because TaskImpl::iterate might be pending
- return;
+ return Ok(TransitionStatus::NotWaiting {
+ transition: Transition::Start,
+ origin: TaskState::Started,
+ });
}
- Self::await_ack(inner, ack_rx, TransitionKind::Start);
+ Self::await_ack(inner, ack_rx, Transition::Start)
}
/// Requests the `Task` loop to pause.
@@ -419,67 +590,68 @@ impl Task {
/// If an iteration is in progress, it will run to completion,
/// then no more iteration will be executed before `start` is called again.
/// Therefore, it is not guaranteed that `Paused` is reached when `pause` returns.
- pub fn pause(&self) {
+ pub fn pause(&self) -> Result<TransitionStatus, TransitionError> {
let mut inner = self.0.lock().unwrap();
- if let Err(err) = inner.push_transition(TransitionKind::Pause) {
- gst_warning!(RUNTIME_CAT, "Error attempting to Pause task: {:?}", err);
+ let ack_rx = inner.request_transition(Transition::Pause)?;
+
+ if let TaskState::Started = inner.state {
+ return Ok(TransitionStatus::NotWaiting {
+ transition: Transition::Pause,
+ origin: TaskState::Started,
+ });
}
+
+ Self::await_ack(inner, ack_rx, Transition::Pause)
}
- pub fn flush_start(&self) {
+ pub fn flush_start(&self) -> Result<TransitionStatus, TransitionError> {
let mut inner = self.0.lock().unwrap();
if let Some(loop_abort_handle) = inner.loop_abort_handle.take() {
loop_abort_handle.abort();
}
- Self::push_and_await_transition(inner, TransitionKind::FlushStart);
+ Self::push_and_await_transition(inner, Transition::FlushStart)
}
- pub fn flush_stop(&self) {
+ pub fn flush_stop(&self) -> Result<TransitionStatus, TransitionError> {
let mut inner = self.0.lock().unwrap();
if let Some(loop_abort_handle) = inner.loop_abort_handle.take() {
loop_abort_handle.abort();
}
- Self::push_and_await_transition(inner, TransitionKind::FlushStop);
+ Self::push_and_await_transition(inner, Transition::FlushStop)
}
/// Stops the `Started` `Task` and wait for it to finish.
- pub fn stop(&self) {
+ pub fn stop(&self) -> Result<TransitionStatus, TransitionError> {
let mut inner = self.0.lock().unwrap();
if let Some(loop_abort_handle) = inner.loop_abort_handle.take() {
loop_abort_handle.abort();
}
- Self::push_and_await_transition(inner, TransitionKind::Stop);
+ Self::push_and_await_transition(inner, Transition::Stop)
}
fn push_and_await_transition(
mut inner: MutexGuard<TaskInner>,
- transition_kind: TransitionKind,
- ) {
- match inner.push_transition(transition_kind) {
- Ok(ack_rx) => Self::await_ack(inner, ack_rx, transition_kind),
- Err(err) => {
- gst_warning!(
- RUNTIME_CAT,
- "Error attempting to {:?} task: {:?}",
- transition_kind,
- err
- );
- }
- }
+ transition: Transition,
+ ) -> Result<TransitionStatus, TransitionError> {
+ let ack_rx = inner.request_transition(transition)?;
+
+ Self::await_ack(inner, ack_rx, transition)
}
fn await_ack(
inner: MutexGuard<TaskInner>,
- ack_rx: oneshot::Receiver<Transition>,
- transition_kind: TransitionKind,
- ) {
+ ack_rx: oneshot::Receiver<Result<TransitionStatus, TransitionError>>,
+ transition: Transition,
+ ) -> Result<TransitionStatus, TransitionError> {
+ let origin = inner.state;
+
// Since transition handling is serialized by the state machine and
// we hold a lock on TaskInner, we can verify if current spawned loop / tansition hook
// task_id matches the task_id of current subtask, if any.
@@ -490,10 +662,10 @@ impl Task {
// Don't block as this would deadlock
gst_log!(
RUNTIME_CAT,
- "Transitionning to {:?} from loop or transition hook, not waiting",
- transition_kind,
+ "Requested {:?} from loop or transition hook, not waiting",
+ transition,
);
- return;
+ return Ok(TransitionStatus::NotWaiting { transition, origin });
}
}
}
@@ -501,24 +673,28 @@ impl Task {
drop(inner);
block_on_or_add_sub_task(async move {
- gst_trace!(RUNTIME_CAT, "Awaiting ack for {:?}", transition_kind);
- if let Ok(transition) = ack_rx.await {
- gst_log!(RUNTIME_CAT, "Received ack for {:?}", transition);
+ gst_trace!(RUNTIME_CAT, "Awaiting ack for {:?}", transition);
+
+ let res = ack_rx.await.unwrap();
+ if res.is_ok() {
+ gst_log!(RUNTIME_CAT, "Received ack {:?} for {:?}", res, transition);
} else {
- gst_log!(
- RUNTIME_CAT,
- "Transition to {:?} was dropped",
- transition_kind
- );
+ gst_error!(RUNTIME_CAT, "Received ack {:?} for {:?}", res, transition);
}
- });
+
+ res
+ })
+ .unwrap_or_else(|| {
+ // Future was spawned as a subtask
+ Ok(TransitionStatus::Async { transition, origin })
+ })
}
}
struct StateMachine {
task_impl: Box<dyn TaskImpl>,
- transition_rx: async_mpsc::Receiver<Transition>,
- pending_transition: Option<Transition>,
+ transition_rx: async_mpsc::Receiver<TransitionRequest>,
+ pending_transition: Option<TransitionRequest>,
}
// Make sure the Context doesn't throttle otherwise we end up with long delays
@@ -526,11 +702,40 @@ struct StateMachine {
// serializes the transitions and the Context's scheduler gets a chance to reach its
// throttling state between 2 elements.
-macro_rules! spawn_hook {
- ($hook:ident, $self:ident, $task_inner:expr, $context:expr) => {{
+macro_rules! exec_hook {
+ ($self:ident, $hook:ident, $transition_req:expr, $origin:expr, $task_inner:expr, $context:expr) => {{
+ let transition = $transition_req.kind;
+
let hook_fut = async move {
- $self.task_impl.$hook().await;
- $self
+ let mut res = $self.task_impl.$hook().await;
+
+ if res.is_ok() {
+ while Context::current_has_sub_tasks() {
+ gst_trace!(RUNTIME_CAT, "Draining subtasks for {}", stringify!($hook));
+ res = Context::drain_sub_tasks().await.map_err(|err| {
+ let msg = format!("{} subtask returned {:?}", stringify!($hook), err);
+ gst_log!(RUNTIME_CAT, "{}", &msg);
+ gst_error_msg!(gst::CoreError::StateChange, ["{}", &msg])
+ });
+
+ if res.is_err() {
+ break;
+ }
+ }
+ }
+
+ let res = match res {
+ Ok(()) => Ok(()),
+ Err(err) => {
+ let next_transition_req = $self
+ .task_impl
+ .handle_hook_error(transition, $origin, err)
+ .await;
+ Err(next_transition_req)
+ }
+ };
+
+ ($self, res)
};
let join_handle = {
@@ -541,14 +746,36 @@ macro_rules! spawn_hook {
join_handle
};
- join_handle.map(|res| res.unwrap())
+ let (this, res) = join_handle.map(|res| res.unwrap()).await;
+ $self = this;
+
+ match res {
+ Ok(()) => Ok($transition_req),
+ Err(next_transition_req) => {
+ // Convert transition according to the error handler's decision
+ gst_trace!(
+ RUNTIME_CAT,
+ "TaskImpl hook error: converting {:?} to {:?}",
+ $transition_req.kind,
+ next_transition_req,
+ );
+
+ $transition_req.kind = next_transition_req;
+ $self.pending_transition = Some($transition_req);
+
+ Err(())
+ }
+ }
}};
}
impl StateMachine {
// Use dynamic dispatch for TaskImpl as it reduces memory usage compared to monomorphization
// without inducing any significant performance penalties.
- fn new(task_impl: Box<dyn TaskImpl>, transition_rx: async_mpsc::Receiver<Transition>) -> Self {
+ fn new(
+ task_impl: Box<dyn TaskImpl>,
+ transition_rx: async_mpsc::Receiver<TransitionRequest>,
+ ) -> Self {
StateMachine {
task_impl,
transition_rx,
@@ -560,15 +787,31 @@ impl StateMachine {
mut self,
task_inner: Arc<Mutex<TaskInner>>,
context: Context,
- transition: Transition,
+ mut transition_req: TransitionRequest,
) {
gst_trace!(RUNTIME_CAT, "Preparing task");
- self = StateMachine::spawn_prepare(self, &task_inner, &context, transition).await;
+ {
+ let res = exec_hook!(
+ self,
+ prepare,
+ transition_req,
+ TaskState::Preparing,
+ &task_inner,
+ &context
+ );
+ if let Ok(transition_req) = res {
+ task_inner
+ .lock()
+ .unwrap()
+ .switch_to_state(TaskState::Prepared, transition_req);
+ gst_trace!(RUNTIME_CAT, "Task Prepared");
+ }
+ }
loop {
- let transition = match self.pending_transition.take() {
- Some(pending_transition) => pending_transition,
+ let mut transition_req = match self.pending_transition.take() {
+ Some(pending_transition_req) => pending_transition_req,
None => self
.transition_rx
.next()
@@ -576,327 +819,340 @@ impl StateMachine {
.expect("transition_rx dropped"),
};
- gst_trace!(RUNTIME_CAT, "State machine popped {:?}", transition);
+ gst_trace!(RUNTIME_CAT, "State machine popped {:?}", transition_req);
- match transition.kind {
- TransitionKind::Start => {
- {
+ match transition_req.kind {
+ Transition::Error => {
+ let mut task_inner = task_inner.lock().unwrap();
+ task_inner.switch_to_err(transition_req);
+ gst_trace!(RUNTIME_CAT, "Switched to Error");
+ }
+ Transition::Start => {
+ let origin = {
let mut task_inner = task_inner.lock().unwrap();
- match task_inner.state {
+ let origin = task_inner.state;
+ match origin {
TaskState::Stopped | TaskState::Paused | TaskState::Prepared => (),
TaskState::PausedFlushing => {
- task_inner.switch_to_state(TaskState::Flushing, transition);
+ task_inner.switch_to_state(TaskState::Flushing, transition_req);
gst_trace!(RUNTIME_CAT, "Switched from PausedFlushing to Flushing");
continue;
}
+ TaskState::Error => {
+ transition_req.send_err_ack();
+ continue;
+ }
state => {
- task_inner.skip_transition(transition);
- gst_trace!(RUNTIME_CAT, "Skipped Pause in state {:?}", state);
+ task_inner.skip_transition(transition_req);
+ gst_trace!(RUNTIME_CAT, "Skipped Start in state {:?}", state);
continue;
}
}
- }
- self = StateMachine::spawn_loop(self, &task_inner, &context, transition).await;
+ origin
+ };
+
+ self =
+ Self::spawn_loop(self, transition_req, origin, &task_inner, &context).await;
// next/pending transition handled in next iteration
}
- TransitionKind::Pause => {
- let target_state = {
+ Transition::Pause => {
+ let (origin, target) = {
let mut task_inner = task_inner.lock().unwrap();
- match task_inner.state {
+ let origin = task_inner.state;
+ match origin {
TaskState::Started | TaskState::Stopped | TaskState::Prepared => {
- TaskState::Paused
+ (origin, TaskState::Paused)
+ }
+ TaskState::Flushing => (origin, TaskState::PausedFlushing),
+ TaskState::Error => {
+ transition_req.send_err_ack();
+ continue;
}
- TaskState::Flushing => TaskState::PausedFlushing,
state => {
- task_inner.skip_transition(transition);
+ task_inner.skip_transition(transition_req);
gst_trace!(RUNTIME_CAT, "Skipped Pause in state {:?}", state);
continue;
}
}
};
- self = spawn_hook!(pause, self, &task_inner, &context).await;
- task_inner
- .lock()
- .unwrap()
- .switch_to_state(target_state, transition);
+ let res =
+ exec_hook!(self, pause, transition_req, origin, &task_inner, &context);
+ if let Ok(transition_req) = res {
+ task_inner
+ .lock()
+ .unwrap()
+ .switch_to_state(target, transition_req);
+ gst_trace!(RUNTIME_CAT, "Task loop {:?}", target);
+ }
}
- TransitionKind::Stop => {
- {
+ Transition::Stop => {
+ let origin = {
let mut task_inner = task_inner.lock().unwrap();
- match task_inner.state {
+ let origin = task_inner.state;
+ match origin {
TaskState::Started
| TaskState::Paused
| TaskState::PausedFlushing
- | TaskState::Flushing => (),
+ | TaskState::Flushing => origin,
+ TaskState::Error => {
+ transition_req.send_err_ack();
+ continue;
+ }
state => {
- task_inner.skip_transition(transition);
+ task_inner.skip_transition(transition_req);
gst_trace!(RUNTIME_CAT, "Skipped Stop in state {:?}", state);
continue;
}
}
- }
+ };
- self = spawn_hook!(stop, self, &task_inner, &context).await;
- task_inner
- .lock()
- .unwrap()
- .switch_to_state(TaskState::Stopped, transition);
- gst_trace!(RUNTIME_CAT, "Task loop stopped");
+ let res = exec_hook!(self, stop, transition_req, origin, &task_inner, &context);
+ if let Ok(transition_req) = res {
+ task_inner
+ .lock()
+ .unwrap()
+ .switch_to_state(TaskState::Stopped, transition_req);
+ gst_trace!(RUNTIME_CAT, "Task loop Stopped");
+ }
}
- TransitionKind::FlushStart => {
- let target_state = {
+ Transition::FlushStart => {
+ let (origin, target) = {
let mut task_inner = task_inner.lock().unwrap();
- match task_inner.state {
- TaskState::Started => TaskState::Flushing,
- TaskState::Paused => TaskState::PausedFlushing,
+ let origin = task_inner.state;
+ match origin {
+ TaskState::Started => (origin, TaskState::Flushing),
+ TaskState::Paused => (origin, TaskState::PausedFlushing),
+ TaskState::Error => {
+ transition_req.send_err_ack();
+ continue;
+ }
state => {
- task_inner.skip_transition(transition);
+ task_inner.skip_transition(transition_req);
gst_trace!(RUNTIME_CAT, "Skipped FlushStart in state {:?}", state);
continue;
}
}
};
- self = spawn_hook!(flush_start, self, &task_inner, &context).await;
- task_inner
- .lock()
- .unwrap()
- .switch_to_state(target_state, transition);
- gst_trace!(RUNTIME_CAT, "Task flush started");
+ let res = exec_hook!(
+ self,
+ flush_start,
+ transition_req,
+ origin,
+ &task_inner,
+ &context
+ );
+ if let Ok(transition_req) = res {
+ task_inner
+ .lock()
+ .unwrap()
+ .switch_to_state(target, transition_req);
+ gst_trace!(RUNTIME_CAT, "Task {:?}", target);
+ }
}
- TransitionKind::FlushStop => {
- let state = task_inner.lock().unwrap().state;
- match state {
- TaskState::Flushing => (),
- TaskState::PausedFlushing => {
- self = spawn_hook!(flush_stop, self, &task_inner, &context).await;
- task_inner
- .lock()
- .unwrap()
- .switch_to_state(TaskState::Paused, transition);
- gst_trace!(RUNTIME_CAT, "Switched from PausedFlushing to Paused");
+ Transition::FlushStop => {
+ let origin = task_inner.lock().unwrap().state;
+ let is_paused = match origin {
+ TaskState::Flushing => false,
+ TaskState::PausedFlushing => true,
+ TaskState::Error => {
+ transition_req.send_err_ack();
continue;
}
state => {
- task_inner.lock().unwrap().skip_transition(transition);
+ task_inner.lock().unwrap().skip_transition(transition_req);
gst_trace!(RUNTIME_CAT, "Skipped FlushStop in state {:?}", state);
continue;
}
- }
+ };
- self = spawn_hook!(flush_stop, self, &task_inner, &context).await;
- self = StateMachine::spawn_loop(self, &task_inner, &context, transition).await;
- // next/pending transition handled in next iteration
+ let res = exec_hook!(
+ self,
+ flush_stop,
+ transition_req,
+ origin,
+ &task_inner,
+ &context
+ );
+ if let Ok(transition_req) = res {
+ if is_paused {
+ task_inner
+ .lock()
+ .unwrap()
+ .switch_to_state(TaskState::Paused, transition_req);
+ gst_trace!(RUNTIME_CAT, "Switched from PausedFlushing to Paused");
+ } else {
+ self = Self::spawn_loop(
+ self,
+ transition_req,
+ origin,
+ &task_inner,
+ &context,
+ )
+ .await;
+ // next/pending transition handled in next iteration
+ }
+ }
}
- TransitionKind::Unprepare => {
- StateMachine::spawn_unprepare(self, context).await;
+ Transition::Unprepare => {
+ // Unprepare is not joined by an ack_rx but by joining the state machine
+ // handle, so we don't need to keep track of the spwaned_task_id
+ context
+ .awake_and_spawn(async move {
+ self.task_impl.unprepare().await;
+
+ while Context::current_has_sub_tasks() {
+ gst_trace!(RUNTIME_CAT, "Draining subtasks for unprepare");
+ let res = Context::drain_sub_tasks().await.map_err(|err| {
+ gst_log!(RUNTIME_CAT, "unprepare subtask returned {:?}", err);
+ err
+ });
+ if res.is_err() {
+ break;
+ }
+ }
+ })
+ .await
+ .unwrap();
+
task_inner
.lock()
.unwrap()
- .switch_to_state(TaskState::Unprepared, transition);
+ .switch_to_state(TaskState::Unprepared, transition_req);
break;
}
- _ => unreachable!("State machine handler {:?}", transition),
+ _ => unreachable!("State machine handler {:?}", transition_req),
}
}
gst_trace!(RUNTIME_CAT, "Task state machine terminated");
}
- async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
- self.task_impl.prepare().await?;
-
- gst_trace!(RUNTIME_CAT, "Draining subtasks");
- while Context::current_has_sub_tasks() {
- Context::drain_sub_tasks().await.map_err(|err| {
- gst_error_msg!(
- gst::ResourceError::OpenRead,
- ["Failed to drain substasks while preparing: {:?}", err]
- )
- })?;
- }
-
- Ok(())
- }
-
- async fn run_loop(&mut self, task_inner: Arc<Mutex<TaskInner>>) -> Result<(), gst::FlowError> {
- gst_trace!(RUNTIME_CAT, "Task loop started");
-
- loop {
- // Check if there is any pending transition
- while let Ok(Some(transition)) = self.transition_rx.try_next() {
- gst_trace!(RUNTIME_CAT, "Task loop popped {:?}", transition);
-
- match transition.kind {
- TransitionKind::Start => {
- task_inner.lock().unwrap().skip_transition(transition);
- gst_trace!(RUNTIME_CAT, "Skipped Start in state Started");
- }
- _ => {
- gst_trace!(
- RUNTIME_CAT,
- "Task loop handing {:?} to state machine",
- transition,
- );
- self.pending_transition = Some(transition);
- return Ok(());
- }
- }
- }
-
- // Run the iteration
- self.task_impl.iterate().await.map_err(|err| {
- gst_log!(RUNTIME_CAT, "Task loop iterate impl returned {:?}", err);
- err
- })?;
- }
- }
-
- async fn spawn_prepare(
+ async fn spawn_loop(
mut self,
+ mut transition_req: TransitionRequest,
+ origin: TaskState,
task_inner: &Arc<Mutex<TaskInner>>,
context: &Context,
- transition: Transition,
) -> Self {
let task_inner_clone = Arc::clone(&task_inner);
- let prepare_fut = async move {
- let (abortable_prepare, abort_handle) = abortable(self.prepare());
- task_inner_clone.lock().unwrap().prepare_abort_handle = Some(abort_handle);
-
- let res = abortable_prepare.await.unwrap_or_else(|_| {
- Err(gst_error_msg!(
- gst::ResourceError::OpenRead,
- ["Task preparation aborted"]
- ))
- });
+ let loop_fut = async move {
+ let mut res = self.task_impl.start().await;
+ if res.is_ok() {
+ while Context::current_has_sub_tasks() {
+ gst_trace!(RUNTIME_CAT, "Draining subtasks for start");
+ res = Context::drain_sub_tasks().await.map_err(|err| {
+ let msg = format!("start subtask returned {:?}", err);
+ gst_log!(RUNTIME_CAT, "{}", &msg);
+ gst_error_msg!(gst::CoreError::StateChange, ["{}", &msg])
+ });
+
+ if res.is_err() {
+ break;
+ }
+ }
+ }
match res {
Ok(()) => {
- task_inner_clone
- .lock()
- .unwrap()
- .switch_to_state(TaskState::Prepared, transition);
- gst_log!(RUNTIME_CAT, "Task prepared");
- }
- Err(err) => {
- {
+ let abortable_task_loop = {
+ let (abortable_task_loop, loop_abort_handle) =
+ abortable(self.run_loop(Arc::clone(&task_inner_clone)));
+
let mut task_inner = task_inner_clone.lock().unwrap();
- task_inner.skip_transition(transition);
- task_inner.state = TaskState::PrepareFailed;
- }
+ task_inner.loop_abort_handle = Some(loop_abort_handle);
+ task_inner.switch_to_state(TaskState::Started, transition_req);
- gst_error!(RUNTIME_CAT, "Task preparation failed: {:?}", err);
- self.task_impl.handle_prepare_error(err).await;
+ abortable_task_loop
+ };
+
+ gst_trace!(RUNTIME_CAT, "Starting task loop");
+ match abortable_task_loop.await {
+ Ok(Ok(())) => (),
+ Ok(Err(err)) => {
+ let next_transition = self.task_impl.handle_iterate_error(err).await;
+ let (transition_req, _) = TransitionRequest::new(next_transition);
+ self.pending_transition = Some(transition_req);
+ }
+ Err(Aborted) => gst_trace!(RUNTIME_CAT, "Task loop aborted"),
+ }
+ }
+ Err(err) => {
+ // Error while executing start hook
+ let next_transition = self
+ .task_impl
+ .handle_hook_error(transition_req.kind, origin, err)
+ .await;
gst_log!(
RUNTIME_CAT,
- "Waiting for Unprepare due to task preparation failure"
+ "TaskImpl hook error: converting Start to {:?}",
+ next_transition,
);
- loop {
- let transition = self
- .transition_rx
- .next()
- .await
- .expect("transition_rx dropped");
-
- if let TransitionKind::Unprepare = transition.kind {
- self.pending_transition = Some(transition);
- break;
- } else {
- gst_log!(
- RUNTIME_CAT,
- "Skipping {:?} since task preparation failed",
- transition,
- );
- task_inner_clone.lock().unwrap().skip_transition(transition);
- }
- }
+ transition_req.kind = next_transition;
+ self.pending_transition = Some(transition_req);
}
}
+ // next/pending transition handled in state machine loop
+
self
};
let join_handle = {
let mut task_inner = task_inner.lock().unwrap();
- let join_handle = context.awake_and_spawn(prepare_fut);
+ let join_handle = context.awake_and_spawn(loop_fut);
task_inner.spawned_task_id = Some(join_handle.task_id());
join_handle
};
- join_handle.await.unwrap()
+ join_handle.map(|res| res.unwrap()).await
}
- async fn spawn_loop(
- mut self,
- task_inner: &Arc<Mutex<TaskInner>>,
- context: &Context,
- transition: Transition,
- ) -> Self {
- let task_inner_clone = Arc::clone(&task_inner);
- let loop_fut = async move {
- gst_trace!(RUNTIME_CAT, "Starting task loop");
- self.task_impl.start().await;
-
- let abortable_task_loop = {
- let (abortable_task_loop, loop_abort_handle) =
- abortable(self.run_loop(Arc::clone(&task_inner_clone)));
-
- let mut task_inner = task_inner_clone.lock().unwrap();
- task_inner.loop_abort_handle = Some(loop_abort_handle);
- task_inner.switch_to_state(TaskState::Started, transition);
+ async fn run_loop(&mut self, task_inner: Arc<Mutex<TaskInner>>) -> Result<(), gst::FlowError> {
+ gst_trace!(RUNTIME_CAT, "Task loop started");
- abortable_task_loop
- };
+ loop {
+ // Check if there is any pending transition_req
+ while let Ok(Some(transition_req)) = self.transition_rx.try_next() {
+ gst_trace!(RUNTIME_CAT, "Task loop popped {:?}", transition_req);
- match abortable_task_loop.await {
- Ok(Ok(())) => (),
- Ok(Err(gst::FlowError::Flushing)) => {
- gst_trace!(RUNTIME_CAT, "Starting task flush");
- self.task_impl.flush_start().await;
- task_inner_clone.lock().unwrap().state = TaskState::Flushing;
- gst_trace!(RUNTIME_CAT, "Started task flush");
- }
- Ok(Err(err)) => {
- // Note: this includes EOS
- gst_trace!(RUNTIME_CAT, "Stopping task due to {:?}", err);
- self.task_impl.stop().await;
- task_inner_clone.lock().unwrap().state = TaskState::Stopped;
- gst_trace!(RUNTIME_CAT, "Stopped task due to {:?}", err);
+ match transition_req.kind {
+ Transition::Start => {
+ task_inner.lock().unwrap().skip_transition(transition_req);
+ gst_trace!(RUNTIME_CAT, "Skipped Start in state Started");
+ }
+ _ => {
+ gst_trace!(
+ RUNTIME_CAT,
+ "Task loop handing {:?} to state machine",
+ transition_req,
+ );
+ self.pending_transition = Some(transition_req);
+ return Ok(());
+ }
}
- Err(Aborted) => gst_trace!(RUNTIME_CAT, "Task loop aborted"),
}
- // next/pending transition handled in state machine loop
-
- self
- };
-
- let join_handle = {
- let mut task_inner = task_inner.lock().unwrap();
- let join_handle = context.awake_and_spawn(loop_fut);
- task_inner.spawned_task_id = Some(join_handle.task_id());
-
- join_handle
- };
-
- join_handle.await.unwrap()
- }
+ // Run the iteration
+ self.task_impl.iterate().await.map_err(|err| {
+ gst_log!(RUNTIME_CAT, "Task loop iterate impl returned {:?}", err);
+ err
+ })?;
- async fn spawn_unprepare(mut self, context: Context) {
- // Unprepare is not joined by an ack_rx but by joining the state machine
- // handle, so we don't need to keep track of the spwaned_task_id
- context
- .awake_and_spawn(async move {
- self.task_impl.unprepare().await;
- })
- .await
- .unwrap()
+ while Context::current_has_sub_tasks() {
+ gst_trace!(RUNTIME_CAT, "Draining subtasks for {}", stringify!($hook));
+ Context::drain_sub_tasks().await.map_err(|err| {
+ gst_log!(RUNTIME_CAT, "Task loop iterate subtask returned {:?}", err);
+ err
+ })?;
+ }
+ }
}
}
@@ -934,10 +1190,11 @@ mod tests {
.boxed()
}
- fn start(&mut self) -> BoxFuture<'_, ()> {
+ fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_debug!(RUNTIME_CAT, "task_iterate: started");
self.started_sender.send(()).await.unwrap();
+ Ok(())
}
.boxed()
}
@@ -954,11 +1211,12 @@ mod tests {
let res = self.complete_iterate_receiver.next().await.unwrap();
if res.is_ok() {
- gst_debug!(RUNTIME_CAT, "task_iterate: received true => keep looping");
+ gst_debug!(RUNTIME_CAT, "task_iterate: received Ok => keep looping");
} else {
gst_debug!(
RUNTIME_CAT,
- "task_iterate: received false => cancelling loop"
+ "task_iterate: received {:?} => cancelling loop",
+ res
);
}
@@ -967,26 +1225,29 @@ mod tests {
.boxed()
}
- fn pause(&mut self) -> BoxFuture<'_, ()> {
+ fn pause(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_debug!(RUNTIME_CAT, "task_iterate: paused");
self.paused_sender.send(()).await.unwrap();
+ Ok(())
}
.boxed()
}
- fn stop(&mut self) -> BoxFuture<'_, ()> {
+ fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_debug!(RUNTIME_CAT, "task_iterate: stopped");
self.stopped_sender.send(()).await.unwrap();
+ Ok(())
}
.boxed()
}
- fn flush_start(&mut self) -> BoxFuture<'_, ()> {
+ fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_debug!(RUNTIME_CAT, "task_iterate: stopped");
self.flush_start_sender.send(()).await.unwrap();
+ Ok(())
}
.boxed()
}
@@ -1016,23 +1277,37 @@ mod tests {
let (stopped_sender, mut stopped_receiver) = mpsc::channel(1);
let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
let (unprepared_sender, mut unprepared_receiver) = mpsc::channel(1);
- task.prepare(
- TaskTest {
- prepared_sender,
- started_sender,
- iterate_sender,
- complete_iterate_receiver,
- paused_sender,
- stopped_sender,
- flush_start_sender,
- unprepared_sender,
- },
- context,
- )
- .unwrap();
+ let res = task
+ .prepare(
+ TaskTest {
+ prepared_sender,
+ started_sender,
+ iterate_sender,
+ complete_iterate_receiver,
+ paused_sender,
+ stopped_sender,
+ flush_start_sender,
+ unprepared_sender,
+ },
+ context,
+ )
+ .unwrap();
+ assert_eq!(
+ res,
+ TransitionStatus::Async {
+ transition: Transition::Prepare,
+ origin: TaskState::Unprepared,
+ }
+ );
gst_debug!(RUNTIME_CAT, "task_iterate: starting (initial)");
- task.start();
+ assert_eq!(
+ task.start().unwrap(),
+ TransitionStatus::Complete {
+ origin: TaskState::Prepared,
+ target: TaskState::Started
+ },
+ );
assert_eq!(task.state(), TaskState::Started);
// At this point, prepared must be completed
@@ -1048,11 +1323,23 @@ mod tests {
gst_debug!(RUNTIME_CAT, "task_iterate: starting (redundant)");
// start will return immediately
- task.start();
+ assert_eq!(
+ task.start().unwrap(),
+ TransitionStatus::NotWaiting {
+ transition: Transition::Start,
+ origin: TaskState::Started,
+ },
+ );
assert_eq!(task.state(), TaskState::Started);
gst_debug!(RUNTIME_CAT, "task_iterate: pause (initial)");
- task.pause();
+ assert_eq!(
+ task.pause().unwrap(),
+ TransitionStatus::NotWaiting {
+ transition: Transition::Pause,
+ origin: TaskState::Started,
+ },
+ );
// Pause transition is asynchronous
while TaskState::Paused != task.state() {
@@ -1068,14 +1355,26 @@ mod tests {
paused_receiver.next().await.unwrap();
gst_debug!(RUNTIME_CAT, "task_iterate: starting (after pause)");
- task.start();
+ assert_eq!(
+ task.start().unwrap(),
+ TransitionStatus::Complete {
+ origin: TaskState::Paused,
+ target: TaskState::Started
+ },
+ );
assert_eq!(task.state(), TaskState::Started);
// Paused -> Started
let _ = started_receiver.next().await;
- gst_debug!(RUNTIME_CAT, "task_iterate: stopping (initial)");
- task.stop();
+ gst_debug!(RUNTIME_CAT, "task_iterate: stopping");
+ assert_eq!(
+ task.stop().unwrap(),
+ TransitionStatus::Complete {
+ origin: TaskState::Started,
+ target: TaskState::Stopped
+ },
+ );
assert_eq!(task.state(), TaskState::Stopped);
let _ = stopped_receiver.next().await;
@@ -1084,7 +1383,13 @@ mod tests {
let _ = iterate_receiver.try_next();
gst_debug!(RUNTIME_CAT, "task_iterate: starting (after stop)");
- task.start();
+ assert_eq!(
+ task.start().unwrap(),
+ TransitionStatus::Complete {
+ origin: TaskState::Stopped,
+ target: TaskState::Started
+ },
+ );
let _ = started_receiver.next().await;
gst_debug!(RUNTIME_CAT, "task_iterate: req. iterate to return Eos");
@@ -1103,53 +1408,73 @@ mod tests {
}
gst_debug!(RUNTIME_CAT, "task_iterate: starting (after stop)");
- task.start();
+ assert_eq!(
+ task.start().unwrap(),
+ TransitionStatus::Complete {
+ origin: TaskState::Stopped,
+ target: TaskState::Started
+ },
+ );
let _ = started_receiver.next().await;
- gst_debug!(RUNTIME_CAT, "task_iterate: req. iterate to return Error");
+ gst_debug!(RUNTIME_CAT, "task_iterate: req. iterate to return Flushing");
iterate_receiver.next().await.unwrap();
complete_iterate_sender
- .send(Err(gst::FlowError::Error))
+ .send(Err(gst::FlowError::Flushing))
.await
.unwrap();
- gst_debug!(RUNTIME_CAT, "task_iterate: awaiting stop hook ack");
- stopped_receiver.next().await.unwrap();
+ gst_debug!(RUNTIME_CAT, "task_iterate: awaiting flush_start hook ack");
+ flush_start_receiver.next().await.unwrap();
- // Wait for state machine to reach Stopped
- while TaskState::Stopped != task.state() {
+ // Wait for state machine to reach Flushing
+ while TaskState::Flushing != task.state() {
tokio::time::delay_for(Duration::from_millis(2)).await;
}
- gst_debug!(RUNTIME_CAT, "task_iterate: starting (after Error)");
- task.start();
-
- assert_eq!(task.state(), TaskState::Started);
- gst_debug!(RUNTIME_CAT, "task_iterate: awaiting start hook ack");
+ gst_debug!(RUNTIME_CAT, "task_iterate: stop flushing");
+ assert_eq!(
+ task.flush_stop().unwrap(),
+ TransitionStatus::Complete {
+ origin: TaskState::Flushing,
+ target: TaskState::Started
+ },
+ );
let _ = started_receiver.next().await;
- gst_debug!(RUNTIME_CAT, "task_iterate: req. iterate to return Flushing");
+ gst_debug!(RUNTIME_CAT, "task_iterate: req. iterate to return Error");
iterate_receiver.next().await.unwrap();
complete_iterate_sender
- .send(Err(gst::FlowError::Flushing))
+ .send(Err(gst::FlowError::Error))
.await
.unwrap();
- gst_debug!(RUNTIME_CAT, "task_iterate: awaiting flush_start hook ack");
- flush_start_receiver.next().await.unwrap();
-
- // Wait for state machine to reach Flushing
- while TaskState::Flushing != task.state() {
+ // Wait for state machine to reach Error
+ while TaskState::Error != task.state() {
tokio::time::delay_for(Duration::from_millis(2)).await;
}
- gst_debug!(RUNTIME_CAT, "task_iterate: stopping (final)");
- task.stop();
-
- assert_eq!(task.state(), TaskState::Stopped);
- let _ = stopped_receiver.next().await;
+ gst_debug!(
+ RUNTIME_CAT,
+ "task_iterate: attempting to start (after Error)"
+ );
+ let err = task.start().unwrap_err();
+ match err {
+ TransitionError {
+ transition: Transition::Start,
+ state: TaskState::Error,
+ ..
+ } => (),
+ other => unreachable!(other),
+ }
- task.unprepare().unwrap();
+ assert_eq!(
+ task.unprepare().unwrap(),
+ TransitionStatus::Complete {
+ origin: TaskState::Error,
+ target: TaskState::Unprepared
+ },
+ );
assert_eq!(task.state(), TaskState::Unprepared);
let _ = unprepared_receiver.next().await;
@@ -1168,17 +1493,32 @@ mod tests {
async move {
gst_debug!(RUNTIME_CAT, "prepare_error: prepare returning an error");
Err(gst_error_msg!(
- gst::ResourceError::OpenRead,
- ["prepare_error: intentional prepare error"]
+ gst::ResourceError::Failed,
+ ["prepare_error: intentional error"]
))
}
.boxed()
}
- fn handle_prepare_error(&mut self, _err: gst::ErrorMessage) -> BoxFuture<'_, ()> {
+ fn handle_hook_error(
+ &mut self,
+ transition: Transition,
+ state: TaskState,
+ err: gst::ErrorMessage,
+ ) -> BoxFuture<'_, Transition> {
async move {
- gst_debug!(RUNTIME_CAT, "prepare_error: handling prepare error");
- self.prepare_error_sender.send(()).await.unwrap();
+ gst_debug!(
+ RUNTIME_CAT,
+ "prepare_error: handling prepare error {:?}",
+ err
+ );
+ match (transition, state) {
+ (Transition::Prepare, TaskState::Preparing) => {
+ self.prepare_error_sender.send(()).await.unwrap();
+ }
+ other => unreachable!("{:?}", other),
+ }
+ Transition::Error
}
.boxed()
}
@@ -1203,12 +1543,24 @@ mod tests {
)
.unwrap();
- gst_debug!(
- RUNTIME_CAT,
- "prepare_error: await prepare error notification"
- );
+ gst_debug!(RUNTIME_CAT, "prepare_error: await error hook notification");
prepare_error_receiver.next().await.unwrap();
+ // Wait for state machine to reach Error
+ while TaskState::Error != task.state() {
+ tokio::time::delay_for(Duration::from_millis(2)).await;
+ }
+
+ let res = task.start().unwrap_err();
+ match res {
+ TransitionError {
+ transition: Transition::Start,
+ state: TaskState::Error,
+ ..
+ } => (),
+ other => unreachable!("{:?}", other),
+ }
+
task.unprepare().unwrap();
}
@@ -1231,19 +1583,24 @@ mod tests {
);
self.prepare_receiver.next().await.unwrap();
gst_debug!(RUNTIME_CAT, "prepare_start_ok: preparation complete Ok");
-
Ok(())
}
.boxed()
}
- fn handle_prepare_error(&mut self, _err: gst::ErrorMessage) -> BoxFuture<'_, ()> {
+ fn handle_hook_error(
+ &mut self,
+ _transition: Transition,
+ _state: TaskState,
+ _err: gst::ErrorMessage,
+ ) -> BoxFuture<'_, Transition> {
unreachable!("prepare_start_ok: handle_prepare_error");
}
- fn start(&mut self) -> BoxFuture<'_, ()> {
+ fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_debug!(RUNTIME_CAT, "prepare_start_ok: started");
+ Ok(())
}
.boxed()
}
@@ -1267,17 +1624,36 @@ mod tests {
let start_handle = start_ctx.spawn(async move {
assert_eq!(task_clone.state(), TaskState::Preparing);
gst_debug!(RUNTIME_CAT, "prepare_start_ok: starting");
+ assert_eq!(
+ task_clone.start().unwrap(),
+ TransitionStatus::Async {
+ transition: Transition::Start,
+ origin: TaskState::Preparing,
+ }
+ );
ready_sender.send(()).unwrap();
- task_clone.start();
Context::drain_sub_tasks().await.unwrap();
-
assert_eq!(task_clone.state(), TaskState::Started);
- task_clone.stop();
+ assert_eq!(
+ task.stop().unwrap(),
+ TransitionStatus::Async {
+ transition: Transition::Stop,
+ origin: TaskState::Started,
+ },
+ );
Context::drain_sub_tasks().await.unwrap();
-
- task_clone.unprepare().unwrap();
+ assert_eq!(task_clone.state(), TaskState::Stopped);
+
+ assert_eq!(
+ task.unprepare().unwrap(),
+ TransitionStatus::Async {
+ transition: Transition::Unprepare,
+ origin: TaskState::Stopped,
+ },
+ );
Context::drain_sub_tasks().await.unwrap();
+ assert_eq!(task_clone.state(), TaskState::Unprepared);
});
gst_debug!(RUNTIME_CAT, "prepare_start_ok: awaiting for start_ctx");
@@ -1311,22 +1687,37 @@ mod tests {
gst_debug!(RUNTIME_CAT, "prepare_start_error: preparation complete Err");
Err(gst_error_msg!(
- gst::ResourceError::OpenRead,
- ["prepare_start_error: intentional prepare error"]
+ gst::ResourceError::Failed,
+ ["prepare_start_error: intentional error"]
))
}
.boxed()
}
- fn handle_prepare_error(&mut self, _err: gst::ErrorMessage) -> BoxFuture<'_, ()> {
+ fn handle_hook_error(
+ &mut self,
+ transition: Transition,
+ state: TaskState,
+ err: gst::ErrorMessage,
+ ) -> BoxFuture<'_, Transition> {
async move {
- gst_debug!(RUNTIME_CAT, "prepare_start_error: handling prepare error");
- self.prepare_error_sender.send(()).await.unwrap();
+ gst_debug!(
+ RUNTIME_CAT,
+ "prepare_start_error: handling prepare error {:?}",
+ err
+ );
+ match (transition, state) {
+ (Transition::Prepare, TaskState::Preparing) => {
+ self.prepare_error_sender.send(()).await.unwrap();
+ }
+ other => unreachable!("hook error for {:?}", other),
+ }
+ Transition::Error
}
.boxed()
}
- fn start(&mut self) -> BoxFuture<'_, ()> {
+ fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
unreachable!("prepare_start_error: start");
}
@@ -1356,12 +1747,17 @@ mod tests {
let start_handle = start_ctx.spawn(async move {
assert_eq!(task_clone.state(), TaskState::Preparing);
gst_debug!(RUNTIME_CAT, "prepare_start_error: starting (Err)");
+ task_clone.start().unwrap();
ready_sender.send(()).unwrap();
- task_clone.start();
Context::drain_sub_tasks().await.unwrap();
- assert_eq!(task_clone.state(), TaskState::PrepareFailed);
- task_clone.unprepare().unwrap();
+ assert_eq!(
+ task.unprepare().unwrap(),
+ TransitionStatus::Async {
+ transition: Transition::Unprepare,
+ origin: TaskState::Error,
+ },
+ );
Context::drain_sub_tasks().await.unwrap();
});
@@ -1408,10 +1804,11 @@ mod tests {
.boxed()
}
- fn pause(&mut self) -> BoxFuture<'_, ()> {
+ fn pause(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_debug!(RUNTIME_CAT, "pause_start: paused");
self.paused_sender.send(()).await.unwrap();
+ Ok(())
}
.boxed()
}
@@ -1435,14 +1832,26 @@ mod tests {
.unwrap();
gst_debug!(RUNTIME_CAT, "pause_start: starting");
- task.start();
+ assert_eq!(
+ task.start().unwrap(),
+ TransitionStatus::Complete {
+ origin: TaskState::Prepared,
+ target: TaskState::Started,
+ },
+ );
assert_eq!(task.state(), TaskState::Started);
gst_debug!(RUNTIME_CAT, "pause_start: awaiting 1st iteration");
iterate_receiver.next().await.unwrap();
gst_debug!(RUNTIME_CAT, "pause_start: pausing (1)");
- task.pause();
+ assert_eq!(
+ task.pause().unwrap(),
+ TransitionStatus::NotWaiting {
+ transition: Transition::Pause,
+ origin: TaskState::Started,
+ },
+ );
gst_debug!(RUNTIME_CAT, "pause_start: sending 1st iteration completion");
complete_sender.try_send(()).unwrap();
@@ -1458,7 +1867,13 @@ mod tests {
// Loop held on due to Pause
iterate_receiver.try_next().unwrap_err();
- task.start();
+ assert_eq!(
+ task.start().unwrap(),
+ TransitionStatus::Complete {
+ origin: TaskState::Paused,
+ target: TaskState::Started,
+ },
+ );
assert_eq!(task.state(), TaskState::Started);
gst_debug!(RUNTIME_CAT, "pause_start: awaiting 2d iteration");
@@ -1467,7 +1882,7 @@ mod tests {
gst_debug!(RUNTIME_CAT, "pause_start: sending 2d iteration completion");
complete_sender.try_send(()).unwrap();
- task.stop();
+ task.stop().unwrap();
task.unprepare().unwrap();
}
@@ -1501,14 +1916,14 @@ mod tests {
.unwrap();
gst_debug!(RUNTIME_CAT, "successive_pause_start: starting");
- task.start();
+ task.start().unwrap();
gst_debug!(RUNTIME_CAT, "successive_pause_start: awaiting iteration 1");
iterate_receiver.next().await.unwrap();
gst_debug!(RUNTIME_CAT, "successive_pause_start: pause and start");
- task.pause();
- task.start();
+ task.pause().unwrap();
+ task.start().unwrap();
assert_eq!(task.state(), TaskState::Started);
@@ -1516,7 +1931,7 @@ mod tests {
iterate_receiver.next().await.unwrap();
gst_debug!(RUNTIME_CAT, "successive_pause_start: stopping");
- task.stop();
+ task.stop().unwrap();
task.unprepare().unwrap();
}
@@ -1534,18 +1949,20 @@ mod tests {
future::pending::<Result<(), gst::FlowError>>().boxed()
}
- fn flush_start(&mut self) -> BoxFuture<'_, ()> {
+ fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_debug!(RUNTIME_CAT, "flush_regular_sync: started flushing");
self.flush_start_sender.send(()).await.unwrap();
+ Ok(())
}
.boxed()
}
- fn flush_stop(&mut self) -> BoxFuture<'_, ()> {
+ fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_debug!(RUNTIME_CAT, "flush_regular_sync: stopped flushing");
self.flush_stop_sender.send(()).await.unwrap();
+ Ok(())
}
.boxed()
}
@@ -1567,22 +1984,34 @@ mod tests {
.unwrap();
gst_debug!(RUNTIME_CAT, "flush_regular_sync: start");
- task.start();
+ task.start().unwrap();
gst_debug!(RUNTIME_CAT, "flush_regular_sync: starting flush");
- task.flush_start();
+ assert_eq!(
+ task.flush_start().unwrap(),
+ TransitionStatus::Complete {
+ origin: TaskState::Started,
+ target: TaskState::Flushing,
+ },
+ );
assert_eq!(task.state(), TaskState::Flushing);
flush_start_receiver.next().await.unwrap();
gst_debug!(RUNTIME_CAT, "flush_regular_sync: stopping flush");
- task.flush_stop();
+ assert_eq!(
+ task.flush_stop().unwrap(),
+ TransitionStatus::Complete {
+ origin: TaskState::Flushing,
+ target: TaskState::Started,
+ },
+ );
assert_eq!(task.state(), TaskState::Started);
flush_stop_receiver.next().await.unwrap();
- task.pause();
- task.stop();
+ task.pause().unwrap();
+ task.stop().unwrap();
task.unprepare().unwrap();
}
@@ -1601,24 +2030,26 @@ mod tests {
future::pending::<Result<(), gst::FlowError>>().boxed()
}
- fn flush_start(&mut self) -> BoxFuture<'_, ()> {
+ fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_debug!(
RUNTIME_CAT,
"flush_regular_different_context: started flushing"
);
self.flush_start_sender.send(()).await.unwrap();
+ Ok(())
}
.boxed()
}
- fn flush_stop(&mut self) -> BoxFuture<'_, ()> {
+ fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_debug!(
RUNTIME_CAT,
"flush_regular_different_context: stopped flushing"
);
self.flush_stop_sender.send(()).await.unwrap();
+ Ok(())
}
.boxed()
}
@@ -1640,18 +2071,30 @@ mod tests {
.unwrap();
gst_debug!(RUNTIME_CAT, "flush_regular_different_context: start");
- task.start();
+ task.start().unwrap();
let oob_context = Context::acquire("flush_regular_different_context_oob", 2).unwrap();
let task_clone = task.clone();
let flush_handle = oob_context.spawn(async move {
- task_clone.flush_start();
+ assert_eq!(
+ task_clone.flush_start().unwrap(),
+ TransitionStatus::Async {
+ transition: Transition::FlushStart,
+ origin: TaskState::Started,
+ },
+ );
Context::drain_sub_tasks().await.unwrap();
assert_eq!(task_clone.state(), TaskState::Flushing);
flush_start_receiver.next().await.unwrap();
- task_clone.flush_stop();
+ assert_eq!(
+ task_clone.flush_stop().unwrap(),
+ TransitionStatus::Async {
+ transition: Transition::FlushStop,
+ origin: TaskState::Flushing,
+ },
+ );
Context::drain_sub_tasks().await.unwrap();
assert_eq!(task_clone.state(), TaskState::Started);
});
@@ -1659,7 +2102,7 @@ mod tests {
flush_handle.await.unwrap();
flush_stop_receiver.next().await.unwrap();
- task.stop();
+ task.stop().unwrap();
task.unprepare().unwrap();
}
@@ -1678,18 +2121,20 @@ mod tests {
future::pending::<Result<(), gst::FlowError>>().boxed()
}
- fn flush_start(&mut self) -> BoxFuture<'_, ()> {
+ fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_debug!(RUNTIME_CAT, "flush_regular_same_context: started flushing");
self.flush_start_sender.send(()).await.unwrap();
+ Ok(())
}
.boxed()
}
- fn flush_stop(&mut self) -> BoxFuture<'_, ()> {
+ fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_debug!(RUNTIME_CAT, "flush_regular_same_context: stopped flushing");
self.flush_stop_sender.send(()).await.unwrap();
+ Ok(())
}
.boxed()
}
@@ -1710,16 +2155,28 @@ mod tests {
)
.unwrap();
- task.start();
+ task.start().unwrap();
let task_clone = task.clone();
let flush_handle = task.context().as_ref().unwrap().spawn(async move {
- task_clone.flush_start();
+ assert_eq!(
+ task_clone.flush_start().unwrap(),
+ TransitionStatus::Async {
+ transition: Transition::FlushStart,
+ origin: TaskState::Started,
+ },
+ );
Context::drain_sub_tasks().await.unwrap();
assert_eq!(task_clone.state(), TaskState::Flushing);
flush_start_receiver.next().await.unwrap();
- task_clone.flush_stop();
+ assert_eq!(
+ task_clone.flush_stop().unwrap(),
+ TransitionStatus::Async {
+ transition: Transition::FlushStop,
+ origin: TaskState::Flushing,
+ },
+ );
Context::drain_sub_tasks().await.unwrap();
assert_eq!(task_clone.state(), TaskState::Started);
});
@@ -1727,7 +2184,7 @@ mod tests {
flush_handle.await.unwrap();
flush_stop_receiver.next().await.unwrap();
- task.stop();
+ task.stop().unwrap();
task.unprepare().unwrap();
}
@@ -1745,16 +2202,23 @@ mod tests {
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
gst_debug!(RUNTIME_CAT, "flush_from_loop: flush_start from iteration");
- self.task.flush_start();
- Context::drain_sub_tasks().await
+ assert_eq!(
+ self.task.flush_start().unwrap(),
+ TransitionStatus::NotWaiting {
+ transition: Transition::FlushStart,
+ origin: TaskState::Started,
+ },
+ );
+ Ok(())
}
.boxed()
}
- fn flush_start(&mut self) -> BoxFuture<'_, ()> {
+ fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_debug!(RUNTIME_CAT, "flush_from_loop: started flushing");
self.flush_start_sender.send(()).await.unwrap();
+ Ok(())
}
.boxed()
}
@@ -1774,7 +2238,7 @@ mod tests {
)
.unwrap();
- task.start();
+ task.start().unwrap();
gst_debug!(
RUNTIME_CAT,
@@ -1782,59 +2246,77 @@ mod tests {
);
flush_start_receiver.next().await.unwrap();
- task.stop();
+ assert_eq!(
+ task.stop().unwrap(),
+ TransitionStatus::Complete {
+ origin: TaskState::Flushing,
+ target: TaskState::Stopped,
+ },
+ );
task.unprepare().unwrap();
}
#[tokio::test]
- async fn start_from_loop() {
+ async fn pause_from_loop() {
// Purpose: make sure a start transition triggered from an iteration doesn't block.
// E.g. an auto pause cancellation after a delay.
gst::init().unwrap();
struct TaskStartTest {
task: Task,
- iterate_sender: mpsc::Sender<()>,
+ pause_sender: mpsc::Sender<()>,
}
impl TaskImpl for TaskStartTest {
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
- gst_debug!(RUNTIME_CAT, "start_from_loop: entering iteration");
- self.iterate_sender.send(()).await.unwrap();
+ gst_debug!(RUNTIME_CAT, "pause_from_loop: entering iteration");
tokio::time::delay_for(Duration::from_millis(50)).await;
- gst_debug!(RUNTIME_CAT, "start_from_loop: start from iteration");
- self.task.start();
- Context::drain_sub_tasks().await
+ gst_debug!(RUNTIME_CAT, "pause_from_loop: pause from iteration");
+ assert_eq!(
+ self.task.pause().unwrap(),
+ TransitionStatus::NotWaiting {
+ transition: Transition::Pause,
+ origin: TaskState::Started,
+ },
+ );
+ Ok(())
+ }
+ .boxed()
+ }
+
+ fn pause(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ async move {
+ gst_debug!(RUNTIME_CAT, "pause_from_loop: entering pause hook");
+ self.pause_sender.send(()).await.unwrap();
+ Ok(())
}
.boxed()
}
}
- let context = Context::acquire("start_from_loop", 2).unwrap();
+ let context = Context::acquire("pause_from_loop", 2).unwrap();
let task = Task::default();
- let (iterate_sender, mut iterate_receiver) = mpsc::channel(1);
+ let (pause_sender, mut pause_receiver) = mpsc::channel(1);
task.prepare(
TaskStartTest {
task: task.clone(),
- iterate_sender,
+ pause_sender,
},
context,
)
.unwrap();
- task.start();
- iterate_receiver.next().await.unwrap();
- task.pause();
+ task.start().unwrap();
- gst_debug!(RUNTIME_CAT, "start_from_loop: awaiting start notification");
- iterate_receiver.next().await.unwrap();
+ gst_debug!(RUNTIME_CAT, "pause_from_loop: awaiting pause notification");
+ pause_receiver.next().await.unwrap();
- task.stop();
+ task.stop().unwrap();
task.unprepare().unwrap();
}
@@ -1853,22 +2335,29 @@ mod tests {
future::pending::<Result<(), gst::FlowError>>().boxed()
}
- fn flush_start(&mut self) -> BoxFuture<'_, ()> {
+ fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_debug!(
RUNTIME_CAT,
"transition_from_hook: flush_start triggering flush_stop"
);
- self.task.flush_stop();
- Context::drain_sub_tasks().await.unwrap();
+ assert_eq!(
+ self.task.flush_stop().unwrap(),
+ TransitionStatus::NotWaiting {
+ transition: Transition::FlushStop,
+ origin: TaskState::Started,
+ },
+ );
+ Ok(())
}
.boxed()
}
- fn flush_stop(&mut self) -> BoxFuture<'_, ()> {
+ fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_debug!(RUNTIME_CAT, "transition_from_hook: stopped flushing");
self.flush_stop_sender.send(()).await.unwrap();
+ Ok(())
}
.boxed()
}
@@ -1888,8 +2377,8 @@ mod tests {
)
.unwrap();
- task.start();
- task.flush_start();
+ task.start().unwrap();
+ task.flush_start().unwrap();
gst_debug!(
RUNTIME_CAT,
@@ -1897,7 +2386,7 @@ mod tests {
);
flush_stop_receiver.next().await.unwrap();
- task.stop();
+ task.stop().unwrap();
task.unprepare().unwrap();
}
@@ -1912,10 +2401,11 @@ mod tests {
}
impl TaskImpl for TaskFlushTest {
- fn start(&mut self) -> BoxFuture<'_, ()> {
+ fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_debug!(RUNTIME_CAT, "pause_flush_start: started");
self.started_sender.send(()).await.unwrap();
+ Ok(())
}
.boxed()
}
@@ -1924,18 +2414,20 @@ mod tests {
future::pending::<Result<(), gst::FlowError>>().boxed()
}
- fn flush_start(&mut self) -> BoxFuture<'_, ()> {
+ fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_debug!(RUNTIME_CAT, "pause_flush_start: started flushing");
self.flush_start_sender.send(()).await.unwrap();
+ Ok(())
}
.boxed()
}
- fn flush_stop(&mut self) -> BoxFuture<'_, ()> {
+ fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_debug!(RUNTIME_CAT, "pause_flush_start: stopped flushing");
self.flush_stop_sender.send(()).await.unwrap();
+ Ok(())
}
.boxed()
}
@@ -1961,15 +2453,33 @@ mod tests {
// Pause, FlushStart, FlushStop, Start
gst_debug!(RUNTIME_CAT, "pause_flush_start: pausing");
- task.pause();
+ assert_eq!(
+ task.pause().unwrap(),
+ TransitionStatus::Complete {
+ origin: TaskState::Prepared,
+ target: TaskState::Paused,
+ },
+ );
gst_debug!(RUNTIME_CAT, "pause_flush_start: starting flush");
- task.flush_start();
+ assert_eq!(
+ task.flush_start().unwrap(),
+ TransitionStatus::Complete {
+ origin: TaskState::Paused,
+ target: TaskState::PausedFlushing,
+ },
+ );
assert_eq!(task.state(), TaskState::PausedFlushing);
flush_start_receiver.next().await;
gst_debug!(RUNTIME_CAT, "pause_flush_start: stopping flush");
- task.flush_stop();
+ assert_eq!(
+ task.flush_stop().unwrap(),
+ TransitionStatus::Complete {
+ origin: TaskState::PausedFlushing,
+ target: TaskState::Paused,
+ },
+ );
assert_eq!(task.state(), TaskState::Paused);
flush_stop_receiver.next().await;
@@ -1977,11 +2487,17 @@ mod tests {
started_receiver.try_next().unwrap_err();
gst_debug!(RUNTIME_CAT, "pause_flush_start: starting after flushing");
- task.start();
+ assert_eq!(
+ task.start().unwrap(),
+ TransitionStatus::Complete {
+ origin: TaskState::Paused,
+ target: TaskState::Started,
+ },
+ );
assert_eq!(task.state(), TaskState::Started);
started_receiver.next().await;
- task.stop();
+ task.stop().unwrap();
task.unprepare().unwrap();
}
@@ -1996,10 +2512,11 @@ mod tests {
}
impl TaskImpl for TaskFlushTest {
- fn start(&mut self) -> BoxFuture<'_, ()> {
+ fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_debug!(RUNTIME_CAT, "pause_flushing_start: started");
self.started_sender.send(()).await.unwrap();
+ Ok(())
}
.boxed()
}
@@ -2008,18 +2525,20 @@ mod tests {
future::pending::<Result<(), gst::FlowError>>().boxed()
}
- fn flush_start(&mut self) -> BoxFuture<'_, ()> {
+ fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_debug!(RUNTIME_CAT, "pause_flushing_start: started flushing");
self.flush_start_sender.send(()).await.unwrap();
+ Ok(())
}
.boxed()
}
- fn flush_stop(&mut self) -> BoxFuture<'_, ()> {
+ fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_debug!(RUNTIME_CAT, "pause_flushing_start: stopped flushing");
self.flush_stop_sender.send(()).await.unwrap();
+ Ok(())
}
.boxed()
}
@@ -2045,27 +2564,39 @@ mod tests {
// Pause, FlushStart, Start, FlushStop
gst_debug!(RUNTIME_CAT, "pause_flushing_start: pausing");
- task.pause();
+ task.pause().unwrap();
gst_debug!(RUNTIME_CAT, "pause_flushing_start: starting flush");
- task.flush_start();
+ task.flush_start().unwrap();
assert_eq!(task.state(), TaskState::PausedFlushing);
flush_start_receiver.next().await;
gst_debug!(RUNTIME_CAT, "pause_flushing_start: starting while flushing");
- task.start();
+ assert_eq!(
+ task.start().unwrap(),
+ TransitionStatus::Complete {
+ origin: TaskState::PausedFlushing,
+ target: TaskState::Flushing,
+ },
+ );
assert_eq!(task.state(), TaskState::Flushing);
// start hook not executed
started_receiver.try_next().unwrap_err();
gst_debug!(RUNTIME_CAT, "pause_flushing_start: stopping flush");
- task.flush_stop();
+ assert_eq!(
+ task.flush_stop().unwrap(),
+ TransitionStatus::Complete {
+ origin: TaskState::Flushing,
+ target: TaskState::Started,
+ },
+ );
assert_eq!(task.state(), TaskState::Started);
flush_stop_receiver.next().await;
started_receiver.next().await;
- task.stop();
+ task.stop().unwrap();
task.unprepare().unwrap();
}
@@ -2085,18 +2616,20 @@ mod tests {
future::pending::<Result<(), gst::FlowError>>().boxed()
}
- fn flush_start(&mut self) -> BoxFuture<'_, ()> {
+ fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_debug!(RUNTIME_CAT, "flush_concurrent_start: started flushing");
self.flush_start_sender.send(()).await.unwrap();
+ Ok(())
}
.boxed()
}
- fn flush_stop(&mut self) -> BoxFuture<'_, ()> {
+ fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_debug!(RUNTIME_CAT, "flush_concurrent_start: stopped flushing");
self.flush_stop_sender.send(()).await.unwrap();
+ Ok(())
}
.boxed()
}
@@ -2120,8 +2653,7 @@ mod tests {
let oob_context = Context::acquire("flush_concurrent_start_oob", 2).unwrap();
let task_clone = task.clone();
- task.start();
- task.pause();
+ task.pause().unwrap();
// Launch flush_start // start
let (ready_sender, ready_receiver) = oneshot::channel();
@@ -2129,7 +2661,18 @@ mod tests {
let flush_start_handle = oob_context.spawn(async move {
gst_debug!(RUNTIME_CAT, "flush_concurrent_start: // flush_start");
ready_sender.send(()).unwrap();
- task_clone.flush_start();
+ let res = task_clone.flush_start().unwrap();
+ match res {
+ TransitionStatus::Async {
+ transition: Transition::FlushStart,
+ origin: TaskState::Paused,
+ } => (),
+ TransitionStatus::Async {
+ transition: Transition::FlushStart,
+ origin: TaskState::Started,
+ } => (),
+ other => unreachable!("{:?}", other),
+ }
Context::drain_sub_tasks().await.unwrap();
flush_start_receiver.next().await.unwrap();
});
@@ -2141,17 +2684,34 @@ mod tests {
ready_receiver.await.unwrap();
gst_debug!(RUNTIME_CAT, "flush_concurrent_start: // start");
- task.start();
+ let res = task.start().unwrap();
+ match res {
+ TransitionStatus::Complete {
+ origin: TaskState::Paused,
+ target: TaskState::Started,
+ } => (),
+ TransitionStatus::Complete {
+ origin: TaskState::PausedFlushing,
+ target: TaskState::Flushing,
+ } => (),
+ other => unreachable!("{:?}", other),
+ }
flush_start_handle.await.unwrap();
gst_debug!(RUNTIME_CAT, "flush_concurrent_start: requesting flush_stop");
- task.flush_stop();
+ assert_eq!(
+ task.flush_stop().unwrap(),
+ TransitionStatus::Complete {
+ origin: TaskState::Flushing,
+ target: TaskState::Started,
+ },
+ );
assert_eq!(task.state(), TaskState::Started);
flush_stop_receiver.next().await;
- task.stop();
+ task.stop().unwrap();
task.unprepare().unwrap();
}
@@ -2167,10 +2727,11 @@ mod tests {
}
impl TaskImpl for TaskTimerTest {
- fn start(&mut self) -> BoxFuture<'_, ()> {
+ fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
self.timer = Some(tokio::time::delay_for(Duration::from_millis(50)));
gst_debug!(RUNTIME_CAT, "start_timer: started");
+ Ok(())
}
.boxed()
}
@@ -2206,12 +2767,12 @@ mod tests {
.unwrap();
gst_debug!(RUNTIME_CAT, "start_timer: start");
- task.start();
+ task.start().unwrap();
timer_elapsed_receiver.await.unwrap();
gst_debug!(RUNTIME_CAT, "start_timer: timer elapsed received");
- task.stop();
+ task.stop().unwrap();
task.unprepare().unwrap();
}
}
diff --git a/generic/threadshare/src/tcpclientsrc.rs b/generic/threadshare/src/tcpclientsrc.rs
index e754a6e01..c51d43598 100644
--- a/generic/threadshare/src/tcpclientsrc.rs
+++ b/generic/threadshare/src/tcpclientsrc.rs
@@ -41,7 +41,8 @@ use std::u32;
use tokio::io::AsyncReadExt;
use crate::runtime::prelude::*;
-use crate::runtime::{Context, PadSrc, PadSrcRef, PadSrcWeak, Task};
+use crate::runtime::task::Transition;
+use crate::runtime::{Context, PadSrc, PadSrcRef, PadSrcWeak, Task, TaskState};
use super::socket::{Socket, SocketError, SocketRead};
@@ -264,16 +265,8 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let ret = match event.view() {
- EventView::FlushStart(..) => {
- tcpclientsrc.task.flush_start();
-
- true
- }
- EventView::FlushStop(..) => {
- tcpclientsrc.task.flush_stop();
-
- true
- }
+ EventView::FlushStart(..) => tcpclientsrc.task.flush_start().is_ok(),
+ EventView::FlushStop(..) => tcpclientsrc.task.flush_stop().is_ok(),
EventView::Reconfigure(..) => true,
EventView::Latency(..) => true,
_ => false,
@@ -393,16 +386,27 @@ impl TaskImpl for TcpClientSrcTask {
);
gst_log!(CAT, obj: &self.element, "Task prepared");
-
Ok(())
}
.boxed()
}
- fn handle_prepare_error(&mut self, err: gst::ErrorMessage) -> BoxFuture<'_, ()> {
+ fn handle_hook_error(
+ &mut self,
+ transition: Transition,
+ state: TaskState,
+ err: gst::ErrorMessage,
+ ) -> BoxFuture<'_, Transition> {
async move {
- gst_error!(CAT, "Task preparation failed: {:?}", err);
- self.element.post_error_message(&err);
+ match transition {
+ Transition::Prepare => {
+ gst_error!(CAT, "Task preparation failed: {:?}", err);
+ self.element.post_error_message(&err);
+
+ Transition::Error
+ }
+ other => unreachable!("Hook error {:?} in state {:?}", other, state),
+ }
}
.boxed()
}
@@ -474,20 +478,22 @@ impl TaskImpl for TcpClientSrcTask {
.boxed()
}
- fn stop(&mut self) -> BoxFuture<'_, ()> {
+ fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Stopping task");
self.src_pad_handler.reset_state().await;
gst_log!(CAT, obj: &self.element, "Task stopped");
+ Ok(())
}
.boxed()
}
- fn flush_stop(&mut self) -> BoxFuture<'_, ()> {
+ fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Stopping task flush");
self.src_pad_handler.set_need_segment().await;
gst_log!(CAT, obj: &self.element, "Task flush stopped");
+ Ok(())
}
.boxed()
}
@@ -584,22 +590,25 @@ impl TcpClientSrc {
gst_debug!(CAT, obj: element, "Unprepared");
}
- fn stop(&self, element: &gst::Element) {
+ fn stop(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Stopping");
- self.task.stop();
+ self.task.stop()?;
gst_debug!(CAT, obj: element, "Stopped");
+ Ok(())
}
- fn start(&self, element: &gst::Element) {
+ fn start(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Starting");
- self.task.start();
+ self.task.start()?;
gst_debug!(CAT, obj: element, "Started");
+ Ok(())
}
- fn pause(&self, element: &gst::Element) {
+ fn pause(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Pausing");
- self.task.pause();
+ self.task.pause()?;
gst_debug!(CAT, obj: element, "Paused");
+ Ok(())
}
}
@@ -721,7 +730,7 @@ impl ElementImpl for TcpClientSrc {
})?;
}
gst::StateChange::PlayingToPaused => {
- self.pause(element);
+ self.pause(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
self.unprepare(element);
@@ -736,13 +745,13 @@ impl ElementImpl for TcpClientSrc {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToPlaying => {
- self.start(element);
+ self.start(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
- self.stop(element);
+ self.stop(element).map_err(|_| gst::StateChangeError)?;
}
_ => (),
}
diff --git a/generic/threadshare/src/udpsink.rs b/generic/threadshare/src/udpsink.rs
index 9cf2e7817..89c2ab4d3 100644
--- a/generic/threadshare/src/udpsink.rs
+++ b/generic/threadshare/src/udpsink.rs
@@ -827,7 +827,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
async move {
if let EventView::FlushStop(_) = event.view() {
let udpsink = UdpSink::from_instance(&element);
- udpsink.task.flush_stop();
+ return udpsink.task.flush_stop().is_ok();
} else if let Some(sender) = sender.lock().await.as_mut() {
if sender.send(TaskItem::Event(event)).await.is_err() {
gst_debug!(CAT, obj: &element, "Flushing");
@@ -847,7 +847,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
event: gst::Event,
) -> bool {
if let EventView::FlushStart(..) = event.view() {
- udpsink.task.flush_start();
+ return udpsink.task.flush_start().is_ok();
}
true
@@ -872,7 +872,7 @@ impl UdpSinkTask {
}
impl TaskImpl for UdpSinkTask {
- fn start(&mut self) -> BoxFuture<'_, ()> {
+ fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Starting task");
@@ -884,6 +884,7 @@ impl TaskImpl for UdpSinkTask {
self.receiver = Some(receiver);
gst_log!(CAT, obj: &self.element, "Task started");
+ Ok(())
}
.boxed()
}
@@ -1109,16 +1110,18 @@ impl UdpSink {
gst_debug!(CAT, obj: element, "Unprepared");
}
- fn stop(&self, element: &gst::Element) {
+ fn stop(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Stopping");
- self.task.stop();
+ self.task.stop()?;
gst_debug!(CAT, obj: element, "Stopped");
+ Ok(())
}
- fn start(&self, element: &gst::Element) {
+ fn start(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Starting");
- self.task.start();
+ self.task.start()?;
gst_debug!(CAT, obj: element, "Started");
+ Ok(())
}
}
@@ -1515,10 +1518,10 @@ impl ElementImpl for UdpSink {
})?;
}
gst::StateChange::ReadyToPaused => {
- self.start(element);
+ self.start(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PausedToReady => {
- self.stop(element);
+ self.stop(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
self.unprepare(element);
diff --git a/generic/threadshare/src/udpsrc.rs b/generic/threadshare/src/udpsrc.rs
index 3c4e938ab..111549714 100644
--- a/generic/threadshare/src/udpsrc.rs
+++ b/generic/threadshare/src/udpsrc.rs
@@ -312,16 +312,8 @@ impl PadSrcHandler for UdpSrcPadHandler {
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let ret = match event.view() {
- EventView::FlushStart(..) => {
- udpsrc.task.flush_start();
-
- true
- }
- EventView::FlushStop(..) => {
- udpsrc.task.flush_stop();
-
- true
- }
+ EventView::FlushStart(..) => udpsrc.task.flush_start().is_ok(),
+ EventView::FlushStop(..) => udpsrc.task.flush_stop().is_ok(),
EventView::Reconfigure(..) => true,
EventView::Latency(..) => true,
_ => false,
@@ -409,12 +401,13 @@ impl UdpSrcTask {
}
impl TaskImpl for UdpSrcTask {
- fn start(&mut self) -> BoxFuture<'_, ()> {
+ fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Starting task");
self.socket
.set_clock(self.element.get_clock(), Some(self.element.get_base_time()));
gst_log!(CAT, obj: &self.element, "Task started");
+ Ok(())
}
.boxed()
}
@@ -504,20 +497,22 @@ impl TaskImpl for UdpSrcTask {
.boxed()
}
- fn stop(&mut self) -> BoxFuture<'_, ()> {
+ fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Stopping task");
self.src_pad_handler.reset_state().await;
gst_log!(CAT, obj: &self.element, "Task stopped");
+ Ok(())
}
.boxed()
}
- fn flush_stop(&mut self) -> BoxFuture<'_, ()> {
+ fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Stopping task flush");
self.src_pad_handler.set_need_segment().await;
gst_log!(CAT, obj: &self.element, "Stopped task flush");
+ Ok(())
}
.boxed()
}
@@ -750,22 +745,25 @@ impl UdpSrc {
gst_debug!(CAT, obj: element, "Unprepared");
}
- fn stop(&self, element: &gst::Element) {
+ fn stop(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Stopping");
- self.task.stop();
+ self.task.stop()?;
gst_debug!(CAT, obj: element, "Stopped");
+ Ok(())
}
- fn start(&self, element: &gst::Element) {
+ fn start(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Starting");
- self.task.start();
+ self.task.start()?;
gst_debug!(CAT, obj: element, "Started");
+ Ok(())
}
- fn pause(&self, element: &gst::Element) {
+ fn pause(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Pausing");
- self.task.pause();
+ self.task.pause()?;
gst_debug!(CAT, obj: element, "Paused");
+ Ok(())
}
}
@@ -932,7 +930,7 @@ impl ElementImpl for UdpSrc {
})?;
}
gst::StateChange::PlayingToPaused => {
- self.pause(element);
+ self.pause(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
self.unprepare(element);
@@ -947,13 +945,13 @@ impl ElementImpl for UdpSrc {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToPlaying => {
- self.start(element);
+ self.start(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
- self.stop(element);
+ self.stop(element).map_err(|_| gst::StateChangeError)?;
}
_ => (),
}
diff --git a/generic/threadshare/tests/pad.rs b/generic/threadshare/tests/pad.rs
index 71ffa39fd..0cea01265 100644
--- a/generic/threadshare/tests/pad.rs
+++ b/generic/threadshare/tests/pad.rs
@@ -115,12 +115,12 @@ impl PadSrcHandler for PadSrcTestHandler {
let ret = match event.view() {
EventView::FlushStart(..) => {
- elem_src_test.task.flush_start();
+ elem_src_test.task.flush_start().unwrap();
true
}
EventView::Qos(..) | EventView::Reconfigure(..) | EventView::Latency(..) => true,
EventView::FlushStop(..) => {
- elem_src_test.task.flush_stop();
+ elem_src_test.task.flush_stop().unwrap();
true
}
_ => false,
@@ -188,20 +188,22 @@ impl TaskImpl for ElementSrcTestTask {
.boxed()
}
- fn stop(&mut self) -> BoxFuture<'_, ()> {
+ fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(SRC_CAT, obj: &self.element, "Stopping task");
self.flush();
gst_log!(SRC_CAT, obj: &self.element, "Task stopped");
+ Ok(())
}
.boxed()
}
- fn flush_start(&mut self) -> BoxFuture<'_, ()> {
+ fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(SRC_CAT, obj: &self.element, "Starting task flush");
self.flush();
gst_log!(SRC_CAT, obj: &self.element, "Task flush started");
+ Ok(())
}
.boxed()
}
@@ -253,7 +255,7 @@ impl ElementSrcTest {
)
.map_err(|err| {
gst_error_msg!(
- gst::ResourceError::OpenRead,
+ gst::ResourceError::Failed,
["Error preparing Task: {:?}", err]
)
})?;
@@ -274,19 +276,19 @@ impl ElementSrcTest {
fn stop(&self, element: &gst::Element) {
gst_debug!(SRC_CAT, obj: element, "Stopping");
- self.task.stop();
+ self.task.stop().unwrap();
gst_debug!(SRC_CAT, obj: element, "Stopped");
}
fn start(&self, element: &gst::Element) {
gst_debug!(SRC_CAT, obj: element, "Starting");
- self.task.start();
+ self.task.start().unwrap();
gst_debug!(SRC_CAT, obj: element, "Started");
}
fn pause(&self, element: &gst::Element) {
gst_debug!(SRC_CAT, obj: element, "Pausing");
- self.task.pause();
+ self.task.pause().unwrap();
gst_debug!(SRC_CAT, obj: element, "Paused");
}
}
@@ -405,10 +407,10 @@ impl ElementImpl for ElementSrcTest {
fn send_event(&self, _element: &gst::Element, event: gst::Event) -> bool {
match event.view() {
EventView::FlushStart(..) => {
- self.task.flush_start();
+ self.task.flush_start().unwrap();
}
EventView::FlushStop(..) => {
- self.task.flush_stop();
+ self.task.flush_stop().unwrap();
}
_ => (),
}