Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2020-11-14 20:09:42 +0300
committerSebastian Dröge <sebastian@centricular.com>2020-11-15 19:25:42 +0300
commitaf0337c26c25cc782142f914dfa19383e51e1743 (patch)
tree60263c9ee978753df8cc5be55ed04ce3c9c65488 /generic
parentf54f9f977ec2143028c6068ac43bff5da4bea2ee (diff)
generic: Update for subclassing API changes
Diffstat (limited to 'generic')
-rw-r--r--generic/file/src/filesink/imp.rs (renamed from generic/file/src/filesink.rs)45
-rw-r--r--generic/file/src/filesink/mod.rs29
-rw-r--r--generic/file/src/filesrc/imp.rs (renamed from generic/file/src/filesrc.rs)54
-rw-r--r--generic/file/src/filesrc/mod.rs28
-rw-r--r--generic/sodium/src/decrypter/imp.rs (renamed from generic/sodium/src/decrypter.rs)50
-rw-r--r--generic/sodium/src/decrypter/mod.rs43
-rw-r--r--generic/sodium/src/encrypter/imp.rs (renamed from generic/sodium/src/encrypter.rs)44
-rw-r--r--generic/sodium/src/encrypter/mod.rs43
-rw-r--r--generic/threadshare/src/appsrc/imp.rs (renamed from generic/threadshare/src/appsrc.rs)55
-rw-r--r--generic/threadshare/src/appsrc/mod.rs40
-rw-r--r--generic/threadshare/src/inputselector/imp.rs (renamed from generic/threadshare/src/inputselector.rs)46
-rw-r--r--generic/threadshare/src/inputselector/mod.rs39
-rw-r--r--generic/threadshare/src/jitterbuffer/ffi.rs115
-rw-r--r--generic/threadshare/src/jitterbuffer/imp.rs1636
-rw-r--r--generic/threadshare/src/jitterbuffer/jitterbuffer.rs1764
-rw-r--r--generic/threadshare/src/jitterbuffer/mod.rs473
-rw-r--r--generic/threadshare/src/lib.rs2
-rw-r--r--generic/threadshare/src/proxy/imp.rs (renamed from generic/threadshare/src/proxy.rs)91
-rw-r--r--generic/threadshare/src/proxy/mod.rs54
-rw-r--r--generic/threadshare/src/queue/imp.rs (renamed from generic/threadshare/src/queue.rs)48
-rw-r--r--generic/threadshare/src/queue/mod.rs39
-rw-r--r--generic/threadshare/src/runtime/pad.rs90
-rw-r--r--generic/threadshare/src/tcpclientsrc/imp.rs (renamed from generic/threadshare/src/tcpclientsrc.rs)53
-rw-r--r--generic/threadshare/src/tcpclientsrc/mod.rs40
-rw-r--r--generic/threadshare/src/udpsink/imp.rs (renamed from generic/threadshare/src/udpsink.rs)76
-rw-r--r--generic/threadshare/src/udpsink/mod.rs39
-rw-r--r--generic/threadshare/src/udpsrc/imp.rs (renamed from generic/threadshare/src/udpsrc.rs)69
-rw-r--r--generic/threadshare/src/udpsrc/mod.rs39
-rw-r--r--generic/threadshare/tests/pad.rs1103
29 files changed, 3347 insertions, 2900 deletions
diff --git a/generic/file/src/filesink.rs b/generic/file/src/filesink/imp.rs
index 2bc0b7c3e..ae01b0849 100644
--- a/generic/file/src/filesink.rs
+++ b/generic/file/src/filesink/imp.rs
@@ -23,7 +23,7 @@ use std::sync::Mutex;
use url::Url;
-use file_location::FileLocation;
+use crate::file_location::FileLocation;
const DEFAULT_LOCATION: Option<FileLocation> = None;
@@ -77,7 +77,7 @@ lazy_static! {
impl FileSink {
fn set_location(
&self,
- element: &gst_base::BaseSink,
+ element: &super::FileSink,
location: Option<FileLocation>,
) -> Result<(), glib::Error> {
let state = self.state.lock().unwrap();
@@ -119,6 +119,7 @@ impl FileSink {
impl ObjectSubclass for FileSink {
const NAME: &'static str = "RsFileSink";
+ type Type = super::FileSink;
type ParentType = gst_base::BaseSink;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
@@ -136,7 +137,7 @@ impl ObjectSubclass for FileSink {
type_.add_interface::<gst::URIHandler>();
}
- fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ fn class_init(klass: &mut Self::Class) {
klass.set_metadata(
"File Sink",
"Sink/File",
@@ -159,33 +160,26 @@ impl ObjectSubclass for FileSink {
}
impl ObjectImpl for FileSink {
- fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) {
+ fn set_property(&self, obj: &Self::Type, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
match *prop {
subclass::Property("location", ..) => {
- let element = obj.downcast_ref::<gst_base::BaseSink>().unwrap();
-
let res = match value.get::<String>() {
Ok(Some(location)) => FileLocation::try_from_path_str(location)
- .and_then(|file_location| self.set_location(&element, Some(file_location))),
- Ok(None) => self.set_location(&element, None),
+ .and_then(|file_location| self.set_location(obj, Some(file_location))),
+ Ok(None) => self.set_location(obj, None),
Err(_) => unreachable!("type checked upstream"),
};
if let Err(err) = res {
- gst_error!(
- CAT,
- obj: element,
- "Failed to set property `location`: {}",
- err
- );
+ gst_error!(CAT, obj: obj, "Failed to set property `location`: {}", err);
}
}
_ => unimplemented!(),
};
}
- fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ fn get_property(&self, _obj: &Self::Type, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
match *prop {
subclass::Property("location", ..) => {
@@ -205,7 +199,7 @@ impl ObjectImpl for FileSink {
impl ElementImpl for FileSink {}
impl BaseSinkImpl for FileSink {
- fn start(&self, element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> {
+ fn start(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
if let State::Started { .. } = *state {
unreachable!("FileSink already started");
@@ -237,7 +231,7 @@ impl BaseSinkImpl for FileSink {
Ok(())
}
- fn stop(&self, element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> {
+ fn stop(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
if let State::Stopped = *state {
return Err(gst_error_msg!(
@@ -256,7 +250,7 @@ impl BaseSinkImpl for FileSink {
fn render(
&self,
- element: &gst_base::BaseSink,
+ element: &Self::Type,
buffer: &gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.lock().unwrap();
@@ -293,7 +287,7 @@ impl BaseSinkImpl for FileSink {
}
impl URIHandlerImpl for FileSink {
- fn get_uri(&self, _element: &gst::URIHandler) -> Option<String> {
+ fn get_uri(&self, _element: &Self::Type) -> Option<String> {
let settings = self.settings.lock().unwrap();
// Conversion to Url already checked while building the `FileLocation`
@@ -304,9 +298,7 @@ impl URIHandlerImpl for FileSink {
})
}
- fn set_uri(&self, element: &gst::URIHandler, uri: &str) -> Result<(), glib::Error> {
- let element = element.dynamic_cast_ref::<gst_base::BaseSink>().unwrap();
-
+ fn set_uri(&self, element: &Self::Type, uri: &str) -> Result<(), glib::Error> {
// Special case for "file://" as this is used by some applications to test
// with `gst_element_make_from_uri` if there's an element that supports the URI protocol
@@ -326,12 +318,3 @@ impl URIHandlerImpl for FileSink {
vec!["file".to_string()]
}
}
-
-pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
- gst::Element::register(
- Some(plugin),
- "rsfilesink",
- gst::Rank::None,
- FileSink::get_type(),
- )
-}
diff --git a/generic/file/src/filesink/mod.rs b/generic/file/src/filesink/mod.rs
new file mode 100644
index 000000000..8ad677156
--- /dev/null
+++ b/generic/file/src/filesink/mod.rs
@@ -0,0 +1,29 @@
+// Copyright (C) 2016-2017 Sebastian Dröge <sebastian@centricular.com>
+// 2016 Luis de Bethencourt <luisbg@osg.samsung.com>
+// 2018 François Laignel <fengalin@free.fr>
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use glib::prelude::*;
+
+mod imp;
+
+glib_wrapper! {
+ pub struct FileSink(ObjectSubclass<imp::FileSink>) @extends gst_base::BaseSink, gst::Element, gst::Object, @implements gst::URIHandler;
+}
+
+unsafe impl Send for FileSink {}
+unsafe impl Sync for FileSink {}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "rsfilesink",
+ gst::Rank::None,
+ FileSink::static_type(),
+ )
+}
diff --git a/generic/file/src/filesrc.rs b/generic/file/src/filesrc/imp.rs
index eb8d1d632..ca58949dc 100644
--- a/generic/file/src/filesrc.rs
+++ b/generic/file/src/filesrc/imp.rs
@@ -23,7 +23,7 @@ use std::sync::Mutex;
use url::Url;
-use file_location::FileLocation;
+use crate::file_location::FileLocation;
const DEFAULT_LOCATION: Option<FileLocation> = None;
@@ -77,7 +77,7 @@ lazy_static! {
impl FileSrc {
fn set_location(
&self,
- element: &gst_base::BaseSrc,
+ element: &super::FileSrc,
location: Option<FileLocation>,
) -> Result<(), glib::Error> {
let state = self.state.lock().unwrap();
@@ -133,6 +133,7 @@ impl FileSrc {
impl ObjectSubclass for FileSrc {
const NAME: &'static str = "RsFileSrc";
+ type Type = super::FileSrc;
type ParentType = gst_base::BaseSrc;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
@@ -150,7 +151,7 @@ impl ObjectSubclass for FileSrc {
type_.add_interface::<gst::URIHandler>();
}
- fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ fn class_init(klass: &mut Self::Class) {
klass.set_metadata(
"File Source",
"Source/File",
@@ -173,33 +174,26 @@ impl ObjectSubclass for FileSrc {
}
impl ObjectImpl for FileSrc {
- fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) {
+ fn set_property(&self, obj: &Self::Type, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
match *prop {
subclass::Property("location", ..) => {
- let element = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
-
let res = match value.get::<String>() {
Ok(Some(location)) => FileLocation::try_from_path_str(location)
- .and_then(|file_location| self.set_location(&element, Some(file_location))),
- Ok(None) => self.set_location(&element, None),
+ .and_then(|file_location| self.set_location(obj, Some(file_location))),
+ Ok(None) => self.set_location(obj, None),
Err(_) => unreachable!("type checked upstream"),
};
if let Err(err) = res {
- gst_error!(
- CAT,
- obj: element,
- "Failed to set property `location`: {}",
- err
- );
+ gst_error!(CAT, obj: obj, "Failed to set property `location`: {}", err);
}
}
_ => unimplemented!(),
};
}
- fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ fn get_property(&self, _obj: &Self::Type, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
match *prop {
subclass::Property("location", ..) => {
@@ -215,22 +209,21 @@ impl ObjectImpl for FileSrc {
}
}
- fn constructed(&self, obj: &glib::Object) {
+ fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
- let element = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
- element.set_format(gst::Format::Bytes);
+ obj.set_format(gst::Format::Bytes);
}
}
impl ElementImpl for FileSrc {}
impl BaseSrcImpl for FileSrc {
- fn is_seekable(&self, _src: &gst_base::BaseSrc) -> bool {
+ fn is_seekable(&self, _src: &Self::Type) -> bool {
true
}
- fn get_size(&self, _src: &gst_base::BaseSrc) -> Option<u64> {
+ fn get_size(&self, _src: &Self::Type) -> Option<u64> {
let state = self.state.lock().unwrap();
if let State::Started { ref file, .. } = *state {
file.metadata().ok().map(|m| m.len())
@@ -239,7 +232,7 @@ impl BaseSrcImpl for FileSrc {
}
}
- fn start(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
+ fn start(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
if let State::Started { .. } = *state {
unreachable!("FileSrc already started");
@@ -273,7 +266,7 @@ impl BaseSrcImpl for FileSrc {
Ok(())
}
- fn stop(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
+ fn stop(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
if let State::Stopped = *state {
return Err(gst_error_msg!(
@@ -291,7 +284,7 @@ impl BaseSrcImpl for FileSrc {
fn fill(
&self,
- element: &gst_base::BaseSrc,
+ element: &Self::Type,
offset: u64,
_length: u32,
buffer: &mut gst::BufferRef,
@@ -347,7 +340,7 @@ impl BaseSrcImpl for FileSrc {
}
impl URIHandlerImpl for FileSrc {
- fn get_uri(&self, _element: &gst::URIHandler) -> Option<String> {
+ fn get_uri(&self, _element: &Self::Type) -> Option<String> {
let settings = self.settings.lock().unwrap();
// Conversion to Url already checked while building the `FileLocation`
@@ -358,9 +351,7 @@ impl URIHandlerImpl for FileSrc {
})
}
- fn set_uri(&self, element: &gst::URIHandler, uri: &str) -> Result<(), glib::Error> {
- let element = element.dynamic_cast_ref::<gst_base::BaseSrc>().unwrap();
-
+ fn set_uri(&self, element: &Self::Type, uri: &str) -> Result<(), glib::Error> {
// Special case for "file://" as this is used by some applications to test
// with `gst_element_make_from_uri` if there's an element that supports the URI protocol
@@ -380,12 +371,3 @@ impl URIHandlerImpl for FileSrc {
vec!["file".to_string()]
}
}
-
-pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
- gst::Element::register(
- Some(plugin),
- "rsfilesrc",
- gst::Rank::None,
- FileSrc::get_type(),
- )
-}
diff --git a/generic/file/src/filesrc/mod.rs b/generic/file/src/filesrc/mod.rs
new file mode 100644
index 000000000..70ed45de7
--- /dev/null
+++ b/generic/file/src/filesrc/mod.rs
@@ -0,0 +1,28 @@
+// Copyright (C) 2016-2017 Sebastian Dröge <sebastian@centricular.com>
+// 2018 François Laignel <fengalin@free.fr>
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use glib::prelude::*;
+
+mod imp;
+
+glib_wrapper! {
+ pub struct FileSrc(ObjectSubclass<imp::FileSrc>) @extends gst_base::BaseSrc, gst::Element, gst::Object, @implements gst::URIHandler;
+}
+
+unsafe impl Send for FileSrc {}
+unsafe impl Sync for FileSrc {}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "rsfilesrc",
+ gst::Rank::None,
+ FileSrc::static_type(),
+ )
+}
diff --git a/generic/sodium/src/decrypter.rs b/generic/sodium/src/decrypter/imp.rs
index 2de508b50..f82b874f2 100644
--- a/generic/sodium/src/decrypter.rs
+++ b/generic/sodium/src/decrypter/imp.rs
@@ -123,7 +123,7 @@ impl State {
// retrieval
fn decrypt_into_adapter(
&mut self,
- element: &gst::Element,
+ element: &super::Decrypter,
pad: &gst::Pad,
buffer: &gst::Buffer,
chunk_index: u64,
@@ -261,7 +261,7 @@ fn add_nonce(initial_nonce: box_::Nonce, chunk_index: u64) -> box_::Nonce {
box_::Nonce::from_slice(&nonce).expect("Failed to convert slice back to Nonce")
}
-struct Decrypter {
+pub struct Decrypter {
srcpad: gst::Pad,
sinkpad: gst::Pad,
props: Mutex<Props>,
@@ -272,7 +272,7 @@ impl Decrypter {
fn src_activatemode_function(
&self,
_pad: &gst::Pad,
- element: &gst::Element,
+ element: &super::Decrypter,
mode: gst::PadMode,
active: bool,
) -> Result<(), gst::LoggableError> {
@@ -295,7 +295,12 @@ impl Decrypter {
}
}
- fn src_query(&self, pad: &gst::Pad, element: &gst::Element, query: &mut gst::QueryRef) -> bool {
+ fn src_query(
+ &self,
+ pad: &gst::Pad,
+ element: &super::Decrypter,
+ query: &mut gst::QueryRef,
+ ) -> bool {
use gst::QueryView;
gst_log!(CAT, obj: pad, "Handling query {:?}", query);
@@ -349,7 +354,7 @@ impl Decrypter {
};
// subtract static offsets
- let size = size - super::HEADERS_SIZE as u64;
+ let size = size - crate::HEADERS_SIZE as u64;
// calculate the number of chunks that exist in the stream
let total_chunks =
@@ -366,7 +371,7 @@ impl Decrypter {
}
}
- fn check_headers(&self, element: &gst::Element) -> Result<(), gst::LoggableError> {
+ fn check_headers(&self, element: &super::Decrypter) -> Result<(), gst::LoggableError> {
let is_none = {
let mutex_state = self.state.lock().unwrap();
let state = mutex_state.as_ref().unwrap();
@@ -440,12 +445,12 @@ impl Decrypter {
fn pull_requested_buffer(
&self,
pad: &gst::Pad,
- element: &gst::Element,
+ element: &super::Decrypter,
requested_size: u32,
block_size: u32,
chunk_index: u64,
) -> Result<gst::Buffer, gst::FlowError> {
- let pull_offset = super::HEADERS_SIZE as u64
+ let pull_offset = crate::HEADERS_SIZE as u64
+ (chunk_index * block_size as u64)
+ (chunk_index * box_::MACBYTES as u64);
@@ -508,7 +513,7 @@ impl Decrypter {
fn get_range(
&self,
pad: &gst::Pad,
- element: &gst::Element,
+ element: &super::Decrypter,
offset: u64,
buffer: Option<&mut gst::BufferRef>,
requested_size: u32,
@@ -555,13 +560,14 @@ impl Decrypter {
impl ObjectSubclass for Decrypter {
const NAME: &'static str = "RsSodiumDecryptor";
+ type Type = super::Decrypter;
type ParentType = gst::Element;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib_object_subclass!();
- fn with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
+ fn with_class(klass: &Self::Class) -> Self {
let templ = klass.get_pad_template("sink").unwrap();
let sinkpad = gst::Pad::from_template(&templ, Some("sink"));
@@ -608,7 +614,7 @@ impl ObjectSubclass for Decrypter {
}
}
- fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ fn class_init(klass: &mut Self::Class) {
klass.set_metadata(
"Decrypter",
"Generic",
@@ -639,15 +645,14 @@ impl ObjectSubclass for Decrypter {
}
impl ObjectImpl for Decrypter {
- fn constructed(&self, obj: &glib::Object) {
+ fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
- let element = obj.downcast_ref::<gst::Element>().unwrap();
- element.add_pad(&self.sinkpad).unwrap();
- element.add_pad(&self.srcpad).unwrap();
+ obj.add_pad(&self.sinkpad).unwrap();
+ obj.add_pad(&self.srcpad).unwrap();
}
- fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
+ fn set_property(&self, _obj: &Self::Type, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
match *prop {
@@ -665,7 +670,7 @@ impl ObjectImpl for Decrypter {
}
}
- fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ fn get_property(&self, _obj: &Self::Type, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
match *prop {
@@ -682,7 +687,7 @@ impl ObjectImpl for Decrypter {
impl ElementImpl for Decrypter {
fn change_state(
&self,
- element: &gst::Element,
+ element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_debug!(CAT, obj: element, "Changing state {:?}", transition);
@@ -716,12 +721,3 @@ impl ElementImpl for Decrypter {
Ok(success)
}
}
-
-pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
- gst::Element::register(
- Some(plugin),
- "sodiumdecrypter",
- gst::Rank::None,
- Decrypter::get_type(),
- )
-}
diff --git a/generic/sodium/src/decrypter/mod.rs b/generic/sodium/src/decrypter/mod.rs
new file mode 100644
index 000000000..4bee6635c
--- /dev/null
+++ b/generic/sodium/src/decrypter/mod.rs
@@ -0,0 +1,43 @@
+// decrypter.rs
+//
+// Copyright 2019 Jordan Petridis <jordan@centricular.com>
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to
+// deal in the Software without restriction, including without limitation the
+// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+// sell copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+// IN THE SOFTWARE.
+//
+// SPDX-License-Identifier: MIT
+
+use glib::prelude::*;
+
+mod imp;
+
+glib_wrapper! {
+ pub struct Decrypter(ObjectSubclass<imp::Decrypter>) @extends gst::Element, gst::Object;
+}
+
+unsafe impl Send for Decrypter {}
+unsafe impl Sync for Decrypter {}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "sodiumdecrypter",
+ gst::Rank::None,
+ Decrypter::static_type(),
+ )
+}
diff --git a/generic/sodium/src/encrypter.rs b/generic/sodium/src/encrypter/imp.rs
index 0ba9b0e98..e17868ec7 100644
--- a/generic/sodium/src/encrypter.rs
+++ b/generic/sodium/src/encrypter/imp.rs
@@ -190,7 +190,7 @@ impl State {
}
}
-struct Encrypter {
+pub struct Encrypter {
srcpad: gst::Pad,
sinkpad: gst::Pad,
props: Mutex<Props>,
@@ -201,7 +201,7 @@ impl Encrypter {
fn sink_chain(
&self,
pad: &gst::Pad,
- element: &gst::Element,
+ element: &super::Encrypter,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer);
@@ -251,7 +251,7 @@ impl Encrypter {
Ok(gst::FlowSuccess::Ok)
}
- fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
+ fn sink_event(&self, pad: &gst::Pad, element: &super::Encrypter, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(CAT, obj: pad, "Handling event {:?}", event);
@@ -306,7 +306,7 @@ impl Encrypter {
}
}
- fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
+ fn src_event(&self, pad: &gst::Pad, element: &super::Encrypter, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(CAT, obj: pad, "Handling event {:?}", event);
@@ -317,7 +317,12 @@ impl Encrypter {
}
}
- fn src_query(&self, pad: &gst::Pad, element: &gst::Element, query: &mut gst::QueryRef) -> bool {
+ fn src_query(
+ &self,
+ pad: &gst::Pad,
+ element: &super::Encrypter,
+ query: &mut gst::QueryRef,
+ ) -> bool {
use gst::QueryView;
gst_log!(CAT, obj: pad, "Handling query {:?}", query);
@@ -371,7 +376,7 @@ impl Encrypter {
let size = size + total_chunks * box_::MACBYTES as u64;
// add static offsets
- let size = size + super::HEADERS_SIZE as u64;
+ let size = size + crate::HEADERS_SIZE as u64;
gst_debug!(CAT, obj: pad, "Setting duration bytes: {}", size);
q.set(gst::format::Bytes::from(size));
@@ -385,13 +390,14 @@ impl Encrypter {
impl ObjectSubclass for Encrypter {
const NAME: &'static str = "RsSodiumEncrypter";
+ type Type = super::Encrypter;
type ParentType = gst::Element;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib_object_subclass!();
- fn with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
+ fn with_class(klass: &Self::Class) -> Self {
let templ = klass.get_pad_template("sink").unwrap();
let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink"))
.chain_function(|pad, parent, buffer| {
@@ -439,7 +445,7 @@ impl ObjectSubclass for Encrypter {
}
}
- fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ fn class_init(klass: &mut Self::Class) {
klass.set_metadata(
"Encrypter",
"Generic",
@@ -470,15 +476,14 @@ impl ObjectSubclass for Encrypter {
}
impl ObjectImpl for Encrypter {
- fn constructed(&self, obj: &glib::Object) {
+ fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
- let element = obj.downcast_ref::<gst::Element>().unwrap();
- element.add_pad(&self.sinkpad).unwrap();
- element.add_pad(&self.srcpad).unwrap();
+ obj.add_pad(&self.sinkpad).unwrap();
+ obj.add_pad(&self.srcpad).unwrap();
}
- fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
+ fn set_property(&self, _obj: &Self::Type, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
match *prop {
@@ -501,7 +506,7 @@ impl ObjectImpl for Encrypter {
}
}
- fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ fn get_property(&self, _obj: &Self::Type, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
match *prop {
@@ -523,7 +528,7 @@ impl ObjectImpl for Encrypter {
impl ElementImpl for Encrypter {
fn change_state(
&self,
- element: &gst::Element,
+ element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_debug!(CAT, obj: element, "Changing state {:?}", transition);
@@ -557,12 +562,3 @@ impl ElementImpl for Encrypter {
Ok(success)
}
}
-
-pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
- gst::Element::register(
- Some(plugin),
- "sodiumencrypter",
- gst::Rank::None,
- Encrypter::get_type(),
- )
-}
diff --git a/generic/sodium/src/encrypter/mod.rs b/generic/sodium/src/encrypter/mod.rs
new file mode 100644
index 000000000..b71dd3f91
--- /dev/null
+++ b/generic/sodium/src/encrypter/mod.rs
@@ -0,0 +1,43 @@
+// encrypter.rs
+//
+// Copyright 2019 Jordan Petridis <jordan@centricular.com>
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to
+// deal in the Software without restriction, including without limitation the
+// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+// sell copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+// IN THE SOFTWARE.
+//
+// SPDX-License-Identifier: MIT
+
+use glib::prelude::*;
+
+mod imp;
+
+glib_wrapper! {
+ pub struct Encrypter(ObjectSubclass<imp::Encrypter>) @extends gst::Element, gst::Object;
+}
+
+unsafe impl Send for Encrypter {}
+unsafe impl Sync for Encrypter {}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "sodiumencrypter",
+ gst::Rank::None,
+ Encrypter::static_type(),
+ )
+}
diff --git a/generic/threadshare/src/appsrc.rs b/generic/threadshare/src/appsrc/imp.rs
index ae6f7b9a1..85247392b 100644
--- a/generic/threadshare/src/appsrc.rs
+++ b/generic/threadshare/src/appsrc/imp.rs
@@ -176,7 +176,7 @@ impl AppSrcPadHandler {
self.0.state.lock().await.need_segment = true;
}
- async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &gst::Element) {
+ async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &super::AppSrc) {
let mut state = self.0.state.lock().await;
if state.need_initial_events {
gst_debug!(CAT, obj: pad.gst_pad(), "Pushing initial events");
@@ -207,7 +207,7 @@ impl AppSrcPadHandler {
async fn push_item(
&self,
pad: &PadSrcRef<'_>,
- element: &gst::Element,
+ element: &super::AppSrc,
item: StreamItem,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", item);
@@ -316,7 +316,7 @@ impl PadSrcHandler for AppSrcPadHandler {
#[derive(Debug)]
struct AppSrcTask {
- element: gst::Element,
+ element: super::AppSrc,
src_pad: PadSrcWeak,
src_pad_handler: AppSrcPadHandler,
receiver: mpsc::Receiver<StreamItem>,
@@ -324,7 +324,7 @@ struct AppSrcTask {
impl AppSrcTask {
fn new(
- element: &gst::Element,
+ element: &super::AppSrc,
src_pad: &PadSrc,
src_pad_handler: &AppSrcPadHandler,
receiver: mpsc::Receiver<StreamItem>,
@@ -422,7 +422,7 @@ impl TaskImpl for AppSrcTask {
}
#[derive(Debug)]
-struct AppSrc {
+pub struct AppSrc {
src_pad: PadSrc,
src_pad_handler: AppSrcPadHandler,
task: Task,
@@ -431,7 +431,7 @@ struct AppSrc {
}
impl AppSrc {
- fn push_buffer(&self, element: &gst::Element, mut buffer: gst::Buffer) -> bool {
+ fn push_buffer(&self, element: &super::AppSrc, mut buffer: gst::Buffer) -> bool {
let state = self.task.lock_state();
if *state != TaskState::Started && *state != TaskState::Paused {
gst_debug!(CAT, obj: element, "Rejecting buffer due to element state");
@@ -469,7 +469,7 @@ impl AppSrc {
}
}
- fn end_of_stream(&self, element: &gst::Element) -> bool {
+ fn end_of_stream(&self, element: &super::AppSrc) -> bool {
let mut sender = self.sender.lock().unwrap();
let sender = match sender.as_mut() {
Some(sender) => sender,
@@ -485,7 +485,7 @@ impl AppSrc {
}
}
- fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
+ fn prepare(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> {
let settings = self.settings.lock().unwrap();
gst_debug!(CAT, obj: element, "Preparing");
@@ -526,7 +526,7 @@ impl AppSrc {
Ok(())
}
- fn unprepare(&self, element: &gst::Element) {
+ fn unprepare(&self, element: &super::AppSrc) {
gst_debug!(CAT, obj: element, "Unpreparing");
*self.sender.lock().unwrap() = None;
@@ -535,21 +535,21 @@ impl AppSrc {
gst_debug!(CAT, obj: element, "Unprepared");
}
- fn stop(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
+ fn stop(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Stopping");
self.task.stop()?;
gst_debug!(CAT, obj: element, "Stopped");
Ok(())
}
- fn start(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
+ fn start(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Starting");
self.task.start()?;
gst_debug!(CAT, obj: element, "Started");
Ok(())
}
- fn pause(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
+ fn pause(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Pausing");
self.task.pause()?;
gst_debug!(CAT, obj: element, "Paused");
@@ -559,13 +559,14 @@ impl AppSrc {
impl ObjectSubclass for AppSrc {
const NAME: &'static str = "RsTsAppSrc";
+ type Type = super::AppSrc;
type ParentType = gst::Element;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib_object_subclass!();
- fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ fn class_init(klass: &mut Self::Class) {
klass.set_metadata(
"Thread-sharing app source",
"Source/Generic",
@@ -593,7 +594,7 @@ impl ObjectSubclass for AppSrc {
bool::static_type(),
|_, args| {
let element = args[0]
- .get::<gst::Element>()
+ .get::<super::AppSrc>()
.expect("signal arg")
.expect("missing signal arg");
let buffer = args[1]
@@ -613,7 +614,7 @@ impl ObjectSubclass for AppSrc {
bool::static_type(),
|_, args| {
let element = args[0]
- .get::<gst::Element>()
+ .get::<super::AppSrc>()
.expect("signal arg")
.expect("missing signal arg");
let appsrc = Self::from_instance(&element);
@@ -622,7 +623,7 @@ impl ObjectSubclass for AppSrc {
);
}
- fn with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
+ fn with_class(klass: &Self::Class) -> Self {
let src_pad_handler = AppSrcPadHandler::default();
Self {
@@ -639,7 +640,7 @@ impl ObjectSubclass for AppSrc {
}
impl ObjectImpl for AppSrc {
- fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
+ fn set_property(&self, _obj: &Self::Type, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
let mut settings = self.settings.lock().unwrap();
@@ -666,7 +667,7 @@ impl ObjectImpl for AppSrc {
}
}
- fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ fn get_property(&self, _obj: &Self::Type, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
let settings = self.settings.lock().unwrap();
@@ -680,20 +681,19 @@ impl ObjectImpl for AppSrc {
}
}
- fn constructed(&self, obj: &glib::Object) {
+ fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
- let element = obj.downcast_ref::<gst::Element>().unwrap();
- element.add_pad(self.src_pad.gst_pad()).unwrap();
+ obj.add_pad(self.src_pad.gst_pad()).unwrap();
- super::set_element_flags(element, gst::ElementFlags::SOURCE);
+ crate::set_element_flags(obj, gst::ElementFlags::SOURCE);
}
}
impl ElementImpl for AppSrc {
fn change_state(
&self,
- element: &gst::Element,
+ element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
@@ -735,12 +735,3 @@ impl ElementImpl for AppSrc {
Ok(success)
}
}
-
-pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
- gst::Element::register(
- Some(plugin),
- "ts-appsrc",
- gst::Rank::None,
- AppSrc::get_type(),
- )
-}
diff --git a/generic/threadshare/src/appsrc/mod.rs b/generic/threadshare/src/appsrc/mod.rs
new file mode 100644
index 000000000..6f1e17296
--- /dev/null
+++ b/generic/threadshare/src/appsrc/mod.rs
@@ -0,0 +1,40 @@
+// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
+// Copyright (C) 2019-2020 François Laignel <fengalin@free.fr>
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Library General Public
+// License as published by the Free Software Foundation; either
+// version 2 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Library General Public License for more details.
+//
+// You should have received a copy of the GNU Library General Public
+// License along with this library; if not, write to the
+// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+// Boston, MA 02110-1335, USA.
+
+use glib::glib_wrapper;
+use glib::prelude::*;
+
+mod imp;
+
+glib_wrapper! {
+ pub struct AppSrc(ObjectSubclass<imp::AppSrc>) @extends gst::Element, gst::Object;
+}
+
+// GStreamer elements need to be thread-safe. For the private implementation this is automatically
+// enforced but for the public wrapper type we need to specify this manually.
+unsafe impl Send for AppSrc {}
+unsafe impl Sync for AppSrc {}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "ts-appsrc",
+ gst::Rank::None,
+ AppSrc::static_type(),
+ )
+}
diff --git a/generic/threadshare/src/inputselector.rs b/generic/threadshare/src/inputselector/imp.rs
index 295d4537e..914e4f39f 100644
--- a/generic/threadshare/src/inputselector.rs
+++ b/generic/threadshare/src/inputselector/imp.rs
@@ -110,7 +110,7 @@ struct InputSelectorPadSinkHandler(Arc<Mutex<InputSelectorPadSinkHandlerInner>>)
impl InputSelectorPadSinkHandler {
/* Wait until specified time */
- async fn sync(&self, element: &gst::Element, running_time: gst::ClockTime) {
+ async fn sync(&self, element: &super::InputSelector, running_time: gst::ClockTime) {
let now = element.get_current_running_time();
if let Some(delay) = running_time
@@ -124,7 +124,7 @@ impl InputSelectorPadSinkHandler {
async fn handle_item(
&self,
pad: &PadSinkRef<'_>,
- element: &gst::Element,
+ element: &super::InputSelector,
mut buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let inputselector = InputSelector::from_instance(element);
@@ -199,7 +199,7 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let this = self.clone();
- let element = element.clone();
+ let element = element.clone().downcast::<super::InputSelector>().unwrap();
let pad_weak = pad.downgrade();
async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists");
@@ -216,7 +216,7 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let this = self.clone();
- let element = element.clone();
+ let element = element.clone().downcast::<super::InputSelector>().unwrap();
let pad_weak = pad.downgrade();
async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists");
@@ -400,7 +400,7 @@ impl Default for Pads {
}
#[derive(Debug)]
-struct InputSelector {
+pub struct InputSelector {
src_pad: PadSrc,
state: Mutex<State>,
settings: Mutex<Settings>,
@@ -416,7 +416,7 @@ lazy_static! {
}
impl InputSelector {
- fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
+ fn unprepare(&self, element: &super::InputSelector) -> Result<(), ()> {
let mut state = self.state.lock().unwrap();
gst_debug!(CAT, obj: element, "Unpreparing");
*state = State::default();
@@ -428,13 +428,14 @@ impl InputSelector {
impl ObjectSubclass for InputSelector {
const NAME: &'static str = "RsTsInputSelector";
+ type Type = super::InputSelector;
type ParentType = gst::Element;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib_object_subclass!();
- fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ fn class_init(klass: &mut Self::Class) {
klass.set_metadata(
"Thread-sharing input selector",
"Generic",
@@ -465,7 +466,7 @@ impl ObjectSubclass for InputSelector {
klass.install_properties(&PROPERTIES);
}
- fn with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
+ fn with_class(klass: &Self::Class) -> Self {
Self {
src_pad: PadSrc::new(
gst::Pad::from_template(&klass.get_pad_template("src").unwrap(), Some("src")),
@@ -479,7 +480,7 @@ impl ObjectSubclass for InputSelector {
}
impl ObjectImpl for InputSelector {
- fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
+ fn set_property(&self, _obj: &Self::Type, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
match *prop {
@@ -526,7 +527,7 @@ impl ObjectImpl for InputSelector {
}
}
- fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ fn get_property(&self, _obj: &Self::Type, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
match *prop {
@@ -547,20 +548,18 @@ impl ObjectImpl for InputSelector {
}
}
- fn constructed(&self, obj: &glib::Object) {
+ fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
- let element = obj.downcast_ref::<gst::Element>().unwrap();
- element.add_pad(self.src_pad.gst_pad()).unwrap();
- element
- .set_element_flags(gst::ElementFlags::PROVIDE_CLOCK | gst::ElementFlags::REQUIRE_CLOCK);
+ obj.add_pad(self.src_pad.gst_pad()).unwrap();
+ obj.set_element_flags(gst::ElementFlags::PROVIDE_CLOCK | gst::ElementFlags::REQUIRE_CLOCK);
}
}
impl ElementImpl for InputSelector {
fn change_state(
&self,
- element: &gst::Element,
+ element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
@@ -586,7 +585,7 @@ impl ElementImpl for InputSelector {
fn request_new_pad(
&self,
- element: &gst::Element,
+ element: &Self::Type,
templ: &gst::PadTemplate,
_name: Option<String>,
_caps: Option<&gst::Caps>,
@@ -615,7 +614,7 @@ impl ElementImpl for InputSelector {
Some(ret)
}
- fn release_pad(&self, element: &gst::Element, pad: &gst::Pad) {
+ fn release_pad(&self, element: &Self::Type, pad: &gst::Pad) {
let mut pads = self.pads.lock().unwrap();
let sink_pad = pads.sink_pads.remove(pad).unwrap();
drop(sink_pad);
@@ -625,16 +624,7 @@ impl ElementImpl for InputSelector {
let _ = element.post_message(gst::message::Latency::builder().src(element).build());
}
- fn provide_clock(&self, _element: &gst::Element) -> Option<gst::Clock> {
+ fn provide_clock(&self, _element: &Self::Type) -> Option<gst::Clock> {
Some(gst::SystemClock::obtain())
}
}
-
-pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
- gst::Element::register(
- Some(plugin),
- "ts-input-selector",
- gst::Rank::None,
- InputSelector::get_type(),
- )
-}
diff --git a/generic/threadshare/src/inputselector/mod.rs b/generic/threadshare/src/inputselector/mod.rs
new file mode 100644
index 000000000..1b8d9e73d
--- /dev/null
+++ b/generic/threadshare/src/inputselector/mod.rs
@@ -0,0 +1,39 @@
+// Copyright (C) 2020 Mathieu Duponchelle <mathieu@centricular.com>
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Library General Public
+// License as published by the Free Software Foundation; either
+// version 2 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Library General Public License for more details.
+//
+// You should have received a copy of the GNU Library General Public
+// License along with this library; if not, write to the
+// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+// Boston, MA 02110-1335, USA.
+
+use glib::glib_wrapper;
+use glib::prelude::*;
+
+mod imp;
+
+glib_wrapper! {
+ pub struct InputSelector(ObjectSubclass<imp::InputSelector>) @extends gst::Element, gst::Object;
+}
+
+// GStreamer elements need to be thread-safe. For the private implementation this is automatically
+// enforced but for the public wrapper type we need to specify this manually.
+unsafe impl Send for InputSelector {}
+unsafe impl Sync for InputSelector {}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "ts-input-selector",
+ gst::Rank::None,
+ InputSelector::static_type(),
+ )
+}
diff --git a/generic/threadshare/src/jitterbuffer/ffi.rs b/generic/threadshare/src/jitterbuffer/ffi.rs
new file mode 100644
index 000000000..46424fa14
--- /dev/null
+++ b/generic/threadshare/src/jitterbuffer/ffi.rs
@@ -0,0 +1,115 @@
+// Copyright (C) 2019 Sebastian Dröge <sebastian@centricular.com>
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Library General Public
+// License as published by the Free Software Foundation; either
+// version 2 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Library General Public License for more details.
+//
+// You should have received a copy of the GNU Library General Public
+// License along with this library; if not, write to the
+// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+// Boston, MA 02110-1335, USA.
+
+use glib_ffi::{gboolean, gpointer, GList, GType};
+use glib_sys as glib_ffi;
+
+use gst_ffi::GstClockTime;
+use gstreamer_sys as gst_ffi;
+use libc::{c_int, c_uint, c_ulonglong, c_ushort, c_void};
+
+#[repr(C)]
+#[derive(Copy, Clone)]
+pub struct RTPJitterBufferItem {
+ pub data: gpointer,
+ pub next: *mut GList,
+ pub prev: *mut GList,
+ pub r#type: c_uint,
+ pub dts: GstClockTime,
+ pub pts: GstClockTime,
+ pub seqnum: c_uint,
+ pub count: c_uint,
+ pub rtptime: c_uint,
+}
+
+#[repr(C)]
+pub struct RTPJitterBuffer(c_void);
+
+#[repr(C)]
+#[derive(Copy, Clone)]
+pub struct RTPPacketRateCtx {
+ probed: gboolean,
+ clock_rate: c_int,
+ last_seqnum: c_ushort,
+ last_ts: c_ulonglong,
+ avg_packet_rate: c_uint,
+}
+
+pub type RTPJitterBufferMode = c_int;
+pub const RTP_JITTER_BUFFER_MODE_NONE: RTPJitterBufferMode = 0;
+pub const RTP_JITTER_BUFFER_MODE_SLAVE: RTPJitterBufferMode = 1;
+pub const RTP_JITTER_BUFFER_MODE_BUFFER: RTPJitterBufferMode = 2;
+pub const RTP_JITTER_BUFFER_MODE_SYNCED: RTPJitterBufferMode = 4;
+
+extern "C" {
+ pub fn rtp_jitter_buffer_new() -> *mut RTPJitterBuffer;
+ pub fn rtp_jitter_buffer_get_type() -> GType;
+ #[allow(dead_code)]
+ pub fn rtp_jitter_buffer_get_mode(jbuf: *mut RTPJitterBuffer) -> RTPJitterBufferMode;
+ #[allow(dead_code)]
+ pub fn rtp_jitter_buffer_set_mode(jbuf: *mut RTPJitterBuffer, mode: RTPJitterBufferMode);
+ #[allow(dead_code)]
+ pub fn rtp_jitter_buffer_get_delay(jbuf: *mut RTPJitterBuffer) -> GstClockTime;
+ pub fn rtp_jitter_buffer_set_delay(jbuf: *mut RTPJitterBuffer, delay: GstClockTime);
+ pub fn rtp_jitter_buffer_set_clock_rate(jbuf: *mut RTPJitterBuffer, clock_rate: c_uint);
+ #[allow(dead_code)]
+ pub fn rtp_jitter_buffer_get_clock_rate(jbuf: *mut RTPJitterBuffer) -> c_uint;
+ pub fn rtp_jitter_buffer_reset_skew(jbuf: *mut RTPJitterBuffer);
+
+ pub fn rtp_jitter_buffer_flush(jbuf: *mut RTPJitterBuffer, free_func: glib_ffi::GFunc);
+ pub fn rtp_jitter_buffer_find_earliest(
+ jbuf: *mut RTPJitterBuffer,
+ pts: *mut GstClockTime,
+ seqnum: *mut c_uint,
+ );
+ pub fn rtp_jitter_buffer_calculate_pts(
+ jbuf: *mut RTPJitterBuffer,
+ dts: GstClockTime,
+ estimated_dts: gboolean,
+ rtptime: c_uint,
+ base_time: GstClockTime,
+ gap: c_int,
+ is_rtx: gboolean,
+ ) -> GstClockTime;
+ pub fn rtp_jitter_buffer_insert(
+ jbuf: *mut RTPJitterBuffer,
+ item: *mut RTPJitterBufferItem,
+ head: *mut gboolean,
+ percent: *mut c_int,
+ ) -> gboolean;
+ pub fn rtp_jitter_buffer_pop(
+ jbuf: *mut RTPJitterBuffer,
+ percent: *mut c_int,
+ ) -> *mut RTPJitterBufferItem;
+ pub fn rtp_jitter_buffer_peek(jbuf: *mut RTPJitterBuffer) -> *mut RTPJitterBufferItem;
+
+ pub fn gst_rtp_packet_rate_ctx_reset(ctx: *mut RTPPacketRateCtx, clock_rate: c_int);
+ pub fn gst_rtp_packet_rate_ctx_update(
+ ctx: *mut RTPPacketRateCtx,
+ seqnum: c_ushort,
+ ts: c_uint,
+ ) -> c_uint;
+ pub fn gst_rtp_packet_rate_ctx_get_max_dropout(
+ ctx: *mut RTPPacketRateCtx,
+ time_ms: c_int,
+ ) -> c_uint;
+ #[allow(dead_code)]
+ pub fn gst_rtp_packet_rate_ctx_get_max_disorder(
+ ctx: *mut RTPPacketRateCtx,
+ time_ms: c_int,
+ ) -> c_uint;
+}
diff --git a/generic/threadshare/src/jitterbuffer/imp.rs b/generic/threadshare/src/jitterbuffer/imp.rs
new file mode 100644
index 000000000..895efbc06
--- /dev/null
+++ b/generic/threadshare/src/jitterbuffer/imp.rs
@@ -0,0 +1,1636 @@
+// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Library General Public
+// License as published by the Free Software Foundation; either
+// version 2 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Library General Public License for more details.
+//
+// You should have received a copy of the GNU Library General Public
+// License along with this library; if not, write to the
+// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+// Boston, MA 02110-1335, USA.
+
+use futures::future::BoxFuture;
+use futures::future::{abortable, AbortHandle, Aborted};
+use futures::prelude::*;
+
+use glib::glib_object_subclass;
+use glib::prelude::*;
+use glib::subclass;
+use glib::subclass::prelude::*;
+
+use gst::prelude::*;
+use gst::subclass::prelude::*;
+use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg, gst_info, gst_log, gst_trace};
+use gst_rtp::RTPBuffer;
+
+use lazy_static::lazy_static;
+
+use std::cmp::{max, min, Ordering};
+use std::collections::{BTreeSet, VecDeque};
+use std::mem;
+use std::sync::Arc;
+use std::sync::Mutex as StdMutex;
+use std::time::Duration;
+
+use crate::runtime::prelude::*;
+use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task};
+
+use super::jitterbuffer::{RTPJitterBuffer, RTPJitterBufferItem, RTPPacketRateCtx};
+
+const DEFAULT_LATENCY_MS: u32 = 200;
+const DEFAULT_DO_LOST: bool = false;
+const DEFAULT_MAX_DROPOUT_TIME: u32 = 60000;
+const DEFAULT_MAX_MISORDER_TIME: u32 = 2000;
+const DEFAULT_CONTEXT: &str = "";
+const DEFAULT_CONTEXT_WAIT: u32 = 0;
+
+#[derive(Debug, Clone)]
+struct Settings {
+ latency_ms: u32,
+ do_lost: bool,
+ max_dropout_time: u32,
+ max_misorder_time: u32,
+ context: String,
+ context_wait: u32,
+}
+
+impl Default for Settings {
+ fn default() -> Self {
+ Settings {
+ latency_ms: DEFAULT_LATENCY_MS,
+ do_lost: DEFAULT_DO_LOST,
+ max_dropout_time: DEFAULT_MAX_DROPOUT_TIME,
+ max_misorder_time: DEFAULT_MAX_MISORDER_TIME,
+ context: DEFAULT_CONTEXT.into(),
+ context_wait: DEFAULT_CONTEXT_WAIT,
+ }
+ }
+}
+
+static PROPERTIES: [subclass::Property; 7] = [
+ subclass::Property("latency", |name| {
+ glib::ParamSpec::uint(
+ name,
+ "Buffer latency in ms",
+ "Amount of ms to buffer",
+ 0,
+ std::u32::MAX,
+ DEFAULT_LATENCY_MS,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("do-lost", |name| {
+ glib::ParamSpec::boolean(
+ name,
+ "Do Lost",
+ "Send an event downstream when a packet is lost",
+ DEFAULT_DO_LOST,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("max-dropout-time", |name| {
+ glib::ParamSpec::uint(
+ name,
+ "Max dropout time",
+ "The maximum time (milliseconds) of missing packets tolerated.",
+ 0,
+ std::u32::MAX,
+ DEFAULT_MAX_DROPOUT_TIME,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("max-misorder-time", |name| {
+ glib::ParamSpec::uint(
+ name,
+ "Max misorder time",
+ "The maximum time (milliseconds) of misordered packets tolerated.",
+ 0,
+ std::u32::MAX,
+ DEFAULT_MAX_MISORDER_TIME,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("stats", |name| {
+ glib::ParamSpec::boxed(
+ name,
+ "Statistics",
+ "Various statistics",
+ gst::Structure::static_type(),
+ glib::ParamFlags::READABLE,
+ )
+ }),
+ subclass::Property("context", |name| {
+ glib::ParamSpec::string(
+ name,
+ "Context",
+ "Context name to share threads with",
+ Some(DEFAULT_CONTEXT),
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("context-wait", |name| {
+ glib::ParamSpec::uint(
+ name,
+ "Context Wait",
+ "Throttle poll loop to run at most once every this many ms",
+ 0,
+ 1000,
+ DEFAULT_CONTEXT_WAIT,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+];
+
+#[derive(Eq)]
+struct GapPacket {
+ buffer: gst::Buffer,
+ seq: u16,
+ pt: u8,
+}
+
+impl GapPacket {
+ fn new(buffer: gst::Buffer) -> Self {
+ let rtp_buffer = RTPBuffer::from_buffer_readable(&buffer).unwrap();
+ let seq = rtp_buffer.get_seq();
+ let pt = rtp_buffer.get_payload_type();
+ drop(rtp_buffer);
+
+ Self { buffer, seq, pt }
+ }
+}
+
+impl Ord for GapPacket {
+ fn cmp(&self, other: &Self) -> Ordering {
+ 0.cmp(&gst_rtp::compare_seqnum(self.seq, other.seq))
+ }
+}
+
+impl PartialOrd for GapPacket {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ Some(self.cmp(other))
+ }
+}
+
+impl PartialEq for GapPacket {
+ fn eq(&self, other: &Self) -> bool {
+ self.cmp(other) == Ordering::Equal
+ }
+}
+
+struct SinkHandlerInner {
+ packet_rate_ctx: RTPPacketRateCtx,
+ ips_rtptime: Option<u32>,
+ ips_pts: gst::ClockTime,
+
+ gap_packets: BTreeSet<GapPacket>,
+
+ last_pt: Option<u8>,
+
+ last_in_seqnum: Option<u16>,
+ last_rtptime: Option<u32>,
+}
+
+impl Default for SinkHandlerInner {
+ fn default() -> Self {
+ SinkHandlerInner {
+ packet_rate_ctx: RTPPacketRateCtx::new(),
+ ips_rtptime: None,
+ ips_pts: gst::CLOCK_TIME_NONE,
+ gap_packets: BTreeSet::new(),
+ last_pt: None,
+ last_in_seqnum: None,
+ last_rtptime: None,
+ }
+ }
+}
+
+#[derive(Clone, Default)]
+struct SinkHandler(Arc<StdMutex<SinkHandlerInner>>);
+
+impl SinkHandler {
+ fn clear(&self) {
+ let mut inner = self.0.lock().unwrap();
+ *inner = SinkHandlerInner::default();
+ }
+
+ // For resetting if seqnum discontinuities
+ fn reset(
+ &self,
+ inner: &mut SinkHandlerInner,
+ state: &mut State,
+ element: &super::JitterBuffer,
+ ) -> BTreeSet<GapPacket> {
+ gst_info!(CAT, obj: element, "Resetting");
+
+ state.jbuf.borrow().flush();
+ state.jbuf.borrow().reset_skew();
+ state.discont = true;
+
+ state.last_popped_seqnum = None;
+ state.last_popped_pts = gst::CLOCK_TIME_NONE;
+
+ inner.last_in_seqnum = None;
+ inner.last_rtptime = None;
+
+ state.earliest_pts = gst::CLOCK_TIME_NONE;
+ state.earliest_seqnum = None;
+
+ inner.ips_rtptime = None;
+ inner.ips_pts = gst::CLOCK_TIME_NONE;
+
+ mem::replace(&mut inner.gap_packets, BTreeSet::new())
+ }
+
+ fn parse_caps(
+ &self,
+ inner: &mut SinkHandlerInner,
+ state: &mut State,
+ element: &super::JitterBuffer,
+ caps: &gst::Caps,
+ pt: u8,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ let s = caps.get_structure(0).ok_or(gst::FlowError::Error)?;
+
+ gst_info!(CAT, obj: element, "Parsing {:?}", caps);
+
+ let payload = s
+ .get_some::<i32>("payload")
+ .map_err(|_| gst::FlowError::Error)?;
+
+ if pt != 0 && payload as u8 != pt {
+ return Err(gst::FlowError::Error);
+ }
+
+ inner.last_pt = Some(pt);
+ let clock_rate = s
+ .get_some::<i32>("clock-rate")
+ .map_err(|_| gst::FlowError::Error)?;
+
+ if clock_rate <= 0 {
+ return Err(gst::FlowError::Error);
+ }
+ state.clock_rate = Some(clock_rate as u32);
+
+ inner.packet_rate_ctx.reset(clock_rate);
+ state.jbuf.borrow().set_clock_rate(clock_rate as u32);
+
+ Ok(gst::FlowSuccess::Ok)
+ }
+
+ fn calculate_packet_spacing(
+ &self,
+ inner: &mut SinkHandlerInner,
+ state: &mut State,
+ rtptime: u32,
+ pts: gst::ClockTime,
+ ) {
+ if inner.ips_rtptime != Some(rtptime) {
+ if inner.ips_pts.is_some() && pts.is_some() {
+ let new_packet_spacing = pts - inner.ips_pts;
+ let old_packet_spacing = state.packet_spacing;
+
+ assert!(old_packet_spacing.is_some());
+ if old_packet_spacing > new_packet_spacing {
+ state.packet_spacing = (new_packet_spacing + 3 * old_packet_spacing) / 4;
+ } else if !old_packet_spacing.is_zero() {
+ state.packet_spacing = (3 * new_packet_spacing + old_packet_spacing) / 4;
+ } else {
+ state.packet_spacing = new_packet_spacing;
+ }
+
+ gst_debug!(
+ CAT,
+ "new packet spacing {}, old packet spacing {} combined to {}",
+ new_packet_spacing,
+ old_packet_spacing,
+ state.packet_spacing
+ );
+ }
+ inner.ips_rtptime = Some(rtptime);
+ inner.ips_pts = pts;
+ }
+ }
+
+ fn handle_big_gap_buffer(
+ &self,
+ inner: &mut SinkHandlerInner,
+ element: &super::JitterBuffer,
+ buffer: gst::Buffer,
+ pt: u8,
+ ) -> bool {
+ let gap_packets_length = inner.gap_packets.len();
+ let mut reset = false;
+
+ gst_debug!(
+ CAT,
+ obj: element,
+ "Handling big gap, gap packets length: {}",
+ gap_packets_length
+ );
+
+ inner.gap_packets.insert(GapPacket::new(buffer));
+
+ if gap_packets_length > 0 {
+ let mut prev_gap_seq = std::u32::MAX;
+ let mut all_consecutive = true;
+
+ for gap_packet in inner.gap_packets.iter() {
+ gst_log!(
+ CAT,
+ obj: element,
+ "Looking at gap packet with seq {}",
+ gap_packet.seq,
+ );
+
+ all_consecutive = gap_packet.pt == pt;
+
+ if prev_gap_seq == std::u32::MAX {
+ prev_gap_seq = gap_packet.seq as u32;
+ } else if gst_rtp::compare_seqnum(gap_packet.seq, prev_gap_seq as u16) != -1 {
+ all_consecutive = false;
+ } else {
+ prev_gap_seq = gap_packet.seq as u32;
+ }
+
+ if !all_consecutive {
+ break;
+ }
+ }
+
+ gst_debug!(CAT, obj: element, "all consecutive: {}", all_consecutive);
+
+ if all_consecutive && gap_packets_length > 3 {
+ reset = true;
+ } else if !all_consecutive {
+ inner.gap_packets.clear();
+ }
+ }
+
+ reset
+ }
+
+ fn store(
+ &self,
+ inner: &mut SinkHandlerInner,
+ pad: &gst::Pad,
+ element: &super::JitterBuffer,
+ buffer: gst::Buffer,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ let jb = JitterBuffer::from_instance(element);
+ let mut state = jb.state.lock().unwrap();
+
+ let (max_misorder_time, max_dropout_time) = {
+ let settings = jb.settings.lock().unwrap();
+ (settings.max_misorder_time, settings.max_dropout_time)
+ };
+
+ let (seq, rtptime, pt) = {
+ let rtp_buffer =
+ RTPBuffer::from_buffer_readable(&buffer).map_err(|_| gst::FlowError::Error)?;
+ (
+ rtp_buffer.get_seq(),
+ rtp_buffer.get_timestamp(),
+ rtp_buffer.get_payload_type(),
+ )
+ };
+
+ let mut pts = buffer.get_pts();
+ let mut dts = buffer.get_dts();
+ let mut estimated_dts = false;
+
+ gst_log!(
+ CAT,
+ obj: element,
+ "Storing buffer, seq: {}, rtptime: {}, pt: {}",
+ seq,
+ rtptime,
+ pt
+ );
+
+ if dts.is_none() {
+ dts = pts;
+ } else if pts.is_none() {
+ pts = dts;
+ }
+
+ if dts.is_none() {
+ dts = element.get_current_running_time();
+ pts = dts;
+
+ estimated_dts = state.clock_rate.is_some();
+ } else {
+ dts = state.segment.to_running_time(dts);
+ }
+
+ if state.clock_rate.is_none() {
+ inner.ips_rtptime = Some(rtptime);
+ inner.ips_pts = pts;
+ }
+
+ if inner.last_pt != Some(pt) {
+ inner.last_pt = Some(pt);
+ state.clock_rate = None;
+
+ gst_debug!(CAT, obj: pad, "New payload type: {}", pt);
+
+ if let Some(caps) = pad.get_current_caps() {
+ /* Ignore errors at this point, as we want to emit request-pt-map */
+ let _ = self.parse_caps(inner, &mut state, element, &caps, pt);
+ }
+ }
+
+ let mut state = {
+ if state.clock_rate.is_none() {
+ drop(state);
+ let caps = element
+ .emit("request-pt-map", &[&(pt as u32)])
+ .map_err(|_| gst::FlowError::Error)?
+ .ok_or(gst::FlowError::Error)?
+ .get::<gst::Caps>()
+ .map_err(|_| gst::FlowError::Error)?
+ .ok_or(gst::FlowError::Error)?;
+ let mut state = jb.state.lock().unwrap();
+ self.parse_caps(inner, &mut state, element, &caps, pt)?;
+ state
+ } else {
+ state
+ }
+ };
+
+ inner.packet_rate_ctx.update(seq, rtptime);
+
+ let max_dropout = inner
+ .packet_rate_ctx
+ .get_max_dropout(max_dropout_time as i32);
+ let max_misorder = inner
+ .packet_rate_ctx
+ .get_max_dropout(max_misorder_time as i32);
+
+ pts = state.jbuf.borrow().calculate_pts(
+ dts,
+ estimated_dts,
+ rtptime,
+ element.get_base_time(),
+ 0,
+ false,
+ );
+
+ if pts.is_none() {
+ gst_debug!(
+ CAT,
+ obj: element,
+ "cannot calculate a valid pts for #{}, discard",
+ seq
+ );
+ return Ok(gst::FlowSuccess::Ok);
+ }
+
+ if let Some(last_in_seqnum) = inner.last_in_seqnum {
+ let gap = gst_rtp::compare_seqnum(last_in_seqnum as u16, seq);
+ if gap == 1 {
+ self.calculate_packet_spacing(inner, &mut state, rtptime, pts);
+ } else {
+ if (gap != -1 && gap < -(max_misorder as i32)) || (gap >= max_dropout as i32) {
+ let reset = self.handle_big_gap_buffer(inner, element, buffer, pt);
+ if reset {
+ // Handle reset in `enqueue_item` to avoid recursion
+ return Err(gst::FlowError::CustomError);
+ } else {
+ return Ok(gst::FlowSuccess::Ok);
+ }
+ }
+ inner.ips_pts = gst::CLOCK_TIME_NONE;
+ inner.ips_rtptime = None;
+ }
+
+ inner.gap_packets.clear();
+ }
+
+ if let Some(last_popped_seqnum) = state.last_popped_seqnum {
+ let gap = gst_rtp::compare_seqnum(last_popped_seqnum, seq);
+
+ if gap <= 0 {
+ state.stats.num_late += 1;
+ gst_debug!(CAT, obj: element, "Dropping late {}", seq);
+ return Ok(gst::FlowSuccess::Ok);
+ }
+ }
+
+ inner.last_in_seqnum = Some(seq);
+
+ let jb_item = if estimated_dts {
+ RTPJitterBufferItem::new(buffer, gst::CLOCK_TIME_NONE, pts, Some(seq), rtptime)
+ } else {
+ RTPJitterBufferItem::new(buffer, dts, pts, Some(seq), rtptime)
+ };
+
+ let (success, _, _) = state.jbuf.borrow().insert(jb_item);
+
+ if !success {
+ /* duplicate */
+ return Ok(gst::FlowSuccess::Ok);
+ }
+
+ if Some(rtptime) == inner.last_rtptime {
+ state.equidistant -= 2;
+ } else {
+ state.equidistant += 1;
+ }
+
+ state.equidistant = min(max(state.equidistant, -7), 7);
+
+ inner.last_rtptime = Some(rtptime);
+
+ if state.earliest_pts.is_none()
+ || (pts.is_some()
+ && (pts < state.earliest_pts
+ || (pts == state.earliest_pts
+ && state
+ .earliest_seqnum
+ .map(|earliest_seqnum| seq > earliest_seqnum)
+ .unwrap_or(false))))
+ {
+ state.earliest_pts = pts;
+ state.earliest_seqnum = Some(seq);
+ }
+
+ gst_log!(CAT, obj: pad, "Stored buffer");
+
+ Ok(gst::FlowSuccess::Ok)
+ }
+
+ fn enqueue_item(
+ &self,
+ pad: &gst::Pad,
+ element: &super::JitterBuffer,
+ buffer: Option<gst::Buffer>,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ let mut inner = self.0.lock().unwrap();
+
+ let mut buffers = VecDeque::new();
+ if let Some(buf) = buffer {
+ buffers.push_back(buf);
+ }
+
+ // This is to avoid recursion with `store`, `reset` and `enqueue_item`
+ while let Some(buf) = buffers.pop_front() {
+ if let Err(err) = self.store(&mut inner, pad, element, buf) {
+ match err {
+ gst::FlowError::CustomError => {
+ let jb = JitterBuffer::from_instance(element);
+ let mut state = jb.state.lock().unwrap();
+ for gap_packet in self.reset(&mut inner, &mut state, element) {
+ buffers.push_back(gap_packet.buffer);
+ }
+ }
+ other => return Err(other),
+ }
+ }
+ }
+
+ let jb = JitterBuffer::from_instance(element);
+ let mut state = jb.state.lock().unwrap();
+
+ let (latency, context_wait) = {
+ let settings = jb.settings.lock().unwrap();
+ (
+ settings.latency_ms as u64 * gst::MSECOND,
+ settings.context_wait as u64 * gst::MSECOND,
+ )
+ };
+
+ // Reschedule if needed
+ let (_, next_wakeup) =
+ jb.src_pad_handler
+ .get_next_wakeup(&element, &state, latency, context_wait);
+ if let Some((next_wakeup, _)) = next_wakeup {
+ if let Some((previous_next_wakeup, ref abort_handle)) = state.wait_handle {
+ if previous_next_wakeup.is_none() || previous_next_wakeup > next_wakeup {
+ gst_debug!(
+ CAT,
+ obj: pad,
+ "Rescheduling for new item {} < {}",
+ next_wakeup,
+ previous_next_wakeup
+ );
+ abort_handle.abort();
+ state.wait_handle = None;
+ }
+ }
+ }
+ state.last_res
+ }
+}
+
+impl PadSinkHandler for SinkHandler {
+ type ElementImpl = JitterBuffer;
+
+ fn sink_chain(
+ &self,
+ pad: &PadSinkRef,
+ _jb: &JitterBuffer,
+ element: &gst::Element,
+ buffer: gst::Buffer,
+ ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
+ let pad_weak = pad.downgrade();
+ let element = element.clone().downcast::<super::JitterBuffer>().unwrap();
+ let this = self.clone();
+
+ async move {
+ let pad = pad_weak.upgrade().expect("PadSink no longer exists");
+
+ gst_debug!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer);
+ this.enqueue_item(pad.gst_pad(), &element, Some(buffer))
+ }
+ .boxed()
+ }
+
+ fn sink_event(
+ &self,
+ pad: &PadSinkRef,
+ jb: &JitterBuffer,
+ element: &gst::Element,
+ event: gst::Event,
+ ) -> bool {
+ use gst::EventView;
+
+ gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
+
+ if let EventView::FlushStart(..) = event.view() {
+ if let Err(err) = jb.task.flush_start() {
+ gst_error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
+ gst_element_error!(
+ element,
+ gst::StreamError::Failed,
+ ("Internal data stream error"),
+ ["FlushStart failed {:?}", err]
+ );
+ return false;
+ }
+ }
+
+ gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
+ jb.src_pad.gst_pad().push_event(event)
+ }
+
+ fn sink_event_serialized(
+ &self,
+ pad: &PadSinkRef,
+ _jb: &JitterBuffer,
+ element: &gst::Element,
+ event: gst::Event,
+ ) -> BoxFuture<'static, bool> {
+ use gst::EventView;
+
+ let pad_weak = pad.downgrade();
+ let element = element.clone().downcast::<super::JitterBuffer>().unwrap();
+
+ async move {
+ let pad = pad_weak.upgrade().expect("PadSink no longer exists");
+
+ gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
+
+ let jb = JitterBuffer::from_instance(&element);
+
+ let mut forward = true;
+ match event.view() {
+ EventView::Segment(e) => {
+ let mut state = jb.state.lock().unwrap();
+ state.segment = e
+ .get_segment()
+ .clone()
+ .downcast::<gst::format::Time>()
+ .unwrap();
+ }
+ EventView::FlushStop(..) => {
+ if let Err(err) = jb.task.flush_stop() {
+ gst_error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
+ gst_element_error!(
+ element,
+ gst::StreamError::Failed,
+ ("Internal data stream error"),
+ ["FlushStop failed {:?}", err]
+ );
+ return false;
+ }
+ }
+ EventView::Eos(..) => {
+ let mut state = jb.state.lock().unwrap();
+ state.eos = true;
+ if let Some((_, abort_handle)) = state.wait_handle.take() {
+ abort_handle.abort();
+ }
+ forward = false;
+ }
+ _ => (),
+ };
+
+ if forward {
+ // FIXME: These events should really be queued up and stay in order
+ gst_log!(CAT, obj: pad.gst_pad(), "Forwarding serialized {:?}", event);
+ jb.src_pad.push_event(event).await
+ } else {
+ true
+ }
+ }
+ .boxed()
+ }
+}
+
+#[derive(Clone, Default)]
+struct SrcHandler;
+
+impl SrcHandler {
+ fn clear(&self) {}
+
+ fn generate_lost_events(
+ &self,
+ state: &mut State,
+ element: &super::JitterBuffer,
+ seqnum: u16,
+ pts: gst::ClockTime,
+ discont: &mut bool,
+ ) -> Vec<gst::Event> {
+ let (latency_ns, do_lost) = {
+ let jb = JitterBuffer::from_instance(element);
+ let settings = jb.settings.lock().unwrap();
+ (
+ settings.latency_ms as u64 * gst::MSECOND.nseconds().unwrap(),
+ settings.do_lost,
+ )
+ };
+
+ let mut events = vec![];
+
+ let last_popped_seqnum = match state.last_popped_seqnum {
+ None => return events,
+ Some(seq) => seq,
+ };
+
+ gst_debug!(
+ CAT,
+ obj: element,
+ "Generating lost events seq: {}, last popped seq: {:?}",
+ seqnum,
+ last_popped_seqnum,
+ );
+
+ let mut lost_seqnum = last_popped_seqnum.wrapping_add(1);
+ let gap = gst_rtp::compare_seqnum(lost_seqnum, seqnum) as i64;
+
+ if gap > 0 {
+ let interval =
+ pts.nseconds().unwrap() as i64 - state.last_popped_pts.nseconds().unwrap() as i64;
+ let gap = gap as u64;
+ let spacing = if interval >= 0 {
+ interval as u64 / (gap + 1)
+ } else {
+ 0
+ };
+
+ *discont = true;
+
+ if state.equidistant > 0 && gap > 1 && gap * spacing > latency_ns {
+ let n_packets = gap - latency_ns / spacing;
+
+ if do_lost {
+ let s = gst::Structure::new(
+ "GstRTPPacketLost",
+ &[
+ ("seqnum", &(lost_seqnum as u32)),
+ (
+ "timestamp",
+ &(state.last_popped_pts + gst::ClockTime(Some(spacing))),
+ ),
+ ("duration", &(n_packets * spacing)),
+ ("retry", &0),
+ ],
+ );
+
+ events.push(gst::event::CustomDownstream::new(s));
+ }
+
+ lost_seqnum = lost_seqnum.wrapping_add(n_packets as u16);
+ state.last_popped_pts += gst::ClockTime(Some(n_packets * spacing));
+ state.stats.num_lost += n_packets;
+ }
+
+ while lost_seqnum != seqnum {
+ let timestamp = state.last_popped_pts + gst::ClockTime(Some(spacing));
+ let duration = if state.equidistant > 0 { spacing } else { 0 };
+
+ state.last_popped_pts = timestamp;
+
+ if do_lost {
+ let s = gst::Structure::new(
+ "GstRTPPacketLost",
+ &[
+ ("seqnum", &(lost_seqnum as u32)),
+ ("timestamp", &timestamp),
+ ("duration", &duration),
+ ("retry", &0),
+ ],
+ );
+
+ events.push(gst::event::CustomDownstream::new(s));
+ }
+
+ state.stats.num_lost += 1;
+
+ lost_seqnum = lost_seqnum.wrapping_add(1);
+ }
+ }
+
+ events
+ }
+
+ async fn pop_and_push(
+ &self,
+ element: &super::JitterBuffer,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ let jb = JitterBuffer::from_instance(element);
+
+ let (lost_events, buffer, seq) = {
+ let mut state = jb.state.lock().unwrap();
+
+ let mut discont = false;
+ let (jb_item, _) = state.jbuf.borrow().pop();
+
+ let jb_item = match jb_item {
+ None => {
+ if state.eos {
+ return Err(gst::FlowError::Eos);
+ } else {
+ return Ok(gst::FlowSuccess::Ok);
+ }
+ }
+ Some(item) => item,
+ };
+
+ let dts = jb_item.get_dts();
+ let pts = jb_item.get_pts();
+ let seq = jb_item.get_seqnum();
+ let mut buffer = jb_item.into_buffer();
+
+ let lost_events = {
+ let buffer = buffer.make_mut();
+
+ buffer.set_dts(state.segment.to_running_time(dts));
+ buffer.set_pts(state.segment.to_running_time(pts));
+
+ if state.last_popped_pts.is_some() && buffer.get_pts() < state.last_popped_pts {
+ buffer.set_pts(state.last_popped_pts)
+ }
+
+ let lost_events = if let Some(seq) = seq {
+ self.generate_lost_events(&mut state, element, seq, pts, &mut discont)
+ } else {
+ vec![]
+ };
+
+ if state.discont {
+ discont = true;
+ state.discont = false;
+ }
+
+ if discont {
+ buffer.set_flags(gst::BufferFlags::DISCONT);
+ }
+
+ lost_events
+ };
+
+ state.last_popped_pts = buffer.get_pts();
+ if let Some(pts) = state.last_popped_pts.nseconds() {
+ state.position = pts.into();
+ }
+ state.last_popped_seqnum = seq;
+
+ state.stats.num_pushed += 1;
+
+ (lost_events, buffer, seq)
+ };
+
+ for event in lost_events {
+ gst_debug!(CAT, obj: jb.src_pad.gst_pad(), "Pushing lost event {:?}", event);
+ let _ = jb.src_pad.push_event(event).await;
+ }
+
+ gst_debug!(CAT, obj: jb.src_pad.gst_pad(), "Pushing {:?} with seq {:?}", buffer, seq);
+
+ jb.src_pad.push(buffer).await
+ }
+
+ fn get_next_wakeup(
+ &self,
+ element: &super::JitterBuffer,
+ state: &State,
+ latency: gst::ClockTime,
+ context_wait: gst::ClockTime,
+ ) -> (gst::ClockTime, Option<(gst::ClockTime, Duration)>) {
+ let now = element.get_current_running_time();
+
+ gst_debug!(
+ CAT,
+ obj: element,
+ "Now is {}, EOS {}, earliest pts is {}, packet_spacing {} and latency {}",
+ now,
+ state.eos,
+ state.earliest_pts,
+ state.packet_spacing,
+ latency
+ );
+
+ if state.eos {
+ gst_debug!(CAT, obj: element, "EOS, not waiting");
+ return (now, Some((now, Duration::from_nanos(0))));
+ }
+
+ if state.earliest_pts.is_none() {
+ return (now, None);
+ }
+
+ let next_wakeup = state.earliest_pts + latency - state.packet_spacing - context_wait / 2;
+
+ let delay = next_wakeup
+ .saturating_sub(now)
+ .unwrap_or_else(gst::ClockTime::zero)
+ .nseconds()
+ .unwrap();
+
+ gst_debug!(
+ CAT,
+ obj: element,
+ "Next wakeup at {} with delay {}",
+ next_wakeup,
+ delay
+ );
+
+ (now, Some((next_wakeup, Duration::from_nanos(delay))))
+ }
+}
+
+impl PadSrcHandler for SrcHandler {
+ type ElementImpl = JitterBuffer;
+
+ fn src_event(
+ &self,
+ pad: &PadSrcRef,
+ jb: &JitterBuffer,
+ element: &gst::Element,
+ event: gst::Event,
+ ) -> bool {
+ use gst::EventView;
+
+ gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
+
+ match event.view() {
+ EventView::FlushStart(..) => {
+ if let Err(err) = jb.task.flush_start() {
+ gst_error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
+ gst_element_error!(
+ element,
+ gst::StreamError::Failed,
+ ("Internal data stream error"),
+ ["FlushStart failed {:?}", err]
+ );
+ return false;
+ }
+ }
+ EventView::FlushStop(..) => {
+ if let Err(err) = jb.task.flush_stop() {
+ gst_error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
+ gst_element_error!(
+ element,
+ gst::StreamError::Failed,
+ ("Internal data stream error"),
+ ["FlushStop failed {:?}", err]
+ );
+ return false;
+ }
+ }
+ _ => (),
+ }
+
+ gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
+ jb.sink_pad.gst_pad().push_event(event)
+ }
+
+ fn src_query(
+ &self,
+ pad: &PadSrcRef,
+ jb: &JitterBuffer,
+ _element: &gst::Element,
+ query: &mut gst::QueryRef,
+ ) -> bool {
+ use gst::QueryView;
+
+ gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query);
+
+ match query.view_mut() {
+ QueryView::Latency(ref mut q) => {
+ let mut peer_query = gst::query::Latency::new();
+
+ let ret = jb.sink_pad.gst_pad().peer_query(&mut peer_query);
+
+ if ret {
+ let settings = jb.settings.lock().unwrap();
+ let (_, mut min_latency, _) = peer_query.get_result();
+ min_latency += (settings.latency_ms as u64) * gst::SECOND;
+ let max_latency = gst::CLOCK_TIME_NONE;
+
+ q.set(true, min_latency, max_latency);
+ }
+
+ ret
+ }
+ QueryView::Position(ref mut q) => {
+ if q.get_format() != gst::Format::Time {
+ jb.sink_pad.gst_pad().peer_query(query)
+ } else {
+ let state = jb.state.lock().unwrap();
+ let position = state.position;
+ q.set(position);
+ true
+ }
+ }
+ _ => jb.sink_pad.gst_pad().peer_query(query),
+ }
+ }
+}
+
+#[derive(Debug)]
+struct Stats {
+ num_pushed: u64,
+ num_lost: u64,
+ num_late: u64,
+}
+
+impl Default for Stats {
+ fn default() -> Self {
+ Self {
+ num_pushed: 0,
+ num_lost: 0,
+ num_late: 0,
+ }
+ }
+}
+
+// Shared state between element, sink and source pad
+struct State {
+ jbuf: glib::SendUniqueCell<RTPJitterBuffer>,
+
+ last_res: Result<gst::FlowSuccess, gst::FlowError>,
+ position: gst::ClockTime,
+
+ segment: gst::FormattedSegment<gst::ClockTime>,
+ clock_rate: Option<u32>,
+
+ packet_spacing: gst::ClockTime,
+ equidistant: i32,
+
+ discont: bool,
+ eos: bool,
+
+ last_popped_seqnum: Option<u16>,
+ last_popped_pts: gst::ClockTime,
+
+ stats: Stats,
+
+ earliest_pts: gst::ClockTime,
+ earliest_seqnum: Option<u16>,
+
+ wait_handle: Option<(gst::ClockTime, AbortHandle)>,
+}
+
+impl Default for State {
+ fn default() -> State {
+ State {
+ jbuf: glib::SendUniqueCell::new(RTPJitterBuffer::new()).unwrap(),
+
+ last_res: Ok(gst::FlowSuccess::Ok),
+ position: gst::CLOCK_TIME_NONE,
+
+ segment: gst::FormattedSegment::<gst::ClockTime>::new(),
+ clock_rate: None,
+
+ packet_spacing: gst::ClockTime::zero(),
+ equidistant: 0,
+
+ discont: true,
+ eos: false,
+
+ last_popped_seqnum: None,
+ last_popped_pts: gst::CLOCK_TIME_NONE,
+
+ stats: Stats::default(),
+
+ earliest_pts: gst::CLOCK_TIME_NONE,
+ earliest_seqnum: None,
+
+ wait_handle: None,
+ }
+ }
+}
+
+struct JitterBufferTask {
+ element: super::JitterBuffer,
+ src_pad_handler: SrcHandler,
+ sink_pad_handler: SinkHandler,
+}
+
+impl JitterBufferTask {
+ fn new(
+ element: &super::JitterBuffer,
+ src_pad_handler: &SrcHandler,
+ sink_pad_handler: &SinkHandler,
+ ) -> Self {
+ JitterBufferTask {
+ element: element.clone(),
+ src_pad_handler: src_pad_handler.clone(),
+ sink_pad_handler: sink_pad_handler.clone(),
+ }
+ }
+}
+
+impl TaskImpl for JitterBufferTask {
+ fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ async move {
+ gst_log!(CAT, obj: &self.element, "Starting task");
+
+ self.src_pad_handler.clear();
+ self.sink_pad_handler.clear();
+
+ let jb = JitterBuffer::from_instance(&self.element);
+ *jb.state.lock().unwrap() = State::default();
+
+ gst_log!(CAT, obj: &self.element, "Task started");
+ Ok(())
+ }
+ .boxed()
+ }
+
+ fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ async move {
+ let jb = JitterBuffer::from_instance(&self.element);
+ let (latency, context_wait) = {
+ let settings = jb.settings.lock().unwrap();
+ (
+ settings.latency_ms as u64 * gst::MSECOND,
+ settings.context_wait as u64 * gst::MSECOND,
+ )
+ };
+
+ loop {
+ let delay_fut = {
+ let mut state = jb.state.lock().unwrap();
+ let (_, next_wakeup) = self.src_pad_handler.get_next_wakeup(
+ &self.element,
+ &state,
+ latency,
+ context_wait,
+ );
+
+ let (delay_fut, abort_handle) = match next_wakeup {
+ Some((_, delay)) if delay == Duration::from_nanos(0) => (None, None),
+ _ => {
+ let (delay_fut, abort_handle) = abortable(async move {
+ match next_wakeup {
+ Some((_, delay)) => {
+ runtime::time::delay_for(delay).await;
+ }
+ None => {
+ future::pending::<()>().await;
+ }
+ };
+ });
+
+ let next_wakeup =
+ next_wakeup.map(|w| w.0).unwrap_or(gst::CLOCK_TIME_NONE);
+ (Some(delay_fut), Some((next_wakeup, abort_handle)))
+ }
+ };
+
+ state.wait_handle = abort_handle;
+
+ delay_fut
+ };
+
+ // Got aborted, reschedule if needed
+ if let Some(delay_fut) = delay_fut {
+ gst_debug!(CAT, obj: &self.element, "Waiting");
+ if let Err(Aborted) = delay_fut.await {
+ gst_debug!(CAT, obj: &self.element, "Waiting aborted");
+ return Ok(());
+ }
+ }
+
+ let (head_pts, head_seq) = {
+ let state = jb.state.lock().unwrap();
+ //
+ // Check earliest PTS as we have just taken the lock
+ let (now, next_wakeup) = self.src_pad_handler.get_next_wakeup(
+ &self.element,
+ &state,
+ latency,
+ context_wait,
+ );
+
+ gst_debug!(
+ CAT,
+ obj: &self.element,
+ "Woke up at {}, earliest_pts {}",
+ now,
+ state.earliest_pts
+ );
+
+ if let Some((next_wakeup, _)) = next_wakeup {
+ if next_wakeup > now {
+ // Reschedule and wait a bit longer in the next iteration
+ return Ok(());
+ }
+ } else {
+ return Ok(());
+ }
+
+ let (head_pts, head_seq) = state.jbuf.borrow().peek();
+
+ (head_pts, head_seq)
+ };
+
+ let res = self.src_pad_handler.pop_and_push(&self.element).await;
+
+ {
+ let mut state = jb.state.lock().unwrap();
+
+ state.last_res = res;
+
+ if head_pts == state.earliest_pts && head_seq == state.earliest_seqnum {
+ let (earliest_pts, earliest_seqnum) = state.jbuf.borrow().find_earliest();
+ state.earliest_pts = earliest_pts;
+ state.earliest_seqnum = earliest_seqnum;
+ }
+
+ if res.is_ok() {
+ // Return and reschedule if the next packet would be in the future
+ // Check earliest PTS as we have just taken the lock
+ let (now, next_wakeup) = self.src_pad_handler.get_next_wakeup(
+ &self.element,
+ &state,
+ latency,
+ context_wait,
+ );
+ if let Some((next_wakeup, _)) = next_wakeup {
+ if next_wakeup > now {
+ // Reschedule and wait a bit longer in the next iteration
+ return Ok(());
+ }
+ } else {
+ return Ok(());
+ }
+ }
+ }
+
+ if let Err(err) = res {
+ match err {
+ gst::FlowError::Eos => {
+ gst_debug!(CAT, obj: &self.element, "Pushing EOS event");
+ let _ = jb.src_pad.push_event(gst::event::Eos::new()).await;
+ }
+ gst::FlowError::Flushing => gst_debug!(CAT, obj: &self.element, "Flushing"),
+ err => gst_error!(CAT, obj: &self.element, "Error {}", err),
+ }
+
+ return Err(err);
+ }
+ }
+ }
+ .boxed()
+ }
+
+ fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ async move {
+ gst_log!(CAT, obj: &self.element, "Stopping task");
+
+ let jb = JitterBuffer::from_instance(&self.element);
+ let mut jb_state = jb.state.lock().unwrap();
+
+ if let Some((_, abort_handle)) = jb_state.wait_handle.take() {
+ abort_handle.abort();
+ }
+
+ self.src_pad_handler.clear();
+ self.sink_pad_handler.clear();
+
+ *jb_state = State::default();
+
+ gst_log!(CAT, obj: &self.element, "Task stopped");
+ Ok(())
+ }
+ .boxed()
+ }
+}
+
+pub struct JitterBuffer {
+ sink_pad: PadSink,
+ src_pad: PadSrc,
+ sink_pad_handler: SinkHandler,
+ src_pad_handler: SrcHandler,
+ task: Task,
+ state: StdMutex<State>,
+ settings: StdMutex<Settings>,
+}
+
+lazy_static! {
+ static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
+ "ts-jitterbuffer",
+ gst::DebugColorFlags::empty(),
+ Some("Thread-sharing jitterbuffer"),
+ );
+}
+
+impl JitterBuffer {
+ fn clear_pt_map(&self, element: &super::JitterBuffer) {
+ gst_info!(CAT, obj: element, "Clearing PT map");
+
+ let mut state = self.state.lock().unwrap();
+ state.clock_rate = None;
+ state.jbuf.borrow().reset_skew();
+ }
+
+ fn prepare(&self, element: &super::JitterBuffer) -> Result<(), gst::ErrorMessage> {
+ gst_info!(CAT, obj: element, "Preparing");
+
+ let context = {
+ let settings = self.settings.lock().unwrap();
+ Context::acquire(&settings.context, settings.context_wait).unwrap()
+ };
+
+ self.task
+ .prepare(
+ JitterBufferTask::new(element, &self.src_pad_handler, &self.sink_pad_handler),
+ context,
+ )
+ .map_err(|err| {
+ gst_error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Error preparing Task: {:?}", err]
+ )
+ })?;
+
+ gst_info!(CAT, obj: element, "Prepared");
+
+ Ok(())
+ }
+
+ fn unprepare(&self, element: &super::JitterBuffer) {
+ gst_debug!(CAT, obj: element, "Unpreparing");
+ self.task.unprepare().unwrap();
+ gst_debug!(CAT, obj: element, "Unprepared");
+ }
+
+ fn start(&self, element: &super::JitterBuffer) -> Result<(), gst::ErrorMessage> {
+ gst_debug!(CAT, obj: element, "Starting");
+ self.task.start()?;
+ gst_debug!(CAT, obj: element, "Started");
+ Ok(())
+ }
+
+ fn stop(&self, element: &super::JitterBuffer) -> Result<(), gst::ErrorMessage> {
+ gst_debug!(CAT, obj: element, "Stopping");
+ self.task.stop()?;
+ gst_debug!(CAT, obj: element, "Stopped");
+ Ok(())
+ }
+}
+
+impl ObjectSubclass for JitterBuffer {
+ const NAME: &'static str = "RsTsJitterBuffer";
+ type Type = super::JitterBuffer;
+ type ParentType = gst::Element;
+ type Instance = gst::subclass::ElementInstanceStruct<Self>;
+ type Class = subclass::simple::ClassStruct<Self>;
+
+ glib_object_subclass!();
+
+ fn class_init(klass: &mut Self::Class) {
+ klass.set_metadata(
+ "Thread-sharing jitterbuffer",
+ "Generic",
+ "Simple jitterbuffer",
+ "Mathieu Duponchelle <mathieu@centricular.com>",
+ );
+
+ let caps = gst::Caps::new_any();
+
+ let sink_pad_template = gst::PadTemplate::new(
+ "sink",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+ klass.add_pad_template(sink_pad_template);
+ klass.add_signal(
+ "request-pt-map",
+ glib::SignalFlags::RUN_LAST,
+ &[u32::static_type()],
+ gst::Caps::static_type(),
+ );
+
+ klass.add_signal_with_class_handler(
+ "clear-pt-map",
+ glib::SignalFlags::RUN_LAST | glib::SignalFlags::ACTION,
+ &[],
+ glib::types::Type::Unit,
+ |_, args| {
+ let element = args[0]
+ .get::<super::JitterBuffer>()
+ .expect("signal arg")
+ .expect("missing signal arg");
+ let jb = Self::from_instance(&element);
+ jb.clear_pt_map(&element);
+ None
+ },
+ );
+
+ let src_pad_template = gst::PadTemplate::new(
+ "src",
+ gst::PadDirection::Src,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+ klass.add_pad_template(src_pad_template);
+ klass.install_properties(&PROPERTIES);
+ }
+
+ fn with_class(klass: &Self::Class) -> Self {
+ let sink_pad_handler = SinkHandler::default();
+ let src_pad_handler = SrcHandler::default();
+
+ Self {
+ sink_pad: PadSink::new(
+ gst::Pad::from_template(&klass.get_pad_template("sink").unwrap(), Some("sink")),
+ sink_pad_handler.clone(),
+ ),
+ src_pad: PadSrc::new(
+ gst::Pad::from_template(&klass.get_pad_template("src").unwrap(), Some("src")),
+ src_pad_handler.clone(),
+ ),
+ sink_pad_handler,
+ src_pad_handler,
+ task: Task::default(),
+ state: StdMutex::new(State::default()),
+ settings: StdMutex::new(Settings::default()),
+ }
+ }
+}
+
+impl ObjectImpl for JitterBuffer {
+ fn set_property(&self, obj: &Self::Type, id: usize, value: &glib::Value) {
+ let prop = &PROPERTIES[id];
+
+ match *prop {
+ subclass::Property("latency", ..) => {
+ let latency_ms = {
+ let mut settings = self.settings.lock().unwrap();
+ settings.latency_ms = value.get_some().expect("type checked upstream");
+ settings.latency_ms as u64
+ };
+
+ let state = self.state.lock().unwrap();
+ state.jbuf.borrow().set_delay(latency_ms * gst::MSECOND);
+
+ let _ = obj.post_message(gst::message::Latency::builder().src(obj).build());
+ }
+ subclass::Property("do-lost", ..) => {
+ let mut settings = self.settings.lock().unwrap();
+ settings.do_lost = value.get_some().expect("type checked upstream");
+ }
+ subclass::Property("max-dropout-time", ..) => {
+ let mut settings = self.settings.lock().unwrap();
+ settings.max_dropout_time = value.get_some().expect("type checked upstream");
+ }
+ subclass::Property("max-misorder-time", ..) => {
+ let mut settings = self.settings.lock().unwrap();
+ settings.max_misorder_time = value.get_some().expect("type checked upstream");
+ }
+ subclass::Property("context", ..) => {
+ let mut settings = self.settings.lock().unwrap();
+ settings.context = value
+ .get()
+ .expect("type checked upstream")
+ .unwrap_or_else(|| "".into());
+ }
+ subclass::Property("context-wait", ..) => {
+ let mut settings = self.settings.lock().unwrap();
+ settings.context_wait = value.get_some().expect("type checked upstream");
+ }
+ _ => unimplemented!(),
+ }
+ }
+
+ fn get_property(&self, _obj: &Self::Type, id: usize) -> Result<glib::Value, ()> {
+ let prop = &PROPERTIES[id];
+
+ match *prop {
+ subclass::Property("latency", ..) => {
+ let settings = self.settings.lock().unwrap();
+ Ok(settings.latency_ms.to_value())
+ }
+ subclass::Property("do-lost", ..) => {
+ let settings = self.settings.lock().unwrap();
+ Ok(settings.do_lost.to_value())
+ }
+ subclass::Property("max-dropout-time", ..) => {
+ let settings = self.settings.lock().unwrap();
+ Ok(settings.max_dropout_time.to_value())
+ }
+ subclass::Property("max-misorder-time", ..) => {
+ let settings = self.settings.lock().unwrap();
+ Ok(settings.max_misorder_time.to_value())
+ }
+ subclass::Property("stats", ..) => {
+ let state = self.state.lock().unwrap();
+ let s = gst::Structure::new(
+ "application/x-rtp-jitterbuffer-stats",
+ &[
+ ("num-pushed", &state.stats.num_pushed),
+ ("num-lost", &state.stats.num_lost),
+ ("num-late", &state.stats.num_late),
+ ],
+ );
+ Ok(s.to_value())
+ }
+ subclass::Property("context", ..) => {
+ let settings = self.settings.lock().unwrap();
+ Ok(settings.context.to_value())
+ }
+ subclass::Property("context-wait", ..) => {
+ let settings = self.settings.lock().unwrap();
+ Ok(settings.context_wait.to_value())
+ }
+ _ => unimplemented!(),
+ }
+ }
+
+ fn constructed(&self, obj: &Self::Type) {
+ self.parent_constructed(obj);
+
+ obj.add_pad(self.sink_pad.gst_pad()).unwrap();
+ obj.add_pad(self.src_pad.gst_pad()).unwrap();
+ obj.set_element_flags(gst::ElementFlags::PROVIDE_CLOCK | gst::ElementFlags::REQUIRE_CLOCK);
+ }
+}
+
+impl ElementImpl for JitterBuffer {
+ fn change_state(
+ &self,
+ element: &Self::Type,
+ transition: gst::StateChange,
+ ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
+ gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
+
+ match transition {
+ gst::StateChange::NullToReady => {
+ self.prepare(element).map_err(|err| {
+ element.post_error_message(err);
+ gst::StateChangeError
+ })?;
+ }
+ gst::StateChange::PausedToReady => {
+ self.stop(element).map_err(|_| gst::StateChangeError)?;
+ }
+ gst::StateChange::ReadyToNull => {
+ self.unprepare(element);
+ }
+ _ => (),
+ }
+
+ let mut success = self.parent_change_state(element, transition)?;
+
+ match transition {
+ gst::StateChange::ReadyToPaused => {
+ self.start(element).map_err(|_| gst::StateChangeError)?;
+ success = gst::StateChangeSuccess::NoPreroll;
+ }
+ gst::StateChange::PlayingToPaused => {
+ success = gst::StateChangeSuccess::NoPreroll;
+ }
+ _ => (),
+ }
+
+ Ok(success)
+ }
+
+ fn provide_clock(&self, _element: &Self::Type) -> Option<gst::Clock> {
+ Some(gst::SystemClock::obtain())
+ }
+}
diff --git a/generic/threadshare/src/jitterbuffer/jitterbuffer.rs b/generic/threadshare/src/jitterbuffer/jitterbuffer.rs
index 3579f9f3a..5984db98f 100644
--- a/generic/threadshare/src/jitterbuffer/jitterbuffer.rs
+++ b/generic/threadshare/src/jitterbuffer/jitterbuffer.rs
@@ -1,4 +1,4 @@
-// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
+// Copyright (C) 2019 Sebastian Dröge <sebastian@centricular.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
@@ -15,1633 +15,369 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
-use futures::future::BoxFuture;
-use futures::future::{abortable, AbortHandle, Aborted};
-use futures::prelude::*;
+use super::ffi;
-use glib::glib_object_subclass;
-use glib::prelude::*;
-use glib::subclass;
-use glib::subclass::prelude::*;
+use std::ptr;
-use gst::prelude::*;
-use gst::subclass::prelude::*;
-use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg, gst_info, gst_log, gst_trace};
-use gst_rtp::RTPBuffer;
+use glib_sys as glib_ffi;
+use gstreamer_sys as gst_ffi;
-use lazy_static::lazy_static;
+use glib::glib_wrapper;
+use glib::prelude::*;
+use glib::translate::*;
-use std::cmp::{max, min, Ordering};
-use std::collections::{BTreeSet, VecDeque};
use std::mem;
-use std::sync::Arc;
-use std::sync::Mutex as StdMutex;
-use std::time::Duration;
-
-use crate::runtime::prelude::*;
-use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task};
-
-use super::{RTPJitterBuffer, RTPJitterBufferItem, RTPPacketRateCtx};
-const DEFAULT_LATENCY_MS: u32 = 200;
-const DEFAULT_DO_LOST: bool = false;
-const DEFAULT_MAX_DROPOUT_TIME: u32 = 60000;
-const DEFAULT_MAX_MISORDER_TIME: u32 = 2000;
-const DEFAULT_CONTEXT: &str = "";
-const DEFAULT_CONTEXT_WAIT: u32 = 0;
+glib_wrapper! {
+ pub struct RTPJitterBuffer(Object<ffi::RTPJitterBuffer>);
-#[derive(Debug, Clone)]
-struct Settings {
- latency_ms: u32,
- do_lost: bool,
- max_dropout_time: u32,
- max_misorder_time: u32,
- context: String,
- context_wait: u32,
-}
-
-impl Default for Settings {
- fn default() -> Self {
- Settings {
- latency_ms: DEFAULT_LATENCY_MS,
- do_lost: DEFAULT_DO_LOST,
- max_dropout_time: DEFAULT_MAX_DROPOUT_TIME,
- max_misorder_time: DEFAULT_MAX_MISORDER_TIME,
- context: DEFAULT_CONTEXT.into(),
- context_wait: DEFAULT_CONTEXT_WAIT,
- }
+ match fn {
+ get_type => || ffi::rtp_jitter_buffer_get_type(),
}
}
-static PROPERTIES: [subclass::Property; 7] = [
- subclass::Property("latency", |name| {
- glib::ParamSpec::uint(
- name,
- "Buffer latency in ms",
- "Amount of ms to buffer",
- 0,
- std::u32::MAX,
- DEFAULT_LATENCY_MS,
- glib::ParamFlags::READWRITE,
- )
- }),
- subclass::Property("do-lost", |name| {
- glib::ParamSpec::boolean(
- name,
- "Do Lost",
- "Send an event downstream when a packet is lost",
- DEFAULT_DO_LOST,
- glib::ParamFlags::READWRITE,
- )
- }),
- subclass::Property("max-dropout-time", |name| {
- glib::ParamSpec::uint(
- name,
- "Max dropout time",
- "The maximum time (milliseconds) of missing packets tolerated.",
- 0,
- std::u32::MAX,
- DEFAULT_MAX_DROPOUT_TIME,
- glib::ParamFlags::READWRITE,
- )
- }),
- subclass::Property("max-misorder-time", |name| {
- glib::ParamSpec::uint(
- name,
- "Max misorder time",
- "The maximum time (milliseconds) of misordered packets tolerated.",
- 0,
- std::u32::MAX,
- DEFAULT_MAX_MISORDER_TIME,
- glib::ParamFlags::READWRITE,
- )
- }),
- subclass::Property("stats", |name| {
- glib::ParamSpec::boxed(
- name,
- "Statistics",
- "Various statistics",
- gst::Structure::static_type(),
- glib::ParamFlags::READABLE,
- )
- }),
- subclass::Property("context", |name| {
- glib::ParamSpec::string(
- name,
- "Context",
- "Context name to share threads with",
- Some(DEFAULT_CONTEXT),
- glib::ParamFlags::READWRITE,
- )
- }),
- subclass::Property("context-wait", |name| {
- glib::ParamSpec::uint(
- name,
- "Context Wait",
- "Throttle poll loop to run at most once every this many ms",
- 0,
- 1000,
- DEFAULT_CONTEXT_WAIT,
- glib::ParamFlags::READWRITE,
- )
- }),
-];
-
-#[derive(Eq)]
-struct GapPacket {
- buffer: gst::Buffer,
- seq: u16,
- pt: u8,
-}
-
-impl GapPacket {
- fn new(buffer: gst::Buffer) -> Self {
- let rtp_buffer = RTPBuffer::from_buffer_readable(&buffer).unwrap();
- let seq = rtp_buffer.get_seq();
- let pt = rtp_buffer.get_payload_type();
- drop(rtp_buffer);
-
- Self { buffer, seq, pt }
+unsafe impl glib::SendUnique for RTPJitterBuffer {
+ fn is_unique(&self) -> bool {
+ self.ref_count() == 1
}
}
-impl Ord for GapPacket {
- fn cmp(&self, other: &Self) -> Ordering {
- 0.cmp(&gst_rtp::compare_seqnum(self.seq, other.seq))
- }
-}
-
-impl PartialOrd for GapPacket {
- fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
- Some(self.cmp(other))
- }
-}
+impl ToGlib for RTPJitterBufferMode {
+ type GlibType = ffi::RTPJitterBufferMode;
-impl PartialEq for GapPacket {
- fn eq(&self, other: &Self) -> bool {
- self.cmp(other) == Ordering::Equal
+ fn to_glib(&self) -> ffi::RTPJitterBufferMode {
+ match *self {
+ RTPJitterBufferMode::None => ffi::RTP_JITTER_BUFFER_MODE_NONE,
+ RTPJitterBufferMode::Slave => ffi::RTP_JITTER_BUFFER_MODE_SLAVE,
+ RTPJitterBufferMode::Buffer => ffi::RTP_JITTER_BUFFER_MODE_BUFFER,
+ RTPJitterBufferMode::Synced => ffi::RTP_JITTER_BUFFER_MODE_SYNCED,
+ RTPJitterBufferMode::__Unknown(value) => value,
+ }
}
}
-struct SinkHandlerInner {
- packet_rate_ctx: RTPPacketRateCtx,
- ips_rtptime: Option<u32>,
- ips_pts: gst::ClockTime,
-
- gap_packets: BTreeSet<GapPacket>,
-
- last_pt: Option<u8>,
-
- last_in_seqnum: Option<u16>,
- last_rtptime: Option<u32>,
-}
-
-impl Default for SinkHandlerInner {
- fn default() -> Self {
- SinkHandlerInner {
- packet_rate_ctx: RTPPacketRateCtx::new(),
- ips_rtptime: None,
- ips_pts: gst::CLOCK_TIME_NONE,
- gap_packets: BTreeSet::new(),
- last_pt: None,
- last_in_seqnum: None,
- last_rtptime: None,
+impl FromGlib<ffi::RTPJitterBufferMode> for RTPJitterBufferMode {
+ fn from_glib(value: ffi::RTPJitterBufferMode) -> Self {
+ match value {
+ 0 => RTPJitterBufferMode::None,
+ 1 => RTPJitterBufferMode::Slave,
+ 2 => RTPJitterBufferMode::Buffer,
+ 4 => RTPJitterBufferMode::Synced,
+ value => RTPJitterBufferMode::__Unknown(value),
}
}
}
-#[derive(Clone, Default)]
-struct SinkHandler(Arc<StdMutex<SinkHandlerInner>>);
+pub struct RTPJitterBufferItem(Option<ptr::NonNull<ffi::RTPJitterBufferItem>>);
-impl SinkHandler {
- fn clear(&self) {
- let mut inner = self.0.lock().unwrap();
- *inner = SinkHandlerInner::default();
- }
+unsafe impl Send for RTPJitterBufferItem {}
- // For resetting if seqnum discontinuities
- fn reset(
- &self,
- inner: &mut SinkHandlerInner,
- state: &mut State,
- element: &gst::Element,
- ) -> BTreeSet<GapPacket> {
- gst_info!(CAT, obj: element, "Resetting");
-
- state.jbuf.borrow().flush();
- state.jbuf.borrow().reset_skew();
- state.discont = true;
-
- state.last_popped_seqnum = None;
- state.last_popped_pts = gst::CLOCK_TIME_NONE;
-
- inner.last_in_seqnum = None;
- inner.last_rtptime = None;
-
- state.earliest_pts = gst::CLOCK_TIME_NONE;
- state.earliest_seqnum = None;
-
- inner.ips_rtptime = None;
- inner.ips_pts = gst::CLOCK_TIME_NONE;
-
- mem::replace(&mut inner.gap_packets, BTreeSet::new())
- }
-
- fn parse_caps(
- &self,
- inner: &mut SinkHandlerInner,
- state: &mut State,
- element: &gst::Element,
- caps: &gst::Caps,
- pt: u8,
- ) -> Result<gst::FlowSuccess, gst::FlowError> {
- let s = caps.get_structure(0).ok_or(gst::FlowError::Error)?;
-
- gst_info!(CAT, obj: element, "Parsing {:?}", caps);
-
- let payload = s
- .get_some::<i32>("payload")
- .map_err(|_| gst::FlowError::Error)?;
-
- if pt != 0 && payload as u8 != pt {
- return Err(gst::FlowError::Error);
- }
-
- inner.last_pt = Some(pt);
- let clock_rate = s
- .get_some::<i32>("clock-rate")
- .map_err(|_| gst::FlowError::Error)?;
-
- if clock_rate <= 0 {
- return Err(gst::FlowError::Error);
- }
- state.clock_rate = Some(clock_rate as u32);
-
- inner.packet_rate_ctx.reset(clock_rate);
- state.jbuf.borrow().set_clock_rate(clock_rate as u32);
-
- Ok(gst::FlowSuccess::Ok)
- }
-
- fn calculate_packet_spacing(
- &self,
- inner: &mut SinkHandlerInner,
- state: &mut State,
- rtptime: u32,
+impl RTPJitterBufferItem {
+ pub fn new(
+ buffer: gst::Buffer,
+ dts: gst::ClockTime,
pts: gst::ClockTime,
- ) {
- if inner.ips_rtptime != Some(rtptime) {
- if inner.ips_pts.is_some() && pts.is_some() {
- let new_packet_spacing = pts - inner.ips_pts;
- let old_packet_spacing = state.packet_spacing;
-
- assert!(old_packet_spacing.is_some());
- if old_packet_spacing > new_packet_spacing {
- state.packet_spacing = (new_packet_spacing + 3 * old_packet_spacing) / 4;
- } else if !old_packet_spacing.is_zero() {
- state.packet_spacing = (3 * new_packet_spacing + old_packet_spacing) / 4;
- } else {
- state.packet_spacing = new_packet_spacing;
- }
+ seqnum: Option<u16>,
+ rtptime: u32,
+ ) -> RTPJitterBufferItem {
+ unsafe {
+ let ptr = ptr::NonNull::new(glib_sys::g_slice_alloc0(mem::size_of::<
+ ffi::RTPJitterBufferItem,
+ >()) as *mut ffi::RTPJitterBufferItem)
+ .expect("Allocation failed");
+ ptr::write(
+ ptr.as_ptr(),
+ ffi::RTPJitterBufferItem {
+ data: buffer.into_ptr() as *mut _,
+ next: ptr::null_mut(),
+ prev: ptr::null_mut(),
+ r#type: 0,
+ dts: dts.to_glib(),
+ pts: pts.to_glib(),
+ seqnum: seqnum.map(|s| s as u32).unwrap_or(std::u32::MAX),
+ count: 1,
+ rtptime,
+ },
+ );
- gst_debug!(
- CAT,
- "new packet spacing {}, old packet spacing {} combined to {}",
- new_packet_spacing,
- old_packet_spacing,
- state.packet_spacing
- );
- }
- inner.ips_rtptime = Some(rtptime);
- inner.ips_pts = pts;
+ RTPJitterBufferItem(Some(ptr))
}
}
- fn handle_big_gap_buffer(
- &self,
- inner: &mut SinkHandlerInner,
- element: &gst::Element,
- buffer: gst::Buffer,
- pt: u8,
- ) -> bool {
- let gap_packets_length = inner.gap_packets.len();
- let mut reset = false;
-
- gst_debug!(
- CAT,
- obj: element,
- "Handling big gap, gap packets length: {}",
- gap_packets_length
- );
-
- inner.gap_packets.insert(GapPacket::new(buffer));
-
- if gap_packets_length > 0 {
- let mut prev_gap_seq = std::u32::MAX;
- let mut all_consecutive = true;
-
- for gap_packet in inner.gap_packets.iter() {
- gst_log!(
- CAT,
- obj: element,
- "Looking at gap packet with seq {}",
- gap_packet.seq,
- );
-
- all_consecutive = gap_packet.pt == pt;
-
- if prev_gap_seq == std::u32::MAX {
- prev_gap_seq = gap_packet.seq as u32;
- } else if gst_rtp::compare_seqnum(gap_packet.seq, prev_gap_seq as u16) != -1 {
- all_consecutive = false;
- } else {
- prev_gap_seq = gap_packet.seq as u32;
- }
-
- if !all_consecutive {
- break;
- }
- }
-
- gst_debug!(CAT, obj: element, "all consecutive: {}", all_consecutive);
-
- if all_consecutive && gap_packets_length > 3 {
- reset = true;
- } else if !all_consecutive {
- inner.gap_packets.clear();
- }
+ pub fn into_buffer(mut self) -> gst::Buffer {
+ unsafe {
+ let item = self.0.take().expect("Invalid wrapper");
+ let buf = item.as_ref().data as *mut gst_ffi::GstBuffer;
+ glib_sys::g_slice_free1(
+ mem::size_of::<ffi::RTPJitterBufferItem>(),
+ item.as_ptr() as *mut _,
+ );
+ from_glib_full(buf)
}
-
- reset
}
- fn store(
- &self,
- inner: &mut SinkHandlerInner,
- pad: &gst::Pad,
- element: &gst::Element,
- buffer: gst::Buffer,
- ) -> Result<gst::FlowSuccess, gst::FlowError> {
- let jb = JitterBuffer::from_instance(element);
- let mut state = jb.state.lock().unwrap();
-
- let (max_misorder_time, max_dropout_time) = {
- let settings = jb.settings.lock().unwrap();
- (settings.max_misorder_time, settings.max_dropout_time)
- };
-
- let (seq, rtptime, pt) = {
- let rtp_buffer =
- RTPBuffer::from_buffer_readable(&buffer).map_err(|_| gst::FlowError::Error)?;
- (
- rtp_buffer.get_seq(),
- rtp_buffer.get_timestamp(),
- rtp_buffer.get_payload_type(),
- )
- };
-
- let mut pts = buffer.get_pts();
- let mut dts = buffer.get_dts();
- let mut estimated_dts = false;
-
- gst_log!(
- CAT,
- obj: element,
- "Storing buffer, seq: {}, rtptime: {}, pt: {}",
- seq,
- rtptime,
- pt
- );
-
- if dts.is_none() {
- dts = pts;
- } else if pts.is_none() {
- pts = dts;
- }
-
- if dts.is_none() {
- dts = element.get_current_running_time();
- pts = dts;
-
- estimated_dts = state.clock_rate.is_some();
- } else {
- dts = state.segment.to_running_time(dts);
- }
-
- if state.clock_rate.is_none() {
- inner.ips_rtptime = Some(rtptime);
- inner.ips_pts = pts;
- }
-
- if inner.last_pt != Some(pt) {
- inner.last_pt = Some(pt);
- state.clock_rate = None;
-
- gst_debug!(CAT, obj: pad, "New payload type: {}", pt);
-
- if let Some(caps) = pad.get_current_caps() {
- /* Ignore errors at this point, as we want to emit request-pt-map */
- let _ = self.parse_caps(inner, &mut state, element, &caps, pt);
- }
- }
-
- let mut state = {
- if state.clock_rate.is_none() {
- drop(state);
- let caps = element
- .emit("request-pt-map", &[&(pt as u32)])
- .map_err(|_| gst::FlowError::Error)?
- .ok_or(gst::FlowError::Error)?
- .get::<gst::Caps>()
- .map_err(|_| gst::FlowError::Error)?
- .ok_or(gst::FlowError::Error)?;
- let mut state = jb.state.lock().unwrap();
- self.parse_caps(inner, &mut state, element, &caps, pt)?;
- state
+ pub fn get_dts(&self) -> gst::ClockTime {
+ unsafe {
+ let item = self.0.as_ref().expect("Invalid wrapper");
+ if item.as_ref().dts == gst_ffi::GST_CLOCK_TIME_NONE {
+ gst::CLOCK_TIME_NONE
} else {
- state
+ gst::ClockTime(Some(item.as_ref().dts))
}
- };
-
- inner.packet_rate_ctx.update(seq, rtptime);
-
- let max_dropout = inner
- .packet_rate_ctx
- .get_max_dropout(max_dropout_time as i32);
- let max_misorder = inner
- .packet_rate_ctx
- .get_max_dropout(max_misorder_time as i32);
-
- pts = state.jbuf.borrow().calculate_pts(
- dts,
- estimated_dts,
- rtptime,
- element.get_base_time(),
- 0,
- false,
- );
-
- if pts.is_none() {
- gst_debug!(
- CAT,
- obj: element,
- "cannot calculate a valid pts for #{}, discard",
- seq
- );
- return Ok(gst::FlowSuccess::Ok);
}
+ }
- if let Some(last_in_seqnum) = inner.last_in_seqnum {
- let gap = gst_rtp::compare_seqnum(last_in_seqnum as u16, seq);
- if gap == 1 {
- self.calculate_packet_spacing(inner, &mut state, rtptime, pts);
+ pub fn get_pts(&self) -> gst::ClockTime {
+ unsafe {
+ let item = self.0.as_ref().expect("Invalid wrapper");
+ if item.as_ref().pts == gst_ffi::GST_CLOCK_TIME_NONE {
+ gst::CLOCK_TIME_NONE
} else {
- if (gap != -1 && gap < -(max_misorder as i32)) || (gap >= max_dropout as i32) {
- let reset = self.handle_big_gap_buffer(inner, element, buffer, pt);
- if reset {
- // Handle reset in `enqueue_item` to avoid recursion
- return Err(gst::FlowError::CustomError);
- } else {
- return Ok(gst::FlowSuccess::Ok);
- }
- }
- inner.ips_pts = gst::CLOCK_TIME_NONE;
- inner.ips_rtptime = None;
+ gst::ClockTime(Some(item.as_ref().pts))
}
-
- inner.gap_packets.clear();
- }
-
- if let Some(last_popped_seqnum) = state.last_popped_seqnum {
- let gap = gst_rtp::compare_seqnum(last_popped_seqnum, seq);
-
- if gap <= 0 {
- state.stats.num_late += 1;
- gst_debug!(CAT, obj: element, "Dropping late {}", seq);
- return Ok(gst::FlowSuccess::Ok);
- }
- }
-
- inner.last_in_seqnum = Some(seq);
-
- let jb_item = if estimated_dts {
- RTPJitterBufferItem::new(buffer, gst::CLOCK_TIME_NONE, pts, Some(seq), rtptime)
- } else {
- RTPJitterBufferItem::new(buffer, dts, pts, Some(seq), rtptime)
- };
-
- let (success, _, _) = state.jbuf.borrow().insert(jb_item);
-
- if !success {
- /* duplicate */
- return Ok(gst::FlowSuccess::Ok);
- }
-
- if Some(rtptime) == inner.last_rtptime {
- state.equidistant -= 2;
- } else {
- state.equidistant += 1;
}
-
- state.equidistant = min(max(state.equidistant, -7), 7);
-
- inner.last_rtptime = Some(rtptime);
-
- if state.earliest_pts.is_none()
- || (pts.is_some()
- && (pts < state.earliest_pts
- || (pts == state.earliest_pts
- && state
- .earliest_seqnum
- .map(|earliest_seqnum| seq > earliest_seqnum)
- .unwrap_or(false))))
- {
- state.earliest_pts = pts;
- state.earliest_seqnum = Some(seq);
- }
-
- gst_log!(CAT, obj: pad, "Stored buffer");
-
- Ok(gst::FlowSuccess::Ok)
}
- fn enqueue_item(
- &self,
- pad: &gst::Pad,
- element: &gst::Element,
- buffer: Option<gst::Buffer>,
- ) -> Result<gst::FlowSuccess, gst::FlowError> {
- let mut inner = self.0.lock().unwrap();
-
- let mut buffers = VecDeque::new();
- if let Some(buf) = buffer {
- buffers.push_back(buf);
- }
-
- // This is to avoid recursion with `store`, `reset` and `enqueue_item`
- while let Some(buf) = buffers.pop_front() {
- if let Err(err) = self.store(&mut inner, pad, element, buf) {
- match err {
- gst::FlowError::CustomError => {
- let jb = JitterBuffer::from_instance(element);
- let mut state = jb.state.lock().unwrap();
- for gap_packet in self.reset(&mut inner, &mut state, element) {
- buffers.push_back(gap_packet.buffer);
- }
- }
- other => return Err(other),
- }
- }
- }
-
- let jb = JitterBuffer::from_instance(element);
- let mut state = jb.state.lock().unwrap();
-
- let (latency, context_wait) = {
- let settings = jb.settings.lock().unwrap();
- (
- settings.latency_ms as u64 * gst::MSECOND,
- settings.context_wait as u64 * gst::MSECOND,
- )
- };
-
- // Reschedule if needed
- let (_, next_wakeup) =
- jb.src_pad_handler
- .get_next_wakeup(&element, &state, latency, context_wait);
- if let Some((next_wakeup, _)) = next_wakeup {
- if let Some((previous_next_wakeup, ref abort_handle)) = state.wait_handle {
- if previous_next_wakeup.is_none() || previous_next_wakeup > next_wakeup {
- gst_debug!(
- CAT,
- obj: pad,
- "Rescheduling for new item {} < {}",
- next_wakeup,
- previous_next_wakeup
- );
- abort_handle.abort();
- state.wait_handle = None;
- }
+ pub fn get_seqnum(&self) -> Option<u16> {
+ unsafe {
+ let item = self.0.as_ref().expect("Invalid wrapper");
+ if item.as_ref().seqnum == std::u32::MAX {
+ None
+ } else {
+ Some(item.as_ref().seqnum as u16)
}
}
- state.last_res
- }
-}
-
-impl PadSinkHandler for SinkHandler {
- type ElementImpl = JitterBuffer;
-
- fn sink_chain(
- &self,
- pad: &PadSinkRef,
- _jb: &JitterBuffer,
- element: &gst::Element,
- buffer: gst::Buffer,
- ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
- let pad_weak = pad.downgrade();
- let element = element.clone();
- let this = self.clone();
-
- async move {
- let pad = pad_weak.upgrade().expect("PadSink no longer exists");
-
- gst_debug!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer);
- this.enqueue_item(pad.gst_pad(), &element, Some(buffer))
- }
- .boxed()
}
- fn sink_event(
- &self,
- pad: &PadSinkRef,
- jb: &JitterBuffer,
- element: &gst::Element,
- event: gst::Event,
- ) -> bool {
- use gst::EventView;
-
- gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
-
- if let EventView::FlushStart(..) = event.view() {
- if let Err(err) = jb.task.flush_start() {
- gst_error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
- gst_element_error!(
- element,
- gst::StreamError::Failed,
- ("Internal data stream error"),
- ["FlushStart failed {:?}", err]
- );
- return false;
- }
+ #[allow(dead_code)]
+ pub fn get_rtptime(&self) -> u32 {
+ unsafe {
+ let item = self.0.as_ref().expect("Invalid wrapper");
+ item.as_ref().rtptime
}
-
- gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
- jb.src_pad.gst_pad().push_event(event)
}
+}
- fn sink_event_serialized(
- &self,
- pad: &PadSinkRef,
- _jb: &JitterBuffer,
- element: &gst::Element,
- event: gst::Event,
- ) -> BoxFuture<'static, bool> {
- use gst::EventView;
-
- let pad_weak = pad.downgrade();
- let element = element.clone();
-
- async move {
- let pad = pad_weak.upgrade().expect("PadSink no longer exists");
-
- gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
-
- let jb = JitterBuffer::from_instance(&element);
-
- let mut forward = true;
- match event.view() {
- EventView::Segment(e) => {
- let mut state = jb.state.lock().unwrap();
- state.segment = e
- .get_segment()
- .clone()
- .downcast::<gst::format::Time>()
- .unwrap();
- }
- EventView::FlushStop(..) => {
- if let Err(err) = jb.task.flush_stop() {
- gst_error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
- gst_element_error!(
- element,
- gst::StreamError::Failed,
- ("Internal data stream error"),
- ["FlushStop failed {:?}", err]
- );
- return false;
- }
- }
- EventView::Eos(..) => {
- let mut state = jb.state.lock().unwrap();
- state.eos = true;
- if let Some((_, abort_handle)) = state.wait_handle.take() {
- abort_handle.abort();
- }
- forward = false;
+impl Drop for RTPJitterBufferItem {
+ fn drop(&mut self) {
+ unsafe {
+ if let Some(ref item) = self.0 {
+ if !item.as_ref().data.is_null() {
+ gst_ffi::gst_mini_object_unref(item.as_ref().data as *mut _);
}
- _ => (),
- };
- if forward {
- // FIXME: These events should really be queued up and stay in order
- gst_log!(CAT, obj: pad.gst_pad(), "Forwarding serialized {:?}", event);
- jb.src_pad.push_event(event).await
- } else {
- true
+ glib_sys::g_slice_free1(
+ mem::size_of::<ffi::RTPJitterBufferItem>(),
+ item.as_ptr() as *mut _,
+ );
}
}
- .boxed()
}
}
-#[derive(Clone, Default)]
-struct SrcHandler;
-
-impl SrcHandler {
- fn clear(&self) {}
-
- fn generate_lost_events(
- &self,
- state: &mut State,
- element: &gst::Element,
- seqnum: u16,
- pts: gst::ClockTime,
- discont: &mut bool,
- ) -> Vec<gst::Event> {
- let (latency_ns, do_lost) = {
- let jb = JitterBuffer::from_instance(element);
- let settings = jb.settings.lock().unwrap();
- (
- settings.latency_ms as u64 * gst::MSECOND.nseconds().unwrap(),
- settings.do_lost,
- )
- };
-
- let mut events = vec![];
-
- let last_popped_seqnum = match state.last_popped_seqnum {
- None => return events,
- Some(seq) => seq,
- };
-
- gst_debug!(
- CAT,
- obj: element,
- "Generating lost events seq: {}, last popped seq: {:?}",
- seqnum,
- last_popped_seqnum,
- );
-
- let mut lost_seqnum = last_popped_seqnum.wrapping_add(1);
- let gap = gst_rtp::compare_seqnum(lost_seqnum, seqnum) as i64;
-
- if gap > 0 {
- let interval =
- pts.nseconds().unwrap() as i64 - state.last_popped_pts.nseconds().unwrap() as i64;
- let gap = gap as u64;
- let spacing = if interval >= 0 {
- interval as u64 / (gap + 1)
- } else {
- 0
- };
-
- *discont = true;
-
- if state.equidistant > 0 && gap > 1 && gap * spacing > latency_ns {
- let n_packets = gap - latency_ns / spacing;
-
- if do_lost {
- let s = gst::Structure::new(
- "GstRTPPacketLost",
- &[
- ("seqnum", &(lost_seqnum as u32)),
- (
- "timestamp",
- &(state.last_popped_pts + gst::ClockTime(Some(spacing))),
- ),
- ("duration", &(n_packets * spacing)),
- ("retry", &0),
- ],
- );
-
- events.push(gst::event::CustomDownstream::new(s));
- }
-
- lost_seqnum = lost_seqnum.wrapping_add(n_packets as u16);
- state.last_popped_pts += gst::ClockTime(Some(n_packets * spacing));
- state.stats.num_lost += n_packets;
- }
-
- while lost_seqnum != seqnum {
- let timestamp = state.last_popped_pts + gst::ClockTime(Some(spacing));
- let duration = if state.equidistant > 0 { spacing } else { 0 };
-
- state.last_popped_pts = timestamp;
-
- if do_lost {
- let s = gst::Structure::new(
- "GstRTPPacketLost",
- &[
- ("seqnum", &(lost_seqnum as u32)),
- ("timestamp", &timestamp),
- ("duration", &duration),
- ("retry", &0),
- ],
- );
-
- events.push(gst::event::CustomDownstream::new(s));
- }
+pub struct RTPPacketRateCtx(Box<ffi::RTPPacketRateCtx>);
- state.stats.num_lost += 1;
+unsafe impl Send for RTPPacketRateCtx {}
- lost_seqnum = lost_seqnum.wrapping_add(1);
- }
+impl RTPPacketRateCtx {
+ pub fn new() -> RTPPacketRateCtx {
+ unsafe {
+ let mut ptr = std::mem::MaybeUninit::uninit();
+ ffi::gst_rtp_packet_rate_ctx_reset(ptr.as_mut_ptr(), -1);
+ RTPPacketRateCtx(Box::new(ptr.assume_init()))
}
-
- events
}
- async fn pop_and_push(
- &self,
- element: &gst::Element,
- ) -> Result<gst::FlowSuccess, gst::FlowError> {
- let jb = JitterBuffer::from_instance(element);
-
- let (lost_events, buffer, seq) = {
- let mut state = jb.state.lock().unwrap();
-
- let mut discont = false;
- let (jb_item, _) = state.jbuf.borrow().pop();
-
- let jb_item = match jb_item {
- None => {
- if state.eos {
- return Err(gst::FlowError::Eos);
- } else {
- return Ok(gst::FlowSuccess::Ok);
- }
- }
- Some(item) => item,
- };
-
- let dts = jb_item.get_dts();
- let pts = jb_item.get_pts();
- let seq = jb_item.get_seqnum();
- let mut buffer = jb_item.into_buffer();
-
- let lost_events = {
- let buffer = buffer.make_mut();
-
- buffer.set_dts(state.segment.to_running_time(dts));
- buffer.set_pts(state.segment.to_running_time(pts));
-
- if state.last_popped_pts.is_some() && buffer.get_pts() < state.last_popped_pts {
- buffer.set_pts(state.last_popped_pts)
- }
-
- let lost_events = if let Some(seq) = seq {
- self.generate_lost_events(&mut state, element, seq, pts, &mut discont)
- } else {
- vec![]
- };
-
- if state.discont {
- discont = true;
- state.discont = false;
- }
-
- if discont {
- buffer.set_flags(gst::BufferFlags::DISCONT);
- }
-
- lost_events
- };
-
- state.last_popped_pts = buffer.get_pts();
- if let Some(pts) = state.last_popped_pts.nseconds() {
- state.position = pts.into();
- }
- state.last_popped_seqnum = seq;
-
- state.stats.num_pushed += 1;
-
- (lost_events, buffer, seq)
- };
-
- for event in lost_events {
- gst_debug!(CAT, obj: jb.src_pad.gst_pad(), "Pushing lost event {:?}", event);
- let _ = jb.src_pad.push_event(event).await;
- }
-
- gst_debug!(CAT, obj: jb.src_pad.gst_pad(), "Pushing {:?} with seq {:?}", buffer, seq);
-
- jb.src_pad.push(buffer).await
+ pub fn reset(&mut self, clock_rate: i32) {
+ unsafe { ffi::gst_rtp_packet_rate_ctx_reset(&mut *self.0, clock_rate) }
}
- fn get_next_wakeup(
- &self,
- element: &gst::Element,
- state: &State,
- latency: gst::ClockTime,
- context_wait: gst::ClockTime,
- ) -> (gst::ClockTime, Option<(gst::ClockTime, Duration)>) {
- let now = element.get_current_running_time();
-
- gst_debug!(
- CAT,
- obj: element,
- "Now is {}, EOS {}, earliest pts is {}, packet_spacing {} and latency {}",
- now,
- state.eos,
- state.earliest_pts,
- state.packet_spacing,
- latency
- );
-
- if state.eos {
- gst_debug!(CAT, obj: element, "EOS, not waiting");
- return (now, Some((now, Duration::from_nanos(0))));
- }
-
- if state.earliest_pts.is_none() {
- return (now, None);
- }
-
- let next_wakeup = state.earliest_pts + latency - state.packet_spacing - context_wait / 2;
-
- let delay = next_wakeup
- .saturating_sub(now)
- .unwrap_or_else(gst::ClockTime::zero)
- .nseconds()
- .unwrap();
-
- gst_debug!(
- CAT,
- obj: element,
- "Next wakeup at {} with delay {}",
- next_wakeup,
- delay
- );
-
- (now, Some((next_wakeup, Duration::from_nanos(delay))))
+ pub fn update(&mut self, seqnum: u16, ts: u32) -> u32 {
+ unsafe { ffi::gst_rtp_packet_rate_ctx_update(&mut *self.0, seqnum, ts) }
}
-}
-impl PadSrcHandler for SrcHandler {
- type ElementImpl = JitterBuffer;
-
- fn src_event(
- &self,
- pad: &PadSrcRef,
- jb: &JitterBuffer,
- element: &gst::Element,
- event: gst::Event,
- ) -> bool {
- use gst::EventView;
-
- gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
-
- match event.view() {
- EventView::FlushStart(..) => {
- if let Err(err) = jb.task.flush_start() {
- gst_error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
- gst_element_error!(
- element,
- gst::StreamError::Failed,
- ("Internal data stream error"),
- ["FlushStart failed {:?}", err]
- );
- return false;
- }
- }
- EventView::FlushStop(..) => {
- if let Err(err) = jb.task.flush_stop() {
- gst_error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
- gst_element_error!(
- element,
- gst::StreamError::Failed,
- ("Internal data stream error"),
- ["FlushStop failed {:?}", err]
- );
- return false;
- }
- }
- _ => (),
- }
-
- gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
- jb.sink_pad.gst_pad().push_event(event)
+ pub fn get_max_dropout(&mut self, time_ms: i32) -> u32 {
+ unsafe { ffi::gst_rtp_packet_rate_ctx_get_max_dropout(&mut *self.0, time_ms) }
}
- fn src_query(
- &self,
- pad: &PadSrcRef,
- jb: &JitterBuffer,
- _element: &gst::Element,
- query: &mut gst::QueryRef,
- ) -> bool {
- use gst::QueryView;
-
- gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query);
-
- match query.view_mut() {
- QueryView::Latency(ref mut q) => {
- let mut peer_query = gst::query::Latency::new();
-
- let ret = jb.sink_pad.gst_pad().peer_query(&mut peer_query);
-
- if ret {
- let settings = jb.settings.lock().unwrap();
- let (_, mut min_latency, _) = peer_query.get_result();
- min_latency += (settings.latency_ms as u64) * gst::SECOND;
- let max_latency = gst::CLOCK_TIME_NONE;
-
- q.set(true, min_latency, max_latency);
- }
-
- ret
- }
- QueryView::Position(ref mut q) => {
- if q.get_format() != gst::Format::Time {
- jb.sink_pad.gst_pad().peer_query(query)
- } else {
- let state = jb.state.lock().unwrap();
- let position = state.position;
- q.set(position);
- true
- }
- }
- _ => jb.sink_pad.gst_pad().peer_query(query),
- }
+ #[allow(dead_code)]
+ pub fn get_max_disorder(&mut self, time_ms: i32) -> u32 {
+ unsafe { ffi::gst_rtp_packet_rate_ctx_get_max_disorder(&mut *self.0, time_ms) }
}
}
-#[derive(Debug)]
-struct Stats {
- num_pushed: u64,
- num_lost: u64,
- num_late: u64,
-}
-
-impl Default for Stats {
+impl Default for RTPPacketRateCtx {
fn default() -> Self {
- Self {
- num_pushed: 0,
- num_lost: 0,
- num_late: 0,
- }
+ RTPPacketRateCtx::new()
}
}
-// Shared state between element, sink and source pad
-struct State {
- jbuf: glib::SendUniqueCell<RTPJitterBuffer>,
-
- last_res: Result<gst::FlowSuccess, gst::FlowError>,
- position: gst::ClockTime,
-
- segment: gst::FormattedSegment<gst::ClockTime>,
- clock_rate: Option<u32>,
-
- packet_spacing: gst::ClockTime,
- equidistant: i32,
-
- discont: bool,
- eos: bool,
-
- last_popped_seqnum: Option<u16>,
- last_popped_pts: gst::ClockTime,
-
- stats: Stats,
-
- earliest_pts: gst::ClockTime,
- earliest_seqnum: Option<u16>,
-
- wait_handle: Option<(gst::ClockTime, AbortHandle)>,
+#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)]
+pub enum RTPJitterBufferMode {
+ r#None,
+ Slave,
+ Buffer,
+ Synced,
+ __Unknown(i32),
}
-impl Default for State {
- fn default() -> State {
- State {
- jbuf: glib::SendUniqueCell::new(RTPJitterBuffer::new()).unwrap(),
-
- last_res: Ok(gst::FlowSuccess::Ok),
- position: gst::CLOCK_TIME_NONE,
-
- segment: gst::FormattedSegment::<gst::ClockTime>::new(),
- clock_rate: None,
-
- packet_spacing: gst::ClockTime::zero(),
- equidistant: 0,
-
- discont: true,
- eos: false,
-
- last_popped_seqnum: None,
- last_popped_pts: gst::CLOCK_TIME_NONE,
-
- stats: Stats::default(),
-
- earliest_pts: gst::CLOCK_TIME_NONE,
- earliest_seqnum: None,
-
- wait_handle: None,
- }
+impl RTPJitterBuffer {
+ pub fn new() -> RTPJitterBuffer {
+ unsafe { from_glib_full(ffi::rtp_jitter_buffer_new()) }
}
-}
-
-struct JitterBufferTask {
- element: gst::Element,
- src_pad_handler: SrcHandler,
- sink_pad_handler: SinkHandler,
-}
-impl JitterBufferTask {
- fn new(
- element: &gst::Element,
- src_pad_handler: &SrcHandler,
- sink_pad_handler: &SinkHandler,
- ) -> Self {
- JitterBufferTask {
- element: element.clone(),
- src_pad_handler: src_pad_handler.clone(),
- sink_pad_handler: sink_pad_handler.clone(),
- }
+ #[allow(dead_code)]
+ pub fn get_mode(&self) -> RTPJitterBufferMode {
+ unsafe { from_glib(ffi::rtp_jitter_buffer_get_mode(self.to_glib_none().0)) }
}
-}
-impl TaskImpl for JitterBufferTask {
- fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
- async move {
- gst_log!(CAT, obj: &self.element, "Starting task");
-
- self.src_pad_handler.clear();
- self.sink_pad_handler.clear();
-
- let jb = JitterBuffer::from_instance(&self.element);
- *jb.state.lock().unwrap() = State::default();
-
- gst_log!(CAT, obj: &self.element, "Task started");
- Ok(())
- }
- .boxed()
+ #[allow(dead_code)]
+ pub fn set_mode(&self, mode: RTPJitterBufferMode) {
+ unsafe { ffi::rtp_jitter_buffer_set_mode(self.to_glib_none().0, mode.to_glib()) }
}
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
- async move {
- let jb = JitterBuffer::from_instance(&self.element);
- let (latency, context_wait) = {
- let settings = jb.settings.lock().unwrap();
- (
- settings.latency_ms as u64 * gst::MSECOND,
- settings.context_wait as u64 * gst::MSECOND,
- )
- };
-
- loop {
- let delay_fut = {
- let mut state = jb.state.lock().unwrap();
- let (_, next_wakeup) = self.src_pad_handler.get_next_wakeup(
- &self.element,
- &state,
- latency,
- context_wait,
- );
-
- let (delay_fut, abort_handle) = match next_wakeup {
- Some((_, delay)) if delay == Duration::from_nanos(0) => (None, None),
- _ => {
- let (delay_fut, abort_handle) = abortable(async move {
- match next_wakeup {
- Some((_, delay)) => {
- runtime::time::delay_for(delay).await;
- }
- None => {
- future::pending::<()>().await;
- }
- };
- });
-
- let next_wakeup =
- next_wakeup.map(|w| w.0).unwrap_or(gst::CLOCK_TIME_NONE);
- (Some(delay_fut), Some((next_wakeup, abort_handle)))
- }
- };
-
- state.wait_handle = abort_handle;
-
- delay_fut
- };
-
- // Got aborted, reschedule if needed
- if let Some(delay_fut) = delay_fut {
- gst_debug!(CAT, obj: &self.element, "Waiting");
- if let Err(Aborted) = delay_fut.await {
- gst_debug!(CAT, obj: &self.element, "Waiting aborted");
- return Ok(());
- }
- }
-
- let (head_pts, head_seq) = {
- let state = jb.state.lock().unwrap();
- //
- // Check earliest PTS as we have just taken the lock
- let (now, next_wakeup) = self.src_pad_handler.get_next_wakeup(
- &self.element,
- &state,
- latency,
- context_wait,
- );
-
- gst_debug!(
- CAT,
- obj: &self.element,
- "Woke up at {}, earliest_pts {}",
- now,
- state.earliest_pts
- );
-
- if let Some((next_wakeup, _)) = next_wakeup {
- if next_wakeup > now {
- // Reschedule and wait a bit longer in the next iteration
- return Ok(());
- }
- } else {
- return Ok(());
- }
-
- let (head_pts, head_seq) = state.jbuf.borrow().peek();
-
- (head_pts, head_seq)
- };
-
- let res = self.src_pad_handler.pop_and_push(&self.element).await;
-
- {
- let mut state = jb.state.lock().unwrap();
-
- state.last_res = res;
-
- if head_pts == state.earliest_pts && head_seq == state.earliest_seqnum {
- let (earliest_pts, earliest_seqnum) = state.jbuf.borrow().find_earliest();
- state.earliest_pts = earliest_pts;
- state.earliest_seqnum = earliest_seqnum;
- }
-
- if res.is_ok() {
- // Return and reschedule if the next packet would be in the future
- // Check earliest PTS as we have just taken the lock
- let (now, next_wakeup) = self.src_pad_handler.get_next_wakeup(
- &self.element,
- &state,
- latency,
- context_wait,
- );
- if let Some((next_wakeup, _)) = next_wakeup {
- if next_wakeup > now {
- // Reschedule and wait a bit longer in the next iteration
- return Ok(());
- }
- } else {
- return Ok(());
- }
- }
- }
-
- if let Err(err) = res {
- match err {
- gst::FlowError::Eos => {
- gst_debug!(CAT, obj: &self.element, "Pushing EOS event");
- let _ = jb.src_pad.push_event(gst::event::Eos::new()).await;
- }
- gst::FlowError::Flushing => gst_debug!(CAT, obj: &self.element, "Flushing"),
- err => gst_error!(CAT, obj: &self.element, "Error {}", err),
- }
-
- return Err(err);
- }
- }
- }
- .boxed()
+ #[allow(dead_code)]
+ pub fn get_delay(&self) -> gst::ClockTime {
+ unsafe { from_glib(ffi::rtp_jitter_buffer_get_delay(self.to_glib_none().0)) }
}
- fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
- async move {
- gst_log!(CAT, obj: &self.element, "Stopping task");
-
- let jb = JitterBuffer::from_instance(&self.element);
- let mut jb_state = jb.state.lock().unwrap();
-
- if let Some((_, abort_handle)) = jb_state.wait_handle.take() {
- abort_handle.abort();
- }
-
- self.src_pad_handler.clear();
- self.sink_pad_handler.clear();
-
- *jb_state = State::default();
-
- gst_log!(CAT, obj: &self.element, "Task stopped");
- Ok(())
- }
- .boxed()
+ pub fn set_delay(&self, delay: gst::ClockTime) {
+ unsafe { ffi::rtp_jitter_buffer_set_delay(self.to_glib_none().0, delay.to_glib()) }
}
-}
-
-struct JitterBuffer {
- sink_pad: PadSink,
- src_pad: PadSrc,
- sink_pad_handler: SinkHandler,
- src_pad_handler: SrcHandler,
- task: Task,
- state: StdMutex<State>,
- settings: StdMutex<Settings>,
-}
-
-lazy_static! {
- static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
- "ts-jitterbuffer",
- gst::DebugColorFlags::empty(),
- Some("Thread-sharing jitterbuffer"),
- );
-}
-impl JitterBuffer {
- fn clear_pt_map(&self, element: &gst::Element) {
- gst_info!(CAT, obj: element, "Clearing PT map");
-
- let mut state = self.state.lock().unwrap();
- state.clock_rate = None;
- state.jbuf.borrow().reset_skew();
+ pub fn set_clock_rate(&self, clock_rate: u32) {
+ unsafe { ffi::rtp_jitter_buffer_set_clock_rate(self.to_glib_none().0, clock_rate) }
}
- fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
- gst_info!(CAT, obj: element, "Preparing");
-
- let context = {
- let settings = self.settings.lock().unwrap();
- Context::acquire(&settings.context, settings.context_wait).unwrap()
- };
-
- self.task
- .prepare(
- JitterBufferTask::new(element, &self.src_pad_handler, &self.sink_pad_handler),
- context,
- )
- .map_err(|err| {
- gst_error_msg!(
- gst::ResourceError::OpenRead,
- ["Error preparing Task: {:?}", err]
- )
- })?;
-
- gst_info!(CAT, obj: element, "Prepared");
-
- Ok(())
+ #[allow(dead_code)]
+ pub fn get_clock_rate(&self) -> u32 {
+ unsafe { ffi::rtp_jitter_buffer_get_clock_rate(self.to_glib_none().0) }
}
- fn unprepare(&self, element: &gst::Element) {
- gst_debug!(CAT, obj: element, "Unpreparing");
- self.task.unprepare().unwrap();
- gst_debug!(CAT, obj: element, "Unprepared");
- }
+ pub fn calculate_pts(
+ &self,
+ dts: gst::ClockTime,
+ estimated_dts: bool,
+ rtptime: u32,
+ base_time: gst::ClockTime,
+ gap: i32,
+ is_rtx: bool,
+ ) -> gst::ClockTime {
+ unsafe {
+ let pts = ffi::rtp_jitter_buffer_calculate_pts(
+ self.to_glib_none().0,
+ dts.to_glib(),
+ estimated_dts.to_glib(),
+ rtptime,
+ base_time.to_glib(),
+ gap,
+ is_rtx.to_glib(),
+ );
- fn start(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
- gst_debug!(CAT, obj: element, "Starting");
- self.task.start()?;
- gst_debug!(CAT, obj: element, "Started");
- Ok(())
+ if pts == gst_ffi::GST_CLOCK_TIME_NONE {
+ gst::CLOCK_TIME_NONE
+ } else {
+ pts.into()
+ }
+ }
}
- fn stop(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
- gst_debug!(CAT, obj: element, "Stopping");
- self.task.stop()?;
- gst_debug!(CAT, obj: element, "Stopped");
- Ok(())
+ pub fn insert(&self, mut item: RTPJitterBufferItem) -> (bool, bool, i32) {
+ unsafe {
+ let mut head = mem::MaybeUninit::uninit();
+ let mut percent = mem::MaybeUninit::uninit();
+ let ptr = item.0.take().expect("Invalid wrapper");
+ let ret: bool = from_glib(ffi::rtp_jitter_buffer_insert(
+ self.to_glib_none().0,
+ ptr.as_ptr(),
+ head.as_mut_ptr(),
+ percent.as_mut_ptr(),
+ ));
+ if !ret {
+ item.0 = Some(ptr);
+ }
+ (ret, from_glib(head.assume_init()), percent.assume_init())
+ }
}
-}
-
-impl ObjectSubclass for JitterBuffer {
- const NAME: &'static str = "RsTsJitterBuffer";
- type ParentType = gst::Element;
- type Instance = gst::subclass::ElementInstanceStruct<Self>;
- type Class = subclass::simple::ClassStruct<Self>;
-
- glib_object_subclass!();
- fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
- klass.set_metadata(
- "Thread-sharing jitterbuffer",
- "Generic",
- "Simple jitterbuffer",
- "Mathieu Duponchelle <mathieu@centricular.com>",
- );
+ pub fn find_earliest(&self) -> (gst::ClockTime, Option<u16>) {
+ unsafe {
+ let mut pts = mem::MaybeUninit::uninit();
+ let mut seqnum = mem::MaybeUninit::uninit();
- let caps = gst::Caps::new_any();
-
- let sink_pad_template = gst::PadTemplate::new(
- "sink",
- gst::PadDirection::Sink,
- gst::PadPresence::Always,
- &caps,
- )
- .unwrap();
- klass.add_pad_template(sink_pad_template);
- klass.add_signal(
- "request-pt-map",
- glib::SignalFlags::RUN_LAST,
- &[u32::static_type()],
- gst::Caps::static_type(),
- );
+ ffi::rtp_jitter_buffer_find_earliest(
+ self.to_glib_none().0,
+ pts.as_mut_ptr(),
+ seqnum.as_mut_ptr(),
+ );
+ let pts = pts.assume_init();
+ let seqnum = seqnum.assume_init();
- klass.add_signal_with_class_handler(
- "clear-pt-map",
- glib::SignalFlags::RUN_LAST | glib::SignalFlags::ACTION,
- &[],
- glib::types::Type::Unit,
- |_, args| {
- let element = args[0]
- .get::<gst::Element>()
- .expect("signal arg")
- .expect("missing signal arg");
- let jb = Self::from_instance(&element);
- jb.clear_pt_map(&element);
+ let seqnum = if seqnum == std::u32::MAX {
None
- },
- );
-
- let src_pad_template = gst::PadTemplate::new(
- "src",
- gst::PadDirection::Src,
- gst::PadPresence::Always,
- &caps,
- )
- .unwrap();
- klass.add_pad_template(src_pad_template);
- klass.install_properties(&PROPERTIES);
- }
-
- fn with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
- let sink_pad_handler = SinkHandler::default();
- let src_pad_handler = SrcHandler::default();
+ } else {
+ Some(seqnum as u16)
+ };
- Self {
- sink_pad: PadSink::new(
- gst::Pad::from_template(&klass.get_pad_template("sink").unwrap(), Some("sink")),
- sink_pad_handler.clone(),
- ),
- src_pad: PadSrc::new(
- gst::Pad::from_template(&klass.get_pad_template("src").unwrap(), Some("src")),
- src_pad_handler.clone(),
- ),
- sink_pad_handler,
- src_pad_handler,
- task: Task::default(),
- state: StdMutex::new(State::default()),
- settings: StdMutex::new(Settings::default()),
+ if pts == gst_ffi::GST_CLOCK_TIME_NONE {
+ (gst::CLOCK_TIME_NONE, seqnum)
+ } else {
+ (pts.into(), seqnum)
+ }
}
}
-}
-
-impl ObjectImpl for JitterBuffer {
- fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) {
- let prop = &PROPERTIES[id];
-
- match *prop {
- subclass::Property("latency", ..) => {
- let latency_ms = {
- let mut settings = self.settings.lock().unwrap();
- settings.latency_ms = value.get_some().expect("type checked upstream");
- settings.latency_ms as u64
- };
- let state = self.state.lock().unwrap();
- state.jbuf.borrow().set_delay(latency_ms * gst::MSECOND);
+ pub fn pop(&self) -> (Option<RTPJitterBufferItem>, i32) {
+ unsafe {
+ let mut percent = mem::MaybeUninit::uninit();
+ let item = ffi::rtp_jitter_buffer_pop(self.to_glib_none().0, percent.as_mut_ptr());
- let element = obj.downcast_ref::<gst::Element>().unwrap();
- let _ = element.post_message(gst::message::Latency::builder().src(element).build());
- }
- subclass::Property("do-lost", ..) => {
- let mut settings = self.settings.lock().unwrap();
- settings.do_lost = value.get_some().expect("type checked upstream");
- }
- subclass::Property("max-dropout-time", ..) => {
- let mut settings = self.settings.lock().unwrap();
- settings.max_dropout_time = value.get_some().expect("type checked upstream");
- }
- subclass::Property("max-misorder-time", ..) => {
- let mut settings = self.settings.lock().unwrap();
- settings.max_misorder_time = value.get_some().expect("type checked upstream");
- }
- subclass::Property("context", ..) => {
- let mut settings = self.settings.lock().unwrap();
- settings.context = value
- .get()
- .expect("type checked upstream")
- .unwrap_or_else(|| "".into());
- }
- subclass::Property("context-wait", ..) => {
- let mut settings = self.settings.lock().unwrap();
- settings.context_wait = value.get_some().expect("type checked upstream");
- }
- _ => unimplemented!(),
+ (
+ if item.is_null() {
+ None
+ } else {
+ Some(RTPJitterBufferItem(Some(ptr::NonNull::new_unchecked(item))))
+ },
+ percent.assume_init(),
+ )
}
}
- fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
- let prop = &PROPERTIES[id];
-
- match *prop {
- subclass::Property("latency", ..) => {
- let settings = self.settings.lock().unwrap();
- Ok(settings.latency_ms.to_value())
- }
- subclass::Property("do-lost", ..) => {
- let settings = self.settings.lock().unwrap();
- Ok(settings.do_lost.to_value())
- }
- subclass::Property("max-dropout-time", ..) => {
- let settings = self.settings.lock().unwrap();
- Ok(settings.max_dropout_time.to_value())
- }
- subclass::Property("max-misorder-time", ..) => {
- let settings = self.settings.lock().unwrap();
- Ok(settings.max_misorder_time.to_value())
- }
- subclass::Property("stats", ..) => {
- let state = self.state.lock().unwrap();
- let s = gst::Structure::new(
- "application/x-rtp-jitterbuffer-stats",
- &[
- ("num-pushed", &state.stats.num_pushed),
- ("num-lost", &state.stats.num_lost),
- ("num-late", &state.stats.num_late),
- ],
- );
- Ok(s.to_value())
- }
- subclass::Property("context", ..) => {
- let settings = self.settings.lock().unwrap();
- Ok(settings.context.to_value())
- }
- subclass::Property("context-wait", ..) => {
- let settings = self.settings.lock().unwrap();
- Ok(settings.context_wait.to_value())
+ pub fn peek(&self) -> (gst::ClockTime, Option<u16>) {
+ unsafe {
+ let item = ffi::rtp_jitter_buffer_peek(self.to_glib_none().0);
+ if item.is_null() {
+ (gst::CLOCK_TIME_NONE, None)
+ } else {
+ let seqnum = (*item).seqnum;
+ let seqnum = if seqnum == std::u32::MAX {
+ None
+ } else {
+ Some(seqnum as u16)
+ };
+ ((*item).pts.into(), seqnum)
}
- _ => unimplemented!(),
}
}
- fn constructed(&self, obj: &glib::Object) {
- self.parent_constructed(obj);
-
- let element = obj.downcast_ref::<gst::Element>().unwrap();
- element.add_pad(self.sink_pad.gst_pad()).unwrap();
- element.add_pad(self.src_pad.gst_pad()).unwrap();
- element
- .set_element_flags(gst::ElementFlags::PROVIDE_CLOCK | gst::ElementFlags::REQUIRE_CLOCK);
- }
-}
-
-impl ElementImpl for JitterBuffer {
- fn change_state(
- &self,
- element: &gst::Element,
- transition: gst::StateChange,
- ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
-
- match transition {
- gst::StateChange::NullToReady => {
- self.prepare(element).map_err(|err| {
- element.post_error_message(err);
- gst::StateChangeError
- })?;
- }
- gst::StateChange::PausedToReady => {
- self.stop(element).map_err(|_| gst::StateChangeError)?;
- }
- gst::StateChange::ReadyToNull => {
- self.unprepare(element);
- }
- _ => (),
+ pub fn flush(&self) {
+ unsafe extern "C" fn free_item(item: glib_ffi::gpointer, _: glib_ffi::gpointer) {
+ let _ =
+ RTPJitterBufferItem(Some(ptr::NonNull::new(item as *mut _).expect("NULL item")));
}
- let mut success = self.parent_change_state(element, transition)?;
-
- match transition {
- gst::StateChange::ReadyToPaused => {
- self.start(element).map_err(|_| gst::StateChangeError)?;
- success = gst::StateChangeSuccess::NoPreroll;
- }
- gst::StateChange::PlayingToPaused => {
- success = gst::StateChangeSuccess::NoPreroll;
- }
- _ => (),
+ unsafe {
+ ffi::rtp_jitter_buffer_flush(self.to_glib_none().0, Some(free_item));
}
-
- Ok(success)
}
- fn provide_clock(&self, _element: &gst::Element) -> Option<gst::Clock> {
- Some(gst::SystemClock::obtain())
+ pub fn reset_skew(&self) {
+ unsafe { ffi::rtp_jitter_buffer_reset_skew(self.to_glib_none().0) }
}
}
-pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
- gst::Element::register(
- Some(plugin),
- "ts-jitterbuffer",
- gst::Rank::None,
- JitterBuffer::get_type(),
- )
+impl Default for RTPJitterBuffer {
+ fn default() -> Self {
+ RTPJitterBuffer::new()
+ }
}
diff --git a/generic/threadshare/src/jitterbuffer/mod.rs b/generic/threadshare/src/jitterbuffer/mod.rs
index 01745aa77..0dc7d2806 100644
--- a/generic/threadshare/src/jitterbuffer/mod.rs
+++ b/generic/threadshare/src/jitterbuffer/mod.rs
@@ -15,472 +15,29 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
-use glib_sys as glib_ffi;
-use gstreamer_sys as gst_ffi;
-
-use std::ptr;
-use std::u32;
+use glib::prelude::*;
+mod ffi;
+mod imp;
#[allow(clippy::module_inception)]
pub mod jitterbuffer;
-pub mod ffi {
- use glib_ffi::{gboolean, gpointer, GList, GType};
- use glib_sys as glib_ffi;
-
- use gst_ffi::GstClockTime;
- use gstreamer_sys as gst_ffi;
- use libc::{c_int, c_uint, c_ulonglong, c_ushort, c_void};
-
- #[repr(C)]
- #[derive(Copy, Clone)]
- pub struct RTPJitterBufferItem {
- pub data: gpointer,
- pub next: *mut GList,
- pub prev: *mut GList,
- pub r#type: c_uint,
- pub dts: GstClockTime,
- pub pts: GstClockTime,
- pub seqnum: c_uint,
- pub count: c_uint,
- pub rtptime: c_uint,
- }
-
- #[repr(C)]
- pub struct RTPJitterBuffer(c_void);
-
- #[repr(C)]
- #[derive(Copy, Clone)]
- pub struct RTPPacketRateCtx {
- probed: gboolean,
- clock_rate: c_int,
- last_seqnum: c_ushort,
- last_ts: c_ulonglong,
- avg_packet_rate: c_uint,
- }
-
- pub type RTPJitterBufferMode = c_int;
- pub const RTP_JITTER_BUFFER_MODE_NONE: RTPJitterBufferMode = 0;
- pub const RTP_JITTER_BUFFER_MODE_SLAVE: RTPJitterBufferMode = 1;
- pub const RTP_JITTER_BUFFER_MODE_BUFFER: RTPJitterBufferMode = 2;
- pub const RTP_JITTER_BUFFER_MODE_SYNCED: RTPJitterBufferMode = 4;
-
- extern "C" {
- pub fn rtp_jitter_buffer_new() -> *mut RTPJitterBuffer;
- pub fn rtp_jitter_buffer_get_type() -> GType;
- #[allow(dead_code)]
- pub fn rtp_jitter_buffer_get_mode(jbuf: *mut RTPJitterBuffer) -> RTPJitterBufferMode;
- #[allow(dead_code)]
- pub fn rtp_jitter_buffer_set_mode(jbuf: *mut RTPJitterBuffer, mode: RTPJitterBufferMode);
- #[allow(dead_code)]
- pub fn rtp_jitter_buffer_get_delay(jbuf: *mut RTPJitterBuffer) -> GstClockTime;
- pub fn rtp_jitter_buffer_set_delay(jbuf: *mut RTPJitterBuffer, delay: GstClockTime);
- pub fn rtp_jitter_buffer_set_clock_rate(jbuf: *mut RTPJitterBuffer, clock_rate: c_uint);
- #[allow(dead_code)]
- pub fn rtp_jitter_buffer_get_clock_rate(jbuf: *mut RTPJitterBuffer) -> c_uint;
- pub fn rtp_jitter_buffer_reset_skew(jbuf: *mut RTPJitterBuffer);
-
- pub fn rtp_jitter_buffer_flush(jbuf: *mut RTPJitterBuffer, free_func: glib_ffi::GFunc);
- pub fn rtp_jitter_buffer_find_earliest(
- jbuf: *mut RTPJitterBuffer,
- pts: *mut GstClockTime,
- seqnum: *mut c_uint,
- );
- pub fn rtp_jitter_buffer_calculate_pts(
- jbuf: *mut RTPJitterBuffer,
- dts: GstClockTime,
- estimated_dts: gboolean,
- rtptime: c_uint,
- base_time: GstClockTime,
- gap: c_int,
- is_rtx: gboolean,
- ) -> GstClockTime;
- pub fn rtp_jitter_buffer_insert(
- jbuf: *mut RTPJitterBuffer,
- item: *mut RTPJitterBufferItem,
- head: *mut gboolean,
- percent: *mut c_int,
- ) -> gboolean;
- pub fn rtp_jitter_buffer_pop(
- jbuf: *mut RTPJitterBuffer,
- percent: *mut c_int,
- ) -> *mut RTPJitterBufferItem;
- pub fn rtp_jitter_buffer_peek(jbuf: *mut RTPJitterBuffer) -> *mut RTPJitterBufferItem;
-
- pub fn gst_rtp_packet_rate_ctx_reset(ctx: *mut RTPPacketRateCtx, clock_rate: c_int);
- pub fn gst_rtp_packet_rate_ctx_update(
- ctx: *mut RTPPacketRateCtx,
- seqnum: c_ushort,
- ts: c_uint,
- ) -> c_uint;
- pub fn gst_rtp_packet_rate_ctx_get_max_dropout(
- ctx: *mut RTPPacketRateCtx,
- time_ms: c_int,
- ) -> c_uint;
- #[allow(dead_code)]
- pub fn gst_rtp_packet_rate_ctx_get_max_disorder(
- ctx: *mut RTPPacketRateCtx,
- time_ms: c_int,
- ) -> c_uint;
- }
-}
-
use glib::glib_wrapper;
-use glib::prelude::*;
-use glib::translate::*;
-
-use std::mem;
glib_wrapper! {
- pub struct RTPJitterBuffer(Object<ffi::RTPJitterBuffer>);
-
- match fn {
- get_type => || ffi::rtp_jitter_buffer_get_type(),
- }
-}
-
-unsafe impl glib::SendUnique for RTPJitterBuffer {
- fn is_unique(&self) -> bool {
- self.ref_count() == 1
- }
+ pub struct JitterBuffer(ObjectSubclass<imp::JitterBuffer>) @extends gst::Element, gst::Object;
}
-impl ToGlib for RTPJitterBufferMode {
- type GlibType = ffi::RTPJitterBufferMode;
-
- fn to_glib(&self) -> ffi::RTPJitterBufferMode {
- match *self {
- RTPJitterBufferMode::None => ffi::RTP_JITTER_BUFFER_MODE_NONE,
- RTPJitterBufferMode::Slave => ffi::RTP_JITTER_BUFFER_MODE_SLAVE,
- RTPJitterBufferMode::Buffer => ffi::RTP_JITTER_BUFFER_MODE_BUFFER,
- RTPJitterBufferMode::Synced => ffi::RTP_JITTER_BUFFER_MODE_SYNCED,
- RTPJitterBufferMode::__Unknown(value) => value,
- }
- }
-}
-
-impl FromGlib<ffi::RTPJitterBufferMode> for RTPJitterBufferMode {
- fn from_glib(value: ffi::RTPJitterBufferMode) -> Self {
- match value {
- 0 => RTPJitterBufferMode::None,
- 1 => RTPJitterBufferMode::Slave,
- 2 => RTPJitterBufferMode::Buffer,
- 4 => RTPJitterBufferMode::Synced,
- value => RTPJitterBufferMode::__Unknown(value),
- }
- }
-}
-
-pub struct RTPJitterBufferItem(Option<ptr::NonNull<ffi::RTPJitterBufferItem>>);
-
-unsafe impl Send for RTPJitterBufferItem {}
-
-impl RTPJitterBufferItem {
- pub fn new(
- buffer: gst::Buffer,
- dts: gst::ClockTime,
- pts: gst::ClockTime,
- seqnum: Option<u16>,
- rtptime: u32,
- ) -> RTPJitterBufferItem {
- unsafe {
- let ptr = ptr::NonNull::new(glib_sys::g_slice_alloc0(mem::size_of::<
- ffi::RTPJitterBufferItem,
- >()) as *mut ffi::RTPJitterBufferItem)
- .expect("Allocation failed");
- ptr::write(
- ptr.as_ptr(),
- ffi::RTPJitterBufferItem {
- data: buffer.into_ptr() as *mut _,
- next: ptr::null_mut(),
- prev: ptr::null_mut(),
- r#type: 0,
- dts: dts.to_glib(),
- pts: pts.to_glib(),
- seqnum: seqnum.map(|s| s as u32).unwrap_or(u32::MAX),
- count: 1,
- rtptime,
- },
- );
-
- RTPJitterBufferItem(Some(ptr))
- }
- }
-
- pub fn into_buffer(mut self) -> gst::Buffer {
- unsafe {
- let item = self.0.take().expect("Invalid wrapper");
- let buf = item.as_ref().data as *mut gst_ffi::GstBuffer;
- glib_sys::g_slice_free1(
- mem::size_of::<ffi::RTPJitterBufferItem>(),
- item.as_ptr() as *mut _,
- );
- from_glib_full(buf)
- }
- }
-
- pub fn get_dts(&self) -> gst::ClockTime {
- unsafe {
- let item = self.0.as_ref().expect("Invalid wrapper");
- if item.as_ref().dts == gst_ffi::GST_CLOCK_TIME_NONE {
- gst::CLOCK_TIME_NONE
- } else {
- gst::ClockTime(Some(item.as_ref().dts))
- }
- }
- }
-
- pub fn get_pts(&self) -> gst::ClockTime {
- unsafe {
- let item = self.0.as_ref().expect("Invalid wrapper");
- if item.as_ref().pts == gst_ffi::GST_CLOCK_TIME_NONE {
- gst::CLOCK_TIME_NONE
- } else {
- gst::ClockTime(Some(item.as_ref().pts))
- }
- }
- }
-
- pub fn get_seqnum(&self) -> Option<u16> {
- unsafe {
- let item = self.0.as_ref().expect("Invalid wrapper");
- if item.as_ref().seqnum == u32::MAX {
- None
- } else {
- Some(item.as_ref().seqnum as u16)
- }
- }
- }
-
- #[allow(dead_code)]
- pub fn get_rtptime(&self) -> u32 {
- unsafe {
- let item = self.0.as_ref().expect("Invalid wrapper");
- item.as_ref().rtptime
- }
- }
-}
-
-impl Drop for RTPJitterBufferItem {
- fn drop(&mut self) {
- unsafe {
- if let Some(ref item) = self.0 {
- if !item.as_ref().data.is_null() {
- gst_ffi::gst_mini_object_unref(item.as_ref().data as *mut _);
- }
-
- glib_sys::g_slice_free1(
- mem::size_of::<ffi::RTPJitterBufferItem>(),
- item.as_ptr() as *mut _,
- );
- }
- }
- }
-}
-
-pub struct RTPPacketRateCtx(Box<ffi::RTPPacketRateCtx>);
-
-unsafe impl Send for RTPPacketRateCtx {}
-
-impl RTPPacketRateCtx {
- pub fn new() -> RTPPacketRateCtx {
- unsafe {
- let mut ptr = std::mem::MaybeUninit::uninit();
- ffi::gst_rtp_packet_rate_ctx_reset(ptr.as_mut_ptr(), -1);
- RTPPacketRateCtx(Box::new(ptr.assume_init()))
- }
- }
-
- pub fn reset(&mut self, clock_rate: i32) {
- unsafe { ffi::gst_rtp_packet_rate_ctx_reset(&mut *self.0, clock_rate) }
- }
-
- pub fn update(&mut self, seqnum: u16, ts: u32) -> u32 {
- unsafe { ffi::gst_rtp_packet_rate_ctx_update(&mut *self.0, seqnum, ts) }
- }
-
- pub fn get_max_dropout(&mut self, time_ms: i32) -> u32 {
- unsafe { ffi::gst_rtp_packet_rate_ctx_get_max_dropout(&mut *self.0, time_ms) }
- }
-
- #[allow(dead_code)]
- pub fn get_max_disorder(&mut self, time_ms: i32) -> u32 {
- unsafe { ffi::gst_rtp_packet_rate_ctx_get_max_disorder(&mut *self.0, time_ms) }
- }
-}
-
-impl Default for RTPPacketRateCtx {
- fn default() -> Self {
- RTPPacketRateCtx::new()
- }
-}
-
-#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)]
-pub enum RTPJitterBufferMode {
- r#None,
- Slave,
- Buffer,
- Synced,
- __Unknown(i32),
-}
-
-impl RTPJitterBuffer {
- pub fn new() -> RTPJitterBuffer {
- unsafe { from_glib_full(ffi::rtp_jitter_buffer_new()) }
- }
-
- #[allow(dead_code)]
- pub fn get_mode(&self) -> RTPJitterBufferMode {
- unsafe { from_glib(ffi::rtp_jitter_buffer_get_mode(self.to_glib_none().0)) }
- }
-
- #[allow(dead_code)]
- pub fn set_mode(&self, mode: RTPJitterBufferMode) {
- unsafe { ffi::rtp_jitter_buffer_set_mode(self.to_glib_none().0, mode.to_glib()) }
- }
-
- #[allow(dead_code)]
- pub fn get_delay(&self) -> gst::ClockTime {
- unsafe { from_glib(ffi::rtp_jitter_buffer_get_delay(self.to_glib_none().0)) }
- }
-
- pub fn set_delay(&self, delay: gst::ClockTime) {
- unsafe { ffi::rtp_jitter_buffer_set_delay(self.to_glib_none().0, delay.to_glib()) }
- }
-
- pub fn set_clock_rate(&self, clock_rate: u32) {
- unsafe { ffi::rtp_jitter_buffer_set_clock_rate(self.to_glib_none().0, clock_rate) }
- }
-
- #[allow(dead_code)]
- pub fn get_clock_rate(&self) -> u32 {
- unsafe { ffi::rtp_jitter_buffer_get_clock_rate(self.to_glib_none().0) }
- }
-
- pub fn calculate_pts(
- &self,
- dts: gst::ClockTime,
- estimated_dts: bool,
- rtptime: u32,
- base_time: gst::ClockTime,
- gap: i32,
- is_rtx: bool,
- ) -> gst::ClockTime {
- unsafe {
- let pts = ffi::rtp_jitter_buffer_calculate_pts(
- self.to_glib_none().0,
- dts.to_glib(),
- estimated_dts.to_glib(),
- rtptime,
- base_time.to_glib(),
- gap,
- is_rtx.to_glib(),
- );
-
- if pts == gst_ffi::GST_CLOCK_TIME_NONE {
- gst::CLOCK_TIME_NONE
- } else {
- pts.into()
- }
- }
- }
-
- pub fn insert(&self, mut item: RTPJitterBufferItem) -> (bool, bool, i32) {
- unsafe {
- let mut head = mem::MaybeUninit::uninit();
- let mut percent = mem::MaybeUninit::uninit();
- let ptr = item.0.take().expect("Invalid wrapper");
- let ret: bool = from_glib(ffi::rtp_jitter_buffer_insert(
- self.to_glib_none().0,
- ptr.as_ptr(),
- head.as_mut_ptr(),
- percent.as_mut_ptr(),
- ));
- if !ret {
- item.0 = Some(ptr);
- }
- (ret, from_glib(head.assume_init()), percent.assume_init())
- }
- }
-
- pub fn find_earliest(&self) -> (gst::ClockTime, Option<u16>) {
- unsafe {
- let mut pts = mem::MaybeUninit::uninit();
- let mut seqnum = mem::MaybeUninit::uninit();
-
- ffi::rtp_jitter_buffer_find_earliest(
- self.to_glib_none().0,
- pts.as_mut_ptr(),
- seqnum.as_mut_ptr(),
- );
- let pts = pts.assume_init();
- let seqnum = seqnum.assume_init();
-
- let seqnum = if seqnum == u32::MAX {
- None
- } else {
- Some(seqnum as u16)
- };
-
- if pts == gst_ffi::GST_CLOCK_TIME_NONE {
- (gst::CLOCK_TIME_NONE, seqnum)
- } else {
- (pts.into(), seqnum)
- }
- }
- }
-
- pub fn pop(&self) -> (Option<RTPJitterBufferItem>, i32) {
- unsafe {
- let mut percent = mem::MaybeUninit::uninit();
- let item = ffi::rtp_jitter_buffer_pop(self.to_glib_none().0, percent.as_mut_ptr());
-
- (
- if item.is_null() {
- None
- } else {
- Some(RTPJitterBufferItem(Some(ptr::NonNull::new_unchecked(item))))
- },
- percent.assume_init(),
- )
- }
- }
-
- pub fn peek(&self) -> (gst::ClockTime, Option<u16>) {
- unsafe {
- let item = ffi::rtp_jitter_buffer_peek(self.to_glib_none().0);
- if item.is_null() {
- (gst::CLOCK_TIME_NONE, None)
- } else {
- let seqnum = (*item).seqnum;
- let seqnum = if seqnum == u32::MAX {
- None
- } else {
- Some(seqnum as u16)
- };
- ((*item).pts.into(), seqnum)
- }
- }
- }
-
- pub fn flush(&self) {
- unsafe extern "C" fn free_item(item: glib_ffi::gpointer, _: glib_ffi::gpointer) {
- let _ =
- RTPJitterBufferItem(Some(ptr::NonNull::new(item as *mut _).expect("NULL item")));
- }
-
- unsafe {
- ffi::rtp_jitter_buffer_flush(self.to_glib_none().0, Some(free_item));
- }
- }
-
- pub fn reset_skew(&self) {
- unsafe { ffi::rtp_jitter_buffer_reset_skew(self.to_glib_none().0) }
- }
-}
+// GStreamer elements need to be thread-safe. For the private implementation this is automatically
+// enforced but for the public wrapper type we need to specify this manually.
+unsafe impl Send for JitterBuffer {}
+unsafe impl Sync for JitterBuffer {}
-impl Default for RTPJitterBuffer {
- fn default() -> Self {
- RTPJitterBuffer::new()
- }
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "ts-jitterbuffer",
+ gst::Rank::None,
+ JitterBuffer::static_type(),
+ )
}
diff --git a/generic/threadshare/src/lib.rs b/generic/threadshare/src/lib.rs
index af57fffc2..1264cbf56 100644
--- a/generic/threadshare/src/lib.rs
+++ b/generic/threadshare/src/lib.rs
@@ -53,7 +53,7 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
queue::register(plugin)?;
proxy::register(plugin)?;
appsrc::register(plugin)?;
- jitterbuffer::jitterbuffer::register(plugin)?;
+ jitterbuffer::register(plugin)?;
inputselector::register(plugin)?;
Ok(())
diff --git a/generic/threadshare/src/proxy.rs b/generic/threadshare/src/proxy/imp.rs
index c524e9bbd..d4eb1aa2f 100644
--- a/generic/threadshare/src/proxy.rs
+++ b/generic/threadshare/src/proxy/imp.rs
@@ -41,7 +41,7 @@ use crate::runtime::{
Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, PadSrcWeak, Task,
};
-use super::dataqueue::{DataQueue, DataQueueItem};
+use crate::dataqueue::{DataQueue, DataQueueItem};
lazy_static! {
static ref PROXY_CONTEXTS: StdMutex<HashMap<String, Weak<StdMutex<ProxyContextInner>>>> =
@@ -301,7 +301,7 @@ impl PadSinkHandler for ProxySinkPadHandler {
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let pad_weak = pad.downgrade();
- let element = element.clone();
+ let element = element.clone().downcast::<super::ProxySink>().unwrap();
async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists");
@@ -322,7 +322,7 @@ impl PadSinkHandler for ProxySinkPadHandler {
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let pad_weak = pad.downgrade();
- let element = element.clone();
+ let element = element.clone().downcast::<super::ProxySink>().unwrap();
async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists");
gst_log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", list);
@@ -357,7 +357,7 @@ impl PadSinkHandler for ProxySinkPadHandler {
};
if let EventView::FlushStart(..) = event.view() {
- proxysink.stop(&element);
+ proxysink.stop(element.downcast_ref::<super::ProxySink>().unwrap());
}
if let Some(src_pad) = src_pad {
@@ -381,7 +381,7 @@ impl PadSinkHandler for ProxySinkPadHandler {
gst_log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
let pad_weak = pad.downgrade();
- let element = element.clone();
+ let element = element.clone().downcast::<super::ProxySink>().unwrap();
async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists");
let proxysink = ProxySink::from_instance(&element);
@@ -406,7 +406,7 @@ impl PadSinkHandler for ProxySinkPadHandler {
}
#[derive(Debug)]
-struct ProxySink {
+pub struct ProxySink {
sink_pad: PadSink,
proxy_ctx: StdMutex<Option<ProxyContext>>,
settings: StdMutex<SettingsSink>,
@@ -421,7 +421,7 @@ lazy_static! {
}
impl ProxySink {
- async fn schedule_pending_queue(&self, element: &gst::Element) {
+ async fn schedule_pending_queue(&self, element: &super::ProxySink) {
loop {
let more_queue_space_receiver = {
let proxy_ctx = self.proxy_ctx.lock().unwrap();
@@ -476,7 +476,7 @@ impl ProxySink {
async fn enqueue_item(
&self,
- element: &gst::Element,
+ element: &super::ProxySink,
item: DataQueueItem,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let wait_fut = {
@@ -589,7 +589,7 @@ impl ProxySink {
shared_ctx.last_res
}
- fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
+ fn prepare(&self, element: &super::ProxySink) -> Result<(), gst::ErrorMessage> {
gst_debug!(SINK_CAT, obj: element, "Preparing");
let proxy_context = self.settings.lock().unwrap().proxy_context.to_string();
@@ -614,13 +614,13 @@ impl ProxySink {
Ok(())
}
- fn unprepare(&self, element: &gst::Element) {
+ fn unprepare(&self, element: &super::ProxySink) {
gst_debug!(SINK_CAT, obj: element, "Unpreparing");
*self.proxy_ctx.lock().unwrap() = None;
gst_debug!(SINK_CAT, obj: element, "Unprepared");
}
- fn start(&self, element: &gst::Element) {
+ fn start(&self, element: &super::ProxySink) {
let proxy_ctx = self.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
@@ -637,7 +637,7 @@ impl ProxySink {
gst_debug!(SINK_CAT, obj: element, "Started");
}
- fn stop(&self, element: &gst::Element) {
+ fn stop(&self, element: &super::ProxySink) {
let proxy_ctx = self.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
@@ -652,13 +652,14 @@ impl ProxySink {
impl ObjectSubclass for ProxySink {
const NAME: &'static str = "RsTsProxySink";
+ type Type = super::ProxySink;
type ParentType = gst::Element;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib_object_subclass!();
- fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ fn class_init(klass: &mut Self::Class) {
klass.set_metadata(
"Thread-sharing proxy sink",
"Sink/Generic",
@@ -680,7 +681,7 @@ impl ObjectSubclass for ProxySink {
klass.install_properties(&PROPERTIES_SINK);
}
- fn with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
+ fn with_class(klass: &Self::Class) -> Self {
Self {
sink_pad: PadSink::new(
gst::Pad::from_template(&klass.get_pad_template("sink").unwrap(), Some("sink")),
@@ -693,7 +694,7 @@ impl ObjectSubclass for ProxySink {
}
impl ObjectImpl for ProxySink {
- fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
+ fn set_property(&self, _obj: &Self::Type, id: usize, value: &glib::Value) {
let prop = &PROPERTIES_SINK[id];
let mut settings = self.settings.lock().unwrap();
@@ -708,7 +709,7 @@ impl ObjectImpl for ProxySink {
}
}
- fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ fn get_property(&self, _obj: &Self::Type, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES_SINK[id];
let settings = self.settings.lock().unwrap();
@@ -718,20 +719,19 @@ impl ObjectImpl for ProxySink {
}
}
- fn constructed(&self, obj: &glib::Object) {
+ fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
- let element = obj.downcast_ref::<gst::Element>().unwrap();
- element.add_pad(self.sink_pad.gst_pad()).unwrap();
+ obj.add_pad(self.sink_pad.gst_pad()).unwrap();
- super::set_element_flags(element, gst::ElementFlags::SINK);
+ crate::set_element_flags(obj, gst::ElementFlags::SINK);
}
}
impl ElementImpl for ProxySink {
fn change_state(
&self,
- element: &gst::Element,
+ element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_trace!(SINK_CAT, obj: element, "Changing state {:?}", transition);
@@ -909,13 +909,13 @@ impl PadSrcHandler for ProxySrcPadHandler {
#[derive(Debug)]
struct ProxySrcTask {
- element: gst::Element,
+ element: super::ProxySrc,
src_pad: PadSrcWeak,
dataqueue: DataQueue,
}
impl ProxySrcTask {
- fn new(element: &gst::Element, src_pad: &PadSrc, dataqueue: DataQueue) -> Self {
+ fn new(element: &super::ProxySrc, src_pad: &PadSrc, dataqueue: DataQueue) -> Self {
ProxySrcTask {
element: element.clone(),
src_pad: src_pad.downgrade(),
@@ -1043,7 +1043,7 @@ impl TaskImpl for ProxySrcTask {
}
#[derive(Debug)]
-struct ProxySrc {
+pub struct ProxySrc {
src_pad: PadSrc,
task: Task,
proxy_ctx: StdMutex<Option<ProxyContext>>,
@@ -1060,7 +1060,7 @@ lazy_static! {
}
impl ProxySrc {
- fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
+ fn prepare(&self, element: &super::ProxySrc) -> Result<(), gst::ErrorMessage> {
gst_debug!(SRC_CAT, obj: element, "Preparing");
let settings = self.settings.lock().unwrap().clone();
@@ -1126,7 +1126,7 @@ impl ProxySrc {
Ok(())
}
- fn unprepare(&self, element: &gst::Element) {
+ fn unprepare(&self, element: &super::ProxySrc) {
gst_debug!(SRC_CAT, obj: element, "Unpreparing");
{
@@ -1143,21 +1143,21 @@ impl ProxySrc {
gst_debug!(SRC_CAT, obj: element, "Unprepared");
}
- fn stop(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
+ fn stop(&self, element: &super::ProxySrc) -> Result<(), gst::ErrorMessage> {
gst_debug!(SRC_CAT, obj: element, "Stopping");
self.task.stop()?;
gst_debug!(SRC_CAT, obj: element, "Stopped");
Ok(())
}
- fn start(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
+ fn start(&self, element: &super::ProxySrc) -> Result<(), gst::ErrorMessage> {
gst_debug!(SRC_CAT, obj: element, "Starting");
self.task.start()?;
gst_debug!(SRC_CAT, obj: element, "Started");
Ok(())
}
- fn pause(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
+ fn pause(&self, element: &super::ProxySrc) -> Result<(), gst::ErrorMessage> {
gst_debug!(SRC_CAT, obj: element, "Pausing");
self.task.pause()?;
gst_debug!(SRC_CAT, obj: element, "Paused");
@@ -1167,13 +1167,14 @@ impl ProxySrc {
impl ObjectSubclass for ProxySrc {
const NAME: &'static str = "RsTsProxySrc";
+ type Type = super::ProxySrc;
type ParentType = gst::Element;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib_object_subclass!();
- fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ fn class_init(klass: &mut Self::Class) {
klass.set_metadata(
"Thread-sharing proxy source",
"Source/Generic",
@@ -1199,7 +1200,7 @@ impl ObjectSubclass for ProxySrc {
unreachable!()
}
- fn with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
+ fn with_class(klass: &Self::Class) -> Self {
Self {
src_pad: PadSrc::new(
gst::Pad::from_template(&klass.get_pad_template("src").unwrap(), Some("src")),
@@ -1214,7 +1215,7 @@ impl ObjectSubclass for ProxySrc {
}
impl ObjectImpl for ProxySrc {
- fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
+ fn set_property(&self, _obj: &Self::Type, id: usize, value: &glib::Value) {
let prop = &PROPERTIES_SRC[id];
let mut settings = self.settings.lock().unwrap();
@@ -1247,7 +1248,7 @@ impl ObjectImpl for ProxySrc {
}
}
- fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ fn get_property(&self, _obj: &Self::Type, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES_SRC[id];
let settings = self.settings.lock().unwrap();
@@ -1262,20 +1263,19 @@ impl ObjectImpl for ProxySrc {
}
}
- fn constructed(&self, obj: &glib::Object) {
+ fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
- let element = obj.downcast_ref::<gst::Element>().unwrap();
- element.add_pad(self.src_pad.gst_pad()).unwrap();
+ obj.add_pad(self.src_pad.gst_pad()).unwrap();
- super::set_element_flags(element, gst::ElementFlags::SOURCE);
+ crate::set_element_flags(obj, gst::ElementFlags::SOURCE);
}
}
impl ElementImpl for ProxySrc {
fn change_state(
&self,
- element: &gst::Element,
+ element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_trace!(SRC_CAT, obj: element, "Changing state {:?}", transition);
@@ -1317,18 +1317,3 @@ impl ElementImpl for ProxySrc {
Ok(success)
}
}
-
-pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
- gst::Element::register(
- Some(plugin),
- "ts-proxysink",
- gst::Rank::None,
- ProxySink::get_type(),
- )?;
- gst::Element::register(
- Some(plugin),
- "ts-proxysrc",
- gst::Rank::None,
- ProxySrc::get_type(),
- )
-}
diff --git a/generic/threadshare/src/proxy/mod.rs b/generic/threadshare/src/proxy/mod.rs
new file mode 100644
index 000000000..2bf3b2dd3
--- /dev/null
+++ b/generic/threadshare/src/proxy/mod.rs
@@ -0,0 +1,54 @@
+// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Library General Public
+// License as published by the Free Software Foundation; either
+// version 2 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Library General Public License for more details.
+//
+// You should have received a copy of the GNU Library General Public
+// License along with this library; if not, write to the
+// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+// Boston, MA 02110-1335, USA.
+
+use glib::glib_wrapper;
+use glib::prelude::*;
+
+mod imp;
+
+glib_wrapper! {
+ pub struct ProxySink(ObjectSubclass<imp::ProxySink>) @extends gst::Element, gst::Object;
+}
+
+// GStreamer elements need to be thread-safe. For the private implementation this is automatically
+// enforced but for the public wrapper type we need to specify this manually.
+unsafe impl Send for ProxySink {}
+unsafe impl Sync for ProxySink {}
+
+glib_wrapper! {
+ pub struct ProxySrc(ObjectSubclass<imp::ProxySrc>) @extends gst::Element, gst::Object;
+}
+
+// GStreamer elements need to be thread-safe. For the private implementation this is automatically
+// enforced but for the public wrapper type we need to specify this manually.
+unsafe impl Send for ProxySrc {}
+unsafe impl Sync for ProxySrc {}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "ts-proxysink",
+ gst::Rank::None,
+ ProxySink::static_type(),
+ )?;
+ gst::Element::register(
+ Some(plugin),
+ "ts-proxysrc",
+ gst::Rank::None,
+ ProxySrc::static_type(),
+ )
+}
diff --git a/generic/threadshare/src/queue.rs b/generic/threadshare/src/queue/imp.rs
index e9fec3c71..28ac0a351 100644
--- a/generic/threadshare/src/queue.rs
+++ b/generic/threadshare/src/queue/imp.rs
@@ -37,7 +37,7 @@ use std::{u32, u64};
use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, PadSrcWeak, Task};
-use super::dataqueue::{DataQueue, DataQueueItem};
+use crate::dataqueue::{DataQueue, DataQueueItem};
const DEFAULT_MAX_SIZE_BUFFERS: u32 = 200;
const DEFAULT_MAX_SIZE_BYTES: u32 = 1024 * 1024;
@@ -149,7 +149,7 @@ impl PadSinkHandler for QueuePadSinkHandler {
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let pad_weak = pad.downgrade();
- let element = element.clone();
+ let element = element.clone().downcast::<super::Queue>().unwrap();
async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists");
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer);
@@ -169,7 +169,7 @@ impl PadSinkHandler for QueuePadSinkHandler {
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let pad_weak = pad.downgrade();
- let element = element.clone();
+ let element = element.clone().downcast::<super::Queue>().unwrap();
async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists");
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", list);
@@ -221,7 +221,7 @@ impl PadSinkHandler for QueuePadSinkHandler {
gst_log!(CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
let pad_weak = pad.downgrade();
- let element = element.clone();
+ let element = element.clone().downcast::<super::Queue>().unwrap();
async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists");
let queue = Queue::from_instance(&element);
@@ -379,13 +379,13 @@ impl PadSrcHandler for QueuePadSrcHandler {
#[derive(Debug)]
struct QueueTask {
- element: gst::Element,
+ element: super::Queue,
src_pad: PadSrcWeak,
dataqueue: DataQueue,
}
impl QueueTask {
- fn new(element: &gst::Element, src_pad: &PadSrc, dataqueue: DataQueue) -> Self {
+ fn new(element: &super::Queue, src_pad: &PadSrc, dataqueue: DataQueue) -> Self {
QueueTask {
element: element.clone(),
src_pad: src_pad.downgrade(),
@@ -503,7 +503,7 @@ impl TaskImpl for QueueTask {
}
#[derive(Debug)]
-struct Queue {
+pub struct Queue {
sink_pad: PadSink,
src_pad: PadSrc,
task: Task,
@@ -561,7 +561,7 @@ impl Queue {
/* Schedules emptying of the pending queue. If there is an upstream
* TaskContext, the new task is spawned, it is otherwise
* returned, for the caller to block on */
- async fn schedule_pending_queue(&self, element: &gst::Element) {
+ async fn schedule_pending_queue(&self, element: &super::Queue) {
loop {
let more_queue_space_receiver = {
let dataqueue = self.dataqueue.lock().unwrap();
@@ -604,7 +604,7 @@ impl Queue {
async fn enqueue_item(
&self,
- element: &gst::Element,
+ element: &super::Queue,
item: DataQueueItem,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let wait_fut = {
@@ -672,7 +672,7 @@ impl Queue {
*self.last_res.lock().unwrap()
}
- fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
+ fn prepare(&self, element: &super::Queue) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Preparing");
let settings = self.settings.lock().unwrap().clone();
@@ -721,7 +721,7 @@ impl Queue {
Ok(())
}
- fn unprepare(&self, element: &gst::Element) {
+ fn unprepare(&self, element: &super::Queue) {
gst_debug!(CAT, obj: element, "Unpreparing");
self.task.unprepare().unwrap();
@@ -734,14 +734,14 @@ impl Queue {
gst_debug!(CAT, obj: element, "Unprepared");
}
- fn stop(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
+ fn stop(&self, element: &super::Queue) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Stopping");
self.task.stop()?;
gst_debug!(CAT, obj: element, "Stopped");
Ok(())
}
- fn start(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
+ fn start(&self, element: &super::Queue) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Starting");
self.task.start()?;
gst_debug!(CAT, obj: element, "Started");
@@ -751,13 +751,14 @@ impl Queue {
impl ObjectSubclass for Queue {
const NAME: &'static str = "RsTsQueue";
+ type Type = super::Queue;
type ParentType = gst::Element;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib_object_subclass!();
- fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ fn class_init(klass: &mut Self::Class) {
klass.set_metadata(
"Thread-sharing queue",
"Generic",
@@ -788,7 +789,7 @@ impl ObjectSubclass for Queue {
klass.install_properties(&PROPERTIES);
}
- fn with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
+ fn with_class(klass: &Self::Class) -> Self {
Self {
sink_pad: PadSink::new(
gst::Pad::from_template(&klass.get_pad_template("sink").unwrap(), Some("sink")),
@@ -808,7 +809,7 @@ impl ObjectSubclass for Queue {
}
impl ObjectImpl for Queue {
- fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
+ fn set_property(&self, _obj: &Self::Type, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
let mut settings = self.settings.lock().unwrap();
@@ -835,7 +836,7 @@ impl ObjectImpl for Queue {
}
}
- fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ fn get_property(&self, _obj: &Self::Type, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
let settings = self.settings.lock().unwrap();
@@ -849,19 +850,18 @@ impl ObjectImpl for Queue {
}
}
- fn constructed(&self, obj: &glib::Object) {
+ fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
- let element = obj.downcast_ref::<gst::Element>().unwrap();
- element.add_pad(self.sink_pad.gst_pad()).unwrap();
- element.add_pad(self.src_pad.gst_pad()).unwrap();
+ obj.add_pad(self.sink_pad.gst_pad()).unwrap();
+ obj.add_pad(self.src_pad.gst_pad()).unwrap();
}
}
impl ElementImpl for Queue {
fn change_state(
&self,
- element: &gst::Element,
+ element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
@@ -891,7 +891,3 @@ impl ElementImpl for Queue {
Ok(success)
}
}
-
-pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
- gst::Element::register(Some(plugin), "ts-queue", gst::Rank::None, Queue::get_type())
-}
diff --git a/generic/threadshare/src/queue/mod.rs b/generic/threadshare/src/queue/mod.rs
new file mode 100644
index 000000000..153939f9d
--- /dev/null
+++ b/generic/threadshare/src/queue/mod.rs
@@ -0,0 +1,39 @@
+// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Library General Public
+// License as published by the Free Software Foundation; either
+// version 2 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Library General Public License for more details.
+//
+// You should have received a copy of the GNU Library General Public
+// License along with this library; if not, write to the
+// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+// Boston, MA 02110-1335, USA.
+
+use glib::glib_wrapper;
+use glib::prelude::*;
+
+mod imp;
+
+glib_wrapper! {
+ pub struct Queue(ObjectSubclass<imp::Queue>) @extends gst::Element, gst::Object;
+}
+
+// GStreamer elements need to be thread-safe. For the private implementation this is automatically
+// enforced but for the public wrapper type we need to specify this manually.
+unsafe impl Send for Queue {}
+unsafe impl Sync for Queue {}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "ts-queue",
+ gst::Rank::None,
+ Queue::static_type(),
+ )
+}
diff --git a/generic/threadshare/src/runtime/pad.rs b/generic/threadshare/src/runtime/pad.rs
index 89e5e01d4..12da4d3b4 100644
--- a/generic/threadshare/src/runtime/pad.rs
+++ b/generic/threadshare/src/runtime/pad.rs
@@ -402,7 +402,11 @@ impl PadSrc {
},
move |imp, element| {
let this_ref = PadSrcRef::new(inner_arc);
- handler.src_activate(&this_ref, imp, element)
+ handler.src_activate(
+ &this_ref,
+ imp,
+ element.dynamic_cast_ref::<gst::Element>().unwrap(),
+ )
},
)
});
@@ -425,7 +429,13 @@ impl PadSrc {
move |imp, element| {
let this_ref = PadSrcRef::new(inner_arc);
this_ref.activate_mode_hook(mode, active)?;
- handler.src_activatemode(&this_ref, imp, element, mode, active)
+ handler.src_activatemode(
+ &this_ref,
+ imp,
+ element.dynamic_cast_ref::<gst::Element>().unwrap(),
+ mode,
+ active,
+ )
},
)
});
@@ -443,7 +453,12 @@ impl PadSrc {
|| Err(FlowError::Error),
move |imp, element| {
let this_ref = PadSrcRef::new(inner_arc);
- handler.src_event_full(&this_ref, imp, &element, event)
+ handler.src_event_full(
+ &this_ref,
+ imp,
+ element.dynamic_cast_ref::<gst::Element>().unwrap(),
+ event,
+ )
},
)
});
@@ -459,7 +474,7 @@ impl PadSrc {
move |imp, element| {
let this_ref = PadSrcRef::new(inner_arc);
if !query.is_serialized() {
- handler.src_query(&this_ref, imp, &element, query)
+ handler.src_query(&this_ref, imp, element.dynamic_cast_ref::<gst::Element>().unwrap(), query)
} else {
gst_fixme!(RUNTIME_CAT, obj: this_ref.gst_pad(), "Serialized Query not supported");
false
@@ -509,6 +524,8 @@ impl Deref for PadSrc {
/// [`pad` module]: index.html
pub trait PadSinkHandler: Clone + Send + Sync + 'static {
type ElementImpl: ElementImpl + ObjectSubclass;
+ // FIXME: Once associated type bounds are stable we should use ObjectSubclass::Type below
+ // instead of &gst::Element
fn sink_activate(
&self,
@@ -806,7 +823,11 @@ impl PadSink {
},
move |imp, element| {
let this_ref = PadSinkRef::new(inner_arc);
- handler.sink_activate(&this_ref, imp, element)
+ handler.sink_activate(
+ &this_ref,
+ imp,
+ element.dynamic_cast_ref::<gst::Element>().unwrap(),
+ )
},
)
});
@@ -830,7 +851,13 @@ impl PadSink {
let this_ref = PadSinkRef::new(inner_arc);
this_ref.activate_mode_hook(mode, active)?;
- handler.sink_activatemode(&this_ref, imp, element, mode, active)
+ handler.sink_activatemode(
+ &this_ref,
+ imp,
+ element.dynamic_cast_ref::<gst::Element>().unwrap(),
+ mode,
+ active,
+ )
},
)
});
@@ -848,10 +875,12 @@ impl PadSink {
if Context::current_has_sub_tasks() {
let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
let handler = handler.clone();
- let element = element.clone();
+ let element =
+ element.clone().dynamic_cast::<gst::Element>().unwrap();
let delayed_fut = async move {
- let imp =
- <H::ElementImpl as ObjectSubclass>::from_instance(&element);
+ let imp = <H::ElementImpl as ObjectSubclass>::from_instance(
+ element.unsafe_cast_ref(),
+ );
let this_ref =
this_weak.upgrade().ok_or(gst::FlowError::Flushing)?;
handler.sink_chain(&this_ref, imp, &element, buffer).await
@@ -861,8 +890,12 @@ impl PadSink {
Ok(gst::FlowSuccess::Ok)
} else {
let this_ref = PadSinkRef::new(inner_arc);
- let chain_fut =
- handler.sink_chain(&this_ref, imp, &element, buffer);
+ let chain_fut = handler.sink_chain(
+ &this_ref,
+ imp,
+ element.dynamic_cast_ref::<gst::Element>().unwrap(),
+ buffer,
+ );
this_ref.handle_future(chain_fut)
}
},
@@ -882,10 +915,12 @@ impl PadSink {
if Context::current_has_sub_tasks() {
let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
let handler = handler.clone();
- let element = element.clone();
+ let element =
+ element.clone().dynamic_cast::<gst::Element>().unwrap();
let delayed_fut = async move {
- let imp =
- <H::ElementImpl as ObjectSubclass>::from_instance(&element);
+ let imp = <H::ElementImpl as ObjectSubclass>::from_instance(
+ element.unsafe_cast_ref(),
+ );
let this_ref =
this_weak.upgrade().ok_or(gst::FlowError::Flushing)?;
handler
@@ -897,8 +932,12 @@ impl PadSink {
Ok(gst::FlowSuccess::Ok)
} else {
let this_ref = PadSinkRef::new(inner_arc);
- let chain_list_fut =
- handler.sink_chain_list(&this_ref, imp, &element, list);
+ let chain_list_fut = handler.sink_chain_list(
+ &this_ref,
+ imp,
+ element.dynamic_cast_ref::<gst::Element>().unwrap(),
+ list,
+ );
this_ref.handle_future(chain_list_fut)
}
},
@@ -921,10 +960,11 @@ impl PadSink {
if Context::current_has_sub_tasks() {
let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
let handler = handler.clone();
- let element = element.clone();
+ let element =
+ element.clone().dynamic_cast::<gst::Element>().unwrap();
let delayed_fut = async move {
let imp = <H::ElementImpl as ObjectSubclass>::from_instance(
- &element,
+ element.unsafe_cast_ref(),
);
let this_ref =
this_weak.upgrade().ok_or(gst::FlowError::Flushing)?;
@@ -942,13 +982,21 @@ impl PadSink {
} else {
let this_ref = PadSinkRef::new(inner_arc);
let event_fut = handler.sink_event_full_serialized(
- &this_ref, imp, &element, event,
+ &this_ref,
+ imp,
+ element.dynamic_cast_ref::<gst::Element>().unwrap(),
+ event,
);
this_ref.handle_future(event_fut)
}
} else {
let this_ref = PadSinkRef::new(inner_arc);
- handler.sink_event_full(&this_ref, imp, &element, event)
+ handler.sink_event_full(
+ &this_ref,
+ imp,
+ element.dynamic_cast_ref::<gst::Element>().unwrap(),
+ event,
+ )
}
},
)
@@ -965,7 +1013,7 @@ impl PadSink {
move |imp, element| {
let this_ref = PadSinkRef::new(inner_arc);
if !query.is_serialized() {
- handler.sink_query(&this_ref, imp, &element, query)
+ handler.sink_query(&this_ref, imp, element.dynamic_cast_ref::<gst::Element>().unwrap(), query)
} else {
gst_fixme!(RUNTIME_CAT, obj: this_ref.gst_pad(), "Serialized Query not supported");
false
diff --git a/generic/threadshare/src/tcpclientsrc.rs b/generic/threadshare/src/tcpclientsrc/imp.rs
index b5e8e967e..809f47d1b 100644
--- a/generic/threadshare/src/tcpclientsrc.rs
+++ b/generic/threadshare/src/tcpclientsrc/imp.rs
@@ -44,7 +44,7 @@ use crate::runtime::prelude::*;
use crate::runtime::task;
use crate::runtime::{Context, PadSrc, PadSrcRef, PadSrcWeak, Task, TaskState};
-use super::socket::{Socket, SocketError, SocketRead};
+use crate::socket::{Socket, SocketError, SocketRead};
const DEFAULT_HOST: Option<&str> = Some("127.0.0.1");
const DEFAULT_PORT: i32 = 4953;
@@ -139,7 +139,7 @@ static PROPERTIES: [subclass::Property; 6] = [
}),
];
-pub struct TcpClientReader(tokio::net::TcpStream);
+struct TcpClientReader(tokio::net::TcpStream);
impl TcpClientReader {
pub fn new(socket: tokio::net::TcpStream) -> Self {
@@ -201,7 +201,7 @@ impl TcpClientSrcPadHandler {
self.0.state.lock().await.need_segment = true;
}
- async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &gst::Element) {
+ async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &super::TcpClientSrc) {
let mut state = self.0.state.lock().await;
if state.need_initial_events {
gst_debug!(CAT, obj: pad.gst_pad(), "Pushing initial events");
@@ -232,7 +232,7 @@ impl TcpClientSrcPadHandler {
async fn push_buffer(
&self,
pad: &PadSrcRef<'_>,
- element: &gst::Element,
+ element: &super::TcpClientSrc,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer);
@@ -328,7 +328,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
}
struct TcpClientSrcTask {
- element: gst::Element,
+ element: super::TcpClientSrc,
src_pad: PadSrcWeak,
src_pad_handler: TcpClientSrcPadHandler,
saddr: SocketAddr,
@@ -338,7 +338,7 @@ struct TcpClientSrcTask {
impl TcpClientSrcTask {
fn new(
- element: &gst::Element,
+ element: &super::TcpClientSrc,
src_pad: &PadSrc,
src_pad_handler: &TcpClientSrcPadHandler,
saddr: SocketAddr,
@@ -371,7 +371,7 @@ impl TaskImpl for TcpClientSrcTask {
self.socket = Some(
Socket::try_new(
- self.element.clone(),
+ self.element.clone().upcast(),
self.buffer_pool.take().unwrap(),
TcpClientReader::new(socket),
)
@@ -496,7 +496,7 @@ impl TaskImpl for TcpClientSrcTask {
}
}
-struct TcpClientSrc {
+pub struct TcpClientSrc {
src_pad: PadSrc,
src_pad_handler: TcpClientSrcPadHandler,
task: Task,
@@ -512,7 +512,7 @@ lazy_static! {
}
impl TcpClientSrc {
- fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
+ fn prepare(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> {
let settings = self.settings.lock().unwrap().clone();
gst_debug!(CAT, obj: element, "Preparing");
@@ -581,27 +581,27 @@ impl TcpClientSrc {
Ok(())
}
- fn unprepare(&self, element: &gst::Element) {
+ fn unprepare(&self, element: &super::TcpClientSrc) {
gst_debug!(CAT, obj: element, "Unpreparing");
self.task.unprepare().unwrap();
gst_debug!(CAT, obj: element, "Unprepared");
}
- fn stop(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
+ fn stop(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Stopping");
self.task.stop()?;
gst_debug!(CAT, obj: element, "Stopped");
Ok(())
}
- fn start(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
+ fn start(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Starting");
self.task.start()?;
gst_debug!(CAT, obj: element, "Started");
Ok(())
}
- fn pause(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
+ fn pause(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Pausing");
self.task.pause()?;
gst_debug!(CAT, obj: element, "Paused");
@@ -611,13 +611,14 @@ impl TcpClientSrc {
impl ObjectSubclass for TcpClientSrc {
const NAME: &'static str = "RsTsTcpClientSrc";
+ type Type = super::TcpClientSrc;
type ParentType = gst::Element;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib_object_subclass!();
- fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ fn class_init(klass: &mut Self::Class) {
klass.set_metadata(
"Thread-sharing TCP client source",
"Source/Network",
@@ -638,7 +639,7 @@ impl ObjectSubclass for TcpClientSrc {
klass.install_properties(&PROPERTIES);
}
- fn with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
+ fn with_class(klass: &Self::Class) -> Self {
let src_pad_handler = TcpClientSrcPadHandler::default();
Self {
@@ -654,7 +655,7 @@ impl ObjectSubclass for TcpClientSrc {
}
impl ObjectImpl for TcpClientSrc {
- fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
+ fn set_property(&self, _obj: &Self::Type, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
let mut settings = self.settings.lock().unwrap();
@@ -684,7 +685,7 @@ impl ObjectImpl for TcpClientSrc {
}
}
- fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ fn get_property(&self, _obj: &Self::Type, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
let settings = self.settings.lock().unwrap();
@@ -699,20 +700,19 @@ impl ObjectImpl for TcpClientSrc {
}
}
- fn constructed(&self, obj: &glib::Object) {
+ fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
- let element = obj.downcast_ref::<gst::Element>().unwrap();
- element.add_pad(self.src_pad.gst_pad()).unwrap();
+ obj.add_pad(self.src_pad.gst_pad()).unwrap();
- super::set_element_flags(element, gst::ElementFlags::SOURCE);
+ crate::set_element_flags(obj, gst::ElementFlags::SOURCE);
}
}
impl ElementImpl for TcpClientSrc {
fn change_state(
&self,
- element: &gst::Element,
+ element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
@@ -754,12 +754,3 @@ impl ElementImpl for TcpClientSrc {
Ok(success)
}
}
-
-pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
- gst::Element::register(
- Some(plugin),
- "ts-tcpclientsrc",
- gst::Rank::None,
- TcpClientSrc::get_type(),
- )
-}
diff --git a/generic/threadshare/src/tcpclientsrc/mod.rs b/generic/threadshare/src/tcpclientsrc/mod.rs
new file mode 100644
index 000000000..c78ba826a
--- /dev/null
+++ b/generic/threadshare/src/tcpclientsrc/mod.rs
@@ -0,0 +1,40 @@
+// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
+// Copyright (C) 2018 LEE Dongjun <redongjun@gmail.com>
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Library General Public
+// License as published by the Free Software Foundation; either
+// version 2 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Library General Public License for more details.
+//
+// You should have received a copy of the GNU Library General Public
+// License along with this library; if not, write to the
+// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+// Boston, MA 02110-1335, USA.
+
+use glib::glib_wrapper;
+use glib::prelude::*;
+
+mod imp;
+
+glib_wrapper! {
+ pub struct TcpClientSrc(ObjectSubclass<imp::TcpClientSrc>) @extends gst::Element, gst::Object;
+}
+
+// GStreamer elements need to be thread-safe. For the private implementation this is automatically
+// enforced but for the public wrapper type we need to specify this manually.
+unsafe impl Send for TcpClientSrc {}
+unsafe impl Sync for TcpClientSrc {}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "ts-tcpclientsrc",
+ gst::Rank::None,
+ TcpClientSrc::static_type(),
+ )
+}
diff --git a/generic/threadshare/src/udpsink.rs b/generic/threadshare/src/udpsink/imp.rs
index 35ea4c599..064d9f6c3 100644
--- a/generic/threadshare/src/udpsink.rs
+++ b/generic/threadshare/src/udpsink/imp.rs
@@ -562,7 +562,7 @@ impl UdpSinkPadHandler {
async fn render(
&self,
- element: &gst::Element,
+ element: &super::UdpSink,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let (
@@ -691,7 +691,7 @@ impl UdpSinkPadHandler {
}
/* Wait until specified time */
- async fn sync(&self, element: &gst::Element, running_time: gst::ClockTime) {
+ async fn sync(&self, element: &super::UdpSink, running_time: gst::ClockTime) {
let now = element.get_current_running_time();
if let Some(delay) = running_time
@@ -702,7 +702,7 @@ impl UdpSinkPadHandler {
}
}
- async fn handle_event(&self, element: &gst::Element, event: gst::Event) {
+ async fn handle_event(&self, element: &super::UdpSink, event: gst::Event) {
match event.view() {
EventView::Eos(_) => {
let _ = element.post_message(gst::message::Eos::builder().src(element).build());
@@ -726,7 +726,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let sender = Arc::clone(&self.0.read().unwrap().sender);
- let element = element.clone();
+ let element = element.clone().downcast::<super::UdpSink>().unwrap();
async move {
if let Some(sender) = sender.lock().await.as_mut() {
@@ -748,7 +748,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let sender = Arc::clone(&self.0.read().unwrap().sender);
- let element = element.clone();
+ let element = element.clone().downcast::<super::UdpSink>().unwrap();
async move {
if let Some(sender) = sender.lock().await.as_mut() {
@@ -773,7 +773,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
event: gst::Event,
) -> BoxFuture<'static, bool> {
let sender = Arc::clone(&self.0.read().unwrap().sender);
- let element = element.clone();
+ let element = element.clone().downcast::<super::UdpSink>().unwrap();
async move {
if let EventView::FlushStop(_) = event.view() {
@@ -807,13 +807,13 @@ impl PadSinkHandler for UdpSinkPadHandler {
#[derive(Debug)]
struct UdpSinkTask {
- element: gst::Element,
+ element: super::UdpSink,
sink_pad_handler: UdpSinkPadHandler,
receiver: Option<mpsc::Receiver<TaskItem>>,
}
impl UdpSinkTask {
- fn new(element: &gst::Element, sink_pad_handler: &UdpSinkPadHandler) -> Self {
+ fn new(element: &super::UdpSink, sink_pad_handler: &UdpSinkPadHandler) -> Self {
UdpSinkTask {
element: element.clone(),
sink_pad_handler: sink_pad_handler.clone(),
@@ -877,7 +877,7 @@ enum SocketFamily {
}
#[derive(Debug)]
-struct UdpSink {
+pub struct UdpSink {
sink_pad: PadSink,
sink_pad_handler: UdpSinkPadHandler,
task: Task,
@@ -889,7 +889,7 @@ impl UdpSink {
&self,
family: SocketFamily,
context: &Context,
- element: &gst::Element,
+ element: &super::UdpSink,
) -> Result<(), gst::ErrorMessage> {
let mut settings = self.settings.lock().unwrap();
@@ -1028,7 +1028,7 @@ impl UdpSink {
Ok(())
}
- fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
+ fn prepare(&self, element: &super::UdpSink) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Preparing");
let context = {
@@ -1060,7 +1060,7 @@ impl UdpSink {
Ok(())
}
- fn unprepare(&self, element: &gst::Element) {
+ fn unprepare(&self, element: &super::UdpSink) {
gst_debug!(CAT, obj: element, "Unpreparing");
self.task.unprepare().unwrap();
@@ -1069,14 +1069,14 @@ impl UdpSink {
gst_debug!(CAT, obj: element, "Unprepared");
}
- fn stop(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
+ fn stop(&self, element: &super::UdpSink) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Stopping");
self.task.stop()?;
gst_debug!(CAT, obj: element, "Stopped");
Ok(())
}
- fn start(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
+ fn start(&self, element: &super::UdpSink) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Starting");
self.task.start()?;
gst_debug!(CAT, obj: element, "Started");
@@ -1101,7 +1101,7 @@ impl UdpSink {
}
}
-fn try_into_socket_addr(element: &gst::Element, host: &str, port: i32) -> Result<SocketAddr, ()> {
+fn try_into_socket_addr(element: &super::UdpSink, host: &str, port: i32) -> Result<SocketAddr, ()> {
let addr: IpAddr = match host.parse() {
Err(err) => {
gst_error!(CAT, obj: element, "Failed to parse host {}: {}", host, err);
@@ -1123,13 +1123,14 @@ fn try_into_socket_addr(element: &gst::Element, host: &str, port: i32) -> Result
impl ObjectSubclass for UdpSink {
const NAME: &'static str = "RsTsUdpSink";
+ type Type = super::UdpSink;
type ParentType = gst::Element;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib_object_subclass!();
- fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ fn class_init(klass: &mut Self::Class) {
klass.set_metadata(
"Thread-sharing UDP sink",
"Sink/Network",
@@ -1154,7 +1155,7 @@ impl ObjectSubclass for UdpSink {
glib::types::Type::Unit,
|_, args| {
let element = args[0]
- .get::<gst::Element>()
+ .get::<super::UdpSink>()
.expect("signal arg")
.expect("missing signal arg");
let host = args[1]
@@ -1182,7 +1183,7 @@ impl ObjectSubclass for UdpSink {
glib::types::Type::Unit,
|_, args| {
let element = args[0]
- .get::<gst::Element>()
+ .get::<super::UdpSink>()
.expect("signal arg")
.expect("missing signal arg");
let host = args[1]
@@ -1211,7 +1212,7 @@ impl ObjectSubclass for UdpSink {
glib::types::Type::Unit,
|_, args| {
let element = args[0]
- .get::<gst::Element>()
+ .get::<super::UdpSink>()
.expect("signal arg")
.expect("missing signal arg");
@@ -1225,7 +1226,7 @@ impl ObjectSubclass for UdpSink {
klass.install_properties(&PROPERTIES);
}
- fn with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
+ fn with_class(klass: &Self::Class) -> Self {
let settings = Arc::new(StdMutex::new(Settings::default()));
let sink_pad_handler = UdpSinkPadHandler::new(Arc::clone(&settings));
@@ -1242,9 +1243,8 @@ impl ObjectSubclass for UdpSink {
}
impl ObjectImpl for UdpSink {
- fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) {
+ fn set_property(&self, obj: &Self::Type, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
- let element = obj.downcast_ref::<gst::Element>().unwrap();
let mut settings = self.settings.lock().unwrap();
match *prop {
@@ -1315,15 +1315,9 @@ impl ObjectImpl for UdpSink {
rsplit[0]
.parse::<i32>()
.map_err(|err| {
- gst_error!(
- CAT,
- obj: element,
- "Invalid port {}: {}",
- rsplit[0],
- err
- );
+ gst_error!(CAT, obj: obj, "Invalid port {}: {}", rsplit[0], err);
})
- .and_then(|port| try_into_socket_addr(&element, rsplit[1], port))
+ .and_then(|port| try_into_socket_addr(&obj, rsplit[1], port))
.ok()
} else {
None
@@ -1346,7 +1340,7 @@ impl ObjectImpl for UdpSink {
}
}
- fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ fn get_property(&self, _obj: &Self::Type, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
let settings = self.settings.lock().unwrap();
@@ -1399,20 +1393,19 @@ impl ObjectImpl for UdpSink {
}
}
- fn constructed(&self, obj: &glib::Object) {
+ fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
- let element = obj.downcast_ref::<gst::Element>().unwrap();
- element.add_pad(self.sink_pad.gst_pad()).unwrap();
+ obj.add_pad(self.sink_pad.gst_pad()).unwrap();
- super::set_element_flags(element, gst::ElementFlags::SINK);
+ crate::set_element_flags(obj, gst::ElementFlags::SINK);
}
}
impl ElementImpl for UdpSink {
fn change_state(
&self,
- element: &gst::Element,
+ element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
@@ -1439,7 +1432,7 @@ impl ElementImpl for UdpSink {
self.parent_change_state(element, transition)
}
- fn send_event(&self, _element: &gst::Element, event: gst::Event) -> bool {
+ fn send_event(&self, _element: &Self::Type, event: gst::Event) -> bool {
match event.view() {
EventView::Latency(ev) => {
self.sink_pad_handler.set_latency(ev.get_latency());
@@ -1450,12 +1443,3 @@ impl ElementImpl for UdpSink {
}
}
}
-
-pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
- gst::Element::register(
- Some(plugin),
- "ts-udpsink",
- gst::Rank::None,
- UdpSink::get_type(),
- )
-}
diff --git a/generic/threadshare/src/udpsink/mod.rs b/generic/threadshare/src/udpsink/mod.rs
new file mode 100644
index 000000000..da0729e93
--- /dev/null
+++ b/generic/threadshare/src/udpsink/mod.rs
@@ -0,0 +1,39 @@
+// Copyright (C) 2019 Mathieu Duponchelle <mathieu@centricular.com>
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Library General Public
+// License as published by the Free Software Foundation; either
+// version 2 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Library General Public License for more details.
+//
+// You should have received a copy of the GNU Library General Public
+// License along with this library; if not, write to the
+// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+// Boston, MA 02110-1335, USA.
+
+use glib::glib_wrapper;
+use glib::prelude::*;
+
+mod imp;
+
+glib_wrapper! {
+ pub struct UdpSink(ObjectSubclass<imp::UdpSink>) @extends gst::Element, gst::Object;
+}
+
+// GStreamer elements need to be thread-safe. For the private implementation this is automatically
+// enforced but for the public wrapper type we need to specify this manually.
+unsafe impl Send for UdpSink {}
+unsafe impl Sync for UdpSink {}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "ts-udpsink",
+ gst::Rank::None,
+ UdpSink::static_type(),
+ )
+}
diff --git a/generic/threadshare/src/udpsrc.rs b/generic/threadshare/src/udpsrc/imp.rs
index 2572f2f0f..f70c1e732 100644
--- a/generic/threadshare/src/udpsrc.rs
+++ b/generic/threadshare/src/udpsrc/imp.rs
@@ -41,7 +41,7 @@ use std::u16;
use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSrc, PadSrcRef, PadSrcWeak, Task};
-use super::socket::{wrap_socket, GioSocketWrapper, Socket, SocketError, SocketRead};
+use crate::socket::{wrap_socket, GioSocketWrapper, Socket, SocketError, SocketRead};
const DEFAULT_ADDRESS: Option<&str> = Some("0.0.0.0");
const DEFAULT_PORT: i32 = 5000;
@@ -185,7 +185,7 @@ static PROPERTIES: [subclass::Property; 10] = [
];
#[derive(Debug)]
-pub struct UdpReader(tokio::net::UdpSocket);
+struct UdpReader(tokio::net::UdpSocket);
impl UdpReader {
fn new(socket: tokio::net::UdpSocket) -> Self {
@@ -254,7 +254,7 @@ impl UdpSrcPadHandler {
self.0.state.lock().await.need_segment = true;
}
- async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &gst::Element) {
+ async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &super::UdpSrc) {
let mut state = self.0.state.lock().await;
if state.need_initial_events {
gst_debug!(CAT, obj: pad.gst_pad(), "Pushing initial events");
@@ -285,7 +285,7 @@ impl UdpSrcPadHandler {
async fn push_buffer(
&self,
pad: &PadSrcRef<'_>,
- element: &gst::Element,
+ element: &super::UdpSrc,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer);
@@ -377,7 +377,7 @@ impl PadSrcHandler for UdpSrcPadHandler {
}
struct UdpSrcTask {
- element: gst::Element,
+ element: super::UdpSrc,
src_pad: PadSrcWeak,
src_pad_handler: UdpSrcPadHandler,
socket: Socket<UdpReader>,
@@ -385,7 +385,7 @@ struct UdpSrcTask {
impl UdpSrcTask {
fn new(
- element: &gst::Element,
+ element: &super::UdpSrc,
src_pad: &PadSrc,
src_pad_handler: &UdpSrcPadHandler,
socket: Socket<UdpReader>,
@@ -510,7 +510,7 @@ impl TaskImpl for UdpSrcTask {
}
}
-struct UdpSrc {
+pub struct UdpSrc {
src_pad: PadSrc,
src_pad_handler: UdpSrcPadHandler,
task: Task,
@@ -526,7 +526,7 @@ lazy_static! {
}
impl UdpSrc {
- fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
+ fn prepare(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> {
let mut settings_guard = self.settings.lock().unwrap();
gst_debug!(CAT, obj: element, "Preparing");
@@ -719,13 +719,17 @@ impl UdpSrc {
)
})?;
- let socket = Socket::try_new(element.clone(), buffer_pool, UdpReader::new(socket))
- .map_err(|err| {
- gst_error_msg!(
- gst::ResourceError::OpenRead,
- ["Failed to prepare socket {:?}", err]
- )
- })?;
+ let socket = Socket::try_new(
+ element.clone().upcast(),
+ buffer_pool,
+ UdpReader::new(socket),
+ )
+ .map_err(|err| {
+ gst_error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Failed to prepare socket {:?}", err]
+ )
+ })?;
element.notify("used-socket");
@@ -749,7 +753,7 @@ impl UdpSrc {
Ok(())
}
- fn unprepare(&self, element: &gst::Element) {
+ fn unprepare(&self, element: &super::UdpSrc) {
gst_debug!(CAT, obj: element, "Unpreparing");
self.settings.lock().unwrap().used_socket = None;
@@ -760,21 +764,21 @@ impl UdpSrc {
gst_debug!(CAT, obj: element, "Unprepared");
}
- fn stop(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
+ fn stop(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Stopping");
self.task.stop()?;
gst_debug!(CAT, obj: element, "Stopped");
Ok(())
}
- fn start(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
+ fn start(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Starting");
self.task.start()?;
gst_debug!(CAT, obj: element, "Started");
Ok(())
}
- fn pause(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
+ fn pause(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Pausing");
self.task.pause()?;
gst_debug!(CAT, obj: element, "Paused");
@@ -784,13 +788,14 @@ impl UdpSrc {
impl ObjectSubclass for UdpSrc {
const NAME: &'static str = "RsTsUdpSrc";
+ type Type = super::UdpSrc;
type ParentType = gst::Element;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib_object_subclass!();
- fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ fn class_init(klass: &mut Self::Class) {
klass.set_metadata(
"Thread-sharing UDP source",
"Source/Network",
@@ -827,7 +832,7 @@ impl ObjectSubclass for UdpSrc {
}
}
- fn with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
+ fn with_class(klass: &Self::Class) -> Self {
let src_pad_handler = UdpSrcPadHandler::default();
Self {
@@ -843,7 +848,7 @@ impl ObjectSubclass for UdpSrc {
}
impl ObjectImpl for UdpSrc {
- fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
+ fn set_property(&self, _obj: &Self::Type, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
let mut settings = self.settings.lock().unwrap();
@@ -888,7 +893,7 @@ impl ObjectImpl for UdpSrc {
}
}
- fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ fn get_property(&self, _obj: &Self::Type, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
let settings = self.settings.lock().unwrap();
@@ -917,20 +922,19 @@ impl ObjectImpl for UdpSrc {
}
}
- fn constructed(&self, obj: &glib::Object) {
+ fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
- let element = obj.downcast_ref::<gst::Element>().unwrap();
- element.add_pad(self.src_pad.gst_pad()).unwrap();
+ obj.add_pad(self.src_pad.gst_pad()).unwrap();
- super::set_element_flags(element, gst::ElementFlags::SOURCE);
+ crate::set_element_flags(obj, gst::ElementFlags::SOURCE);
}
}
impl ElementImpl for UdpSrc {
fn change_state(
&self,
- element: &gst::Element,
+ element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
@@ -972,12 +976,3 @@ impl ElementImpl for UdpSrc {
Ok(success)
}
}
-
-pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
- gst::Element::register(
- Some(plugin),
- "ts-udpsrc",
- gst::Rank::None,
- UdpSrc::get_type(),
- )
-}
diff --git a/generic/threadshare/src/udpsrc/mod.rs b/generic/threadshare/src/udpsrc/mod.rs
new file mode 100644
index 000000000..19ebd2e1d
--- /dev/null
+++ b/generic/threadshare/src/udpsrc/mod.rs
@@ -0,0 +1,39 @@
+// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Library General Public
+// License as published by the Free Software Foundation; either
+// version 2 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Library General Public License for more details.
+//
+// You should have received a copy of the GNU Library General Public
+// License along with this library; if not, write to the
+// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+// Boston, MA 02110-1335, USA.
+
+use glib::glib_wrapper;
+use glib::prelude::*;
+
+mod imp;
+
+glib_wrapper! {
+ pub struct UdpSrc(ObjectSubclass<imp::UdpSrc>) @extends gst::Element, gst::Object;
+}
+
+// GStreamer elements need to be thread-safe. For the private implementation this is automatically
+// enforced but for the public wrapper type we need to specify this manually.
+unsafe impl Send for UdpSrc {}
+unsafe impl Sync for UdpSrc {}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "ts-udpsrc",
+ gst::Rank::None,
+ UdpSrc::static_type(),
+ )
+}
diff --git a/generic/threadshare/tests/pad.rs b/generic/threadshare/tests/pad.rs
index 78408d1e9..fa22175de 100644
--- a/generic/threadshare/tests/pad.rs
+++ b/generic/threadshare/tests/pad.rs
@@ -21,8 +21,8 @@ use futures::future::BoxFuture;
use futures::lock::Mutex as FutMutex;
use futures::prelude::*;
-use glib::glib_object_subclass;
use glib::GBoxed;
+use glib::{glib_object_subclass, glib_wrapper};
use gst::prelude::*;
use gst::subclass::prelude::*;
@@ -53,654 +53,687 @@ fn init() {
});
}
-// Src
-
-static SRC_PROPERTIES: [glib::subclass::Property; 1] =
- [glib::subclass::Property("context", |name| {
- glib::ParamSpec::string(
- name,
- "Context",
- "Context name to share threads with",
- Some(DEFAULT_CONTEXT),
- glib::ParamFlags::READWRITE,
- )
- })];
-
-#[derive(Clone, Debug, Default)]
-struct Settings {
- context: String,
+#[derive(Debug)]
+pub enum Item {
+ Buffer(gst::Buffer),
+ BufferList(gst::BufferList),
+ Event(gst::Event),
}
-lazy_static! {
- static ref SRC_CAT: gst::DebugCategory = gst::DebugCategory::new(
- "ts-element-src-test",
- gst::DebugColorFlags::empty(),
- Some("Thread-sharing Test Src Element"),
- );
+#[derive(Clone, Debug, GBoxed)]
+#[gboxed(type_name = "TsTestItemSender")]
+struct ItemSender {
+ sender: mpsc::Sender<Item>,
}
-#[derive(Clone, Debug)]
-struct PadSrcTestHandler;
+// Src
+mod imp_src {
+ use super::*;
+
+ static SRC_PROPERTIES: [glib::subclass::Property; 1] =
+ [glib::subclass::Property("context", |name| {
+ glib::ParamSpec::string(
+ name,
+ "Context",
+ "Context name to share threads with",
+ Some(DEFAULT_CONTEXT),
+ glib::ParamFlags::READWRITE,
+ )
+ })];
+
+ #[derive(Clone, Debug, Default)]
+ struct Settings {
+ context: String,
+ }
+
+ lazy_static! {
+ pub static ref SRC_CAT: gst::DebugCategory = gst::DebugCategory::new(
+ "ts-element-src-test",
+ gst::DebugColorFlags::empty(),
+ Some("Thread-sharing Test Src Element"),
+ );
+ }
+
+ #[derive(Clone, Debug)]
+ struct PadSrcTestHandler;
-impl PadSrcTestHandler {
- async fn push_item(
- pad: &PadSrcRef<'_>,
- item: Item,
- ) -> Result<gst::FlowSuccess, gst::FlowError> {
- gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", item);
+ impl PadSrcTestHandler {
+ async fn push_item(
+ pad: &PadSrcRef<'_>,
+ item: Item,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", item);
- match item {
- Item::Event(event) => {
- pad.push_event(event).await;
+ match item {
+ Item::Event(event) => {
+ pad.push_event(event).await;
- Ok(gst::FlowSuccess::Ok)
+ Ok(gst::FlowSuccess::Ok)
+ }
+ Item::Buffer(buffer) => pad.push(buffer).await,
+ Item::BufferList(list) => pad.push_list(list).await,
}
- Item::Buffer(buffer) => pad.push(buffer).await,
- Item::BufferList(list) => pad.push_list(list).await,
}
}
-}
-impl PadSrcHandler for PadSrcTestHandler {
- type ElementImpl = ElementSrcTest;
-
- fn src_event(
- &self,
- pad: &PadSrcRef,
- elem_src_test: &ElementSrcTest,
- _element: &gst::Element,
- event: gst::Event,
- ) -> bool {
- gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
-
- let ret = match event.view() {
- EventView::FlushStart(..) => {
- elem_src_test.task.flush_start().unwrap();
- true
- }
- EventView::Qos(..) | EventView::Reconfigure(..) | EventView::Latency(..) => true,
- EventView::FlushStop(..) => {
- elem_src_test.task.flush_stop().unwrap();
- true
+ impl PadSrcHandler for PadSrcTestHandler {
+ type ElementImpl = ElementSrcTest;
+
+ fn src_event(
+ &self,
+ pad: &PadSrcRef,
+ elem_src_test: &ElementSrcTest,
+ _element: &gst::Element,
+ event: gst::Event,
+ ) -> bool {
+ gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
+
+ let ret = match event.view() {
+ EventView::FlushStart(..) => {
+ elem_src_test.task.flush_start().unwrap();
+ true
+ }
+ EventView::Qos(..) | EventView::Reconfigure(..) | EventView::Latency(..) => true,
+ EventView::FlushStop(..) => {
+ elem_src_test.task.flush_stop().unwrap();
+ true
+ }
+ _ => false,
+ };
+
+ if ret {
+ gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handled {:?}", event);
+ } else {
+ gst_log!(SRC_CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event);
}
- _ => false,
- };
- if ret {
- gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handled {:?}", event);
- } else {
- gst_log!(SRC_CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event);
+ ret
}
-
- ret
}
-}
-#[derive(Debug)]
-struct ElementSrcTestTask {
- element: gst::Element,
- src_pad: PadSrcWeak,
- receiver: mpsc::Receiver<Item>,
-}
+ #[derive(Debug)]
+ struct ElementSrcTestTask {
+ element: super::ElementSrcTest,
+ src_pad: PadSrcWeak,
+ receiver: mpsc::Receiver<Item>,
+ }
-impl ElementSrcTestTask {
- fn new(element: &gst::Element, src_pad: &PadSrc, receiver: mpsc::Receiver<Item>) -> Self {
- ElementSrcTestTask {
- element: element.clone(),
- src_pad: src_pad.downgrade(),
- receiver,
+ impl ElementSrcTestTask {
+ fn new(
+ element: &super::ElementSrcTest,
+ src_pad: &PadSrc,
+ receiver: mpsc::Receiver<Item>,
+ ) -> Self {
+ ElementSrcTestTask {
+ element: element.clone(),
+ src_pad: src_pad.downgrade(),
+ receiver,
+ }
}
}
-}
-impl ElementSrcTestTask {
- fn flush(&mut self) {
- // Purge the channel
- while let Ok(Some(_item)) = self.receiver.try_next() {}
+ impl ElementSrcTestTask {
+ fn flush(&mut self) {
+ // Purge the channel
+ while let Ok(Some(_item)) = self.receiver.try_next() {}
+ }
}
-}
-
-impl TaskImpl for ElementSrcTestTask {
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
- async move {
- let item = self.receiver.next().await;
- let item = match item {
- Some(item) => item,
- None => {
- gst_log!(SRC_CAT, obj: &self.element, "SrcPad channel aborted");
- return Err(gst::FlowError::Eos);
+ impl TaskImpl for ElementSrcTestTask {
+ fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ async move {
+ let item = self.receiver.next().await;
+
+ let item = match item {
+ Some(item) => item,
+ None => {
+ gst_log!(SRC_CAT, obj: &self.element, "SrcPad channel aborted");
+ return Err(gst::FlowError::Eos);
+ }
+ };
+
+ let pad = self.src_pad.upgrade().expect("PadSrc no longer exists");
+ let res = PadSrcTestHandler::push_item(&pad, item).await;
+ match res {
+ Ok(_) => gst_log!(SRC_CAT, obj: &self.element, "Successfully pushed item"),
+ Err(gst::FlowError::Flushing) => {
+ gst_debug!(SRC_CAT, obj: &self.element, "Flushing")
+ }
+ Err(err) => panic!("Got error {}", err),
}
- };
- let pad = self.src_pad.upgrade().expect("PadSrc no longer exists");
- let res = PadSrcTestHandler::push_item(&pad, item).await;
- match res {
- Ok(_) => gst_log!(SRC_CAT, obj: &self.element, "Successfully pushed item"),
- Err(gst::FlowError::Flushing) => {
- gst_debug!(SRC_CAT, obj: &self.element, "Flushing")
- }
- Err(err) => panic!("Got error {}", err),
+ res.map(drop)
}
-
- res.map(drop)
+ .boxed()
}
- .boxed()
- }
- fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
- async move {
- gst_log!(SRC_CAT, obj: &self.element, "Stopping task");
- self.flush();
- gst_log!(SRC_CAT, obj: &self.element, "Task stopped");
- Ok(())
+ fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ async move {
+ gst_log!(SRC_CAT, obj: &self.element, "Stopping task");
+ self.flush();
+ gst_log!(SRC_CAT, obj: &self.element, "Task stopped");
+ Ok(())
+ }
+ .boxed()
}
- .boxed()
- }
- fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
- async move {
- gst_log!(SRC_CAT, obj: &self.element, "Starting task flush");
- self.flush();
- gst_log!(SRC_CAT, obj: &self.element, "Task flush started");
- Ok(())
+ fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ async move {
+ gst_log!(SRC_CAT, obj: &self.element, "Starting task flush");
+ self.flush();
+ gst_log!(SRC_CAT, obj: &self.element, "Task flush started");
+ Ok(())
+ }
+ .boxed()
}
- .boxed()
}
-}
-#[derive(Debug)]
-struct ElementSrcTest {
- src_pad: PadSrc,
- task: Task,
- sender: StdMutex<Option<mpsc::Sender<Item>>>,
- settings: StdMutex<Settings>,
-}
+ #[derive(Debug)]
+ pub struct ElementSrcTest {
+ src_pad: PadSrc,
+ task: Task,
+ sender: StdMutex<Option<mpsc::Sender<Item>>>,
+ settings: StdMutex<Settings>,
+ }
-impl ElementSrcTest {
- fn try_push(&self, item: Item) -> Result<(), Item> {
- let state = self.task.lock_state();
- if *state != TaskState::Started && *state != TaskState::Paused {
- gst_debug!(SRC_CAT, "ElementSrcTest rejecting item due to pad state");
+ impl ElementSrcTest {
+ pub fn try_push(&self, item: Item) -> Result<(), Item> {
+ let state = self.task.lock_state();
+ if *state != TaskState::Started && *state != TaskState::Paused {
+ gst_debug!(SRC_CAT, "ElementSrcTest rejecting item due to pad state");
- return Err(item);
- }
+ return Err(item);
+ }
- match self.sender.lock().unwrap().as_mut() {
- Some(sender) => sender
- .try_send(item)
- .map_err(mpsc::TrySendError::into_inner),
- None => Err(item),
+ match self.sender.lock().unwrap().as_mut() {
+ Some(sender) => sender
+ .try_send(item)
+ .map_err(mpsc::TrySendError::into_inner),
+ None => Err(item),
+ }
}
- }
- fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
- gst_debug!(SRC_CAT, obj: element, "Preparing");
+ fn prepare(&self, element: &super::ElementSrcTest) -> Result<(), gst::ErrorMessage> {
+ gst_debug!(SRC_CAT, obj: element, "Preparing");
- let settings = self.settings.lock().unwrap().clone();
- let context = Context::acquire(&settings.context, THROTTLING_DURATION).map_err(|err| {
- gst_error_msg!(
- gst::ResourceError::OpenRead,
- ["Failed to acquire Context: {}", err]
- )
- })?;
+ let settings = self.settings.lock().unwrap().clone();
+ let context =
+ Context::acquire(&settings.context, THROTTLING_DURATION).map_err(|err| {
+ gst_error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Failed to acquire Context: {}", err]
+ )
+ })?;
- let (sender, receiver) = mpsc::channel(1);
- *self.sender.lock().unwrap() = Some(sender);
+ let (sender, receiver) = mpsc::channel(1);
+ *self.sender.lock().unwrap() = Some(sender);
- self.task
- .prepare(
- ElementSrcTestTask::new(element, &self.src_pad, receiver),
- context,
- )
- .map_err(|err| {
- gst_error_msg!(
- gst::ResourceError::Failed,
- ["Error preparing Task: {:?}", err]
+ self.task
+ .prepare(
+ ElementSrcTestTask::new(element, &self.src_pad, receiver),
+ context,
)
- })?;
+ .map_err(|err| {
+ gst_error_msg!(
+ gst::ResourceError::Failed,
+ ["Error preparing Task: {:?}", err]
+ )
+ })?;
- gst_debug!(SRC_CAT, obj: element, "Prepared");
+ gst_debug!(SRC_CAT, obj: element, "Prepared");
- Ok(())
- }
+ Ok(())
+ }
- fn unprepare(&self, element: &gst::Element) {
- gst_debug!(SRC_CAT, obj: element, "Unpreparing");
+ fn unprepare(&self, element: &super::ElementSrcTest) {
+ gst_debug!(SRC_CAT, obj: element, "Unpreparing");
- *self.sender.lock().unwrap() = None;
- self.task.unprepare().unwrap();
+ *self.sender.lock().unwrap() = None;
+ self.task.unprepare().unwrap();
- gst_debug!(SRC_CAT, obj: element, "Unprepared");
- }
+ gst_debug!(SRC_CAT, obj: element, "Unprepared");
+ }
- fn stop(&self, element: &gst::Element) {
- gst_debug!(SRC_CAT, obj: element, "Stopping");
- self.task.stop().unwrap();
- gst_debug!(SRC_CAT, obj: element, "Stopped");
- }
+ fn stop(&self, element: &super::ElementSrcTest) {
+ gst_debug!(SRC_CAT, obj: element, "Stopping");
+ self.task.stop().unwrap();
+ gst_debug!(SRC_CAT, obj: element, "Stopped");
+ }
- fn start(&self, element: &gst::Element) {
- gst_debug!(SRC_CAT, obj: element, "Starting");
- self.task.start().unwrap();
- gst_debug!(SRC_CAT, obj: element, "Started");
- }
+ fn start(&self, element: &super::ElementSrcTest) {
+ gst_debug!(SRC_CAT, obj: element, "Starting");
+ self.task.start().unwrap();
+ gst_debug!(SRC_CAT, obj: element, "Started");
+ }
- fn pause(&self, element: &gst::Element) {
- gst_debug!(SRC_CAT, obj: element, "Pausing");
- self.task.pause().unwrap();
- gst_debug!(SRC_CAT, obj: element, "Paused");
+ fn pause(&self, element: &super::ElementSrcTest) {
+ gst_debug!(SRC_CAT, obj: element, "Pausing");
+ self.task.pause().unwrap();
+ gst_debug!(SRC_CAT, obj: element, "Paused");
+ }
}
-}
-
-impl ObjectSubclass for ElementSrcTest {
- const NAME: &'static str = "TsElementSrcTest";
- type ParentType = gst::Element;
- type Instance = gst::subclass::ElementInstanceStruct<Self>;
- type Class = glib::subclass::simple::ClassStruct<Self>;
- glib_object_subclass!();
-
- fn class_init(klass: &mut glib::subclass::simple::ClassStruct<Self>) {
- klass.set_metadata(
- "Thread-sharing Test Src Element",
- "Generic",
- "Src Element for Pad Src Test",
- "François Laignel <fengalin@free.fr>",
- );
+ impl ObjectSubclass for ElementSrcTest {
+ const NAME: &'static str = "TsElementSrcTest";
+ type Type = super::ElementSrcTest;
+ type ParentType = gst::Element;
+ type Instance = gst::subclass::ElementInstanceStruct<Self>;
+ type Class = glib::subclass::simple::ClassStruct<Self>;
+
+ glib_object_subclass!();
+
+ fn class_init(klass: &mut Self::Class) {
+ klass.set_metadata(
+ "Thread-sharing Test Src Element",
+ "Generic",
+ "Src Element for Pad Src Test",
+ "François Laignel <fengalin@free.fr>",
+ );
- let caps = gst::Caps::new_any();
- let src_pad_template = gst::PadTemplate::new(
- "src",
- gst::PadDirection::Src,
- gst::PadPresence::Always,
- &caps,
- )
- .unwrap();
- klass.add_pad_template(src_pad_template);
+ let caps = gst::Caps::new_any();
+ let src_pad_template = gst::PadTemplate::new(
+ "src",
+ gst::PadDirection::Src,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+ klass.add_pad_template(src_pad_template);
- klass.install_properties(&SRC_PROPERTIES);
- }
+ klass.install_properties(&SRC_PROPERTIES);
+ }
- fn with_class(klass: &glib::subclass::simple::ClassStruct<Self>) -> Self {
- ElementSrcTest {
- src_pad: PadSrc::new(
- gst::Pad::from_template(&klass.get_pad_template("src").unwrap(), Some("src")),
- PadSrcTestHandler,
- ),
- task: Task::default(),
- sender: StdMutex::new(None),
- settings: StdMutex::new(Settings::default()),
+ fn with_class(klass: &Self::Class) -> Self {
+ ElementSrcTest {
+ src_pad: PadSrc::new(
+ gst::Pad::from_template(&klass.get_pad_template("src").unwrap(), Some("src")),
+ PadSrcTestHandler,
+ ),
+ task: Task::default(),
+ sender: StdMutex::new(None),
+ settings: StdMutex::new(Settings::default()),
+ }
}
}
-}
-impl ObjectImpl for ElementSrcTest {
- fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
- let prop = &SRC_PROPERTIES[id];
+ impl ObjectImpl for ElementSrcTest {
+ fn set_property(&self, _obj: &Self::Type, id: usize, value: &glib::Value) {
+ let prop = &SRC_PROPERTIES[id];
- match *prop {
- glib::subclass::Property("context", ..) => {
- let context = value
- .get()
- .expect("type checked upstream")
- .unwrap_or_else(|| "".into());
+ match *prop {
+ glib::subclass::Property("context", ..) => {
+ let context = value
+ .get()
+ .expect("type checked upstream")
+ .unwrap_or_else(|| "".into());
- self.settings.lock().unwrap().context = context;
+ self.settings.lock().unwrap().context = context;
+ }
+ _ => unimplemented!(),
}
- _ => unimplemented!(),
}
- }
- fn constructed(&self, obj: &glib::Object) {
- self.parent_constructed(obj);
+ fn constructed(&self, obj: &Self::Type) {
+ self.parent_constructed(obj);
- let element = obj.downcast_ref::<gst::Element>().unwrap();
- element.add_pad(self.src_pad.gst_pad()).unwrap();
+ obj.add_pad(self.src_pad.gst_pad()).unwrap();
+ }
}
-}
-impl ElementImpl for ElementSrcTest {
- fn change_state(
- &self,
- element: &gst::Element,
- transition: gst::StateChange,
- ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst_log!(SRC_CAT, obj: element, "Changing state {:?}", transition);
-
- match transition {
- gst::StateChange::NullToReady => {
- self.prepare(element).map_err(|err| {
- element.post_error_message(err);
- gst::StateChangeError
- })?;
- }
- gst::StateChange::PlayingToPaused => {
- self.pause(element);
- }
- gst::StateChange::ReadyToNull => {
- self.unprepare(element);
+ impl ElementImpl for ElementSrcTest {
+ fn change_state(
+ &self,
+ element: &Self::Type,
+ transition: gst::StateChange,
+ ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
+ gst_log!(SRC_CAT, obj: element, "Changing state {:?}", transition);
+
+ match transition {
+ gst::StateChange::NullToReady => {
+ self.prepare(element).map_err(|err| {
+ element.post_error_message(err);
+ gst::StateChangeError
+ })?;
+ }
+ gst::StateChange::PlayingToPaused => {
+ self.pause(element);
+ }
+ gst::StateChange::ReadyToNull => {
+ self.unprepare(element);
+ }
+ _ => (),
}
- _ => (),
- }
- let mut success = self.parent_change_state(element, transition)?;
+ let mut success = self.parent_change_state(element, transition)?;
- match transition {
- gst::StateChange::PausedToReady => {
- self.stop(element);
- }
- gst::StateChange::PausedToPlaying => {
- self.start(element);
- }
- gst::StateChange::ReadyToPaused | gst::StateChange::PlayingToPaused => {
- success = gst::StateChangeSuccess::NoPreroll;
+ match transition {
+ gst::StateChange::PausedToReady => {
+ self.stop(element);
+ }
+ gst::StateChange::PausedToPlaying => {
+ self.start(element);
+ }
+ gst::StateChange::ReadyToPaused | gst::StateChange::PlayingToPaused => {
+ success = gst::StateChangeSuccess::NoPreroll;
+ }
+ _ => (),
}
- _ => (),
- }
- Ok(success)
- }
+ Ok(success)
+ }
- fn send_event(&self, _element: &gst::Element, event: gst::Event) -> bool {
- match event.view() {
- EventView::FlushStart(..) => {
- self.task.flush_start().unwrap();
- }
- EventView::FlushStop(..) => {
- self.task.flush_stop().unwrap();
+ fn send_event(&self, _element: &Self::Type, event: gst::Event) -> bool {
+ match event.view() {
+ EventView::FlushStart(..) => {
+ self.task.flush_start().unwrap();
+ }
+ EventView::FlushStop(..) => {
+ self.task.flush_stop().unwrap();
+ }
+ _ => (),
}
- _ => (),
- }
- if !event.is_serialized() {
- self.src_pad.gst_pad().push_event(event)
- } else {
- self.try_push(Item::Event(event)).is_ok()
+ if !event.is_serialized() {
+ self.src_pad.gst_pad().push_event(event)
+ } else {
+ self.try_push(Item::Event(event)).is_ok()
+ }
}
}
}
-// Sink
-
-#[derive(Debug)]
-enum Item {
- Buffer(gst::Buffer),
- BufferList(gst::BufferList),
- Event(gst::Event),
-}
-
-#[derive(Clone, Debug, GBoxed)]
-#[gboxed(type_name = "TsTestItemSender")]
-struct ItemSender {
- sender: mpsc::Sender<Item>,
+glib_wrapper! {
+ pub struct ElementSrcTest(ObjectSubclass<imp_src::ElementSrcTest>) @extends gst::Element, gst::Object;
}
+unsafe impl Send for ElementSrcTest {}
+unsafe impl Sync for ElementSrcTest {}
-static SINK_PROPERTIES: [glib::subclass::Property; 1] =
- [glib::subclass::Property("sender", |name| {
- glib::ParamSpec::boxed(
- name,
- "Sender",
- "Channel sender to forward the incoming items to",
- ItemSender::get_type(),
- glib::ParamFlags::WRITABLE,
- )
- })];
-
-#[derive(Clone, Debug, Default)]
-struct PadSinkTestHandler;
-
-impl PadSinkHandler for PadSinkTestHandler {
- type ElementImpl = ElementSinkTest;
-
- fn sink_chain(
- &self,
- _pad: &PadSinkRef,
- _elem_sink_test: &ElementSinkTest,
- element: &gst::Element,
- buffer: gst::Buffer,
- ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
- let element = element.clone();
- async move {
- let elem_sink_test = ElementSinkTest::from_instance(&element);
- elem_sink_test
- .forward_item(&element, Item::Buffer(buffer))
- .await
- }
- .boxed()
- }
+// Sink
- fn sink_chain_list(
- &self,
- _pad: &PadSinkRef,
- _elem_sink_test: &ElementSinkTest,
- element: &gst::Element,
- list: gst::BufferList,
- ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
- let element = element.clone();
- async move {
- let elem_sink_test = ElementSinkTest::from_instance(&element);
- elem_sink_test
- .forward_item(&element, Item::BufferList(list))
- .await
+mod imp_sink {
+ use super::*;
+
+ static SINK_PROPERTIES: [glib::subclass::Property; 1] =
+ [glib::subclass::Property("sender", |name| {
+ glib::ParamSpec::boxed(
+ name,
+ "Sender",
+ "Channel sender to forward the incoming items to",
+ ItemSender::get_type(),
+ glib::ParamFlags::WRITABLE,
+ )
+ })];
+
+ #[derive(Clone, Debug, Default)]
+ struct PadSinkTestHandler;
+
+ impl PadSinkHandler for PadSinkTestHandler {
+ type ElementImpl = ElementSinkTest;
+
+ fn sink_chain(
+ &self,
+ _pad: &PadSinkRef,
+ _elem_sink_test: &ElementSinkTest,
+ element: &gst::Element,
+ buffer: gst::Buffer,
+ ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
+ let element = element
+ .clone()
+ .downcast::<super::ElementSinkTest>()
+ .unwrap();
+ async move {
+ let elem_sink_test = ElementSinkTest::from_instance(&element);
+ elem_sink_test
+ .forward_item(&element, Item::Buffer(buffer))
+ .await
+ }
+ .boxed()
}
- .boxed()
- }
- fn sink_event(
- &self,
- pad: &PadSinkRef,
- elem_sink_test: &ElementSinkTest,
- element: &gst::Element,
- event: gst::Event,
- ) -> bool {
- gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event);
-
- match event.view() {
- EventView::FlushStart(..) => {
- elem_sink_test.stop(&element);
- true
+ fn sink_chain_list(
+ &self,
+ _pad: &PadSinkRef,
+ _elem_sink_test: &ElementSinkTest,
+ element: &gst::Element,
+ list: gst::BufferList,
+ ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
+ let element = element
+ .clone()
+ .downcast::<super::ElementSinkTest>()
+ .unwrap();
+ async move {
+ let elem_sink_test = ElementSinkTest::from_instance(&element);
+ elem_sink_test
+ .forward_item(&element, Item::BufferList(list))
+ .await
}
- _ => false,
+ .boxed()
}
- }
- fn sink_event_serialized(
- &self,
- pad: &PadSinkRef,
- _elem_sink_test: &ElementSinkTest,
- element: &gst::Element,
- event: gst::Event,
- ) -> BoxFuture<'static, bool> {
- gst_log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
-
- let element = element.clone();
- async move {
- let elem_sink_test = ElementSinkTest::from_instance(&element);
-
- if let EventView::FlushStop(..) = event.view() {
- elem_sink_test.start(&element);
+ fn sink_event(
+ &self,
+ pad: &PadSinkRef,
+ elem_sink_test: &ElementSinkTest,
+ element: &gst::Element,
+ event: gst::Event,
+ ) -> bool {
+ gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event);
+
+ match event.view() {
+ EventView::FlushStart(..) => {
+ elem_sink_test.stop(&element.downcast_ref::<super::ElementSinkTest>().unwrap());
+ true
+ }
+ _ => false,
}
-
- elem_sink_test
- .forward_item(&element, Item::Event(event))
- .await
- .is_ok()
}
- .boxed()
- }
-}
-#[derive(Debug)]
-struct ElementSinkTest {
- sink_pad: PadSink,
- flushing: AtomicBool,
- sender: FutMutex<Option<mpsc::Sender<Item>>>,
-}
+ fn sink_event_serialized(
+ &self,
+ pad: &PadSinkRef,
+ _elem_sink_test: &ElementSinkTest,
+ element: &gst::Element,
+ event: gst::Event,
+ ) -> BoxFuture<'static, bool> {
+ gst_log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
+
+ let element = element
+ .clone()
+ .downcast::<super::ElementSinkTest>()
+ .unwrap();
+ async move {
+ let elem_sink_test = ElementSinkTest::from_instance(&element);
+
+ if let EventView::FlushStop(..) = event.view() {
+ elem_sink_test.start(&element);
+ }
-impl ElementSinkTest {
- async fn forward_item(
- &self,
- element: &gst::Element,
- item: Item,
- ) -> Result<gst::FlowSuccess, gst::FlowError> {
- if !self.flushing.load(Ordering::SeqCst) {
- gst_debug!(SINK_CAT, obj: element, "Fowarding {:?}", item);
- self.sender
- .lock()
- .await
- .as_mut()
- .expect("Item Sender not set")
- .send(item)
- .await
- .map(|_| gst::FlowSuccess::Ok)
- .map_err(|_| gst::FlowError::Error)
- } else {
- gst_debug!(
- SINK_CAT,
- obj: element,
- "Not fowarding {:?} due to flushing",
- item
- );
- Err(gst::FlowError::Flushing)
+ elem_sink_test
+ .forward_item(&element, Item::Event(event))
+ .await
+ .is_ok()
+ }
+ .boxed()
}
}
- fn start(&self, element: &gst::Element) {
- gst_debug!(SINK_CAT, obj: element, "Starting");
- self.flushing.store(false, Ordering::SeqCst);
- gst_debug!(SINK_CAT, obj: element, "Started");
+ #[derive(Debug)]
+ pub struct ElementSinkTest {
+ sink_pad: PadSink,
+ flushing: AtomicBool,
+ sender: FutMutex<Option<mpsc::Sender<Item>>>,
}
- fn stop(&self, element: &gst::Element) {
- gst_debug!(SINK_CAT, obj: element, "Stopping");
- self.flushing.store(true, Ordering::SeqCst);
- gst_debug!(SINK_CAT, obj: element, "Stopped");
- }
-}
+ impl ElementSinkTest {
+ async fn forward_item(
+ &self,
+ element: &super::ElementSinkTest,
+ item: Item,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ if !self.flushing.load(Ordering::SeqCst) {
+ gst_debug!(SINK_CAT, obj: element, "Fowarding {:?}", item);
+ self.sender
+ .lock()
+ .await
+ .as_mut()
+ .expect("Item Sender not set")
+ .send(item)
+ .await
+ .map(|_| gst::FlowSuccess::Ok)
+ .map_err(|_| gst::FlowError::Error)
+ } else {
+ gst_debug!(
+ SINK_CAT,
+ obj: element,
+ "Not fowarding {:?} due to flushing",
+ item
+ );
+ Err(gst::FlowError::Flushing)
+ }
+ }
-impl ElementSinkTest {
- fn push_flush_start(&self, element: &gst::Element) {
- gst_debug!(SINK_CAT, obj: element, "Pushing FlushStart");
- self.sink_pad
- .gst_pad()
- .push_event(gst::event::FlushStart::new());
- gst_debug!(SINK_CAT, obj: element, "FlushStart pushed");
- }
+ fn start(&self, element: &super::ElementSinkTest) {
+ gst_debug!(SINK_CAT, obj: element, "Starting");
+ self.flushing.store(false, Ordering::SeqCst);
+ gst_debug!(SINK_CAT, obj: element, "Started");
+ }
- fn push_flush_stop(&self, element: &gst::Element) {
- gst_debug!(SINK_CAT, obj: element, "Pushing FlushStop");
- self.sink_pad
- .gst_pad()
- .push_event(gst::event::FlushStop::new(true));
- gst_debug!(SINK_CAT, obj: element, "FlushStop pushed");
+ fn stop(&self, element: &super::ElementSinkTest) {
+ gst_debug!(SINK_CAT, obj: element, "Stopping");
+ self.flushing.store(true, Ordering::SeqCst);
+ gst_debug!(SINK_CAT, obj: element, "Stopped");
+ }
}
-}
-
-lazy_static! {
- static ref SINK_CAT: gst::DebugCategory = gst::DebugCategory::new(
- "ts-element-sink-test",
- gst::DebugColorFlags::empty(),
- Some("Thread-sharing Test Sink Element"),
- );
-}
-impl ObjectSubclass for ElementSinkTest {
- const NAME: &'static str = "TsElementSinkTest";
- type ParentType = gst::Element;
- type Instance = gst::subclass::ElementInstanceStruct<Self>;
- type Class = glib::subclass::simple::ClassStruct<Self>;
+ impl ElementSinkTest {
+ pub fn push_flush_start(&self, element: &super::ElementSinkTest) {
+ gst_debug!(SINK_CAT, obj: element, "Pushing FlushStart");
+ self.sink_pad
+ .gst_pad()
+ .push_event(gst::event::FlushStart::new());
+ gst_debug!(SINK_CAT, obj: element, "FlushStart pushed");
+ }
- glib_object_subclass!();
+ pub fn push_flush_stop(&self, element: &super::ElementSinkTest) {
+ gst_debug!(SINK_CAT, obj: element, "Pushing FlushStop");
+ self.sink_pad
+ .gst_pad()
+ .push_event(gst::event::FlushStop::new(true));
+ gst_debug!(SINK_CAT, obj: element, "FlushStop pushed");
+ }
+ }
- fn class_init(klass: &mut glib::subclass::simple::ClassStruct<Self>) {
- klass.set_metadata(
- "Thread-sharing Test Sink Element",
- "Generic",
- "Sink Element for Pad Test",
- "François Laignel <fengalin@free.fr>",
+ lazy_static! {
+ static ref SINK_CAT: gst::DebugCategory = gst::DebugCategory::new(
+ "ts-element-sink-test",
+ gst::DebugColorFlags::empty(),
+ Some("Thread-sharing Test Sink Element"),
);
+ }
- let caps = gst::Caps::new_any();
- let sink_pad_template = gst::PadTemplate::new(
- "sink",
- gst::PadDirection::Sink,
- gst::PadPresence::Always,
- &caps,
- )
- .unwrap();
- klass.add_pad_template(sink_pad_template);
+ impl ObjectSubclass for ElementSinkTest {
+ const NAME: &'static str = "TsElementSinkTest";
+ type Type = super::ElementSinkTest;
+ type ParentType = gst::Element;
+ type Instance = gst::subclass::ElementInstanceStruct<Self>;
+ type Class = glib::subclass::simple::ClassStruct<Self>;
+
+ glib_object_subclass!();
+
+ fn class_init(klass: &mut Self::Class) {
+ klass.set_metadata(
+ "Thread-sharing Test Sink Element",
+ "Generic",
+ "Sink Element for Pad Test",
+ "François Laignel <fengalin@free.fr>",
+ );
- klass.install_properties(&SINK_PROPERTIES);
- }
+ let caps = gst::Caps::new_any();
+ let sink_pad_template = gst::PadTemplate::new(
+ "sink",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+ klass.add_pad_template(sink_pad_template);
- fn with_class(klass: &glib::subclass::simple::ClassStruct<Self>) -> Self {
- ElementSinkTest {
- sink_pad: PadSink::new(
- gst::Pad::from_template(&klass.get_pad_template("sink").unwrap(), Some("sink")),
- PadSinkTestHandler,
- ),
- flushing: AtomicBool::new(true),
- sender: FutMutex::new(None),
+ klass.install_properties(&SINK_PROPERTIES);
}
- }
-}
-impl ObjectImpl for ElementSinkTest {
- fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
- let prop = &SINK_PROPERTIES[id];
-
- match *prop {
- glib::subclass::Property("sender", ..) => {
- let ItemSender { sender } = value
- .get::<&ItemSender>()
- .expect("type checked upstream")
- .expect("ItemSender not found")
- .clone();
- *futures::executor::block_on(self.sender.lock()) = Some(sender);
+ fn with_class(klass: &Self::Class) -> Self {
+ ElementSinkTest {
+ sink_pad: PadSink::new(
+ gst::Pad::from_template(&klass.get_pad_template("sink").unwrap(), Some("sink")),
+ PadSinkTestHandler,
+ ),
+ flushing: AtomicBool::new(true),
+ sender: FutMutex::new(None),
}
- _ => unimplemented!(),
}
}
- fn constructed(&self, obj: &glib::Object) {
- self.parent_constructed(obj);
+ impl ObjectImpl for ElementSinkTest {
+ fn set_property(&self, _obj: &Self::Type, id: usize, value: &glib::Value) {
+ let prop = &SINK_PROPERTIES[id];
+
+ match *prop {
+ glib::subclass::Property("sender", ..) => {
+ let ItemSender { sender } = value
+ .get::<&ItemSender>()
+ .expect("type checked upstream")
+ .expect("ItemSender not found")
+ .clone();
+ *futures::executor::block_on(self.sender.lock()) = Some(sender);
+ }
+ _ => unimplemented!(),
+ }
+ }
- let element = obj.downcast_ref::<gst::Element>().unwrap();
- element.add_pad(self.sink_pad.gst_pad()).unwrap();
+ fn constructed(&self, obj: &Self::Type) {
+ self.parent_constructed(obj);
+
+ obj.add_pad(self.sink_pad.gst_pad()).unwrap();
+ }
}
-}
-impl ElementImpl for ElementSinkTest {
- fn change_state(
- &self,
- element: &gst::Element,
- transition: gst::StateChange,
- ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst_log!(SINK_CAT, obj: element, "Changing state {:?}", transition);
+ impl ElementImpl for ElementSinkTest {
+ fn change_state(
+ &self,
+ element: &Self::Type,
+ transition: gst::StateChange,
+ ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
+ gst_log!(SINK_CAT, obj: element, "Changing state {:?}", transition);
- if let gst::StateChange::PausedToReady = transition {
- self.stop(element);
- }
+ if let gst::StateChange::PausedToReady = transition {
+ self.stop(element);
+ }
- let success = self.parent_change_state(element, transition)?;
+ let success = self.parent_change_state(element, transition)?;
- if let gst::StateChange::ReadyToPaused = transition {
- self.start(element);
- }
+ if let gst::StateChange::ReadyToPaused = transition {
+ self.start(element);
+ }
- Ok(success)
+ Ok(success)
+ }
}
}
+glib_wrapper! {
+ pub struct ElementSinkTest(ObjectSubclass<imp_sink::ElementSinkTest>) @extends gst::Element, gst::Object;
+}
+unsafe impl Send for ElementSinkTest {}
+unsafe impl Sync for ElementSinkTest {}
+
fn setup(
context_name: &str,
mut middle_element_1: Option<gst::Element>,
mut middle_element_2: Option<gst::Element>,
) -> (
gst::Pipeline,
- gst::Element,
- gst::Element,
+ ElementSrcTest,
+ ElementSinkTest,
mpsc::Receiver<Item>,
) {
init();
@@ -708,14 +741,14 @@ fn setup(
let pipeline = gst::Pipeline::new(None);
// Src
- let src_element = glib::Object::new(ElementSrcTest::get_type(), &[])
+ let src_element = glib::Object::new(ElementSrcTest::static_type(), &[])
.unwrap()
- .downcast::<gst::Element>()
+ .downcast::<ElementSrcTest>()
.unwrap();
src_element.set_property("context", &context_name).unwrap();
pipeline.add(&src_element).unwrap();
- let mut last_element = src_element.clone();
+ let mut last_element = src_element.clone().upcast::<gst::Element>();
if let Some(middle_element) = middle_element_1.take() {
pipeline.add(&middle_element).unwrap();
@@ -730,9 +763,9 @@ fn setup(
}
// Sink
- let sink_element = glib::Object::new(ElementSinkTest::get_type(), &[])
+ let sink_element = glib::Object::new(ElementSinkTest::static_type(), &[])
.unwrap()
- .downcast::<gst::Element>()
+ .downcast::<ElementSinkTest>()
.unwrap();
pipeline.add(&sink_element).unwrap();
last_element.link(&sink_element).unwrap();
@@ -748,10 +781,10 @@ fn setup(
fn nominal_scenario(
scenario_name: &str,
pipeline: gst::Pipeline,
- src_element: gst::Element,
+ src_element: ElementSrcTest,
mut receiver: mpsc::Receiver<Item>,
) {
- let elem_src_test = ElementSrcTest::from_instance(&src_element);
+ let elem_src_test = imp_src::ElementSrcTest::from_instance(&src_element);
pipeline.set_state(gst::State::Playing).unwrap();
@@ -975,7 +1008,7 @@ fn start_pause_start() {
let (pipeline, src_element, _sink_element, mut receiver) = setup(&scenario_name, None, None);
- let elem_src_test = ElementSrcTest::from_instance(&src_element);
+ let elem_src_test = imp_src::ElementSrcTest::from_instance(&src_element);
pipeline.set_state(gst::State::Playing).unwrap();
@@ -1053,7 +1086,7 @@ fn start_stop_start() {
let (pipeline, src_element, _sink_element, mut receiver) = setup(&scenario_name, None, None);
- let elem_src_test = ElementSrcTest::from_instance(&src_element);
+ let elem_src_test = imp_src::ElementSrcTest::from_instance(&src_element);
pipeline.set_state(gst::State::Playing).unwrap();
@@ -1109,7 +1142,7 @@ fn start_stop_start() {
match futures::executor::block_on(receiver.next()).unwrap() {
Item::Buffer(_buffer) => {
gst_info!(
- SRC_CAT,
+ imp_src::SRC_CAT,
"{}: initial buffer went through, don't expect any pending item to be dropped",
scenario_name
);
@@ -1166,7 +1199,7 @@ fn start_flush() {
let (pipeline, src_element, sink_element, mut receiver) = setup(&scenario_name, None, None);
- let elem_src_test = ElementSrcTest::from_instance(&src_element);
+ let elem_src_test = imp_src::ElementSrcTest::from_instance(&src_element);
pipeline.set_state(gst::State::Playing).unwrap();
@@ -1206,7 +1239,7 @@ fn start_flush() {
.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4])))
.unwrap();
- let elem_sink_test = ElementSinkTest::from_instance(&sink_element);
+ let elem_sink_test = imp_sink::ElementSinkTest::from_instance(&sink_element);
elem_sink_test.push_flush_start(&sink_element);