Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVivia Nikolaidou <vivia@ahiru.eu>2023-04-27 19:59:19 +0300
committerVivia Nikolaidou <vivia@ahiru.eu>2023-06-14 15:58:04 +0300
commit063871a1ebbab48d1a13b209b39bd0ec9daa3f14 (patch)
treea0ade3c14d2f8b90847c08bd9bcdf6b7a2ca2fe9 /utils/togglerecord
parent4683291c1f6e84e373bc96410706ba689bf62e76 (diff)
togglerecord: Add support for non-live inputs
Live input + is-live=false: While not recording, drop input When recording is started, offset to collapse the gap Live input + is-live=true: While not recording, drop input Don't modify the offset Non-live input + is-live=false: While not recording, block input Don't modify the offset Non-live input + is-live=true: While not recording, block input When recording is started, offset to current running time Co-authored-by: Jan Alexander Steffens (heftig) <jan.steffens@ltnglobal.com> Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1206>
Diffstat (limited to 'utils/togglerecord')
-rw-r--r--utils/togglerecord/Cargo.toml1
-rw-r--r--utils/togglerecord/README.md30
-rw-r--r--utils/togglerecord/src/togglerecord/imp.rs226
-rw-r--r--utils/togglerecord/tests/tests.rs518
4 files changed, 702 insertions, 73 deletions
diff --git a/utils/togglerecord/Cargo.toml b/utils/togglerecord/Cargo.toml
index 1e5368c8e..36edeb2c1 100644
--- a/utils/togglerecord/Cargo.toml
+++ b/utils/togglerecord/Cargo.toml
@@ -20,6 +20,7 @@ once_cell = "1.0"
[dev-dependencies]
either = "1.0"
+gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
[lib]
name = "gsttogglerecord"
diff --git a/utils/togglerecord/README.md b/utils/togglerecord/README.md
new file mode 100644
index 000000000..b42e2a4ee
--- /dev/null
+++ b/utils/togglerecord/README.md
@@ -0,0 +1,30 @@
+# togglerecord
+
+A multistream valve-like plugin that ensures multiple streams start/end at the
+same time.
+
+It supports both live and non-live input and toggles recording via the
+`record` property. Live inputs will be dropped when not recording, while
+non-live inputs will be blocked.
+
+## Use cases
+
+The `is-live` property refers to whether the output of the element will be
+live. So, based on whether the input is live and on whether the output
+`is-live`, we have these four behaviours:
+
+- Live input + `is-live=false`:
+ - While not recording, drop input
+ - When recording is started, offset to collapse the gap
+
+- Live input + `is-live=true`:
+ - While not recording, drop input
+ - Don't modify the offset
+
+- Non-live input + `is-live=false`:
+ - While not recording, block input
+ - Don't modify the offset
+
+- Non-live input + `is-live=true`:
+ - While not recording, block input
+ - When recording is started, offset to current running time
diff --git a/utils/togglerecord/src/togglerecord/imp.rs b/utils/togglerecord/src/togglerecord/imp.rs
index d32ba3508..d66437229 100644
--- a/utils/togglerecord/src/togglerecord/imp.rs
+++ b/utils/togglerecord/src/togglerecord/imp.rs
@@ -6,12 +6,18 @@
//
// SPDX-License-Identifier: MPL-2.0
+/**
+ * element-togglerecord:
+ *
+ * {{ utils/togglerecord/README.md[2:30] }}
+ *
+ */
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use once_cell::sync::Lazy;
-use parking_lot::{Condvar, Mutex};
+use parking_lot::{Condvar, Mutex, MutexGuard};
use std::cmp;
use std::collections::HashMap;
use std::f64;
@@ -72,6 +78,7 @@ struct StreamState {
flushing: bool,
segment_pending: bool,
discont_pending: bool,
+ upstream_live: Option<bool>,
pending_events: Vec<gst::Event>,
audio_info: Option<gst_audio::AudioInfo>,
video_info: Option<gst_video::VideoInfo>,
@@ -89,6 +96,7 @@ impl Default for StreamState {
flushing: false,
segment_pending: false,
discont_pending: true,
+ upstream_live: None,
pending_events: Vec::new(),
audio_info: None,
video_info: None,
@@ -104,7 +112,7 @@ impl Default for StreamState {
// Recording: Passing through all data
// Stopping: Main stream remembering current last_recording_stop, waiting for all
// other streams to reach this position
-// Stopped: Dropping all data
+// Stopped: Dropping (live input) or blocking (non-live input) all data
// Starting: Main stream waiting until next keyframe and setting last_recording_start, waiting
// for all other streams to reach this position
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
@@ -120,11 +128,21 @@ struct State {
recording_state: RecordingState,
last_recording_start: Option<gst::ClockTime>,
last_recording_stop: Option<gst::ClockTime>,
+
// Accumulated duration of previous recording segments,
// updated whenever going to Stopped
recording_duration: gst::ClockTime,
+
+ // Accumulated duration of blocked segments
+ blocked_duration: gst::ClockTime,
+
+ // What time we started blocking
+ time_start_block: Option<gst::ClockTime>,
+
// Updated whenever going to Recording
running_time_offset: i64,
+
+ // Copied from settings
live: bool,
}
@@ -135,6 +153,8 @@ impl Default for State {
last_recording_start: None,
last_recording_stop: None,
recording_duration: gst::ClockTime::ZERO,
+ blocked_duration: gst::ClockTime::ZERO,
+ time_start_block: gst::ClockTime::NONE,
running_time_offset: 0,
live: false,
}
@@ -146,7 +166,6 @@ enum HandleResult<T> {
Pass(T),
Drop,
Eos(bool),
- Flushing,
}
trait HandleData: Sized {
@@ -329,11 +348,54 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
});
impl ToggleRecord {
+ fn block_if_upstream_not_live(
+ &self,
+ pad: &gst::Pad,
+ mut settings: Settings,
+ state: &mut MutexGuard<StreamState>,
+ upstream_live: bool,
+ ) -> Result<bool, gst::FlowError> {
+ if !upstream_live {
+ while !settings.record && !state.flushing {
+ gst::debug!(CAT, obj: pad, "Waiting for record=true");
+ self.main_stream_cond.wait(state);
+ settings = *self.settings.lock();
+ }
+ if state.flushing {
+ gst::debug!(CAT, obj: pad, "Flushing");
+ return Err(gst::FlowError::Flushing);
+ }
+ state.segment_pending = true;
+ state.discont_pending = true;
+ for other_stream in &self.other_streams.lock().0 {
+ let mut other_state = other_stream.state.lock();
+ other_state.segment_pending = true;
+ other_state.discont_pending = true;
+ }
+ let mut rec_state = self.state.lock();
+ if let Some(time_start_block) = rec_state.time_start_block {
+ let clock = self.obj().clock().expect("Cannot find pipeline clock");
+ rec_state.blocked_duration += clock.time().unwrap() - time_start_block;
+ if settings.live {
+ rec_state.running_time_offset = rec_state.blocked_duration.nseconds() as i64;
+ }
+ rec_state.time_start_block = gst::ClockTime::NONE;
+ }
+ drop(rec_state);
+ gst::log!(CAT, obj: pad, "Done blocking main stream");
+ Ok(true)
+ } else {
+ gst::log!(CAT, obj: pad, "Dropping buffer (stopped)");
+ Ok(false)
+ }
+ }
+
fn handle_main_stream<T: HandleData>(
&self,
pad: &gst::Pad,
stream: &Stream,
data: T,
+ upstream_live: bool,
) -> Result<HandleResult<T>, gst::FlowError> {
let mut state = stream.state.lock();
@@ -395,10 +457,14 @@ impl ToggleRecord {
let settings = *self.settings.lock();
- // First check if we have to update our recording state
+ // First check if we need to block for non-live input
let mut rec_state = self.state.lock();
+
+ // Check if we have to update our recording state
let settings_changed = match rec_state.recording_state {
RecordingState::Recording if !settings.record => {
+ let clock = self.obj().clock().expect("Cannot find pipeline clock");
+ rec_state.time_start_block = Some(clock.time().unwrap());
gst::debug!(CAT, obj: pad, "Stopping recording");
rec_state.recording_state = RecordingState::Stopping;
true
@@ -473,7 +539,7 @@ impl ToggleRecord {
if state.flushing {
gst::debug!(CAT, obj: pad, "Flushing");
- return Ok(HandleResult::Flushing);
+ return Err(gst::FlowError::Flushing);
}
let mut rec_state = self.state.lock();
@@ -493,17 +559,25 @@ impl ToggleRecord {
// Then become Stopped and drop this buffer. We always stop right before
// a keyframe
- gst::log!(CAT, obj: pad, "Dropping buffer (stopped)");
-
drop(rec_state);
+
+ let ret =
+ self.block_if_upstream_not_live(pad, settings, &mut state, upstream_live)?;
drop(state);
self.obj().notify("recording");
- Ok(HandleResult::Drop)
+ if ret {
+ Ok(HandleResult::Pass(data))
+ } else {
+ Ok(HandleResult::Drop)
+ }
}
RecordingState::Stopped => {
- gst::log!(CAT, obj: pad, "Dropping buffer (stopped)");
- Ok(HandleResult::Drop)
+ if self.block_if_upstream_not_live(pad, settings, &mut state, upstream_live)? {
+ Ok(HandleResult::Pass(data))
+ } else {
+ Ok(HandleResult::Drop)
+ }
}
RecordingState::Starting => {
// If this is no keyframe, we can directly go out again here and drop the frame
@@ -519,23 +593,35 @@ impl ToggleRecord {
.push_event(gst_video::UpstreamForceKeyUnitEvent::builder().build());
}
+ if !upstream_live {
+ gst::log!(
+ CAT,
+ obj: pad,
+ "Always passing data when upstream is not live"
+ );
+ return Ok(HandleResult::Pass(data));
+ }
return Ok(HandleResult::Drop);
}
// Remember the time when we started: now!
rec_state.last_recording_start = current_running_time;
- rec_state.running_time_offset =
- current_running_time.map_or(0, |current_running_time| {
- current_running_time
- .saturating_sub(rec_state.recording_duration)
- .nseconds()
- }) as i64;
+ // We made sure a few lines above, but let's be sure again
+ if !settings.live || upstream_live {
+ rec_state.running_time_offset =
+ 0 - current_running_time.map_or(0, |current_running_time| {
+ current_running_time
+ .saturating_sub(rec_state.recording_duration)
+ .nseconds()
+ }) as i64
+ };
gst::debug!(
CAT,
obj: pad,
- "Starting at {}, previous accumulated recording duration {}",
+ "Starting at {}, previous accumulated recording duration {}, offset {}",
current_running_time.display(),
rec_state.recording_duration,
+ rec_state.running_time_offset,
);
state.segment_pending = true;
@@ -566,7 +652,7 @@ impl ToggleRecord {
if state.flushing {
gst::debug!(CAT, obj: pad, "Flushing");
- return Ok(HandleResult::Flushing);
+ return Err(gst::FlowError::Flushing);
}
let mut rec_state = self.state.lock();
@@ -596,6 +682,7 @@ impl ToggleRecord {
pad: &gst::Pad,
stream: &Stream,
data: T,
+ upstream_live: bool,
) -> Result<HandleResult<T>, gst::FlowError> {
// Calculate end pts & current running time and make sure we stay in the segment
let mut state = stream.state.lock();
@@ -722,7 +809,7 @@ impl ToggleRecord {
if state.flushing {
gst::debug!(CAT, obj: pad, "Flushing");
- return Ok(HandleResult::Flushing);
+ return Err(gst::FlowError::Flushing);
}
// If the main stream is EOS, we are also EOS unless we are
@@ -889,6 +976,10 @@ impl ToggleRecord {
}
}
+ if !upstream_live {
+ return Ok(HandleResult::Pass(data));
+ }
+
match rec_state.recording_state {
RecordingState::Recording => {
// The end of our buffer must be before/at the end of the previous buffer of the main
@@ -1154,31 +1245,48 @@ impl ToggleRecord {
gst::FlowError::Error
})?;
- gst::log!(CAT, obj: pad, "Handling buffer {:?}", buffer);
+ let upstream_live;
{
- let state = stream.state.lock();
+ let mut state = stream.state.lock();
if state.eos {
return Err(gst::FlowError::Eos);
}
if state.flushing {
return Err(gst::FlowError::Flushing);
}
+ match state.upstream_live {
+ None => {
+ // Not handling anything here, the pad's query function will catch it
+ let mut query = gst::query::Latency::new();
+ let success = pad.peer_query(&mut query);
+ if success {
+ (upstream_live, _, _) = query.result();
+ state.upstream_live = Some(upstream_live);
+ } else {
+ state.upstream_live = None;
+ upstream_live = false;
+ gst::warning!(
+ CAT,
+ obj: pad,
+ "Latency query failed, assuming non-live input, will retry"
+ );
+ }
+ }
+ Some(is_live) => upstream_live = is_live,
+ }
}
let handle_result = if stream != self.main_stream {
- self.handle_secondary_stream(pad, &stream, buffer)
+ self.handle_secondary_stream(pad, &stream, buffer, upstream_live)
} else {
- self.handle_main_stream(pad, &stream, buffer)
+ self.handle_main_stream(pad, &stream, buffer, upstream_live)
}?;
let mut buffer = match handle_result {
HandleResult::Drop => {
return Ok(gst::FlowSuccess::Ok);
}
- HandleResult::Flushing => {
- return Err(gst::FlowError::Flushing);
- }
HandleResult::Eos(recording_state_updated) => {
stream.srcpad.push_event(
gst::event::Eos::builder()
@@ -1224,10 +1332,13 @@ impl ToggleRecord {
state.out_segment = state.in_segment.clone();
- if !rec_state.live {
+ // state.upstream_live should have a value from a few lines above
+ // segment offset is taken into account in case upstream is live and we are not
+ // (collapse gap)
+ if rec_state.live != upstream_live {
state
.out_segment
- .offset_running_time(-rec_state.running_time_offset)
+ .offset_running_time(rec_state.running_time_offset)
.expect("Adjusting record duration");
}
events.push(
@@ -1368,10 +1479,35 @@ impl ToggleRecord {
EventView::Gap(e) => {
gst::debug!(CAT, obj: pad, "Handling Gap event {:?}", event);
let (pts, duration) = e.get();
+ let upstream_live;
+
+ {
+ let mut state = stream.state.lock();
+ match state.upstream_live {
+ None => {
+ // Not handling anything here, the pad's query function will catch it
+ let mut query = gst::query::Latency::new();
+ let success = pad.peer_query(&mut query);
+ if success {
+ (upstream_live, _, _) = query.result();
+ state.upstream_live = Some(upstream_live);
+ } else {
+ state.upstream_live = None;
+ upstream_live = false;
+ gst::warning!(
+ CAT,
+ obj: pad,
+ "Latency query failed, assuming non-live input, will retry"
+ );
+ }
+ }
+ Some(is_live) => upstream_live = is_live,
+ }
+ }
let handle_result = if stream == self.main_stream {
- self.handle_main_stream(pad, &stream, (pts, duration))
+ self.handle_main_stream(pad, &stream, (pts, duration), upstream_live)
} else {
- self.handle_secondary_stream(pad, &stream, (pts, duration))
+ self.handle_secondary_stream(pad, &stream, (pts, duration), upstream_live)
};
forward = match handle_result {
@@ -1508,7 +1644,19 @@ impl ToggleRecord {
gst::log!(CAT, obj: pad, "Handling query {:?}", query);
- stream.srcpad.peer_query(query)
+ let success = stream.srcpad.peer_query(query);
+
+ if let gst::QueryView::Latency(latency) = query.view() {
+ let mut state = stream.state.lock();
+ if success {
+ let (is_live, _, _) = latency.result();
+ state.upstream_live = Some(is_live);
+ } else {
+ state.upstream_live = None;
+ }
+ }
+
+ success
}
// FIXME `matches!` was introduced in rustc 1.42.0, current MSRV is 1.41.0
@@ -1537,7 +1685,7 @@ impl ToggleRecord {
let offset = event.running_time_offset();
event
.make_mut()
- .set_running_time_offset(offset + rec_state.running_time_offset);
+ .set_running_time_offset(offset - rec_state.running_time_offset);
drop(rec_state);
if forward {
@@ -1795,8 +1943,12 @@ impl ObjectImpl for ToggleRecord {
.read_only()
.build(),
glib::ParamSpecBoolean::builder("is-live")
- .nick("Live mode")
- .blurb("Live mode: no \"gap eating\", forward incoming segment")
+ .nick("Live output mode")
+ .blurb(
+ "Live output mode: no \"gap eating\", \
+ forward incoming segment for live input, \
+ create a gap to fill the paused duration for non-live input",
+ )
.default_value(DEFAULT_LIVE)
.mutable_ready()
.build(),
@@ -1820,6 +1972,7 @@ impl ObjectImpl for ToggleRecord {
);
settings.record = record;
+ self.main_stream_cond.notify_all();
}
"is-live" => {
let mut settings = self.settings.lock();
@@ -1873,7 +2026,9 @@ impl ElementImpl for ToggleRecord {
gst::subclass::ElementMetadata::new(
"Toggle Record",
"Generic",
- "Valve that ensures multiple streams start/end at the same time",
+ "Valve that ensures multiple streams start/end at the same time. \
+ If the input comes from a live stream, when not recording it will be dropped. \
+ If it comes from a non-live stream, when not recording it will be blocked.",
"Sebastian Dröge <sebastian@centricular.com>",
)
});
@@ -1948,6 +2103,7 @@ impl ElementImpl for ToggleRecord {
let mut rec_state = self.state.lock();
*rec_state = State::default();
+
let settings = *self.settings.lock();
rec_state.live = settings.live;
}
diff --git a/utils/togglerecord/tests/tests.rs b/utils/togglerecord/tests/tests.rs
index 6836d9c6f..988a9b142 100644
--- a/utils/togglerecord/tests/tests.rs
+++ b/utils/togglerecord/tests/tests.rs
@@ -11,7 +11,7 @@ use gst::prelude::*;
use either::*;
use std::sync::{mpsc, Mutex};
-use std::thread;
+use std::{thread, time::Duration};
fn init() {
use std::sync::Once;
@@ -37,6 +37,7 @@ fn setup_sender_receiver(
togglerecord: &gst::Element,
pad: &str,
offset: gst::ClockTime,
+ live: bool,
) -> (
mpsc::Sender<SendData>,
mpsc::Receiver<()>,
@@ -62,6 +63,25 @@ fn setup_sender_receiver(
(srcpad, sinkpad)
};
+ sinkpad.add_probe(
+ gst::PadProbeType::QUERY_UPSTREAM,
+ move |_pad, probe_info| {
+ let query = match &mut probe_info.data {
+ Some(gst::PadProbeData::Query(q)) => q,
+ _ => unreachable!(),
+ };
+
+ use gst::QueryViewMut::*;
+ match query.view_mut() {
+ Latency(q) => {
+ q.set(live, gst::ClockTime::ZERO, None);
+ gst::PadProbeReturn::Handled
+ }
+ _ => gst::PadProbeReturn::Ok,
+ }
+ },
+ );
+
let fakesink_sinkpad = fakesink.static_pad("sink").unwrap();
srcpad.link(&fakesink_sinkpad).unwrap();
@@ -272,6 +292,76 @@ fn test_create_pads() {
}
#[test]
+fn test_one_stream_open_nonlivein_nonliveout() {
+ init();
+
+ let pipeline = gst::Pipeline::default();
+ let togglerecord = gst::ElementFactory::make("togglerecord")
+ .property("is-live", false)
+ .build()
+ .unwrap();
+ pipeline.add(&togglerecord).unwrap();
+
+ let (sender_input, _, receiver_output, thread) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, false);
+
+ pipeline.set_state(gst::State::Playing).unwrap();
+
+ togglerecord.set_property("record", true);
+ sender_input.send(SendData::Buffers(10)).unwrap();
+ drop(sender_input);
+
+ let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
+ let (buffers, _) = recv_buffers(&receiver_output, &mut segment, 0);
+ assert_eq!(buffers.len(), 10);
+ for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time.unwrap(), index * 20.mseconds());
+ assert_eq!(pts.unwrap(), index * 20.mseconds());
+ assert_eq!(duration.unwrap(), 20.mseconds());
+ }
+
+ thread.join().unwrap();
+
+ pipeline.set_state(gst::State::Null).unwrap();
+}
+
+#[test]
+fn test_one_stream_open_nonlivein_liveout() {
+ init();
+
+ let pipeline = gst::Pipeline::default();
+ let togglerecord = gst::ElementFactory::make("togglerecord")
+ .property("is-live", true)
+ .build()
+ .unwrap();
+ pipeline.add(&togglerecord).unwrap();
+
+ let (sender_input, _, receiver_output, thread) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, false);
+
+ pipeline.set_state(gst::State::Playing).unwrap();
+
+ togglerecord.set_property("record", true);
+ sender_input.send(SendData::Buffers(10)).unwrap();
+ drop(sender_input);
+
+ let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
+ let (buffers, _) = recv_buffers(&receiver_output, &mut segment, 0);
+ assert_eq!(buffers.len(), 10);
+ for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time.unwrap(), index * 20.mseconds());
+ assert_eq!(pts.unwrap(), index * 20.mseconds());
+ assert_eq!(duration.unwrap(), 20.mseconds());
+ }
+
+ thread.join().unwrap();
+
+ pipeline.set_state(gst::State::Null).unwrap();
+}
+
+#[test]
fn test_one_stream_open() {
init();
@@ -280,7 +370,7 @@ fn test_one_stream_open() {
pipeline.add(&togglerecord).unwrap();
let (sender_input, _, receiver_output, thread) =
- setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
+ setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
pipeline.set_state(gst::State::Playing).unwrap();
@@ -312,7 +402,7 @@ fn test_one_stream_gaps_open() {
pipeline.add(&togglerecord).unwrap();
let (sender_input, _, receiver_output, thread) =
- setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
+ setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
pipeline.set_state(gst::State::Playing).unwrap();
@@ -345,7 +435,7 @@ fn test_one_stream_close_open() {
pipeline.add(&togglerecord).unwrap();
let (sender_input, receiver_input_done, receiver_output, thread) =
- setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
+ setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
pipeline.set_state(gst::State::Playing).unwrap();
@@ -379,7 +469,7 @@ fn test_one_stream_open_close() {
pipeline.add(&togglerecord).unwrap();
let (sender_input, receiver_input_done, receiver_output, thread) =
- setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
+ setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
pipeline.set_state(gst::State::Playing).unwrap();
@@ -414,7 +504,7 @@ fn test_one_stream_open_close_open() {
pipeline.add(&togglerecord).unwrap();
let (sender_input, receiver_input_done, receiver_output, thread) =
- setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
+ setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
pipeline.set_state(gst::State::Playing).unwrap();
@@ -458,9 +548,15 @@ fn test_two_stream_open() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
- setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
+ setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
- setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
+ setup_sender_receiver(
+ &pipeline,
+ &togglerecord,
+ "src_%u",
+ gst::ClockTime::ZERO,
+ true,
+ );
pipeline.set_state(gst::State::Playing).unwrap();
@@ -511,9 +607,9 @@ fn test_two_stream_open_shift() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
- setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
+ setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
- setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 5.mseconds());
+ setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 5.mseconds(), true);
pipeline.set_state(gst::State::Playing).unwrap();
@@ -568,9 +664,15 @@ fn test_two_stream_open_shift_main() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
- setup_sender_receiver(&pipeline, &togglerecord, "src", 5.mseconds());
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 5.mseconds(), true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
- setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
+ setup_sender_receiver(
+ &pipeline,
+ &togglerecord,
+ "src_%u",
+ gst::ClockTime::ZERO,
+ true,
+ );
pipeline.set_state(gst::State::Playing).unwrap();
@@ -633,9 +735,15 @@ fn test_two_stream_open_close() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
- setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
+ setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
- setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
+ setup_sender_receiver(
+ &pipeline,
+ &togglerecord,
+ "src_%u",
+ gst::ClockTime::ZERO,
+ true,
+ );
pipeline.set_state(gst::State::Playing).unwrap();
@@ -702,9 +810,15 @@ fn test_two_stream_close_open() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
- setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
+ setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
- setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
+ setup_sender_receiver(
+ &pipeline,
+ &togglerecord,
+ "src_%u",
+ gst::ClockTime::ZERO,
+ true,
+ );
pipeline.set_state(gst::State::Playing).unwrap();
@@ -763,6 +877,256 @@ fn test_two_stream_close_open() {
}
#[test]
+fn test_two_stream_open_close_open_nonlivein_liveout() {
+ init();
+
+ let testclock = gst_check::TestClock::new();
+ let pipeline = gst::Pipeline::default();
+ pipeline.use_clock(Some(&testclock));
+ let togglerecord = gst::ElementFactory::make("togglerecord")
+ .property("is-live", true)
+ .build()
+ .unwrap();
+ togglerecord.set_clock(Some(&testclock)).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+ let testclock = testclock.downcast::<gst_check::TestClock>().unwrap();
+ testclock.set_time(gst::ClockTime::ZERO);
+
+ let main_buffers_before_gap = 10u64;
+ let secondary_buffers_before_gap = main_buffers_before_gap + 1;
+ let buffers_in_gap = 10u64;
+ let buffers_after_gap = 10u64;
+ let recv_timeout = Duration::from_secs(10);
+
+ let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, false);
+ let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
+ setup_sender_receiver(
+ &pipeline,
+ &togglerecord,
+ "src_%u",
+ gst::ClockTime::ZERO,
+ false,
+ );
+
+ pipeline.set_state(gst::State::Playing).unwrap();
+
+ togglerecord.set_property("record", true);
+
+ sender_input_1
+ .send(SendData::Buffers(main_buffers_before_gap as usize))
+ .unwrap();
+ sender_input_2
+ .send(SendData::Buffers(
+ (secondary_buffers_before_gap - 1) as usize,
+ ))
+ .unwrap();
+
+ // Sender 2 is waiting for sender 1 to continue, sender 1 is finished
+ receiver_input_done_1.recv_timeout(recv_timeout).unwrap();
+ receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
+ sender_input_2.send(SendData::Buffers(1)).unwrap();
+ assert_eq!(
+ receiver_input_done_2.recv_timeout(Duration::from_millis(20)),
+ Err(mpsc::RecvTimeoutError::Timeout)
+ );
+
+ // Stop recording and push new buffers to sender 1, this will block
+ togglerecord.set_property("record", false);
+ sender_input_1
+ .send(SendData::Buffers(buffers_in_gap as usize))
+ .unwrap();
+
+ // Send another 10 buffers to sender 2, both are the same position at 9 buffers, the next one
+ // will block until record=true
+ sender_input_2
+ .send(SendData::Buffers(buffers_in_gap as usize))
+ .unwrap();
+
+ receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
+ // Advance the clock
+ let block_time = gst::ClockTime::from_mseconds(42);
+ testclock.advance_time(block_time.nseconds() as i64);
+
+ // Start recording again and send another set of buffers to both senders
+ togglerecord.set_property("record", true);
+ sender_input_1
+ .send(SendData::Buffers(buffers_after_gap as usize))
+ .unwrap();
+ sender_input_2
+ .send(SendData::Buffers(buffers_after_gap as usize))
+ .unwrap();
+ receiver_input_done_1.recv_timeout(recv_timeout).unwrap();
+ // The single buffer above for sender 1 should be handled now
+ receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
+
+ // Send EOS and wait for it to be handled
+ sender_input_1.send(SendData::Eos).unwrap();
+ sender_input_2.send(SendData::Eos).unwrap();
+ receiver_input_done_1.recv_timeout(recv_timeout).unwrap();
+ receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
+ receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
+
+ let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let (buffers_1, _) = recv_buffers(
+ &receiver_output_1,
+ &mut segment_1,
+ main_buffers_before_gap as usize,
+ );
+ for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time.unwrap(), index * 20.mseconds());
+ assert_eq!(pts.unwrap(), index * 20.mseconds());
+ assert_eq!(duration.unwrap(), 20.mseconds());
+ }
+ let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
+ for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(
+ running_time.unwrap(),
+ block_time + (index + main_buffers_before_gap) * 20.mseconds()
+ );
+ assert_eq!(
+ pts.unwrap(),
+ (index + main_buffers_before_gap) * 20.mseconds()
+ );
+ assert_eq!(duration.unwrap(), 20.mseconds());
+ }
+ assert_eq!(
+ buffers_1.len(),
+ (buffers_in_gap + buffers_after_gap) as usize
+ );
+
+ // Last buffer should be dropped from second stream
+ let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let (buffers_2, _) = recv_buffers(
+ &receiver_output_2,
+ &mut segment_2,
+ secondary_buffers_before_gap as usize,
+ );
+ for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time.unwrap(), index * 20.mseconds());
+ assert_eq!(pts.unwrap(), index * 20.mseconds());
+ assert_eq!(duration.unwrap(), 20.mseconds());
+ }
+ let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
+ for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(
+ running_time.unwrap(),
+ block_time + (index + secondary_buffers_before_gap) * 20.mseconds()
+ );
+ assert_eq!(
+ pts.unwrap(),
+ (index + secondary_buffers_before_gap) * 20.mseconds()
+ );
+ assert_eq!(duration.unwrap(), 20.mseconds());
+ }
+ assert_eq!(
+ buffers_2.len(),
+ (buffers_in_gap + buffers_after_gap - 1) as usize
+ );
+
+ thread_1.join().unwrap();
+ thread_2.join().unwrap();
+
+ pipeline.set_state(gst::State::Null).unwrap();
+}
+#[test]
+fn test_two_stream_open_close_open_nonlivein_nonliveout() {
+ init();
+
+ let pipeline = gst::Pipeline::default();
+ let togglerecord = gst::ElementFactory::make("togglerecord")
+ .property("is-live", false)
+ .build()
+ .unwrap();
+ pipeline.add(&togglerecord).unwrap();
+
+ let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, false);
+ let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
+ setup_sender_receiver(
+ &pipeline,
+ &togglerecord,
+ "src_%u",
+ gst::ClockTime::ZERO,
+ false,
+ );
+
+ let recv_timeout = Duration::from_secs(10);
+
+ pipeline.set_state(gst::State::Playing).unwrap();
+
+ togglerecord.set_property("record", true);
+
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ sender_input_2.send(SendData::Buffers(10)).unwrap();
+
+ receiver_input_done_1.recv_timeout(recv_timeout).unwrap();
+ receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
+
+ // Sender 2 is waiting for sender 1 to continue, sender 1 is finished
+ sender_input_2.send(SendData::Buffers(1)).unwrap();
+ assert_eq!(
+ receiver_input_done_2.recv_timeout(Duration::from_millis(20)),
+ Err(mpsc::RecvTimeoutError::Timeout)
+ );
+
+ // Stop recording and push new buffers to sender 1, this will block
+ togglerecord.set_property("record", false);
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+
+ // Send another 9 buffers to sender 2, both are the same position now
+ sender_input_2.send(SendData::Buffers(9)).unwrap();
+
+ // Send another buffer to sender 2, this will block until record=true
+ sender_input_2.send(SendData::Buffers(1)).unwrap();
+
+ // Start recording again and send another set of buffers to both senders
+ togglerecord.set_property("record", true);
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ sender_input_2.send(SendData::Buffers(10)).unwrap();
+ receiver_input_done_1.recv_timeout(recv_timeout).unwrap();
+ // The single buffer above for sender 1 should be handled now
+ receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
+
+ // Send EOS and wait for it to be handled
+ sender_input_1.send(SendData::Eos).unwrap();
+ sender_input_2.send(SendData::Eos).unwrap();
+ receiver_input_done_1.recv_timeout(recv_timeout).unwrap();
+ receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
+ receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
+
+ let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
+ for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time.unwrap(), index * 20.mseconds());
+ assert_eq!(pts.unwrap(), index * 20.mseconds());
+ assert_eq!(duration.unwrap(), 20.mseconds());
+ }
+ assert_eq!(buffers_1.len(), 30);
+
+ // Last buffer should be dropped from second stream
+ let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
+ for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time.unwrap(), index * 20.mseconds());
+ assert_eq!(pts.unwrap(), index * 20.mseconds());
+ assert_eq!(duration.unwrap(), 20.mseconds());
+ }
+ assert_eq!(buffers_2.len(), 30);
+
+ thread_1.join().unwrap();
+ thread_2.join().unwrap();
+
+ pipeline.set_state(gst::State::Null).unwrap();
+}
+
+#[test]
fn test_two_stream_open_close_open() {
init();
@@ -771,9 +1135,15 @@ fn test_two_stream_open_close_open() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
- setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
+ setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
- setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
+ setup_sender_receiver(
+ &pipeline,
+ &togglerecord,
+ "src_%u",
+ gst::ClockTime::ZERO,
+ true,
+ );
pipeline.set_state(gst::State::Playing).unwrap();
@@ -865,9 +1235,15 @@ fn test_two_stream_open_close_open_gaps() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
- setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
+ setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
- setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
+ setup_sender_receiver(
+ &pipeline,
+ &togglerecord,
+ "src_%u",
+ gst::ClockTime::ZERO,
+ true,
+ );
pipeline.set_state(gst::State::Playing).unwrap();
@@ -965,9 +1341,15 @@ fn test_two_stream_close_open_close_delta() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
- setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
+ setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
- setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
+ setup_sender_receiver(
+ &pipeline,
+ &togglerecord,
+ "src_%u",
+ gst::ClockTime::ZERO,
+ true,
+ );
pipeline.set_state(gst::State::Playing).unwrap();
@@ -1054,11 +1436,23 @@ fn test_three_stream_open_close_open() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
- setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
+ setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
- setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
+ setup_sender_receiver(
+ &pipeline,
+ &togglerecord,
+ "src_%u",
+ gst::ClockTime::ZERO,
+ true,
+ );
let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) =
- setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
+ setup_sender_receiver(
+ &pipeline,
+ &togglerecord,
+ "src_%u",
+ gst::ClockTime::ZERO,
+ true,
+ );
pipeline.set_state(gst::State::Playing).unwrap();
@@ -1178,9 +1572,15 @@ fn test_two_stream_main_eos() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
- setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
+ setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
- setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
+ setup_sender_receiver(
+ &pipeline,
+ &togglerecord,
+ "src_%u",
+ gst::ClockTime::ZERO,
+ true,
+ );
pipeline.set_state(gst::State::Playing).unwrap();
@@ -1253,9 +1653,15 @@ fn test_two_stream_secondary_eos_first() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
- setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
+ setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
- setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
+ setup_sender_receiver(
+ &pipeline,
+ &togglerecord,
+ "src_%u",
+ gst::ClockTime::ZERO,
+ true,
+ );
pipeline.set_state(gst::State::Playing).unwrap();
@@ -1321,11 +1727,23 @@ fn test_three_stream_main_eos() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
- setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
+ setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
- setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
+ setup_sender_receiver(
+ &pipeline,
+ &togglerecord,
+ "src_%u",
+ gst::ClockTime::ZERO,
+ true,
+ );
let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) =
- setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
+ setup_sender_receiver(
+ &pipeline,
+ &togglerecord,
+ "src_%u",
+ gst::ClockTime::ZERO,
+ true,
+ );
pipeline.set_state(gst::State::Playing).unwrap();
@@ -1422,11 +1840,23 @@ fn test_three_stream_main_and_second_eos() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
- setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
+ setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
- setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
+ setup_sender_receiver(
+ &pipeline,
+ &togglerecord,
+ "src_%u",
+ gst::ClockTime::ZERO,
+ true,
+ );
let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) =
- setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
+ setup_sender_receiver(
+ &pipeline,
+ &togglerecord,
+ "src_%u",
+ gst::ClockTime::ZERO,
+ true,
+ );
pipeline.set_state(gst::State::Playing).unwrap();
@@ -1523,11 +1953,23 @@ fn test_three_stream_secondary_eos_first() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
- setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
+ setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
- setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
+ setup_sender_receiver(
+ &pipeline,
+ &togglerecord,
+ "src_%u",
+ gst::ClockTime::ZERO,
+ true,
+ );
let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) =
- setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
+ setup_sender_receiver(
+ &pipeline,
+ &togglerecord,
+ "src_%u",
+ gst::ClockTime::ZERO,
+ true,
+ );
pipeline.set_state(gst::State::Playing).unwrap();