Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSeungha Yang <seungha@centricular.com>2023-09-24 12:19:49 +0300
committerSeungha Yang <seungha@centricular.com>2023-09-25 15:32:16 +0300
commit0fe69cea9ff15b427051fcfb734b3f9b1723d8d6 (patch)
treefb8921d49ee903b853d0b0b4c0682f418294a4b1
parentd8546dd1405879c5b551900a8c553b60458f9d1f (diff)
hlssink3: Various cleanup
* Simplify state/playlist management * Fix a bug that segment is not deleted if location contains directory and playlist-root is unset * Split playlist update routine into two steps, adding segment to playlist and playlist write Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1306>
-rw-r--r--net/hlssink3/src/imp.rs332
1 files changed, 143 insertions, 189 deletions
diff --git a/net/hlssink3/src/imp.rs b/net/hlssink3/src/imp.rs
index 1d5b40bf..12fa2376 100644
--- a/net/hlssink3/src/imp.rs
+++ b/net/hlssink3/src/imp.rs
@@ -18,7 +18,7 @@ use m3u8_rs::MediaPlaylistType;
use std::fs;
use std::io::Write;
use std::path;
-use std::sync::{Arc, Mutex};
+use std::sync::Mutex;
const DEFAULT_LOCATION: &str = "segment%05d.ts";
const DEFAULT_PLAYLIST_LOCATION: &str = "playlist.m3u8";
@@ -111,7 +111,7 @@ impl Default for Settings {
}
}
-pub(crate) struct StartedState {
+struct PlaylistContext {
pdt_base_utc: Option<DateTime<Utc>>,
pdt_base_running_time: Option<gst::ClockTime>,
playlist: Playlist,
@@ -121,45 +121,15 @@ pub(crate) struct StartedState {
old_segment_locations: Vec<String>,
}
-impl StartedState {
- fn new(
- target_duration: f32,
- playlist_type: Option<MediaPlaylistType>,
- i_frames_only: bool,
- ) -> Self {
- Self {
- pdt_base_utc: None,
- pdt_base_running_time: None,
- playlist: Playlist::new(target_duration, playlist_type, i_frames_only),
- current_segment_location: None,
- fragment_opened_at: None,
- fragment_running_time: None,
- old_segment_locations: Vec::new(),
- }
- }
-
- fn fragment_duration_since(&self, fragment_closed: gst::ClockTime) -> f32 {
- let segment_duration = fragment_closed - self.fragment_opened_at.unwrap();
- segment_duration.mseconds() as f32 / 1_000f32
- }
-}
-
-#[allow(clippy::large_enum_variant)]
-enum State {
- Stopped,
- Started(StartedState),
-}
-
-impl Default for State {
- fn default() -> Self {
- Self::Stopped
- }
+#[derive(Default)]
+struct State {
+ context: Option<PlaylistContext>,
}
#[derive(Default)]
pub struct HlsSink3 {
- settings: Arc<Mutex<Settings>>,
- state: Arc<Mutex<State>>,
+ settings: Mutex<Settings>,
+ state: Mutex<State>,
}
impl HlsSink3 {
@@ -176,12 +146,24 @@ impl HlsSink3 {
};
let mut state = self.state.lock().unwrap();
- if let State::Stopped = *state {
- *state = State::Started(StartedState::new(
- target_duration,
- playlist_type,
- i_frames_only,
- ));
+ state.context = Some(PlaylistContext {
+ pdt_base_utc: None,
+ pdt_base_running_time: None,
+ playlist: Playlist::new(target_duration, playlist_type, i_frames_only),
+ fragment_opened_at: None,
+ fragment_running_time: None,
+ current_segment_location: None,
+ old_segment_locations: Vec::new(),
+ });
+ }
+
+ fn stop(&self) {
+ let mut state = self.state.lock().unwrap();
+ if let Some(mut context) = state.context.take() {
+ if context.playlist.is_rendering() {
+ context.playlist.stop();
+ let _ = self.write_playlist(&mut context);
+ }
}
}
@@ -193,11 +175,18 @@ impl HlsSink3 {
fragment_id
);
- // TODO: Create method in state to simplify this boilerplate: `let state = self.state.started()?`
- let mut state_guard = self.state.lock().unwrap();
- let state = match &mut *state_guard {
- State::Stopped => return Err("Not in Started state".to_string()),
- State::Started(s) => s,
+ let mut state = self.state.lock().unwrap();
+ let context = match state.context.as_mut() {
+ Some(context) => context,
+ None => {
+ gst::error!(
+ CAT,
+ imp: self,
+ "Playlist is not configured",
+ );
+
+ return Err(String::from("Playlist is not configured"));
+ }
};
let settings = self.settings.lock().unwrap();
@@ -220,7 +209,7 @@ impl HlsSink3 {
segment_file_location
);
- state.current_segment_location = Some(segment_file_location.clone());
+ context.current_segment_location = Some(segment_file_location.clone());
let fragment_stream = self
.obj()
@@ -238,7 +227,7 @@ impl HlsSink3 {
CAT,
imp: self,
"New segment location: {:?}",
- state.current_segment_location.as_ref()
+ context.current_segment_location.as_ref()
);
Ok(segment_file_location)
}
@@ -276,31 +265,60 @@ impl HlsSink3 {
});
}
- fn write_playlist(
- &self,
- fragment_closed_at: Option<gst::ClockTime>,
- date_time: Option<DateTime<Utc>>,
- ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst::info!(CAT, imp: self, "Preparing to write new playlist");
+ fn on_fragment_closed(&self, closed_at: gst::ClockTime, date_time: Option<DateTime<Utc>>) {
+ let mut state = self.state.lock().unwrap();
+ let context = match state.context.as_mut() {
+ Some(context) => context,
+ None => {
+ gst::error!(CAT, imp: self, "Playlist is not configured");
+ return;
+ }
+ };
- let mut state_guard = self.state.lock().unwrap();
- let state = match &mut *state_guard {
- State::Stopped => return Err(gst::StateChangeError),
- State::Started(s) => s,
+ let location = match context.current_segment_location.take() {
+ Some(location) => location,
+ None => {
+ gst::error!(CAT, imp: self, "Unknown segment location");
+ return;
+ }
};
- gst::info!(CAT, imp: self, "COUNT {}", state.playlist.len());
+ let opened_at = match context.fragment_opened_at.take() {
+ Some(opened_at) => opened_at,
+ None => {
+ gst::error!(CAT, imp: self, "Unknown segment duration");
+ return;
+ }
+ };
- // Only add fragment if it's complete.
- if let Some(fragment_closed) = fragment_closed_at {
- let segment_filename = self.segment_filename(state);
- state.playlist.add_segment(
- segment_filename.clone(),
- state.fragment_duration_since(fragment_closed),
- date_time,
- );
- state.old_segment_locations.push(segment_filename);
- }
+ let duration = ((closed_at - opened_at).mseconds() as f32) / 1_000f32;
+ let file_name = path::Path::new(&location)
+ .file_name()
+ .unwrap()
+ .to_str()
+ .unwrap();
+
+ let settings = self.settings.lock().unwrap();
+ let segment_file_name = if let Some(playlist_root) = &settings.playlist_root {
+ format!("{playlist_root}/{file_name}")
+ } else {
+ file_name.to_string()
+ };
+ drop(settings);
+
+ context
+ .playlist
+ .add_segment(segment_file_name, duration, date_time);
+ context.old_segment_locations.push(location);
+
+ let _ = self.write_playlist(context);
+ }
+
+ fn write_playlist(
+ &self,
+ context: &mut PlaylistContext,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ gst::info!(CAT, imp: self, "Preparing to write new playlist, COUNT {}", context.playlist.len());
let (playlist_location, max_num_segments, max_playlist_length) = {
let settings = self.settings.lock().unwrap();
@@ -311,7 +329,7 @@ impl HlsSink3 {
)
};
- state.playlist.update_playlist_state(max_playlist_length);
+ context.playlist.update_playlist_state(max_playlist_length);
// Acquires the playlist file handle so we can update it with new content. By default, this
// is expected to be the same file every time.
@@ -327,11 +345,11 @@ impl HlsSink3 {
imp: self,
"Could not get stream to write playlist content",
);
- gst::StateChangeError
+ gst::FlowError::Error
})?
.into_write();
- state
+ context
.playlist
.write_to(&mut playlist_stream)
.map_err(|err| {
@@ -341,7 +359,7 @@ impl HlsSink3 {
"Could not write new playlist: {}",
err.to_string()
);
- gst::StateChangeError
+ gst::FlowError::Error
})?;
playlist_stream.flush().map_err(|err| {
gst::error!(
@@ -350,14 +368,14 @@ impl HlsSink3 {
"Could not flush playlist: {}",
err.to_string()
);
- gst::StateChangeError
+ gst::FlowError::Error
})?;
- if state.playlist.is_type_undefined() && max_num_segments > 0 {
+ if context.playlist.is_type_undefined() && max_num_segments > 0 {
// Cleanup old segments from filesystem
- if state.old_segment_locations.len() > max_num_segments {
- for _ in 0..state.old_segment_locations.len() - max_num_segments {
- let old_segment_location = state.old_segment_locations.remove(0);
+ if context.old_segment_locations.len() > max_num_segments {
+ for _ in 0..context.old_segment_locations.len() - max_num_segments {
+ let old_segment_location = context.old_segment_locations.remove(0);
if !self
.obj()
.emit_by_name::<bool>(SIGNAL_DELETE_FRAGMENT, &[&old_segment_location])
@@ -369,40 +387,7 @@ impl HlsSink3 {
}
gst::debug!(CAT, imp: self, "Wrote new playlist file!");
- Ok(gst::StateChangeSuccess::Success)
- }
-
- fn segment_filename(&self, state: &mut StartedState) -> String {
- assert!(state.current_segment_location.is_some());
- let name = state.current_segment_location.take().unwrap();
- let segment_filename = path::Path::new(&name)
- .file_name()
- .unwrap()
- .to_str()
- .unwrap();
-
- let settings = self.settings.lock().unwrap();
- if let Some(playlist_root) = &settings.playlist_root {
- format!("{playlist_root}/{segment_filename}")
- } else {
- segment_filename.to_string()
- }
- }
-
- fn write_final_playlist(&self) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- gst::debug!(CAT, imp: self, "Preparing to write final playlist");
- self.write_playlist(None, None)
- }
-
- fn stop(&self) {
- gst::debug!(CAT, imp: self, "Stopping");
-
- let mut state = self.state.lock().unwrap();
- if let State::Started(_) = *state {
- *state = State::Stopped;
- }
-
- gst::debug!(CAT, imp: self, "Stopped");
+ Ok(gst::FlowSuccess::Ok)
}
}
@@ -435,44 +420,39 @@ impl BinImpl for HlsSink3 {
if let Ok(new_fragment_opened_at) = s.get::<gst::ClockTime>("running-time")
{
let mut state = self.state.lock().unwrap();
- match &mut *state {
- State::Stopped => {}
- State::Started(state) => {
- state.fragment_opened_at = Some(new_fragment_opened_at)
- }
- };
+ if let Some(context) = state.context.as_mut() {
+ context.fragment_opened_at = Some(new_fragment_opened_at);
+ }
}
}
"splitmuxsink-fragment-closed" => {
- let s = msg.structure().unwrap();
-
let settings = self.settings.lock().unwrap();
- let mut state_guard = self.state.lock().unwrap();
- let state = match &mut *state_guard {
- State::Stopped => {
- gst::element_error!(
- self.obj(),
- gst::StreamError::Failed,
- ("Fragment closed in wrong state"),
- ["Fragment closed but element is in stopped state"]
+ let mut state = self.state.lock().unwrap();
+ let context = match state.context.as_mut() {
+ Some(context) => context,
+ None => {
+ gst::error!(
+ CAT,
+ imp: self,
+ "Playlist is not configured",
);
+
return;
}
- State::Started(state) => state,
};
- let fragment_pts = state
+ let fragment_pts = context
.fragment_running_time
.expect("fragment running time must be set by format-location-full");
- if state.pdt_base_running_time.is_none() {
- state.pdt_base_running_time = state.fragment_running_time;
+ if context.pdt_base_running_time.is_none() {
+ context.pdt_base_running_time = context.fragment_running_time;
}
// Calculate the mapping from running time to UTC
// calculate pdt_base_utc for each segment for !pdt_follows_pipeline_clock
// when pdt_follows_pipeline_clock is set, we calculate the base time every time
// this avoids the drift between pdt tag and external clock (if gst clock has skew w.r.t external clock)
- if state.pdt_base_utc.is_none() || !settings.pdt_follows_pipeline_clock {
+ if context.pdt_base_utc.is_none() || !settings.pdt_follows_pipeline_clock {
let now_utc = Utc::now();
let now_gst = settings.giostreamsink.clock().unwrap().time().unwrap();
let pts_clock_time =
@@ -483,21 +463,21 @@ impl BinImpl for HlsSink3 {
.checked_sub_signed(Duration::nanoseconds(diff.nseconds() as i64))
.expect("offsetting the utc with gstreamer clock-diff overflow");
- state.pdt_base_utc = Some(pts_utc);
+ context.pdt_base_utc = Some(pts_utc);
}
let fragment_date_time = if settings.enable_program_date_time
- && state.pdt_base_running_time.is_some()
+ && context.pdt_base_running_time.is_some()
{
// Add the diff of running time to UTC time
// date_time = first_segment_utc + (current_seg_running_time - first_seg_running_time)
- state
+ context
.pdt_base_utc
.unwrap()
.checked_add_signed(Duration::nanoseconds(
- state
+ context
.fragment_running_time
- .opt_checked_sub(state.pdt_base_running_time)
+ .opt_checked_sub(context.pdt_base_running_time)
.unwrap()
.unwrap()
.nseconds() as i64,
@@ -505,12 +485,11 @@ impl BinImpl for HlsSink3 {
} else {
None
};
- drop(state_guard);
+ drop(state);
drop(settings);
if let Ok(fragment_closed_at) = s.get::<gst::ClockTime>("running-time") {
- let _ =
- self.write_playlist(Some(fragment_closed_at), fragment_date_time);
+ self.on_fragment_closed(fragment_closed_at, fragment_date_time);
}
}
_ => {}
@@ -768,30 +747,29 @@ impl ObjectImpl for HlsSink3 {
]);
obj.add(&settings.splitmuxsink).unwrap();
- let state = self.state.clone();
settings
.splitmuxsink
.connect("format-location-full", false, {
- let self_weak = self.downgrade();
+ let imp_weak = self.downgrade();
move |args| {
- let self_ = match self_weak.upgrade() {
- Some(self_) => self_,
+ let imp = match imp_weak.upgrade() {
+ Some(imp) => imp,
None => return Some(None::<String>.to_value()),
};
let fragment_id = args[1].get::<u32>().unwrap();
- gst::info!(CAT, imp: self_, "Got fragment-id: {}", fragment_id);
+ gst::info!(CAT, imp: imp, "Got fragment-id: {}", fragment_id);
- let mut state_guard = state.lock().unwrap();
- let state = match &mut *state_guard {
- State::Stopped => {
+ let mut state = imp.state.lock().unwrap();
+ let context = match state.context.as_mut() {
+ Some(context) => context,
+ None => {
gst::error!(
CAT,
- imp: self_,
+ imp: imp,
"on format location called with Stopped state"
);
return Some("unknown_segment".to_value());
}
- State::Started(s) => s,
};
let sample = args[2].get::<gst::Sample>().unwrap();
@@ -802,22 +780,22 @@ impl ObjectImpl for HlsSink3 {
.expect("segment not available")
.downcast_ref::<gst::ClockTime>()
.expect("no time segment");
- state.fragment_running_time =
+ context.fragment_running_time =
segment.to_running_time(buffer.pts().unwrap());
} else {
gst::warning!(
CAT,
- imp: self_,
+ imp: imp,
"buffer null for fragment-id: {}",
fragment_id
);
}
- drop(state_guard);
+ drop(state);
- match self_.on_format_location(fragment_id) {
+ match imp.on_format_location(fragment_id) {
Ok(segment_location) => Some(segment_location.to_value()),
Err(err) => {
- gst::error!(CAT, imp: self_, "on format-location handler: {}", err);
+ gst::error!(CAT, imp: imp, "on format-location handler: {}", err);
Some("unknown_segment".to_value())
}
}
@@ -874,7 +852,7 @@ impl ElementImpl for HlsSink3 {
&self,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
- if let gst::StateChange::NullToReady = transition {
+ if transition == gst::StateChange::ReadyToPaused {
self.start();
}
@@ -883,39 +861,15 @@ impl ElementImpl for HlsSink3 {
match transition {
gst::StateChange::PlayingToPaused => {
let mut state = self.state.lock().unwrap();
- match &mut *state {
- State::Stopped => (),
- State::Started(state) => {
- // reset mapping from rt to utc. during pause
- // rt is stopped but utc keep moving so need to
- // calculate the mapping again
- state.pdt_base_running_time = None;
- state.pdt_base_utc = None
- }
+ if let Some(context) = state.context.as_mut() {
+ // reset mapping from rt to utc. during pause
+ // rt is stopped but utc keep moving so need to
+ // calculate the mapping again
+ context.pdt_base_running_time = None;
+ context.pdt_base_utc = None
}
}
gst::StateChange::PausedToReady => {
- let write_final = {
- let mut state = self.state.lock().unwrap();
- match &mut *state {
- State::Stopped => false,
- State::Started(state) => {
- if state.playlist.is_rendering() {
- state.playlist.stop();
- true
- } else {
- false
- }
- }
- }
- };
-
- if write_final {
- // Don't fail transitioning to READY if this fails
- let _ = self.write_final_playlist();
- }
- }
- gst::StateChange::ReadyToNull => {
self.stop();
}
_ => (),