Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--generic/threadshare/examples/benchmark.rs41
-rw-r--r--generic/threadshare/examples/standalone/main.rs69
-rw-r--r--generic/threadshare/examples/udpsrc_benchmark_sender.rs49
-rw-r--r--generic/threadshare/tests/pipeline.rs120
-rw-r--r--utils/fallbackswitch/examples/gtk_fallbackswitch.rs55
-rw-r--r--utils/livesync/examples/gtk_livesync.rs10
-rw-r--r--utils/togglerecord/examples/gtk_recording.rs55
-rw-r--r--video/gtk4/examples/gtksink.rs54
8 files changed, 232 insertions, 221 deletions
diff --git a/generic/threadshare/examples/benchmark.rs b/generic/threadshare/examples/benchmark.rs
index 9ecfce632..3d8a893ce 100644
--- a/generic/threadshare/examples/benchmark.rs
+++ b/generic/threadshare/examples/benchmark.rs
@@ -192,27 +192,28 @@ fn main() {
let bus = pipeline.bus().unwrap();
let l_clone = l.clone();
- bus.add_watch(move |_, msg| {
- use gst::MessageView;
-
- match msg.view() {
- MessageView::Eos(..) => l_clone.quit(),
- MessageView::Error(err) => {
- gst::error!(
- CAT,
- "Error from {:?}: {} ({:?})",
- err.src().map(|s| s.path_string()),
- err.error(),
- err.debug()
- );
- l_clone.quit();
- }
- _ => (),
- };
+ let _bus_watch = bus
+ .add_watch(move |_, msg| {
+ use gst::MessageView;
+
+ match msg.view() {
+ MessageView::Eos(..) => l_clone.quit(),
+ MessageView::Error(err) => {
+ gst::error!(
+ CAT,
+ "Error from {:?}: {} ({:?})",
+ err.src().map(|s| s.path_string()),
+ err.error(),
+ err.debug()
+ );
+ l_clone.quit();
+ }
+ _ => (),
+ };
- glib::Continue(true)
- })
- .expect("Failed to add bus watch");
+ glib::Continue(true)
+ })
+ .expect("Failed to add bus watch");
pipeline.set_state(gst::State::Playing).unwrap();
diff --git a/generic/threadshare/examples/standalone/main.rs b/generic/threadshare/examples/standalone/main.rs
index ff1edc416..54340ef4e 100644
--- a/generic/threadshare/examples/standalone/main.rs
+++ b/generic/threadshare/examples/standalone/main.rs
@@ -108,45 +108,46 @@ fn main() {
let terminated_count = Arc::new(AtomicU32::new(0));
let pipeline_clone = pipeline.clone();
let l_clone = l.clone();
- bus.add_watch(move |_, msg| {
- use gst::MessageView;
- match msg.view() {
- MessageView::Eos(_) => {
- // Actually, we don't post EOS (see sinks impl).
- gst::info!(CAT, "Received eos");
- l_clone.quit();
-
- glib::Continue(false)
- }
- MessageView::Error(msg) => {
- if let gst::MessageView::Error(msg) = msg.message().view() {
- if msg.error().matches(gst::LibraryError::Shutdown) {
- if terminated_count.fetch_add(1, Ordering::SeqCst) == args.streams - 1 {
- gst::info!(CAT, "Received all shutdown requests");
- l_clone.quit();
-
- return glib::Continue(false);
- } else {
- return glib::Continue(true);
+ let _bus_watch = bus
+ .add_watch(move |_, msg| {
+ use gst::MessageView;
+ match msg.view() {
+ MessageView::Eos(_) => {
+ // Actually, we don't post EOS (see sinks impl).
+ gst::info!(CAT, "Received eos");
+ l_clone.quit();
+
+ glib::Continue(false)
+ }
+ MessageView::Error(msg) => {
+ if let gst::MessageView::Error(msg) = msg.message().view() {
+ if msg.error().matches(gst::LibraryError::Shutdown) {
+ if terminated_count.fetch_add(1, Ordering::SeqCst) == args.streams - 1 {
+ gst::info!(CAT, "Received all shutdown requests");
+ l_clone.quit();
+
+ return glib::Continue(false);
+ } else {
+ return glib::Continue(true);
+ }
}
}
- }
- gst::error!(
- CAT,
- "Error from {:?}: {} ({:?})",
- msg.src().map(|s| s.path_string()),
- msg.error(),
- msg.debug()
- );
- l_clone.quit();
+ gst::error!(
+ CAT,
+ "Error from {:?}: {} ({:?})",
+ msg.src().map(|s| s.path_string()),
+ msg.error(),
+ msg.debug()
+ );
+ l_clone.quit();
- glib::Continue(false)
+ glib::Continue(false)
+ }
+ _ => glib::Continue(true),
}
- _ => glib::Continue(true),
- }
- })
- .expect("Failed to add bus watch");
+ })
+ .expect("Failed to add bus watch");
gst::info!(CAT, "Switching to Ready");
let start = Instant::now();
diff --git a/generic/threadshare/examples/udpsrc_benchmark_sender.rs b/generic/threadshare/examples/udpsrc_benchmark_sender.rs
index 497225d4f..bb7ce23fb 100644
--- a/generic/threadshare/examples/udpsrc_benchmark_sender.rs
+++ b/generic/threadshare/examples/udpsrc_benchmark_sender.rs
@@ -170,31 +170,32 @@ fn run(pipeline: gst::Pipeline) {
let bus = pipeline.bus().unwrap();
let l_clone = l.clone();
- bus.add_watch(move |_, msg| {
- use gst::MessageView;
- match msg.view() {
- MessageView::Eos(_) => {
- gst::info!(CAT, "Received eos");
- l_clone.quit();
-
- glib::Continue(false)
+ let _bus_watch = bus
+ .add_watch(move |_, msg| {
+ use gst::MessageView;
+ match msg.view() {
+ MessageView::Eos(_) => {
+ gst::info!(CAT, "Received eos");
+ l_clone.quit();
+
+ glib::Continue(false)
+ }
+ MessageView::Error(msg) => {
+ gst::error!(
+ CAT,
+ "Error from {:?}: {} ({:?})",
+ msg.src().map(|s| s.path_string()),
+ msg.error(),
+ msg.debug()
+ );
+ l_clone.quit();
+
+ glib::Continue(false)
+ }
+ _ => glib::Continue(true),
}
- MessageView::Error(msg) => {
- gst::error!(
- CAT,
- "Error from {:?}: {} ({:?})",
- msg.src().map(|s| s.path_string()),
- msg.error(),
- msg.debug()
- );
- l_clone.quit();
-
- glib::Continue(false)
- }
- _ => glib::Continue(true),
- }
- })
- .expect("Failed to add bus watch");
+ })
+ .expect("Failed to add bus watch");
pipeline.set_state(gst::State::Playing).unwrap();
l.run();
diff --git a/generic/threadshare/tests/pipeline.rs b/generic/threadshare/tests/pipeline.rs
index 91e5882a8..d27eef6e9 100644
--- a/generic/threadshare/tests/pipeline.rs
+++ b/generic/threadshare/tests/pipeline.rs
@@ -135,38 +135,39 @@ fn multiple_contexts_queue() {
let bus = pipeline.bus().unwrap();
let l_clone = l.clone();
- bus.add_watch(move |_, msg| {
- use gst::MessageView;
-
- match msg.view() {
- MessageView::StateChanged(state_changed) => {
- if let Some(source) = state_changed.src() {
- if source.type_() == gst::Pipeline::static_type()
- && state_changed.old() == gst::State::Paused
- && state_changed.current() == gst::State::Playing
- {
- if let Some(test_scenario) = test_scenario.take() {
- std::thread::spawn(test_scenario);
+ let _bus_watch = bus
+ .add_watch(move |_, msg| {
+ use gst::MessageView;
+
+ match msg.view() {
+ MessageView::StateChanged(state_changed) => {
+ if let Some(source) = state_changed.src() {
+ if source.type_() == gst::Pipeline::static_type()
+ && state_changed.old() == gst::State::Paused
+ && state_changed.current() == gst::State::Playing
+ {
+ if let Some(test_scenario) = test_scenario.take() {
+ std::thread::spawn(test_scenario);
+ }
}
}
}
- }
- MessageView::Error(err) => {
- gst::error!(
- CAT,
- "multiple_contexts_queue: Error from {:?}: {} ({:?})",
- err.src().map(|s| s.path_string()),
- err.error(),
- err.debug()
- );
- l_clone.quit();
- }
- _ => (),
- };
+ MessageView::Error(err) => {
+ gst::error!(
+ CAT,
+ "multiple_contexts_queue: Error from {:?}: {} ({:?})",
+ err.src().map(|s| s.path_string()),
+ err.error(),
+ err.debug()
+ );
+ l_clone.quit();
+ }
+ _ => (),
+ };
- glib::Continue(true)
- })
- .unwrap();
+ glib::Continue(true)
+ })
+ .unwrap();
pipeline.set_state(gst::State::Playing).unwrap();
@@ -281,38 +282,39 @@ fn multiple_contexts_proxy() {
let bus = pipeline.bus().unwrap();
let l_clone = l.clone();
- bus.add_watch(move |_, msg| {
- use gst::MessageView;
-
- match msg.view() {
- MessageView::StateChanged(state_changed) => {
- if let Some(source) = state_changed.src() {
- if source.type_() == gst::Pipeline::static_type()
- && state_changed.old() == gst::State::Paused
- && state_changed.current() == gst::State::Playing
- {
- if let Some(test_scenario) = test_scenario.take() {
- std::thread::spawn(test_scenario);
+ let _bus_watch = bus
+ .add_watch(move |_, msg| {
+ use gst::MessageView;
+
+ match msg.view() {
+ MessageView::StateChanged(state_changed) => {
+ if let Some(source) = state_changed.src() {
+ if source.type_() == gst::Pipeline::static_type()
+ && state_changed.old() == gst::State::Paused
+ && state_changed.current() == gst::State::Playing
+ {
+ if let Some(test_scenario) = test_scenario.take() {
+ std::thread::spawn(test_scenario);
+ }
}
}
}
- }
- MessageView::Error(err) => {
- gst::error!(
- CAT,
- "multiple_contexts_proxy: Error from {:?}: {} ({:?})",
- err.src().map(|s| s.path_string()),
- err.error(),
- err.debug()
- );
- l_clone.quit();
- }
- _ => (),
- };
+ MessageView::Error(err) => {
+ gst::error!(
+ CAT,
+ "multiple_contexts_proxy: Error from {:?}: {} ({:?})",
+ err.src().map(|s| s.path_string()),
+ err.error(),
+ err.debug()
+ );
+ l_clone.quit();
+ }
+ _ => (),
+ };
- glib::Continue(true)
- })
- .unwrap();
+ glib::Continue(true)
+ })
+ .unwrap();
pipeline.set_state(gst::State::Playing).unwrap();
@@ -405,7 +407,7 @@ fn eos() {
});
let l_clone = l.clone();
- pipeline
+ let _bus_watch = pipeline
.bus()
.unwrap()
.add_watch(move |_, msg| {
@@ -561,7 +563,7 @@ fn premature_shutdown() {
});
let l_clone = l.clone();
- pipeline
+ let _bus_watch = pipeline
.bus()
.unwrap()
.add_watch(move |_, msg| {
@@ -657,7 +659,7 @@ fn socket_play_null_play() {
});
let l_clone = l.clone();
- pipeline
+ let _bus_watch = pipeline
.bus()
.unwrap()
.add_watch(move |_, msg| {
diff --git a/utils/fallbackswitch/examples/gtk_fallbackswitch.rs b/utils/fallbackswitch/examples/gtk_fallbackswitch.rs
index 824edd4f7..4aaf78dde 100644
--- a/utils/fallbackswitch/examples/gtk_fallbackswitch.rs
+++ b/utils/fallbackswitch/examples/gtk_fallbackswitch.rs
@@ -154,42 +154,43 @@ fn create_ui(app: &gtk::Application) {
let bus = pipeline.bus().unwrap();
let app_weak = app.downgrade();
- bus.add_watch_local(move |_, msg| {
- use gst::MessageView;
-
- let app = match app_weak.upgrade() {
- Some(app) => app,
- None => return glib::Continue(false),
- };
-
- match msg.view() {
- MessageView::Eos(..) => app.quit(),
- MessageView::Error(err) => {
- println!(
- "Error from {:?}: {} ({:?})",
- msg.src().map(|s| s.path_string()),
- err.error(),
- err.debug()
- );
- app.quit();
- }
- _ => (),
- };
-
- glib::Continue(true)
- })
- .expect("Failed to add bus watch");
+ let bus_watch = bus
+ .add_watch_local(move |_, msg| {
+ use gst::MessageView;
+
+ let app = match app_weak.upgrade() {
+ Some(app) => app,
+ None => return glib::Continue(false),
+ };
+
+ match msg.view() {
+ MessageView::Eos(..) => app.quit(),
+ MessageView::Error(err) => {
+ println!(
+ "Error from {:?}: {} ({:?})",
+ msg.src().map(|s| s.path_string()),
+ err.error(),
+ err.debug()
+ );
+ app.quit();
+ }
+ _ => (),
+ };
+
+ glib::Continue(true)
+ })
+ .expect("Failed to add bus watch");
pipeline.set_state(gst::State::Playing).unwrap();
// Pipeline reference is owned by the closure below, so will be
// destroyed once the app is destroyed
let timeout_id = RefCell::new(Some(timeout_id));
+ let bus_watch = RefCell::new(Some(bus_watch));
app.connect_shutdown(move |_| {
+ drop(bus_watch.borrow_mut().take());
pipeline.set_state(gst::State::Null).unwrap();
- bus.remove_watch().unwrap();
-
if let Some(timeout_id) = timeout_id.borrow_mut().take() {
timeout_id.remove();
}
diff --git a/utils/livesync/examples/gtk_livesync.rs b/utils/livesync/examples/gtk_livesync.rs
index 5c9e4f9c0..19836cd3b 100644
--- a/utils/livesync/examples/gtk_livesync.rs
+++ b/utils/livesync/examples/gtk_livesync.rs
@@ -10,7 +10,7 @@
use gio::prelude::*;
use gst::{glib, prelude::*};
use gtk::prelude::*;
-use std::cell::Cell;
+use std::cell::{Cell, RefCell};
struct DroppingProbe(glib::WeakRef<gst::Pad>, Option<gst::PadProbeId>);
@@ -106,7 +106,7 @@ fn create_window(app: &gtk::Application) {
}
});
- {
+ let bus_watch = {
let bus = pipeline.bus().unwrap();
let window = window.downgrade();
bus.add_watch_local(move |_, msg| {
@@ -136,8 +136,8 @@ fn create_window(app: &gtk::Application) {
glib::Continue(true)
})
- .unwrap();
- }
+ .unwrap()
+ };
{
let pipeline = pipeline.clone();
@@ -148,7 +148,9 @@ fn create_window(app: &gtk::Application) {
});
}
+ let bus_watch = RefCell::new(Some(bus_watch));
window.connect_unrealize(move |_| {
+ drop(bus_watch.borrow_mut().take());
pipeline
.set_state(gst::State::Null)
.expect("Failed to stop pipeline");
diff --git a/utils/togglerecord/examples/gtk_recording.rs b/utils/togglerecord/examples/gtk_recording.rs
index 7159a072a..272a2d370 100644
--- a/utils/togglerecord/examples/gtk_recording.rs
+++ b/utils/togglerecord/examples/gtk_recording.rs
@@ -284,42 +284,43 @@ fn create_ui(app: &gtk::Application) {
let bus = pipeline.bus().unwrap();
let app_weak = app.downgrade();
- bus.add_watch_local(move |_, msg| {
- use gst::MessageView;
-
- let app = match app_weak.upgrade() {
- Some(app) => app,
- None => return glib::Continue(false),
- };
-
- match msg.view() {
- MessageView::Eos(..) => app.quit(),
- MessageView::Error(err) => {
- println!(
- "Error from {:?}: {} ({:?})",
- msg.src().map(|s| s.path_string()),
- err.error(),
- err.debug()
- );
- app.quit();
- }
- _ => (),
- };
-
- glib::Continue(true)
- })
- .expect("Failed to add bus watch");
+ let bus_watch = bus
+ .add_watch_local(move |_, msg| {
+ use gst::MessageView;
+
+ let app = match app_weak.upgrade() {
+ Some(app) => app,
+ None => return glib::Continue(false),
+ };
+
+ match msg.view() {
+ MessageView::Eos(..) => app.quit(),
+ MessageView::Error(err) => {
+ println!(
+ "Error from {:?}: {} ({:?})",
+ msg.src().map(|s| s.path_string()),
+ err.error(),
+ err.debug()
+ );
+ app.quit();
+ }
+ _ => (),
+ };
+
+ glib::Continue(true)
+ })
+ .expect("Failed to add bus watch");
pipeline.set_state(gst::State::Playing).unwrap();
// Pipeline reference is owned by the closure below, so will be
// destroyed once the app is destroyed
let timeout_id = RefCell::new(Some(timeout_id));
+ let bus_watch = RefCell::new(Some(bus_watch));
app.connect_shutdown(move |_| {
+ drop(bus_watch.borrow_mut().take());
pipeline.set_state(gst::State::Null).unwrap();
- bus.remove_watch().unwrap();
-
if let Some(timeout_id) = timeout_id.borrow_mut().take() {
timeout_id.remove();
}
diff --git a/video/gtk4/examples/gtksink.rs b/video/gtk4/examples/gtksink.rs
index bdb3da2e1..3dddce252 100644
--- a/video/gtk4/examples/gtksink.rs
+++ b/video/gtk4/examples/gtksink.rs
@@ -95,42 +95,44 @@ fn create_ui(app: &gtk::Application) {
.expect("Unable to set the pipeline to the `Playing` state");
let app_weak = app.downgrade();
- bus.add_watch_local(move |_, msg| {
- use gst::MessageView;
-
- let app = match app_weak.upgrade() {
- Some(app) => app,
- None => return glib::Continue(false),
- };
-
- match msg.view() {
- MessageView::Eos(..) => app.quit(),
- MessageView::Error(err) => {
- println!(
- "Error from {:?}: {} ({:?})",
- err.src().map(|s| s.path_string()),
- err.error(),
- err.debug()
- );
- app.quit();
- }
- _ => (),
- };
-
- glib::Continue(true)
- })
- .expect("Failed to add bus watch");
+ let bus_watch = bus
+ .add_watch_local(move |_, msg| {
+ use gst::MessageView;
+
+ let app = match app_weak.upgrade() {
+ Some(app) => app,
+ None => return glib::Continue(false),
+ };
+
+ match msg.view() {
+ MessageView::Eos(..) => app.quit(),
+ MessageView::Error(err) => {
+ println!(
+ "Error from {:?}: {} ({:?})",
+ err.src().map(|s| s.path_string()),
+ err.error(),
+ err.debug()
+ );
+ app.quit();
+ }
+ _ => (),
+ };
+
+ glib::Continue(true)
+ })
+ .expect("Failed to add bus watch");
let timeout_id = RefCell::new(Some(timeout_id));
let pipeline = RefCell::new(Some(pipeline));
+ let bus_watch = RefCell::new(Some(bus_watch));
app.connect_shutdown(move |_| {
window.close();
+ drop(bus_watch.borrow_mut().take());
if let Some(pipeline) = pipeline.borrow_mut().take() {
pipeline
.set_state(gst::State::Null)
.expect("Unable to set the pipeline to the `Null` state");
- pipeline.bus().unwrap().remove_watch().unwrap();
}
if let Some(timeout_id) = timeout_id.borrow_mut().take() {