Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrançois Laignel <fengalin@free.fr>2022-08-17 20:14:37 +0300
committerFrançois Laignel <fengalin@free.fr>2022-08-18 19:42:18 +0300
commit2bb071a950c1b78b8e324c916294251ca822fb37 (patch)
treeca94f380d5eedefaa31dbbe50b762d1d5d127eee /generic/threadshare/src/runtime
parent57da8e649dddf984929d0bd1ceaae29c522e03c1 (diff)
ts/runtime: slight optimizations for sub tasks related operations
Using callgrind with the standalone test showed opportunities for improvements for sub tasks addition and drain. All sub task additions were performed after making sure we were operating on a Context Task. The Context and Task were checked again when adding the sub task. Draining sub tasks was perfomed in a loop on every call places, checking whether there were remaining sub tasks first. This commit implements the loop and checks directly in `executor::Task::drain_subtasks`, saving one `Mutex` lock and one `thread_local` access per iteration when there are sub tasks to drain. The `PadSink` functions wrapper were performing redundant checks on the `Context` presence and were adding the delayed Future only when there were already sub tasks.
Diffstat (limited to 'generic/threadshare/src/runtime')
-rw-r--r--generic/threadshare/src/runtime/executor/context.rs68
-rw-r--r--generic/threadshare/src/runtime/executor/scheduler.rs12
-rw-r--r--generic/threadshare/src/runtime/executor/task.rs60
-rw-r--r--generic/threadshare/src/runtime/pad.rs54
-rw-r--r--generic/threadshare/src/runtime/task.rs47
5 files changed, 85 insertions, 156 deletions
diff --git a/generic/threadshare/src/runtime/executor/context.rs b/generic/threadshare/src/runtime/executor/context.rs
index 371af2202..f1555627a 100644
--- a/generic/threadshare/src/runtime/executor/context.rs
+++ b/generic/threadshare/src/runtime/executor/context.rs
@@ -1,5 +1,5 @@
// Copyright (C) 2018-2020 Sebastian Dröge <sebastian@centricular.com>
-// Copyright (C) 2019-2021 François Laignel <fengalin@free.fr>
+// Copyright (C) 2019-2022 François Laignel <fengalin@free.fr>
//
// Take a look at the license at the top of the repository in the LICENSE file.
@@ -55,7 +55,7 @@ where
cur_task_id,
cur_context.name()
);
- let _ = Context::add_sub_task(async move {
+ let _ = cur_context.add_sub_task(cur_task_id, async move {
future.await;
Ok(())
});
@@ -199,7 +199,10 @@ impl Context {
/// Returns the `TaskId` running on current thread, if any.
pub fn current_task() -> Option<(Context, TaskId)> {
- Scheduler::current().map(Context).zip(TaskId::current())
+ Scheduler::current().map(|scheduler| {
+ // Context users always operate on a Task
+ (Context(scheduler), TaskId::current().unwrap())
+ })
}
/// Executes the provided function relatively to this [`Context`].
@@ -265,31 +268,11 @@ impl Context {
self.0.unpark();
}
- pub fn current_has_sub_tasks() -> bool {
- let (ctx, task_id) = match Context::current_task() {
- Some(task) => task,
- None => {
- gst::trace!(RUNTIME_CAT, "No current task");
- return false;
- }
- };
-
- ctx.0.has_sub_tasks(task_id)
- }
-
- pub fn add_sub_task<T>(sub_task: T) -> Result<(), T>
+ pub fn add_sub_task<T>(&self, task_id: TaskId, sub_task: T) -> Result<(), T>
where
T: Future<Output = SubTaskOutput> + Send + 'static,
{
- let (ctx, task_id) = match Context::current_task() {
- Some(task) => task,
- None => {
- gst::trace!(RUNTIME_CAT, "No current task");
- return Err(sub_task);
- }
- };
-
- ctx.0.add_sub_task(task_id, sub_task)
+ self.0.add_sub_task(task_id, sub_task)
}
pub async fn drain_sub_tasks() -> SubTaskOutput {
@@ -339,7 +322,7 @@ mod tests {
assert_eq!(ctx.name(), Scheduler::DUMMY_NAME);
assert_eq!(task_id, super::TaskId(0));
- let res = Context::add_sub_task(async move {
+ let res = ctx.add_sub_task(task_id, async move {
let (_ctx, task_id) = Context::current_task().unwrap();
assert_eq!(task_id, super::TaskId(0));
Ok(())
@@ -381,10 +364,10 @@ mod tests {
let ctx_weak = context.downgrade();
let join_handle = context.spawn(async move {
- let (_ctx, task_id) = Context::current_task().unwrap();
+ let (ctx, task_id) = Context::current_task().unwrap();
assert_eq!(task_id, TaskId(0));
- let res = Context::add_sub_task(async move {
+ let res = ctx.add_sub_task(task_id, async move {
let (_ctx, task_id) = Context::current_task().unwrap();
assert_eq!(task_id, TaskId(0));
Ok(())
@@ -395,10 +378,10 @@ mod tests {
.upgrade()
.unwrap()
.spawn(async {
- let (_ctx, task_id) = Context::current_task().unwrap();
+ let (ctx, task_id) = Context::current_task().unwrap();
assert_eq!(task_id, TaskId(1));
- let res = Context::add_sub_task(async move {
+ let res = ctx.add_sub_task(task_id, async move {
let (_ctx, task_id) = Context::current_task().unwrap();
assert_eq!(task_id, TaskId(1));
Ok(())
@@ -433,14 +416,19 @@ mod tests {
let add_sub_task = move |item| {
let sender = sender.clone();
- Context::add_sub_task(async move {
- sender
- .lock()
- .await
- .send(item)
- .await
- .map_err(|_| gst::FlowError::Error)
- })
+ Context::current_task()
+ .ok_or(())
+ .and_then(|(ctx, task_id)| {
+ ctx.add_sub_task(task_id, async move {
+ sender
+ .lock()
+ .await
+ .send(item)
+ .await
+ .map_err(|_| gst::FlowError::Error)
+ })
+ .map_err(drop)
+ })
};
// Tests
@@ -450,7 +438,7 @@ mod tests {
drain_fut.await.unwrap();
// Add a subtask
- add_sub_task(0).map_err(drop).unwrap();
+ add_sub_task(0).unwrap();
// Check that it was not executed yet
receiver.try_next().unwrap_err();
@@ -461,7 +449,7 @@ mod tests {
assert_eq!(receiver.try_next().unwrap(), Some(0));
// Add another task and check that it's not executed yet
- add_sub_task(1).map_err(drop).unwrap();
+ add_sub_task(1).unwrap();
receiver.try_next().unwrap_err();
// Return the receiver
diff --git a/generic/threadshare/src/runtime/executor/scheduler.rs b/generic/threadshare/src/runtime/executor/scheduler.rs
index a5f1eaf68..53a7fab7f 100644
--- a/generic/threadshare/src/runtime/executor/scheduler.rs
+++ b/generic/threadshare/src/runtime/executor/scheduler.rs
@@ -1,5 +1,5 @@
// Copyright (C) 2018-2020 Sebastian Dröge <sebastian@centricular.com>
-// Copyright (C) 2019-2021 François Laignel <fengalin@free.fr>
+// Copyright (C) 2019-2022 François Laignel <fengalin@free.fr>
//
// Take a look at the license at the top of the repository in the LICENSE file.
@@ -132,11 +132,7 @@ impl Scheduler {
let res = future.await;
let task_id = TaskId::current().unwrap();
- while handle.has_sub_tasks(task_id) {
- if handle.drain_sub_tasks(task_id).await.is_err() {
- break;
- }
- }
+ let _ = handle.drain_sub_tasks(task_id).await;
res
});
@@ -411,10 +407,6 @@ impl Handle {
self.0.scheduler.unpark();
}
- pub fn has_sub_tasks(&self, task_id: TaskId) -> bool {
- self.0.scheduler.tasks.has_sub_tasks(task_id)
- }
-
pub fn add_sub_task<T>(&self, task_id: TaskId, sub_task: T) -> Result<(), T>
where
T: Future<Output = SubTaskOutput> + Send + 'static,
diff --git a/generic/threadshare/src/runtime/executor/task.rs b/generic/threadshare/src/runtime/executor/task.rs
index 935b5f9e0..4b883ced6 100644
--- a/generic/threadshare/src/runtime/executor/task.rs
+++ b/generic/threadshare/src/runtime/executor/task.rs
@@ -1,5 +1,5 @@
// Copyright (C) 2018-2020 Sebastian Dröge <sebastian@centricular.com>
-// Copyright (C) 2019-2021 François Laignel <fengalin@free.fr>
+// Copyright (C) 2019-2022 François Laignel <fengalin@free.fr>
//
// Take a look at the license at the top of the repository in the LICENSE file.
@@ -91,10 +91,6 @@ impl Task {
{
self.sub_tasks.push_back(sub_task.boxed());
}
-
- fn drain_sub_tasks(&mut self) -> VecDeque<BoxFuture<'static, SubTaskOutput>> {
- std::mem::take(&mut self.sub_tasks)
- }
}
impl fmt::Debug for Task {
@@ -240,15 +236,6 @@ impl TaskQueue {
self.runnables.pop()
}
- pub fn has_sub_tasks(&self, task_id: TaskId) -> bool {
- self.tasks
- .lock()
- .unwrap()
- .get(task_id.0)
- .map(|t| !t.sub_tasks.is_empty())
- .unwrap_or(false)
- }
-
pub fn add_sub_task<T>(&self, task_id: TaskId, sub_task: T) -> Result<(), T>
where
T: Future<Output = SubTaskOutput> + Send + 'static,
@@ -272,35 +259,24 @@ impl TaskQueue {
}
}
- pub fn drain_sub_tasks(
- &self,
- task_id: TaskId,
- ) -> impl Future<Output = SubTaskOutput> + Send + 'static {
- let sub_tasks = self
- .tasks
- .lock()
- .unwrap()
- .get_mut(task_id.0)
- .map(|task| (task.drain_sub_tasks(), Arc::clone(&self.context_name)));
-
- async move {
- if let Some((mut sub_tasks, context_name)) = sub_tasks {
- if !sub_tasks.is_empty() {
- gst::log!(
- RUNTIME_CAT,
- "Scheduling draining {} sub tasks from {:?} on '{}'",
- sub_tasks.len(),
- task_id,
- &context_name,
- );
-
- for sub_task in sub_tasks.drain(..) {
- sub_task.await?;
- }
- }
- }
+ pub async fn drain_sub_tasks(&self, task_id: TaskId) -> SubTaskOutput {
+ loop {
+ let mut sub_tasks = match self.tasks.lock().unwrap().get_mut(task_id.0) {
+ Some(task) if !task.sub_tasks.is_empty() => std::mem::take(&mut task.sub_tasks),
+ _ => return Ok(()),
+ };
- Ok(())
+ gst::trace!(
+ RUNTIME_CAT,
+ "Scheduling draining {} sub tasks from {:?} on '{}'",
+ sub_tasks.len(),
+ task_id,
+ self.context_name,
+ );
+
+ for sub_task in sub_tasks.drain(..) {
+ sub_task.await?;
+ }
}
}
}
diff --git a/generic/threadshare/src/runtime/pad.rs b/generic/threadshare/src/runtime/pad.rs
index e54537086..89e144776 100644
--- a/generic/threadshare/src/runtime/pad.rs
+++ b/generic/threadshare/src/runtime/pad.rs
@@ -1,4 +1,4 @@
-// Copyright (C) 2019-2020 François Laignel <fengalin@free.fr>
+// Copyright (C) 2019-2022 François Laignel <fengalin@free.fr>
// Copyright (C) 2020 Sebastian Dröge <sebastian@centricular.com>
//
// This library is free software; you can redistribute it and/or
@@ -80,7 +80,7 @@ use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::{Arc, Weak};
-use super::executor::{block_on_or_add_sub_task, Context};
+use super::executor::{self, Context};
use super::RUNTIME_CAT;
#[inline]
@@ -234,9 +234,7 @@ impl PadSrcInner {
})?;
gst::log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks");
- while Context::current_has_sub_tasks() {
- Context::drain_sub_tasks().await?;
- }
+ Context::drain_sub_tasks().await?;
Ok(success)
}
@@ -255,9 +253,7 @@ impl PadSrcInner {
})?;
gst::log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks");
- while Context::current_has_sub_tasks() {
- Context::drain_sub_tasks().await?;
- }
+ Context::drain_sub_tasks().await?;
Ok(success)
}
@@ -268,10 +264,8 @@ impl PadSrcInner {
let was_handled = self.gst_pad().push_event(event);
gst::log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks");
- while Context::current_has_sub_tasks() {
- if Context::drain_sub_tasks().await.is_err() {
- return false;
- }
+ if Context::drain_sub_tasks().await.is_err() {
+ return false;
}
was_handled
@@ -758,18 +752,6 @@ impl<'a> PadSinkRef<'a> {
Ok(())
}
-
- fn handle_future(
- &self,
- fut: impl Future<Output = Result<FlowSuccess, FlowError>> + Send + 'static,
- ) -> Result<FlowSuccess, FlowError> {
- if let Err(fut) = Context::add_sub_task(fut.map(|res| res.map(drop))) {
- block_on_or_add_sub_task(fut.map(|res| res.map(|_| gst::FlowSuccess::Ok)))
- .unwrap_or(Ok(gst::FlowSuccess::Ok))
- } else {
- Ok(gst::FlowSuccess::Ok)
- }
- }
}
impl<'a> Deref for PadSinkRef<'a> {
@@ -876,7 +858,7 @@ impl PadSink {
parent,
|| Err(FlowError::Error),
move |imp, element| {
- if Context::current_has_sub_tasks() {
+ if let Some((ctx, task_id)) = Context::current_task() {
let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
let handler = handler.clone();
let element =
@@ -889,7 +871,8 @@ impl PadSink {
this_weak.upgrade().ok_or(gst::FlowError::Flushing)?;
handler.sink_chain(&this_ref, imp, &element, buffer).await
};
- let _ = Context::add_sub_task(delayed_fut.map(|res| res.map(drop)));
+ let _ =
+ ctx.add_sub_task(task_id, delayed_fut.map(|res| res.map(drop)));
Ok(gst::FlowSuccess::Ok)
} else {
@@ -900,7 +883,7 @@ impl PadSink {
element.dynamic_cast_ref::<gst::Element>().unwrap(),
buffer,
);
- this_ref.handle_future(chain_fut)
+ executor::block_on(chain_fut)
}
},
)
@@ -916,7 +899,7 @@ impl PadSink {
parent,
|| Err(FlowError::Error),
move |imp, element| {
- if Context::current_has_sub_tasks() {
+ if let Some((ctx, task_id)) = Context::current_task() {
let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
let handler = handler.clone();
let element =
@@ -931,7 +914,8 @@ impl PadSink {
.sink_chain_list(&this_ref, imp, &element, list)
.await
};
- let _ = Context::add_sub_task(delayed_fut.map(|res| res.map(drop)));
+ let _ =
+ ctx.add_sub_task(task_id, delayed_fut.map(|res| res.map(drop)));
Ok(gst::FlowSuccess::Ok)
} else {
@@ -942,7 +926,7 @@ impl PadSink {
element.dynamic_cast_ref::<gst::Element>().unwrap(),
list,
);
- this_ref.handle_future(chain_list_fut)
+ executor::block_on(chain_list_fut)
}
},
)
@@ -961,7 +945,7 @@ impl PadSink {
|| Err(FlowError::Error),
move |imp, element| {
if event.is_serialized() {
- if Context::current_has_sub_tasks() {
+ if let Some((ctx, task_id)) = Context::current_task() {
let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
let handler = handler.clone();
let element =
@@ -980,8 +964,10 @@ impl PadSink {
)
.await
};
- let _ =
- Context::add_sub_task(delayed_fut.map(|res| res.map(drop)));
+ let _ = ctx.add_sub_task(
+ task_id,
+ delayed_fut.map(|res| res.map(drop)),
+ );
Ok(gst::FlowSuccess::Ok)
} else {
@@ -992,7 +978,7 @@ impl PadSink {
element.dynamic_cast_ref::<gst::Element>().unwrap(),
event,
);
- this_ref.handle_future(event_fut)
+ executor::block_on(event_fut)
}
} else {
let this_ref = PadSinkRef::new(inner_arc);
diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs
index abf23e1b0..137ceef92 100644
--- a/generic/threadshare/src/runtime/task.rs
+++ b/generic/threadshare/src/runtime/task.rs
@@ -203,14 +203,14 @@ impl TransitionStatus {
origin,
res_fut,
} => {
- if let Some(cur_ctx) = Context::current() {
+ if let Some((ctx, task_id)) = Context::current_task() {
gst::debug!(
RUNTIME_CAT,
"Awaiting for {:?} ack in a subtask on context {}",
trigger,
- cur_ctx.name()
+ ctx.name()
);
- let _ = Context::add_sub_task(async move {
+ let _ = ctx.add_sub_task(task_id, async move {
let res = res_fut.await;
if res.is_ok() {
gst::log!(RUNTIME_CAT, "Received ack {:?} for {:?}", res, trigger);
@@ -826,18 +826,14 @@ macro_rules! exec_action {
($self:ident, $action:ident, $triggering_evt:expr, $origin:expr, $task_inner:expr) => {{
match $self.task_impl.$action().await {
Ok(()) => {
- let mut res;
- while Context::current_has_sub_tasks() {
- gst::trace!(RUNTIME_CAT, "Draining subtasks for {}", stringify!($action));
- res = Context::drain_sub_tasks().await.map_err(|err| {
- let msg = format!("{} subtask returned {:?}", stringify!($action), err);
- gst::log!(RUNTIME_CAT, "{}", &msg);
- gst::error_msg!(gst::CoreError::StateChange, ["{}", &msg])
- });
-
- if res.is_err() {
- break;
- }
+ gst::trace!(RUNTIME_CAT, "Draining subtasks for {}", stringify!($action));
+ if let Err(err) = Context::drain_sub_tasks().await {
+ gst::log!(
+ RUNTIME_CAT,
+ "{} subtask returned {:?}",
+ stringify!($action),
+ err
+ );
}
Ok($triggering_evt)
@@ -1097,15 +1093,9 @@ impl<Item: Send + 'static> StateMachine<Item> {
// Unprepare is not joined by an ack_rx but by joining the state machine handle
self.task_impl.unprepare().await;
- while Context::current_has_sub_tasks() {
- gst::trace!(RUNTIME_CAT, "Draining subtasks for unprepare");
- let res = Context::drain_sub_tasks().await.map_err(|err| {
- gst::log!(RUNTIME_CAT, "unprepare subtask returned {:?}", err);
- err
- });
- if res.is_err() {
- break;
- }
+ gst::trace!(RUNTIME_CAT, "Draining subtasks for unprepare");
+ if let Err(err) = Context::drain_sub_tasks().await {
+ gst::log!(RUNTIME_CAT, "unprepare subtask returned {:?}", err);
}
task_inner
@@ -1185,12 +1175,9 @@ impl<Item: Send + 'static> StateMachine<Item> {
err
})?;
- while Context::current_has_sub_tasks() {
- gst::trace!(RUNTIME_CAT, "Draining subtasks after Task loop iteration");
- Context::drain_sub_tasks().await.map_err(|err| {
- gst::debug!(RUNTIME_CAT, "Task iteration subtask returned {:?}", err);
- err
- })?;
+ gst::trace!(RUNTIME_CAT, "Draining subtasks after Task loop iteration");
+ if let Err(err) = Context::drain_sub_tasks().await {
+ gst::debug!(RUNTIME_CAT, "Task iteration subtask returned {:?}", err);
}
}
}