Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSeungha Yang <seungha@centricular.com>2023-11-10 16:49:40 +0300
committerSebastian Dröge <sebastian@centricular.com>2023-11-10 18:32:39 +0300
commitd2e5cb43ccf6b2f7e7509fe797e72f3e7b6e073f (patch)
tree1222bd226a3ef445ab3dbc3825de95be28f6bff1
parent29cbfbf970ef66356ee5dce75d0cc297e83cf7f8 (diff)
hlssink3: Various cleanup
* Simplify state/playlist management * Fix a bug that segment is not deleted if location contains directory and playlist-root is unset * Split playlist update routine into two steps, adding segment to playlist and playlist write Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1387>
-rw-r--r--net/hlssink3/src/imp.rs257
1 files changed, 108 insertions, 149 deletions
diff --git a/net/hlssink3/src/imp.rs b/net/hlssink3/src/imp.rs
index 3d07a071c..0b4323fcc 100644
--- a/net/hlssink3/src/imp.rs
+++ b/net/hlssink3/src/imp.rs
@@ -17,7 +17,7 @@ use once_cell::sync::Lazy;
use std::fs;
use std::io::Write;
use std::path;
-use std::sync::{Arc, Mutex};
+use std::sync::Mutex;
const DEFAULT_LOCATION: &str = "segment%05d.ts";
const DEFAULT_PLAYLIST_LOCATION: &str = "playlist.m3u8";
@@ -104,49 +104,22 @@ impl Default for Settings {
}
}
-pub(crate) struct StartedState {
+struct PlaylistContext {
playlist: Playlist,
fragment_opened_at: Option<gst::ClockTime>,
current_segment_location: Option<String>,
old_segment_locations: Vec<String>,
}
-impl StartedState {
- fn new(
- target_duration: f32,
- playlist_type: Option<MediaPlaylistType>,
- i_frames_only: bool,
- ) -> Self {
- Self {
- playlist: Playlist::new(target_duration, playlist_type, i_frames_only),
- current_segment_location: None,
- fragment_opened_at: None,
- old_segment_locations: Vec::new(),
- }
- }
-
- fn fragment_duration_since(&self, fragment_closed: gst::ClockTime) -> f32 {
- let segment_duration = fragment_closed - self.fragment_opened_at.unwrap();
- segment_duration.mseconds() as f32 / 1_000f32
- }
-}
-
-#[allow(clippy::large_enum_variant)]
-enum State {
- Stopped,
- Started(StartedState),
-}
-
-impl Default for State {
- fn default() -> Self {
- Self::Stopped
- }
+#[derive(Default)]
+struct State {
+ context: Option<PlaylistContext>,
}
#[derive(Default)]
pub struct HlsSink3 {
- settings: Arc<Mutex<Settings>>,
- state: Arc<Mutex<State>>,
+ settings: Mutex<Settings>,
+ state: Mutex<State>,
}
impl HlsSink3 {
@@ -163,12 +136,21 @@ impl HlsSink3 {
};
let mut state = self.state.lock().unwrap();
- if let State::Stopped = *state {
- *state = State::Started(StartedState::new(
- target_duration,
- playlist_type,
- i_frames_only,
- ));
+ state.context = Some(PlaylistContext {
+ playlist: Playlist::new(target_duration, playlist_type, i_frames_only),
+ fragment_opened_at: None,
+ current_segment_location: None,
+ old_segment_locations: Vec::new(),
+ });
+ }
+
+ fn stop(&self) {
+ let mut state = self.state.lock().unwrap();
+ if let Some(mut context) = state.context.take() {
+ if context.playlist.is_rendering() {
+ context.playlist.stop();
+ let _ = self.write_playlist(&mut context);
+ }
}
}
@@ -180,11 +162,14 @@ impl HlsSink3 {
fragment_id
);
- // TODO: Create method in state to simplify this boilerplate: `let state = self.state.started()?`
- let mut state_guard = self.state.lock().unwrap();
- let state = match &mut *state_guard {
- State::Stopped => return Err("Not in Started state".to_string()),
- State::Started(s) => s,
+ let mut state = self.state.lock().unwrap();
+ let context = match state.context.as_mut() {
+ Some(context) => context,
+ None => {
+ gst::error!(CAT, imp: self, "Playlist is not configured",);
+
+ return Err(String::from("Playlist is not configured"));
+ }
};
let settings = self.settings.lock().unwrap();
@@ -203,7 +188,7 @@ impl HlsSink3 {
segment_file_location
);
- state.current_segment_location = Some(segment_file_location.clone());
+ context.current_segment_location = Some(segment_file_location.clone());
let fragment_stream = self
.obj()
@@ -221,7 +206,7 @@ impl HlsSink3 {
CAT,
imp: self,
"New segment location: {:?}",
- state.current_segment_location.as_ref()
+ context.current_segment_location.as_ref()
);
Ok(segment_file_location)
}
@@ -259,29 +244,63 @@ impl HlsSink3 {
});
}
- fn write_playlist(
- &self,
- fragment_closed_at: Option<gst::ClockTime>,
- ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst::info!(CAT, imp: self, "Preparing to write new playlist");
+ fn on_fragment_closed(&self, closed_at: gst::ClockTime) {
+ let mut state = self.state.lock().unwrap();
+ let context = match state.context.as_mut() {
+ Some(context) => context,
+ None => {
+ gst::error!(CAT, imp: self, "Playlist is not configured");
+ return;
+ }
+ };
- let mut state_guard = self.state.lock().unwrap();
- let state = match &mut *state_guard {
- State::Stopped => return Err(gst::StateChangeError),
- State::Started(s) => s,
+ let location = match context.current_segment_location.take() {
+ Some(location) => location,
+ None => {
+ gst::error!(CAT, imp: self, "Unknown segment location");
+ return;
+ }
};
- gst::info!(CAT, imp: self, "COUNT {}", state.playlist.len());
+ let opened_at = match context.fragment_opened_at.take() {
+ Some(opened_at) => opened_at,
+ None => {
+ gst::error!(CAT, imp: self, "Unknown segment duration");
+ return;
+ }
+ };
- // Only add fragment if it's complete.
- if let Some(fragment_closed) = fragment_closed_at {
- let segment_filename = self.segment_filename(state);
- state.playlist.add_segment(
- segment_filename.clone(),
- state.fragment_duration_since(fragment_closed),
- );
- state.old_segment_locations.push(segment_filename);
- }
+ let duration = ((closed_at - opened_at).mseconds() as f32) / 1_000f32;
+ let file_name = path::Path::new(&location)
+ .file_name()
+ .unwrap()
+ .to_str()
+ .unwrap();
+
+ let settings = self.settings.lock().unwrap();
+ let segment_file_name = if let Some(playlist_root) = &settings.playlist_root {
+ format!("{playlist_root}/{file_name}")
+ } else {
+ file_name.to_string()
+ };
+ drop(settings);
+
+ context.playlist.add_segment(segment_file_name, duration);
+ context.old_segment_locations.push(location);
+
+ let _ = self.write_playlist(context);
+ }
+
+ fn write_playlist(
+ &self,
+ context: &mut PlaylistContext,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ gst::info!(
+ CAT,
+ imp: self,
+ "Preparing to write new playlist, COUNT {}",
+ context.playlist.len()
+ );
let (playlist_location, max_num_segments, max_playlist_length) = {
let settings = self.settings.lock().unwrap();
@@ -292,7 +311,7 @@ impl HlsSink3 {
)
};
- state.playlist.update_playlist_state(max_playlist_length);
+ context.playlist.update_playlist_state(max_playlist_length);
// Acquires the playlist file handle so we can update it with new content. By default, this
// is expected to be the same file every time.
@@ -308,11 +327,11 @@ impl HlsSink3 {
imp: self,
"Could not get stream to write playlist content",
);
- gst::StateChangeError
+ gst::FlowError::Error
})?
.into_write();
- state
+ context
.playlist
.write_to(&mut playlist_stream)
.map_err(|err| {
@@ -322,7 +341,7 @@ impl HlsSink3 {
"Could not write new playlist: {}",
err.to_string()
);
- gst::StateChangeError
+ gst::FlowError::Error
})?;
playlist_stream.flush().map_err(|err| {
gst::error!(
@@ -331,14 +350,14 @@ impl HlsSink3 {
"Could not flush playlist: {}",
err.to_string()
);
- gst::StateChangeError
+ gst::FlowError::Error
})?;
- if state.playlist.is_type_undefined() && max_num_segments > 0 {
+ if context.playlist.is_type_undefined() && max_num_segments > 0 {
// Cleanup old segments from filesystem
- if state.old_segment_locations.len() > max_num_segments {
- for _ in 0..state.old_segment_locations.len() - max_num_segments {
- let old_segment_location = state.old_segment_locations.remove(0);
+ if context.old_segment_locations.len() > max_num_segments {
+ for _ in 0..context.old_segment_locations.len() - max_num_segments {
+ let old_segment_location = context.old_segment_locations.remove(0);
if !self
.obj()
.emit_by_name::<bool>(SIGNAL_DELETE_FRAGMENT, &[&old_segment_location])
@@ -350,40 +369,7 @@ impl HlsSink3 {
}
gst::debug!(CAT, imp: self, "Wrote new playlist file!");
- Ok(gst::StateChangeSuccess::Success)
- }
-
- fn segment_filename(&self, state: &mut StartedState) -> String {
- assert!(state.current_segment_location.is_some());
- let name = state.current_segment_location.take().unwrap();
- let segment_filename = path::Path::new(&name)
- .file_name()
- .unwrap()
- .to_str()
- .unwrap();
-
- let settings = self.settings.lock().unwrap();
- if let Some(playlist_root) = &settings.playlist_root {
- format!("{}/{}", playlist_root, segment_filename)
- } else {
- segment_filename.to_string()
- }
- }
-
- fn write_final_playlist(&self) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst::debug!(CAT, imp: self, "Preparing to write final playlist");
- self.write_playlist(None)
- }
-
- fn stop(&self) {
- gst::debug!(CAT, imp: self, "Stopping");
-
- let mut state = self.state.lock().unwrap();
- if let State::Started(_) = *state {
- *state = State::Stopped;
- }
-
- gst::debug!(CAT, imp: self, "Stopped");
+ Ok(gst::FlowSuccess::Ok)
}
}
@@ -416,18 +402,15 @@ impl BinImpl for HlsSink3 {
if let Ok(new_fragment_opened_at) = s.get::<gst::ClockTime>("running-time")
{
let mut state = self.state.lock().unwrap();
- match &mut *state {
- State::Stopped => {}
- State::Started(state) => {
- state.fragment_opened_at = Some(new_fragment_opened_at)
- }
- };
+ if let Some(context) = state.context.as_mut() {
+ context.fragment_opened_at = Some(new_fragment_opened_at);
+ }
}
}
"splitmuxsink-fragment-closed" => {
let s = msg.structure().unwrap();
if let Ok(fragment_closed_at) = s.get::<gst::ClockTime>("running-time") {
- let _ = self.write_playlist(Some(fragment_closed_at));
+ self.on_fragment_closed(fragment_closed_at);
}
}
_ => {}
@@ -670,20 +653,20 @@ impl ObjectImpl for HlsSink3 {
obj.add(&settings.splitmuxsink).unwrap();
settings.splitmuxsink.connect("format-location", false, {
- let self_weak = self.downgrade();
+ let imp_weak = self.downgrade();
move |args| {
- let self_ = match self_weak.upgrade() {
- Some(self_) => self_,
+ let imp = match imp_weak.upgrade() {
+ Some(imp) => imp,
None => return Some(None::<String>.to_value()),
};
let fragment_id = args[1].get::<u32>().unwrap();
- gst::info!(CAT, imp: self_, "Got fragment-id: {}", fragment_id);
+ gst::info!(CAT, imp: imp, "Got fragment-id: {}", fragment_id);
- match self_.on_format_location(fragment_id) {
+ match imp.on_format_location(fragment_id) {
Ok(segment_location) => Some(segment_location.to_value()),
Err(err) => {
- gst::error!(CAT, imp: self_, "on format-location handler: {}", err);
+ gst::error!(CAT, imp: imp, "on format-location handler: {}", err);
Some("unknown_segment".to_value())
}
}
@@ -740,38 +723,14 @@ impl ElementImpl for HlsSink3 {
&self,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- if let gst::StateChange::NullToReady = transition {
+ if transition == gst::StateChange::ReadyToPaused {
self.start();
}
let ret = self.parent_change_state(transition)?;
- match transition {
- gst::StateChange::PausedToReady => {
- let write_final = {
- let mut state = self.state.lock().unwrap();
- match &mut *state {
- State::Stopped => false,
- State::Started(state) => {
- if state.playlist.is_rendering() {
- state.playlist.stop();
- true
- } else {
- false
- }
- }
- }
- };
-
- if write_final {
- // Don't fail transitioning to READY if this fails
- let _ = self.write_final_playlist();
- }
- }
- gst::StateChange::ReadyToNull => {
- self.stop();
- }
- _ => (),
+ if transition == gst::StateChange::PausedToReady {
+ self.stop();
}
Ok(ret)