Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrançois Laignel <fengalin@free.fr>2022-08-19 15:50:40 +0300
committerSebastian Dröge <slomo@coaxion.net>2022-09-13 10:29:50 +0300
commitd39aabe05477b858adc6bd32f7da90c505894ad1 (patch)
treebf821a5639738286b5992682d92a06596a6c6649 /generic
parentaf12bce141f5f3ec03e0128ef34a203fa7df9691 (diff)
ts/Task: don't drain sub tasks after state transition and iteration
Subtasks are used when current async processing needs to execute a `Future` via a sync function (eg. a call to a C function). In this case `Context::block_on` would block the whole `Context`, leading to a deadlock. The main use case for this is the `Pad{Src,Sink}` functions: when we `PadSrc::push` and the peer pad is a `PadSink`, we want `PadSrc::push` to complete after the async function on the `PadSink` completes. In this case the `PadSink` async function is added as a subtask of current scheduler task and `PadSrc::push` only returns when the subtask is executed. In `runtime::Task` (`Task` here is the execution Task with a state machine, not a scheduler task), we used to spawn state transition actions and iteration loop (leading to a new scheduler Task). At the time, it seemed convenient for the user to automatically drain sub tasks after a state transition action or an iteration. User wouldn't have to worry about this, similarly to the `Pad{Src,Sink}` case. In current implementation, the `Task` state machine now operates directly on the target `Context`. State transtions actions and the iteration loop are no longer spawned. It seems now useless to abstract the subtasks draining from the user. Either they transitively use a mechanism such as `Pad{Src,Sink}` which already handles this automatically, or they add substasks on purpose, in which case they know better when subtasks must be drained.
Diffstat (limited to 'generic')
-rw-r--r--generic/threadshare/src/runtime/task.rs54
1 files changed, 17 insertions, 37 deletions
diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs
index 7bbb35c4e..2277df941 100644
--- a/generic/threadshare/src/runtime/task.rs
+++ b/generic/threadshare/src/runtime/task.rs
@@ -28,7 +28,6 @@ use futures::prelude::*;
use std::fmt;
use std::ops::Deref;
use std::pin::Pin;
-use std::stringify;
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::Poll;
@@ -376,8 +375,7 @@ pub trait TaskImpl: Send + 'static {
/// Handles an error occuring during the execution of the `Task` loop.
///
- /// This include errors returned by [`Self::try_next`], [`Self::handle_item`]
- /// as well as errors returned by subtasks drain at the end of an iteration.
+ /// This include errors returned by [`Self::try_next`] & [`Self::handle_item`].
///
/// If the error is unrecoverable, implementations might use
/// `gst::Element::post_error_message` and return `Trigger::Error`.
@@ -817,27 +815,10 @@ struct StateMachine<Item: Send + 'static> {
pending_triggering_evt: Option<TriggeringEvent>,
}
-// Make sure the Context doesn't throttle otherwise we end up with long delays executing
-// transition actions in a pipeline with many elements. This is because pipeline serializes
-// the transition actions and the Context's scheduler gets a chance to reach its throttling
-// state between 2 elements.
-
macro_rules! exec_action {
($self:ident, $action:ident, $triggering_evt:expr, $origin:expr, $task_inner:expr) => {{
match $self.task_impl.$action().await {
- Ok(()) => {
- gst::trace!(RUNTIME_CAT, "Draining subtasks for {}", stringify!($action));
- if let Err(err) = Context::drain_sub_tasks().await {
- gst::log!(
- RUNTIME_CAT,
- "{} subtask returned {:?}",
- stringify!($action),
- err
- );
- }
-
- Ok($triggering_evt)
- }
+ Ok(()) => Ok($triggering_evt),
Err(err) => {
// FIXME problem is that we loose the origin trigger in the
// final TransitionStatus.
@@ -1093,11 +1074,6 @@ impl<Item: Send + 'static> StateMachine<Item> {
// Unprepare is not joined by an ack_rx but by joining the state machine handle
self.task_impl.unprepare().await;
- gst::trace!(RUNTIME_CAT, "Draining subtasks for unprepare");
- if let Err(err) = Context::drain_sub_tasks().await {
- gst::log!(RUNTIME_CAT, "unprepare subtask returned {:?}", err);
- }
-
task_inner
.lock()
.unwrap()
@@ -1174,11 +1150,6 @@ impl<Item: Send + 'static> StateMachine<Item> {
gst::debug!(RUNTIME_CAT, "TaskImpl::handle_item returned {:?}", err);
err
})?;
-
- gst::trace!(RUNTIME_CAT, "Draining subtasks after Task loop iteration");
- if let Err(err) = Context::drain_sub_tasks().await {
- gst::debug!(RUNTIME_CAT, "Task iteration subtask returned {:?}", err);
- }
}
}
}
@@ -1187,10 +1158,19 @@ impl<Item: Send + 'static> StateMachine<Item> {
mod tests {
use futures::channel::{mpsc, oneshot};
use futures::executor::block_on;
+ use futures::future::BoxFuture;
+ use futures::prelude::*;
use std::time::Duration;
- use super::{TaskState::*, TransitionOk::*, TransitionStatus::*, Trigger::*, *};
- use crate::runtime::Context;
+ use super::{
+ Task, TaskImpl,
+ TaskState::{self, *},
+ TransitionError,
+ TransitionOk::*,
+ TransitionStatus::*,
+ Trigger::{self, *},
+ };
+ use crate::runtime::{Context, RUNTIME_CAT};
#[track_caller]
fn stop_then_unprepare(task: Task) {
@@ -1534,7 +1514,7 @@ mod tests {
err
);
match (trigger, state) {
- (Trigger::Prepare, TaskState::Unprepared) => {
+ (Prepare, Unprepared) => {
self.prepare_error_sender.send(()).await.unwrap();
}
other => unreachable!("{:?}", other),
@@ -1772,7 +1752,7 @@ mod tests {
err
);
match (trigger, state) {
- (Trigger::Prepare, TaskState::Unprepared) => {
+ (Prepare, Unprepared) => {
self.prepare_error_sender.send(()).await.unwrap();
}
other => panic!("action error for {:?}", other),
@@ -2789,11 +2769,11 @@ mod tests {
gst::debug!(RUNTIME_CAT, "flush_concurrent_start: // start");
match block_on(task.start()) {
- Ok(TransitionOk::Complete {
+ Ok(Complete {
origin: Paused,
target: Started,
}) => (),
- Ok(TransitionOk::Complete {
+ Ok(Complete {
origin: PausedFlushing,
target: Flushing,
}) => (),