Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'generic/threadshare/tests/pad.rs')
-rw-r--r--generic/threadshare/tests/pad.rs1103
1 files changed, 568 insertions, 535 deletions
diff --git a/generic/threadshare/tests/pad.rs b/generic/threadshare/tests/pad.rs
index 78408d1e9..fa22175de 100644
--- a/generic/threadshare/tests/pad.rs
+++ b/generic/threadshare/tests/pad.rs
@@ -21,8 +21,8 @@ use futures::future::BoxFuture;
use futures::lock::Mutex as FutMutex;
use futures::prelude::*;
-use glib::glib_object_subclass;
use glib::GBoxed;
+use glib::{glib_object_subclass, glib_wrapper};
use gst::prelude::*;
use gst::subclass::prelude::*;
@@ -53,654 +53,687 @@ fn init() {
});
}
-// Src
-
-static SRC_PROPERTIES: [glib::subclass::Property; 1] =
- [glib::subclass::Property("context", |name| {
- glib::ParamSpec::string(
- name,
- "Context",
- "Context name to share threads with",
- Some(DEFAULT_CONTEXT),
- glib::ParamFlags::READWRITE,
- )
- })];
-
-#[derive(Clone, Debug, Default)]
-struct Settings {
- context: String,
+#[derive(Debug)]
+pub enum Item {
+ Buffer(gst::Buffer),
+ BufferList(gst::BufferList),
+ Event(gst::Event),
}
-lazy_static! {
- static ref SRC_CAT: gst::DebugCategory = gst::DebugCategory::new(
- "ts-element-src-test",
- gst::DebugColorFlags::empty(),
- Some("Thread-sharing Test Src Element"),
- );
+#[derive(Clone, Debug, GBoxed)]
+#[gboxed(type_name = "TsTestItemSender")]
+struct ItemSender {
+ sender: mpsc::Sender<Item>,
}
-#[derive(Clone, Debug)]
-struct PadSrcTestHandler;
+// Src
+mod imp_src {
+ use super::*;
+
+ static SRC_PROPERTIES: [glib::subclass::Property; 1] =
+ [glib::subclass::Property("context", |name| {
+ glib::ParamSpec::string(
+ name,
+ "Context",
+ "Context name to share threads with",
+ Some(DEFAULT_CONTEXT),
+ glib::ParamFlags::READWRITE,
+ )
+ })];
+
+ #[derive(Clone, Debug, Default)]
+ struct Settings {
+ context: String,
+ }
+
+ lazy_static! {
+ pub static ref SRC_CAT: gst::DebugCategory = gst::DebugCategory::new(
+ "ts-element-src-test",
+ gst::DebugColorFlags::empty(),
+ Some("Thread-sharing Test Src Element"),
+ );
+ }
+
+ #[derive(Clone, Debug)]
+ struct PadSrcTestHandler;
-impl PadSrcTestHandler {
- async fn push_item(
- pad: &PadSrcRef<'_>,
- item: Item,
- ) -> Result<gst::FlowSuccess, gst::FlowError> {
- gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", item);
+ impl PadSrcTestHandler {
+ async fn push_item(
+ pad: &PadSrcRef<'_>,
+ item: Item,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", item);
- match item {
- Item::Event(event) => {
- pad.push_event(event).await;
+ match item {
+ Item::Event(event) => {
+ pad.push_event(event).await;
- Ok(gst::FlowSuccess::Ok)
+ Ok(gst::FlowSuccess::Ok)
+ }
+ Item::Buffer(buffer) => pad.push(buffer).await,
+ Item::BufferList(list) => pad.push_list(list).await,
}
- Item::Buffer(buffer) => pad.push(buffer).await,
- Item::BufferList(list) => pad.push_list(list).await,
}
}
-}
-impl PadSrcHandler for PadSrcTestHandler {
- type ElementImpl = ElementSrcTest;
-
- fn src_event(
- &self,
- pad: &PadSrcRef,
- elem_src_test: &ElementSrcTest,
- _element: &gst::Element,
- event: gst::Event,
- ) -> bool {
- gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
-
- let ret = match event.view() {
- EventView::FlushStart(..) => {
- elem_src_test.task.flush_start().unwrap();
- true
- }
- EventView::Qos(..) | EventView::Reconfigure(..) | EventView::Latency(..) => true,
- EventView::FlushStop(..) => {
- elem_src_test.task.flush_stop().unwrap();
- true
+ impl PadSrcHandler for PadSrcTestHandler {
+ type ElementImpl = ElementSrcTest;
+
+ fn src_event(
+ &self,
+ pad: &PadSrcRef,
+ elem_src_test: &ElementSrcTest,
+ _element: &gst::Element,
+ event: gst::Event,
+ ) -> bool {
+ gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
+
+ let ret = match event.view() {
+ EventView::FlushStart(..) => {
+ elem_src_test.task.flush_start().unwrap();
+ true
+ }
+ EventView::Qos(..) | EventView::Reconfigure(..) | EventView::Latency(..) => true,
+ EventView::FlushStop(..) => {
+ elem_src_test.task.flush_stop().unwrap();
+ true
+ }
+ _ => false,
+ };
+
+ if ret {
+ gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handled {:?}", event);
+ } else {
+ gst_log!(SRC_CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event);
}
- _ => false,
- };
- if ret {
- gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handled {:?}", event);
- } else {
- gst_log!(SRC_CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event);
+ ret
}
-
- ret
}
-}
-#[derive(Debug)]
-struct ElementSrcTestTask {
- element: gst::Element,
- src_pad: PadSrcWeak,
- receiver: mpsc::Receiver<Item>,
-}
+ #[derive(Debug)]
+ struct ElementSrcTestTask {
+ element: super::ElementSrcTest,
+ src_pad: PadSrcWeak,
+ receiver: mpsc::Receiver<Item>,
+ }
-impl ElementSrcTestTask {
- fn new(element: &gst::Element, src_pad: &PadSrc, receiver: mpsc::Receiver<Item>) -> Self {
- ElementSrcTestTask {
- element: element.clone(),
- src_pad: src_pad.downgrade(),
- receiver,
+ impl ElementSrcTestTask {
+ fn new(
+ element: &super::ElementSrcTest,
+ src_pad: &PadSrc,
+ receiver: mpsc::Receiver<Item>,
+ ) -> Self {
+ ElementSrcTestTask {
+ element: element.clone(),
+ src_pad: src_pad.downgrade(),
+ receiver,
+ }
}
}
-}
-impl ElementSrcTestTask {
- fn flush(&mut self) {
- // Purge the channel
- while let Ok(Some(_item)) = self.receiver.try_next() {}
+ impl ElementSrcTestTask {
+ fn flush(&mut self) {
+ // Purge the channel
+ while let Ok(Some(_item)) = self.receiver.try_next() {}
+ }
}
-}
-
-impl TaskImpl for ElementSrcTestTask {
- fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
- async move {
- let item = self.receiver.next().await;
- let item = match item {
- Some(item) => item,
- None => {
- gst_log!(SRC_CAT, obj: &self.element, "SrcPad channel aborted");
- return Err(gst::FlowError::Eos);
+ impl TaskImpl for ElementSrcTestTask {
+ fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
+ async move {
+ let item = self.receiver.next().await;
+
+ let item = match item {
+ Some(item) => item,
+ None => {
+ gst_log!(SRC_CAT, obj: &self.element, "SrcPad channel aborted");
+ return Err(gst::FlowError::Eos);
+ }
+ };
+
+ let pad = self.src_pad.upgrade().expect("PadSrc no longer exists");
+ let res = PadSrcTestHandler::push_item(&pad, item).await;
+ match res {
+ Ok(_) => gst_log!(SRC_CAT, obj: &self.element, "Successfully pushed item"),
+ Err(gst::FlowError::Flushing) => {
+ gst_debug!(SRC_CAT, obj: &self.element, "Flushing")
+ }
+ Err(err) => panic!("Got error {}", err),
}
- };
- let pad = self.src_pad.upgrade().expect("PadSrc no longer exists");
- let res = PadSrcTestHandler::push_item(&pad, item).await;
- match res {
- Ok(_) => gst_log!(SRC_CAT, obj: &self.element, "Successfully pushed item"),
- Err(gst::FlowError::Flushing) => {
- gst_debug!(SRC_CAT, obj: &self.element, "Flushing")
- }
- Err(err) => panic!("Got error {}", err),
+ res.map(drop)
}
-
- res.map(drop)
+ .boxed()
}
- .boxed()
- }
- fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
- async move {
- gst_log!(SRC_CAT, obj: &self.element, "Stopping task");
- self.flush();
- gst_log!(SRC_CAT, obj: &self.element, "Task stopped");
- Ok(())
+ fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ async move {
+ gst_log!(SRC_CAT, obj: &self.element, "Stopping task");
+ self.flush();
+ gst_log!(SRC_CAT, obj: &self.element, "Task stopped");
+ Ok(())
+ }
+ .boxed()
}
- .boxed()
- }
- fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
- async move {
- gst_log!(SRC_CAT, obj: &self.element, "Starting task flush");
- self.flush();
- gst_log!(SRC_CAT, obj: &self.element, "Task flush started");
- Ok(())
+ fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
+ async move {
+ gst_log!(SRC_CAT, obj: &self.element, "Starting task flush");
+ self.flush();
+ gst_log!(SRC_CAT, obj: &self.element, "Task flush started");
+ Ok(())
+ }
+ .boxed()
}
- .boxed()
}
-}
-#[derive(Debug)]
-struct ElementSrcTest {
- src_pad: PadSrc,
- task: Task,
- sender: StdMutex<Option<mpsc::Sender<Item>>>,
- settings: StdMutex<Settings>,
-}
+ #[derive(Debug)]
+ pub struct ElementSrcTest {
+ src_pad: PadSrc,
+ task: Task,
+ sender: StdMutex<Option<mpsc::Sender<Item>>>,
+ settings: StdMutex<Settings>,
+ }
-impl ElementSrcTest {
- fn try_push(&self, item: Item) -> Result<(), Item> {
- let state = self.task.lock_state();
- if *state != TaskState::Started && *state != TaskState::Paused {
- gst_debug!(SRC_CAT, "ElementSrcTest rejecting item due to pad state");
+ impl ElementSrcTest {
+ pub fn try_push(&self, item: Item) -> Result<(), Item> {
+ let state = self.task.lock_state();
+ if *state != TaskState::Started && *state != TaskState::Paused {
+ gst_debug!(SRC_CAT, "ElementSrcTest rejecting item due to pad state");
- return Err(item);
- }
+ return Err(item);
+ }
- match self.sender.lock().unwrap().as_mut() {
- Some(sender) => sender
- .try_send(item)
- .map_err(mpsc::TrySendError::into_inner),
- None => Err(item),
+ match self.sender.lock().unwrap().as_mut() {
+ Some(sender) => sender
+ .try_send(item)
+ .map_err(mpsc::TrySendError::into_inner),
+ None => Err(item),
+ }
}
- }
- fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
- gst_debug!(SRC_CAT, obj: element, "Preparing");
+ fn prepare(&self, element: &super::ElementSrcTest) -> Result<(), gst::ErrorMessage> {
+ gst_debug!(SRC_CAT, obj: element, "Preparing");
- let settings = self.settings.lock().unwrap().clone();
- let context = Context::acquire(&settings.context, THROTTLING_DURATION).map_err(|err| {
- gst_error_msg!(
- gst::ResourceError::OpenRead,
- ["Failed to acquire Context: {}", err]
- )
- })?;
+ let settings = self.settings.lock().unwrap().clone();
+ let context =
+ Context::acquire(&settings.context, THROTTLING_DURATION).map_err(|err| {
+ gst_error_msg!(
+ gst::ResourceError::OpenRead,
+ ["Failed to acquire Context: {}", err]
+ )
+ })?;
- let (sender, receiver) = mpsc::channel(1);
- *self.sender.lock().unwrap() = Some(sender);
+ let (sender, receiver) = mpsc::channel(1);
+ *self.sender.lock().unwrap() = Some(sender);
- self.task
- .prepare(
- ElementSrcTestTask::new(element, &self.src_pad, receiver),
- context,
- )
- .map_err(|err| {
- gst_error_msg!(
- gst::ResourceError::Failed,
- ["Error preparing Task: {:?}", err]
+ self.task
+ .prepare(
+ ElementSrcTestTask::new(element, &self.src_pad, receiver),
+ context,
)
- })?;
+ .map_err(|err| {
+ gst_error_msg!(
+ gst::ResourceError::Failed,
+ ["Error preparing Task: {:?}", err]
+ )
+ })?;
- gst_debug!(SRC_CAT, obj: element, "Prepared");
+ gst_debug!(SRC_CAT, obj: element, "Prepared");
- Ok(())
- }
+ Ok(())
+ }
- fn unprepare(&self, element: &gst::Element) {
- gst_debug!(SRC_CAT, obj: element, "Unpreparing");
+ fn unprepare(&self, element: &super::ElementSrcTest) {
+ gst_debug!(SRC_CAT, obj: element, "Unpreparing");
- *self.sender.lock().unwrap() = None;
- self.task.unprepare().unwrap();
+ *self.sender.lock().unwrap() = None;
+ self.task.unprepare().unwrap();
- gst_debug!(SRC_CAT, obj: element, "Unprepared");
- }
+ gst_debug!(SRC_CAT, obj: element, "Unprepared");
+ }
- fn stop(&self, element: &gst::Element) {
- gst_debug!(SRC_CAT, obj: element, "Stopping");
- self.task.stop().unwrap();
- gst_debug!(SRC_CAT, obj: element, "Stopped");
- }
+ fn stop(&self, element: &super::ElementSrcTest) {
+ gst_debug!(SRC_CAT, obj: element, "Stopping");
+ self.task.stop().unwrap();
+ gst_debug!(SRC_CAT, obj: element, "Stopped");
+ }
- fn start(&self, element: &gst::Element) {
- gst_debug!(SRC_CAT, obj: element, "Starting");
- self.task.start().unwrap();
- gst_debug!(SRC_CAT, obj: element, "Started");
- }
+ fn start(&self, element: &super::ElementSrcTest) {
+ gst_debug!(SRC_CAT, obj: element, "Starting");
+ self.task.start().unwrap();
+ gst_debug!(SRC_CAT, obj: element, "Started");
+ }
- fn pause(&self, element: &gst::Element) {
- gst_debug!(SRC_CAT, obj: element, "Pausing");
- self.task.pause().unwrap();
- gst_debug!(SRC_CAT, obj: element, "Paused");
+ fn pause(&self, element: &super::ElementSrcTest) {
+ gst_debug!(SRC_CAT, obj: element, "Pausing");
+ self.task.pause().unwrap();
+ gst_debug!(SRC_CAT, obj: element, "Paused");
+ }
}
-}
-
-impl ObjectSubclass for ElementSrcTest {
- const NAME: &'static str = "TsElementSrcTest";
- type ParentType = gst::Element;
- type Instance = gst::subclass::ElementInstanceStruct<Self>;
- type Class = glib::subclass::simple::ClassStruct<Self>;
- glib_object_subclass!();
-
- fn class_init(klass: &mut glib::subclass::simple::ClassStruct<Self>) {
- klass.set_metadata(
- "Thread-sharing Test Src Element",
- "Generic",
- "Src Element for Pad Src Test",
- "François Laignel <fengalin@free.fr>",
- );
+ impl ObjectSubclass for ElementSrcTest {
+ const NAME: &'static str = "TsElementSrcTest";
+ type Type = super::ElementSrcTest;
+ type ParentType = gst::Element;
+ type Instance = gst::subclass::ElementInstanceStruct<Self>;
+ type Class = glib::subclass::simple::ClassStruct<Self>;
+
+ glib_object_subclass!();
+
+ fn class_init(klass: &mut Self::Class) {
+ klass.set_metadata(
+ "Thread-sharing Test Src Element",
+ "Generic",
+ "Src Element for Pad Src Test",
+ "François Laignel <fengalin@free.fr>",
+ );
- let caps = gst::Caps::new_any();
- let src_pad_template = gst::PadTemplate::new(
- "src",
- gst::PadDirection::Src,
- gst::PadPresence::Always,
- &caps,
- )
- .unwrap();
- klass.add_pad_template(src_pad_template);
+ let caps = gst::Caps::new_any();
+ let src_pad_template = gst::PadTemplate::new(
+ "src",
+ gst::PadDirection::Src,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+ klass.add_pad_template(src_pad_template);
- klass.install_properties(&SRC_PROPERTIES);
- }
+ klass.install_properties(&SRC_PROPERTIES);
+ }
- fn with_class(klass: &glib::subclass::simple::ClassStruct<Self>) -> Self {
- ElementSrcTest {
- src_pad: PadSrc::new(
- gst::Pad::from_template(&klass.get_pad_template("src").unwrap(), Some("src")),
- PadSrcTestHandler,
- ),
- task: Task::default(),
- sender: StdMutex::new(None),
- settings: StdMutex::new(Settings::default()),
+ fn with_class(klass: &Self::Class) -> Self {
+ ElementSrcTest {
+ src_pad: PadSrc::new(
+ gst::Pad::from_template(&klass.get_pad_template("src").unwrap(), Some("src")),
+ PadSrcTestHandler,
+ ),
+ task: Task::default(),
+ sender: StdMutex::new(None),
+ settings: StdMutex::new(Settings::default()),
+ }
}
}
-}
-impl ObjectImpl for ElementSrcTest {
- fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
- let prop = &SRC_PROPERTIES[id];
+ impl ObjectImpl for ElementSrcTest {
+ fn set_property(&self, _obj: &Self::Type, id: usize, value: &glib::Value) {
+ let prop = &SRC_PROPERTIES[id];
- match *prop {
- glib::subclass::Property("context", ..) => {
- let context = value
- .get()
- .expect("type checked upstream")
- .unwrap_or_else(|| "".into());
+ match *prop {
+ glib::subclass::Property("context", ..) => {
+ let context = value
+ .get()
+ .expect("type checked upstream")
+ .unwrap_or_else(|| "".into());
- self.settings.lock().unwrap().context = context;
+ self.settings.lock().unwrap().context = context;
+ }
+ _ => unimplemented!(),
}
- _ => unimplemented!(),
}
- }
- fn constructed(&self, obj: &glib::Object) {
- self.parent_constructed(obj);
+ fn constructed(&self, obj: &Self::Type) {
+ self.parent_constructed(obj);
- let element = obj.downcast_ref::<gst::Element>().unwrap();
- element.add_pad(self.src_pad.gst_pad()).unwrap();
+ obj.add_pad(self.src_pad.gst_pad()).unwrap();
+ }
}
-}
-impl ElementImpl for ElementSrcTest {
- fn change_state(
- &self,
- element: &gst::Element,
- transition: gst::StateChange,
- ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst_log!(SRC_CAT, obj: element, "Changing state {:?}", transition);
-
- match transition {
- gst::StateChange::NullToReady => {
- self.prepare(element).map_err(|err| {
- element.post_error_message(err);
- gst::StateChangeError
- })?;
- }
- gst::StateChange::PlayingToPaused => {
- self.pause(element);
- }
- gst::StateChange::ReadyToNull => {
- self.unprepare(element);
+ impl ElementImpl for ElementSrcTest {
+ fn change_state(
+ &self,
+ element: &Self::Type,
+ transition: gst::StateChange,
+ ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
+ gst_log!(SRC_CAT, obj: element, "Changing state {:?}", transition);
+
+ match transition {
+ gst::StateChange::NullToReady => {
+ self.prepare(element).map_err(|err| {
+ element.post_error_message(err);
+ gst::StateChangeError
+ })?;
+ }
+ gst::StateChange::PlayingToPaused => {
+ self.pause(element);
+ }
+ gst::StateChange::ReadyToNull => {
+ self.unprepare(element);
+ }
+ _ => (),
}
- _ => (),
- }
- let mut success = self.parent_change_state(element, transition)?;
+ let mut success = self.parent_change_state(element, transition)?;
- match transition {
- gst::StateChange::PausedToReady => {
- self.stop(element);
- }
- gst::StateChange::PausedToPlaying => {
- self.start(element);
- }
- gst::StateChange::ReadyToPaused | gst::StateChange::PlayingToPaused => {
- success = gst::StateChangeSuccess::NoPreroll;
+ match transition {
+ gst::StateChange::PausedToReady => {
+ self.stop(element);
+ }
+ gst::StateChange::PausedToPlaying => {
+ self.start(element);
+ }
+ gst::StateChange::ReadyToPaused | gst::StateChange::PlayingToPaused => {
+ success = gst::StateChangeSuccess::NoPreroll;
+ }
+ _ => (),
}
- _ => (),
- }
- Ok(success)
- }
+ Ok(success)
+ }
- fn send_event(&self, _element: &gst::Element, event: gst::Event) -> bool {
- match event.view() {
- EventView::FlushStart(..) => {
- self.task.flush_start().unwrap();
- }
- EventView::FlushStop(..) => {
- self.task.flush_stop().unwrap();
+ fn send_event(&self, _element: &Self::Type, event: gst::Event) -> bool {
+ match event.view() {
+ EventView::FlushStart(..) => {
+ self.task.flush_start().unwrap();
+ }
+ EventView::FlushStop(..) => {
+ self.task.flush_stop().unwrap();
+ }
+ _ => (),
}
- _ => (),
- }
- if !event.is_serialized() {
- self.src_pad.gst_pad().push_event(event)
- } else {
- self.try_push(Item::Event(event)).is_ok()
+ if !event.is_serialized() {
+ self.src_pad.gst_pad().push_event(event)
+ } else {
+ self.try_push(Item::Event(event)).is_ok()
+ }
}
}
}
-// Sink
-
-#[derive(Debug)]
-enum Item {
- Buffer(gst::Buffer),
- BufferList(gst::BufferList),
- Event(gst::Event),
-}
-
-#[derive(Clone, Debug, GBoxed)]
-#[gboxed(type_name = "TsTestItemSender")]
-struct ItemSender {
- sender: mpsc::Sender<Item>,
+glib_wrapper! {
+ pub struct ElementSrcTest(ObjectSubclass<imp_src::ElementSrcTest>) @extends gst::Element, gst::Object;
}
+unsafe impl Send for ElementSrcTest {}
+unsafe impl Sync for ElementSrcTest {}
-static SINK_PROPERTIES: [glib::subclass::Property; 1] =
- [glib::subclass::Property("sender", |name| {
- glib::ParamSpec::boxed(
- name,
- "Sender",
- "Channel sender to forward the incoming items to",
- ItemSender::get_type(),
- glib::ParamFlags::WRITABLE,
- )
- })];
-
-#[derive(Clone, Debug, Default)]
-struct PadSinkTestHandler;
-
-impl PadSinkHandler for PadSinkTestHandler {
- type ElementImpl = ElementSinkTest;
-
- fn sink_chain(
- &self,
- _pad: &PadSinkRef,
- _elem_sink_test: &ElementSinkTest,
- element: &gst::Element,
- buffer: gst::Buffer,
- ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
- let element = element.clone();
- async move {
- let elem_sink_test = ElementSinkTest::from_instance(&element);
- elem_sink_test
- .forward_item(&element, Item::Buffer(buffer))
- .await
- }
- .boxed()
- }
+// Sink
- fn sink_chain_list(
- &self,
- _pad: &PadSinkRef,
- _elem_sink_test: &ElementSinkTest,
- element: &gst::Element,
- list: gst::BufferList,
- ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
- let element = element.clone();
- async move {
- let elem_sink_test = ElementSinkTest::from_instance(&element);
- elem_sink_test
- .forward_item(&element, Item::BufferList(list))
- .await
+mod imp_sink {
+ use super::*;
+
+ static SINK_PROPERTIES: [glib::subclass::Property; 1] =
+ [glib::subclass::Property("sender", |name| {
+ glib::ParamSpec::boxed(
+ name,
+ "Sender",
+ "Channel sender to forward the incoming items to",
+ ItemSender::get_type(),
+ glib::ParamFlags::WRITABLE,
+ )
+ })];
+
+ #[derive(Clone, Debug, Default)]
+ struct PadSinkTestHandler;
+
+ impl PadSinkHandler for PadSinkTestHandler {
+ type ElementImpl = ElementSinkTest;
+
+ fn sink_chain(
+ &self,
+ _pad: &PadSinkRef,
+ _elem_sink_test: &ElementSinkTest,
+ element: &gst::Element,
+ buffer: gst::Buffer,
+ ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
+ let element = element
+ .clone()
+ .downcast::<super::ElementSinkTest>()
+ .unwrap();
+ async move {
+ let elem_sink_test = ElementSinkTest::from_instance(&element);
+ elem_sink_test
+ .forward_item(&element, Item::Buffer(buffer))
+ .await
+ }
+ .boxed()
}
- .boxed()
- }
- fn sink_event(
- &self,
- pad: &PadSinkRef,
- elem_sink_test: &ElementSinkTest,
- element: &gst::Element,
- event: gst::Event,
- ) -> bool {
- gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event);
-
- match event.view() {
- EventView::FlushStart(..) => {
- elem_sink_test.stop(&element);
- true
+ fn sink_chain_list(
+ &self,
+ _pad: &PadSinkRef,
+ _elem_sink_test: &ElementSinkTest,
+ element: &gst::Element,
+ list: gst::BufferList,
+ ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
+ let element = element
+ .clone()
+ .downcast::<super::ElementSinkTest>()
+ .unwrap();
+ async move {
+ let elem_sink_test = ElementSinkTest::from_instance(&element);
+ elem_sink_test
+ .forward_item(&element, Item::BufferList(list))
+ .await
}
- _ => false,
+ .boxed()
}
- }
- fn sink_event_serialized(
- &self,
- pad: &PadSinkRef,
- _elem_sink_test: &ElementSinkTest,
- element: &gst::Element,
- event: gst::Event,
- ) -> BoxFuture<'static, bool> {
- gst_log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
-
- let element = element.clone();
- async move {
- let elem_sink_test = ElementSinkTest::from_instance(&element);
-
- if let EventView::FlushStop(..) = event.view() {
- elem_sink_test.start(&element);
+ fn sink_event(
+ &self,
+ pad: &PadSinkRef,
+ elem_sink_test: &ElementSinkTest,
+ element: &gst::Element,
+ event: gst::Event,
+ ) -> bool {
+ gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event);
+
+ match event.view() {
+ EventView::FlushStart(..) => {
+ elem_sink_test.stop(&element.downcast_ref::<super::ElementSinkTest>().unwrap());
+ true
+ }
+ _ => false,
}
-
- elem_sink_test
- .forward_item(&element, Item::Event(event))
- .await
- .is_ok()
}
- .boxed()
- }
-}
-#[derive(Debug)]
-struct ElementSinkTest {
- sink_pad: PadSink,
- flushing: AtomicBool,
- sender: FutMutex<Option<mpsc::Sender<Item>>>,
-}
+ fn sink_event_serialized(
+ &self,
+ pad: &PadSinkRef,
+ _elem_sink_test: &ElementSinkTest,
+ element: &gst::Element,
+ event: gst::Event,
+ ) -> BoxFuture<'static, bool> {
+ gst_log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
+
+ let element = element
+ .clone()
+ .downcast::<super::ElementSinkTest>()
+ .unwrap();
+ async move {
+ let elem_sink_test = ElementSinkTest::from_instance(&element);
+
+ if let EventView::FlushStop(..) = event.view() {
+ elem_sink_test.start(&element);
+ }
-impl ElementSinkTest {
- async fn forward_item(
- &self,
- element: &gst::Element,
- item: Item,
- ) -> Result<gst::FlowSuccess, gst::FlowError> {
- if !self.flushing.load(Ordering::SeqCst) {
- gst_debug!(SINK_CAT, obj: element, "Fowarding {:?}", item);
- self.sender
- .lock()
- .await
- .as_mut()
- .expect("Item Sender not set")
- .send(item)
- .await
- .map(|_| gst::FlowSuccess::Ok)
- .map_err(|_| gst::FlowError::Error)
- } else {
- gst_debug!(
- SINK_CAT,
- obj: element,
- "Not fowarding {:?} due to flushing",
- item
- );
- Err(gst::FlowError::Flushing)
+ elem_sink_test
+ .forward_item(&element, Item::Event(event))
+ .await
+ .is_ok()
+ }
+ .boxed()
}
}
- fn start(&self, element: &gst::Element) {
- gst_debug!(SINK_CAT, obj: element, "Starting");
- self.flushing.store(false, Ordering::SeqCst);
- gst_debug!(SINK_CAT, obj: element, "Started");
+ #[derive(Debug)]
+ pub struct ElementSinkTest {
+ sink_pad: PadSink,
+ flushing: AtomicBool,
+ sender: FutMutex<Option<mpsc::Sender<Item>>>,
}
- fn stop(&self, element: &gst::Element) {
- gst_debug!(SINK_CAT, obj: element, "Stopping");
- self.flushing.store(true, Ordering::SeqCst);
- gst_debug!(SINK_CAT, obj: element, "Stopped");
- }
-}
+ impl ElementSinkTest {
+ async fn forward_item(
+ &self,
+ element: &super::ElementSinkTest,
+ item: Item,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ if !self.flushing.load(Ordering::SeqCst) {
+ gst_debug!(SINK_CAT, obj: element, "Fowarding {:?}", item);
+ self.sender
+ .lock()
+ .await
+ .as_mut()
+ .expect("Item Sender not set")
+ .send(item)
+ .await
+ .map(|_| gst::FlowSuccess::Ok)
+ .map_err(|_| gst::FlowError::Error)
+ } else {
+ gst_debug!(
+ SINK_CAT,
+ obj: element,
+ "Not fowarding {:?} due to flushing",
+ item
+ );
+ Err(gst::FlowError::Flushing)
+ }
+ }
-impl ElementSinkTest {
- fn push_flush_start(&self, element: &gst::Element) {
- gst_debug!(SINK_CAT, obj: element, "Pushing FlushStart");
- self.sink_pad
- .gst_pad()
- .push_event(gst::event::FlushStart::new());
- gst_debug!(SINK_CAT, obj: element, "FlushStart pushed");
- }
+ fn start(&self, element: &super::ElementSinkTest) {
+ gst_debug!(SINK_CAT, obj: element, "Starting");
+ self.flushing.store(false, Ordering::SeqCst);
+ gst_debug!(SINK_CAT, obj: element, "Started");
+ }
- fn push_flush_stop(&self, element: &gst::Element) {
- gst_debug!(SINK_CAT, obj: element, "Pushing FlushStop");
- self.sink_pad
- .gst_pad()
- .push_event(gst::event::FlushStop::new(true));
- gst_debug!(SINK_CAT, obj: element, "FlushStop pushed");
+ fn stop(&self, element: &super::ElementSinkTest) {
+ gst_debug!(SINK_CAT, obj: element, "Stopping");
+ self.flushing.store(true, Ordering::SeqCst);
+ gst_debug!(SINK_CAT, obj: element, "Stopped");
+ }
}
-}
-
-lazy_static! {
- static ref SINK_CAT: gst::DebugCategory = gst::DebugCategory::new(
- "ts-element-sink-test",
- gst::DebugColorFlags::empty(),
- Some("Thread-sharing Test Sink Element"),
- );
-}
-impl ObjectSubclass for ElementSinkTest {
- const NAME: &'static str = "TsElementSinkTest";
- type ParentType = gst::Element;
- type Instance = gst::subclass::ElementInstanceStruct<Self>;
- type Class = glib::subclass::simple::ClassStruct<Self>;
+ impl ElementSinkTest {
+ pub fn push_flush_start(&self, element: &super::ElementSinkTest) {
+ gst_debug!(SINK_CAT, obj: element, "Pushing FlushStart");
+ self.sink_pad
+ .gst_pad()
+ .push_event(gst::event::FlushStart::new());
+ gst_debug!(SINK_CAT, obj: element, "FlushStart pushed");
+ }
- glib_object_subclass!();
+ pub fn push_flush_stop(&self, element: &super::ElementSinkTest) {
+ gst_debug!(SINK_CAT, obj: element, "Pushing FlushStop");
+ self.sink_pad
+ .gst_pad()
+ .push_event(gst::event::FlushStop::new(true));
+ gst_debug!(SINK_CAT, obj: element, "FlushStop pushed");
+ }
+ }
- fn class_init(klass: &mut glib::subclass::simple::ClassStruct<Self>) {
- klass.set_metadata(
- "Thread-sharing Test Sink Element",
- "Generic",
- "Sink Element for Pad Test",
- "François Laignel <fengalin@free.fr>",
+ lazy_static! {
+ static ref SINK_CAT: gst::DebugCategory = gst::DebugCategory::new(
+ "ts-element-sink-test",
+ gst::DebugColorFlags::empty(),
+ Some("Thread-sharing Test Sink Element"),
);
+ }
- let caps = gst::Caps::new_any();
- let sink_pad_template = gst::PadTemplate::new(
- "sink",
- gst::PadDirection::Sink,
- gst::PadPresence::Always,
- &caps,
- )
- .unwrap();
- klass.add_pad_template(sink_pad_template);
+ impl ObjectSubclass for ElementSinkTest {
+ const NAME: &'static str = "TsElementSinkTest";
+ type Type = super::ElementSinkTest;
+ type ParentType = gst::Element;
+ type Instance = gst::subclass::ElementInstanceStruct<Self>;
+ type Class = glib::subclass::simple::ClassStruct<Self>;
+
+ glib_object_subclass!();
+
+ fn class_init(klass: &mut Self::Class) {
+ klass.set_metadata(
+ "Thread-sharing Test Sink Element",
+ "Generic",
+ "Sink Element for Pad Test",
+ "François Laignel <fengalin@free.fr>",
+ );
- klass.install_properties(&SINK_PROPERTIES);
- }
+ let caps = gst::Caps::new_any();
+ let sink_pad_template = gst::PadTemplate::new(
+ "sink",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+ klass.add_pad_template(sink_pad_template);
- fn with_class(klass: &glib::subclass::simple::ClassStruct<Self>) -> Self {
- ElementSinkTest {
- sink_pad: PadSink::new(
- gst::Pad::from_template(&klass.get_pad_template("sink").unwrap(), Some("sink")),
- PadSinkTestHandler,
- ),
- flushing: AtomicBool::new(true),
- sender: FutMutex::new(None),
+ klass.install_properties(&SINK_PROPERTIES);
}
- }
-}
-impl ObjectImpl for ElementSinkTest {
- fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
- let prop = &SINK_PROPERTIES[id];
-
- match *prop {
- glib::subclass::Property("sender", ..) => {
- let ItemSender { sender } = value
- .get::<&ItemSender>()
- .expect("type checked upstream")
- .expect("ItemSender not found")
- .clone();
- *futures::executor::block_on(self.sender.lock()) = Some(sender);
+ fn with_class(klass: &Self::Class) -> Self {
+ ElementSinkTest {
+ sink_pad: PadSink::new(
+ gst::Pad::from_template(&klass.get_pad_template("sink").unwrap(), Some("sink")),
+ PadSinkTestHandler,
+ ),
+ flushing: AtomicBool::new(true),
+ sender: FutMutex::new(None),
}
- _ => unimplemented!(),
}
}
- fn constructed(&self, obj: &glib::Object) {
- self.parent_constructed(obj);
+ impl ObjectImpl for ElementSinkTest {
+ fn set_property(&self, _obj: &Self::Type, id: usize, value: &glib::Value) {
+ let prop = &SINK_PROPERTIES[id];
+
+ match *prop {
+ glib::subclass::Property("sender", ..) => {
+ let ItemSender { sender } = value
+ .get::<&ItemSender>()
+ .expect("type checked upstream")
+ .expect("ItemSender not found")
+ .clone();
+ *futures::executor::block_on(self.sender.lock()) = Some(sender);
+ }
+ _ => unimplemented!(),
+ }
+ }
- let element = obj.downcast_ref::<gst::Element>().unwrap();
- element.add_pad(self.sink_pad.gst_pad()).unwrap();
+ fn constructed(&self, obj: &Self::Type) {
+ self.parent_constructed(obj);
+
+ obj.add_pad(self.sink_pad.gst_pad()).unwrap();
+ }
}
-}
-impl ElementImpl for ElementSinkTest {
- fn change_state(
- &self,
- element: &gst::Element,
- transition: gst::StateChange,
- ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst_log!(SINK_CAT, obj: element, "Changing state {:?}", transition);
+ impl ElementImpl for ElementSinkTest {
+ fn change_state(
+ &self,
+ element: &Self::Type,
+ transition: gst::StateChange,
+ ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
+ gst_log!(SINK_CAT, obj: element, "Changing state {:?}", transition);
- if let gst::StateChange::PausedToReady = transition {
- self.stop(element);
- }
+ if let gst::StateChange::PausedToReady = transition {
+ self.stop(element);
+ }
- let success = self.parent_change_state(element, transition)?;
+ let success = self.parent_change_state(element, transition)?;
- if let gst::StateChange::ReadyToPaused = transition {
- self.start(element);
- }
+ if let gst::StateChange::ReadyToPaused = transition {
+ self.start(element);
+ }
- Ok(success)
+ Ok(success)
+ }
}
}
+glib_wrapper! {
+ pub struct ElementSinkTest(ObjectSubclass<imp_sink::ElementSinkTest>) @extends gst::Element, gst::Object;
+}
+unsafe impl Send for ElementSinkTest {}
+unsafe impl Sync for ElementSinkTest {}
+
fn setup(
context_name: &str,
mut middle_element_1: Option<gst::Element>,
mut middle_element_2: Option<gst::Element>,
) -> (
gst::Pipeline,
- gst::Element,
- gst::Element,
+ ElementSrcTest,
+ ElementSinkTest,
mpsc::Receiver<Item>,
) {
init();
@@ -708,14 +741,14 @@ fn setup(
let pipeline = gst::Pipeline::new(None);
// Src
- let src_element = glib::Object::new(ElementSrcTest::get_type(), &[])
+ let src_element = glib::Object::new(ElementSrcTest::static_type(), &[])
.unwrap()
- .downcast::<gst::Element>()
+ .downcast::<ElementSrcTest>()
.unwrap();
src_element.set_property("context", &context_name).unwrap();
pipeline.add(&src_element).unwrap();
- let mut last_element = src_element.clone();
+ let mut last_element = src_element.clone().upcast::<gst::Element>();
if let Some(middle_element) = middle_element_1.take() {
pipeline.add(&middle_element).unwrap();
@@ -730,9 +763,9 @@ fn setup(
}
// Sink
- let sink_element = glib::Object::new(ElementSinkTest::get_type(), &[])
+ let sink_element = glib::Object::new(ElementSinkTest::static_type(), &[])
.unwrap()
- .downcast::<gst::Element>()
+ .downcast::<ElementSinkTest>()
.unwrap();
pipeline.add(&sink_element).unwrap();
last_element.link(&sink_element).unwrap();
@@ -748,10 +781,10 @@ fn setup(
fn nominal_scenario(
scenario_name: &str,
pipeline: gst::Pipeline,
- src_element: gst::Element,
+ src_element: ElementSrcTest,
mut receiver: mpsc::Receiver<Item>,
) {
- let elem_src_test = ElementSrcTest::from_instance(&src_element);
+ let elem_src_test = imp_src::ElementSrcTest::from_instance(&src_element);
pipeline.set_state(gst::State::Playing).unwrap();
@@ -975,7 +1008,7 @@ fn start_pause_start() {
let (pipeline, src_element, _sink_element, mut receiver) = setup(&scenario_name, None, None);
- let elem_src_test = ElementSrcTest::from_instance(&src_element);
+ let elem_src_test = imp_src::ElementSrcTest::from_instance(&src_element);
pipeline.set_state(gst::State::Playing).unwrap();
@@ -1053,7 +1086,7 @@ fn start_stop_start() {
let (pipeline, src_element, _sink_element, mut receiver) = setup(&scenario_name, None, None);
- let elem_src_test = ElementSrcTest::from_instance(&src_element);
+ let elem_src_test = imp_src::ElementSrcTest::from_instance(&src_element);
pipeline.set_state(gst::State::Playing).unwrap();
@@ -1109,7 +1142,7 @@ fn start_stop_start() {
match futures::executor::block_on(receiver.next()).unwrap() {
Item::Buffer(_buffer) => {
gst_info!(
- SRC_CAT,
+ imp_src::SRC_CAT,
"{}: initial buffer went through, don't expect any pending item to be dropped",
scenario_name
);
@@ -1166,7 +1199,7 @@ fn start_flush() {
let (pipeline, src_element, sink_element, mut receiver) = setup(&scenario_name, None, None);
- let elem_src_test = ElementSrcTest::from_instance(&src_element);
+ let elem_src_test = imp_src::ElementSrcTest::from_instance(&src_element);
pipeline.set_state(gst::State::Playing).unwrap();
@@ -1206,7 +1239,7 @@ fn start_flush() {
.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4])))
.unwrap();
- let elem_sink_test = ElementSinkTest::from_instance(&sink_element);
+ let elem_sink_test = imp_sink::ElementSinkTest::from_instance(&sink_element);
elem_sink_test.push_flush_start(&sink_element);