authorArun Raghavan <arun@asymptotic.io>2020-04-05 17:38:52 +0300
committerArun Raghavan <arun@arunraghavan.net>2020-04-05 22:10:47 +0300
commitdc3c8fd0494056ae5e5f87aa716b4c3866af6591 (patch)
tree3226cac3439fd29b7b02f6456f3162d6a3d1e00a /utils/togglerecord
parent205b6040fbb918c0fa736874b09f8e3f3f261e44 (diff)
Drop gst-plugin- prefix in plugin directory name
Diffstat (limited to 'utils/togglerecord')
7 files changed, 3869 insertions, 0 deletions
diff --git a/utils/togglerecord/Cargo.toml b/utils/togglerecord/Cargo.toml
new file mode 100644
index 00000000..df94b677
--- /dev/null
+++ b/utils/togglerecord/Cargo.toml
@@ -0,0 +1,34 @@
+name = "gst-plugin-togglerecord"
+version = "0.6.0"
+authors = ["Sebastian Dröge <sebastian@centricular.com>"]
+license = "LGPL-2.1+"
+description = "Toggle Record Plugin"
+repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
+edition = "2018"
+glib = { git = "https://github.com/gtk-rs/glib" }
+gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
+gstreamer-audio = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
+gstreamer-video = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
+gtk = { git = "https://github.com/gtk-rs/gtk", optional = true }
+gio = { git = "https://github.com/gtk-rs/gio", optional = true }
+parking_lot = "0.10"
+lazy_static = "1.0"
+either = "1.0"
+name = "gsttogglerecord"
+crate-type = ["cdylib", "rlib"]
+path = "src/lib.rs"
+name = "gtk-recording"
+path = "examples/gtk_recording.rs"
+required-features = ["gtk", "gio"]
+gst-plugin-version-helper = { path="../../version-helper" }
diff --git a/utils/togglerecord/LICENSE b/utils/togglerecord/LICENSE
new file mode 100644
index 00000000..4362b491
--- /dev/null
+++ b/utils/togglerecord/LICENSE
diff --git a/utils/togglerecord/build.rs b/utils/togglerecord/build.rs
new file mode 100644
index 00000000..0d1ddb61
--- /dev/null
+++ b/utils/togglerecord/build.rs
@@ -0,0 +1,5 @@
+extern crate gst_plugin_version_helper;
+fn main() {
+ gst_plugin_version_helper::get_info()
diff --git a/utils/togglerecord/examples/gtk_recording.rs b/utils/togglerecord/examples/gtk_recording.rs
new file mode 100644
index 00000000..8f0bc0bb
--- /dev/null
+++ b/utils/togglerecord/examples/gtk_recording.rs
@@ -0,0 +1,358 @@
+// Copyright (C) 2017 Sebastian Dröge <sebastian@centricular.com>
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Library General Public
+// License as published by the Free Software Foundation; either
+// version 2 of the License, or (at your option) any later version.
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// Library General Public License for more details.
+// You should have received a copy of the GNU Library General Public
+// License along with this library; if not, write to the
+// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+// Boston, MA 02110-1335, USA.
+extern crate glib;
+use glib::prelude::*;
+extern crate gio;
+use gio::prelude::*;
+extern crate gstreamer as gst;
+use gst::prelude::*;
+extern crate gsttogglerecord;
+extern crate gtk;
+use gtk::prelude::*;
+use std::cell::RefCell;
+use std::env;
+fn create_pipeline() -> (
+ gst::Pipeline,
+ gst::Pad,
+ gst::Pad,
+ gst::Element,
+ gst::Element,
+ gtk::Widget,
+) {
+ let pipeline = gst::Pipeline::new(None);
+ let video_src = gst::ElementFactory::make("videotestsrc", None).unwrap();
+ video_src.set_property("is-live", &true).unwrap();
+ video_src.set_property_from_str("pattern", "ball");
+ let timeoverlay = gst::ElementFactory::make("timeoverlay", None).unwrap();
+ timeoverlay
+ .set_property("font-desc", &"Monospace 20")
+ .unwrap();
+ let video_tee = gst::ElementFactory::make("tee", None).unwrap();
+ let video_queue1 = gst::ElementFactory::make("queue", None).unwrap();
+ let video_queue2 = gst::ElementFactory::make("queue", None).unwrap();
+ let video_convert1 = gst::ElementFactory::make("videoconvert", None).unwrap();
+ let video_convert2 = gst::ElementFactory::make("videoconvert", None).unwrap();
+ let (video_sink, video_widget) =
+ if let Ok(gtkglsink) = gst::ElementFactory::make("gtkglsink", None) {
+ let glsinkbin = gst::ElementFactory::make("glsinkbin", None).unwrap();
+ glsinkbin.set_property("sink", &gtkglsink).unwrap();
+ let widget = gtkglsink.get_property("widget").unwrap();
+ (glsinkbin, widget.get::<gtk::Widget>().unwrap().unwrap())
+ } else {
+ let sink = gst::ElementFactory::make("gtksink", None).unwrap();
+ let widget = sink.get_property("widget").unwrap();
+ (sink, widget.get::<gtk::Widget>().unwrap().unwrap())
+ };
+ let video_enc = gst::ElementFactory::make("x264enc", None).unwrap();
+ video_enc.set_property("rc-lookahead", &10i32).unwrap();
+ video_enc.set_property("key-int-max", &30u32).unwrap();
+ let video_parse = gst::ElementFactory::make("h264parse", None).unwrap();
+ let audio_src = gst::ElementFactory::make("audiotestsrc", None).unwrap();
+ audio_src.set_property("is-live", &true).unwrap();
+ audio_src.set_property_from_str("wave", "ticks");
+ let audio_tee = gst::ElementFactory::make("tee", None).unwrap();
+ let audio_queue1 = gst::ElementFactory::make("queue", None).unwrap();
+ let audio_queue2 = gst::ElementFactory::make("queue", None).unwrap();
+ let audio_convert1 = gst::ElementFactory::make("audioconvert", None).unwrap();
+ let audio_convert2 = gst::ElementFactory::make("audioconvert", None).unwrap();
+ let audio_sink = gst::ElementFactory::make("autoaudiosink", None).unwrap();
+ let audio_enc = gst::ElementFactory::make("lamemp3enc", None).unwrap();
+ let audio_parse = gst::ElementFactory::make("mpegaudioparse", None).unwrap();
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ let mux_queue1 = gst::ElementFactory::make("queue", None).unwrap();
+ let mux_queue2 = gst::ElementFactory::make("queue", None).unwrap();
+ let mux = gst::ElementFactory::make("mp4mux", None).unwrap();
+ let file_sink = gst::ElementFactory::make("filesink", None).unwrap();
+ file_sink
+ .set_property("location", &"recording.mp4")
+ .unwrap();
+ file_sink.set_property("async", &false).unwrap();
+ file_sink.set_property("sync", &false).unwrap();
+ pipeline
+ .add_many(&[
+ &video_src,
+ &timeoverlay,
+ &video_tee,
+ &video_queue1,
+ &video_queue2,
+ &video_convert1,
+ &video_convert2,
+ &video_sink,
+ &video_enc,
+ &video_parse,
+ &audio_src,
+ &audio_tee,
+ &audio_queue1,
+ &audio_queue2,
+ &audio_convert1,
+ &audio_convert2,
+ &audio_sink,
+ &audio_enc,
+ &audio_parse,
+ &togglerecord,
+ &mux_queue1,
+ &mux_queue2,
+ &mux,
+ &file_sink,
+ ])
+ .unwrap();
+ gst::Element::link_many(&[
+ &video_src,
+ &timeoverlay,
+ &video_tee,
+ &video_queue1,
+ &video_convert1,
+ &video_sink,
+ ])
+ .unwrap();
+ gst::Element::link_many(&[
+ &video_tee,
+ &video_queue2,
+ &video_convert2,
+ &video_enc,
+ &video_parse,
+ ])
+ .unwrap();
+ video_parse
+ .link_pads(Some("src"), &togglerecord, Some("sink"))
+ .unwrap();
+ togglerecord
+ .link_pads(Some("src"), &mux_queue1, Some("sink"))
+ .unwrap();
+ mux_queue1
+ .link_pads(Some("src"), &mux, Some("video_%u"))
+ .unwrap();
+ gst::Element::link_many(&[
+ &audio_src,
+ &audio_tee,
+ &audio_queue1,
+ &audio_convert1,
+ &audio_sink,
+ ])
+ .unwrap();
+ gst::Element::link_many(&[
+ &audio_tee,
+ &audio_queue2,
+ &audio_convert2,
+ &audio_enc,
+ &audio_parse,
+ ])
+ .unwrap();
+ audio_parse
+ .link_pads(Some("src"), &togglerecord, Some("sink_0"))
+ .unwrap();
+ togglerecord
+ .link_pads(Some("src_0"), &mux_queue2, Some("sink"))
+ .unwrap();
+ mux_queue2
+ .link_pads(Some("src"), &mux, Some("audio_%u"))
+ .unwrap();
+ gst::Element::link_many(&[&mux, &file_sink]).unwrap();
+ (
+ pipeline,
+ video_queue2.get_static_pad("sink").unwrap(),
+ audio_queue2.get_static_pad("sink").unwrap(),
+ togglerecord,
+ video_sink,
+ video_widget,
+ )
+fn create_ui(app: &gtk::Application) {
+ let (pipeline, video_pad, audio_pad, togglerecord, video_sink, video_widget) =
+ create_pipeline();
+ let window = gtk::Window::new(gtk::WindowType::Toplevel);
+ window.set_default_size(320, 240);
+ let vbox = gtk::Box::new(gtk::Orientation::Vertical, 0);
+ vbox.pack_start(&video_widget, true, true, 0);
+ let hbox = gtk::Box::new(gtk::Orientation::Horizontal, 0);
+ let position_label = gtk::Label::new(Some("Position: 00:00:00"));
+ hbox.pack_start(&position_label, true, true, 5);
+ let recorded_duration_label = gtk::Label::new(Some("Recorded: 00:00:00"));
+ hbox.pack_start(&recorded_duration_label, true, true, 5);
+ vbox.pack_start(&hbox, false, false, 5);
+ let hbox = gtk::Box::new(gtk::Orientation::Horizontal, 0);
+ let record_button = gtk::Button::new_with_label("Record");
+ hbox.pack_start(&record_button, true, true, 5);
+ let finish_button = gtk::Button::new_with_label("Finish");
+ hbox.pack_start(&finish_button, true, true, 5);
+ vbox.pack_start(&hbox, false, false, 5);
+ window.add(&vbox);
+ window.show_all();
+ app.add_window(&window);
+ let video_sink_weak = video_sink.downgrade();
+ let togglerecord_weak = togglerecord.downgrade();
+ let timeout_id = gtk::timeout_add(100, move || {
+ let video_sink = match video_sink_weak.upgrade() {
+ Some(video_sink) => video_sink,
+ None => return glib::Continue(true),
+ };
+ let togglerecord = match togglerecord_weak.upgrade() {
+ Some(togglerecord) => togglerecord,
+ None => return glib::Continue(true),
+ };
+ let position = video_sink
+ .query_position::<gst::ClockTime>()
+ .unwrap_or_else(|| 0.into());
+ position_label.set_text(&format!("Position: {:.1}", position));
+ let recording_duration = togglerecord
+ .get_static_pad("src")
+ .unwrap()
+ .query_position::<gst::ClockTime>()
+ .unwrap_or_else(|| 0.into());
+ recorded_duration_label.set_text(&format!("Recorded: {:.1}", recording_duration));
+ glib::Continue(true)
+ });
+ let togglerecord_weak = togglerecord.downgrade();
+ record_button.connect_clicked(move |button| {
+ let togglerecord = match togglerecord_weak.upgrade() {
+ Some(togglerecord) => togglerecord,
+ None => return,
+ };
+ let recording = !togglerecord
+ .get_property("record")
+ .unwrap()
+ .get_some::<bool>()
+ .unwrap();
+ togglerecord.set_property("record", &recording).unwrap();
+ button.set_label(if recording { "Stop" } else { "Record" });
+ });
+ let record_button_weak = record_button.downgrade();
+ finish_button.connect_clicked(move |button| {
+ let record_button = match record_button_weak.upgrade() {
+ Some(record_button) => record_button,
+ None => return,
+ };
+ record_button.set_sensitive(false);
+ button.set_sensitive(false);
+ video_pad.send_event(gst::Event::new_eos().build());
+ audio_pad.send_event(gst::Event::new_eos().build());
+ });
+ let app_weak = app.downgrade();
+ window.connect_delete_event(move |_, _| {
+ let app = match app_weak.upgrade() {
+ Some(app) => app,
+ None => return Inhibit(false),
+ };
+ app.quit();
+ Inhibit(false)
+ });
+ let bus = pipeline.get_bus().unwrap();
+ let app_weak = app.downgrade();
+ bus.add_watch_local(move |_, msg| {
+ use gst::MessageView;
+ let app = match app_weak.upgrade() {
+ Some(app) => app,
+ None => return glib::Continue(false),
+ };
+ match msg.view() {
+ MessageView::Eos(..) => app.quit(),
+ MessageView::Error(err) => {
+ println!(
+ "Error from {:?}: {} ({:?})",
+ msg.get_src().map(|s| s.get_path_string()),
+ err.get_error(),
+ err.get_debug()
+ );
+ app.quit();
+ }
+ _ => (),
+ };
+ glib::Continue(true)
+ })
+ .expect("Failed to add bus watch");
+ pipeline.set_state(gst::State::Playing).unwrap();
+ // Pipeline reference is owned by the closure below, so will be
+ // destroyed once the app is destroyed
+ let timeout_id = RefCell::new(Some(timeout_id));
+ app.connect_shutdown(move |_| {
+ pipeline.set_state(gst::State::Null).unwrap();
+ bus.remove_watch().unwrap();
+ if let Some(timeout_id) = timeout_id.borrow_mut().take() {
+ glib::source_remove(timeout_id);
+ }
+ });
+fn main() {
+ gst::init().unwrap();
+ gtk::init().unwrap();
+ gsttogglerecord::plugin_register_static().expect("Failed to register togglerecord plugin");
+ let app = gtk::Application::new(None, gio::ApplicationFlags::FLAGS_NONE).unwrap();
+ app.connect_activate(create_ui);
+ let args = env::args().collect::<Vec<_>>();
+ app.run(&args);
diff --git a/utils/togglerecord/src/lib.rs b/utils/togglerecord/src/lib.rs
new file mode 100644
index 00000000..9f215466
--- /dev/null
+++ b/utils/togglerecord/src/lib.rs
@@ -0,0 +1,48 @@
+// Copyright (C) 2017 Sebastian Dröge <sebastian@centricular.com>
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Library General Public
+// License as published by the Free Software Foundation; either
+// version 2 of the License, or (at your option) any later version.
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// Library General Public License for more details.
+// You should have received a copy of the GNU Library General Public
+// License along with this library; if not, write to the
+// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+// Boston, MA 02110-1335, USA.
+#![crate_type = "cdylib"]
+extern crate glib;
+extern crate gstreamer as gst;
+extern crate gstreamer_audio as gst_audio;
+extern crate gstreamer_video as gst_video;
+extern crate lazy_static;
+extern crate parking_lot;
+mod togglerecord;
+fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ togglerecord::register(plugin)
+ togglerecord,
+ plugin_init,
+ concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
+ "LGPL",
+ env!("CARGO_PKG_NAME"),
+ env!("CARGO_PKG_NAME"),
+ env!("BUILD_REL_DATE")
diff --git a/utils/togglerecord/src/togglerecord.rs b/utils/togglerecord/src/togglerecord.rs
new file mode 100644
index 00000000..07872b73
--- /dev/null
+++ b/utils/togglerecord/src/togglerecord.rs
@@ -0,0 +1,1749 @@
+// Copyright (C) 2017 Sebastian Dröge <sebastian@centricular.com>
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Library General Public
+// License as published by the Free Software Foundation; either
+// version 2 of the License, or (at your option) any later version.
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// Library General Public License for more details.
+// You should have received a copy of the GNU Library General Public
+// License along with this library; if not, write to the
+// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+// Boston, MA 02110-1335, USA.
+use glib;
+use glib::prelude::*;
+use glib::subclass;
+use glib::subclass::prelude::*;
+use gst;
+use gst::prelude::*;
+use gst::subclass::prelude::*;
+use gst_audio;
+use gst_video;
+use parking_lot::{Condvar, Mutex};
+use std::cmp;
+use std::collections::HashMap;
+use std::f64;
+use std::iter;
+use std::sync::Arc;
+const DEFAULT_RECORD: bool = false;
+#[derive(Debug, Clone, Copy)]
+struct Settings {
+ record: bool,
+impl Default for Settings {
+ fn default() -> Self {
+ Settings {
+ }
+ }
+static PROPERTIES: [subclass::Property; 2] = [
+ subclass::Property("record", |name| {
+ glib::ParamSpec::boolean(
+ name,
+ "Record",
+ "Enable/disable recording",
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("recording", |name| {
+ glib::ParamSpec::boolean(
+ name,
+ "Recording",
+ "Whether recording is currently taking place",
+ glib::ParamFlags::READABLE,
+ )
+ }),
+struct Stream {
+ sinkpad: gst::Pad,
+ srcpad: gst::Pad,
+ state: Arc<Mutex<StreamState>>,
+impl PartialEq for Stream {
+ fn eq(&self, other: &Self) -> bool {
+ self.sinkpad == other.sinkpad && self.srcpad == other.srcpad
+ }
+impl Eq for Stream {}
+impl Stream {
+ fn new(sinkpad: gst::Pad, srcpad: gst::Pad) -> Self {
+ Self {
+ sinkpad,
+ srcpad,
+ state: Arc::new(Mutex::new(StreamState::default())),
+ }
+ }
+struct StreamState {
+ in_segment: gst::FormattedSegment<gst::ClockTime>,
+ out_segment: gst::FormattedSegment<gst::ClockTime>,
+ segment_seqnum: gst::Seqnum,
+ current_running_time: gst::ClockTime,
+ eos: bool,
+ flushing: bool,
+ segment_pending: bool,
+ pending_events: Vec<gst::Event>,
+ audio_info: Option<gst_audio::AudioInfo>,
+ video_info: Option<gst_video::VideoInfo>,
+impl Default for StreamState {
+ fn default() -> Self {
+ Self {
+ in_segment: gst::FormattedSegment::new(),
+ out_segment: gst::FormattedSegment::new(),
+ segment_seqnum: gst::Seqnum::next(),
+ current_running_time: gst::CLOCK_TIME_NONE,
+ eos: false,
+ flushing: false,
+ segment_pending: false,
+ pending_events: Vec::new(),
+ audio_info: None,
+ video_info: None,
+ }
+ }
+// Recording behaviour:
+// Secondary streams are *always* behind main stream
+// Main stream EOS stops recording (-> Stopping), makes secondary streams go EOS
+// Recording: Passing through all data
+// Stopping: Main stream remembering current last_recording_stop, waiting for all
+// other streams to reach this position
+// Stopped: Dropping all data
+// Starting: Main stream waiting until next keyframe and setting last_recording_start, waiting
+// for all other streams to reach this position
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+enum RecordingState {
+ Recording,
+ Stopping,
+ Stopped,
+ Starting,
+struct State {
+ recording_state: RecordingState,
+ last_recording_start: gst::ClockTime,
+ last_recording_stop: gst::ClockTime,
+ // Accumulated duration of previous recording segments,
+ // updated whenever going to Stopped
+ recording_duration: gst::ClockTime,
+ // Updated whenever going to Recording
+ running_time_offset: gst::ClockTime,
+impl Default for State {
+ fn default() -> Self {
+ Self {
+ recording_state: RecordingState::Stopped,
+ last_recording_start: gst::CLOCK_TIME_NONE,
+ last_recording_stop: gst::CLOCK_TIME_NONE,
+ recording_duration: 0.into(),
+ running_time_offset: gst::CLOCK_TIME_NONE,
+ }
+ }
+#[derive(Debug, PartialEq, Eq)]
+enum HandleResult<T> {
+ Pass(T),
+ Drop,
+ Eos,
+ Flushing,
+trait HandleData: Sized {
+ fn get_pts(&self) -> gst::ClockTime;
+ fn get_dts(&self) -> gst::ClockTime;
+ fn get_dts_or_pts(&self) -> gst::ClockTime {
+ let dts = self.get_dts();
+ if dts.is_some() {
+ dts
+ } else {
+ self.get_pts()
+ }
+ }
+ fn get_duration(&self, state: &StreamState) -> gst::ClockTime;
+ fn is_keyframe(&self) -> bool;
+ fn can_clip(&self, state: &StreamState) -> bool;
+ fn clip(
+ self,
+ state: &StreamState,
+ segment: &gst::FormattedSegment<gst::ClockTime>,
+ ) -> Option<Self>;
+impl HandleData for (gst::ClockTime, gst::ClockTime) {
+ fn get_pts(&self) -> gst::ClockTime {
+ self.0
+ }
+ fn get_dts(&self) -> gst::ClockTime {
+ self.0
+ }
+ fn get_duration(&self, _state: &StreamState) -> gst::ClockTime {
+ self.1
+ }
+ fn is_keyframe(&self) -> bool {
+ true
+ }
+ fn can_clip(&self, _state: &StreamState) -> bool {
+ true
+ }
+ fn clip(
+ self,
+ _state: &StreamState,
+ segment: &gst::FormattedSegment<gst::ClockTime>,
+ ) -> Option<Self> {
+ let stop = if self.1.is_some() {
+ self.0 + self.1
+ } else {
+ self.0
+ };
+ segment
+ .clip(self.0, stop)
+ .map(|(start, stop)| (start, stop - start))
+ }
+impl HandleData for gst::Buffer {
+ fn get_pts(&self) -> gst::ClockTime {
+ gst::BufferRef::get_pts(self)
+ }
+ fn get_dts(&self) -> gst::ClockTime {
+ gst::BufferRef::get_dts(self)
+ }
+ fn get_duration(&self, state: &StreamState) -> gst::ClockTime {
+ let duration = gst::BufferRef::get_duration(self);
+ if duration.is_some() {
+ duration
+ } else if let Some(ref video_info) = state.video_info {
+ if video_info.fps() != 0.into() {
+ gst::SECOND
+ .mul_div_floor(
+ *video_info.fps().denom() as u64,
+ *video_info.fps().numer() as u64,
+ )
+ .unwrap_or(gst::CLOCK_TIME_NONE)
+ } else {
+ }
+ } else if let Some(ref audio_info) = state.audio_info {
+ if audio_info.bpf() == 0 || audio_info.rate() == 0 {
+ return gst::CLOCK_TIME_NONE;
+ }
+ let size = self.get_size() as u64;
+ let num_samples = size / audio_info.bpf() as u64;
+ gst::SECOND
+ .mul_div_floor(num_samples, audio_info.rate() as u64)
+ .unwrap_or(gst::CLOCK_TIME_NONE)
+ } else {
+ }
+ }
+ fn is_keyframe(&self) -> bool {
+ !gst::BufferRef::get_flags(self).contains(gst::BufferFlags::DELTA_UNIT)
+ }
+ fn can_clip(&self, state: &StreamState) -> bool {
+ // Only do actual clipping for raw audio/video
+ if let Some(ref audio_info) = state.audio_info {
+ if audio_info.format() == gst_audio::AudioFormat::Unknown
+ || audio_info.format() == gst_audio::AudioFormat::Encoded
+ || audio_info.rate() == 0
+ || audio_info.bpf() == 0
+ {
+ return false;
+ }
+ } else if let Some(ref video_info) = state.video_info {
+ if video_info.format() == gst_video::VideoFormat::Unknown
+ || video_info.format() == gst_video::VideoFormat::Encoded
+ || self.get_dts_or_pts() != self.get_pts()
+ {
+ return false;
+ }
+ } else {
+ return false;
+ }
+ true
+ }
+ fn clip(
+ mut self,
+ state: &StreamState,
+ segment: &gst::FormattedSegment<gst::ClockTime>,
+ ) -> Option<Self> {
+ // Only do actual clipping for raw audio/video
+ if !self.can_clip(state) {
+ return Some(self);
+ }
+ let pts = HandleData::get_pts(&self);
+ let duration = HandleData::get_duration(&self, state);
+ let stop = if duration.is_some() {
+ pts + duration
+ } else {
+ pts
+ };
+ if let Some(ref audio_info) = state.audio_info {
+ gst_audio::audio_buffer_clip(
+ self,
+ segment.upcast_ref(),
+ audio_info.rate(),
+ audio_info.bpf(),
+ )
+ } else if state.video_info.is_some() {
+ segment.clip(pts, stop).map(move |(start, stop)| {
+ {
+ let buffer = self.make_mut();
+ buffer.set_pts(start);
+ buffer.set_dts(start);
+ buffer.set_duration(stop - start);
+ }
+ self
+ })
+ } else {
+ unreachable!();
+ }
+ }
+struct ToggleRecord {
+ settings: Mutex<Settings>,
+ state: Mutex<State>,
+ main_stream: Stream,
+ // Always must have main_stream.state locked!
+ // If multiple stream states have to be locked, the
+ // main_stream always comes first
+ main_stream_cond: Condvar,
+ other_streams: Mutex<(Vec<Stream>, u32)>,
+ pads: Mutex<HashMap<gst::Pad, Stream>>,
+lazy_static! {
+ static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
+ "togglerecord",
+ gst::DebugColorFlags::empty(),
+ Some("Toggle Record Element"),
+ );
+impl ToggleRecord {
+ fn set_pad_functions(sinkpad: &gst::Pad, srcpad: &gst::Pad) {
+ sinkpad.set_chain_function(|pad, parent, buffer| {
+ ToggleRecord::catch_panic_pad_function(
+ parent,
+ || Err(gst::FlowError::Error),
+ |togglerecord, element| togglerecord.sink_chain(pad, element, buffer),
+ )
+ });
+ sinkpad.set_event_function(|pad, parent, event| {
+ ToggleRecord::catch_panic_pad_function(
+ parent,
+ || false,
+ |togglerecord, element| togglerecord.sink_event(pad, element, event),
+ )
+ });
+ sinkpad.set_query_function(|pad, parent, query| {
+ ToggleRecord::catch_panic_pad_function(
+ parent,
+ || false,
+ |togglerecord, element| togglerecord.sink_query(pad, element, query),
+ )
+ });
+ sinkpad.set_iterate_internal_links_function(|pad, parent| {
+ ToggleRecord::catch_panic_pad_function(
+ parent,
+ || gst::Iterator::from_vec(vec![]),
+ |togglerecord, element| togglerecord.iterate_internal_links(pad, element),
+ )
+ });
+ srcpad.set_event_function(|pad, parent, event| {
+ ToggleRecord::catch_panic_pad_function(
+ parent,
+ || false,
+ |togglerecord, element| togglerecord.src_event(pad, element, event),
+ )
+ });
+ srcpad.set_query_function(|pad, parent, query| {
+ ToggleRecord::catch_panic_pad_function(
+ parent,
+ || false,
+ |togglerecord, element| togglerecord.src_query(pad, element, query),
+ )
+ });
+ srcpad.set_iterate_internal_links_function(|pad, parent| {
+ ToggleRecord::catch_panic_pad_function(
+ parent,
+ || gst::Iterator::from_vec(vec![]),
+ |togglerecord, element| togglerecord.iterate_internal_links(pad, element),
+ )
+ });
+ }
+ fn handle_main_stream<T: HandleData>(
+ &self,
+ element: &gst::Element,
+ pad: &gst::Pad,
+ stream: &Stream,
+ data: T,
+ ) -> Result<HandleResult<T>, gst::FlowError> {
+ let mut state = stream.state.lock();
+ let mut dts_or_pts = data.get_dts_or_pts();
+ let duration = data.get_duration(&state);
+ if !dts_or_pts.is_some() {
+ gst_element_error!(
+ element,
+ gst::StreamError::Format,
+ ["Buffer without DTS or PTS"]
+ );
+ return Err(gst::FlowError::Error);
+ }
+ let mut dts_or_pts_end = if duration.is_some() {
+ dts_or_pts + duration
+ } else {
+ dts_or_pts
+ };
+ let data = match data.clip(&state, &state.in_segment) {
+ None => {
+ gst_log!(CAT, obj: pad, "Dropping raw data outside segment");
+ return Ok(HandleResult::Drop);
+ }
+ Some(data) => data,
+ };
+ // This will only do anything for non-raw data
+ dts_or_pts = cmp::max(state.in_segment.get_start(), dts_or_pts);
+ dts_or_pts_end = cmp::max(state.in_segment.get_start(), dts_or_pts_end);
+ if state.in_segment.get_stop().is_some() {
+ dts_or_pts = cmp::min(state.in_segment.get_stop(), dts_or_pts);
+ dts_or_pts_end = cmp::min(state.in_segment.get_stop(), dts_or_pts_end);
+ }
+ let current_running_time = state.in_segment.to_running_time(dts_or_pts);
+ let current_running_time_end = state.in_segment.to_running_time(dts_or_pts_end);
+ state.current_running_time = cmp::max(current_running_time_end, state.current_running_time);
+ // Wake up everybody, we advanced a bit
+ // Important: They will only be able to advance once we're done with this
+ // function or waiting for them to catch up below, otherwise they might
+ // get the wrong state
+ self.main_stream_cond.notify_all();
+ gst_log!(
+ CAT,
+ obj: pad,
+ "Main stream current running time {}-{} (position: {}-{})",
+ current_running_time,
+ current_running_time_end,
+ dts_or_pts,
+ dts_or_pts_end
+ );
+ let settings = *self.settings.lock();
+ // First check if we have to update our recording state
+ let mut rec_state = self.state.lock();
+ let settings_changed = match rec_state.recording_state {
+ RecordingState::Recording if !settings.record => {
+ gst_debug!(CAT, obj: pad, "Stopping recording");
+ rec_state.recording_state = RecordingState::Stopping;
+ true
+ }
+ RecordingState::Stopped if settings.record => {
+ gst_debug!(CAT, obj: pad, "Starting recording");
+ rec_state.recording_state = RecordingState::Starting;
+ true
+ }
+ _ => false,
+ };
+ match rec_state.recording_state {
+ RecordingState::Recording => {
+ // Remember where we stopped last, in case of EOS
+ rec_state.last_recording_stop = current_running_time_end;
+ gst_log!(CAT, obj: pad, "Passing buffer (recording)");
+ Ok(HandleResult::Pass(data))
+ }
+ RecordingState::Stopping => {
+ if !data.is_keyframe() {
+ // Remember where we stopped last, in case of EOS
+ rec_state.last_recording_stop = current_running_time_end;
+ gst_log!(CAT, obj: pad, "Passing non-keyframe buffer (stopping)");
+ drop(rec_state);
+ drop(state);
+ if settings_changed {
+ gst_debug!(CAT, obj: pad, "Requesting a new keyframe");
+ stream
+ .sinkpad
+ .push_event(gst_video::new_upstream_force_key_unit_event().build());
+ }
+ return Ok(HandleResult::Pass(data));
+ }
+ // Remember the time when we stopped: now, i.e. right before the current buffer!
+ rec_state.last_recording_stop = current_running_time;
+ gst_debug!(CAT, obj: pad, "Stopping at {}", current_running_time);
+ // Then unlock and wait for all other streams to reach it or go EOS instead.
+ drop(rec_state);
+ while !self.other_streams.lock().0.iter().all(|s| {
+ let s = s.state.lock();
+ s.eos
+ || (s.current_running_time.is_some()
+ && s.current_running_time >= current_running_time_end)
+ }) {
+ gst_log!(CAT, obj: pad, "Waiting for other streams to stop");
+ self.main_stream_cond.wait(&mut state);
+ }
+ if state.flushing {
+ gst_debug!(CAT, obj: pad, "Flushing");
+ return Ok(HandleResult::Flushing);
+ }
+ let mut rec_state = self.state.lock();
+ rec_state.recording_state = RecordingState::Stopped;
+ let advance_by = rec_state.last_recording_stop - rec_state.last_recording_start;
+ rec_state.recording_duration += advance_by;
+ rec_state.last_recording_start = gst::CLOCK_TIME_NONE;
+ rec_state.last_recording_stop = gst::CLOCK_TIME_NONE;
+ gst_debug!(
+ CAT,
+ obj: pad,
+ "Stopped at {}, recording duration {}",
+ current_running_time,
+ rec_state.recording_duration
+ );
+ // Then become Stopped and drop this buffer. We always stop right before
+ // a keyframe
+ gst_log!(CAT, obj: pad, "Dropping buffer (stopped)");
+ drop(rec_state);
+ drop(state);
+ element.notify("recording");
+ Ok(HandleResult::Drop)
+ }
+ RecordingState::Stopped => {
+ gst_log!(CAT, obj: pad, "Dropping buffer (stopped)");
+ Ok(HandleResult::Drop)
+ }
+ RecordingState::Starting => {
+ // If this is no keyframe, we can directly go out again here and drop the frame
+ if !data.is_keyframe() {
+ gst_log!(CAT, obj: pad, "Dropping non-keyframe buffer (starting)");
+ drop(rec_state);
+ drop(state);
+ if settings_changed {
+ gst_debug!(CAT, obj: pad, "Requesting a new keyframe");
+ stream
+ .sinkpad
+ .push_event(gst_video::new_upstream_force_key_unit_event().build());
+ }
+ return Ok(HandleResult::Drop);
+ }
+ // Remember the time when we started: now!
+ rec_state.last_recording_start = current_running_time;
+ rec_state.running_time_offset = current_running_time - rec_state.recording_duration;
+ gst_debug!(CAT, obj: pad, "Starting at {}", current_running_time);
+ state.segment_pending = true;
+ for other_stream in &self.other_streams.lock().0 {
+ other_stream.state.lock().segment_pending = true;
+ }
+ // Then unlock and wait for all other streams to reach
+ // it or go EOS instead
+ drop(rec_state);
+ while !self.other_streams.lock().0.iter().all(|s| {
+ let s = s.state.lock();
+ s.eos
+ || (s.current_running_time.is_some()
+ && s.current_running_time >= current_running_time_end)
+ }) {
+ gst_log!(CAT, obj: pad, "Waiting for other streams to start");
+ self.main_stream_cond.wait(&mut state);
+ }
+ if state.flushing {
+ gst_debug!(CAT, obj: pad, "Flushing");
+ return Ok(HandleResult::Flushing);
+ }
+ let mut rec_state = self.state.lock();
+ rec_state.recording_state = RecordingState::Recording;
+ gst_debug!(
+ CAT,
+ obj: pad,
+ "Started at {}, recording duration {}",
+ current_running_time,
+ rec_state.recording_duration
+ );
+ gst_log!(CAT, obj: pad, "Passing buffer (recording)");
+ drop(rec_state);
+ drop(state);
+ element.notify("recording");
+ Ok(HandleResult::Pass(data))
+ }
+ }
+ }
+ fn handle_secondary_stream<T: HandleData>(
+ &self,
+ element: &gst::Element,
+ pad: &gst::Pad,
+ stream: &Stream,
+ data: T,
+ ) -> Result<HandleResult<T>, gst::FlowError> {
+ // Calculate end pts & current running time and make sure we stay in the segment
+ let mut state = stream.state.lock();
+ let mut pts = data.get_pts();
+ let duration = data.get_duration(&state);
+ if pts.is_none() {
+ gst_element_error!(element, gst::StreamError::Format, ["Buffer without PTS"]);
+ return Err(gst::FlowError::Error);
+ }
+ let dts = data.get_dts();
+ if dts.is_some() && pts.is_some() && dts != pts {
+ gst_element_error!(
+ element,
+ gst::StreamError::Format,
+ ["DTS != PTS not supported for secondary streams"]
+ );
+ return Err(gst::FlowError::Error);
+ }
+ if !data.is_keyframe() {
+ gst_element_error!(
+ element,
+ gst::StreamError::Format,
+ ["Delta-units not supported for secondary streams"]
+ );
+ return Err(gst::FlowError::Error);
+ }
+ let mut pts_end = if duration.is_some() {
+ pts + duration
+ } else {
+ pts
+ };
+ let data = match data.clip(&state, &state.in_segment) {
+ None => {
+ gst_log!(CAT, obj: pad, "Dropping raw data outside segment");
+ return Ok(HandleResult::Drop);
+ }
+ Some(data) => data,
+ };
+ // This will only do anything for non-raw data
+ pts = cmp::max(state.in_segment.get_start(), pts);
+ pts_end = cmp::max(state.in_segment.get_start(), pts_end);
+ if state.in_segment.get_stop().is_some() {
+ pts = cmp::min(state.in_segment.get_stop(), pts);
+ pts_end = cmp::min(state.in_segment.get_stop(), pts_end);
+ }
+ let current_running_time = state.in_segment.to_running_time(pts);
+ let current_running_time_end = state.in_segment.to_running_time(pts_end);
+ state.current_running_time = cmp::max(current_running_time_end, state.current_running_time);
+ gst_log!(
+ CAT,
+ obj: pad,
+ "Secondary stream current running time {}-{} (position: {}-{}",
+ current_running_time,
+ current_running_time_end,
+ pts,
+ pts_end
+ );
+ drop(state);
+ let mut main_state = self.main_stream.state.lock();
+ // Wake up, in case the main stream is waiting for us to progress up to here. We progressed
+ // above but all notifying must happen while the main_stream state is locked as per above.
+ self.main_stream_cond.notify_all();
+ while (main_state.current_running_time == gst::CLOCK_TIME_NONE
+ || main_state.current_running_time < current_running_time_end)
+ && !main_state.eos
+ && !stream.state.lock().flushing
+ {
+ gst_log!(
+ CAT,
+ obj: pad,
+ "Waiting for reaching {} / EOS / flushing, main stream at {}",
+ current_running_time,
+ main_state.current_running_time
+ );
+ self.main_stream_cond.wait(&mut main_state);
+ }
+ state = stream.state.lock();
+ if state.flushing {
+ gst_debug!(CAT, obj: pad, "Flushing");
+ return Ok(HandleResult::Flushing);
+ }
+ let rec_state = self.state.lock();
+ // If the main stream is EOS, we are also EOS unless we are
+ // before the final last recording stop running time
+ if main_state.eos {
+ // If we have no start or stop position (we never recorded) then we're EOS too now
+ if rec_state.last_recording_stop.is_none() || rec_state.last_recording_start.is_none() {
+ gst_debug!(CAT, obj: pad, "Main stream EOS and recording never started",);
+ return Ok(HandleResult::Eos);
+ } else if data.can_clip(&*state)
+ && current_running_time < rec_state.last_recording_start
+ && current_running_time_end > rec_state.last_recording_start
+ {
+ // Otherwise if we're before the recording start but the end of the buffer is after
+ // the start and we can clip, clip the buffer and pass it onwards.
+ gst_debug!(
+ CAT,
+ obj: pad,
+ "Main stream EOS and we're not EOS yet (overlapping recording start, {} < {} < {})",
+ current_running_time,
+ rec_state.last_recording_start,
+ current_running_time_end
+ );
+ let mut clip_start = state
+ .in_segment
+ .position_from_running_time(rec_state.last_recording_start);
+ if clip_start.is_none() {
+ clip_start = state.in_segment.get_start();
+ }
+ let mut clip_stop = state
+ .in_segment
+ .position_from_running_time(rec_state.last_recording_stop);
+ if clip_stop.is_none() {
+ clip_stop = state.in_segment.get_stop();
+ }
+ let mut segment = state.in_segment.clone();
+ segment.set_start(clip_start);
+ segment.set_stop(clip_stop);
+ gst_log!(CAT, obj: pad, "Clipping to segment {:?}", segment,);
+ if let Some(data) = data.clip(&*state, &segment) {
+ return Ok(HandleResult::Pass(data));
+ } else {
+ gst_warning!(CAT, obj: pad, "Complete buffer clipped!");
+ return Ok(HandleResult::Drop);
+ }
+ } else if current_running_time < rec_state.last_recording_start {
+ // Otherwise if the buffer starts before the recording start, drop it. This
+ // means that we either can't clip, or that the end is also before the
+ // recording start
+ gst_debug!(
+ CAT,
+ obj: pad,
+ "Main stream EOS and we're not EOS yet (before recording start, {} < {})",
+ current_running_time,
+ rec_state.last_recording_start
+ );
+ return Ok(HandleResult::Drop);
+ } else if data.can_clip(&*state)
+ && current_running_time < rec_state.last_recording_stop
+ && current_running_time_end > rec_state.last_recording_stop
+ {
+ // Similarly if the end is after the recording stop but the start is before and we
+ // can clip, clip the buffer and pass it through.
+ gst_debug!(
+ CAT,
+ obj: pad,
+ "Main stream EOS and we're not EOS yet (overlapping recording end, {} < {} < {})",
+ current_running_time,
+ rec_state.last_recording_stop,
+ current_running_time_end
+ );
+ let mut clip_start = state
+ .in_segment
+ .position_from_running_time(rec_state.last_recording_start);
+ if clip_start.is_none() {
+ clip_start = state.in_segment.get_start();
+ }
+ let mut clip_stop = state
+ .in_segment
+ .position_from_running_time(rec_state.last_recording_stop);
+ if clip_stop.is_none() {
+ clip_stop = state.in_segment.get_stop();
+ }
+ let mut segment = state.in_segment.clone();
+ segment.set_start(clip_start);
+ segment.set_stop(clip_stop);
+ gst_log!(CAT, obj: pad, "Clipping to segment {:?}", segment,);
+ if let Some(data) = data.clip(&*state, &segment) {
+ return Ok(HandleResult::Pass(data));
+ } else {
+ gst_warning!(CAT, obj: pad, "Complete buffer clipped!");
+ return Ok(HandleResult::Eos);
+ }
+ } else if current_running_time_end > rec_state.last_recording_stop {
+ // Otherwise if the end of the buffer is after the recording stop, we're EOS
+ // now. This means that we either couldn't clip or that the start is also after
+ // the recording stop
+ gst_debug!(
+ CAT,
+ obj: pad,
+ "Main stream EOS and we're EOS too (after recording end, {} > {})",
+ current_running_time_end,
+ rec_state.last_recording_stop
+ );
+ return Ok(HandleResult::Eos);
+ } else {
+ // In all other cases the buffer is fully between recording start and end and
+ // can be passed through as is
+ assert!(current_running_time >= rec_state.last_recording_start);
+ assert!(current_running_time_end <= rec_state.last_recording_stop);
+ gst_debug!(
+ CAT,
+ obj: pad,
+ "Main stream EOS and we're not EOS yet (before recording end, {} <= {} <= {})",
+ rec_state.last_recording_start,
+ current_running_time,
+ rec_state.last_recording_stop
+ );
+ return Ok(HandleResult::Pass(data));
+ }
+ }
+ // The end of our buffer is before the end of the previous buffer of the main stream
+ assert!(main_state.current_running_time >= current_running_time_end);
+ match rec_state.recording_state {
+ RecordingState::Recording => {
+ // We're properly started, must have a start position and
+ // be actually after that start position
+ assert!(rec_state.last_recording_start.is_some());
+ assert!(current_running_time >= rec_state.last_recording_start);
+ gst_log!(CAT, obj: pad, "Passing buffer (recording)");
+ Ok(HandleResult::Pass(data))
+ }
+ RecordingState::Stopping => {
+ // If we have no start position yet, the main stream is waiting for a key-frame
+ if rec_state.last_recording_stop.is_none() {
+ gst_log!(
+ CAT,
+ obj: pad,
+ "Passing buffer (stopping: waiting for keyframe)",
+ );
+ Ok(HandleResult::Pass(data))
+ } else if current_running_time_end <= rec_state.last_recording_stop {
+ gst_log!(
+ CAT,
+ obj: pad,
+ "Passing buffer (stopping: {} <= {})",
+ current_running_time_end,
+ rec_state.last_recording_stop
+ );
+ Ok(HandleResult::Pass(data))
+ } else if data.can_clip(&*state)
+ && current_running_time < rec_state.last_recording_stop
+ && current_running_time_end > rec_state.last_recording_stop
+ {
+ gst_log!(
+ CAT,
+ obj: pad,
+ "Passing buffer (stopping: {} < {} < {})",
+ current_running_time,
+ rec_state.last_recording_stop,
+ current_running_time_end,
+ );
+ let mut clip_stop = state
+ .in_segment
+ .position_from_running_time(rec_state.last_recording_stop);
+ if clip_stop.is_none() {
+ clip_stop = state.in_segment.get_stop();
+ }
+ let mut segment = state.in_segment.clone();
+ segment.set_stop(clip_stop);
+ gst_log!(CAT, obj: pad, "Clipping to segment {:?}", segment,);
+ if let Some(data) = data.clip(&*state, &segment) {
+ Ok(HandleResult::Pass(data))
+ } else {
+ gst_warning!(CAT, obj: pad, "Complete buffer clipped!");
+ Ok(HandleResult::Drop)
+ }
+ } else {
+ gst_log!(
+ CAT,
+ obj: pad,
+ "Dropping buffer (stopping: {} > {})",
+ current_running_time_end,
+ rec_state.last_recording_stop
+ );
+ Ok(HandleResult::Drop)
+ }
+ }
+ RecordingState::Stopped => {
+ // We're properly stopped
+ gst_log!(CAT, obj: pad, "Dropping buffer (stopped)");
+ Ok(HandleResult::Drop)
+ }
+ RecordingState::Starting => {
+ // If we have no start position yet, the main stream is waiting for a key-frame
+ if rec_state.last_recording_start.is_none() {
+ gst_log!(
+ CAT,
+ obj: pad,
+ "Dropping buffer (starting: waiting for keyframe)",
+ );
+ Ok(HandleResult::Drop)
+ } else if current_running_time >= rec_state.last_recording_start {
+ gst_log!(
+ CAT,
+ obj: pad,
+ "Passing buffer (starting: {} >= {})",
+ current_running_time,
+ rec_state.last_recording_start
+ );
+ Ok(HandleResult::Pass(data))
+ } else if data.can_clip(&*state)
+ && current_running_time < rec_state.last_recording_start
+ && current_running_time_end > rec_state.last_recording_start
+ {
+ gst_log!(
+ CAT,
+ obj: pad,
+ "Passing buffer (starting: {} < {} < {})",
+ current_running_time,
+ rec_state.last_recording_start,
+ current_running_time_end,
+ );
+ let mut clip_start = state
+ .in_segment
+ .position_from_running_time(rec_state.last_recording_start);
+ if clip_start.is_none() {
+ clip_start = state.in_segment.get_start();
+ }
+ let mut segment = state.in_segment.clone();
+ segment.set_start(clip_start);
+ gst_log!(CAT, obj: pad, "Clipping to segment {:?}", segment,);
+ if let Some(data) = data.clip(&*state, &segment) {
+ Ok(HandleResult::Pass(data))
+ } else {
+ gst_warning!(CAT, obj: pad, "Complete buffer clipped!");
+ Ok(HandleResult::Drop)
+ }
+ } else {
+ gst_log!(
+ CAT,
+ obj: pad,
+ "Dropping buffer (starting: {} < {})",
+ current_running_time,
+ rec_state.last_recording_start
+ );
+ Ok(HandleResult::Drop)
+ }
+ }
+ }
+ }
+ fn sink_chain(
+ &self,
+ pad: &gst::Pad,
+ element: &gst::Element,
+ buffer: gst::Buffer,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ let stream = self.pads.lock().get(pad).cloned().ok_or_else(|| {
+ gst_element_error!(
+ element,
+ gst::CoreError::Pad,
+ ["Unknown pad {:?}", pad.get_name()]
+ );
+ gst::FlowError::Error
+ })?;
+ gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer);
+ {
+ let state = stream.state.lock();
+ if state.eos {
+ return Err(gst::FlowError::Eos);
+ }
+ }
+ let handle_result = if stream != self.main_stream {
+ self.handle_secondary_stream(element, pad, &stream, buffer)
+ } else {
+ self.handle_main_stream(element, pad, &stream, buffer)
+ }?;
+ let buffer = match handle_result {
+ HandleResult::Drop => {
+ return Ok(gst::FlowSuccess::Ok);
+ }
+ HandleResult::Flushing => {
+ return Err(gst::FlowError::Flushing);
+ }
+ HandleResult::Eos => {
+ stream.srcpad.push_event(
+ gst::Event::new_eos()
+ .seqnum(stream.state.lock().segment_seqnum)
+ .build(),
+ );
+ return Err(gst::FlowError::Eos);
+ }
+ HandleResult::Pass(buffer) => {
+ // Pass through and actually push the buffer
+ buffer
+ }
+ };
+ let out_running_time = {
+ let mut state = stream.state.lock();
+ let mut events = Vec::with_capacity(state.pending_events.len() + 1);
+ if state.segment_pending {
+ let rec_state = self.state.lock();
+ // Adjust so that last_recording_start has running time of
+ // recording_duration
+ state.out_segment = state.in_segment.clone();
+ let offset = rec_state.running_time_offset.unwrap_or(0);
+ state
+ .out_segment
+ .offset_running_time(-(offset as i64))
+ .expect("Adjusting record duration");
+ events.push(
+ gst::Event::new_segment(&state.out_segment)
+ .seqnum(state.segment_seqnum)
+ .build(),
+ );
+ state.segment_pending = false;
+ gst_debug!(CAT, obj: pad, "Pending Segment {:?}", &state.out_segment);
+ }
+ if !state.pending_events.is_empty() {
+ gst_debug!(CAT, obj: pad, "Pushing pending events");
+ }
+ events.append(&mut state.pending_events);
+ let out_running_time = state.out_segment.to_running_time(buffer.get_pts());
+ // Unlock before pushing
+ drop(state);
+ for e in events.drain(..) {
+ stream.srcpad.push_event(e);
+ }
+ out_running_time
+ };
+ gst_log!(
+ CAT,
+ obj: pad,
+ "Pushing buffer with running time {}: {:?}",
+ out_running_time,
+ buffer
+ );
+ stream.srcpad.push(buffer)
+ }
+ fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, mut event: gst::Event) -> bool {
+ use gst::EventView;
+ let stream = match self.pads.lock().get(pad) {
+ None => {
+ gst_element_error!(
+ element,
+ gst::CoreError::Pad,
+ ["Unknown pad {:?}", pad.get_name()]
+ );
+ return false;
+ }
+ Some(stream) => stream.clone(),
+ };
+ gst_log!(CAT, obj: pad, "Handling event {:?}", event);
+ let mut forward = true;
+ let mut send_pending = false;
+ match event.view() {
+ EventView::FlushStart(..) => {
+ let _main_state = if stream != self.main_stream {
+ Some(self.main_stream.state.lock())
+ } else {
+ None
+ };
+ let mut state = stream.state.lock();
+ state.flushing = true;
+ self.main_stream_cond.notify_all();
+ }
+ EventView::FlushStop(..) => {
+ let mut state = stream.state.lock();
+ state.eos = false;
+ state.flushing = false;
+ state.segment_pending = false;
+ state.current_running_time = gst::CLOCK_TIME_NONE;
+ }
+ EventView::Caps(c) => {
+ let mut state = stream.state.lock();
+ let caps = c.get_caps();
+ let s = caps.get_structure(0).unwrap();
+ if s.get_name().starts_with("audio/") {
+ state.audio_info = gst_audio::AudioInfo::from_caps(caps).ok();
+ gst_log!(CAT, obj: pad, "Got audio caps {:?}", state.audio_info);
+ state.video_info = None;
+ } else if s.get_name().starts_with("video/") {
+ state.audio_info = None;
+ state.video_info = gst_video::VideoInfo::from_caps(caps).ok();
+ gst_log!(CAT, obj: pad, "Got video caps {:?}", state.video_info);
+ } else {
+ state.audio_info = None;
+ state.video_info = None;
+ }
+ }
+ EventView::Segment(e) => {
+ let mut state = stream.state.lock();
+ let segment = match e.get_segment().clone().downcast::<gst::ClockTime>() {
+ Err(segment) => {
+ gst_element_error!(
+ element,
+ gst::StreamError::Format,
+ [
+ "Only Time segments supported, got {:?}",
+ segment.get_format(),
+ ]
+ );
+ return false;
+ }
+ Ok(segment) => segment,
+ };
+ if (segment.get_rate() - 1.0).abs() > f64::EPSILON {
+ gst_element_error!(
+ element,
+ gst::StreamError::Format,
+ [
+ "Only rate==1.0 segments supported, got {:?}",
+ segment.get_rate(),
+ ]
+ );
+ return false;
+ }
+ state.in_segment = segment;
+ state.segment_seqnum = event.get_seqnum();
+ state.segment_pending = true;
+ state.current_running_time = gst::CLOCK_TIME_NONE;
+ gst_debug!(CAT, obj: pad, "Got new Segment {:?}", state.in_segment);
+ forward = false;
+ }
+ EventView::Gap(e) => {
+ gst_debug!(CAT, obj: pad, "Handling Gap event {:?}", event);
+ let (pts, duration) = e.get();
+ let handle_result = if stream == self.main_stream {
+ self.handle_main_stream(element, pad, &stream, (pts, duration))
+ } else {
+ self.handle_secondary_stream(element, pad, &stream, (pts, duration))
+ };
+ forward = match handle_result {
+ Ok(HandleResult::Pass((new_pts, new_duration))) if new_pts.is_some() => {
+ if new_pts != pts || new_duration != duration {
+ event = gst::Event::new_gap(new_pts, new_duration).build();
+ }
+ true
+ }
+ Ok(_) => false,
+ Err(_) => false,
+ };
+ }
+ EventView::Eos(..) => {
+ let _main_state = if stream != self.main_stream {
+ Some(self.main_stream.state.lock())
+ } else {
+ None
+ };
+ let mut state = stream.state.lock();
+ state.eos = true;
+ self.main_stream_cond.notify_all();
+ gst_debug!(
+ CAT,
+ obj: pad,
+ "Stream is EOS now, sending any pending events"
+ );
+ send_pending = true;
+ }
+ _ => (),
+ };
+ // If a serialized event and coming after Segment and a new Segment is pending,
+ // queue up and send at a later time (buffer/gap) after we sent the Segment
+ let type_ = event.get_type();
+ if forward
+ && type_ != gst::EventType::Eos
+ && type_.is_serialized()
+ && type_.partial_cmp(&gst::EventType::Segment) == Some(cmp::Ordering::Greater)
+ {
+ let mut state = stream.state.lock();
+ if state.segment_pending {
+ gst_log!(CAT, obj: pad, "Storing event for later pushing");
+ state.pending_events.push(event);
+ return true;
+ }
+ }
+ if send_pending {
+ let mut state = stream.state.lock();
+ let mut events = Vec::with_capacity(state.pending_events.len() + 1);
+ // Got not a single buffer on this stream before EOS, forward
+ // the input segment
+ if state.segment_pending {
+ events.push(
+ gst::Event::new_segment(&state.in_segment)
+ .seqnum(state.segment_seqnum)
+ .build(),
+ );
+ }
+ events.append(&mut state.pending_events);
+ drop(state);
+ for e in events.drain(..) {
+ stream.srcpad.push_event(e);
+ }
+ }
+ if forward {
+ gst_log!(CAT, obj: pad, "Forwarding event {:?}", event);
+ stream.srcpad.push_event(event)
+ } else {
+ gst_log!(CAT, obj: pad, "Dropping event {:?}", event);
+ true
+ }
+ }
+ fn sink_query(
+ &self,
+ pad: &gst::Pad,
+ element: &gst::Element,
+ query: &mut gst::QueryRef,
+ ) -> bool {
+ let stream = match self.pads.lock().get(pad) {
+ None => {
+ gst_element_error!(
+ element,
+ gst::CoreError::Pad,
+ ["Unknown pad {:?}", pad.get_name()]
+ );
+ return false;
+ }
+ Some(stream) => stream.clone(),
+ };
+ gst_log!(CAT, obj: pad, "Handling query {:?}", query);
+ stream.srcpad.peer_query(query)
+ }
+ fn src_event(&self, pad: &gst::Pad, element: &gst::Element, mut event: gst::Event) -> bool {
+ use gst::EventView;
+ let stream = match self.pads.lock().get(pad) {
+ None => {
+ gst_element_error!(
+ element,
+ gst::CoreError::Pad,
+ ["Unknown pad {:?}", pad.get_name()]
+ );
+ return false;
+ }
+ Some(stream) => stream.clone(),
+ };
+ gst_log!(CAT, obj: pad, "Handling event {:?}", event);
+ let forward = match event.view() {
+ EventView::Seek(..) => false,
+ _ => true,
+ };
+ let rec_state = self.state.lock();
+ let running_time_offset = rec_state.running_time_offset.unwrap_or(0) as i64;
+ let offset = event.get_running_time_offset();
+ event
+ .make_mut()
+ .set_running_time_offset(offset + running_time_offset);
+ drop(rec_state);
+ if forward {
+ gst_log!(CAT, obj: pad, "Forwarding event {:?}", event);
+ stream.sinkpad.push_event(event)
+ } else {
+ gst_log!(CAT, obj: pad, "Dropping event {:?}", event);
+ false
+ }
+ }
+ fn src_query(&self, pad: &gst::Pad, element: &gst::Element, query: &mut gst::QueryRef) -> bool {
+ use gst::QueryView;
+ let stream = match self.pads.lock().get(pad) {
+ None => {
+ gst_element_error!(
+ element,
+ gst::CoreError::Pad,
+ ["Unknown pad {:?}", pad.get_name()]
+ );
+ return false;
+ }
+ Some(stream) => stream.clone(),
+ };
+ gst_log!(CAT, obj: pad, "Handling query {:?}", query);
+ match query.view_mut() {
+ QueryView::Scheduling(ref mut q) => {
+ let mut new_query = gst::Query::new_scheduling();
+ let res = stream.sinkpad.peer_query(&mut new_query);
+ if !res {
+ return res;
+ }
+ gst_log!(CAT, obj: pad, "Downstream returned {:?}", new_query);
+ let (flags, min, max, align) = new_query.get_result();
+ q.set(flags, min, max, align);
+ q.add_scheduling_modes(
+ &new_query
+ .get_scheduling_modes()
+ .iter()
+ .cloned()
+ .filter(|m| m != &gst::PadMode::Pull)
+ .collect::<Vec<_>>(),
+ );
+ gst_log!(CAT, obj: pad, "Returning {:?}", q.get_mut_query());
+ true
+ }
+ QueryView::Seeking(ref mut q) => {
+ // Seeking is not possible here
+ let format = q.get_format();
+ q.set(
+ false,
+ gst::GenericFormattedValue::new(format, -1),
+ gst::GenericFormattedValue::new(format, -1),
+ );
+ gst_log!(CAT, obj: pad, "Returning {:?}", q.get_mut_query());
+ true
+ }
+ // Position and duration is always the current recording position
+ QueryView::Position(ref mut q) => {
+ if q.get_format() == gst::Format::Time {
+ let state = stream.state.lock();
+ let rec_state = self.state.lock();
+ let mut recording_duration = rec_state.recording_duration;
+ if rec_state.recording_state == RecordingState::Recording
+ || rec_state.recording_state == RecordingState::Stopping
+ {
+ recording_duration +=
+ state.current_running_time - rec_state.last_recording_start;
+ }
+ q.set(recording_duration);
+ true
+ } else {
+ false
+ }
+ }
+ QueryView::Duration(ref mut q) => {
+ if q.get_format() == gst::Format::Time {
+ let state = stream.state.lock();
+ let rec_state = self.state.lock();
+ let mut recording_duration = rec_state.recording_duration;
+ if rec_state.recording_state == RecordingState::Recording
+ || rec_state.recording_state == RecordingState::Stopping
+ {
+ recording_duration +=
+ state.current_running_time - rec_state.last_recording_start;
+ }
+ q.set(recording_duration);
+ true
+ } else {
+ false
+ }
+ }
+ _ => {
+ gst_log!(CAT, obj: pad, "Forwarding query {:?}", query);
+ stream.sinkpad.peer_query(query)
+ }
+ }
+ }
+ fn iterate_internal_links(
+ &self,
+ pad: &gst::Pad,
+ element: &gst::Element,
+ ) -> gst::Iterator<gst::Pad> {
+ let stream = match self.pads.lock().get(pad) {
+ None => {
+ gst_element_error!(
+ element,
+ gst::CoreError::Pad,
+ ["Unknown pad {:?}", pad.get_name()]
+ );
+ return gst::Iterator::from_vec(vec![]);
+ }
+ Some(stream) => stream.clone(),
+ };
+ if pad == &stream.srcpad {
+ gst::Iterator::from_vec(vec![stream.sinkpad])
+ } else {
+ gst::Iterator::from_vec(vec![stream.srcpad])
+ }
+ }
+impl ObjectSubclass for ToggleRecord {
+ const NAME: &'static str = "RsToggleRecord";
+ type ParentType = gst::Element;
+ type Instance = gst::subclass::ElementInstanceStruct<Self>;
+ type Class = subclass::simple::ClassStruct<Self>;
+ glib_object_subclass!();
+ fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
+ let templ = klass.get_pad_template("sink").unwrap();
+ let sinkpad = gst::Pad::new_from_template(&templ, Some("sink"));
+ let templ = klass.get_pad_template("src").unwrap();
+ let srcpad = gst::Pad::new_from_template(&templ, Some("src"));
+ ToggleRecord::set_pad_functions(&sinkpad, &srcpad);
+ let main_stream = Stream::new(sinkpad, srcpad);
+ let mut pads = HashMap::new();
+ pads.insert(main_stream.sinkpad.clone(), main_stream.clone());
+ pads.insert(main_stream.srcpad.clone(), main_stream.clone());
+ Self {
+ settings: Mutex::new(Settings::default()),
+ state: Mutex::new(State::default()),
+ main_stream,
+ main_stream_cond: Condvar::new(),
+ other_streams: Mutex::new((Vec::new(), 0)),
+ pads: Mutex::new(pads),
+ }
+ }
+ fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ klass.install_properties(&PROPERTIES);
+ klass.set_metadata(
+ "Toggle Record",
+ "Generic",
+ "Valve that ensures multiple streams start/end at the same time",
+ "Sebastian Dröge <sebastian@centricular.com>",
+ );
+ let caps = gst::Caps::new_any();
+ let src_pad_template = gst::PadTemplate::new(
+ "src",
+ gst::PadDirection::Src,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+ klass.add_pad_template(src_pad_template);
+ let sink_pad_template = gst::PadTemplate::new(
+ "sink",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+ klass.add_pad_template(sink_pad_template);
+ let src_pad_template = gst::PadTemplate::new(
+ "src_%u",
+ gst::PadDirection::Src,
+ gst::PadPresence::Sometimes,
+ &caps,
+ )
+ .unwrap();
+ klass.add_pad_template(src_pad_template);
+ let sink_pad_template = gst::PadTemplate::new(
+ "sink_%u",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Request,
+ &caps,
+ )
+ .unwrap();
+ klass.add_pad_template(sink_pad_template);
+ }
+impl ObjectImpl for ToggleRecord {
+ glib_object_impl!();
+ fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) {
+ let prop = &PROPERTIES[id];
+ let element = obj.downcast_ref::<gst::Element>().unwrap();
+ match *prop {
+ subclass::Property("record", ..) => {
+ let mut settings = self.settings.lock();
+ let record = value.get_some().expect("type checked upstream");
+ gst_debug!(
+ CAT,
+ obj: element,
+ "Setting record from {:?} to {:?}",
+ settings.record,
+ record
+ );
+ settings.record = record;
+ }
+ _ => unimplemented!(),
+ }
+ }
+ fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ let prop = &PROPERTIES[id];
+ match *prop {
+ subclass::Property("record", ..) => {
+ let settings = self.settings.lock();
+ Ok(settings.record.to_value())
+ }
+ subclass::Property("recording", ..) => {
+ let rec_state = self.state.lock();
+ Ok((rec_state.recording_state == RecordingState::Recording).to_value())
+ }
+ _ => unimplemented!(),
+ }
+ }
+ fn constructed(&self, obj: &glib::Object) {
+ self.parent_constructed(obj);
+ let element = obj.downcast_ref::<gst::Element>().unwrap();
+ element.add_pad(&self.main_stream.sinkpad).unwrap();
+ element.add_pad(&self.main_stream.srcpad).unwrap();
+ }
+impl ElementImpl for ToggleRecord {
+ fn change_state(
+ &self,
+ element: &gst::Element,
+ transition: gst::StateChange,
+ ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
+ gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
+ match transition {
+ gst::StateChange::ReadyToPaused => {
+ for s in self
+ .other_streams
+ .lock()
+ .0
+ .iter()
+ .chain(iter::once(&self.main_stream))
+ {
+ let mut state = s.state.lock();
+ *state = StreamState::default();
+ }
+ let mut rec_state = self.state.lock();
+ *rec_state = State::default();
+ }
+ gst::StateChange::PausedToReady => {
+ for s in &self.other_streams.lock().0 {
+ let mut state = s.state.lock();
+ state.flushing = true;
+ }
+ let mut state = self.main_stream.state.lock();
+ state.flushing = true;
+ self.main_stream_cond.notify_all();
+ }
+ _ => (),
+ }
+ let success = self.parent_change_state(element, transition)?;
+ if transition == gst::StateChange::PausedToReady {
+ for s in self
+ .other_streams
+ .lock()
+ .0
+ .iter()
+ .chain(iter::once(&self.main_stream))
+ {
+ let mut state = s.state.lock();
+ state.pending_events.clear();
+ }
+ let mut rec_state = self.state.lock();
+ *rec_state = State::default();
+ drop(rec_state);
+ element.notify("recording");
+ }
+ Ok(success)
+ }
+ fn request_new_pad(
+ &self,
+ element: &gst::Element,
+ _templ: &gst::PadTemplate,
+ _name: Option<String>,
+ _caps: Option<&gst::Caps>,
+ ) -> Option<gst::Pad> {
+ let mut other_streams_guard = self.other_streams.lock();
+ let (ref mut other_streams, ref mut pad_count) = *other_streams_guard;
+ let mut pads = self.pads.lock();
+ let id = *pad_count;
+ *pad_count += 1;
+ let templ = element.get_pad_template("sink_%u").unwrap();
+ let sinkpad = gst::Pad::new_from_template(&templ, Some(format!("sink_{}", id).as_str()));
+ let templ = element.get_pad_template("src_%u").unwrap();
+ let srcpad = gst::Pad::new_from_template(&templ, Some(format!("src_{}", id).as_str()));
+ ToggleRecord::set_pad_functions(&sinkpad, &srcpad);
+ sinkpad.set_active(true).unwrap();
+ srcpad.set_active(true).unwrap();
+ let stream = Stream::new(sinkpad.clone(), srcpad.clone());
+ pads.insert(stream.sinkpad.clone(), stream.clone());
+ pads.insert(stream.srcpad.clone(), stream.clone());
+ other_streams.push(stream);
+ drop(pads);
+ drop(other_streams_guard);
+ element.add_pad(&sinkpad).unwrap();
+ element.add_pad(&srcpad).unwrap();
+ Some(sinkpad)
+ }
+ fn release_pad(&self, element: &gst::Element, pad: &gst::Pad) {
+ let mut other_streams_guard = self.other_streams.lock();
+ let (ref mut other_streams, _) = *other_streams_guard;
+ let mut pads = self.pads.lock();
+ let stream = match pads.get(pad) {
+ None => return,
+ Some(stream) => stream.clone(),
+ };
+ stream.srcpad.set_active(false).unwrap();
+ stream.sinkpad.set_active(false).unwrap();
+ pads.remove(&stream.sinkpad).unwrap();
+ pads.remove(&stream.srcpad).unwrap();
+ // TODO: Replace with Vec::remove_item() once stable
+ let pos = other_streams.iter().position(|x| *x == stream);
+ pos.map(|pos| other_streams.swap_remove(pos));
+ drop(pads);
+ drop(other_streams_guard);
+ element.remove_pad(&stream.sinkpad).unwrap();
+ element.remove_pad(&stream.srcpad).unwrap();
+ }
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "togglerecord",
+ gst::Rank::None,
+ ToggleRecord::get_type(),
+ )
diff --git a/utils/togglerecord/tests/tests.rs b/utils/togglerecord/tests/tests.rs
new file mode 100644
index 00000000..c0eaf674
--- /dev/null
+++ b/utils/togglerecord/tests/tests.rs
@@ -0,0 +1,1173 @@
+// Copyright (C) 2017 Sebastian Dröge <sebastian@centricular.com>
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Library General Public
+// License as published by the Free Software Foundation; either
+// version 2 of the License, or (at your option) any later version.
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// Library General Public License for more details.
+// You should have received a copy of the GNU Library General Public
+// License along with this library; if not, write to the
+// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+// Boston, MA 02110-1335, USA.
+extern crate glib;
+use glib::prelude::*;
+extern crate gstreamer as gst;
+use gst::prelude::*;
+extern crate either;
+use either::*;
+use std::sync::{mpsc, Mutex};
+use std::thread;
+extern crate gsttogglerecord;
+fn init() {
+ use std::sync::Once;
+ static INIT: Once = Once::new();
+ INIT.call_once(|| {
+ gst::init().unwrap();
+ gsttogglerecord::plugin_register_static().expect("gsttogglerecord tests");
+ });
+enum SendData {
+ Buffers(usize),
+ BuffersDelta(usize),
+ Gaps(usize),
+ Eos,
+fn setup_sender_receiver(
+ pipeline: &gst::Pipeline,
+ togglerecord: &gst::Element,
+ pad: &str,
+ offset: gst::ClockTime,
+) -> (
+ mpsc::Sender<SendData>,
+ mpsc::Receiver<()>,
+ mpsc::Receiver<Either<gst::Buffer, gst::Event>>,
+ thread::JoinHandle<()>,
+) {
+ let fakesink = gst::ElementFactory::make("fakesink", None).unwrap();
+ fakesink.set_property("async", &false).unwrap();
+ pipeline.add(&fakesink).unwrap();
+ let main_stream = pad == "src";
+ let (srcpad, sinkpad) = if main_stream {
+ (
+ togglerecord.get_static_pad("src").unwrap(),
+ togglerecord.get_static_pad("sink").unwrap(),
+ )
+ } else {
+ let sinkpad = togglerecord.get_request_pad("sink_%u").unwrap();
+ let srcpad = sinkpad.iterate_internal_links().next().unwrap().unwrap();
+ (srcpad, sinkpad)
+ };
+ let fakesink_sinkpad = fakesink.get_static_pad("sink").unwrap();
+ srcpad.link(&fakesink_sinkpad).unwrap();
+ let (sender_output, receiver_output) = mpsc::channel::<Either<gst::Buffer, gst::Event>>();
+ let sender_output = Mutex::new(sender_output);
+ srcpad.add_probe(
+ gst::PadProbeType::BUFFER | gst::PadProbeType::EVENT_DOWNSTREAM,
+ move |_, ref probe_info| {
+ match probe_info.data {
+ Some(gst::PadProbeData::Buffer(ref buffer)) => {
+ sender_output
+ .lock()
+ .unwrap()
+ .send(Left(buffer.clone()))
+ .unwrap();
+ }
+ Some(gst::PadProbeData::Event(ref event)) => {
+ sender_output
+ .lock()
+ .unwrap()
+ .send(Right(event.clone()))
+ .unwrap();
+ }
+ _ => {
+ unreachable!();
+ }
+ }
+ gst::PadProbeReturn::Ok
+ },
+ );
+ let (sender_input, receiver_input) = mpsc::channel::<SendData>();
+ let (sender_input_done, receiver_input_done) = mpsc::channel::<()>();
+ let thread = thread::spawn(move || {
+ let mut i = 0;
+ let mut first = true;
+ while let Ok(send_data) = receiver_input.recv() {
+ if first {
+ assert!(sinkpad.send_event(gst::Event::new_stream_start("test").build()));
+ let caps = if main_stream {
+ gst::Caps::builder("video/x-raw")
+ .field("format", &"ARGB")
+ .field("width", &320i32)
+ .field("height", &240i32)
+ .field("framerate", &gst::Fraction::new(50, 1))
+ .build()
+ } else {
+ gst::Caps::builder("audio/x-raw")
+ .field("format", &"U8")
+ .field("layout", &"interleaved")
+ .field("rate", &8000i32)
+ .field("channels", &1i32)
+ .build()
+ };
+ assert!(sinkpad.send_event(gst::Event::new_caps(&caps).build()));
+ let segment = gst::FormattedSegment::<gst::ClockTime>::new();
+ assert!(sinkpad.send_event(gst::Event::new_segment(&segment).build()));
+ let mut tags = gst::TagList::new();
+ tags.get_mut()
+ .unwrap()
+ .add::<gst::tags::Title>(&"some title", gst::TagMergeMode::Append);
+ assert!(sinkpad.send_event(gst::Event::new_tag(tags).build()));
+ first = false;
+ }
+ let buffer = if main_stream {
+ gst::Buffer::with_size(320 * 240 * 4).unwrap()
+ } else {
+ gst::Buffer::with_size(160).unwrap()
+ };
+ match send_data {
+ SendData::Eos => {
+ break;
+ }
+ SendData::Buffers(n) => {
+ for _ in 0..n {
+ let mut buffer = buffer.clone();
+ {
+ let buffer = buffer.make_mut();
+ buffer.set_pts(offset + i * 20 * gst::MSECOND);
+ buffer.set_duration(20 * gst::MSECOND);
+ }
+ let _ = sinkpad.chain(buffer);
+ i += 1;
+ }
+ }
+ SendData::BuffersDelta(n) => {
+ for _ in 0..n {
+ let mut buffer = gst::Buffer::new();
+ buffer
+ .get_mut()
+ .unwrap()
+ .set_pts(offset + i * 20 * gst::MSECOND);
+ buffer.get_mut().unwrap().set_duration(20 * gst::MSECOND);
+ buffer
+ .get_mut()
+ .unwrap()
+ .set_flags(gst::BufferFlags::DELTA_UNIT);
+ let _ = sinkpad.chain(buffer);
+ i += 1;
+ }
+ }
+ SendData::Gaps(n) => {
+ for _ in 0..n {
+ let event =
+ gst::Event::new_gap(offset + i * 20 * gst::MSECOND, 20 * gst::MSECOND)
+ .build();
+ let _ = sinkpad.send_event(event);
+ i += 1;
+ }
+ }
+ }
+ let _ = sender_input_done.send(());
+ }
+ let _ = sinkpad.send_event(gst::Event::new_eos().build());
+ let _ = sender_input_done.send(());
+ });
+ (sender_input, receiver_input_done, receiver_output, thread)
+fn recv_buffers(
+ receiver_output: &mpsc::Receiver<Either<gst::Buffer, gst::Event>>,
+ segment: &mut gst::FormattedSegment<gst::ClockTime>,
+ wait_buffers: usize,
+) -> Vec<(gst::ClockTime, gst::ClockTime, gst::ClockTime)> {
+ let mut res = Vec::new();
+ let mut n_buffers = 0;
+ while let Ok(val) = receiver_output.recv() {
+ match val {
+ Left(buffer) => {
+ res.push((
+ segment.to_running_time(buffer.get_pts()),
+ buffer.get_pts(),
+ buffer.get_duration(),
+ ));
+ n_buffers += 1;
+ if wait_buffers > 0 && n_buffers == wait_buffers {
+ return res;
+ }
+ }
+ Right(event) => {
+ use gst::EventView;
+ match event.view() {
+ EventView::Gap(ref e) => {
+ let (ts, duration) = e.get();
+ res.push((segment.to_running_time(ts), ts, duration));
+ n_buffers += 1;
+ if wait_buffers > 0 && n_buffers == wait_buffers {
+ return res;
+ }
+ }
+ EventView::Eos(..) => {
+ return res;
+ }
+ EventView::Segment(ref e) => {
+ *segment = e.get_segment().clone().downcast().unwrap();
+ }
+ _ => (),
+ }
+ }
+ }
+ }
+ res
+fn test_create() {
+ init();
+ assert!(gst::ElementFactory::make("togglerecord", None).is_ok());
+fn test_create_pads() {
+ init();
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ let sinkpad = togglerecord.get_request_pad("sink_%u").unwrap();
+ let srcpad = sinkpad.iterate_internal_links().next().unwrap().unwrap();
+ assert_eq!(sinkpad.get_name(), "sink_0");
+ assert_eq!(srcpad.get_name(), "src_0");
+ togglerecord.release_request_pad(&sinkpad);
+ assert!(sinkpad.get_parent().is_none());
+ assert!(srcpad.get_parent().is_none());
+fn test_one_stream_open() {
+ init();
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+ let (sender_input, _, receiver_output, thread) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+ pipeline.set_state(gst::State::Playing).unwrap();
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input.send(SendData::Buffers(10)).unwrap();
+ drop(sender_input);
+ let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers = recv_buffers(&receiver_output, &mut segment, 0);
+ assert_eq!(buffers.len(), 10);
+ for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ thread.join().unwrap();
+ pipeline.set_state(gst::State::Null).unwrap();
+fn test_one_stream_gaps_open() {
+ init();
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+ let (sender_input, _, receiver_output, thread) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+ pipeline.set_state(gst::State::Playing).unwrap();
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input.send(SendData::Buffers(5)).unwrap();
+ sender_input.send(SendData::Gaps(5)).unwrap();
+ drop(sender_input);
+ let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers = recv_buffers(&receiver_output, &mut segment, 0);
+ assert_eq!(buffers.len(), 10);
+ for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ thread.join().unwrap();
+ pipeline.set_state(gst::State::Null).unwrap();
+fn test_one_stream_close_open() {
+ init();
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+ let (sender_input, receiver_input_done, receiver_output, thread) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+ pipeline.set_state(gst::State::Playing).unwrap();
+ sender_input.send(SendData::Buffers(10)).unwrap();
+ receiver_input_done.recv().unwrap();
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input.send(SendData::Buffers(10)).unwrap();
+ drop(sender_input);
+ let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers = recv_buffers(&receiver_output, &mut segment, 0);
+ assert_eq!(buffers.len(), 10);
+ for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, (10 + index) * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ thread.join().unwrap();
+ pipeline.set_state(gst::State::Null).unwrap();
+fn test_one_stream_open_close() {
+ init();
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+ let (sender_input, receiver_input_done, receiver_output, thread) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+ pipeline.set_state(gst::State::Playing).unwrap();
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input.send(SendData::Buffers(10)).unwrap();
+ receiver_input_done.recv().unwrap();
+ togglerecord.set_property("record", &false).unwrap();
+ sender_input.send(SendData::Buffers(10)).unwrap();
+ drop(sender_input);
+ let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers = recv_buffers(&receiver_output, &mut segment, 0);
+ assert_eq!(buffers.len(), 10);
+ for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ thread.join().unwrap();
+ pipeline.set_state(gst::State::Null).unwrap();
+fn test_one_stream_open_close_open() {
+ init();
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+ let (sender_input, receiver_input_done, receiver_output, thread) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+ pipeline.set_state(gst::State::Playing).unwrap();
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input.send(SendData::Buffers(10)).unwrap();
+ receiver_input_done.recv().unwrap();
+ togglerecord.set_property("record", &false).unwrap();
+ sender_input.send(SendData::Buffers(10)).unwrap();
+ receiver_input_done.recv().unwrap();
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input.send(SendData::Buffers(10)).unwrap();
+ drop(sender_input);
+ let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers = recv_buffers(&receiver_output, &mut segment, 0);
+ assert_eq!(buffers.len(), 20);
+ for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
+ let pts_off = if index >= 10 {
+ 10 * 20 * gst::MSECOND
+ } else {
+ 0.into()
+ };
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ thread.join().unwrap();
+ pipeline.set_state(gst::State::Null).unwrap();
+fn test_two_stream_open() {
+ init();
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+ let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+ let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
+ pipeline.set_state(gst::State::Playing).unwrap();
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ sender_input_2.send(SendData::Buffers(11)).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ sender_input_1.send(SendData::Eos).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ sender_input_2.send(SendData::Eos).unwrap();
+ receiver_input_done_2.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+ let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
+ for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_1.len(), 10);
+ // Last buffer should be dropped from second stream
+ let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
+ for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_2.len(), 10);
+ thread_1.join().unwrap();
+ thread_2.join().unwrap();
+ pipeline.set_state(gst::State::Null).unwrap();
+fn test_two_stream_open_shift() {
+ init();
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+ let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+ let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 5 * gst::MSECOND);
+ pipeline.set_state(gst::State::Playing).unwrap();
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ sender_input_2.send(SendData::Buffers(11)).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ sender_input_1.send(SendData::Eos).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ sender_input_2.send(SendData::Eos).unwrap();
+ receiver_input_done_2.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+ let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
+ for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_1.len(), 10);
+ // Second to last buffer should be clipped from second stream, last should be dropped
+ let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
+ for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, 5 * gst::MSECOND + index * 20 * gst::MSECOND);
+ assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND);
+ if index == 9 {
+ assert_eq!(duration, 15 * gst::MSECOND);
+ } else {
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ }
+ assert_eq!(buffers_2.len(), 10);
+ thread_1.join().unwrap();
+ thread_2.join().unwrap();
+ pipeline.set_state(gst::State::Null).unwrap();
+fn test_two_stream_open_shift_main() {
+ init();
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+ let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 5 * gst::MSECOND);
+ let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
+ pipeline.set_state(gst::State::Playing).unwrap();
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ sender_input_2.send(SendData::Buffers(12)).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ sender_input_1.send(SendData::Eos).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ sender_input_2.send(SendData::Eos).unwrap();
+ receiver_input_done_2.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+ // PTS 5 maps to running time 0 now
+ let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
+ for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_1.len(), 10);
+ // First and second last buffer should be clipped from second stream,
+ // last buffer should be dropped
+ let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
+ for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
+ let index = index as u64;
+ if index == 0 {
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND);
+ assert_eq!(duration, 15 * gst::MSECOND);
+ } else if index == 10 {
+ assert_eq!(running_time, index * 20 * gst::MSECOND - 5 * gst::MSECOND);
+ assert_eq!(pts, index * 20 * gst::MSECOND);
+ assert_eq!(duration, 5 * gst::MSECOND);
+ } else {
+ assert_eq!(running_time, index * 20 * gst::MSECOND - 5 * gst::MSECOND);
+ assert_eq!(pts, index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ }
+ assert_eq!(buffers_2.len(), 11);
+ thread_1.join().unwrap();
+ thread_2.join().unwrap();
+ pipeline.set_state(gst::State::Null).unwrap();
+fn test_two_stream_open_close() {
+ init();
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+ let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+ let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
+ pipeline.set_state(gst::State::Playing).unwrap();
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ sender_input_2.send(SendData::Buffers(11)).unwrap();
+ // Sender 2 is waiting for sender 1 to continue, sender 1 is finished
+ receiver_input_done_1.recv().unwrap();
+ // Stop recording and push new buffers to sender 1, which will advance
+ // it and release the 11th buffer of sender 2 above
+ togglerecord.set_property("record", &false).unwrap();
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ receiver_input_done_2.recv().unwrap();
+ // Send another 9 buffers to sender 2, both are the same position now
+ sender_input_2.send(SendData::Buffers(9)).unwrap();
+ // Wait until all 20 buffers of both senders are done
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+ // Send EOS and wait for it to be handled
+ sender_input_1.send(SendData::Eos).unwrap();
+ sender_input_2.send(SendData::Eos).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+ let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
+ for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_1.len(), 10);
+ // Last buffer should be dropped from second stream
+ let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
+ for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_2.len(), 10);
+ thread_1.join().unwrap();
+ thread_2.join().unwrap();
+ pipeline.set_state(gst::State::Null).unwrap();
+fn test_two_stream_close_open() {
+ init();
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+ let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+ let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
+ pipeline.set_state(gst::State::Playing).unwrap();
+ togglerecord.set_property("record", &false).unwrap();
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ sender_input_2.send(SendData::Buffers(11)).unwrap();
+ // Sender 2 is waiting for sender 1 to continue, sender 1 is finished
+ receiver_input_done_1.recv().unwrap();
+ // Start recording and push new buffers to sender 1, which will advance
+ // it and release the 11th buffer of sender 2 above
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ receiver_input_done_2.recv().unwrap();
+ // Send another 9 buffers to sender 2, both are the same position now
+ sender_input_2.send(SendData::Buffers(9)).unwrap();
+ // Wait until all 20 buffers of both senders are done
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+ // Send EOS and wait for it to be handled
+ sender_input_1.send(SendData::Eos).unwrap();
+ sender_input_2.send(SendData::Eos).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+ let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
+ for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, (10 + index) * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_1.len(), 10);
+ // Last buffer should be dropped from second stream
+ let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
+ for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, (10 + index) * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_2.len(), 10);
+ thread_1.join().unwrap();
+ thread_2.join().unwrap();
+ pipeline.set_state(gst::State::Null).unwrap();
+fn test_two_stream_open_close_open() {
+ init();
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+ let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+ let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
+ pipeline.set_state(gst::State::Playing).unwrap();
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ sender_input_2.send(SendData::Buffers(11)).unwrap();
+ // Sender 2 is waiting for sender 1 to continue, sender 1 is finished
+ receiver_input_done_1.recv().unwrap();
+ // Stop recording and push new buffers to sender 1, which will advance
+ // it and release the 11th buffer of sender 2 above
+ togglerecord.set_property("record", &false).unwrap();
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ receiver_input_done_2.recv().unwrap();
+ // Send another 9 buffers to sender 2, both are the same position now
+ sender_input_2.send(SendData::Buffers(9)).unwrap();
+ // Wait until all 20 buffers of both senders are done
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+ // Send another buffer to sender 2, this will block until sender 1 advances
+ // but must not be dropped, although we're not recording (yet)
+ sender_input_2.send(SendData::Buffers(1)).unwrap();
+ // Start recording again and send another set of buffers to both senders
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ sender_input_2.send(SendData::Buffers(10)).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ // The single buffer above for sender 1 should be handled now
+ receiver_input_done_2.recv().unwrap();
+ // Send EOS and wait for it to be handled
+ sender_input_1.send(SendData::Eos).unwrap();
+ sender_input_2.send(SendData::Eos).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+ let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
+ for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
+ let pts_off = if index >= 10 {
+ 10 * 20 * gst::MSECOND
+ } else {
+ 0.into()
+ };
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_1.len(), 20);
+ // Last buffer should be dropped from second stream
+ let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
+ for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
+ let pts_off = if index >= 10 {
+ 10 * 20 * gst::MSECOND
+ } else {
+ 0.into()
+ };
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_2.len(), 20);
+ thread_1.join().unwrap();
+ thread_2.join().unwrap();
+ pipeline.set_state(gst::State::Null).unwrap();
+fn test_two_stream_open_close_open_gaps() {
+ init();
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+ let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+ let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
+ pipeline.set_state(gst::State::Playing).unwrap();
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input_1.send(SendData::Buffers(3)).unwrap();
+ sender_input_1.send(SendData::Gaps(3)).unwrap();
+ sender_input_1.send(SendData::Buffers(4)).unwrap();
+ sender_input_2.send(SendData::Buffers(11)).unwrap();
+ // Sender 2 is waiting for sender 1 to continue, sender 1 is finished
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_1.recv().unwrap();
+ // Stop recording and push new buffers to sender 1, which will advance
+ // it and release the 11th buffer of sender 2 above
+ togglerecord.set_property("record", &false).unwrap();
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ receiver_input_done_2.recv().unwrap();
+ // Send another 4 gaps and 5 buffers to sender 2, both are the same position now
+ sender_input_2.send(SendData::Gaps(4)).unwrap();
+ sender_input_2.send(SendData::Buffers(5)).unwrap();
+ // Wait until all 20 buffers of both senders are done
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+ // Send another gap to sender 2, this will block until sender 1 advances
+ // but must not be dropped, although we're not recording (yet)
+ sender_input_2.send(SendData::Gaps(1)).unwrap();
+ // Start recording again and send another set of buffers to both senders
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ sender_input_2.send(SendData::Buffers(10)).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ // The single buffer above for sender 1 should be handled now
+ receiver_input_done_2.recv().unwrap();
+ // Send EOS and wait for it to be handled
+ sender_input_1.send(SendData::Eos).unwrap();
+ sender_input_2.send(SendData::Eos).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+ let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
+ for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
+ let pts_off = if index >= 10 {
+ 10 * 20 * gst::MSECOND
+ } else {
+ 0.into()
+ };
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_1.len(), 20);
+ // Last buffer should be dropped from second stream
+ let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
+ for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
+ let pts_off = if index >= 10 {
+ 10 * 20 * gst::MSECOND
+ } else {
+ 0.into()
+ };
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_2.len(), 20);
+ thread_1.join().unwrap();
+ thread_2.join().unwrap();
+ pipeline.set_state(gst::State::Null).unwrap();
+fn test_two_stream_close_open_close_delta() {
+ init();
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+ let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+ let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
+ pipeline.set_state(gst::State::Playing).unwrap();
+ togglerecord.set_property("record", &false).unwrap();
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ sender_input_2.send(SendData::Buffers(11)).unwrap();
+ // Sender 2 is waiting for sender 1 to continue, sender 1 is finished
+ receiver_input_done_1.recv().unwrap();
+ // Start recording and push new buffers to sender 1. The first one is a delta frame,
+ // so will be dropped, and as such the next frame of sender 2 will also be dropped
+ // Sender 2 is empty now
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input_1.send(SendData::BuffersDelta(1)).unwrap();
+ sender_input_1.send(SendData::Buffers(9)).unwrap();
+ receiver_input_done_2.recv().unwrap();
+ // Send another 9 buffers to sender 2, both are the same position now
+ sender_input_2.send(SendData::Buffers(9)).unwrap();
+ // Wait until all 20 buffers of both senders are done
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+ // Send another buffer to sender 2, this will block until sender 1 advances
+ // but must not be dropped, and we're still recording
+ sender_input_2.send(SendData::Buffers(1)).unwrap();
+ // Stop recording again and send another set of buffers to both senders
+ // The first one is a delta frame, so we only actually stop recording
+ // after recording another frame
+ togglerecord.set_property("record", &false).unwrap();
+ sender_input_1.send(SendData::BuffersDelta(1)).unwrap();
+ sender_input_1.send(SendData::Buffers(9)).unwrap();
+ sender_input_2.send(SendData::Buffers(10)).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_1.recv().unwrap();
+ // The single buffer above for sender 1 should be handled now
+ receiver_input_done_2.recv().unwrap();
+ // Send EOS and wait for it to be handled
+ sender_input_1.send(SendData::Eos).unwrap();
+ sender_input_2.send(SendData::Eos).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+ let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
+ for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, (11 + index) * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_1.len(), 10);
+ // Last buffer should be dropped from second stream
+ let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
+ for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, (11 + index) * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_2.len(), 10);
+ thread_1.join().unwrap();
+ thread_2.join().unwrap();
+ pipeline.set_state(gst::State::Null).unwrap();
+fn test_three_stream_open_close_open() {
+ init();
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+ let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+ let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
+ let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
+ pipeline.set_state(gst::State::Playing).unwrap();
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ sender_input_2.send(SendData::Buffers(11)).unwrap();
+ sender_input_3.send(SendData::Buffers(10)).unwrap();
+ // Sender 2 is waiting for sender 1 to continue, sender 1/3 are finished
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_3.recv().unwrap();
+ // Stop recording and push new buffers to sender 1, which will advance
+ // it and release the 11th buffer of sender 2 above
+ togglerecord.set_property("record", &false).unwrap();
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ receiver_input_done_2.recv().unwrap();
+ // Send another 9 buffers to sender 2, 1/2 are at the same position now
+ sender_input_2.send(SendData::Buffers(9)).unwrap();
+ // Send the remaining 10 buffers to sender 3, all are at the same position now
+ sender_input_3.send(SendData::Buffers(10)).unwrap();
+ // Wait until all 20 buffers of all senders are done
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+ receiver_input_done_3.recv().unwrap();
+ // Send another buffer to sender 2, this will block until sender 1 advances
+ // but must not be dropped, although we're not recording (yet)
+ sender_input_2.send(SendData::Buffers(1)).unwrap();
+ // Start recording again and send another set of buffers to both senders
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ sender_input_2.send(SendData::Buffers(10)).unwrap();
+ sender_input_3.send(SendData::Buffers(5)).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ // The single buffer above for sender 1 should be handled now
+ receiver_input_done_2.recv().unwrap();
+ receiver_input_done_3.recv().unwrap();
+ sender_input_3.send(SendData::Buffers(5)).unwrap();
+ receiver_input_done_3.recv().unwrap();
+ // Send EOS and wait for it to be handled
+ sender_input_1.send(SendData::Eos).unwrap();
+ sender_input_2.send(SendData::Eos).unwrap();
+ sender_input_3.send(SendData::Eos).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+ receiver_input_done_3.recv().unwrap();
+ let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
+ for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
+ let pts_off = if index >= 10 {
+ 10 * 20 * gst::MSECOND
+ } else {
+ 0.into()
+ };
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_1.len(), 20);
+ // Last buffer should be dropped from second stream
+ let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
+ for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
+ let pts_off = if index >= 10 {
+ 10 * 20 * gst::MSECOND
+ } else {
+ 0.into()
+ };
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_2.len(), 20);
+ let mut segment_3 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_3 = recv_buffers(&receiver_output_3, &mut segment_3, 0);
+ for (index, &(running_time, pts, duration)) in buffers_3.iter().enumerate() {
+ let pts_off = if index >= 10 {
+ 10 * 20 * gst::MSECOND
+ } else {
+ 0.into()
+ };
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_3.len(), 20);
+ thread_1.join().unwrap();
+ thread_2.join().unwrap();
+ thread_3.join().unwrap();
+ pipeline.set_state(gst::State::Null).unwrap();