Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/utils
diff options
context:
space:
mode:
authorVivia Nikolaidou <vivia@ahiru.eu>2023-06-20 15:05:29 +0300
committerVivia Nikolaidou <vivia@ahiru.eu>2023-06-21 16:27:00 +0300
commit2be14b95b306cf23bd52a192106c17108d573a03 (patch)
tree6f9b8bd7c1125ca638f9b774b78030db2cf575d1 /utils
parent1119ed66202fcef3cf11d02ff6ce97fcb99c5aa4 (diff)
togglerecord: Fix nonlive inputs when element is started not recording
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1252>
Diffstat (limited to 'utils')
-rw-r--r--utils/togglerecord/src/togglerecord/imp.rs22
-rw-r--r--utils/togglerecord/tests/tests.rs217
2 files changed, 238 insertions, 1 deletions
diff --git a/utils/togglerecord/src/togglerecord/imp.rs b/utils/togglerecord/src/togglerecord/imp.rs
index d66437229..74fdaba0b 100644
--- a/utils/togglerecord/src/togglerecord/imp.rs
+++ b/utils/togglerecord/src/togglerecord/imp.rs
@@ -356,6 +356,14 @@ impl ToggleRecord {
upstream_live: bool,
) -> Result<bool, gst::FlowError> {
if !upstream_live {
+ let clock = self.obj().clock();
+ let mut rec_state = self.state.lock();
+ if rec_state.time_start_block.is_none() {
+ rec_state.time_start_block = clock
+ .as_ref()
+ .map_or(state.current_running_time, |c| c.time());
+ }
+ drop(rec_state);
while !settings.record && !state.flushing {
gst::debug!(CAT, obj: pad, "Waiting for record=true");
self.main_stream_cond.wait(state);
@@ -374,12 +382,20 @@ impl ToggleRecord {
}
let mut rec_state = self.state.lock();
if let Some(time_start_block) = rec_state.time_start_block {
- let clock = self.obj().clock().expect("Cannot find pipeline clock");
+ // If we have a time_start_block it means the clock is there
+ let clock = clock.expect("Cannot find pipeline clock");
rec_state.blocked_duration += clock.time().unwrap() - time_start_block;
if settings.live {
rec_state.running_time_offset = rec_state.blocked_duration.nseconds() as i64;
}
rec_state.time_start_block = gst::ClockTime::NONE;
+ } else {
+ // How did we even get here?
+ gst::warning!(
+ CAT,
+ obj: pad,
+ "Have no clock and no current running time. Will not offset buffers"
+ );
}
drop(rec_state);
gst::log!(CAT, obj: pad, "Done blocking main stream");
@@ -573,6 +589,10 @@ impl ToggleRecord {
}
}
RecordingState::Stopped => {
+ if !upstream_live {
+ rec_state.recording_state = RecordingState::Starting;
+ }
+ drop(rec_state);
if self.block_if_upstream_not_live(pad, settings, &mut state, upstream_live)? {
Ok(HandleResult::Pass(data))
} else {
diff --git a/utils/togglerecord/tests/tests.rs b/utils/togglerecord/tests/tests.rs
index 988a9b142..1a72881f4 100644
--- a/utils/togglerecord/tests/tests.rs
+++ b/utils/togglerecord/tests/tests.rs
@@ -877,6 +877,223 @@ fn test_two_stream_close_open() {
}
#[test]
+fn test_two_stream_close_open_nonlivein_nonliveout() {
+ init();
+
+ let pipeline = gst::Pipeline::default();
+ let togglerecord = gst::ElementFactory::make("togglerecord")
+ .property("is-live", false)
+ .property("record", false)
+ .build()
+ .unwrap();
+ pipeline.add(&togglerecord).unwrap();
+
+ let main_buffers_in_gap = 10u64;
+ let secondary_buffers_in_gap = main_buffers_in_gap + 1;
+ let main_buffers_after_gap = 10u64;
+ let secondary_buffers_after_gap = 9u64;
+ let recv_timeout = Duration::from_secs(10);
+
+ let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, false);
+ let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
+ setup_sender_receiver(
+ &pipeline,
+ &togglerecord,
+ "src_%u",
+ gst::ClockTime::ZERO,
+ false,
+ );
+
+ pipeline.set_state(gst::State::Playing).unwrap();
+
+ sender_input_1
+ .send(SendData::Buffers(main_buffers_in_gap as usize))
+ .unwrap();
+ assert_eq!(
+ receiver_input_done_1.recv_timeout(Duration::from_millis(20)),
+ Err(mpsc::RecvTimeoutError::Timeout)
+ );
+
+ // Sender 2 is waiting for sender 1 to continue, sender 1 is finished
+ sender_input_2
+ .send(SendData::Buffers((secondary_buffers_in_gap) as usize))
+ .unwrap();
+ assert_eq!(
+ receiver_input_done_2.recv_timeout(Duration::from_millis(20)),
+ Err(mpsc::RecvTimeoutError::Timeout)
+ );
+
+ // Start recording and push new buffers to sender 1, which will advance
+ // it and release the 11th buffer of sender 2 above
+ togglerecord.set_property("record", true);
+ sender_input_1
+ .send(SendData::Buffers(main_buffers_after_gap as usize))
+ .unwrap();
+ receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
+
+ // Send another 9 buffers to sender 2, both are the same position now
+ sender_input_2
+ .send(SendData::Buffers(secondary_buffers_after_gap as usize))
+ .unwrap();
+
+ // Wait until all 20 buffers of both senders are done
+ receiver_input_done_1.recv_timeout(recv_timeout).unwrap();
+ receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
+
+ // Send EOS and wait for it to be handled
+ sender_input_1.send(SendData::Eos).unwrap();
+ sender_input_2.send(SendData::Eos).unwrap();
+ receiver_input_done_1.recv_timeout(recv_timeout).unwrap();
+ receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
+
+ let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
+ for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time.unwrap(), index * 20.mseconds());
+ assert_eq!(pts.unwrap(), index * 20.mseconds());
+ assert_eq!(duration.unwrap(), 20.mseconds());
+ }
+ assert_eq!(
+ buffers_1.len() as u64,
+ main_buffers_in_gap + main_buffers_in_gap
+ );
+
+ // Last buffer should be dropped from second stream
+ let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
+ for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time.unwrap(), index * 20.mseconds());
+ assert_eq!(pts.unwrap(), index * 20.mseconds());
+ assert_eq!(duration.unwrap(), 20.mseconds());
+ }
+ assert_eq!(
+ buffers_2.len() as u64,
+ secondary_buffers_in_gap + secondary_buffers_after_gap
+ );
+
+ thread_1.join().unwrap();
+ thread_2.join().unwrap();
+
+ pipeline.set_state(gst::State::Null).unwrap();
+}
+
+#[test]
+fn test_two_stream_close_open_nonlivein_liveout() {
+ init();
+
+ let testclock = gst_check::TestClock::new();
+ let pipeline = gst::Pipeline::default();
+ pipeline.use_clock(Some(&testclock));
+ let togglerecord = gst::ElementFactory::make("togglerecord")
+ .property("is-live", true)
+ .property("record", false)
+ .build()
+ .unwrap();
+ togglerecord.set_clock(Some(&testclock)).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+ let testclock = testclock.downcast::<gst_check::TestClock>().unwrap();
+ testclock.set_time(gst::ClockTime::ZERO);
+
+ let main_buffers_in_gap = 10u64;
+ let secondary_buffers_in_gap = main_buffers_in_gap + 1;
+ let main_buffers_after_gap = 10u64;
+ let secondary_buffers_after_gap = 9u64;
+ let recv_timeout = Duration::from_secs(10);
+
+ let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, false);
+ let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
+ setup_sender_receiver(
+ &pipeline,
+ &togglerecord,
+ "src_%u",
+ gst::ClockTime::ZERO,
+ false,
+ );
+
+ pipeline.set_state(gst::State::Playing).unwrap();
+
+ sender_input_1
+ .send(SendData::Buffers(main_buffers_in_gap as usize))
+ .unwrap();
+ assert_eq!(
+ receiver_input_done_1.recv_timeout(Duration::from_millis(20)),
+ Err(mpsc::RecvTimeoutError::Timeout)
+ );
+
+ // Sender 2 is waiting for sender 1 to continue, sender 1 is finished
+ sender_input_2
+ .send(SendData::Buffers((secondary_buffers_in_gap) as usize))
+ .unwrap();
+ assert_eq!(
+ receiver_input_done_2.recv_timeout(Duration::from_millis(20)),
+ Err(mpsc::RecvTimeoutError::Timeout)
+ );
+
+ // Advance the clock
+ let block_time = gst::ClockTime::from_mseconds(42);
+ testclock.advance_time(block_time.nseconds() as i64);
+
+ // Start recording and push new buffers to sender 1, which will advance
+ // it and release the 11th buffer of sender 2 above
+ togglerecord.set_property("record", true);
+ sender_input_1
+ .send(SendData::Buffers(main_buffers_after_gap as usize))
+ .unwrap();
+ receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
+
+ // Send another 9 buffers to sender 2, both are the same position now
+ sender_input_2
+ .send(SendData::Buffers(secondary_buffers_after_gap as usize))
+ .unwrap();
+
+ // Wait until all 20 buffers of both senders are done
+ receiver_input_done_1.recv_timeout(recv_timeout).unwrap();
+ receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
+
+ // Send EOS and wait for it to be handled
+ sender_input_1.send(SendData::Eos).unwrap();
+ sender_input_2.send(SendData::Eos).unwrap();
+ receiver_input_done_1.recv_timeout(recv_timeout).unwrap();
+ receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
+
+ let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
+ for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time.unwrap(), block_time + index * 20.mseconds());
+ assert_eq!(pts.unwrap(), index * 20.mseconds());
+ assert_eq!(duration.unwrap(), 20.mseconds());
+ }
+ assert_eq!(
+ buffers_1.len() as u64,
+ main_buffers_in_gap + main_buffers_in_gap
+ );
+
+ // Last buffer should be dropped from second stream
+ let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
+ for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time.unwrap(), block_time + index * 20.mseconds());
+ assert_eq!(pts.unwrap(), index * 20.mseconds());
+ assert_eq!(duration.unwrap(), 20.mseconds());
+ }
+ assert_eq!(
+ buffers_2.len() as u64,
+ secondary_buffers_in_gap + secondary_buffers_after_gap
+ );
+
+ thread_1.join().unwrap();
+ thread_2.join().unwrap();
+
+ pipeline.set_state(gst::State::Null).unwrap();
+}
+
+#[test]
fn test_two_stream_open_close_open_nonlivein_liveout() {
init();