Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/net/rtp
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2023-02-01 23:01:53 +0300
committerSebastian Dröge <sebastian@centricular.com>2023-02-02 21:24:27 +0300
commit1756d7a5169a3872cad02d7fe80a831913942b32 (patch)
treec6e8605a5448aed801ff8a55ce5b040ea0ef89ea /net/rtp
parented4e9a50d5eee58f0a77a9a43a365c6354948e78 (diff)
rtpav1depay: Fix error handling
Don't error out immediately on errors anymore but try again with the next packet. Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/289 Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1072>
Diffstat (limited to 'net/rtp')
-rw-r--r--net/rtp/src/av1/common/error.rs47
-rw-r--r--net/rtp/src/av1/depay/imp.rs88
2 files changed, 56 insertions, 79 deletions
diff --git a/net/rtp/src/av1/common/error.rs b/net/rtp/src/av1/common/error.rs
index 29d1026a..3867bb07 100644
--- a/net/rtp/src/av1/common/error.rs
+++ b/net/rtp/src/av1/common/error.rs
@@ -10,13 +10,13 @@
macro_rules! err_flow {
($imp:ident, read, $msg:literal) => {
|err| {
- gst::element_imp_error!($imp, gst::ResourceError::Read, [$msg, err]);
+ gst::element_imp_warning!($imp, gst::ResourceError::Read, [$msg, err]);
gst::FlowError::Error
}
};
($imp:ident, write, $msg:literal) => {
|err| {
- gst::element_imp_error!($imp, gst::ResourceError::Write, [$msg, err]);
+ gst::element_imp_warning!($imp, gst::ResourceError::Write, [$msg, err]);
gst::FlowError::Error
}
};
@@ -44,51 +44,24 @@ macro_rules! err_flow {
($imp:ident, outbuf_alloc) => {
err_flow!($imp, write, "Failed to allocate output buffer: {}")
};
-}
-
-macro_rules! err_opt {
- ($imp:ident, read, $msg:literal) => {
- |err| {
- gst::element_imp_error!($imp, gst::ResourceError::Read, [$msg, err]);
- Option::<()>::None
- }
- };
- ($imp:ident, write, $msg:literal) => {
- |err| {
- gst::element_imp_error!($imp, gst::ResourceError::Write, [$msg, err]);
- Option::<()>::None
- }
- };
-
- ($imp:ident, buf_alloc) => {
- err_opt!($imp, write, "Failed to allocate new buffer: {}")
- };
-
($imp:ident, payload_buf) => {
- err_opt!($imp, read, "Failed to get RTP payload buffer: {}")
- };
- ($imp:ident, payload_map) => {
- err_opt!($imp, read, "Failed to map payload as readable: {}")
- };
- ($imp:ident, buf_take) => {
- err_opt!($imp, read, "Failed to take buffer from adapter: {}")
+ err_flow!($imp, read, "Failed to get RTP payload buffer: {}")
};
($imp:ident, aggr_header_read) => {
- err_opt!($imp, read, "Failed to read aggregation header: {}")
+ err_flow!($imp, read, "Failed to read aggregation header: {}")
+ };
+ ($imp:ident, find_element) => {
+ err_flow!($imp, read, "Failed to find OBU element in packet: {}")
};
($imp:ident, leb_read) => {
- err_opt!($imp, read, "Failed to read leb128 size field: {}")
+ err_flow!($imp, read, "Failed to read leb128 size field: {}")
};
($imp:ident, leb_write) => {
- err_opt!($imp, read, "Failed to write leb128 size field: {}")
+ err_flow!($imp, read, "Failed to write leb128 size field: {}")
};
($imp:ident, obu_read) => {
- err_opt!($imp, read, "Failed to read OBU header: {}")
- };
- ($imp:ident, buf_read) => {
- err_opt!($imp, read, "Failed to read RTP buffer: {}")
+ err_flow!($imp, read, "Failed to read OBU header: {}")
};
}
pub(crate) use err_flow;
-pub(crate) use err_opt;
diff --git a/net/rtp/src/av1/depay/imp.rs b/net/rtp/src/av1/depay/imp.rs
index 5ae3e305..b9b0ba3d 100644
--- a/net/rtp/src/av1/depay/imp.rs
+++ b/net/rtp/src/av1/depay/imp.rs
@@ -20,7 +20,7 @@ use bitstream_io::{BitReader, BitWriter};
use once_cell::sync::Lazy;
use crate::av1::common::{
- err_opt, leb128_size, parse_leb128, write_leb128, AggregationHeader, ObuType, SizedObu,
+ err_flow, leb128_size, parse_leb128, write_leb128, AggregationHeader, ObuType, SizedObu,
UnsizedObu, CLOCK_RATE, ENDIANNESS,
};
@@ -177,6 +177,20 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
&self,
rtp: &gst_rtp::RTPBuffer<gst_rtp::rtp_buffer::Readable>,
) -> Option<gst::Buffer> {
+ if let Err(err) = self.handle_rtp_packet(rtp) {
+ gst::warning!(CAT, imp: self, "Failed to handle RTP packet: {err:?}");
+ self.reset(&mut self.state.lock().unwrap());
+ }
+
+ None
+ }
+}
+
+impl RTPAv1Depay {
+ fn handle_rtp_packet(
+ &self,
+ rtp: &gst_rtp::RTPBuffer<gst_rtp::rtp_buffer::Readable>,
+ ) -> Result<(), gst::FlowError> {
gst::log!(
CAT,
imp: self,
@@ -185,7 +199,7 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
rtp.buffer().size(),
);
- let payload = rtp.payload().map_err(err_opt!(self, payload_buf)).ok()?;
+ let payload = rtp.payload().map_err(err_flow!(self, payload_buf))?;
let mut state = self.state.lock().unwrap();
@@ -201,8 +215,7 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
let mut byte = [0; 1];
reader
.read_exact(&mut byte)
- .map_err(err_opt!(self, aggr_header_read))
- .ok()?;
+ .map_err(err_flow!(self, aggr_header_read))?;
AggregationHeader::from(&byte)
};
@@ -237,22 +250,22 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
gst::error!(
CAT,
imp: self,
- "invalid packet: ignores unclosed OBU fragment"
+ "invalid packet: dropping unclosed OBU fragment"
);
self.reset(&mut state);
}
if let Some((obu, ref mut bytes)) = &mut state.obu_fragment {
assert!(aggr_header.leading_fragment);
- let (element_size, is_last_obu) =
- self.find_element_info(rtp, &mut reader, &aggr_header, idx)?;
+ let (element_size, is_last_obu) = self
+ .find_element_info(rtp, &mut reader, &aggr_header, idx)
+ .map_err(err_flow!(self, find_element))?;
let bytes_end = bytes.len();
bytes.resize(bytes_end + element_size as usize, 0);
reader
.read_exact(&mut bytes[bytes_end..])
- .map_err(err_opt!(self, buf_read))
- .ok()?;
+ .map_err(err_flow!(self, buf_read))?;
// if this OBU is complete, it can be appended to the adapter
if !(is_last_obu && aggr_header.trailing_fragment) {
@@ -278,21 +291,17 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
let header_pos = reader.position();
let mut bitreader = BitReader::endian(&mut reader, ENDIANNESS);
- let obu = UnsizedObu::parse(&mut bitreader)
- .map_err(err_opt!(self, obu_read))
- .ok()?;
+ let obu = UnsizedObu::parse(&mut bitreader).map_err(err_flow!(self, obu_read))?;
reader
.seek(SeekFrom::Start(header_pos))
- .map_err(err_opt!(self, buf_read))
- .ok()?;
+ .map_err(err_flow!(self, buf_read))?;
// ignore these OBU types
if matches!(obu.obu_type, ObuType::TemporalDelimiter | ObuType::TileList) {
reader
.seek(SeekFrom::Current(element_size as i64))
- .map_err(err_opt!(self, buf_read))
- .ok()?;
+ .map_err(err_flow!(self, buf_read))?;
idx += 1;
continue;
}
@@ -303,8 +312,7 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
let mut bytes = vec![0; bytes_left as usize];
reader
.read_exact(bytes.as_mut_slice())
- .map_err(err_opt!(self, buf_read))
- .ok()?;
+ .map_err(err_flow!(self, buf_read))?;
state.obu_fragment = Some((obu, bytes));
}
@@ -327,7 +335,7 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
gst::log!(
CAT,
imp: self,
- "creating buffer containing {} bytes of data (marker {}, discont {})...",
+ "Creating buffer containing {} bytes of data (marker {}, discont {})...",
ready_obus.len(),
state.marked_packet,
state.needs_discont,
@@ -358,17 +366,20 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
imp: self,
concat!(
"invalid packet: has marker bit set, but ",
- "last OBU is not yet complete"
+ "last OBU is not yet complete. Dropping incomplete OBU."
)
);
self.reset(&mut state);
}
+ drop(state);
+
+ if let Some(buffer) = buffer {
+ self.obj().push(buffer)?;
+ }
- buffer
+ Ok(())
}
-}
-impl RTPAv1Depay {
/// Find out the next OBU element's size, and if it is the last OBU in the packet.
/// The reader is expected to be at the first byte of the element,
/// or its preceding size field if present,
@@ -379,7 +390,7 @@ impl RTPAv1Depay {
reader: &mut Cursor<&[u8]>,
aggr_header: &AggregationHeader,
index: u32,
- ) -> Option<(u32, bool)> {
+ ) -> Result<(u32, bool), gst::FlowError> {
let element_size: u32;
let is_last_obu: bool;
@@ -389,14 +400,11 @@ impl RTPAv1Depay {
rtp.payload_size() - (reader.position() as u32)
} else {
let mut bitreader = BitReader::endian(reader, ENDIANNESS);
- parse_leb128(&mut bitreader)
- .map_err(err_opt!(self, leb_read))
- .ok()?
+ parse_leb128(&mut bitreader).map_err(err_flow!(self, leb_read))?
}
} else {
element_size = parse_leb128(&mut BitReader::endian(&mut *reader, ENDIANNESS))
- .map_err(err_opt!(self, leb_read))
- .ok()?;
+ .map_err(err_flow!(self, leb_read))?;
is_last_obu = match rtp
.payload_size()
.cmp(&(reader.position() as u32 + element_size))
@@ -409,12 +417,12 @@ impl RTPAv1Depay {
imp: self,
"invalid packet: size field gives impossibly large OBU size"
);
- return None;
+ return Err(gst::FlowError::Error);
}
};
}
- Some((element_size, is_last_obu))
+ Ok((element_size, is_last_obu))
}
/// Using OBU data from an RTP packet, construct a buffer containing that OBU in AV1 bitstream format
@@ -423,7 +431,7 @@ impl RTPAv1Depay {
reader: &mut Cursor<&[u8]>,
obu: &SizedObu,
w: &mut Vec<u8>,
- ) -> Option<usize> {
+ ) -> Result<(), gst::FlowError> {
let pos = w.len();
w.resize(pos + obu.full_size() as usize, 0);
let bytes = &mut w[pos..];
@@ -431,8 +439,7 @@ impl RTPAv1Depay {
// write OBU header
reader
.read_exact(&mut bytes[..obu.header_len as usize])
- .map_err(err_opt!(self, buf_read))
- .ok()?;
+ .map_err(err_flow!(self, buf_read))?;
// set `has_size_field`
bytes[0] |= 1 << 1;
@@ -440,8 +447,7 @@ impl RTPAv1Depay {
// skip internal size field if present
if obu.has_size_field {
parse_leb128(&mut BitReader::endian(&mut *reader, ENDIANNESS))
- .map_err(err_opt!(self, leb_read))
- .ok()?;
+ .map_err(err_flow!(self, leb_read))?;
}
// write size field
@@ -452,16 +458,14 @@ impl RTPAv1Depay {
),
obu.size,
)
- .map_err(err_opt!(self, leb_write))
- .ok()?;
+ .map_err(err_flow!(self, leb_write))?;
// write OBU payload
reader
.read_exact(&mut bytes[(obu.header_len + obu.leb_size) as usize..])
- .map_err(err_opt!(self, buf_read))
- .ok()?;
+ .map_err(err_flow!(self, buf_read))?;
- Some(obu.full_size() as usize)
+ Ok(())
}
}
@@ -584,7 +588,7 @@ mod tests {
println!("testing element {} with reader position {}...", obu_idx, reader.position());
let actual = element.imp().find_element_info(&rtp, &mut reader, &aggr_header, obu_idx as u32);
- assert_eq!(actual, Some(expected));
+ assert_eq!(actual, Ok(expected));
element_size = actual.unwrap().0;
}
}