diff options
author | Sebastian Dröge <sebastian@centricular.com> | 2022-10-19 15:42:48 +0300 |
---|---|---|
committer | Sebastian Dröge <sebastian@centricular.com> | 2022-10-19 15:42:48 +0300 |
commit | 9ce8e93c6374a712a6c8981557c726a3fc04273b (patch) | |
tree | 9095e16d0e10b1a1d333cdc779e50fd7598c3dd6 | |
parent | 36861edf9a1cab1a6034c6fb61cf68c0fbb49660 (diff) |
rtpav1pay: Track last known upstream PTS/DTS in case not all OBUs are properly timestamped
-rw-r--r-- | net/rtpav1/src/pay/imp.rs | 59 |
1 files changed, 57 insertions, 2 deletions
diff --git a/net/rtpav1/src/pay/imp.rs b/net/rtpav1/src/pay/imp.rs index bfd7ef76..fdd5ae26 100644 --- a/net/rtpav1/src/pay/imp.rs +++ b/net/rtpav1/src/pay/imp.rs @@ -10,6 +10,7 @@ use gst::{glib, subclass::prelude::*}; use gst_rtp::{prelude::*, subclass::prelude::*}; use std::{ + cmp, collections::VecDeque, io::{Cursor, Read, Seek, SeekFrom, Write}, sync::Mutex, @@ -76,6 +77,11 @@ struct State { /// Indicates the next constructed packet will be the first in its sequence /// (Corresponds to `N` field in the aggregation header) first_packet_in_seq: bool, + + /// The last observed DTS if upstream does not provide DTS for each OBU + last_dts: Option<gst::ClockTime>, + /// The last observed PTS if upstream does not provide PTS for each OBU + last_pts: Option<gst::ClockTime>, } #[derive(Debug, Default)] @@ -89,6 +95,8 @@ impl Default for State { obus: VecDeque::new(), open_obu_fragment: false, first_packet_in_seq: true, + last_dts: None, + last_pts: None, } } } @@ -342,11 +350,26 @@ impl RTPAv1Pay { { // this block enforces that outbuf_mut is dropped before pushing outbuf let first_obu = state.obus.front().unwrap(); + if let Some(dts) = first_obu.dts { + state.last_dts = Some( + state + .last_dts + .map_or(dts, |last_dts| cmp::max(last_dts, dts)), + ); + } + if let Some(pts) = first_obu.pts { + state.last_pts = Some( + state + .last_pts + .map_or(pts, |last_pts| cmp::max(last_pts, pts)), + ); + } + let outbuf_mut = outbuf .get_mut() .expect("Failed to get mutable reference to outbuf"); - outbuf_mut.set_dts(first_obu.dts); - outbuf_mut.set_pts(first_obu.pts); + outbuf_mut.set_dts(state.last_dts); + outbuf_mut.set_pts(state.last_pts); let mut rtp = gst_rtp::RTPBuffer::from_buffer_writable(outbuf_mut) .expect("Failed to create RTPBuffer"); @@ -383,6 +406,22 @@ impl RTPAv1Pay { for _ in 1..packet.obu_count { let obu = loop { let obu = state.obus.pop_front().unwrap(); + + if let Some(dts) = obu.dts { + state.last_dts = Some( + state + .last_dts + .map_or(dts, |last_dts| cmp::max(last_dts, dts)), + ); + } + if let Some(pts) = obu.pts { + state.last_pts = Some( + state + .last_pts + .map_or(pts, |last_pts| cmp::max(last_pts, pts)), + ); + } + // Drop temporal delimiter from here if obu.info.obu_type != ObuType::TemporalDelimiter { break obu; @@ -403,6 +442,22 @@ impl RTPAv1Pay { { let last_obu = loop { let obu = state.obus.front_mut().unwrap(); + + if let Some(dts) = obu.dts { + state.last_dts = Some( + state + .last_dts + .map_or(dts, |last_dts| cmp::max(last_dts, dts)), + ); + } + if let Some(pts) = obu.pts { + state.last_pts = Some( + state + .last_pts + .map_or(pts, |last_pts| cmp::max(last_pts, pts)), + ); + } + // Drop temporal delimiter from here if obu.info.obu_type != ObuType::TemporalDelimiter { break obu; |