Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2023-10-18 21:02:55 +0300
committerSebastian Dröge <sebastian@centricular.com>2023-11-13 14:22:48 +0300
commit39155ef81cebc2fd72ce0cf06bc37d42598a8be7 (patch)
treec810f300b0128c6bdbd741af7819989326654357 /net
parent2afffb39ddc76d6cfb06eeed9269b07c253b1040 (diff)
ndisrc: Implement zerocopy handling for the received frames if possible
Also move processing from the capture thread to the streaming thread. The NDI SDK can cause frame drops if not reading fast enough from it. All frame processing is now handled inside the ndisrcdemux. Also use a buffer pool for video if copying is necessary. Additionally, make sure to use different stream ids in the stream-start event for the audio and video pad. This plugin now requires GStreamer 1.16 or newer. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1365>
Diffstat (limited to 'net')
-rw-r--r--net/ndi/Cargo.toml13
-rw-r--r--net/ndi/src/lib.rs3
-rw-r--r--net/ndi/src/ndi.rs4
-rw-r--r--net/ndi/src/ndi_cc_meta.rs115
-rw-r--r--net/ndi/src/ndisrc/imp.rs120
-rw-r--r--net/ndi/src/ndisrc/receiver.rs1514
-rw-r--r--net/ndi/src/ndisrcdemux/imp.rs1861
-rw-r--r--net/ndi/src/ndisrcmeta.rs71
8 files changed, 1980 insertions, 1721 deletions
diff --git a/net/ndi/Cargo.toml b/net/ndi/Cargo.toml
index 5fee6b151..b35c994a2 100644
--- a/net/ndi/Cargo.toml
+++ b/net/ndi/Cargo.toml
@@ -9,11 +9,11 @@ edition = "2021"
rust-version = "1.70"
[dependencies]
-glib = { git = "https://github.com/gtk-rs/gtk-rs-core"}
-gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
-gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
-gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
-gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
+glib = { git = "https://github.com/gtk-rs/gtk-rs-core" }
+gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_16"] }
+gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_16"] }
+gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_16"] }
+gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_16"] }
anyhow = "1.0"
byte-slice-cast = "1"
byteorder = "1.0"
@@ -28,8 +28,7 @@ thiserror = "1.0"
gst-plugin-version-helper = { path = "../../version-helper" }
[features]
-default = ["interlaced-fields", "sink"]
-interlaced-fields = ["gst/v1_16", "gst-video/v1_16"]
+default = ["sink"]
sink = ["gst/v1_18", "gst-base/v1_18"]
advanced-sdk = []
static = []
diff --git a/net/ndi/src/lib.rs b/net/ndi/src/lib.rs
index abed3688e..4def1f967 100644
--- a/net/ndi/src/lib.rs
+++ b/net/ndi/src/lib.rs
@@ -32,10 +32,11 @@ use gst::prelude::*;
use gst::glib::once_cell::sync::Lazy;
-#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]
+#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum, Default)]
#[repr(u32)]
#[enum_type(name = "GstNdiTimestampMode")]
pub enum TimestampMode {
+ #[default]
#[enum_value(name = "Auto", nick = "auto")]
Auto = 0,
#[enum_value(name = "Receive Time / Timecode", nick = "receive-time-vs-timecode")]
diff --git a/net/ndi/src/ndi.rs b/net/ndi/src/ndi.rs
index fa0131fa3..0ea4d0267 100644
--- a/net/ndi/src/ndi.rs
+++ b/net/ndi/src/ndi.rs
@@ -257,7 +257,7 @@ impl<'a> RecvBuilder<'a> {
}
}
-#[derive(Debug, Clone)]
+#[derive(Debug)]
struct RecvInstancePtr(ptr::NonNull<::std::os::raw::c_void>);
impl Drop for RecvInstancePtr {
@@ -836,13 +836,11 @@ impl VideoFrame {
NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved
}
// FIXME: Is this correct?
- #[cfg(feature = "interlaced-fields")]
gst_video::VideoInterlaceMode::Alternate
if frame.flags().contains(gst_video::VideoFrameFlags::TFF) =>
{
NDIlib_frame_format_type_e::NDIlib_frame_format_type_field_0
}
- #[cfg(feature = "interlaced-fields")]
gst_video::VideoInterlaceMode::Alternate
if !frame.flags().contains(gst_video::VideoFrameFlags::TFF) =>
{
diff --git a/net/ndi/src/ndi_cc_meta.rs b/net/ndi/src/ndi_cc_meta.rs
index 4c83a325f..120c183b0 100644
--- a/net/ndi/src/ndi_cc_meta.rs
+++ b/net/ndi/src/ndi_cc_meta.rs
@@ -268,12 +268,11 @@ impl NDICCMetaDecoder {
/// Decodes the provided NDI metadata string, searching for NDI closed captions
/// and add them as `VideoCaptionMeta` to the provided `gst::Buffer`.
- pub fn decode(&mut self, input: &str, buffer: &mut gst::Buffer) -> Result<()> {
+ pub fn decode(&mut self, input: &str) -> Result<Vec<VideoAncillary>> {
use quick_xml::events::Event;
use quick_xml::reader::Reader;
- let buffer = buffer.get_mut().unwrap();
-
+ let mut captions = Vec::new();
let mut reader = Reader::from_str(input);
self.xml_buf.clear();
@@ -293,11 +292,7 @@ impl NDICCMetaDecoder {
Ok(v210_buf) => match self.parse_for_cea608(&v210_buf) {
Ok(None) => (),
Ok(Some(anc)) => {
- gst_video::VideoCaptionMeta::add(
- buffer,
- gst_video::VideoCaptionType::Cea608S3341a,
- anc.data(),
- );
+ captions.push(anc);
}
Err(err) => {
gst::error!(CAT, "Failed to parse NDI C608 metadata: {err}");
@@ -311,11 +306,7 @@ impl NDICCMetaDecoder {
Ok(v210_buf) => match self.parse_for_cea708(&v210_buf) {
Ok(None) => (),
Ok(Some(anc)) => {
- gst_video::VideoCaptionMeta::add(
- buffer,
- gst_video::VideoCaptionType::Cea708Cdp,
- anc.data(),
- );
+ captions.push(anc);
}
Err(err) => {
gst::error!(CAT, "Failed to parse NDI C708 metadata: {err}");
@@ -333,7 +324,7 @@ impl NDICCMetaDecoder {
self.xml_buf.clear();
}
- Ok(())
+ Ok(captions)
}
fn parse_for_cea608(&mut self, input: &[u8]) -> Result<Option<VideoAncillary>> {
@@ -510,39 +501,36 @@ mod tests {
fn decode_ndi_meta_c608() {
gst::init().unwrap();
- let mut buf = gst::Buffer::new();
let mut ndi_cc_decoder = NDICCMetaDecoder::new(1920);
- ndi_cc_decoder
- .decode(
- "<C608 line=\"128\">AAAAAP8D8D8AhAUAAgEwIAAABgCUAcASAJgKAAAAAAA=</C608>",
- &mut buf,
- )
+ let captions = ndi_cc_decoder
+ .decode("<C608 line=\"128\">AAAAAP8D8D8AhAUAAgEwIAAABgCUAcASAJgKAAAAAAA=</C608>")
.unwrap();
- let mut cc_meta_iter = buf.iter_meta::<gst_video::VideoCaptionMeta>();
- let cc_meta = cc_meta_iter.next().unwrap();
- assert_eq!(cc_meta.caption_type(), VideoCaptionType::Cea608S3341a);
- assert_eq!(cc_meta.data(), [0x80, 0x94, 0x2c]);
- assert!(cc_meta_iter.next().is_none());
+ assert_eq!(captions.len(), 1);
+ assert_eq!(
+ captions[0].did16(),
+ gst_video::VideoAncillaryDID16::S334Eia608
+ );
+ assert_eq!(captions[0].data(), [0x80, 0x94, 0x2c]);
}
#[test]
fn decode_ndi_meta_c708() {
gst::init().unwrap();
- let mut buf = gst::Buffer::new();
let mut ndi_cc_decoder = NDICCMetaDecoder::new(1920);
- ndi_cc_decoder.decode(
+ let captions = ndi_cc_decoder.decode(
"<C708 line=\"10\">AAAAAP8D8D8AhAUAAQFQJQBYCgBpAlAlAPwIAEMBACAAAAgAcgKAHwDwCwCUAcASAOQLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADQCQAAAgAgAGwIALcCAAAAAAAAAAAAAA==</C708>",
- &mut buf,
)
.unwrap();
- let mut cc_meta_iter = buf.iter_meta::<gst_video::VideoCaptionMeta>();
- let cc_meta = cc_meta_iter.next().unwrap();
- assert_eq!(cc_meta.caption_type(), VideoCaptionType::Cea708Cdp);
+ assert_eq!(captions.len(), 1);
+ assert_eq!(
+ captions[0].did16(),
+ gst_video::VideoAncillaryDID16::S334Eia708
+ );
assert_eq!(
- cc_meta.data(),
+ captions[0].data(),
[
0x96, 0x69, 0x55, 0x3f, 0x43, 0x00, 0x00, 0x72, 0xf8, 0xfc, 0x94, 0x2c, 0xf9, 0x00,
0x00, 0xfa, 0x00, 0x00, 0xfa, 0x00, 0x00, 0xfa, 0x00, 0x00, 0xfa, 0x00, 0x00, 0xfa,
@@ -553,16 +541,14 @@ mod tests {
0x1b,
]
);
- assert!(cc_meta_iter.next().is_none());
}
#[test]
fn decode_ndi_meta_c708_newlines_and_indent() {
gst::init().unwrap();
- let mut buf = gst::Buffer::new();
let mut ndi_cc_decoder = NDICCMetaDecoder::new(1920);
- ndi_cc_decoder
+ let captions = ndi_cc_decoder
.decode(
r#"<C708 line=\"10\">
AAAAAP8D8D8AhAUAAQFQJQBYCgBpAlAlAPwIAEMBACAAAAgAcgKAHwDwCwCUAcASAOQ
@@ -572,15 +558,16 @@ mod tests {
6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADQCQAAAgAgAGwIALcCAAAAAAA
AAAAAAA==
</C708>"#,
- &mut buf,
)
.unwrap();
- let mut cc_meta_iter = buf.iter_meta::<gst_video::VideoCaptionMeta>();
- let cc_meta = cc_meta_iter.next().unwrap();
- assert_eq!(cc_meta.caption_type(), VideoCaptionType::Cea708Cdp);
+ assert_eq!(captions.len(), 1);
+ assert_eq!(
+ captions[0].did16(),
+ gst_video::VideoAncillaryDID16::S334Eia708
+ );
assert_eq!(
- cc_meta.data(),
+ captions[0].data(),
[
0x96, 0x69, 0x55, 0x3f, 0x43, 0x00, 0x00, 0x72, 0xf8, 0xfc, 0x94, 0x2c, 0xf9, 0x00,
0x00, 0xfa, 0x00, 0x00, 0xfa, 0x00, 0x00, 0xfa, 0x00, 0x00, 0xfa, 0x00, 0x00, 0xfa,
@@ -591,51 +578,49 @@ mod tests {
0x1b,
]
);
- assert!(cc_meta_iter.next().is_none());
}
#[test]
fn decode_ndi_meta_c608_newlines_spaces_inline() {
gst::init().unwrap();
- let mut buf = gst::Buffer::new();
let mut ndi_cc_decoder = NDICCMetaDecoder::new(1920);
- ndi_cc_decoder.decode(
+ let captions = ndi_cc_decoder.decode(
"<C608 line=\"128\">\n\tAAAAAP8D8\n\n\r D8AhAUA\r\n\tAgEwIAAABgCUAcASAJgKAAAAAAA= \n</C608>",
- &mut buf,
)
.unwrap();
- let mut cc_meta_iter = buf.iter_meta::<gst_video::VideoCaptionMeta>();
- let cc_meta = cc_meta_iter.next().unwrap();
- assert_eq!(cc_meta.caption_type(), VideoCaptionType::Cea608S3341a);
- assert_eq!(cc_meta.data(), [0x80, 0x94, 0x2c]);
-
- assert!(cc_meta_iter.next().is_none());
+ assert_eq!(captions.len(), 1);
+ assert_eq!(
+ captions[0].did16(),
+ gst_video::VideoAncillaryDID16::S334Eia608
+ );
+ assert_eq!(captions[0].data(), [0x80, 0x94, 0x2c]);
}
#[test]
fn decode_ndi_meta_c608_and_c708() {
gst::init().unwrap();
- let mut buf = gst::Buffer::new();
let mut ndi_cc_decoder = NDICCMetaDecoder::new(1920);
- ndi_cc_decoder.decode(
+ let captions = ndi_cc_decoder.decode(
"<C608 line=\"128\">AAAAAP8D8D8AhAUAAgEwIAAABgCUAcASAJgKAAAAAAA=</C608><C708 line=\"10\">AAAAAP8D8D8AhAUAAQFQJQBYCgBpAlAlAPwIAEMBACAAAAgAcgKAHwDwCwCUAcASAOQLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADQCQAAAgAgAGwIALcCAAAAAAAAAAAAAA==</C708>",
- &mut buf,
)
.unwrap();
- let mut cc_meta_iter = buf.iter_meta::<gst_video::VideoCaptionMeta>();
-
- let cc_meta = cc_meta_iter.next().unwrap();
- assert_eq!(cc_meta.caption_type(), VideoCaptionType::Cea608S3341a);
- assert_eq!(cc_meta.data(), [0x80, 0x94, 0x2c]);
+ assert_eq!(captions.len(), 2);
+ assert_eq!(
+ captions[0].did16(),
+ gst_video::VideoAncillaryDID16::S334Eia608
+ );
+ assert_eq!(captions[0].data(), [0x80, 0x94, 0x2c]);
- let cc_meta = cc_meta_iter.next().unwrap();
- assert_eq!(cc_meta.caption_type(), VideoCaptionType::Cea708Cdp);
assert_eq!(
- cc_meta.data(),
+ captions[1].did16(),
+ gst_video::VideoAncillaryDID16::S334Eia708
+ );
+ assert_eq!(
+ captions[1].data(),
[
0x96, 0x69, 0x55, 0x3f, 0x43, 0x00, 0x00, 0x72, 0xf8, 0xfc, 0x94, 0x2c, 0xf9, 0x00,
0x00, 0xfa, 0x00, 0x00, 0xfa, 0x00, 0x00, 0xfa, 0x00, 0x00, 0xfa, 0x00, 0x00, 0xfa,
@@ -646,8 +631,6 @@ mod tests {
0x1b,
]
);
-
- assert!(cc_meta_iter.next().is_none());
}
#[test]
@@ -655,13 +638,9 @@ mod tests {
gst::init().unwrap();
// Expecting </C608> found </C708>'
- let mut buf = gst::Buffer::new();
let mut ndi_cc_decoder = NDICCMetaDecoder::new(1920);
ndi_cc_decoder
- .decode(
- "<C608 line=\"128\">AAAAAP8D8D8AhAUAAgEwIAAABgCUAcASAJgKAAAAAAA=</C708>",
- &mut buf,
- )
+ .decode("<C608 line=\"128\">AAAAAP8D8D8AhAUAAgEwIAAABgCUAcASAJgKAAAAAAA=</C708>")
.unwrap_err();
}
}
diff --git a/net/ndi/src/ndisrc/imp.rs b/net/ndi/src/ndisrc/imp.rs
index 04c06e3e9..b2bdea7af 100644
--- a/net/ndi/src/ndisrc/imp.rs
+++ b/net/ndi/src/ndisrc/imp.rs
@@ -11,12 +11,13 @@ use std::u32;
use gst::glib::once_cell::sync::Lazy;
+use crate::ndisrcmeta::NdiSrcMeta;
use crate::ndisys;
use crate::RecvColorFormat;
use crate::TimestampMode;
-use super::receiver::{self, Buffer, Receiver, ReceiverControlHandle, ReceiverItem};
-use crate::ndisrcmeta;
+use super::receiver::{Receiver, ReceiverControlHandle, ReceiverItem};
+use crate::ndisrcmeta::Buffer;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
@@ -63,26 +64,11 @@ impl Default for Settings {
}
}
+#[derive(Default)]
struct State {
- video_info: Option<receiver::VideoInfo>,
- video_caps: Option<gst::Caps>,
- audio_info: Option<receiver::AudioInfo>,
- audio_caps: Option<gst::Caps>,
- current_latency: Option<gst::ClockTime>,
receiver: Option<Receiver>,
-}
-
-impl Default for State {
- fn default() -> State {
- State {
- video_info: None,
- video_caps: None,
- audio_info: None,
- audio_caps: None,
- current_latency: gst::ClockTime::NONE,
- receiver: None,
- }
- }
+ timestamp_mode: TimestampMode,
+ current_latency: Option<gst::ClockTime>,
}
pub struct NdiSrc {
@@ -447,7 +433,6 @@ impl BaseSrcImpl for NdiSrc {
settings.connect_timeout,
settings.bandwidth,
settings.color_format.into(),
- settings.timestamp_mode,
settings.timeout,
settings.max_queue_length as usize,
);
@@ -462,6 +447,7 @@ impl BaseSrcImpl for NdiSrc {
Some(receiver.receiver_control_handle());
let mut state = self.state.lock().unwrap();
state.receiver = Some(receiver);
+ state.timestamp_mode = settings.timestamp_mode;
Ok(())
}
@@ -537,72 +523,32 @@ impl BaseSrcImpl for NdiSrc {
state.receiver = Some(recv);
match res {
- ReceiverItem::Buffer(buffer) => {
- let buffer = match buffer {
- Buffer::Audio(mut buffer, info) => {
- if state.audio_info.as_ref() != Some(&info) {
- let caps = info.to_caps().map_err(|_| {
- gst::element_imp_error!(
- self,
- gst::ResourceError::Settings,
- ["Invalid audio info received: {:?}", info]
- );
- gst::FlowError::NotNegotiated
- })?;
- state.audio_info = Some(info);
- state.audio_caps = Some(caps);
- }
-
- {
- let buffer = buffer.get_mut().unwrap();
- ndisrcmeta::NdiSrcMeta::add(
- buffer,
- ndisrcmeta::StreamType::Audio,
- state.audio_caps.as_ref().unwrap(),
- );
- }
-
- buffer
- }
- Buffer::Video(mut buffer, info) => {
- let mut latency_changed = false;
-
- if state.video_info.as_ref() != Some(&info) {
- let caps = info.to_caps().map_err(|_| {
- gst::element_imp_error!(
- self,
- gst::ResourceError::Settings,
- ["Invalid video info received: {:?}", info]
- );
- gst::FlowError::NotNegotiated
- })?;
- state.video_info = Some(info);
- state.video_caps = Some(caps);
- latency_changed = state.current_latency != buffer.duration();
- state.current_latency = buffer.duration();
- }
-
- {
- let buffer = buffer.get_mut().unwrap();
- ndisrcmeta::NdiSrcMeta::add(
- buffer,
- ndisrcmeta::StreamType::Video,
- state.video_caps.as_ref().unwrap(),
- );
- }
-
- drop(state);
- if latency_changed {
- let _ = self.obj().post_message(
- gst::message::Latency::builder().src(&*self.obj()).build(),
- );
- }
-
- buffer
- }
- };
-
- Ok(CreateSuccess::NewBuffer(buffer))
+ ReceiverItem::Buffer(ndi_buffer) => {
+ let mut latency_changed = false;
+
+ if let Buffer::Video { ref frame, .. } = ndi_buffer {
+ let duration = gst::ClockTime::SECOND
+ .mul_div_floor(frame.frame_rate().1 as u64, frame.frame_rate().0 as u64);
+
+ latency_changed = state.current_latency != duration;
+ state.current_latency = duration;
+ }
+
+ let mut gst_buffer = gst::Buffer::new();
+ {
+ let buffer_ref = gst_buffer.get_mut().unwrap();
+ NdiSrcMeta::add(buffer_ref, ndi_buffer, state.timestamp_mode);
+ }
+
+ drop(state);
+
+ if latency_changed {
+ let _ = self
+ .obj()
+ .post_message(gst::message::Latency::builder().src(&*self.obj()).build());
+ }
+
+ Ok(CreateSuccess::NewBuffer(gst_buffer))
}
ReceiverItem::Timeout => Err(gst::FlowError::Eos),
ReceiverItem::Flushing => Err(gst::FlowError::Flushing),
diff --git a/net/ndi/src/ndisrc/receiver.rs b/net/ndi/src/ndisrc/receiver.rs
index 2dd9b1c89..0e4f93d1b 100644
--- a/net/ndi/src/ndisrc/receiver.rs
+++ b/net/ndi/src/ndisrc/receiver.rs
@@ -2,25 +2,17 @@
use glib::prelude::*;
use gst::prelude::*;
-use gst_video::prelude::*;
-use byte_slice_cast::*;
-
-use std::cmp;
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex, Weak};
use std::thread;
use std::time;
-use atomic_refcell::AtomicRefCell;
-
use gst::glib::once_cell::sync::Lazy;
use crate::ndi::*;
-use crate::ndi_cc_meta::NDICCMetaDecoder;
-use crate::ndisys;
+use crate::ndisrcmeta::Buffer;
use crate::ndisys::*;
-use crate::TimestampMode;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
@@ -32,170 +24,6 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
pub struct Receiver(Arc<ReceiverInner>);
-#[derive(Debug, PartialEq, Eq)]
-#[allow(clippy::large_enum_variant)]
-pub enum AudioInfo {
- Audio(gst_audio::AudioInfo),
- #[cfg(feature = "advanced-sdk")]
- #[allow(dead_code)]
- Opus {
- sample_rate: i32,
- no_channels: i32,
- },
- #[cfg(feature = "advanced-sdk")]
- Aac {
- sample_rate: i32,
- no_channels: i32,
- codec_data: [u8; 2],
- },
-}
-
-impl AudioInfo {
- pub fn to_caps(&self) -> Result<gst::Caps, glib::BoolError> {
- match self {
- AudioInfo::Audio(ref info) => info.to_caps(),
- #[cfg(feature = "advanced-sdk")]
- AudioInfo::Opus {
- sample_rate,
- no_channels,
- } => Ok(gst::Caps::builder("audio/x-opus")
- .field("channels", *no_channels)
- .field("rate", *sample_rate)
- .field("channel-mapping-family", 0i32)
- .build()),
- #[cfg(feature = "advanced-sdk")]
- AudioInfo::Aac {
- sample_rate,
- no_channels,
- codec_data,
- } => Ok(gst::Caps::builder("audio/mpeg")
- .field("channels", *no_channels)
- .field("rate", *sample_rate)
- .field("mpegversion", 4i32)
- .field("stream-format", "raw")
- .field("codec_data", gst::Buffer::from_mut_slice(*codec_data))
- .build()),
- }
- }
-}
-
-#[derive(Debug, PartialEq, Eq)]
-pub enum VideoInfo {
- Video(gst_video::VideoInfo),
- #[cfg(feature = "advanced-sdk")]
- SpeedHQInfo {
- variant: String,
- xres: i32,
- yres: i32,
- fps_n: i32,
- fps_d: i32,
- par_n: i32,
- par_d: i32,
- interlace_mode: gst_video::VideoInterlaceMode,
- },
- #[cfg(feature = "advanced-sdk")]
- H264 {
- xres: i32,
- yres: i32,
- fps_n: i32,
- fps_d: i32,
- par_n: i32,
- par_d: i32,
- interlace_mode: gst_video::VideoInterlaceMode,
- },
- #[cfg(feature = "advanced-sdk")]
- H265 {
- xres: i32,
- yres: i32,
- fps_n: i32,
- fps_d: i32,
- par_n: i32,
- par_d: i32,
- interlace_mode: gst_video::VideoInterlaceMode,
- },
-}
-
-impl VideoInfo {
- pub fn to_caps(&self) -> Result<gst::Caps, glib::BoolError> {
- match self {
- VideoInfo::Video(ref info) => info.to_caps(),
- #[cfg(feature = "advanced-sdk")]
- VideoInfo::SpeedHQInfo {
- ref variant,
- xres,
- yres,
- fps_n,
- fps_d,
- par_n,
- par_d,
- interlace_mode,
- } => Ok(gst::Caps::builder("video/x-speedhq")
- .field("width", *xres)
- .field("height", *yres)
- .field("framerate", gst::Fraction::new(*fps_n, *fps_d))
- .field("pixel-aspect-ratio", gst::Fraction::new(*par_n, *par_d))
- .field("interlace-mode", interlace_mode.to_str())
- .field("variant", variant)
- .build()),
- #[cfg(feature = "advanced-sdk")]
- VideoInfo::H264 {
- xres,
- yres,
- fps_n,
- fps_d,
- par_n,
- par_d,
- interlace_mode,
- ..
- } => Ok(gst::Caps::builder("video/x-h264")
- .field("width", *xres)
- .field("height", *yres)
- .field("framerate", gst::Fraction::new(*fps_n, *fps_d))
- .field("pixel-aspect-ratio", gst::Fraction::new(*par_n, *par_d))
- .field("interlace-mode", interlace_mode.to_str())
- .field("stream-format", "byte-stream")
- .field("alignment", "au")
- .build()),
- #[cfg(feature = "advanced-sdk")]
- VideoInfo::H265 {
- xres,
- yres,
- fps_n,
- fps_d,
- par_n,
- par_d,
- interlace_mode,
- ..
- } => Ok(gst::Caps::builder("video/x-h265")
- .field("width", *xres)
- .field("height", *yres)
- .field("framerate", gst::Fraction::new(*fps_n, *fps_d))
- .field("pixel-aspect-ratio", gst::Fraction::new(*par_n, *par_d))
- .field("interlace-mode", interlace_mode.to_str())
- .field("stream-format", "byte-stream")
- .field("alignment", "au")
- .build()),
- }
- }
-
- pub fn width(&self) -> u32 {
- match self {
- VideoInfo::Video(ref info) => info.width(),
- #[cfg(feature = "advanced-sdk")]
- VideoInfo::SpeedHQInfo { xres, .. }
- | VideoInfo::H264 { xres, .. }
- | VideoInfo::H265 { xres, .. } => *xres as u32,
- }
- }
-}
-
-#[derive(Debug)]
-#[allow(clippy::large_enum_variant)]
-pub enum Buffer {
- Audio(gst::Buffer, AudioInfo),
- Video(gst::Buffer, VideoInfo),
-}
-
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum ReceiverItem {
@@ -209,17 +37,11 @@ struct ReceiverInner {
queue: ReceiverQueue,
max_queue_length: usize,
- // Audio/video time observations
- observations_timestamp: [Observations; 2],
- observations_timecode: [Observations; 2],
-
element: glib::WeakRef<gst::Element>,
- timestamp_mode: TimestampMode,
timeout: u32,
connect_timeout: u32,
- ndi_cc_decoder: AtomicRefCell<Option<NDICCMetaDecoder>>,
thread: Mutex<Option<std::thread::JoinHandle<()>>>,
}
@@ -245,305 +67,6 @@ struct ReceiverQueueInner {
timeout: bool,
}
-const PREFILL_WINDOW_LENGTH: usize = 12;
-const WINDOW_LENGTH: u64 = 512;
-const WINDOW_DURATION: u64 = 2_000_000_000;
-
-#[derive(Default)]
-struct Observations(AtomicRefCell<ObservationsInner>);
-
-struct ObservationsInner {
- base_remote_time: Option<u64>,
- base_local_time: Option<u64>,
- deltas: VecDeque<i64>,
- min_delta: i64,
- skew: i64,
- filling: bool,
- window_size: usize,
-
- // Remote/local times for workaround around fundamentally wrong slopes
- // This is not reset below and has a bigger window.
- times: VecDeque<(u64, u64)>,
- slope_correction: (u64, u64),
-}
-
-impl Default for ObservationsInner {
- fn default() -> ObservationsInner {
- ObservationsInner {
- base_local_time: None,
- base_remote_time: None,
- deltas: VecDeque::new(),
- min_delta: 0,
- skew: 0,
- filling: true,
- window_size: 0,
- times: VecDeque::new(),
- slope_correction: (1, 1),
- }
- }
-}
-
-impl ObservationsInner {
- fn reset(&mut self) {
- self.base_local_time = None;
- self.base_remote_time = None;
- self.deltas = VecDeque::new();
- self.min_delta = 0;
- self.skew = 0;
- self.filling = true;
- self.window_size = 0;
- }
-}
-
-impl Observations {
- // Based on the algorithm used in GStreamer's rtpjitterbuffer, which comes from
- // Fober, Orlarey and Letz, 2005, "Real Time Clock Skew Estimation over Network Delays":
- // http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.102.1546
- fn process(
- &self,
- element: &gst::Element,
- remote_time: Option<gst::ClockTime>,
- local_time: gst::ClockTime,
- duration: Option<gst::ClockTime>,
- ) -> Option<(gst::ClockTime, Option<gst::ClockTime>, bool)> {
- let remote_time = remote_time?.nseconds();
- let local_time = local_time.nseconds();
-
- let mut inner = self.0.borrow_mut();
-
- gst::trace!(
- CAT,
- obj: element,
- "Local time {}, remote time {}, slope correct {}/{}",
- local_time.nseconds(),
- remote_time.nseconds(),
- inner.slope_correction.0,
- inner.slope_correction.1,
- );
-
- inner.times.push_back((remote_time, local_time));
- while inner
- .times
- .back()
- .unwrap()
- .1
- .saturating_sub(inner.times.front().unwrap().1)
- > WINDOW_DURATION
- {
- let _ = inner.times.pop_front();
- }
-
- // Static remote times
- if inner.slope_correction.1 == 0 {
- return None;
- }
-
- let remote_time =
- remote_time.mul_div_round(inner.slope_correction.0, inner.slope_correction.1)?;
-
- let (base_remote_time, base_local_time) =
- match (inner.base_remote_time, inner.base_local_time) {
- (Some(remote), Some(local)) => (remote, local),
- _ => {
- gst::debug!(
- CAT,
- obj: element,
- "Initializing base time: local {}, remote {}",
- local_time.nseconds(),
- remote_time.nseconds(),
- );
- inner.base_remote_time = Some(remote_time);
- inner.base_local_time = Some(local_time);
-
- return Some((local_time.nseconds(), duration, true));
- }
- };
-
- if inner.times.len() < PREFILL_WINDOW_LENGTH {
- return Some((local_time.nseconds(), duration, false));
- }
-
- // Check if the slope is simply wrong and try correcting
- {
- let local_diff = inner
- .times
- .back()
- .unwrap()
- .1
- .saturating_sub(inner.times.front().unwrap().1);
- let remote_diff = inner
- .times
- .back()
- .unwrap()
- .0
- .saturating_sub(inner.times.front().unwrap().0);
-
- if remote_diff == 0 {
- inner.reset();
- inner.base_remote_time = Some(remote_time);
- inner.base_local_time = Some(local_time);
-
- // Static remote times
- inner.slope_correction = (0, 0);
- return None;
- } else {
- let slope = local_diff as f64 / remote_diff as f64;
- let scaled_slope =
- slope * (inner.slope_correction.1 as f64) / (inner.slope_correction.0 as f64);
-
- // Check for some obviously wrong slopes and try to correct for that
- if !(0.5..1.5).contains(&scaled_slope) {
- gst::warning!(
- CAT,
- obj: element,
- "Too small/big slope {}, resetting",
- scaled_slope
- );
-
- let discont = !inner.deltas.is_empty();
- inner.reset();
-
- if (0.0005..0.0015).contains(&slope) {
- // Remote unit was actually 0.1ns
- inner.slope_correction = (1, 1000);
- } else if (0.005..0.015).contains(&slope) {
- // Remote unit was actually 1ns
- inner.slope_correction = (1, 100);
- } else if (0.05..0.15).contains(&slope) {
- // Remote unit was actually 10ns
- inner.slope_correction = (1, 10);
- } else if (5.0..15.0).contains(&slope) {
- // Remote unit was actually 1us
- inner.slope_correction = (10, 1);
- } else if (50.0..150.0).contains(&slope) {
- // Remote unit was actually 10us
- inner.slope_correction = (100, 1);
- } else if (50.0..150.0).contains(&slope) {
- // Remote unit was actually 100us
- inner.slope_correction = (1000, 1);
- } else if (50.0..150.0).contains(&slope) {
- // Remote unit was actually 1ms
- inner.slope_correction = (10000, 1);
- } else {
- inner.slope_correction = (1, 1);
- }
-
- let remote_time = inner
- .times
- .back()
- .unwrap()
- .0
- .mul_div_round(inner.slope_correction.0, inner.slope_correction.1)?;
- gst::debug!(
- CAT,
- obj: element,
- "Initializing base time: local {}, remote {}, slope correction {}/{}",
- local_time.nseconds(),
- remote_time.nseconds(),
- inner.slope_correction.0,
- inner.slope_correction.1,
- );
- inner.base_remote_time = Some(remote_time);
- inner.base_local_time = Some(local_time);
-
- return Some((local_time.nseconds(), duration, discont));
- }
- }
- }
-
- let remote_diff = remote_time.saturating_sub(base_remote_time);
- let local_diff = local_time.saturating_sub(base_local_time);
- let delta = (local_diff as i64) - (remote_diff as i64);
-
- gst::trace!(
- CAT,
- obj: element,
- "Local diff {}, remote diff {}, delta {}",
- local_diff.nseconds(),
- remote_diff.nseconds(),
- delta,
- );
-
- if (delta > inner.skew && delta - inner.skew > 1_000_000_000)
- || (delta < inner.skew && inner.skew - delta > 1_000_000_000)
- {
- gst::warning!(
- CAT,
- obj: element,
- "Delta {} too far from skew {}, resetting",
- delta,
- inner.skew
- );
-
- let discont = !inner.deltas.is_empty();
-
- gst::debug!(
- CAT,
- obj: element,
- "Initializing base time: local {}, remote {}",
- local_time.nseconds(),
- remote_time.nseconds(),
- );
-
- inner.reset();
- inner.base_remote_time = Some(remote_time);
- inner.base_local_time = Some(local_time);
-
- return Some((local_time.nseconds(), duration, discont));
- }
-
- if inner.filling {
- if inner.deltas.is_empty() || delta < inner.min_delta {
- inner.min_delta = delta;
- }
- inner.deltas.push_back(delta);
-
- if remote_diff > WINDOW_DURATION || inner.deltas.len() as u64 == WINDOW_LENGTH {
- inner.window_size = inner.deltas.len();
- inner.skew = inner.min_delta;
- inner.filling = false;
- } else {
- let perc_time = remote_diff.mul_div_floor(100, WINDOW_DURATION).unwrap() as i64;
- let perc_window = (inner.deltas.len() as u64)
- .mul_div_floor(100, WINDOW_LENGTH)
- .unwrap() as i64;
- let perc = cmp::max(perc_time, perc_window);
-
- inner.skew = (perc * inner.min_delta + ((10_000 - perc) * inner.skew)) / 10_000;
- }
- } else {
- let old = inner.deltas.pop_front().unwrap();
- inner.deltas.push_back(delta);
-
- if delta <= inner.min_delta {
- inner.min_delta = delta;
- } else if old == inner.min_delta {
- inner.min_delta = inner.deltas.iter().copied().min().unwrap();
- }
-
- inner.skew = (inner.min_delta + (124 * inner.skew)) / 125;
- }
-
- let out_time = base_local_time + remote_diff;
- let out_time = if inner.skew < 0 {
- out_time.saturating_sub((-inner.skew) as u64)
- } else {
- out_time + (inner.skew as u64)
- };
-
- gst::trace!(
- CAT,
- obj: element,
- "Skew {}, min delta {}",
- inner.skew,
- inner.min_delta
- );
- gst::trace!(CAT, obj: element, "Outputting {}", out_time.nseconds());
-
- Some((out_time.nseconds(), duration, false))
- }
-}
-
#[derive(Clone)]
pub struct ReceiverControlHandle {
queue: ReceiverQueue,
@@ -586,7 +109,6 @@ impl Drop for ReceiverInner {
impl Receiver {
fn new(
recv: RecvInstance,
- timestamp_mode: TimestampMode,
timeout: u32,
connect_timeout: u32,
max_queue_length: usize,
@@ -605,13 +127,9 @@ impl Receiver {
Condvar::new(),
))),
max_queue_length,
- observations_timestamp: Default::default(),
- observations_timecode: Default::default(),
element: element.downgrade(),
- timestamp_mode,
timeout,
connect_timeout,
- ndi_cc_decoder: AtomicRefCell::new(None),
thread: Mutex::new(None),
}));
@@ -699,7 +217,6 @@ impl Receiver {
connect_timeout: u32,
bandwidth: NDIlib_recv_bandwidth_e,
color_format: NDIlib_recv_color_format_e,
- timestamp_mode: TimestampMode,
timeout: u32,
max_queue_length: usize,
) -> Option<Self> {
@@ -740,14 +257,7 @@ impl Receiver {
recv.send_metadata(&enable_hw_accel);
// This will set info.audio/video accordingly
- let receiver = Receiver::new(
- recv,
- timestamp_mode,
- timeout,
- connect_timeout,
- max_queue_length,
- element,
- );
+ let receiver = Receiver::new(recv, timeout, connect_timeout, max_queue_length, element);
Some(receiver)
}
@@ -757,7 +267,6 @@ impl Receiver {
let mut first_audio_frame = true;
let mut first_frame = true;
let mut timer = time::Instant::now();
- let mut pending_metas = VecDeque::<String>::new();
// Capture until error or shutdown
loop {
@@ -813,60 +322,74 @@ impl Receiver {
continue;
}
Ok(Some(Frame::Video(frame))) => {
- first_frame = false;
- let mut buffer = receiver.create_video_buffer_and_info(&element, frame);
- if first_video_frame {
- if let Ok(Buffer::Video(ref mut buffer, _)) = buffer {
- buffer
- .get_mut()
- .unwrap()
- .set_flags(gst::BufferFlags::DISCONT);
- first_video_frame = false;
- }
- }
+ if let Some(receive_time_gst) = element.current_running_time() {
+ let receive_time_real = (glib::real_time() as u64 * 1000).nseconds();
- if !pending_metas.is_empty() {
- if let Ok(Buffer::Video(ref mut buffer, _)) = buffer {
- let mut ndi_cc_decoder = receiver.0.ndi_cc_decoder.borrow_mut();
- for meta in pending_metas.drain(..) {
- let res = ndi_cc_decoder.as_mut().unwrap().decode(&meta, buffer);
- if let Err(err) = res {
- gst::debug!(CAT, obj: element, "Failed to parse NDI metadata: {err}");
- }
- }
- }
- }
+ first_frame = false;
+ let discont = first_video_frame;
+ first_video_frame = false;
- buffer
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Received video frame at timecode {}: {:?}",
+ (frame.timecode() as u64 * 100).nseconds(),
+ frame,
+ );
+
+ Ok(Buffer::Video {
+ frame,
+ discont,
+ receive_time_gst,
+ receive_time_real,
+ })
+ } else {
+ Err(gst::FlowError::Flushing)
+ }
}
Ok(Some(Frame::Audio(frame))) => {
- first_frame = false;
- let mut buffer = receiver.create_audio_buffer_and_info(&element, frame);
- if first_audio_frame {
- if let Ok(Buffer::Audio(ref mut buffer, _)) = buffer {
- buffer
- .get_mut()
- .unwrap()
- .set_flags(gst::BufferFlags::DISCONT);
- first_audio_frame = false;
- }
+ if let Some(receive_time_gst) = element.current_running_time() {
+ let receive_time_real = (glib::real_time() as u64 * 1000).nseconds();
+ first_frame = false;
+ let discont = first_audio_frame;
+ first_audio_frame = false;
+
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Received audio frame at timecode {}: {:?}",
+ (frame.timecode() as u64 * 100).nseconds(),
+ frame,
+ );
+
+ Ok(Buffer::Audio {
+ frame,
+ discont,
+ receive_time_gst,
+ receive_time_real,
+ })
+ } else {
+ Err(gst::FlowError::Flushing)
}
- buffer
}
Ok(Some(Frame::Metadata(frame))) => {
- if let Some(metadata) = frame.metadata() {
+ if let Some(receive_time_gst) = element.current_running_time() {
+ let receive_time_real = (glib::real_time() as u64 * 1000).nseconds();
gst::debug!(
CAT,
obj: element,
- "Received metadata at timecode {}: {}",
+ "Received metadata frame at timecode {}: {:?}",
(frame.timecode() as u64 * 100).nseconds(),
- metadata,
+ frame,
);
-
- pending_metas.push_back(metadata.to_string());
+ Ok(Buffer::Metadata {
+ frame,
+ receive_time_gst,
+ receive_time_real,
+ })
+ } else {
+ Err(gst::FlowError::Flushing)
}
-
- continue;
}
};
@@ -899,6 +422,9 @@ impl Receiver {
queue.buffer_queue.clear();
(receiver.0.queue.0).1.notify_one();
timer = time::Instant::now();
+ first_frame = true;
+ first_audio_frame = true;
+ first_video_frame = true;
}
Err(err) => {
gst::error!(CAT, obj: element, "Signalling error");
@@ -912,924 +438,4 @@ impl Receiver {
}
}
}
-
- fn calculate_timestamp(
- &self,
- element: &gst::Element,
- is_audio: bool,
- timestamp: i64,
- timecode: i64,
- duration: Option<gst::ClockTime>,
- ) -> Option<(gst::ClockTime, Option<gst::ClockTime>, bool)> {
- let receive_time = element.current_running_time()?;
-
- let real_time_now = (glib::real_time() as u64 * 1000).nseconds();
- let timestamp = if timestamp == ndisys::NDIlib_recv_timestamp_undefined {
- gst::ClockTime::NONE
- } else {
- Some((timestamp as u64 * 100).nseconds())
- };
- let timecode = (timecode as u64 * 100).nseconds();
-
- gst::log!(
- CAT,
- obj: element,
- "Received frame with timecode {}, timestamp {}, duration {}, receive time {}, local time now {}",
- timecode,
- timestamp.display(),
- duration.display(),
- receive_time.display(),
- real_time_now,
- );
-
- let res_timestamp = self.0.observations_timestamp[usize::from(!is_audio)].process(
- element,
- timestamp,
- receive_time,
- duration,
- );
-
- let res_timecode = self.0.observations_timecode[usize::from(!is_audio)].process(
- element,
- Some(timecode),
- receive_time,
- duration,
- );
-
- let (pts, duration, discont) = match self.0.timestamp_mode {
- TimestampMode::ReceiveTimeTimecode => match res_timecode {
- Some((pts, duration, discont)) => (pts, duration, discont),
- None => {
- gst::warning!(CAT, obj: element, "Can't calculate timestamp");
- (receive_time, duration, false)
- }
- },
- TimestampMode::ReceiveTimeTimestamp => match res_timestamp {
- Some((pts, duration, discont)) => (pts, duration, discont),
- None => {
- if timestamp.is_some() {
- gst::warning!(CAT, obj: element, "Can't calculate timestamp");
- }
-
- (receive_time, duration, false)
- }
- },
- TimestampMode::Timecode => (timecode, duration, false),
- TimestampMode::Timestamp if timestamp.is_none() => (receive_time, duration, false),
- TimestampMode::Timestamp => {
- // Timestamps are relative to the UNIX epoch
- let timestamp = timestamp?;
- if real_time_now > timestamp {
- let diff = real_time_now - timestamp;
- if diff > receive_time {
- (gst::ClockTime::ZERO, duration, false)
- } else {
- (receive_time - diff, duration, false)
- }
- } else {
- let diff = timestamp - real_time_now;
- (receive_time + diff, duration, false)
- }
- }
- TimestampMode::ReceiveTime => (receive_time, duration, false),
- TimestampMode::Auto => {
- res_timecode
- .or(res_timestamp)
- .unwrap_or((receive_time, duration, false))
- }
- };
-
- gst::log!(
- CAT,
- obj: element,
- "Calculated PTS {}, duration {}",
- pts.display(),
- duration.display(),
- );
-
- Some((pts, duration, discont))
- }
-
- fn create_video_buffer_and_info(
- &self,
- element: &gst::Element,
- video_frame: VideoFrame,
- ) -> Result<Buffer, gst::FlowError> {
- gst::debug!(CAT, obj: element, "Received video frame {:?}", video_frame);
-
- let (pts, duration, discont) = self
- .calculate_video_timestamp(element, &video_frame)
- .ok_or_else(|| {
- gst::debug!(CAT, obj: element, "Flushing, dropping buffer");
- gst::FlowError::Flushing
- })?;
-
- let info = self.create_video_info(element, &video_frame)?;
-
- let mut buffer = self.create_video_buffer(element, pts, duration, &info, &video_frame)?;
- if discont {
- buffer
- .get_mut()
- .unwrap()
- .set_flags(gst::BufferFlags::RESYNC);
- }
-
- let mut ndi_cc_decoder = self.0.ndi_cc_decoder.borrow_mut();
- if ndi_cc_decoder.is_none() {
- *ndi_cc_decoder = Some(NDICCMetaDecoder::new(info.width()));
- }
-
- {
- let ndi_cc_decoder = ndi_cc_decoder.as_mut().unwrap();
- // handle potential width change (also needed for standalone metadata)
- ndi_cc_decoder.set_width(info.width());
-
- if let Some(metadata) = video_frame.metadata() {
- let res = ndi_cc_decoder.decode(metadata, &mut buffer);
- if let Err(err) = res {
- gst::debug!(CAT, obj: element, "Failed to parse NDI video frame metadata: {err}");
- }
- }
- }
-
- gst::log!(CAT, obj: element, "Produced video buffer {:?}", buffer);
-
- Ok(Buffer::Video(buffer, info))
- }
-
- fn calculate_video_timestamp(
- &self,
- element: &gst::Element,
- video_frame: &VideoFrame,
- ) -> Option<(gst::ClockTime, Option<gst::ClockTime>, bool)> {
- let duration = gst::ClockTime::SECOND.mul_div_floor(
- video_frame.frame_rate().1 as u64,
- video_frame.frame_rate().0 as u64,
- );
-
- self.calculate_timestamp(
- element,
- false,
- video_frame.timestamp(),
- video_frame.timecode(),
- duration,
- )
- }
-
- fn create_video_info(
- &self,
- element: &gst::Element,
- video_frame: &VideoFrame,
- ) -> Result<VideoInfo, gst::FlowError> {
- let fourcc = video_frame.fourcc();
-
- let par = gst::Fraction::approximate_f32(video_frame.picture_aspect_ratio())
- .unwrap_or_else(|| gst::Fraction::new(1, 1))
- * gst::Fraction::new(video_frame.yres(), video_frame.xres());
- let interlace_mode = match video_frame.frame_format_type() {
- ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_progressive => {
- gst_video::VideoInterlaceMode::Progressive
- }
- ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved => {
- gst_video::VideoInterlaceMode::Interleaved
- }
- #[cfg(feature = "interlaced-fields")]
- _ => gst_video::VideoInterlaceMode::Alternate,
- #[cfg(not(feature = "interlaced-fields"))]
- _ => {
- gst::element_error!(
- element,
- gst::StreamError::Format,
- ["Separate field interlacing not supported"]
- );
- return Err(gst::FlowError::NotNegotiated);
- }
- };
-
- if [
- ndisys::NDIlib_FourCC_video_type_UYVY,
- ndisys::NDIlib_FourCC_video_type_UYVA,
- ndisys::NDIlib_FourCC_video_type_YV12,
- ndisys::NDIlib_FourCC_video_type_NV12,
- ndisys::NDIlib_FourCC_video_type_I420,
- ndisys::NDIlib_FourCC_video_type_BGRA,
- ndisys::NDIlib_FourCC_video_type_BGRX,
- ndisys::NDIlib_FourCC_video_type_RGBA,
- ndisys::NDIlib_FourCC_video_type_BGRX,
- ]
- .contains(&fourcc)
- {
- // YV12 and I420 are swapped in the NDI SDK compared to GStreamer
- let format = match video_frame.fourcc() {
- ndisys::NDIlib_FourCC_video_type_UYVY => gst_video::VideoFormat::Uyvy,
- // FIXME: This drops the alpha plane!
- ndisys::NDIlib_FourCC_video_type_UYVA => gst_video::VideoFormat::Uyvy,
- ndisys::NDIlib_FourCC_video_type_YV12 => gst_video::VideoFormat::I420,
- ndisys::NDIlib_FourCC_video_type_NV12 => gst_video::VideoFormat::Nv12,
- ndisys::NDIlib_FourCC_video_type_I420 => gst_video::VideoFormat::Yv12,
- ndisys::NDIlib_FourCC_video_type_BGRA => gst_video::VideoFormat::Bgra,
- ndisys::NDIlib_FourCC_video_type_BGRX => gst_video::VideoFormat::Bgrx,
- ndisys::NDIlib_FourCC_video_type_RGBA => gst_video::VideoFormat::Rgba,
- ndisys::NDIlib_FourCC_video_type_RGBX => gst_video::VideoFormat::Rgbx,
- _ => {
- gst::element_error!(
- element,
- gst::StreamError::Format,
- ["Unsupported video fourcc {:08x}", video_frame.fourcc()]
- );
-
- return Err(gst::FlowError::NotNegotiated);
- } // TODO: NDIlib_FourCC_video_type_P216 and NDIlib_FourCC_video_type_PA16 not
- // supported by GStreamer
- };
-
- #[cfg(feature = "interlaced-fields")]
- {
- let mut builder = gst_video::VideoInfo::builder(
- format,
- video_frame.xres() as u32,
- video_frame.yres() as u32,
- )
- .fps(gst::Fraction::from(video_frame.frame_rate()))
- .par(par)
- .interlace_mode(interlace_mode);
-
- if video_frame.frame_format_type()
- == ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved
- {
- builder = builder.field_order(gst_video::VideoFieldOrder::TopFieldFirst);
- }
-
- return Ok(VideoInfo::Video(builder.build().map_err(|_| {
- gst::element_error!(
- element,
- gst::StreamError::Format,
- ["Invalid video format configuration"]
- );
-
- gst::FlowError::NotNegotiated
- })?));
- }
-
- #[cfg(not(feature = "interlaced-fields"))]
- {
- let mut builder = gst_video::VideoInfo::builder(
- format,
- video_frame.xres() as u32,
- video_frame.yres() as u32,
- )
- .fps(gst::Fraction::from(video_frame.frame_rate()))
- .par(par)
- .interlace_mode(interlace_mode);
-
- if video_frame.frame_format_type()
- == ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved
- {
- builder = builder.field_order(gst_video::VideoFieldOrder::TopFieldFirst);
- }
-
- return Ok(VideoInfo::Video(builder.build().map_err(|_| {
- gst::element_error!(
- element,
- gst::StreamError::Format,
- ["Invalid video format configuration"]
- );
-
- gst::FlowError::NotNegotiated
- })?));
- }
- }
-
- #[cfg(feature = "advanced-sdk")]
- if [
- ndisys::NDIlib_FourCC_video_type_ex_SHQ0_highest_bandwidth,
- ndisys::NDIlib_FourCC_video_type_ex_SHQ2_highest_bandwidth,
- ndisys::NDIlib_FourCC_video_type_ex_SHQ7_highest_bandwidth,
- ndisys::NDIlib_FourCC_video_type_ex_SHQ0_lowest_bandwidth,
- ndisys::NDIlib_FourCC_video_type_ex_SHQ2_lowest_bandwidth,
- ndisys::NDIlib_FourCC_video_type_ex_SHQ7_lowest_bandwidth,
- ]
- .contains(&fourcc)
- {
- let variant = match fourcc {
- ndisys::NDIlib_FourCC_video_type_ex_SHQ0_highest_bandwidth
- | ndisys::NDIlib_FourCC_video_type_ex_SHQ0_lowest_bandwidth => String::from("SHQ0"),
- ndisys::NDIlib_FourCC_video_type_ex_SHQ2_highest_bandwidth
- | ndisys::NDIlib_FourCC_video_type_ex_SHQ2_lowest_bandwidth => String::from("SHQ2"),
- ndisys::NDIlib_FourCC_video_type_ex_SHQ7_highest_bandwidth
- | ndisys::NDIlib_FourCC_video_type_ex_SHQ7_lowest_bandwidth => String::from("SHQ7"),
- _ => {
- gst::element_error!(
- element,
- gst::StreamError::Format,
- [
- "Unsupported SpeedHQ video fourcc {:08x}",
- video_frame.fourcc()
- ]
- );
-
- return Err(gst::FlowError::NotNegotiated);
- }
- };
-
- return Ok(VideoInfo::SpeedHQInfo {
- variant,
- xres: video_frame.xres(),
- yres: video_frame.yres(),
- fps_n: video_frame.frame_rate().0,
- fps_d: video_frame.frame_rate().1,
- par_n: par.numer(),
- par_d: par.denom(),
- interlace_mode,
- });
- }
-
- #[cfg(feature = "advanced-sdk")]
- if [
- ndisys::NDIlib_FourCC_video_type_ex_H264_highest_bandwidth,
- ndisys::NDIlib_FourCC_video_type_ex_H264_lowest_bandwidth,
- ndisys::NDIlib_FourCC_video_type_ex_H264_alpha_highest_bandwidth,
- ndisys::NDIlib_FourCC_video_type_ex_H264_alpha_lowest_bandwidth,
- ]
- .contains(&fourcc)
- {
- let compressed_packet = video_frame.compressed_packet().ok_or_else(|| {
- gst::error!(
- CAT,
- obj: element,
- "Video packet doesn't have compressed packet start"
- );
- gst::element_error!(element, gst::StreamError::Format, ["Invalid video packet"]);
-
- gst::FlowError::Error
- })?;
-
- if compressed_packet.fourcc != NDIlib_compressed_FourCC_type_H264 {
- gst::error!(CAT, obj: element, "Non-H264 video packet");
- gst::element_error!(element, gst::StreamError::Format, ["Invalid video packet"]);
-
- return Err(gst::FlowError::Error);
- }
-
- return Ok(VideoInfo::H264 {
- xres: video_frame.xres(),
- yres: video_frame.yres(),
- fps_n: video_frame.frame_rate().0,
- fps_d: video_frame.frame_rate().1,
- par_n: par.numer(),
- par_d: par.denom(),
- interlace_mode,
- });
- }
-
- #[cfg(feature = "advanced-sdk")]
- if [
- ndisys::NDIlib_FourCC_video_type_ex_HEVC_highest_bandwidth,
- ndisys::NDIlib_FourCC_video_type_ex_HEVC_lowest_bandwidth,
- ndisys::NDIlib_FourCC_video_type_ex_HEVC_alpha_highest_bandwidth,
- ndisys::NDIlib_FourCC_video_type_ex_HEVC_alpha_lowest_bandwidth,
- ]
- .contains(&fourcc)
- {
- let compressed_packet = video_frame.compressed_packet().ok_or_else(|| {
- gst::error!(
- CAT,
- obj: element,
- "Video packet doesn't have compressed packet start"
- );
- gst::element_error!(element, gst::StreamError::Format, ["Invalid video packet"]);
-
- gst::FlowError::Error
- })?;
-
- if compressed_packet.fourcc != NDIlib_compressed_FourCC_type_HEVC {
- gst::error!(CAT, obj: element, "Non-H265 video packet");
- gst::element_error!(element, gst::StreamError::Format, ["Invalid video packet"]);
-
- return Err(gst::FlowError::Error);
- }
-
- return Ok(VideoInfo::H265 {
- xres: video_frame.xres(),
- yres: video_frame.yres(),
- fps_n: video_frame.frame_rate().0,
- fps_d: video_frame.frame_rate().1,
- par_n: par.numer(),
- par_d: par.denom(),
- interlace_mode,
- });
- }
-
- gst::element_error!(
- element,
- gst::StreamError::Format,
- ["Unsupported video fourcc {:08x}", video_frame.fourcc()]
- );
- Err(gst::FlowError::NotNegotiated)
- }
-
- fn create_video_buffer(
- &self,
- element: &gst::Element,
- pts: gst::ClockTime,
- duration: Option<gst::ClockTime>,
- info: &VideoInfo,
- video_frame: &VideoFrame,
- ) -> Result<gst::Buffer, gst::FlowError> {
- let mut buffer = self.copy_video_frame(element, info, video_frame)?;
- {
- let buffer = buffer.get_mut().unwrap();
- buffer.set_pts(pts);
- buffer.set_duration(duration);
-
- gst::ReferenceTimestampMeta::add(
- buffer,
- &crate::TIMECODE_CAPS,
- (video_frame.timecode() as u64 * 100).nseconds(),
- gst::ClockTime::NONE,
- );
- if video_frame.timestamp() != ndisys::NDIlib_recv_timestamp_undefined {
- gst::ReferenceTimestampMeta::add(
- buffer,
- &crate::TIMESTAMP_CAPS,
- (video_frame.timestamp() as u64 * 100).nseconds(),
- gst::ClockTime::NONE,
- );
- }
-
- #[cfg(feature = "interlaced-fields")]
- {
- match video_frame.frame_format_type() {
- ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved => {
- buffer.set_video_flags(
- gst_video::VideoBufferFlags::INTERLACED
- | gst_video::VideoBufferFlags::TFF,
- );
- }
- ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_field_0 => {
- buffer.set_video_flags(
- gst_video::VideoBufferFlags::INTERLACED
- | gst_video::VideoBufferFlags::TOP_FIELD,
- );
- }
- ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_field_1 => {
- buffer.set_video_flags(
- gst_video::VideoBufferFlags::INTERLACED
- | gst_video::VideoBufferFlags::BOTTOM_FIELD,
- );
- }
- _ => (),
- };
- }
-
- #[cfg(not(feature = "interlaced-fields"))]
- {
- if video_frame.frame_format_type()
- == ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved
- {
- buffer.set_video_flags(
- gst_video::VideoBufferFlags::INTERLACED | gst_video::VideoBufferFlags::TFF,
- );
- }
- }
- }
-
- Ok(buffer)
- }
-
- fn copy_video_frame(
- &self,
- #[allow(unused_variables)] element: &gst::Element,
- info: &VideoInfo,
- video_frame: &VideoFrame,
- ) -> Result<gst::Buffer, gst::FlowError> {
- match info {
- VideoInfo::Video(ref info) => {
- let src = video_frame.data().ok_or(gst::FlowError::Error)?;
-
- let buffer = gst::Buffer::with_size(info.size()).unwrap();
- let mut vframe = gst_video::VideoFrame::from_buffer_writable(buffer, info).unwrap();
-
- match info.format() {
- gst_video::VideoFormat::Uyvy
- | gst_video::VideoFormat::Bgra
- | gst_video::VideoFormat::Bgrx
- | gst_video::VideoFormat::Rgba
- | gst_video::VideoFormat::Rgbx => {
- let line_bytes = if info.format() == gst_video::VideoFormat::Uyvy {
- 2 * vframe.width() as usize
- } else {
- 4 * vframe.width() as usize
- };
-
- let dest_stride = vframe.plane_stride()[0] as usize;
- let dest = vframe.plane_data_mut(0).unwrap();
- let src_stride = video_frame.line_stride_or_data_size_in_bytes() as usize;
- let plane_size = video_frame.yres() as usize * src_stride;
-
- if src.len() < plane_size || src_stride < line_bytes {
- gst::error!(CAT, obj: element, "Video packet has wrong stride or size");
- gst::element_error!(
- element,
- gst::StreamError::Format,
- ["Video packet has wrong stride or size"]
- );
- return Err(gst::FlowError::Error);
- }
-
- for (dest, src) in dest
- .chunks_exact_mut(dest_stride)
- .zip(src.chunks_exact(src_stride))
- {
- dest[..line_bytes].copy_from_slice(&src[..line_bytes]);
- }
- }
- gst_video::VideoFormat::Nv12 => {
- let line_bytes = vframe.width() as usize;
- let src_stride = video_frame.line_stride_or_data_size_in_bytes() as usize;
- let plane_size = video_frame.yres() as usize * src_stride;
-
- if src.len() < 2 * plane_size || src_stride < line_bytes {
- gst::error!(CAT, obj: element, "Video packet has wrong stride or size");
- gst::element_error!(
- element,
- gst::StreamError::Format,
- ["Video packet has wrong stride or size"]
- );
- return Err(gst::FlowError::Error);
- }
-
- // First plane
- {
- let dest_stride = vframe.plane_stride()[0] as usize;
- let dest = vframe.plane_data_mut(0).unwrap();
- let src = &src[..plane_size];
-
- for (dest, src) in dest
- .chunks_exact_mut(dest_stride)
- .zip(src.chunks_exact(src_stride))
- {
- dest[..line_bytes].copy_from_slice(&src[..line_bytes]);
- }
- }
-
- // Second plane
- {
- let dest_stride = vframe.plane_stride()[1] as usize;
- let dest = vframe.plane_data_mut(1).unwrap();
- let src = &src[plane_size..];
-
- for (dest, src) in dest
- .chunks_exact_mut(dest_stride)
- .zip(src.chunks_exact(src_stride))
- {
- dest[..line_bytes].copy_from_slice(&src[..line_bytes]);
- }
- }
- }
- gst_video::VideoFormat::Yv12 | gst_video::VideoFormat::I420 => {
- let line_bytes = vframe.width() as usize;
- let line_bytes1 = (line_bytes + 1) / 2;
-
- let src_stride = video_frame.line_stride_or_data_size_in_bytes() as usize;
- let src_stride1 = (src_stride + 1) / 2;
-
- let plane_size = video_frame.yres() as usize * src_stride;
- let plane_size1 = ((video_frame.yres() as usize + 1) / 2) * src_stride1;
-
- if src.len() < plane_size + 2 * plane_size1 || src_stride < line_bytes {
- gst::error!(CAT, obj: element, "Video packet has wrong stride or size");
- gst::element_error!(
- element,
- gst::StreamError::Format,
- ["Video packet has wrong stride or size"]
- );
- return Err(gst::FlowError::Error);
- }
-
- // First plane
- {
- let dest_stride = vframe.plane_stride()[0] as usize;
- let dest = vframe.plane_data_mut(0).unwrap();
- let src = &src[..plane_size];
-
- for (dest, src) in dest
- .chunks_exact_mut(dest_stride)
- .zip(src.chunks_exact(src_stride))
- {
- dest[..line_bytes].copy_from_slice(&src[..line_bytes]);
- }
- }
-
- // Second plane
- {
- let dest_stride = vframe.plane_stride()[1] as usize;
- let dest = vframe.plane_data_mut(1).unwrap();
- let src = &src[plane_size..][..plane_size1];
-
- for (dest, src) in dest
- .chunks_exact_mut(dest_stride)
- .zip(src.chunks_exact(src_stride1))
- {
- dest[..line_bytes1].copy_from_slice(&src[..line_bytes1]);
- }
- }
-
- // Third plane
- {
- let dest_stride = vframe.plane_stride()[2] as usize;
- let dest = vframe.plane_data_mut(2).unwrap();
- let src = &src[plane_size + plane_size1..][..plane_size1];
-
- for (dest, src) in dest
- .chunks_exact_mut(dest_stride)
- .zip(src.chunks_exact(src_stride1))
- {
- dest[..line_bytes1].copy_from_slice(&src[..line_bytes1]);
- }
- }
- }
- _ => unreachable!(),
- }
-
- Ok(vframe.into_buffer())
- }
- #[cfg(feature = "advanced-sdk")]
- VideoInfo::SpeedHQInfo { .. } => {
- let data = video_frame.data().ok_or_else(|| {
- gst::error!(CAT, obj: element, "Video packet has no data");
- gst::element_error!(
- element,
- gst::StreamError::Format,
- ["Invalid video packet"]
- );
-
- gst::FlowError::Error
- })?;
-
- Ok(gst::Buffer::from_mut_slice(Vec::from(data)))
- }
- #[cfg(feature = "advanced-sdk")]
- VideoInfo::H264 { .. } | VideoInfo::H265 { .. } => {
- let compressed_packet = video_frame.compressed_packet().ok_or_else(|| {
- gst::error!(
- CAT,
- obj: element,
- "Video packet doesn't have compressed packet start"
- );
- gst::element_error!(
- element,
- gst::StreamError::Format,
- ["Invalid video packet"]
- );
-
- gst::FlowError::Error
- })?;
-
- let mut buffer = Vec::new();
- if let Some(extra_data) = compressed_packet.extra_data {
- buffer.extend_from_slice(extra_data);
- }
- buffer.extend_from_slice(compressed_packet.data);
- let mut buffer = gst::Buffer::from_mut_slice(buffer);
- if !compressed_packet.key_frame {
- let buffer = buffer.get_mut().unwrap();
- buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
- }
-
- Ok(buffer)
- }
- }
- }
-
- fn create_audio_buffer_and_info(
- &self,
- element: &gst::Element,
- audio_frame: AudioFrame,
- ) -> Result<Buffer, gst::FlowError> {
- gst::debug!(CAT, obj: element, "Received audio frame {:?}", audio_frame);
-
- let (pts, duration, discont) = self
- .calculate_audio_timestamp(element, &audio_frame)
- .ok_or_else(|| {
- gst::debug!(CAT, obj: element, "Flushing, dropping buffer");
- gst::FlowError::Flushing
- })?;
-
- let info = self.create_audio_info(element, &audio_frame)?;
-
- let mut buffer = self.create_audio_buffer(element, pts, duration, &info, &audio_frame)?;
- if discont {
- buffer
- .get_mut()
- .unwrap()
- .set_flags(gst::BufferFlags::RESYNC);
- }
-
- gst::log!(CAT, obj: element, "Produced audio buffer {:?}", buffer);
-
- Ok(Buffer::Audio(buffer, info))
- }
-
- fn calculate_audio_timestamp(
- &self,
- element: &gst::Element,
- audio_frame: &AudioFrame,
- ) -> Option<(gst::ClockTime, Option<gst::ClockTime>, bool)> {
- let duration = gst::ClockTime::SECOND.mul_div_floor(
- audio_frame.no_samples() as u64,
- audio_frame.sample_rate() as u64,
- );
-
- self.calculate_timestamp(
- element,
- true,
- audio_frame.timestamp(),
- audio_frame.timecode(),
- duration,
- )
- }
-
- fn create_audio_info(
- &self,
- element: &gst::Element,
- audio_frame: &AudioFrame,
- ) -> Result<AudioInfo, gst::FlowError> {
- let fourcc = audio_frame.fourcc();
-
- if [NDIlib_FourCC_audio_type_FLTp].contains(&fourcc) {
- let channels = audio_frame.no_channels() as u32;
- let mut positions = [gst_audio::AudioChannelPosition::None; 64];
- if channels <= 8 {
- let _ = gst_audio::AudioChannelPosition::positions_from_mask(
- gst_audio::AudioChannelPosition::fallback_mask(channels),
- &mut positions[..channels as usize],
- );
- }
-
- let builder = gst_audio::AudioInfo::builder(
- gst_audio::AUDIO_FORMAT_F32,
- audio_frame.sample_rate() as u32,
- channels,
- )
- .positions(&positions[..channels as usize]);
-
- let info = builder.build().map_err(|_| {
- gst::element_error!(
- element,
- gst::StreamError::Format,
- ["Invalid audio format configuration"]
- );
-
- gst::FlowError::NotNegotiated
- })?;
-
- return Ok(AudioInfo::Audio(info));
- }
-
- #[cfg(feature = "advanced-sdk")]
- if [NDIlib_FourCC_audio_type_AAC].contains(&fourcc) {
- let compressed_packet = audio_frame.compressed_packet().ok_or_else(|| {
- gst::error!(
- CAT,
- obj: element,
- "Audio packet doesn't have compressed packet start"
- );
- gst::element_error!(element, gst::StreamError::Format, ["Invalid audio packet"]);
-
- gst::FlowError::Error
- })?;
-
- if compressed_packet.fourcc != NDIlib_compressed_FourCC_type_AAC {
- gst::error!(CAT, obj: element, "Non-AAC audio packet");
- gst::element_error!(element, gst::StreamError::Format, ["Invalid audio packet"]);
-
- return Err(gst::FlowError::Error);
- }
-
- return Ok(AudioInfo::Aac {
- sample_rate: audio_frame.sample_rate(),
- no_channels: audio_frame.no_channels(),
- codec_data: compressed_packet
- .extra_data
- .ok_or(gst::FlowError::NotNegotiated)?
- .try_into()
- .map_err(|_| gst::FlowError::NotNegotiated)?,
- });
- }
-
- // FIXME: Needs testing with an actual stream to understand how it works
- // #[cfg(feature = "advanced-sdk")]
- // if [NDIlib_FourCC_audio_type_Opus].contains(&fourcc) {}
-
- gst::element_error!(
- element,
- gst::StreamError::Format,
- ["Unsupported audio fourcc {:08x}", audio_frame.fourcc()]
- );
- Err(gst::FlowError::NotNegotiated)
- }
-
- fn create_audio_buffer(
- &self,
- #[allow(unused_variables)] element: &gst::Element,
- pts: gst::ClockTime,
- duration: Option<gst::ClockTime>,
- info: &AudioInfo,
- audio_frame: &AudioFrame,
- ) -> Result<gst::Buffer, gst::FlowError> {
- match info {
- AudioInfo::Audio(ref info) => {
- let src = audio_frame.data().ok_or(gst::FlowError::Error)?;
- let buff_size = (audio_frame.no_samples() as u32 * info.bpf()) as usize;
-
- let mut buffer = gst::Buffer::with_size(buff_size).unwrap();
- {
- let buffer = buffer.get_mut().unwrap();
-
- buffer.set_pts(pts);
- buffer.set_duration(duration);
-
- gst::ReferenceTimestampMeta::add(
- buffer,
- &crate::TIMECODE_CAPS,
- (audio_frame.timecode() as u64 * 100).nseconds(),
- gst::ClockTime::NONE,
- );
- if audio_frame.timestamp() != ndisys::NDIlib_recv_timestamp_undefined {
- gst::ReferenceTimestampMeta::add(
- buffer,
- &crate::TIMESTAMP_CAPS,
- (audio_frame.timestamp() as u64 * 100).nseconds(),
- gst::ClockTime::NONE,
- );
- }
-
- let mut dest = buffer.map_writable().unwrap();
- let dest = dest
- .as_mut_slice_of::<f32>()
- .map_err(|_| gst::FlowError::NotNegotiated)?;
- assert!(
- dest.len()
- == audio_frame.no_samples() as usize
- * audio_frame.no_channels() as usize
- );
-
- for (channel, samples) in src
- .chunks_exact(audio_frame.channel_stride_or_data_size_in_bytes() as usize)
- .enumerate()
- {
- let samples = samples
- .as_slice_of::<f32>()
- .map_err(|_| gst::FlowError::NotNegotiated)?;
-
- for (i, sample) in samples[..audio_frame.no_samples() as usize]
- .iter()
- .enumerate()
- {
- dest[i * (audio_frame.no_channels() as usize) + channel] = *sample;
- }
- }
- }
-
- Ok(buffer)
- }
- #[cfg(feature = "advanced-sdk")]
- AudioInfo::Opus { .. } => {
- let data = audio_frame.data().ok_or_else(|| {
- gst::error!(CAT, obj: element, "Audio packet has no data");
- gst::element_error!(
- element,
- gst::StreamError::Format,
- ["Invalid audio packet"]
- );
-
- gst::FlowError::Error
- })?;
-
- Ok(gst::Buffer::from_mut_slice(Vec::from(data)))
- }
- #[cfg(feature = "advanced-sdk")]
- AudioInfo::Aac { .. } => {
- let compressed_packet = audio_frame.compressed_packet().ok_or_else(|| {
- gst::error!(
- CAT,
- obj: element,
- "Audio packet doesn't have compressed packet start"
- );
- gst::element_error!(
- element,
- gst::StreamError::Format,
- ["Invalid audio packet"]
- );
-
- gst::FlowError::Error
- })?;
-
- Ok(gst::Buffer::from_mut_slice(Vec::from(
- compressed_packet.data,
- )))
- }
- }
- }
}
diff --git a/net/ndi/src/ndisrcdemux/imp.rs b/net/ndi/src/ndisrcdemux/imp.rs
index 225431a6d..be93e4c83 100644
--- a/net/ndi/src/ndisrcdemux/imp.rs
+++ b/net/ndi/src/ndisrcdemux/imp.rs
@@ -1,13 +1,20 @@
// SPDX-License-Identifier: MPL-2.0
+use atomic_refcell::AtomicRefCell;
+use gst::glib::once_cell::sync::Lazy;
use gst::prelude::*;
use gst::subclass::prelude::*;
+use gst_video::prelude::*;
-use std::sync::Mutex;
+use std::{cmp, collections::VecDeque, sync::Mutex};
-use gst::glib::once_cell::sync::Lazy;
+use byte_slice_cast::*;
-use crate::ndisrcmeta;
+use crate::{
+ ndi_cc_meta::NDICCMetaDecoder,
+ ndisrcmeta::{self, Buffer},
+ ndisys, TimestampMode,
+};
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
@@ -17,14 +24,55 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
)
});
-#[derive(Default)]
struct State {
combiner: gst_base::UniqueFlowCombiner,
video_pad: Option<gst::Pad>,
+ video_info: Option<VideoInfo>,
video_caps: Option<gst::Caps>,
+ video_buffer_pool: Option<gst::BufferPool>,
audio_pad: Option<gst::Pad>,
+ audio_info: Option<AudioInfo>,
audio_caps: Option<gst::Caps>,
+ // Only set for raw audio
+ audio_info_non_interleaved: Option<gst_audio::AudioInfo>,
+ audio_caps_non_interleaved: Option<gst::Caps>,
+ audio_non_interleaved: bool,
+
+ ndi_cc_decoder: Option<NDICCMetaDecoder>,
+ pending_metadata: Vec<crate::ndi::MetadataFrame>,
+
+ // Audio/video time observations
+ timestamp_mode: TimestampMode,
+ observations_timestamp: [Observations; 2],
+ observations_timecode: [Observations; 2],
+}
+
+impl Default for State {
+ fn default() -> State {
+ State {
+ combiner: gst_base::UniqueFlowCombiner::new(),
+
+ video_pad: None,
+ video_info: None,
+ video_caps: None,
+ video_buffer_pool: None,
+
+ audio_pad: None,
+ audio_info: None,
+ audio_caps: None,
+ audio_info_non_interleaved: None,
+ audio_caps_non_interleaved: None,
+ audio_non_interleaved: false,
+
+ ndi_cc_decoder: None,
+ pending_metadata: Vec::new(),
+
+ timestamp_mode: TimestampMode::Auto,
+ observations_timestamp: [Observations::default(), Observations::default()],
+ observations_timecode: [Observations::default(), Observations::default()],
+ }
+ }
}
pub struct NdiSrcDemux {
@@ -135,12 +183,18 @@ impl ElementImpl for NdiSrcDemux {
match transition {
gst::StateChange::PausedToReady => {
let mut state = self.state.lock().unwrap();
+
for pad in [state.audio_pad.take(), state.video_pad.take()]
.iter()
.flatten()
{
self.obj().remove_pad(pad).unwrap();
}
+
+ if let Some(pool) = state.video_buffer_pool.take() {
+ let _ = pool.set_active(false);
+ }
+
*state = State::default();
}
_ => (),
@@ -158,7 +212,7 @@ impl NdiSrcDemux {
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::log!(CAT, imp: self, "Handling buffer {:?}", buffer);
- let meta = buffer
+ let mut meta = buffer
.make_mut()
.meta_mut::<ndisrcmeta::NdiSrcMeta>()
.ok_or_else(|| {
@@ -166,125 +220,325 @@ impl NdiSrcDemux {
gst::FlowError::Error
})?;
- let mut events = vec![];
- let srcpad;
- let mut add_pad = false;
-
let mut state = self.state.lock().unwrap();
- let caps = meta.caps();
- match meta.stream_type() {
- ndisrcmeta::StreamType::Audio => {
+ let ndi_buffer = meta.take_ndi_buffer();
+
+ match ndi_buffer {
+ Buffer::Audio { ref frame, .. } => {
+ gst::debug!(CAT, imp: self, "Received audio frame {:?}", frame);
+
+ let mut reconfigure = false;
+ let info = self.create_audio_info(frame)?;
+ if Some(&info) != state.audio_info.as_ref() {
+ let caps = info.to_caps().map_err(|_| {
+ gst::element_imp_error!(
+ self,
+ gst::ResourceError::Settings,
+ ["Invalid audio info received: {:?}", info]
+ );
+ gst::FlowError::NotNegotiated
+ })?;
+
+ gst::debug!(CAT, imp: self, "Audio caps changed to {}", caps);
+
+ #[allow(irrefutable_let_patterns)]
+ if let AudioInfo::Audio(ref info) = info {
+ let mut builder = gst_audio::AudioInfo::builder(
+ info.format(),
+ info.rate(),
+ info.channels(),
+ )
+ .layout(gst_audio::AudioLayout::NonInterleaved);
+
+ if let Some(positions) = info.positions() {
+ builder = builder.positions(positions);
+ }
+
+ let non_interleaved_info = builder.build().unwrap();
+ state.audio_caps_non_interleaved =
+ Some(non_interleaved_info.to_caps().unwrap());
+ state.audio_info_non_interleaved = Some(non_interleaved_info);
+ } else {
+ state.audio_non_interleaved = false;
+ state.audio_caps_non_interleaved = None;
+ state.audio_info_non_interleaved = None;
+ }
+
+ state.audio_info = Some(info);
+ state.audio_caps = Some(caps);
+ reconfigure = true;
+ }
+
+ let srcpad;
if let Some(ref pad) = state.audio_pad {
srcpad = pad.clone();
+ reconfigure |= pad.check_reconfigure();
} else {
- gst::debug!(CAT, imp: self, "Adding audio pad with caps {}", caps);
+ gst::debug!(CAT, imp: self, "Adding audio pad");
let templ = self.obj().element_class().pad_template("audio").unwrap();
let pad = gst::Pad::builder_from_template(&templ)
.flags(gst::PadFlags::FIXED_CAPS)
.build();
- let mut caps_event = Some(gst::event::Caps::new(&caps));
+ state.audio_pad = Some(pad.clone());
+ let _ = pad.set_active(true);
+ state.combiner.add_pad(&pad);
+
+ let mut stored_caps = false;
self.sinkpad.sticky_events_foreach(|ev| {
- if ev.type_() < gst::EventType::Caps {
- events.push(ev.clone());
- } else {
- if let Some(ev) = caps_event.take() {
- events.push(ev);
- }
+ if let gst::EventView::StreamStart(ev) = ev.view() {
+ let stream_start = gst::event::StreamStart::builder(&format!(
+ "{}/audio",
+ ev.stream_id()
+ ))
+ .seqnum(ev.seqnum())
+ .flags(ev.stream_flags())
+ .group_id(ev.group_id().unwrap_or_else(|| {
+ // This can't really happen as ndisrc would provide one!
+ gst::error!(CAT, imp: self, "Upstream provided no group id");
+ gst::GroupId::next()
+ }))
+ .build();
- if ev.type_() != gst::EventType::Caps {
- events.push(ev.clone());
- }
+ let _ = pad.store_sticky_event(&stream_start);
+ } else if ev.type_() < gst::EventType::Caps {
+ let _ = pad.store_sticky_event(ev);
+ } else if ev.type_() > gst::EventType::Caps {
+ // We store the interleaved caps for starters
+ let caps =
+ gst::event::Caps::builder(state.audio_caps.as_ref().unwrap())
+ .build();
+ let _ = pad.store_sticky_event(&caps);
+ stored_caps = true;
+ let _ = pad.store_sticky_event(ev);
}
std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
});
- state.audio_caps = Some(caps.clone());
- state.audio_pad = Some(pad.clone());
+ if !stored_caps {
+ // We store the interleaved caps for starters
+ let caps =
+ gst::event::Caps::builder(state.audio_caps.as_ref().unwrap()).build();
+ let _ = pad.store_sticky_event(&caps);
+ }
- let _ = pad.set_active(true);
- for ev in events.drain(..) {
- let _ = pad.store_sticky_event(&ev);
+ drop(state);
+
+ self.obj().add_pad(&pad).unwrap();
+ if self.obj().num_src_pads() == 2 {
+ self.obj().no_more_pads();
}
- state.combiner.add_pad(&pad);
+ state = self.state.lock().unwrap();
- add_pad = true;
srcpad = pad;
+ // No need to check for non-interleaved caps support below or update the caps
+ // because the same caps were already set above
+ reconfigure = state.audio_caps_non_interleaved.is_some();
}
- if state.audio_caps.as_ref() != Some(&caps) {
- gst::debug!(CAT, imp: self, "Audio caps changed to {}", caps);
- events.push(gst::event::Caps::new(&caps));
- state.audio_caps = Some(caps);
+ if reconfigure {
+ // FIXME: As this is a demuxer we can't unfortunately do an allocation query
+ // downstream without risking deadlocks.
+
+ // Check if there's a peer downstream and if it supports the non-interleaved
+ // caps, otherwise fall back to the normal caps.
+ if let Some(caps) = state.audio_caps_non_interleaved.clone() {
+ drop(state);
+ let allowed_caps = srcpad.peer().map(|peer| peer.query_caps(Some(&caps)));
+ state = self.state.lock().unwrap();
+
+ gst::info!(CAT, imp: self, "Allowed audio caps {allowed_caps:?}");
+
+ state.audio_non_interleaved = allowed_caps
+ .map_or(false, |allowed_caps| allowed_caps.can_intersect(&caps));
+
+ gst::info!(
+ CAT,
+ imp: self,
+ "Non-interleaved caps{} supported",
+ if state.audio_non_interleaved { "" } else { "not" },
+ );
+ }
+
+ let caps = gst::event::Caps::builder(if state.audio_non_interleaved {
+ state.audio_caps_non_interleaved.as_ref().unwrap()
+ } else {
+ state.audio_caps.as_ref().unwrap()
+ })
+ .build();
+
+ let _ = srcpad.store_sticky_event(&caps);
}
}
- ndisrcmeta::StreamType::Video => {
+ Buffer::Video { ref frame, .. } => {
+ gst::debug!(CAT, imp: self, "Received video frame {:?}", frame);
+
+ let mut reconfigure = false;
+ let info = self.create_video_info(frame)?;
+ if Some(&info) != state.video_info.as_ref() {
+ let caps = info.to_caps().map_err(|_| {
+ gst::element_imp_error!(
+ self,
+ gst::ResourceError::Settings,
+ ["Invalid video info received: {:?}", info]
+ );
+ gst::FlowError::NotNegotiated
+ })?;
+
+ if state.ndi_cc_decoder.is_none() {
+ state.ndi_cc_decoder = Some(NDICCMetaDecoder::new(info.width()));
+ }
+
+ gst::debug!(CAT, imp: self, "Video caps changed to {}", caps);
+ state.video_info = Some(info);
+ state.video_caps = Some(caps);
+ state.video_buffer_pool = None;
+ reconfigure = true;
+ }
+
+ let srcpad;
if let Some(ref pad) = state.video_pad {
srcpad = pad.clone();
+ reconfigure |= pad.check_reconfigure();
} else {
- gst::debug!(CAT, imp: self, "Adding video pad with caps {}", caps);
+ gst::debug!(CAT, imp: self, "Adding video pad");
let templ = self.obj().element_class().pad_template("video").unwrap();
let pad = gst::Pad::builder_from_template(&templ)
.flags(gst::PadFlags::FIXED_CAPS)
.build();
- let mut caps_event = Some(gst::event::Caps::new(&caps));
+ state.video_pad = Some(pad.clone());
+
+ let _ = pad.set_active(true);
+ state.combiner.add_pad(&pad);
+ let mut stored_caps = false;
self.sinkpad.sticky_events_foreach(|ev| {
- if ev.type_() < gst::EventType::Caps {
- events.push(ev.clone());
- } else {
- if let Some(ev) = caps_event.take() {
- events.push(ev);
- }
+ if let gst::EventView::StreamStart(ev) = ev.view() {
+ let stream_start = gst::event::StreamStart::builder(&format!(
+ "{}/video",
+ ev.stream_id()
+ ))
+ .seqnum(ev.seqnum())
+ .flags(ev.stream_flags())
+ .group_id(ev.group_id().unwrap_or_else(|| {
+ // This can't really happen as ndisrc would provide one!
+ gst::error!(CAT, imp: self, "Upstream provided no group id");
+ gst::GroupId::next()
+ }))
+ .build();
- if ev.type_() != gst::EventType::Caps {
- events.push(ev.clone());
- }
+ let _ = pad.store_sticky_event(&stream_start);
+ } else if ev.type_() < gst::EventType::Caps {
+ let _ = pad.store_sticky_event(ev);
+ } else if ev.type_() > gst::EventType::Caps {
+ let caps =
+ gst::event::Caps::builder(state.video_caps.as_ref().unwrap())
+ .build();
+ let _ = pad.store_sticky_event(&caps);
+ stored_caps = true;
+ let _ = pad.store_sticky_event(ev);
}
std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
});
- state.video_caps = Some(caps.clone());
- state.video_pad = Some(pad.clone());
+ if !stored_caps {
+ let caps =
+ gst::event::Caps::builder(state.video_caps.as_ref().unwrap()).build();
+ let _ = pad.store_sticky_event(&caps);
+ }
- let _ = pad.set_active(true);
- for ev in events.drain(..) {
- let _ = pad.store_sticky_event(&ev);
+ drop(state);
+
+ self.obj().add_pad(&pad).unwrap();
+ if self.obj().num_src_pads() == 2 {
+ self.obj().no_more_pads();
}
- state.combiner.add_pad(&pad);
+ state = self.state.lock().unwrap();
- add_pad = true;
srcpad = pad;
+
+ // New caps were already stored above
+ reconfigure = false;
}
- if state.video_caps.as_ref() != Some(&caps) {
- gst::debug!(CAT, imp: self, "Video caps changed to {}", caps);
- events.push(gst::event::Caps::new(&caps));
- state.video_caps = Some(caps);
+ if reconfigure {
+ // FIXME: As this is a demuxer we can't unfortunately do an allocation query
+ // downstream without risking deadlocks.
+ let caps =
+ gst::event::Caps::builder(state.video_caps.as_ref().unwrap()).build();
+
+ let _ = srcpad.store_sticky_event(&caps);
}
}
+ Buffer::Metadata { .. } => {
+ // Nothing to be done here
+ }
}
- drop(state);
- meta.remove().unwrap();
- if add_pad {
- self.obj().add_pad(&srcpad).unwrap();
- if self.obj().num_src_pads() == 2 {
- self.obj().no_more_pads();
+ let srcpad;
+ let buffer;
+ match ndi_buffer {
+ Buffer::Audio {
+ frame,
+ discont,
+ receive_time_gst,
+ receive_time_real,
+ } => {
+ srcpad = state.audio_pad.clone().unwrap();
+ let (pts, duration, resync) = self
+ .calculate_audio_timestamp(
+ &mut state,
+ receive_time_gst,
+ receive_time_real,
+ &frame,
+ )
+ .ok_or_else(|| {
+ gst::debug!(CAT, imp: self, "Flushing, dropping buffer");
+ gst::FlowError::Flushing
+ })?;
+
+ buffer = self.create_audio_buffer(&state, pts, duration, discont, resync, frame)?;
+
+ gst::log!(CAT, imp: self, "Produced audio buffer {:?}", buffer);
}
- }
+ Buffer::Video {
+ frame,
+ discont,
+ receive_time_gst,
+ receive_time_real,
+ } => {
+ srcpad = state.video_pad.clone().unwrap();
+ let (pts, duration, resync) = self
+ .calculate_video_timestamp(
+ &mut state,
+ receive_time_gst,
+ receive_time_real,
+ &frame,
+ )
+ .ok_or_else(|| {
+ gst::debug!(CAT, imp: self, "Flushing, dropping buffer");
+ gst::FlowError::Flushing
+ })?;
- for ev in events {
- srcpad.push_event(ev);
- }
+ buffer =
+ self.create_video_buffer(&mut state, pts, duration, discont, resync, frame)?;
+
+ gst::log!(CAT, imp: self, "Produced video buffer {:?}", buffer);
+ }
+ Buffer::Metadata { frame, .. } => {
+ state.pending_metadata.push(frame);
+ return Ok(gst::FlowSuccess::Ok);
+ }
+ };
+ drop(state);
let res = srcpad.push(buffer);
@@ -296,16 +550,1477 @@ impl NdiSrcDemux {
use gst::EventView;
gst::log!(CAT, imp: self, "Handling event {:?}", event);
- if let EventView::Eos(_) = event.view() {
- if self.obj().num_src_pads() == 0 {
- // error out on EOS if no src pad are available
+ match event.view() {
+ EventView::StreamStart(ev) => {
+ let state = self.state.lock().unwrap();
+ let pads = [
+ ("audio", state.audio_pad.clone()),
+ ("video", state.video_pad.clone()),
+ ];
+ drop(state);
+
+ for (stream_name, srcpad) in pads {
+ let Some(srcpad) = srcpad else {
+ continue;
+ };
+
+ let stream_start = gst::event::StreamStart::builder(&format!(
+ "{}/{stream_name}",
+ ev.stream_id()
+ ))
+ .seqnum(ev.seqnum())
+ .flags(ev.stream_flags())
+ .group_id(ev.group_id().unwrap_or_else(|| {
+ // This can't really happen as ndisrc would provide one!
+ gst::error!(CAT, imp: self, "Upstream provided no group id");
+ gst::GroupId::next()
+ }))
+ .build();
+
+ let _ = srcpad.push_event(stream_start);
+ }
+
+ return true;
+ }
+ EventView::Caps(_) => {
+ return true;
+ }
+ EventView::Eos(_) => {
+ if self.obj().num_src_pads() == 0 {
+ // error out on EOS if no src pad are available
+ gst::element_imp_error!(
+ self,
+ gst::StreamError::Demux,
+ ["EOS without available srcpad(s)"]
+ );
+ }
+ }
+ _ => (),
+ }
+ gst::Pad::event_default(pad, Some(&*self.obj()), event)
+ }
+}
+
+impl NdiSrcDemux {
+ #[allow(clippy::too_many_arguments)]
+ fn calculate_timestamp(
+ &self,
+ state: &mut State,
+ is_audio: bool,
+ receive_time_gst: gst::ClockTime,
+ receive_time_real: gst::ClockTime,
+ timestamp: i64,
+ timecode: i64,
+ duration: Option<gst::ClockTime>,
+ ) -> Option<(gst::ClockTime, Option<gst::ClockTime>, bool)> {
+ let timestamp = if timestamp == ndisys::NDIlib_recv_timestamp_undefined {
+ gst::ClockTime::NONE
+ } else {
+ Some((timestamp as u64 * 100).nseconds())
+ };
+ let timecode = (timecode as u64 * 100).nseconds();
+
+ gst::log!(
+ CAT,
+ imp: self,
+ "Received frame with timecode {}, timestamp {}, duration {}, receive time {}, local time now {}",
+ timecode,
+ timestamp.display(),
+ duration.display(),
+ receive_time_gst.display(),
+ receive_time_real,
+ );
+
+ let res_timestamp = state.observations_timestamp[usize::from(!is_audio)].process(
+ self.obj().upcast_ref(),
+ timestamp,
+ receive_time_gst,
+ duration,
+ );
+
+ let res_timecode = state.observations_timecode[usize::from(!is_audio)].process(
+ self.obj().upcast_ref(),
+ Some(timecode),
+ receive_time_gst,
+ duration,
+ );
+
+ let (pts, duration, discont) = match state.timestamp_mode {
+ TimestampMode::ReceiveTimeTimecode => match res_timecode {
+ Some((pts, duration, discont)) => (pts, duration, discont),
+ None => {
+ gst::warning!(CAT, imp: self, "Can't calculate timestamp");
+ (receive_time_gst, duration, false)
+ }
+ },
+ TimestampMode::ReceiveTimeTimestamp => match res_timestamp {
+ Some((pts, duration, discont)) => (pts, duration, discont),
+ None => {
+ if timestamp.is_some() {
+ gst::warning!(CAT, imp: self, "Can't calculate timestamp");
+ }
+
+ (receive_time_gst, duration, false)
+ }
+ },
+ TimestampMode::Timecode => (timecode, duration, false),
+ TimestampMode::Timestamp if timestamp.is_none() => (receive_time_gst, duration, false),
+ TimestampMode::Timestamp => {
+ // Timestamps are relative to the UNIX epoch
+ let timestamp = timestamp?;
+ if receive_time_real > timestamp {
+ let diff = receive_time_real - timestamp;
+ if diff > receive_time_gst {
+ (gst::ClockTime::ZERO, duration, false)
+ } else {
+ (receive_time_gst - diff, duration, false)
+ }
+ } else {
+ let diff = timestamp - receive_time_real;
+ (receive_time_gst + diff, duration, false)
+ }
+ }
+ TimestampMode::ReceiveTime => (receive_time_gst, duration, false),
+ TimestampMode::Auto => {
+ res_timecode
+ .or(res_timestamp)
+ .unwrap_or((receive_time_gst, duration, false))
+ }
+ };
+
+ gst::log!(
+ CAT,
+ imp: self,
+ "Calculated PTS {}, duration {}",
+ pts.display(),
+ duration.display(),
+ );
+
+ Some((pts, duration, discont))
+ }
+
+ fn calculate_video_timestamp(
+ &self,
+ state: &mut State,
+ receive_time_gst: gst::ClockTime,
+ receive_time_real: gst::ClockTime,
+ video_frame: &crate::ndi::VideoFrame,
+ ) -> Option<(gst::ClockTime, Option<gst::ClockTime>, bool)> {
+ let duration = gst::ClockTime::SECOND.mul_div_floor(
+ video_frame.frame_rate().1 as u64,
+ video_frame.frame_rate().0 as u64,
+ );
+
+ self.calculate_timestamp(
+ state,
+ false,
+ receive_time_gst,
+ receive_time_real,
+ video_frame.timestamp(),
+ video_frame.timecode(),
+ duration,
+ )
+ }
+
+ fn create_video_buffer_pool(&self, video_info: &gst_video::VideoInfo) -> gst::BufferPool {
+ let pool = gst_video::VideoBufferPool::new();
+ let mut config = pool.config();
+ config.set_params(
+ Some(&video_info.to_caps().unwrap()),
+ video_info.size() as u32,
+ 0,
+ 0,
+ );
+ pool.set_config(config).unwrap();
+ pool.set_active(true).unwrap();
+
+ pool.upcast()
+ }
+
+ fn create_video_info(
+ &self,
+ video_frame: &crate::ndi::VideoFrame,
+ ) -> Result<VideoInfo, gst::FlowError> {
+ let fourcc = video_frame.fourcc();
+
+ let par = gst::Fraction::approximate_f32(video_frame.picture_aspect_ratio())
+ .unwrap_or_else(|| gst::Fraction::new(1, 1))
+ * gst::Fraction::new(video_frame.yres(), video_frame.xres());
+ let interlace_mode = match video_frame.frame_format_type() {
+ ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_progressive => {
+ gst_video::VideoInterlaceMode::Progressive
+ }
+ ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved => {
+ gst_video::VideoInterlaceMode::Interleaved
+ }
+ _ => gst_video::VideoInterlaceMode::Alternate,
+ };
+
+ if [
+ ndisys::NDIlib_FourCC_video_type_UYVY,
+ ndisys::NDIlib_FourCC_video_type_UYVA,
+ ndisys::NDIlib_FourCC_video_type_YV12,
+ ndisys::NDIlib_FourCC_video_type_NV12,
+ ndisys::NDIlib_FourCC_video_type_I420,
+ ndisys::NDIlib_FourCC_video_type_BGRA,
+ ndisys::NDIlib_FourCC_video_type_BGRX,
+ ndisys::NDIlib_FourCC_video_type_RGBA,
+ ndisys::NDIlib_FourCC_video_type_BGRX,
+ ]
+ .contains(&fourcc)
+ {
+ // YV12 and I420 are swapped in the NDI SDK compared to GStreamer
+ let format = match video_frame.fourcc() {
+ ndisys::NDIlib_FourCC_video_type_UYVY => gst_video::VideoFormat::Uyvy,
+ // FIXME: This drops the alpha plane!
+ ndisys::NDIlib_FourCC_video_type_UYVA => gst_video::VideoFormat::Uyvy,
+ ndisys::NDIlib_FourCC_video_type_YV12 => gst_video::VideoFormat::I420,
+ ndisys::NDIlib_FourCC_video_type_NV12 => gst_video::VideoFormat::Nv12,
+ ndisys::NDIlib_FourCC_video_type_I420 => gst_video::VideoFormat::Yv12,
+ ndisys::NDIlib_FourCC_video_type_BGRA => gst_video::VideoFormat::Bgra,
+ ndisys::NDIlib_FourCC_video_type_BGRX => gst_video::VideoFormat::Bgrx,
+ ndisys::NDIlib_FourCC_video_type_RGBA => gst_video::VideoFormat::Rgba,
+ ndisys::NDIlib_FourCC_video_type_RGBX => gst_video::VideoFormat::Rgbx,
+ _ => {
+ gst::element_imp_error!(
+ self,
+ gst::StreamError::Format,
+ ["Unsupported video fourcc {:08x}", video_frame.fourcc()]
+ );
+
+ return Err(gst::FlowError::NotNegotiated);
+ } // TODO: NDIlib_FourCC_video_type_P216 and NDIlib_FourCC_video_type_PA16 not
+ // supported by GStreamer
+ };
+
+ let mut builder = gst_video::VideoInfo::builder(
+ format,
+ video_frame.xres() as u32,
+ video_frame.yres() as u32,
+ )
+ .fps(gst::Fraction::from(video_frame.frame_rate()))
+ .par(par)
+ .interlace_mode(interlace_mode);
+
+ if video_frame.frame_format_type()
+ == ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved
+ {
+ builder = builder.field_order(gst_video::VideoFieldOrder::TopFieldFirst);
+ }
+
+ return Ok(VideoInfo::Video(builder.build().map_err(|_| {
gst::element_imp_error!(
self,
- gst::StreamError::Demux,
- ["EOS without available srcpad(s)"]
+ gst::StreamError::Format,
+ ["Invalid video format configuration"]
+ );
+
+ gst::FlowError::NotNegotiated
+ })?));
+ }
+
+ #[cfg(feature = "advanced-sdk")]
+ if [
+ ndisys::NDIlib_FourCC_video_type_ex_SHQ0_highest_bandwidth,
+ ndisys::NDIlib_FourCC_video_type_ex_SHQ2_highest_bandwidth,
+ ndisys::NDIlib_FourCC_video_type_ex_SHQ7_highest_bandwidth,
+ ndisys::NDIlib_FourCC_video_type_ex_SHQ0_lowest_bandwidth,
+ ndisys::NDIlib_FourCC_video_type_ex_SHQ2_lowest_bandwidth,
+ ndisys::NDIlib_FourCC_video_type_ex_SHQ7_lowest_bandwidth,
+ ]
+ .contains(&fourcc)
+ {
+ let variant = match fourcc {
+ ndisys::NDIlib_FourCC_video_type_ex_SHQ0_highest_bandwidth
+ | ndisys::NDIlib_FourCC_video_type_ex_SHQ0_lowest_bandwidth => String::from("SHQ0"),
+ ndisys::NDIlib_FourCC_video_type_ex_SHQ2_highest_bandwidth
+ | ndisys::NDIlib_FourCC_video_type_ex_SHQ2_lowest_bandwidth => String::from("SHQ2"),
+ ndisys::NDIlib_FourCC_video_type_ex_SHQ7_highest_bandwidth
+ | ndisys::NDIlib_FourCC_video_type_ex_SHQ7_lowest_bandwidth => String::from("SHQ7"),
+ _ => {
+ gst::element_imp_error!(
+ self,
+ gst::StreamError::Format,
+ [
+ "Unsupported SpeedHQ video fourcc {:08x}",
+ video_frame.fourcc()
+ ]
+ );
+
+ return Err(gst::FlowError::NotNegotiated);
+ }
+ };
+
+ return Ok(VideoInfo::SpeedHQInfo {
+ variant,
+ xres: video_frame.xres(),
+ yres: video_frame.yres(),
+ fps_n: video_frame.frame_rate().0,
+ fps_d: video_frame.frame_rate().1,
+ par_n: par.numer(),
+ par_d: par.denom(),
+ interlace_mode,
+ });
+ }
+
+ #[cfg(feature = "advanced-sdk")]
+ if [
+ ndisys::NDIlib_FourCC_video_type_ex_H264_highest_bandwidth,
+ ndisys::NDIlib_FourCC_video_type_ex_H264_lowest_bandwidth,
+ ndisys::NDIlib_FourCC_video_type_ex_H264_alpha_highest_bandwidth,
+ ndisys::NDIlib_FourCC_video_type_ex_H264_alpha_lowest_bandwidth,
+ ]
+ .contains(&fourcc)
+ {
+ let compressed_packet = video_frame.compressed_packet().ok_or_else(|| {
+ gst::error!(
+ CAT,
+ imp: self,
+ "Video packet doesn't have compressed packet start"
);
+ gst::element_imp_error!(self, gst::StreamError::Format, ["Invalid video packet"]);
+
+ gst::FlowError::Error
+ })?;
+
+ if compressed_packet.fourcc != ndisys::NDIlib_compressed_FourCC_type_H264 {
+ gst::error!(CAT, imp: self, "Non-H264 video packet");
+ gst::element_imp_error!(self, gst::StreamError::Format, ["Invalid video packet"]);
+
+ return Err(gst::FlowError::Error);
}
+
+ return Ok(VideoInfo::H264 {
+ xres: video_frame.xres(),
+ yres: video_frame.yres(),
+ fps_n: video_frame.frame_rate().0,
+ fps_d: video_frame.frame_rate().1,
+ par_n: par.numer(),
+ par_d: par.denom(),
+ interlace_mode,
+ });
}
- gst::Pad::event_default(pad, Some(&*self.obj()), event)
+
+ #[cfg(feature = "advanced-sdk")]
+ if [
+ ndisys::NDIlib_FourCC_video_type_ex_HEVC_highest_bandwidth,
+ ndisys::NDIlib_FourCC_video_type_ex_HEVC_lowest_bandwidth,
+ ndisys::NDIlib_FourCC_video_type_ex_HEVC_alpha_highest_bandwidth,
+ ndisys::NDIlib_FourCC_video_type_ex_HEVC_alpha_lowest_bandwidth,
+ ]
+ .contains(&fourcc)
+ {
+ let compressed_packet = video_frame.compressed_packet().ok_or_else(|| {
+ gst::error!(
+ CAT,
+ imp: self,
+ "Video packet doesn't have compressed packet start"
+ );
+ gst::element_imp_error!(self, gst::StreamError::Format, ["Invalid video packet"]);
+
+ gst::FlowError::Error
+ })?;
+
+ if compressed_packet.fourcc != ndisys::NDIlib_compressed_FourCC_type_HEVC {
+ gst::error!(CAT, imp: self, "Non-H265 video packet");
+ gst::element_imp_error!(self, gst::StreamError::Format, ["Invalid video packet"]);
+
+ return Err(gst::FlowError::Error);
+ }
+
+ return Ok(VideoInfo::H265 {
+ xres: video_frame.xres(),
+ yres: video_frame.yres(),
+ fps_n: video_frame.frame_rate().0,
+ fps_d: video_frame.frame_rate().1,
+ par_n: par.numer(),
+ par_d: par.denom(),
+ interlace_mode,
+ });
+ }
+
+ gst::element_imp_error!(
+ self,
+ gst::StreamError::Format,
+ ["Unsupported video fourcc {:08x}", video_frame.fourcc()]
+ );
+ Err(gst::FlowError::NotNegotiated)
+ }
+
+ fn create_video_buffer(
+ &self,
+ state: &mut State,
+ pts: gst::ClockTime,
+ duration: Option<gst::ClockTime>,
+ discont: bool,
+ resync: bool,
+ video_frame: crate::ndi::VideoFrame,
+ ) -> Result<gst::Buffer, gst::FlowError> {
+ let timecode = video_frame.timecode();
+ let timestamp = video_frame.timestamp();
+ let frame_format_type = video_frame.frame_format_type();
+
+ let mut captions = Vec::new();
+
+ {
+ let ndi_cc_decoder = state.ndi_cc_decoder.as_mut().unwrap();
+ // handle potential width change (also needed for standalone metadata)
+ ndi_cc_decoder.set_width(state.video_info.as_ref().unwrap().width());
+
+ for metadata in state.pending_metadata.drain(..) {
+ if let Some(meta) = metadata.metadata() {
+ let res = ndi_cc_decoder.decode(meta);
+ if let Err(err) = res {
+ gst::debug!(CAT, imp: self, "Failed to parse NDI metadata: {err}");
+ }
+ }
+ }
+
+ if let Some(metadata) = video_frame.metadata() {
+ let res = ndi_cc_decoder.decode(metadata);
+ match res {
+ Ok(c) => {
+ captions.extend_from_slice(&c);
+ }
+ Err(err) => {
+ gst::debug!(CAT, imp: self, "Failed to parse NDI video frame metadata: {err}");
+ }
+ }
+ }
+ }
+
+ let mut buffer = self.wrap_or_copy_video_frame(state, video_frame)?;
+ {
+ let buffer = buffer.get_mut().unwrap();
+ buffer.set_pts(pts);
+ buffer.set_duration(duration);
+
+ if resync {
+ buffer.set_flags(gst::BufferFlags::RESYNC);
+ }
+
+ if discont {
+ buffer.set_flags(gst::BufferFlags::DISCONT);
+ }
+
+ gst::ReferenceTimestampMeta::add(
+ buffer,
+ &crate::TIMECODE_CAPS,
+ (timecode as u64 * 100).nseconds(),
+ gst::ClockTime::NONE,
+ );
+ if timestamp != ndisys::NDIlib_recv_timestamp_undefined {
+ gst::ReferenceTimestampMeta::add(
+ buffer,
+ &crate::TIMESTAMP_CAPS,
+ (timestamp as u64 * 100).nseconds(),
+ gst::ClockTime::NONE,
+ );
+ }
+
+ for caption in captions {
+ match caption.did16() {
+ gst_video::VideoAncillaryDID16::S334Eia608 => {
+ gst_video::VideoCaptionMeta::add(
+ buffer,
+ gst_video::VideoCaptionType::Cea608S3341a,
+ caption.data(),
+ );
+ }
+ gst_video::VideoAncillaryDID16::S334Eia708 => {
+ gst_video::VideoCaptionMeta::add(
+ buffer,
+ gst_video::VideoCaptionType::Cea708Cdp,
+ caption.data(),
+ );
+ }
+ _ => (),
+ }
+ }
+
+ match frame_format_type {
+ ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved => {
+ buffer.set_video_flags(
+ gst_video::VideoBufferFlags::INTERLACED | gst_video::VideoBufferFlags::TFF,
+ );
+ }
+ ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_field_0 => {
+ buffer.set_video_flags(
+ gst_video::VideoBufferFlags::INTERLACED
+ | gst_video::VideoBufferFlags::TOP_FIELD,
+ );
+ }
+ ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_field_1 => {
+ buffer.set_video_flags(
+ gst_video::VideoBufferFlags::INTERLACED
+ | gst_video::VideoBufferFlags::BOTTOM_FIELD,
+ );
+ }
+ _ => (),
+ }
+ }
+
+ Ok(buffer)
+ }
+
+ fn wrap_or_copy_video_frame(
+ &self,
+ state: &mut State,
+ video_frame: crate::ndi::VideoFrame,
+ ) -> Result<gst::Buffer, gst::FlowError> {
+ struct WrappedVideoFrame(crate::ndi::VideoFrame);
+
+ impl AsRef<[u8]> for WrappedVideoFrame {
+ fn as_ref(&self) -> &[u8] {
+ self.0.data().unwrap_or(&[])
+ }
+ }
+
+ match state.video_info.as_ref().unwrap() {
+ VideoInfo::Video(ref info) => {
+ match info.format() {
+ gst_video::VideoFormat::Uyvy
+ | gst_video::VideoFormat::Bgra
+ | gst_video::VideoFormat::Bgrx
+ | gst_video::VideoFormat::Rgba
+ | gst_video::VideoFormat::Rgbx => {
+ let src_stride = video_frame.line_stride_or_data_size_in_bytes() as usize;
+
+ if src_stride == info.stride()[0] as usize {
+ Ok(gst::Buffer::from_slice(WrappedVideoFrame(video_frame)))
+ } else {
+ gst::debug!(gst::CAT_PERFORMANCE, imp: self, "Copying raw video frame");
+
+ let src = video_frame.data().ok_or(gst::FlowError::Error)?;
+
+ if state.video_buffer_pool.is_none() {
+ state.video_buffer_pool = Some(self.create_video_buffer_pool(info));
+ };
+ let pool = state.video_buffer_pool.as_ref().unwrap();
+ let buffer = pool.acquire_buffer(None)?;
+
+ let mut vframe =
+ gst_video::VideoFrame::from_buffer_writable(buffer, info).unwrap();
+
+ let line_bytes = if info.format() == gst_video::VideoFormat::Uyvy {
+ 2 * vframe.width() as usize
+ } else {
+ 4 * vframe.width() as usize
+ };
+
+ let dest_stride = vframe.plane_stride()[0] as usize;
+ let dest = vframe.plane_data_mut(0).unwrap();
+ let plane_size = video_frame.yres() as usize * src_stride;
+
+ if src.len() < plane_size || src_stride < line_bytes {
+ gst::error!(CAT, imp: self, "Video packet has wrong stride or size");
+ gst::element_imp_error!(
+ self,
+ gst::StreamError::Format,
+ ["Video packet has wrong stride or size"]
+ );
+ return Err(gst::FlowError::Error);
+ }
+
+ for (dest, src) in dest
+ .chunks_exact_mut(dest_stride)
+ .zip(src.chunks_exact(src_stride))
+ {
+ dest[..line_bytes].copy_from_slice(&src[..line_bytes]);
+ }
+
+ Ok(vframe.into_buffer())
+ }
+ }
+ gst_video::VideoFormat::Nv12 => {
+ let src_stride = video_frame.line_stride_or_data_size_in_bytes() as usize;
+
+ if src_stride == info.stride()[0] as usize {
+ Ok(gst::Buffer::from_slice(WrappedVideoFrame(video_frame)))
+ } else {
+ gst::debug!(gst::CAT_PERFORMANCE, imp: self, "Copying raw video frame");
+
+ let src = video_frame.data().ok_or(gst::FlowError::Error)?;
+
+ if state.video_buffer_pool.is_none() {
+ state.video_buffer_pool = Some(self.create_video_buffer_pool(info));
+ };
+ let pool = state.video_buffer_pool.as_ref().unwrap();
+ let buffer = pool.acquire_buffer(None)?;
+
+ let mut vframe =
+ gst_video::VideoFrame::from_buffer_writable(buffer, info).unwrap();
+
+ let line_bytes = vframe.width() as usize;
+ let plane_size = video_frame.yres() as usize * src_stride;
+
+ if src.len() < 2 * plane_size || src_stride < line_bytes {
+ gst::error!(CAT, imp: self, "Video packet has wrong stride or size");
+ gst::element_imp_error!(
+ self,
+ gst::StreamError::Format,
+ ["Video packet has wrong stride or size"]
+ );
+ return Err(gst::FlowError::Error);
+ }
+
+ // First plane
+ {
+ let dest_stride = vframe.plane_stride()[0] as usize;
+ let dest = vframe.plane_data_mut(0).unwrap();
+ let src = &src[..plane_size];
+
+ for (dest, src) in dest
+ .chunks_exact_mut(dest_stride)
+ .zip(src.chunks_exact(src_stride))
+ {
+ dest[..line_bytes].copy_from_slice(&src[..line_bytes]);
+ }
+ }
+
+ // Second plane
+ {
+ let dest_stride = vframe.plane_stride()[1] as usize;
+ let dest = vframe.plane_data_mut(1).unwrap();
+ let src = &src[plane_size..];
+
+ for (dest, src) in dest
+ .chunks_exact_mut(dest_stride)
+ .zip(src.chunks_exact(src_stride))
+ {
+ dest[..line_bytes].copy_from_slice(&src[..line_bytes]);
+ }
+ }
+
+ Ok(vframe.into_buffer())
+ }
+ }
+ gst_video::VideoFormat::Yv12 | gst_video::VideoFormat::I420 => {
+ let src_stride = video_frame.line_stride_or_data_size_in_bytes() as usize;
+ let src_stride1 = (src_stride + 1) / 2;
+
+ if src_stride == info.stride()[0] as usize
+ && src_stride1 == info.stride()[1] as usize
+ {
+ Ok(gst::Buffer::from_slice(WrappedVideoFrame(video_frame)))
+ } else {
+ gst::debug!(gst::CAT_PERFORMANCE, imp: self, "Copying raw video frame");
+
+ let src = video_frame.data().ok_or(gst::FlowError::Error)?;
+
+ if state.video_buffer_pool.is_none() {
+ state.video_buffer_pool = Some(self.create_video_buffer_pool(info));
+ };
+ let pool = state.video_buffer_pool.as_ref().unwrap();
+ let buffer = pool.acquire_buffer(None)?;
+
+ let mut vframe =
+ gst_video::VideoFrame::from_buffer_writable(buffer, info).unwrap();
+
+ let line_bytes = vframe.width() as usize;
+ let line_bytes1 = (line_bytes + 1) / 2;
+
+ let plane_size = video_frame.yres() as usize * src_stride;
+ let plane_size1 = ((video_frame.yres() as usize + 1) / 2) * src_stride1;
+
+ if src.len() < plane_size + 2 * plane_size1 || src_stride < line_bytes {
+ gst::error!(CAT, imp: self, "Video packet has wrong stride or size");
+ gst::element_imp_error!(
+ self,
+ gst::StreamError::Format,
+ ["Video packet has wrong stride or size"]
+ );
+ return Err(gst::FlowError::Error);
+ }
+
+ // First plane
+ {
+ let dest_stride = vframe.plane_stride()[0] as usize;
+ let dest = vframe.plane_data_mut(0).unwrap();
+ let src = &src[..plane_size];
+
+ for (dest, src) in dest
+ .chunks_exact_mut(dest_stride)
+ .zip(src.chunks_exact(src_stride))
+ {
+ dest[..line_bytes].copy_from_slice(&src[..line_bytes]);
+ }
+ }
+
+ // Second plane
+ {
+ let dest_stride = vframe.plane_stride()[1] as usize;
+ let dest = vframe.plane_data_mut(1).unwrap();
+ let src = &src[plane_size..][..plane_size1];
+
+ for (dest, src) in dest
+ .chunks_exact_mut(dest_stride)
+ .zip(src.chunks_exact(src_stride1))
+ {
+ dest[..line_bytes1].copy_from_slice(&src[..line_bytes1]);
+ }
+ }
+
+ // Third plane
+ {
+ let dest_stride = vframe.plane_stride()[2] as usize;
+ let dest = vframe.plane_data_mut(2).unwrap();
+ let src = &src[plane_size + plane_size1..][..plane_size1];
+
+ for (dest, src) in dest
+ .chunks_exact_mut(dest_stride)
+ .zip(src.chunks_exact(src_stride1))
+ {
+ dest[..line_bytes1].copy_from_slice(&src[..line_bytes1]);
+ }
+ }
+
+ Ok(vframe.into_buffer())
+ }
+ }
+ _ => unreachable!(),
+ }
+ }
+ #[cfg(feature = "advanced-sdk")]
+ VideoInfo::SpeedHQInfo { .. } => {
+ Ok(gst::Buffer::from_slice(WrappedVideoFrame(video_frame)))
+ }
+ #[cfg(feature = "advanced-sdk")]
+ VideoInfo::H264 { .. } | VideoInfo::H265 { .. } => {
+ let compressed_packet = video_frame.compressed_packet().ok_or_else(|| {
+ gst::error!(
+ CAT,
+ imp: self,
+ "Video packet doesn't have compressed packet start"
+ );
+ gst::element_imp_error!(
+ self,
+ gst::StreamError::Format,
+ ["Invalid video packet"]
+ );
+
+ gst::FlowError::Error
+ })?;
+
+ // FIXME: Copy to a new vec for now. This can be optimized, especially if there is
+ // no extra data attached to the frame
+ let mut buffer = Vec::new();
+ if let Some(extra_data) = compressed_packet.extra_data {
+ buffer.extend_from_slice(extra_data);
+ }
+ buffer.extend_from_slice(compressed_packet.data);
+ let mut buffer = gst::Buffer::from_mut_slice(buffer);
+ if !compressed_packet.key_frame {
+ let buffer = buffer.get_mut().unwrap();
+ buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
+ }
+
+ Ok(buffer)
+ }
+ }
+ }
+
+ fn calculate_audio_timestamp(
+ &self,
+ state: &mut State,
+ receive_time_gst: gst::ClockTime,
+ receive_time_real: gst::ClockTime,
+ audio_frame: &crate::ndi::AudioFrame,
+ ) -> Option<(gst::ClockTime, Option<gst::ClockTime>, bool)> {
+ let duration = gst::ClockTime::SECOND.mul_div_floor(
+ audio_frame.no_samples() as u64,
+ audio_frame.sample_rate() as u64,
+ );
+
+ self.calculate_timestamp(
+ state,
+ true,
+ receive_time_gst,
+ receive_time_real,
+ audio_frame.timestamp(),
+ audio_frame.timecode(),
+ duration,
+ )
+ }
+
+ fn create_audio_info(
+ &self,
+ audio_frame: &crate::ndi::AudioFrame,
+ ) -> Result<AudioInfo, gst::FlowError> {
+ let fourcc = audio_frame.fourcc();
+
+ if [ndisys::NDIlib_FourCC_audio_type_FLTp].contains(&fourcc) {
+ let channels = audio_frame.no_channels() as u32;
+ let mut positions = [gst_audio::AudioChannelPosition::None; 64];
+ if channels <= 8 {
+ let _ = gst_audio::AudioChannelPosition::positions_from_mask(
+ gst_audio::AudioChannelPosition::fallback_mask(channels),
+ &mut positions[..channels as usize],
+ );
+ }
+
+ let builder = gst_audio::AudioInfo::builder(
+ gst_audio::AUDIO_FORMAT_F32,
+ audio_frame.sample_rate() as u32,
+ channels,
+ )
+ .positions(&positions[..channels as usize]);
+
+ let info = builder.build().map_err(|_| {
+ gst::element_imp_error!(
+ self,
+ gst::StreamError::Format,
+ ["Invalid audio format configuration"]
+ );
+
+ gst::FlowError::NotNegotiated
+ })?;
+
+ return Ok(AudioInfo::Audio(info));
+ }
+
+ #[cfg(feature = "advanced-sdk")]
+ if [ndisys::NDIlib_FourCC_audio_type_AAC].contains(&fourcc) {
+ let compressed_packet = audio_frame.compressed_packet().ok_or_else(|| {
+ gst::error!(
+ CAT,
+ imp: self,
+ "Audio packet doesn't have compressed packet start"
+ );
+ gst::element_imp_error!(self, gst::StreamError::Format, ["Invalid audio packet"]);
+
+ gst::FlowError::Error
+ })?;
+
+ if compressed_packet.fourcc != ndisys::NDIlib_compressed_FourCC_type_AAC {
+ gst::error!(CAT, imp: self, "Non-AAC audio packet");
+ gst::element_imp_error!(self, gst::StreamError::Format, ["Invalid audio packet"]);
+
+ return Err(gst::FlowError::Error);
+ }
+
+ return Ok(AudioInfo::Aac {
+ sample_rate: audio_frame.sample_rate(),
+ no_channels: audio_frame.no_channels(),
+ codec_data: compressed_packet
+ .extra_data
+ .ok_or(gst::FlowError::NotNegotiated)?
+ .try_into()
+ .map_err(|_| gst::FlowError::NotNegotiated)?,
+ });
+ }
+
+ // FIXME: Needs testing with an actual stream to understand how it works
+ // #[cfg(feature = "advanced-sdk")]
+ // if [NDIlib_FourCC_audio_type_Opus].contains(&fourcc) {}
+
+ gst::element_imp_error!(
+ self,
+ gst::StreamError::Format,
+ ["Unsupported audio fourcc {:08x}", audio_frame.fourcc()]
+ );
+ Err(gst::FlowError::NotNegotiated)
+ }
+
+ fn create_audio_buffer(
+ &self,
+ state: &State,
+ pts: gst::ClockTime,
+ duration: Option<gst::ClockTime>,
+ discont: bool,
+ resync: bool,
+ audio_frame: crate::ndi::AudioFrame,
+ ) -> Result<gst::Buffer, gst::FlowError> {
+ struct WrappedAudioFrame(crate::ndi::AudioFrame);
+
+ impl AsRef<[u8]> for WrappedAudioFrame {
+ fn as_ref(&self) -> &[u8] {
+ self.0.data().unwrap_or(&[])
+ }
+ }
+
+ match state.audio_info.as_ref().unwrap() {
+ AudioInfo::Audio(ref info) => {
+ let no_samples = audio_frame.no_samples();
+ let timecode = audio_frame.timecode();
+ let timestamp = audio_frame.timestamp();
+ let buff_size = (no_samples as u32 * info.bpf()) as usize;
+
+ let mut buffer = if state.audio_non_interleaved {
+ let info = state.audio_info_non_interleaved.as_ref().unwrap();
+ let mut buffer = gst::Buffer::from_slice(WrappedAudioFrame(audio_frame));
+
+ {
+ let buffer = buffer.get_mut().unwrap();
+
+ gst_audio::AudioMeta::add(buffer, info, no_samples as usize, &[]).unwrap();
+ }
+
+ buffer
+ } else {
+ gst::debug!(gst::CAT_PERFORMANCE, imp: self, "Copying raw audio frame");
+
+ let src = audio_frame.data().ok_or(gst::FlowError::Error)?;
+ let mut buffer = gst::Buffer::with_size(buff_size).unwrap();
+
+ {
+ let buffer = buffer.get_mut().unwrap();
+ let mut dest = buffer.map_writable().unwrap();
+ let dest = dest
+ .as_mut_slice_of::<f32>()
+ .map_err(|_| gst::FlowError::NotNegotiated)?;
+ assert!(
+ dest.len()
+ == audio_frame.no_samples() as usize
+ * audio_frame.no_channels() as usize
+ );
+
+ for (channel, samples) in src
+ .chunks_exact(
+ audio_frame.channel_stride_or_data_size_in_bytes() as usize
+ )
+ .enumerate()
+ {
+ let samples = samples
+ .as_slice_of::<f32>()
+ .map_err(|_| gst::FlowError::NotNegotiated)?;
+
+ for (i, sample) in samples[..audio_frame.no_samples() as usize]
+ .iter()
+ .enumerate()
+ {
+ dest[i * (audio_frame.no_channels() as usize) + channel] = *sample;
+ }
+ }
+ }
+
+ buffer
+ };
+
+ {
+ let buffer = buffer.get_mut().unwrap();
+
+ buffer.set_pts(pts);
+ buffer.set_duration(duration);
+
+ if resync {
+ buffer.set_flags(gst::BufferFlags::RESYNC);
+ }
+
+ if discont {
+ buffer.set_flags(gst::BufferFlags::DISCONT);
+ }
+
+ gst::ReferenceTimestampMeta::add(
+ buffer,
+ &crate::TIMECODE_CAPS,
+ (timecode as u64 * 100).nseconds(),
+ gst::ClockTime::NONE,
+ );
+ if timestamp != ndisys::NDIlib_recv_timestamp_undefined {
+ gst::ReferenceTimestampMeta::add(
+ buffer,
+ &crate::TIMESTAMP_CAPS,
+ (timestamp as u64 * 100).nseconds(),
+ gst::ClockTime::NONE,
+ );
+ }
+ }
+
+ Ok(buffer)
+ }
+ #[cfg(feature = "advanced-sdk")]
+ AudioInfo::Opus { .. } => {
+ let data = audio_frame.data().ok_or_else(|| {
+ gst::error!(CAT, imp: self, "Audio packet has no data");
+ gst::element_imp_error!(
+ self,
+ gst::StreamError::Format,
+ ["Invalid audio packet"]
+ );
+
+ gst::FlowError::Error
+ })?;
+
+ Ok(gst::Buffer::from_mut_slice(Vec::from(data)))
+ }
+ #[cfg(feature = "advanced-sdk")]
+ AudioInfo::Aac { .. } => {
+ let compressed_packet = audio_frame.compressed_packet().ok_or_else(|| {
+ gst::error!(
+ CAT,
+ imp: self,
+ "Audio packet doesn't have compressed packet start"
+ );
+ gst::element_imp_error!(
+ self,
+ gst::StreamError::Format,
+ ["Invalid audio packet"]
+ );
+
+ gst::FlowError::Error
+ })?;
+
+ Ok(gst::Buffer::from_mut_slice(Vec::from(
+ compressed_packet.data,
+ )))
+ }
+ }
+ }
+}
+
+#[derive(Debug, PartialEq, Eq)]
+#[allow(clippy::large_enum_variant)]
+pub enum AudioInfo {
+ Audio(gst_audio::AudioInfo),
+ #[cfg(feature = "advanced-sdk")]
+ #[allow(dead_code)]
+ Opus {
+ sample_rate: i32,
+ no_channels: i32,
+ },
+ #[cfg(feature = "advanced-sdk")]
+ Aac {
+ sample_rate: i32,
+ no_channels: i32,
+ codec_data: [u8; 2],
+ },
+}
+
+impl AudioInfo {
+ pub fn to_caps(&self) -> Result<gst::Caps, glib::BoolError> {
+ match self {
+ AudioInfo::Audio(ref info) => info.to_caps(),
+ #[cfg(feature = "advanced-sdk")]
+ AudioInfo::Opus {
+ sample_rate,
+ no_channels,
+ } => Ok(gst::Caps::builder("audio/x-opus")
+ .field("channels", *no_channels)
+ .field("rate", *sample_rate)
+ .field("channel-mapping-family", 0i32)
+ .build()),
+ #[cfg(feature = "advanced-sdk")]
+ AudioInfo::Aac {
+ sample_rate,
+ no_channels,
+ codec_data,
+ } => Ok(gst::Caps::builder("audio/mpeg")
+ .field("channels", *no_channels)
+ .field("rate", *sample_rate)
+ .field("mpegversion", 4i32)
+ .field("stream-format", "raw")
+ .field("codec_data", gst::Buffer::from_mut_slice(*codec_data))
+ .build()),
+ }
+ }
+}
+
+#[derive(Debug, PartialEq, Eq)]
+pub enum VideoInfo {
+ Video(gst_video::VideoInfo),
+ #[cfg(feature = "advanced-sdk")]
+ SpeedHQInfo {
+ variant: String,
+ xres: i32,
+ yres: i32,
+ fps_n: i32,
+ fps_d: i32,
+ par_n: i32,
+ par_d: i32,
+ interlace_mode: gst_video::VideoInterlaceMode,
+ },
+ #[cfg(feature = "advanced-sdk")]
+ H264 {
+ xres: i32,
+ yres: i32,
+ fps_n: i32,
+ fps_d: i32,
+ par_n: i32,
+ par_d: i32,
+ interlace_mode: gst_video::VideoInterlaceMode,
+ },
+ #[cfg(feature = "advanced-sdk")]
+ H265 {
+ xres: i32,
+ yres: i32,
+ fps_n: i32,
+ fps_d: i32,
+ par_n: i32,
+ par_d: i32,
+ interlace_mode: gst_video::VideoInterlaceMode,
+ },
+}
+
+impl VideoInfo {
+ pub fn to_caps(&self) -> Result<gst::Caps, glib::BoolError> {
+ match self {
+ VideoInfo::Video(ref info) => info.to_caps(),
+ #[cfg(feature = "advanced-sdk")]
+ VideoInfo::SpeedHQInfo {
+ ref variant,
+ xres,
+ yres,
+ fps_n,
+ fps_d,
+ par_n,
+ par_d,
+ interlace_mode,
+ } => Ok(gst::Caps::builder("video/x-speedhq")
+ .field("width", *xres)
+ .field("height", *yres)
+ .field("framerate", gst::Fraction::new(*fps_n, *fps_d))
+ .field("pixel-aspect-ratio", gst::Fraction::new(*par_n, *par_d))
+ .field("interlace-mode", interlace_mode.to_str())
+ .field("variant", variant)
+ .build()),
+ #[cfg(feature = "advanced-sdk")]
+ VideoInfo::H264 {
+ xres,
+ yres,
+ fps_n,
+ fps_d,
+ par_n,
+ par_d,
+ interlace_mode,
+ ..
+ } => Ok(gst::Caps::builder("video/x-h264")
+ .field("width", *xres)
+ .field("height", *yres)
+ .field("framerate", gst::Fraction::new(*fps_n, *fps_d))
+ .field("pixel-aspect-ratio", gst::Fraction::new(*par_n, *par_d))
+ .field("interlace-mode", interlace_mode.to_str())
+ .field("stream-format", "byte-stream")
+ .field("alignment", "au")
+ .build()),
+ #[cfg(feature = "advanced-sdk")]
+ VideoInfo::H265 {
+ xres,
+ yres,
+ fps_n,
+ fps_d,
+ par_n,
+ par_d,
+ interlace_mode,
+ ..
+ } => Ok(gst::Caps::builder("video/x-h265")
+ .field("width", *xres)
+ .field("height", *yres)
+ .field("framerate", gst::Fraction::new(*fps_n, *fps_d))
+ .field("pixel-aspect-ratio", gst::Fraction::new(*par_n, *par_d))
+ .field("interlace-mode", interlace_mode.to_str())
+ .field("stream-format", "byte-stream")
+ .field("alignment", "au")
+ .build()),
+ }
+ }
+
+ pub fn width(&self) -> u32 {
+ match self {
+ VideoInfo::Video(ref info) => info.width(),
+ #[cfg(feature = "advanced-sdk")]
+ VideoInfo::SpeedHQInfo { xres, .. }
+ | VideoInfo::H264 { xres, .. }
+ | VideoInfo::H265 { xres, .. } => *xres as u32,
+ }
+ }
+}
+
+const PREFILL_WINDOW_LENGTH: usize = 12;
+const WINDOW_LENGTH: u64 = 512;
+const WINDOW_DURATION: u64 = 2_000_000_000;
+
+#[derive(Default)]
+struct Observations(AtomicRefCell<ObservationsInner>);
+
+struct ObservationsInner {
+ base_remote_time: Option<u64>,
+ base_local_time: Option<u64>,
+ deltas: VecDeque<i64>,
+ min_delta: i64,
+ skew: i64,
+ filling: bool,
+ window_size: usize,
+
+ // Remote/local times for workaround around fundamentally wrong slopes
+ // This is not reset below and has a bigger window.
+ times: VecDeque<(u64, u64)>,
+ slope_correction: (u64, u64),
+}
+
+impl Default for ObservationsInner {
+ fn default() -> ObservationsInner {
+ ObservationsInner {
+ base_local_time: None,
+ base_remote_time: None,
+ deltas: VecDeque::new(),
+ min_delta: 0,
+ skew: 0,
+ filling: true,
+ window_size: 0,
+ times: VecDeque::new(),
+ slope_correction: (1, 1),
+ }
+ }
+}
+
+impl ObservationsInner {
+ fn reset(&mut self) {
+ self.base_local_time = None;
+ self.base_remote_time = None;
+ self.deltas = VecDeque::new();
+ self.min_delta = 0;
+ self.skew = 0;
+ self.filling = true;
+ self.window_size = 0;
+ }
+}
+
+impl Observations {
+ // Based on the algorithm used in GStreamer's rtpjitterbuffer, which comes from
+ // Fober, Orlarey and Letz, 2005, "Real Time Clock Skew Estimation over Network Delays":
+ // http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.102.1546
+ fn process(
+ &self,
+ element: &gst::Element,
+ remote_time: Option<gst::ClockTime>,
+ local_time: gst::ClockTime,
+ duration: Option<gst::ClockTime>,
+ ) -> Option<(gst::ClockTime, Option<gst::ClockTime>, bool)> {
+ let remote_time = remote_time?.nseconds();
+ let local_time = local_time.nseconds();
+
+ let mut inner = self.0.borrow_mut();
+
+ gst::trace!(
+ CAT,
+ obj: element,
+ "Local time {}, remote time {}, slope correct {}/{}",
+ local_time.nseconds(),
+ remote_time.nseconds(),
+ inner.slope_correction.0,
+ inner.slope_correction.1,
+ );
+
+ inner.times.push_back((remote_time, local_time));
+ while inner
+ .times
+ .back()
+ .unwrap()
+ .1
+ .saturating_sub(inner.times.front().unwrap().1)
+ > WINDOW_DURATION
+ {
+ let _ = inner.times.pop_front();
+ }
+
+ // Static remote times
+ if inner.slope_correction.1 == 0 {
+ return None;
+ }
+
+ let remote_time =
+ remote_time.mul_div_round(inner.slope_correction.0, inner.slope_correction.1)?;
+
+ let (base_remote_time, base_local_time) =
+ match (inner.base_remote_time, inner.base_local_time) {
+ (Some(remote), Some(local)) => (remote, local),
+ _ => {
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Initializing base time: local {}, remote {}",
+ local_time.nseconds(),
+ remote_time.nseconds(),
+ );
+ inner.base_remote_time = Some(remote_time);
+ inner.base_local_time = Some(local_time);
+
+ return Some((local_time.nseconds(), duration, true));
+ }
+ };
+
+ if inner.times.len() < PREFILL_WINDOW_LENGTH {
+ return Some((local_time.nseconds(), duration, false));
+ }
+
+ // Check if the slope is simply wrong and try correcting
+ {
+ let local_diff = inner
+ .times
+ .back()
+ .unwrap()
+ .1
+ .saturating_sub(inner.times.front().unwrap().1);
+ let remote_diff = inner
+ .times
+ .back()
+ .unwrap()
+ .0
+ .saturating_sub(inner.times.front().unwrap().0);
+
+ if remote_diff == 0 {
+ inner.reset();
+ inner.base_remote_time = Some(remote_time);
+ inner.base_local_time = Some(local_time);
+
+ // Static remote times
+ inner.slope_correction = (0, 0);
+ return None;
+ } else {
+ let slope = local_diff as f64 / remote_diff as f64;
+ let scaled_slope =
+ slope * (inner.slope_correction.1 as f64) / (inner.slope_correction.0 as f64);
+
+ // Check for some obviously wrong slopes and try to correct for that
+ if !(0.5..1.5).contains(&scaled_slope) {
+ gst::warning!(
+ CAT,
+ obj: element,
+ "Too small/big slope {}, resetting",
+ scaled_slope
+ );
+
+ let discont = !inner.deltas.is_empty();
+ inner.reset();
+
+ if (0.0005..0.0015).contains(&slope) {
+ // Remote unit was actually 0.1ns
+ inner.slope_correction = (1, 1000);
+ } else if (0.005..0.015).contains(&slope) {
+ // Remote unit was actually 1ns
+ inner.slope_correction = (1, 100);
+ } else if (0.05..0.15).contains(&slope) {
+ // Remote unit was actually 10ns
+ inner.slope_correction = (1, 10);
+ } else if (5.0..15.0).contains(&slope) {
+ // Remote unit was actually 1us
+ inner.slope_correction = (10, 1);
+ } else if (50.0..150.0).contains(&slope) {
+ // Remote unit was actually 10us
+ inner.slope_correction = (100, 1);
+ } else if (50.0..150.0).contains(&slope) {
+ // Remote unit was actually 100us
+ inner.slope_correction = (1000, 1);
+ } else if (50.0..150.0).contains(&slope) {
+ // Remote unit was actually 1ms
+ inner.slope_correction = (10000, 1);
+ } else {
+ inner.slope_correction = (1, 1);
+ }
+
+ let remote_time = inner
+ .times
+ .back()
+ .unwrap()
+ .0
+ .mul_div_round(inner.slope_correction.0, inner.slope_correction.1)?;
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Initializing base time: local {}, remote {}, slope correction {}/{}",
+ local_time.nseconds(),
+ remote_time.nseconds(),
+ inner.slope_correction.0,
+ inner.slope_correction.1,
+ );
+ inner.base_remote_time = Some(remote_time);
+ inner.base_local_time = Some(local_time);
+
+ return Some((local_time.nseconds(), duration, discont));
+ }
+ }
+ }
+
+ let remote_diff = remote_time.saturating_sub(base_remote_time);
+ let local_diff = local_time.saturating_sub(base_local_time);
+ let delta = (local_diff as i64) - (remote_diff as i64);
+
+ gst::trace!(
+ CAT,
+ obj: element,
+ "Local diff {}, remote diff {}, delta {}",
+ local_diff.nseconds(),
+ remote_diff.nseconds(),
+ delta,
+ );
+
+ if (delta > inner.skew && delta - inner.skew > 1_000_000_000)
+ || (delta < inner.skew && inner.skew - delta > 1_000_000_000)
+ {
+ gst::warning!(
+ CAT,
+ obj: element,
+ "Delta {} too far from skew {}, resetting",
+ delta,
+ inner.skew
+ );
+
+ let discont = !inner.deltas.is_empty();
+
+ gst::debug!(
+ CAT,
+ obj: element,
+ "Initializing base time: local {}, remote {}",
+ local_time.nseconds(),
+ remote_time.nseconds(),
+ );
+
+ inner.reset();
+ inner.base_remote_time = Some(remote_time);
+ inner.base_local_time = Some(local_time);
+
+ return Some((local_time.nseconds(), duration, discont));
+ }
+
+ if inner.filling {
+ if inner.deltas.is_empty() || delta < inner.min_delta {
+ inner.min_delta = delta;
+ }
+ inner.deltas.push_back(delta);
+
+ if remote_diff > WINDOW_DURATION || inner.deltas.len() as u64 == WINDOW_LENGTH {
+ inner.window_size = inner.deltas.len();
+ inner.skew = inner.min_delta;
+ inner.filling = false;
+ } else {
+ let perc_time = remote_diff.mul_div_floor(100, WINDOW_DURATION).unwrap() as i64;
+ let perc_window = (inner.deltas.len() as u64)
+ .mul_div_floor(100, WINDOW_LENGTH)
+ .unwrap() as i64;
+ let perc = cmp::max(perc_time, perc_window);
+
+ inner.skew = (perc * inner.min_delta + ((10_000 - perc) * inner.skew)) / 10_000;
+ }
+ } else {
+ let old = inner.deltas.pop_front().unwrap();
+ inner.deltas.push_back(delta);
+
+ if delta <= inner.min_delta {
+ inner.min_delta = delta;
+ } else if old == inner.min_delta {
+ inner.min_delta = inner.deltas.iter().copied().min().unwrap();
+ }
+
+ inner.skew = (inner.min_delta + (124 * inner.skew)) / 125;
+ }
+
+ let out_time = base_local_time + remote_diff;
+ let out_time = if inner.skew < 0 {
+ out_time.saturating_sub((-inner.skew) as u64)
+ } else {
+ out_time + (inner.skew as u64)
+ };
+
+ gst::trace!(
+ CAT,
+ obj: element,
+ "Skew {}, min delta {}",
+ inner.skew,
+ inner.min_delta
+ );
+ gst::trace!(CAT, obj: element, "Outputting {}", out_time.nseconds());
+
+ Some((out_time.nseconds(), duration, false))
}
}
diff --git a/net/ndi/src/ndisrcmeta.rs b/net/ndi/src/ndisrcmeta.rs
index 6f4e053d0..c6d3a7ac4 100644
--- a/net/ndi/src/ndisrcmeta.rs
+++ b/net/ndi/src/ndisrcmeta.rs
@@ -4,30 +4,49 @@ use gst::prelude::*;
use std::fmt;
use std::mem;
+use crate::ndi::{AudioFrame, MetadataFrame, VideoFrame};
+use crate::TimestampMode;
+
#[repr(transparent)]
pub struct NdiSrcMeta(imp::NdiSrcMeta);
-#[derive(Copy, Clone, Debug, PartialEq, Eq)]
-pub enum StreamType {
- Audio,
- Video,
+#[derive(Debug)]
+#[allow(clippy::large_enum_variant)]
+pub enum Buffer {
+ Audio {
+ frame: AudioFrame,
+ discont: bool,
+ receive_time_gst: gst::ClockTime,
+ receive_time_real: gst::ClockTime,
+ },
+ Video {
+ frame: VideoFrame,
+ discont: bool,
+ receive_time_gst: gst::ClockTime,
+ receive_time_real: gst::ClockTime,
+ },
+ Metadata {
+ frame: MetadataFrame,
+ receive_time_gst: gst::ClockTime,
+ receive_time_real: gst::ClockTime,
+ },
}
unsafe impl Send for NdiSrcMeta {}
unsafe impl Sync for NdiSrcMeta {}
impl NdiSrcMeta {
- pub fn add<'a>(
- buffer: &'a mut gst::BufferRef,
- stream_type: StreamType,
- caps: &gst::Caps,
- ) -> gst::MetaRefMut<'a, Self, gst::meta::Standalone> {
+ pub fn add(
+ buffer: &mut gst::BufferRef,
+ ndi_buffer: Buffer,
+ timestamp_mode: TimestampMode,
+ ) -> gst::MetaRefMut<Self, gst::meta::Standalone> {
unsafe {
// Manually dropping because gst_buffer_add_meta() takes ownership of the
// content of the struct
let mut params = mem::ManuallyDrop::new(imp::NdiSrcMetaParams {
- caps: caps.clone(),
- stream_type,
+ ndi_buffer,
+ timestamp_mode,
});
let meta = gst::ffi::gst_buffer_add_meta(
@@ -40,12 +59,8 @@ impl NdiSrcMeta {
}
}
- pub fn stream_type(&self) -> StreamType {
- self.0.stream_type
- }
-
- pub fn caps(&self) -> gst::Caps {
- self.0.caps.clone()
+ pub fn take_ndi_buffer(&mut self) -> Buffer {
+ self.0.ndi_buffer.take().expect("can only take buffer once")
}
}
@@ -60,29 +75,30 @@ unsafe impl MetaAPI for NdiSrcMeta {
impl fmt::Debug for NdiSrcMeta {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("NdiSrcMeta")
- .field("stream_type", &self.stream_type())
- .field("caps", &self.caps())
+ .field("ndi_buffer", &self.0.ndi_buffer)
.finish()
}
}
mod imp {
- use super::StreamType;
+ use crate::TimestampMode;
+
+ use super::Buffer;
use glib::translate::*;
use gst::glib::once_cell::sync::Lazy;
use std::mem;
use std::ptr;
pub(super) struct NdiSrcMetaParams {
- pub caps: gst::Caps,
- pub stream_type: StreamType,
+ pub ndi_buffer: Buffer,
+ pub timestamp_mode: TimestampMode,
}
#[repr(C)]
pub struct NdiSrcMeta {
parent: gst::ffi::GstMeta,
- pub(super) caps: gst::Caps,
- pub(super) stream_type: StreamType,
+ pub(super) ndi_buffer: Option<Buffer>,
+ pub(super) timestamp_mode: TimestampMode,
}
pub(super) fn ndi_src_meta_api_get_type() -> glib::Type {
@@ -110,8 +126,8 @@ mod imp {
let meta = &mut *(meta as *mut NdiSrcMeta);
let params = ptr::read(params as *const NdiSrcMetaParams);
- ptr::write(&mut meta.stream_type, params.stream_type);
- ptr::write(&mut meta.caps, params.caps);
+ ptr::write(&mut meta.ndi_buffer, Some(params.ndi_buffer));
+ ptr::write(&mut meta.timestamp_mode, params.timestamp_mode);
true.into_glib()
}
@@ -122,8 +138,7 @@ mod imp {
) {
let meta = &mut *(meta as *mut NdiSrcMeta);
- ptr::drop_in_place(&mut meta.stream_type);
- ptr::drop_in_place(&mut meta.caps);
+ ptr::drop_in_place(&mut meta.ndi_buffer);
}
unsafe extern "C" fn ndi_src_meta_transform(