diff options
author | François Laignel <fengalin@free.fr> | 2021-10-09 13:17:05 +0300 |
---|---|---|
committer | François Laignel <fengalin@free.fr> | 2021-10-18 16:09:47 +0300 |
commit | 27b9f0d868f436e9b2bcc3e51f393c40b56fcc02 (patch) | |
tree | 93c0db7b1cf26ea7d0e3a4d70a7d2844c2e00975 | |
parent | bd8a7e8df7e8ebf751b2d00fe6a096d726683c00 (diff) |
Improve usability thanks to opt-ops
The crate option-operations simplifies usage when dealing with
`Option`s, which is often the case with `ClockTime`.
29 files changed, 175 insertions, 308 deletions
diff --git a/audio/audiofx/src/audioloudnorm/imp.rs b/audio/audiofx/src/audioloudnorm/imp.rs index 829258dd8..e0512a618 100644 --- a/audio/audiofx/src/audioloudnorm/imp.rs +++ b/audio/audiofx/src/audioloudnorm/imp.rs @@ -235,7 +235,9 @@ impl State { let distance_ts = distance_samples .mul_div_floor(*gst::ClockTime::SECOND, self.info.rate() as u64) .map(gst::ClockTime::from_nseconds); - let pts = pts.zip(distance_ts).map(|(pts, dist)| pts + dist); + let pts = pts + .opt_checked_add(distance_ts) + .map_err(|_| gst::FlowError::Error)?; let inbuf = self .adapter @@ -276,7 +278,9 @@ impl State { let distance_ts = distance_samples .mul_div_floor(*gst::ClockTime::SECOND, self.info.rate() as u64) .map(gst::ClockTime::from_nseconds); - let pts = pts.zip(distance_ts).map(|(pts, dist)| pts + dist); + let pts = pts + .opt_checked_add(distance_ts) + .map_err(|_| gst::FlowError::Error)?; let mut _mapped_inbuf = None; let src = if self.adapter.available() > 0 { @@ -1702,7 +1706,7 @@ impl AudioLoudNorm { q.set( live, min_latency + 3 * gst::ClockTime::SECOND, - max_latency.map(|max| max + 3 * gst::ClockTime::SECOND), + max_latency.opt_add(3 * gst::ClockTime::SECOND), ); true } else { diff --git a/audio/audiofx/src/audiornnoise/imp.rs b/audio/audiofx/src/audiornnoise/imp.rs index 33ae01b84..6a75658b8 100644 --- a/audio/audiofx/src/audiornnoise/imp.rs +++ b/audio/audiofx/src/audiornnoise/imp.rs @@ -74,8 +74,9 @@ impl State { // pts at the beginning of the adapter. let samples = distance / self.in_info.bpf() as u64; prev_pts - .zip(self.samples_to_time(samples)) - .map(|(prev_pts, time_offset)| prev_pts + time_offset) + .opt_checked_add(self.samples_to_time(samples)) + .ok() + .flatten() } fn needs_more_data(&self) -> bool { @@ -367,8 +368,7 @@ impl BaseTransformImpl for AudioRNNoise { ); min += gst::ClockTime::from_seconds((FRAME_SIZE / 48000) as u64); - max = max - .map(|max| max + gst::ClockTime::from_seconds((FRAME_SIZE / 48000) as u64)); + max = max.opt_add(gst::ClockTime::from_seconds((FRAME_SIZE / 48000) as u64)); q.set(live, min, max); return true; } diff --git a/audio/audiofx/src/ebur128level/imp.rs b/audio/audiofx/src/ebur128level/imp.rs index c16410735..18fd5df10 100644 --- a/audio/audiofx/src/ebur128level/imp.rs +++ b/audio/audiofx/src/ebur128level/imp.rs @@ -481,12 +481,12 @@ impl BaseTransformImpl for EbuR128Level { // The timestamp we report in messages is always the timestamp until which measurements // are included, not the starting timestamp. - timestamp = timestamp.map(|ts| { - ts + to_process + timestamp = timestamp.opt_add( + to_process .mul_div_floor(*gst::ClockTime::SECOND, state.info.rate() as u64) .map(gst::ClockTime::from_nseconds) - .unwrap() - }); + .unwrap(), + ); // Post a message whenever an interval is full if state.interval_frames_remaining.is_zero() { diff --git a/audio/csound/src/filter/imp.rs b/audio/csound/src/filter/imp.rs index b3c649ae6..c6f479c9c 100644 --- a/audio/csound/src/filter/imp.rs +++ b/audio/csound/src/filter/imp.rs @@ -120,8 +120,9 @@ impl State { // pts at the beginning of the adapter. let samples = distance / self.in_info.bpf() as u64; prev_pts - .zip(self.samples_to_time(samples)) - .map(|(prev_pts, time_offset)| prev_pts + time_offset) + .opt_checked_add(self.samples_to_time(samples)) + .ok() + .flatten() } fn buffer_duration(&self, buffer_size: u64) -> Option<gst::ClockTime> { diff --git a/audio/csound/tests/csound_filter.rs b/audio/csound/tests/csound_filter.rs index 702d47e54..54f856a2c 100644 --- a/audio/csound/tests/csound_filter.rs +++ b/audio/csound/tests/csound_filter.rs @@ -181,7 +181,7 @@ fn csound_filter_eos() { let samples_at_eos = (EOS_NUM_BUFFERS * EOS_NUM_SAMPLES) % ksmps; assert_eq!( buffer.as_ref().pts(), - duration_from_samples(samples_at_eos as _, sr as _).map(|duration| in_pts - duration) + in_pts.opt_sub(duration_from_samples(samples_at_eos as _, sr as _)) ); let map = buffer.into_mapped_buffer_readable().unwrap(); diff --git a/generic/threadshare/src/appsrc/imp.rs b/generic/threadshare/src/appsrc/imp.rs index 2f5dd4284..a0d9cb3e4 100644 --- a/generic/threadshare/src/appsrc/imp.rs +++ b/generic/threadshare/src/appsrc/imp.rs @@ -38,8 +38,7 @@ use crate::runtime::prelude::*; use crate::runtime::{Context, PadSrc, PadSrcRef, PadSrcWeak, Task, TaskState}; const DEFAULT_CONTEXT: &str = ""; -// FIXME use Duration::ZERO when MSVC >= 1.53.2 -const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0); +const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; const DEFAULT_CAPS: Option<gst::Caps> = None; const DEFAULT_MAX_BUFFERS: u32 = 10; const DEFAULT_DO_TIMESTAMP: bool = false; @@ -391,10 +390,7 @@ impl AppSrc { let now = clock.time(); let buffer = buffer.make_mut(); - buffer.set_dts( - now.zip(base_time) - .and_then(|(now, base_time)| now.checked_sub(base_time)), - ); + buffer.set_dts(now.opt_checked_sub(base_time).ok().flatten()); buffer.set_pts(None); } else { gst_error!(CAT, obj: element, "Don't have a clock yet"); diff --git a/generic/threadshare/src/inputselector/imp.rs b/generic/threadshare/src/inputselector/imp.rs index 0bccc227b..121f882b9 100644 --- a/generic/threadshare/src/inputselector/imp.rs +++ b/generic/threadshare/src/inputselector/imp.rs @@ -35,8 +35,7 @@ use crate::runtime::prelude::*; use crate::runtime::{self, PadSink, PadSinkRef, PadSrc, PadSrcRef}; const DEFAULT_CONTEXT: &str = ""; -// FIXME use Duration::ZERO when MSVC >= 1.53.2 -const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0); +const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; #[derive(Debug, Clone)] struct Settings { @@ -82,13 +81,9 @@ impl InputSelectorPadSinkHandler { ) { let now = element.current_running_time(); - match running_time - .into() - .zip(now) - .and_then(|(running_time, now)| running_time.checked_sub(now)) - { - Some(delay) => runtime::time::delay_for(delay.into()).await, - None => runtime::executor::yield_now().await, + match running_time.into().opt_checked_sub(now) { + Ok(Some(delay)) => runtime::time::delay_for(delay.into()).await, + _ => runtime::executor::yield_now().await, } } @@ -315,10 +310,7 @@ impl PadSrcHandler for InputSelectorPadSrcHandler { let (live, min, max) = peer_query.result(); if live { min_latency = min.max(min_latency); - max_latency = max - .zip(max_latency) - .map(|(max, max_latency)| max.min(max_latency)) - .or(max); + max_latency = max.opt_min(max_latency).or(max); } } } diff --git a/generic/threadshare/src/jitterbuffer/imp.rs b/generic/threadshare/src/jitterbuffer/imp.rs index a27e1fa07..36209335b 100644 --- a/generic/threadshare/src/jitterbuffer/imp.rs +++ b/generic/threadshare/src/jitterbuffer/imp.rs @@ -212,10 +212,7 @@ impl SinkHandler { ) { if inner.ips_rtptime != Some(rtptime) { let pts = pts.into(); - let new_packet_spacing = inner - .ips_pts - .zip(pts) - .and_then(|(ips_pts, pts)| pts.checked_sub(ips_pts)); + let new_packet_spacing = pts.opt_checked_sub(inner.ips_pts).ok().flatten(); if let Some(new_packet_spacing) = new_packet_spacing { let old_packet_spacing = state.packet_spacing; @@ -744,7 +741,6 @@ impl SrcHandler { &[ ("seqnum", &(lost_seqnum as u32)), ("timestamp", ×tamp), - // FIXME would probably make sense being a ClockTime ("duration", &duration.nseconds()), ("retry", &0), ], @@ -864,8 +860,7 @@ impl SrcHandler { if state.eos { gst_debug!(CAT, obj: element, "EOS, not waiting"); - // FIXME use Duration::ZERO when MSVC >= 1.53.2 - return (now, Some((now, Duration::from_nanos(0)))); + return (now, Some((now, Duration::ZERO))); } if state.earliest_pts.is_none() { @@ -877,10 +872,8 @@ impl SrcHandler { .map(|earliest_pts| earliest_pts + latency - state.packet_spacing - context_wait / 2); let delay = next_wakeup - .zip(now) - .map_or(gst::ClockTime::ZERO, |(next_wakeup, now)| { - next_wakeup.saturating_sub(now) - }); + .opt_saturating_sub(now) + .unwrap_or(gst::ClockTime::ZERO); gst_debug!( CAT, @@ -1113,8 +1106,7 @@ impl TaskImpl for JitterBufferTask { ); let (delay_fut, abort_handle) = match next_wakeup { - // FIXME use Duration::ZERO when MSVC >= 1.53.2 - Some((_, delay)) if delay == Duration::from_nanos(0) => (None, None), + Some((_, delay)) if delay.is_zero() => (None, None), _ => { let (delay_fut, abort_handle) = abortable(async move { match next_wakeup { @@ -1166,10 +1158,7 @@ impl TaskImpl for JitterBufferTask { ); if let Some((next_wakeup, _)) = next_wakeup { - if next_wakeup - .zip(now) - .map_or(false, |(next_wakeup, now)| next_wakeup > now) - { + if next_wakeup.opt_gt(now).unwrap_or(false) { // Reschedule and wait a bit longer in the next iteration return Ok(()); } diff --git a/generic/threadshare/src/proxy/imp.rs b/generic/threadshare/src/proxy/imp.rs index 0e5904594..3262e8056 100644 --- a/generic/threadshare/src/proxy/imp.rs +++ b/generic/threadshare/src/proxy/imp.rs @@ -53,8 +53,7 @@ const DEFAULT_MAX_SIZE_BUFFERS: u32 = 200; const DEFAULT_MAX_SIZE_BYTES: u32 = 1024 * 1024; const DEFAULT_MAX_SIZE_TIME: gst::ClockTime = gst::ClockTime::SECOND; const DEFAULT_CONTEXT: &str = ""; -// FIXME use Duration::ZERO when MSVC >= 1.53.2 -const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0); +const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; #[derive(Debug, Clone)] struct SettingsSink { diff --git a/generic/threadshare/src/queue/imp.rs b/generic/threadshare/src/queue/imp.rs index ec8f89513..f9d375e02 100644 --- a/generic/threadshare/src/queue/imp.rs +++ b/generic/threadshare/src/queue/imp.rs @@ -40,8 +40,7 @@ const DEFAULT_MAX_SIZE_BUFFERS: u32 = 200; const DEFAULT_MAX_SIZE_BYTES: u32 = 1024 * 1024; const DEFAULT_MAX_SIZE_TIME: gst::ClockTime = gst::ClockTime::SECOND; const DEFAULT_CONTEXT: &str = ""; -// FIXME use Duration::ZERO when MSVC >= 1.53.2 -const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0); +const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; #[derive(Debug, Clone)] struct Settings { diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs index aa6497959..1feade11b 100644 --- a/generic/threadshare/src/runtime/task.rs +++ b/generic/threadshare/src/runtime/task.rs @@ -1612,9 +1612,7 @@ mod tests { task.prepare(TaskPrepareTest { prepare_receiver }, context.clone()) .unwrap(); - // FIXME use Duration::ZERO when MSVC >= 1.53.2 - let start_ctx = - Context::acquire("prepare_start_ok_requester", Duration::from_nanos(0)).unwrap(); + let start_ctx = Context::acquire("prepare_start_ok_requester", Duration::ZERO).unwrap(); let task_clone = task.clone(); let (ready_sender, ready_receiver) = oneshot::channel(); let start_handle = start_ctx.spawn(async move { @@ -1737,9 +1735,7 @@ mod tests { ) .unwrap(); - // FIXME use Duration::ZERO when MSVC >= 1.53.2 - let start_ctx = - Context::acquire("prepare_start_error_requester", Duration::from_nanos(0)).unwrap(); + let start_ctx = Context::acquire("prepare_start_error_requester", Duration::ZERO).unwrap(); let task_clone = task.clone(); let (ready_sender, ready_receiver) = oneshot::channel(); let start_handle = start_ctx.spawn(async move { diff --git a/generic/threadshare/src/socket.rs b/generic/threadshare/src/socket.rs index 8d8da5caa..420e0f345 100644 --- a/generic/threadshare/src/socket.rs +++ b/generic/threadshare/src/socket.rs @@ -143,11 +143,7 @@ impl<T: SocketRead> Socket<T> { Ok((len, saddr)) => { let dts = if T::DO_TIMESTAMP { let time = self.clock.as_ref().unwrap().time(); - let running_time = time - .zip(self.base_time) - // TODO Do we want None if time < base_time - // or do we expect Some? - .and_then(|(time, base_time)| time.checked_sub(base_time)); + let running_time = time.opt_checked_sub(self.base_time).ok().flatten(); // FIXME maybe we should check if running_time.is_none // so as to display another message gst_debug!( diff --git a/generic/threadshare/src/tcpclientsrc/imp.rs b/generic/threadshare/src/tcpclientsrc/imp.rs index b832ee9fd..aaaa846cb 100644 --- a/generic/threadshare/src/tcpclientsrc/imp.rs +++ b/generic/threadshare/src/tcpclientsrc/imp.rs @@ -48,8 +48,7 @@ const DEFAULT_PORT: i32 = 4953; const DEFAULT_CAPS: Option<gst::Caps> = None; const DEFAULT_BLOCKSIZE: u32 = 4096; const DEFAULT_CONTEXT: &str = ""; -// FIXME use Duration::ZERO when MSVC >= 1.53.2 -const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0); +const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; #[derive(Debug, Clone)] struct Settings { diff --git a/generic/threadshare/src/udpsink/imp.rs b/generic/threadshare/src/udpsink/imp.rs index 2d6bcb8e2..3a9fe5577 100644 --- a/generic/threadshare/src/udpsink/imp.rs +++ b/generic/threadshare/src/udpsink/imp.rs @@ -62,8 +62,7 @@ const DEFAULT_TTL_MC: u32 = 1; const DEFAULT_QOS_DSCP: i32 = -1; const DEFAULT_CLIENTS: &str = ""; const DEFAULT_CONTEXT: &str = ""; -// FIXME use Duration::ZERO when MSVC >= 1.53.2 -const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0); +const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; #[derive(Debug, Clone)] struct Settings { @@ -412,10 +411,7 @@ impl UdpSinkPadHandler { rtime = segment .downcast_ref::<gst::format::Time>() .and_then(|segment| { - segment - .to_running_time(buffer.pts()) - .zip(inner.latency) - .map(|(rtime, latency)| rtime + latency) + segment.to_running_time(buffer.pts()).opt_add(inner.latency) }); } @@ -529,13 +525,9 @@ impl UdpSinkPadHandler { ) { let now = element.current_running_time(); - match running_time - .into() - .zip(now) - .and_then(|(running_time, now)| running_time.checked_sub(now)) - { - Some(delay) => runtime::time::delay_for(delay.into()).await, - None => runtime::executor::yield_now().await, + match running_time.into().opt_checked_sub(now) { + Ok(Some(delay)) => runtime::time::delay_for(delay.into()).await, + _ => runtime::executor::yield_now().await, } } diff --git a/generic/threadshare/src/udpsrc/imp.rs b/generic/threadshare/src/udpsrc/imp.rs index cccf7c17f..0318d1c48 100644 --- a/generic/threadshare/src/udpsrc/imp.rs +++ b/generic/threadshare/src/udpsrc/imp.rs @@ -48,8 +48,7 @@ const DEFAULT_MTU: u32 = 1492; const DEFAULT_SOCKET: Option<GioSocketWrapper> = None; const DEFAULT_USED_SOCKET: Option<GioSocketWrapper> = None; const DEFAULT_CONTEXT: &str = ""; -// FIXME use Duration::ZERO when MSVC >= 1.53.2 -const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0); +const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; const DEFAULT_RETRIEVE_SENDER_ADDRESS: bool = true; #[derive(Debug, Clone)] diff --git a/net/rusoto/src/aws_transcriber/imp.rs b/net/rusoto/src/aws_transcriber/imp.rs index 07a3843e6..1ce15009d 100644 --- a/net/rusoto/src/aws_transcriber/imp.rs +++ b/net/rusoto/src/aws_transcriber/imp.rs @@ -760,9 +760,7 @@ impl Transcriber { let running_time = state.in_segment.to_running_time(buffer.pts()); let now = element.current_running_time(); - delay = running_time - .zip(now) - .and_then(|(running_time, now)| running_time.checked_sub(now)); + delay = running_time.opt_checked_sub(now).ok().flatten(); } } diff --git a/text/json/src/jsongstparse/imp.rs b/text/json/src/jsongstparse/imp.rs index 8d5890715..5abae084d 100644 --- a/text/json/src/jsongstparse/imp.rs +++ b/text/json/src/jsongstparse/imp.rs @@ -194,7 +194,7 @@ impl State { ) { let buffer = buffer.get_mut().unwrap(); - self.last_position = pts.zip(duration).map(|(pts, duration)| pts + duration); + self.last_position = pts.opt_add(duration); buffer.set_pts(pts); @@ -263,7 +263,7 @@ impl JsonGstParse { let mut buffer = gst::Buffer::from_mut_slice(data.into_bytes()); if let Some(last_position) = state.last_position { - if let Some(duration) = pts.map(|pts| pts.checked_sub(last_position)) { + if let Ok(Some(duration)) = pts.opt_checked_sub(last_position) { events.push( gst::event::Gap::builder(last_position) .duration(duration) @@ -274,12 +274,11 @@ impl JsonGstParse { state.add_buffer_metadata(element, &mut buffer, pts, duration); - let send_eos = state - .segment - .stop() - .zip(buffer.pts()) - .zip(buffer.duration()) - .map_or(false, |((stop, pts), duration)| pts + duration >= stop); + let send_eos = buffer + .pts() + .opt_add(buffer.duration()) + .opt_ge(state.segment.stop()) + .unwrap_or(false); // Drop our state mutex while we push out buffers or events drop(state); @@ -346,11 +345,7 @@ impl JsonGstParse { pts: impl Into<Option<gst::ClockTime>>, mut state: MutexGuard<State>, ) -> MutexGuard<State> { - if pts - .into() - .zip(state.segment.start()) - .map_or(false, |(pts, start)| pts >= start) - { + if pts.into().opt_ge(state.segment.start()).unwrap_or(false) { state.seeking = false; state.discont = true; state.replay_last_line = true; @@ -503,7 +498,7 @@ impl JsonGstParse { data: _data, }) = serde_json::from_slice(line) { - last_pts = pts.zip(duration).map(|(pts, duration)| pts + duration); + last_pts = pts.opt_add(duration); } } @@ -757,17 +752,11 @@ impl JsonGstParse { let pull = state.pull.as_ref().unwrap(); if start_type == gst::SeekType::Set { - start = start - .zip(pull.duration) - .map(|(start, duration)| start.min(duration)) - .or(start); + start = start.opt_min(pull.duration).or(start); } if stop_type == gst::SeekType::Set { - stop = stop - .zip(pull.duration) - .map(|(stop, duration)| stop.min(duration)) - .or(stop); + stop = stop.opt_min(pull.duration).or(stop); } state.seeking = true; diff --git a/text/wrap/src/gsttextwrap/imp.rs b/text/wrap/src/gsttextwrap/imp.rs index d01e310aa..ef6bbeb07 100644 --- a/text/wrap/src/gsttextwrap/imp.rs +++ b/text/wrap/src/gsttextwrap/imp.rs @@ -203,10 +203,9 @@ impl TextWrap { let add_buffer = state .start_ts - .zip(Some(accumulate_time)) - .map_or(false, |(start_ts, accumulate_time)| { - start_ts + accumulate_time < pts - }); + .opt_add(accumulate_time) + .opt_lt(pts) + .unwrap_or(false); if add_buffer { let mut buf = @@ -214,12 +213,8 @@ impl TextWrap { { let buf_mut = buf.get_mut().unwrap(); buf_mut.set_pts(state.start_ts); - buf_mut.set_duration( - state - .end_ts - .zip(state.start_ts) - .and_then(|(end_ts, start_ts)| end_ts.checked_sub(start_ts)), - ); + buf_mut + .set_duration(state.end_ts.opt_checked_sub(state.start_ts).ok().flatten()); } bufferlist.get_mut().unwrap().add(buf); @@ -262,10 +257,7 @@ impl TextWrap { .collect::<Vec<String>>() .join("\n"); } else { - let duration = state - .end_ts - .zip(state.start_ts) - .and_then(|(end_ts, start_ts)| end_ts.checked_sub(start_ts)); + let duration = state.end_ts.opt_checked_sub(state.start_ts).ok().flatten(); let contents = chunk .iter() .map(|l| l.to_string()) @@ -291,7 +283,7 @@ impl TextWrap { } current_text = trailing; - state.end_ts = state.end_ts.map(|end_ts| end_ts + duration_per_word); + state.end_ts = state.end_ts.opt_add(duration_per_word); } state.current_text = current_text; @@ -399,10 +391,7 @@ impl TextWrap { let buf_mut = buf.get_mut().unwrap(); buf_mut.set_pts(state.start_ts); buf_mut.set_duration( - state - .end_ts - .zip(state.start_ts) - .and_then(|(end_ts, start_ts)| end_ts.checked_sub(start_ts)), + state.end_ts.opt_checked_sub(state.start_ts).ok().flatten(), ); } diff --git a/tutorial/src/sinesrc/imp.rs b/tutorial/src/sinesrc/imp.rs index 1fd22dd6f..bc03ad218 100644 --- a/tutorial/src/sinesrc/imp.rs +++ b/tutorial/src/sinesrc/imp.rs @@ -770,19 +770,11 @@ impl PushSrcImpl for SineSrc { let segment = element.segment().downcast::<gst::format::Time>().unwrap(); let base_time = element.base_time(); - let running_time = segment.to_running_time( - buffer - .pts() - .zip(buffer.duration()) - .map(|(pts, duration)| pts + duration), - ); + let running_time = segment.to_running_time(buffer.pts().opt_add(buffer.duration())); // The last sample's clock time is the base time of the element plus the // running time of the last sample - let wait_until = match running_time - .zip(base_time) - .map(|(running_time, base_time)| running_time + base_time) - { + let wait_until = match running_time.opt_add(base_time) { Some(wait_until) => wait_until, None => return Ok(CreateSuccess::NewBuffer(buffer)), }; diff --git a/tutorial/tutorial-2.md b/tutorial/tutorial-2.md index 983cbc265..26d55ef06 100644 --- a/tutorial/tutorial-2.md +++ b/tutorial/tutorial-2.md @@ -703,16 +703,12 @@ For working in live mode, we have to add a few different parts in various places let running_time = segment.to_running_time( buffer .pts() - .zip(buffer.duration()) - .map(|(pts, duration)| pts + duration), + .opt_add(buffer.duration()), ); // The last sample's clock time is the base time of the element plus the // running time of the last sample - let wait_until = match running_time - .zip(base_time) - .map(|(running_time, base_time)| running_time + base_time) - { + let wait_until = match running_time.opt_add(base_time) { Some(wait_until) => wait_until, None => return Ok(buffer), }; diff --git a/utils/fallbackswitch/src/fallbackswitch/imp.rs b/utils/fallbackswitch/src/fallbackswitch/imp.rs index 816a58fee..b991fc669 100644 --- a/utils/fallbackswitch/src/fallbackswitch/imp.rs +++ b/utils/fallbackswitch/src/fallbackswitch/imp.rs @@ -200,8 +200,8 @@ impl FallbackSwitch { if pts.is_none() || new_running_time - .zip(target_running_time.into()) - .map_or(false, |(new, target)| new <= target) + .opt_le(target_running_time.into()) + .unwrap_or(false) { gst_debug!(CAT, obj: pad, "Dropping trailing buffer {:?}", buffer); pad.drop_buffer(); @@ -269,16 +269,13 @@ impl FallbackSwitch { } let cur_running_time = cur_running_time.into(); - let (is_late, deadline) = cur_running_time - .zip(agg.latency()) - .zip(running_time) - .map_or( - (false, None), - |((cur_running_time, latency), running_time)| { - let dealine = running_time + latency + 40 * gst::ClockTime::MSECOND; - (cur_running_time > dealine, Some(dealine)) - }, - ); + let (is_late, deadline) = match (cur_running_time, agg.latency(), running_time) { + (Some(cur_running_time), Some(latency), Some(running_time)) => { + let deadline = running_time + latency + 40 * gst::ClockTime::MSECOND; + (cur_running_time > deadline, Some(deadline)) + } + _ => (false, None), + }; if is_late { gst_debug!( @@ -289,13 +286,12 @@ impl FallbackSwitch { deadline.display(), ); - let is_late = state.last_output_time.zip(running_time).map_or( - false, - |(last_output_time, running_time)| { - last_output_time + settings.timeout <= running_time - }, - ); - if is_late { + let is_late = state + .last_output_time + .opt_add(settings.timeout) + .opt_le(running_time); + + if let Some(true) = is_late { /* This buffer arrived too late - we either already switched * to the other pad or there's no point outputting this anyway */ gst_debug!( @@ -418,12 +414,11 @@ impl FallbackSwitch { }; if !ignore_timeout { - let timed_out = !state.last_output_time.zip(running_time).map_or( - false, - |(last_output_time, running_time)| { - last_output_time + settings.timeout > running_time - }, - ); + let timed_out = state + .last_output_time + .opt_add(settings.timeout) + .opt_le(running_time) + .unwrap_or(true); // Get the next one if this one is before the timeout if !timed_out { @@ -544,10 +539,7 @@ impl FallbackSwitch { let base_time = agg.base_time(); let cur_running_time = if let Some(clock) = clock { - clock - .time() - .zip(base_time) - .and_then(|(time, base_time)| time.checked_sub(base_time)) + clock.time().opt_checked_sub(base_time).ok().flatten() } else { gst::ClockTime::NONE }; @@ -1168,16 +1160,13 @@ impl AggregatorImpl for FallbackSwitch { audio_info.bpf(), ) } else if pad_state.video_info.is_some() { - let stop = pts.zip(duration).map(|(pts, duration)| pts + duration); + let stop = pts.opt_add(duration); segment.clip(pts, stop).map(|(start, stop)| { { let buffer = buffer.make_mut(); buffer.set_pts(start); if duration.is_some() { - buffer.set_duration( - stop.zip(start) - .and_then(|(stop, start)| stop.checked_sub(start)), - ); + buffer.set_duration(stop.opt_checked_sub(start).ok().flatten()); } } diff --git a/utils/togglerecord/src/togglerecord/imp.rs b/utils/togglerecord/src/togglerecord/imp.rs index f9f8fec7b..da86174da 100644 --- a/utils/togglerecord/src/togglerecord/imp.rs +++ b/utils/togglerecord/src/togglerecord/imp.rs @@ -210,7 +210,7 @@ impl HandleData for (gst::ClockTime, Option<gst::ClockTime>) { segment.clip(self.0, stop).map(|(start, stop)| { let start = start.expect("provided a defined value"); - (start, stop.map(|stop| stop - start)) + (start, stop.opt_sub(start)) }) } } @@ -306,9 +306,7 @@ impl HandleData for gst::Buffer { let buffer = self.make_mut(); buffer.set_pts(start); buffer.set_duration( - stop.zip(start) - .and_then(|(stop, start)| stop.checked_sub(start)), - // FIXME we could expect here + stop.opt_checked_sub(start).ok().flatten(), // FIXME we could expect here ); } @@ -370,7 +368,6 @@ impl ToggleRecord { }; // This will only do anything for non-raw data - // FIXME comment why we can unwrap dts_or_pts = state.in_segment.start().unwrap().max(dts_or_pts); dts_or_pts_end = state.in_segment.start().unwrap().max(dts_or_pts_end); if let Some(stop) = state.in_segment.stop() { @@ -380,13 +377,12 @@ impl ToggleRecord { let current_running_time = state.in_segment.to_running_time(dts_or_pts); let current_running_time_end = state.in_segment.to_running_time(dts_or_pts_end); + state.current_running_time = current_running_time - .zip(state.current_running_time) - .map(|(cur_rt, state_rt)| cur_rt.max(state_rt)) + .opt_max(state.current_running_time) .or(current_running_time); state.current_running_time_end = current_running_time_end - .zip(state.current_running_time_end) - .map(|(cur_rt_end, state_rt_end)| cur_rt_end.max(state_rt_end)) + .opt_max(state.current_running_time_end) .or(current_running_time_end); // FIXME we should probably return if either current_running_time or current_running_time_end @@ -455,8 +451,9 @@ impl ToggleRecord { rec_state.last_recording_stop = current_running_time; let last_recording_duration = rec_state .last_recording_stop - .zip(rec_state.last_recording_start) - .and_then(|(stop, start)| stop.checked_sub(start)); + .opt_checked_sub(rec_state.last_recording_start) + .ok() + .flatten(); gst_debug!( CAT, obj: pad, @@ -477,8 +474,8 @@ impl ToggleRecord { let s = s.state.lock(); s.eos || s.current_running_time - .zip(current_running_time) - .map_or(false, |(s_cur_rt, cur_rt)| s_cur_rt >= cur_rt) + .opt_ge(current_running_time) + .unwrap_or(false) }) { gst_log!(CAT, obj: pad, "Waiting for other streams to stop"); @@ -570,8 +567,8 @@ impl ToggleRecord { let s = s.state.lock(); s.eos || s.current_running_time - .zip(current_running_time) - .map_or(false, |(s_cur_rt, cur_rt)| s_cur_rt >= cur_rt) + .opt_ge(current_running_time) + .unwrap_or(false) }) { gst_log!(CAT, obj: pad, "Waiting for other streams to start"); @@ -649,7 +646,6 @@ impl ToggleRecord { }; // This will only do anything for non-raw data - // FIXME comment why we can unwrap pts = state.in_segment.start().unwrap().max(pts); pts_end = state.in_segment.start().unwrap().max(pts_end); if let Some(stop) = state.in_segment.stop() { @@ -660,12 +656,10 @@ impl ToggleRecord { let current_running_time = state.in_segment.to_running_time(pts); let current_running_time_end = state.in_segment.to_running_time(pts_end); state.current_running_time = current_running_time - .zip(state.current_running_time) - .map(|(cur_rt, state_rt)| cur_rt.max(state_rt)) + .opt_max(state.current_running_time) .or(current_running_time); state.current_running_time_end = current_running_time_end - .zip(state.current_running_time_end) - .map(|(cur_rt_end, state_rt_end)| cur_rt_end.max(state_rt_end)) + .opt_max(state.current_running_time_end) .or(current_running_time_end); gst_log!( @@ -703,18 +697,20 @@ impl ToggleRecord { && rec_state.recording_state != RecordingState::Stopping && main_state .current_running_time_end - .zip(current_running_time_end) - .map_or(false, |(main_rt_end, cur_rt_end)| main_rt_end < cur_rt_end) + .opt_lt(current_running_time_end) + .unwrap_or(false) || rec_state.recording_state == RecordingState::Starting - && rec_state - .last_recording_start - .map_or(true, |last_rec_start| { - current_running_time.map_or(false, |cur_rt| last_rec_start <= cur_rt) - }) + && (rec_state.last_recording_start.is_none() + || rec_state + .last_recording_start + .opt_le(current_running_time) + .unwrap_or(false)) || rec_state.recording_state == RecordingState::Stopping - && rec_state.last_recording_stop.map_or(true, |last_rec_stop| { - current_running_time.map_or(false, |cur_rt| last_rec_stop <= cur_rt) - })) + && (rec_state.last_recording_stop.is_none() + || rec_state + .last_recording_stop + .opt_le(current_running_time) + .unwrap_or(false))) && !main_state.eos && !state.flushing { @@ -800,7 +796,10 @@ impl ToggleRecord { gst_warning!(CAT, obj: pad, "Complete buffer clipped!"); return Ok(HandleResult::Drop); } - } else if current_running_time.map_or(false, |cur_rt| cur_rt < last_recording_start) { + } else if current_running_time + .opt_lt(last_recording_start) + .unwrap_or(false) + { // Otherwise if the buffer starts before the recording start, drop it. This // means that we either can't clip, or that the end is also before the // recording start @@ -814,13 +813,11 @@ impl ToggleRecord { return Ok(HandleResult::Drop); } else if data.can_clip(&*state) && current_running_time - .zip(rec_state.last_recording_stop) - .map_or(false, |(cur_rt, last_rec_stop)| cur_rt < last_rec_stop) + .opt_lt(rec_state.last_recording_stop) + .unwrap_or(false) && current_running_time_end - .zip(rec_state.last_recording_stop) - .map_or(false, |(cur_rt_end, last_rec_stop)| { - cur_rt_end > last_rec_stop - }) + .opt_gt(rec_state.last_recording_stop) + .unwrap_or(false) { // Similarly if the end is after the recording stop but the start is before and we // can clip, clip the buffer and pass it through. @@ -863,10 +860,8 @@ impl ToggleRecord { ))); } } else if current_running_time_end - .zip(rec_state.last_recording_stop) - .map_or(false, |(cur_rt_end, last_rec_stop)| { - cur_rt_end > last_rec_stop - }) + .opt_gt(rec_state.last_recording_stop) + .unwrap_or(false) { // Otherwise if the end of the buffer is after the recording stop, we're EOS // now. This means that we either couldn't clip or that the start is also after @@ -887,11 +882,12 @@ impl ToggleRecord { } else { // In all other cases the buffer is fully between recording start and end and // can be passed through as is - assert!(current_running_time.map_or(false, |cur_rt| cur_rt >= last_recording_start)); + assert!(current_running_time + .opt_ge(last_recording_start) + .unwrap_or(false)); assert!(current_running_time_end - .zip(rec_state.last_recording_stop) - .map_or(false, |(cur_rt_end, last_rec_stop)| cur_rt_end - <= last_rec_stop)); + .opt_le(rec_state.last_recording_stop) + .unwrap_or(false)); gst_debug!( CAT, @@ -910,15 +906,14 @@ impl ToggleRecord { // The end of our buffer must be before/at the end of the previous buffer of the main // stream assert!(current_running_time_end - .zip(main_state.current_running_time_end) - .map_or(false, |(cur_rt_end, main_cur_rt_end)| cur_rt_end - <= main_cur_rt_end)); + .opt_le(main_state.current_running_time_end) + .unwrap_or(false)); // We're properly started, must have a start position and // be actually after that start position assert!(current_running_time - .zip(rec_state.last_recording_start) - .map_or(false, |(cur_rt, last_rec_start)| cur_rt >= last_rec_start)); + .opt_ge(rec_state.last_recording_start) + .unwrap_or(false)); gst_log!(CAT, obj: pad, "Passing buffer (recording)"); Ok(HandleResult::Pass(data)) } @@ -998,9 +993,8 @@ impl ToggleRecord { // The end of our buffer must be before/at the end of the previous buffer of the main // stream assert!(current_running_time_end - .zip(main_state.current_running_time_end) - .map_or(false, |(cur_rt_end, state_rt_end)| cur_rt_end - <= state_rt_end)); + .opt_le(main_state.current_running_time_end) + .unwrap_or(false)); // We're properly stopped gst_log!(CAT, obj: pad, "Dropping buffer (stopped)"); @@ -1357,11 +1351,7 @@ impl ToggleRecord { forward = match handle_result { Ok(HandleResult::Pass((new_pts, new_duration))) => { - if new_pts != pts - || new_duration - .zip(duration) - .map_or(false, |(new_duration, duration)| new_duration != duration) - { + if new_pts != pts || new_duration.is_some() && new_duration != duration { event = gst::event::Gap::builder(new_pts) .duration(new_duration) .build(); @@ -1587,10 +1577,9 @@ impl ToggleRecord { { if let Some(delta) = state .current_running_time_end - .zip(rec_state.last_recording_start) - .and_then(|(cur_rt_end, last_rec_start)| { - cur_rt_end.checked_sub(last_rec_start) - }) + .opt_checked_sub(rec_state.last_recording_start) + .ok() + .flatten() { gst_debug!( CAT, @@ -1622,10 +1611,9 @@ impl ToggleRecord { { if let Some(delta) = state .current_running_time_end - .zip(rec_state.last_recording_start) - .and_then(|(cur_rt_end, last_rec_start)| { - cur_rt_end.checked_sub(last_rec_start) - }) + .opt_checked_sub(rec_state.last_recording_start) + .ok() + .flatten() { gst_debug!( CAT, diff --git a/video/closedcaption/src/cea608tojson/imp.rs b/video/closedcaption/src/cea608tojson/imp.rs index b60e77381..0c90ddc8c 100644 --- a/video/closedcaption/src/cea608tojson/imp.rs +++ b/video/closedcaption/src/cea608tojson/imp.rs @@ -400,9 +400,7 @@ fn dump( duration: impl Into<Option<gst::ClockTime>>, ) { let pts = pts.into(); - let end = pts - .zip(duration.into()) - .map(|(pts, duration)| pts + duration); + let end = pts.opt_add(duration.into()); if cc_data != 0x8080 { gst_debug!( @@ -478,10 +476,10 @@ impl State { Some(Cea608Mode::PopOn) => gst::ClockTime::NONE, _ => self .current_pts - .zip(self.current_duration) - .map(|(cur_pts, cur_duration)| cur_pts + cur_duration) - .zip(self.first_pts) - .and_then(|(cur_end, first_pts)| cur_end.checked_sub(first_pts)), + .opt_add(self.current_duration) + .opt_checked_sub(self.first_pts) + .ok() + .flatten(), } }; @@ -541,10 +539,10 @@ impl State { gst_log!(CAT, obj: element, "Draining pending"); pending.duration = self .current_pts - .zip(self.current_duration) - .map(|(cur_pts, cur_dur)| cur_pts + cur_dur) - .zip(pending.pts) - .and_then(|(cur_end, pending_pts)| cur_end.checked_sub(pending_pts)); + .opt_add(self.current_duration) + .opt_checked_sub(pending.pts) + .ok() + .flatten(); Some(pending) } else { None diff --git a/video/closedcaption/src/jsontovtt/imp.rs b/video/closedcaption/src/jsontovtt/imp.rs index dc604c68a..436c388eb 100644 --- a/video/closedcaption/src/jsontovtt/imp.rs +++ b/video/closedcaption/src/jsontovtt/imp.rs @@ -68,7 +68,7 @@ fn clamp( mut pts: gst::ClockTime, mut duration: Option<gst::ClockTime>, ) -> Option<(gst::ClockTime, Option<gst::ClockTime>)> { - let end_pts = duration.map(|duration| pts + duration).unwrap_or(pts); + let end_pts = pts.opt_add(duration).unwrap_or(pts); if let Some(segment_start) = segment.start() { if end_pts < segment_start { @@ -76,9 +76,7 @@ fn clamp( } if pts < segment_start { - if let Some(ref mut duration) = duration { - *duration = duration.saturating_sub(segment_start - pts); - } + duration.opt_sub_assign(segment_start - pts); pts = segment_start; } } @@ -89,9 +87,7 @@ fn clamp( } if end_pts > segment_stop { - if let Some(ref mut duration) = duration { - *duration = duration.saturating_sub(end_pts - segment_stop); - } + duration.opt_sub_assign(end_pts - segment_stop); } } @@ -322,10 +318,7 @@ impl State { self.drain(&mut ret, self.segment.to_running_time(pts)); - self.last_pts = Some(pts) - .zip(duration) - .map(|(pts, duration)| pts + duration) - .or(Some(pts)); + self.last_pts = Some(pts).opt_add(duration).or(Some(pts)); ret } diff --git a/video/closedcaption/src/mcc_parse/imp.rs b/video/closedcaption/src/mcc_parse/imp.rs index ff0f18bb9..c38c4f19d 100644 --- a/video/closedcaption/src/mcc_parse/imp.rs +++ b/video/closedcaption/src/mcc_parse/imp.rs @@ -1015,17 +1015,11 @@ impl MccParse { let pull = state.pull.as_ref().unwrap(); if start_type == gst::SeekType::Set { - start = start - .zip(pull.duration) - .map(|(start, duration)| start.min(duration)) - .or(start); + start = start.opt_min(pull.duration).or(start); } if stop_type == gst::SeekType::Set { - stop = stop - .zip(pull.duration) - .map(|(stop, duration)| stop.min(duration)) - .or(stop); + stop = stop.opt_min(pull.duration).or(stop); } state.seeking = true; diff --git a/video/closedcaption/src/scc_parse/imp.rs b/video/closedcaption/src/scc_parse/imp.rs index 75cdaac13..2fa363f84 100644 --- a/video/closedcaption/src/scc_parse/imp.rs +++ b/video/closedcaption/src/scc_parse/imp.rs @@ -429,14 +429,8 @@ impl SccParse { timecode.increment_frame(); if clip_buffers { - let end_time = buffer - .pts() - .zip(buffer.duration()) - .map(|(pts, duration)| pts + duration); - if end_time - .zip(segment_start) - .map_or(false, |(end_time, segment_start)| end_time < segment_start) - { + let end_time = buffer.pts().opt_add(buffer.duration()); + if end_time.opt_lt(segment_start).unwrap_or(false) { gst_trace!( CAT, obj: element, @@ -448,12 +442,11 @@ impl SccParse { } } - send_eos = state.segment.stop().map_or(false, |stop| { - buffer - .pts() - .zip(buffer.duration()) - .map_or(false, |(pts, duration)| pts + duration >= stop) - }); + send_eos = buffer + .pts() + .opt_add(buffer.duration()) + .opt_ge(state.segment.stop()) + .unwrap_or(false); let buffers = buffers.get_mut().unwrap(); buffers.add(buffer); @@ -899,17 +892,11 @@ impl SccParse { let pull = state.pull.as_ref().unwrap(); if start_type == gst::SeekType::Set { - start = start - .zip(pull.duration) - .map(|(start, duration)| start.min(duration)) - .or(start); + start = start.opt_min(pull.duration).or(start); } if stop_type == gst::SeekType::Set { - stop = stop - .zip(pull.duration) - .map(|(stop, duration)| stop.min(duration)) - .or(stop); + stop = stop.opt_min(pull.duration).or(stop); } state.seeking = true; diff --git a/video/flavors/src/flvdemux/imp.rs b/video/flavors/src/flvdemux/imp.rs index 53cdd925e..78f21bdfa 100644 --- a/video/flavors/src/flvdemux/imp.rs +++ b/video/flavors/src/flvdemux/imp.rs @@ -1198,15 +1198,9 @@ impl StreamingState { fn update_position(&mut self, buffer: &gst::Buffer) { if let Some(pts) = buffer.pts() { - self.last_position = self - .last_position - .map(|last_pos| last_pos.max(pts)) - .or(Some(pts)); + self.last_position = self.last_position.opt_max(pts).or(Some(pts)); } else if let Some(dts) = buffer.dts() { - self.last_position = self - .last_position - .map(|last_pos| last_pos.max(dts)) - .or(Some(dts)); + self.last_position = self.last_position.opt_max(dts).or(Some(dts)); } } } diff --git a/video/gif/src/gifenc/imp.rs b/video/gif/src/gifenc/imp.rs index 481c3a590..9c0480274 100644 --- a/video/gif/src/gifenc/imp.rs +++ b/video/gif/src/gifenc/imp.rs @@ -401,9 +401,9 @@ impl VideoEncoderImpl for GifEnc { // is probably less visible than the large stuttering when a complete 10ms have to // "catch up". gif_frame.delay = (frame_delay.mseconds() as f32 / 10.0).round() as u16; - state.gif_pts = state.gif_pts.map(|gif_pts| { - gif_pts + gst::ClockTime::from_mseconds(gif_frame.delay as u64 * 10) - }); + state.gif_pts = state + .gif_pts + .opt_add(gst::ClockTime::from_mseconds(gif_frame.delay as u64 * 10)); // encode new frame let context = state.context.as_mut().unwrap(); diff --git a/video/webp/tests/webpdec.rs b/video/webp/tests/webpdec.rs index 6dfb04656..4d8e2dd3a 100644 --- a/video/webp/tests/webpdec.rs +++ b/video/webp/tests/webpdec.rs @@ -48,9 +48,7 @@ fn test_decode() { assert_eq!(buf.pts(), expected_timestamp); assert_eq!(buf.duration(), expected_duration); - expected_timestamp = expected_timestamp - .zip(expected_duration) - .map(|(ts, duration)| ts + duration); + expected_timestamp = expected_timestamp.opt_add(expected_duration); count += 1; } |