Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJan Alexander Steffens (heftig) <jan.steffens@ltnglobal.com>2023-10-25 00:53:17 +0300
committerJan Alexander Steffens (heftig) <jan.steffens@ltnglobal.com>2023-10-25 12:52:40 +0300
commit62791bfb47a0bb8b515ea5881a4dccf1bfa78833 (patch)
treeb680daa3d2af3fad47b97984373cb30aab8d8c42
parentd663f708ef324d385ef188499d116a2bcafc8979 (diff)
livesync: Clean up state handling
- Separate resetting state more cleanly, introducing `set_flushing`, `sink_reset` and `src_reset`. - Clear the queue early when we flush, in order to unblock waits on query responses. - Return an error when we fail to start, pause or stop the task. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1369>
-rw-r--r--utils/livesync/src/livesync/imp.rs183
1 files changed, 93 insertions, 90 deletions
diff --git a/utils/livesync/src/livesync/imp.rs b/utils/livesync/src/livesync/imp.rs
index b6f3cef74..b5f7272c4 100644
--- a/utils/livesync/src/livesync/imp.rs
+++ b/utils/livesync/src/livesync/imp.rs
@@ -428,9 +428,21 @@ impl ElementImpl for LiveSync {
let success = self.parent_change_state(transition)?;
- if transition == gst::StateChange::PlayingToPaused {
- let mut state = self.state.lock();
- state.playing = false;
+ match transition {
+ gst::StateChange::PlayingToPaused => {
+ let mut state = self.state.lock();
+ state.playing = false;
+ }
+
+ gst::StateChange::PausedToReady => {
+ let mut state = self.state.lock();
+ state.num_in = 0;
+ state.num_drop = 0;
+ state.num_out = 0;
+ state.num_duplicate = 0;
+ }
+
+ _ => {}
}
match (transition, success) {
@@ -506,36 +518,11 @@ impl LiveSync {
return Err(gst::loggable_error!(CAT, "Wrong scheduling mode"));
}
- if active {
- let mut state = self.state.lock();
- state.srcresult = Ok(gst::FlowSuccess::Ok);
- state.eos = false;
- state.in_timestamp = None;
- state.num_in = 0;
- state.num_drop = 0;
- state.in_segment = None;
- } else {
- {
- let mut state = self.state.lock();
- state.srcresult = Err(gst::FlowError::Flushing);
- if let Some(clock_id) = state.clock_id.take() {
- clock_id.unschedule();
- }
- state.pending_caps = None;
- state.out_audio_info = None;
- state.out_buffer = None;
- self.cond.notify_all();
- }
+ if !active {
+ self.set_flushing(&mut self.state.lock());
let lock = pad.stream_lock();
- {
- let mut state = self.state.lock();
- state.in_caps = None;
- state.in_audio_info = None;
- state.queue.clear();
- state.buffer_queued = false;
- state.update_fallback_duration();
- }
+ self.sink_reset(&mut self.state.lock());
drop(lock);
}
@@ -553,37 +540,49 @@ impl LiveSync {
}
if active {
- let ret;
+ self.start_src_task(&mut self.state.lock())
+ .map_err(|e| gst::LoggableError::new(*CAT, e))?;
+ } else {
+ let mut state = self.state.lock();
+ self.set_flushing(&mut state);
+ self.src_reset(&mut state);
+ drop(state);
- {
- let mut state = self.state.lock();
+ pad.stop_task()?;
+ }
- state.srcresult = Ok(gst::FlowSuccess::Ok);
- state.pending_segment = None;
- state.out_segment = None;
- state.out_timestamp = None;
- state.num_out = 0;
- state.num_duplicate = 0;
+ Ok(())
+ }
- ret = self.start_src_task().map_err(Into::into);
- }
+ fn set_flushing(&self, state: &mut State) {
+ state.srcresult = Err(gst::FlowError::Flushing);
+ if let Some(clock_id) = state.clock_id.take() {
+ clock_id.unschedule();
+ }
- ret
- } else {
- {
- let mut state = self.state.lock();
- state.srcresult = Err(gst::FlowError::Flushing);
- if let Some(clock_id) = state.clock_id.take() {
- clock_id.unschedule();
- }
- state.pending_caps = None;
- state.out_audio_info = None;
- state.out_buffer = None;
- self.cond.notify_all();
- }
+ // Ensure we drop any query response sender to unblock the sinkpad
+ state.queue.clear();
+ state.buffer_queued = false;
- pad.stop_task().map_err(Into::into)
- }
+ self.cond.notify_all();
+ }
+
+ fn sink_reset(&self, state: &mut State) {
+ state.eos = false;
+ state.in_segment = None;
+ state.in_caps = None;
+ state.in_audio_info = None;
+ state.in_timestamp = None;
+ state.update_fallback_duration();
+ }
+
+ fn src_reset(&self, state: &mut State) {
+ state.pending_segment = None;
+ state.out_segment = None;
+ state.pending_caps = None;
+ state.out_audio_info = None;
+ state.out_buffer = None;
+ state.out_timestamp = None;
}
fn sink_event(&self, pad: &gst::Pad, mut event: gst::Event) -> bool {
@@ -603,16 +602,13 @@ impl LiveSync {
gst::EventView::FlushStart(_) => {
let ret = self.srcpad.push_event(event);
- {
- let mut state = self.state.lock();
- state.srcresult = Err(gst::FlowError::Flushing);
- if let Some(clock_id) = state.clock_id.take() {
- clock_id.unschedule();
- }
- self.cond.notify_all();
+ self.set_flushing(&mut self.state.lock());
+
+ if let Err(e) = self.srcpad.pause_task() {
+ gst::error!(CAT, imp: self, "Failed to pause task: {e}");
+ return false;
}
- let _ = self.srcpad.pause_task();
return ret;
}
@@ -620,21 +616,14 @@ impl LiveSync {
let ret = self.srcpad.push_event(event);
let mut state = self.state.lock();
- state.srcresult = Ok(gst::FlowSuccess::Ok);
- state.eos = false;
- state.in_segment = None;
- state.pending_segment = None;
- state.out_segment = None;
- state.in_caps = None;
- state.pending_caps = None;
- state.in_audio_info = None;
- state.out_audio_info = None;
- state.queue.clear();
- state.buffer_queued = false;
- state.out_buffer = None;
- state.update_fallback_duration();
+ self.sink_reset(&mut state);
+ self.src_reset(&mut state);
+
+ if let Err(e) = self.start_src_task(&mut state) {
+ gst::error!(CAT, imp: self, "Failed to start task: {e}");
+ return false;
+ }
- let _ = self.start_src_task();
return ret;
}
@@ -689,12 +678,14 @@ impl LiveSync {
let mut state = self.state.lock();
if is_restart {
+ state.eos = false;
+
if state.srcresult == Err(gst::FlowError::Eos) {
- state.srcresult = Ok(gst::FlowSuccess::Ok);
+ if let Err(e) = self.start_src_task(&mut state) {
+ gst::error!(CAT, imp: self, "Failed to start task: {e}");
+ return false;
+ }
}
-
- state.eos = false;
- let _ = self.start_src_task();
}
if state.eos {
@@ -739,10 +730,12 @@ impl LiveSync {
{
let mut state = self.state.lock();
if state.srcresult == Err(gst::FlowError::NotLinked) {
- state.srcresult = Ok(gst::FlowSuccess::Ok);
- let _ = self.start_src_task();
+ if let Err(e) = self.start_src_task(&mut state) {
+ gst::error!(CAT, imp: self, "Failed to start task: {e}");
+ }
}
}
+
self.sinkpad.push_event(event)
}
@@ -766,6 +759,7 @@ impl LiveSync {
self.cond.notify_all();
drop(state);
+ // If the sender gets dropped, we will also unblock
receiver.recv().unwrap_or(false)
} else {
gst::Pad::query_default(pad, Some(&*self.obj()), query)
@@ -977,8 +971,16 @@ impl LiveSync {
}
fn start_src_task(&self, state: &mut State) -> Result<(), glib::BoolError> {
+ state.srcresult = Ok(gst::FlowSuccess::Ok);
+
let imp = self.ref_counted();
- self.srcpad.start_task(move || imp.src_loop())
+ let ret = self.srcpad.start_task(move || imp.src_loop());
+
+ if ret.is_err() {
+ state.srcresult = Err(gst::FlowError::Error);
+ }
+
+ ret
}
fn src_loop(&self) {
@@ -1278,15 +1280,16 @@ impl LiveSync {
None => return false,
};
- let slack = state
- .out_buffer
- .as_deref()
- .map_or(gst::ClockTime::ZERO, |b| b.duration().unwrap());
+ // When out_timestamp is set, we also have an out_buffer
+ let slack = state.out_buffer.as_deref().unwrap().duration().unwrap();
if timestamp.start < out_timestamp.end + slack {
return false;
}
+ // This buffer would start beyond another buffer duration after our
+ // last emitted buffer ended
+
gst::debug!(
CAT,
imp: self,