Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrançois Laignel <fengalin@free.fr>2022-03-22 19:18:18 +0300
committerSebastian Dröge <sebastian@centricular.com>2022-03-28 12:50:57 +0300
commit0b615409ac5acb05fa6af402da3ce39d42f6e173 (patch)
tree4b6df8cd9fa77eb2270b28e192a4814f02b5bdc6
parenta17cdf990338793af4fdb8082037bb4c530369ec (diff)
ts/rt/Task: use light weight executor blocking on ack or join handle
Previous version used the Context::block_on_or_add_sub_task which spawns a full-fledged executor with timer and io Reactor for no reason when we just need to wait for a Receiver or JoinHandle.
-rw-r--r--generic/threadshare/src/runtime/task.rs55
1 files changed, 41 insertions, 14 deletions
diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs
index 8110042f5..903d5a5ec 100644
--- a/generic/threadshare/src/runtime/task.rs
+++ b/generic/threadshare/src/runtime/task.rs
@@ -33,7 +33,7 @@ use std::ops::Deref;
use std::stringify;
use std::sync::{Arc, Mutex, MutexGuard};
-use super::executor::{block_on_or_add_sub_task, TaskId};
+use super::executor::TaskId;
use super::{Context, RUNTIME_CAT};
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)]
@@ -526,25 +526,35 @@ impl Task {
match state_machine_handle {
Some(state_machine_handle) => {
- gst_log!(
- RUNTIME_CAT,
- "Synchronously waiting for the state machine {:?}",
- state_machine_handle,
- );
- let join_fut = block_on_or_add_sub_task(async {
+ let state_machine_end_fut = async {
state_machine_handle.await;
drop(triggering_evt_tx);
drop(context);
gst_debug!(RUNTIME_CAT, "Task unprepared");
- });
+ };
+
+ if let Some((cur_context, cur_task_id)) = Context::current_task() {
+ gst_log!(
+ RUNTIME_CAT,
+ "Will wait for state machine termination completion in subtask to task {:?} on context {}",
+ cur_task_id,
+ cur_context.name()
+ );
+ let _ = Context::add_sub_task(state_machine_end_fut.map(|_| Ok(())));
- if join_fut.is_none() {
return Ok(TransitionStatus::Async {
trigger: Trigger::Unprepare,
origin,
});
+ } else {
+ gst_log!(
+ RUNTIME_CAT,
+ "Waiting for state machine termination on current thread"
+ );
+ // Use a light-weight executor, no timer nor async io.
+ futures::executor::block_on(state_machine_end_fut)
}
}
None => {
@@ -654,7 +664,7 @@ impl Task {
drop(inner);
- block_on_or_add_sub_task(async move {
+ let ack_await_fut = async move {
gst_trace!(RUNTIME_CAT, "Awaiting ack for {:?}", trigger);
let res = ack_rx.await.unwrap();
@@ -665,11 +675,28 @@ impl Task {
}
res
- })
- .unwrap_or({
- // Future was spawned as a subtask
+ };
+
+ if let Some((cur_context, cur_task_id)) = Context::current_task() {
+ gst_log!(
+ RUNTIME_CAT,
+ "Will await ack for {:?} in subtask to task {:?} on context {}",
+ trigger,
+ cur_task_id,
+ cur_context.name()
+ );
+ let _ = Context::add_sub_task(ack_await_fut.map(|_| Ok(())));
+
Ok(TransitionStatus::Async { trigger, origin })
- })
+ } else {
+ gst_log!(
+ RUNTIME_CAT,
+ "Awaiting ack for {:?} on current thread",
+ trigger
+ );
+ // Use a light-weight executor, no timer nor async io.
+ futures::executor::block_on(ack_await_fut)
+ }
}
fn spawn_state_machine(