diff options
author | François Laignel <fengalin@free.fr> | 2021-05-26 12:54:34 +0300 |
---|---|---|
committer | François Laignel <fengalin@free.fr> | 2021-06-05 11:36:21 +0300 |
commit | 8f81cb881262aba9ba4aa2c1d7579e2467b3a596 (patch) | |
tree | 7310df86f506f57cd37dda9962c81d708b6178b0 /generic/threadshare/src/jitterbuffer | |
parent | 17feaa8c71a1915e57d05311fe8c1deedb5b97a1 (diff) |
generic: migrate to new ClockTime design
Diffstat (limited to 'generic/threadshare/src/jitterbuffer')
-rw-r--r-- | generic/threadshare/src/jitterbuffer/imp.rs | 210 | ||||
-rw-r--r-- | generic/threadshare/src/jitterbuffer/jitterbuffer.rs | 61 |
2 files changed, 137 insertions, 134 deletions
diff --git a/generic/threadshare/src/jitterbuffer/imp.rs b/generic/threadshare/src/jitterbuffer/imp.rs index e89d939fb..ef3880d62 100644 --- a/generic/threadshare/src/jitterbuffer/imp.rs +++ b/generic/threadshare/src/jitterbuffer/imp.rs @@ -39,27 +39,27 @@ use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task use super::jitterbuffer::{RTPJitterBuffer, RTPJitterBufferItem, RTPPacketRateCtx}; -const DEFAULT_LATENCY_MS: u32 = 200; +const DEFAULT_LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(200); const DEFAULT_DO_LOST: bool = false; const DEFAULT_MAX_DROPOUT_TIME: u32 = 60000; const DEFAULT_MAX_MISORDER_TIME: u32 = 2000; const DEFAULT_CONTEXT: &str = ""; -const DEFAULT_CONTEXT_WAIT: u32 = 0; +const DEFAULT_CONTEXT_WAIT: gst::ClockTime = gst::ClockTime::ZERO; #[derive(Debug, Clone)] struct Settings { - latency_ms: u32, + latency: gst::ClockTime, do_lost: bool, max_dropout_time: u32, max_misorder_time: u32, context: String, - context_wait: u32, + context_wait: gst::ClockTime, } impl Default for Settings { fn default() -> Self { Settings { - latency_ms: DEFAULT_LATENCY_MS, + latency: DEFAULT_LATENCY, do_lost: DEFAULT_DO_LOST, max_dropout_time: DEFAULT_MAX_DROPOUT_TIME, max_misorder_time: DEFAULT_MAX_MISORDER_TIME, @@ -108,7 +108,7 @@ impl PartialEq for GapPacket { struct SinkHandlerInner { packet_rate_ctx: RTPPacketRateCtx, ips_rtptime: Option<u32>, - ips_pts: gst::ClockTime, + ips_pts: Option<gst::ClockTime>, gap_packets: BTreeSet<GapPacket>, @@ -123,7 +123,7 @@ impl Default for SinkHandlerInner { SinkHandlerInner { packet_rate_ctx: RTPPacketRateCtx::new(), ips_rtptime: None, - ips_pts: gst::CLOCK_TIME_NONE, + ips_pts: None, gap_packets: BTreeSet::new(), last_pt: None, last_in_seqnum: None, @@ -155,16 +155,16 @@ impl SinkHandler { state.discont = true; state.last_popped_seqnum = None; - state.last_popped_pts = gst::CLOCK_TIME_NONE; + state.last_popped_pts = None; inner.last_in_seqnum = None; inner.last_rtptime = None; - state.earliest_pts = gst::CLOCK_TIME_NONE; + state.earliest_pts = None; state.earliest_seqnum = None; inner.ips_rtptime = None; - inner.ips_pts = gst::CLOCK_TIME_NONE; + inner.ips_pts = None; mem::replace(&mut inner.gap_packets, BTreeSet::new()) } @@ -208,14 +208,17 @@ impl SinkHandler { inner: &mut SinkHandlerInner, state: &mut State, rtptime: u32, - pts: gst::ClockTime, + pts: impl Into<Option<gst::ClockTime>>, ) { if inner.ips_rtptime != Some(rtptime) { - if inner.ips_pts.is_some() && pts.is_some() { - let new_packet_spacing = pts - inner.ips_pts; + let pts = pts.into(); + let new_packet_spacing = inner + .ips_pts + .zip(pts) + .and_then(|(ips_pts, pts)| pts.checked_sub(ips_pts)); + if let Some(new_packet_spacing) = new_packet_spacing { let old_packet_spacing = state.packet_spacing; - assert!(old_packet_spacing.is_some()); if old_packet_spacing > new_packet_spacing { state.packet_spacing = (new_packet_spacing + 3 * old_packet_spacing) / 4; } else if !old_packet_spacing.is_zero() { @@ -421,7 +424,7 @@ impl SinkHandler { return Ok(gst::FlowSuccess::Ok); } } - inner.ips_pts = gst::CLOCK_TIME_NONE; + inner.ips_pts = None; inner.ips_rtptime = None; } @@ -441,7 +444,7 @@ impl SinkHandler { inner.last_in_seqnum = Some(seq); let jb_item = if estimated_dts { - RTPJitterBufferItem::new(buffer, gst::CLOCK_TIME_NONE, pts, Some(seq), rtptime) + RTPJitterBufferItem::new(buffer, gst::ClockTime::NONE, pts, Some(seq), rtptime) } else { RTPJitterBufferItem::new(buffer, dts, pts, Some(seq), rtptime) }; @@ -463,15 +466,16 @@ impl SinkHandler { inner.last_rtptime = Some(rtptime); - if state.earliest_pts.is_none() - || (pts.is_some() - && (pts < state.earliest_pts - || (pts == state.earliest_pts - && state - .earliest_seqnum - .map(|earliest_seqnum| seq > earliest_seqnum) - .unwrap_or(false)))) - { + let must_update = match (state.earliest_pts, pts) { + (None, _) => true, + (Some(earliest_pts), Some(pts)) if pts < earliest_pts => true, + (Some(earliest_pts), Some(pts)) if pts == earliest_pts => state + .earliest_seqnum + .map_or(false, |earliest_seqnum| seq > earliest_seqnum), + _ => false, + }; + + if must_update { state.earliest_pts = pts; state.earliest_seqnum = Some(seq); } @@ -515,10 +519,7 @@ impl SinkHandler { let (latency, context_wait) = { let settings = jb.settings.lock().unwrap(); - ( - settings.latency_ms as u64 * gst::MSECOND, - settings.context_wait as u64 * gst::MSECOND, - ) + (settings.latency, settings.context_wait) }; // Reschedule if needed @@ -527,13 +528,15 @@ impl SinkHandler { .next_wakeup(&element, &state, latency, context_wait); if let Some((next_wakeup, _)) = next_wakeup { if let Some((previous_next_wakeup, ref abort_handle)) = state.wait_handle { - if previous_next_wakeup.is_none() || previous_next_wakeup > next_wakeup { + if previous_next_wakeup.is_none() + || next_wakeup.map_or(false, |next| previous_next_wakeup.unwrap() > next) + { gst_debug!( CAT, obj: pad, "Rescheduling for new item {} < {}", - next_wakeup, - previous_next_wakeup + next_wakeup.display(), + previous_next_wakeup.display(), ); abort_handle.abort(); state.wait_handle = None; @@ -666,16 +669,13 @@ impl SrcHandler { state: &mut State, element: &super::JitterBuffer, seqnum: u16, - pts: gst::ClockTime, + pts: impl Into<Option<gst::ClockTime>>, discont: &mut bool, ) -> Vec<gst::Event> { - let (latency_ns, do_lost) = { + let (latency, do_lost) = { let jb = JitterBuffer::from_instance(element); let settings = jb.settings.lock().unwrap(); - ( - settings.latency_ms as u64 * gst::MSECOND.nseconds().unwrap(), - settings.do_lost, - ) + (settings.latency, settings.do_lost) }; let mut events = vec![]; @@ -697,30 +697,24 @@ impl SrcHandler { let gap = gst_rtp::compare_seqnum(lost_seqnum, seqnum) as i64; if gap > 0 { - let interval = - pts.nseconds().unwrap() as i64 - state.last_popped_pts.nseconds().unwrap() as i64; let gap = gap as u64; - let spacing = if interval >= 0 { - interval as u64 / (gap + 1) - } else { - 0 - }; + // FIXME reason why we can expect Some for the 2 lines below + let mut last_popped_pts = state.last_popped_pts.unwrap(); + let interval = pts.into().unwrap().saturating_sub(last_popped_pts); + let spacing = interval / (gap as u64 + 1); *discont = true; - if state.equidistant > 0 && gap > 1 && gap * spacing > latency_ns { - let n_packets = gap - latency_ns / spacing; + if state.equidistant > 0 && gap > 1 && gap * spacing > latency { + let n_packets = gap - latency.nseconds() / spacing.nseconds(); if do_lost { let s = gst::Structure::new( "GstRTPPacketLost", &[ ("seqnum", &(lost_seqnum as u32)), - ( - "timestamp", - &(state.last_popped_pts + gst::ClockTime(Some(spacing))), - ), - ("duration", &(n_packets * spacing)), + ("timestamp", &(last_popped_pts + spacing)), + ("duration", &(n_packets * spacing).nseconds()), ("retry", &0), ], ); @@ -729,15 +723,20 @@ impl SrcHandler { } lost_seqnum = lost_seqnum.wrapping_add(n_packets as u16); - state.last_popped_pts += gst::ClockTime(Some(n_packets * spacing)); + last_popped_pts += n_packets * spacing; + state.last_popped_pts = Some(last_popped_pts); state.stats.num_lost += n_packets; } while lost_seqnum != seqnum { - let timestamp = state.last_popped_pts + gst::ClockTime(Some(spacing)); - let duration = if state.equidistant > 0 { spacing } else { 0 }; + let timestamp = last_popped_pts + spacing; + let duration = if state.equidistant > 0 { + spacing + } else { + gst::ClockTime::ZERO + }; - state.last_popped_pts = timestamp; + state.last_popped_pts = Some(timestamp); if do_lost { let s = gst::Structure::new( @@ -745,7 +744,8 @@ impl SrcHandler { &[ ("seqnum", &(lost_seqnum as u32)), ("timestamp", ×tamp), - ("duration", &duration), + // FIXME would probably make sense being a ClockTime + ("duration", &duration.nseconds()), ("retry", &0), ], ); @@ -819,8 +819,8 @@ impl SrcHandler { }; state.last_popped_pts = buffer.pts(); - if let Some(pts) = state.last_popped_pts.nseconds() { - state.position = pts.into(); + if state.last_popped_pts.is_some() { + state.position = state.last_popped_pts; } state.last_popped_seqnum = seq; @@ -845,22 +845,26 @@ impl SrcHandler { state: &State, latency: gst::ClockTime, context_wait: gst::ClockTime, - ) -> (gst::ClockTime, Option<(gst::ClockTime, Duration)>) { + ) -> ( + Option<gst::ClockTime>, + Option<(Option<gst::ClockTime>, Duration)>, + ) { let now = element.current_running_time(); gst_debug!( CAT, obj: element, "Now is {}, EOS {}, earliest pts is {}, packet_spacing {} and latency {}", - now, + now.display(), state.eos, - state.earliest_pts, + state.earliest_pts.display(), state.packet_spacing, latency ); if state.eos { gst_debug!(CAT, obj: element, "EOS, not waiting"); + // FIXME use Duration::ZERO when MSVC >= 1.53.2 return (now, Some((now, Duration::from_nanos(0)))); } @@ -868,23 +872,25 @@ impl SrcHandler { return (now, None); } - let next_wakeup = state.earliest_pts + latency - state.packet_spacing - context_wait / 2; + let next_wakeup = state + .earliest_pts + .map(|earliest_pts| earliest_pts + latency - state.packet_spacing - context_wait / 2); let delay = next_wakeup - .saturating_sub(now) - .unwrap_or_else(gst::ClockTime::zero) - .nseconds() - .unwrap(); + .zip(now) + .map_or(gst::ClockTime::ZERO, |(next_wakeup, now)| { + next_wakeup.saturating_sub(now) + }); gst_debug!( CAT, obj: element, "Next wakeup at {} with delay {}", - next_wakeup, + next_wakeup.display(), delay ); - (now, Some((next_wakeup, Duration::from_nanos(delay)))) + (now, Some((next_wakeup, delay.into()))) } } @@ -954,8 +960,8 @@ impl PadSrcHandler for SrcHandler { if ret { let settings = jb.settings.lock().unwrap(); let (_, mut min_latency, _) = peer_query.result(); - min_latency += (settings.latency_ms as u64) * gst::SECOND; - let max_latency = gst::CLOCK_TIME_NONE; + min_latency += settings.latency; + let max_latency = gst::ClockTime::NONE; q.set(true, min_latency, max_latency); } @@ -999,7 +1005,7 @@ struct State { jbuf: glib::SendUniqueCell<RTPJitterBuffer>, last_res: Result<gst::FlowSuccess, gst::FlowError>, - position: gst::ClockTime, + position: Option<gst::ClockTime>, segment: gst::FormattedSegment<gst::ClockTime>, clock_rate: Option<u32>, @@ -1011,14 +1017,14 @@ struct State { eos: bool, last_popped_seqnum: Option<u16>, - last_popped_pts: gst::ClockTime, + last_popped_pts: Option<gst::ClockTime>, stats: Stats, - earliest_pts: gst::ClockTime, + earliest_pts: Option<gst::ClockTime>, earliest_seqnum: Option<u16>, - wait_handle: Option<(gst::ClockTime, AbortHandle)>, + wait_handle: Option<(Option<gst::ClockTime>, AbortHandle)>, } impl Default for State { @@ -1027,23 +1033,23 @@ impl Default for State { jbuf: glib::SendUniqueCell::new(RTPJitterBuffer::new()).unwrap(), last_res: Ok(gst::FlowSuccess::Ok), - position: gst::CLOCK_TIME_NONE, + position: None, segment: gst::FormattedSegment::<gst::ClockTime>::new(), clock_rate: None, - packet_spacing: gst::ClockTime::zero(), + packet_spacing: gst::ClockTime::ZERO, equidistant: 0, discont: true, eos: false, last_popped_seqnum: None, - last_popped_pts: gst::CLOCK_TIME_NONE, + last_popped_pts: None, stats: Stats::default(), - earliest_pts: gst::CLOCK_TIME_NONE, + earliest_pts: None, earliest_seqnum: None, wait_handle: None, @@ -1093,10 +1099,7 @@ impl TaskImpl for JitterBufferTask { let jb = JitterBuffer::from_instance(&self.element); let (latency, context_wait) = { let settings = jb.settings.lock().unwrap(); - ( - settings.latency_ms as u64 * gst::MSECOND, - settings.context_wait as u64 * gst::MSECOND, - ) + (settings.latency, settings.context_wait) }; loop { @@ -1110,6 +1113,7 @@ impl TaskImpl for JitterBufferTask { ); let (delay_fut, abort_handle) = match next_wakeup { + // FIXME use Duration::ZERO when MSVC >= 1.53.2 Some((_, delay)) if delay == Duration::from_nanos(0) => (None, None), _ => { let (delay_fut, abort_handle) = abortable(async move { @@ -1123,8 +1127,7 @@ impl TaskImpl for JitterBufferTask { }; }); - let next_wakeup = - next_wakeup.map(|w| w.0).unwrap_or(gst::CLOCK_TIME_NONE); + let next_wakeup = next_wakeup.and_then(|w| w.0); (Some(delay_fut), Some((next_wakeup, abort_handle))) } }; @@ -1158,12 +1161,15 @@ impl TaskImpl for JitterBufferTask { CAT, obj: &self.element, "Woke up at {}, earliest_pts {}", - now, - state.earliest_pts + now.display(), + state.earliest_pts.display() ); if let Some((next_wakeup, _)) = next_wakeup { - if next_wakeup > now { + if next_wakeup + .zip(now) + .map_or(false, |(next_wakeup, now)| next_wakeup > now) + { // Reschedule and wait a bit longer in the next iteration return Ok(()); } @@ -1198,8 +1204,8 @@ impl TaskImpl for JitterBufferTask { latency, context_wait, ); - if let Some((next_wakeup, _)) = next_wakeup { - if next_wakeup > now { + if let Some((Some(next_wakeup), _)) = next_wakeup { + if now.map_or(false, |now| next_wakeup > now) { // Reschedule and wait a bit longer in the next iteration return Ok(()); } @@ -1281,7 +1287,7 @@ impl JitterBuffer { let context = { let settings = self.settings.lock().unwrap(); - Context::acquire(&settings.context, settings.context_wait).unwrap() + Context::acquire(&settings.context, settings.context_wait.into()).unwrap() }; self.task @@ -1367,7 +1373,7 @@ impl ObjectImpl for JitterBuffer { "Throttle poll loop to run at most once every this many ms", 0, 1000, - DEFAULT_CONTEXT_WAIT, + DEFAULT_CONTEXT_WAIT.mseconds() as u32, glib::ParamFlags::READWRITE, ), glib::ParamSpec::new_uint( @@ -1376,7 +1382,7 @@ impl ObjectImpl for JitterBuffer { "Amount of ms to buffer", 0, std::u32::MAX, - DEFAULT_LATENCY_MS, + DEFAULT_LATENCY.mseconds() as u32, glib::ParamFlags::READWRITE, ), glib::ParamSpec::new_boolean( @@ -1446,14 +1452,16 @@ impl ObjectImpl for JitterBuffer { ) { match pspec.name() { "latency" => { - let latency_ms = { + let latency = { let mut settings = self.settings.lock().unwrap(); - settings.latency_ms = value.get().expect("type checked upstream"); - settings.latency_ms as u64 + settings.latency = gst::ClockTime::from_mseconds( + value.get::<u32>().expect("type checked upstream").into(), + ); + settings.latency }; let state = self.state.lock().unwrap(); - state.jbuf.borrow().set_delay(latency_ms * gst::MSECOND); + state.jbuf.borrow().set_delay(latency); let _ = obj.post_message(gst::message::Latency::builder().src(obj).build()); } @@ -1478,7 +1486,9 @@ impl ObjectImpl for JitterBuffer { } "context-wait" => { let mut settings = self.settings.lock().unwrap(); - settings.context_wait = value.get().expect("type checked upstream"); + settings.context_wait = gst::ClockTime::from_mseconds( + value.get::<u32>().expect("type checked upstream").into(), + ); } _ => unimplemented!(), } @@ -1488,7 +1498,7 @@ impl ObjectImpl for JitterBuffer { match pspec.name() { "latency" => { let settings = self.settings.lock().unwrap(); - settings.latency_ms.to_value() + settings.latency.mseconds().to_value() } "do-lost" => { let settings = self.settings.lock().unwrap(); @@ -1520,7 +1530,7 @@ impl ObjectImpl for JitterBuffer { } "context-wait" => { let settings = self.settings.lock().unwrap(); - settings.context_wait.to_value() + (settings.context_wait.mseconds() as u32).to_value() } _ => unimplemented!(), } diff --git a/generic/threadshare/src/jitterbuffer/jitterbuffer.rs b/generic/threadshare/src/jitterbuffer/jitterbuffer.rs index 67e244026..3785ac0d7 100644 --- a/generic/threadshare/src/jitterbuffer/jitterbuffer.rs +++ b/generic/threadshare/src/jitterbuffer/jitterbuffer.rs @@ -72,8 +72,8 @@ unsafe impl Send for RTPJitterBufferItem {} impl RTPJitterBufferItem { pub fn new( buffer: gst::Buffer, - dts: gst::ClockTime, - pts: gst::ClockTime, + dts: impl Into<Option<gst::ClockTime>>, + pts: impl Into<Option<gst::ClockTime>>, seqnum: Option<u16>, rtptime: u32, ) -> RTPJitterBufferItem { @@ -89,8 +89,8 @@ impl RTPJitterBufferItem { next: ptr::null_mut(), prev: ptr::null_mut(), r#type: 0, - dts: dts.into_glib(), - pts: pts.into_glib(), + dts: dts.into().into_glib(), + pts: pts.into().into_glib(), seqnum: seqnum.map(|s| s as u32).unwrap_or(std::u32::MAX), count: 1, rtptime, @@ -113,24 +113,24 @@ impl RTPJitterBufferItem { } } - pub fn dts(&self) -> gst::ClockTime { + pub fn dts(&self) -> Option<gst::ClockTime> { unsafe { let item = self.0.as_ref().expect("Invalid wrapper"); if item.as_ref().dts == gst::ffi::GST_CLOCK_TIME_NONE { - gst::CLOCK_TIME_NONE + None } else { - gst::ClockTime(Some(item.as_ref().dts)) + Some(gst::ClockTime::from_nseconds(item.as_ref().dts)) } } } - pub fn pts(&self) -> gst::ClockTime { + pub fn pts(&self) -> Option<gst::ClockTime> { unsafe { let item = self.0.as_ref().expect("Invalid wrapper"); if item.as_ref().pts == gst::ffi::GST_CLOCK_TIME_NONE { - gst::CLOCK_TIME_NONE + None } else { - gst::ClockTime(Some(item.as_ref().pts)) + Some(gst::ClockTime::from_nseconds(item.as_ref().pts)) } } } @@ -235,7 +235,10 @@ impl RTPJitterBuffer { #[allow(dead_code)] pub fn delay(&self) -> gst::ClockTime { - unsafe { from_glib(ffi::rtp_jitter_buffer_get_delay(self.to_glib_none().0)) } + unsafe { + try_from_glib(ffi::rtp_jitter_buffer_get_delay(self.to_glib_none().0)) + .expect("undefined delay") + } } pub fn set_delay(&self, delay: gst::ClockTime) { @@ -253,29 +256,23 @@ impl RTPJitterBuffer { pub fn calculate_pts( &self, - dts: gst::ClockTime, + dts: impl Into<Option<gst::ClockTime>>, estimated_dts: bool, rtptime: u32, - base_time: gst::ClockTime, + base_time: impl Into<Option<gst::ClockTime>>, gap: i32, is_rtx: bool, - ) -> gst::ClockTime { + ) -> Option<gst::ClockTime> { unsafe { - let pts = ffi::rtp_jitter_buffer_calculate_pts( + from_glib(ffi::rtp_jitter_buffer_calculate_pts( self.to_glib_none().0, - dts.into_glib(), + dts.into().into_glib(), estimated_dts.into_glib(), rtptime, - base_time.into_glib(), + base_time.into().into_glib(), gap, is_rtx.into_glib(), - ); - - if pts == gst::ffi::GST_CLOCK_TIME_NONE { - gst::CLOCK_TIME_NONE - } else { - pts.into() - } + )) } } @@ -297,7 +294,7 @@ impl RTPJitterBuffer { } } - pub fn find_earliest(&self) -> (gst::ClockTime, Option<u16>) { + pub fn find_earliest(&self) -> (Option<gst::ClockTime>, Option<u16>) { unsafe { let mut pts = mem::MaybeUninit::uninit(); let mut seqnum = mem::MaybeUninit::uninit(); @@ -307,7 +304,7 @@ impl RTPJitterBuffer { pts.as_mut_ptr(), seqnum.as_mut_ptr(), ); - let pts = pts.assume_init(); + let pts = from_glib(pts.assume_init()); let seqnum = seqnum.assume_init(); let seqnum = if seqnum == std::u32::MAX { @@ -316,11 +313,7 @@ impl RTPJitterBuffer { Some(seqnum as u16) }; - if pts == gst::ffi::GST_CLOCK_TIME_NONE { - (gst::CLOCK_TIME_NONE, seqnum) - } else { - (pts.into(), seqnum) - } + (pts, seqnum) } } @@ -340,11 +333,11 @@ impl RTPJitterBuffer { } } - pub fn peek(&self) -> (gst::ClockTime, Option<u16>) { + pub fn peek(&self) -> (Option<gst::ClockTime>, Option<u16>) { unsafe { let item = ffi::rtp_jitter_buffer_peek(self.to_glib_none().0); if item.is_null() { - (gst::CLOCK_TIME_NONE, None) + (None, None) } else { let seqnum = (*item).seqnum; let seqnum = if seqnum == std::u32::MAX { @@ -352,7 +345,7 @@ impl RTPJitterBuffer { } else { Some(seqnum as u16) }; - ((*item).pts.into(), seqnum) + (from_glib((*item).pts), seqnum) } } } |