Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrançois Laignel <fengalin@free.fr>2022-08-31 00:56:01 +0300
committerSebastian Dröge <slomo@coaxion.net>2022-09-13 10:29:50 +0300
commitab327be9af44e73dd9572e9c30e1368a41929013 (patch)
treeb4e603983f61ba941096967b9882e0637e92e058 /generic
parentd39aabe05477b858adc6bd32f7da90c505894ad1 (diff)
ts/scheduler: improve tasks / io & timers polling balance
Set a limit to the nb of task checked before checking the reactor and the main future again.
Diffstat (limited to 'generic')
-rw-r--r--generic/threadshare/src/runtime/executor/scheduler.rs96
1 files changed, 59 insertions, 37 deletions
diff --git a/generic/threadshare/src/runtime/executor/scheduler.rs b/generic/threadshare/src/runtime/executor/scheduler.rs
index ba4d6fa62..4ce1c008e 100644
--- a/generic/threadshare/src/runtime/executor/scheduler.rs
+++ b/generic/threadshare/src/runtime/executor/scheduler.rs
@@ -42,6 +42,7 @@ pub(super) struct Scheduler {
impl Scheduler {
pub const DUMMY_NAME: &'static str = "DUMMY";
+ const MAX_SUCCESSIVE_TASKS: usize = 64;
pub fn start(context_name: &str, max_throttling: Duration) -> Handle {
// Name the thread so that it appears in panic messages.
@@ -178,52 +179,73 @@ impl Scheduler {
let _guard = CallOnDrop::new(|| Scheduler::close(Arc::clone(&self.context_name)));
- if let Poll::Ready(t) = future.as_mut().poll(cx) {
- return Ok(t);
- }
-
- let mut last;
- loop {
- last = Instant::now();
- Reactor::with_mut(|reactor| reactor.react(last).ok());
+ let mut now;
+ // This is to ensure reactor invocation on the first iteration.
+ let mut last_react = Instant::now() - self.max_throttling;
+ let mut tasks_checked;
+ 'main: loop {
+ // Only check I/O and timers every `max_throttling`.
+ now = Instant::now();
+ if now - last_react >= self.max_throttling {
+ last_react = now;
+ Reactor::with_mut(|reactor| reactor.react(now).ok());
+ }
if let Poll::Ready(t) = future.as_mut().poll(cx) {
return Ok(t);
}
- while let Ok(runnable) = self.tasks.pop_runnable() {
- panic::catch_unwind(|| runnable.run()).map_err(|err| {
- gst::error!(
- RUNTIME_CAT,
- "A task has panicked within Context {}",
- self.context_name
- );
-
- err
- })?;
- }
+ tasks_checked = 0;
+ while tasks_checked < Self::MAX_SUCCESSIVE_TASKS {
+ if let Ok(runnable) = self.tasks.pop_runnable() {
+ panic::catch_unwind(|| runnable.run()).map_err(|err| {
+ gst::error!(
+ RUNTIME_CAT,
+ "A task has panicked within Context {}",
+ self.context_name
+ );
- let mut must_unpark = self.must_unpark.lock().unwrap();
- loop {
- if *must_unpark {
- *must_unpark = false;
- break;
- }
+ err
+ })?;
- if let Some(parking_duration) = self.max_throttling.checked_sub(last.elapsed()) {
- let result = self
- .must_unpark_cvar
- .wait_timeout(must_unpark, parking_duration)
- .unwrap();
+ tasks_checked += 1;
+ } else {
+ // No more ready tasks.
+ if tasks_checked > 0 {
+ // Check if the main future is ready before parking.
+ if let Poll::Ready(t) = future.as_mut().poll(cx) {
+ return Ok(t);
+ }
+ }
+ // else: main future has just been checked.
- #[cfg(feature = "tuning")]
- self.parked_duration
- .fetch_add(parking_duration.subsec_nanos() as u64, Ordering::Relaxed);
+ let mut must_unpark = self.must_unpark.lock().unwrap();
+ loop {
+ if *must_unpark {
+ *must_unpark = false;
+ continue 'main;
+ }
- must_unpark = result.0;
- } else {
- *must_unpark = false;
- break;
+ if let Some(parking_duration) =
+ self.max_throttling.checked_sub(last_react.elapsed())
+ {
+ #[cfg(feature = "tuning")]
+ self.parked_duration.fetch_add(
+ parking_duration.subsec_nanos() as u64,
+ Ordering::Relaxed,
+ );
+
+ let result = self
+ .must_unpark_cvar
+ .wait_timeout(must_unpark, parking_duration)
+ .unwrap();
+
+ must_unpark = result.0;
+ } else {
+ *must_unpark = false;
+ continue 'main;
+ }
+ }
}
}
}