Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--utils/togglerecord/Cargo.toml1
-rw-r--r--utils/togglerecord/src/lib.rs3
-rw-r--r--utils/togglerecord/src/togglerecord.rs98
-rw-r--r--utils/togglerecord/tests/tests.rs15
4 files changed, 85 insertions, 32 deletions
diff --git a/utils/togglerecord/Cargo.toml b/utils/togglerecord/Cargo.toml
index 4ab677758..e71842d8b 100644
--- a/utils/togglerecord/Cargo.toml
+++ b/utils/togglerecord/Cargo.toml
@@ -15,6 +15,7 @@ gstreamer-video = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs
gtk = { git = "https://github.com/gtk-rs/gtk", optional = true }
gio = { git = "https://github.com/gtk-rs/gio", optional = true }
parking_lot = "0.11"
+more-asserts = "0.2"
lazy_static = "1.0"
[dev-dependencies]
diff --git a/utils/togglerecord/src/lib.rs b/utils/togglerecord/src/lib.rs
index 84a77a32d..7b5e286b7 100644
--- a/utils/togglerecord/src/lib.rs
+++ b/utils/togglerecord/src/lib.rs
@@ -25,6 +25,9 @@ extern crate gstreamer_video as gst_video;
#[macro_use]
extern crate lazy_static;
+#[macro_use]
+extern crate more_asserts;
+
extern crate parking_lot;
mod togglerecord;
diff --git a/utils/togglerecord/src/togglerecord.rs b/utils/togglerecord/src/togglerecord.rs
index a7a0c7bfd..1a4ecf5a2 100644
--- a/utils/togglerecord/src/togglerecord.rs
+++ b/utils/togglerecord/src/togglerecord.rs
@@ -93,7 +93,9 @@ struct StreamState {
in_segment: gst::FormattedSegment<gst::ClockTime>,
out_segment: gst::FormattedSegment<gst::ClockTime>,
segment_seqnum: gst::Seqnum,
+ // Start/end running time of the current/last buffer
current_running_time: gst::ClockTime,
+ current_running_time_end: gst::ClockTime,
eos: bool,
flushing: bool,
segment_pending: bool,
@@ -110,6 +112,7 @@ impl Default for StreamState {
out_segment: gst::FormattedSegment::new(),
segment_seqnum: gst::Seqnum::next(),
current_running_time: gst::CLOCK_TIME_NONE,
+ current_running_time_end: gst::CLOCK_TIME_NONE,
eos: false,
flushing: false,
segment_pending: false,
@@ -407,7 +410,9 @@ impl ToggleRecord {
let current_running_time = state.in_segment.to_running_time(dts_or_pts);
let current_running_time_end = state.in_segment.to_running_time(dts_or_pts_end);
- state.current_running_time = cmp::max(current_running_time_end, state.current_running_time);
+ state.current_running_time = cmp::max(current_running_time, state.current_running_time);
+ state.current_running_time_end =
+ cmp::max(current_running_time_end, state.current_running_time_end);
// Wake up everybody, we advanced a bit
// Important: They will only be able to advance once we're done with this
@@ -480,14 +485,16 @@ impl ToggleRecord {
rec_state.recording_duration
);
- // Then unlock and wait for all other streams to reach it or go EOS instead.
+ // Then unlock and wait for all other streams to reach a buffer that is completely
+ // after/at the recording stop position (i.e. can be dropped completely) or go EOS
+ // instead.
drop(rec_state);
while !self.other_streams.lock().0.iter().all(|s| {
let s = s.state.lock();
s.eos
|| (s.current_running_time.is_some()
- && s.current_running_time >= current_running_time_end)
+ && s.current_running_time >= current_running_time)
}) {
gst_log!(CAT, obj: pad, "Waiting for other streams to stop");
self.main_stream_cond.wait(&mut state);
@@ -563,15 +570,16 @@ impl ToggleRecord {
other_state.discont_pending = true;
}
- // Then unlock and wait for all other streams to reach
- // it or go EOS instead
+ // Then unlock and wait for all other streams to reach a buffer that is completely
+ // after/at the recording start position (i.e. can be passed through completely) or
+ // go EOS instead.
drop(rec_state);
while !self.other_streams.lock().0.iter().all(|s| {
let s = s.state.lock();
s.eos
|| (s.current_running_time.is_some()
- && s.current_running_time >= current_running_time_end)
+ && s.current_running_time >= current_running_time)
}) {
gst_log!(CAT, obj: pad, "Waiting for other streams to start");
self.main_stream_cond.wait(&mut state);
@@ -664,7 +672,9 @@ impl ToggleRecord {
let current_running_time = state.in_segment.to_running_time(pts);
let current_running_time_end = state.in_segment.to_running_time(pts_end);
- state.current_running_time = cmp::max(current_running_time_end, state.current_running_time);
+ state.current_running_time = cmp::max(current_running_time, state.current_running_time);
+ state.current_running_time_end =
+ cmp::max(current_running_time_end, state.current_running_time_end);
gst_log!(
CAT,
@@ -684,20 +694,41 @@ impl ToggleRecord {
// above but all notifying must happen while the main_stream state is locked as per above.
self.main_stream_cond.notify_all();
+ let mut rec_state = self.state.lock();
+
+ // Wait until the main stream advanced completely past our current running time in
+ // Recording/Stopped modes to make sure we're not already outputting/dropping data that
+ // should actually be dropped/output if recording is started/stopped now.
+ //
+ // In Starting/Stopping mode we wait if we the start of this buffer is after last recording
+ // start/stop as in that case we should be in Recording/Stopped mode already. The main
+ // stream is waiting for us to reach that position to switch to Recording/Stopped mode so
+ // that in those modes we only have to pass through/drop the whole buffers.
while (main_state.current_running_time == gst::CLOCK_TIME_NONE
- || main_state.current_running_time < current_running_time_end)
+ || rec_state.recording_state != RecordingState::Starting
+ && rec_state.recording_state != RecordingState::Stopping
+ && main_state.current_running_time_end < current_running_time_end
+ || rec_state.recording_state == RecordingState::Starting
+ && rec_state.last_recording_start <= current_running_time
+ || rec_state.recording_state == RecordingState::Stopping
+ && rec_state.last_recording_stop <= current_running_time)
&& !main_state.eos
&& !stream.state.lock().flushing
{
gst_log!(
CAT,
obj: pad,
- "Waiting for reaching {} / EOS / flushing, main stream at {}",
+ "Waiting at {}-{} in {:?} state, main stream at {}-{}",
current_running_time,
- main_state.current_running_time
+ current_running_time_end,
+ rec_state.recording_state,
+ main_state.current_running_time,
+ main_state.current_running_time_end
);
+ drop(rec_state);
self.main_stream_cond.wait(&mut main_state);
+ rec_state = self.state.lock();
}
state = stream.state.lock();
@@ -707,8 +738,6 @@ impl ToggleRecord {
return Ok(HandleResult::Flushing);
}
- let rec_state = self.state.lock();
-
// If the main stream is EOS, we are also EOS unless we are
// before the final last recording stop running time
if main_state.eos {
@@ -821,8 +850,8 @@ impl ToggleRecord {
} else {
// In all other cases the buffer is fully between recording start and end and
// can be passed through as is
- assert!(current_running_time >= rec_state.last_recording_start);
- assert!(current_running_time_end <= rec_state.last_recording_stop);
+ assert_ge!(current_running_time, rec_state.last_recording_start);
+ assert_le!(current_running_time_end, rec_state.last_recording_stop);
gst_debug!(
CAT,
@@ -836,19 +865,27 @@ impl ToggleRecord {
}
}
- // The end of our buffer is before the end of the previous buffer of the main stream
- assert!(main_state.current_running_time >= current_running_time_end);
-
match rec_state.recording_state {
RecordingState::Recording => {
+ // The end of our buffer must be before/at the end of the previous buffer of the main
+ // stream
+ assert_le!(
+ current_running_time_end,
+ main_state.current_running_time_end
+ );
+
// We're properly started, must have a start position and
// be actually after that start position
assert!(rec_state.last_recording_start.is_some());
- assert!(current_running_time >= rec_state.last_recording_start);
+ assert_ge!(current_running_time, rec_state.last_recording_start);
gst_log!(CAT, obj: pad, "Passing buffer (recording)");
Ok(HandleResult::Pass(data))
}
RecordingState::Stopping => {
+ // The start of our buffer must be before the last recording stop as
+ // otherwise we would be in Stopped state already
+ assert_lt!(current_running_time, rec_state.last_recording_stop);
+
// If we have no start position yet, the main stream is waiting for a key-frame
if rec_state.last_recording_stop.is_none() {
gst_log!(
@@ -908,11 +945,22 @@ impl ToggleRecord {
}
}
RecordingState::Stopped => {
+ // The end of our buffer must be before/at the end of the previous buffer of the main
+ // stream
+ assert_le!(
+ current_running_time_end,
+ main_state.current_running_time_end
+ );
+
// We're properly stopped
gst_log!(CAT, obj: pad, "Dropping buffer (stopped)");
Ok(HandleResult::Drop)
}
RecordingState::Starting => {
+ // The start of our buffer must be before the last recording start as
+ // otherwise we would be in Recording state already
+ assert_lt!(current_running_time, rec_state.last_recording_start);
+
// If we have no start position yet, the main stream is waiting for a key-frame
if rec_state.last_recording_start.is_none() {
gst_log!(
@@ -1126,6 +1174,7 @@ impl ToggleRecord {
state.segment_pending = true;
state.discont_pending = true;
state.current_running_time = gst::CLOCK_TIME_NONE;
+ state.current_running_time_end = gst::CLOCK_TIME_NONE;
}
EventView::Caps(c) => {
let mut state = stream.state.lock();
@@ -1178,6 +1227,7 @@ impl ToggleRecord {
state.segment_seqnum = event.get_seqnum();
state.segment_pending = true;
state.current_running_time = gst::CLOCK_TIME_NONE;
+ state.current_running_time_end = gst::CLOCK_TIME_NONE;
gst_debug!(CAT, obj: pad, "Got new Segment {:?}", state.in_segment);
@@ -1397,13 +1447,13 @@ impl ToggleRecord {
obj: pad,
"Returning position {} = {} - ({} + {})",
recording_duration
- + (state.current_running_time - rec_state.last_recording_start),
+ + (state.current_running_time_end - rec_state.last_recording_start),
recording_duration,
- state.current_running_time,
+ state.current_running_time_end,
rec_state.last_recording_start
);
recording_duration +=
- state.current_running_time - rec_state.last_recording_start;
+ state.current_running_time_end - rec_state.last_recording_start;
} else {
gst_debug!(CAT, obj: pad, "Returning position {}", recording_duration,);
}
@@ -1426,13 +1476,13 @@ impl ToggleRecord {
obj: pad,
"Returning duration {} = {} - ({} + {})",
recording_duration
- + (state.current_running_time - rec_state.last_recording_start),
+ + (state.current_running_time_end - rec_state.last_recording_start),
recording_duration,
- state.current_running_time,
+ state.current_running_time_end,
rec_state.last_recording_start
);
recording_duration +=
- state.current_running_time - rec_state.last_recording_start;
+ state.current_running_time_end - rec_state.last_recording_start;
} else {
gst_debug!(CAT, obj: pad, "Returning duration {}", recording_duration,);
}
diff --git a/utils/togglerecord/tests/tests.rs b/utils/togglerecord/tests/tests.rs
index 8b05aabc0..37705f6a3 100644
--- a/utils/togglerecord/tests/tests.rs
+++ b/utils/togglerecord/tests/tests.rs
@@ -1067,23 +1067,22 @@ fn test_three_stream_open_close_open() {
sender_input_1.send(SendData::Buffers(10)).unwrap();
sender_input_2.send(SendData::Buffers(11)).unwrap();
- sender_input_3.send(SendData::Buffers(10)).unwrap();
+ sender_input_3.send(SendData::Buffers(11)).unwrap();
- // Sender 2 is waiting for sender 1 to continue, sender 1/3 are finished
+ // Sender 2/3 is waiting for sender 1 to continue
receiver_input_done_1.recv().unwrap();
- receiver_input_done_3.recv().unwrap();
// Stop recording and push new buffers to sender 1, which will advance
- // it and release the 11th buffer of sender 2 above
+ // it and release the 11th buffer of sender 2/3 above
togglerecord.set_property("record", &false).unwrap();
sender_input_1.send(SendData::Buffers(10)).unwrap();
+
receiver_input_done_2.recv().unwrap();
+ receiver_input_done_3.recv().unwrap();
- // Send another 9 buffers to sender 2, 1/2 are at the same position now
+ // Send another 9 buffers to sender 2/3, all streams are at the same position now
sender_input_2.send(SendData::Buffers(9)).unwrap();
-
- // Send the remaining 10 buffers to sender 3, all are at the same position now
- sender_input_3.send(SendData::Buffers(10)).unwrap();
+ sender_input_3.send(SendData::Buffers(9)).unwrap();
// Wait until all 20 buffers of all senders are done
receiver_input_done_1.recv().unwrap();