diff options
author | Sebastian Dröge <sebastian@centricular.com> | 2023-04-14 12:46:43 +0300 |
---|---|---|
committer | Sebastian Dröge <sebastian@centricular.com> | 2023-04-14 12:46:43 +0300 |
commit | 47159ad3c224e01e634a66d0afa3baa43066c228 (patch) | |
tree | de56561d41545c31d79eaf73721f8b749941f66b /generic | |
parent | aabfb61834375301e313fd8fd8b8fc6849b57695 (diff) |
Make sure to keep around and drop bus watches after usage in all the examples
Diffstat (limited to 'generic')
-rw-r--r-- | generic/threadshare/examples/benchmark.rs | 41 | ||||
-rw-r--r-- | generic/threadshare/examples/standalone/main.rs | 69 | ||||
-rw-r--r-- | generic/threadshare/examples/udpsrc_benchmark_sender.rs | 49 | ||||
-rw-r--r-- | generic/threadshare/tests/pipeline.rs | 120 |
4 files changed, 142 insertions, 137 deletions
diff --git a/generic/threadshare/examples/benchmark.rs b/generic/threadshare/examples/benchmark.rs index 9ecfce632..3d8a893ce 100644 --- a/generic/threadshare/examples/benchmark.rs +++ b/generic/threadshare/examples/benchmark.rs @@ -192,27 +192,28 @@ fn main() { let bus = pipeline.bus().unwrap(); let l_clone = l.clone(); - bus.add_watch(move |_, msg| { - use gst::MessageView; - - match msg.view() { - MessageView::Eos(..) => l_clone.quit(), - MessageView::Error(err) => { - gst::error!( - CAT, - "Error from {:?}: {} ({:?})", - err.src().map(|s| s.path_string()), - err.error(), - err.debug() - ); - l_clone.quit(); - } - _ => (), - }; + let _bus_watch = bus + .add_watch(move |_, msg| { + use gst::MessageView; + + match msg.view() { + MessageView::Eos(..) => l_clone.quit(), + MessageView::Error(err) => { + gst::error!( + CAT, + "Error from {:?}: {} ({:?})", + err.src().map(|s| s.path_string()), + err.error(), + err.debug() + ); + l_clone.quit(); + } + _ => (), + }; - glib::Continue(true) - }) - .expect("Failed to add bus watch"); + glib::Continue(true) + }) + .expect("Failed to add bus watch"); pipeline.set_state(gst::State::Playing).unwrap(); diff --git a/generic/threadshare/examples/standalone/main.rs b/generic/threadshare/examples/standalone/main.rs index ff1edc416..54340ef4e 100644 --- a/generic/threadshare/examples/standalone/main.rs +++ b/generic/threadshare/examples/standalone/main.rs @@ -108,45 +108,46 @@ fn main() { let terminated_count = Arc::new(AtomicU32::new(0)); let pipeline_clone = pipeline.clone(); let l_clone = l.clone(); - bus.add_watch(move |_, msg| { - use gst::MessageView; - match msg.view() { - MessageView::Eos(_) => { - // Actually, we don't post EOS (see sinks impl). - gst::info!(CAT, "Received eos"); - l_clone.quit(); - - glib::Continue(false) - } - MessageView::Error(msg) => { - if let gst::MessageView::Error(msg) = msg.message().view() { - if msg.error().matches(gst::LibraryError::Shutdown) { - if terminated_count.fetch_add(1, Ordering::SeqCst) == args.streams - 1 { - gst::info!(CAT, "Received all shutdown requests"); - l_clone.quit(); - - return glib::Continue(false); - } else { - return glib::Continue(true); + let _bus_watch = bus + .add_watch(move |_, msg| { + use gst::MessageView; + match msg.view() { + MessageView::Eos(_) => { + // Actually, we don't post EOS (see sinks impl). + gst::info!(CAT, "Received eos"); + l_clone.quit(); + + glib::Continue(false) + } + MessageView::Error(msg) => { + if let gst::MessageView::Error(msg) = msg.message().view() { + if msg.error().matches(gst::LibraryError::Shutdown) { + if terminated_count.fetch_add(1, Ordering::SeqCst) == args.streams - 1 { + gst::info!(CAT, "Received all shutdown requests"); + l_clone.quit(); + + return glib::Continue(false); + } else { + return glib::Continue(true); + } } } - } - gst::error!( - CAT, - "Error from {:?}: {} ({:?})", - msg.src().map(|s| s.path_string()), - msg.error(), - msg.debug() - ); - l_clone.quit(); + gst::error!( + CAT, + "Error from {:?}: {} ({:?})", + msg.src().map(|s| s.path_string()), + msg.error(), + msg.debug() + ); + l_clone.quit(); - glib::Continue(false) + glib::Continue(false) + } + _ => glib::Continue(true), } - _ => glib::Continue(true), - } - }) - .expect("Failed to add bus watch"); + }) + .expect("Failed to add bus watch"); gst::info!(CAT, "Switching to Ready"); let start = Instant::now(); diff --git a/generic/threadshare/examples/udpsrc_benchmark_sender.rs b/generic/threadshare/examples/udpsrc_benchmark_sender.rs index 497225d4f..bb7ce23fb 100644 --- a/generic/threadshare/examples/udpsrc_benchmark_sender.rs +++ b/generic/threadshare/examples/udpsrc_benchmark_sender.rs @@ -170,31 +170,32 @@ fn run(pipeline: gst::Pipeline) { let bus = pipeline.bus().unwrap(); let l_clone = l.clone(); - bus.add_watch(move |_, msg| { - use gst::MessageView; - match msg.view() { - MessageView::Eos(_) => { - gst::info!(CAT, "Received eos"); - l_clone.quit(); - - glib::Continue(false) + let _bus_watch = bus + .add_watch(move |_, msg| { + use gst::MessageView; + match msg.view() { + MessageView::Eos(_) => { + gst::info!(CAT, "Received eos"); + l_clone.quit(); + + glib::Continue(false) + } + MessageView::Error(msg) => { + gst::error!( + CAT, + "Error from {:?}: {} ({:?})", + msg.src().map(|s| s.path_string()), + msg.error(), + msg.debug() + ); + l_clone.quit(); + + glib::Continue(false) + } + _ => glib::Continue(true), } - MessageView::Error(msg) => { - gst::error!( - CAT, - "Error from {:?}: {} ({:?})", - msg.src().map(|s| s.path_string()), - msg.error(), - msg.debug() - ); - l_clone.quit(); - - glib::Continue(false) - } - _ => glib::Continue(true), - } - }) - .expect("Failed to add bus watch"); + }) + .expect("Failed to add bus watch"); pipeline.set_state(gst::State::Playing).unwrap(); l.run(); diff --git a/generic/threadshare/tests/pipeline.rs b/generic/threadshare/tests/pipeline.rs index 91e5882a8..d27eef6e9 100644 --- a/generic/threadshare/tests/pipeline.rs +++ b/generic/threadshare/tests/pipeline.rs @@ -135,38 +135,39 @@ fn multiple_contexts_queue() { let bus = pipeline.bus().unwrap(); let l_clone = l.clone(); - bus.add_watch(move |_, msg| { - use gst::MessageView; - - match msg.view() { - MessageView::StateChanged(state_changed) => { - if let Some(source) = state_changed.src() { - if source.type_() == gst::Pipeline::static_type() - && state_changed.old() == gst::State::Paused - && state_changed.current() == gst::State::Playing - { - if let Some(test_scenario) = test_scenario.take() { - std::thread::spawn(test_scenario); + let _bus_watch = bus + .add_watch(move |_, msg| { + use gst::MessageView; + + match msg.view() { + MessageView::StateChanged(state_changed) => { + if let Some(source) = state_changed.src() { + if source.type_() == gst::Pipeline::static_type() + && state_changed.old() == gst::State::Paused + && state_changed.current() == gst::State::Playing + { + if let Some(test_scenario) = test_scenario.take() { + std::thread::spawn(test_scenario); + } } } } - } - MessageView::Error(err) => { - gst::error!( - CAT, - "multiple_contexts_queue: Error from {:?}: {} ({:?})", - err.src().map(|s| s.path_string()), - err.error(), - err.debug() - ); - l_clone.quit(); - } - _ => (), - }; + MessageView::Error(err) => { + gst::error!( + CAT, + "multiple_contexts_queue: Error from {:?}: {} ({:?})", + err.src().map(|s| s.path_string()), + err.error(), + err.debug() + ); + l_clone.quit(); + } + _ => (), + }; - glib::Continue(true) - }) - .unwrap(); + glib::Continue(true) + }) + .unwrap(); pipeline.set_state(gst::State::Playing).unwrap(); @@ -281,38 +282,39 @@ fn multiple_contexts_proxy() { let bus = pipeline.bus().unwrap(); let l_clone = l.clone(); - bus.add_watch(move |_, msg| { - use gst::MessageView; - - match msg.view() { - MessageView::StateChanged(state_changed) => { - if let Some(source) = state_changed.src() { - if source.type_() == gst::Pipeline::static_type() - && state_changed.old() == gst::State::Paused - && state_changed.current() == gst::State::Playing - { - if let Some(test_scenario) = test_scenario.take() { - std::thread::spawn(test_scenario); + let _bus_watch = bus + .add_watch(move |_, msg| { + use gst::MessageView; + + match msg.view() { + MessageView::StateChanged(state_changed) => { + if let Some(source) = state_changed.src() { + if source.type_() == gst::Pipeline::static_type() + && state_changed.old() == gst::State::Paused + && state_changed.current() == gst::State::Playing + { + if let Some(test_scenario) = test_scenario.take() { + std::thread::spawn(test_scenario); + } } } } - } - MessageView::Error(err) => { - gst::error!( - CAT, - "multiple_contexts_proxy: Error from {:?}: {} ({:?})", - err.src().map(|s| s.path_string()), - err.error(), - err.debug() - ); - l_clone.quit(); - } - _ => (), - }; + MessageView::Error(err) => { + gst::error!( + CAT, + "multiple_contexts_proxy: Error from {:?}: {} ({:?})", + err.src().map(|s| s.path_string()), + err.error(), + err.debug() + ); + l_clone.quit(); + } + _ => (), + }; - glib::Continue(true) - }) - .unwrap(); + glib::Continue(true) + }) + .unwrap(); pipeline.set_state(gst::State::Playing).unwrap(); @@ -405,7 +407,7 @@ fn eos() { }); let l_clone = l.clone(); - pipeline + let _bus_watch = pipeline .bus() .unwrap() .add_watch(move |_, msg| { @@ -561,7 +563,7 @@ fn premature_shutdown() { }); let l_clone = l.clone(); - pipeline + let _bus_watch = pipeline .bus() .unwrap() .add_watch(move |_, msg| { @@ -657,7 +659,7 @@ fn socket_play_null_play() { }); let l_clone = l.clone(); - pipeline + let _bus_watch = pipeline .bus() .unwrap() .add_watch(move |_, msg| { |