Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2018-12-06 14:03:04 +0300
committerSebastian Dröge <sebastian@centricular.com>2018-12-06 14:03:04 +0300
commite64a9b4a1a30a32bc7fee7093f510b29d9a78b2c (patch)
treee51a73d5cc5ee045d7ea0b7089c96439cdf92e71
parent4d87c11293db86a224d30f58869f5dce64dab1ce (diff)
Port threadshare plugin to new subclassing API
-rw-r--r--gst-plugin-threadshare/Cargo.toml4
-rw-r--r--gst-plugin-threadshare/src/appsrc.rs351
-rw-r--r--gst-plugin-threadshare/src/lib.rs21
-rw-r--r--gst-plugin-threadshare/src/proxy.rs582
-rw-r--r--gst-plugin-threadshare/src/queue.rs405
-rw-r--r--gst-plugin-threadshare/src/tcpclientsrc.rs331
-rw-r--r--gst-plugin-threadshare/src/udpsrc.rs384
7 files changed, 1088 insertions, 990 deletions
diff --git a/gst-plugin-threadshare/Cargo.toml b/gst-plugin-threadshare/Cargo.toml
index 961ac0f60..967c1b823 100644
--- a/gst-plugin-threadshare/Cargo.toml
+++ b/gst-plugin-threadshare/Cargo.toml
@@ -12,10 +12,8 @@ gio-sys = { git = "https://github.com/gtk-rs/sys" }
gstreamer-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys" }
glib = { git = "https://github.com/gtk-rs/glib", features = ["subclassing"] }
gio = { git = "https://github.com/gtk-rs/gio" }
-gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
+gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["subclassing"] }
gstreamer-check = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
-gobject-subclass = { git = "https://github.com/gtk-rs/gobject-subclass" }
-gst-plugin = { path = "../gst-plugin" }
tokio = "0.1"
tokio-reactor = "0.1"
tokio-executor = "0.1"
diff --git a/gst-plugin-threadshare/src/appsrc.rs b/gst-plugin-threadshare/src/appsrc.rs
index d08df87be..32e4e9732 100644
--- a/gst-plugin-threadshare/src/appsrc.rs
+++ b/gst-plugin-threadshare/src/appsrc.rs
@@ -17,11 +17,11 @@
use glib;
use glib::prelude::*;
+use glib::subclass;
+use glib::subclass::prelude::*;
use gst;
use gst::prelude::*;
-
-use gobject_subclass::object::*;
-use gst_plugin::element::*;
+use gst::subclass::prelude::*;
use std::sync::Mutex;
use std::u32;
@@ -63,44 +63,56 @@ impl Default for Settings {
}
}
-static PROPERTIES: [Property; 5] = [
- Property::String(
- "context",
- "Context",
- "Context name to share threads with",
- Some(DEFAULT_CONTEXT),
- PropertyMutability::ReadWrite,
- ),
- Property::UInt(
- "context-wait",
- "Context Wait",
- "Throttle poll loop to run at most once every this many ms",
- (0, 1000),
- DEFAULT_CONTEXT_WAIT,
- PropertyMutability::ReadWrite,
- ),
- Property::UInt(
- "max-buffers",
- "Max Buffers",
- "Maximum number of buffers to queue up",
- (1, u32::MAX),
- DEFAULT_MAX_BUFFERS,
- PropertyMutability::ReadWrite,
- ),
- Property::Boxed(
- "caps",
- "Caps",
- "Caps to use",
- gst::Caps::static_type,
- PropertyMutability::ReadWrite,
- ),
- Property::Boolean(
- "do-timestamp",
- "Do Timestamp",
- "Timestamp buffers with the current running time on arrival",
- DEFAULT_DO_TIMESTAMP,
- PropertyMutability::ReadWrite,
- ),
+static PROPERTIES: [subclass::Property; 5] = [
+ subclass::Property("context", || {
+ glib::ParamSpec::string(
+ "context",
+ "Context",
+ "Context name to share threads with",
+ Some(DEFAULT_CONTEXT),
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("context-wait", || {
+ glib::ParamSpec::uint(
+ "context-wait",
+ "Context Wait",
+ "Throttle poll loop to run at most once every this many ms",
+ 0,
+ 1000,
+ DEFAULT_CONTEXT_WAIT,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("max-buffers", || {
+ glib::ParamSpec::uint(
+ "max-buffers",
+ "Max Buffers",
+ "Maximum number of buffers to queue up",
+ 1,
+ u32::MAX,
+ DEFAULT_MAX_BUFFERS,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("caps", || {
+ glib::ParamSpec::boxed(
+ "caps",
+ "Caps",
+ "Caps to use",
+ gst::Caps::static_type(),
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("do-timestamp", || {
+ glib::ParamSpec::boolean(
+ "do-timestamp",
+ "Do Timestamp",
+ "Timestamp buffers with the current running time on arrival",
+ DEFAULT_DO_TIMESTAMP,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
];
struct State {
@@ -133,88 +145,6 @@ struct AppSrc {
}
impl AppSrc {
- fn class_init(klass: &mut ElementClass) {
- klass.set_metadata(
- "Thread-sharing app source",
- "Source/Generic",
- "Thread-sharing app source",
- "Sebastian Dröge <sebastian@centricular.com>",
- );
-
- let caps = gst::Caps::new_any();
-
- let src_pad_template = gst::PadTemplate::new(
- "src",
- gst::PadDirection::Src,
- gst::PadPresence::Always,
- &caps,
- );
- klass.add_pad_template(src_pad_template);
-
- klass.install_properties(&PROPERTIES);
-
- klass.add_action_signal(
- "push-buffer",
- &[gst::Buffer::static_type()],
- bool::static_type(),
- |args| {
- let element = args[0]
- .get::<gst::Element>()
- .unwrap()
- .downcast::<Element>()
- .unwrap();
- let buffer = args[1].get::<gst::Buffer>().unwrap();
- let appsrc = element.get_impl().downcast_ref::<AppSrc>().unwrap();
-
- Some(appsrc.push_buffer(&element, buffer).to_value())
- },
- );
-
- klass.add_action_signal("end-of-stream", &[], bool::static_type(), |args| {
- let element = args[0]
- .get::<gst::Element>()
- .unwrap()
- .downcast::<Element>()
- .unwrap();
- let appsrc = element.get_impl().downcast_ref::<AppSrc>().unwrap();
- Some(appsrc.end_of_stream(&element).to_value())
- });
- }
-
- fn init(element: &Element) -> Box<ElementImpl<Element>> {
- let templ = element.get_pad_template("src").unwrap();
- let src_pad = gst::Pad::new_from_template(&templ, "src");
-
- src_pad.set_event_function(|pad, parent, event| {
- AppSrc::catch_panic_pad_function(
- parent,
- || false,
- |queue, element| queue.src_event(pad, element, event),
- )
- });
- src_pad.set_query_function(|pad, parent, query| {
- AppSrc::catch_panic_pad_function(
- parent,
- || false,
- |queue, element| queue.src_query(pad, element, query),
- )
- });
- element.add_pad(&src_pad).unwrap();
-
- ::set_element_flags(element, gst::ElementFlags::SOURCE);
-
- Box::new(Self {
- cat: gst::DebugCategory::new(
- "ts-appsrc",
- gst::DebugColorFlags::empty(),
- "Thread-sharing app source",
- ),
- src_pad: src_pad,
- state: Mutex::new(State::default()),
- settings: Mutex::new(Settings::default()),
- })
- }
-
fn create_io_context_event(state: &State) -> Option<gst::Event> {
if let (&Some(ref pending_future_id), &Some(ref io_context)) =
(&state.pending_future_id, &state.io_context)
@@ -232,7 +162,7 @@ impl AppSrc {
}
}
- fn src_event(&self, pad: &gst::Pad, element: &Element, event: gst::Event) -> bool {
+ fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
@@ -265,7 +195,12 @@ impl AppSrc {
ret
}
- fn src_query(&self, pad: &gst::Pad, _element: &Element, query: &mut gst::QueryRef) -> bool {
+ fn src_query(
+ &self,
+ pad: &gst::Pad,
+ _element: &gst::Element,
+ query: &mut gst::QueryRef,
+ ) -> bool {
use gst::QueryView;
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
@@ -306,7 +241,7 @@ impl AppSrc {
ret
}
- fn push_buffer(&self, element: &Element, mut buffer: gst::Buffer) -> bool {
+ fn push_buffer(&self, element: &gst::Element, mut buffer: gst::Buffer) -> bool {
let settings = self.settings.lock().unwrap().clone();
if settings.do_timestamp {
@@ -337,7 +272,7 @@ impl AppSrc {
}
}
- fn end_of_stream(&self, element: &Element) -> bool {
+ fn end_of_stream(&self, element: &gst::Element) -> bool {
let mut state = self.state.lock().unwrap();
if let Some(ref mut channel) = state.channel {
match channel.try_send(Either::Right(gst::Event::new_eos().build())) {
@@ -354,7 +289,7 @@ impl AppSrc {
fn push_item(
&self,
- element: &Element,
+ element: &gst::Element,
item: Either<gst::Buffer, gst::Event>,
) -> future::Either<
Box<Future<Item = (), Error = ()> + Send + 'static>,
@@ -449,7 +384,7 @@ impl AppSrc {
}
}
- fn prepare(&self, element: &Element) -> Result<(), gst::ErrorMessage> {
+ fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Preparing");
let settings = self.settings.lock().unwrap().clone();
@@ -480,7 +415,7 @@ impl AppSrc {
Ok(())
}
- fn unprepare(&self, element: &Element) -> Result<(), ()> {
+ fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Unpreparing");
let mut state = self.state.lock().unwrap();
@@ -498,7 +433,7 @@ impl AppSrc {
Ok(())
}
- fn start(&self, element: &Element) -> Result<(), ()> {
+ fn start(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Starting");
let settings = self.settings.lock().unwrap().clone();
let mut state = self.state.lock().unwrap();
@@ -515,7 +450,7 @@ impl AppSrc {
let element_clone = element.clone();
let future = channel_receiver.for_each(move |item| {
- let appsrc = element_clone.get_impl().downcast_ref::<AppSrc>().unwrap();
+ let appsrc = Self::from_instance(&element_clone);
appsrc.push_item(&element_clone, item)
});
io_context.spawn(future);
@@ -526,7 +461,7 @@ impl AppSrc {
Ok(())
}
- fn stop(&self, element: &Element) -> Result<(), ()> {
+ fn stop(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Stopping");
let mut state = self.state.lock().unwrap();
@@ -539,28 +474,114 @@ impl AppSrc {
}
}
-impl ObjectImpl<Element> for AppSrc {
- fn set_property(&self, _obj: &glib::Object, id: u32, value: &glib::Value) {
- let prop = &PROPERTIES[id as usize];
+impl ObjectSubclass for AppSrc {
+ const NAME: &'static str = "RsTsAppSrc";
+ type ParentType = gst::Element;
+ type Instance = gst::subclass::ElementInstanceStruct<Self>;
+ type Class = subclass::simple::ClassStruct<Self>;
+
+ glib_object_subclass!();
+
+ fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ klass.set_metadata(
+ "Thread-sharing app source",
+ "Source/Generic",
+ "Thread-sharing app source",
+ "Sebastian Dröge <sebastian@centricular.com>",
+ );
+
+ let caps = gst::Caps::new_any();
+
+ let src_pad_template = gst::PadTemplate::new(
+ "src",
+ gst::PadDirection::Src,
+ gst::PadPresence::Always,
+ &caps,
+ );
+ klass.add_pad_template(src_pad_template);
+
+ klass.install_properties(&PROPERTIES);
+
+ klass.add_action_signal(
+ "push-buffer",
+ &[gst::Buffer::static_type()],
+ bool::static_type(),
+ |args| {
+ let element = args[0].get::<gst::Element>().unwrap();
+ let buffer = args[1].get::<gst::Buffer>().unwrap();
+ let appsrc = Self::from_instance(&element);
+
+ Some(appsrc.push_buffer(&element, buffer).to_value())
+ },
+ );
+
+ klass.add_action_signal("end-of-stream", &[], bool::static_type(), |args| {
+ let element = args[0].get::<gst::Element>().unwrap();
+ let appsrc = Self::from_instance(&element);
+ Some(appsrc.end_of_stream(&element).to_value())
+ });
+ }
+
+ fn new() -> Self {
+ unreachable!()
+ }
+
+ fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
+ let templ = klass.get_pad_template("src").unwrap();
+ let src_pad = gst::Pad::new_from_template(&templ, "src");
+
+ src_pad.set_event_function(|pad, parent, event| {
+ AppSrc::catch_panic_pad_function(
+ parent,
+ || false,
+ |queue, element| queue.src_event(pad, element, event),
+ )
+ });
+ src_pad.set_query_function(|pad, parent, query| {
+ AppSrc::catch_panic_pad_function(
+ parent,
+ || false,
+ |queue, element| queue.src_query(pad, element, query),
+ )
+ });
+
+ Self {
+ cat: gst::DebugCategory::new(
+ "ts-appsrc",
+ gst::DebugColorFlags::empty(),
+ "Thread-sharing app source",
+ ),
+ src_pad: src_pad,
+ state: Mutex::new(State::default()),
+ settings: Mutex::new(Settings::default()),
+ }
+ }
+}
+
+impl ObjectImpl for AppSrc {
+ glib_object_impl!();
+
+ fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
+ let prop = &PROPERTIES[id];
match *prop {
- Property::String("context", ..) => {
+ subclass::Property("context", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.context = value.get().unwrap_or_else(|| "".into());
}
- Property::UInt("context-wait", ..) => {
+ subclass::Property("context-wait", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.context_wait = value.get().unwrap();
}
- Property::Boxed("caps", ..) => {
+ subclass::Property("caps", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.caps = value.get();
}
- Property::UInt("max-buffers", ..) => {
+ subclass::Property("max-buffers", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.max_buffers = value.get().unwrap();
}
- Property::Boolean("do-timestamp", ..) => {
+ subclass::Property("do-timestamp", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.do_timestamp = value.get().unwrap();
}
@@ -568,39 +589,48 @@ impl ObjectImpl<Element> for AppSrc {
}
}
- fn get_property(&self, _obj: &glib::Object, id: u32) -> Result<glib::Value, ()> {
- let prop = &PROPERTIES[id as usize];
+ fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ let prop = &PROPERTIES[id];
match *prop {
- Property::String("context", ..) => {
+ subclass::Property("context", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.context.to_value())
}
- Property::UInt("context-wait", ..) => {
+ subclass::Property("context-wait", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.context_wait.to_value())
}
- Property::Boxed("caps", ..) => {
+ subclass::Property("caps", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.caps.to_value())
}
- Property::UInt("max-buffers", ..) => {
+ subclass::Property("max-buffers", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.max_buffers.to_value())
}
- Property::Boolean("do-timestamp", ..) => {
+ subclass::Property("do-timestamp", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.do_timestamp.to_value())
}
_ => unimplemented!(),
}
}
+
+ fn constructed(&self, obj: &glib::Object) {
+ self.parent_constructed(obj);
+
+ let element = obj.downcast_ref::<gst::Element>().unwrap();
+ element.add_pad(&self.src_pad).unwrap();
+
+ ::set_element_flags(element, gst::ElementFlags::SOURCE);
+ }
}
-impl ElementImpl<Element> for AppSrc {
+impl ElementImpl for AppSrc {
fn change_state(
&self,
- element: &Element,
+ element: &gst::Element,
transition: gst::StateChange,
) -> gst::StateChangeReturn {
gst_trace!(self.cat, obj: element, "Changing state {:?}", transition);
@@ -624,7 +654,7 @@ impl ElementImpl<Element> for AppSrc {
_ => (),
}
- let mut ret = element.parent_change_state(transition);
+ let mut ret = self.parent_change_state(element, transition);
if ret == gst::StateChangeReturn::Failure {
return ret;
}
@@ -648,23 +678,6 @@ impl ElementImpl<Element> for AppSrc {
}
}
-struct AppSrcStatic;
-
-impl ImplTypeStatic<Element> for AppSrcStatic {
- fn get_name(&self) -> &str {
- "AppSrc"
- }
-
- fn new(&self, element: &Element) -> Box<ElementImpl<Element>> {
- AppSrc::init(element)
- }
-
- fn class_init(&self, klass: &mut ElementClass) {
- AppSrc::class_init(klass);
- }
-}
-
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
- let type_ = register_type(AppSrcStatic);
- gst::Element::register(plugin, "ts-appsrc", 0, type_)
+ gst::Element::register(plugin, "ts-appsrc", 0, AppSrc::get_type())
}
diff --git a/gst-plugin-threadshare/src/lib.rs b/gst-plugin-threadshare/src/lib.rs
index 65f8fd302..47d8e5746 100644
--- a/gst-plugin-threadshare/src/lib.rs
+++ b/gst-plugin-threadshare/src/lib.rs
@@ -27,9 +27,6 @@ extern crate gstreamer_sys as gst_ffi;
extern crate gio;
#[macro_use]
extern crate glib;
-extern crate gobject_subclass;
-#[macro_use]
-extern crate gst_plugin;
#[macro_use]
extern crate gstreamer as gst;
@@ -73,16 +70,16 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
Ok(())
}
-plugin_define!(
- b"threadshare\0",
- b"Threadshare Plugin\0",
+gst_plugin_define!(
+ "threadshare",
+ "Threadshare Plugin",
plugin_init,
- b"0.1.0\0",
- b"LGPL\0",
- b"threadshare\0",
- b"threadshare\0",
- b"https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs\0",
- b"2018-03-01\0"
+ "0.1.0",
+ "LGPL",
+ "threadshare",
+ "threadshare",
+ "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs",
+ "2018-03-01"
);
pub fn set_element_flags<T: glib::IsA<gst::Object> + glib::IsA<gst::Element>>(
diff --git a/gst-plugin-threadshare/src/proxy.rs b/gst-plugin-threadshare/src/proxy.rs
index 93ce47aab..08fac9a79 100644
--- a/gst-plugin-threadshare/src/proxy.rs
+++ b/gst-plugin-threadshare/src/proxy.rs
@@ -17,11 +17,11 @@
use glib;
use glib::prelude::*;
+use glib::subclass;
+use glib::subclass::prelude::*;
use gst;
use gst::prelude::*;
-
-use gobject_subclass::object::*;
-use gst_plugin::element::*;
+use gst::subclass::prelude::*;
use std::collections::HashMap;
use std::collections::VecDeque;
@@ -87,62 +87,80 @@ impl Default for SettingsSrc {
}
}
-static PROPERTIES_SRC: [Property; 6] = [
- Property::UInt(
- "max-size-buffers",
- "Max Size Buffers",
- "Maximum number of buffers to queue (0=unlimited)",
- (0, u32::MAX),
- DEFAULT_MAX_SIZE_BUFFERS,
- PropertyMutability::ReadWrite,
- ),
- Property::UInt(
- "max-size-bytes",
- "Max Size Bytes",
- "Maximum number of bytes to queue (0=unlimited)",
- (0, u32::MAX),
- DEFAULT_MAX_SIZE_BYTES,
- PropertyMutability::ReadWrite,
- ),
- Property::UInt64(
- "max-size-time",
- "Max Size Time",
- "Maximum number of nanoseconds to queue (0=unlimited)",
- (0, u64::MAX - 1),
- DEFAULT_MAX_SIZE_TIME,
- PropertyMutability::ReadWrite,
- ),
- Property::String(
- "context",
- "Context",
- "Context name to share threads with",
- Some(DEFAULT_CONTEXT),
- PropertyMutability::ReadWrite,
- ),
- Property::UInt(
- "context-wait",
- "Context Wait",
- "Throttle poll loop to run at most once every this many ms",
- (0, 1000),
- DEFAULT_CONTEXT_WAIT,
- PropertyMutability::ReadWrite,
- ),
- Property::String(
+static PROPERTIES_SRC: [subclass::Property; 6] = [
+ subclass::Property("max-size-buffers", || {
+ glib::ParamSpec::uint(
+ "max-size-buffers",
+ "Max Size Buffers",
+ "Maximum number of buffers to queue (0=unlimited)",
+ 0,
+ u32::MAX,
+ DEFAULT_MAX_SIZE_BUFFERS,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("max-size-bytes", || {
+ glib::ParamSpec::uint(
+ "max-size-bytes",
+ "Max Size Bytes",
+ "Maximum number of bytes to queue (0=unlimited)",
+ 0,
+ u32::MAX,
+ DEFAULT_MAX_SIZE_BYTES,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("max-size-time", || {
+ glib::ParamSpec::uint64(
+ "max-size-time",
+ "Max Size Time",
+ "Maximum number of nanoseconds to queue (0=unlimited)",
+ 0,
+ u64::MAX - 1,
+ DEFAULT_MAX_SIZE_TIME,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("context", || {
+ glib::ParamSpec::string(
+ "context",
+ "Context",
+ "Context name to share threads with",
+ Some(DEFAULT_CONTEXT),
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("context-wait", || {
+ glib::ParamSpec::uint(
+ "context-wait",
+ "Context Wait",
+ "Throttle poll loop to run at most once every this many ms",
+ 0,
+ 1000,
+ DEFAULT_CONTEXT_WAIT,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("proxy-context", || {
+ glib::ParamSpec::string(
+ "proxy-context",
+ "Proxy Context",
+ "Context name of the proxy to share with",
+ Some(DEFAULT_PROXY_CONTEXT),
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+];
+
+static PROPERTIES_SINK: [subclass::Property; 1] = [subclass::Property("proxy-context", || {
+ glib::ParamSpec::string(
"proxy-context",
"Proxy Context",
"Context name of the proxy to share with",
Some(DEFAULT_PROXY_CONTEXT),
- PropertyMutability::ReadWrite,
- ),
-];
-
-static PROPERTIES_SINK: [Property; 1] = [Property::String(
- "proxy-context",
- "Proxy Context",
- "Context name of the proxy to share with",
- Some(DEFAULT_PROXY_CONTEXT),
- PropertyMutability::ReadWrite,
-)];
+ glib::ParamFlags::READWRITE,
+ )
+})];
// TODO: Refactor into a Sender and Receiver instead of the have_ booleans
@@ -273,80 +291,10 @@ struct ProxySink {
}
impl ProxySink {
- fn class_init(klass: &mut ElementClass) {
- klass.set_metadata(
- "Thread-sharing proxy sink",
- "Sink/Generic",
- "Thread-sharing proxy sink",
- "Sebastian Dröge <sebastian@centricular.com>",
- );
-
- let caps = gst::Caps::new_any();
-
- let sink_pad_template = gst::PadTemplate::new(
- "sink",
- gst::PadDirection::Sink,
- gst::PadPresence::Always,
- &caps,
- );
- klass.add_pad_template(sink_pad_template);
-
- klass.install_properties(&PROPERTIES_SINK);
- }
-
- fn init(element: &Element) -> Box<ElementImpl<Element>> {
- let templ = element.get_pad_template("sink").unwrap();
- let sink_pad = gst::Pad::new_from_template(&templ, "sink");
-
- sink_pad.set_chain_function(|pad, parent, buffer| {
- ProxySink::catch_panic_pad_function(
- parent,
- || gst::FlowReturn::Error,
- |queue, element| queue.sink_chain(pad, element, buffer),
- )
- });
- sink_pad.set_chain_list_function(|pad, parent, list| {
- ProxySink::catch_panic_pad_function(
- parent,
- || gst::FlowReturn::Error,
- |queue, element| queue.sink_chain_list(pad, element, list),
- )
- });
- sink_pad.set_event_function(|pad, parent, event| {
- ProxySink::catch_panic_pad_function(
- parent,
- || false,
- |queue, element| queue.sink_event(pad, element, event),
- )
- });
- sink_pad.set_query_function(|pad, parent, query| {
- ProxySink::catch_panic_pad_function(
- parent,
- || false,
- |queue, element| queue.sink_query(pad, element, query),
- )
- });
-
- element.add_pad(&sink_pad).unwrap();
-
- ::set_element_flags(element, gst::ElementFlags::SINK);
-
- Box::new(Self {
- cat: gst::DebugCategory::new(
- "ts-proxysink",
- gst::DebugColorFlags::empty(),
- "Thread-sharing proxy sink",
- ),
- sink_pad: sink_pad,
- state: Mutex::new(StateSink::default()),
- settings: Mutex::new(SettingsSink::default()),
- })
- }
-
fn enqueue_item(
&self,
_pad: &gst::Pad,
- element: &Element,
+ element: &gst::Element,
item: DataQueueItem,
) -> gst::FlowReturn {
let wait_future = {
@@ -426,10 +374,7 @@ impl ProxySink {
let element_clone = element.clone();
let future = future::poll_fn(move || {
- let sink = element_clone
- .get_impl()
- .downcast_ref::<ProxySink>()
- .unwrap();
+ let sink = Self::from_instance(&element_clone);
let state = sink.state.lock().unwrap();
gst_log!(
@@ -549,7 +494,7 @@ impl ProxySink {
fn sink_chain(
&self,
pad: &gst::Pad,
- element: &Element,
+ element: &gst::Element,
buffer: gst::Buffer,
) -> gst::FlowReturn {
gst_log!(self.cat, obj: pad, "Handling buffer {:?}", buffer);
@@ -559,14 +504,14 @@ impl ProxySink {
fn sink_chain_list(
&self,
pad: &gst::Pad,
- element: &Element,
+ element: &gst::Element,
list: gst::BufferList,
) -> gst::FlowReturn {
gst_log!(self.cat, obj: pad, "Handling buffer list {:?}", list);
self.enqueue_item(pad, element, DataQueueItem::BufferList(list))
}
- fn sink_event(&self, pad: &gst::Pad, element: &Element, event: gst::Event) -> bool {
+ fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
@@ -612,13 +557,18 @@ impl ProxySink {
true
}
- fn sink_query(&self, pad: &gst::Pad, element: &Element, query: &mut gst::QueryRef) -> bool {
+ fn sink_query(
+ &self,
+ pad: &gst::Pad,
+ element: &gst::Element,
+ query: &mut gst::QueryRef,
+ ) -> bool {
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
pad.query_default(element, query)
}
- fn prepare(&self, element: &Element) -> Result<(), gst::ErrorMessage> {
+ fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Preparing");
let settings = self.settings.lock().unwrap().clone();
@@ -640,7 +590,7 @@ impl ProxySink {
Ok(())
}
- fn unprepare(&self, element: &Element) -> Result<(), ()> {
+ fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Unpreparing");
let mut state = self.state.lock().unwrap();
@@ -650,7 +600,7 @@ impl ProxySink {
Ok(())
}
- fn start(&self, element: &Element) -> Result<(), ()> {
+ fn start(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Starting");
let state = self.state.lock().unwrap();
@@ -662,7 +612,7 @@ impl ProxySink {
Ok(())
}
- fn stop(&self, element: &Element) -> Result<(), ()> {
+ fn stop(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Stopping");
let mut state = self.state.lock().unwrap();
@@ -682,12 +632,93 @@ impl ProxySink {
}
}
-impl ObjectImpl<Element> for ProxySink {
- fn set_property(&self, _obj: &glib::Object, id: u32, value: &glib::Value) {
- let prop = &PROPERTIES_SINK[id as usize];
+impl ObjectSubclass for ProxySink {
+ const NAME: &'static str = "RsTsProxySink";
+ type ParentType = gst::Element;
+ type Instance = gst::subclass::ElementInstanceStruct<Self>;
+ type Class = subclass::simple::ClassStruct<Self>;
+
+ glib_object_subclass!();
+
+ fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ klass.set_metadata(
+ "Thread-sharing proxy sink",
+ "Sink/Generic",
+ "Thread-sharing proxy sink",
+ "Sebastian Dröge <sebastian@centricular.com>",
+ );
+
+ let caps = gst::Caps::new_any();
+
+ let sink_pad_template = gst::PadTemplate::new(
+ "sink",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Always,
+ &caps,
+ );
+ klass.add_pad_template(sink_pad_template);
+
+ klass.install_properties(&PROPERTIES_SINK);
+ }
+
+ fn new() -> Self {
+ unreachable!()
+ }
+
+ fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
+ let templ = klass.get_pad_template("sink").unwrap();
+ let sink_pad = gst::Pad::new_from_template(&templ, "sink");
+
+ sink_pad.set_chain_function(|pad, parent, buffer| {
+ ProxySink::catch_panic_pad_function(
+ parent,
+ || gst::FlowReturn::Error,
+ |queue, element| queue.sink_chain(pad, element, buffer),
+ )
+ });
+ sink_pad.set_chain_list_function(|pad, parent, list| {
+ ProxySink::catch_panic_pad_function(
+ parent,
+ || gst::FlowReturn::Error,
+ |queue, element| queue.sink_chain_list(pad, element, list),
+ )
+ });
+ sink_pad.set_event_function(|pad, parent, event| {
+ ProxySink::catch_panic_pad_function(
+ parent,
+ || false,
+ |queue, element| queue.sink_event(pad, element, event),
+ )
+ });
+ sink_pad.set_query_function(|pad, parent, query| {
+ ProxySink::catch_panic_pad_function(
+ parent,
+ || false,
+ |queue, element| queue.sink_query(pad, element, query),
+ )
+ });
+
+ Self {
+ cat: gst::DebugCategory::new(
+ "ts-proxysink",
+ gst::DebugColorFlags::empty(),
+ "Thread-sharing proxy sink",
+ ),
+ sink_pad: sink_pad,
+ state: Mutex::new(StateSink::default()),
+ settings: Mutex::new(SettingsSink::default()),
+ }
+ }
+}
+
+impl ObjectImpl for ProxySink {
+ glib_object_impl!();
+
+ fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
+ let prop = &PROPERTIES_SINK[id];
match *prop {
- Property::String("proxy-context", ..) => {
+ subclass::Property("proxy-context", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.proxy_context = value.get().unwrap_or_else(|| "".into());
}
@@ -695,23 +726,32 @@ impl ObjectImpl<Element> for ProxySink {
}
}
- fn get_property(&self, _obj: &glib::Object, id: u32) -> Result<glib::Value, ()> {
- let prop = &PROPERTIES_SINK[id as usize];
+ fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ let prop = &PROPERTIES_SINK[id];
match *prop {
- Property::String("proxy-context", ..) => {
+ subclass::Property("proxy-context", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.proxy_context.to_value())
}
_ => unimplemented!(),
}
}
+
+ fn constructed(&self, obj: &glib::Object) {
+ self.parent_constructed(obj);
+
+ let element = obj.downcast_ref::<gst::Element>().unwrap();
+ element.add_pad(&self.sink_pad).unwrap();
+
+ ::set_element_flags(element, gst::ElementFlags::SINK);
+ }
}
-impl ElementImpl<Element> for ProxySink {
+impl ElementImpl for ProxySink {
fn change_state(
&self,
- element: &Element,
+ element: &gst::Element,
transition: gst::StateChange,
) -> gst::StateChangeReturn {
gst_trace!(self.cat, obj: element, "Changing state {:?}", transition);
@@ -735,7 +775,7 @@ impl ElementImpl<Element> for ProxySink {
_ => (),
}
- let ret = element.parent_change_state(transition);
+ let ret = self.parent_change_state(element, transition);
if ret == gst::StateChangeReturn::Failure {
return ret;
}
@@ -760,71 +800,6 @@ struct ProxySrc {
}
impl ProxySrc {
- fn class_init(klass: &mut ElementClass) {
- klass.set_metadata(
- "Thread-sharing proxy source",
- "Source/Generic",
- "Thread-sharing proxy source",
- "Sebastian Dröge <sebastian@centricular.com>",
- );
-
- let caps = gst::Caps::new_any();
-
- let src_pad_template = gst::PadTemplate::new(
- "src",
- gst::PadDirection::Src,
- gst::PadPresence::Always,
- &caps,
- );
- klass.add_pad_template(src_pad_template);
-
- klass.install_properties(&PROPERTIES_SRC);
- }
-
- fn init(element: &Element) -> Box<ElementImpl<Element>> {
- let templ = element.get_pad_template("src").unwrap();
- let src_pad = gst::Pad::new_from_template(&templ, "src");
-
- src_pad.set_event_function(|pad, parent, event| {
- ProxySrc::catch_panic_pad_function(
- parent,
- || false,
- |queue, element| queue.src_event(pad, element, event),
- )
- });
- src_pad.set_query_function(|pad, parent, query| {
- ProxySrc::catch_panic_pad_function(
- parent,
- || false,
- |queue, element| queue.src_query(pad, element, query),
- )
- });
- element.add_pad(&src_pad).unwrap();
-
- ::set_element_flags(element, gst::ElementFlags::SOURCE);
-
- Box::new(Self {
- cat: gst::DebugCategory::new(
- "ts-proxysrc",
- gst::DebugColorFlags::empty(),
- "Thread-sharing proxy source",
- ),
- src_pad: src_pad,
- state: Mutex::new(StateSrc::default()),
- settings: Mutex::new(SettingsSrc::default()),
- })
- }
-
- fn catch_panic_pad_function<T, F: FnOnce(&Self, &Element) -> T, G: FnOnce() -> T>(
- parent: &Option<gst::Object>,
- fallback: G,
- f: F,
- ) -> T {
- let element = parent.as_ref().unwrap().downcast_ref::<Element>().unwrap();
- let src = element.get_impl().downcast_ref::<ProxySrc>().unwrap();
- element.catch_panic(fallback, |element| f(src, element))
- }
-
fn create_io_context_event(state: &StateSrc) -> Option<gst::Event> {
if let (&Some(ref pending_future_id), &Some(ref io_context)) =
(&state.pending_future_id, &state.io_context)
@@ -842,7 +817,7 @@ impl ProxySrc {
}
}
- fn src_event(&self, pad: &gst::Pad, element: &Element, event: gst::Event) -> bool {
+ fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
@@ -875,7 +850,12 @@ impl ProxySrc {
ret
}
- fn src_query(&self, pad: &gst::Pad, _element: &Element, query: &mut gst::QueryRef) -> bool {
+ fn src_query(
+ &self,
+ pad: &gst::Pad,
+ _element: &gst::Element,
+ query: &mut gst::QueryRef,
+ ) -> bool {
use gst::QueryView;
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
@@ -917,7 +897,7 @@ impl ProxySrc {
fn push_item(
&self,
- element: &Element,
+ element: &gst::Element,
item: DataQueueItem,
) -> future::Either<
Box<Future<Item = (), Error = gst::FlowError> + Send + 'static>,
@@ -1045,7 +1025,7 @@ impl ProxySrc {
}
}
- fn prepare(&self, element: &Element) -> Result<(), gst::ErrorMessage> {
+ fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Preparing");
let settings = self.settings.lock().unwrap().clone();
@@ -1095,14 +1075,11 @@ impl ProxySrc {
.schedule(
&io_context,
move |item| {
- let src = element_clone.get_impl().downcast_ref::<ProxySrc>().unwrap();
+ let src = Self::from_instance(&element_clone);
src.push_item(&element_clone, item)
},
move |err| {
- let src = element_clone2
- .get_impl()
- .downcast_ref::<ProxySrc>()
- .unwrap();
+ let src = Self::from_instance(&element_clone2);
gst_error!(src.cat, obj: &element_clone2, "Got error {}", err);
match err {
gst::FlowError::CustomError => (),
@@ -1143,7 +1120,7 @@ impl ProxySrc {
Ok(())
}
- fn unprepare(&self, element: &Element) -> Result<(), ()> {
+ fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Unpreparing");
// FIXME: The IO Context has to be alive longer than the queue,
@@ -1181,7 +1158,7 @@ impl ProxySrc {
Ok(())
}
- fn start(&self, element: &Element) -> Result<(), ()> {
+ fn start(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Starting");
let state = self.state.lock().unwrap();
let queue = state.queue.as_ref().unwrap().0.lock().unwrap();
@@ -1195,7 +1172,7 @@ impl ProxySrc {
Ok(())
}
- fn stop(&self, element: &Element) -> Result<(), ()> {
+ fn stop(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Stopping");
let state = self.state.lock().unwrap();
let mut queue = state.queue.as_ref().unwrap().0.lock().unwrap();
@@ -1212,32 +1189,99 @@ impl ProxySrc {
}
}
-impl ObjectImpl<Element> for ProxySrc {
- fn set_property(&self, _obj: &glib::Object, id: u32, value: &glib::Value) {
- let prop = &PROPERTIES_SRC[id as usize];
+impl ObjectSubclass for ProxySrc {
+ const NAME: &'static str = "RsTsProxySrc";
+ type ParentType = gst::Element;
+ type Instance = gst::subclass::ElementInstanceStruct<Self>;
+ type Class = subclass::simple::ClassStruct<Self>;
+
+ glib_object_subclass!();
+
+ fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ klass.set_metadata(
+ "Thread-sharing proxy source",
+ "Source/Generic",
+ "Thread-sharing proxy source",
+ "Sebastian Dröge <sebastian@centricular.com>",
+ );
+
+ let caps = gst::Caps::new_any();
+
+ let src_pad_template = gst::PadTemplate::new(
+ "src",
+ gst::PadDirection::Src,
+ gst::PadPresence::Always,
+ &caps,
+ );
+ klass.add_pad_template(src_pad_template);
+
+ klass.install_properties(&PROPERTIES_SRC);
+ }
+
+ fn new() -> Self {
+ unreachable!()
+ }
+
+ fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
+ let templ = klass.get_pad_template("src").unwrap();
+ let src_pad = gst::Pad::new_from_template(&templ, "src");
+
+ src_pad.set_event_function(|pad, parent, event| {
+ ProxySrc::catch_panic_pad_function(
+ parent,
+ || false,
+ |queue, element| queue.src_event(pad, element, event),
+ )
+ });
+ src_pad.set_query_function(|pad, parent, query| {
+ ProxySrc::catch_panic_pad_function(
+ parent,
+ || false,
+ |queue, element| queue.src_query(pad, element, query),
+ )
+ });
+
+ Self {
+ cat: gst::DebugCategory::new(
+ "ts-proxysrc",
+ gst::DebugColorFlags::empty(),
+ "Thread-sharing proxy source",
+ ),
+ src_pad: src_pad,
+ state: Mutex::new(StateSrc::default()),
+ settings: Mutex::new(SettingsSrc::default()),
+ }
+ }
+}
+
+impl ObjectImpl for ProxySrc {
+ glib_object_impl!();
+
+ fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
+ let prop = &PROPERTIES_SRC[id];
match *prop {
- Property::UInt("max-size-buffers", ..) => {
+ subclass::Property("max-size-buffers", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.max_size_buffers = value.get().unwrap();
}
- Property::UInt("max-size-bytes", ..) => {
+ subclass::Property("max-size-bytes", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.max_size_bytes = value.get().unwrap();
}
- Property::UInt64("max-size-time", ..) => {
+ subclass::Property("max-size-time", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.max_size_time = value.get().unwrap();
}
- Property::String("context", ..) => {
+ subclass::Property("context", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.context = value.get().unwrap_or_else(|| "".into());
}
- Property::UInt("context-wait", ..) => {
+ subclass::Property("context-wait", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.context_wait = value.get().unwrap();
}
- Property::String("proxy-context", ..) => {
+ subclass::Property("proxy-context", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.proxy_context = value.get().unwrap_or_else(|| "".into());
}
@@ -1245,43 +1289,52 @@ impl ObjectImpl<Element> for ProxySrc {
}
}
- fn get_property(&self, _obj: &glib::Object, id: u32) -> Result<glib::Value, ()> {
- let prop = &PROPERTIES_SRC[id as usize];
+ fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ let prop = &PROPERTIES_SRC[id];
match *prop {
- Property::UInt("max-size-buffers", ..) => {
+ subclass::Property("max-size-buffers", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.max_size_buffers.to_value())
}
- Property::UInt("max-size-bytes", ..) => {
+ subclass::Property("max-size-bytes", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.max_size_bytes.to_value())
}
- Property::UInt64("max-size-time", ..) => {
+ subclass::Property("max-size-time", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.max_size_time.to_value())
}
- Property::String("context", ..) => {
+ subclass::Property("context", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.context.to_value())
}
- Property::UInt("context-wait", ..) => {
+ subclass::Property("context-wait", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.context_wait.to_value())
}
- Property::String("proxy-context", ..) => {
+ subclass::Property("proxy-context", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.proxy_context.to_value())
}
_ => unimplemented!(),
}
}
+
+ fn constructed(&self, obj: &glib::Object) {
+ self.parent_constructed(obj);
+
+ let element = obj.downcast_ref::<gst::Element>().unwrap();
+ element.add_pad(&self.src_pad).unwrap();
+
+ ::set_element_flags(element, gst::ElementFlags::SOURCE);
+ }
}
-impl ElementImpl<Element> for ProxySrc {
+impl ElementImpl for ProxySrc {
fn change_state(
&self,
- element: &Element,
+ element: &gst::Element,
transition: gst::StateChange,
) -> gst::StateChangeReturn {
gst_trace!(self.cat, obj: element, "Changing state {:?}", transition);
@@ -1305,7 +1358,7 @@ impl ElementImpl<Element> for ProxySrc {
_ => (),
}
- let mut ret = element.parent_change_state(transition);
+ let mut ret = self.parent_change_state(element, transition);
if ret == gst::StateChangeReturn::Failure {
return ret;
}
@@ -1325,42 +1378,7 @@ impl ElementImpl<Element> for ProxySrc {
}
}
-struct ProxySinkStatic;
-
-impl ImplTypeStatic<Element> for ProxySinkStatic {
- fn get_name(&self) -> &str {
- "ProxySink"
- }
-
- fn new(&self, element: &Element) -> Box<ElementImpl<Element>> {
- ProxySink::init(element)
- }
-
- fn class_init(&self, klass: &mut ElementClass) {
- ProxySink::class_init(klass);
- }
-}
-
-struct ProxySrcStatic;
-
-impl ImplTypeStatic<Element> for ProxySrcStatic {
- fn get_name(&self) -> &str {
- "ProxySrc"
- }
-
- fn new(&self, element: &Element) -> Box<ElementImpl<Element>> {
- ProxySrc::init(element)
- }
-
- fn class_init(&self, klass: &mut ElementClass) {
- ProxySrc::class_init(klass);
- }
-}
-
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
- let type_ = register_type(ProxySinkStatic);
- gst::Element::register(plugin, "ts-proxysink", 0, type_)?;
-
- let type_ = register_type(ProxySrcStatic);
- gst::Element::register(plugin, "ts-proxysrc", 0, type_)
+ gst::Element::register(plugin, "ts-proxysink", 0, ProxySink::get_type())?;
+ gst::Element::register(plugin, "ts-proxysrc", 0, ProxySrc::get_type())
}
diff --git a/gst-plugin-threadshare/src/queue.rs b/gst-plugin-threadshare/src/queue.rs
index 540879fc3..8122e6236 100644
--- a/gst-plugin-threadshare/src/queue.rs
+++ b/gst-plugin-threadshare/src/queue.rs
@@ -17,11 +17,11 @@
use glib;
use glib::prelude::*;
+use glib::subclass;
+use glib::subclass::prelude::*;
use gst;
use gst::prelude::*;
-
-use gobject_subclass::object::*;
-use gst_plugin::element::*;
+use gst::subclass::prelude::*;
use std::collections::VecDeque;
use std::sync::Mutex;
@@ -64,46 +64,60 @@ impl Default for Settings {
}
}
-static PROPERTIES: [Property; 5] = [
- Property::UInt(
- "max-size-buffers",
- "Max Size Buffers",
- "Maximum number of buffers to queue (0=unlimited)",
- (0, u32::MAX),
- DEFAULT_MAX_SIZE_BUFFERS,
- PropertyMutability::ReadWrite,
- ),
- Property::UInt(
- "max-size-bytes",
- "Max Size Bytes",
- "Maximum number of bytes to queue (0=unlimited)",
- (0, u32::MAX),
- DEFAULT_MAX_SIZE_BYTES,
- PropertyMutability::ReadWrite,
- ),
- Property::UInt64(
- "max-size-time",
- "Max Size Time",
- "Maximum number of nanoseconds to queue (0=unlimited)",
- (0, u64::MAX - 1),
- DEFAULT_MAX_SIZE_TIME,
- PropertyMutability::ReadWrite,
- ),
- Property::String(
- "context",
- "Context",
- "Context name to share threads with",
- Some(DEFAULT_CONTEXT),
- PropertyMutability::ReadWrite,
- ),
- Property::UInt(
- "context-wait",
- "Context Wait",
- "Throttle poll loop to run at most once every this many ms",
- (0, 1000),
- DEFAULT_CONTEXT_WAIT,
- PropertyMutability::ReadWrite,
- ),
+static PROPERTIES: [subclass::Property; 5] = [
+ subclass::Property("max-size-buffers", || {
+ glib::ParamSpec::uint(
+ "max-size-buffers",
+ "Max Size Buffers",
+ "Maximum number of buffers to queue (0=unlimited)",
+ 0,
+ u32::MAX,
+ DEFAULT_MAX_SIZE_BUFFERS,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("max-size-bytes", || {
+ glib::ParamSpec::uint(
+ "max-size-bytes",
+ "Max Size Bytes",
+ "Maximum number of bytes to queue (0=unlimited)",
+ 0,
+ u32::MAX,
+ DEFAULT_MAX_SIZE_BYTES,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("max-size-time", || {
+ glib::ParamSpec::uint64(
+ "max-size-time",
+ "Max Size Time",
+ "Maximum number of nanoseconds to queue (0=unlimited)",
+ 0,
+ u64::MAX - 1,
+ DEFAULT_MAX_SIZE_TIME,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("context", || {
+ glib::ParamSpec::string(
+ "context",
+ "Context",
+ "Context name to share threads with",
+ Some(DEFAULT_CONTEXT),
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("context-wait", || {
+ glib::ParamSpec::uint(
+ "context-wait",
+ "Context Wait",
+ "Throttle poll loop to run at most once every this many ms",
+ 0,
+ 1000,
+ DEFAULT_CONTEXT_WAIT,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
];
struct State {
@@ -141,100 +155,6 @@ struct Queue {
}
impl Queue {
- fn class_init(klass: &mut ElementClass) {
- klass.set_metadata(
- "Thread-sharing queue",
- "Generic",
- "Simple data queue",
- "Sebastian Dröge <sebastian@centricular.com>",
- );
-
- let caps = gst::Caps::new_any();
-
- let sink_pad_template = gst::PadTemplate::new(
- "sink",
- gst::PadDirection::Sink,
- gst::PadPresence::Always,
- &caps,
- );
- klass.add_pad_template(sink_pad_template);
-
- let src_pad_template = gst::PadTemplate::new(
- "src",
- gst::PadDirection::Src,
- gst::PadPresence::Always,
- &caps,
- );
- klass.add_pad_template(src_pad_template);
-
- klass.install_properties(&PROPERTIES);
- }
-
- fn init(element: &Element) -> Box<ElementImpl<Element>> {
- let templ = element.get_pad_template("sink").unwrap();
- let sink_pad = gst::Pad::new_from_template(&templ, "sink");
- let templ = element.get_pad_template("src").unwrap();
- let src_pad = gst::Pad::new_from_template(&templ, "src");
-
- sink_pad.set_chain_function(|pad, parent, buffer| {
- Queue::catch_panic_pad_function(
- parent,
- || gst::FlowReturn::Error,
- |queue, element| queue.sink_chain(pad, element, buffer),
- )
- });
- sink_pad.set_chain_list_function(|pad, parent, list| {
- Queue::catch_panic_pad_function(
- parent,
- || gst::FlowReturn::Error,
- |queue, element| queue.sink_chain_list(pad, element, list),
- )
- });
- sink_pad.set_event_function(|pad, parent, event| {
- Queue::catch_panic_pad_function(
- parent,
- || false,
- |queue, element| queue.sink_event(pad, element, event),
- )
- });
- sink_pad.set_query_function(|pad, parent, query| {
- Queue::catch_panic_pad_function(
- parent,
- || false,
- |queue, element| queue.sink_query(pad, element, query),
- )
- });
-
- src_pad.set_event_function(|pad, parent, event| {
- Queue::catch_panic_pad_function(
- parent,
- || false,
- |queue, element| queue.src_event(pad, element, event),
- )
- });
- src_pad.set_query_function(|pad, parent, query| {
- Queue::catch_panic_pad_function(
- parent,
- || false,
- |queue, element| queue.src_query(pad, element, query),
- )
- });
- element.add_pad(&sink_pad).unwrap();
- element.add_pad(&src_pad).unwrap();
-
- Box::new(Self {
- cat: gst::DebugCategory::new(
- "ts-queue",
- gst::DebugColorFlags::empty(),
- "Thread-sharing queue",
- ),
- sink_pad: sink_pad,
- src_pad: src_pad,
- state: Mutex::new(State::default()),
- settings: Mutex::new(Settings::default()),
- })
- }
-
fn create_io_context_event(state: &State) -> Option<gst::Event> {
if let (&Some(ref pending_future_id), &Some(ref io_context)) =
(&state.pending_future_id, &state.io_context)
@@ -255,7 +175,7 @@ impl Queue {
fn enqueue_item(
&self,
_pad: &gst::Pad,
- element: &Element,
+ element: &gst::Element,
item: DataQueueItem,
) -> gst::FlowReturn {
let wait_future = {
@@ -325,7 +245,7 @@ impl Queue {
let element_clone = element.clone();
let future = future::poll_fn(move || {
- let queue = element_clone.get_impl().downcast_ref::<Queue>().unwrap();
+ let queue = Self::from_instance(&element_clone);
let mut state = queue.state.lock().unwrap();
let State {
@@ -430,7 +350,7 @@ impl Queue {
fn sink_chain(
&self,
pad: &gst::Pad,
- element: &Element,
+ element: &gst::Element,
buffer: gst::Buffer,
) -> gst::FlowReturn {
gst_log!(self.cat, obj: pad, "Handling buffer {:?}", buffer);
@@ -440,14 +360,14 @@ impl Queue {
fn sink_chain_list(
&self,
pad: &gst::Pad,
- element: &Element,
+ element: &gst::Element,
list: gst::BufferList,
) -> gst::FlowReturn {
gst_log!(self.cat, obj: pad, "Handling buffer list {:?}", list);
self.enqueue_item(pad, element, DataQueueItem::BufferList(list))
}
- fn sink_event(&self, pad: &gst::Pad, element: &Element, mut event: gst::Event) -> bool {
+ fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, mut event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
@@ -505,7 +425,12 @@ impl Queue {
}
}
- fn sink_query(&self, pad: &gst::Pad, _element: &Element, query: &mut gst::QueryRef) -> bool {
+ fn sink_query(
+ &self,
+ pad: &gst::Pad,
+ _element: &gst::Element,
+ query: &mut gst::QueryRef,
+ ) -> bool {
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
if query.is_serialized() {
@@ -518,7 +443,7 @@ impl Queue {
}
}
- fn src_event(&self, pad: &gst::Pad, element: &Element, event: gst::Event) -> bool {
+ fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
@@ -542,7 +467,12 @@ impl Queue {
self.sink_pad.push_event(event)
}
- fn src_query(&self, pad: &gst::Pad, _element: &Element, query: &mut gst::QueryRef) -> bool {
+ fn src_query(
+ &self,
+ pad: &gst::Pad,
+ _element: &gst::Element,
+ query: &mut gst::QueryRef,
+ ) -> bool {
use gst::QueryView;
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
@@ -578,7 +508,7 @@ impl Queue {
fn push_item(
&self,
- element: &Element,
+ element: &gst::Element,
item: DataQueueItem,
) -> future::Either<
Box<Future<Item = (), Error = gst::FlowError> + Send + 'static>,
@@ -679,7 +609,7 @@ impl Queue {
}
}
- fn prepare(&self, element: &Element) -> Result<(), gst::ErrorMessage> {
+ fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Preparing");
let settings = self.settings.lock().unwrap().clone();
@@ -719,11 +649,11 @@ impl Queue {
.schedule(
&io_context,
move |item| {
- let queue = element_clone.get_impl().downcast_ref::<Queue>().unwrap();
+ let queue = Self::from_instance(&element_clone);
queue.push_item(&element_clone, item)
},
move |err| {
- let queue = element_clone2.get_impl().downcast_ref::<Queue>().unwrap();
+ let queue = Self::from_instance(&element_clone2);
gst_error!(queue.cat, obj: &element_clone2, "Got error {}", err);
match err {
gst::FlowError::CustomError => (),
@@ -762,7 +692,7 @@ impl Queue {
Ok(())
}
- fn unprepare(&self, element: &Element) -> Result<(), ()> {
+ fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Unpreparing");
// FIXME: The IO Context has to be alive longer than the queue,
@@ -792,7 +722,7 @@ impl Queue {
Ok(())
}
- fn start(&self, element: &Element) -> Result<(), ()> {
+ fn start(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Starting");
let mut state = self.state.lock().unwrap();
@@ -806,7 +736,7 @@ impl Queue {
Ok(())
}
- fn stop(&self, element: &Element) -> Result<(), ()> {
+ fn stop(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Stopping");
let mut state = self.state.lock().unwrap();
@@ -826,28 +756,135 @@ impl Queue {
}
}
-impl ObjectImpl<Element> for Queue {
- fn set_property(&self, _obj: &glib::Object, id: u32, value: &glib::Value) {
- let prop = &PROPERTIES[id as usize];
+impl ObjectSubclass for Queue {
+ const NAME: &'static str = "RsTsQueue";
+ type ParentType = gst::Element;
+ type Instance = gst::subclass::ElementInstanceStruct<Self>;
+ type Class = subclass::simple::ClassStruct<Self>;
+
+ glib_object_subclass!();
+
+ fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ klass.set_metadata(
+ "Thread-sharing queue",
+ "Generic",
+ "Simple data queue",
+ "Sebastian Dröge <sebastian@centricular.com>",
+ );
+
+ let caps = gst::Caps::new_any();
+
+ let sink_pad_template = gst::PadTemplate::new(
+ "sink",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Always,
+ &caps,
+ );
+ klass.add_pad_template(sink_pad_template);
+
+ let src_pad_template = gst::PadTemplate::new(
+ "src",
+ gst::PadDirection::Src,
+ gst::PadPresence::Always,
+ &caps,
+ );
+ klass.add_pad_template(src_pad_template);
+
+ klass.install_properties(&PROPERTIES);
+ }
+
+ fn new() -> Self {
+ unreachable!()
+ }
+
+ fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
+ let templ = klass.get_pad_template("sink").unwrap();
+ let sink_pad = gst::Pad::new_from_template(&templ, "sink");
+ let templ = klass.get_pad_template("src").unwrap();
+ let src_pad = gst::Pad::new_from_template(&templ, "src");
+
+ sink_pad.set_chain_function(|pad, parent, buffer| {
+ Queue::catch_panic_pad_function(
+ parent,
+ || gst::FlowReturn::Error,
+ |queue, element| queue.sink_chain(pad, element, buffer),
+ )
+ });
+ sink_pad.set_chain_list_function(|pad, parent, list| {
+ Queue::catch_panic_pad_function(
+ parent,
+ || gst::FlowReturn::Error,
+ |queue, element| queue.sink_chain_list(pad, element, list),
+ )
+ });
+ sink_pad.set_event_function(|pad, parent, event| {
+ Queue::catch_panic_pad_function(
+ parent,
+ || false,
+ |queue, element| queue.sink_event(pad, element, event),
+ )
+ });
+ sink_pad.set_query_function(|pad, parent, query| {
+ Queue::catch_panic_pad_function(
+ parent,
+ || false,
+ |queue, element| queue.sink_query(pad, element, query),
+ )
+ });
+
+ src_pad.set_event_function(|pad, parent, event| {
+ Queue::catch_panic_pad_function(
+ parent,
+ || false,
+ |queue, element| queue.src_event(pad, element, event),
+ )
+ });
+ src_pad.set_query_function(|pad, parent, query| {
+ Queue::catch_panic_pad_function(
+ parent,
+ || false,
+ |queue, element| queue.src_query(pad, element, query),
+ )
+ });
+
+ Self {
+ cat: gst::DebugCategory::new(
+ "ts-queue",
+ gst::DebugColorFlags::empty(),
+ "Thread-sharing queue",
+ ),
+ sink_pad: sink_pad,
+ src_pad: src_pad,
+ state: Mutex::new(State::default()),
+ settings: Mutex::new(Settings::default()),
+ }
+ }
+}
+
+impl ObjectImpl for Queue {
+ glib_object_impl!();
+
+ fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
+ let prop = &PROPERTIES[id];
match *prop {
- Property::UInt("max-size-buffers", ..) => {
+ subclass::Property("max-size-buffers", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.max_size_buffers = value.get().unwrap();
}
- Property::UInt("max-size-bytes", ..) => {
+ subclass::Property("max-size-bytes", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.max_size_bytes = value.get().unwrap();
}
- Property::UInt64("max-size-time", ..) => {
+ subclass::Property("max-size-time", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.max_size_time = value.get().unwrap();
}
- Property::String("context", ..) => {
+ subclass::Property("context", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.context = value.get().unwrap_or_else(|| "".into());
}
- Property::UInt("context-wait", ..) => {
+ subclass::Property("context-wait", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.context_wait = value.get().unwrap();
}
@@ -855,39 +892,47 @@ impl ObjectImpl<Element> for Queue {
}
}
- fn get_property(&self, _obj: &glib::Object, id: u32) -> Result<glib::Value, ()> {
- let prop = &PROPERTIES[id as usize];
+ fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ let prop = &PROPERTIES[id];
match *prop {
- Property::UInt("max-size-buffers", ..) => {
+ subclass::Property("max-size-buffers", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.max_size_buffers.to_value())
}
- Property::UInt("max-size-bytes", ..) => {
+ subclass::Property("max-size-bytes", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.max_size_bytes.to_value())
}
- Property::UInt64("max-size-time", ..) => {
+ subclass::Property("max-size-time", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.max_size_time.to_value())
}
- Property::String("context", ..) => {
+ subclass::Property("context", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.context.to_value())
}
- Property::UInt("context-wait", ..) => {
+ subclass::Property("context-wait", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.context_wait.to_value())
}
_ => unimplemented!(),
}
}
+
+ fn constructed(&self, obj: &glib::Object) {
+ self.parent_constructed(obj);
+
+ let element = obj.downcast_ref::<gst::Element>().unwrap();
+ element.add_pad(&self.sink_pad).unwrap();
+ element.add_pad(&self.src_pad).unwrap();
+ }
}
-impl ElementImpl<Element> for Queue {
+impl ElementImpl for Queue {
fn change_state(
&self,
- element: &Element,
+ element: &gst::Element,
transition: gst::StateChange,
) -> gst::StateChangeReturn {
gst_trace!(self.cat, obj: element, "Changing state {:?}", transition);
@@ -911,7 +956,7 @@ impl ElementImpl<Element> for Queue {
_ => (),
}
- let ret = element.parent_change_state(transition);
+ let ret = self.parent_change_state(element, transition);
if ret == gst::StateChangeReturn::Failure {
return ret;
}
@@ -928,24 +973,6 @@ impl ElementImpl<Element> for Queue {
}
}
-struct QueueStatic;
-
-impl ImplTypeStatic<Element> for QueueStatic {
- fn get_name(&self) -> &str {
- "Queue"
- }
-
- fn new(&self, element: &Element) -> Box<ElementImpl<Element>> {
- Queue::init(element)
- }
-
- fn class_init(&self, klass: &mut ElementClass) {
- Queue::class_init(klass);
- }
-}
-
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
- let queue_static = QueueStatic;
- let type_ = register_type(queue_static);
- gst::Element::register(plugin, "ts-queue", 0, type_)
+ gst::Element::register(plugin, "ts-queue", 0, Queue::get_type())
}
diff --git a/gst-plugin-threadshare/src/tcpclientsrc.rs b/gst-plugin-threadshare/src/tcpclientsrc.rs
index 212f7a513..0817cbcab 100644
--- a/gst-plugin-threadshare/src/tcpclientsrc.rs
+++ b/gst-plugin-threadshare/src/tcpclientsrc.rs
@@ -18,11 +18,11 @@
use glib;
use glib::prelude::*;
+use glib::subclass;
+use glib::subclass::prelude::*;
use gst;
use gst::prelude::*;
-
-use gobject_subclass::object::*;
-use gst_plugin::element::*;
+use gst::subclass::prelude::*;
use std::io;
use std::sync::Mutex;
@@ -71,52 +71,67 @@ impl Default for Settings {
}
}
-static PROPERTIES: [Property; 6] = [
- Property::String(
- "address",
- "Address",
- "Address to receive packets from",
- DEFAULT_ADDRESS,
- PropertyMutability::ReadWrite,
- ),
- Property::UInt(
- "port",
- "Port",
- "Port to receive packets from",
- (0, u16::MAX as u32),
- DEFAULT_PORT,
- PropertyMutability::ReadWrite,
- ),
- Property::Boxed(
- "caps",
- "Caps",
- "Caps to use",
- gst::Caps::static_type,
- PropertyMutability::ReadWrite,
- ),
- Property::UInt(
- "chunk-size",
- "Chunk Size",
- "Chunk Size",
- (0, u16::MAX as u32),
- DEFAULT_CHUNK_SIZE,
- PropertyMutability::ReadWrite,
- ),
- Property::String(
- "context",
- "Context",
- "Context name to share threads with",
- Some(DEFAULT_CONTEXT),
- PropertyMutability::ReadWrite,
- ),
- Property::UInt(
- "context-wait",
- "Context Wait",
- "Throttle poll loop to run at most once every this many ms",
- (0, 1000),
- DEFAULT_CONTEXT_WAIT,
- PropertyMutability::ReadWrite,
- ),
+static PROPERTIES: [subclass::Property; 6] = [
+ subclass::Property("address", || {
+ glib::ParamSpec::string(
+ "address",
+ "Address",
+ "Address to receive packets from",
+ DEFAULT_ADDRESS,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("port", || {
+ glib::ParamSpec::uint(
+ "port",
+ "Port",
+ "Port to receive packets from",
+ 0,
+ u16::MAX as u32,
+ DEFAULT_PORT,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("caps", || {
+ glib::ParamSpec::boxed(
+ "caps",
+ "Caps",
+ "Caps to use",
+ gst::Caps::static_type(),
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("chunk-size", || {
+ glib::ParamSpec::uint(
+ "chunk-size",
+ "Chunk Size",
+ "Chunk Size",
+ 0,
+ u16::MAX as u32,
+ DEFAULT_CHUNK_SIZE,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("context", || {
+ glib::ParamSpec::string(
+ "context",
+ "Context",
+ "Context name to share threads with",
+ Some(DEFAULT_CONTEXT),
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("context-wait", || {
+ glib::ParamSpec::uint(
+ "context-wait",
+ "Context Wait",
+ "Throttle poll loop to run at most once every this many ms",
+ 0,
+ 1000,
+ DEFAULT_CONTEXT_WAIT,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
];
pub struct TcpClientReader {
@@ -185,61 +200,7 @@ struct TcpClientSrc {
}
impl TcpClientSrc {
- fn class_init(klass: &mut ElementClass) {
- klass.set_metadata(
- "Thread-sharing TCP client source",
- "Source/Network",
- "Receives data over the network via TCP",
- "Sebastian Dröge <sebastian@centricular.com>, LEE Dongjun <redongjun@gmail.com>",
- );
-
- let caps = gst::Caps::new_any();
- let src_pad_template = gst::PadTemplate::new(
- "src",
- gst::PadDirection::Src,
- gst::PadPresence::Always,
- &caps,
- );
- klass.add_pad_template(src_pad_template);
-
- klass.install_properties(&PROPERTIES);
- }
-
- fn init(element: &Element) -> Box<ElementImpl<Element>> {
- let templ = element.get_pad_template("src").unwrap();
- let src_pad = gst::Pad::new_from_template(&templ, "src");
-
- src_pad.set_event_function(|pad, parent, event| {
- TcpClientSrc::catch_panic_pad_function(
- parent,
- || false,
- |tcpclientsrc, element| tcpclientsrc.src_event(pad, element, event),
- )
- });
- src_pad.set_query_function(|pad, parent, query| {
- TcpClientSrc::catch_panic_pad_function(
- parent,
- || false,
- |tcpclientsrc, element| tcpclientsrc.src_query(pad, element, query),
- )
- });
- element.add_pad(&src_pad).unwrap();
-
- ::set_element_flags(element, gst::ElementFlags::SOURCE);
-
- Box::new(Self {
- cat: gst::DebugCategory::new(
- "ts-tcpclientsrc",
- gst::DebugColorFlags::empty(),
- "Thread-sharing TCP Client source",
- ),
- src_pad: src_pad,
- state: Mutex::new(State::default()),
- settings: Mutex::new(Settings::default()),
- })
- }
-
- fn src_event(&self, pad: &gst::Pad, element: &Element, event: gst::Event) -> bool {
+ fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
@@ -271,7 +232,12 @@ impl TcpClientSrc {
ret
}
- fn src_query(&self, pad: &gst::Pad, _element: &Element, query: &mut gst::QueryRef) -> bool {
+ fn src_query(
+ &self,
+ pad: &gst::Pad,
+ _element: &gst::Element,
+ query: &mut gst::QueryRef,
+ ) -> bool {
use gst::QueryView;
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
@@ -331,7 +297,7 @@ impl TcpClientSrc {
fn push_buffer(
&self,
- element: &Element,
+ element: &gst::Element,
buffer: gst::Buffer,
) -> future::Either<
Box<Future<Item = (), Error = gst::FlowError> + Send + 'static>,
@@ -431,7 +397,7 @@ impl TcpClientSrc {
}
}
- fn prepare(&self, element: &Element) -> Result<(), gst::ErrorMessage> {
+ fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
use std::net::{IpAddr, SocketAddr};
gst_debug!(self.cat, obj: element, "Preparing");
@@ -493,17 +459,11 @@ impl TcpClientSrc {
.schedule(
&io_context,
move |buffer| {
- let tcpclientsrc = element_clone
- .get_impl()
- .downcast_ref::<TcpClientSrc>()
- .unwrap();
+ let tcpclientsrc = Self::from_instance(&element_clone);
tcpclientsrc.push_buffer(&element_clone, buffer)
},
move |err| {
- let tcpclientsrc = element_clone2
- .get_impl()
- .downcast_ref::<TcpClientSrc>()
- .unwrap();
+ let tcpclientsrc = Self::from_instance(&element_clone2);
gst_error!(tcpclientsrc.cat, obj: &element_clone2, "Got error {}", err);
match err {
Either::Left(gst::FlowError::CustomError) => (),
@@ -547,7 +507,7 @@ impl TcpClientSrc {
Ok(())
}
- fn unprepare(&self, element: &Element) -> Result<(), ()> {
+ fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Unpreparing");
// FIXME: The IO Context has to be alive longer than the queue,
@@ -576,7 +536,7 @@ impl TcpClientSrc {
Ok(())
}
- fn start(&self, element: &Element) -> Result<(), ()> {
+ fn start(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Starting");
let state = self.state.lock().unwrap();
@@ -589,7 +549,7 @@ impl TcpClientSrc {
Ok(())
}
- fn stop(&self, element: &Element) -> Result<(), ()> {
+ fn stop(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Stopping");
let mut state = self.state.lock().unwrap();
@@ -604,32 +564,98 @@ impl TcpClientSrc {
}
}
-impl ObjectImpl<Element> for TcpClientSrc {
- fn set_property(&self, _obj: &glib::Object, id: u32, value: &glib::Value) {
- let prop = &PROPERTIES[id as usize];
+impl ObjectSubclass for TcpClientSrc {
+ const NAME: &'static str = "RsTsTcpClientSrc";
+ type ParentType = gst::Element;
+ type Instance = gst::subclass::ElementInstanceStruct<Self>;
+ type Class = subclass::simple::ClassStruct<Self>;
+
+ glib_object_subclass!();
+
+ fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ klass.set_metadata(
+ "Thread-sharing TCP client source",
+ "Source/Network",
+ "Receives data over the network via TCP",
+ "Sebastian Dröge <sebastian@centricular.com>, LEE Dongjun <redongjun@gmail.com>",
+ );
+
+ let caps = gst::Caps::new_any();
+ let src_pad_template = gst::PadTemplate::new(
+ "src",
+ gst::PadDirection::Src,
+ gst::PadPresence::Always,
+ &caps,
+ );
+ klass.add_pad_template(src_pad_template);
+
+ klass.install_properties(&PROPERTIES);
+ }
+
+ fn new() -> Self {
+ unreachable!()
+ }
+
+ fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
+ let templ = klass.get_pad_template("src").unwrap();
+ let src_pad = gst::Pad::new_from_template(&templ, "src");
+
+ src_pad.set_event_function(|pad, parent, event| {
+ TcpClientSrc::catch_panic_pad_function(
+ parent,
+ || false,
+ |tcpclientsrc, element| tcpclientsrc.src_event(pad, element, event),
+ )
+ });
+ src_pad.set_query_function(|pad, parent, query| {
+ TcpClientSrc::catch_panic_pad_function(
+ parent,
+ || false,
+ |tcpclientsrc, element| tcpclientsrc.src_query(pad, element, query),
+ )
+ });
+
+ Self {
+ cat: gst::DebugCategory::new(
+ "ts-tcpclientsrc",
+ gst::DebugColorFlags::empty(),
+ "Thread-sharing TCP Client source",
+ ),
+ src_pad: src_pad,
+ state: Mutex::new(State::default()),
+ settings: Mutex::new(Settings::default()),
+ }
+ }
+}
+
+impl ObjectImpl for TcpClientSrc {
+ glib_object_impl!();
+
+ fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
+ let prop = &PROPERTIES[id];
match *prop {
- Property::String("address", ..) => {
+ subclass::Property("address", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.address = value.get();
}
- Property::UInt("port", ..) => {
+ subclass::Property("port", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.port = value.get().unwrap();
}
- Property::Boxed("caps", ..) => {
+ subclass::Property("caps", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.caps = value.get();
}
- Property::UInt("chunk-size", ..) => {
+ subclass::Property("chunk-size", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.chunk_size = value.get().unwrap();
}
- Property::String("context", ..) => {
+ subclass::Property("context", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.context = value.get().unwrap_or_else(|| "".into());
}
- Property::UInt("context-wait", ..) => {
+ subclass::Property("context-wait", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.context_wait = value.get().unwrap();
}
@@ -637,43 +663,52 @@ impl ObjectImpl<Element> for TcpClientSrc {
}
}
- fn get_property(&self, _obj: &glib::Object, id: u32) -> Result<glib::Value, ()> {
- let prop = &PROPERTIES[id as usize];
+ fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ let prop = &PROPERTIES[id];
match *prop {
- Property::String("address", ..) => {
+ subclass::Property("address", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.address.to_value())
}
- Property::UInt("port", ..) => {
+ subclass::Property("port", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.port.to_value())
}
- Property::Boxed("caps", ..) => {
+ subclass::Property("caps", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.caps.to_value())
}
- Property::UInt("chunk-size", ..) => {
+ subclass::Property("chunk-size", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.chunk_size.to_value())
}
- Property::String("context", ..) => {
+ subclass::Property("context", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.context.to_value())
}
- Property::UInt("context-wait", ..) => {
+ subclass::Property("context-wait", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.context_wait.to_value())
}
_ => unimplemented!(),
}
}
+
+ fn constructed(&self, obj: &glib::Object) {
+ self.parent_constructed(obj);
+
+ let element = obj.downcast_ref::<gst::Element>().unwrap();
+ element.add_pad(&self.src_pad).unwrap();
+
+ ::set_element_flags(element, gst::ElementFlags::SOURCE);
+ }
}
-impl ElementImpl<Element> for TcpClientSrc {
+impl ElementImpl for TcpClientSrc {
fn change_state(
&self,
- element: &Element,
+ element: &gst::Element,
transition: gst::StateChange,
) -> gst::StateChangeReturn {
gst_trace!(self.cat, obj: element, "Changing state {:?}", transition);
@@ -699,7 +734,7 @@ impl ElementImpl<Element> for TcpClientSrc {
_ => (),
}
- let mut ret = element.parent_change_state(transition);
+ let mut ret = self.parent_change_state(element, transition);
if ret == gst::StateChangeReturn::Failure {
return ret;
}
@@ -719,24 +754,6 @@ impl ElementImpl<Element> for TcpClientSrc {
}
}
-struct TcpClientSrcStatic;
-
-impl ImplTypeStatic<Element> for TcpClientSrcStatic {
- fn get_name(&self) -> &str {
- "TcpClientSrc"
- }
-
- fn new(&self, element: &Element) -> Box<ElementImpl<Element>> {
- TcpClientSrc::init(element)
- }
-
- fn class_init(&self, klass: &mut ElementClass) {
- TcpClientSrc::class_init(klass);
- }
-}
-
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
- let tcpclientsrc_static = TcpClientSrcStatic;
- let type_ = register_type(tcpclientsrc_static);
- gst::Element::register(plugin, "ts-tcpclientsrc", 0, type_)
+ gst::Element::register(plugin, "ts-tcpclientsrc", 0, TcpClientSrc::get_type())
}
diff --git a/gst-plugin-threadshare/src/udpsrc.rs b/gst-plugin-threadshare/src/udpsrc.rs
index 884e98571..596da6b2d 100644
--- a/gst-plugin-threadshare/src/udpsrc.rs
+++ b/gst-plugin-threadshare/src/udpsrc.rs
@@ -17,17 +17,17 @@
use glib;
use glib::prelude::*;
+use glib::subclass;
+use glib::subclass::prelude::*;
use gst;
use gst::prelude::*;
+use gst::subclass::prelude::*;
use gio;
use gio_ffi;
use gobject_ffi;
-use gobject_subclass::object::*;
-use gst_plugin::element::*;
-
use std::io;
use std::sync::Mutex;
use std::u16;
@@ -179,73 +179,94 @@ impl Default for Settings {
}
}
-static PROPERTIES: [Property; 9] = [
- Property::String(
- "address",
- "Address",
- "Address/multicast group to listen on",
- DEFAULT_ADDRESS,
- PropertyMutability::ReadWrite,
- ),
- Property::UInt(
- "port",
- "Port",
- "Port to listen on",
- (0, u16::MAX as u32),
- DEFAULT_PORT,
- PropertyMutability::ReadWrite,
- ),
- Property::Boolean(
- "reuse",
- "Reuse",
- "Allow reuse of the port",
- DEFAULT_REUSE,
- PropertyMutability::ReadWrite,
- ),
- Property::Boxed(
- "caps",
- "Caps",
- "Caps to use",
- gst::Caps::static_type,
- PropertyMutability::ReadWrite,
- ),
- Property::UInt(
- "mtu",
- "MTU",
- "MTU",
- (0, u16::MAX as u32),
- DEFAULT_MTU,
- PropertyMutability::ReadWrite,
- ),
- Property::Object(
- "socket",
- "Socket",
- "Socket to use for UDP reception. (None == allocate)",
- gio::Socket::static_type,
- PropertyMutability::ReadWrite,
- ),
- Property::Object(
- "used-socket",
- "Used Socket",
- "Socket currently in use for UDP reception. (None = no socket)",
- gio::Socket::static_type,
- PropertyMutability::Readable,
- ),
- Property::String(
- "context",
- "Context",
- "Context name to share threads with",
- Some(DEFAULT_CONTEXT),
- PropertyMutability::ReadWrite,
- ),
- Property::UInt(
- "context-wait",
- "Context Wait",
- "Throttle poll loop to run at most once every this many ms",
- (0, 1000),
- DEFAULT_CONTEXT_WAIT,
- PropertyMutability::ReadWrite,
- ),
+static PROPERTIES: [subclass::Property; 9] = [
+ subclass::Property("address", || {
+ glib::ParamSpec::string(
+ "address",
+ "Address",
+ "Address/multicast group to listen on",
+ DEFAULT_ADDRESS,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("port", || {
+ glib::ParamSpec::uint(
+ "port",
+ "Port",
+ "Port to listen on",
+ 0,
+ u16::MAX as u32,
+ DEFAULT_PORT,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("reuse", || {
+ glib::ParamSpec::boolean(
+ "reuse",
+ "Reuse",
+ "Allow reuse of the port",
+ DEFAULT_REUSE,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("caps", || {
+ glib::ParamSpec::boxed(
+ "caps",
+ "Caps",
+ "Caps to use",
+ gst::Caps::static_type(),
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("mtu", || {
+ glib::ParamSpec::uint(
+ "mtu",
+ "MTU",
+ "MTU",
+ 0,
+ u16::MAX as u32,
+ DEFAULT_MTU,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("socket", || {
+ glib::ParamSpec::object(
+ "socket",
+ "Socket",
+ "Socket to use for UDP reception. (None == allocate)",
+ gio::Socket::static_type(),
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("used-socket", || {
+ glib::ParamSpec::object(
+ "used-socket",
+ "Used Socket",
+ "Socket currently in use for UDP reception. (None = no socket)",
+ gio::Socket::static_type(),
+ glib::ParamFlags::READABLE,
+ )
+ }),
+ subclass::Property("context", || {
+ glib::ParamSpec::string(
+ "context",
+ "Context",
+ "Context name to share threads with",
+ Some(DEFAULT_CONTEXT),
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("context-wait", || {
+ glib::ParamSpec::uint(
+ "context-wait",
+ "Context Wait",
+ "Throttle poll loop to run at most once every this many ms",
+ 0,
+ 1000,
+ DEFAULT_CONTEXT_WAIT,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
];
pub struct UdpReader {
@@ -296,61 +317,7 @@ struct UdpSrc {
}
impl UdpSrc {
- fn class_init(klass: &mut ElementClass) {
- klass.set_metadata(
- "Thread-sharing UDP source",
- "Source/Network",
- "Receives data over the network via UDP",
- "Sebastian Dröge <sebastian@centricular.com>",
- );
-
- let caps = gst::Caps::new_any();
- let src_pad_template = gst::PadTemplate::new(
- "src",
- gst::PadDirection::Src,
- gst::PadPresence::Always,
- &caps,
- );
- klass.add_pad_template(src_pad_template);
-
- klass.install_properties(&PROPERTIES);
- }
-
- fn init(element: &Element) -> Box<ElementImpl<Element>> {
- let templ = element.get_pad_template("src").unwrap();
- let src_pad = gst::Pad::new_from_template(&templ, "src");
-
- src_pad.set_event_function(|pad, parent, event| {
- UdpSrc::catch_panic_pad_function(
- parent,
- || false,
- |udpsrc, element| udpsrc.src_event(pad, element, event),
- )
- });
- src_pad.set_query_function(|pad, parent, query| {
- UdpSrc::catch_panic_pad_function(
- parent,
- || false,
- |udpsrc, element| udpsrc.src_query(pad, element, query),
- )
- });
- element.add_pad(&src_pad).unwrap();
-
- ::set_element_flags(element, gst::ElementFlags::SOURCE);
-
- Box::new(Self {
- cat: gst::DebugCategory::new(
- "ts-udpsrc",
- gst::DebugColorFlags::empty(),
- "Thread-sharing UDP source",
- ),
- src_pad: src_pad,
- state: Mutex::new(State::default()),
- settings: Mutex::new(Settings::default()),
- })
- }
-
- fn src_event(&self, pad: &gst::Pad, element: &Element, event: gst::Event) -> bool {
+ fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
@@ -382,7 +349,12 @@ impl UdpSrc {
ret
}
- fn src_query(&self, pad: &gst::Pad, _element: &Element, query: &mut gst::QueryRef) -> bool {
+ fn src_query(
+ &self,
+ pad: &gst::Pad,
+ _element: &gst::Element,
+ query: &mut gst::QueryRef,
+ ) -> bool {
use gst::QueryView;
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
@@ -442,7 +414,7 @@ impl UdpSrc {
fn push_buffer(
&self,
- element: &Element,
+ element: &gst::Element,
buffer: gst::Buffer,
) -> future::Either<
Box<Future<Item = (), Error = gst::FlowError> + Send + 'static>,
@@ -537,7 +509,7 @@ impl UdpSrc {
}
}
- fn prepare(&self, element: &Element) -> Result<(), gst::ErrorMessage> {
+ fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
gst_debug!(self.cat, obj: element, "Preparing");
@@ -768,11 +740,11 @@ impl UdpSrc {
.schedule(
&io_context,
move |buffer| {
- let udpsrc = element_clone.get_impl().downcast_ref::<UdpSrc>().unwrap();
+ let udpsrc = Self::from_instance(&element_clone);
udpsrc.push_buffer(&element_clone, buffer)
},
move |err| {
- let udpsrc = element_clone2.get_impl().downcast_ref::<UdpSrc>().unwrap();
+ let udpsrc = Self::from_instance(&element_clone2);
gst_error!(udpsrc.cat, obj: &element_clone2, "Got error {}", err);
match err {
Either::Left(gst::FlowError::CustomError) => (),
@@ -819,7 +791,7 @@ impl UdpSrc {
Ok(())
}
- fn unprepare(&self, element: &Element) -> Result<(), ()> {
+ fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Unpreparing");
self.settings.lock().unwrap().used_socket = None;
@@ -850,7 +822,7 @@ impl UdpSrc {
Ok(())
}
- fn start(&self, element: &Element) -> Result<(), ()> {
+ fn start(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Starting");
let state = self.state.lock().unwrap();
@@ -863,7 +835,7 @@ impl UdpSrc {
Ok(())
}
- fn stop(&self, element: &Element) -> Result<(), ()> {
+ fn stop(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Stopping");
let mut state = self.state.lock().unwrap();
@@ -878,45 +850,111 @@ impl UdpSrc {
}
}
-impl ObjectImpl<Element> for UdpSrc {
- fn set_property(&self, _obj: &glib::Object, id: u32, value: &glib::Value) {
- let prop = &PROPERTIES[id as usize];
+impl ObjectSubclass for UdpSrc {
+ const NAME: &'static str = "RsTsUdpSrc";
+ type ParentType = gst::Element;
+ type Instance = gst::subclass::ElementInstanceStruct<Self>;
+ type Class = subclass::simple::ClassStruct<Self>;
+
+ glib_object_subclass!();
+
+ fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ klass.set_metadata(
+ "Thread-sharing UDP source",
+ "Source/Network",
+ "Receives data over the network via UDP",
+ "Sebastian Dröge <sebastian@centricular.com>",
+ );
+
+ let caps = gst::Caps::new_any();
+ let src_pad_template = gst::PadTemplate::new(
+ "src",
+ gst::PadDirection::Src,
+ gst::PadPresence::Always,
+ &caps,
+ );
+ klass.add_pad_template(src_pad_template);
+
+ klass.install_properties(&PROPERTIES);
+ }
+
+ fn new() -> Self {
+ unreachable!()
+ }
+
+ fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
+ let templ = klass.get_pad_template("src").unwrap();
+ let src_pad = gst::Pad::new_from_template(&templ, "src");
+
+ src_pad.set_event_function(|pad, parent, event| {
+ UdpSrc::catch_panic_pad_function(
+ parent,
+ || false,
+ |udpsrc, element| udpsrc.src_event(pad, element, event),
+ )
+ });
+ src_pad.set_query_function(|pad, parent, query| {
+ UdpSrc::catch_panic_pad_function(
+ parent,
+ || false,
+ |udpsrc, element| udpsrc.src_query(pad, element, query),
+ )
+ });
+
+ Self {
+ cat: gst::DebugCategory::new(
+ "ts-udpsrc",
+ gst::DebugColorFlags::empty(),
+ "Thread-sharing UDP source",
+ ),
+ src_pad: src_pad,
+ state: Mutex::new(State::default()),
+ settings: Mutex::new(Settings::default()),
+ }
+ }
+}
+
+impl ObjectImpl for UdpSrc {
+ glib_object_impl!();
+
+ fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
+ let prop = &PROPERTIES[id];
match *prop {
- Property::String("address", ..) => {
+ subclass::Property("address", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.address = value.get();
}
- Property::UInt("port", ..) => {
+ subclass::Property("port", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.port = value.get().unwrap();
}
- Property::Boolean("reuse", ..) => {
+ subclass::Property("reuse", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.reuse = value.get().unwrap();
}
- Property::Boxed("caps", ..) => {
+ subclass::Property("caps", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.caps = value.get();
}
- Property::UInt("mtu", ..) => {
+ subclass::Property("mtu", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.mtu = value.get().unwrap();
}
- Property::Object("socket", ..) => {
+ subclass::Property("socket", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.socket = value
.get::<gio::Socket>()
.map(|socket| GioSocketWrapper::new(&socket));
}
- Property::Object("used-socket", ..) => {
+ subclass::Property("used-socket", ..) => {
unreachable!();
}
- Property::String("context", ..) => {
+ subclass::Property("context", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.context = value.get().unwrap_or_else(|| "".into());
}
- Property::UInt("context-wait", ..) => {
+ subclass::Property("context-wait", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.context_wait = value.get().unwrap();
}
@@ -924,31 +962,31 @@ impl ObjectImpl<Element> for UdpSrc {
}
}
- fn get_property(&self, _obj: &glib::Object, id: u32) -> Result<glib::Value, ()> {
- let prop = &PROPERTIES[id as usize];
+ fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ let prop = &PROPERTIES[id];
match *prop {
- Property::String("address", ..) => {
+ subclass::Property("address", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.address.to_value())
}
- Property::UInt("port", ..) => {
+ subclass::Property("port", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.port.to_value())
}
- Property::Boolean("reuse", ..) => {
+ subclass::Property("reuse", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.reuse.to_value())
}
- Property::Boxed("caps", ..) => {
+ subclass::Property("caps", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.caps.to_value())
}
- Property::UInt("mtu", ..) => {
+ subclass::Property("mtu", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.mtu.to_value())
}
- Property::Object("socket", ..) => {
+ subclass::Property("socket", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings
.socket
@@ -956,7 +994,7 @@ impl ObjectImpl<Element> for UdpSrc {
.map(GioSocketWrapper::as_socket)
.to_value())
}
- Property::Object("used-socket", ..) => {
+ subclass::Property("used-socket", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings
.used_socket
@@ -964,23 +1002,31 @@ impl ObjectImpl<Element> for UdpSrc {
.map(GioSocketWrapper::as_socket)
.to_value())
}
- Property::String("context", ..) => {
+ subclass::Property("context", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.context.to_value())
}
- Property::UInt("context-wait", ..) => {
+ subclass::Property("context-wait", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.context_wait.to_value())
}
_ => unimplemented!(),
}
}
+
+ fn constructed(&self, obj: &glib::Object) {
+ self.parent_constructed(obj);
+
+ let element = obj.downcast_ref::<gst::Element>().unwrap();
+ element.add_pad(&self.src_pad).unwrap();
+ ::set_element_flags(element, gst::ElementFlags::SOURCE);
+ }
}
-impl ElementImpl<Element> for UdpSrc {
+impl ElementImpl for UdpSrc {
fn change_state(
&self,
- element: &Element,
+ element: &gst::Element,
transition: gst::StateChange,
) -> gst::StateChangeReturn {
gst_trace!(self.cat, obj: element, "Changing state {:?}", transition);
@@ -1004,7 +1050,7 @@ impl ElementImpl<Element> for UdpSrc {
_ => (),
}
- let mut ret = element.parent_change_state(transition);
+ let mut ret = self.parent_change_state(element, transition);
if ret == gst::StateChangeReturn::Failure {
return ret;
}
@@ -1028,24 +1074,6 @@ impl ElementImpl<Element> for UdpSrc {
}
}
-struct UdpSrcStatic;
-
-impl ImplTypeStatic<Element> for UdpSrcStatic {
- fn get_name(&self) -> &str {
- "UdpSrc"
- }
-
- fn new(&self, element: &Element) -> Box<ElementImpl<Element>> {
- UdpSrc::init(element)
- }
-
- fn class_init(&self, klass: &mut ElementClass) {
- UdpSrc::class_init(klass);
- }
-}
-
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
- let udpsrc_static = UdpSrcStatic;
- let type_ = register_type(udpsrc_static);
- gst::Element::register(plugin, "ts-udpsrc", 0, type_)
+ gst::Element::register(plugin, "ts-udpsrc", 0, UdpSrc::get_type())
}