Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2020-11-14 20:24:01 +0300
committerSebastian Dröge <sebastian@centricular.com>2020-11-15 19:25:42 +0300
commitd7404a7e1cf40593f625857336de04c7d92a9201 (patch)
tree0704ad46fc6a35d0aeb9210de741a36eb206429a /net
parentaf0337c26c25cc782142f914dfa19383e51e1743 (diff)
net: Update for subclassing API changes
Diffstat (limited to 'net')
-rw-r--r--net/reqwest/src/reqwesthttpsrc/imp.rs (renamed from net/reqwest/src/reqwesthttpsrc.rs)72
-rw-r--r--net/reqwest/src/reqwesthttpsrc/mod.rs27
-rw-r--r--net/rusoto/src/aws_transcriber/imp.rs (renamed from net/rusoto/src/aws_transcribe_parse.rs)65
-rw-r--r--net/rusoto/src/aws_transcriber/mod.rs37
-rw-r--r--net/rusoto/src/aws_transcriber/packet/mod.rs (renamed from net/rusoto/src/packet/mod.rs)0
-rw-r--r--net/rusoto/src/lib.rs5
-rw-r--r--net/rusoto/src/s3sink/imp.rs (renamed from net/rusoto/src/s3sink.rs)32
-rw-r--r--net/rusoto/src/s3sink/mod.rs27
-rw-r--r--net/rusoto/src/s3src/imp.rs (renamed from net/rusoto/src/s3src.rs)59
-rw-r--r--net/rusoto/src/s3src/mod.rs27
10 files changed, 213 insertions, 138 deletions
diff --git a/net/reqwest/src/reqwesthttpsrc.rs b/net/reqwest/src/reqwesthttpsrc/imp.rs
index eddfd3fce..cb9ebec86 100644
--- a/net/reqwest/src/reqwesthttpsrc.rs
+++ b/net/reqwest/src/reqwesthttpsrc/imp.rs
@@ -230,7 +230,7 @@ lazy_static! {
impl ReqwestHttpSrc {
fn set_location(
&self,
- _element: &gst_base::BaseSrc,
+ _element: &super::ReqwestHttpSrc,
uri: Option<&str>,
) -> Result<(), glib::Error> {
let state = self.state.lock().unwrap();
@@ -268,7 +268,10 @@ impl ReqwestHttpSrc {
Ok(())
}
- fn ensure_client(&self, src: &gst_base::BaseSrc) -> Result<ClientContext, gst::ErrorMessage> {
+ fn ensure_client(
+ &self,
+ src: &super::ReqwestHttpSrc,
+ ) -> Result<ClientContext, gst::ErrorMessage> {
let mut client_guard = self.client.lock().unwrap();
if let Some(ref client) = *client_guard {
gst_debug!(CAT, obj: src, "Using already configured client");
@@ -334,7 +337,7 @@ impl ReqwestHttpSrc {
fn do_request(
&self,
- src: &gst_base::BaseSrc,
+ src: &super::ReqwestHttpSrc,
uri: Url,
start: u64,
stop: Option<u64>,
@@ -670,17 +673,15 @@ impl ReqwestHttpSrc {
}
impl ObjectImpl for ReqwestHttpSrc {
- fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) {
+ fn set_property(&self, obj: &Self::Type, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
match *prop {
subclass::Property("location", ..) => {
- let element = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
-
let location = value.get::<&str>().expect("type checked upstream");
- if let Err(err) = self.set_location(element, location) {
+ if let Err(err) = self.set_location(obj, location) {
gst_error!(
CAT,
- obj: element,
+ obj: obj,
"Failed to set property `location`: {:?}",
err
);
@@ -695,9 +696,8 @@ impl ObjectImpl for ReqwestHttpSrc {
settings.user_agent = user_agent;
}
subclass::Property("is-live", ..) => {
- let element = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
let is_live = value.get_some().expect("type checked upstream");
- element.set_live(is_live);
+ obj.set_live(is_live);
}
subclass::Property("user-id", ..) => {
let mut settings = self.settings.lock().unwrap();
@@ -743,7 +743,7 @@ impl ObjectImpl for ReqwestHttpSrc {
};
}
- fn get_property(&self, obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ fn get_property(&self, obj: &Self::Type, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
match *prop {
subclass::Property("location", ..) => {
@@ -756,10 +756,7 @@ impl ObjectImpl for ReqwestHttpSrc {
let settings = self.settings.lock().unwrap();
Ok(settings.user_agent.to_value())
}
- subclass::Property("is-live", ..) => {
- let element = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
- Ok(element.is_live().to_value())
- }
+ subclass::Property("is-live", ..) => Ok(obj.is_live().to_value()),
subclass::Property("user-id", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.user_id.to_value())
@@ -796,16 +793,15 @@ impl ObjectImpl for ReqwestHttpSrc {
}
}
- fn constructed(&self, obj: &glib::Object) {
+ fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
- let element = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
- element.set_automatic_eos(false);
- element.set_format(gst::Format::Bytes);
+ obj.set_automatic_eos(false);
+ obj.set_format(gst::Format::Bytes);
}
}
impl ElementImpl for ReqwestHttpSrc {
- fn set_context(&self, element: &gst::Element, context: &gst::Context) {
+ fn set_context(&self, element: &Self::Type, context: &gst::Context) {
if context.get_context_type() == REQWEST_CLIENT_CONTEXT {
let mut external_client = self.external_client.lock().unwrap();
let s = context.get_structure();
@@ -820,7 +816,7 @@ impl ElementImpl for ReqwestHttpSrc {
fn change_state(
&self,
- element: &gst::Element,
+ element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
if let gst::StateChange::ReadyToNull = transition {
@@ -832,21 +828,21 @@ impl ElementImpl for ReqwestHttpSrc {
}
impl BaseSrcImpl for ReqwestHttpSrc {
- fn is_seekable(&self, _src: &gst_base::BaseSrc) -> bool {
+ fn is_seekable(&self, _src: &Self::Type) -> bool {
match *self.state.lock().unwrap() {
State::Started { seekable, .. } => seekable,
_ => false,
}
}
- fn get_size(&self, _src: &gst_base::BaseSrc) -> Option<u64> {
+ fn get_size(&self, _src: &Self::Type) -> Option<u64> {
match *self.state.lock().unwrap() {
State::Started { size, .. } => size,
_ => None,
}
}
- fn unlock(&self, _src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
+ fn unlock(&self, _src: &Self::Type) -> Result<(), gst::ErrorMessage> {
let canceller = self.canceller.lock().unwrap();
if let Some(ref canceller) = *canceller {
canceller.abort();
@@ -854,7 +850,7 @@ impl BaseSrcImpl for ReqwestHttpSrc {
Ok(())
}
- fn start(&self, src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
+ fn start(&self, src: &Self::Type) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
*state = State::Stopped;
@@ -881,14 +877,14 @@ impl BaseSrcImpl for ReqwestHttpSrc {
Ok(())
}
- fn stop(&self, src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
+ fn stop(&self, src: &Self::Type) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: src, "Stopping");
*self.state.lock().unwrap() = State::Stopped;
Ok(())
}
- fn query(&self, element: &gst_base::BaseSrc, query: &mut gst::QueryRef) -> bool {
+ fn query(&self, element: &Self::Type, query: &mut gst::QueryRef) -> bool {
use gst::QueryView;
match query.view_mut() {
@@ -906,7 +902,7 @@ impl BaseSrcImpl for ReqwestHttpSrc {
}
}
- fn do_seek(&self, src: &gst_base::BaseSrc, segment: &mut gst::Segment) -> bool {
+ fn do_seek(&self, src: &Self::Type, segment: &mut gst::Segment) -> bool {
let segment = segment.downcast_mut::<gst::format::Bytes>().unwrap();
let mut state = self.state.lock().unwrap();
@@ -951,7 +947,7 @@ impl BaseSrcImpl for ReqwestHttpSrc {
}
impl PushSrcImpl for ReqwestHttpSrc {
- fn create(&self, src: &gst_base::PushSrc) -> Result<gst::Buffer, gst::FlowError> {
+ fn create(&self, src: &Self::Type) -> Result<gst::Buffer, gst::FlowError> {
let mut state = self.state.lock().unwrap();
let (response, position, caps, tags) = match *state {
@@ -1073,15 +1069,13 @@ impl PushSrcImpl for ReqwestHttpSrc {
}
impl URIHandlerImpl for ReqwestHttpSrc {
- fn get_uri(&self, _element: &gst::URIHandler) -> Option<String> {
+ fn get_uri(&self, _element: &Self::Type) -> Option<String> {
let settings = self.settings.lock().unwrap();
settings.location.as_ref().map(Url::to_string)
}
- fn set_uri(&self, element: &gst::URIHandler, uri: &str) -> Result<(), glib::Error> {
- let element = element.dynamic_cast_ref::<gst_base::BaseSrc>().unwrap();
-
+ fn set_uri(&self, element: &Self::Type, uri: &str) -> Result<(), glib::Error> {
self.set_location(&element, Some(uri))
}
@@ -1096,6 +1090,7 @@ impl URIHandlerImpl for ReqwestHttpSrc {
impl ObjectSubclass for ReqwestHttpSrc {
const NAME: &'static str = "ReqwestHttpSrc";
+ type Type = super::ReqwestHttpSrc;
type ParentType = gst_base::PushSrc;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
@@ -1116,7 +1111,7 @@ impl ObjectSubclass for ReqwestHttpSrc {
type_.add_interface::<gst::URIHandler>();
}
- fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ fn class_init(klass: &mut Self::Class) {
klass.set_metadata(
"HTTP Source",
"Source/Network/HTTP",
@@ -1137,12 +1132,3 @@ impl ObjectSubclass for ReqwestHttpSrc {
klass.install_properties(&PROPERTIES);
}
}
-
-pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
- gst::Element::register(
- Some(plugin),
- "reqwesthttpsrc",
- gst::Rank::Marginal,
- ReqwestHttpSrc::get_type(),
- )
-}
diff --git a/net/reqwest/src/reqwesthttpsrc/mod.rs b/net/reqwest/src/reqwesthttpsrc/mod.rs
new file mode 100644
index 000000000..701f97a6c
--- /dev/null
+++ b/net/reqwest/src/reqwesthttpsrc/mod.rs
@@ -0,0 +1,27 @@
+// Copyright (C) 2016-2018 Sebastian Dröge <sebastian@centricular.com>
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use glib::prelude::*;
+
+mod imp;
+
+glib_wrapper! {
+ pub struct ReqwestHttpSrc(ObjectSubclass<imp::ReqwestHttpSrc>) @extends gst_base::PushSrc, gst_base::BaseSrc, gst::Element, gst::Object, @implements gst::URIHandler;
+}
+
+unsafe impl Send for ReqwestHttpSrc {}
+unsafe impl Sync for ReqwestHttpSrc {}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "reqwesthttpsrc",
+ gst::Rank::Marginal,
+ ReqwestHttpSrc::static_type(),
+ )
+}
diff --git a/net/rusoto/src/aws_transcribe_parse.rs b/net/rusoto/src/aws_transcriber/imp.rs
index a1c55e5ce..3c95f3c73 100644
--- a/net/rusoto/src/aws_transcribe_parse.rs
+++ b/net/rusoto/src/aws_transcriber/imp.rs
@@ -40,7 +40,7 @@ use std::pin::Pin;
use std::sync::Mutex;
use std::time::Duration;
-use crate::packet::*;
+use super::packet::*;
use serde_derive::Deserialize;
@@ -199,7 +199,7 @@ impl Default for State {
type WsSink = Pin<Box<dyn Sink<Message, Error = WsError> + Send>>;
-struct Transcriber {
+pub struct Transcriber {
srcpad: gst::Pad,
sinkpad: gst::Pad,
settings: Mutex<Settings>,
@@ -230,7 +230,7 @@ fn build_packet(payload: &[u8]) -> Vec<u8> {
}
impl Transcriber {
- fn dequeue(&self, element: &gst::Element) -> bool {
+ fn dequeue(&self, element: &super::Transcriber) -> bool {
/* First, check our pending buffers */
let mut items = vec![];
@@ -340,7 +340,7 @@ impl Transcriber {
fn enqueue(
&self,
- element: &gst::Element,
+ element: &super::Transcriber,
state: &mut State,
alternative: &TranscriptAlternative,
partial: bool,
@@ -398,7 +398,7 @@ impl Transcriber {
fn loop_fn(
&self,
- element: &gst::Element,
+ element: &super::Transcriber,
receiver: &mut mpsc::Receiver<Message>,
) -> Result<(), gst::ErrorMessage> {
let future = async move {
@@ -567,7 +567,7 @@ impl Transcriber {
RUNTIME.enter(|| futures::executor::block_on(future))
}
- fn start_task(&self, element: &gst::Element) -> Result<(), gst::LoggableError> {
+ fn start_task(&self, element: &super::Transcriber) -> Result<(), gst::LoggableError> {
let element_weak = element.downgrade();
let pad_weak = self.srcpad.downgrade();
let (sender, mut receiver) = mpsc::channel(1);
@@ -610,7 +610,7 @@ impl Transcriber {
fn src_activatemode(
&self,
_pad: &gst::Pad,
- element: &gst::Element,
+ element: &super::Transcriber,
_mode: gst::PadMode,
active: bool,
) -> Result<(), gst::LoggableError> {
@@ -628,7 +628,12 @@ impl Transcriber {
Ok(())
}
- fn src_query(&self, pad: &gst::Pad, element: &gst::Element, query: &mut gst::QueryRef) -> bool {
+ fn src_query(
+ &self,
+ pad: &gst::Pad,
+ element: &super::Transcriber,
+ query: &mut gst::QueryRef,
+ ) -> bool {
use gst::QueryView;
gst_log!(CAT, obj: pad, "Handling query {:?}", query);
@@ -664,7 +669,7 @@ impl Transcriber {
}
}
- fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
+ fn sink_event(&self, pad: &gst::Pad, element: &super::Transcriber, event: gst::Event) -> bool {
use gst::EventView;
gst_debug!(CAT, obj: pad, "Handling event {:?}", event);
@@ -771,7 +776,7 @@ impl Transcriber {
async fn sync_and_send(
&self,
- element: &gst::Element,
+ element: &super::Transcriber,
buffer: Option<gst::Buffer>,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut delay = None;
@@ -819,7 +824,7 @@ impl Transcriber {
fn handle_buffer(
&self,
_pad: &gst::Pad,
- element: &gst::Element,
+ element: &super::Transcriber,
buffer: Option<gst::Buffer>,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_debug!(CAT, obj: element, "Handling {:?}", buffer);
@@ -846,13 +851,13 @@ impl Transcriber {
fn sink_chain(
&self,
pad: &gst::Pad,
- element: &gst::Element,
+ element: &super::Transcriber,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
self.handle_buffer(pad, element, Some(buffer))
}
- fn ensure_connection(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
+ fn ensure_connection(&self, element: &super::Transcriber) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
if state.connected {
@@ -959,7 +964,7 @@ impl Transcriber {
Ok(())
}
- fn disconnect(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
+ fn disconnect(&self, element: &super::Transcriber) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
gst_info!(CAT, obj: element, "Unpreparing");
@@ -987,13 +992,14 @@ impl Transcriber {
impl ObjectSubclass for Transcriber {
const NAME: &'static str = "RsAwsTranscriber";
+ type Type = super::Transcriber;
type ParentType = gst::Element;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib_object_subclass!();
- fn with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
+ fn with_class(klass: &Self::Class) -> Self {
let templ = klass.get_pad_template("sink").unwrap();
let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink"))
.chain_function(|pad, parent, buffer| {
@@ -1047,7 +1053,7 @@ impl ObjectSubclass for Transcriber {
}
}
- fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ fn class_init(klass: &mut Self::Class) {
klass.set_metadata(
"Transcriber",
"Audio/Text/Filter",
@@ -1085,17 +1091,15 @@ impl ObjectSubclass for Transcriber {
}
impl ObjectImpl for Transcriber {
- fn constructed(&self, obj: &glib::Object) {
+ fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
- let element = obj.downcast_ref::<gst::Element>().unwrap();
- element.add_pad(&self.sinkpad).unwrap();
- element.add_pad(&self.srcpad).unwrap();
- element
- .set_element_flags(gst::ElementFlags::PROVIDE_CLOCK | gst::ElementFlags::REQUIRE_CLOCK);
+ obj.add_pad(&self.sinkpad).unwrap();
+ obj.add_pad(&self.srcpad).unwrap();
+ obj.set_element_flags(gst::ElementFlags::PROVIDE_CLOCK | gst::ElementFlags::REQUIRE_CLOCK);
}
- fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
+ fn set_property(&self, _obj: &Self::Type, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
match *prop {
@@ -1115,7 +1119,7 @@ impl ObjectImpl for Transcriber {
}
}
- fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ fn get_property(&self, _obj: &Self::Type, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
match *prop {
@@ -1139,7 +1143,7 @@ impl ObjectImpl for Transcriber {
impl ElementImpl for Transcriber {
fn change_state(
&self,
- element: &gst::Element,
+ element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_info!(CAT, obj: element, "Changing state {:?}", transition);
@@ -1169,16 +1173,7 @@ impl ElementImpl for Transcriber {
Ok(success)
}
- fn provide_clock(&self, _element: &gst::Element) -> Option<gst::Clock> {
+ fn provide_clock(&self, _element: &Self::Type) -> Option<gst::Clock> {
Some(gst::SystemClock::obtain())
}
}
-
-pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
- gst::Element::register(
- Some(plugin),
- "awstranscriber",
- gst::Rank::None,
- Transcriber::get_type(),
- )
-}
diff --git a/net/rusoto/src/aws_transcriber/mod.rs b/net/rusoto/src/aws_transcriber/mod.rs
new file mode 100644
index 000000000..f28f86442
--- /dev/null
+++ b/net/rusoto/src/aws_transcriber/mod.rs
@@ -0,0 +1,37 @@
+// Copyright (C) 2020 Mathieu Duponchelle <mathieu@centricular.com>
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Library General Public
+// License as published by the Free Software Foundation; either
+// version 2 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Library General Public License for more details.
+//
+// You should have received a copy of the GNU Library General Public
+// License along with this library; if not, write to the
+// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+// Boston, MA 02110-1335, USA.
+
+use glib::prelude::*;
+
+mod imp;
+mod packet;
+
+glib_wrapper! {
+ pub struct Transcriber(ObjectSubclass<imp::Transcriber>) @extends gst::Element, gst::Object;
+}
+
+unsafe impl Send for Transcriber {}
+unsafe impl Sync for Transcriber {}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "awstranscriber",
+ gst::Rank::None,
+ Transcriber::static_type(),
+ )
+}
diff --git a/net/rusoto/src/packet/mod.rs b/net/rusoto/src/aws_transcriber/packet/mod.rs
index 18281617c..18281617c 100644
--- a/net/rusoto/src/packet/mod.rs
+++ b/net/rusoto/src/aws_transcriber/packet/mod.rs
diff --git a/net/rusoto/src/lib.rs b/net/rusoto/src/lib.rs
index c0b45bd22..359ef1f64 100644
--- a/net/rusoto/src/lib.rs
+++ b/net/rusoto/src/lib.rs
@@ -14,8 +14,7 @@ extern crate gstreamer_base as gst_base;
#[macro_use]
extern crate lazy_static;
-mod aws_transcribe_parse;
-mod packet;
+mod aws_transcriber;
mod s3sink;
mod s3src;
mod s3url;
@@ -24,7 +23,7 @@ mod s3utils;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
s3sink::register(plugin)?;
s3src::register(plugin)?;
- aws_transcribe_parse::register(plugin)?;
+ aws_transcriber::register(plugin)?;
Ok(())
}
diff --git a/net/rusoto/src/s3sink.rs b/net/rusoto/src/s3sink/imp.rs
index 21138f5b9..121a38589 100644
--- a/net/rusoto/src/s3sink.rs
+++ b/net/rusoto/src/s3sink/imp.rs
@@ -157,7 +157,7 @@ static PROPERTIES: [subclass::Property; 4] = [
impl S3Sink {
fn flush_current_buffer(
&self,
- element: &gst_base::BaseSink,
+ element: &super::S3Sink,
) -> Result<(), Option<gst::ErrorMessage>> {
let upload_part_req = self.create_upload_part_request()?;
let part_number = upload_part_req.part_number;
@@ -261,7 +261,7 @@ impl S3Sink {
})
}
- fn finalize_upload(&self, element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> {
+ fn finalize_upload(&self, element: &super::S3Sink) -> Result<(), gst::ErrorMessage> {
if self.flush_current_buffer(element).is_err() {
return Err(gst_error_msg!(
gst::ResourceError::Settings,
@@ -339,7 +339,7 @@ impl S3Sink {
fn update_buffer(
&self,
src: &[u8],
- element: &gst_base::BaseSink,
+ element: &super::S3Sink,
) -> Result<(), Option<gst::ErrorMessage>> {
let mut state = self.state.lock().unwrap();
let started_state = match *state {
@@ -381,6 +381,7 @@ impl S3Sink {
impl ObjectSubclass for S3Sink {
const NAME: &'static str = "RusotoS3Sink";
+ type Type = super::S3Sink;
type ParentType = gst_base::BaseSink;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
@@ -395,7 +396,7 @@ impl ObjectSubclass for S3Sink {
}
}
- fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ fn class_init(klass: &mut Self::Class) {
klass.set_metadata(
"Amazon S3 sink",
"Source/Network",
@@ -418,7 +419,7 @@ impl ObjectSubclass for S3Sink {
}
impl ObjectImpl for S3Sink {
- fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
+ fn set_property(&self, _obj: &Self::Type, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id as usize];
let mut settings = self.settings.lock().unwrap();
@@ -445,7 +446,7 @@ impl ObjectImpl for S3Sink {
}
}
- fn get_property(&self, _: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ fn get_property(&self, _: &Self::Type, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id as usize];
let settings = self.settings.lock().unwrap();
@@ -462,11 +463,11 @@ impl ObjectImpl for S3Sink {
impl ElementImpl for S3Sink {}
impl BaseSinkImpl for S3Sink {
- fn start(&self, _element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> {
+ fn start(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> {
self.start()
}
- fn stop(&self, element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> {
+ fn stop(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
*state = State::Stopped;
gst_info!(CAT, obj: element, "Stopped");
@@ -476,7 +477,7 @@ impl BaseSinkImpl for S3Sink {
fn render(
&self,
- element: &gst_base::BaseSink,
+ element: &Self::Type,
buffer: &gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
if let State::Stopped = *self.state.lock().unwrap() {
@@ -511,13 +512,13 @@ impl BaseSinkImpl for S3Sink {
}
}
- fn unlock(&self, _element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> {
+ fn unlock(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> {
self.cancel();
Ok(())
}
- fn event(&self, element: &gst_base::BaseSink, event: gst::Event) -> bool {
+ fn event(&self, element: &Self::Type, event: gst::Event) -> bool {
if let gst::EventView::Eos(_) = event.view() {
if let Err(error_message) = self.finalize_upload(element) {
gst_error!(
@@ -533,12 +534,3 @@ impl BaseSinkImpl for S3Sink {
BaseSinkImplExt::parent_event(self, element, event)
}
}
-
-pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
- gst::Element::register(
- Some(plugin),
- "rusotos3sink",
- gst::Rank::Primary,
- S3Sink::get_type(),
- )
-}
diff --git a/net/rusoto/src/s3sink/mod.rs b/net/rusoto/src/s3sink/mod.rs
new file mode 100644
index 000000000..40b6cd937
--- /dev/null
+++ b/net/rusoto/src/s3sink/mod.rs
@@ -0,0 +1,27 @@
+// Copyright (C) 2019 Amazon.com, Inc. or its affiliates <mkolny@amazon.com>
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use glib::prelude::*;
+
+mod imp;
+
+glib_wrapper! {
+ pub struct S3Sink(ObjectSubclass<imp::S3Sink>) @extends gst_base::BaseSink, gst::Element, gst::Object;
+}
+
+unsafe impl Send for S3Sink {}
+unsafe impl Sync for S3Sink {}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "rusotos3sink",
+ gst::Rank::Primary,
+ S3Sink::static_type(),
+ )
+}
diff --git a/net/rusoto/src/s3src.rs b/net/rusoto/src/s3src/imp.rs
index 8c3978907..ea0eb4f6c 100644
--- a/net/rusoto/src/s3src.rs
+++ b/net/rusoto/src/s3src/imp.rs
@@ -72,11 +72,7 @@ impl S3Src {
Ok(S3Client::new(url.region.clone()))
}
- fn set_uri(
- self: &S3Src,
- _: &gst_base::BaseSrc,
- url_str: Option<&str>,
- ) -> Result<(), glib::Error> {
+ fn set_uri(self: &S3Src, _: &super::S3Src, url_str: Option<&str>) -> Result<(), glib::Error> {
let state = self.state.lock().unwrap();
if let StreamingState::Started { .. } = *state {
@@ -108,7 +104,7 @@ impl S3Src {
fn head(
self: &S3Src,
- src: &gst_base::BaseSrc,
+ src: &super::S3Src,
client: &S3Client,
url: &GstS3Url,
) -> Result<u64, gst::ErrorMessage> {
@@ -145,7 +141,7 @@ impl S3Src {
/* Returns the bytes, Some(error) if one occured, or a None error if interrupted */
fn get(
self: &S3Src,
- src: &gst_base::BaseSrc,
+ src: &super::S3Src,
offset: u64,
length: u64,
) -> Result<Bytes, Option<gst::ErrorMessage>> {
@@ -210,6 +206,7 @@ impl S3Src {
impl ObjectSubclass for S3Src {
const NAME: &'static str = "RusotoS3Src";
+ type Type = super::S3Src;
type ParentType = gst_base::BaseSrc;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
@@ -228,7 +225,7 @@ impl ObjectSubclass for S3Src {
typ.add_interface::<gst::URIHandler>();
}
- fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ fn class_init(klass: &mut Self::Class) {
klass.set_metadata(
"Amazon S3 source",
"Source/Network",
@@ -251,19 +248,18 @@ impl ObjectSubclass for S3Src {
}
impl ObjectImpl for S3Src {
- fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) {
+ fn set_property(&self, obj: &Self::Type, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id as usize];
- let basesrc = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
match *prop {
subclass::Property("uri", ..) => {
- let _ = self.set_uri(basesrc, value.get().expect("type checked upstream"));
+ let _ = self.set_uri(obj, value.get().expect("type checked upstream"));
}
_ => unimplemented!(),
}
}
- fn get_property(&self, _: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ fn get_property(&self, _: &Self::Type, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id as usize];
match *prop {
@@ -279,13 +275,12 @@ impl ObjectImpl for S3Src {
}
}
- fn constructed(&self, obj: &glib::Object) {
+ fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
- let basesrc = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
- basesrc.set_format(gst::Format::Bytes);
+ obj.set_format(gst::Format::Bytes);
/* Set a larger default blocksize to make read more efficient */
- basesrc.set_blocksize(256 * 1024);
+ obj.set_blocksize(256 * 1024);
}
}
@@ -294,13 +289,12 @@ impl ElementImpl for S3Src {
}
impl URIHandlerImpl for S3Src {
- fn get_uri(&self, _: &gst::URIHandler) -> Option<String> {
+ fn get_uri(&self, _: &Self::Type) -> Option<String> {
self.url.lock().unwrap().as_ref().map(|s| s.to_string())
}
- fn set_uri(&self, element: &gst::URIHandler, uri: &str) -> Result<(), glib::Error> {
- let basesrc = element.dynamic_cast_ref::<gst_base::BaseSrc>().unwrap();
- self.set_uri(basesrc, Some(uri))
+ fn set_uri(&self, element: &Self::Type, uri: &str) -> Result<(), glib::Error> {
+ self.set_uri(element, Some(uri))
}
fn get_uri_type() -> gst::URIType {
@@ -313,18 +307,18 @@ impl URIHandlerImpl for S3Src {
}
impl BaseSrcImpl for S3Src {
- fn is_seekable(&self, _: &gst_base::BaseSrc) -> bool {
+ fn is_seekable(&self, _: &Self::Type) -> bool {
true
}
- fn get_size(&self, _: &gst_base::BaseSrc) -> Option<u64> {
+ fn get_size(&self, _: &Self::Type) -> Option<u64> {
match *self.state.lock().unwrap() {
StreamingState::Stopped => None,
StreamingState::Started { size, .. } => Some(size),
}
}
- fn start(&self, src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
+ fn start(&self, src: &Self::Type) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
if let StreamingState::Started { .. } = *state {
@@ -353,7 +347,7 @@ impl BaseSrcImpl for S3Src {
Ok(())
}
- fn stop(&self, _: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
+ fn stop(&self, _: &Self::Type) -> Result<(), gst::ErrorMessage> {
// First, stop any asynchronous tasks if we're running, as they will have the state lock
self.cancel();
@@ -368,7 +362,7 @@ impl BaseSrcImpl for S3Src {
Ok(())
}
- fn query(&self, src: &gst_base::BaseSrc, query: &mut gst::QueryRef) -> bool {
+ fn query(&self, src: &Self::Type, query: &mut gst::QueryRef) -> bool {
if let gst::QueryView::Scheduling(ref mut q) = query.view_mut() {
q.set(
gst::SchedulingFlags::SEQUENTIAL | gst::SchedulingFlags::BANDWIDTH_LIMITED,
@@ -385,7 +379,7 @@ impl BaseSrcImpl for S3Src {
fn create(
&self,
- src: &gst_base::BaseSrc,
+ src: &Self::Type,
offset: u64,
buffer: Option<&mut gst::BufferRef>,
length: u32,
@@ -416,21 +410,12 @@ impl BaseSrcImpl for S3Src {
}
/* FIXME: implement */
- fn do_seek(&self, _: &gst_base::BaseSrc, _: &mut gst::Segment) -> bool {
+ fn do_seek(&self, _: &Self::Type, _: &mut gst::Segment) -> bool {
true
}
- fn unlock(&self, _: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
+ fn unlock(&self, _: &Self::Type) -> Result<(), gst::ErrorMessage> {
self.cancel();
Ok(())
}
}
-
-pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
- gst::Element::register(
- Some(plugin),
- "rusotos3src",
- gst::Rank::Primary,
- S3Src::get_type(),
- )
-}
diff --git a/net/rusoto/src/s3src/mod.rs b/net/rusoto/src/s3src/mod.rs
new file mode 100644
index 000000000..04968feaf
--- /dev/null
+++ b/net/rusoto/src/s3src/mod.rs
@@ -0,0 +1,27 @@
+// Copyright (C) 2017 Author: Arun Raghavan <arun@arunraghavan.net>
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use glib::prelude::*;
+
+mod imp;
+
+glib_wrapper! {
+ pub struct S3Src(ObjectSubclass<imp::S3Src>) @extends gst_base::BaseSrc, gst::Element, gst::Object;
+}
+
+unsafe impl Send for S3Src {}
+unsafe impl Sync for S3Src {}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "rusotos3src",
+ gst::Rank::Primary,
+ S3Src::static_type(),
+ )
+}