Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2022-10-18 19:16:48 +0300
committerSebastian Dröge <sebastian@centricular.com>2022-10-19 15:23:10 +0300
commit36861edf9a1cab1a6034c6fb61cf68c0fbb49660 (patch)
tree473c13dd74d3b3a2ea4a17fda4fb1aab9b7f0a94
parente0437ae8f63679d32986517c012c0ec2e26a6d51 (diff)
rtpav1pay: Use a `VecDeque` instead of a `Vec` for the queued OBUs
And use a `Vec` plus offset for consuming partial byte buffers. Removing the first element from a `Vec` repeatedly is not very cheap. Also simplify calculation of the current packet by removing a mostly unused type and keeping track of the calculations always locally instead of sometimes storing it in the element state.
-rw-r--r--net/rtpav1/src/pay/imp.rs204
1 files changed, 109 insertions, 95 deletions
diff --git a/net/rtpav1/src/pay/imp.rs b/net/rtpav1/src/pay/imp.rs
index bd59e680..bfd7ef76 100644
--- a/net/rtpav1/src/pay/imp.rs
+++ b/net/rtpav1/src/pay/imp.rs
@@ -10,6 +10,7 @@
use gst::{glib, subclass::prelude::*};
use gst_rtp::{prelude::*, subclass::prelude::*};
use std::{
+ collections::VecDeque,
io::{Cursor, Read, Seek, SeekFrom, Write},
sync::Mutex,
};
@@ -32,7 +33,7 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
// TODO: properly handle `max_ptime` and `min_ptime`
/// Information about the OBUs intended to be grouped into one packet
-#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
+#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct PacketOBUData {
obu_count: usize,
payload_size: u32,
@@ -41,21 +42,23 @@ struct PacketOBUData {
ends_temporal_unit: bool,
}
-/// Temporary information held between invocations of `consider_new_packet()`
-#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
-struct TempPacketData {
- payload_limit: u32,
- required_ids: Option<(u8, u8)>,
- /// bytes used for an OBUs size field will only be added to the total
- /// once its known for sure it will be placed in the packet
- pending_bytes: u32,
- packet: PacketOBUData,
+impl Default for PacketOBUData {
+ fn default() -> Self {
+ PacketOBUData {
+ payload_size: 1, // 1 byte is used for the aggregation header
+ omit_last_size_field: true,
+ obu_count: 0,
+ last_obu_fragment_size: None,
+ ends_temporal_unit: false,
+ }
+ }
}
#[derive(Clone, Debug, Default, PartialEq, Eq)]
struct ObuData {
info: SizedObu,
bytes: Vec<u8>,
+ offset: usize,
dts: Option<gst::ClockTime>,
pts: Option<gst::ClockTime>,
}
@@ -64,8 +67,7 @@ struct ObuData {
struct State {
/// Holds header information and raw bytes for all received OBUs,
/// as well as DTS and PTS
- //obus: Vec<(SizedObu, Vec<u8>, Option<ClockTime>, Option<ClockTime>)>,
- obus: Vec<ObuData>,
+ obus: VecDeque<ObuData>,
/// Indicates that the first element in the Buffer is an OBU fragment,
/// left over from the previous RTP packet
@@ -74,8 +76,6 @@ struct State {
/// Indicates the next constructed packet will be the first in its sequence
/// (Corresponds to `N` field in the aggregation header)
first_packet_in_seq: bool,
-
- temp_packet_data: Option<TempPacketData>,
}
#[derive(Debug, Default)]
@@ -86,10 +86,9 @@ pub struct RTPAv1Pay {
impl Default for State {
fn default() -> Self {
Self {
- obus: Vec::new(),
+ obus: VecDeque::new(),
open_obu_fragment: false,
first_packet_in_seq: true,
- temp_packet_data: None,
}
}
}
@@ -140,9 +139,10 @@ impl RTPAv1Pay {
);
return Err(gst::FlowError::Error);
}
- state.obus.push(ObuData {
+ state.obus.push_back(ObuData {
info: obu,
bytes: Vec::new(),
+ offset: 0,
dts,
pts,
});
@@ -171,9 +171,10 @@ impl RTPAv1Pay {
.read_exact(&mut bytes[(obu.header_len as usize)..bytes_total])
.map_err(err_flow!(self, buf_read))?;
- state.obus.push(ObuData {
+ state.obus.push_back(ObuData {
info: obu,
bytes,
+ offset: 0,
dts,
pts,
});
@@ -208,75 +209,68 @@ impl RTPAv1Pay {
state.obus.len()
);
- let mut data = state.temp_packet_data.take().unwrap_or_else(|| {
- TempPacketData {
- payload_limit: gst_rtp::calc_payload_len(self.instance().mtu(), 0, 0),
- packet: PacketOBUData {
- payload_size: 1, // 1 byte is used for the aggregation header
- omit_last_size_field: true,
- ..PacketOBUData::default()
- },
- ..TempPacketData::default()
- }
- });
- let mut packet = data.packet;
+ let payload_limit = gst_rtp::calc_payload_len(self.instance().mtu(), 0, 0);
+
+ // Create information about the packet that can be created now while iterating over the
+ // OBUs and return this if a full packet can indeed be created now.
+ let mut packet = PacketOBUData::default();
+ let mut pending_bytes = 0;
+ let mut required_ids = None::<(u8, u8)>;
// figure out how many OBUs we can fit into this packet
- while packet.obu_count < state.obus.len() {
+ for obu in &state.obus {
// for OBUs with extension headers, spatial and temporal IDs must be equal
// to all other such OBUs in the packet
- let matching_obu_ids = |obu: &SizedObu, data: &mut TempPacketData| -> bool {
- if let Some((sid, tid)) = data.required_ids {
+ let matching_obu_ids = |obu: &SizedObu, required_ids: &mut Option<(u8, u8)>| -> bool {
+ if let Some((sid, tid)) = *required_ids {
sid == obu.spatial_id && tid == obu.temporal_id
} else {
- data.required_ids = Some((obu.spatial_id, obu.temporal_id));
+ *required_ids = Some((obu.spatial_id, obu.temporal_id));
true
}
};
- let current = state.obus[packet.obu_count].info;
+ let current = &obu.info;
// should this packet be finished here?
if current.obu_type == ObuType::TemporalDelimiter {
- // remove the temporal delimiter, it is not supposed to be transmitted
+ // ignore the temporal delimiter, it is not supposed to be transmitted,
+ // it will be skipped later when building the packet
gst::log!(CAT, imp: self, "ignoring temporal delimiter OBU");
- state.obus.remove(packet.obu_count);
if packet.obu_count > 0 {
packet.ends_temporal_unit = true;
if packet.obu_count > 3 {
- packet.payload_size += data.pending_bytes;
+ packet.payload_size += pending_bytes;
packet.omit_last_size_field = false;
}
return Some(packet);
- } else {
- continue;
}
- } else if packet.payload_size >= data.payload_limit
+
+ continue;
+ } else if packet.payload_size >= payload_limit
|| (packet.obu_count > 0 && current.obu_type == ObuType::SequenceHeader)
- || !matching_obu_ids(&state.obus[packet.obu_count].info, &mut data)
+ || !matching_obu_ids(current, &mut required_ids)
{
if packet.obu_count > 3 {
- packet.payload_size += data.pending_bytes;
+ packet.payload_size += pending_bytes;
packet.omit_last_size_field = false;
}
return Some(packet);
}
// would the full OBU fit?
- if packet.payload_size + data.pending_bytes + current.full_size() <= data.payload_limit
- {
+ if packet.payload_size + pending_bytes + current.full_size() <= payload_limit {
packet.obu_count += 1;
- packet.payload_size += current.partial_size() + data.pending_bytes;
- data.pending_bytes = current.leb_size;
+ packet.payload_size += current.partial_size() + pending_bytes;
+ pending_bytes = current.leb_size;
}
// would it fit without the size field?
else if packet.obu_count < 3
- && packet.payload_size + data.pending_bytes + current.partial_size()
- <= data.payload_limit
+ && packet.payload_size + pending_bytes + current.partial_size() <= payload_limit
{
packet.obu_count += 1;
- packet.payload_size += current.partial_size() + data.pending_bytes;
+ packet.payload_size += current.partial_size() + pending_bytes;
return Some(packet);
}
@@ -287,21 +281,20 @@ impl RTPAv1Pay {
} else {
// assume the biggest possible OBU fragment,
// so if anything the size field will be smaller than expected
- leb128_size(data.payload_limit - packet.payload_size) as u32
+ leb128_size(payload_limit - packet.payload_size) as u32
};
// is there even enough space to bother?
- if packet.payload_size + data.pending_bytes + leb_size + current.header_len
- < data.payload_limit
+ if packet.payload_size + pending_bytes + leb_size + current.header_len
+ < payload_limit
{
packet.obu_count += 1;
- packet.last_obu_fragment_size = Some(
- data.payload_limit - packet.payload_size - data.pending_bytes - leb_size,
- );
- packet.payload_size = data.payload_limit;
+ packet.last_obu_fragment_size =
+ Some(payload_limit - packet.payload_size - pending_bytes - leb_size);
+ packet.payload_size = payload_limit;
packet.omit_last_size_field = leb_size == 0;
} else if packet.obu_count > 3 {
- packet.payload_size += data.pending_bytes;
+ packet.payload_size += pending_bytes;
}
return Some(packet);
@@ -310,14 +303,12 @@ impl RTPAv1Pay {
if force && packet.obu_count > 0 {
if packet.obu_count > 3 {
- packet.payload_size += data.pending_bytes;
+ packet.payload_size += pending_bytes;
packet.omit_last_size_field = false;
}
Some(packet)
} else {
// if we ran out of OBUs with space in the packet to spare, wait a bit longer
- data.packet = packet;
- state.temp_packet_data = Some(data);
None
}
}
@@ -350,11 +341,12 @@ impl RTPAv1Pay {
{
// this block enforces that outbuf_mut is dropped before pushing outbuf
+ let first_obu = state.obus.front().unwrap();
let outbuf_mut = outbuf
.get_mut()
.expect("Failed to get mutable reference to outbuf");
- outbuf_mut.set_dts(state.obus[0].dts);
- outbuf_mut.set_pts(state.obus[0].pts);
+ outbuf_mut.set_dts(first_obu.dts);
+ outbuf_mut.set_pts(first_obu.pts);
let mut rtp = gst_rtp::RTPBuffer::from_buffer_writable(outbuf_mut)
.expect("Failed to create RTPBuffer");
@@ -389,7 +381,13 @@ impl RTPAv1Pay {
// append OBUs to the buffer
for _ in 1..packet.obu_count {
- let obu = &state.obus[0];
+ let obu = loop {
+ let obu = state.obus.pop_front().unwrap();
+ // Drop temporal delimiter from here
+ if obu.info.obu_type != ObuType::TemporalDelimiter {
+ break obu;
+ }
+ };
write_leb128(
&mut BitWriter::endian(&mut writer, ENDIANNESS),
@@ -397,21 +395,28 @@ impl RTPAv1Pay {
)
.map_err(err_flow!(self, leb_write))?;
writer
- .write(&obu.bytes)
+ .write(&obu.bytes[obu.offset..])
.map_err(err_flow!(self, obu_write))?;
-
- state.obus.remove(0);
}
state.open_obu_fragment = false;
{
+ let last_obu = loop {
+ let obu = state.obus.front_mut().unwrap();
+ // Drop temporal delimiter from here
+ if obu.info.obu_type != ObuType::TemporalDelimiter {
+ break obu;
+ }
+ let _ = state.obus.pop_front().unwrap();
+ };
+
// do the last OBU separately
// in this instance `obu_size` includes the header length
let obu_size = if let Some(size) = packet.last_obu_fragment_size {
state.open_obu_fragment = true;
size
} else {
- state.obus[0].bytes.len() as u32
+ last_obu.bytes.len() as u32 - last_obu.offset as usize as u32
};
if !packet.omit_last_size_field {
@@ -420,33 +425,30 @@ impl RTPAv1Pay {
}
// if this OBU is not a fragment, handle it as usual
- if packet.last_obu_fragment_size == None {
+ if packet.last_obu_fragment_size.is_none() {
writer
- .write(&state.obus[0].bytes)
+ .write(&last_obu.bytes[last_obu.offset..])
.map_err(err_flow!(self, obu_write))?;
- state.obus.remove(0);
+ let _ = state.obus.pop_front().unwrap();
}
// otherwise write only a slice, and update the element
// to only contain the unwritten bytes
else {
writer
- .write(&state.obus[0].bytes[0..obu_size as usize])
+ .write(
+ &last_obu.bytes[last_obu.offset..last_obu.offset + obu_size as usize],
+ )
.map_err(err_flow!(self, obu_write))?;
- let new_size = state.obus[0].bytes.len() as u32 - obu_size;
- state.obus[0] = ObuData {
- info: SizedObu {
- size: new_size,
- header_len: 0,
- leb_size: leb128_size(new_size) as u32,
- is_fragment: true,
- ..state.obus[0].info
- },
- bytes: Vec::from(
- &state.obus[0].bytes[obu_size as usize..state.obus[0].bytes.len()],
- ),
- ..state.obus[0]
+ let new_size = last_obu.bytes.len() as u32 - last_obu.offset as u32 - obu_size;
+ last_obu.info = SizedObu {
+ size: new_size,
+ header_len: 0,
+ leb_size: leb128_size(new_size) as u32,
+ is_fragment: true,
+ ..last_obu.info
};
+ last_obu.offset += obu_size as usize;
}
}
}
@@ -643,7 +645,7 @@ mod tests {
false, // force argument
State {
// payloader state
- obus: vec![
+ obus: VecDeque::from(vec![
ObuData {
info: SizedObu {
obu_type: ObuType::Padding,
@@ -689,14 +691,14 @@ mod tests {
bytes: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
..ObuData::default()
},
- ],
+ ]),
..State::default()
},
),
(
true,
State {
- obus: vec![
+ obus: VecDeque::from(vec![
ObuData {
info: SizedObu {
obu_type: ObuType::TemporalDelimiter,
@@ -741,14 +743,14 @@ mod tests {
bytes: vec![1, 2, 3],
..ObuData::default()
},
- ],
+ ]),
..State::default()
},
),
(
false,
State {
- obus: vec![ObuData {
+ obus: VecDeque::from(vec![ObuData {
info: SizedObu {
obu_type: ObuType::Frame,
size: 4,
@@ -756,7 +758,7 @@ mod tests {
},
bytes: vec![1, 2, 3, 4],
..ObuData::default()
- }],
+ }]),
..State::default()
},
),
@@ -772,12 +774,12 @@ mod tests {
ends_temporal_unit: true,
}),
State {
- obus: vec![
+ obus: VecDeque::from(vec![
input_data[0].1.obus[0].clone(),
input_data[0].1.obus[1].clone(),
input_data[0].1.obus[2].clone(),
input_data[0].1.obus[4].clone(),
- ],
+ ]),
..input_data[0].1
},
),
@@ -790,7 +792,11 @@ mod tests {
ends_temporal_unit: false,
}),
State {
- obus: input_data[1].1.obus[1..].to_owned(),
+ obus: {
+ let mut copy = input_data[1].1.obus.clone();
+ copy.pop_front().unwrap();
+ copy
+ },
..input_data[1].1
},
),
@@ -809,7 +815,15 @@ mod tests {
pay.consider_new_packet(&mut state, input_data[idx].0),
results[idx].0,
);
- assert_eq!(state.obus, results[idx].1.obus);
+ assert_eq!(
+ state
+ .obus
+ .iter()
+ .filter(|o| o.info.obu_type != ObuType::TemporalDelimiter)
+ .cloned()
+ .collect::<Vec<_>>(),
+ results[idx].1.obus.iter().cloned().collect::<Vec<_>>()
+ );
assert_eq!(state.open_obu_fragment, results[idx].1.open_obu_fragment);
assert_eq!(
state.first_packet_in_seq,