Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMaksym Khomenko <maksym.khomenko@skelia.partners>2023-11-08 03:23:30 +0300
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>2023-12-23 11:02:08 +0300
commit17f0b6157685e7ff0d8893a453edd332c4a3d3e5 (patch)
tree536e5dc5bdbff10863f766877aea75f5e93d70b9
parent1ef47cb48e9eab22031298b288ce451506be3a33 (diff)
webrtcsink: add payloader-setup signal
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1389>
-rw-r--r--docs/plugins/gst_plugins_cache.json18
-rw-r--r--net/webrtc/src/utils.rs64
-rw-r--r--net/webrtc/src/webrtcsink/imp.rs285
3 files changed, 280 insertions, 87 deletions
diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json
index 1b9febb0..cf55efd7 100644
--- a/docs/plugins/gst_plugins_cache.json
+++ b/docs/plugins/gst_plugins_cache.json
@@ -7038,6 +7038,24 @@
"return-type": "GStrv",
"when": "last"
},
+ "payloader-setup": {
+ "args": [
+ {
+ "name": "arg0",
+ "type": "gchararray"
+ },
+ {
+ "name": "arg1",
+ "type": "gchararray"
+ },
+ {
+ "name": "arg2",
+ "type": "GstElement"
+ }
+ ],
+ "return-type": "gboolean",
+ "when": "last"
+ },
"request-encoded-filter": {
"args": [
{
diff --git a/net/webrtc/src/utils.rs b/net/webrtc/src/utils.rs
index 808bc09a..46176ec6 100644
--- a/net/webrtc/src/utils.rs
+++ b/net/webrtc/src/utils.rs
@@ -1,5 +1,5 @@
use std::{
- collections::{BTreeMap, HashMap},
+ collections::{BTreeMap, HashMap, HashSet},
ops::Deref,
sync::atomic::{AtomicBool, Ordering},
};
@@ -611,28 +611,10 @@ impl Codec {
})
}
- pub fn build_payloader(&self, pt: u32) -> Option<gst::Element> {
- self.encoding_info.as_ref().map(|info| {
- let mut res = info
- .payloader
- .create()
- .property("mtu", 1200_u32)
- .property("pt", pt);
-
- match info.payloader.name().as_str() {
- "rtpvp8pay" | "rtpvp9pay" => {
- res = res.property_from_str("picture-id-mode", "15-bit");
- }
- "rtph264pay" | "rtph265pay" => {
- res = res
- .property_from_str("aggregate-mode", "zero-latency")
- .property("config-interval", -1i32);
- }
- _ => (),
- }
-
- res.build().unwrap()
- })
+ pub fn create_payloader(&self) -> Option<gst::Element> {
+ self.encoding_info
+ .as_ref()
+ .map(|info| info.payloader.create().build().unwrap())
}
pub fn raw_converter_filter(&self) -> Result<gst::Element, Error> {
@@ -938,3 +920,39 @@ pub struct NavigationEvent {
#[serde(flatten)]
pub event: gst_video::NavigationEvent,
}
+
+pub fn find_smallest_available_ext_id(ids: impl IntoIterator<Item = u32>) -> u32 {
+ let used_numbers: HashSet<_> = ids.into_iter().collect();
+ (1..).find(|&num| !used_numbers.contains(&num)).unwrap()
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ fn test_find_smallest_available_ext_id_case(
+ ids: impl IntoIterator<Item = u32>,
+ expected: u32,
+ ) -> Result<(), String> {
+ let actual = find_smallest_available_ext_id(ids);
+
+ if actual != expected {
+ return Err(format!("Expected {}, got {}", expected, actual));
+ }
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_find_smallest_available_ext_id() -> Result<(), String> {
+ [
+ (vec![], 1u32),
+ (vec![2u32, 3u32, 4u32], 1u32),
+ (vec![1u32, 3u32, 4u32], 2u32),
+ (vec![4u32, 1u32, 3u32], 2u32),
+ (vec![1u32, 2u32, 3u32], 4u32),
+ ]
+ .into_iter()
+ .try_for_each(|(input, expected)| test_find_smallest_available_ext_id_case(input, expected))
+ }
+}
diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs
index 85195da5..8371bd5d 100644
--- a/net/webrtc/src/webrtcsink/imp.rs
+++ b/net/webrtc/src/webrtcsink/imp.rs
@@ -25,7 +25,7 @@ use crate::aws_kvs_signaller::AwsKvsSignaller;
use crate::livekit_signaller::LiveKitSignaller;
use crate::signaller::{prelude::*, Signallable, Signaller, WebRTCSignallerRole};
use crate::whip_signaller::WhipClientSignaller;
-use crate::RUNTIME;
+use crate::{utils, RUNTIME};
use std::collections::{BTreeMap, HashSet};
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
@@ -749,6 +749,36 @@ fn configure_encoder(enc: &gst::Element, start_bitrate: u32) {
}
}
+/// Default configuration for known payloaders, can be disabled
+/// by returning True from an payloader-setup handler.
+fn configure_payloader(pay: &gst::Element) {
+ pay.set_property("mtu", 1200_u32);
+
+ match pay.factory().unwrap().name().as_str() {
+ "rtpvp8pay" | "rtpvp9pay" => {
+ pay.set_property_from_str("picture-id-mode", "15-bit");
+ }
+ "rtph264pay" | "rtph265pay" => {
+ pay.set_property_from_str("aggregate-mode", "zero-latency");
+ pay.set_property("config-interval", -1i32);
+ }
+ _ => (),
+ }
+}
+
+fn setup_signal_accumulator(
+ _hint: &glib::subclass::SignalInvocationHint,
+ ret: &mut glib::Value,
+ value: &glib::Value,
+) -> bool {
+ let is_configured = value.get::<bool>().unwrap();
+ let continue_emission = !is_configured;
+
+ *ret = value.clone();
+
+ continue_emission
+}
+
/// Set of elements used in an EncodingChain
struct EncodingChain {
raw_filter: Option<gst::Element>,
@@ -756,22 +786,24 @@ struct EncodingChain {
pay_filter: gst::Element,
}
-struct EncodingChainBuilder {
+/// A set of elements that transform raw data into RTP packets
+struct PayloadChain {
+ encoding_chain: EncodingChain,
+ payloader: gst::Element,
+}
+
+struct PayloadChainBuilder {
/// Caps of the input chain
input_caps: gst::Caps,
- //// Caps expected after the payloader
+ /// Caps expected after the payloader
output_caps: gst::Caps,
/// The Codec representing wanted encoding
codec: Codec,
- /// The SSRC to use for the RTP stream if any
/// Filter element between the encoder and the payloader.
encoded_filter: Option<gst::Element>,
- ssrc: Option<u32>,
- /// The TWCC ID to use for payloaded stream
- twcc: Option<u32>,
}
-impl EncodingChainBuilder {
+impl PayloadChainBuilder {
fn new(
input_caps: &gst::Caps,
output_caps: &gst::Caps,
@@ -783,31 +815,18 @@ impl EncodingChainBuilder {
output_caps: output_caps.clone(),
codec: codec.clone(),
encoded_filter,
- ssrc: None,
- twcc: None,
}
}
- fn ssrc(mut self, ssrc: u32) -> Self {
- self.ssrc = Some(ssrc);
- self
- }
-
- fn twcc(mut self, twcc: u32) -> Self {
- self.twcc = Some(twcc);
- self
- }
-
- fn build(self, pipeline: &gst::Pipeline, src: &gst::Element) -> Result<EncodingChain, Error> {
+ fn build(self, pipeline: &gst::Pipeline, src: &gst::Element) -> Result<PayloadChain, Error> {
gst::trace!(
CAT,
obj: pipeline,
"Setting up encoding, input caps: {input_caps}, \
- output caps: {output_caps}, codec: {codec:?}, twcc: {twcc:?}",
+ output caps: {output_caps}, codec: {codec:?}",
input_caps = self.input_caps,
output_caps = self.output_caps,
codec = self.codec,
- twcc = self.twcc,
);
let needs_encoding = is_raw_caps(&self.input_caps);
@@ -856,33 +875,10 @@ impl EncodingChainBuilder {
let pay = self
.codec
- .build_payloader(
- self.codec
- .payload()
- .expect("Negotiated codec should always have pt set") as u32,
- )
+ .create_payloader()
.expect("Payloaders should always have been set in the CodecInfo we handle");
- if let Some(ssrc) = self.ssrc {
- pay.set_property("ssrc", ssrc);
- }
-
- /* We only enforce TWCC in the offer caps, once a remote description
- * has been set it will get automatically negotiated. This is necessary
- * because the implementor in Firefox had apparently not understood the
- * concept of *transport-wide* congestion control, and firefox doesn't
- * provide feedback for audio packets.
- */
- if let Some(idx) = self.twcc {
- if let Some(twcc_extension) = gst_rtp::RTPHeaderExtension::create_from_uri(RTP_TWCC_URI)
- {
- twcc_extension.set_id(idx);
- pay.emit_by_name::<()>("add-extension", &[&twcc_extension]);
- } else {
- anyhow::bail!("Failed to add TWCC extension, make sure 'gst-plugins-good:rtpmanager' is installed");
- }
- }
- elements.push(pay);
+ elements.push(pay.clone());
let pay_filter = gst::ElementFactory::make("capsfilter")
.property("caps", self.output_caps)
@@ -898,10 +894,13 @@ impl EncodingChainBuilder {
gst::Element::link_many(elements.iter().collect::<Vec<&gst::Element>>().as_slice())
.with_context(|| "Linking encoding elements")?;
- Ok(EncodingChain {
- raw_filter,
- encoder,
- pay_filter,
+ Ok(PayloadChain {
+ encoding_chain: EncodingChain {
+ raw_filter,
+ encoder,
+ pay_filter,
+ },
+ payloader: pay,
})
}
}
@@ -1236,7 +1235,10 @@ impl Session {
let output_caps = codec.output_filter().unwrap_or_else(gst::Caps::new_any);
- let encoding_chain = EncodingChainBuilder::new(
+ let PayloadChain {
+ payloader,
+ encoding_chain,
+ } = PayloadChainBuilder::new(
&webrtc_pad.in_caps,
&output_caps,
&codec,
@@ -1245,13 +1247,21 @@ impl Session {
&[&Some(&self.peer_id), &stream_name, &codec.caps],
),
)
- .ssrc(webrtc_pad.ssrc)
.build(&self.pipeline, &appsrc)?;
if let Some(ref enc) = encoding_chain.encoder {
element.emit_by_name::<bool>("encoder-setup", &[&self.peer_id, &stream_name, &enc]);
}
+ element.imp().configure_payloader(
+ &self.peer_id,
+ stream_name,
+ &payloader,
+ &codec,
+ Some(webrtc_pad.ssrc),
+ ExtensionConfigurationType::Skip,
+ )?;
+
// At this point, the peer has provided its answer, and we want to
// let the payloader / encoder perform negotiation according to that.
//
@@ -1448,7 +1458,103 @@ impl NavigationEventHandler {
}
}
+/// How to configure RTP extensions for payloaders, if at all
+enum ExtensionConfigurationType {
+ /// Skip configuration, do not add any extensions
+ Skip,
+ /// Configure extensions and assign IDs automatically, based on already enabled extensions
+ Auto,
+ /// Configure extensions, use specific ids that were provided
+ Apply { twcc_id: u32 },
+}
+
impl BaseWebRTCSink {
+ fn configure_congestion_control(
+ &self,
+ payloader: &gst::Element,
+ extension_configuration_type: ExtensionConfigurationType,
+ ) -> Result<(), Error> {
+ if let ExtensionConfigurationType::Skip = extension_configuration_type {
+ return Ok(());
+ }
+
+ let settings = self.settings.lock().unwrap();
+
+ if settings.cc_info.heuristic == WebRTCSinkCongestionControl::Disabled {
+ return Ok(());
+ }
+
+ let enabled_extensions: gst::Array = payloader.property("extensions");
+ let twcc = enabled_extensions
+ .iter()
+ .find(|value| {
+ let value = value.get::<gst_rtp::RTPHeaderExtension>().unwrap();
+
+ match value.uri() {
+ Some(v) => v == RTP_TWCC_URI,
+ None => false,
+ }
+ })
+ .map(|value| value.get::<gst_rtp::RTPHeaderExtension>().unwrap());
+
+ if let Some(ext) = twcc {
+ gst::debug!(CAT, obj: payloader, "TWCC extension is already mapped to id {} by application", ext.id());
+ return Ok(());
+ }
+
+ let twcc_id = match extension_configuration_type {
+ ExtensionConfigurationType::Auto => utils::find_smallest_available_ext_id(
+ enabled_extensions
+ .iter()
+ .map(|value| value.get::<gst_rtp::RTPHeaderExtension>().unwrap().id()),
+ ),
+ ExtensionConfigurationType::Apply { twcc_id } => twcc_id,
+ ExtensionConfigurationType::Skip => unreachable!(),
+ };
+ gst::debug!(CAT, obj: payloader, "Mapping TWCC extension to ID {}", twcc_id);
+
+ /* We only enforce TWCC in the offer caps, once a remote description
+ * has been set it will get automatically negotiated. This is necessary
+ * because the implementor in Firefox had apparently not understood the
+ * concept of *transport-wide* congestion control, and firefox doesn't
+ * provide feedback for audio packets.
+ */
+ if let Some(twcc_extension) = gst_rtp::RTPHeaderExtension::create_from_uri(RTP_TWCC_URI) {
+ twcc_extension.set_id(twcc_id);
+ payloader.emit_by_name::<()>("add-extension", &[&twcc_extension]);
+ } else {
+ anyhow::bail!("Failed to add TWCC extension, make sure 'gst-plugins-good:rtpmanager' is installed");
+ }
+
+ Ok(())
+ }
+
+ fn configure_payloader(
+ &self,
+ peer_id: &str,
+ stream_name: &str,
+ payloader: &gst::Element,
+ codec: &Codec,
+ ssrc: Option<u32>,
+ extension_configuration_type: ExtensionConfigurationType,
+ ) -> Result<(), Error> {
+ self.obj()
+ .emit_by_name::<bool>("payloader-setup", &[&peer_id, &stream_name, &payloader]);
+
+ payloader.set_property(
+ "pt",
+ codec
+ .payload()
+ .expect("Negotiated codec should always have pt set") as u32,
+ );
+
+ if let Some(ssrc) = ssrc {
+ payloader.set_property("ssrc", ssrc);
+ }
+
+ self.configure_congestion_control(payloader, extension_configuration_type)
+ }
+
fn generate_ssrc(
element: &super::BaseWebRTCSink,
webrtc_pads: &HashMap<u32, WebRTCPad>,
@@ -2026,6 +2132,10 @@ impl BaseWebRTCSink {
.iter()
.flat_map(|(_, codecs_and_caps)| codecs_and_caps)
.map(|(codec, caps)| async move {
+ let extension_configuration_type = twcc_idx
+ .map(|twcc_id| ExtensionConfigurationType::Apply { twcc_id })
+ .unwrap_or(ExtensionConfigurationType::Skip);
+
BaseWebRTCSink::run_discovery_pipeline(
element,
stream_name,
@@ -2033,7 +2143,7 @@ impl BaseWebRTCSink {
codec.clone(),
in_caps.clone(),
caps,
- twcc_idx,
+ extension_configuration_type,
)
.await
.map(|s| {
@@ -2999,7 +3109,7 @@ impl BaseWebRTCSink {
codec: Codec,
input_caps: gst::Caps,
output_caps: &gst::Caps,
- twcc: Option<u32>,
+ extension_configuration_type: ExtensionConfigurationType,
) -> Result<gst::Structure, Error> {
let pipe = PipelineWrapper(gst::Pipeline::default());
@@ -3029,7 +3139,7 @@ impl BaseWebRTCSink {
gst::Element::link_many(elements_slice)
.with_context(|| format!("Running discovery pipeline for caps {input_caps}"))?;
- let mut encoding_chain_builder = EncodingChainBuilder::new(
+ let payload_chain_builder = PayloadChainBuilder::new(
&src.caps()
.expect("Caps should always be set when starting discovery"),
output_caps,
@@ -3039,10 +3149,11 @@ impl BaseWebRTCSink {
&[&Option::<String>::None, &stream_name, &codec.caps],
),
);
- if let Some(twcc) = twcc {
- encoding_chain_builder = encoding_chain_builder.twcc(twcc)
- }
- let encoding_chain = encoding_chain_builder.build(&pipe.0, &encoding_chain_src)?;
+
+ let PayloadChain {
+ payloader,
+ encoding_chain,
+ } = payload_chain_builder.build(&pipe.0, &encoding_chain_src)?;
if let Some(ref enc) = encoding_chain.encoder {
element.emit_by_name::<bool>(
@@ -3051,6 +3162,15 @@ impl BaseWebRTCSink {
);
}
+ element.imp().configure_payloader(
+ "discovery",
+ stream_name,
+ &payloader,
+ &codec,
+ None,
+ extension_configuration_type,
+ )?;
+
let sink = gst_app::AppSink::builder()
.callbacks(
gst_app::AppSinkCallbacks::builder()
@@ -3196,7 +3316,7 @@ impl BaseWebRTCSink {
codec,
caps,
&output_caps,
- Some(1),
+ ExtensionConfigurationType::Auto,
)]
} else {
let sink_caps = discovery_info.caps.clone();
@@ -3218,7 +3338,7 @@ impl BaseWebRTCSink {
codec.clone(),
sink_caps.clone(),
&output_caps,
- Some(1),
+ ExtensionConfigurationType::Auto,
)
})
.collect()
@@ -3775,7 +3895,7 @@ impl ObjectImpl for BaseWebRTCSink {
gst::Element::static_type(),
])
.return_type::<bool>()
- .accumulator(|_hint, _ret, value| !value.get::<bool>().unwrap())
+ .accumulator(setup_signal_accumulator)
.class_handler(|_, args| {
let element = args[0].get::<super::BaseWebRTCSink>().expect("signal arg");
let enc = args[3].get::<gst::Element>().unwrap();
@@ -3796,6 +3916,43 @@ impl ObjectImpl for BaseWebRTCSink {
})
.build(),
/**
+ * RsBaseWebRTCSink::payloader-setup:
+ * @consumer_id: Identifier of the consumer, or "discovery"
+ * when the payloader is used in a discovery pipeline.
+ * @pad_name: The name of the corresponding input pad
+ * @payloader: The constructed payloader for selected codec
+ *
+ * This signal can be used to tweak @payloader properties, in particular, adding
+ * additional extensions.
+ *
+ * Note that payload type and ssrc settings are managed by webrtcsink element and
+ * trying to change them from an application handler will have no effect.
+ *
+ * Returns: True if the encoder is entirely configured,
+ * False to let other handlers run. Note that unless your intent is to enforce
+ * your custom settings, it's recommended to let the default handler run
+ * (by returning true), which would apply the optimal settings.
+ */
+ glib::subclass::Signal::builder("payloader-setup")
+ .param_types([
+ String::static_type(),
+ String::static_type(),
+ gst::Element::static_type(),
+ ])
+ .return_type::<bool>()
+ .accumulator(setup_signal_accumulator)
+ .class_handler(|_, args| {
+ let pay = args[3].get::<gst::Element>().unwrap();
+
+ configure_payloader(&pay);
+
+ // The default handler is no-op: the whole configuration logic happens
+ // in BaseWebRTCSink::configure_payloader, which is where this signal
+ // is invoked from
+ Some(false.to_value())
+ })
+ .build(),
+ /**
* RsWebRTCSink::request-encoded-filter:
* @consumer_id: Identifier of the consumer
* @pad_name: The name of the corresponding input pad