diff options
author | Sebastian Dröge <sebastian@centricular.com> | 2020-08-05 18:24:10 +0300 |
---|---|---|
committer | Sebastian Dröge <slomo@coaxion.net> | 2020-08-06 14:56:59 +0300 |
commit | 98b618cc9d2b97ddba9b81319e923276e016c78c (patch) | |
tree | a1f94a63f4cff7172733829f0a927a00442e8e5b | |
parent | a91e8aadb2a9cb2ccb969d09eb4befe3e6a098ff (diff) |
utils/togglerecord: Fix timestamp tracking logic for partially overlapping timestamps
And various other cases. Also adjust one of the tests accordingly and
improve assertions to print more information about internal
inconsistencies.
-rw-r--r-- | utils/togglerecord/Cargo.toml | 1 | ||||
-rw-r--r-- | utils/togglerecord/src/lib.rs | 3 | ||||
-rw-r--r-- | utils/togglerecord/src/togglerecord.rs | 98 | ||||
-rw-r--r-- | utils/togglerecord/tests/tests.rs | 15 |
4 files changed, 85 insertions, 32 deletions
diff --git a/utils/togglerecord/Cargo.toml b/utils/togglerecord/Cargo.toml index 4ab677758..e71842d8b 100644 --- a/utils/togglerecord/Cargo.toml +++ b/utils/togglerecord/Cargo.toml @@ -15,6 +15,7 @@ gstreamer-video = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs gtk = { git = "https://github.com/gtk-rs/gtk", optional = true } gio = { git = "https://github.com/gtk-rs/gio", optional = true } parking_lot = "0.11" +more-asserts = "0.2" lazy_static = "1.0" [dev-dependencies] diff --git a/utils/togglerecord/src/lib.rs b/utils/togglerecord/src/lib.rs index 84a77a32d..7b5e286b7 100644 --- a/utils/togglerecord/src/lib.rs +++ b/utils/togglerecord/src/lib.rs @@ -25,6 +25,9 @@ extern crate gstreamer_video as gst_video; #[macro_use] extern crate lazy_static; +#[macro_use] +extern crate more_asserts; + extern crate parking_lot; mod togglerecord; diff --git a/utils/togglerecord/src/togglerecord.rs b/utils/togglerecord/src/togglerecord.rs index a7a0c7bfd..1a4ecf5a2 100644 --- a/utils/togglerecord/src/togglerecord.rs +++ b/utils/togglerecord/src/togglerecord.rs @@ -93,7 +93,9 @@ struct StreamState { in_segment: gst::FormattedSegment<gst::ClockTime>, out_segment: gst::FormattedSegment<gst::ClockTime>, segment_seqnum: gst::Seqnum, + // Start/end running time of the current/last buffer current_running_time: gst::ClockTime, + current_running_time_end: gst::ClockTime, eos: bool, flushing: bool, segment_pending: bool, @@ -110,6 +112,7 @@ impl Default for StreamState { out_segment: gst::FormattedSegment::new(), segment_seqnum: gst::Seqnum::next(), current_running_time: gst::CLOCK_TIME_NONE, + current_running_time_end: gst::CLOCK_TIME_NONE, eos: false, flushing: false, segment_pending: false, @@ -407,7 +410,9 @@ impl ToggleRecord { let current_running_time = state.in_segment.to_running_time(dts_or_pts); let current_running_time_end = state.in_segment.to_running_time(dts_or_pts_end); - state.current_running_time = cmp::max(current_running_time_end, state.current_running_time); + state.current_running_time = cmp::max(current_running_time, state.current_running_time); + state.current_running_time_end = + cmp::max(current_running_time_end, state.current_running_time_end); // Wake up everybody, we advanced a bit // Important: They will only be able to advance once we're done with this @@ -480,14 +485,16 @@ impl ToggleRecord { rec_state.recording_duration ); - // Then unlock and wait for all other streams to reach it or go EOS instead. + // Then unlock and wait for all other streams to reach a buffer that is completely + // after/at the recording stop position (i.e. can be dropped completely) or go EOS + // instead. drop(rec_state); while !self.other_streams.lock().0.iter().all(|s| { let s = s.state.lock(); s.eos || (s.current_running_time.is_some() - && s.current_running_time >= current_running_time_end) + && s.current_running_time >= current_running_time) }) { gst_log!(CAT, obj: pad, "Waiting for other streams to stop"); self.main_stream_cond.wait(&mut state); @@ -563,15 +570,16 @@ impl ToggleRecord { other_state.discont_pending = true; } - // Then unlock and wait for all other streams to reach - // it or go EOS instead + // Then unlock and wait for all other streams to reach a buffer that is completely + // after/at the recording start position (i.e. can be passed through completely) or + // go EOS instead. drop(rec_state); while !self.other_streams.lock().0.iter().all(|s| { let s = s.state.lock(); s.eos || (s.current_running_time.is_some() - && s.current_running_time >= current_running_time_end) + && s.current_running_time >= current_running_time) }) { gst_log!(CAT, obj: pad, "Waiting for other streams to start"); self.main_stream_cond.wait(&mut state); @@ -664,7 +672,9 @@ impl ToggleRecord { let current_running_time = state.in_segment.to_running_time(pts); let current_running_time_end = state.in_segment.to_running_time(pts_end); - state.current_running_time = cmp::max(current_running_time_end, state.current_running_time); + state.current_running_time = cmp::max(current_running_time, state.current_running_time); + state.current_running_time_end = + cmp::max(current_running_time_end, state.current_running_time_end); gst_log!( CAT, @@ -684,20 +694,41 @@ impl ToggleRecord { // above but all notifying must happen while the main_stream state is locked as per above. self.main_stream_cond.notify_all(); + let mut rec_state = self.state.lock(); + + // Wait until the main stream advanced completely past our current running time in + // Recording/Stopped modes to make sure we're not already outputting/dropping data that + // should actually be dropped/output if recording is started/stopped now. + // + // In Starting/Stopping mode we wait if we the start of this buffer is after last recording + // start/stop as in that case we should be in Recording/Stopped mode already. The main + // stream is waiting for us to reach that position to switch to Recording/Stopped mode so + // that in those modes we only have to pass through/drop the whole buffers. while (main_state.current_running_time == gst::CLOCK_TIME_NONE - || main_state.current_running_time < current_running_time_end) + || rec_state.recording_state != RecordingState::Starting + && rec_state.recording_state != RecordingState::Stopping + && main_state.current_running_time_end < current_running_time_end + || rec_state.recording_state == RecordingState::Starting + && rec_state.last_recording_start <= current_running_time + || rec_state.recording_state == RecordingState::Stopping + && rec_state.last_recording_stop <= current_running_time) && !main_state.eos && !stream.state.lock().flushing { gst_log!( CAT, obj: pad, - "Waiting for reaching {} / EOS / flushing, main stream at {}", + "Waiting at {}-{} in {:?} state, main stream at {}-{}", current_running_time, - main_state.current_running_time + current_running_time_end, + rec_state.recording_state, + main_state.current_running_time, + main_state.current_running_time_end ); + drop(rec_state); self.main_stream_cond.wait(&mut main_state); + rec_state = self.state.lock(); } state = stream.state.lock(); @@ -707,8 +738,6 @@ impl ToggleRecord { return Ok(HandleResult::Flushing); } - let rec_state = self.state.lock(); - // If the main stream is EOS, we are also EOS unless we are // before the final last recording stop running time if main_state.eos { @@ -821,8 +850,8 @@ impl ToggleRecord { } else { // In all other cases the buffer is fully between recording start and end and // can be passed through as is - assert!(current_running_time >= rec_state.last_recording_start); - assert!(current_running_time_end <= rec_state.last_recording_stop); + assert_ge!(current_running_time, rec_state.last_recording_start); + assert_le!(current_running_time_end, rec_state.last_recording_stop); gst_debug!( CAT, @@ -836,19 +865,27 @@ impl ToggleRecord { } } - // The end of our buffer is before the end of the previous buffer of the main stream - assert!(main_state.current_running_time >= current_running_time_end); - match rec_state.recording_state { RecordingState::Recording => { + // The end of our buffer must be before/at the end of the previous buffer of the main + // stream + assert_le!( + current_running_time_end, + main_state.current_running_time_end + ); + // We're properly started, must have a start position and // be actually after that start position assert!(rec_state.last_recording_start.is_some()); - assert!(current_running_time >= rec_state.last_recording_start); + assert_ge!(current_running_time, rec_state.last_recording_start); gst_log!(CAT, obj: pad, "Passing buffer (recording)"); Ok(HandleResult::Pass(data)) } RecordingState::Stopping => { + // The start of our buffer must be before the last recording stop as + // otherwise we would be in Stopped state already + assert_lt!(current_running_time, rec_state.last_recording_stop); + // If we have no start position yet, the main stream is waiting for a key-frame if rec_state.last_recording_stop.is_none() { gst_log!( @@ -908,11 +945,22 @@ impl ToggleRecord { } } RecordingState::Stopped => { + // The end of our buffer must be before/at the end of the previous buffer of the main + // stream + assert_le!( + current_running_time_end, + main_state.current_running_time_end + ); + // We're properly stopped gst_log!(CAT, obj: pad, "Dropping buffer (stopped)"); Ok(HandleResult::Drop) } RecordingState::Starting => { + // The start of our buffer must be before the last recording start as + // otherwise we would be in Recording state already + assert_lt!(current_running_time, rec_state.last_recording_start); + // If we have no start position yet, the main stream is waiting for a key-frame if rec_state.last_recording_start.is_none() { gst_log!( @@ -1126,6 +1174,7 @@ impl ToggleRecord { state.segment_pending = true; state.discont_pending = true; state.current_running_time = gst::CLOCK_TIME_NONE; + state.current_running_time_end = gst::CLOCK_TIME_NONE; } EventView::Caps(c) => { let mut state = stream.state.lock(); @@ -1178,6 +1227,7 @@ impl ToggleRecord { state.segment_seqnum = event.get_seqnum(); state.segment_pending = true; state.current_running_time = gst::CLOCK_TIME_NONE; + state.current_running_time_end = gst::CLOCK_TIME_NONE; gst_debug!(CAT, obj: pad, "Got new Segment {:?}", state.in_segment); @@ -1397,13 +1447,13 @@ impl ToggleRecord { obj: pad, "Returning position {} = {} - ({} + {})", recording_duration - + (state.current_running_time - rec_state.last_recording_start), + + (state.current_running_time_end - rec_state.last_recording_start), recording_duration, - state.current_running_time, + state.current_running_time_end, rec_state.last_recording_start ); recording_duration += - state.current_running_time - rec_state.last_recording_start; + state.current_running_time_end - rec_state.last_recording_start; } else { gst_debug!(CAT, obj: pad, "Returning position {}", recording_duration,); } @@ -1426,13 +1476,13 @@ impl ToggleRecord { obj: pad, "Returning duration {} = {} - ({} + {})", recording_duration - + (state.current_running_time - rec_state.last_recording_start), + + (state.current_running_time_end - rec_state.last_recording_start), recording_duration, - state.current_running_time, + state.current_running_time_end, rec_state.last_recording_start ); recording_duration += - state.current_running_time - rec_state.last_recording_start; + state.current_running_time_end - rec_state.last_recording_start; } else { gst_debug!(CAT, obj: pad, "Returning duration {}", recording_duration,); } diff --git a/utils/togglerecord/tests/tests.rs b/utils/togglerecord/tests/tests.rs index 8b05aabc0..37705f6a3 100644 --- a/utils/togglerecord/tests/tests.rs +++ b/utils/togglerecord/tests/tests.rs @@ -1067,23 +1067,22 @@ fn test_three_stream_open_close_open() { sender_input_1.send(SendData::Buffers(10)).unwrap(); sender_input_2.send(SendData::Buffers(11)).unwrap(); - sender_input_3.send(SendData::Buffers(10)).unwrap(); + sender_input_3.send(SendData::Buffers(11)).unwrap(); - // Sender 2 is waiting for sender 1 to continue, sender 1/3 are finished + // Sender 2/3 is waiting for sender 1 to continue receiver_input_done_1.recv().unwrap(); - receiver_input_done_3.recv().unwrap(); // Stop recording and push new buffers to sender 1, which will advance - // it and release the 11th buffer of sender 2 above + // it and release the 11th buffer of sender 2/3 above togglerecord.set_property("record", &false).unwrap(); sender_input_1.send(SendData::Buffers(10)).unwrap(); + receiver_input_done_2.recv().unwrap(); + receiver_input_done_3.recv().unwrap(); - // Send another 9 buffers to sender 2, 1/2 are at the same position now + // Send another 9 buffers to sender 2/3, all streams are at the same position now sender_input_2.send(SendData::Buffers(9)).unwrap(); - - // Send the remaining 10 buffers to sender 3, all are at the same position now - sender_input_3.send(SendData::Buffers(10)).unwrap(); + sender_input_3.send(SendData::Buffers(9)).unwrap(); // Wait until all 20 buffers of all senders are done receiver_input_done_1.recv().unwrap(); |