Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'generic')
-rw-r--r--generic/threadshare/src/runtime/task.rs54
1 files changed, 17 insertions, 37 deletions
diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs
index 7bbb35c4e..2277df941 100644
--- a/generic/threadshare/src/runtime/task.rs
+++ b/generic/threadshare/src/runtime/task.rs
@@ -28,7 +28,6 @@ use futures::prelude::*;
use std::fmt;
use std::ops::Deref;
use std::pin::Pin;
-use std::stringify;
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::Poll;
@@ -376,8 +375,7 @@ pub trait TaskImpl: Send + 'static {
/// Handles an error occuring during the execution of the `Task` loop.
///
- /// This include errors returned by [`Self::try_next`], [`Self::handle_item`]
- /// as well as errors returned by subtasks drain at the end of an iteration.
+ /// This include errors returned by [`Self::try_next`] & [`Self::handle_item`].
///
/// If the error is unrecoverable, implementations might use
/// `gst::Element::post_error_message` and return `Trigger::Error`.
@@ -817,27 +815,10 @@ struct StateMachine<Item: Send + 'static> {
pending_triggering_evt: Option<TriggeringEvent>,
}
-// Make sure the Context doesn't throttle otherwise we end up with long delays executing
-// transition actions in a pipeline with many elements. This is because pipeline serializes
-// the transition actions and the Context's scheduler gets a chance to reach its throttling
-// state between 2 elements.
-
macro_rules! exec_action {
($self:ident, $action:ident, $triggering_evt:expr, $origin:expr, $task_inner:expr) => {{
match $self.task_impl.$action().await {
- Ok(()) => {
- gst::trace!(RUNTIME_CAT, "Draining subtasks for {}", stringify!($action));
- if let Err(err) = Context::drain_sub_tasks().await {
- gst::log!(
- RUNTIME_CAT,
- "{} subtask returned {:?}",
- stringify!($action),
- err
- );
- }
-
- Ok($triggering_evt)
- }
+ Ok(()) => Ok($triggering_evt),
Err(err) => {
// FIXME problem is that we loose the origin trigger in the
// final TransitionStatus.
@@ -1093,11 +1074,6 @@ impl<Item: Send + 'static> StateMachine<Item> {
// Unprepare is not joined by an ack_rx but by joining the state machine handle
self.task_impl.unprepare().await;
- gst::trace!(RUNTIME_CAT, "Draining subtasks for unprepare");
- if let Err(err) = Context::drain_sub_tasks().await {
- gst::log!(RUNTIME_CAT, "unprepare subtask returned {:?}", err);
- }
-
task_inner
.lock()
.unwrap()
@@ -1174,11 +1150,6 @@ impl<Item: Send + 'static> StateMachine<Item> {
gst::debug!(RUNTIME_CAT, "TaskImpl::handle_item returned {:?}", err);
err
})?;
-
- gst::trace!(RUNTIME_CAT, "Draining subtasks after Task loop iteration");
- if let Err(err) = Context::drain_sub_tasks().await {
- gst::debug!(RUNTIME_CAT, "Task iteration subtask returned {:?}", err);
- }
}
}
}
@@ -1187,10 +1158,19 @@ impl<Item: Send + 'static> StateMachine<Item> {
mod tests {
use futures::channel::{mpsc, oneshot};
use futures::executor::block_on;
+ use futures::future::BoxFuture;
+ use futures::prelude::*;
use std::time::Duration;
- use super::{TaskState::*, TransitionOk::*, TransitionStatus::*, Trigger::*, *};
- use crate::runtime::Context;
+ use super::{
+ Task, TaskImpl,
+ TaskState::{self, *},
+ TransitionError,
+ TransitionOk::*,
+ TransitionStatus::*,
+ Trigger::{self, *},
+ };
+ use crate::runtime::{Context, RUNTIME_CAT};
#[track_caller]
fn stop_then_unprepare(task: Task) {
@@ -1534,7 +1514,7 @@ mod tests {
err
);
match (trigger, state) {
- (Trigger::Prepare, TaskState::Unprepared) => {
+ (Prepare, Unprepared) => {
self.prepare_error_sender.send(()).await.unwrap();
}
other => unreachable!("{:?}", other),
@@ -1772,7 +1752,7 @@ mod tests {
err
);
match (trigger, state) {
- (Trigger::Prepare, TaskState::Unprepared) => {
+ (Prepare, Unprepared) => {
self.prepare_error_sender.send(()).await.unwrap();
}
other => panic!("action error for {:?}", other),
@@ -2789,11 +2769,11 @@ mod tests {
gst::debug!(RUNTIME_CAT, "flush_concurrent_start: // start");
match block_on(task.start()) {
- Ok(TransitionOk::Complete {
+ Ok(Complete {
origin: Paused,
target: Started,
}) => (),
- Ok(TransitionOk::Complete {
+ Ok(Complete {
origin: PausedFlushing,
target: Flushing,
}) => (),