From 62791bfb47a0bb8b515ea5881a4dccf1bfa78833 Mon Sep 17 00:00:00 2001 From: "Jan Alexander Steffens (heftig)" Date: Tue, 24 Oct 2023 23:53:17 +0200 Subject: livesync: Clean up state handling - Separate resetting state more cleanly, introducing `set_flushing`, `sink_reset` and `src_reset`. - Clear the queue early when we flush, in order to unblock waits on query responses. - Return an error when we fail to start, pause or stop the task. Part-of: --- utils/livesync/src/livesync/imp.rs | 183 +++++++++++++++++++------------------ 1 file changed, 93 insertions(+), 90 deletions(-) diff --git a/utils/livesync/src/livesync/imp.rs b/utils/livesync/src/livesync/imp.rs index b6f3cef7..b5f7272c 100644 --- a/utils/livesync/src/livesync/imp.rs +++ b/utils/livesync/src/livesync/imp.rs @@ -428,9 +428,21 @@ impl ElementImpl for LiveSync { let success = self.parent_change_state(transition)?; - if transition == gst::StateChange::PlayingToPaused { - let mut state = self.state.lock(); - state.playing = false; + match transition { + gst::StateChange::PlayingToPaused => { + let mut state = self.state.lock(); + state.playing = false; + } + + gst::StateChange::PausedToReady => { + let mut state = self.state.lock(); + state.num_in = 0; + state.num_drop = 0; + state.num_out = 0; + state.num_duplicate = 0; + } + + _ => {} } match (transition, success) { @@ -506,36 +518,11 @@ impl LiveSync { return Err(gst::loggable_error!(CAT, "Wrong scheduling mode")); } - if active { - let mut state = self.state.lock(); - state.srcresult = Ok(gst::FlowSuccess::Ok); - state.eos = false; - state.in_timestamp = None; - state.num_in = 0; - state.num_drop = 0; - state.in_segment = None; - } else { - { - let mut state = self.state.lock(); - state.srcresult = Err(gst::FlowError::Flushing); - if let Some(clock_id) = state.clock_id.take() { - clock_id.unschedule(); - } - state.pending_caps = None; - state.out_audio_info = None; - state.out_buffer = None; - self.cond.notify_all(); - } + if !active { + self.set_flushing(&mut self.state.lock()); let lock = pad.stream_lock(); - { - let mut state = self.state.lock(); - state.in_caps = None; - state.in_audio_info = None; - state.queue.clear(); - state.buffer_queued = false; - state.update_fallback_duration(); - } + self.sink_reset(&mut self.state.lock()); drop(lock); } @@ -553,37 +540,49 @@ impl LiveSync { } if active { - let ret; + self.start_src_task(&mut self.state.lock()) + .map_err(|e| gst::LoggableError::new(*CAT, e))?; + } else { + let mut state = self.state.lock(); + self.set_flushing(&mut state); + self.src_reset(&mut state); + drop(state); - { - let mut state = self.state.lock(); + pad.stop_task()?; + } - state.srcresult = Ok(gst::FlowSuccess::Ok); - state.pending_segment = None; - state.out_segment = None; - state.out_timestamp = None; - state.num_out = 0; - state.num_duplicate = 0; + Ok(()) + } - ret = self.start_src_task().map_err(Into::into); - } + fn set_flushing(&self, state: &mut State) { + state.srcresult = Err(gst::FlowError::Flushing); + if let Some(clock_id) = state.clock_id.take() { + clock_id.unschedule(); + } - ret - } else { - { - let mut state = self.state.lock(); - state.srcresult = Err(gst::FlowError::Flushing); - if let Some(clock_id) = state.clock_id.take() { - clock_id.unschedule(); - } - state.pending_caps = None; - state.out_audio_info = None; - state.out_buffer = None; - self.cond.notify_all(); - } + // Ensure we drop any query response sender to unblock the sinkpad + state.queue.clear(); + state.buffer_queued = false; - pad.stop_task().map_err(Into::into) - } + self.cond.notify_all(); + } + + fn sink_reset(&self, state: &mut State) { + state.eos = false; + state.in_segment = None; + state.in_caps = None; + state.in_audio_info = None; + state.in_timestamp = None; + state.update_fallback_duration(); + } + + fn src_reset(&self, state: &mut State) { + state.pending_segment = None; + state.out_segment = None; + state.pending_caps = None; + state.out_audio_info = None; + state.out_buffer = None; + state.out_timestamp = None; } fn sink_event(&self, pad: &gst::Pad, mut event: gst::Event) -> bool { @@ -603,16 +602,13 @@ impl LiveSync { gst::EventView::FlushStart(_) => { let ret = self.srcpad.push_event(event); - { - let mut state = self.state.lock(); - state.srcresult = Err(gst::FlowError::Flushing); - if let Some(clock_id) = state.clock_id.take() { - clock_id.unschedule(); - } - self.cond.notify_all(); + self.set_flushing(&mut self.state.lock()); + + if let Err(e) = self.srcpad.pause_task() { + gst::error!(CAT, imp: self, "Failed to pause task: {e}"); + return false; } - let _ = self.srcpad.pause_task(); return ret; } @@ -620,21 +616,14 @@ impl LiveSync { let ret = self.srcpad.push_event(event); let mut state = self.state.lock(); - state.srcresult = Ok(gst::FlowSuccess::Ok); - state.eos = false; - state.in_segment = None; - state.pending_segment = None; - state.out_segment = None; - state.in_caps = None; - state.pending_caps = None; - state.in_audio_info = None; - state.out_audio_info = None; - state.queue.clear(); - state.buffer_queued = false; - state.out_buffer = None; - state.update_fallback_duration(); + self.sink_reset(&mut state); + self.src_reset(&mut state); + + if let Err(e) = self.start_src_task(&mut state) { + gst::error!(CAT, imp: self, "Failed to start task: {e}"); + return false; + } - let _ = self.start_src_task(); return ret; } @@ -689,12 +678,14 @@ impl LiveSync { let mut state = self.state.lock(); if is_restart { + state.eos = false; + if state.srcresult == Err(gst::FlowError::Eos) { - state.srcresult = Ok(gst::FlowSuccess::Ok); + if let Err(e) = self.start_src_task(&mut state) { + gst::error!(CAT, imp: self, "Failed to start task: {e}"); + return false; + } } - - state.eos = false; - let _ = self.start_src_task(); } if state.eos { @@ -739,10 +730,12 @@ impl LiveSync { { let mut state = self.state.lock(); if state.srcresult == Err(gst::FlowError::NotLinked) { - state.srcresult = Ok(gst::FlowSuccess::Ok); - let _ = self.start_src_task(); + if let Err(e) = self.start_src_task(&mut state) { + gst::error!(CAT, imp: self, "Failed to start task: {e}"); + } } } + self.sinkpad.push_event(event) } @@ -766,6 +759,7 @@ impl LiveSync { self.cond.notify_all(); drop(state); + // If the sender gets dropped, we will also unblock receiver.recv().unwrap_or(false) } else { gst::Pad::query_default(pad, Some(&*self.obj()), query) @@ -977,8 +971,16 @@ impl LiveSync { } fn start_src_task(&self, state: &mut State) -> Result<(), glib::BoolError> { + state.srcresult = Ok(gst::FlowSuccess::Ok); + let imp = self.ref_counted(); - self.srcpad.start_task(move || imp.src_loop()) + let ret = self.srcpad.start_task(move || imp.src_loop()); + + if ret.is_err() { + state.srcresult = Err(gst::FlowError::Error); + } + + ret } fn src_loop(&self) { @@ -1278,15 +1280,16 @@ impl LiveSync { None => return false, }; - let slack = state - .out_buffer - .as_deref() - .map_or(gst::ClockTime::ZERO, |b| b.duration().unwrap()); + // When out_timestamp is set, we also have an out_buffer + let slack = state.out_buffer.as_deref().unwrap().duration().unwrap(); if timestamp.start < out_timestamp.end + slack { return false; } + // This buffer would start beyond another buffer duration after our + // last emitted buffer ended + gst::debug!( CAT, imp: self, -- cgit v1.2.3