Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2018-11-13 15:13:23 +0300
committerSebastian Dröge <sebastian@centricular.com>2018-11-13 15:13:23 +0300
commitab08cbd412bcf61c959db9120dfa5b88aa3299d2 (patch)
treef047de5eb9ba25bc52867025a9a7728239e629b9
parent4fb18382c28f01af06a6205b155137420c852c2e (diff)
ts-udpsrc: Implement socket and used-socket properties like in udpsrc
-rw-r--r--gst-plugin-threadshare/Cargo.toml3
-rw-r--r--gst-plugin-threadshare/src/lib.rs3
-rw-r--r--gst-plugin-threadshare/src/udpsrc.rs370
3 files changed, 292 insertions, 84 deletions
diff --git a/gst-plugin-threadshare/Cargo.toml b/gst-plugin-threadshare/Cargo.toml
index 770929341..df87781c6 100644
--- a/gst-plugin-threadshare/Cargo.toml
+++ b/gst-plugin-threadshare/Cargo.toml
@@ -6,8 +6,11 @@ license = "LGPL-2.1+"
[dependencies]
glib-sys = { git = "https://github.com/gtk-rs/sys" }
+gobject-sys = { git = "https://github.com/gtk-rs/sys" }
+gio-sys = { git = "https://github.com/gtk-rs/sys" }
gstreamer-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys" }
glib = { git = "https://github.com/gtk-rs/glib" }
+gio = { git = "https://github.com/gtk-rs/gio" }
gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gobject-subclass = { git = "https://github.com/gtk-rs/gobject-subclass" }
gst-plugin = { path = "../gst-plugin" }
diff --git a/gst-plugin-threadshare/src/lib.rs b/gst-plugin-threadshare/src/lib.rs
index 459421df7..446e4703b 100644
--- a/gst-plugin-threadshare/src/lib.rs
+++ b/gst-plugin-threadshare/src/lib.rs
@@ -17,9 +17,12 @@
#![crate_type = "cdylib"]
+extern crate gio_sys as gio_ffi;
extern crate glib_sys as glib_ffi;
+extern crate gobject_sys as gobject_ffi;
extern crate gstreamer_sys as gst_ffi;
+extern crate gio;
extern crate glib;
extern crate gobject_subclass;
#[macro_use]
diff --git a/gst-plugin-threadshare/src/udpsrc.rs b/gst-plugin-threadshare/src/udpsrc.rs
index 3fa3b964a..1707320e0 100644
--- a/gst-plugin-threadshare/src/udpsrc.rs
+++ b/gst-plugin-threadshare/src/udpsrc.rs
@@ -20,6 +20,11 @@ use glib::prelude::*;
use gst;
use gst::prelude::*;
+use gio;
+
+use gio_ffi;
+use gobject_ffi;
+
use gobject_subclass::object::*;
use gst_plugin::element::*;
@@ -38,6 +43,12 @@ use rand;
use net2;
+#[cfg(unix)]
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
+
+#[cfg(windows)]
+use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawFd, RawSocket};
+
use iocontext::*;
use socket::*;
@@ -46,9 +57,69 @@ const DEFAULT_PORT: u32 = 5000;
const DEFAULT_REUSE: bool = true;
const DEFAULT_CAPS: Option<gst::Caps> = None;
const DEFAULT_MTU: u32 = 1500;
+const DEFAULT_SOCKET: Option<GioSocketWrapper> = None;
+const DEFAULT_USED_SOCKET: Option<GioSocketWrapper> = None;
const DEFAULT_CONTEXT: &'static str = "";
const DEFAULT_CONTEXT_WAIT: u32 = 0;
+// Send/Sync struct for passing around a gio::Socket
+// and getting the raw fd from it
+//
+// gio::Socket is not Send/Sync as it's generally unsafe
+// to access it from multiple threads. Getting the underlying raw
+// fd is safe though, as is receiving/sending from two different threads
+#[derive(Debug)]
+struct GioSocketWrapper {
+ socket: *mut gio_ffi::GSocket,
+}
+
+unsafe impl Send for GioSocketWrapper {}
+unsafe impl Sync for GioSocketWrapper {}
+
+impl GioSocketWrapper {
+ fn new(socket: &gio::Socket) -> Self {
+ use glib::translate::*;
+
+ Self {
+ socket: socket.to_glib_full(),
+ }
+ }
+
+ fn as_socket(&self) -> gio::Socket {
+ unsafe {
+ use glib::translate::*;
+
+ from_glib_none(self.socket)
+ }
+ }
+
+ #[cfg(unix)]
+ fn get<T: FromRawFd>(&self) -> T {
+ unsafe { FromRawFd::from_raw_fd(gio_ffi::g_socket_get_fd(self.socket)) }
+ }
+
+ #[cfg(windows)]
+ fn get<T: FromRawSocket>(&self) -> T {
+ unsafe { FromRawSocket::from_raw_socket(ffi::g_socket_get_fd(self.socket) as _) }
+ }
+}
+
+impl Clone for GioSocketWrapper {
+ fn clone(&self) -> Self {
+ Self {
+ socket: unsafe { gobject_ffi::g_object_ref(self.socket as *mut _) as *mut _ },
+ }
+ }
+}
+
+impl Drop for GioSocketWrapper {
+ fn drop(&mut self) {
+ unsafe {
+ gobject_ffi::g_object_unref(self.socket as *mut _);
+ }
+ }
+}
+
#[derive(Debug, Clone)]
struct Settings {
address: Option<String>,
@@ -56,6 +127,8 @@ struct Settings {
reuse: bool,
caps: Option<gst::Caps>,
mtu: u32,
+ socket: Option<GioSocketWrapper>,
+ used_socket: Option<GioSocketWrapper>,
context: String,
context_wait: u32,
}
@@ -68,13 +141,15 @@ impl Default for Settings {
reuse: DEFAULT_REUSE,
caps: DEFAULT_CAPS,
mtu: DEFAULT_MTU,
+ socket: DEFAULT_SOCKET,
+ used_socket: DEFAULT_USED_SOCKET,
context: DEFAULT_CONTEXT.into(),
context_wait: DEFAULT_CONTEXT_WAIT,
}
}
}
-static PROPERTIES: [Property; 7] = [
+static PROPERTIES: [Property; 9] = [
Property::String(
"address",
"Address",
@@ -112,6 +187,20 @@ static PROPERTIES: [Property; 7] = [
DEFAULT_MTU,
PropertyMutability::ReadWrite,
),
+ Property::Object(
+ "socket",
+ "Socket",
+ "Socket to use for UDP reception. (None == allocate)",
+ gio::Socket::static_type,
+ PropertyMutability::ReadWrite,
+ ),
+ Property::Object(
+ "used-socket",
+ "Used Socket",
+ "Socket currently in use for UDP reception. (None = no socket)",
+ gio::Socket::static_type,
+ PropertyMutability::Readable,
+ ),
Property::String(
"context",
"Context",
@@ -438,120 +527,203 @@ impl UdpSrc {
)
})?;
- let addr: IpAddr = match settings.address {
- None => {
- return Err(gst_error_msg!(
- gst::ResourceError::Settings,
- ["No address set"]
- ))
+ let socket = if let Some(ref wrapped_socket) = settings.socket {
+ use std::net::UdpSocket;
+
+ let mut socket: UdpSocket;
+
+ #[cfg(unix)]
+ {
+ socket = wrapped_socket.get()
}
- Some(ref addr) => match addr.parse() {
- Err(err) => {
+ #[cfg(windows)]
+ {
+ socket = wrapped_socket.get()
+ }
+
+ let socket =
+ net::UdpSocket::from_std(socket, io_context.reactor_handle()).map_err(|err| {
+ gst_error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Failed to setup socket for tokio: {}", err]
+ )
+ })?;
+
+ self.settings.lock().unwrap().used_socket = Some(wrapped_socket.clone());
+
+ socket
+ } else {
+ let addr: IpAddr = match settings.address {
+ None => {
return Err(gst_error_msg!(
gst::ResourceError::Settings,
- ["Invalid address '{}' set: {}", addr, err]
+ ["No address set"]
))
}
- Ok(addr) => addr,
- },
- };
- let port = settings.port;
-
- // TODO: TTL, multicast loopback, etc
- let saddr = if addr.is_multicast() {
- // TODO: Use ::unspecified() constructor once stable
- let bind_addr = if addr.is_ipv4() {
- IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))
- } else {
- IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0))
+ Some(ref addr) => match addr.parse() {
+ Err(err) => {
+ return Err(gst_error_msg!(
+ gst::ResourceError::Settings,
+ ["Invalid address '{}' set: {}", addr, err]
+ ))
+ }
+ Ok(addr) => addr,
+ },
};
+ let port = settings.port;
- let saddr = SocketAddr::new(bind_addr, port as u16);
- gst_debug!(
- self.cat,
- obj: element,
- "Binding to {:?} for multicast group {:?}",
- saddr,
- addr
- );
-
- saddr
- } else {
- let saddr = SocketAddr::new(addr, port as u16);
- gst_debug!(self.cat, obj: element, "Binding to {:?}", saddr);
+ // TODO: TTL, multicast loopback, etc
+ let saddr = if addr.is_multicast() {
+ // TODO: Use ::unspecified() constructor once stable
+ let bind_addr = if addr.is_ipv4() {
+ IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))
+ } else {
+ IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0))
+ };
- saddr
- };
+ let saddr = SocketAddr::new(bind_addr, port as u16);
+ gst_debug!(
+ self.cat,
+ obj: element,
+ "Binding to {:?} for multicast group {:?}",
+ saddr,
+ addr
+ );
- let builder = if addr.is_ipv4() {
- net2::UdpBuilder::new_v4()
- } else {
- net2::UdpBuilder::new_v6()
- }
- .map_err(|err| {
- gst_error_msg!(
- gst::ResourceError::OpenRead,
- ["Failed to create socket: {}", err]
- )
- })?;
+ saddr
+ } else {
+ let saddr = SocketAddr::new(addr, port as u16);
+ gst_debug!(self.cat, obj: element, "Binding to {:?}", saddr);
- builder.reuse_address(settings.reuse).map_err(|err| {
- gst_error_msg!(
- gst::ResourceError::OpenRead,
- ["Failed to set reuse_address: {}", err]
- )
- })?;
+ saddr
+ };
- #[cfg(unix)]
- {
- use net2::unix::UnixUdpBuilderExt;
+ let builder = if addr.is_ipv4() {
+ net2::UdpBuilder::new_v4()
+ } else {
+ net2::UdpBuilder::new_v6()
+ }
+ .map_err(|err| {
+ gst_error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Failed to create socket: {}", err]
+ )
+ })?;
- builder.reuse_port(settings.reuse).map_err(|err| {
+ builder.reuse_address(settings.reuse).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
- ["Failed to set reuse_port: {}", err]
+ ["Failed to set reuse_address: {}", err]
)
})?;
- }
- let socket = builder.bind(&saddr).map_err(|err| {
- gst_error_msg!(
- gst::ResourceError::OpenRead,
- ["Failed to bind socket: {}", err]
- )
- })?;
+ #[cfg(unix)]
+ {
+ use net2::unix::UnixUdpBuilderExt;
+
+ builder.reuse_port(settings.reuse).map_err(|err| {
+ gst_error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Failed to set reuse_port: {}", err]
+ )
+ })?;
+ }
- let socket =
- net::UdpSocket::from_std(socket, io_context.reactor_handle()).map_err(|err| {
+ let socket = builder.bind(&saddr).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
- ["Failed to setup socket for tokio: {}", err]
+ ["Failed to bind socket: {}", err]
)
})?;
- if addr.is_multicast() {
- // TODO: Multicast interface configuration, going to be tricky
- match addr {
- IpAddr::V4(addr) => {
- socket
- .join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0))
- .map_err(|err| {
+ let socket =
+ net::UdpSocket::from_std(socket, io_context.reactor_handle()).map_err(|err| {
+ gst_error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Failed to setup socket for tokio: {}", err]
+ )
+ })?;
+
+ if addr.is_multicast() {
+ // TODO: Multicast interface configuration, going to be tricky
+ match addr {
+ IpAddr::V4(addr) => {
+ socket
+ .join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0))
+ .map_err(|err| {
+ gst_error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Failed to join multicast group: {}", err]
+ )
+ })?;
+ }
+ IpAddr::V6(addr) => {
+ socket.join_multicast_v6(&addr, 0).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to join multicast group: {}", err]
)
})?;
+ }
}
- IpAddr::V6(addr) => {
- socket.join_multicast_v6(&addr, 0).map_err(|err| {
- gst_error_msg!(
- gst::ResourceError::OpenRead,
- ["Failed to join multicast group: {}", err]
- )
- })?;
+ }
+
+ // Store the socket as used-socket in the settings
+ #[cfg(unix)]
+ {
+ let fd = socket.as_raw_fd();
+
+ // This is technically unsafe because it allows
+ // us to share the fd between the socket and the
+ // GIO socket below, but safety of this is the
+ // job of the application
+ struct FdConverter(RawFd);
+ impl IntoRawFd for FdConverter {
+ fn into_raw_fd(self) -> RawFd {
+ self.0
+ }
}
+
+ let fd = FdConverter(fd);
+
+ let gio_socket = gio::Socket::new_from_fd(fd).map_err(|err| {
+ gst_error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Failed to create wrapped GIO socket: {}", err]
+ )
+ })?;;
+ let wrapper = GioSocketWrapper::new(&gio_socket);
+ self.settings.lock().unwrap().used_socket = Some(wrapper);
}
- }
+ #[cfg(windows)]
+ {
+ let fd = socket.as_raw_socket();
+
+ // This is technically unsafe because it allows
+ // us to share the fd between the socket and the
+ // GIO socket below, but safety of this is the
+ // job of the application
+ struct SocketConverter(RawSocket);
+ impl IntoRawSocket for SocketConverter {
+ fn into_raw_socket(self) -> RawSocket {
+ self.0
+ }
+ }
+
+ let fd = SocketConverter(fd);
+
+ let gio_socket = gio::Socket::new_from_socket(fd).map_err(|err| {
+ gst_error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Failed to create wrapped GIO socket: {}", err]
+ )
+ })?;;
+ let wrapper = GioSocketWrapper::new(&gio_socket);
+ self.settings.lock().unwrap().used_socket = Some(wrapper);
+ }
+
+ socket
+ };
let buffer_pool = gst::BufferPool::new();
let mut config = buffer_pool.get_config();
@@ -615,6 +787,9 @@ impl UdpSrc {
state.pending_future_id = Some(pending_future_id);
gst_debug!(self.cat, obj: element, "Prepared");
+ drop(state);
+
+ element.notify("used-socket");
Ok(())
}
@@ -622,6 +797,8 @@ impl UdpSrc {
fn unprepare(&self, element: &Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Unpreparing");
+ self.settings.lock().unwrap().used_socket = None;
+
// FIXME: The IO Context has to be alive longer than the queue,
// otherwise the queue can't finish any remaining work
let (mut socket, io_context) = {
@@ -701,6 +878,15 @@ impl ObjectImpl<Element> for UdpSrc {
let mut settings = self.settings.lock().unwrap();
settings.mtu = value.get().unwrap();
}
+ Property::Object("socket", ..) => {
+ let mut settings = self.settings.lock().unwrap();
+ settings.socket = value
+ .get::<gio::Socket>()
+ .map(|socket| GioSocketWrapper::new(&socket));
+ }
+ Property::Object("used-socket", ..) => {
+ unreachable!();
+ }
Property::String("context", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.context = value.get().unwrap_or_else(|| "".into());
@@ -737,6 +923,22 @@ impl ObjectImpl<Element> for UdpSrc {
let mut settings = self.settings.lock().unwrap();
Ok(settings.mtu.to_value())
}
+ Property::Object("socket", ..) => {
+ let mut settings = self.settings.lock().unwrap();
+ Ok(settings
+ .socket
+ .as_ref()
+ .map(GioSocketWrapper::as_socket)
+ .to_value())
+ }
+ Property::Object("used-socket", ..) => {
+ let mut settings = self.settings.lock().unwrap();
+ Ok(settings
+ .used_socket
+ .as_ref()
+ .map(GioSocketWrapper::as_socket)
+ .to_value())
+ }
Property::String("context", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.context.to_value())