Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
authorTomasz Andrzejak <andreiltd@gmail.com>2022-02-27 13:30:09 +0300
committerTomasz Andrzejak <andreiltd@gmail.com>2022-07-20 16:34:58 +0300
commit14160d1d3128b1fb8da50fb474d23e77e55d657e (patch)
treefaa27bdd3874e1b06ec85d18bce6e1a96c0b3d12 /net
parent02990f8fcc5193deeebe3da67e6465468acebfa1 (diff)
Add RaptorQ RTP FEC plugins
Diffstat (limited to 'net')
-rw-r--r--net/raptorq/Cargo.toml45
l---------net/raptorq/LICENSE-MPL-2.01
-rw-r--r--net/raptorq/README.md118
-rw-r--r--net/raptorq/build.rs3
-rw-r--r--net/raptorq/src/fecscheme.rs96
-rw-r--r--net/raptorq/src/lib.rs33
-rw-r--r--net/raptorq/src/raptorqdec/imp.rs939
-rw-r--r--net/raptorq/src/raptorqdec/mod.rs23
-rw-r--r--net/raptorq/src/raptorqenc/imp.rs969
-rw-r--r--net/raptorq/src/raptorqenc/mod.rs23
-rw-r--r--net/raptorq/tests/raptorq.rs618
11 files changed, 2868 insertions, 0 deletions
diff --git a/net/raptorq/Cargo.toml b/net/raptorq/Cargo.toml
new file mode 100644
index 000000000..f849b9982
--- /dev/null
+++ b/net/raptorq/Cargo.toml
@@ -0,0 +1,45 @@
+[package]
+name = "gst-plugin-raptorq"
+version = "0.9.0"
+authors = ["Tomasz Andrzejak <andreiltd@gmail.com>"]
+repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
+description = "Rust Raptorq FEC Plugin"
+license = "MPL-2.0"
+edition = "2021"
+rust-version = "1.56"
+
+[dependencies]
+gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
+gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
+gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
+once_cell = "1.0"
+raptorq = "1.7"
+
+[dev-dependencies]
+gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] }
+rand = "0.8"
+
+[lib]
+name = "gstraptorq"
+crate-type = ["cdylib", "rlib"]
+path = "src/lib.rs"
+
+[build-dependencies]
+gst-plugin-version-helper = { path="../../version-helper" }
+
+[features]
+static = []
+capi = []
+
+[package.metadata.capi]
+min_version = "0.8.0"
+
+[package.metadata.capi.header]
+enabled = false
+
+[package.metadata.capi.library]
+install_subdir = "gstreamer-1.0"
+versioning = false
+
+[package.metadata.capi.pkg_config]
+requires_private = "gstreamer-1.0, gstreamer-base-1.0, gstreamer-rtp-1.0, gobject-2.0, glib-2.0, gmodule-2.0"
diff --git a/net/raptorq/LICENSE-MPL-2.0 b/net/raptorq/LICENSE-MPL-2.0
new file mode 120000
index 000000000..eb5d24fe9
--- /dev/null
+++ b/net/raptorq/LICENSE-MPL-2.0
@@ -0,0 +1 @@
+../../LICENSE-MPL-2.0 \ No newline at end of file
diff --git a/net/raptorq/README.md b/net/raptorq/README.md
new file mode 100644
index 000000000..0948ddecb
--- /dev/null
+++ b/net/raptorq/README.md
@@ -0,0 +1,118 @@
+## Introduction
+This is GStreamer implementation of RaptorQ FEC for RTP streams.
+
+The sender element produces requested number `X` of repair packets from `K` RTP
+packets. The receiver only needs:
+
+- `K` of any repair or RTP packets to recover all the data with 99% probability
+- `K + 1` of any repair or RTP packets to recover all the data with 99.99%
+ probability,
+- `K + 2` of any repair or RTP packets to recover all the data with 99.9999%
+ probability etc.
+
+Relevant documents:
+- [RFC6363 - Forward Error Correction (FEC) Framework](https://datatracker.ietf.org/doc/html/rfc6363)
+- [RFC6681 - Raptor Forward Error Correction (FEC) Schemes for FECFRAME](https://datatracker.ietf.org/doc/html/rfc6681)
+- [RFC6682 - RTP Payload Format for Raptor Forward Error Correction (FEC)](https://datatracker.ietf.org/doc/html/rfc6682)
+
+
+## Sender/Receiver Example
+```shell
+ gst-launch-1.0 \
+ rtpbin name=rtp fec-encoders='fec,0="raptorqenc\ mtu=1356\ symbol-size=192";' \
+ uridecodebin uri=file:///path/to/video/file ! x264enc key-int-max=60 tune=zerolatency ! \
+ queue ! mpegtsmux ! rtpmp2tpay ssrc=0 ! \
+ rtp.send_rtp_sink_0 rtp.send_rtp_src_0 ! udpsink host=127.0.0.1 port=5000 \
+ rtp.send_fec_src_0_0 ! udpsink host=127.0.0.1 port=5002 async=false
+
+ gst-launch-1.0 \
+ rtpbin latency=200 fec-decoders='fec,0="raptorqdec";' name=rtp \
+ udpsrc address=127.0.0.1 port=5002 \
+ caps="application/x-rtp, payload=96, raptor-scheme-id=(string)6, repair-window=(string)1000000, t=(string)192" ! \
+ queue ! rtp.recv_fec_sink_0_0 \
+ udpsrc address=127.0.0.1 port=5000 \
+ caps="application/x-rtp, media=video, clock-rate=90000, encoding-name=mp2t, payload=33" ! \
+ queue ! netsim drop-probability=0.05 ! rtp.recv_rtp_sink_0 \
+ rtp. ! decodebin ! videoconvert ! queue ! autovideosink
+```
+
+## Implementation Details
+
+### Encoder Element
+The encoder element stores the copy of original RTP packets internally until it
+receives the number of packets that are requested to be protected together. At
+this point it creates a Source Block that is passed to RaptorQ Encoder. Source
+Block is constructed by concatenating ADUIs (Application Data Unit Information)
+sometimes also called SPI (Source Packet Information). Each ADUI contains:
+
+- Header with Flow ID - `F(I)` and Length Indication for the packet - `L(I)`,
+- UDP payload, this a complete RTP packet with header,
+- Padding bytes if required,
+
+```text
+ T T T T
+ <----------------><--------------><---------------><---------------->
+ +----+--------+-----------------------+-----------------------------+
+ |F[0]| L[0] | ADU[0] | Pad[0] |
+ +----+--------+----------+------------+-----------------------------+
+ |F[1]| L[1] | ADU[1] | Pad[1] |
+ +----+--------+----------+------------------------------------------+
+ |F[2]| L[2] | ADU[2] |
+ +----+--------+------+----------------------------------------------+
+ |F[3]| L[3] |ADU[3]| Pad[3] |
+ +----+--------+------+----------------------------------------------+
+ \_________________________________ ________________________________/
+ \/
+ RaptorQ FEC encoding
+
+ +-------------------------------------------------------------------+
+ | Repair 4 |
+ +-------------------------------------------------------------------+
+ . .
+ . .
+ +-------------------------------------------------------------------+
+ | Repair 7 |
+ +-------------------------------------------------------------------+
+
+ T - Symbol Size
+ F - Flow ID
+ L - Length Indication
+ ADU - Application Data Unit (RTP packet)
+```
+
+Encoder element creates requested number of packets for a given Source Block.
+The repair packets are send during `repair-window` which is configurable
+parameter. E.g. if encoder element produces 5 repair packets and `repair-window`
+is set to 500ms, a first repair packet is send 100ms after the last protected
+packet, second at 200ms and the last at `repair-window`.
+
+Each repair packet except the symbols that are required to recover missing
+source packets, contains also the information about the Source Block:
+
+- `I` - Initial sequence number of the Source Block,
+- `Lp` - ADUI length in symbols,
+- `Lb` - Source Block Length in symbols,
+
+### Decoder Element
+Decoder element stores the copy of received RTP packets, and push original
+packet downstream immediately. If all the RTP packets have been received, the
+buffered media packets are dropped. If any packets are missing, the receiver
+checks if it has enough buffered media and repair packets to perform decoding.
+If that's the case it tries to recover missing packets by building the Source
+Block following the same rules as sender, except it skips missing packets and
+append repair packets to the block instead.
+
+Because the receiver element does not introduce latency, the recovered packets
+are send out of sequence, and it requires a `rtpjitterbuffer` to be chained
+downstream. The `rtpjitterbuffer` needs to be configured with high enough
+latency.
+
+The receiver to determine which media packets belongs to Source Blocks uses the
+information that can be retrieved from any of the repair packets. Then media
+packets with Sequence Numbers: `I + Lb/Lp - 1` inclusive, are considered during
+building a Source Block.
+
+The receiver uses `repair-window` that is signaled by the sender, and its own
+`repair-window-tolerance` parameter to decide for how long it should wait for
+the corresponding repair packets before giving up. The wait time is
+`repair-window + repair-window-tolerance`.
diff --git a/net/raptorq/build.rs b/net/raptorq/build.rs
new file mode 100644
index 000000000..cda12e57e
--- /dev/null
+++ b/net/raptorq/build.rs
@@ -0,0 +1,3 @@
+fn main() {
+ gst_plugin_version_helper::info()
+}
diff --git a/net/raptorq/src/fecscheme.rs b/net/raptorq/src/fecscheme.rs
new file mode 100644
index 000000000..1f62eddf2
--- /dev/null
+++ b/net/raptorq/src/fecscheme.rs
@@ -0,0 +1,96 @@
+// Copyright (C) 2022 Tomasz Andrzejak <andreiltd@gmail.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+pub const MAX_SOURCE_BLOCK_LEN: usize = 56403;
+pub const MAX_ENCODING_SYMBOL_SIZE: usize = 65536;
+
+// RFC6681, section 8.1.1.1
+pub const FEC_SCHEME_ID: u32 = 6;
+
+#[derive(Clone, Debug, PartialEq)]
+pub struct DataUnitHeader {
+ pub flow_indication: u8,
+ pub len_indication: u16,
+}
+
+// RFC6881, section 5
+impl DataUnitHeader {
+ pub fn encode(&self) -> [u8; 3] {
+ let mut bytes: [u8; 3] = [0; 3];
+
+ bytes[0] = self.flow_indication;
+ bytes[1..3].copy_from_slice(&self.len_indication.to_be_bytes());
+ bytes
+ }
+
+ pub fn decode(bytes: [u8; 3]) -> Self {
+ Self {
+ flow_indication: bytes[0],
+ len_indication: u16::from_be_bytes([bytes[1], bytes[2]]),
+ }
+ }
+}
+
+// RFC6881, section 8.1.3
+#[derive(Clone, Debug, PartialEq)]
+pub struct RepairPayloadId {
+ pub initial_sequence_num: u16,
+ pub source_block_len: u16,
+ pub encoding_symbol_id: u32, // 24 bits
+}
+
+impl RepairPayloadId {
+ pub fn encode(&self) -> [u8; 7] {
+ let mut bytes: [u8; 7] = [0; 7];
+
+ bytes[0..2].copy_from_slice(&self.initial_sequence_num.to_be_bytes());
+ bytes[2..4].copy_from_slice(&self.source_block_len.to_be_bytes());
+ bytes[4..7].copy_from_slice(&self.encoding_symbol_id.to_be_bytes()[1..4]);
+ bytes
+ }
+
+ pub fn decode(bytes: [u8; 7]) -> Self {
+ Self {
+ initial_sequence_num: u16::from_be_bytes([bytes[0], bytes[1]]),
+ source_block_len: u16::from_be_bytes([bytes[2], bytes[3]]),
+ encoding_symbol_id: u32::from_be_bytes([0, bytes[4], bytes[5], bytes[6]]),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_repair_payload_encode() {
+ let payload_id = RepairPayloadId {
+ initial_sequence_num: 42,
+ source_block_len: 43,
+ encoding_symbol_id: 44,
+ };
+
+ let encoded = payload_id.encode();
+ assert_eq!(encoded.len(), 7);
+
+ let decoded = RepairPayloadId::decode(encoded);
+ assert_eq!(payload_id, decoded);
+ }
+
+ #[test]
+ fn test_unit_data_header_encode() {
+ let header = DataUnitHeader {
+ flow_indication: 42,
+ len_indication: 43,
+ };
+
+ let encoded = header.encode();
+ assert_eq!(encoded.len(), 3);
+
+ let decoded = DataUnitHeader::decode(encoded);
+ assert_eq!(header, decoded);
+ }
+}
diff --git a/net/raptorq/src/lib.rs b/net/raptorq/src/lib.rs
new file mode 100644
index 000000000..0fd3b1258
--- /dev/null
+++ b/net/raptorq/src/lib.rs
@@ -0,0 +1,33 @@
+// Copyright (C) 2022 Tomasz Andrzejak <andreiltd@gmail.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#![allow(clippy::non_send_fields_in_send_ty)]
+#![doc = include_str!("../README.md")]
+
+use gst::glib;
+
+mod fecscheme;
+mod raptorqdec;
+mod raptorqenc;
+
+fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ raptorqdec::register(plugin)?;
+ raptorqenc::register(plugin)?;
+
+ Ok(())
+}
+
+gst::plugin_define!(
+ raptorq,
+ env!("CARGO_PKG_DESCRIPTION"),
+ plugin_init,
+ concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
+ "MPL",
+ env!("CARGO_PKG_NAME"),
+ env!("CARGO_PKG_NAME"),
+ env!("CARGO_PKG_REPOSITORY"),
+ env!("BUILD_REL_DATE")
+);
diff --git a/net/raptorq/src/raptorqdec/imp.rs b/net/raptorq/src/raptorqdec/imp.rs
new file mode 100644
index 000000000..6ba2d6b80
--- /dev/null
+++ b/net/raptorq/src/raptorqdec/imp.rs
@@ -0,0 +1,939 @@
+// Copyright (C) 2022 Tomasz Andrzejak <andreiltd@gmail.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+use gst::{error_msg, glib};
+
+use gst::prelude::*;
+use gst::subclass::prelude::*;
+
+use gst_rtp::RTPBuffer;
+
+use once_cell::sync::Lazy;
+
+use std::collections::BTreeMap;
+use std::iter;
+use std::ops::Range;
+use std::sync::Mutex;
+
+use raptorq::{EncodingPacket, ObjectTransmissionInformation, PayloadId, SourceBlockDecoder};
+
+use crate::fecscheme::{self, DataUnitHeader, RepairPayloadId};
+
+static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
+ gst::DebugCategory::new(
+ "raptorqdec",
+ gst::DebugColorFlags::empty(),
+ Some("RTP RaptorQ Decoder"),
+ )
+});
+
+const DEFAULT_REPAIR_WINDOW_TOLERANCE: u32 = 500;
+const DEFAULT_MEDIA_PACKETS_RESET_THRESHOLD: u32 = 5000;
+
+#[derive(Debug, Clone, Copy)]
+struct Settings {
+ repair_window_tolerance: u32,
+ media_packets_reset_threshold: u32,
+}
+
+impl Default for Settings {
+ fn default() -> Self {
+ Self {
+ repair_window_tolerance: DEFAULT_REPAIR_WINDOW_TOLERANCE,
+ media_packets_reset_threshold: DEFAULT_MEDIA_PACKETS_RESET_THRESHOLD,
+ }
+ }
+}
+
+#[derive(Debug, Default, Clone, Copy)]
+struct Stats {
+ recv: u64,
+ lost: u64,
+ recovered: u64,
+}
+
+#[derive(Debug, Clone, Copy)]
+struct SourceBlockInfo {
+ initial_seq: u64,
+ symbols_per_block: u64,
+ symbols_per_packet: u64,
+}
+
+impl SourceBlockInfo {
+ fn seq_range(&self) -> Range<u64> {
+ // RFC 6881, section 8.2.2
+ let i = self.initial_seq;
+ let lp = self.symbols_per_packet;
+ let lb = self.symbols_per_block;
+
+ i..i + lb / lp
+ }
+
+ fn packets_num(&self) -> usize {
+ (self.symbols_per_block / self.symbols_per_packet) as usize
+ }
+}
+
+#[derive(Debug, Clone)]
+struct RepairPacketItem {
+ payload_id: RepairPayloadId,
+ payload: Vec<u8>,
+}
+
+#[derive(Debug, Clone)]
+struct MediaPacketItem {
+ header: DataUnitHeader,
+ payload: Vec<u8>,
+}
+
+#[derive(Default)]
+struct State {
+ media_packets: BTreeMap<u64, MediaPacketItem>,
+ repair_packets: BTreeMap<u64, Vec<RepairPacketItem>>,
+ expirations: BTreeMap<u64, Option<gst::ClockTime>>,
+ source_block_info: BTreeMap<u64, SourceBlockInfo>,
+ extended_media_seq: Option<u64>,
+ extended_repair_seq: Option<u64>,
+ symbol_size: usize,
+ media_packets_reset_threshold: usize,
+ repair_window: Option<gst::ClockTime>,
+ max_arrival_time: Option<gst::ClockTime>,
+ stats: Stats,
+}
+
+impl State {
+ fn drop_source_block(&mut self, seq: u64) {
+ if let Some(info) = self.source_block_info.get(&seq) {
+ let (seq_lo, seq_hi) = (info.seq_range().start, info.seq_range().end);
+
+ self.media_packets.retain(|&k, _| k >= seq_hi);
+ self.repair_packets.remove(&seq_lo);
+ self.source_block_info.remove(&seq_lo);
+ self.expirations.remove(&seq_lo);
+ }
+ }
+
+ fn expire_packets(&mut self) -> Vec<u64> {
+ let expired = self
+ .expirations
+ .iter()
+ .filter_map(|(&seq, &expiration)| {
+ if self.max_arrival_time.opt_gt(expiration) == Some(true) {
+ Some(seq)
+ } else {
+ None
+ }
+ })
+ .collect::<Vec<_>>();
+
+ for seq in &expired {
+ self.drop_source_block(*seq);
+ }
+
+ expired
+ }
+}
+
+pub struct RaptorqDec {
+ sinkpad: gst::Pad,
+ srcpad: gst::Pad,
+ sinkpad_fec: Mutex<Option<gst::Pad>>,
+ settings: Mutex<Settings>,
+ state: Mutex<State>,
+}
+
+impl RaptorqDec {
+ fn process_source_block(
+ &self,
+ element: &super::RaptorqDec,
+ state: &mut State,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ // Pull the information about the current Source Block from sequence.
+ // Data packets for current Source Block are in range: info.seq_range(),
+ // Repair Packets on the other hand, for a given block share the same key
+ // in the sequence, which is the lower seq bound of Data Packets.
+ let source_block_info = state
+ .source_block_info
+ .values()
+ .cloned()
+ .collect::<Vec<_>>();
+
+ for info in source_block_info {
+ let (seq_lo, seq_hi) = (info.seq_range().start, info.seq_range().end);
+ let data_packets_num = state.media_packets.range(seq_lo..seq_hi).count();
+ let n = info.packets_num();
+
+ if data_packets_num == n {
+ gst::trace!(
+ CAT,
+ obj: element,
+ "All packets ({}) received, dropping Source Block ({})",
+ data_packets_num,
+ seq_lo
+ );
+
+ state.drop_source_block(seq_lo);
+ continue;
+ }
+
+ let repair_packets_num = state.repair_packets.entry(seq_lo).or_default().len();
+
+ // Wait until we have enough Symbols to start decoding a Block
+ if data_packets_num + repair_packets_num < n {
+ continue;
+ }
+
+ // Build Source Block from received Data Packets and append
+ // Repair Packets that have the same initial sequnce number
+ let mut source_block = Vec::with_capacity(
+ (data_packets_num + repair_packets_num)
+ .checked_mul(state.symbol_size)
+ .ok_or(gst::FlowError::NotSupported)?
+ .checked_mul(info.symbols_per_packet as usize)
+ .ok_or(gst::FlowError::NotSupported)?,
+ );
+
+ source_block.extend(
+ Iterator::chain(
+ state
+ .media_packets
+ .range(seq_lo..seq_hi)
+ .map(|(_, packet)| {
+ let si = info.symbols_per_packet as usize;
+ let mut data = vec![0; si * state.symbol_size];
+
+ assert!(data.len() >= packet.payload.len() + 3);
+
+ data[0..3].copy_from_slice(&packet.header.encode());
+ data[3..3 + packet.payload.len()].copy_from_slice(&packet.payload);
+ data
+ }),
+ state
+ .repair_packets
+ .entry(seq_lo)
+ .or_default()
+ .iter()
+ .map(|packet| packet.payload.to_owned()),
+ )
+ .flatten(),
+ );
+
+ // RFC 6881, section 8.2.2
+ let esi = Iterator::chain(
+ state
+ .media_packets
+ .range(seq_lo..seq_hi)
+ .flat_map(|(seq, _)| {
+ let i = (seq - seq_lo) * info.symbols_per_packet;
+ (i..i + info.symbols_per_packet).collect::<Vec<_>>()
+ }),
+ state
+ .repair_packets
+ .entry(seq_lo)
+ .or_default()
+ .iter()
+ .flat_map(|packet| {
+ let i = packet.payload_id.encoding_symbol_id as u64;
+ (i..i + info.symbols_per_packet).collect::<Vec<_>>()
+ }),
+ )
+ .collect::<Vec<_>>();
+
+ let symbolsz = state.symbol_size as u64;
+ let blocksz = info.symbols_per_block * symbolsz;
+
+ let config = ObjectTransmissionInformation::new(0, symbolsz as u16, 1, 1, 8);
+ let mut decoder = SourceBlockDecoder::new2(0, &config, blocksz);
+ let mut result = None;
+
+ for (esi, symbol) in
+ Iterator::zip(esi.iter(), source_block.chunks_exact(state.symbol_size))
+ {
+ let payload_id = PayloadId::new(0, *esi as u32);
+ let encoding_packet = EncodingPacket::new(payload_id, symbol.to_vec());
+
+ result = decoder.decode(iter::once(encoding_packet));
+ if result.is_some() {
+ break;
+ }
+ }
+
+ if let Some(data) = result {
+ // Find missing packets in the Source Block
+ let missing_indices = (seq_lo..seq_hi)
+ .filter_map(|seq| match state.media_packets.contains_key(&seq) {
+ false => Some((seq - seq_lo) as usize),
+ true => None,
+ })
+ .collect::<Vec<_>>();
+
+ let pktsz = (info.symbols_per_packet * symbolsz) as usize;
+
+ let recovered_packets = missing_indices
+ .iter()
+ .filter_map(|i| {
+ let packet = &data[i * pktsz..];
+ let header = packet[0..3].try_into().ok()?;
+
+ let len = DataUnitHeader::decode(header).len_indication as usize;
+
+ // Length indication does not account for Unit Header and RTP header
+ if packet.len() >= len + 3 + 12 {
+ let data_unit = packet[3..len + 12 + 3].to_owned();
+ let mut buf = gst::Buffer::from_slice(data_unit);
+
+ let buf_mut = buf.get_mut().unwrap();
+ buf_mut.set_dts(state.max_arrival_time);
+
+ return Some(buf);
+ }
+
+ None
+ })
+ .collect::<Vec<_>>();
+
+ state.drop_source_block(seq_lo);
+ state.stats.lost += missing_indices.len() as u64;
+
+ for packet in recovered_packets {
+ {
+ let rtpbuf = RTPBuffer::from_buffer_readable(&packet).unwrap();
+
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Succesfully recovered packet: seqnum: {}, len: {}, ts: {}",
+ rtpbuf.seq(),
+ rtpbuf.payload_size(),
+ rtpbuf.timestamp(),
+ );
+ }
+
+ state.stats.recovered += 1;
+ self.srcpad.push(packet)?;
+ }
+ }
+ }
+
+ Ok(gst::FlowSuccess::Ok)
+ }
+
+ fn store_media_packet(
+ &self,
+ element: &super::RaptorqDec,
+ state: &mut State,
+ buffer: &gst::Buffer,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ let this_seq = {
+ let rtpbuf = RTPBuffer::from_buffer_readable(buffer).map_err(|err| {
+ gst::error!(CAT, obj: element, "Failed to map rtp buffer : {}", err);
+ gst::FlowError::Error
+ })?;
+
+ gst::trace!(
+ CAT,
+ obj: element,
+ "New data packet, seq {}, ts {}",
+ rtpbuf.seq(),
+ rtpbuf.timestamp()
+ );
+
+ // Expand cyclic sequence numbers to u64, start from u16::MAX so we
+ // never overflow substraction.
+ let seq = rtpbuf.seq();
+ let prev_seq = state.extended_media_seq.unwrap_or(65_535 + seq as u64);
+
+ let delta = gst_rtp::compare_seqnum(prev_seq as u16, seq);
+
+ match delta.is_negative() {
+ true => prev_seq - delta.unsigned_abs() as u64,
+ false => prev_seq + delta.unsigned_abs() as u64,
+ }
+ };
+
+ let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
+
+ // As defined in RFC6881, section 8.2.4, length indication
+ // should be equal to UDP packet length without RTP header.
+ let header = DataUnitHeader {
+ flow_indication: 0,
+ len_indication: buffer.size() as u16 - 12,
+ };
+
+ state.media_packets.insert(
+ this_seq,
+ MediaPacketItem {
+ header,
+ payload: map.to_vec(),
+ },
+ );
+
+ state.stats.recv += 1;
+ state.extended_media_seq = Some(this_seq);
+
+ let now = buffer.dts_or_pts();
+ state.max_arrival_time = state.max_arrival_time.opt_max(now).or(now);
+
+ Ok(gst::FlowSuccess::Ok)
+ }
+
+ fn sink_chain(
+ &self,
+ _pad: &gst::Pad,
+ element: &super::RaptorqDec,
+ buffer: gst::Buffer,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ let mut state = self.state.lock().unwrap();
+ self.store_media_packet(element, &mut state, &buffer)?;
+
+ // Retire the packets that have been around for too long
+ let expired = state.expire_packets();
+ for seq in expired {
+ gst::trace!(
+ CAT,
+ obj: element,
+ "Source Block ({}) dropped, because max wait time has been exceeded",
+ seq as u16
+ );
+ }
+
+ // This is the fuse to make sure we are not growing RTP storage indefinitely.
+ let thresh = state.media_packets_reset_threshold;
+ if thresh > 0 && state.media_packets.len() >= thresh {
+ gst::warning!(
+ CAT,
+ obj: element,
+ "Too many buffered media packets, resetting decoder. This might \
+ be because we haven't received a repair packet for too long, or \
+ repair packets have no valid timestamps.",
+ );
+
+ self.reset(element);
+ }
+
+ self.process_source_block(element, &mut state)?;
+ drop(state);
+
+ self.srcpad.push(buffer)
+ }
+
+ fn fec_sink_chain(
+ &self,
+ _pad: &gst::Pad,
+ element: &super::RaptorqDec,
+ buffer: gst::Buffer,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ let rtpbuf = RTPBuffer::from_buffer_readable(&buffer).map_err(|err| {
+ gst::error!(CAT, obj: element, "Failed to map rtp buffer : {}", err);
+ gst::FlowError::Error
+ })?;
+
+ let payload = rtpbuf.payload().unwrap();
+ let payload_id = payload[0..7].try_into().map_err(|err| {
+ gst::error!(CAT, obj: element, "Unexpected rtp fec payload : {}", err);
+ gst::FlowError::Error
+ })?;
+
+ let mut state = self.state.lock().unwrap();
+
+ let id = RepairPayloadId::decode(payload_id);
+
+ let i = id.initial_sequence_num;
+ let lb = id.source_block_len as u64;
+ let lp = ((payload.len() - payload_id.len()) / state.symbol_size) as u64;
+
+ gst::trace!(
+ CAT,
+ obj: element,
+ "New repair packet, I: {}, LP: {}, LB: {}",
+ i,
+ lp,
+ lb,
+ );
+
+ // Expand cyclic sequence numbers to u64, start from u16::MAX so we
+ // never overflow substraction.
+ let prev_seq = state.extended_repair_seq.unwrap_or(65_535 + i as u64);
+ let delta = gst_rtp::compare_seqnum(prev_seq as u16, i);
+
+ let this_seq = match delta.is_negative() {
+ true => prev_seq - delta.unsigned_abs() as u64,
+ false => prev_seq + delta.unsigned_abs() as u64,
+ };
+
+ state.extended_repair_seq = Some(this_seq);
+
+ let expire_at = state.max_arrival_time.opt_add(state.repair_window);
+ let scheduled = state.expirations.entry(this_seq).or_insert(expire_at);
+
+ // Update already scheduled expiration if a new value happens to be earlier
+ *scheduled = scheduled.opt_min(expire_at);
+
+ state
+ .source_block_info
+ .entry(this_seq)
+ .or_insert(SourceBlockInfo {
+ initial_seq: this_seq,
+ symbols_per_block: lb,
+ symbols_per_packet: lp,
+ });
+
+ state
+ .repair_packets
+ .entry(this_seq)
+ .or_insert_with(Vec::new)
+ .push(RepairPacketItem {
+ payload_id: id,
+ payload: payload[7..].to_vec(), // without PayloadId
+ });
+
+ assert_eq!(state.repair_packets.len(), state.source_block_info.len());
+ assert_eq!(state.repair_packets.len(), state.expirations.len());
+
+ Ok(gst::FlowSuccess::Ok)
+ }
+
+ fn sink_event(&self, pad: &gst::Pad, element: &super::RaptorqDec, event: gst::Event) -> bool {
+ gst::debug!(CAT, "Handling event {:?}", event);
+ use gst::EventView;
+
+ if let EventView::FlushStop(_) = event.view() {
+ self.reset(element);
+ }
+
+ pad.event_default(Some(element), event)
+ }
+
+ fn fec_sink_event(
+ &self,
+ pad: &gst::Pad,
+ element: &super::RaptorqDec,
+ event: gst::Event,
+ ) -> bool {
+ gst::debug!(CAT, "Handling event {:?}", event);
+ use gst::EventView;
+
+ if let EventView::Caps(c) = event.view() {
+ if let Err(err) = self.start(element, c.caps()) {
+ gst::element_error!(
+ element,
+ gst::CoreError::Event,
+ ["Failed to start raptorqdec {:?}", err]
+ );
+
+ return false;
+ }
+ }
+
+ pad.event_default(Some(element), event)
+ }
+
+ fn iterate_internal_links(
+ &self,
+ pad: &gst::Pad,
+ _element: &super::RaptorqDec,
+ ) -> gst::Iterator<gst::Pad> {
+ if pad == &self.srcpad {
+ gst::Iterator::from_vec(vec![self.sinkpad.clone()])
+ } else if pad == &self.sinkpad {
+ gst::Iterator::from_vec(vec![self.srcpad.clone()])
+ } else {
+ gst::Iterator::from_vec(vec![])
+ }
+ }
+
+ fn start(
+ &self,
+ element: &super::RaptorqDec,
+ incaps: &gst::CapsRef,
+ ) -> Result<(), gst::ErrorMessage> {
+ let symbol_size = fmtp_param_from_caps::<usize>("t", incaps)?;
+
+ if symbol_size > fecscheme::MAX_ENCODING_SYMBOL_SIZE {
+ let details = format!(
+ "Symbol size exceeds Maximum Encoding Symbol Size: {}",
+ fecscheme::MAX_ENCODING_SYMBOL_SIZE
+ );
+
+ gst::element_error!(element, gst::CoreError::Failed, [&details]);
+ return Err(error_msg!(gst::CoreError::Failed, [&details]));
+ }
+
+ let settings = self.settings.lock().unwrap();
+
+ let tolerance = settings.repair_window_tolerance as u64;
+ let repair_window = fmtp_param_from_caps::<u64>("repair-window", incaps)?;
+
+ let tolerance = gst::ClockTime::from_mseconds(tolerance);
+ let repair_window = gst::ClockTime::from_useconds(repair_window);
+ let repair_window = Some(repair_window + tolerance);
+
+ let media_packets_reset_threshold = settings.media_packets_reset_threshold as usize;
+
+ gst::debug!(CAT, obj: element, "Configured for caps {}", incaps);
+
+ let mut state = self.state.lock().unwrap();
+
+ state.symbol_size = symbol_size;
+ state.repair_window = repair_window;
+ state.media_packets_reset_threshold = media_packets_reset_threshold;
+
+ Ok(())
+ }
+
+ fn stop(&self, element: &super::RaptorqDec) {
+ self.reset(element);
+ }
+
+ fn reset(&self, _element: &super::RaptorqDec) {
+ let mut state = self.state.lock().unwrap();
+
+ state.media_packets.clear();
+ state.repair_packets.clear();
+ state.source_block_info.clear();
+ state.expirations.clear();
+ state.extended_media_seq = None;
+ state.extended_repair_seq = None;
+ state.max_arrival_time = gst::ClockTime::NONE;
+ state.stats = Default::default();
+ }
+}
+
+#[glib::object_subclass]
+impl ObjectSubclass for RaptorqDec {
+ const NAME: &'static str = "GstRaptorqDec";
+ type Type = super::RaptorqDec;
+ type ParentType = gst::Element;
+
+ fn with_class(klass: &Self::Class) -> Self {
+ let templ = klass.pad_template("sink").unwrap();
+ let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink"))
+ .chain_function(|pad, parent, buffer| {
+ Self::catch_panic_pad_function(
+ parent,
+ || Err(gst::FlowError::Error),
+ |this, element| this.sink_chain(pad, element, buffer),
+ )
+ })
+ .event_function(|pad, parent, event| {
+ Self::catch_panic_pad_function(
+ parent,
+ || false,
+ |this, element| this.sink_event(pad, element, event),
+ )
+ })
+ .iterate_internal_links_function(|pad, parent| {
+ Self::catch_panic_pad_function(
+ parent,
+ || gst::Iterator::from_vec(vec![]),
+ |this, element| this.iterate_internal_links(pad, element),
+ )
+ })
+ .flags(gst::PadFlags::PROXY_CAPS)
+ .build();
+
+ let templ = klass.pad_template("src").unwrap();
+ let srcpad = gst::Pad::builder_with_template(&templ, Some("src"))
+ .iterate_internal_links_function(|pad, parent| {
+ Self::catch_panic_pad_function(
+ parent,
+ || gst::Iterator::from_vec(vec![]),
+ |this, element| this.iterate_internal_links(pad, element),
+ )
+ })
+ .flags(gst::PadFlags::PROXY_CAPS)
+ .build();
+
+ Self {
+ srcpad,
+ sinkpad,
+ sinkpad_fec: Mutex::new(None),
+ settings: Mutex::new(Default::default()),
+ state: Mutex::new(Default::default()),
+ }
+ }
+}
+
+impl ObjectImpl for RaptorqDec {
+ fn properties() -> &'static [glib::ParamSpec] {
+ static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
+ vec![
+ glib::ParamSpecUInt::new(
+ "repair-window-tolerance",
+ "Repair Window Tolerance (ms)",
+ "The amount of time to add to repair-window reported by RaptorQ encoder (in ms)",
+ 0,
+ u32::MAX - 1,
+ DEFAULT_REPAIR_WINDOW_TOLERANCE,
+ glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
+ ),
+ glib::ParamSpecUInt::new(
+ "media-packets-reset-threshold",
+ "Media Packets Reset Threshold",
+ "This is the maximum allowed number of buffered packets, before we reset the decoder. \
+ It can only be triggered if we don't receive repair packets for too long, or packets \
+ have no valid timestamps, (0 - disable)",
+ 0,
+ u32::MAX - 1,
+ DEFAULT_MEDIA_PACKETS_RESET_THRESHOLD,
+ glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
+ ),
+ glib::ParamSpecBoxed::new(
+ "stats",
+ "Statistics",
+ "Various statistics",
+ gst::Structure::static_type(),
+ glib::ParamFlags::READABLE,
+ ),
+ ]
+ });
+
+ PROPERTIES.as_ref()
+ }
+
+ fn set_property(
+ &self,
+ _obj: &Self::Type,
+ _id: usize,
+ value: &glib::Value,
+ pspec: &glib::ParamSpec,
+ ) {
+ match pspec.name() {
+ "repair-window-tolerance" => {
+ let mut settings = self.settings.lock().unwrap();
+ let val = value.get().expect("type checked upstream");
+ settings.repair_window_tolerance = val;
+ }
+
+ "media-packets-reset-threshold" => {
+ let mut settings = self.settings.lock().unwrap();
+ let val = value.get().expect("type checked upstream");
+ settings.media_packets_reset_threshold = val;
+ }
+ _ => unimplemented!(),
+ }
+ }
+
+ fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ match pspec.name() {
+ "repair-window-tolerance" => {
+ let settings = self.settings.lock().unwrap();
+ settings.repair_window_tolerance.to_value()
+ }
+ "media-packets-reset-threshold" => {
+ let settings = self.settings.lock().unwrap();
+ settings.media_packets_reset_threshold.to_value()
+ }
+ "stats" => {
+ let state = self.state.lock().unwrap();
+ let stats = state.stats;
+
+ let (media_packets, repair_packets) = (
+ state.media_packets.len() as u64,
+ state
+ .repair_packets
+ .values()
+ .fold(0, |acc, x| acc + x.len() as u64),
+ );
+
+ let s = gst::Structure::builder("application/x-rtp-raptorqdec-stats")
+ .field("received-packets", stats.recv)
+ .field("lost-packets", stats.lost)
+ .field("recovered-packets", stats.recovered)
+ .field("buffered-media-packets", media_packets)
+ .field("buffered-repair-packets", repair_packets)
+ .build();
+
+ s.to_value()
+ }
+ _ => unimplemented!(),
+ }
+ }
+ fn constructed(&self, obj: &Self::Type) {
+ self.parent_constructed(obj);
+
+ obj.add_pad(&self.sinkpad).unwrap();
+ obj.add_pad(&self.srcpad).unwrap();
+ }
+}
+
+impl GstObjectImpl for RaptorqDec {}
+
+impl ElementImpl for RaptorqDec {
+ fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+ static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+ gst::subclass::ElementMetadata::new(
+ "RTP RaptorQ FEC Decoder",
+ "RTP RaptorQ FEC Decoding",
+ "Performs FEC using RaptorQ (RFC6681, RFC6682)",
+ "Tomasz Andrzejak <andreiltd@gmail.com>",
+ )
+ });
+
+ Some(&*ELEMENT_METADATA)
+ }
+
+ fn pad_templates() -> &'static [gst::PadTemplate] {
+ static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
+ let caps = gst::Caps::builder("application/x-rtp").build();
+
+ let srcpad_template = gst::PadTemplate::new(
+ "src",
+ gst::PadDirection::Src,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+
+ let sinkpad_template = gst::PadTemplate::new(
+ "sink",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+
+ let sink_fec_caps = gst::Caps::builder("application/x-rtp")
+ .field("raptor-scheme-id", fecscheme::FEC_SCHEME_ID.to_string())
+ // All fmtp paramters from SDP are string in caps, those are
+ // required parameters that cannot be expressed as string:
+ // .field("kmax", (string) [1, MAX_SOURCE_BLOCK_LEN])
+ // .field("t", (string) [1, MAX_ENCODING_SYMBOL_SIZE])
+ // .field("repair-window", (string) ANY)
+ .build();
+
+ let sinkpad_fec_template = gst::PadTemplate::new(
+ "fec_%u",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Request,
+ &sink_fec_caps,
+ )
+ .unwrap();
+
+ vec![srcpad_template, sinkpad_template, sinkpad_fec_template]
+ });
+
+ PAD_TEMPLATES.as_ref()
+ }
+
+ fn change_state(
+ &self,
+ element: &Self::Type,
+ transition: gst::StateChange,
+ ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
+ gst::trace!(CAT, obj: element, "Changing state {:?}", transition);
+
+ match transition {
+ gst::StateChange::ReadyToPaused => {
+ self.reset(element);
+ }
+ gst::StateChange::PausedToReady => {
+ self.stop(element);
+ }
+ _ => (),
+ }
+
+ self.parent_change_state(element, transition)
+ }
+
+ fn request_new_pad(
+ &self,
+ element: &Self::Type,
+ templ: &gst::PadTemplate,
+ name: Option<String>,
+ _caps: Option<&gst::Caps>,
+ ) -> Option<gst::Pad> {
+ let mut sinkpad_fec_guard = self.sinkpad_fec.lock().unwrap();
+
+ if sinkpad_fec_guard.is_some() {
+ gst::element_error!(
+ element,
+ gst::CoreError::Pad,
+ ["Not accepting more than one FEC stream"]
+ );
+
+ return None;
+ }
+
+ let sinkpad_fec = gst::Pad::builder_with_template(templ, name.as_deref())
+ .chain_function(|pad, parent, buffer| {
+ Self::catch_panic_pad_function(
+ parent,
+ || Err(gst::FlowError::Error),
+ |this, element| this.fec_sink_chain(pad, element, buffer),
+ )
+ })
+ .event_function(|pad, parent, event| {
+ Self::catch_panic_pad_function(
+ parent,
+ || false,
+ |this, element| this.fec_sink_event(pad, element, event),
+ )
+ })
+ .iterate_internal_links_function(|pad, parent| {
+ Self::catch_panic_pad_function(
+ parent,
+ || gst::Iterator::from_vec(vec![]),
+ |this, element| this.iterate_internal_links(pad, element),
+ )
+ })
+ .build();
+
+ sinkpad_fec.set_active(true).unwrap();
+ *sinkpad_fec_guard = Some(sinkpad_fec.clone());
+
+ drop(sinkpad_fec_guard);
+
+ element.add_pad(&sinkpad_fec).unwrap();
+
+ Some(sinkpad_fec)
+ }
+
+ fn release_pad(&self, element: &Self::Type, _pad: &gst::Pad) {
+ let mut pad_guard = self.sinkpad_fec.lock().unwrap();
+
+ if let Some(pad) = pad_guard.take() {
+ drop(pad_guard);
+ pad.set_active(false).unwrap();
+ element.remove_pad(&pad).unwrap();
+ }
+ }
+}
+
+fn fmtp_param_from_caps<T: std::str::FromStr>(
+ name: &str,
+ caps: &gst::CapsRef,
+) -> Result<T, gst::ErrorMessage>
+where
+ <T as std::str::FromStr>::Err: std::fmt::Debug,
+{
+ caps.structure(0)
+ .unwrap()
+ .get::<String>(name)
+ .map_err(|err| {
+ error_msg!(
+ gst::CoreError::Caps,
+ [
+ "Could not get \"{}\" param from caps {:?}, err: {:?}",
+ name,
+ caps,
+ err
+ ]
+ )
+ })?
+ .parse::<T>()
+ .map_err(|err| {
+ error_msg!(
+ gst::CoreError::Caps,
+ [
+ "Could not parse \"{}\" param from caps {:?}, err: {:?}",
+ name,
+ caps,
+ err
+ ]
+ )
+ })
+}
diff --git a/net/raptorq/src/raptorqdec/mod.rs b/net/raptorq/src/raptorqdec/mod.rs
new file mode 100644
index 000000000..73fa33af1
--- /dev/null
+++ b/net/raptorq/src/raptorqdec/mod.rs
@@ -0,0 +1,23 @@
+// Copyright (C) 2022 Tomasz Andrzejak <andreiltd@gmail.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+use gst::glib;
+use gst::prelude::*;
+
+mod imp;
+
+glib::wrapper! {
+ pub struct RaptorqDec(ObjectSubclass<imp::RaptorqDec>) @extends gst::Element, gst::Object;
+}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "raptorqdec",
+ gst::Rank::Marginal,
+ RaptorqDec::static_type(),
+ )
+}
diff --git a/net/raptorq/src/raptorqenc/imp.rs b/net/raptorq/src/raptorqenc/imp.rs
new file mode 100644
index 000000000..42825ab1c
--- /dev/null
+++ b/net/raptorq/src/raptorqenc/imp.rs
@@ -0,0 +1,969 @@
+// Copyright (C) 2022 Tomasz Andrzejak <andreiltd@gmail.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+use gst::{element_error, error_msg, glib, loggable_error};
+
+use gst::prelude::*;
+use gst::subclass::prelude::*;
+
+use gst_rtp::rtp_buffer::*;
+use gst_rtp::RTPBuffer;
+
+use once_cell::sync::Lazy;
+
+use std::collections::HashSet;
+use std::sync::{mpsc, Mutex};
+
+use raptorq::{
+ extended_source_block_symbols, ObjectTransmissionInformation, SourceBlockEncoder,
+ SourceBlockEncodingPlan,
+};
+
+use crate::fecscheme::{self, DataUnitHeader, RepairPayloadId};
+
+static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
+ gst::DebugCategory::new(
+ "raptorqenc",
+ gst::DebugColorFlags::empty(),
+ Some("RTP RaptorQ Encoder"),
+ )
+});
+
+const DEFAULT_PROTECTED_PACKETS: u32 = 25;
+const DEFAULT_REPAIR_PACKETS: u32 = 5;
+const DEFAULT_REPAIR_WINDOW: u32 = 50;
+const DEFAULT_SYMBOL_SIZE: u32 = 1408;
+const DEFAULT_MTU: u32 = 1400;
+const DEFAULT_PT: u32 = 97;
+
+const SYMBOL_ALIGNMENT: usize = 8;
+
+#[derive(Debug, Clone, Copy)]
+struct Settings {
+ protected_packets: u32,
+ repair_packets: u32,
+ repair_window: u32,
+ symbol_size: u32,
+ mtu: u32,
+ pt: u32,
+}
+
+impl Default for Settings {
+ fn default() -> Self {
+ Self {
+ protected_packets: DEFAULT_PROTECTED_PACKETS,
+ repair_packets: DEFAULT_REPAIR_PACKETS,
+ repair_window: DEFAULT_REPAIR_WINDOW,
+ symbol_size: DEFAULT_SYMBOL_SIZE,
+ mtu: DEFAULT_MTU,
+ pt: DEFAULT_PT,
+ }
+ }
+}
+
+type BufferTarget = (Option<gst::ClockTime>, gst::Buffer);
+type BufferTrigger = (gst::ClockId, gst::Buffer);
+
+enum SrcTaskMsg {
+ Schedule(BufferTarget),
+ Timeout(BufferTrigger),
+ Eos,
+}
+
+#[derive(Debug, Clone)]
+struct State {
+ packets: Vec<gst::Buffer>,
+ seqnums: Vec<u16>,
+ sender: Option<mpsc::Sender<SrcTaskMsg>>,
+ segment: gst::FormattedSegment<gst::ClockTime>,
+ repair_packets_num: usize,
+ protected_packets_num: usize,
+ repair_window: usize,
+ symbol_size: usize,
+ symbols_per_packet: usize,
+ symbols_per_block: usize,
+ mtu: usize,
+ pt: u8,
+ seq: u16,
+ ssrc: u32,
+ clock_rate: Option<u32>,
+ info: ObjectTransmissionInformation,
+ plan: SourceBlockEncodingPlan,
+}
+
+pub struct RaptorqEnc {
+ sinkpad: gst::Pad,
+ srcpad: gst::Pad,
+ srcpad_fec: gst::Pad,
+ settings: Mutex<Settings>,
+ state: Mutex<Option<State>>,
+ pending_timers: Mutex<HashSet<gst::ClockId>>,
+}
+
+impl RaptorqEnc {
+ fn process_source_block(
+ element: &super::RaptorqEnc,
+ state: &mut State,
+ now_pts: Option<gst::ClockTime>,
+ now_dts: Option<gst::ClockTime>,
+ now_rtpts: u32,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ let sender = match &state.sender {
+ Some(sender) => sender,
+ None => return Ok(gst::FlowSuccess::Ok),
+ };
+
+ // Build Source Block, RFC6881, section 8.
+ let mut source_block = Vec::with_capacity(
+ state
+ .symbol_size
+ .checked_mul(state.symbols_per_block)
+ .ok_or(gst::FlowError::NotSupported)?,
+ );
+
+ source_block.extend(state.packets.iter().flat_map(|packet| {
+ // As defined in RFC6881, section 8.2.4, length indication
+ // should be equal to UDP packet length without RTP header.
+ let li = packet.size() - 12;
+ // Value of s[i] should be equal to number of repair symbols
+ // placed in each repair packet.
+ let si = state.symbols_per_packet;
+
+ gst::trace!(
+ CAT,
+ obj: element,
+ "Source Block add ADU: si {}, li {}",
+ si,
+ li
+ );
+
+ let mut data = vec![0; si * state.symbol_size];
+
+ data[0..3].copy_from_slice(
+ &DataUnitHeader {
+ flow_indication: 0,
+ len_indication: li as u16,
+ }
+ .encode(),
+ );
+
+ let packet_map = packet.map_readable().unwrap();
+ let packet_data = packet_map.as_slice();
+
+ data[3..3 + packet.size()].copy_from_slice(packet_data);
+ data
+ }));
+
+ assert_eq!(
+ state.symbol_size * state.symbols_per_block,
+ source_block.len()
+ );
+
+ let encoder =
+ SourceBlockEncoder::with_encoding_plan2(0, &state.info, &source_block, &state.plan);
+
+ let sbl = state.symbols_per_block;
+
+ // Initial sequnce number in Repair Payload ID is a sequence number of
+ // the first packet in the Source Block.
+ let seq = state.seqnums.first().cloned().unwrap();
+
+ // Build FEC packets as defined in RFC6881, section 8.1.3
+ let repair_symbols = state.repair_packets_num * state.symbols_per_packet;
+
+ // Delay step is used to create linearly spaced vector of delays for
+ // repair packets. All the repair packets are send within repair_window
+ // span from the fec srcpad thread.
+ let delay_step = state
+ .repair_window
+ .checked_div(state.repair_packets_num)
+ .unwrap_or(0);
+
+ let delays = (1..=state.repair_packets_num)
+ .map(|n| gst::ClockTime::from_mseconds((n * delay_step) as u64))
+ .collect::<Vec<_>>();
+
+ let base_time = element.base_time();
+ let running_time = state.segment.to_running_time(now_pts);
+
+ for (target_time, repair_packet) in Iterator::zip(
+ delays
+ .iter()
+ .map(|delay| base_time.opt_add(running_time).opt_add(delay)),
+ encoder
+ .repair_packets(0, repair_symbols as u32)
+ .chunks_exact(state.symbols_per_packet)
+ .enumerate()
+ .zip(&delays)
+ .map(|((n, packets), delay)| {
+ let esi = packets[0].payload_id().encoding_symbol_id();
+
+ let payload_id = RepairPayloadId {
+ initial_sequence_num: seq,
+ source_block_len: sbl as u16,
+ encoding_symbol_id: esi,
+ }
+ .encode();
+
+ let fecsz = payload_id.len() + state.symbol_size * state.symbols_per_packet;
+ let mut buf = gst::Buffer::new_rtp_with_sizes(fecsz as u32, 0, 0).unwrap();
+
+ {
+ let buf_mut = buf.get_mut().unwrap();
+ buf_mut.set_pts(now_pts.opt_add(delay));
+ buf_mut.set_dts(now_dts.opt_add(delay));
+
+ let mut rtpbuf = RTPBuffer::from_buffer_writable(buf_mut).unwrap();
+
+ rtpbuf.set_payload_type(state.pt);
+ rtpbuf.set_seq(state.seq);
+ rtpbuf.set_marker(n == state.repair_packets_num - 1);
+
+ if let Some(clock_rate) = state.clock_rate {
+ let rtpdelay = delay
+ .mul_div_round(*gst::ClockTime::SECOND, clock_rate as u64)
+ .unwrap()
+ .nseconds() as u32;
+
+ rtpbuf.set_timestamp(now_rtpts.overflowing_add(rtpdelay).0);
+ }
+
+ state.seq = state.seq.overflowing_add(1).0;
+
+ let payload = rtpbuf.payload_mut().unwrap();
+ payload[0..payload_id.len()].copy_from_slice(&payload_id);
+
+ for (n, packet) in packets.iter().enumerate() {
+ let data = packet.data();
+ let start = payload_id.len() + n * data.len();
+
+ payload[start..start + data.len()].copy_from_slice(data);
+ }
+ }
+
+ buf
+ }),
+ ) {
+ if sender
+ .send(SrcTaskMsg::Schedule((target_time, repair_packet)))
+ .is_err()
+ {
+ break;
+ }
+ }
+
+ state.packets.clear();
+ state.seqnums.clear();
+
+ Ok(gst::FlowSuccess::Ok)
+ }
+
+ fn start_task(&self, element: &super::RaptorqEnc) -> Result<(), gst::LoggableError> {
+ let (sender, receiver) = mpsc::channel();
+
+ let mut state_guard = self.state.lock().unwrap();
+ let state = state_guard.as_mut().unwrap();
+
+ state.sender = Some(sender);
+ drop(state_guard);
+
+ let element_weak = element.downgrade();
+ let pad_weak = self.srcpad_fec.downgrade();
+
+ let mut eos = false;
+
+ self.srcpad_fec
+ .start_task(move || {
+ while let Ok(msg) = receiver.recv() {
+ let pad = match pad_weak.upgrade() {
+ Some(pad) => pad,
+ None => break,
+ };
+
+ let element = match element_weak.upgrade() {
+ Some(element) => element,
+ None => break,
+ };
+
+ match msg {
+ SrcTaskMsg::Timeout((id, buf)) => {
+ let mut timers = element.imp().pending_timers.lock().unwrap();
+ let _ = timers.remove(&id);
+
+ let push_eos = eos && timers.is_empty();
+
+ drop(timers);
+
+ if let Err(err) = pad.push(buf) {
+ gst::element_error!(
+ element,
+ gst::CoreError::Pad,
+ ["Failed to push on src FEC pad {:?}", err]
+ );
+
+ break;
+ }
+
+ if push_eos {
+ pad.push_event(gst::event::Eos::new());
+ break;
+ }
+ }
+ SrcTaskMsg::Schedule((target, buf)) => {
+ let target = match target {
+ Some(target) => target,
+ None => {
+ // No target, push buffer immediately
+ if let Err(err) = pad.push(buf) {
+ gst::element_error!(
+ element,
+ gst::CoreError::Pad,
+ ["Failed to push on src FEC pad {:?}", err]
+ );
+ break;
+ }
+
+ continue;
+ }
+ };
+
+ let clock = match element.clock() {
+ Some(clock) => clock,
+ None => {
+ // No clock provided, push buffer immediately
+ if let Err(err) = pad.push(buf) {
+ gst::element_error!(
+ element,
+ gst::CoreError::Pad,
+ ["Failed to push on src FEC pad {:?}", err]
+ );
+ break;
+ }
+
+ continue;
+ }
+ };
+
+ let timeout_sender = {
+ let state_guard = element.imp().state.lock().unwrap();
+ let state = match state_guard.as_ref() {
+ Some(state) => state,
+ None => break,
+ };
+
+ state.sender.as_ref().unwrap().clone()
+ };
+
+ let timeout = clock.new_single_shot_id(target);
+
+ let mut timers = element.imp().pending_timers.lock().unwrap();
+ timers.insert(timeout.clone().into());
+
+ timeout
+ .wait_async(move |_clock, _time, id| {
+ let id = id.clone();
+ let _ = timeout_sender.send(SrcTaskMsg::Timeout((id, buf)));
+ })
+ .expect("Failed to wait async");
+ }
+ SrcTaskMsg::Eos => {
+ if element.imp().pending_timers.lock().unwrap().is_empty() {
+ pad.push_event(gst::event::Eos::new());
+ break;
+ }
+
+ eos = true;
+ }
+ }
+ }
+
+ // All senders dropped or error
+ let pad = match pad_weak.upgrade() {
+ Some(pad) => pad,
+ None => return,
+ };
+
+ let _ = pad.pause_task();
+ })
+ .map_err(|_| loggable_error!(CAT, "Failed to start pad task"))?;
+
+ Ok(())
+ }
+
+ fn src_activatemode(
+ &self,
+ _pad: &gst::Pad,
+ element: &super::RaptorqEnc,
+ _mode: gst::PadMode,
+ active: bool,
+ ) -> Result<(), gst::LoggableError> {
+ if active {
+ self.start_task(element)?;
+ } else {
+ // element stop should be called at this point so that all mpsc
+ // senders used in task are dropped, otherwise channel can deadlock
+ self.srcpad_fec.stop_task()?;
+ }
+
+ Ok(())
+ }
+
+ fn sink_chain(
+ &self,
+ _pad: &gst::Pad,
+ element: &super::RaptorqEnc,
+ buffer: gst::Buffer,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ let mut state_guard = self.state.lock().unwrap();
+ let state = state_guard.as_mut().ok_or(gst::FlowError::NotNegotiated)?;
+
+ if buffer.size() > state.mtu {
+ gst::error!(CAT, obj: element, "Packet length exceeds configured MTU");
+ return Err(gst::FlowError::NotSupported);
+ }
+
+ let (curr_seq, now_rtpts) = match RTPBuffer::from_buffer_readable(&buffer) {
+ Ok(rtpbuf) => (rtpbuf.seq(), rtpbuf.timestamp()),
+ Err(_) => {
+ gst::error!(CAT, obj: element, "Mapping to RTP packet failed");
+ return Err(gst::FlowError::NotSupported);
+ }
+ };
+
+ if let Some(last_seq) = state.seqnums.last() {
+ if last_seq.overflowing_add(1).0 != curr_seq {
+ gst::error!(CAT, obj: element, "Got out of sequence packets");
+ return Err(gst::FlowError::NotSupported);
+ }
+ }
+
+ state.packets.push(buffer.clone());
+ state.seqnums.push(curr_seq);
+
+ assert_eq!(state.packets.len(), state.seqnums.len());
+
+ if state.packets.len() == state.protected_packets_num as usize {
+ // We use current buffer timing as a base for repair packets timestamps
+ let now_pts = buffer.pts();
+ let now_dts = buffer.dts_or_pts();
+
+ Self::process_source_block(element, state, now_pts, now_dts, now_rtpts)?;
+ }
+
+ drop(state_guard);
+ self.srcpad.push(buffer)
+ }
+
+ fn sink_event(&self, pad: &gst::Pad, element: &super::RaptorqEnc, event: gst::Event) -> bool {
+ gst::debug!(CAT, "Handling event {:?}", event);
+ use gst::EventView;
+
+ match event.view() {
+ EventView::FlushStart(_) => {
+ if let Err(err) = self.stop(element) {
+ element_error!(
+ element,
+ gst::CoreError::Event,
+ ["Failed to stop encoder after flush start {:?}", err]
+ );
+ return false;
+ }
+
+ let _ = self.srcpad_fec.set_active(false);
+ }
+ EventView::FlushStop(_) => {
+ if let Err(err) = self.start(element) {
+ element_error!(
+ element,
+ gst::CoreError::Event,
+ ["Failed to start encoder after flush stop {:?}", err]
+ );
+ return false;
+ }
+
+ let _ = self.srcpad_fec.set_active(true);
+ }
+ EventView::Caps(ev) => {
+ let caps = ev.caps();
+ gst::info!(CAT, obj: pad, "Got caps {:?}", caps);
+
+ let mut state_guard = self.state.lock().unwrap();
+
+ if let Some(state) = state_guard.as_mut() {
+ let s = caps.structure(0).unwrap();
+
+ // We need clock rate to calculate RTP timestamps of
+ // delayed repair packets.
+ if let Ok(clock_rate) = s.get::<i32>("clock-rate") {
+ if clock_rate <= 0 {
+ element_error!(element, gst::CoreError::Event, ["Invalid clock rate"]);
+ return false;
+ }
+
+ state.clock_rate = Some(clock_rate as u32);
+ }
+ }
+ }
+ EventView::Segment(ev) => {
+ let mut state_guard = self.state.lock().unwrap();
+
+ if let Some(state) = state_guard.as_mut() {
+ let segment = ev.segment().clone();
+ let segment = match segment.downcast::<gst::ClockTime>() {
+ Ok(segment) => segment,
+ Err(_) => {
+ element_error!(
+ element,
+ gst::CoreError::Event,
+ ["Only time segments are supported"]
+ );
+ return false;
+ }
+ };
+
+ state.segment = segment.clone();
+
+ // Push stream events on FEC srcpad as well
+ let pad = &self.srcpad_fec;
+ let stream_id = pad.create_stream_id(element, Some("fec")).to_string();
+
+ let kmax = extended_source_block_symbols(state.symbols_per_block as u32);
+ let scheme_id = fecscheme::FEC_SCHEME_ID;
+
+ // RFC 6682, section 6.1.1
+ let caps = gst::Caps::builder("application/x-rtp")
+ .field("payload", state.pt as i32)
+ .field("ssrc", state.ssrc as i32)
+ .field("clock-rate", state.clock_rate.unwrap_or(0) as i32)
+ .field("encoding-name", "RAPTORFEC")
+ .field("raptor-scheme-id", scheme_id.to_string())
+ .field("kmax", kmax.to_string())
+ .field("repair-window", (state.repair_window * 1000).to_string()) // ms -> us
+ .field("t", state.symbol_size.to_string())
+ .field("p", "B")
+ .build();
+
+ drop(state_guard);
+
+ pad.push_event(gst::event::StreamStart::new(&stream_id));
+ pad.push_event(gst::event::Caps::new(&caps));
+ pad.push_event(gst::event::Segment::new(&segment));
+ }
+ }
+ EventView::Eos(_) => {
+ let mut state_guard = self.state.lock().unwrap();
+
+ if let Some(state) = state_guard.as_mut() {
+ let _ = state.sender.as_ref().unwrap().send(SrcTaskMsg::Eos);
+ }
+ }
+ _ => (),
+ }
+
+ pad.event_default(Some(element), event)
+ }
+
+ fn iterate_internal_links(
+ &self,
+ pad: &gst::Pad,
+ _element: &super::RaptorqEnc,
+ ) -> gst::Iterator<gst::Pad> {
+ if pad == &self.sinkpad {
+ gst::Iterator::from_vec(vec![self.srcpad.clone()])
+ } else if pad == &self.srcpad {
+ gst::Iterator::from_vec(vec![self.sinkpad.clone()])
+ } else {
+ gst::Iterator::from_vec(vec![])
+ }
+ }
+
+ fn start(&self, element: &super::RaptorqEnc) -> Result<(), gst::ErrorMessage> {
+ let settings = self.settings.lock().unwrap();
+
+ let protected_packets_num = settings.protected_packets as usize;
+ let repair_packets_num = settings.repair_packets as usize;
+ let repair_window = settings.repair_window as usize;
+ let symbol_size = settings.symbol_size as usize;
+ let mtu = settings.mtu as usize;
+ let pt = settings.pt as u8;
+
+ // this is the number of repair symbols placed in each repair packet,
+ // it SHALL be the same for all repair packets in a block. This include
+ // 1 byte of flow indication and 2 bytes of lenght indication as defined
+ // in RFC6881, section 8.2.4.
+ let symbols_per_packet = (mtu + 3 + symbol_size - 1) / symbol_size;
+ let symbols_per_block = symbols_per_packet * protected_packets_num;
+
+ if symbol_size.rem_euclid(SYMBOL_ALIGNMENT) != 0 {
+ let details = format!(
+ "Symbol size is not multiple of Symbol Alignment {}",
+ SYMBOL_ALIGNMENT
+ );
+
+ gst::element_error!(element, gst::CoreError::Failed, [&details]);
+ return Err(error_msg!(gst::CoreError::Failed, [&details]));
+ }
+
+ if symbol_size > fecscheme::MAX_ENCODING_SYMBOL_SIZE {
+ let details = format!(
+ "Symbol size exceeds Maximum Encoding Symbol Size: {}",
+ fecscheme::MAX_ENCODING_SYMBOL_SIZE
+ );
+
+ gst::element_error!(element, gst::CoreError::Failed, [&details]);
+ return Err(error_msg!(gst::CoreError::Failed, [&details]));
+ }
+
+ if symbols_per_block > fecscheme::MAX_SOURCE_BLOCK_LEN {
+ let details = format!(
+ "Source block length exceeds Maximum Source Block Length: {}",
+ fecscheme::MAX_SOURCE_BLOCK_LEN
+ );
+
+ gst::element_error!(element, gst::CoreError::Failed, [&details]);
+ return Err(error_msg!(gst::CoreError::Failed, [&details]));
+ }
+
+ gst::info!(
+ CAT,
+ obj: element,
+ "Starting RaptorQ Encoder, Symbols per Block: {}, Symbol Size: {}",
+ symbols_per_block,
+ symbol_size
+ );
+
+ let plan = SourceBlockEncodingPlan::generate(symbols_per_block as u16);
+ let info = ObjectTransmissionInformation::new(0, symbol_size as u16, 1, 1, 8);
+
+ let segment = gst::FormattedSegment::<gst::ClockTime>::default();
+
+ *self.state.lock().unwrap() = Some(State {
+ info,
+ plan,
+ repair_packets_num,
+ protected_packets_num,
+ repair_window,
+ symbol_size,
+ symbols_per_packet,
+ symbols_per_block,
+ mtu,
+ pt,
+ segment,
+ seq: 0,
+ ssrc: 0,
+ packets: Vec::new(),
+ seqnums: Vec::new(),
+ clock_rate: None,
+ sender: None,
+ });
+
+ Ok(())
+ }
+
+ fn stop(&self, _element: &super::RaptorqEnc) -> Result<(), gst::ErrorMessage> {
+ let mut timers = self.pending_timers.lock().unwrap();
+ for timer in timers.drain() {
+ timer.unschedule();
+ }
+
+ // Drop state
+ let _ = self.state.lock().unwrap().take();
+ Ok(())
+ }
+}
+
+#[glib::object_subclass]
+impl ObjectSubclass for RaptorqEnc {
+ const NAME: &'static str = "GstRaptorqEnc";
+ type Type = super::RaptorqEnc;
+ type ParentType = gst::Element;
+
+ fn with_class(klass: &Self::Class) -> Self {
+ let templ = klass.pad_template("sink").unwrap();
+ let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink"))
+ .chain_function(|pad, parent, buffer| {
+ Self::catch_panic_pad_function(
+ parent,
+ || Err(gst::FlowError::Error),
+ |this, element| this.sink_chain(pad, element, buffer),
+ )
+ })
+ .event_function(|pad, parent, event| {
+ Self::catch_panic_pad_function(
+ parent,
+ || false,
+ |this, element| this.sink_event(pad, element, event),
+ )
+ })
+ .iterate_internal_links_function(|pad, parent| {
+ Self::catch_panic_pad_function(
+ parent,
+ || gst::Iterator::from_vec(vec![]),
+ |this, element| this.iterate_internal_links(pad, element),
+ )
+ })
+ .flags(gst::PadFlags::PROXY_CAPS)
+ .build();
+
+ let templ = klass.pad_template("src").unwrap();
+ let srcpad = gst::Pad::builder_with_template(&templ, Some("src"))
+ .iterate_internal_links_function(|pad, parent| {
+ Self::catch_panic_pad_function(
+ parent,
+ || gst::Iterator::from_vec(vec![]),
+ |this, element| this.iterate_internal_links(pad, element),
+ )
+ })
+ .flags(gst::PadFlags::PROXY_CAPS)
+ .build();
+
+ let templ = klass.pad_template("fec_0").unwrap();
+ let srcpad_fec = gst::Pad::builder_with_template(&templ, Some("fec_0"))
+ .activatemode_function(move |pad, parent, mode, active| {
+ Self::catch_panic_pad_function(
+ parent,
+ || Err(loggable_error!(CAT, "Panic activating src pad with mode")),
+ |this, element| this.src_activatemode(pad, element, mode, active),
+ )
+ })
+ .iterate_internal_links_function(|pad, parent| {
+ Self::catch_panic_pad_function(
+ parent,
+ || gst::Iterator::from_vec(vec![]),
+ |this, element| this.iterate_internal_links(pad, element),
+ )
+ })
+ .build();
+
+ Self {
+ sinkpad,
+ srcpad,
+ srcpad_fec,
+ settings: Mutex::new(Default::default()),
+ state: Mutex::new(None),
+ pending_timers: Mutex::new(HashSet::new()),
+ }
+ }
+}
+
+impl ObjectImpl for RaptorqEnc {
+ fn properties() -> &'static [glib::ParamSpec] {
+ static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
+ vec![
+ glib::ParamSpecUInt::new(
+ "protected-packets",
+ "Protected Packets",
+ "Number of packets to protect together",
+ 1,
+ u32::MAX - 1,
+ DEFAULT_PROTECTED_PACKETS,
+ glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
+ ),
+ glib::ParamSpecUInt::new(
+ "repair-packets",
+ "Repair Packets",
+ "Number of repair packets per block to send",
+ 1,
+ u32::MAX - 1,
+ DEFAULT_REPAIR_PACKETS,
+ glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
+ ),
+ glib::ParamSpecUInt::new(
+ "repair-window",
+ "Repair Window",
+ "A time span in milliseconds in which repair packets are send",
+ 0,
+ u32::MAX - 1,
+ DEFAULT_REPAIR_PACKETS,
+ glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
+ ),
+ glib::ParamSpecUInt::new(
+ "symbol-size",
+ "Symbol Size",
+ "Size of RaptorQ data unit",
+ 1,
+ u32::MAX - 1,
+ DEFAULT_SYMBOL_SIZE,
+ glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
+ ),
+ glib::ParamSpecUInt::new(
+ // TODO: maybe change this to max-rtp-packet-size or max-media-packet-size
+ "mtu",
+ "MTU",
+ "Maximum expected packet size",
+ 0,
+ i32::MAX as u32,
+ DEFAULT_MTU,
+ glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
+ ),
+ glib::ParamSpecUInt::new(
+ "pt",
+ "Payload Type",
+ "The payload type of FEC packets",
+ 96,
+ 255,
+ DEFAULT_PT,
+ glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
+ ),
+ ]
+ });
+
+ PROPERTIES.as_ref()
+ }
+
+ fn set_property(
+ &self,
+ _obj: &Self::Type,
+ _id: usize,
+ value: &glib::Value,
+ pspec: &glib::ParamSpec,
+ ) {
+ match pspec.name() {
+ "protected-packets" => {
+ let mut settings = self.settings.lock().unwrap();
+ let protected_packets = value.get().expect("type checked upstream");
+ settings.protected_packets = protected_packets;
+ }
+ "repair-packets" => {
+ let mut settings = self.settings.lock().unwrap();
+ let repair_packets = value.get().expect("type checked upstream");
+ settings.repair_packets = repair_packets;
+ }
+ "repair-window" => {
+ let mut settings = self.settings.lock().unwrap();
+ let repair_window = value.get().expect("type checked upstream");
+ settings.repair_window = repair_window;
+ }
+ "symbol-size" => {
+ let mut settings = self.settings.lock().unwrap();
+ let symbol_size = value.get().expect("type checked upstream");
+ settings.symbol_size = symbol_size;
+ }
+ "mtu" => {
+ let mut settings = self.settings.lock().unwrap();
+ let mtu = value.get().expect("type checked upstream");
+ settings.mtu = mtu;
+ }
+ "pt" => {
+ let mut settings = self.settings.lock().unwrap();
+ let pt = value.get().expect("type checked upstream");
+ settings.pt = pt;
+ }
+ _ => unimplemented!(),
+ }
+ }
+
+ fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ match pspec.name() {
+ "protected-packets" => {
+ let settings = self.settings.lock().unwrap();
+ settings.protected_packets.to_value()
+ }
+ "repair-packets" => {
+ let settings = self.settings.lock().unwrap();
+ settings.repair_packets.to_value()
+ }
+ "repair-window" => {
+ let settings = self.settings.lock().unwrap();
+ settings.repair_window.to_value()
+ }
+ "symbol-size" => {
+ let settings = self.settings.lock().unwrap();
+ settings.symbol_size.to_value()
+ }
+ "mtu" => {
+ let settings = self.settings.lock().unwrap();
+ settings.mtu.to_value()
+ }
+ "pt" => {
+ let settings = self.settings.lock().unwrap();
+ settings.pt.to_value()
+ }
+ _ => unimplemented!(),
+ }
+ }
+
+ fn constructed(&self, obj: &Self::Type) {
+ self.parent_constructed(obj);
+
+ obj.add_pad(&self.sinkpad).unwrap();
+ obj.add_pad(&self.srcpad).unwrap();
+ obj.add_pad(&self.srcpad_fec).unwrap();
+ }
+}
+
+impl GstObjectImpl for RaptorqEnc {}
+
+impl ElementImpl for RaptorqEnc {
+ fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+ static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+ gst::subclass::ElementMetadata::new(
+ "RTP RaptorQ FEC Encoder",
+ "RTP RaptorQ FEC Encoding",
+ "Performs FEC using RaptorQ (RFC6681, RFC6682)",
+ "Tomasz Andrzejak <andreiltd@gmail.com>",
+ )
+ });
+
+ Some(&*ELEMENT_METADATA)
+ }
+
+ fn pad_templates() -> &'static [gst::PadTemplate] {
+ static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
+ let caps = gst::Caps::builder("application/x-rtp")
+ .field("clock-rate", gst::IntRange::new(0, std::i32::MAX))
+ .build();
+
+ let srcpad_template = gst::PadTemplate::new(
+ "src",
+ gst::PadDirection::Src,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+
+ let sinkpad_template = gst::PadTemplate::new(
+ "sink",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+
+ let srcpad_fec_template = gst::PadTemplate::new(
+ "fec_0",
+ gst::PadDirection::Src,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+
+ vec![srcpad_template, sinkpad_template, srcpad_fec_template]
+ });
+
+ PAD_TEMPLATES.as_ref()
+ }
+
+ fn change_state(
+ &self,
+ element: &Self::Type,
+ transition: gst::StateChange,
+ ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
+ gst::trace!(CAT, obj: element, "Changing state {:?}", transition);
+
+ match transition {
+ gst::StateChange::ReadyToPaused => {
+ self.start(element).map_err(|_| gst::StateChangeError)?;
+ }
+ gst::StateChange::PausedToReady => {
+ self.stop(element).map_err(|_| gst::StateChangeError)?;
+ }
+ _ => (),
+ }
+
+ self.parent_change_state(element, transition)
+ }
+}
diff --git a/net/raptorq/src/raptorqenc/mod.rs b/net/raptorq/src/raptorqenc/mod.rs
new file mode 100644
index 000000000..7214d6ddd
--- /dev/null
+++ b/net/raptorq/src/raptorqenc/mod.rs
@@ -0,0 +1,23 @@
+// Copyright (C) 2022 Tomasz Andrzejak <andreiltd@gmail.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+use gst::glib;
+use gst::prelude::*;
+
+mod imp;
+
+glib::wrapper! {
+ pub struct RaptorqEnc(ObjectSubclass<imp::RaptorqEnc>) @extends gst::Element, gst::Object;
+}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "raptorqenc",
+ gst::Rank::Marginal,
+ RaptorqEnc::static_type(),
+ )
+}
diff --git a/net/raptorq/tests/raptorq.rs b/net/raptorq/tests/raptorq.rs
new file mode 100644
index 000000000..8595c3071
--- /dev/null
+++ b/net/raptorq/tests/raptorq.rs
@@ -0,0 +1,618 @@
+// Copyright (C) 2022 Tomasz Andrzejak <andreiltd@gmail.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+use gst::prelude::*;
+
+use gst_rtp::rtp_buffer::*;
+use gst_rtp::RTPBuffer;
+
+use rand::Rng;
+
+#[must_use]
+struct RaptorqTest {
+ protected_packets: usize,
+ repair_packets: usize,
+ repair_window: usize,
+ symbol_size: usize,
+ mtu: usize,
+ initial_seq: u16,
+ lost_buffers: Vec<usize>,
+ swapped_buffers: Vec<usize>,
+ input_buffers: usize,
+ expect_output_buffers: usize,
+}
+
+fn init() {
+ use std::sync::Once;
+ static INIT: Once = Once::new();
+
+ INIT.call_once(|| {
+ gst::init().unwrap();
+ gstraptorq::plugin_register_static().expect("Failed to register raptorqenc plugin");
+ });
+}
+
+impl RaptorqTest {
+ fn new() -> Self {
+ init();
+
+ let enc = gst::ElementFactory::make("raptorqenc", None).unwrap();
+
+ let protected_packets = enc.property::<u32>("protected-packets") as usize;
+ let repair_packets = enc.property::<u32>("repair-packets") as usize;
+ let repair_window = enc.property::<u32>("repair-window") as usize;
+ let symbol_size = enc.property::<u32>("symbol-size") as usize;
+ let mtu = enc.property::<u32>("mtu") as usize;
+
+ Self {
+ protected_packets,
+ repair_packets,
+ repair_window,
+ symbol_size,
+ mtu,
+ initial_seq: 42,
+ lost_buffers: vec![0],
+ swapped_buffers: vec![],
+ input_buffers: protected_packets,
+ expect_output_buffers: protected_packets,
+ }
+ }
+
+ fn protected_packets(mut self, protected_packets: usize) -> Self {
+ self.protected_packets = protected_packets;
+ self
+ }
+
+ fn repair_packets(mut self, repair_packets: usize) -> Self {
+ self.repair_packets = repair_packets;
+ self
+ }
+
+ fn repair_window(mut self, repair_window: usize) -> Self {
+ self.repair_window = repair_window;
+ self
+ }
+
+ fn symbol_size(mut self, symbol_size: usize) -> Self {
+ self.symbol_size = symbol_size;
+ self
+ }
+
+ fn initial_seq(mut self, initial_seq: u16) -> Self {
+ self.initial_seq = initial_seq;
+ self
+ }
+
+ fn mtu(mut self, mtu: usize) -> Self {
+ self.mtu = mtu;
+ self
+ }
+
+ fn lost_buffers(mut self, lost_buffers: Vec<usize>) -> Self {
+ self.lost_buffers = lost_buffers;
+ self
+ }
+
+ fn swapped_buffers(mut self, swapped_buffers: Vec<usize>) -> Self {
+ self.swapped_buffers = swapped_buffers;
+ self
+ }
+
+ fn input_buffers(mut self, input_buffers: usize) -> Self {
+ self.input_buffers = input_buffers;
+ self
+ }
+
+ fn expect_output_buffers(mut self, expect_output_buffers: usize) -> Self {
+ self.expect_output_buffers = expect_output_buffers;
+ self
+ }
+
+ fn run(self) {
+ assert!(self.input_buffers >= self.protected_packets);
+
+ // 1. Decoder Setup:
+ let enc = gst::ElementFactory::make("raptorqenc", None).unwrap();
+
+ enc.set_property("protected-packets", self.protected_packets as u32);
+ enc.set_property("repair-packets", self.repair_packets as u32);
+ enc.set_property("repair-window", self.repair_window as u32);
+ enc.set_property("symbol-size", self.symbol_size as u32);
+ enc.set_property("mtu", self.mtu as u32);
+
+ let mut h_enc = gst_check::Harness::with_element(&enc, Some("sink"), Some("src"));
+ let mut h_enc_fec = gst_check::Harness::with_element(&enc, None, Some("fec_0"));
+
+ h_enc.set_src_caps_str("application/x-rtp,clock-rate=8000");
+
+ // 2. Decoder Setup:
+ let dec = gst::ElementFactory::make("raptorqdec", None).unwrap();
+
+ let mut h_dec = gst_check::Harness::with_element(&dec, Some("sink"), Some("src"));
+ let mut h_dec_fec = gst_check::Harness::with_element(&dec, Some("fec_0"), None);
+
+ let caps = gst::Caps::builder("application/x-rtp")
+ .field("raptor-scheme-id", "6")
+ .field("repair-window", "1000000")
+ .field("t", self.symbol_size.to_string())
+ .build();
+
+ h_dec.set_src_caps_str("application/x-rtp");
+ h_dec_fec.set_src_caps(caps);
+
+ let mut rng = rand::thread_rng();
+
+ let input_buffers = (0..self.input_buffers)
+ .map(|i| {
+ // payload size without RTP Header and ADUI Header
+ let size = rng.gen_range(1..self.mtu - 12 - 3);
+ let data = (0..size).map(|_| rng.gen()).collect::<Vec<u8>>();
+
+ let mut buf = gst::Buffer::new_rtp_with_sizes(size as u32, 0, 0).unwrap();
+ {
+ let buf_mut = buf.get_mut().unwrap();
+ buf_mut.set_pts(gst::ClockTime::ZERO);
+ buf_mut.set_dts(gst::ClockTime::ZERO);
+
+ let mut rtpbuf = RTPBuffer::from_buffer_writable(buf_mut).unwrap();
+ let payload = rtpbuf.payload_mut().unwrap();
+
+ payload.copy_from_slice(data.as_slice());
+ rtpbuf.set_seq(self.initial_seq.wrapping_add(i as u16));
+ rtpbuf.set_timestamp(0);
+ }
+
+ buf
+ })
+ .collect::<Vec<_>>();
+
+ // 3. Encoder Operations:
+
+ // Do not consume buffers here so we can compare it with the output
+ for buf in &input_buffers {
+ let result = h_enc.push(buf.clone());
+ assert!(result.is_ok());
+ }
+
+ assert_eq!(h_enc.buffers_in_queue(), self.input_buffers as u32);
+
+ let mut media_packets = (0..self.input_buffers)
+ .map(|_| {
+ let result = h_enc.pull();
+ assert!(result.is_ok());
+ result.unwrap()
+ })
+ .collect::<Vec<_>>();
+
+ // Simulate out of order packets
+ for x in self.swapped_buffers.chunks_exact(2) {
+ media_packets.swap(x[0], x[1])
+ }
+
+ // Check if repair packets pushed from encoder are delayed properly
+ let delay_step =
+ gst::ClockTime::from_mseconds((self.repair_window / self.repair_packets) as u64);
+ let mut delay = delay_step;
+
+ let repair_packets = (0..self.repair_packets)
+ .map(|_| {
+ // Set time just before the timer to push the buffer fires up,
+ // we shouldn't see the buffer just yet.
+ h_enc_fec.set_time(delay - gst::ClockTime::NSECOND).unwrap();
+ assert_eq!(h_enc_fec.buffers_in_queue(), 0);
+
+ // Advance time to the delay and crank clock id, we should
+ // get a buffer with adjusted timestamps now. All input buffers
+ // have zero timestamp, so the pts/dts/rtp-timestamp should be
+ // equal to delay.
+ h_enc_fec.set_time(delay).unwrap();
+ h_enc_fec.crank_single_clock_wait().unwrap();
+
+ let result = h_enc_fec.pull();
+ assert!(result.is_ok());
+
+ let buf = result.unwrap();
+ assert_eq!(buf.pts().unwrap(), delay);
+ assert_eq!(buf.dts().unwrap(), delay);
+
+ let ts = RTPBuffer::from_buffer_readable(&buf).unwrap().timestamp();
+ let expected_ts =
+ *delay.mul_div_round(*gst::ClockTime::SECOND, 8000).unwrap() as u32;
+
+ assert_eq!(ts, expected_ts);
+
+ delay += delay_step;
+ buf
+ })
+ .collect::<Vec<_>>();
+
+ // 4. Decoder Operations:
+
+ // remove media packets to simulate packet loss
+ let media_packets = media_packets
+ .iter()
+ .cloned()
+ .enumerate()
+ .filter(|(i, _)| !self.lost_buffers.contains(i))
+ .map(|(_, x)| x)
+ .collect::<Vec<_>>();
+
+ // Push media packets to decoder
+ for buf in media_packets {
+ assert!(h_dec.push(buf).is_ok());
+ }
+
+ // Push repair packets to decoder
+ for buf in repair_packets {
+ assert!(h_dec_fec.push(buf).is_ok());
+ }
+
+ // At this point decoder has all the information it needs to
+ // recover packets, we just need an input buffer to run sink
+ // chain operations.
+ let result = h_dec.push(input_buffers.iter().last().unwrap().clone());
+ assert!(result.is_ok());
+
+ let mut output_buffers = (0..self.expect_output_buffers)
+ .map(|_| {
+ let result = h_dec.pull();
+ assert!(result.is_ok());
+ result.unwrap()
+ })
+ .collect::<Vec<_>>();
+
+ // Output buffers are out of sequence, we should sort it by
+ // seqnum so we can compare them with input buffers.
+ output_buffers.sort_unstable_by(|a, b| {
+ let aa = RTPBuffer::from_buffer_readable(a).unwrap();
+ let bb = RTPBuffer::from_buffer_readable(b).unwrap();
+
+ match gst_rtp::compare_seqnum(bb.seq(), aa.seq()) {
+ x if x > 0 => std::cmp::Ordering::Greater,
+ x if x < 0 => std::cmp::Ordering::Less,
+ _ => std::cmp::Ordering::Equal,
+ }
+ });
+
+ assert_eq!(output_buffers.len(), self.expect_output_buffers);
+
+ if self.input_buffers == self.expect_output_buffers {
+ for (inbuf, outbuf) in Iterator::zip(input_buffers.iter(), output_buffers.iter()) {
+ let rtp1 = RTPBuffer::from_buffer_readable(inbuf).unwrap();
+ let rtp2 = RTPBuffer::from_buffer_readable(outbuf).unwrap();
+
+ assert_eq!(rtp1.seq(), rtp2.seq());
+ assert_eq!(rtp1.payload().unwrap(), rtp2.payload().unwrap());
+ }
+ }
+ }
+}
+
+#[test]
+fn test_raptorq_all_default() {
+ RaptorqTest::new().run();
+}
+
+#[test]
+fn test_raptorq_decoder_media_packets_out_of_sequence() {
+ RaptorqTest::new()
+ .swapped_buffers(vec![5, 10, 12, 15])
+ .run();
+}
+
+#[test]
+fn test_raptorq_10_percent_overhead() {
+ RaptorqTest::new()
+ .protected_packets(100)
+ .repair_packets(10)
+ .lost_buffers(vec![4, 42, 43, 44, 45])
+ .input_buffers(100)
+ .expect_output_buffers(100)
+ .run();
+}
+
+#[test]
+fn test_raptorq_5_percent_overhead() {
+ RaptorqTest::new()
+ .protected_packets(100)
+ .repair_packets(5)
+ .input_buffers(100)
+ .lost_buffers(vec![8, 11])
+ .expect_output_buffers(100)
+ .run();
+}
+
+#[test]
+fn test_raptorq_symbol_size_128() {
+ RaptorqTest::new()
+ .protected_packets(20)
+ .repair_packets(4)
+ .symbol_size(128)
+ .mtu(400)
+ .input_buffers(20)
+ .lost_buffers(vec![9])
+ .expect_output_buffers(20)
+ .run();
+}
+
+#[test]
+fn test_raptorq_symbol_size_192() {
+ RaptorqTest::new()
+ .protected_packets(20)
+ .repair_packets(4)
+ .symbol_size(192)
+ .mtu(999)
+ .input_buffers(20)
+ .lost_buffers(vec![16, 19])
+ .expect_output_buffers(20)
+ .run();
+}
+
+#[test]
+fn test_raptorq_symbol_size_1024() {
+ RaptorqTest::new()
+ .protected_packets(20)
+ .repair_packets(8)
+ .symbol_size(192)
+ .mtu(100)
+ .input_buffers(20)
+ .lost_buffers(vec![0, 1, 2, 3, 4, 5])
+ .expect_output_buffers(20)
+ .run();
+}
+
+#[test]
+fn test_raptorq_mtu_lt_symbol_size() {
+ RaptorqTest::new()
+ .protected_packets(20)
+ .repair_packets(8)
+ .symbol_size(1400)
+ .mtu(100)
+ .input_buffers(20)
+ .lost_buffers(vec![14, 15, 16, 17, 18, 19])
+ .expect_output_buffers(20)
+ .run();
+}
+
+#[test]
+fn test_raptorq_heavy_loss() {
+ RaptorqTest::new()
+ .protected_packets(40)
+ .repair_packets(8)
+ .input_buffers(40)
+ .lost_buffers(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
+ .expect_output_buffers(30)
+ .run();
+}
+
+#[test]
+fn test_raptorq_repair_window_100ms() {
+ RaptorqTest::new()
+ .protected_packets(10)
+ .repair_packets(10)
+ .repair_window(100)
+ .input_buffers(10)
+ .lost_buffers(vec![2, 6])
+ .expect_output_buffers(10)
+ .run();
+}
+
+#[test]
+fn test_raptorq_repair_window_500ms() {
+ RaptorqTest::new()
+ .protected_packets(8)
+ .repair_packets(2)
+ .repair_window(500)
+ .input_buffers(8)
+ .lost_buffers(vec![])
+ .expect_output_buffers(8)
+ .run();
+}
+
+#[test]
+fn test_raptorq_wrapping_sequence_number_1() {
+ RaptorqTest::new().initial_seq(u16::MAX - 5).run();
+}
+
+#[test]
+fn test_raptorq_wrapping_sequence_number_2() {
+ RaptorqTest::new()
+ .initial_seq(u16::MAX - 5)
+ .swapped_buffers(vec![4, 5])
+ .run();
+}
+
+#[test]
+fn test_raptorq_wrapping_sequence_number_3() {
+ RaptorqTest::new()
+ .initial_seq(u16::MAX - 3)
+ .lost_buffers(vec![0, 1, 2, 8])
+ .run();
+}
+
+#[test]
+fn test_raptorq_encoder_flush_cancels_pending_timers() {
+ init();
+
+ let enc = gst::ElementFactory::make("raptorqenc", None).unwrap();
+
+ // Set delay to 5s, this way each buffer should be delayed by 1s
+ enc.set_property("repair-window", 5000u32);
+ enc.set_property("protected-packets", 5u32);
+ enc.set_property("repair-packets", 5u32);
+
+ let mut h_enc = gst_check::Harness::with_element(&enc, Some("sink"), Some("src"));
+ let mut h_enc_fec = gst_check::Harness::with_element(&enc, None, Some("fec_0"));
+
+ h_enc.set_src_caps_str("application/x-rtp,clock-rate=8000");
+
+ for i in 0u64..5 {
+ let mut buf = gst::Buffer::new_rtp_with_sizes(42, 0, 0).unwrap();
+
+ let buf_mut = buf.get_mut().unwrap();
+ buf_mut.set_pts(gst::ClockTime::SECOND * i);
+
+ let mut rtpbuf = RTPBuffer::from_buffer_writable(buf_mut).unwrap();
+ rtpbuf.set_seq(i as u16);
+
+ drop(rtpbuf);
+
+ let result = h_enc.push(buf);
+ assert!(result.is_ok());
+ }
+
+ // We want to check if flush cancels pending timers, last buffer of source
+ // block is at 5s, at 6s we should have 1 buffer qeued already, then we flush
+ // and move time to 10s. Flush should cancel pending timers and we should
+ // have no buffers at the output
+ h_enc_fec.set_time(gst::ClockTime::SECOND * 6).unwrap();
+ h_enc_fec.crank_single_clock_wait().unwrap();
+
+ let result = h_enc_fec.pull();
+ assert!(result.is_ok());
+
+ h_enc.push_event(gst::event::FlushStart::new());
+ h_enc.push_event(gst::event::FlushStop::new(true));
+
+ h_enc_fec.set_time(gst::ClockTime::SECOND * 10).unwrap();
+
+ loop {
+ let event = h_enc.pull_event();
+
+ if let Ok(event) = event {
+ match event.view() {
+ gst::EventView::FlushStart(_) => {
+ continue;
+ }
+ gst::EventView::FlushStop(_) => {
+ break;
+ }
+ _ => (),
+ }
+ }
+ }
+
+ assert_eq!(h_enc_fec.buffers_in_queue(), 0);
+ assert_eq!(h_enc_fec.testclock().unwrap().peek_id_count(), 0);
+}
+
+#[test]
+fn test_raptorq_repair_window_tolerance() {
+ init();
+
+ let enc = gst::ElementFactory::make("raptorqenc", None).unwrap();
+
+ // Set delay to 5s, this way each buffer should be delayed by 1s
+ enc.set_property("repair-window", 1000u32);
+ enc.set_property("protected-packets", 5u32);
+ enc.set_property("repair-packets", 5u32);
+
+ let mut h_enc = gst_check::Harness::with_element(&enc, Some("sink"), Some("src"));
+ let mut h_enc_fec = gst_check::Harness::with_element(&enc, None, Some("fec_0"));
+
+ h_enc.set_src_caps_str("application/x-rtp,clock-rate=8000");
+
+ for i in 0u64..5 {
+ let mut buf = gst::Buffer::new_rtp_with_sizes(42, 0, 0).unwrap();
+
+ let buf_mut = buf.get_mut().unwrap();
+ buf_mut.set_pts(gst::ClockTime::SECOND * i);
+
+ let mut rtpbuf = RTPBuffer::from_buffer_writable(buf_mut).unwrap();
+ rtpbuf.set_seq(i as u16);
+
+ drop(rtpbuf);
+
+ let result = h_enc.push(buf);
+ assert!(result.is_ok());
+ }
+
+ let dec = gst::ElementFactory::make("raptorqdec", None).unwrap();
+
+ dec.set_property("repair-window-tolerance", 1000u32);
+
+ let mut h_dec = gst_check::Harness::with_element(&dec, Some("sink"), Some("src"));
+ let mut h_dec_fec = gst_check::Harness::with_element(&dec, Some("fec_0"), None);
+
+ let caps = loop {
+ let event = h_enc_fec.pull_event();
+
+ if let Ok(event) = event {
+ #[allow(clippy::single_match)]
+ match event.view() {
+ gst::EventView::Caps(c) => {
+ break c.caps_owned();
+ }
+ _ => (),
+ }
+ }
+ };
+
+ h_dec.set_src_caps_str("application/x-rtp");
+ h_dec_fec.set_src_caps(caps);
+
+ h_enc_fec.set_time(gst::ClockTime::from_seconds(1)).unwrap();
+
+ let result = h_enc.pull();
+ assert!(result.is_ok());
+
+ let buf = result.unwrap();
+ let result = h_dec.push(buf);
+ assert!(result.is_ok());
+
+ // Push some of repair packets to decoder, just not enough to recover
+ // media packets
+ for _ in 0..2 {
+ h_enc_fec.crank_single_clock_wait().unwrap();
+
+ let result = h_enc_fec.pull();
+ assert!(result.is_ok());
+
+ let buf = result.unwrap();
+ let result = h_dec_fec.push(buf);
+ assert!(result.is_ok());
+ }
+
+ let stats = h_dec.element().unwrap().property::<gst::Structure>("stats");
+ assert_eq!(
+ stats
+ .get::<u64>("buffered-media-packets")
+ .expect("type error"),
+ 1
+ );
+ assert_eq!(
+ stats
+ .get::<u64>("buffered-repair-packets")
+ .expect("type error"),
+ 2
+ );
+
+ // Media buffer is way beyond repair window which is 2 seconds,
+ // (repair_window (1s) + repair_window_tolerance (1s)),
+ // the decoder should drop buffered packets as they were kept for too long.
+ let mut buf = gst::Buffer::new_rtp_with_sizes(42, 0, 0).unwrap();
+ let buf_mut = buf.get_mut().unwrap();
+ buf_mut.set_pts(gst::ClockTime::SECOND * 10);
+
+ let result = h_dec.push(buf);
+ assert!(result.is_ok());
+
+ let stats = h_dec.element().unwrap().property::<gst::Structure>("stats");
+ assert_eq!(
+ stats
+ .get::<u64>("buffered-media-packets")
+ .expect("type error"),
+ 0
+ );
+ assert_eq!(
+ stats
+ .get::<u64>("buffered-repair-packets")
+ .expect("type error"),
+ 0
+ );
+}