diff options
author | François Laignel <fengalin@free.fr> | 2021-10-09 13:17:05 +0300 |
---|---|---|
committer | François Laignel <fengalin@free.fr> | 2021-10-18 16:09:47 +0300 |
commit | 27b9f0d868f436e9b2bcc3e51f393c40b56fcc02 (patch) | |
tree | 93c0db7b1cf26ea7d0e3a4d70a7d2844c2e00975 /generic/threadshare | |
parent | bd8a7e8df7e8ebf751b2d00fe6a096d726683c00 (diff) |
Improve usability thanks to opt-ops
The crate option-operations simplifies usage when dealing with
`Option`s, which is often the case with `ClockTime`.
Diffstat (limited to 'generic/threadshare')
-rw-r--r-- | generic/threadshare/src/appsrc/imp.rs | 8 | ||||
-rw-r--r-- | generic/threadshare/src/inputselector/imp.rs | 18 | ||||
-rw-r--r-- | generic/threadshare/src/jitterbuffer/imp.rs | 23 | ||||
-rw-r--r-- | generic/threadshare/src/proxy/imp.rs | 3 | ||||
-rw-r--r-- | generic/threadshare/src/queue/imp.rs | 3 | ||||
-rw-r--r-- | generic/threadshare/src/runtime/task.rs | 8 | ||||
-rw-r--r-- | generic/threadshare/src/socket.rs | 6 | ||||
-rw-r--r-- | generic/threadshare/src/tcpclientsrc/imp.rs | 3 | ||||
-rw-r--r-- | generic/threadshare/src/udpsink/imp.rs | 18 | ||||
-rw-r--r-- | generic/threadshare/src/udpsrc/imp.rs | 3 |
10 files changed, 25 insertions, 68 deletions
diff --git a/generic/threadshare/src/appsrc/imp.rs b/generic/threadshare/src/appsrc/imp.rs index 2f5dd4284..a0d9cb3e4 100644 --- a/generic/threadshare/src/appsrc/imp.rs +++ b/generic/threadshare/src/appsrc/imp.rs @@ -38,8 +38,7 @@ use crate::runtime::prelude::*; use crate::runtime::{Context, PadSrc, PadSrcRef, PadSrcWeak, Task, TaskState}; const DEFAULT_CONTEXT: &str = ""; -// FIXME use Duration::ZERO when MSVC >= 1.53.2 -const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0); +const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; const DEFAULT_CAPS: Option<gst::Caps> = None; const DEFAULT_MAX_BUFFERS: u32 = 10; const DEFAULT_DO_TIMESTAMP: bool = false; @@ -391,10 +390,7 @@ impl AppSrc { let now = clock.time(); let buffer = buffer.make_mut(); - buffer.set_dts( - now.zip(base_time) - .and_then(|(now, base_time)| now.checked_sub(base_time)), - ); + buffer.set_dts(now.opt_checked_sub(base_time).ok().flatten()); buffer.set_pts(None); } else { gst_error!(CAT, obj: element, "Don't have a clock yet"); diff --git a/generic/threadshare/src/inputselector/imp.rs b/generic/threadshare/src/inputselector/imp.rs index 0bccc227b..121f882b9 100644 --- a/generic/threadshare/src/inputselector/imp.rs +++ b/generic/threadshare/src/inputselector/imp.rs @@ -35,8 +35,7 @@ use crate::runtime::prelude::*; use crate::runtime::{self, PadSink, PadSinkRef, PadSrc, PadSrcRef}; const DEFAULT_CONTEXT: &str = ""; -// FIXME use Duration::ZERO when MSVC >= 1.53.2 -const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0); +const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; #[derive(Debug, Clone)] struct Settings { @@ -82,13 +81,9 @@ impl InputSelectorPadSinkHandler { ) { let now = element.current_running_time(); - match running_time - .into() - .zip(now) - .and_then(|(running_time, now)| running_time.checked_sub(now)) - { - Some(delay) => runtime::time::delay_for(delay.into()).await, - None => runtime::executor::yield_now().await, + match running_time.into().opt_checked_sub(now) { + Ok(Some(delay)) => runtime::time::delay_for(delay.into()).await, + _ => runtime::executor::yield_now().await, } } @@ -315,10 +310,7 @@ impl PadSrcHandler for InputSelectorPadSrcHandler { let (live, min, max) = peer_query.result(); if live { min_latency = min.max(min_latency); - max_latency = max - .zip(max_latency) - .map(|(max, max_latency)| max.min(max_latency)) - .or(max); + max_latency = max.opt_min(max_latency).or(max); } } } diff --git a/generic/threadshare/src/jitterbuffer/imp.rs b/generic/threadshare/src/jitterbuffer/imp.rs index a27e1fa07..36209335b 100644 --- a/generic/threadshare/src/jitterbuffer/imp.rs +++ b/generic/threadshare/src/jitterbuffer/imp.rs @@ -212,10 +212,7 @@ impl SinkHandler { ) { if inner.ips_rtptime != Some(rtptime) { let pts = pts.into(); - let new_packet_spacing = inner - .ips_pts - .zip(pts) - .and_then(|(ips_pts, pts)| pts.checked_sub(ips_pts)); + let new_packet_spacing = pts.opt_checked_sub(inner.ips_pts).ok().flatten(); if let Some(new_packet_spacing) = new_packet_spacing { let old_packet_spacing = state.packet_spacing; @@ -744,7 +741,6 @@ impl SrcHandler { &[ ("seqnum", &(lost_seqnum as u32)), ("timestamp", ×tamp), - // FIXME would probably make sense being a ClockTime ("duration", &duration.nseconds()), ("retry", &0), ], @@ -864,8 +860,7 @@ impl SrcHandler { if state.eos { gst_debug!(CAT, obj: element, "EOS, not waiting"); - // FIXME use Duration::ZERO when MSVC >= 1.53.2 - return (now, Some((now, Duration::from_nanos(0)))); + return (now, Some((now, Duration::ZERO))); } if state.earliest_pts.is_none() { @@ -877,10 +872,8 @@ impl SrcHandler { .map(|earliest_pts| earliest_pts + latency - state.packet_spacing - context_wait / 2); let delay = next_wakeup - .zip(now) - .map_or(gst::ClockTime::ZERO, |(next_wakeup, now)| { - next_wakeup.saturating_sub(now) - }); + .opt_saturating_sub(now) + .unwrap_or(gst::ClockTime::ZERO); gst_debug!( CAT, @@ -1113,8 +1106,7 @@ impl TaskImpl for JitterBufferTask { ); let (delay_fut, abort_handle) = match next_wakeup { - // FIXME use Duration::ZERO when MSVC >= 1.53.2 - Some((_, delay)) if delay == Duration::from_nanos(0) => (None, None), + Some((_, delay)) if delay.is_zero() => (None, None), _ => { let (delay_fut, abort_handle) = abortable(async move { match next_wakeup { @@ -1166,10 +1158,7 @@ impl TaskImpl for JitterBufferTask { ); if let Some((next_wakeup, _)) = next_wakeup { - if next_wakeup - .zip(now) - .map_or(false, |(next_wakeup, now)| next_wakeup > now) - { + if next_wakeup.opt_gt(now).unwrap_or(false) { // Reschedule and wait a bit longer in the next iteration return Ok(()); } diff --git a/generic/threadshare/src/proxy/imp.rs b/generic/threadshare/src/proxy/imp.rs index 0e5904594..3262e8056 100644 --- a/generic/threadshare/src/proxy/imp.rs +++ b/generic/threadshare/src/proxy/imp.rs @@ -53,8 +53,7 @@ const DEFAULT_MAX_SIZE_BUFFERS: u32 = 200; const DEFAULT_MAX_SIZE_BYTES: u32 = 1024 * 1024; const DEFAULT_MAX_SIZE_TIME: gst::ClockTime = gst::ClockTime::SECOND; const DEFAULT_CONTEXT: &str = ""; -// FIXME use Duration::ZERO when MSVC >= 1.53.2 -const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0); +const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; #[derive(Debug, Clone)] struct SettingsSink { diff --git a/generic/threadshare/src/queue/imp.rs b/generic/threadshare/src/queue/imp.rs index ec8f89513..f9d375e02 100644 --- a/generic/threadshare/src/queue/imp.rs +++ b/generic/threadshare/src/queue/imp.rs @@ -40,8 +40,7 @@ const DEFAULT_MAX_SIZE_BUFFERS: u32 = 200; const DEFAULT_MAX_SIZE_BYTES: u32 = 1024 * 1024; const DEFAULT_MAX_SIZE_TIME: gst::ClockTime = gst::ClockTime::SECOND; const DEFAULT_CONTEXT: &str = ""; -// FIXME use Duration::ZERO when MSVC >= 1.53.2 -const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0); +const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; #[derive(Debug, Clone)] struct Settings { diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs index aa6497959..1feade11b 100644 --- a/generic/threadshare/src/runtime/task.rs +++ b/generic/threadshare/src/runtime/task.rs @@ -1612,9 +1612,7 @@ mod tests { task.prepare(TaskPrepareTest { prepare_receiver }, context.clone()) .unwrap(); - // FIXME use Duration::ZERO when MSVC >= 1.53.2 - let start_ctx = - Context::acquire("prepare_start_ok_requester", Duration::from_nanos(0)).unwrap(); + let start_ctx = Context::acquire("prepare_start_ok_requester", Duration::ZERO).unwrap(); let task_clone = task.clone(); let (ready_sender, ready_receiver) = oneshot::channel(); let start_handle = start_ctx.spawn(async move { @@ -1737,9 +1735,7 @@ mod tests { ) .unwrap(); - // FIXME use Duration::ZERO when MSVC >= 1.53.2 - let start_ctx = - Context::acquire("prepare_start_error_requester", Duration::from_nanos(0)).unwrap(); + let start_ctx = Context::acquire("prepare_start_error_requester", Duration::ZERO).unwrap(); let task_clone = task.clone(); let (ready_sender, ready_receiver) = oneshot::channel(); let start_handle = start_ctx.spawn(async move { diff --git a/generic/threadshare/src/socket.rs b/generic/threadshare/src/socket.rs index 8d8da5caa..420e0f345 100644 --- a/generic/threadshare/src/socket.rs +++ b/generic/threadshare/src/socket.rs @@ -143,11 +143,7 @@ impl<T: SocketRead> Socket<T> { Ok((len, saddr)) => { let dts = if T::DO_TIMESTAMP { let time = self.clock.as_ref().unwrap().time(); - let running_time = time - .zip(self.base_time) - // TODO Do we want None if time < base_time - // or do we expect Some? - .and_then(|(time, base_time)| time.checked_sub(base_time)); + let running_time = time.opt_checked_sub(self.base_time).ok().flatten(); // FIXME maybe we should check if running_time.is_none // so as to display another message gst_debug!( diff --git a/generic/threadshare/src/tcpclientsrc/imp.rs b/generic/threadshare/src/tcpclientsrc/imp.rs index b832ee9fd..aaaa846cb 100644 --- a/generic/threadshare/src/tcpclientsrc/imp.rs +++ b/generic/threadshare/src/tcpclientsrc/imp.rs @@ -48,8 +48,7 @@ const DEFAULT_PORT: i32 = 4953; const DEFAULT_CAPS: Option<gst::Caps> = None; const DEFAULT_BLOCKSIZE: u32 = 4096; const DEFAULT_CONTEXT: &str = ""; -// FIXME use Duration::ZERO when MSVC >= 1.53.2 -const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0); +const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; #[derive(Debug, Clone)] struct Settings { diff --git a/generic/threadshare/src/udpsink/imp.rs b/generic/threadshare/src/udpsink/imp.rs index 2d6bcb8e2..3a9fe5577 100644 --- a/generic/threadshare/src/udpsink/imp.rs +++ b/generic/threadshare/src/udpsink/imp.rs @@ -62,8 +62,7 @@ const DEFAULT_TTL_MC: u32 = 1; const DEFAULT_QOS_DSCP: i32 = -1; const DEFAULT_CLIENTS: &str = ""; const DEFAULT_CONTEXT: &str = ""; -// FIXME use Duration::ZERO when MSVC >= 1.53.2 -const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0); +const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; #[derive(Debug, Clone)] struct Settings { @@ -412,10 +411,7 @@ impl UdpSinkPadHandler { rtime = segment .downcast_ref::<gst::format::Time>() .and_then(|segment| { - segment - .to_running_time(buffer.pts()) - .zip(inner.latency) - .map(|(rtime, latency)| rtime + latency) + segment.to_running_time(buffer.pts()).opt_add(inner.latency) }); } @@ -529,13 +525,9 @@ impl UdpSinkPadHandler { ) { let now = element.current_running_time(); - match running_time - .into() - .zip(now) - .and_then(|(running_time, now)| running_time.checked_sub(now)) - { - Some(delay) => runtime::time::delay_for(delay.into()).await, - None => runtime::executor::yield_now().await, + match running_time.into().opt_checked_sub(now) { + Ok(Some(delay)) => runtime::time::delay_for(delay.into()).await, + _ => runtime::executor::yield_now().await, } } diff --git a/generic/threadshare/src/udpsrc/imp.rs b/generic/threadshare/src/udpsrc/imp.rs index cccf7c17f..0318d1c48 100644 --- a/generic/threadshare/src/udpsrc/imp.rs +++ b/generic/threadshare/src/udpsrc/imp.rs @@ -48,8 +48,7 @@ const DEFAULT_MTU: u32 = 1492; const DEFAULT_SOCKET: Option<GioSocketWrapper> = None; const DEFAULT_USED_SOCKET: Option<GioSocketWrapper> = None; const DEFAULT_CONTEXT: &str = ""; -// FIXME use Duration::ZERO when MSVC >= 1.53.2 -const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0); +const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; const DEFAULT_RETRIEVE_SENDER_ADDRESS: bool = true; #[derive(Debug, Clone)] |