diff options
author | François Laignel <fengalin@free.fr> | 2022-08-31 00:56:01 +0300 |
---|---|---|
committer | Sebastian Dröge <slomo@coaxion.net> | 2022-09-13 10:29:50 +0300 |
commit | ab327be9af44e73dd9572e9c30e1368a41929013 (patch) | |
tree | b4e603983f61ba941096967b9882e0637e92e058 /generic | |
parent | d39aabe05477b858adc6bd32f7da90c505894ad1 (diff) |
ts/scheduler: improve tasks / io & timers polling balance
Set a limit to the nb of task checked before checking the reactor
and the main future again.
Diffstat (limited to 'generic')
-rw-r--r-- | generic/threadshare/src/runtime/executor/scheduler.rs | 96 |
1 files changed, 59 insertions, 37 deletions
diff --git a/generic/threadshare/src/runtime/executor/scheduler.rs b/generic/threadshare/src/runtime/executor/scheduler.rs index ba4d6fa62..4ce1c008e 100644 --- a/generic/threadshare/src/runtime/executor/scheduler.rs +++ b/generic/threadshare/src/runtime/executor/scheduler.rs @@ -42,6 +42,7 @@ pub(super) struct Scheduler { impl Scheduler { pub const DUMMY_NAME: &'static str = "DUMMY"; + const MAX_SUCCESSIVE_TASKS: usize = 64; pub fn start(context_name: &str, max_throttling: Duration) -> Handle { // Name the thread so that it appears in panic messages. @@ -178,52 +179,73 @@ impl Scheduler { let _guard = CallOnDrop::new(|| Scheduler::close(Arc::clone(&self.context_name))); - if let Poll::Ready(t) = future.as_mut().poll(cx) { - return Ok(t); - } - - let mut last; - loop { - last = Instant::now(); - Reactor::with_mut(|reactor| reactor.react(last).ok()); + let mut now; + // This is to ensure reactor invocation on the first iteration. + let mut last_react = Instant::now() - self.max_throttling; + let mut tasks_checked; + 'main: loop { + // Only check I/O and timers every `max_throttling`. + now = Instant::now(); + if now - last_react >= self.max_throttling { + last_react = now; + Reactor::with_mut(|reactor| reactor.react(now).ok()); + } if let Poll::Ready(t) = future.as_mut().poll(cx) { return Ok(t); } - while let Ok(runnable) = self.tasks.pop_runnable() { - panic::catch_unwind(|| runnable.run()).map_err(|err| { - gst::error!( - RUNTIME_CAT, - "A task has panicked within Context {}", - self.context_name - ); - - err - })?; - } + tasks_checked = 0; + while tasks_checked < Self::MAX_SUCCESSIVE_TASKS { + if let Ok(runnable) = self.tasks.pop_runnable() { + panic::catch_unwind(|| runnable.run()).map_err(|err| { + gst::error!( + RUNTIME_CAT, + "A task has panicked within Context {}", + self.context_name + ); - let mut must_unpark = self.must_unpark.lock().unwrap(); - loop { - if *must_unpark { - *must_unpark = false; - break; - } + err + })?; - if let Some(parking_duration) = self.max_throttling.checked_sub(last.elapsed()) { - let result = self - .must_unpark_cvar - .wait_timeout(must_unpark, parking_duration) - .unwrap(); + tasks_checked += 1; + } else { + // No more ready tasks. + if tasks_checked > 0 { + // Check if the main future is ready before parking. + if let Poll::Ready(t) = future.as_mut().poll(cx) { + return Ok(t); + } + } + // else: main future has just been checked. - #[cfg(feature = "tuning")] - self.parked_duration - .fetch_add(parking_duration.subsec_nanos() as u64, Ordering::Relaxed); + let mut must_unpark = self.must_unpark.lock().unwrap(); + loop { + if *must_unpark { + *must_unpark = false; + continue 'main; + } - must_unpark = result.0; - } else { - *must_unpark = false; - break; + if let Some(parking_duration) = + self.max_throttling.checked_sub(last_react.elapsed()) + { + #[cfg(feature = "tuning")] + self.parked_duration.fetch_add( + parking_duration.subsec_nanos() as u64, + Ordering::Relaxed, + ); + + let result = self + .must_unpark_cvar + .wait_timeout(must_unpark, parking_duration) + .unwrap(); + + must_unpark = result.0; + } else { + *must_unpark = false; + continue 'main; + } + } } } } |