Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGuillaume Desmottes <guillaume.desmottes@onestream.live>2021-10-15 17:57:55 +0300
committerGuillaume Desmottes <guillaume.desmottes@onestream.live>2021-11-29 12:55:01 +0300
commitf9a39b1138d77c48a54735c74a49ee5e28ab887d (patch)
tree42b1521e6f24d81d1718724336d9f1f1b74da84a
parent97e6a89cac2992c27fb9dc341a10b0b5dadbb24b (diff)
add uriplaylistbin plugin
uriplaylistbin plays a list of URIs sequentially, ensuring gapless transitions and proper streams synchronization.
-rw-r--r--Cargo.toml2
-rw-r--r--meson.build1
-rw-r--r--utils/uriplaylistbin/Cargo.toml50
-rw-r--r--utils/uriplaylistbin/build.rs3
-rw-r--r--utils/uriplaylistbin/examples/playlist.rs137
-rw-r--r--utils/uriplaylistbin/src/lib.rs28
-rw-r--r--utils/uriplaylistbin/src/uriplaylistbin/imp.rs1572
-rw-r--r--utils/uriplaylistbin/src/uriplaylistbin/mod.rs30
-rw-r--r--utils/uriplaylistbin/tests/sample.mkvbin0 -> 58531 bytes
-rw-r--r--utils/uriplaylistbin/tests/sample.oggbin0 -> 4618 bytes
-rw-r--r--utils/uriplaylistbin/tests/uriplaylistbin.rs299
11 files changed, 2122 insertions, 0 deletions
diff --git a/Cargo.toml b/Cargo.toml
index e20655541..796c65686 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,6 +16,7 @@ members = [
"net/rusoto",
"utils/fallbackswitch",
"utils/togglerecord",
+ "utils/uriplaylistbin",
"video/cdg",
"video/closedcaption",
"video/videofx",
@@ -46,6 +47,7 @@ default-members = [
"net/rusoto",
"utils/fallbackswitch",
"utils/togglerecord",
+ "utils/uriplaylistbin",
"video/cdg",
"video/ffv1",
"video/flavors",
diff --git a/meson.build b/meson.build
index 8dbbef342..772d714d8 100644
--- a/meson.build
+++ b/meson.build
@@ -58,6 +58,7 @@ plugins = {
# https://github.com/qnighy/libwebp-sys2-rs/issues/4
'gst-plugin-webp': 'libgstrswebp',
'gst-plugin-videofx': 'libgstvideofx',
+ 'gst-plugin-uriplaylistbin': 'libgsturiplaylistbin',
}
extra_env = {}
diff --git a/utils/uriplaylistbin/Cargo.toml b/utils/uriplaylistbin/Cargo.toml
new file mode 100644
index 000000000..1b1022650
--- /dev/null
+++ b/utils/uriplaylistbin/Cargo.toml
@@ -0,0 +1,50 @@
+[package]
+name = "gst-plugin-uriplaylistbin"
+version = "0.8.0"
+authors = ["Guillaume Desmottes <guillaume.desmottes@onestream.live>"]
+repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
+license = "MPL-2.0"
+edition = "2018"
+description = "Playlist Plugin"
+
+[dependencies]
+gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_14"] }
+once_cell = "1.0"
+anyhow = "1"
+crossbeam-channel = "0.5"
+
+[dev-dependencies]
+gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_14"]}
+structopt = "0.3"
+url = "2.2"
+more-asserts = "0.2"
+
+[lib]
+name = "gsturiplaylistbin"
+crate-type = ["cdylib", "rlib"]
+path = "src/lib.rs"
+
+[[example]]
+name = "playlist"
+path = "examples/playlist.rs"
+
+[build-dependencies]
+gst-plugin-version-helper = { path="../../version-helper" }
+
+[features]
+# GStreamer 1.14 is required for static linking
+static = ["gst/v1_14"]
+capi = []
+
+[package.metadata.capi]
+min_version = "0.8.0"
+
+[package.metadata.capi.header]
+enabled = false
+
+[package.metadata.capi.library]
+install_subdir = "gstreamer-1.0"
+versioning = false
+
+[package.metadata.capi.pkg_config]
+requires_private = "gstreamer-1.0, gobject-2.0, glib-2.0, gmodule-2.0"
diff --git a/utils/uriplaylistbin/build.rs b/utils/uriplaylistbin/build.rs
new file mode 100644
index 000000000..cda12e57e
--- /dev/null
+++ b/utils/uriplaylistbin/build.rs
@@ -0,0 +1,3 @@
+fn main() {
+ gst_plugin_version_helper::info()
+}
diff --git a/utils/uriplaylistbin/examples/playlist.rs b/utils/uriplaylistbin/examples/playlist.rs
new file mode 100644
index 000000000..ee81000bc
--- /dev/null
+++ b/utils/uriplaylistbin/examples/playlist.rs
@@ -0,0 +1,137 @@
+// Copyright (C) 2021 OneStream Live <guillaume.desmottes@onestream.live>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+use std::{
+ collections::HashMap,
+ path::Path,
+ sync::{Arc, Mutex},
+};
+
+use gst::prelude::*;
+use structopt::StructOpt;
+
+#[derive(Debug, StructOpt)]
+#[structopt(name = "playlist", about = "An example of uriplaylistbin usage.")]
+struct Opt {
+ #[structopt(default_value = "1", short = "i", long = "iterations")]
+ iterations: u32,
+ uris: Vec<String>,
+}
+
+fn create_pipeline(uris: Vec<String>, iterations: u32) -> anyhow::Result<gst::Pipeline> {
+ let pipeline = gst::Pipeline::new(None);
+ let playlist = gst::ElementFactory::make("uriplaylistbin", None)?;
+
+ pipeline.add(&playlist)?;
+
+ playlist.set_property("uris", &uris);
+ playlist.set_property("iterations", &iterations);
+
+ let sink_bins = Arc::new(Mutex::new(HashMap::new()));
+ let sink_bins_clone = sink_bins.clone();
+
+ let pipeline_weak = pipeline.downgrade();
+ playlist.connect_pad_added(move |_playlist, src_pad| {
+ let pipeline = match pipeline_weak.upgrade() {
+ None => return,
+ Some(pipeline) => pipeline,
+ };
+ let pad_name = src_pad.name();
+
+ let sink = if pad_name.starts_with("audio") {
+ gst::parse_bin_from_description("audioconvert ! audioresample ! autoaudiosink", true)
+ .unwrap()
+ } else if pad_name.starts_with("video") {
+ gst::parse_bin_from_description("videoconvert ! autovideosink", true).unwrap()
+ } else {
+ unimplemented!();
+ };
+
+ pipeline.add(&sink).unwrap();
+ sink.sync_state_with_parent().unwrap();
+
+ let sink_pad = sink.static_pad("sink").unwrap();
+ src_pad.link(&sink_pad).unwrap();
+
+ sink_bins.lock().unwrap().insert(pad_name, sink);
+ });
+
+ let pipeline_weak = pipeline.downgrade();
+ playlist.connect_pad_removed(move |_playlist, pad| {
+ let pipeline = match pipeline_weak.upgrade() {
+ None => return,
+ Some(pipeline) => pipeline,
+ };
+
+ // remove sink bin that was handling the pad
+ let sink_bins = sink_bins_clone.lock().unwrap();
+ let sink = sink_bins.get(&pad.name()).unwrap();
+ pipeline.remove(sink).unwrap();
+ let _ = sink.set_state(gst::State::Null);
+ });
+
+ Ok(pipeline)
+}
+
+fn main() -> anyhow::Result<()> {
+ gst::init().unwrap();
+ gsturiplaylistbin::plugin_register_static().expect("Failed to register uriplaylistbin plugin");
+
+ let opt = Opt::from_args();
+ if opt.uris.is_empty() {
+ anyhow::bail!("Need at least one URI to play");
+ }
+
+ let uris = opt
+ .uris
+ .into_iter()
+ .map(|uri| {
+ let p = Path::new(&uri);
+ match p.canonicalize() {
+ Ok(p) => format!("file://{}", p.to_str().unwrap().to_string()),
+ _ => uri,
+ }
+ })
+ .collect();
+
+ {
+ let pipeline = create_pipeline(uris, opt.iterations)?;
+
+ pipeline
+ .set_state(gst::State::Playing)
+ .expect("Unable to set the pipeline to the `Playing` state");
+
+ let bus = pipeline.bus().unwrap();
+ for msg in bus.iter_timed(gst::ClockTime::NONE) {
+ use gst::MessageView;
+ match msg.view() {
+ MessageView::Error(err) => {
+ eprintln!(
+ "Error received from element {:?}: {}",
+ err.src().map(|s| s.path_string()),
+ err.error()
+ );
+ eprintln!("Debugging information: {:?}", err.debug());
+ break;
+ }
+ MessageView::Eos(..) => break,
+ _ => (),
+ }
+ }
+
+ pipeline
+ .set_state(gst::State::Null)
+ .expect("Unable to set the pipeline to the `Null` state");
+ }
+
+ unsafe {
+ gst::deinit();
+ }
+
+ Ok(())
+}
diff --git a/utils/uriplaylistbin/src/lib.rs b/utils/uriplaylistbin/src/lib.rs
new file mode 100644
index 000000000..38c32e8ec
--- /dev/null
+++ b/utils/uriplaylistbin/src/lib.rs
@@ -0,0 +1,28 @@
+// Copyright (C) 2021 OneStream Live <guillaume.desmottes@onestream.live>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+use gst::glib;
+
+mod uriplaylistbin;
+
+fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ uriplaylistbin::register(plugin)?;
+ Ok(())
+}
+
+gst::plugin_define!(
+ uriplaylistbin,
+ env!("CARGO_PKG_DESCRIPTION"),
+ plugin_init,
+ concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
+ "LGPL",
+ env!("CARGO_PKG_NAME"),
+ env!("CARGO_PKG_NAME"),
+ env!("CARGO_PKG_REPOSITORY"),
+ env!("BUILD_REL_DATE")
+);
diff --git a/utils/uriplaylistbin/src/uriplaylistbin/imp.rs b/utils/uriplaylistbin/src/uriplaylistbin/imp.rs
new file mode 100644
index 000000000..906cf5247
--- /dev/null
+++ b/utils/uriplaylistbin/src/uriplaylistbin/imp.rs
@@ -0,0 +1,1572 @@
+// Copyright (C) 2021 OneStream Live <guillaume.desmottes@onestream.live>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+use std::sync::Arc;
+use std::sync::Mutex;
+
+use gst::glib;
+use gst::prelude::*;
+use gst::subclass::prelude::*;
+use gst::{gst_debug, gst_error, gst_fixme, gst_info, gst_log, gst_warning};
+
+use once_cell::sync::Lazy;
+
+static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
+ gst::DebugCategory::new(
+ "uriplaylistbin",
+ gst::DebugColorFlags::empty(),
+ Some("Uri Playlist Bin"),
+ )
+});
+
+/// how many items are allowed to be prepared and waiting in the pipeline
+const MAX_STREAMING_ITEMS: usize = 2;
+
+#[derive(Debug)]
+enum PlaylistError {
+ PluginMissing { error: anyhow::Error },
+ ItemFailed { error: anyhow::Error, item: Item },
+}
+
+impl std::fmt::Display for PlaylistError {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ PlaylistError::PluginMissing { error } => {
+ write!(f, "{}", error)
+ }
+ PlaylistError::ItemFailed { error, item } => {
+ write!(f, "{} (URI: {})", error, item.uri())
+ }
+ }
+ }
+}
+
+impl std::error::Error for PlaylistError {
+ fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+ match self {
+ PlaylistError::PluginMissing { error } | PlaylistError::ItemFailed { error, .. } => {
+ Some(error.as_ref())
+ }
+ }
+ }
+}
+
+/// Number of different streams currently handled by the element
+#[derive(Debug, Clone, PartialEq)]
+struct StreamsTopology {
+ audio: u32,
+ video: u32,
+ text: u32,
+}
+
+impl Default for StreamsTopology {
+ fn default() -> Self {
+ Self {
+ audio: 0,
+ video: 0,
+ text: 0,
+ }
+ }
+}
+
+impl StreamsTopology {
+ fn n_streams(&self) -> u32 {
+ self.audio + self.video + self.text
+ }
+}
+
+impl<'a> From<gst::StreamCollection> for StreamsTopology {
+ fn from(collection: gst::StreamCollection) -> Self {
+ let (mut audio, mut video, mut text) = (0, 0, 0);
+ for stream in collection.iter() {
+ match stream.stream_type() {
+ gst::StreamType::AUDIO => audio += 1,
+ gst::StreamType::VIDEO => video += 1,
+ gst::StreamType::TEXT => text += 1,
+ _ => {}
+ }
+ }
+
+ Self { audio, video, text }
+ }
+}
+
+#[derive(Debug, Clone)]
+struct Settings {
+ uris: Vec<String>,
+ iterations: u32,
+}
+
+impl Default for Settings {
+ fn default() -> Self {
+ Self {
+ uris: vec![],
+ iterations: 1,
+ }
+ }
+}
+
+struct State {
+ streamsynchronizer: gst::Element,
+ concat_audio: Vec<gst::Element>,
+ concat_video: Vec<gst::Element>,
+ concat_text: Vec<gst::Element>,
+
+ playlist: Playlist,
+
+ /// the current number of streams handled by the element
+ streams_topology: StreamsTopology,
+ // true if the element stopped because of an error
+ errored: bool,
+
+ // we have max one item in one of each of those states
+ waiting_for_stream_collection: Option<Item>,
+ waiting_for_ss_eos: Option<Item>,
+ waiting_for_pads: Option<Item>,
+ blocked: Option<Item>,
+ // multiple items can be streaming, `concat` elements will block them all but the active one
+ streaming: Vec<Item>,
+ // items which have been fully played, waiting to be cleaned up
+ done: Vec<Item>,
+}
+
+impl State {
+ fn new(uris: Vec<String>, iterations: u32, streamsynchronizer: gst::Element) -> Self {
+ Self {
+ concat_audio: vec![],
+ concat_video: vec![],
+ concat_text: vec![],
+ streamsynchronizer,
+ playlist: Playlist::new(uris, iterations),
+ streams_topology: StreamsTopology::default(),
+ errored: false,
+ waiting_for_stream_collection: None,
+ waiting_for_ss_eos: None,
+ waiting_for_pads: None,
+ blocked: None,
+ streaming: vec![],
+ done: vec![],
+ }
+ }
+
+ /// Return the item whose decodebin is either `src` or an ancestor of `src`
+ fn find_item_from_src(&self, src: &gst::Object) -> Option<Item> {
+ // iterate in all the places we store `Item`, ordering does not matter
+ // as one decodebin element can be in only one Item.
+ let mut items = self
+ .waiting_for_stream_collection
+ .iter()
+ .chain(self.waiting_for_ss_eos.iter())
+ .chain(self.waiting_for_pads.iter())
+ .chain(self.blocked.iter())
+ .chain(self.streaming.iter())
+ .chain(self.done.iter());
+
+ items
+ .find(|item| {
+ let decodebin = item.uridecodebin();
+ let from_decodebin = src == &decodebin;
+ let bin = decodebin.downcast_ref::<gst::Bin>().unwrap();
+ from_decodebin || src.has_as_ancestor(bin)
+ })
+ .cloned()
+ }
+
+ fn unblock_item(&mut self, element: &super::UriPlaylistBin) {
+ if let Some(blocked) = self.blocked.take() {
+ let (messages, sender) = blocked.set_streaming(self.streams_topology.n_streams());
+
+ gst_log!(
+ CAT,
+ obj: element,
+ "send pending message of item #{} and unblock its pads",
+ blocked.index()
+ );
+
+ // send pending messages then unblock pads
+ for msg in messages {
+ let _ = element.post_message(msg);
+ }
+ let _ = sender.send(true);
+
+ self.streaming.push(blocked);
+ }
+ }
+}
+
+#[derive(Default)]
+pub struct UriPlaylistBin {
+ settings: Mutex<Settings>,
+ state: Mutex<Option<State>>,
+}
+
+#[derive(Debug, Clone)]
+enum ItemState {
+ /// Waiting to create a decodebin element
+ Pending,
+ /// Waiting to receive the stream collection from its decodebin element
+ WaitingForStreamCollection { uridecodebin: gst::Element },
+ /// Waiting for streamsynchronizer to be eos on all its src pads.
+ /// Only used to block item whose streams topology is different from the one
+ /// currently handled by the element. In such case we need to wait for
+ /// streamsynchronizer to be flushed before adding/removing concat elements.
+ WaitingForStreamsynchronizerEos {
+ uridecodebin: gst::Element,
+ /// src pads from decodebin currently blocked
+ decodebin_pads: Vec<gst::Pad>,
+ /// number of streamsynchronizer src pads which are not eos yet
+ waiting_eos: u32,
+ stream_collection_msg: gst::Message,
+ // channel used to block pads flow until streamsynchronizer is eos
+ sender: crossbeam_channel::Sender<bool>,
+ receiver: crossbeam_channel::Receiver<bool>,
+ },
+ /// Waiting that pads of all the streams have been created on decodebin.
+ /// Required to ensure that streams are plugged to concat in the playlist order.
+ WaitingForPads {
+ uridecodebin: gst::Element,
+ n_pads_pendings: u32,
+ stream_collection_msg: gst::Message,
+ /// concat sink pads which have been requested to handle this item
+ concat_sink_pads: Vec<(gst::Element, gst::Pad)>,
+ // channel used to block pad flow in the Blocked state
+ sender: crossbeam_channel::Sender<bool>,
+ receiver: crossbeam_channel::Receiver<bool>,
+ },
+ /// Pads have been linked to `concat` elements but are blocked until the next item is linked to `concat` as well.
+ /// This is required to ensure gap-less transition between items.
+ Blocked {
+ uridecodebin: gst::Element,
+ stream_collection_msg: gst::Message,
+ stream_selected_msg: Option<gst::Message>,
+ concat_sink_pads: Vec<(gst::Element, gst::Pad)>,
+ sender: crossbeam_channel::Sender<bool>,
+ receiver: crossbeam_channel::Receiver<bool>,
+ },
+ /// Buffers are flowing
+ Streaming {
+ uridecodebin: gst::Element,
+ concat_sink_pads: Vec<(gst::Element, gst::Pad)>,
+ // number of pads which are not eos yet
+ waiting_eos: u32,
+ },
+ /// Item has been fully streamed
+ Done {
+ uridecodebin: gst::Element,
+ concat_sink_pads: Vec<(gst::Element, gst::Pad)>,
+ },
+}
+
+#[derive(Debug, Clone)]
+struct Item {
+ inner: Arc<Mutex<ItemInner>>,
+}
+
+impl Item {
+ fn new(uri: String, index: usize) -> Self {
+ let inner = ItemInner {
+ uri,
+ index,
+ state: ItemState::Pending,
+ };
+
+ Self {
+ inner: Arc::new(Mutex::new(inner)),
+ }
+ }
+
+ fn uri(&self) -> String {
+ let inner = self.inner.lock().unwrap();
+ inner.uri.clone()
+ }
+
+ fn index(&self) -> usize {
+ let inner = self.inner.lock().unwrap();
+ inner.index
+ }
+
+ fn uridecodebin(&self) -> gst::Element {
+ let inner = self.inner.lock().unwrap();
+
+ match &inner.state {
+ ItemState::WaitingForStreamCollection { uridecodebin }
+ | ItemState::WaitingForStreamsynchronizerEos { uridecodebin, .. }
+ | ItemState::WaitingForPads { uridecodebin, .. }
+ | ItemState::Blocked { uridecodebin, .. }
+ | ItemState::Streaming { uridecodebin, .. }
+ | ItemState::Done { uridecodebin, .. } => uridecodebin.clone(),
+ _ => unreachable!(),
+ }
+ }
+
+ fn concat_sink_pads(&self) -> Vec<(gst::Element, gst::Pad)> {
+ let inner = self.inner.lock().unwrap();
+
+ match &inner.state {
+ ItemState::WaitingForPads {
+ concat_sink_pads, ..
+ }
+ | ItemState::Blocked {
+ concat_sink_pads, ..
+ }
+ | ItemState::Streaming {
+ concat_sink_pads, ..
+ }
+ | ItemState::Done {
+ concat_sink_pads, ..
+ } => concat_sink_pads.clone(),
+ _ => unreachable!(),
+ }
+ }
+
+ fn dec_n_pads_pending(&self) -> u32 {
+ let mut inner = self.inner.lock().unwrap();
+
+ match &mut inner.state {
+ ItemState::WaitingForPads {
+ n_pads_pendings, ..
+ } => {
+ *n_pads_pendings -= 1;
+ *n_pads_pendings
+ }
+ _ => unreachable!(),
+ }
+ }
+
+ fn receiver(&self) -> crossbeam_channel::Receiver<bool> {
+ let inner = self.inner.lock().unwrap();
+
+ match &inner.state {
+ ItemState::WaitingForPads { receiver, .. } => receiver.clone(),
+ ItemState::WaitingForStreamsynchronizerEos { receiver, .. } => receiver.clone(),
+ // receiver is no longer supposed to be accessed once in the `Blocked` state
+ _ => unreachable!(),
+ }
+ }
+
+ fn add_blocked_pad(&self, pad: gst::Pad) {
+ let mut inner = self.inner.lock().unwrap();
+
+ match &mut inner.state {
+ ItemState::WaitingForStreamsynchronizerEos { decodebin_pads, .. } => {
+ decodebin_pads.push(pad);
+ }
+ _ => unreachable!(),
+ }
+ }
+
+ /// decrement waiting_eos on a WaitingForStreamsynchronizeEos item, returns if all the streams are now eos or not
+ fn dec_waiting_eos_ss(&self) -> bool {
+ let mut inner = self.inner.lock().unwrap();
+
+ match &mut inner.state {
+ ItemState::WaitingForStreamsynchronizerEos { waiting_eos, .. } => {
+ *waiting_eos -= 1;
+ *waiting_eos == 0
+ }
+ _ => unreachable!(),
+ }
+ }
+
+ fn is_streaming(&self) -> bool {
+ let inner = self.inner.lock().unwrap();
+
+ matches!(&inner.state, ItemState::Streaming { .. })
+ }
+
+ /// queue the stream-selected message of a blocked item
+ fn add_stream_selected(&self, msg: gst::Message) {
+ let mut inner = self.inner.lock().unwrap();
+
+ match &mut inner.state {
+ ItemState::Blocked {
+ stream_selected_msg,
+ ..
+ } => {
+ *stream_selected_msg = Some(msg);
+ }
+ _ => unreachable!(),
+ }
+ }
+
+ // decrement waiting_eos on a Streaming item, returns if all the streams are now eos or not
+ fn dec_waiting_eos(&self) -> bool {
+ let mut inner = self.inner.lock().unwrap();
+
+ match &mut inner.state {
+ ItemState::Streaming { waiting_eos, .. } => {
+ *waiting_eos -= 1;
+ *waiting_eos == 0
+ }
+ _ => unreachable!(),
+ }
+ }
+
+ fn add_concat_sink_pad(&self, concat: &gst::Element, sink_pad: &gst::Pad) {
+ let mut inner = self.inner.lock().unwrap();
+
+ match &mut inner.state {
+ ItemState::WaitingForPads {
+ concat_sink_pads, ..
+ } => {
+ concat_sink_pads.push((concat.clone(), sink_pad.clone()));
+ }
+ _ => unreachable!(),
+ }
+ }
+
+ // change state methods
+
+ // from the Pending state, called when starting to process the item
+ fn set_waiting_for_stream_collection(&self) -> Result<(), PlaylistError> {
+ let mut inner = self.inner.lock().unwrap();
+
+ let uridecodebin = gst::ElementFactory::make(
+ "uridecodebin3",
+ Some(&format!("playlist-decodebin-{}", inner.index)),
+ )
+ .map_err(|e| PlaylistError::PluginMissing { error: e.into() })?;
+ uridecodebin.set_property("uri", &inner.uri);
+
+ assert!(matches!(inner.state, ItemState::Pending));
+ inner.state = ItemState::WaitingForStreamCollection { uridecodebin };
+
+ Ok(())
+ }
+
+ // from the WaitingForStreamCollection state, called when we received the item stream collection
+ // and its stream topology matches what is currently being processed by the element.
+ fn set_waiting_for_pads(&self, n_streams: u32, msg: gst::message::StreamCollection) {
+ let mut inner = self.inner.lock().unwrap();
+ assert!(matches!(
+ inner.state,
+ ItemState::WaitingForStreamCollection { .. }
+ ));
+
+ let (sender, receiver) = crossbeam_channel::unbounded::<bool>();
+
+ match &inner.state {
+ ItemState::WaitingForStreamCollection { uridecodebin } => {
+ inner.state = ItemState::WaitingForPads {
+ uridecodebin: uridecodebin.clone(),
+ n_pads_pendings: n_streams,
+ stream_collection_msg: msg.copy(),
+ concat_sink_pads: vec![],
+ sender,
+ receiver,
+ };
+ }
+ _ => unreachable!(),
+ }
+ }
+
+ // from the WaitingForStreamCollection state, called when we received the item stream collection
+ // but its stream topology does not match what is currently being processed by the element,
+ // having to wait until streamsynchronizer is flushed to internally reorganize the element.
+ fn set_waiting_for_ss_eos(&self, waiting_eos: u32, msg: gst::message::StreamCollection) {
+ let mut inner = self.inner.lock().unwrap();
+ let (sender, receiver) = crossbeam_channel::unbounded::<bool>();
+
+ match &inner.state {
+ ItemState::WaitingForStreamCollection { uridecodebin } => {
+ inner.state = ItemState::WaitingForStreamsynchronizerEos {
+ uridecodebin: uridecodebin.clone(),
+ decodebin_pads: vec![],
+ waiting_eos,
+ // FIXME: save deep copy once https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/issues/363 is fixed
+ stream_collection_msg: msg.copy(),
+ sender,
+ receiver,
+ };
+ }
+ _ => unreachable!(),
+ }
+ }
+
+ // from the WaitingForStreamsynchronizerEos state, called when the streamsynchronizer has been flushed
+ // and the item can now be processed.
+ fn done_waiting_for_ss_eos(
+ &self,
+ ) -> (
+ StreamsTopology,
+ Vec<gst::Pad>,
+ crossbeam_channel::Sender<bool>,
+ ) {
+ let mut inner = self.inner.lock().unwrap();
+
+ match &inner.state {
+ ItemState::WaitingForStreamsynchronizerEos {
+ uridecodebin,
+ decodebin_pads,
+ waiting_eos,
+ stream_collection_msg,
+ sender,
+ ..
+ } => {
+ assert_eq!(*waiting_eos, 0);
+
+ let topology = match stream_collection_msg.view() {
+ gst::MessageView::StreamCollection(stream_collection_msg) => {
+ StreamsTopology::from(stream_collection_msg.stream_collection())
+ }
+ _ => unreachable!(),
+ };
+ let pending_pads = decodebin_pads.clone();
+ let sender = sender.clone();
+
+ let (new_sender, new_receiver) = crossbeam_channel::unbounded::<bool>();
+
+ inner.state = ItemState::WaitingForPads {
+ uridecodebin: uridecodebin.clone(),
+ n_pads_pendings: topology.n_streams(),
+ stream_collection_msg: stream_collection_msg.copy(),
+ concat_sink_pads: vec![],
+ sender: new_sender,
+ receiver: new_receiver,
+ };
+
+ (topology, pending_pads, sender)
+ }
+ _ => unreachable!(),
+ }
+ }
+
+ // from the WaitingForPads state, called when all the pads from decodebin have been added and connected to concat elements.
+ fn set_blocked(&self) {
+ let mut inner = self.inner.lock().unwrap();
+
+ match &mut inner.state {
+ ItemState::WaitingForPads {
+ uridecodebin,
+ sender,
+ receiver,
+ stream_collection_msg,
+ concat_sink_pads,
+ ..
+ } => {
+ inner.state = ItemState::Blocked {
+ uridecodebin: uridecodebin.clone(),
+ sender: sender.clone(),
+ receiver: receiver.clone(),
+ concat_sink_pads: concat_sink_pads.clone(),
+ stream_collection_msg: stream_collection_msg.copy(),
+ stream_selected_msg: None,
+ };
+ }
+ _ => unreachable!(),
+ }
+ }
+
+ // from the Blocked state, called when the item streaming threads can be unblocked.
+ // Return the queued messages from this item and the sender to unblock their pads
+ fn set_streaming(
+ &self,
+ n_streams: u32,
+ ) -> (Vec<gst::Message>, crossbeam_channel::Sender<bool>) {
+ let mut inner = self.inner.lock().unwrap();
+
+ match &mut inner.state {
+ ItemState::Blocked {
+ uridecodebin,
+ sender,
+ stream_collection_msg,
+ stream_selected_msg,
+ concat_sink_pads,
+ ..
+ } => {
+ let mut messages = vec![stream_collection_msg.copy()];
+ if let Some(msg) = stream_selected_msg {
+ messages.push(msg.copy());
+ }
+ let sender = sender.clone();
+
+ inner.state = ItemState::Streaming {
+ uridecodebin: uridecodebin.clone(),
+ waiting_eos: n_streams,
+ concat_sink_pads: concat_sink_pads.clone(),
+ };
+
+ (messages, sender)
+ }
+ _ => unreachable!(),
+ }
+ }
+
+ // from the Streaming state, called when the item has been fully processed and can be cleaned up
+ fn set_done(&self) {
+ let mut inner = self.inner.lock().unwrap();
+
+ match &mut inner.state {
+ ItemState::Streaming {
+ uridecodebin,
+ concat_sink_pads,
+ ..
+ } => {
+ inner.state = ItemState::Done {
+ uridecodebin: uridecodebin.clone(),
+ concat_sink_pads: concat_sink_pads.clone(),
+ };
+ }
+ _ => unreachable!(),
+ }
+ }
+}
+
+#[derive(Debug, Clone)]
+struct ItemInner {
+ uri: String,
+ index: usize,
+ state: ItemState,
+}
+
+struct Playlist {
+ items: Box<dyn Iterator<Item = Item> + Send>,
+}
+
+impl Playlist {
+ fn new(uris: Vec<String>, iterations: u32) -> Self {
+ fn infinite_iter(uris: Vec<String>) -> Box<dyn Iterator<Item = Item> + Send + Sync> {
+ Box::new(
+ uris.into_iter()
+ .cycle()
+ .enumerate()
+ .map(|(index, uri)| Item::new(uri, index)),
+ )
+ }
+ fn finite_iter(
+ uris: Vec<String>,
+ iterations: u32,
+ ) -> Box<dyn Iterator<Item = Item> + Send + Sync> {
+ let n = (iterations as usize)
+ .checked_mul(uris.len())
+ .unwrap_or(usize::MAX);
+
+ Box::new(
+ uris.into_iter()
+ .cycle()
+ .take(n)
+ .enumerate()
+ .map(|(index, uri)| Item::new(uri, index)),
+ )
+ }
+
+ let items = if iterations == 0 {
+ infinite_iter(uris)
+ } else {
+ finite_iter(uris, iterations)
+ };
+
+ Self { items }
+ }
+
+ fn next(&mut self) -> Result<Option<Item>, PlaylistError> {
+ let item = match self.items.next() {
+ None => return Ok(None),
+ Some(item) => item,
+ };
+
+ item.set_waiting_for_stream_collection()?;
+
+ Ok(Some(item))
+ }
+}
+
+fn stream_type_from_pad_name(name: &str) -> anyhow::Result<(gst::StreamType, usize)> {
+ if let Some(index) = name.strip_prefix("audio_") {
+ Ok((gst::StreamType::AUDIO, index.parse().unwrap()))
+ } else if let Some(index) = name.strip_prefix("video_") {
+ Ok((gst::StreamType::VIDEO, index.parse().unwrap()))
+ } else if let Some(index) = name.strip_prefix("text_") {
+ Ok((gst::StreamType::TEXT, index.parse().unwrap()))
+ } else {
+ Err(anyhow::anyhow!("type of pad {} not supported", name))
+ }
+}
+
+#[glib::object_subclass]
+impl ObjectSubclass for UriPlaylistBin {
+ const NAME: &'static str = "GstUriPlaylistBin";
+ type Type = super::UriPlaylistBin;
+ type ParentType = gst::Bin;
+}
+
+impl ObjectImpl for UriPlaylistBin {
+ fn properties() -> &'static [glib::ParamSpec] {
+ static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
+ vec![
+ glib::ParamSpecBoxed::new(
+ "uris",
+ "URIs",
+ "URIs of the medias to play",
+ Vec::<String>::static_type(),
+ glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
+ ),
+ glib::ParamSpecUInt::new(
+ "iterations",
+ "Iterations",
+ "Number of time the playlist items should be played each (0 = unlimited)",
+ 0,
+ u32::MAX,
+ 1,
+ glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
+ ),
+ ]
+ });
+
+ PROPERTIES.as_ref()
+ }
+
+ fn set_property(
+ &self,
+ obj: &Self::Type,
+ _id: usize,
+ value: &glib::Value,
+ pspec: &glib::ParamSpec,
+ ) {
+ match pspec.name() {
+ "uris" => {
+ let mut settings = self.settings.lock().unwrap();
+ let new_value = value.get().expect("type checked upstream");
+ gst_info!(
+ CAT,
+ obj: obj,
+ "Changing uris from {:?} to {:?}",
+ settings.uris,
+ new_value,
+ );
+ settings.uris = new_value;
+ }
+ "iterations" => {
+ let mut settings = self.settings.lock().unwrap();
+ let new_value = value.get().expect("type checked upstream");
+ gst_info!(
+ CAT,
+ obj: obj,
+ "Changing iterations from {:?} to {:?}",
+ settings.iterations,
+ new_value,
+ );
+ settings.iterations = new_value;
+ }
+ _ => unimplemented!(),
+ }
+ }
+
+ fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
+ match pspec.name() {
+ "uris" => {
+ let settings = self.settings.lock().unwrap();
+ settings.uris.to_value()
+ }
+ "iterations" => {
+ let settings = self.settings.lock().unwrap();
+ settings.iterations.to_value()
+ }
+ _ => unimplemented!(),
+ }
+ }
+
+ fn constructed(&self, obj: &Self::Type) {
+ self.parent_constructed(obj);
+
+ obj.set_suppressed_flags(gst::ElementFlags::SOURCE | gst::ElementFlags::SINK);
+ obj.set_element_flags(gst::ElementFlags::SOURCE);
+ }
+}
+
+impl GstObjectImpl for UriPlaylistBin {}
+
+impl BinImpl for UriPlaylistBin {
+ fn handle_message(&self, element: &Self::Type, msg: gst::Message) {
+ match msg.view() {
+ gst::MessageView::StreamCollection(stream_collection_msg) => {
+ if let Err(e) = self.handle_stream_collection(element, stream_collection_msg) {
+ self.failed(element, e);
+ }
+ // stream collection will be send when the item starts streaming
+ return;
+ }
+ gst::MessageView::StreamsSelected(stream_selected) => {
+ if !self.handle_stream_selected(element, stream_selected) {
+ return;
+ }
+ }
+ gst::MessageView::Error(error) => {
+ // find item which raised the error
+ let self_ = UriPlaylistBin::from_instance(element);
+ let mut state_guard = self_.state.lock().unwrap();
+ let state = state_guard.as_mut().unwrap();
+
+ let src = error.src().unwrap();
+ let item = state.find_item_from_src(&src);
+
+ drop(state_guard);
+
+ if let Some(item) = item {
+ // handle the error message so we can add the failing uri as error details
+
+ self.failed(
+ element,
+ PlaylistError::ItemFailed {
+ error: anyhow::anyhow!(
+ "Error when processing item #{} ({}): {}",
+ item.index(),
+ item.uri(),
+ error.error().to_string()
+ ),
+ item,
+ },
+ );
+ return;
+ }
+ }
+ _ => (),
+ }
+
+ self.parent_handle_message(element, msg)
+ }
+}
+
+impl ElementImpl for UriPlaylistBin {
+ fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+ static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+ gst::subclass::ElementMetadata::new(
+ "Playlist Source",
+ "Generic/Source",
+ "Sequentially play uri streams",
+ "Guillaume Desmottes <guillaume.desmottes@onestream.live>",
+ )
+ });
+
+ Some(&*ELEMENT_METADATA)
+ }
+
+ fn pad_templates() -> &'static [gst::PadTemplate] {
+ static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
+ let audio_src_pad_template = gst::PadTemplate::new(
+ "audio_%u",
+ gst::PadDirection::Src,
+ gst::PadPresence::Sometimes,
+ &gst::Caps::new_any(),
+ )
+ .unwrap();
+
+ let video_src_pad_template = gst::PadTemplate::new(
+ "video_%u",
+ gst::PadDirection::Src,
+ gst::PadPresence::Sometimes,
+ &gst::Caps::new_any(),
+ )
+ .unwrap();
+
+ let text_src_pad_template = gst::PadTemplate::new(
+ "text_%u",
+ gst::PadDirection::Src,
+ gst::PadPresence::Sometimes,
+ &gst::Caps::new_any(),
+ )
+ .unwrap();
+
+ vec![
+ audio_src_pad_template,
+ video_src_pad_template,
+ text_src_pad_template,
+ ]
+ });
+
+ PAD_TEMPLATES.as_ref()
+ }
+
+ fn change_state(
+ &self,
+ element: &Self::Type,
+ transition: gst::StateChange,
+ ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
+ if transition == gst::StateChange::NullToReady {
+ if let Err(e) = self.start(element) {
+ self.failed(element, e);
+ return Err(gst::StateChangeError);
+ }
+ }
+
+ self.parent_change_state(element, transition)
+ }
+}
+
+impl UriPlaylistBin {
+ fn start(&self, element: &super::UriPlaylistBin) -> Result<(), PlaylistError> {
+ gst_debug!(CAT, obj: element, "Starting");
+ {
+ let mut state_guard = self.state.lock().unwrap();
+ assert!(state_guard.is_none());
+
+ let streamsynchronizer =
+ gst::ElementFactory::make("streamsynchronizer", Some("playlist-streamsync"))
+ .map_err(|e| PlaylistError::PluginMissing { error: e.into() })?;
+
+ element.add(&streamsynchronizer).unwrap();
+
+ let settings = self.settings.lock().unwrap();
+
+ *state_guard = Some(State::new(
+ settings.uris.clone(),
+ settings.iterations,
+ streamsynchronizer,
+ ));
+ }
+
+ self.start_next_item(element)?;
+
+ Ok(())
+ }
+
+ fn start_next_item(&self, element: &super::UriPlaylistBin) -> Result<(), PlaylistError> {
+ let mut state_guard = self.state.lock().unwrap();
+ let state = state_guard.as_mut().unwrap();
+
+ // clean up done items, so uridecodebin elements and concat sink pads don't pile up in the pipeline
+ while let Some(done) = state.done.pop() {
+ let uridecodebin = done.uridecodebin();
+ gst_log!(CAT, obj: element, "remove {} from bin", uridecodebin.name());
+
+ for (concat, sink_pad) in done.concat_sink_pads() {
+ // calling release_request_pad() while holding the pad stream lock would deadlock
+ concat.call_async(move |concat| {
+ concat.release_request_pad(&sink_pad);
+ });
+ }
+
+ // can't change state from the streaming thread
+ let uridecodebin_clone = uridecodebin.clone();
+ element.call_async(move |_element| {
+ let _ = uridecodebin_clone.set_state(gst::State::Null);
+ });
+
+ element.remove(&uridecodebin).unwrap();
+ }
+
+ if state.waiting_for_stream_collection.is_some()
+ || state.waiting_for_pads.is_some()
+ || state.waiting_for_ss_eos.is_some()
+ {
+ // another item is being prepared
+ return Ok(());
+ }
+
+ let n_streaming = state.streaming.len();
+ if n_streaming > MAX_STREAMING_ITEMS {
+ gst_log!(
+ CAT,
+ obj: element,
+ "Too many items streaming ({}), wait before starting the next one",
+ n_streaming
+ );
+
+ return Ok(());
+ }
+
+ let item = match state.playlist.next()? {
+ Some(item) => item,
+ None => {
+ gst_debug!(CAT, obj: element, "no more item to queue",);
+
+ // unblock last item
+ state.unblock_item(element);
+
+ return Ok(());
+ }
+ };
+
+ gst_debug!(
+ CAT,
+ obj: element,
+ "start decoding item #{}: {}",
+ item.index(),
+ item.uri()
+ );
+
+ let uridecodebin = item.uridecodebin();
+
+ element.add(&uridecodebin).unwrap();
+
+ let element_weak = element.downgrade();
+ let uridecodebin_clone = uridecodebin.clone();
+
+ let item_clone = item.clone();
+ assert!(state.waiting_for_stream_collection.is_none());
+ state.waiting_for_stream_collection = Some(item);
+
+ uridecodebin.connect_pad_added(move |_uridecodebin, src_pad| {
+ let element = match element_weak.upgrade() {
+ Some(element) => element,
+ None => return,
+ };
+ let self_ = UriPlaylistBin::from_instance(&element);
+
+ let item = {
+ let mut state_guard = self_.state.lock().unwrap();
+ let state = state_guard.as_mut().unwrap();
+ state.waiting_for_ss_eos.as_ref().cloned()
+ };
+
+ if let Some(item) = item {
+ // block pad until streamsynchronizer is eos
+ let element_weak = element.downgrade();
+ let receiver = item.receiver();
+
+ gst_debug!(
+ CAT,
+ obj: &element,
+ "Block pad {} until streamsynchronizer is flushed",
+ src_pad.name(),
+ );
+
+ src_pad.add_probe(gst::PadProbeType::BLOCK_DOWNSTREAM, move |pad, _info| {
+ let element = match element_weak.upgrade() {
+ Some(element) => element,
+ None => return gst::PadProbeReturn::Remove,
+ };
+ let parent = pad.parent().unwrap();
+
+ let _ = receiver.recv();
+
+ gst_log!(
+ CAT,
+ obj: &element,
+ "pad {}:{} has been unblocked",
+ parent.name(),
+ pad.name()
+ );
+
+ gst::PadProbeReturn::Remove
+ });
+
+ item.add_blocked_pad(src_pad.clone());
+ } else {
+ self_.process_decodebin_pad(src_pad);
+ }
+ });
+
+ drop(state_guard);
+
+ uridecodebin_clone
+ .sync_state_with_parent()
+ .map_err(|e| PlaylistError::ItemFailed {
+ error: e.into(),
+ item: item_clone,
+ })?;
+
+ Ok(())
+ }
+
+ fn handle_stream_collection(
+ &self,
+ element: &super::UriPlaylistBin,
+ stream_collection_msg: gst::message::StreamCollection,
+ ) -> Result<(), PlaylistError> {
+ let mut state_guard = self.state.lock().unwrap();
+ let state = state_guard.as_mut().unwrap();
+ let src = stream_collection_msg.src().unwrap();
+
+ if let Some(item) = state.waiting_for_stream_collection.clone() {
+ // check message is from the decodebin we are waiting for
+ let uridecodebin = item.uridecodebin();
+
+ if src.has_as_ancestor(&uridecodebin) {
+ let topology = StreamsTopology::from(stream_collection_msg.stream_collection());
+
+ gst_debug!(
+ CAT,
+ obj: element,
+ "got stream collection from {}: {:?}",
+ src.name(),
+ topology
+ );
+
+ if state.streams_topology.n_streams() == 0 {
+ state.streams_topology = topology.clone();
+ }
+
+ assert!(state.waiting_for_pads.is_none());
+
+ if state.streams_topology != topology {
+ gst_debug!(
+ CAT,
+ obj: element, "streams topoly changed ('{:?}' -> '{:?}'), waiting for streamsynchronize to be flushed",
+ state.streams_topology, topology);
+ item.set_waiting_for_ss_eos(
+ state.streams_topology.n_streams(),
+ stream_collection_msg,
+ );
+ state.waiting_for_ss_eos = Some(item);
+
+ // unblock previous item as we need it to be flushed out of streamsynchronizer
+ state.unblock_item(element);
+ } else {
+ item.set_waiting_for_pads(topology.n_streams(), stream_collection_msg);
+ state.waiting_for_pads = Some(item);
+ }
+
+ state.waiting_for_stream_collection = None;
+ }
+ }
+ Ok(())
+ }
+
+ // return true if the message can be forwarded
+ fn handle_stream_selected(
+ &self,
+ element: &super::UriPlaylistBin,
+ stream_selected_msg: gst::message::StreamsSelected,
+ ) -> bool {
+ let mut state_guard = self.state.lock().unwrap();
+ let state = state_guard.as_mut().unwrap();
+ let src = stream_selected_msg.src().unwrap();
+
+ if let Some(item) = state.blocked.clone() {
+ let uridecodebin = item.uridecodebin();
+
+ if src.has_as_ancestor(&uridecodebin) {
+ // stream-selected message is from the blocked item, queue the message until it's unblocked
+ gst_debug!(
+ CAT,
+ obj: element,
+ "queue stream-selected message from {} as item is currently blocked",
+ src.name(),
+ );
+
+ item.add_stream_selected(stream_selected_msg.copy());
+ false
+ } else {
+ true
+ }
+ } else {
+ true
+ }
+ }
+
+ fn process_decodebin_pad(&self, src_pad: &gst::Pad) {
+ let element = self.instance();
+
+ let start_next = {
+ let mut state_guard = self.state.lock().unwrap();
+ let state = state_guard.as_mut().unwrap();
+
+ if state.errored {
+ return;
+ }
+
+ let item = state.waiting_for_pads.clone().unwrap();
+
+ // Parse the pad name to extract the stream type and its index.
+ // We could get the type from the Stream object from the StreamStart sticky event but we'd still have
+ // to parse the name for the index.
+ let pad_name = src_pad.name();
+ let (stream_type, stream_index) = match stream_type_from_pad_name(&pad_name) {
+ Ok((stream_type, stream_index)) => (stream_type, stream_index),
+ Err(e) => {
+ gst_warning!(CAT, obj: &element, "Ignoring pad {}: {}", pad_name, e);
+ return;
+ }
+ };
+
+ let concat = match stream_type {
+ gst::StreamType::AUDIO => state.concat_audio.get(stream_index),
+ gst::StreamType::VIDEO => state.concat_video.get(stream_index),
+ gst::StreamType::TEXT => state.concat_text.get(stream_index),
+ _ => unreachable!(), // early return on unsupported streams above
+ };
+
+ let concat = match concat {
+ None => {
+ gst_debug!(
+ CAT,
+ obj: &element,
+ "stream {} from item #{}: creating concat element",
+ pad_name,
+ item.index()
+ );
+
+ let concat = match gst::ElementFactory::make(
+ "concat",
+ Some(&format!(
+ "playlist-concat-{}-{}",
+ stream_type.name(),
+ stream_index
+ )),
+ ) {
+ Ok(concat) => concat,
+ Err(_) => {
+ drop(state_guard);
+ self.failed(
+ &element,
+ PlaylistError::PluginMissing {
+ error: anyhow::anyhow!("element 'concat' missing"),
+ },
+ );
+ return;
+ }
+ };
+
+ // this is done by the streamsynchronizer element downstream
+ concat.set_property("adjust-base", false);
+
+ element.add(&concat).unwrap();
+
+ concat.sync_state_with_parent().unwrap();
+
+ // link concat elements to streamsynchronizer
+ let concat_src = concat.static_pad("src").unwrap();
+ let sync_sink = state
+ .streamsynchronizer
+ .request_pad_simple("sink_%u")
+ .unwrap();
+ concat_src.link(&sync_sink).unwrap();
+
+ let element_weak = element.downgrade();
+
+ // add event probe on streamsynchronizer src pad. Will only be used when we are waiting for the
+ // streamsynchronizer to be flushed in order to handle streams topology changes.
+ let src_pad_name = sync_sink.name().to_string().replace("sink", "src");
+ let sync_src = state.streamsynchronizer.static_pad(&src_pad_name).unwrap();
+ sync_src.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, move |_pad, info| {
+ match info.data {
+ Some(gst::PadProbeData::Event(ref ev))
+ if ev.type_() == gst::EventType::Eos =>
+ {
+ let element = match element_weak.upgrade() {
+ Some(element) => element,
+ None => return gst::PadProbeReturn::Remove,
+ };
+ let self_ = UriPlaylistBin::from_instance(&element);
+
+ let item = {
+ let mut state_guard = self_.state.lock().unwrap();
+ let state = state_guard.as_mut().unwrap();
+ state.waiting_for_ss_eos.as_ref().cloned()
+ };
+
+ if let Some(item) = item {
+ if item.dec_waiting_eos_ss() {
+ gst_debug!(CAT, obj: &element, "streamsynchronizer has been flushed, reorganize pipeline to fit new streams topology and unblock item");
+ self_.handle_topology_change(&element);
+ gst::PadProbeReturn::Drop
+ } else {
+ gst::PadProbeReturn::Drop
+ }
+ } else {
+ gst::PadProbeReturn::Pass
+ }
+ }
+ _ => gst::PadProbeReturn::Pass,
+ }
+ });
+
+ // ghost streamsynchronizer src pad
+ let sync_src_name = sync_sink.name().as_str().replace("sink", "src");
+ let src = state.streamsynchronizer.static_pad(&sync_src_name).unwrap();
+ let ghost = gst::GhostPad::with_target(Some(pad_name.as_str()), &src).unwrap();
+ ghost.set_active(true).unwrap();
+
+ // proxy sticky events
+ src.sticky_events_foreach(|event| {
+ let _ = ghost.store_sticky_event(&event);
+ Ok(Some(event))
+ });
+
+ unsafe {
+ ghost.set_event_function(|pad, parent, event| match event.view() {
+ gst::EventView::SelectStreams(_) => {
+ // TODO: handle select-streams event
+ let element = parent.unwrap();
+ gst_fixme!(
+ CAT,
+ obj: element,
+ "select-streams event not supported ('{:?}')",
+ event
+ );
+ false
+ }
+ _ => pad.event_default(parent, event),
+ });
+ }
+
+ element.add_pad(&ghost).unwrap();
+
+ match stream_type {
+ gst::StreamType::AUDIO => {
+ state.concat_audio.push(concat.clone());
+ }
+ gst::StreamType::VIDEO => {
+ state.concat_video.push(concat.clone());
+ }
+ gst::StreamType::TEXT => {
+ state.concat_text.push(concat.clone());
+ }
+ _ => unreachable!(), // early return on unsupported streams above
+ }
+
+ concat
+ }
+ Some(concat) => {
+ gst_debug!(
+ CAT,
+ obj: &element,
+ "stream {} from item #{}: re-using concat element {}",
+ pad_name,
+ item.index(),
+ concat.name()
+ );
+
+ concat.clone()
+ }
+ };
+
+ let sink_pad = concat.request_pad_simple("sink_%u").unwrap();
+ src_pad.link(&sink_pad).unwrap();
+
+ item.add_concat_sink_pad(&concat, &sink_pad);
+
+ // block pad until next item is reaching the `Blocked` state
+ let receiver = item.receiver();
+ let element_weak = element.downgrade();
+ let item_clone = item.clone();
+
+ sink_pad.add_probe(gst::PadProbeType::BLOCK_DOWNSTREAM, move |pad, info| {
+ let element = match element_weak.upgrade() {
+ Some(element) => element,
+ None => return gst::PadProbeReturn::Remove,
+ };
+ let parent = pad.parent().unwrap();
+ let item = &item_clone;
+
+ if !item.is_streaming() {
+ // block pad until next item is ready
+ gst_log!(
+ CAT,
+ obj: &element,
+ "blocking pad {}:{} until next item is ready",
+ parent.name(),
+ pad.name()
+ );
+
+ let _ = receiver.recv();
+
+ gst_log!(
+ CAT,
+ obj: &element,
+ "pad {}:{} has been unblocked",
+ parent.name(),
+ pad.name()
+ );
+
+ gst::PadProbeReturn::Pass
+ } else {
+ match info.data {
+ Some(gst::PadProbeData::Event(ref ev))
+ if ev.type_() == gst::EventType::Eos =>
+ {
+ if item.dec_waiting_eos() {
+ // all the streams are eos, item is now done
+ gst_log!(
+ CAT,
+ obj: &element,
+ "all streams of item #{} are eos",
+ item.index()
+ );
+
+ let self_ = UriPlaylistBin::from_instance(&element);
+ {
+ let mut state_guard = self_.state.lock().unwrap();
+ let state = state_guard.as_mut().unwrap();
+
+ let index = item.index();
+
+ let removed = state
+ .streaming
+ .iter()
+ .position(|i| i.index() == index)
+ .map(|e| state.streaming.remove(e));
+
+ if let Some(item) = removed {
+ item.set_done();
+ state.done.push(item);
+ }
+ }
+
+ if let Err(e) = self_.start_next_item(&element) {
+ self_.failed(&element, e);
+ }
+ }
+
+ gst::PadProbeReturn::Remove
+ }
+ _ => gst::PadProbeReturn::Pass,
+ }
+ }
+ });
+
+ if item.dec_n_pads_pending() == 0 {
+ // we got all the pads
+ gst_debug!(
+ CAT,
+ obj: &element,
+ "got all the pads for item #{}",
+ item.index()
+ );
+
+ // all pads have been linked to concat, unblock previous item
+ state.unblock_item(&element);
+
+ state.waiting_for_pads = None;
+ // block item until the next one is fully linked to concat
+ item.set_blocked();
+ state.blocked = Some(item);
+
+ true
+ } else {
+ false
+ }
+ };
+
+ if start_next {
+ gst_debug!(
+ CAT,
+ obj: &element,
+ "got all pending streams, queue next item"
+ );
+
+ if let Err(e) = self.start_next_item(&element) {
+ self.failed(&element, e);
+ }
+ }
+ }
+
+ /// called when all previous items have been flushed from streamsynchronizer
+ /// and so the elements can reorganize itself to handle a pending changes in
+ /// streams topology.
+ fn handle_topology_change(&self, element: &super::UriPlaylistBin) {
+ let (pending_pads, sender) = {
+ let mut state_guard = self.state.lock().unwrap();
+ let state = state_guard.as_mut().unwrap();
+
+ let item = state.waiting_for_ss_eos.take().unwrap();
+ let (topology, pending_pads, sender) = item.done_waiting_for_ss_eos();
+ state.waiting_for_pads = Some(item);
+
+ // remove now useless concat elements, missing ones will be added when handling src pads from decodebin
+
+ fn remove_useless_concat(
+ n_stream: usize,
+ concats: &mut Vec<gst::Element>,
+ element: &super::UriPlaylistBin,
+ streamsynchronizer: &gst::Element,
+ ) {
+ while n_stream < concats.len() {
+ // need to remove concat elements
+ let concat = concats.pop().unwrap();
+ gst_log!(CAT, obj: element, "remove {}", concat.name());
+
+ let concat_src = concat.static_pad("src").unwrap();
+ let ss_sink = concat_src.peer().unwrap();
+
+ // unlink and remove sink pad from streamsynchronizer
+ concat_src.unlink(&ss_sink).unwrap();
+ streamsynchronizer.release_request_pad(&ss_sink);
+
+ // remove associated ghost pad
+ let src_pads = element.src_pads();
+ let ghost = src_pads
+ .iter()
+ .find(|pad| {
+ let ghost = pad.downcast_ref::<gst::GhostPad>().unwrap();
+ ghost.target().is_none()
+ })
+ .unwrap();
+ element.remove_pad(ghost).unwrap();
+
+ element.remove(&concat).unwrap();
+ let _ = concat.set_state(gst::State::Null);
+ }
+ }
+
+ remove_useless_concat(
+ topology.audio as usize,
+ &mut state.concat_audio,
+ element,
+ &state.streamsynchronizer,
+ );
+ remove_useless_concat(
+ topology.video as usize,
+ &mut state.concat_video,
+ element,
+ &state.streamsynchronizer,
+ );
+ remove_useless_concat(
+ topology.text as usize,
+ &mut state.concat_text,
+ element,
+ &state.streamsynchronizer,
+ );
+
+ state.streams_topology = topology;
+
+ (pending_pads, sender)
+ };
+
+ // process decodebin src pads we already received and unblock them
+ for pad in pending_pads.iter() {
+ self.process_decodebin_pad(pad);
+ }
+
+ let _ = sender.send(true);
+ }
+
+ fn failed(&self, element: &super::UriPlaylistBin, error: PlaylistError) {
+ {
+ let mut state_guard = self.state.lock().unwrap();
+ let state = state_guard.as_mut().unwrap();
+
+ if state.errored {
+ return;
+ }
+ state.errored = true;
+
+ if let Some(blocked) = state.blocked.take() {
+ // unblock streaming thread
+ blocked.set_streaming(state.streams_topology.n_streams());
+ }
+ }
+ let error_msg = error.to_string();
+ gst_error!(CAT, obj: element, "{}", error_msg);
+
+ match error {
+ PlaylistError::PluginMissing { .. } => {
+ gst::element_error!(element, gst::CoreError::MissingPlugin, [&error_msg]);
+ }
+ PlaylistError::ItemFailed { item, .. } => {
+ // remove failing uridecodebin
+ let uridecodebin = item.uridecodebin();
+ uridecodebin.call_async(move |uridecodebin| {
+ let _ = uridecodebin.set_state(gst::State::Null);
+ });
+ let _ = element.remove(&uridecodebin);
+
+ let details = gst::Structure::builder("details");
+ let details = details.field("uri", item.uri());
+
+ gst::element_error!(
+ element,
+ gst::LibraryError::Failed,
+ [&error_msg],
+ details: details.build()
+ );
+ }
+ }
+ }
+}
diff --git a/utils/uriplaylistbin/src/uriplaylistbin/mod.rs b/utils/uriplaylistbin/src/uriplaylistbin/mod.rs
new file mode 100644
index 000000000..a3fa7c169
--- /dev/null
+++ b/utils/uriplaylistbin/src/uriplaylistbin/mod.rs
@@ -0,0 +1,30 @@
+// Copyright (C) 2021 OneStream Live <guillaume.desmottes@onestream.live>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+use gst::glib;
+use gst::prelude::*;
+
+mod imp;
+
+glib::wrapper! {
+ pub struct UriPlaylistBin(ObjectSubclass<imp::UriPlaylistBin>) @extends gst::Bin, gst::Element, gst::Object;
+}
+
+// GStreamer elements need to be thread-safe. For the private implementation this is automatically
+// enforced but for the public wrapper type we need to specify this manually.
+unsafe impl Send for UriPlaylistBin {}
+unsafe impl Sync for UriPlaylistBin {}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "uriplaylistbin",
+ gst::Rank::None,
+ UriPlaylistBin::static_type(),
+ )
+}
diff --git a/utils/uriplaylistbin/tests/sample.mkv b/utils/uriplaylistbin/tests/sample.mkv
new file mode 100644
index 000000000..331e063ba
--- /dev/null
+++ b/utils/uriplaylistbin/tests/sample.mkv
Binary files differ
diff --git a/utils/uriplaylistbin/tests/sample.ogg b/utils/uriplaylistbin/tests/sample.ogg
new file mode 100644
index 000000000..fd79fe108
--- /dev/null
+++ b/utils/uriplaylistbin/tests/sample.ogg
Binary files differ
diff --git a/utils/uriplaylistbin/tests/uriplaylistbin.rs b/utils/uriplaylistbin/tests/uriplaylistbin.rs
new file mode 100644
index 000000000..e8e4842cd
--- /dev/null
+++ b/utils/uriplaylistbin/tests/uriplaylistbin.rs
@@ -0,0 +1,299 @@
+// Copyright (C) 2021 OneStream Live <guillaume.desmottes@onestream.live>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+use std::path::PathBuf;
+
+use gst::prelude::*;
+use gst::MessageView;
+use more_asserts::assert_ge;
+
+struct TestMedia {
+ uri: String,
+ len: gst::ClockTime,
+}
+
+fn file_name_to_uri(name: &str) -> String {
+ let input_path = {
+ let mut r = PathBuf::new();
+ r.push(env!("CARGO_MANIFEST_DIR"));
+ r.push("tests");
+ r.push(name);
+ r
+ };
+
+ let url = url::Url::from_file_path(&input_path).unwrap();
+ url.to_string()
+}
+
+impl TestMedia {
+ fn ogg() -> Self {
+ Self {
+ uri: file_name_to_uri("sample.ogg"),
+ len: gst::ClockTime::from_mseconds(510),
+ }
+ }
+
+ fn mkv() -> Self {
+ Self {
+ uri: file_name_to_uri("sample.mkv"),
+ len: gst::ClockTime::from_mseconds(510),
+ }
+ }
+
+ fn missing_file() -> Self {
+ Self {
+ uri: "file:///not-there.ogg".to_string(),
+ len: gst::ClockTime::from_mseconds(10),
+ }
+ }
+
+ fn missing_http() -> Self {
+ Self {
+ uri: "http:///not-there.ogg".to_string(),
+ len: gst::ClockTime::from_mseconds(10),
+ }
+ }
+}
+
+fn init() {
+ use std::sync::Once;
+ static INIT: Once = Once::new();
+
+ INIT.call_once(|| {
+ gst::init().unwrap();
+ gsturiplaylistbin::plugin_register_static()
+ .expect("Failed to register uriplaylistbin plugin");
+ });
+}
+
+fn test(
+ medias: Vec<TestMedia>,
+ n_streams: u32,
+ iterations: u32,
+ check_streams: bool,
+) -> Vec<gst::Message> {
+ init();
+
+ let playlist_len = medias.len() * (iterations as usize);
+
+ let pipeline = gst::Pipeline::new(None);
+ let playlist = gst::ElementFactory::make("uriplaylistbin", None).unwrap();
+ let mq = gst::ElementFactory::make("multiqueue", None).unwrap();
+
+ pipeline.add_many(&[&playlist, &mq]).unwrap();
+
+ let total_len = gst::ClockTime::from_nseconds(
+ medias
+ .iter()
+ .map(|t| t.len.nseconds() * (iterations as u64))
+ .sum(),
+ );
+
+ let uris: Vec<String> = medias.iter().map(|t| t.uri.clone()).collect();
+
+ playlist.set_property("uris", &uris);
+ playlist.set_property("iterations", &iterations);
+
+ let mq_clone = mq.clone();
+ playlist.connect_pad_added(move |_playlist, src_pad| {
+ let mq_sink = mq_clone.request_pad_simple("sink_%u").unwrap();
+ src_pad.link(&mq_sink).unwrap();
+ });
+
+ let pipeline_weak = pipeline.downgrade();
+ mq.connect_pad_added(move |_mq, pad| {
+ if pad.direction() != gst::PadDirection::Src {
+ return;
+ }
+
+ let pipeline = match pipeline_weak.upgrade() {
+ Some(pipeline) => pipeline,
+ None => return,
+ };
+
+ let sink = gst::ElementFactory::make("fakesink", None).unwrap();
+ pipeline.add(&sink).unwrap();
+ sink.sync_state_with_parent().unwrap();
+
+ pad.link(&sink.static_pad("sink").unwrap()).unwrap();
+ });
+
+ pipeline.set_state(gst::State::Playing).unwrap();
+
+ let bus = pipeline.bus().unwrap();
+ let mut events = vec![];
+
+ loop {
+ let msg = bus.iter_timed(gst::ClockTime::NONE).next().unwrap();
+
+ match msg.view() {
+ MessageView::Error(_) | MessageView::Eos(..) => {
+ events.push(msg.clone());
+ break;
+ }
+ // check stream related messages
+ MessageView::StreamCollection(_) | MessageView::StreamsSelected(_) => {
+ events.push(msg.clone())
+ }
+ _ => {}
+ }
+ }
+
+ // check we actually played all files and all streams
+ fn stream_end_ts(sink: &gst::Element) -> gst::ClockTime {
+ let sample: gst::Sample = sink.property("last-sample");
+ let buffer = sample.buffer().unwrap();
+ let pts = buffer.pts().unwrap();
+ let segment = sample.segment().unwrap();
+ let segment = segment.downcast_ref::<gst::ClockTime>().unwrap();
+ let rt = segment.to_running_time(pts).unwrap();
+
+ rt + buffer.duration().unwrap()
+ }
+
+ if check_streams {
+ // check all streams have been fully played
+ let mut n = 0;
+ for sink in pipeline.iterate_sinks() {
+ let sink = sink.unwrap();
+ assert_ge!(
+ stream_end_ts(&sink),
+ total_len,
+ "{}: {} < {}",
+ sink.name(),
+ stream_end_ts(&sink),
+ total_len
+ );
+ n += 1;
+ }
+ assert_eq!(n, n_streams);
+
+ // check stream-collection and streams-selected message ordering
+ let mut events = events.clone().into_iter();
+
+ for _ in 0..playlist_len {
+ let decodebin = assert_stream_collection(events.next().unwrap(), n_streams as usize);
+ assert_eq!(
+ assert_stream_selected(events.next().unwrap(), n_streams as usize),
+ decodebin
+ );
+ }
+ }
+
+ pipeline.set_state(gst::State::Null).unwrap();
+
+ events
+}
+
+fn assert_eos(msg: gst::Message) {
+ assert!(matches!(msg.view(), MessageView::Eos(_)));
+}
+
+fn assert_error(msg: gst::Message, failing: TestMedia) {
+ match msg.view() {
+ MessageView::Error(err) => {
+ let details = err.details().unwrap();
+ assert_eq!(details.get::<&str>("uri").unwrap(), failing.uri);
+ }
+ _ => {
+ panic!("last message is not an error");
+ }
+ }
+}
+
+fn assert_stream_collection(msg: gst::Message, n_streams: usize) -> gst::Object {
+ match msg.view() {
+ MessageView::StreamCollection(sc) => {
+ let collection = sc.stream_collection();
+ assert_eq!(collection.len(), n_streams);
+ sc.src().unwrap()
+ }
+ _ => {
+ panic!("message is not a stream collection");
+ }
+ }
+}
+
+fn assert_stream_selected(msg: gst::Message, n_streams: usize) -> gst::Object {
+ match msg.view() {
+ MessageView::StreamsSelected(ss) => {
+ let collection = ss.stream_collection();
+ assert_eq!(collection.len(), n_streams);
+ ss.src().unwrap()
+ }
+ _ => {
+ panic!("message is not stream selected");
+ }
+ }
+}
+
+#[test]
+fn single_audio() {
+ let events = test(vec![TestMedia::ogg()], 1, 1, true).into_iter();
+ assert_eos(events.last().unwrap());
+}
+
+#[test]
+fn single_video() {
+ let events = test(vec![TestMedia::mkv()], 2, 1, true).into_iter();
+ assert_eos(events.last().unwrap());
+}
+
+#[test]
+fn multi_audio() {
+ let events = test(
+ vec![TestMedia::ogg(), TestMedia::ogg(), TestMedia::ogg()],
+ 1,
+ 1,
+ true,
+ )
+ .into_iter();
+ assert_eos(events.last().unwrap());
+}
+
+#[test]
+fn multi_audio_video() {
+ let events = test(vec![TestMedia::mkv(), TestMedia::mkv()], 2, 1, true).into_iter();
+ assert_eos(events.last().unwrap());
+}
+
+#[test]
+fn iterations() {
+ let events = test(vec![TestMedia::mkv(), TestMedia::mkv()], 2, 2, true).into_iter();
+ assert_eos(events.last().unwrap());
+}
+
+#[test]
+fn nb_streams_increasing() {
+ let events = test(vec![TestMedia::ogg(), TestMedia::mkv()], 2, 1, false).into_iter();
+ assert_eos(events.last().unwrap());
+}
+
+#[test]
+fn missing_file() {
+ let events = test(
+ vec![TestMedia::ogg(), TestMedia::missing_file()],
+ 1,
+ 1,
+ false,
+ )
+ .into_iter();
+ assert_error(events.last().unwrap(), TestMedia::missing_file());
+}
+
+#[test]
+fn missing_http() {
+ let events = test(
+ vec![TestMedia::ogg(), TestMedia::missing_http()],
+ 1,
+ 1,
+ false,
+ )
+ .into_iter();
+ assert_error(events.last().unwrap(), TestMedia::missing_http());
+}