Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/net/rtp
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2023-02-01 18:30:48 +0300
committerSebastian Dröge <sebastian@centricular.com>2023-02-02 21:24:27 +0300
commitd6cb9d72d8259d181b92c5973907d663720bbe00 (patch)
tree512e05889301ab1e98d093c23d502b75015101cc /net/rtp
parent27128a476c20f11ff845a935c04fb810e0789321 (diff)
rtpav1depay: Don't output full TUs but just OBUs as they come
Simplifies state tracking and potentially reduces latency as it's not necessary to wait until all fragments of an OBU are received. The last OBU of a TU is marked with the marker flag to allow parsers to detect this without first seeing the beginning of the next TU. Also use a simple `Vec` for collecting complete OBUs instead of a `gst_base::Adapter` as this reduces the number of allocations. And also handle invalid packets a little bit more gracefully. Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/244 Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1072>
Diffstat (limited to 'net/rtp')
-rw-r--r--net/rtp/Cargo.toml1
-rw-r--r--net/rtp/src/av1/depay/imp.rs138
-rw-r--r--net/rtp/tests/rtpav1.rs4
3 files changed, 67 insertions, 76 deletions
diff --git a/net/rtp/Cargo.toml b/net/rtp/Cargo.toml
index 1e4819ea4..46c9bbc62 100644
--- a/net/rtp/Cargo.toml
+++ b/net/rtp/Cargo.toml
@@ -11,7 +11,6 @@ rust-version = "1.63"
[dependencies]
bitstream-io = "1.3"
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
-gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"]}
once_cell = "1.0"
chrono = { version = "0.4", default-features = false }
diff --git a/net/rtp/src/av1/depay/imp.rs b/net/rtp/src/av1/depay/imp.rs
index 0270d2dda..355694d49 100644
--- a/net/rtp/src/av1/depay/imp.rs
+++ b/net/rtp/src/av1/depay/imp.rs
@@ -28,9 +28,6 @@ use crate::av1::common::{
#[derive(Debug, Default)]
struct State {
- /// used to store outgoing OBUs until the TU is complete
- adapter: gst_base::UniqueAdapter,
-
last_timestamp: Option<u32>,
/// if true, the last packet of a temporal unit has been received
marked_packet: bool,
@@ -51,8 +48,7 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
)
});
-static TEMPORAL_DELIMITER: Lazy<gst::Memory> =
- Lazy::new(|| gst::Memory::from_slice([0b0001_0010, 0]));
+static TEMPORAL_DELIMITER: [u8; 2] = [0b0001_0010, 0];
impl RTPAv1Depay {
fn reset(&self, state: &mut State) {
@@ -109,7 +105,7 @@ impl ElementImpl for RTPAv1Depay {
&gst::Caps::builder("video/x-av1")
.field("parsed", true)
.field("stream-format", "obu-stream")
- .field("alignment", "tu")
+ .field("alignment", "obu")
.build(),
)
.unwrap();
@@ -185,10 +181,8 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
self.reset(&mut state);
}
- // number of bytes that can be used in the next outgoing buffer
- let mut bytes_ready = 0;
let mut reader = Cursor::new(payload);
- let mut ready_obus = gst::Buffer::new();
+ let mut ready_obus = Vec::new();
let aggr_header = {
let mut byte = [0; 1];
@@ -214,18 +208,10 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
state.last_timestamp
);
self.reset(&mut state);
- return None;
}
- // all the currently stored bytes can be packed into the next outgoing buffer
- bytes_ready = state.adapter.available();
-
// the next temporal unit starts with a temporal delimiter OBU
- ready_obus
- .get_mut()
- .unwrap()
- .insert_memory(None, TEMPORAL_DELIMITER.clone());
- state.marked_packet = false;
+ ready_obus.extend_from_slice(&TEMPORAL_DELIMITER);
}
state.marked_packet = rtp.is_marker();
state.last_timestamp = Some(rtp.timestamp());
@@ -234,16 +220,17 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
let mut idx = 0;
// handle leading OBU fragment
- if let Some((obu, ref mut bytes)) = &mut state.obu_fragment {
- if !aggr_header.leading_fragment {
- gst::error!(
- CAT,
- imp: self,
- "invalid packet: ignores unclosed OBU fragment"
- );
- return None;
- }
+ if state.obu_fragment.is_some() && !aggr_header.leading_fragment {
+ gst::error!(
+ CAT,
+ imp: self,
+ "invalid packet: ignores unclosed OBU fragment"
+ );
+ self.reset(&mut state);
+ }
+ if let Some((obu, ref mut bytes)) = &mut state.obu_fragment {
+ assert!(aggr_header.leading_fragment);
let (element_size, is_last_obu) =
self.find_element_info(rtp, &mut reader, &aggr_header, idx)?;
@@ -262,9 +249,11 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
obu.as_sized(size, leb_size)
};
- let buffer = self.translate_obu(&mut Cursor::new(bytes.as_slice()), &full_obu)?;
-
- state.adapter.push(buffer);
+ self.translate_obu(
+ &mut Cursor::new(bytes.as_slice()),
+ &full_obu,
+ &mut ready_obus,
+ )?;
state.obu_fragment = None;
}
}
@@ -291,7 +280,10 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
.seek(SeekFrom::Current(element_size as i64))
.map_err(err_opt!(self, buf_read))
.ok()?;
+ idx += 1;
+ continue;
}
+
// trailing OBU fragments are stored in the state
if is_last_obu && aggr_header.trailing_fragment {
let bytes_left = rtp.payload_size() - (reader.position() as u32);
@@ -303,7 +295,7 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
state.obu_fragment = Some((obu, bytes));
}
- // full OBUs elements are translated and appended to the adapter
+ // full OBUs elements are translated and appended to the ready OBUs
else {
let full_obu = {
let size = element_size - obu.header_len;
@@ -311,49 +303,48 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
obu.as_sized(size, leb_size)
};
- ready_obus.append(self.translate_obu(&mut reader, &full_obu)?);
+ self.translate_obu(&mut reader, &full_obu, &mut ready_obus)?;
}
idx += 1;
}
- state.adapter.push(ready_obus);
+ // now push all the complete OBUs
+ let buffer = if !ready_obus.is_empty() {
+ gst::log!(
+ CAT,
+ imp: self,
+ "creating buffer containing {} bytes of data (marker {})...",
+ ready_obus.len(),
+ state.marked_packet,
+ );
- if state.marked_packet {
- if state.obu_fragment.is_some() {
- gst::error!(
- CAT,
- imp: self,
- concat!(
- "invalid packet: has marker bit set, but ",
- "last OBU is not yet complete"
- )
- );
- self.reset(&mut state);
- return None;
+ let mut buffer = gst::Buffer::from_mut_slice(ready_obus);
+ {
+ let buffer = buffer.get_mut().unwrap();
+ if state.marked_packet {
+ buffer.set_flags(gst::BufferFlags::MARKER);
+ }
}
- bytes_ready = state.adapter.available();
- }
+ Some(buffer)
+ } else {
+ None
+ };
- // now push all the complete temporal units
- if bytes_ready > 0 {
- gst::log!(
+ if state.marked_packet && state.obu_fragment.is_some() {
+ gst::error!(
CAT,
imp: self,
- "creating buffer containing {} bytes of data...",
- bytes_ready
+ concat!(
+ "invalid packet: has marker bit set, but ",
+ "last OBU is not yet complete"
+ )
);
- Some(
- state
- .adapter
- .take_buffer(bytes_ready)
- .map_err(err_opt!(self, buf_take))
- .ok()?,
- )
- } else {
- None
+ self.reset(&mut state);
}
+
+ buffer
}
}
@@ -407,12 +398,15 @@ impl RTPAv1Depay {
}
/// Using OBU data from an RTP packet, construct a buffer containing that OBU in AV1 bitstream format
- fn translate_obu(&self, reader: &mut Cursor<&[u8]>, obu: &SizedObu) -> Option<gst::Buffer> {
- let mut bytes = gst::Buffer::with_size(obu.full_size() as usize)
- .map_err(err_opt!(self, buf_alloc))
- .ok()?
- .into_mapped_buffer_writable()
- .unwrap();
+ fn translate_obu(
+ &self,
+ reader: &mut Cursor<&[u8]>,
+ obu: &SizedObu,
+ w: &mut Vec<u8>,
+ ) -> Option<usize> {
+ let pos = w.len();
+ w.resize(pos + obu.full_size() as usize, 0);
+ let bytes = &mut w[pos..];
// write OBU header
reader
@@ -447,7 +441,7 @@ impl RTPAv1Depay {
.map_err(err_opt!(self, buf_read))
.ok()?;
- Some(bytes.into_buffer())
+ Some(obu.full_size() as usize)
}
}
@@ -512,14 +506,10 @@ mod tests {
println!("running test {idx}...");
let mut reader = Cursor::new(rtp_bytes.as_slice());
- let actual = element.imp().translate_obu(&mut reader, &obu);
+ let mut actual = Vec::new();
+ element.imp().translate_obu(&mut reader, &obu, &mut actual).unwrap();
assert_eq!(reader.position(), rtp_bytes.len() as u64);
- assert!(actual.is_some());
- let actual = actual
- .unwrap()
- .into_mapped_buffer_readable()
- .unwrap();
assert_eq!(actual.as_slice(), out_bytes.as_slice());
}
}
diff --git a/net/rtp/tests/rtpav1.rs b/net/rtp/tests/rtpav1.rs
index 306e9d7e4..cf2479d48 100644
--- a/net/rtp/tests/rtpav1.rs
+++ b/net/rtp/tests/rtpav1.rs
@@ -57,7 +57,7 @@ fn test_depayloader() {
)
];
- let expected: [Vec<u8>; 2] = [
+ let expected: [Vec<u8>; 3] = [
vec![
0b0001_0010, 0,
0b0011_0010, 0b0000_0110, 1, 2, 3, 4, 5, 6,
@@ -65,6 +65,8 @@ fn test_depayloader() {
vec![
0b0001_0010, 0,
0b0111_1010, 0b0000_0101, 1, 2, 3, 4, 5,
+ ],
+ vec![
0b0011_0010, 0b0000_1010, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
],
];