diff options
Diffstat (limited to 'generic/threadshare/src/jitterbuffer/imp.rs')
-rw-r--r-- | generic/threadshare/src/jitterbuffer/imp.rs | 1636 |
1 files changed, 1636 insertions, 0 deletions
diff --git a/generic/threadshare/src/jitterbuffer/imp.rs b/generic/threadshare/src/jitterbuffer/imp.rs new file mode 100644 index 000000000..895efbc06 --- /dev/null +++ b/generic/threadshare/src/jitterbuffer/imp.rs @@ -0,0 +1,1636 @@ +// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com> +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +use futures::future::BoxFuture; +use futures::future::{abortable, AbortHandle, Aborted}; +use futures::prelude::*; + +use glib::glib_object_subclass; +use glib::prelude::*; +use glib::subclass; +use glib::subclass::prelude::*; + +use gst::prelude::*; +use gst::subclass::prelude::*; +use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg, gst_info, gst_log, gst_trace}; +use gst_rtp::RTPBuffer; + +use lazy_static::lazy_static; + +use std::cmp::{max, min, Ordering}; +use std::collections::{BTreeSet, VecDeque}; +use std::mem; +use std::sync::Arc; +use std::sync::Mutex as StdMutex; +use std::time::Duration; + +use crate::runtime::prelude::*; +use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task}; + +use super::jitterbuffer::{RTPJitterBuffer, RTPJitterBufferItem, RTPPacketRateCtx}; + +const DEFAULT_LATENCY_MS: u32 = 200; +const DEFAULT_DO_LOST: bool = false; +const DEFAULT_MAX_DROPOUT_TIME: u32 = 60000; +const DEFAULT_MAX_MISORDER_TIME: u32 = 2000; +const DEFAULT_CONTEXT: &str = ""; +const DEFAULT_CONTEXT_WAIT: u32 = 0; + +#[derive(Debug, Clone)] +struct Settings { + latency_ms: u32, + do_lost: bool, + max_dropout_time: u32, + max_misorder_time: u32, + context: String, + context_wait: u32, +} + +impl Default for Settings { + fn default() -> Self { + Settings { + latency_ms: DEFAULT_LATENCY_MS, + do_lost: DEFAULT_DO_LOST, + max_dropout_time: DEFAULT_MAX_DROPOUT_TIME, + max_misorder_time: DEFAULT_MAX_MISORDER_TIME, + context: DEFAULT_CONTEXT.into(), + context_wait: DEFAULT_CONTEXT_WAIT, + } + } +} + +static PROPERTIES: [subclass::Property; 7] = [ + subclass::Property("latency", |name| { + glib::ParamSpec::uint( + name, + "Buffer latency in ms", + "Amount of ms to buffer", + 0, + std::u32::MAX, + DEFAULT_LATENCY_MS, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("do-lost", |name| { + glib::ParamSpec::boolean( + name, + "Do Lost", + "Send an event downstream when a packet is lost", + DEFAULT_DO_LOST, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("max-dropout-time", |name| { + glib::ParamSpec::uint( + name, + "Max dropout time", + "The maximum time (milliseconds) of missing packets tolerated.", + 0, + std::u32::MAX, + DEFAULT_MAX_DROPOUT_TIME, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("max-misorder-time", |name| { + glib::ParamSpec::uint( + name, + "Max misorder time", + "The maximum time (milliseconds) of misordered packets tolerated.", + 0, + std::u32::MAX, + DEFAULT_MAX_MISORDER_TIME, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("stats", |name| { + glib::ParamSpec::boxed( + name, + "Statistics", + "Various statistics", + gst::Structure::static_type(), + glib::ParamFlags::READABLE, + ) + }), + subclass::Property("context", |name| { + glib::ParamSpec::string( + name, + "Context", + "Context name to share threads with", + Some(DEFAULT_CONTEXT), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("context-wait", |name| { + glib::ParamSpec::uint( + name, + "Context Wait", + "Throttle poll loop to run at most once every this many ms", + 0, + 1000, + DEFAULT_CONTEXT_WAIT, + glib::ParamFlags::READWRITE, + ) + }), +]; + +#[derive(Eq)] +struct GapPacket { + buffer: gst::Buffer, + seq: u16, + pt: u8, +} + +impl GapPacket { + fn new(buffer: gst::Buffer) -> Self { + let rtp_buffer = RTPBuffer::from_buffer_readable(&buffer).unwrap(); + let seq = rtp_buffer.get_seq(); + let pt = rtp_buffer.get_payload_type(); + drop(rtp_buffer); + + Self { buffer, seq, pt } + } +} + +impl Ord for GapPacket { + fn cmp(&self, other: &Self) -> Ordering { + 0.cmp(&gst_rtp::compare_seqnum(self.seq, other.seq)) + } +} + +impl PartialOrd for GapPacket { + fn partial_cmp(&self, other: &Self) -> Option<Ordering> { + Some(self.cmp(other)) + } +} + +impl PartialEq for GapPacket { + fn eq(&self, other: &Self) -> bool { + self.cmp(other) == Ordering::Equal + } +} + +struct SinkHandlerInner { + packet_rate_ctx: RTPPacketRateCtx, + ips_rtptime: Option<u32>, + ips_pts: gst::ClockTime, + + gap_packets: BTreeSet<GapPacket>, + + last_pt: Option<u8>, + + last_in_seqnum: Option<u16>, + last_rtptime: Option<u32>, +} + +impl Default for SinkHandlerInner { + fn default() -> Self { + SinkHandlerInner { + packet_rate_ctx: RTPPacketRateCtx::new(), + ips_rtptime: None, + ips_pts: gst::CLOCK_TIME_NONE, + gap_packets: BTreeSet::new(), + last_pt: None, + last_in_seqnum: None, + last_rtptime: None, + } + } +} + +#[derive(Clone, Default)] +struct SinkHandler(Arc<StdMutex<SinkHandlerInner>>); + +impl SinkHandler { + fn clear(&self) { + let mut inner = self.0.lock().unwrap(); + *inner = SinkHandlerInner::default(); + } + + // For resetting if seqnum discontinuities + fn reset( + &self, + inner: &mut SinkHandlerInner, + state: &mut State, + element: &super::JitterBuffer, + ) -> BTreeSet<GapPacket> { + gst_info!(CAT, obj: element, "Resetting"); + + state.jbuf.borrow().flush(); + state.jbuf.borrow().reset_skew(); + state.discont = true; + + state.last_popped_seqnum = None; + state.last_popped_pts = gst::CLOCK_TIME_NONE; + + inner.last_in_seqnum = None; + inner.last_rtptime = None; + + state.earliest_pts = gst::CLOCK_TIME_NONE; + state.earliest_seqnum = None; + + inner.ips_rtptime = None; + inner.ips_pts = gst::CLOCK_TIME_NONE; + + mem::replace(&mut inner.gap_packets, BTreeSet::new()) + } + + fn parse_caps( + &self, + inner: &mut SinkHandlerInner, + state: &mut State, + element: &super::JitterBuffer, + caps: &gst::Caps, + pt: u8, + ) -> Result<gst::FlowSuccess, gst::FlowError> { + let s = caps.get_structure(0).ok_or(gst::FlowError::Error)?; + + gst_info!(CAT, obj: element, "Parsing {:?}", caps); + + let payload = s + .get_some::<i32>("payload") + .map_err(|_| gst::FlowError::Error)?; + + if pt != 0 && payload as u8 != pt { + return Err(gst::FlowError::Error); + } + + inner.last_pt = Some(pt); + let clock_rate = s + .get_some::<i32>("clock-rate") + .map_err(|_| gst::FlowError::Error)?; + + if clock_rate <= 0 { + return Err(gst::FlowError::Error); + } + state.clock_rate = Some(clock_rate as u32); + + inner.packet_rate_ctx.reset(clock_rate); + state.jbuf.borrow().set_clock_rate(clock_rate as u32); + + Ok(gst::FlowSuccess::Ok) + } + + fn calculate_packet_spacing( + &self, + inner: &mut SinkHandlerInner, + state: &mut State, + rtptime: u32, + pts: gst::ClockTime, + ) { + if inner.ips_rtptime != Some(rtptime) { + if inner.ips_pts.is_some() && pts.is_some() { + let new_packet_spacing = pts - inner.ips_pts; + let old_packet_spacing = state.packet_spacing; + + assert!(old_packet_spacing.is_some()); + if old_packet_spacing > new_packet_spacing { + state.packet_spacing = (new_packet_spacing + 3 * old_packet_spacing) / 4; + } else if !old_packet_spacing.is_zero() { + state.packet_spacing = (3 * new_packet_spacing + old_packet_spacing) / 4; + } else { + state.packet_spacing = new_packet_spacing; + } + + gst_debug!( + CAT, + "new packet spacing {}, old packet spacing {} combined to {}", + new_packet_spacing, + old_packet_spacing, + state.packet_spacing + ); + } + inner.ips_rtptime = Some(rtptime); + inner.ips_pts = pts; + } + } + + fn handle_big_gap_buffer( + &self, + inner: &mut SinkHandlerInner, + element: &super::JitterBuffer, + buffer: gst::Buffer, + pt: u8, + ) -> bool { + let gap_packets_length = inner.gap_packets.len(); + let mut reset = false; + + gst_debug!( + CAT, + obj: element, + "Handling big gap, gap packets length: {}", + gap_packets_length + ); + + inner.gap_packets.insert(GapPacket::new(buffer)); + + if gap_packets_length > 0 { + let mut prev_gap_seq = std::u32::MAX; + let mut all_consecutive = true; + + for gap_packet in inner.gap_packets.iter() { + gst_log!( + CAT, + obj: element, + "Looking at gap packet with seq {}", + gap_packet.seq, + ); + + all_consecutive = gap_packet.pt == pt; + + if prev_gap_seq == std::u32::MAX { + prev_gap_seq = gap_packet.seq as u32; + } else if gst_rtp::compare_seqnum(gap_packet.seq, prev_gap_seq as u16) != -1 { + all_consecutive = false; + } else { + prev_gap_seq = gap_packet.seq as u32; + } + + if !all_consecutive { + break; + } + } + + gst_debug!(CAT, obj: element, "all consecutive: {}", all_consecutive); + + if all_consecutive && gap_packets_length > 3 { + reset = true; + } else if !all_consecutive { + inner.gap_packets.clear(); + } + } + + reset + } + + fn store( + &self, + inner: &mut SinkHandlerInner, + pad: &gst::Pad, + element: &super::JitterBuffer, + buffer: gst::Buffer, + ) -> Result<gst::FlowSuccess, gst::FlowError> { + let jb = JitterBuffer::from_instance(element); + let mut state = jb.state.lock().unwrap(); + + let (max_misorder_time, max_dropout_time) = { + let settings = jb.settings.lock().unwrap(); + (settings.max_misorder_time, settings.max_dropout_time) + }; + + let (seq, rtptime, pt) = { + let rtp_buffer = + RTPBuffer::from_buffer_readable(&buffer).map_err(|_| gst::FlowError::Error)?; + ( + rtp_buffer.get_seq(), + rtp_buffer.get_timestamp(), + rtp_buffer.get_payload_type(), + ) + }; + + let mut pts = buffer.get_pts(); + let mut dts = buffer.get_dts(); + let mut estimated_dts = false; + + gst_log!( + CAT, + obj: element, + "Storing buffer, seq: {}, rtptime: {}, pt: {}", + seq, + rtptime, + pt + ); + + if dts.is_none() { + dts = pts; + } else if pts.is_none() { + pts = dts; + } + + if dts.is_none() { + dts = element.get_current_running_time(); + pts = dts; + + estimated_dts = state.clock_rate.is_some(); + } else { + dts = state.segment.to_running_time(dts); + } + + if state.clock_rate.is_none() { + inner.ips_rtptime = Some(rtptime); + inner.ips_pts = pts; + } + + if inner.last_pt != Some(pt) { + inner.last_pt = Some(pt); + state.clock_rate = None; + + gst_debug!(CAT, obj: pad, "New payload type: {}", pt); + + if let Some(caps) = pad.get_current_caps() { + /* Ignore errors at this point, as we want to emit request-pt-map */ + let _ = self.parse_caps(inner, &mut state, element, &caps, pt); + } + } + + let mut state = { + if state.clock_rate.is_none() { + drop(state); + let caps = element + .emit("request-pt-map", &[&(pt as u32)]) + .map_err(|_| gst::FlowError::Error)? + .ok_or(gst::FlowError::Error)? + .get::<gst::Caps>() + .map_err(|_| gst::FlowError::Error)? + .ok_or(gst::FlowError::Error)?; + let mut state = jb.state.lock().unwrap(); + self.parse_caps(inner, &mut state, element, &caps, pt)?; + state + } else { + state + } + }; + + inner.packet_rate_ctx.update(seq, rtptime); + + let max_dropout = inner + .packet_rate_ctx + .get_max_dropout(max_dropout_time as i32); + let max_misorder = inner + .packet_rate_ctx + .get_max_dropout(max_misorder_time as i32); + + pts = state.jbuf.borrow().calculate_pts( + dts, + estimated_dts, + rtptime, + element.get_base_time(), + 0, + false, + ); + + if pts.is_none() { + gst_debug!( + CAT, + obj: element, + "cannot calculate a valid pts for #{}, discard", + seq + ); + return Ok(gst::FlowSuccess::Ok); + } + + if let Some(last_in_seqnum) = inner.last_in_seqnum { + let gap = gst_rtp::compare_seqnum(last_in_seqnum as u16, seq); + if gap == 1 { + self.calculate_packet_spacing(inner, &mut state, rtptime, pts); + } else { + if (gap != -1 && gap < -(max_misorder as i32)) || (gap >= max_dropout as i32) { + let reset = self.handle_big_gap_buffer(inner, element, buffer, pt); + if reset { + // Handle reset in `enqueue_item` to avoid recursion + return Err(gst::FlowError::CustomError); + } else { + return Ok(gst::FlowSuccess::Ok); + } + } + inner.ips_pts = gst::CLOCK_TIME_NONE; + inner.ips_rtptime = None; + } + + inner.gap_packets.clear(); + } + + if let Some(last_popped_seqnum) = state.last_popped_seqnum { + let gap = gst_rtp::compare_seqnum(last_popped_seqnum, seq); + + if gap <= 0 { + state.stats.num_late += 1; + gst_debug!(CAT, obj: element, "Dropping late {}", seq); + return Ok(gst::FlowSuccess::Ok); + } + } + + inner.last_in_seqnum = Some(seq); + + let jb_item = if estimated_dts { + RTPJitterBufferItem::new(buffer, gst::CLOCK_TIME_NONE, pts, Some(seq), rtptime) + } else { + RTPJitterBufferItem::new(buffer, dts, pts, Some(seq), rtptime) + }; + + let (success, _, _) = state.jbuf.borrow().insert(jb_item); + + if !success { + /* duplicate */ + return Ok(gst::FlowSuccess::Ok); + } + + if Some(rtptime) == inner.last_rtptime { + state.equidistant -= 2; + } else { + state.equidistant += 1; + } + + state.equidistant = min(max(state.equidistant, -7), 7); + + inner.last_rtptime = Some(rtptime); + + if state.earliest_pts.is_none() + || (pts.is_some() + && (pts < state.earliest_pts + || (pts == state.earliest_pts + && state + .earliest_seqnum + .map(|earliest_seqnum| seq > earliest_seqnum) + .unwrap_or(false)))) + { + state.earliest_pts = pts; + state.earliest_seqnum = Some(seq); + } + + gst_log!(CAT, obj: pad, "Stored buffer"); + + Ok(gst::FlowSuccess::Ok) + } + + fn enqueue_item( + &self, + pad: &gst::Pad, + element: &super::JitterBuffer, + buffer: Option<gst::Buffer>, + ) -> Result<gst::FlowSuccess, gst::FlowError> { + let mut inner = self.0.lock().unwrap(); + + let mut buffers = VecDeque::new(); + if let Some(buf) = buffer { + buffers.push_back(buf); + } + + // This is to avoid recursion with `store`, `reset` and `enqueue_item` + while let Some(buf) = buffers.pop_front() { + if let Err(err) = self.store(&mut inner, pad, element, buf) { + match err { + gst::FlowError::CustomError => { + let jb = JitterBuffer::from_instance(element); + let mut state = jb.state.lock().unwrap(); + for gap_packet in self.reset(&mut inner, &mut state, element) { + buffers.push_back(gap_packet.buffer); + } + } + other => return Err(other), + } + } + } + + let jb = JitterBuffer::from_instance(element); + let mut state = jb.state.lock().unwrap(); + + let (latency, context_wait) = { + let settings = jb.settings.lock().unwrap(); + ( + settings.latency_ms as u64 * gst::MSECOND, + settings.context_wait as u64 * gst::MSECOND, + ) + }; + + // Reschedule if needed + let (_, next_wakeup) = + jb.src_pad_handler + .get_next_wakeup(&element, &state, latency, context_wait); + if let Some((next_wakeup, _)) = next_wakeup { + if let Some((previous_next_wakeup, ref abort_handle)) = state.wait_handle { + if previous_next_wakeup.is_none() || previous_next_wakeup > next_wakeup { + gst_debug!( + CAT, + obj: pad, + "Rescheduling for new item {} < {}", + next_wakeup, + previous_next_wakeup + ); + abort_handle.abort(); + state.wait_handle = None; + } + } + } + state.last_res + } +} + +impl PadSinkHandler for SinkHandler { + type ElementImpl = JitterBuffer; + + fn sink_chain( + &self, + pad: &PadSinkRef, + _jb: &JitterBuffer, + element: &gst::Element, + buffer: gst::Buffer, + ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { + let pad_weak = pad.downgrade(); + let element = element.clone().downcast::<super::JitterBuffer>().unwrap(); + let this = self.clone(); + + async move { + let pad = pad_weak.upgrade().expect("PadSink no longer exists"); + + gst_debug!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer); + this.enqueue_item(pad.gst_pad(), &element, Some(buffer)) + } + .boxed() + } + + fn sink_event( + &self, + pad: &PadSinkRef, + jb: &JitterBuffer, + element: &gst::Element, + event: gst::Event, + ) -> bool { + use gst::EventView; + + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); + + if let EventView::FlushStart(..) = event.view() { + if let Err(err) = jb.task.flush_start() { + gst_error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); + gst_element_error!( + element, + gst::StreamError::Failed, + ("Internal data stream error"), + ["FlushStart failed {:?}", err] + ); + return false; + } + } + + gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event); + jb.src_pad.gst_pad().push_event(event) + } + + fn sink_event_serialized( + &self, + pad: &PadSinkRef, + _jb: &JitterBuffer, + element: &gst::Element, + event: gst::Event, + ) -> BoxFuture<'static, bool> { + use gst::EventView; + + let pad_weak = pad.downgrade(); + let element = element.clone().downcast::<super::JitterBuffer>().unwrap(); + + async move { + let pad = pad_weak.upgrade().expect("PadSink no longer exists"); + + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); + + let jb = JitterBuffer::from_instance(&element); + + let mut forward = true; + match event.view() { + EventView::Segment(e) => { + let mut state = jb.state.lock().unwrap(); + state.segment = e + .get_segment() + .clone() + .downcast::<gst::format::Time>() + .unwrap(); + } + EventView::FlushStop(..) => { + if let Err(err) = jb.task.flush_stop() { + gst_error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); + gst_element_error!( + element, + gst::StreamError::Failed, + ("Internal data stream error"), + ["FlushStop failed {:?}", err] + ); + return false; + } + } + EventView::Eos(..) => { + let mut state = jb.state.lock().unwrap(); + state.eos = true; + if let Some((_, abort_handle)) = state.wait_handle.take() { + abort_handle.abort(); + } + forward = false; + } + _ => (), + }; + + if forward { + // FIXME: These events should really be queued up and stay in order + gst_log!(CAT, obj: pad.gst_pad(), "Forwarding serialized {:?}", event); + jb.src_pad.push_event(event).await + } else { + true + } + } + .boxed() + } +} + +#[derive(Clone, Default)] +struct SrcHandler; + +impl SrcHandler { + fn clear(&self) {} + + fn generate_lost_events( + &self, + state: &mut State, + element: &super::JitterBuffer, + seqnum: u16, + pts: gst::ClockTime, + discont: &mut bool, + ) -> Vec<gst::Event> { + let (latency_ns, do_lost) = { + let jb = JitterBuffer::from_instance(element); + let settings = jb.settings.lock().unwrap(); + ( + settings.latency_ms as u64 * gst::MSECOND.nseconds().unwrap(), + settings.do_lost, + ) + }; + + let mut events = vec![]; + + let last_popped_seqnum = match state.last_popped_seqnum { + None => return events, + Some(seq) => seq, + }; + + gst_debug!( + CAT, + obj: element, + "Generating lost events seq: {}, last popped seq: {:?}", + seqnum, + last_popped_seqnum, + ); + + let mut lost_seqnum = last_popped_seqnum.wrapping_add(1); + let gap = gst_rtp::compare_seqnum(lost_seqnum, seqnum) as i64; + + if gap > 0 { + let interval = + pts.nseconds().unwrap() as i64 - state.last_popped_pts.nseconds().unwrap() as i64; + let gap = gap as u64; + let spacing = if interval >= 0 { + interval as u64 / (gap + 1) + } else { + 0 + }; + + *discont = true; + + if state.equidistant > 0 && gap > 1 && gap * spacing > latency_ns { + let n_packets = gap - latency_ns / spacing; + + if do_lost { + let s = gst::Structure::new( + "GstRTPPacketLost", + &[ + ("seqnum", &(lost_seqnum as u32)), + ( + "timestamp", + &(state.last_popped_pts + gst::ClockTime(Some(spacing))), + ), + ("duration", &(n_packets * spacing)), + ("retry", &0), + ], + ); + + events.push(gst::event::CustomDownstream::new(s)); + } + + lost_seqnum = lost_seqnum.wrapping_add(n_packets as u16); + state.last_popped_pts += gst::ClockTime(Some(n_packets * spacing)); + state.stats.num_lost += n_packets; + } + + while lost_seqnum != seqnum { + let timestamp = state.last_popped_pts + gst::ClockTime(Some(spacing)); + let duration = if state.equidistant > 0 { spacing } else { 0 }; + + state.last_popped_pts = timestamp; + + if do_lost { + let s = gst::Structure::new( + "GstRTPPacketLost", + &[ + ("seqnum", &(lost_seqnum as u32)), + ("timestamp", ×tamp), + ("duration", &duration), + ("retry", &0), + ], + ); + + events.push(gst::event::CustomDownstream::new(s)); + } + + state.stats.num_lost += 1; + + lost_seqnum = lost_seqnum.wrapping_add(1); + } + } + + events + } + + async fn pop_and_push( + &self, + element: &super::JitterBuffer, + ) -> Result<gst::FlowSuccess, gst::FlowError> { + let jb = JitterBuffer::from_instance(element); + + let (lost_events, buffer, seq) = { + let mut state = jb.state.lock().unwrap(); + + let mut discont = false; + let (jb_item, _) = state.jbuf.borrow().pop(); + + let jb_item = match jb_item { + None => { + if state.eos { + return Err(gst::FlowError::Eos); + } else { + return Ok(gst::FlowSuccess::Ok); + } + } + Some(item) => item, + }; + + let dts = jb_item.get_dts(); + let pts = jb_item.get_pts(); + let seq = jb_item.get_seqnum(); + let mut buffer = jb_item.into_buffer(); + + let lost_events = { + let buffer = buffer.make_mut(); + + buffer.set_dts(state.segment.to_running_time(dts)); + buffer.set_pts(state.segment.to_running_time(pts)); + + if state.last_popped_pts.is_some() && buffer.get_pts() < state.last_popped_pts { + buffer.set_pts(state.last_popped_pts) + } + + let lost_events = if let Some(seq) = seq { + self.generate_lost_events(&mut state, element, seq, pts, &mut discont) + } else { + vec![] + }; + + if state.discont { + discont = true; + state.discont = false; + } + + if discont { + buffer.set_flags(gst::BufferFlags::DISCONT); + } + + lost_events + }; + + state.last_popped_pts = buffer.get_pts(); + if let Some(pts) = state.last_popped_pts.nseconds() { + state.position = pts.into(); + } + state.last_popped_seqnum = seq; + + state.stats.num_pushed += 1; + + (lost_events, buffer, seq) + }; + + for event in lost_events { + gst_debug!(CAT, obj: jb.src_pad.gst_pad(), "Pushing lost event {:?}", event); + let _ = jb.src_pad.push_event(event).await; + } + + gst_debug!(CAT, obj: jb.src_pad.gst_pad(), "Pushing {:?} with seq {:?}", buffer, seq); + + jb.src_pad.push(buffer).await + } + + fn get_next_wakeup( + &self, + element: &super::JitterBuffer, + state: &State, + latency: gst::ClockTime, + context_wait: gst::ClockTime, + ) -> (gst::ClockTime, Option<(gst::ClockTime, Duration)>) { + let now = element.get_current_running_time(); + + gst_debug!( + CAT, + obj: element, + "Now is {}, EOS {}, earliest pts is {}, packet_spacing {} and latency {}", + now, + state.eos, + state.earliest_pts, + state.packet_spacing, + latency + ); + + if state.eos { + gst_debug!(CAT, obj: element, "EOS, not waiting"); + return (now, Some((now, Duration::from_nanos(0)))); + } + + if state.earliest_pts.is_none() { + return (now, None); + } + + let next_wakeup = state.earliest_pts + latency - state.packet_spacing - context_wait / 2; + + let delay = next_wakeup + .saturating_sub(now) + .unwrap_or_else(gst::ClockTime::zero) + .nseconds() + .unwrap(); + + gst_debug!( + CAT, + obj: element, + "Next wakeup at {} with delay {}", + next_wakeup, + delay + ); + + (now, Some((next_wakeup, Duration::from_nanos(delay)))) + } +} + +impl PadSrcHandler for SrcHandler { + type ElementImpl = JitterBuffer; + + fn src_event( + &self, + pad: &PadSrcRef, + jb: &JitterBuffer, + element: &gst::Element, + event: gst::Event, + ) -> bool { + use gst::EventView; + + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); + + match event.view() { + EventView::FlushStart(..) => { + if let Err(err) = jb.task.flush_start() { + gst_error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); + gst_element_error!( + element, + gst::StreamError::Failed, + ("Internal data stream error"), + ["FlushStart failed {:?}", err] + ); + return false; + } + } + EventView::FlushStop(..) => { + if let Err(err) = jb.task.flush_stop() { + gst_error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); + gst_element_error!( + element, + gst::StreamError::Failed, + ("Internal data stream error"), + ["FlushStop failed {:?}", err] + ); + return false; + } + } + _ => (), + } + + gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event); + jb.sink_pad.gst_pad().push_event(event) + } + + fn src_query( + &self, + pad: &PadSrcRef, + jb: &JitterBuffer, + _element: &gst::Element, + query: &mut gst::QueryRef, + ) -> bool { + use gst::QueryView; + + gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query); + + match query.view_mut() { + QueryView::Latency(ref mut q) => { + let mut peer_query = gst::query::Latency::new(); + + let ret = jb.sink_pad.gst_pad().peer_query(&mut peer_query); + + if ret { + let settings = jb.settings.lock().unwrap(); + let (_, mut min_latency, _) = peer_query.get_result(); + min_latency += (settings.latency_ms as u64) * gst::SECOND; + let max_latency = gst::CLOCK_TIME_NONE; + + q.set(true, min_latency, max_latency); + } + + ret + } + QueryView::Position(ref mut q) => { + if q.get_format() != gst::Format::Time { + jb.sink_pad.gst_pad().peer_query(query) + } else { + let state = jb.state.lock().unwrap(); + let position = state.position; + q.set(position); + true + } + } + _ => jb.sink_pad.gst_pad().peer_query(query), + } + } +} + +#[derive(Debug)] +struct Stats { + num_pushed: u64, + num_lost: u64, + num_late: u64, +} + +impl Default for Stats { + fn default() -> Self { + Self { + num_pushed: 0, + num_lost: 0, + num_late: 0, + } + } +} + +// Shared state between element, sink and source pad +struct State { + jbuf: glib::SendUniqueCell<RTPJitterBuffer>, + + last_res: Result<gst::FlowSuccess, gst::FlowError>, + position: gst::ClockTime, + + segment: gst::FormattedSegment<gst::ClockTime>, + clock_rate: Option<u32>, + + packet_spacing: gst::ClockTime, + equidistant: i32, + + discont: bool, + eos: bool, + + last_popped_seqnum: Option<u16>, + last_popped_pts: gst::ClockTime, + + stats: Stats, + + earliest_pts: gst::ClockTime, + earliest_seqnum: Option<u16>, + + wait_handle: Option<(gst::ClockTime, AbortHandle)>, +} + +impl Default for State { + fn default() -> State { + State { + jbuf: glib::SendUniqueCell::new(RTPJitterBuffer::new()).unwrap(), + + last_res: Ok(gst::FlowSuccess::Ok), + position: gst::CLOCK_TIME_NONE, + + segment: gst::FormattedSegment::<gst::ClockTime>::new(), + clock_rate: None, + + packet_spacing: gst::ClockTime::zero(), + equidistant: 0, + + discont: true, + eos: false, + + last_popped_seqnum: None, + last_popped_pts: gst::CLOCK_TIME_NONE, + + stats: Stats::default(), + + earliest_pts: gst::CLOCK_TIME_NONE, + earliest_seqnum: None, + + wait_handle: None, + } + } +} + +struct JitterBufferTask { + element: super::JitterBuffer, + src_pad_handler: SrcHandler, + sink_pad_handler: SinkHandler, +} + +impl JitterBufferTask { + fn new( + element: &super::JitterBuffer, + src_pad_handler: &SrcHandler, + sink_pad_handler: &SinkHandler, + ) -> Self { + JitterBufferTask { + element: element.clone(), + src_pad_handler: src_pad_handler.clone(), + sink_pad_handler: sink_pad_handler.clone(), + } + } +} + +impl TaskImpl for JitterBufferTask { + fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { + async move { + gst_log!(CAT, obj: &self.element, "Starting task"); + + self.src_pad_handler.clear(); + self.sink_pad_handler.clear(); + + let jb = JitterBuffer::from_instance(&self.element); + *jb.state.lock().unwrap() = State::default(); + + gst_log!(CAT, obj: &self.element, "Task started"); + Ok(()) + } + .boxed() + } + + fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + async move { + let jb = JitterBuffer::from_instance(&self.element); + let (latency, context_wait) = { + let settings = jb.settings.lock().unwrap(); + ( + settings.latency_ms as u64 * gst::MSECOND, + settings.context_wait as u64 * gst::MSECOND, + ) + }; + + loop { + let delay_fut = { + let mut state = jb.state.lock().unwrap(); + let (_, next_wakeup) = self.src_pad_handler.get_next_wakeup( + &self.element, + &state, + latency, + context_wait, + ); + + let (delay_fut, abort_handle) = match next_wakeup { + Some((_, delay)) if delay == Duration::from_nanos(0) => (None, None), + _ => { + let (delay_fut, abort_handle) = abortable(async move { + match next_wakeup { + Some((_, delay)) => { + runtime::time::delay_for(delay).await; + } + None => { + future::pending::<()>().await; + } + }; + }); + + let next_wakeup = + next_wakeup.map(|w| w.0).unwrap_or(gst::CLOCK_TIME_NONE); + (Some(delay_fut), Some((next_wakeup, abort_handle))) + } + }; + + state.wait_handle = abort_handle; + + delay_fut + }; + + // Got aborted, reschedule if needed + if let Some(delay_fut) = delay_fut { + gst_debug!(CAT, obj: &self.element, "Waiting"); + if let Err(Aborted) = delay_fut.await { + gst_debug!(CAT, obj: &self.element, "Waiting aborted"); + return Ok(()); + } + } + + let (head_pts, head_seq) = { + let state = jb.state.lock().unwrap(); + // + // Check earliest PTS as we have just taken the lock + let (now, next_wakeup) = self.src_pad_handler.get_next_wakeup( + &self.element, + &state, + latency, + context_wait, + ); + + gst_debug!( + CAT, + obj: &self.element, + "Woke up at {}, earliest_pts {}", + now, + state.earliest_pts + ); + + if let Some((next_wakeup, _)) = next_wakeup { + if next_wakeup > now { + // Reschedule and wait a bit longer in the next iteration + return Ok(()); + } + } else { + return Ok(()); + } + + let (head_pts, head_seq) = state.jbuf.borrow().peek(); + + (head_pts, head_seq) + }; + + let res = self.src_pad_handler.pop_and_push(&self.element).await; + + { + let mut state = jb.state.lock().unwrap(); + + state.last_res = res; + + if head_pts == state.earliest_pts && head_seq == state.earliest_seqnum { + let (earliest_pts, earliest_seqnum) = state.jbuf.borrow().find_earliest(); + state.earliest_pts = earliest_pts; + state.earliest_seqnum = earliest_seqnum; + } + + if res.is_ok() { + // Return and reschedule if the next packet would be in the future + // Check earliest PTS as we have just taken the lock + let (now, next_wakeup) = self.src_pad_handler.get_next_wakeup( + &self.element, + &state, + latency, + context_wait, + ); + if let Some((next_wakeup, _)) = next_wakeup { + if next_wakeup > now { + // Reschedule and wait a bit longer in the next iteration + return Ok(()); + } + } else { + return Ok(()); + } + } + } + + if let Err(err) = res { + match err { + gst::FlowError::Eos => { + gst_debug!(CAT, obj: &self.element, "Pushing EOS event"); + let _ = jb.src_pad.push_event(gst::event::Eos::new()).await; + } + gst::FlowError::Flushing => gst_debug!(CAT, obj: &self.element, "Flushing"), + err => gst_error!(CAT, obj: &self.element, "Error {}", err), + } + + return Err(err); + } + } + } + .boxed() + } + + fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { + async move { + gst_log!(CAT, obj: &self.element, "Stopping task"); + + let jb = JitterBuffer::from_instance(&self.element); + let mut jb_state = jb.state.lock().unwrap(); + + if let Some((_, abort_handle)) = jb_state.wait_handle.take() { + abort_handle.abort(); + } + + self.src_pad_handler.clear(); + self.sink_pad_handler.clear(); + + *jb_state = State::default(); + + gst_log!(CAT, obj: &self.element, "Task stopped"); + Ok(()) + } + .boxed() + } +} + +pub struct JitterBuffer { + sink_pad: PadSink, + src_pad: PadSrc, + sink_pad_handler: SinkHandler, + src_pad_handler: SrcHandler, + task: Task, + state: StdMutex<State>, + settings: StdMutex<Settings>, +} + +lazy_static! { + static ref CAT: gst::DebugCategory = gst::DebugCategory::new( + "ts-jitterbuffer", + gst::DebugColorFlags::empty(), + Some("Thread-sharing jitterbuffer"), + ); +} + +impl JitterBuffer { + fn clear_pt_map(&self, element: &super::JitterBuffer) { + gst_info!(CAT, obj: element, "Clearing PT map"); + + let mut state = self.state.lock().unwrap(); + state.clock_rate = None; + state.jbuf.borrow().reset_skew(); + } + + fn prepare(&self, element: &super::JitterBuffer) -> Result<(), gst::ErrorMessage> { + gst_info!(CAT, obj: element, "Preparing"); + + let context = { + let settings = self.settings.lock().unwrap(); + Context::acquire(&settings.context, settings.context_wait).unwrap() + }; + + self.task + .prepare( + JitterBufferTask::new(element, &self.src_pad_handler, &self.sink_pad_handler), + context, + ) + .map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Error preparing Task: {:?}", err] + ) + })?; + + gst_info!(CAT, obj: element, "Prepared"); + + Ok(()) + } + + fn unprepare(&self, element: &super::JitterBuffer) { + gst_debug!(CAT, obj: element, "Unpreparing"); + self.task.unprepare().unwrap(); + gst_debug!(CAT, obj: element, "Unprepared"); + } + + fn start(&self, element: &super::JitterBuffer) -> Result<(), gst::ErrorMessage> { + gst_debug!(CAT, obj: element, "Starting"); + self.task.start()?; + gst_debug!(CAT, obj: element, "Started"); + Ok(()) + } + + fn stop(&self, element: &super::JitterBuffer) -> Result<(), gst::ErrorMessage> { + gst_debug!(CAT, obj: element, "Stopping"); + self.task.stop()?; + gst_debug!(CAT, obj: element, "Stopped"); + Ok(()) + } +} + +impl ObjectSubclass for JitterBuffer { + const NAME: &'static str = "RsTsJitterBuffer"; + type Type = super::JitterBuffer; + type ParentType = gst::Element; + type Instance = gst::subclass::ElementInstanceStruct<Self>; + type Class = subclass::simple::ClassStruct<Self>; + + glib_object_subclass!(); + + fn class_init(klass: &mut Self::Class) { + klass.set_metadata( + "Thread-sharing jitterbuffer", + "Generic", + "Simple jitterbuffer", + "Mathieu Duponchelle <mathieu@centricular.com>", + ); + + let caps = gst::Caps::new_any(); + + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + klass.add_pad_template(sink_pad_template); + klass.add_signal( + "request-pt-map", + glib::SignalFlags::RUN_LAST, + &[u32::static_type()], + gst::Caps::static_type(), + ); + + klass.add_signal_with_class_handler( + "clear-pt-map", + glib::SignalFlags::RUN_LAST | glib::SignalFlags::ACTION, + &[], + glib::types::Type::Unit, + |_, args| { + let element = args[0] + .get::<super::JitterBuffer>() + .expect("signal arg") + .expect("missing signal arg"); + let jb = Self::from_instance(&element); + jb.clear_pt_map(&element); + None + }, + ); + + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + klass.add_pad_template(src_pad_template); + klass.install_properties(&PROPERTIES); + } + + fn with_class(klass: &Self::Class) -> Self { + let sink_pad_handler = SinkHandler::default(); + let src_pad_handler = SrcHandler::default(); + + Self { + sink_pad: PadSink::new( + gst::Pad::from_template(&klass.get_pad_template("sink").unwrap(), Some("sink")), + sink_pad_handler.clone(), + ), + src_pad: PadSrc::new( + gst::Pad::from_template(&klass.get_pad_template("src").unwrap(), Some("src")), + src_pad_handler.clone(), + ), + sink_pad_handler, + src_pad_handler, + task: Task::default(), + state: StdMutex::new(State::default()), + settings: StdMutex::new(Settings::default()), + } + } +} + +impl ObjectImpl for JitterBuffer { + fn set_property(&self, obj: &Self::Type, id: usize, value: &glib::Value) { + let prop = &PROPERTIES[id]; + + match *prop { + subclass::Property("latency", ..) => { + let latency_ms = { + let mut settings = self.settings.lock().unwrap(); + settings.latency_ms = value.get_some().expect("type checked upstream"); + settings.latency_ms as u64 + }; + + let state = self.state.lock().unwrap(); + state.jbuf.borrow().set_delay(latency_ms * gst::MSECOND); + + let _ = obj.post_message(gst::message::Latency::builder().src(obj).build()); + } + subclass::Property("do-lost", ..) => { + let mut settings = self.settings.lock().unwrap(); + settings.do_lost = value.get_some().expect("type checked upstream"); + } + subclass::Property("max-dropout-time", ..) => { + let mut settings = self.settings.lock().unwrap(); + settings.max_dropout_time = value.get_some().expect("type checked upstream"); + } + subclass::Property("max-misorder-time", ..) => { + let mut settings = self.settings.lock().unwrap(); + settings.max_misorder_time = value.get_some().expect("type checked upstream"); + } + subclass::Property("context", ..) => { + let mut settings = self.settings.lock().unwrap(); + settings.context = value + .get() + .expect("type checked upstream") + .unwrap_or_else(|| "".into()); + } + subclass::Property("context-wait", ..) => { + let mut settings = self.settings.lock().unwrap(); + settings.context_wait = value.get_some().expect("type checked upstream"); + } + _ => unimplemented!(), + } + } + + fn get_property(&self, _obj: &Self::Type, id: usize) -> Result<glib::Value, ()> { + let prop = &PROPERTIES[id]; + + match *prop { + subclass::Property("latency", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.latency_ms.to_value()) + } + subclass::Property("do-lost", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.do_lost.to_value()) + } + subclass::Property("max-dropout-time", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.max_dropout_time.to_value()) + } + subclass::Property("max-misorder-time", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.max_misorder_time.to_value()) + } + subclass::Property("stats", ..) => { + let state = self.state.lock().unwrap(); + let s = gst::Structure::new( + "application/x-rtp-jitterbuffer-stats", + &[ + ("num-pushed", &state.stats.num_pushed), + ("num-lost", &state.stats.num_lost), + ("num-late", &state.stats.num_late), + ], + ); + Ok(s.to_value()) + } + subclass::Property("context", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.context.to_value()) + } + subclass::Property("context-wait", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.context_wait.to_value()) + } + _ => unimplemented!(), + } + } + + fn constructed(&self, obj: &Self::Type) { + self.parent_constructed(obj); + + obj.add_pad(self.sink_pad.gst_pad()).unwrap(); + obj.add_pad(self.src_pad.gst_pad()).unwrap(); + obj.set_element_flags(gst::ElementFlags::PROVIDE_CLOCK | gst::ElementFlags::REQUIRE_CLOCK); + } +} + +impl ElementImpl for JitterBuffer { + fn change_state( + &self, + element: &Self::Type, + transition: gst::StateChange, + ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { + gst_trace!(CAT, obj: element, "Changing state {:?}", transition); + + match transition { + gst::StateChange::NullToReady => { + self.prepare(element).map_err(|err| { + element.post_error_message(err); + gst::StateChangeError + })?; + } + gst::StateChange::PausedToReady => { + self.stop(element).map_err(|_| gst::StateChangeError)?; + } + gst::StateChange::ReadyToNull => { + self.unprepare(element); + } + _ => (), + } + + let mut success = self.parent_change_state(element, transition)?; + + match transition { + gst::StateChange::ReadyToPaused => { + self.start(element).map_err(|_| gst::StateChangeError)?; + success = gst::StateChangeSuccess::NoPreroll; + } + gst::StateChange::PlayingToPaused => { + success = gst::StateChangeSuccess::NoPreroll; + } + _ => (), + } + + Ok(success) + } + + fn provide_clock(&self, _element: &Self::Type) -> Option<gst::Clock> { + Some(gst::SystemClock::obtain()) + } +} |