Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'generic/threadshare/src/jitterbuffer/jitterbuffer.rs')
-rw-r--r--generic/threadshare/src/jitterbuffer/jitterbuffer.rs1764
1 files changed, 250 insertions, 1514 deletions
diff --git a/generic/threadshare/src/jitterbuffer/jitterbuffer.rs b/generic/threadshare/src/jitterbuffer/jitterbuffer.rs
index 3579f9f3a..5984db98f 100644
--- a/generic/threadshare/src/jitterbuffer/jitterbuffer.rs
+++ b/generic/threadshare/src/jitterbuffer/jitterbuffer.rs
@@ -1,4 +1,4 @@
-// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
+// Copyright (C) 2019 Sebastian Dröge <sebastian@centricular.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
@@ -15,1633 +15,369 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
-use futures::future::BoxFuture;
-use futures::future::{abortable, AbortHandle, Aborted};
-use futures::prelude::*;
+use super::ffi;
-use glib::glib_object_subclass;
-use glib::prelude::*;
-use glib::subclass;
-use glib::subclass::prelude::*;
+use std::ptr;
-use gst::prelude::*;
-use gst::subclass::prelude::*;
-use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg, gst_info, gst_log, gst_trace};
-use gst_rtp::RTPBuffer;
+use glib_sys as glib_ffi;
+use gstreamer_sys as gst_ffi;
-use lazy_static::lazy_static;
+use glib::glib_wrapper;
+use glib::prelude::*;
+use glib::translate::*;
-use std::cmp::{max, min, Ordering};
-use std::collections::{BTreeSet, VecDeque};
use std::mem;
-use std::sync::Arc;
-use std::sync::Mutex as StdMutex;
-use std::time::Duration;
-
-use crate::runtime::prelude::*;
-use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task};
-
-use super::{RTPJitterBuffer, RTPJitterBufferItem, RTPPacketRateCtx};
-const DEFAULT_LATENCY_MS: u32 = 200;
-const DEFAULT_DO_LOST: bool = false;
-const DEFAULT_MAX_DROPOUT_TIME: u32 = 60000;
-const DEFAULT_MAX_MISORDER_TIME: u32 = 2000;
-const DEFAULT_CONTEXT: &str = "";
-const DEFAULT_CONTEXT_WAIT: u32 = 0;
+glib_wrapper! {
+ pub struct RTPJitterBuffer(Object<ffi::RTPJitterBuffer>);
-#[derive(Debug, Clone)]
-struct Settings {
- latency_ms: u32,
- do_lost: bool,
- max_dropout_time: u32,
- max_misorder_time: u32,
- context: String,
- context_wait: u32,
-}
-
-impl Default for Settings {
- fn default() -> Self {
- Settings {
- latency_ms: DEFAULT_LATENCY_MS,
- do_lost: DEFAULT_DO_LOST,
- max_dropout_time: DEFAULT_MAX_DROPOUT_TIME,
- max_misorder_time: DEFAULT_MAX_MISORDER_TIME,
- context: DEFAULT_CONTEXT.into(),
- context_wait: DEFAULT_CONTEXT_WAIT,
- }
+ match fn {
+ get_type => || ffi::rtp_jitter_buffer_get_type(),
}
}
-static PROPERTIES: [subclass::Property; 7] = [
- subclass::Property("latency", |name| {
- glib::ParamSpec::uint(
- name,
- "Buffer latency in ms",
- "Amount of ms to buffer",
- 0,
- std::u32::MAX,
- DEFAULT_LATENCY_MS,
- glib::ParamFlags::READWRITE,
- )
- }),
- subclass::Property("do-lost", |name| {
- glib::ParamSpec::boolean(
- name,
- "Do Lost",
- "Send an event downstream when a packet is lost",
- DEFAULT_DO_LOST,
- glib::ParamFlags::READWRITE,
- )
- }),
- subclass::Property("max-dropout-time", |name| {
- glib::ParamSpec::uint(
- name,
- "Max dropout time",
- "The maximum time (milliseconds) of missing packets tolerated.",
- 0,
- std::u32::MAX,
- DEFAULT_MAX_DROPOUT_TIME,
- glib::ParamFlags::READWRITE,
- )
- }),
- subclass::Property("max-misorder-time", |name| {
- glib::ParamSpec::uint(
- name,
- "Max misorder time",
- "The maximum time (milliseconds) of misordered packets tolerated.",
- 0,
- std::u32::MAX,
- DEFAULT_MAX_MISORDER_TIME,
- glib::ParamFlags::READWRITE,
- )
- }),
- subclass::Property("stats", |name| {
- glib::ParamSpec::boxed(
- name,
- "Statistics",
- "Various statistics",
- gst::Structure::static_type(),
- glib::ParamFlags::READABLE,
- )
- }),
- subclass::Property("context", |name| {
- glib::ParamSpec::string(
- name,
- "Context",
- "Context name to share threads with",
- Some(DEFAULT_CONTEXT),
- glib::ParamFlags::READWRITE,
- )
- }),
- subclass::Property("context-wait", |name| {
- glib::ParamSpec::uint(
- name,
- "Context Wait",
- "Throttle poll loop to run at most once every this many ms",
- 0,
- 1000,
- DEFAULT_CONTEXT_WAIT,
- glib::ParamFlags::READWRITE,
- )
- }),
-];
-
-#[derive(Eq)]
-struct GapPacket {
- buffer: gst::Buffer,
- seq: u16,
- pt: u8,
-}
-
-impl GapPacket {
- fn new(buffer: gst::Buffer) -> Self {
- let rtp_buffer = RTPBuffer::from_buffer_readable(&buffer).unwrap();
- let seq = rtp_buffer.get_seq();
- let pt = rtp_buffer.get_payload_type();
- drop(rtp_buffer);
-
- Self { buffer, seq, pt }
+unsafe impl glib::SendUnique for RTPJitterBuffer {
+ fn is_unique(&self) -> bool {
+ self.ref_count() == 1
}
}
-impl Ord for GapPacket {
- fn cmp(&self, other: &Self) -> Ordering {
- 0.cmp(&gst_rtp::compare_seqnum(self.seq, other.seq))
- }
-}
-
-impl PartialOrd for GapPacket {
- fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
- Some(self.cmp(other))
- }
-}
+impl ToGlib for RTPJitterBufferMode {
+ type GlibType = ffi::RTPJitterBufferMode;
-impl PartialEq for GapPacket {
- fn eq(&self, other: &Self) -> bool {
- self.cmp(other) == Ordering::Equal
+ fn to_glib(&self) -> ffi::RTPJitterBufferMode {
+ match *self {
+ RTPJitterBufferMode::None => ffi::RTP_JITTER_BUFFER_MODE_NONE,
+ RTPJitterBufferMode::Slave => ffi::RTP_JITTER_BUFFER_MODE_SLAVE,
+ RTPJitterBufferMode::Buffer => ffi::RTP_JITTER_BUFFER_MODE_BUFFER,
+ RTPJitterBufferMode::Synced => ffi::RTP_JITTER_BUFFER_MODE_SYNCED,
+ RTPJitterBufferMode::__Unknown(value) => value,
+ }
}
}
-struct SinkHandlerInner {
- packet_rate_ctx: RTPPacketRateCtx,
- ips_rtptime: Option<u32>,
- ips_pts: gst::ClockTime,
-
- gap_packets: BTreeSet<GapPacket>,
-
- last_pt: Option<u8>,
-
- last_in_seqnum: Option<u16>,
- last_rtptime: Option<u32>,
-}
-
-impl Default for SinkHandlerInner {
- fn default() -> Self {
- SinkHandlerInner {
- packet_rate_ctx: RTPPacketRateCtx::new(),
- ips_rtptime: None,
- ips_pts: gst::CLOCK_TIME_NONE,
- gap_packets: BTreeSet::new(),
- last_pt: None,
- last_in_seqnum: None,
- last_rtptime: None,
+impl FromGlib<ffi::RTPJitterBufferMode> for RTPJitterBufferMode {
+ fn from_glib(value: ffi::RTPJitterBufferMode) -> Self {
+ match value {
+ 0 => RTPJitterBufferMode::None,
+ 1 => RTPJitterBufferMode::Slave,
+ 2 => RTPJitterBufferMode::Buffer,
+ 4 => RTPJitterBufferMode::Synced,
+ value => RTPJitterBufferMode::__Unknown(value),
}
}
}
-#[derive(Clone, Default)]
-struct SinkHandler(Arc<StdMutex<SinkHandlerInner>>);
+pub struct RTPJitterBufferItem(Option<ptr::NonNull<ffi::RTPJitterBufferItem>>);
-impl SinkHandler {
- fn clear(&self) {
- let mut inner = self.0.lock().unwrap();
- *inner = SinkHandlerInner::default();
- }
+unsafe impl Send for RTPJitterBufferItem {}
- // For resetting if seqnum discontinuities
- fn reset(
- &self,
- inner: &mut SinkHandlerInner,
- state: &mut State,
- element: &gst::Element,
- ) -> BTreeSet<GapPacket> {
- gst_info!(CAT, obj: element, "Resetting");
-
- state.jbuf.borrow().flush();
- state.jbuf.borrow().reset_skew();
- state.discont = true;
-
- state.last_popped_seqnum = None;
- state.last_popped_pts = gst::CLOCK_TIME_NONE;
-
- inner.last_in_seqnum = None;
- inner.last_rtptime = None;
-
- state.earliest_pts = gst::CLOCK_TIME_NONE;
- state.earliest_seqnum = None;
-
- inner.ips_rtptime = None;
- inner.ips_pts = gst::CLOCK_TIME_NONE;
-
- mem::replace(&mut inner.gap_packets, BTreeSet::new())
- }
-
- fn parse_caps(
- &self,
- inner: &mut SinkHandlerInner,
- state: &mut State,
- element: &gst::Element,
- caps: &gst::Caps,
- pt: u8,
- ) -> Result<gst::FlowSuccess, gst::FlowError> {
- let s = caps.get_structure(0).ok_or(gst::FlowError::Error)?;
-
- gst_info!(CAT, obj: element, "Parsing {:?}", caps);
-
- let payload = s
- .get_some::<i32>("payload")
- .map_err(|_| gst::FlowError::Error)?;
-
- if pt != 0 && payload as u8 != pt {
- return Err(gst::FlowError::Error);
- }
-
- inner.last_pt = Some(pt);
- let clock_rate = s
- .get_some::<i32>("clock-rate")
- .map_err(|_| gst::FlowError::Error)?;
-
- if clock_rate <= 0 {
- return Err(gst::FlowError::Error);
- }
- state.clock_rate = Some(clock_rate as u32);
-
- inner.packet_rate_ctx.reset(clock_rate);
- state.jbuf.borrow().set_clock_rate(clock_rate as u32);
-
- Ok(gst::FlowSuccess::Ok)
- }
-
- fn calculate_packet_spacing(
- &self,
- inner: &mut SinkHandlerInner,
- state: &mut State,
- rtptime: u32,
+impl RTPJitterBufferItem {
+ pub fn new(
+ buffer: gst::Buffer,
+ dts: gst::ClockTime,
pts: gst::ClockTime,
- ) {
- if inner.ips_rtptime != Some(rtptime) {
- if inner.ips_pts.is_some() && pts.is_some() {
- let new_packet_spacing = pts - inner.ips_pts;
- let old_packet_spacing = state.packet_spacing;
-
- assert!(old_packet_spacing.is_some());
- if old_packet_spacing > new_packet_spacing {
- state.packet_spacing = (new_packet_spacing + 3 * old_packet_spacing) / 4;
- } else if !old_packet_spacing.is_zero() {
- state.packet_spacing = (3 * new_packet_spacing + old_packet_spacing) / 4;
- } else {
- state.packet_spacing = new_packet_spacing;
- }
+ seqnum: Option<u16>,
+ rtptime: u32,
+ ) -> RTPJitterBufferItem {
+ unsafe {
+ let ptr = ptr::NonNull::new(glib_sys::g_slice_alloc0(mem::size_of::<
+ ffi::RTPJitterBufferItem,
+ >()) as *mut ffi::RTPJitterBufferItem)
+ .expect("Allocation failed");
+ ptr::write(
+ ptr.as_ptr(),
+ ffi::RTPJitterBufferItem {
+ data: buffer.into_ptr() as *mut _,
+ next: ptr::null_mut(),
+ prev: ptr::null_mut(),
+ r#type: 0,
+ dts: dts.to_glib(),
+ pts: pts.to_glib(),
+ seqnum: seqnum.map(|s| s as u32).unwrap_or(std::u32::MAX),
+ count: 1,
+ rtptime,
+ },
+ );
- gst_debug!(
- CAT,
- "new packet spacing {}, old packet spacing {} combined to {}",
- new_packet_spacing,
- old_packet_spacing,
- state.packet_spacing
- );
- }
- inner.ips_rtptime = Some(rtptime);
- inner.ips_pts = pts;
+ RTPJitterBufferItem(Some(ptr))
}
}
- fn handle_big_gap_buffer(
- &self,
- inner: &mut SinkHandlerInner,
- element: &gst::Element,
- buffer: gst::Buffer,
- pt: u8,
- ) -> bool {
- let gap_packets_length = inner.gap_packets.len();
- let mut reset = false;
-
- gst_debug!(
- CAT,
- obj: element,
- "Handling big gap, gap packets length: {}",
- gap_packets_length
- );
-
- inner.gap_packets.insert(GapPacket::new(buffer));
-
- if gap_packets_length > 0 {
- let mut prev_gap_seq = std::u32::MAX;
- let mut all_consecutive = true;
-
- for gap_packet in inner.gap_packets.iter() {
- gst_log!(
- CAT,
- obj: element,
- "Looking at gap packet with seq {}",
- gap_packet.seq,
- );
-
- all_consecutive = gap_packet.pt == pt;
-
- if prev_gap_seq == std::u32::MAX {
- prev_gap_seq = gap_packet.seq as u32;
- } else if gst_rtp::compare_seqnum(gap_packet.seq, prev_gap_seq as u16) != -1 {
- all_consecutive = false;
- } else {
- prev_gap_seq = gap_packet.seq as u32;
- }
-
- if !all_consecutive {
- break;
- }
- }
-
- gst_debug!(CAT, obj: element, "all consecutive: {}", all_consecutive);
-
- if all_consecutive && gap_packets_length > 3 {
- reset = true;
- } else if !all_consecutive {
- inner.gap_packets.clear();
- }
+ pub fn into_buffer(mut self) -> gst::Buffer {
+ unsafe {
+ let item = self.0.take().expect("Invalid wrapper");
+ let buf = item.as_ref().data as *mut gst_ffi::GstBuffer;
+ glib_sys::g_slice_free1(
+ mem::size_of::<ffi::RTPJitterBufferItem>(),
+ item.as_ptr() as *mut _,
+ );
+ from_glib_full(buf)
}
-
- reset
}
- fn store(
- &self,
- inner: &mut SinkHandlerInner,
- pad: &gst::Pad,
- element: &gst::Element,
- buffer: gst::Buffer,
- ) -> Result<gst::FlowSuccess, gst::FlowError> {
- let jb = JitterBuffer::from_instance(element);
- let mut state = jb.state.lock().unwrap();
-
- let (max_misorder_time, max_dropout_time) = {
- let settings = jb.settings.lock().unwrap();
- (settings.max_misorder_time, settings.max_dropout_time)
- };
-
- let (seq, rtptime, pt) = {
- let rtp_buffer =
- RTPBuffer::from_buffer_readable(&buffer).map_err(|_| gst::FlowError::Error)?;
- (
- rtp_buffer.get_seq(),
- rtp_buffer.get_timestamp(),
- rtp_buffer.get_payload_type(),
- )
- };
-
- let mut pts = buffer.get_pts();
- let mut dts = buffer.get_dts();
- let mut estimated_dts = false;
-
- gst_log!(
- CAT,
- obj: element,
- "Storing buffer, seq: {}, rtptime: {}, pt: {}",
- seq,
- rtptime,
- pt
- );
-
- if dts.is_none() {
- dts = pts;
- } else if pts.is_none() {
- pts = dts;
- }
-
- if dts.is_none() {
- dts = element.get_current_running_time();
- pts = dts;
-
- estimated_dts = state.clock_rate.is_some();
- } else {
- dts = state.segment.to_running_time(dts);
- }
-
- if state.clock_rate.is_none() {
- inner.ips_rtptime = Some(rtptime);
- inner.ips_pts = pts;
- }
-
- if inner.last_pt != Some(pt) {
- inner.last_pt = Some(pt);
- state.clock_rate = None;
-
- gst_debug!(CAT, obj: pad, "New payload type: {}", pt);
-
- if let Some(caps) = pad.get_current_caps() {
- /* Ignore errors at this point, as we want to emit request-pt-map */
- let _ = self.parse_caps(inner, &mut state, element, &caps, pt);
- }
- }
-
- let mut state = {
- if state.clock_rate.is_none() {
- drop(state);
- let caps = element
- .emit("request-pt-map", &[&(pt as u32)])
- .map_err(|_| gst::FlowError::Error)?
- .ok_or(gst::FlowError::Error)?
- .get::<gst::Caps>()
- .map_err(|_| gst::FlowError::Error)?
- .ok_or(gst::FlowError::Error)?;
- let mut state = jb.state.lock().unwrap();
- self.parse_caps(inner, &mut state, element, &caps, pt)?;
- state
+ pub fn get_dts(&self) -> gst::ClockTime {
+ unsafe {
+ let item = self.0.as_ref().expect("Invalid wrapper");
+ if item.as_ref().dts == gst_ffi::GST_CLOCK_TIME_NONE {
+ gst::CLOCK_TIME_NONE
} else {
- state
+ gst::ClockTime(Some(item.as_ref().dts))
}
- };
-
- inner.packet_rate_ctx.update(seq, rtptime);
-
- let max_dropout = inner
- .packet_rate_ctx
- .get_max_dropout(max_dropout_time as i32);
- let max_misorder = inner
- .packet_rate_ctx
- .get_max_dropout(max_misorder_time as i32);
-
- pts = state.jbuf.borrow().calculate_pts(
- dts,
- estimated_dts,
- rtptime,
- element.get_base_time(),
- 0,
- false,
- );
-
- if pts.is_none() {
- gst_debug!(
- CAT,
- obj: element,
- "cannot calculate a valid pts for #{}, discard",
- seq
- );
- return Ok(gst::FlowSuccess::Ok);
}
+ }
- if let Some(last_in_seqnum) = inner.last_in_seqnum {
- let gap = gst_rtp::compare_seqnum(last_in_seqnum as u16, seq);
- if gap == 1 {
- self.calculate_packet_spacing(inner, &mut state, rtptime, pts);
+ pub fn get_pts(&self) -> gst::ClockTime {
+ unsafe {
+ let item = self.0.as_ref().expect("Invalid wrapper");
+ if item.as_ref().pts == gst_ffi::GST_CLOCK_TIME_NONE {
+ gst::CLOCK_TIME_NONE
} else {
- if (gap != -1 && gap < -(max_misorder as i32)) || (gap >= max_dropout as i32) {
- let reset = self.handle_big_gap_buffer(inner, element, buffer, pt);
- if reset {
- // Handle reset in `enqueue_item` to avoid recursion
- return Err(gst::FlowError::CustomError);
- } else {
- return Ok(gst::FlowSuccess::Ok);
- }
- }
- inner.ips_pts = gst::CLOCK_TIME_NONE;
- inner.ips_rtptime = None;
+ gst::ClockTime(Some(item.as_ref().pts))
}
-
- inner.gap_packets.clear();
- }
-
- if let Some(last_popped_seqnum) = state.last_popped_seqnum {
- let gap = gst_rtp::compare_seqnum(last_popped_seqnum, seq);
-
- if gap <= 0 {
- state.stats.num_late += 1;
- gst_debug!(CAT, obj: element, "Dropping late {}", seq);
- return Ok(gst::FlowSuccess::Ok);
- }
- }
-
- inner.last_in_seqnum = Some(seq);
-
- let jb_item = if estimated_dts {
- RTPJitterBufferItem::new(buffer, gst::CLOCK_TIME_NONE, pts, Some(seq), rtptime)
- } else {
- RTPJitterBufferItem::new(buffer, dts, pts, Some(seq), rtptime)
- };
-
- let (success, _, _) = state.jbuf.borrow().insert(jb_item);
-
- if !success {
- /* duplicate */
- return Ok(gst::FlowSuccess::Ok);
- }
-
- if Some(rtptime) == inner.last_rtptime {
- state.equidistant -= 2;
- } else {
- state.equidistant += 1;
}
-
- state.equidistant = min(max(state.equidistant, -7), 7);
-
- inner.last_rtptime = Some(rtptime);
-
- if state.earliest_pts.is_none()
- || (pts.is_some()
- && (pts < state.earliest_pts
- || (pts == state.earliest_pts
- && state
- .earliest_seqnum
- .map(|earliest_seqnum| seq > earliest_seqnum)
- .unwrap_or(false))))
- {
- state.earliest_pts = pts;
- state.earliest_seqnum = Some(seq);
- }
-
- gst_log!(CAT, obj: pad, "Stored buffer");
-
- Ok(gst::FlowSuccess::Ok)
}
- fn enqueue_item(
- &self,
- pad: &gst::Pad,
- element: &gst::Element,
- buffer: Option<gst::Buffer>,
- ) -> Result<gst::FlowSuccess, gst::FlowError> {
- let mut inner = self.0.lock().unwrap();
-
- let mut buffers = VecDeque::new();
- if let Some(buf) = buffer {
- buffers.push_back(buf);
- }
-
- // This is to avoid recursion with `store`, `reset` and `enqueue_item`
- while let Some(buf) = buffers.pop_front() {
- if let Err(err) = self.store(&mut inner, pad, element, buf) {
- match err {
- gst::FlowError::CustomError => {
- let jb = JitterBuffer::from_instance(element);
- let mut state = jb.state.lock().unwrap();
- for gap_packet in self.reset(&mut inner, &mut state, element) {
- buffers.push_back(gap_packet.buffer);
- }
- }
- other => return Err(other),
- }
- }
- }
-
- let jb = JitterBuffer::from_instance(element);
- let mut state = jb.state.lock().unwrap();
-
- let (latency, context_wait) = {
- let settings = jb.settings.lock().unwrap();
- (
- settings.latency_ms as u64 * gst::MSECOND,
- settings.context_wait as u64 * gst::MSECOND,
- )
- };
-
- // Reschedule if needed
- let (_, next_wakeup) =
- jb.src_pad_handler
- .get_next_wakeup(&element, &state, latency, context_wait);
- if let Some((next_wakeup, _)) = next_wakeup {
- if let Some((previous_next_wakeup, ref abort_handle)) = state.wait_handle {
- if previous_next_wakeup.is_none() || previous_next_wakeup > next_wakeup {
- gst_debug!(
- CAT,
- obj: pad,
- "Rescheduling for new item {} < {}",
- next_wakeup,
- previous_next_wakeup
- );
- abort_handle.abort();
- state.wait_handle = None;
- }
+ pub fn get_seqnum(&self) -> Option<u16> {
+ unsafe {
+ let item = self.0.as_ref().expect("Invalid wrapper");
+ if item.as_ref().seqnum == std::u32::MAX {
+ None
+ } else {
+ Some(item.as_ref().seqnum as u16)
}
}
- state.last_res
- }
-}
-
-impl PadSinkHandler for SinkHandler {
- type ElementImpl = JitterBuffer;
-
- fn sink_chain(
- &self,
- pad: &PadSinkRef,
- _jb: &JitterBuffer,
- element: &gst::Element,
- buffer: gst::Buffer,
- ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
- let pad_weak = pad.downgrade();
- let element = element.clone();
- let this = self.clone();
-
- async move {
- let pad = pad_weak.upgrade().expect("PadSink no longer exists");
-
- gst_debug!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer);
- this.enqueue_item(pad.gst_pad(), &element, Some(buffer))
- }
- .boxed()
}
- fn sink_event(
- &self,
- pad: &PadSinkRef,
- jb: &JitterBuffer,
- element: &gst::Element,
- event: gst::Event,
- ) -> bool {
- use gst::EventView;
-
- gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
-
- if let EventView::FlushStart(..) = event.view() {
- if let Err(err) = jb.task.flush_start() {
- gst_error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
- gst_element_error!(
- element,
- gst::StreamError::Failed,
- ("Internal data stream error"),
- ["FlushStart failed {:?}", err]
- );
- return false;
- }
+ #[allow(dead_code)]
+ pub fn get_rtptime(&self) -> u32 {
+ unsafe {
+ let item = self.0.as_ref().expect("Invalid wrapper");
+ item.as_ref().rtptime
}
-
- gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
- jb.src_pad.gst_pad().push_event(event)
}
+}
- fn sink_event_serialized(
- &self,
- pad: &PadSinkRef,
- _jb: &JitterBuffer,
- element: &gst::Element,
- event: gst::Event,
- ) -> BoxFuture<'static, bool> {
- use gst::EventView;
-
- let pad_weak = pad.downgrade();
- let element = element.clone();
-
- async move {
- let pad = pad_weak.upgrade().expect("PadSink no longer exists");
-
- gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
-
- let jb = JitterBuffer::from_instance(&element);
-
- let mut forward = true;
- match event.view() {
- EventView::Segment(e) => {
- let mut state = jb.state.lock().unwrap();
- state.segment = e
- .get_segment()
- .clone()
- .downcast::<gst::format::Time>()
- .unwrap();
- }
- EventView::FlushStop(..) => {
- if let Err(err) = jb.task.flush_stop() {
- gst_error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
- gst_element_error!(
- element,
- gst::StreamError::Failed,
- ("Internal data stream error"),
- ["FlushStop failed {:?}", err]
- );
- return false;
- }
- }
- EventView::Eos(..) => {
- let mut state = jb.state.lock().unwrap();
- state.eos = true;
- if let Some((_, abort_handle)) = state.wait_handle.take() {
- abort_handle.abort();
- }
- forward = false;
+impl Drop for RTPJitterBufferItem {
+ fn drop(&mut self) {
+ unsafe {
+ if let Some(ref item) = self.0 {
+ if !item.as_ref().data.is_null() {
+ gst_ffi::gst_mini_object_unref(item.as_ref().data as *mut _);
}
- _ => (),
- };
- if forward {
- // FIXME: These events should really be queued up and stay in order
- gst_log!(CAT, obj: pad.gst_pad(), "Forwarding serialized {:?}", event);
- jb.src_pad.push_event(event).await
- } else {
- true
+ glib_sys::g_slice_free1(
+ mem::size_of::<ffi::RTPJitterBufferItem>(),
+ item.as_ptr() as *mut _,
+ );
}
}
- .boxed()
}
}
-#[derive(Clone, Default)]
-struct SrcHandler;
-
-impl SrcHandler {
- fn clear(&self) {}
-
- fn generate_lost_events(
- &self,
- state: &mut State,
- element: &gst::Element,
- seqnum: u16,
- pts: gst::ClockTime,
- discont: &mut bool,
- ) -> Vec<gst::Event> {
- let (latency_ns, do_lost) = {
- let jb = JitterBuffer::from_instance(element);
- let settings = jb.settings.lock().unwrap();
- (
- settings.latency_ms as u64 * gst::MSECOND.nseconds().unwrap(),
- settings.do_lost,
- )
- };
-
- let mut events = vec![];
-
- let last_popped_seqnum = match state.last_popped_seqnum {
- None => return events,
- Some(seq) => seq,
- };
-
- gst_debug!(
- CAT,
- obj: element,
- "Generating lost events seq: {}, last popped seq: {:?}",
- seqnum,
- last_popped_seqnum,
- );
-
- let mut lost_seqnum = last_popped_seqnum.wrapping_add(1);
- let gap = gst_rtp::compare_seqnum(lost_seqnum, seqnum) as i64;
-
- if gap > 0 {
- let interval =
- pts.nseconds().unwrap() as i64 - state.last_popped_pts.nseconds().unwrap() as i64;
- let gap = gap as u64;
- let spacing = if interval >= 0 {
- interval as u64 / (gap + 1)
- } else {
- 0
- };
-
- *discont = true;
-
- if state.equidistant > 0 && gap > 1 && gap * spacing > latency_ns {
- let n_packets = gap - latency_ns / spacing;
-
- if do_lost {
- let s = gst::Structure::new(
- "GstRTPPacketLost",
- &[
- ("seqnum", &(lost_seqnum as u32)),
- (
- "timestamp",
- &(state.last_popped_pts + gst::ClockTime(Some(spacing))),
- ),
- ("duration", &(n_packets * spacing)),
- ("retry", &0),
- ],
- );
-
- events.push(gst::event::CustomDownstream::new(s));
- }
-
- lost_seqnum = lost_seqnum.wrapping_add(n_packets as u16);
- state.last_popped_pts += gst::ClockTime(Some(n_packets * spacing));
- state.stats.num_lost += n_packets;
- }
-
- while lost_seqnum != seqnum {
- let timestamp = state.last_popped_pts + gst::ClockTime(Some(spacing));
- let duration = if state.equidistant > 0 { spacing } else { 0 };
-
- state.last_popped_pts = timestamp;
-
- if do_lost {
- let s = gst::Structure::new(
- "GstRTPPacketLost",
- &[
- ("seqnum", &(lost_seqnum as u32)),
- ("timestamp", &timestamp),
- ("duration", &duration),
- ("retry", &0),
- ],
- );
-
- events.push(gst::event::CustomDownstream::new(s));
- }
+pub struct RTPPacketRateCtx(Box<ffi::RTPPacketRateCtx>);
- state.stats.num_lost += 1;
+unsafe impl Send for RTPPacketRateCtx {}
- lost_seqnum = lost_seqnum.wrapping_add(1);
- }
+impl RTPPacketRateCtx {
+ pub fn new() -> RTPPacketRateCtx {
+ unsafe {
+ let mut ptr = std::mem::MaybeUninit::uninit();
+ ffi::gst_rtp_packet_rate_ctx_reset(ptr.as_mut_ptr(), -1);
+ RTPPacketRateCtx(Box::new(ptr.assume_init()))
}
-
- events
}
- async fn pop_and_push(
- &self,
- element: &gst::Element,
- ) -> Result<gst::FlowSuccess, gst::FlowError> {
- let jb = JitterBuffer::from_instance(element);
-
- let (lost_events, buffer, seq) = {
- let mut state = jb.state.lock().unwrap();
-
- let mut discont = false;
- let (jb_item, _) = state.jbuf.borrow().pop();
-
- let jb_item = match jb_item {
- None => {
- if state.eos {
- return Err(gst::FlowError::Eos);
- } else {
- return Ok(gst::FlowSuccess::Ok);
- }
- }
- Some(item) => item,
- };
-
- let dts = jb_item.get_dts();
- let pts = jb_item.get_pts();
- let seq = jb_item.get_seqnum();
- let mut buffer = jb_item.into_buffer();
-
- let lost_events = {
- let buffer = buffer.make_mut();
-
- buffer.set_dts(state.segment.to_running_time(dts));
- buffer.set_pts(state.segment.to_running_time(pts));
-
- if state.last_popped_pts.is_some() && buffer.get_pts() < state.last_popped_pts {
- buffer.set_pts(state.last_popped_pts)
- }
-
- let lost_events = if let Some(seq) = seq {
- self.generate_lost_events(&mut state, element, seq, pts, &mut discont)
- } else {
- vec![]
- };
-
- if state.discont {
- discont = true;
- state.discont = false;
- }
-
- if discont {
- buffer.set_flags(gst::BufferFlags::DISCONT);
- }
-
- lost_events
- };
-
- state.last_popped_pts = buffer.get_pts();
- if let Some(pts) = state.last_popped_pts.nseconds() {
- state.position = pts.into();
- }
- state.last_popped_seqnum = seq;
-
- state.stats.num_pushed += 1;
-
- (lost_events, buffer, seq)
- };
-
- for event in lost_events {
- gst_debug!(CAT, obj: jb.src_pad.gst_pad(), "Pushing lost event {:?}", event);
- let _ = jb.src_pad.push_event(event).await;
- }
-
- gst_debug!(CAT, obj: jb.src_pad.gst_pad(), "Pushing {:?} with seq {:?}", buffer, seq);
-
- jb.src_pad.push(buffer).await
+ pub fn reset(&mut self, clock_rate: i32) {
+ unsafe { ffi::gst_rtp_packet_rate_ctx_reset(&mut *self.0, clock_rate) }
}
- fn get_next_wakeup(
- &self,
- element: &gst::Element,
- state: &State,
- latency: gst::ClockTime,
- context_wait: gst::ClockTime,
- ) -> (gst::ClockTime, Option<(gst::ClockTime, Duration)>) {
- let now = element.get_current_running_time();
-
- gst_debug!(
- CAT,
- obj: element,
- "Now is {}, EOS {}, earliest pts is {}, packet_spacing {} and latency {}",
- now,
- state.eos,
- state.earliest_pts,
- state.packet_spacing,
- latency
- );
-
- if state.eos {
- gst_debug!(CAT, obj: element, "EOS, not waiting");
- return (now, Some((now, Duration::from_nanos(0))));
- }
-
- if state.earliest_pts.is_none() {
- return (now, None);
- }
-
- let next_wakeup = state.earliest_pts + latency - state.packet_spacing - context_wait / 2;
-
- let delay = next_wakeup
- .saturating_sub(now)
- .unwrap_or_else(gst::ClockTime::zero)
- .nseconds()
- .unwrap();
-
- gst_debug!(
- CAT,
- obj: element,
- "Next wakeup at {} with delay {}",
- next_wakeup,
- delay
- );
-
- (now, Some((next_wakeup, Duration::from_nanos(delay))))
+ pub fn update(&mut self, seqnum: u16, ts: u32) -> u32 {
+ unsafe { ffi::gst_rtp_packet_rate_ctx_update(&mut *self.0, seqnum, ts) }
}
-}
-impl PadSrcHandler for SrcHandler {
- type ElementImpl = JitterBuffer;
-
- fn src_event(
- &self,
- pad: &PadSrcRef,
- jb: &JitterBuffer,
- element: &gst::Element,
- event: gst::Event,
- ) -> bool {
- use gst::EventView;
-
- gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
-
- match event.view() {
- EventView::FlushStart(..) => {
- if let Err(err) = jb.task.flush_start() {
- gst_error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
- gst_element_error!(
- element,
- gst::StreamError::Failed,
- ("Internal data stream error"),
- ["FlushStart failed {:?}", err]
- );
- return false;
- }
- }
- EventView::FlushStop(..) => {
- if let Err(err) = jb.task.flush_stop() {
- gst_error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
- gst_element_error!(
- element,
- gst::StreamError::Failed,
- ("Internal data stream error"),
- ["FlushStop failed {:?}", err]
- );
- return false;
- }
- }
- _ => (),
- }
-
- gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
- jb.sink_pad.gst_pad().push_event(event)
+ pub fn get_max_dropout(&mut self, time_ms: i32) -> u32 {
+ unsafe { ffi::gst_rtp_packet_rate_ctx_get_max_dropout(&mut *self.0, time_ms) }
}
- fn src_query(
- &self,
- pad: &PadSrcRef,
- jb: &JitterBuffer,
- _element: &gst::Element,
- query: &mut gst::QueryRef,
- ) -> bool {
- use gst::QueryView;
-
- gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query);
-
- match query.view_mut() {
- QueryView::Latency(ref mut q) => {
- let mut peer_query = gst::query::Latency::new();
-
- let ret = jb.sink_pad.gst_pad().peer_query(&mut peer_query);
-
- if ret {
- let settings = jb.settings.lock().unwrap();
- let (_, mut min_latency, _) = peer_query.get_result();
- min_latency += (settings.latency_ms as u64) * gst::SECOND;
- let max_latency = gst::CLOCK_TIME_NONE;
-
- q.set(true, min_latency, max_latency);
- }
-
- ret
- }
- QueryView::Position(ref mut q) => {
- if q.get_format() != gst::Format::Time {
- jb.sink_pad.gst_pad().peer_query(query)
- } else {
- let state = jb.state.lock().unwrap();
- let position = state.position;
- q.set(position);
- true
- }
- }
- _ => jb.sink_pad.gst_pad().peer_query(query),
- }
+ #[allow(dead_code)]
+ pub fn get_max_disorder(&mut self, time_ms: i32) -> u32 {
+ unsafe { ffi::gst_rtp_packet_rate_ctx_get_max_disorder(&mut *self.0, time_ms) }
}
}
-#[derive(Debug)]
-struct Stats {
- num_pushed: u64,
- num_lost: u64,
- num_late: u64,
-}
-
-impl Default for Stats {
+impl Default for RTPPacketRateCtx {
fn default() -> Self {
- Self {
- num_pushed: 0,
- num_lost: 0,
- num_late: 0,
- }
+ RTPPacketRateCtx::new()
}
}
-// Shared state between element, sink and source pad
-struct State {
- jbuf: glib::SendUniqueCell<RTPJitterBuffer>,
-
- last_res: Result<gst::FlowSuccess, gst::FlowError>,
- position: gst::ClockTime,
-
- segment: gst::FormattedSegment<gst::ClockTime>,
- clock_rate: Option<u32>,
-
- packet_spacing: gst::ClockTime,
- equidistant: i32,
-
- discont: bool,
- eos: bool,
-
- last_popped_seqnum: Option<u16>,
- last_popped_pts: gst::ClockTime,
-
- stats: Stats,
-
- earliest_pts: gst::ClockTime,
- earliest_seqnum: Option<u16>,
-
- wait_handle: Option<(gst::ClockTime, AbortHandle)>,
+#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)]
+pub enum RTPJitterBufferMode {
+ r#None,
+ Slave,
+ Buffer,
+ Synced,
+ __Unknown(i32),
}
-impl Default for State {
- fn default() -> State {
- State {
- jbuf: glib::SendUniqueCell::new(RTPJitterBuffer::new()).unwrap(),
-
- last_res: Ok(gst::FlowSuccess::Ok),
- position: gst::CLOCK_TIME_NONE,
-
- segment: gst::FormattedSegment::<gst::ClockTime>::new(),
- clock_rate: None,
-
- packet_spacing: gst::ClockTime::zero(),
- equidistant: 0,
-
- discont: true,
- eos: false,
-
- last_popped_seqnum: None,
- last_popped_pts: gst::CLOCK_TIME_NONE,
-
- stats: Stats::default(),
-
- earliest_pts: gst::CLOCK_TIME_NONE,
- earliest_seqnum: None,
-
- wait_handle: None,
- }
+impl RTPJitterBuffer {
+ pub fn new() -> RTPJitterBuffer {
+ unsafe { from_glib_full(ffi::rtp_jitter_buffer_new()) }
}
-}
-
-struct JitterBufferTask {
- element: gst::Element,
- src_pad_handler: SrcHandler,
- sink_pad_handler: SinkHandler,
-}
-impl JitterBufferTask {
- fn new(
- element: &gst::Element,
- src_pad_handler: &SrcHandler,
- sink_pad_handler: &SinkHandler,
- ) -> Self {
- JitterBufferTask {
- element: element.clone(),
- src_pad_handler: src_pad_handler.clone(),
- sink_pad_handler: sink_pad_handler.clone(),
- }
+ #[allow(dead_code)]
+ pub fn get_mode(&self) -> RTPJitterBufferMode {
+ unsafe { from_glib(ffi::rtp_jitter_buffer_get_mode(self.to_glib_none().0)) }
}
-}
-impl TaskImpl for JitterBufferTask {
- fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
- async move {
- gst_log!(CAT, obj: &self.element, "Starting task");
-
- self.src_pad_handler.clear();
- self.sink_pad_handler.clear();
-
- let jb = JitterBuffer::from_instance(&self.element);
- *jb.state.lock().unwrap() = State::default();
-
- gst_log!(CAT, obj: &self.element, "Task started");
- Ok(())
- }
- .boxed()
+ #[allow(dead_code)]
+ pub fn set_mode(&self, mode: RTPJitterBufferMode) {
+ unsafe { ffi::rtp_jitter_buffer_set_mode(self.to_glib_none().0, mode.to_glib()) }
}
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
- async move {
- let jb = JitterBuffer::from_instance(&self.element);
- let (latency, context_wait) = {
- let settings = jb.settings.lock().unwrap();
- (
- settings.latency_ms as u64 * gst::MSECOND,
- settings.context_wait as u64 * gst::MSECOND,
- )
- };
-
- loop {
- let delay_fut = {
- let mut state = jb.state.lock().unwrap();
- let (_, next_wakeup) = self.src_pad_handler.get_next_wakeup(
- &self.element,
- &state,
- latency,
- context_wait,
- );
-
- let (delay_fut, abort_handle) = match next_wakeup {
- Some((_, delay)) if delay == Duration::from_nanos(0) => (None, None),
- _ => {
- let (delay_fut, abort_handle) = abortable(async move {
- match next_wakeup {
- Some((_, delay)) => {
- runtime::time::delay_for(delay).await;
- }
- None => {
- future::pending::<()>().await;
- }
- };
- });
-
- let next_wakeup =
- next_wakeup.map(|w| w.0).unwrap_or(gst::CLOCK_TIME_NONE);
- (Some(delay_fut), Some((next_wakeup, abort_handle)))
- }
- };
-
- state.wait_handle = abort_handle;
-
- delay_fut
- };
-
- // Got aborted, reschedule if needed
- if let Some(delay_fut) = delay_fut {
- gst_debug!(CAT, obj: &self.element, "Waiting");
- if let Err(Aborted) = delay_fut.await {
- gst_debug!(CAT, obj: &self.element, "Waiting aborted");
- return Ok(());
- }
- }
-
- let (head_pts, head_seq) = {
- let state = jb.state.lock().unwrap();
- //
- // Check earliest PTS as we have just taken the lock
- let (now, next_wakeup) = self.src_pad_handler.get_next_wakeup(
- &self.element,
- &state,
- latency,
- context_wait,
- );
-
- gst_debug!(
- CAT,
- obj: &self.element,
- "Woke up at {}, earliest_pts {}",
- now,
- state.earliest_pts
- );
-
- if let Some((next_wakeup, _)) = next_wakeup {
- if next_wakeup > now {
- // Reschedule and wait a bit longer in the next iteration
- return Ok(());
- }
- } else {
- return Ok(());
- }
-
- let (head_pts, head_seq) = state.jbuf.borrow().peek();
-
- (head_pts, head_seq)
- };
-
- let res = self.src_pad_handler.pop_and_push(&self.element).await;
-
- {
- let mut state = jb.state.lock().unwrap();
-
- state.last_res = res;
-
- if head_pts == state.earliest_pts && head_seq == state.earliest_seqnum {
- let (earliest_pts, earliest_seqnum) = state.jbuf.borrow().find_earliest();
- state.earliest_pts = earliest_pts;
- state.earliest_seqnum = earliest_seqnum;
- }
-
- if res.is_ok() {
- // Return and reschedule if the next packet would be in the future
- // Check earliest PTS as we have just taken the lock
- let (now, next_wakeup) = self.src_pad_handler.get_next_wakeup(
- &self.element,
- &state,
- latency,
- context_wait,
- );
- if let Some((next_wakeup, _)) = next_wakeup {
- if next_wakeup > now {
- // Reschedule and wait a bit longer in the next iteration
- return Ok(());
- }
- } else {
- return Ok(());
- }
- }
- }
-
- if let Err(err) = res {
- match err {
- gst::FlowError::Eos => {
- gst_debug!(CAT, obj: &self.element, "Pushing EOS event");
- let _ = jb.src_pad.push_event(gst::event::Eos::new()).await;
- }
- gst::FlowError::Flushing => gst_debug!(CAT, obj: &self.element, "Flushing"),
- err => gst_error!(CAT, obj: &self.element, "Error {}", err),
- }
-
- return Err(err);
- }
- }
- }
- .boxed()
+ #[allow(dead_code)]
+ pub fn get_delay(&self) -> gst::ClockTime {
+ unsafe { from_glib(ffi::rtp_jitter_buffer_get_delay(self.to_glib_none().0)) }
}
- fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
- async move {
- gst_log!(CAT, obj: &self.element, "Stopping task");
-
- let jb = JitterBuffer::from_instance(&self.element);
- let mut jb_state = jb.state.lock().unwrap();
-
- if let Some((_, abort_handle)) = jb_state.wait_handle.take() {
- abort_handle.abort();
- }
-
- self.src_pad_handler.clear();
- self.sink_pad_handler.clear();
-
- *jb_state = State::default();
-
- gst_log!(CAT, obj: &self.element, "Task stopped");
- Ok(())
- }
- .boxed()
+ pub fn set_delay(&self, delay: gst::ClockTime) {
+ unsafe { ffi::rtp_jitter_buffer_set_delay(self.to_glib_none().0, delay.to_glib()) }
}
-}
-
-struct JitterBuffer {
- sink_pad: PadSink,
- src_pad: PadSrc,
- sink_pad_handler: SinkHandler,
- src_pad_handler: SrcHandler,
- task: Task,
- state: StdMutex<State>,
- settings: StdMutex<Settings>,
-}
-
-lazy_static! {
- static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
- "ts-jitterbuffer",
- gst::DebugColorFlags::empty(),
- Some("Thread-sharing jitterbuffer"),
- );
-}
-impl JitterBuffer {
- fn clear_pt_map(&self, element: &gst::Element) {
- gst_info!(CAT, obj: element, "Clearing PT map");
-
- let mut state = self.state.lock().unwrap();
- state.clock_rate = None;
- state.jbuf.borrow().reset_skew();
+ pub fn set_clock_rate(&self, clock_rate: u32) {
+ unsafe { ffi::rtp_jitter_buffer_set_clock_rate(self.to_glib_none().0, clock_rate) }
}
- fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
- gst_info!(CAT, obj: element, "Preparing");
-
- let context = {
- let settings = self.settings.lock().unwrap();
- Context::acquire(&settings.context, settings.context_wait).unwrap()
- };
-
- self.task
- .prepare(
- JitterBufferTask::new(element, &self.src_pad_handler, &self.sink_pad_handler),
- context,
- )
- .map_err(|err| {
- gst_error_msg!(
- gst::ResourceError::OpenRead,
- ["Error preparing Task: {:?}", err]
- )
- })?;
-
- gst_info!(CAT, obj: element, "Prepared");
-
- Ok(())
+ #[allow(dead_code)]
+ pub fn get_clock_rate(&self) -> u32 {
+ unsafe { ffi::rtp_jitter_buffer_get_clock_rate(self.to_glib_none().0) }
}
- fn unprepare(&self, element: &gst::Element) {
- gst_debug!(CAT, obj: element, "Unpreparing");
- self.task.unprepare().unwrap();
- gst_debug!(CAT, obj: element, "Unprepared");
- }
+ pub fn calculate_pts(
+ &self,
+ dts: gst::ClockTime,
+ estimated_dts: bool,
+ rtptime: u32,
+ base_time: gst::ClockTime,
+ gap: i32,
+ is_rtx: bool,
+ ) -> gst::ClockTime {
+ unsafe {
+ let pts = ffi::rtp_jitter_buffer_calculate_pts(
+ self.to_glib_none().0,
+ dts.to_glib(),
+ estimated_dts.to_glib(),
+ rtptime,
+ base_time.to_glib(),
+ gap,
+ is_rtx.to_glib(),
+ );
- fn start(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
- gst_debug!(CAT, obj: element, "Starting");
- self.task.start()?;
- gst_debug!(CAT, obj: element, "Started");
- Ok(())
+ if pts == gst_ffi::GST_CLOCK_TIME_NONE {
+ gst::CLOCK_TIME_NONE
+ } else {
+ pts.into()
+ }
+ }
}
- fn stop(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
- gst_debug!(CAT, obj: element, "Stopping");
- self.task.stop()?;
- gst_debug!(CAT, obj: element, "Stopped");
- Ok(())
+ pub fn insert(&self, mut item: RTPJitterBufferItem) -> (bool, bool, i32) {
+ unsafe {
+ let mut head = mem::MaybeUninit::uninit();
+ let mut percent = mem::MaybeUninit::uninit();
+ let ptr = item.0.take().expect("Invalid wrapper");
+ let ret: bool = from_glib(ffi::rtp_jitter_buffer_insert(
+ self.to_glib_none().0,
+ ptr.as_ptr(),
+ head.as_mut_ptr(),
+ percent.as_mut_ptr(),
+ ));
+ if !ret {
+ item.0 = Some(ptr);
+ }
+ (ret, from_glib(head.assume_init()), percent.assume_init())
+ }
}
-}
-
-impl ObjectSubclass for JitterBuffer {
- const NAME: &'static str = "RsTsJitterBuffer";
- type ParentType = gst::Element;
- type Instance = gst::subclass::ElementInstanceStruct<Self>;
- type Class = subclass::simple::ClassStruct<Self>;
-
- glib_object_subclass!();
- fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
- klass.set_metadata(
- "Thread-sharing jitterbuffer",
- "Generic",
- "Simple jitterbuffer",
- "Mathieu Duponchelle <mathieu@centricular.com>",
- );
+ pub fn find_earliest(&self) -> (gst::ClockTime, Option<u16>) {
+ unsafe {
+ let mut pts = mem::MaybeUninit::uninit();
+ let mut seqnum = mem::MaybeUninit::uninit();
- let caps = gst::Caps::new_any();
-
- let sink_pad_template = gst::PadTemplate::new(
- "sink",
- gst::PadDirection::Sink,
- gst::PadPresence::Always,
- &caps,
- )
- .unwrap();
- klass.add_pad_template(sink_pad_template);
- klass.add_signal(
- "request-pt-map",
- glib::SignalFlags::RUN_LAST,
- &[u32::static_type()],
- gst::Caps::static_type(),
- );
+ ffi::rtp_jitter_buffer_find_earliest(
+ self.to_glib_none().0,
+ pts.as_mut_ptr(),
+ seqnum.as_mut_ptr(),
+ );
+ let pts = pts.assume_init();
+ let seqnum = seqnum.assume_init();
- klass.add_signal_with_class_handler(
- "clear-pt-map",
- glib::SignalFlags::RUN_LAST | glib::SignalFlags::ACTION,
- &[],
- glib::types::Type::Unit,
- |_, args| {
- let element = args[0]
- .get::<gst::Element>()
- .expect("signal arg")
- .expect("missing signal arg");
- let jb = Self::from_instance(&element);
- jb.clear_pt_map(&element);
+ let seqnum = if seqnum == std::u32::MAX {
None
- },
- );
-
- let src_pad_template = gst::PadTemplate::new(
- "src",
- gst::PadDirection::Src,
- gst::PadPresence::Always,
- &caps,
- )
- .unwrap();
- klass.add_pad_template(src_pad_template);
- klass.install_properties(&PROPERTIES);
- }
-
- fn with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
- let sink_pad_handler = SinkHandler::default();
- let src_pad_handler = SrcHandler::default();
+ } else {
+ Some(seqnum as u16)
+ };
- Self {
- sink_pad: PadSink::new(
- gst::Pad::from_template(&klass.get_pad_template("sink").unwrap(), Some("sink")),
- sink_pad_handler.clone(),
- ),
- src_pad: PadSrc::new(
- gst::Pad::from_template(&klass.get_pad_template("src").unwrap(), Some("src")),
- src_pad_handler.clone(),
- ),
- sink_pad_handler,
- src_pad_handler,
- task: Task::default(),
- state: StdMutex::new(State::default()),
- settings: StdMutex::new(Settings::default()),
+ if pts == gst_ffi::GST_CLOCK_TIME_NONE {
+ (gst::CLOCK_TIME_NONE, seqnum)
+ } else {
+ (pts.into(), seqnum)
+ }
}
}
-}
-
-impl ObjectImpl for JitterBuffer {
- fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) {
- let prop = &PROPERTIES[id];
-
- match *prop {
- subclass::Property("latency", ..) => {
- let latency_ms = {
- let mut settings = self.settings.lock().unwrap();
- settings.latency_ms = value.get_some().expect("type checked upstream");
- settings.latency_ms as u64
- };
- let state = self.state.lock().unwrap();
- state.jbuf.borrow().set_delay(latency_ms * gst::MSECOND);
+ pub fn pop(&self) -> (Option<RTPJitterBufferItem>, i32) {
+ unsafe {
+ let mut percent = mem::MaybeUninit::uninit();
+ let item = ffi::rtp_jitter_buffer_pop(self.to_glib_none().0, percent.as_mut_ptr());
- let element = obj.downcast_ref::<gst::Element>().unwrap();
- let _ = element.post_message(gst::message::Latency::builder().src(element).build());
- }
- subclass::Property("do-lost", ..) => {
- let mut settings = self.settings.lock().unwrap();
- settings.do_lost = value.get_some().expect("type checked upstream");
- }
- subclass::Property("max-dropout-time", ..) => {
- let mut settings = self.settings.lock().unwrap();
- settings.max_dropout_time = value.get_some().expect("type checked upstream");
- }
- subclass::Property("max-misorder-time", ..) => {
- let mut settings = self.settings.lock().unwrap();
- settings.max_misorder_time = value.get_some().expect("type checked upstream");
- }
- subclass::Property("context", ..) => {
- let mut settings = self.settings.lock().unwrap();
- settings.context = value
- .get()
- .expect("type checked upstream")
- .unwrap_or_else(|| "".into());
- }
- subclass::Property("context-wait", ..) => {
- let mut settings = self.settings.lock().unwrap();
- settings.context_wait = value.get_some().expect("type checked upstream");
- }
- _ => unimplemented!(),
+ (
+ if item.is_null() {
+ None
+ } else {
+ Some(RTPJitterBufferItem(Some(ptr::NonNull::new_unchecked(item))))
+ },
+ percent.assume_init(),
+ )
}
}
- fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
- let prop = &PROPERTIES[id];
-
- match *prop {
- subclass::Property("latency", ..) => {
- let settings = self.settings.lock().unwrap();
- Ok(settings.latency_ms.to_value())
- }
- subclass::Property("do-lost", ..) => {
- let settings = self.settings.lock().unwrap();
- Ok(settings.do_lost.to_value())
- }
- subclass::Property("max-dropout-time", ..) => {
- let settings = self.settings.lock().unwrap();
- Ok(settings.max_dropout_time.to_value())
- }
- subclass::Property("max-misorder-time", ..) => {
- let settings = self.settings.lock().unwrap();
- Ok(settings.max_misorder_time.to_value())
- }
- subclass::Property("stats", ..) => {
- let state = self.state.lock().unwrap();
- let s = gst::Structure::new(
- "application/x-rtp-jitterbuffer-stats",
- &[
- ("num-pushed", &state.stats.num_pushed),
- ("num-lost", &state.stats.num_lost),
- ("num-late", &state.stats.num_late),
- ],
- );
- Ok(s.to_value())
- }
- subclass::Property("context", ..) => {
- let settings = self.settings.lock().unwrap();
- Ok(settings.context.to_value())
- }
- subclass::Property("context-wait", ..) => {
- let settings = self.settings.lock().unwrap();
- Ok(settings.context_wait.to_value())
+ pub fn peek(&self) -> (gst::ClockTime, Option<u16>) {
+ unsafe {
+ let item = ffi::rtp_jitter_buffer_peek(self.to_glib_none().0);
+ if item.is_null() {
+ (gst::CLOCK_TIME_NONE, None)
+ } else {
+ let seqnum = (*item).seqnum;
+ let seqnum = if seqnum == std::u32::MAX {
+ None
+ } else {
+ Some(seqnum as u16)
+ };
+ ((*item).pts.into(), seqnum)
}
- _ => unimplemented!(),
}
}
- fn constructed(&self, obj: &glib::Object) {
- self.parent_constructed(obj);
-
- let element = obj.downcast_ref::<gst::Element>().unwrap();
- element.add_pad(self.sink_pad.gst_pad()).unwrap();
- element.add_pad(self.src_pad.gst_pad()).unwrap();
- element
- .set_element_flags(gst::ElementFlags::PROVIDE_CLOCK | gst::ElementFlags::REQUIRE_CLOCK);
- }
-}
-
-impl ElementImpl for JitterBuffer {
- fn change_state(
- &self,
- element: &gst::Element,
- transition: gst::StateChange,
- ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
-
- match transition {
- gst::StateChange::NullToReady => {
- self.prepare(element).map_err(|err| {
- element.post_error_message(err);
- gst::StateChangeError
- })?;
- }
- gst::StateChange::PausedToReady => {
- self.stop(element).map_err(|_| gst::StateChangeError)?;
- }
- gst::StateChange::ReadyToNull => {
- self.unprepare(element);
- }
- _ => (),
+ pub fn flush(&self) {
+ unsafe extern "C" fn free_item(item: glib_ffi::gpointer, _: glib_ffi::gpointer) {
+ let _ =
+ RTPJitterBufferItem(Some(ptr::NonNull::new(item as *mut _).expect("NULL item")));
}
- let mut success = self.parent_change_state(element, transition)?;
-
- match transition {
- gst::StateChange::ReadyToPaused => {
- self.start(element).map_err(|_| gst::StateChangeError)?;
- success = gst::StateChangeSuccess::NoPreroll;
- }
- gst::StateChange::PlayingToPaused => {
- success = gst::StateChangeSuccess::NoPreroll;
- }
- _ => (),
+ unsafe {
+ ffi::rtp_jitter_buffer_flush(self.to_glib_none().0, Some(free_item));
}
-
- Ok(success)
}
- fn provide_clock(&self, _element: &gst::Element) -> Option<gst::Clock> {
- Some(gst::SystemClock::obtain())
+ pub fn reset_skew(&self) {
+ unsafe { ffi::rtp_jitter_buffer_reset_skew(self.to_glib_none().0) }
}
}
-pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
- gst::Element::register(
- Some(plugin),
- "ts-jitterbuffer",
- gst::Rank::None,
- JitterBuffer::get_type(),
- )
+impl Default for RTPJitterBuffer {
+ fn default() -> Self {
+ RTPJitterBuffer::new()
+ }
}