Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrançois Laignel <fengalin@free.fr>2022-09-01 18:54:05 +0300
committerSebastian Dröge <slomo@coaxion.net>2022-09-13 10:29:50 +0300
commit61c62ee1e85d48578a11e4e84366885fcd0fe4ac (patch)
treef1913e624e16e2553daf4186d9cd4f665db53ded /generic/threadshare/src/runtime
parent235ded35fd71082ac9ec78502ba462adb238d38f (diff)
ts/timers: multiple improvements
This commit improves threadshare timers predictability by better making use of current time slice. Added a dedicate timer BTreeMap for after timers (those that are guaranteed to fire no sooner than the expected instant) so as to avoid previous workaround which added half the max throttling duration. These timers can now be checked against the reactor processing instant. Oneshot timers only need to be polled as `Future`s when intervals are `Stream`s. This also reduces the size for oneshot timers and make user call `next` on intervals. Intervals can also implement `FusedStream`, which can help when used in features such as `select!`. Also drop the `time` module, which was kepts for compatibility when the `executor` was migrated from tokio based to smol-like.
Diffstat (limited to 'generic/threadshare/src/runtime')
-rw-r--r--generic/threadshare/src/runtime/executor/context.rs10
-rw-r--r--generic/threadshare/src/runtime/executor/mod.rs1
-rw-r--r--generic/threadshare/src/runtime/executor/reactor.rs187
-rw-r--r--generic/threadshare/src/runtime/executor/scheduler.rs18
-rw-r--r--generic/threadshare/src/runtime/executor/timer.rs629
-rw-r--r--generic/threadshare/src/runtime/mod.rs7
-rw-r--r--generic/threadshare/src/runtime/task.rs8
-rw-r--r--generic/threadshare/src/runtime/time.rs42
8 files changed, 631 insertions, 271 deletions
diff --git a/generic/threadshare/src/runtime/executor/context.rs b/generic/threadshare/src/runtime/executor/context.rs
index 325e20190..e12fdcbbb 100644
--- a/generic/threadshare/src/runtime/executor/context.rs
+++ b/generic/threadshare/src/runtime/executor/context.rs
@@ -348,7 +348,7 @@ mod tests {
let elapsed = crate::runtime::executor::block_on(async {
let now = Instant::now();
- crate::runtime::time::delay_for(DELAY).await;
+ crate::runtime::timer::delay_for(DELAY).await;
now.elapsed()
});
@@ -490,7 +490,7 @@ mod tests {
let elapsed = crate::runtime::executor::block_on(context.spawn(async {
let start = Instant::now();
- crate::runtime::time::delay_for(DELAY).await;
+ crate::runtime::timer::delay_for(DELAY).await;
start.elapsed()
}))
.unwrap();
@@ -507,7 +507,7 @@ mod tests {
// Panic: attempt to `runtime::executor::block_on` within a `Context` thread
let join_handle = context.spawn(async {
- crate::runtime::executor::block_on(crate::runtime::time::delay_for(DELAY));
+ crate::runtime::executor::block_on(crate::runtime::timer::delay_for(DELAY));
});
// Panic: task has failed
@@ -533,7 +533,7 @@ mod tests {
assert_eq!(bytes_sent, 10);
let (start, timer) =
- context.enter(|| (Instant::now(), crate::runtime::time::delay_for(DELAY)));
+ context.enter(|| (Instant::now(), crate::runtime::timer::delay_for(DELAY)));
timer.await;
start.elapsed()
});
@@ -559,7 +559,7 @@ mod tests {
assert_eq!(bytes_sent, 10);
let (start, timer) =
- context.enter(|| (Instant::now(), crate::runtime::time::delay_for(DELAY)));
+ context.enter(|| (Instant::now(), crate::runtime::timer::delay_for(DELAY)));
let elapsed = crate::runtime::executor::block_on(async move {
timer.await;
start.elapsed()
diff --git a/generic/threadshare/src/runtime/executor/mod.rs b/generic/threadshare/src/runtime/executor/mod.rs
index 61ccbe04e..f47a6491f 100644
--- a/generic/threadshare/src/runtime/executor/mod.rs
+++ b/generic/threadshare/src/runtime/executor/mod.rs
@@ -41,7 +41,6 @@ mod task;
pub use task::{SubTaskOutput, TaskId};
pub mod timer;
-pub use timer::Timer;
struct CallOnDrop<F: FnOnce()>(Option<F>);
diff --git a/generic/threadshare/src/runtime/executor/reactor.rs b/generic/threadshare/src/runtime/executor/reactor.rs
index 6769f47a0..d93ba9a0c 100644
--- a/generic/threadshare/src/runtime/executor/reactor.rs
+++ b/generic/threadshare/src/runtime/executor/reactor.rs
@@ -1,7 +1,7 @@
// This is based on https://github.com/smol-rs/async-io
// with adaptations by:
//
-// Copyright (C) 2021 François Laignel <fengalin@free.fr>
+// Copyright (C) 2021-2022 François Laignel <fengalin@free.fr>
//
// Take a look at the license at the top of the repository in the LICENSE file.
@@ -53,9 +53,18 @@ pub(super) struct Reactor {
/// fresh "round" of `ReactorLock::react()`.
ticker: AtomicUsize,
+ /// Time when timers have been checked in current time slice.
+ timers_check_instant: Instant,
+
+ /// Time limit when timers are being fired in current time slice.
+ time_slice_end: Instant,
+
/// Half max throttling duration, needed to fire timers.
half_max_throttling: Duration,
+ /// List of wakers to wake when reacting.
+ wakers: Vec<Waker>,
+
/// Registered sources.
sources: Slab<Arc<Source>>,
@@ -64,12 +73,21 @@ pub(super) struct Reactor {
/// Holding a lock on this event list implies the exclusive right to poll I/O.
events: Vec<Event>,
- /// An ordered map of registered timers.
+ /// An ordered map of registered regular timers.
+ ///
+ /// Timers are in the order in which they fire. The `RegularTimerId` distinguishes
+ /// timers that fire at the same time. The `Waker` represents the task awaiting the
+ /// timer.
+ timers: BTreeMap<(Instant, RegularTimerId), Waker>,
+
+ /// An ordered map of registered after timers.
+ ///
+ /// These timers are guaranteed to fire no sooner than their expected time.
///
- /// Timers are in the order in which they fire. The `usize` in this type is a timer ID used to
- /// distinguish timers that fire at the same time. The `Waker` represents the task awaiting the
+ /// Timers are in the order in which they fire. The `AfterTimerId` distinguishes
+ /// timers that fire at the same time. The `Waker` represents the task awaiting the
/// timer.
- timers: BTreeMap<(Instant, usize), Waker>,
+ after_timers: BTreeMap<(Instant, AfterTimerId), Waker>,
/// A queue of timer operations (insert and remove).
///
@@ -83,10 +101,14 @@ impl Reactor {
Reactor {
poller: Poller::new().expect("cannot initialize I/O event notification"),
ticker: AtomicUsize::new(0),
- half_max_throttling: max_throttling / 2 + Duration::from_nanos(1),
+ timers_check_instant: Instant::now(),
+ time_slice_end: Instant::now(),
+ half_max_throttling: max_throttling / 2,
+ wakers: Vec::new(),
sources: Slab::new(),
events: Vec::new(),
timers: BTreeMap::new(),
+ after_timers: BTreeMap::new(),
timer_ops: ConcurrentQueue::bounded(1000),
}
}
@@ -166,6 +188,14 @@ impl Reactor {
self.half_max_throttling
}
+ pub fn timers_check_instant(&self) -> Instant {
+ self.timers_check_instant
+ }
+
+ pub fn time_slice_end(&self) -> Instant {
+ self.time_slice_end
+ }
+
/// Registers an I/O source in the reactor.
pub fn insert_io(
&mut self,
@@ -205,18 +235,40 @@ impl Reactor {
self.poller.delete(source.raw)
}
- /// Registers a timer in the reactor.
+ /// Registers a regular timer in the reactor.
+ ///
+ /// Returns the inserted timer's ID.
+ pub fn insert_regular_timer(&mut self, when: Instant, waker: &Waker) -> RegularTimerId {
+ // Generate a new timer ID.
+ static REGULAR_ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
+ let id = RegularTimerId(REGULAR_ID_GENERATOR.fetch_add(1, Ordering::Relaxed));
+
+ // Push an insert operation.
+ while self
+ .timer_ops
+ .push(TimerOp::Insert(when, id.into(), waker.clone()))
+ .is_err()
+ {
+ // If the queue is full, drain it and try again.
+ gst::warning!(RUNTIME_CAT, "react: timer_ops is full");
+ self.process_timer_ops();
+ }
+
+ id
+ }
+
+ /// Registers an after timer in the reactor.
///
/// Returns the inserted timer's ID.
- pub fn insert_timer(&mut self, when: Instant, waker: &Waker) -> usize {
+ pub fn insert_after_timer(&mut self, when: Instant, waker: &Waker) -> AfterTimerId {
// Generate a new timer ID.
- static ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
- let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);
+ static AFTER_ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
+ let id = AfterTimerId(AFTER_ID_GENERATOR.fetch_add(1, Ordering::Relaxed));
// Push an insert operation.
while self
.timer_ops
- .push(TimerOp::Insert(when, id, waker.clone()))
+ .push(TimerOp::Insert(when, id.into(), waker.clone()))
.is_err()
{
// If the queue is full, drain it and try again.
@@ -228,8 +280,9 @@ impl Reactor {
}
/// Deregisters a timer from the reactor.
- pub fn remove_timer(&mut self, when: Instant, id: usize) {
+ pub fn remove_timer(&mut self, when: Instant, id: impl Into<TimerId>) {
// Push a remove operation.
+ let id = id.into();
while self.timer_ops.push(TimerOp::Remove(when, id)).is_err() {
gst::warning!(RUNTIME_CAT, "react: timer_ops is full");
// If the queue is full, drain it and try again.
@@ -238,26 +291,54 @@ impl Reactor {
}
/// Processes ready timers and extends the list of wakers to wake.
- ///
- /// Returns the duration until the next timer before this method was called.
- fn process_timers(&mut self, wakers: &mut Vec<Waker>) {
+ fn process_timers(&mut self, now: Instant) {
self.process_timer_ops();
- let now = Instant::now();
+ self.timers_check_instant = now;
+ self.time_slice_end = now + self.half_max_throttling;
- // Split timers into ready and pending timers.
+ // Split regular timers into ready and pending timers.
//
- // Careful to split just *after* `now`, so that a timer set for exactly `now` is considered
- // ready.
- let pending = self.timers.split_off(&(now + self.half_max_throttling, 0));
+ // Careful to split just *after* current time slice end,
+ // so that a timer set to fire in current time slice is
+ // considered ready.
+ let pending = self
+ .timers
+ .split_off(&(self.time_slice_end, RegularTimerId::NONE));
let ready = mem::replace(&mut self.timers, pending);
// Add wakers to the list.
if !ready.is_empty() {
- gst::trace!(RUNTIME_CAT, "process_timers: {} ready wakers", ready.len());
+ gst::trace!(
+ RUNTIME_CAT,
+ "process_timers (regular): {} ready wakers",
+ ready.len()
+ );
+
+ for (_, waker) in ready {
+ self.wakers.push(waker);
+ }
+ }
+
+ // Split "at least" timers into ready and pending timers.
+ //
+ // Careful to split just *after* `now`,
+ // so that a timer set for exactly `now` is considered ready.
+ let pending = self
+ .after_timers
+ .split_off(&(self.timers_check_instant, AfterTimerId::NONE));
+ let ready = mem::replace(&mut self.after_timers, pending);
+
+ // Add wakers to the list.
+ if !ready.is_empty() {
+ gst::trace!(
+ RUNTIME_CAT,
+ "process_timers (after): {} ready wakers",
+ ready.len()
+ );
for (_, waker) in ready {
- wakers.push(waker);
+ self.wakers.push(waker);
}
}
}
@@ -268,23 +349,29 @@ impl Reactor {
// forever.
for _ in 0..self.timer_ops.capacity().unwrap() {
match self.timer_ops.pop() {
- Ok(TimerOp::Insert(when, id, waker)) => {
+ Ok(TimerOp::Insert(when, TimerId::Regular(id), waker)) => {
self.timers.insert((when, id), waker);
}
- Ok(TimerOp::Remove(when, id)) => {
+ Ok(TimerOp::Insert(when, TimerId::After(id), waker)) => {
+ self.after_timers.insert((when, id), waker);
+ }
+ Ok(TimerOp::Remove(when, TimerId::Regular(id))) => {
self.timers.remove(&(when, id));
}
+ Ok(TimerOp::Remove(when, TimerId::After(id))) => {
+ self.after_timers.remove(&(when, id));
+ }
Err(_) => break,
}
}
}
/// Processes new events.
- pub fn react(&mut self) -> io::Result<()> {
- let mut wakers = Vec::new();
+ pub fn react(&mut self, now: Instant) -> io::Result<()> {
+ debug_assert!(self.wakers.is_empty());
// Process ready timers.
- self.process_timers(&mut wakers);
+ self.process_timers(now);
// Bump the ticker before polling I/O.
let tick = self.ticker.fetch_add(1, Ordering::SeqCst).wrapping_add(1);
@@ -306,7 +393,7 @@ impl Reactor {
for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] {
if emitted {
state[dir].tick = tick;
- state[dir].drain_into(&mut wakers);
+ state[dir].drain_into(&mut self.wakers);
}
}
@@ -337,10 +424,10 @@ impl Reactor {
};
// Wake up ready tasks.
- if !wakers.is_empty() {
- gst::trace!(RUNTIME_CAT, "react: {} ready wakers", wakers.len());
+ if !self.wakers.is_empty() {
+ gst::trace!(RUNTIME_CAT, "react: {} ready wakers", self.wakers.len());
- for waker in wakers {
+ for waker in self.wakers.drain(..) {
// Don't let a panicking waker blow everything up.
panic::catch_unwind(|| waker.wake()).ok();
}
@@ -350,10 +437,44 @@ impl Reactor {
}
}
+/// Timer will fire in its time slice.
+/// This can happen before of after the expected time.
+#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
+pub struct RegularTimerId(usize);
+impl RegularTimerId {
+ const NONE: RegularTimerId = RegularTimerId(0);
+}
+
+/// Timer is guaranteed to fire after the expected time.
+#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
+pub struct AfterTimerId(usize);
+impl AfterTimerId {
+ const NONE: AfterTimerId = AfterTimerId(0);
+}
+
+/// Any Timer Ids.
+#[derive(Copy, Clone, Debug)]
+pub(crate) enum TimerId {
+ Regular(RegularTimerId),
+ After(AfterTimerId),
+}
+
+impl From<RegularTimerId> for TimerId {
+ fn from(id: RegularTimerId) -> Self {
+ TimerId::Regular(id)
+ }
+}
+
+impl From<AfterTimerId> for TimerId {
+ fn from(id: AfterTimerId) -> Self {
+ TimerId::After(id)
+ }
+}
+
/// A single timer operation.
enum TimerOp {
- Insert(Instant, usize, Waker),
- Remove(Instant, usize),
+ Insert(Instant, TimerId, Waker),
+ Remove(Instant, TimerId),
}
/// A registered source of I/O events.
diff --git a/generic/threadshare/src/runtime/executor/scheduler.rs b/generic/threadshare/src/runtime/executor/scheduler.rs
index 958d88b3f..9f862a664 100644
--- a/generic/threadshare/src/runtime/executor/scheduler.rs
+++ b/generic/threadshare/src/runtime/executor/scheduler.rs
@@ -178,16 +178,19 @@ impl Scheduler {
let _guard = CallOnDrop::new(|| Scheduler::close(Arc::clone(&self.context_name)));
+ if let Poll::Ready(t) = future.as_mut().poll(cx) {
+ return Ok(t);
+ }
+
let mut last;
loop {
last = Instant::now();
+ Reactor::with_mut(|reactor| reactor.react(last).ok());
if let Poll::Ready(t) = future.as_mut().poll(cx) {
- break Ok(t);
+ return Ok(t);
}
- Reactor::with_mut(|reactor| reactor.react().ok());
-
while let Ok(runnable) = self.tasks.pop_runnable() {
panic::catch_unwind(|| runnable.run()).map_err(|err| {
gst::error!(
@@ -447,11 +450,12 @@ impl PartialEq for Handle {
#[cfg(test)]
mod tests {
- use super::super::Timer;
- use super::*;
+ use super::super::*;
+ use std::time::Duration;
#[test]
fn block_on_task_join_handle() {
+ use futures::channel::oneshot;
use std::sync::mpsc;
let (join_sender, join_receiver) = mpsc::channel();
@@ -461,7 +465,7 @@ mod tests {
let handle =
Scheduler::init("block_on_task_join_handle".into(), Duration::from_millis(2));
let join_handle = handle.spawn(async {
- Timer::after(Duration::from_millis(5)).await;
+ timer::delay_for(Duration::from_millis(5)).await;
42
});
@@ -480,7 +484,7 @@ mod tests {
#[test]
fn block_on_timer() {
let res = Scheduler::block_on(async {
- Timer::after(Duration::from_millis(5)).await;
+ timer::delay_for(Duration::from_millis(5)).await;
42
});
diff --git a/generic/threadshare/src/runtime/executor/timer.rs b/generic/threadshare/src/runtime/executor/timer.rs
index 079c7a9ef..5ecddb9bb 100644
--- a/generic/threadshare/src/runtime/executor/timer.rs
+++ b/generic/threadshare/src/runtime/executor/timer.rs
@@ -1,193 +1,355 @@
// This is based on https://github.com/smol-rs/async-io
// with adaptations by:
//
-// Copyright (C) 2021 François Laignel <fengalin@free.fr>
+// Copyright (C) 2021-2022 François Laignel <fengalin@free.fr>
//
// Take a look at the license at the top of the repository in the LICENSE file.
-use futures::stream::Stream;
+use futures::stream::{FusedStream, Stream};
+use std::error::Error;
+use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};
-use super::Reactor;
+use super::reactor::{AfterTimerId, Reactor, RegularTimerId};
-/// A future or stream that emits timed events.
-///
-/// Timers are futures that output a single [`Instant`] when they fire.
-///
-/// Timers are also streams that can output [`Instant`]s periodically.
#[derive(Debug)]
-pub struct Timer {
- /// This timer's ID and last waker that polled it.
- ///
- /// When this field is set to `None`, this timer is not registered in the reactor.
- id_and_waker: Option<(usize, Waker)>,
+pub struct IntervalError;
- /// The next instant at which this timer fires.
- when: Instant,
+impl Error for IntervalError {}
- /// The period.
- period: Duration,
+impl fmt::Display for IntervalError {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.write_str("Interval period can't be null")
+ }
}
-impl Timer {
- /// Creates a timer that emits an event once after the given duration of time.
- ///
- /// When throttling is activated (i.e. when using a non-`0` `wait`
- /// duration in `Context::acquire`), timer entries are assigned to
- /// the nearest time frame, meaning that the delay might elapse
- /// `wait` / 2 ms earlier or later than the expected instant.
- ///
- /// Use [`Timer::after_at_least`] when it's preferable not to return
- /// before the expected instant.
- pub fn after(duration: Duration) -> Timer {
- Timer::at(Instant::now() + duration)
+/// Creates a timer that emits an event once after the given delay.
+///
+/// When throttling is activated (i.e. when using a non-`0` `wait`
+/// duration in `Context::acquire`), timer entries are assigned to
+/// the nearest time frame, meaning that the delay might elapse
+/// `wait` / 2 ms earlier or later than the expected instant.
+///
+/// Use [`delay_for_at_least`] when it's preferable not to return
+/// before the expected instant.
+pub fn delay_for(delay: Duration) -> Oneshot {
+ if delay <= Reactor::with(|r| r.half_max_throttling()) {
+ // timer should fire now.
+ return Oneshot::new(Reactor::with(|r| r.timers_check_instant()));
}
- /// Creates a timer that emits an event once after the given duration of time.
- ///
- /// See [`Timer::after`] for details. The event won't be emitted before
- /// the expected delay has elapsed.
- #[track_caller]
- pub fn after_at_least(duration: Duration) -> Timer {
- Timer::at_least_at(Instant::now() + duration)
+ Oneshot::new(Instant::now() + delay)
+}
+
+/// Creates a timer that emits an event once at the given time instant.
+///
+/// When throttling is activated (i.e. when using a non-`0` `wait`
+/// duration in `Context::acquire`), timer entries are assigned to
+/// the nearest time frame, meaning that the delay might elapse
+/// `wait` / 2 ms earlier or later than the expected instant.
+///
+/// Use [`after`] when it's preferable not to return
+/// before the expected instant.
+pub fn at(when: Instant) -> Oneshot {
+ if when <= Instant::now() {
+ // timer should fire now.
+ return Oneshot::new(Reactor::with(|r| r.timers_check_instant()));
}
- /// Creates a timer that emits an event once at the given time instant.
- ///
- /// When throttling is activated (i.e. when using a non-`0` `wait`
- /// duration in `Context::acquire`), timer entries are assigned to
- /// the nearest time frame, meaning that the delay might elapse
- /// `wait` / 2 ms earlier or later than the expected instant.
- ///
- /// Use [`Timer::at_least_at`] when it's preferable not to return
- /// before the expected instant.
- pub fn at(instant: Instant) -> Timer {
- Timer::interval_at(instant, Duration::MAX)
+ Oneshot::new(when)
+}
+
+/// Creates a timer that emits events periodically, starting as soon as possible.
+///
+/// Returns an error if `period` is zero.
+///
+/// When throttling is activated (i.e. when using a non-`0` `wait`
+/// duration in `Context::acquire`), timer entries are assigned to
+/// the nearest time frame, meaning that the delay might elapse
+/// `wait` / 2 ms earlier or later than the expected instant.
+///
+/// Use [`interval_at_least`] when it's preferable not to tick
+/// before the expected instants.
+pub fn interval(period: Duration) -> Result<Interval, IntervalError> {
+ interval_at(Instant::now(), period)
+}
+
+/// Creates a timer that emits events periodically, starting after `delay`.
+///
+/// Returns an error if `period` is zero.
+///
+/// When throttling is activated (i.e. when using a non-`0` `wait`
+/// duration in `Context::acquire`), timer entries are assigned to
+/// the nearest time frame, meaning that the delay might elapse
+/// `wait` / 2 ms earlier or later than the expected instant.
+///
+/// Use [`interval_delayed_by_at_least`] when it's preferable not to tick
+/// before the expected instants.
+pub fn interval_delayed_by(delay: Duration, period: Duration) -> Result<Interval, IntervalError> {
+ interval_at(Instant::now() + delay, period)
+}
+
+/// Creates a timer that emits events periodically, starting at `start`.
+///
+/// When throttling is activated (i.e. when using a non-`0` `wait`
+/// duration in `Context::acquire`), timer entries are assigned to
+/// the nearest time frame, meaning that the delay might elapse
+/// `wait` / 2 ms earlier or later than the expected instant.
+///
+/// Use [`interval_after_at_least`] when it's preferable not to tick
+/// before the expected instants.
+pub fn interval_at(start: Instant, period: Duration) -> Result<Interval, IntervalError> {
+ if period.is_zero() {
+ return Err(IntervalError);
}
- /// Creates a timer that emits an event once at the given time instant.
- ///
- /// See [`Timer::at`] for details. The event won't be emitted before
- /// the expected delay has elapsed.
- #[track_caller]
- pub fn at_least_at(instant: Instant) -> Timer {
- Timer::interval_at_least_at(instant, Duration::MAX)
+ Ok(Interval::new(start, period))
+}
+
+/// Creates a timer that emits an event once after the given delay.
+///
+/// See [`delay_for`] for details. The event is guaranteed to be emitted
+/// no sooner than the expected delay has elapsed.
+#[track_caller]
+pub fn delay_for_at_least(delay: Duration) -> OneshotAfter {
+ if delay.is_zero() {
+ // timer should fire now.
+ return OneshotAfter::new(Reactor::with(|r| r.timers_check_instant()));
}
- /// Creates a timer that emits events periodically.
- pub fn interval(period: Duration) -> Timer {
- Timer::interval_at(Instant::now() + period, period)
+ OneshotAfter::new(Instant::now() + delay)
+}
+
+/// Creates a timer that emits an event once no sooner than the given time instant.
+///
+/// See [`at`] for details.
+#[track_caller]
+pub fn after(when: Instant) -> OneshotAfter {
+ if when <= Instant::now() {
+ // timer should fire now.
+ return OneshotAfter::new(Reactor::with(|r| r.timers_check_instant()));
}
- /// Creates a timer that emits events periodically, starting after `delay`.
- ///
- /// When throttling is activated (i.e. when using a non-`0` `wait`
- /// duration in `Context::acquire`), timer entries are assigned to
- /// the nearest time frame, meaning that the delay might elapse
- /// `wait` / 2 ms earlier or later than the expected instant.
- pub fn interval_delayed_by(delay: Duration, period: Duration) -> Timer {
- Timer::interval_at(Instant::now() + delay, period)
+ OneshotAfter::new(when)
+}
+
+/// Creates a timer that emits events periodically, starting as soon as possible.
+///
+/// Returns an error if `period` is zero.
+///
+/// See [`interval`] for details. The events are guaranteed to be
+/// emitted no sooner than the expected instants.
+pub fn interval_at_least(period: Duration) -> Result<IntervalAfter, IntervalError> {
+ interval_after_at_least(Instant::now(), period)
+}
+
+/// Creates a timer that emits events periodically, starting after at least `delay`.
+///
+/// Returns an error if `period` is zero.
+///
+/// See [`interval_delayed_by`] for details. The events are guaranteed to be
+/// emitted no sooner than the expected instants.
+#[track_caller]
+pub fn interval_delayed_by_at_least(
+ delay: Duration,
+ period: Duration,
+) -> Result<IntervalAfter, IntervalError> {
+ interval_after_at_least(Instant::now() + delay, period)
+}
+
+/// Creates a timer that emits events periodically, starting at `start`.
+///
+/// See [`interval_at`] for details. The events are guaranteed to be
+/// emitted no sooner than the expected instants.
+#[track_caller]
+pub fn interval_after_at_least(
+ start: Instant,
+ period: Duration,
+) -> Result<IntervalAfter, IntervalError> {
+ if period.is_zero() {
+ return Err(IntervalError);
}
- /// Creates a timer that emits events periodically, starting at `start`.
- ///
- /// When throttling is activated (i.e. when using a non-`0` `wait`
- /// duration in `Context::acquire`), timer entries are assigned to
- /// the nearest time frame, meaning that the delay might elapse
- /// `wait` / 2 ms earlier or later than the expected instant.
+ Ok(IntervalAfter::new(start, period))
+}
+
+/// A future that emits an event at the given time.
+///
+/// `Oneshot`s are futures that resolve when the given time is reached.
+///
+/// When throttling is activated (i.e. when using a non-`0` `wait`
+/// duration in `Context::acquire`), timer entries are assigned to
+/// the nearest time frame, meaning that the timer may fire
+/// `wait` / 2 ms earlier or later than the expected instant.
+#[derive(Debug)]
+pub struct Oneshot {
+ /// This timer's ID and last waker that polled it.
///
- /// Use [`Timer::interval_at_least_at`] when it's preferable not to return
- /// before the expected instant.
- pub fn interval_at(start: Instant, period: Duration) -> Timer {
- Timer {
+ /// When this field is set to `None`, this timer is not registered in the reactor.
+ id_and_waker: Option<(RegularTimerId, Waker)>,
+
+ /// The instant at which this timer fires.
+ when: Instant,
+}
+
+impl Oneshot {
+ fn new(when: Instant) -> Self {
+ Oneshot {
id_and_waker: None,
- when: start,
- period,
+ when,
}
}
+}
- /// Creates a timer that emits events periodically, starting at `start`.
- ///
- /// See [`Timer::interval_at`] for details. The event won't be emitted before
- /// the expected delay has elapsed.
- #[track_caller]
- pub fn interval_at_least_at(start: Instant, period: Duration) -> Timer {
- Timer {
- id_and_waker: None,
- when: start + Reactor::with(|reactor| reactor.half_max_throttling()),
- period,
+impl Drop for Oneshot {
+ fn drop(&mut self) {
+ if let Some((id, _)) = self.id_and_waker.take() {
+ Reactor::with_mut(|reactor| {
+ reactor.remove_timer(self.when, id);
+ });
}
}
+}
- /// Sets the timer to emit an en event once after the given duration of time.
- ///
- /// Note that resetting a timer is different from creating a new timer because
- /// [`set_after()`][`Timer::set_after()`] does not remove the waker associated with the task
- /// that is polling the timer.
- pub fn set_after(&mut self, duration: Duration) {
- self.set_at(Instant::now() + duration);
- }
+impl Future for Oneshot {
+ type Output = ();
- /// Sets the timer to emit an event once at the given time instant.
- ///
- /// Note that resetting a timer is different from creating a new timer because
- /// [`set_at()`][`Timer::set_at()`] does not remove the waker associated with the task
- /// that is polling the timer.
- pub fn set_at(&mut self, instant: Instant) {
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Reactor::with_mut(|reactor| {
- if let Some((id, _)) = self.id_and_waker.as_ref() {
- // Deregister the timer from the reactor.
- reactor.remove_timer(self.when, *id);
- }
+ if reactor.time_slice_end() >= self.when {
+ if let Some((id, _)) = self.id_and_waker.take() {
+ // Deregister the timer from the reactor.
+ reactor.remove_timer(self.when, id);
+ }
+
+ Poll::Ready(())
+ } else {
+ match &self.id_and_waker {
+ None => {
+ // Register the timer in the reactor.
+ let id = reactor.insert_regular_timer(self.when, cx.waker());
+ self.id_and_waker = Some((id, cx.waker().clone()));
+ }
+ Some((id, w)) if !w.will_wake(cx.waker()) => {
+ // Deregister the timer from the reactor to remove the old waker.
+ reactor.remove_timer(self.when, *id);
- // Update the timeout.
- self.when = instant;
+ // Register the timer in the reactor with the new waker.
+ let id = reactor.insert_regular_timer(self.when, cx.waker());
+ self.id_and_waker = Some((id, cx.waker().clone()));
+ }
+ Some(_) => {}
+ }
- if let Some((id, waker)) = self.id_and_waker.as_mut() {
- // Re-register the timer with the new timeout.
- *id = reactor.insert_timer(self.when, waker);
+ Poll::Pending
}
})
}
+}
- /// Sets the timer to emit events periodically.
+/// A future that emits an event at the given time.
+///
+/// `OneshotAfter`s are futures that always resolve after
+/// the given time is reached.
+#[derive(Debug)]
+pub struct OneshotAfter {
+ /// This timer's ID and last waker that polled it.
///
- /// Note that resetting a timer is different from creating a new timer because
- /// [`set_interval()`][`Timer::set_interval()`] does not remove the waker associated with the
- /// task that is polling the timer.
- pub fn set_interval(&mut self, period: Duration) {
- self.set_interval_at(Instant::now() + period, period);
+ /// When this field is set to `None`, this timer is not registered in the reactor.
+ id_and_waker: Option<(AfterTimerId, Waker)>,
+
+ /// The instant at which this timer fires.
+ when: Instant,
+}
+
+impl OneshotAfter {
+ fn new(when: Instant) -> Self {
+ OneshotAfter {
+ id_and_waker: None,
+ when,
+ }
}
+}
- /// Sets the timer to emit events periodically, starting at `start`.
- ///
- /// Note that resetting a timer is different from creating a new timer because
- /// [`set_interval_at()`][`Timer::set_interval_at()`] does not remove the waker associated with
- /// the task that is polling the timer.
- pub fn set_interval_at(&mut self, start: Instant, period: Duration) {
- // Note: the timer might have been registered on an Executor and then transfered to another.
+impl Drop for OneshotAfter {
+ fn drop(&mut self) {
+ if let Some((id, _)) = self.id_and_waker.take() {
+ Reactor::with_mut(|reactor| {
+ reactor.remove_timer(self.when, id);
+ });
+ }
+ }
+}
+
+impl Future for OneshotAfter {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Reactor::with_mut(|reactor| {
- if let Some((id, _)) = self.id_and_waker.as_ref() {
- // Deregister the timer from the reactor.
- reactor.remove_timer(self.when, *id);
- }
+ if reactor.timers_check_instant() >= self.when {
+ if let Some((id, _)) = self.id_and_waker.take() {
+ // Deregister the timer from the reactor.
+ reactor.remove_timer(self.when, id);
+ }
+
+ Poll::Ready(())
+ } else {
+ match &self.id_and_waker {
+ None => {
+ // Register the timer in the reactor.
+ let id = reactor.insert_after_timer(self.when, cx.waker());
+ self.id_and_waker = Some((id, cx.waker().clone()));
+ }
+ Some((id, w)) if !w.will_wake(cx.waker()) => {
+ // Deregister the timer from the reactor to remove the old waker.
+ reactor.remove_timer(self.when, *id);
- self.when = start;
- self.period = period;
+ // Register the timer in the reactor with the new waker.
+ let id = reactor.insert_after_timer(self.when, cx.waker());
+ self.id_and_waker = Some((id, cx.waker().clone()));
+ }
+ Some(_) => {}
+ }
- if let Some((id, waker)) = self.id_and_waker.as_mut() {
- // Re-register the timer with the new timeout.
- *id = reactor.insert_timer(self.when, waker);
+ Poll::Pending
}
})
}
}
-impl Drop for Timer {
+/// A stream that emits timed events.
+///
+/// `Interval`s are streams that ticks periodically in the closest
+/// time slice.
+#[derive(Debug)]
+pub struct Interval {
+ /// This timer's ID and last waker that polled it.
+ ///
+ /// When this field is set to `None`, this timer is not registered in the reactor.
+ id_and_waker: Option<(RegularTimerId, Waker)>,
+
+ /// The next instant at which this timer should fire.
+ when: Instant,
+
+ /// The period.
+ period: Duration,
+}
+
+impl Interval {
+ fn new(start: Instant, period: Duration) -> Self {
+ Interval {
+ id_and_waker: None,
+ when: start,
+ period,
+ }
+ }
+}
+
+impl Drop for Interval {
fn drop(&mut self) {
if let Some((id, _)) = self.id_and_waker.take() {
Reactor::with_mut(|reactor| {
@@ -197,42 +359,125 @@ impl Drop for Timer {
}
}
-impl Future for Timer {
- type Output = ();
+impl Stream for Interval {
+ type Item = ();
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ Reactor::with_mut(|reactor| {
+ let time_slice_end = reactor.time_slice_end();
+ if time_slice_end >= self.when {
+ if let Some((id, _)) = self.id_and_waker.take() {
+ // Deregister the timer from the reactor.
+ reactor.remove_timer(self.when, id);
+ }
+ // Compute the next tick making sure we are not so late
+ // that we would need to tick again right now.
+ let period = self.period;
+ while time_slice_end >= self.when {
+ // This can't overflow in practical conditions.
+ self.when += period;
+ }
+ // Register the timer in the reactor.
+ let id = reactor.insert_regular_timer(self.when, cx.waker());
+ self.id_and_waker = Some((id, cx.waker().clone()));
+
+ Poll::Ready(Some(()))
+ } else {
+ match &self.id_and_waker {
+ None => {
+ // Register the timer in the reactor.
+ let id = reactor.insert_regular_timer(self.when, cx.waker());
+ self.id_and_waker = Some((id, cx.waker().clone()));
+ }
+ Some((id, w)) if !w.will_wake(cx.waker()) => {
+ // Deregister the timer from the reactor to remove the old waker.
+ reactor.remove_timer(self.when, *id);
+
+ // Register the timer in the reactor with the new waker.
+ let id = reactor.insert_regular_timer(self.when, cx.waker());
+ self.id_and_waker = Some((id, cx.waker().clone()));
+ }
+ Some(_) => {}
+ }
+
+ Poll::Pending
+ }
+ })
+ }
+}
+
+impl FusedStream for Interval {
+ fn is_terminated(&self) -> bool {
+ // Interval is "infinite" in practice
+ false
+ }
+}
+/// A stream that emits timed events.
+///
+/// `IntervalAfter`s are streams that ticks periodically. Ticks are
+/// guaranteed to fire no sooner than the expected instant.
+#[derive(Debug)]
+pub struct IntervalAfter {
+ /// This timer's ID and last waker that polled it.
+ ///
+ /// When this field is set to `None`, this timer is not registered in the reactor.
+ id_and_waker: Option<(AfterTimerId, Waker)>,
+
+ /// The next instant at which this timer should fire.
+ when: Instant,
+
+ /// The period.
+ period: Duration,
+}
+
+impl IntervalAfter {
+ fn new(start: Instant, period: Duration) -> Self {
+ IntervalAfter {
+ id_and_waker: None,
+ when: start,
+ period,
+ }
+ }
+}
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- match self.poll_next(cx) {
- Poll::Ready(Some(_)) => Poll::Ready(()),
- Poll::Pending => Poll::Pending,
- Poll::Ready(None) => unreachable!(),
+impl Drop for IntervalAfter {
+ fn drop(&mut self) {
+ if let Some((id, _)) = self.id_and_waker.take() {
+ Reactor::with_mut(|reactor| {
+ reactor.remove_timer(self.when, id);
+ });
}
}
}
-impl Stream for Timer {
+impl Stream for IntervalAfter {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Reactor::with_mut(|reactor| {
- if Instant::now() + reactor.half_max_throttling() >= self.when {
+ let timers_check_instant = reactor.timers_check_instant();
+ if timers_check_instant >= self.when {
if let Some((id, _)) = self.id_and_waker.take() {
// Deregister the timer from the reactor.
reactor.remove_timer(self.when, id);
}
- let when = self.when;
- if let Some(next) = when.checked_add(self.period) {
- self.when = next;
- // Register the timer in the reactor.
- let id = reactor.insert_timer(self.when, cx.waker());
- self.id_and_waker = Some((id, cx.waker().clone()));
+ // Compute the next tick making sure we are not so late
+ // that we would need to tick again right now.
+ let period = self.period;
+ while timers_check_instant >= self.when {
+ // This can't overflow in practical conditions.
+ self.when += period;
}
+ // Register the timer in the reactor.
+ let id = reactor.insert_after_timer(self.when, cx.waker());
+ self.id_and_waker = Some((id, cx.waker().clone()));
Poll::Ready(Some(()))
} else {
match &self.id_and_waker {
None => {
// Register the timer in the reactor.
- let id = reactor.insert_timer(self.when, cx.waker());
+ let id = reactor.insert_after_timer(self.when, cx.waker());
self.id_and_waker = Some((id, cx.waker().clone()));
}
Some((id, w)) if !w.will_wake(cx.waker()) => {
@@ -240,7 +485,7 @@ impl Stream for Timer {
reactor.remove_timer(self.when, *id);
// Register the timer in the reactor with the new waker.
- let id = reactor.insert_timer(self.when, cx.waker());
+ let id = reactor.insert_after_timer(self.when, cx.waker());
self.id_and_waker = Some((id, cx.waker().clone()));
}
Some(_) => {}
@@ -252,31 +497,36 @@ impl Stream for Timer {
}
}
+impl FusedStream for IntervalAfter {
+ fn is_terminated(&self) -> bool {
+ // IntervalAfter is "infinite" in practice
+ false
+ }
+}
+
#[cfg(test)]
mod tests {
use std::time::{Duration, Instant};
- use super::Timer;
use crate::runtime::executor::Scheduler;
const MAX_THROTTLING: Duration = Duration::from_millis(10);
const DELAY: Duration = Duration::from_millis(12);
+ const PERIOD: Duration = Duration::from_millis(15);
#[test]
- fn delay_for() {
+ fn delay_for_regular() {
gst::init().unwrap();
- let handle = Scheduler::start("delay_for", MAX_THROTTLING);
+ let handle = Scheduler::start("delay_for_regular", MAX_THROTTLING);
- let elapsed = futures::executor::block_on(handle.spawn(async {
- let now = Instant::now();
- Timer::after(DELAY).await;
- now.elapsed()
+ futures::executor::block_on(handle.spawn(async {
+ let start = Instant::now();
+ super::delay_for(DELAY).await;
+ // Due to throttling, timer may be fired earlier
+ assert!(start.elapsed() + MAX_THROTTLING / 2 >= DELAY);
}))
.unwrap();
-
- // Due to throttling, timer may be fired earlier
- assert!(elapsed + MAX_THROTTLING / 2 >= DELAY);
}
#[test]
@@ -285,36 +535,65 @@ mod tests {
let handle = Scheduler::start("delay_for_at_least", MAX_THROTTLING);
- let elapsed = futures::executor::block_on(handle.spawn(async {
- let now = Instant::now();
- Timer::after_at_least(DELAY).await;
- now.elapsed()
+ futures::executor::block_on(handle.spawn(async {
+ let start = Instant::now();
+ super::delay_for_at_least(DELAY).await;
+ // Never returns earlier than DELAY
+ assert!(start.elapsed() >= DELAY);
}))
.unwrap();
+ }
+
+ #[test]
+ fn interval_regular() {
+ use futures::prelude::*;
+
+ gst::init().unwrap();
+
+ let handle = Scheduler::start("interval_regular", MAX_THROTTLING);
+
+ let join_handle = handle.spawn(async move {
+ let mut acc = Duration::ZERO;
+
+ let start = Instant::now();
+ let mut interval = super::interval(PERIOD).unwrap();
+
+ interval.next().await.unwrap();
+ assert!(start.elapsed() + MAX_THROTTLING / 2 >= acc);
+
+ // Due to throttling, intervals may tick earlier.
+ for _ in 0..10 {
+ interval.next().await.unwrap();
+ acc += PERIOD;
+ assert!(start.elapsed() + MAX_THROTTLING / 2 >= acc);
+ }
+ });
- // Never returns earlier than DELAY
- assert!(elapsed >= DELAY);
+ futures::executor::block_on(join_handle).unwrap();
}
#[test]
- fn interval() {
+ fn interval_after_at_least() {
use futures::prelude::*;
gst::init().unwrap();
- let handle = Scheduler::start("interval", MAX_THROTTLING);
+ let handle = Scheduler::start("interval_after", MAX_THROTTLING);
let join_handle = handle.spawn(async move {
+ let mut acc = DELAY;
+
let start = Instant::now();
- let mut interval = Timer::interval(DELAY);
+ let mut interval = super::interval_after_at_least(start + DELAY, PERIOD).unwrap();
- interval.next().await;
- // Due to throttling, timer may be fired earlier
- assert!(start.elapsed() + MAX_THROTTLING / 2 >= DELAY);
+ interval.next().await.unwrap();
+ assert!(start.elapsed() >= acc);
- interval.next().await;
- // Due to throttling, timer may be fired earlier
- assert!(start.elapsed() + MAX_THROTTLING >= 2 * DELAY);
+ for _ in 1..10 {
+ interval.next().await.unwrap();
+ acc += PERIOD;
+ assert!(start.elapsed() >= acc);
+ }
});
futures::executor::block_on(join_handle).unwrap();
diff --git a/generic/threadshare/src/runtime/mod.rs b/generic/threadshare/src/runtime/mod.rs
index c0e4ee5a5..4d77f78f2 100644
--- a/generic/threadshare/src/runtime/mod.rs
+++ b/generic/threadshare/src/runtime/mod.rs
@@ -1,4 +1,4 @@
-// Copyright (C) 2019-2021 François Laignel <fengalin@free.fr>
+// Copyright (C) 2019-2022 François Laignel <fengalin@free.fr>
//
// Take a look at the license at the top of the repository in the LICENSE file.
@@ -31,7 +31,7 @@
//! [`PadSink`]: pad/struct.PadSink.html
pub mod executor;
-pub use executor::{Async, Context, JoinHandle, SubTaskOutput, Timer};
+pub use executor::{timer, Async, Context, JoinHandle, SubTaskOutput};
pub mod pad;
pub use pad::{PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, PadSrcWeak};
@@ -44,9 +44,6 @@ pub mod prelude {
pub use super::task::TaskImpl;
}
-pub mod time;
-pub use time::{delay_for, delay_for_at_least};
-
use once_cell::sync::Lazy;
static RUNTIME_CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs
index 137ceef92..7bbb35c4e 100644
--- a/generic/threadshare/src/runtime/task.rs
+++ b/generic/threadshare/src/runtime/task.rs
@@ -2354,7 +2354,7 @@ mod tests {
async move {
gst::debug!(RUNTIME_CAT, "pause_from_loop: entering handle_item");
- crate::runtime::time::delay_for(Duration::from_millis(50)).await;
+ crate::runtime::timer::delay_for(Duration::from_millis(50)).await;
gst::debug!(RUNTIME_CAT, "pause_from_loop: pause from handle_item");
match self.task.pause() {
@@ -2818,12 +2818,14 @@ mod tests {
#[test]
fn start_timer() {
+ use crate::runtime::timer;
+
// Purpose: make sure a Timer initialized in a transition is
// available when iterating in the loop.
gst::init().unwrap();
struct TaskTimerTest {
- timer: Option<crate::runtime::Timer>,
+ timer: Option<timer::Oneshot>,
timer_elapsed_sender: Option<oneshot::Sender<()>>,
}
@@ -2832,7 +2834,7 @@ mod tests {
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
- self.timer = Some(crate::runtime::time::delay_for(Duration::from_millis(50)));
+ self.timer = Some(crate::runtime::timer::delay_for(Duration::from_millis(50)));
gst::debug!(RUNTIME_CAT, "start_timer: started");
Ok(())
}
diff --git a/generic/threadshare/src/runtime/time.rs b/generic/threadshare/src/runtime/time.rs
deleted file mode 100644
index ce9d82fa6..000000000
--- a/generic/threadshare/src/runtime/time.rs
+++ /dev/null
@@ -1,42 +0,0 @@
-// Copyright (C) 2020-2021 François Laignel <fengalin@free.fr>
-//
-// Take a look at the license at the top of the repository in the LICENSE file.
-
-//! Wrappers for the underlying runtime specific time related Futures.
-
-use std::time::Duration;
-
-use super::executor::Timer;
-
-/// Wait until the given `delay` has elapsed.
-///
-/// This must be called from within the target runtime environment.
-///
-/// When throttling is activated (i.e. when using a non-`0` `wait`
-/// duration in `Context::acquire`), timer entries are assigned to
-/// the nearest time frame, meaning that the delay might elapse
-/// `wait` / 2 ms earlier or later than the expected instant.
-///
-/// Use [`delay_for_at_least`] when it's preferable not to return
-/// before the expected instant.
-pub fn delay_for(delay: Duration) -> Timer {
- Timer::after(delay)
-}
-
-/// Wait until at least the given `delay` has elapsed.
-///
-/// This must be called from within the target runtime environment.
-///
-/// See [`delay_for`] for details. This method won't return before
-/// the expected delay has elapsed.
-#[track_caller]
-pub fn delay_for_at_least(delay: Duration) -> Timer {
- Timer::after_at_least(delay)
-}
-
-/// Builds a `Stream` that yields at `interval`.
-///
-/// This must be called from within the target runtime environment.
-pub fn interval(interval: Duration) -> Timer {
- Timer::interval(interval)
-}