Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/net/rtp
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2023-02-02 20:46:46 +0300
committerSebastian Dröge <sebastian@centricular.com>2023-02-02 21:24:27 +0300
commit194c4e9e9ff26a2828f3ca9d7fc960735ae88213 (patch)
treeef9583fa0907f9f68f7b7e7b0abb2d314eccfbee /net/rtp
parent49350f738f9ff3bf4fd0dc508a729f1ccb151473 (diff)
rtpav1pay: Consider the marker flag to output packets immediately at the end of a frame
Otherwise it is necessary to wait for the beginning of the following frame, which unnecessarily increases the latency. Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/255 Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1072>
Diffstat (limited to 'net/rtp')
-rw-r--r--net/rtp/src/av1/pay/imp.rs86
-rw-r--r--net/rtp/tests/rtpav1.rs2
2 files changed, 67 insertions, 21 deletions
diff --git a/net/rtp/src/av1/pay/imp.rs b/net/rtp/src/av1/pay/imp.rs
index bd6aa8d1..09fa9519 100644
--- a/net/rtp/src/av1/pay/imp.rs
+++ b/net/rtp/src/av1/pay/imp.rs
@@ -114,6 +114,7 @@ impl RTPAv1Pay {
&self,
state: &mut State,
data: &[u8],
+ marker: bool,
dts: Option<gst::ClockTime>,
pts: Option<gst::ClockTime>,
) -> Result<gst::BufferList, gst::FlowError> {
@@ -193,7 +194,7 @@ impl RTPAv1Pay {
let mut list = gst::BufferList::new();
{
let list = list.get_mut().unwrap();
- while let Some(packet_data) = self.consider_new_packet(state, false) {
+ while let Some(packet_data) = self.consider_new_packet(state, false, marker) {
let buffer = self.generate_new_packet(state, packet_data)?;
list.add(buffer);
}
@@ -208,13 +209,21 @@ impl RTPAv1Pay {
///
/// If `true` is passed for `force`, packets of any size will be accepted,
/// which is used in flushing the last OBUs after receiving an EOS for example.
- fn consider_new_packet(&self, state: &mut State, force: bool) -> Option<PacketOBUData> {
+ ///
+ /// If `true` is passed for `marker` then all queued OBUs are considered to finish this TU.
+ fn consider_new_packet(
+ &self,
+ state: &mut State,
+ force: bool,
+ marker: bool,
+ ) -> Option<PacketOBUData> {
gst::trace!(
CAT,
imp: self,
- "{} new packet, currently storing {} OBUs",
+ "{} new packet, currently storing {} OBUs (marker {})",
if force { "forcing" } else { "considering" },
- state.obus.len()
+ state.obus.len(),
+ marker,
);
let payload_limit = gst_rtp::calc_payload_len(self.obj().mtu(), 0, 0);
@@ -226,7 +235,7 @@ impl RTPAv1Pay {
let mut required_ids = None::<(u8, u8)>;
// figure out how many OBUs we can fit into this packet
- for obu in &state.obus {
+ for (idx, obu) in state.obus.iter().enumerate() {
// for OBUs with extension headers, spatial and temporal IDs must be equal
// to all other such OBUs in the packet
let matching_obu_ids = |obu: &SizedObu, required_ids: &mut Option<(u8, u8)>| -> bool {
@@ -247,11 +256,20 @@ impl RTPAv1Pay {
gst::log!(CAT, imp: self, "ignoring temporal delimiter OBU");
if packet.obu_count > 0 {
+ if marker {
+ gst::warning!(
+ CAT,
+ imp: self,
+ "Temporal delimited in the middle of a frame"
+ );
+ }
+
packet.ends_temporal_unit = true;
if packet.obu_count > 3 {
packet.payload_size += pending_bytes;
packet.omit_last_size_field = false;
}
+
return Some(packet);
}
@@ -264,6 +282,7 @@ impl RTPAv1Pay {
packet.payload_size += pending_bytes;
packet.omit_last_size_field = false;
}
+ packet.ends_temporal_unit = marker && idx == state.obus.len() - 1;
return Some(packet);
}
@@ -279,6 +298,7 @@ impl RTPAv1Pay {
{
packet.obu_count += 1;
packet.payload_size += current.partial_size() + pending_bytes;
+ packet.ends_temporal_unit = marker && idx == state.obus.len() - 1;
return Some(packet);
}
@@ -302,6 +322,7 @@ impl RTPAv1Pay {
packet.payload_size = payload_limit;
packet.omit_last_size_field = leb_size == 0;
} else if packet.obu_count > 3 {
+ packet.ends_temporal_unit = marker && idx == state.obus.len() - 1;
packet.payload_size += pending_bytes;
}
@@ -309,11 +330,13 @@ impl RTPAv1Pay {
}
}
- if force && packet.obu_count > 0 {
+ if (force || marker) && packet.obu_count > 0 {
if packet.obu_count > 3 {
packet.payload_size += pending_bytes;
packet.omit_last_size_field = false;
}
+ packet.ends_temporal_unit = true;
+
Some(packet)
} else {
// if we ran out of OBUs with space in the packet to spare, wait a bit longer
@@ -621,7 +644,7 @@ impl RTPBasePayloadImpl for RTPAv1Pay {
let dts = buffer.dts();
let pts = buffer.pts();
- let buffer = buffer.into_mapped_buffer_readable().map_err(|_| {
+ let map = buffer.map_readable().map_err(|_| {
gst::element_imp_error!(
self,
gst::ResourceError::Read,
@@ -631,7 +654,10 @@ impl RTPBasePayloadImpl for RTPAv1Pay {
gst::FlowError::Error
})?;
- let list = self.handle_new_obus(&mut state, buffer.as_slice(), dts, pts)?;
+ // Does the buffer finished a full TU?
+ let marker = buffer.flags().contains(gst::BufferFlags::MARKER);
+ let list = self.handle_new_obus(&mut state, map.as_slice(), marker, dts, pts)?;
+ drop(map);
drop(state);
if !list.is_empty() {
@@ -652,7 +678,7 @@ impl RTPBasePayloadImpl for RTPAv1Pay {
let mut state = self.state.lock().unwrap();
let list = list.get_mut().unwrap();
- while let Some(packet_data) = self.consider_new_packet(&mut state, true) {
+ while let Some(packet_data) = self.consider_new_packet(&mut state, true, true) {
match self.generate_new_packet(&mut state, packet_data) {
Ok(buffer) => list.add(buffer),
Err(_) => break,
@@ -804,15 +830,25 @@ mod tests {
(
false,
State {
- obus: VecDeque::from(vec![ObuData {
- info: SizedObu {
- obu_type: ObuType::Frame,
- size: 4,
- ..base_obu
+ obus: VecDeque::from(vec![
+ ObuData {
+ info: SizedObu {
+ obu_type: ObuType::TemporalDelimiter,
+ size: 0,
+ ..base_obu
+ },
+ ..ObuData::default()
},
- bytes: vec![1, 2, 3, 4],
- ..ObuData::default()
- }]),
+ ObuData {
+ info: SizedObu {
+ obu_type: ObuType::Frame,
+ size: 4,
+ ..base_obu
+ },
+ bytes: vec![1, 2, 3, 4],
+ ..ObuData::default()
+ },
+ ]),
..State::default()
},
),
@@ -843,7 +879,7 @@ mod tests {
payload_size: 34,
last_obu_fragment_size: None,
omit_last_size_field: false,
- ends_temporal_unit: false,
+ ends_temporal_unit: true,
}),
State {
obus: {
@@ -854,7 +890,17 @@ mod tests {
..input_data[1].1
},
),
- (None, input_data[2].1.clone()),
+ (
+ None,
+ State {
+ obus: {
+ let mut copy = input_data[2].1.obus.clone();
+ copy.pop_front().unwrap();
+ copy
+ },
+ ..input_data[2].1
+ },
+ ),
];
let element = <RTPAv1Pay as ObjectSubclass>::Type::new();
@@ -866,7 +912,7 @@ mod tests {
*state = input_data[idx].1.clone();
assert_eq!(
- pay.consider_new_packet(&mut state, input_data[idx].0),
+ pay.consider_new_packet(&mut state, input_data[idx].0, false),
results[idx].0,
);
assert_eq!(
diff --git a/net/rtp/tests/rtpav1.rs b/net/rtp/tests/rtpav1.rs
index cf2479d4..3ae68b42 100644
--- a/net/rtp/tests/rtpav1.rs
+++ b/net/rtp/tests/rtpav1.rs
@@ -167,7 +167,7 @@ fn test_payloader() {
0b0011_0100, 0b0100_1000, 1,
]
), (
- false,
+ true, // because of EOS
90_000,
vec![
0b0001_0000,