diff options
Diffstat (limited to 'generic/threadshare/tests/pad.rs')
-rw-r--r-- | generic/threadshare/tests/pad.rs | 1103 |
1 files changed, 568 insertions, 535 deletions
diff --git a/generic/threadshare/tests/pad.rs b/generic/threadshare/tests/pad.rs index 78408d1e9..fa22175de 100644 --- a/generic/threadshare/tests/pad.rs +++ b/generic/threadshare/tests/pad.rs @@ -21,8 +21,8 @@ use futures::future::BoxFuture; use futures::lock::Mutex as FutMutex; use futures::prelude::*; -use glib::glib_object_subclass; use glib::GBoxed; +use glib::{glib_object_subclass, glib_wrapper}; use gst::prelude::*; use gst::subclass::prelude::*; @@ -53,654 +53,687 @@ fn init() { }); } -// Src - -static SRC_PROPERTIES: [glib::subclass::Property; 1] = - [glib::subclass::Property("context", |name| { - glib::ParamSpec::string( - name, - "Context", - "Context name to share threads with", - Some(DEFAULT_CONTEXT), - glib::ParamFlags::READWRITE, - ) - })]; - -#[derive(Clone, Debug, Default)] -struct Settings { - context: String, +#[derive(Debug)] +pub enum Item { + Buffer(gst::Buffer), + BufferList(gst::BufferList), + Event(gst::Event), } -lazy_static! { - static ref SRC_CAT: gst::DebugCategory = gst::DebugCategory::new( - "ts-element-src-test", - gst::DebugColorFlags::empty(), - Some("Thread-sharing Test Src Element"), - ); +#[derive(Clone, Debug, GBoxed)] +#[gboxed(type_name = "TsTestItemSender")] +struct ItemSender { + sender: mpsc::Sender<Item>, } -#[derive(Clone, Debug)] -struct PadSrcTestHandler; +// Src +mod imp_src { + use super::*; + + static SRC_PROPERTIES: [glib::subclass::Property; 1] = + [glib::subclass::Property("context", |name| { + glib::ParamSpec::string( + name, + "Context", + "Context name to share threads with", + Some(DEFAULT_CONTEXT), + glib::ParamFlags::READWRITE, + ) + })]; + + #[derive(Clone, Debug, Default)] + struct Settings { + context: String, + } + + lazy_static! { + pub static ref SRC_CAT: gst::DebugCategory = gst::DebugCategory::new( + "ts-element-src-test", + gst::DebugColorFlags::empty(), + Some("Thread-sharing Test Src Element"), + ); + } + + #[derive(Clone, Debug)] + struct PadSrcTestHandler; -impl PadSrcTestHandler { - async fn push_item( - pad: &PadSrcRef<'_>, - item: Item, - ) -> Result<gst::FlowSuccess, gst::FlowError> { - gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", item); + impl PadSrcTestHandler { + async fn push_item( + pad: &PadSrcRef<'_>, + item: Item, + ) -> Result<gst::FlowSuccess, gst::FlowError> { + gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", item); - match item { - Item::Event(event) => { - pad.push_event(event).await; + match item { + Item::Event(event) => { + pad.push_event(event).await; - Ok(gst::FlowSuccess::Ok) + Ok(gst::FlowSuccess::Ok) + } + Item::Buffer(buffer) => pad.push(buffer).await, + Item::BufferList(list) => pad.push_list(list).await, } - Item::Buffer(buffer) => pad.push(buffer).await, - Item::BufferList(list) => pad.push_list(list).await, } } -} -impl PadSrcHandler for PadSrcTestHandler { - type ElementImpl = ElementSrcTest; - - fn src_event( - &self, - pad: &PadSrcRef, - elem_src_test: &ElementSrcTest, - _element: &gst::Element, - event: gst::Event, - ) -> bool { - gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event); - - let ret = match event.view() { - EventView::FlushStart(..) => { - elem_src_test.task.flush_start().unwrap(); - true - } - EventView::Qos(..) | EventView::Reconfigure(..) | EventView::Latency(..) => true, - EventView::FlushStop(..) => { - elem_src_test.task.flush_stop().unwrap(); - true + impl PadSrcHandler for PadSrcTestHandler { + type ElementImpl = ElementSrcTest; + + fn src_event( + &self, + pad: &PadSrcRef, + elem_src_test: &ElementSrcTest, + _element: &gst::Element, + event: gst::Event, + ) -> bool { + gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event); + + let ret = match event.view() { + EventView::FlushStart(..) => { + elem_src_test.task.flush_start().unwrap(); + true + } + EventView::Qos(..) | EventView::Reconfigure(..) | EventView::Latency(..) => true, + EventView::FlushStop(..) => { + elem_src_test.task.flush_stop().unwrap(); + true + } + _ => false, + }; + + if ret { + gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handled {:?}", event); + } else { + gst_log!(SRC_CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event); } - _ => false, - }; - if ret { - gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handled {:?}", event); - } else { - gst_log!(SRC_CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event); + ret } - - ret } -} -#[derive(Debug)] -struct ElementSrcTestTask { - element: gst::Element, - src_pad: PadSrcWeak, - receiver: mpsc::Receiver<Item>, -} + #[derive(Debug)] + struct ElementSrcTestTask { + element: super::ElementSrcTest, + src_pad: PadSrcWeak, + receiver: mpsc::Receiver<Item>, + } -impl ElementSrcTestTask { - fn new(element: &gst::Element, src_pad: &PadSrc, receiver: mpsc::Receiver<Item>) -> Self { - ElementSrcTestTask { - element: element.clone(), - src_pad: src_pad.downgrade(), - receiver, + impl ElementSrcTestTask { + fn new( + element: &super::ElementSrcTest, + src_pad: &PadSrc, + receiver: mpsc::Receiver<Item>, + ) -> Self { + ElementSrcTestTask { + element: element.clone(), + src_pad: src_pad.downgrade(), + receiver, + } } } -} -impl ElementSrcTestTask { - fn flush(&mut self) { - // Purge the channel - while let Ok(Some(_item)) = self.receiver.try_next() {} + impl ElementSrcTestTask { + fn flush(&mut self) { + // Purge the channel + while let Ok(Some(_item)) = self.receiver.try_next() {} + } } -} - -impl TaskImpl for ElementSrcTestTask { - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { - async move { - let item = self.receiver.next().await; - let item = match item { - Some(item) => item, - None => { - gst_log!(SRC_CAT, obj: &self.element, "SrcPad channel aborted"); - return Err(gst::FlowError::Eos); + impl TaskImpl for ElementSrcTestTask { + fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + async move { + let item = self.receiver.next().await; + + let item = match item { + Some(item) => item, + None => { + gst_log!(SRC_CAT, obj: &self.element, "SrcPad channel aborted"); + return Err(gst::FlowError::Eos); + } + }; + + let pad = self.src_pad.upgrade().expect("PadSrc no longer exists"); + let res = PadSrcTestHandler::push_item(&pad, item).await; + match res { + Ok(_) => gst_log!(SRC_CAT, obj: &self.element, "Successfully pushed item"), + Err(gst::FlowError::Flushing) => { + gst_debug!(SRC_CAT, obj: &self.element, "Flushing") + } + Err(err) => panic!("Got error {}", err), } - }; - let pad = self.src_pad.upgrade().expect("PadSrc no longer exists"); - let res = PadSrcTestHandler::push_item(&pad, item).await; - match res { - Ok(_) => gst_log!(SRC_CAT, obj: &self.element, "Successfully pushed item"), - Err(gst::FlowError::Flushing) => { - gst_debug!(SRC_CAT, obj: &self.element, "Flushing") - } - Err(err) => panic!("Got error {}", err), + res.map(drop) } - - res.map(drop) + .boxed() } - .boxed() - } - fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { - async move { - gst_log!(SRC_CAT, obj: &self.element, "Stopping task"); - self.flush(); - gst_log!(SRC_CAT, obj: &self.element, "Task stopped"); - Ok(()) + fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { + async move { + gst_log!(SRC_CAT, obj: &self.element, "Stopping task"); + self.flush(); + gst_log!(SRC_CAT, obj: &self.element, "Task stopped"); + Ok(()) + } + .boxed() } - .boxed() - } - fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { - async move { - gst_log!(SRC_CAT, obj: &self.element, "Starting task flush"); - self.flush(); - gst_log!(SRC_CAT, obj: &self.element, "Task flush started"); - Ok(()) + fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { + async move { + gst_log!(SRC_CAT, obj: &self.element, "Starting task flush"); + self.flush(); + gst_log!(SRC_CAT, obj: &self.element, "Task flush started"); + Ok(()) + } + .boxed() } - .boxed() } -} -#[derive(Debug)] -struct ElementSrcTest { - src_pad: PadSrc, - task: Task, - sender: StdMutex<Option<mpsc::Sender<Item>>>, - settings: StdMutex<Settings>, -} + #[derive(Debug)] + pub struct ElementSrcTest { + src_pad: PadSrc, + task: Task, + sender: StdMutex<Option<mpsc::Sender<Item>>>, + settings: StdMutex<Settings>, + } -impl ElementSrcTest { - fn try_push(&self, item: Item) -> Result<(), Item> { - let state = self.task.lock_state(); - if *state != TaskState::Started && *state != TaskState::Paused { - gst_debug!(SRC_CAT, "ElementSrcTest rejecting item due to pad state"); + impl ElementSrcTest { + pub fn try_push(&self, item: Item) -> Result<(), Item> { + let state = self.task.lock_state(); + if *state != TaskState::Started && *state != TaskState::Paused { + gst_debug!(SRC_CAT, "ElementSrcTest rejecting item due to pad state"); - return Err(item); - } + return Err(item); + } - match self.sender.lock().unwrap().as_mut() { - Some(sender) => sender - .try_send(item) - .map_err(mpsc::TrySendError::into_inner), - None => Err(item), + match self.sender.lock().unwrap().as_mut() { + Some(sender) => sender + .try_send(item) + .map_err(mpsc::TrySendError::into_inner), + None => Err(item), + } } - } - fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { - gst_debug!(SRC_CAT, obj: element, "Preparing"); + fn prepare(&self, element: &super::ElementSrcTest) -> Result<(), gst::ErrorMessage> { + gst_debug!(SRC_CAT, obj: element, "Preparing"); - let settings = self.settings.lock().unwrap().clone(); - let context = Context::acquire(&settings.context, THROTTLING_DURATION).map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Failed to acquire Context: {}", err] - ) - })?; + let settings = self.settings.lock().unwrap().clone(); + let context = + Context::acquire(&settings.context, THROTTLING_DURATION).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to acquire Context: {}", err] + ) + })?; - let (sender, receiver) = mpsc::channel(1); - *self.sender.lock().unwrap() = Some(sender); + let (sender, receiver) = mpsc::channel(1); + *self.sender.lock().unwrap() = Some(sender); - self.task - .prepare( - ElementSrcTestTask::new(element, &self.src_pad, receiver), - context, - ) - .map_err(|err| { - gst_error_msg!( - gst::ResourceError::Failed, - ["Error preparing Task: {:?}", err] + self.task + .prepare( + ElementSrcTestTask::new(element, &self.src_pad, receiver), + context, ) - })?; + .map_err(|err| { + gst_error_msg!( + gst::ResourceError::Failed, + ["Error preparing Task: {:?}", err] + ) + })?; - gst_debug!(SRC_CAT, obj: element, "Prepared"); + gst_debug!(SRC_CAT, obj: element, "Prepared"); - Ok(()) - } + Ok(()) + } - fn unprepare(&self, element: &gst::Element) { - gst_debug!(SRC_CAT, obj: element, "Unpreparing"); + fn unprepare(&self, element: &super::ElementSrcTest) { + gst_debug!(SRC_CAT, obj: element, "Unpreparing"); - *self.sender.lock().unwrap() = None; - self.task.unprepare().unwrap(); + *self.sender.lock().unwrap() = None; + self.task.unprepare().unwrap(); - gst_debug!(SRC_CAT, obj: element, "Unprepared"); - } + gst_debug!(SRC_CAT, obj: element, "Unprepared"); + } - fn stop(&self, element: &gst::Element) { - gst_debug!(SRC_CAT, obj: element, "Stopping"); - self.task.stop().unwrap(); - gst_debug!(SRC_CAT, obj: element, "Stopped"); - } + fn stop(&self, element: &super::ElementSrcTest) { + gst_debug!(SRC_CAT, obj: element, "Stopping"); + self.task.stop().unwrap(); + gst_debug!(SRC_CAT, obj: element, "Stopped"); + } - fn start(&self, element: &gst::Element) { - gst_debug!(SRC_CAT, obj: element, "Starting"); - self.task.start().unwrap(); - gst_debug!(SRC_CAT, obj: element, "Started"); - } + fn start(&self, element: &super::ElementSrcTest) { + gst_debug!(SRC_CAT, obj: element, "Starting"); + self.task.start().unwrap(); + gst_debug!(SRC_CAT, obj: element, "Started"); + } - fn pause(&self, element: &gst::Element) { - gst_debug!(SRC_CAT, obj: element, "Pausing"); - self.task.pause().unwrap(); - gst_debug!(SRC_CAT, obj: element, "Paused"); + fn pause(&self, element: &super::ElementSrcTest) { + gst_debug!(SRC_CAT, obj: element, "Pausing"); + self.task.pause().unwrap(); + gst_debug!(SRC_CAT, obj: element, "Paused"); + } } -} - -impl ObjectSubclass for ElementSrcTest { - const NAME: &'static str = "TsElementSrcTest"; - type ParentType = gst::Element; - type Instance = gst::subclass::ElementInstanceStruct<Self>; - type Class = glib::subclass::simple::ClassStruct<Self>; - glib_object_subclass!(); - - fn class_init(klass: &mut glib::subclass::simple::ClassStruct<Self>) { - klass.set_metadata( - "Thread-sharing Test Src Element", - "Generic", - "Src Element for Pad Src Test", - "François Laignel <fengalin@free.fr>", - ); + impl ObjectSubclass for ElementSrcTest { + const NAME: &'static str = "TsElementSrcTest"; + type Type = super::ElementSrcTest; + type ParentType = gst::Element; + type Instance = gst::subclass::ElementInstanceStruct<Self>; + type Class = glib::subclass::simple::ClassStruct<Self>; + + glib_object_subclass!(); + + fn class_init(klass: &mut Self::Class) { + klass.set_metadata( + "Thread-sharing Test Src Element", + "Generic", + "Src Element for Pad Src Test", + "François Laignel <fengalin@free.fr>", + ); - let caps = gst::Caps::new_any(); - let src_pad_template = gst::PadTemplate::new( - "src", - gst::PadDirection::Src, - gst::PadPresence::Always, - &caps, - ) - .unwrap(); - klass.add_pad_template(src_pad_template); + let caps = gst::Caps::new_any(); + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + klass.add_pad_template(src_pad_template); - klass.install_properties(&SRC_PROPERTIES); - } + klass.install_properties(&SRC_PROPERTIES); + } - fn with_class(klass: &glib::subclass::simple::ClassStruct<Self>) -> Self { - ElementSrcTest { - src_pad: PadSrc::new( - gst::Pad::from_template(&klass.get_pad_template("src").unwrap(), Some("src")), - PadSrcTestHandler, - ), - task: Task::default(), - sender: StdMutex::new(None), - settings: StdMutex::new(Settings::default()), + fn with_class(klass: &Self::Class) -> Self { + ElementSrcTest { + src_pad: PadSrc::new( + gst::Pad::from_template(&klass.get_pad_template("src").unwrap(), Some("src")), + PadSrcTestHandler, + ), + task: Task::default(), + sender: StdMutex::new(None), + settings: StdMutex::new(Settings::default()), + } } } -} -impl ObjectImpl for ElementSrcTest { - fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { - let prop = &SRC_PROPERTIES[id]; + impl ObjectImpl for ElementSrcTest { + fn set_property(&self, _obj: &Self::Type, id: usize, value: &glib::Value) { + let prop = &SRC_PROPERTIES[id]; - match *prop { - glib::subclass::Property("context", ..) => { - let context = value - .get() - .expect("type checked upstream") - .unwrap_or_else(|| "".into()); + match *prop { + glib::subclass::Property("context", ..) => { + let context = value + .get() + .expect("type checked upstream") + .unwrap_or_else(|| "".into()); - self.settings.lock().unwrap().context = context; + self.settings.lock().unwrap().context = context; + } + _ => unimplemented!(), } - _ => unimplemented!(), } - } - fn constructed(&self, obj: &glib::Object) { - self.parent_constructed(obj); + fn constructed(&self, obj: &Self::Type) { + self.parent_constructed(obj); - let element = obj.downcast_ref::<gst::Element>().unwrap(); - element.add_pad(self.src_pad.gst_pad()).unwrap(); + obj.add_pad(self.src_pad.gst_pad()).unwrap(); + } } -} -impl ElementImpl for ElementSrcTest { - fn change_state( - &self, - element: &gst::Element, - transition: gst::StateChange, - ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { - gst_log!(SRC_CAT, obj: element, "Changing state {:?}", transition); - - match transition { - gst::StateChange::NullToReady => { - self.prepare(element).map_err(|err| { - element.post_error_message(err); - gst::StateChangeError - })?; - } - gst::StateChange::PlayingToPaused => { - self.pause(element); - } - gst::StateChange::ReadyToNull => { - self.unprepare(element); + impl ElementImpl for ElementSrcTest { + fn change_state( + &self, + element: &Self::Type, + transition: gst::StateChange, + ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { + gst_log!(SRC_CAT, obj: element, "Changing state {:?}", transition); + + match transition { + gst::StateChange::NullToReady => { + self.prepare(element).map_err(|err| { + element.post_error_message(err); + gst::StateChangeError + })?; + } + gst::StateChange::PlayingToPaused => { + self.pause(element); + } + gst::StateChange::ReadyToNull => { + self.unprepare(element); + } + _ => (), } - _ => (), - } - let mut success = self.parent_change_state(element, transition)?; + let mut success = self.parent_change_state(element, transition)?; - match transition { - gst::StateChange::PausedToReady => { - self.stop(element); - } - gst::StateChange::PausedToPlaying => { - self.start(element); - } - gst::StateChange::ReadyToPaused | gst::StateChange::PlayingToPaused => { - success = gst::StateChangeSuccess::NoPreroll; + match transition { + gst::StateChange::PausedToReady => { + self.stop(element); + } + gst::StateChange::PausedToPlaying => { + self.start(element); + } + gst::StateChange::ReadyToPaused | gst::StateChange::PlayingToPaused => { + success = gst::StateChangeSuccess::NoPreroll; + } + _ => (), } - _ => (), - } - Ok(success) - } + Ok(success) + } - fn send_event(&self, _element: &gst::Element, event: gst::Event) -> bool { - match event.view() { - EventView::FlushStart(..) => { - self.task.flush_start().unwrap(); - } - EventView::FlushStop(..) => { - self.task.flush_stop().unwrap(); + fn send_event(&self, _element: &Self::Type, event: gst::Event) -> bool { + match event.view() { + EventView::FlushStart(..) => { + self.task.flush_start().unwrap(); + } + EventView::FlushStop(..) => { + self.task.flush_stop().unwrap(); + } + _ => (), } - _ => (), - } - if !event.is_serialized() { - self.src_pad.gst_pad().push_event(event) - } else { - self.try_push(Item::Event(event)).is_ok() + if !event.is_serialized() { + self.src_pad.gst_pad().push_event(event) + } else { + self.try_push(Item::Event(event)).is_ok() + } } } } -// Sink - -#[derive(Debug)] -enum Item { - Buffer(gst::Buffer), - BufferList(gst::BufferList), - Event(gst::Event), -} - -#[derive(Clone, Debug, GBoxed)] -#[gboxed(type_name = "TsTestItemSender")] -struct ItemSender { - sender: mpsc::Sender<Item>, +glib_wrapper! { + pub struct ElementSrcTest(ObjectSubclass<imp_src::ElementSrcTest>) @extends gst::Element, gst::Object; } +unsafe impl Send for ElementSrcTest {} +unsafe impl Sync for ElementSrcTest {} -static SINK_PROPERTIES: [glib::subclass::Property; 1] = - [glib::subclass::Property("sender", |name| { - glib::ParamSpec::boxed( - name, - "Sender", - "Channel sender to forward the incoming items to", - ItemSender::get_type(), - glib::ParamFlags::WRITABLE, - ) - })]; - -#[derive(Clone, Debug, Default)] -struct PadSinkTestHandler; - -impl PadSinkHandler for PadSinkTestHandler { - type ElementImpl = ElementSinkTest; - - fn sink_chain( - &self, - _pad: &PadSinkRef, - _elem_sink_test: &ElementSinkTest, - element: &gst::Element, - buffer: gst::Buffer, - ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { - let element = element.clone(); - async move { - let elem_sink_test = ElementSinkTest::from_instance(&element); - elem_sink_test - .forward_item(&element, Item::Buffer(buffer)) - .await - } - .boxed() - } +// Sink - fn sink_chain_list( - &self, - _pad: &PadSinkRef, - _elem_sink_test: &ElementSinkTest, - element: &gst::Element, - list: gst::BufferList, - ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { - let element = element.clone(); - async move { - let elem_sink_test = ElementSinkTest::from_instance(&element); - elem_sink_test - .forward_item(&element, Item::BufferList(list)) - .await +mod imp_sink { + use super::*; + + static SINK_PROPERTIES: [glib::subclass::Property; 1] = + [glib::subclass::Property("sender", |name| { + glib::ParamSpec::boxed( + name, + "Sender", + "Channel sender to forward the incoming items to", + ItemSender::get_type(), + glib::ParamFlags::WRITABLE, + ) + })]; + + #[derive(Clone, Debug, Default)] + struct PadSinkTestHandler; + + impl PadSinkHandler for PadSinkTestHandler { + type ElementImpl = ElementSinkTest; + + fn sink_chain( + &self, + _pad: &PadSinkRef, + _elem_sink_test: &ElementSinkTest, + element: &gst::Element, + buffer: gst::Buffer, + ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { + let element = element + .clone() + .downcast::<super::ElementSinkTest>() + .unwrap(); + async move { + let elem_sink_test = ElementSinkTest::from_instance(&element); + elem_sink_test + .forward_item(&element, Item::Buffer(buffer)) + .await + } + .boxed() } - .boxed() - } - fn sink_event( - &self, - pad: &PadSinkRef, - elem_sink_test: &ElementSinkTest, - element: &gst::Element, - event: gst::Event, - ) -> bool { - gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event); - - match event.view() { - EventView::FlushStart(..) => { - elem_sink_test.stop(&element); - true + fn sink_chain_list( + &self, + _pad: &PadSinkRef, + _elem_sink_test: &ElementSinkTest, + element: &gst::Element, + list: gst::BufferList, + ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { + let element = element + .clone() + .downcast::<super::ElementSinkTest>() + .unwrap(); + async move { + let elem_sink_test = ElementSinkTest::from_instance(&element); + elem_sink_test + .forward_item(&element, Item::BufferList(list)) + .await } - _ => false, + .boxed() } - } - fn sink_event_serialized( - &self, - pad: &PadSinkRef, - _elem_sink_test: &ElementSinkTest, - element: &gst::Element, - event: gst::Event, - ) -> BoxFuture<'static, bool> { - gst_log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event); - - let element = element.clone(); - async move { - let elem_sink_test = ElementSinkTest::from_instance(&element); - - if let EventView::FlushStop(..) = event.view() { - elem_sink_test.start(&element); + fn sink_event( + &self, + pad: &PadSinkRef, + elem_sink_test: &ElementSinkTest, + element: &gst::Element, + event: gst::Event, + ) -> bool { + gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event); + + match event.view() { + EventView::FlushStart(..) => { + elem_sink_test.stop(&element.downcast_ref::<super::ElementSinkTest>().unwrap()); + true + } + _ => false, } - - elem_sink_test - .forward_item(&element, Item::Event(event)) - .await - .is_ok() } - .boxed() - } -} -#[derive(Debug)] -struct ElementSinkTest { - sink_pad: PadSink, - flushing: AtomicBool, - sender: FutMutex<Option<mpsc::Sender<Item>>>, -} + fn sink_event_serialized( + &self, + pad: &PadSinkRef, + _elem_sink_test: &ElementSinkTest, + element: &gst::Element, + event: gst::Event, + ) -> BoxFuture<'static, bool> { + gst_log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event); + + let element = element + .clone() + .downcast::<super::ElementSinkTest>() + .unwrap(); + async move { + let elem_sink_test = ElementSinkTest::from_instance(&element); + + if let EventView::FlushStop(..) = event.view() { + elem_sink_test.start(&element); + } -impl ElementSinkTest { - async fn forward_item( - &self, - element: &gst::Element, - item: Item, - ) -> Result<gst::FlowSuccess, gst::FlowError> { - if !self.flushing.load(Ordering::SeqCst) { - gst_debug!(SINK_CAT, obj: element, "Fowarding {:?}", item); - self.sender - .lock() - .await - .as_mut() - .expect("Item Sender not set") - .send(item) - .await - .map(|_| gst::FlowSuccess::Ok) - .map_err(|_| gst::FlowError::Error) - } else { - gst_debug!( - SINK_CAT, - obj: element, - "Not fowarding {:?} due to flushing", - item - ); - Err(gst::FlowError::Flushing) + elem_sink_test + .forward_item(&element, Item::Event(event)) + .await + .is_ok() + } + .boxed() } } - fn start(&self, element: &gst::Element) { - gst_debug!(SINK_CAT, obj: element, "Starting"); - self.flushing.store(false, Ordering::SeqCst); - gst_debug!(SINK_CAT, obj: element, "Started"); + #[derive(Debug)] + pub struct ElementSinkTest { + sink_pad: PadSink, + flushing: AtomicBool, + sender: FutMutex<Option<mpsc::Sender<Item>>>, } - fn stop(&self, element: &gst::Element) { - gst_debug!(SINK_CAT, obj: element, "Stopping"); - self.flushing.store(true, Ordering::SeqCst); - gst_debug!(SINK_CAT, obj: element, "Stopped"); - } -} + impl ElementSinkTest { + async fn forward_item( + &self, + element: &super::ElementSinkTest, + item: Item, + ) -> Result<gst::FlowSuccess, gst::FlowError> { + if !self.flushing.load(Ordering::SeqCst) { + gst_debug!(SINK_CAT, obj: element, "Fowarding {:?}", item); + self.sender + .lock() + .await + .as_mut() + .expect("Item Sender not set") + .send(item) + .await + .map(|_| gst::FlowSuccess::Ok) + .map_err(|_| gst::FlowError::Error) + } else { + gst_debug!( + SINK_CAT, + obj: element, + "Not fowarding {:?} due to flushing", + item + ); + Err(gst::FlowError::Flushing) + } + } -impl ElementSinkTest { - fn push_flush_start(&self, element: &gst::Element) { - gst_debug!(SINK_CAT, obj: element, "Pushing FlushStart"); - self.sink_pad - .gst_pad() - .push_event(gst::event::FlushStart::new()); - gst_debug!(SINK_CAT, obj: element, "FlushStart pushed"); - } + fn start(&self, element: &super::ElementSinkTest) { + gst_debug!(SINK_CAT, obj: element, "Starting"); + self.flushing.store(false, Ordering::SeqCst); + gst_debug!(SINK_CAT, obj: element, "Started"); + } - fn push_flush_stop(&self, element: &gst::Element) { - gst_debug!(SINK_CAT, obj: element, "Pushing FlushStop"); - self.sink_pad - .gst_pad() - .push_event(gst::event::FlushStop::new(true)); - gst_debug!(SINK_CAT, obj: element, "FlushStop pushed"); + fn stop(&self, element: &super::ElementSinkTest) { + gst_debug!(SINK_CAT, obj: element, "Stopping"); + self.flushing.store(true, Ordering::SeqCst); + gst_debug!(SINK_CAT, obj: element, "Stopped"); + } } -} - -lazy_static! { - static ref SINK_CAT: gst::DebugCategory = gst::DebugCategory::new( - "ts-element-sink-test", - gst::DebugColorFlags::empty(), - Some("Thread-sharing Test Sink Element"), - ); -} -impl ObjectSubclass for ElementSinkTest { - const NAME: &'static str = "TsElementSinkTest"; - type ParentType = gst::Element; - type Instance = gst::subclass::ElementInstanceStruct<Self>; - type Class = glib::subclass::simple::ClassStruct<Self>; + impl ElementSinkTest { + pub fn push_flush_start(&self, element: &super::ElementSinkTest) { + gst_debug!(SINK_CAT, obj: element, "Pushing FlushStart"); + self.sink_pad + .gst_pad() + .push_event(gst::event::FlushStart::new()); + gst_debug!(SINK_CAT, obj: element, "FlushStart pushed"); + } - glib_object_subclass!(); + pub fn push_flush_stop(&self, element: &super::ElementSinkTest) { + gst_debug!(SINK_CAT, obj: element, "Pushing FlushStop"); + self.sink_pad + .gst_pad() + .push_event(gst::event::FlushStop::new(true)); + gst_debug!(SINK_CAT, obj: element, "FlushStop pushed"); + } + } - fn class_init(klass: &mut glib::subclass::simple::ClassStruct<Self>) { - klass.set_metadata( - "Thread-sharing Test Sink Element", - "Generic", - "Sink Element for Pad Test", - "François Laignel <fengalin@free.fr>", + lazy_static! { + static ref SINK_CAT: gst::DebugCategory = gst::DebugCategory::new( + "ts-element-sink-test", + gst::DebugColorFlags::empty(), + Some("Thread-sharing Test Sink Element"), ); + } - let caps = gst::Caps::new_any(); - let sink_pad_template = gst::PadTemplate::new( - "sink", - gst::PadDirection::Sink, - gst::PadPresence::Always, - &caps, - ) - .unwrap(); - klass.add_pad_template(sink_pad_template); + impl ObjectSubclass for ElementSinkTest { + const NAME: &'static str = "TsElementSinkTest"; + type Type = super::ElementSinkTest; + type ParentType = gst::Element; + type Instance = gst::subclass::ElementInstanceStruct<Self>; + type Class = glib::subclass::simple::ClassStruct<Self>; + + glib_object_subclass!(); + + fn class_init(klass: &mut Self::Class) { + klass.set_metadata( + "Thread-sharing Test Sink Element", + "Generic", + "Sink Element for Pad Test", + "François Laignel <fengalin@free.fr>", + ); - klass.install_properties(&SINK_PROPERTIES); - } + let caps = gst::Caps::new_any(); + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + klass.add_pad_template(sink_pad_template); - fn with_class(klass: &glib::subclass::simple::ClassStruct<Self>) -> Self { - ElementSinkTest { - sink_pad: PadSink::new( - gst::Pad::from_template(&klass.get_pad_template("sink").unwrap(), Some("sink")), - PadSinkTestHandler, - ), - flushing: AtomicBool::new(true), - sender: FutMutex::new(None), + klass.install_properties(&SINK_PROPERTIES); } - } -} -impl ObjectImpl for ElementSinkTest { - fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { - let prop = &SINK_PROPERTIES[id]; - - match *prop { - glib::subclass::Property("sender", ..) => { - let ItemSender { sender } = value - .get::<&ItemSender>() - .expect("type checked upstream") - .expect("ItemSender not found") - .clone(); - *futures::executor::block_on(self.sender.lock()) = Some(sender); + fn with_class(klass: &Self::Class) -> Self { + ElementSinkTest { + sink_pad: PadSink::new( + gst::Pad::from_template(&klass.get_pad_template("sink").unwrap(), Some("sink")), + PadSinkTestHandler, + ), + flushing: AtomicBool::new(true), + sender: FutMutex::new(None), } - _ => unimplemented!(), } } - fn constructed(&self, obj: &glib::Object) { - self.parent_constructed(obj); + impl ObjectImpl for ElementSinkTest { + fn set_property(&self, _obj: &Self::Type, id: usize, value: &glib::Value) { + let prop = &SINK_PROPERTIES[id]; + + match *prop { + glib::subclass::Property("sender", ..) => { + let ItemSender { sender } = value + .get::<&ItemSender>() + .expect("type checked upstream") + .expect("ItemSender not found") + .clone(); + *futures::executor::block_on(self.sender.lock()) = Some(sender); + } + _ => unimplemented!(), + } + } - let element = obj.downcast_ref::<gst::Element>().unwrap(); - element.add_pad(self.sink_pad.gst_pad()).unwrap(); + fn constructed(&self, obj: &Self::Type) { + self.parent_constructed(obj); + + obj.add_pad(self.sink_pad.gst_pad()).unwrap(); + } } -} -impl ElementImpl for ElementSinkTest { - fn change_state( - &self, - element: &gst::Element, - transition: gst::StateChange, - ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { - gst_log!(SINK_CAT, obj: element, "Changing state {:?}", transition); + impl ElementImpl for ElementSinkTest { + fn change_state( + &self, + element: &Self::Type, + transition: gst::StateChange, + ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { + gst_log!(SINK_CAT, obj: element, "Changing state {:?}", transition); - if let gst::StateChange::PausedToReady = transition { - self.stop(element); - } + if let gst::StateChange::PausedToReady = transition { + self.stop(element); + } - let success = self.parent_change_state(element, transition)?; + let success = self.parent_change_state(element, transition)?; - if let gst::StateChange::ReadyToPaused = transition { - self.start(element); - } + if let gst::StateChange::ReadyToPaused = transition { + self.start(element); + } - Ok(success) + Ok(success) + } } } +glib_wrapper! { + pub struct ElementSinkTest(ObjectSubclass<imp_sink::ElementSinkTest>) @extends gst::Element, gst::Object; +} +unsafe impl Send for ElementSinkTest {} +unsafe impl Sync for ElementSinkTest {} + fn setup( context_name: &str, mut middle_element_1: Option<gst::Element>, mut middle_element_2: Option<gst::Element>, ) -> ( gst::Pipeline, - gst::Element, - gst::Element, + ElementSrcTest, + ElementSinkTest, mpsc::Receiver<Item>, ) { init(); @@ -708,14 +741,14 @@ fn setup( let pipeline = gst::Pipeline::new(None); // Src - let src_element = glib::Object::new(ElementSrcTest::get_type(), &[]) + let src_element = glib::Object::new(ElementSrcTest::static_type(), &[]) .unwrap() - .downcast::<gst::Element>() + .downcast::<ElementSrcTest>() .unwrap(); src_element.set_property("context", &context_name).unwrap(); pipeline.add(&src_element).unwrap(); - let mut last_element = src_element.clone(); + let mut last_element = src_element.clone().upcast::<gst::Element>(); if let Some(middle_element) = middle_element_1.take() { pipeline.add(&middle_element).unwrap(); @@ -730,9 +763,9 @@ fn setup( } // Sink - let sink_element = glib::Object::new(ElementSinkTest::get_type(), &[]) + let sink_element = glib::Object::new(ElementSinkTest::static_type(), &[]) .unwrap() - .downcast::<gst::Element>() + .downcast::<ElementSinkTest>() .unwrap(); pipeline.add(&sink_element).unwrap(); last_element.link(&sink_element).unwrap(); @@ -748,10 +781,10 @@ fn setup( fn nominal_scenario( scenario_name: &str, pipeline: gst::Pipeline, - src_element: gst::Element, + src_element: ElementSrcTest, mut receiver: mpsc::Receiver<Item>, ) { - let elem_src_test = ElementSrcTest::from_instance(&src_element); + let elem_src_test = imp_src::ElementSrcTest::from_instance(&src_element); pipeline.set_state(gst::State::Playing).unwrap(); @@ -975,7 +1008,7 @@ fn start_pause_start() { let (pipeline, src_element, _sink_element, mut receiver) = setup(&scenario_name, None, None); - let elem_src_test = ElementSrcTest::from_instance(&src_element); + let elem_src_test = imp_src::ElementSrcTest::from_instance(&src_element); pipeline.set_state(gst::State::Playing).unwrap(); @@ -1053,7 +1086,7 @@ fn start_stop_start() { let (pipeline, src_element, _sink_element, mut receiver) = setup(&scenario_name, None, None); - let elem_src_test = ElementSrcTest::from_instance(&src_element); + let elem_src_test = imp_src::ElementSrcTest::from_instance(&src_element); pipeline.set_state(gst::State::Playing).unwrap(); @@ -1109,7 +1142,7 @@ fn start_stop_start() { match futures::executor::block_on(receiver.next()).unwrap() { Item::Buffer(_buffer) => { gst_info!( - SRC_CAT, + imp_src::SRC_CAT, "{}: initial buffer went through, don't expect any pending item to be dropped", scenario_name ); @@ -1166,7 +1199,7 @@ fn start_flush() { let (pipeline, src_element, sink_element, mut receiver) = setup(&scenario_name, None, None); - let elem_src_test = ElementSrcTest::from_instance(&src_element); + let elem_src_test = imp_src::ElementSrcTest::from_instance(&src_element); pipeline.set_state(gst::State::Playing).unwrap(); @@ -1206,7 +1239,7 @@ fn start_flush() { .try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4]))) .unwrap(); - let elem_sink_test = ElementSinkTest::from_instance(&sink_element); + let elem_sink_test = imp_sink::ElementSinkTest::from_instance(&sink_element); elem_sink_test.push_flush_start(&sink_element); |