diff options
author | Sebastian Dröge <sebastian@centricular.com> | 2022-10-18 19:16:48 +0300 |
---|---|---|
committer | Sebastian Dröge <sebastian@centricular.com> | 2022-10-19 15:23:10 +0300 |
commit | 36861edf9a1cab1a6034c6fb61cf68c0fbb49660 (patch) | |
tree | 473c13dd74d3b3a2ea4a17fda4fb1aab9b7f0a94 | |
parent | e0437ae8f63679d32986517c012c0ec2e26a6d51 (diff) |
rtpav1pay: Use a `VecDeque` instead of a `Vec` for the queued OBUs
And use a `Vec` plus offset for consuming partial byte buffers.
Removing the first element from a `Vec` repeatedly is not very cheap.
Also simplify calculation of the current packet by removing a mostly
unused type and keeping track of the calculations always locally instead
of sometimes storing it in the element state.
-rw-r--r-- | net/rtpav1/src/pay/imp.rs | 204 |
1 files changed, 109 insertions, 95 deletions
diff --git a/net/rtpav1/src/pay/imp.rs b/net/rtpav1/src/pay/imp.rs index bd59e680..bfd7ef76 100644 --- a/net/rtpav1/src/pay/imp.rs +++ b/net/rtpav1/src/pay/imp.rs @@ -10,6 +10,7 @@ use gst::{glib, subclass::prelude::*}; use gst_rtp::{prelude::*, subclass::prelude::*}; use std::{ + collections::VecDeque, io::{Cursor, Read, Seek, SeekFrom, Write}, sync::Mutex, }; @@ -32,7 +33,7 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { // TODO: properly handle `max_ptime` and `min_ptime` /// Information about the OBUs intended to be grouped into one packet -#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] +#[derive(Copy, Clone, Debug, PartialEq, Eq)] struct PacketOBUData { obu_count: usize, payload_size: u32, @@ -41,21 +42,23 @@ struct PacketOBUData { ends_temporal_unit: bool, } -/// Temporary information held between invocations of `consider_new_packet()` -#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] -struct TempPacketData { - payload_limit: u32, - required_ids: Option<(u8, u8)>, - /// bytes used for an OBUs size field will only be added to the total - /// once its known for sure it will be placed in the packet - pending_bytes: u32, - packet: PacketOBUData, +impl Default for PacketOBUData { + fn default() -> Self { + PacketOBUData { + payload_size: 1, // 1 byte is used for the aggregation header + omit_last_size_field: true, + obu_count: 0, + last_obu_fragment_size: None, + ends_temporal_unit: false, + } + } } #[derive(Clone, Debug, Default, PartialEq, Eq)] struct ObuData { info: SizedObu, bytes: Vec<u8>, + offset: usize, dts: Option<gst::ClockTime>, pts: Option<gst::ClockTime>, } @@ -64,8 +67,7 @@ struct ObuData { struct State { /// Holds header information and raw bytes for all received OBUs, /// as well as DTS and PTS - //obus: Vec<(SizedObu, Vec<u8>, Option<ClockTime>, Option<ClockTime>)>, - obus: Vec<ObuData>, + obus: VecDeque<ObuData>, /// Indicates that the first element in the Buffer is an OBU fragment, /// left over from the previous RTP packet @@ -74,8 +76,6 @@ struct State { /// Indicates the next constructed packet will be the first in its sequence /// (Corresponds to `N` field in the aggregation header) first_packet_in_seq: bool, - - temp_packet_data: Option<TempPacketData>, } #[derive(Debug, Default)] @@ -86,10 +86,9 @@ pub struct RTPAv1Pay { impl Default for State { fn default() -> Self { Self { - obus: Vec::new(), + obus: VecDeque::new(), open_obu_fragment: false, first_packet_in_seq: true, - temp_packet_data: None, } } } @@ -140,9 +139,10 @@ impl RTPAv1Pay { ); return Err(gst::FlowError::Error); } - state.obus.push(ObuData { + state.obus.push_back(ObuData { info: obu, bytes: Vec::new(), + offset: 0, dts, pts, }); @@ -171,9 +171,10 @@ impl RTPAv1Pay { .read_exact(&mut bytes[(obu.header_len as usize)..bytes_total]) .map_err(err_flow!(self, buf_read))?; - state.obus.push(ObuData { + state.obus.push_back(ObuData { info: obu, bytes, + offset: 0, dts, pts, }); @@ -208,75 +209,68 @@ impl RTPAv1Pay { state.obus.len() ); - let mut data = state.temp_packet_data.take().unwrap_or_else(|| { - TempPacketData { - payload_limit: gst_rtp::calc_payload_len(self.instance().mtu(), 0, 0), - packet: PacketOBUData { - payload_size: 1, // 1 byte is used for the aggregation header - omit_last_size_field: true, - ..PacketOBUData::default() - }, - ..TempPacketData::default() - } - }); - let mut packet = data.packet; + let payload_limit = gst_rtp::calc_payload_len(self.instance().mtu(), 0, 0); + + // Create information about the packet that can be created now while iterating over the + // OBUs and return this if a full packet can indeed be created now. + let mut packet = PacketOBUData::default(); + let mut pending_bytes = 0; + let mut required_ids = None::<(u8, u8)>; // figure out how many OBUs we can fit into this packet - while packet.obu_count < state.obus.len() { + for obu in &state.obus { // for OBUs with extension headers, spatial and temporal IDs must be equal // to all other such OBUs in the packet - let matching_obu_ids = |obu: &SizedObu, data: &mut TempPacketData| -> bool { - if let Some((sid, tid)) = data.required_ids { + let matching_obu_ids = |obu: &SizedObu, required_ids: &mut Option<(u8, u8)>| -> bool { + if let Some((sid, tid)) = *required_ids { sid == obu.spatial_id && tid == obu.temporal_id } else { - data.required_ids = Some((obu.spatial_id, obu.temporal_id)); + *required_ids = Some((obu.spatial_id, obu.temporal_id)); true } }; - let current = state.obus[packet.obu_count].info; + let current = &obu.info; // should this packet be finished here? if current.obu_type == ObuType::TemporalDelimiter { - // remove the temporal delimiter, it is not supposed to be transmitted + // ignore the temporal delimiter, it is not supposed to be transmitted, + // it will be skipped later when building the packet gst::log!(CAT, imp: self, "ignoring temporal delimiter OBU"); - state.obus.remove(packet.obu_count); if packet.obu_count > 0 { packet.ends_temporal_unit = true; if packet.obu_count > 3 { - packet.payload_size += data.pending_bytes; + packet.payload_size += pending_bytes; packet.omit_last_size_field = false; } return Some(packet); - } else { - continue; } - } else if packet.payload_size >= data.payload_limit + + continue; + } else if packet.payload_size >= payload_limit || (packet.obu_count > 0 && current.obu_type == ObuType::SequenceHeader) - || !matching_obu_ids(&state.obus[packet.obu_count].info, &mut data) + || !matching_obu_ids(current, &mut required_ids) { if packet.obu_count > 3 { - packet.payload_size += data.pending_bytes; + packet.payload_size += pending_bytes; packet.omit_last_size_field = false; } return Some(packet); } // would the full OBU fit? - if packet.payload_size + data.pending_bytes + current.full_size() <= data.payload_limit - { + if packet.payload_size + pending_bytes + current.full_size() <= payload_limit { packet.obu_count += 1; - packet.payload_size += current.partial_size() + data.pending_bytes; - data.pending_bytes = current.leb_size; + packet.payload_size += current.partial_size() + pending_bytes; + pending_bytes = current.leb_size; } // would it fit without the size field? else if packet.obu_count < 3 - && packet.payload_size + data.pending_bytes + current.partial_size() - <= data.payload_limit + && packet.payload_size + pending_bytes + current.partial_size() <= payload_limit { packet.obu_count += 1; - packet.payload_size += current.partial_size() + data.pending_bytes; + packet.payload_size += current.partial_size() + pending_bytes; return Some(packet); } @@ -287,21 +281,20 @@ impl RTPAv1Pay { } else { // assume the biggest possible OBU fragment, // so if anything the size field will be smaller than expected - leb128_size(data.payload_limit - packet.payload_size) as u32 + leb128_size(payload_limit - packet.payload_size) as u32 }; // is there even enough space to bother? - if packet.payload_size + data.pending_bytes + leb_size + current.header_len - < data.payload_limit + if packet.payload_size + pending_bytes + leb_size + current.header_len + < payload_limit { packet.obu_count += 1; - packet.last_obu_fragment_size = Some( - data.payload_limit - packet.payload_size - data.pending_bytes - leb_size, - ); - packet.payload_size = data.payload_limit; + packet.last_obu_fragment_size = + Some(payload_limit - packet.payload_size - pending_bytes - leb_size); + packet.payload_size = payload_limit; packet.omit_last_size_field = leb_size == 0; } else if packet.obu_count > 3 { - packet.payload_size += data.pending_bytes; + packet.payload_size += pending_bytes; } return Some(packet); @@ -310,14 +303,12 @@ impl RTPAv1Pay { if force && packet.obu_count > 0 { if packet.obu_count > 3 { - packet.payload_size += data.pending_bytes; + packet.payload_size += pending_bytes; packet.omit_last_size_field = false; } Some(packet) } else { // if we ran out of OBUs with space in the packet to spare, wait a bit longer - data.packet = packet; - state.temp_packet_data = Some(data); None } } @@ -350,11 +341,12 @@ impl RTPAv1Pay { { // this block enforces that outbuf_mut is dropped before pushing outbuf + let first_obu = state.obus.front().unwrap(); let outbuf_mut = outbuf .get_mut() .expect("Failed to get mutable reference to outbuf"); - outbuf_mut.set_dts(state.obus[0].dts); - outbuf_mut.set_pts(state.obus[0].pts); + outbuf_mut.set_dts(first_obu.dts); + outbuf_mut.set_pts(first_obu.pts); let mut rtp = gst_rtp::RTPBuffer::from_buffer_writable(outbuf_mut) .expect("Failed to create RTPBuffer"); @@ -389,7 +381,13 @@ impl RTPAv1Pay { // append OBUs to the buffer for _ in 1..packet.obu_count { - let obu = &state.obus[0]; + let obu = loop { + let obu = state.obus.pop_front().unwrap(); + // Drop temporal delimiter from here + if obu.info.obu_type != ObuType::TemporalDelimiter { + break obu; + } + }; write_leb128( &mut BitWriter::endian(&mut writer, ENDIANNESS), @@ -397,21 +395,28 @@ impl RTPAv1Pay { ) .map_err(err_flow!(self, leb_write))?; writer - .write(&obu.bytes) + .write(&obu.bytes[obu.offset..]) .map_err(err_flow!(self, obu_write))?; - - state.obus.remove(0); } state.open_obu_fragment = false; { + let last_obu = loop { + let obu = state.obus.front_mut().unwrap(); + // Drop temporal delimiter from here + if obu.info.obu_type != ObuType::TemporalDelimiter { + break obu; + } + let _ = state.obus.pop_front().unwrap(); + }; + // do the last OBU separately // in this instance `obu_size` includes the header length let obu_size = if let Some(size) = packet.last_obu_fragment_size { state.open_obu_fragment = true; size } else { - state.obus[0].bytes.len() as u32 + last_obu.bytes.len() as u32 - last_obu.offset as usize as u32 }; if !packet.omit_last_size_field { @@ -420,33 +425,30 @@ impl RTPAv1Pay { } // if this OBU is not a fragment, handle it as usual - if packet.last_obu_fragment_size == None { + if packet.last_obu_fragment_size.is_none() { writer - .write(&state.obus[0].bytes) + .write(&last_obu.bytes[last_obu.offset..]) .map_err(err_flow!(self, obu_write))?; - state.obus.remove(0); + let _ = state.obus.pop_front().unwrap(); } // otherwise write only a slice, and update the element // to only contain the unwritten bytes else { writer - .write(&state.obus[0].bytes[0..obu_size as usize]) + .write( + &last_obu.bytes[last_obu.offset..last_obu.offset + obu_size as usize], + ) .map_err(err_flow!(self, obu_write))?; - let new_size = state.obus[0].bytes.len() as u32 - obu_size; - state.obus[0] = ObuData { - info: SizedObu { - size: new_size, - header_len: 0, - leb_size: leb128_size(new_size) as u32, - is_fragment: true, - ..state.obus[0].info - }, - bytes: Vec::from( - &state.obus[0].bytes[obu_size as usize..state.obus[0].bytes.len()], - ), - ..state.obus[0] + let new_size = last_obu.bytes.len() as u32 - last_obu.offset as u32 - obu_size; + last_obu.info = SizedObu { + size: new_size, + header_len: 0, + leb_size: leb128_size(new_size) as u32, + is_fragment: true, + ..last_obu.info }; + last_obu.offset += obu_size as usize; } } } @@ -643,7 +645,7 @@ mod tests { false, // force argument State { // payloader state - obus: vec![ + obus: VecDeque::from(vec![ ObuData { info: SizedObu { obu_type: ObuType::Padding, @@ -689,14 +691,14 @@ mod tests { bytes: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10], ..ObuData::default() }, - ], + ]), ..State::default() }, ), ( true, State { - obus: vec![ + obus: VecDeque::from(vec![ ObuData { info: SizedObu { obu_type: ObuType::TemporalDelimiter, @@ -741,14 +743,14 @@ mod tests { bytes: vec![1, 2, 3], ..ObuData::default() }, - ], + ]), ..State::default() }, ), ( false, State { - obus: vec![ObuData { + obus: VecDeque::from(vec![ObuData { info: SizedObu { obu_type: ObuType::Frame, size: 4, @@ -756,7 +758,7 @@ mod tests { }, bytes: vec![1, 2, 3, 4], ..ObuData::default() - }], + }]), ..State::default() }, ), @@ -772,12 +774,12 @@ mod tests { ends_temporal_unit: true, }), State { - obus: vec![ + obus: VecDeque::from(vec![ input_data[0].1.obus[0].clone(), input_data[0].1.obus[1].clone(), input_data[0].1.obus[2].clone(), input_data[0].1.obus[4].clone(), - ], + ]), ..input_data[0].1 }, ), @@ -790,7 +792,11 @@ mod tests { ends_temporal_unit: false, }), State { - obus: input_data[1].1.obus[1..].to_owned(), + obus: { + let mut copy = input_data[1].1.obus.clone(); + copy.pop_front().unwrap(); + copy + }, ..input_data[1].1 }, ), @@ -809,7 +815,15 @@ mod tests { pay.consider_new_packet(&mut state, input_data[idx].0), results[idx].0, ); - assert_eq!(state.obus, results[idx].1.obus); + assert_eq!( + state + .obus + .iter() + .filter(|o| o.info.obu_type != ObuType::TemporalDelimiter) + .cloned() + .collect::<Vec<_>>(), + results[idx].1.obus.iter().cloned().collect::<Vec<_>>() + ); assert_eq!(state.open_obu_fragment, results[idx].1.open_obu_fragment); assert_eq!( state.first_packet_in_seq, |