Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--utils/livesync/src/livesync/imp.rs217
1 files changed, 128 insertions, 89 deletions
diff --git a/utils/livesync/src/livesync/imp.rs b/utils/livesync/src/livesync/imp.rs
index 0e655c4a2..7355edaab 100644
--- a/utils/livesync/src/livesync/imp.rs
+++ b/utils/livesync/src/livesync/imp.rs
@@ -14,7 +14,7 @@ use gst::{
};
use once_cell::sync::Lazy;
use parking_lot::{Condvar, Mutex, MutexGuard};
-use std::sync::mpsc;
+use std::{collections::VecDeque, sync::mpsc};
/// Offset for the segment in single-segment mode, to handle negative DTS
const SEGMENT_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(60 * 60 * 1000);
@@ -67,19 +67,23 @@ struct State {
upstream_latency: Option<gst::ClockTime>,
fallback_duration: gst::ClockTime,
+ playing: bool,
eos: bool,
- segment: Option<gst::FormattedSegment<gst::ClockTime>>,
srcresult: Result<gst::FlowSuccess, gst::FlowError>,
- playing: bool,
- sent_segment: bool,
clock_id: Option<gst::SingleShotClockId>,
+ in_segment: Option<gst::FormattedSegment<gst::ClockTime>>,
+ pending_segment: Option<gst::FormattedSegment<gst::ClockTime>>,
+ out_segment: Option<gst::FormattedSegment<gst::ClockTime>>,
+
in_caps: Option<gst::Caps>,
+ pending_caps: Option<gst::Caps>,
in_audio_info: Option<gst_audio::AudioInfo>,
out_audio_info: Option<gst_audio::AudioInfo>,
- in_item: Option<Item>,
+ queue: VecDeque<Item>,
+ buffer_queued: bool,
out_buffer: Option<gst::Buffer>,
in_timestamp: Option<Timestamps>,
@@ -113,16 +117,19 @@ impl Default for State {
late_threshold: DEFAULT_LATE_THRESHOLD,
upstream_latency: None,
fallback_duration: DEFAULT_DURATION,
+ playing: false,
eos: false,
- segment: None,
srcresult: Err(gst::FlowError::Flushing),
- playing: false,
- sent_segment: false,
clock_id: None,
+ in_segment: None,
+ pending_segment: None,
+ out_segment: None,
in_caps: None,
+ pending_caps: None,
in_audio_info: None,
out_audio_info: None,
- in_item: None,
+ queue: VecDeque::with_capacity(32),
+ buffer_queued: false,
out_buffer: None,
in_timestamp: None,
out_timestamp: None,
@@ -394,14 +401,15 @@ impl ElementImpl for LiveSync {
impl State {
/// Calculate the running time the buffer covers, including latency
- fn ts_range(&self, buf: &gst::BufferRef) -> Option<Timestamps> {
+ fn ts_range(
+ &self,
+ buf: &gst::BufferRef,
+ segment: &gst::FormattedSegment<gst::ClockTime>,
+ ) -> Option<Timestamps> {
let mut timestamp_start = buf.dts_or_pts()?;
if !self.single_segment {
- timestamp_start = self
- .segment
- .as_ref()
- .unwrap()
+ timestamp_start = segment
.to_running_time(timestamp_start)
.unwrap_or(gst::ClockTime::ZERO);
timestamp_start += self.latency + self.upstream_latency.unwrap();
@@ -456,16 +464,17 @@ impl LiveSync {
state.in_timestamp = None;
state.num_in = 0;
state.num_drop = 0;
- state.segment = None;
+ state.in_segment = None;
} else {
{
let mut state = self.state.lock();
state.srcresult = Err(gst::FlowError::Flushing);
- state.out_buffer = None;
- state.out_audio_info = None;
if let Some(clock_id) = state.clock_id.take() {
clock_id.unschedule();
}
+ state.pending_caps = None;
+ state.out_audio_info = None;
+ state.out_buffer = None;
self.cond.notify_all();
}
@@ -474,7 +483,8 @@ impl LiveSync {
let mut state = self.state.lock();
state.in_caps = None;
state.in_audio_info = None;
- state.in_item = None;
+ state.queue.clear();
+ state.buffer_queued = false;
state.update_fallback_duration();
}
drop(lock);
@@ -500,7 +510,8 @@ impl LiveSync {
let mut state = self.state.lock();
state.srcresult = Ok(gst::FlowSuccess::Ok);
- state.sent_segment = false;
+ state.pending_segment = None;
+ state.out_segment = None;
state.out_timestamp = None;
state.num_out = 0;
state.num_duplicate = 0;
@@ -513,11 +524,12 @@ impl LiveSync {
{
let mut state = self.state.lock();
state.srcresult = Err(gst::FlowError::Flushing);
- state.out_buffer = None;
- state.out_audio_info = None;
if let Some(clock_id) = state.clock_id.take() {
clock_id.unschedule();
}
+ state.pending_caps = None;
+ state.out_audio_info = None;
+ state.out_buffer = None;
self.cond.notify_all();
}
@@ -558,12 +570,15 @@ impl LiveSync {
let mut state = self.state.lock();
state.srcresult = Ok(gst::FlowSuccess::Ok);
state.eos = false;
- state.sent_segment = false;
- state.segment = None;
+ state.in_segment = None;
+ state.pending_segment = None;
+ state.out_segment = None;
state.in_caps = None;
+ state.pending_caps = None;
state.in_audio_info = None;
state.out_audio_info = None;
- state.in_item = None;
+ state.queue.clear();
+ state.buffer_queued = false;
state.out_buffer = None;
state.update_fallback_duration();
@@ -587,11 +602,7 @@ impl LiveSync {
};
let mut state = self.state.lock();
- state.segment = Some(segment.clone());
- if !state.single_segment {
- state.sent_segment = false;
- }
- return true;
+ state.in_segment = Some(segment.clone());
}
gst::EventView::Gap(_) => {
@@ -638,30 +649,25 @@ impl LiveSync {
state.in_caps = Some(caps);
state.in_audio_info = audio_info;
state.update_fallback_duration();
- return true;
}
_ => {}
}
- if event.is_serialized() {
- let mut state = self.state.lock();
- while state.srcresult.is_ok() && state.in_item.is_some() {
- self.cond.wait(&mut state);
- }
+ if !event.is_serialized() {
+ return gst::Pad::event_default(pad, Some(&*self.obj()), event);
+ }
- if state.srcresult.is_err() {
- return false;
- }
+ let mut state = self.state.lock();
+ if state.srcresult.is_err() {
+ return false;
+ }
- gst::trace!(CAT, imp: self, "Queueing {:?}", event);
- state.in_item = Some(Item::Event(event));
- self.cond.notify_all();
+ gst::trace!(CAT, imp: self, "Queueing {:?}", event);
+ state.queue.push_back(Item::Event(event));
+ self.cond.notify_all();
- true
- } else {
- gst::Pad::event_default(pad, Some(&*self.obj()), event)
- }
+ true
}
fn src_event(&self, pad: &gst::Pad, mut event: gst::Event) -> bool {
@@ -695,16 +701,14 @@ impl LiveSync {
let (sender, receiver) = mpsc::sync_channel(1);
let mut state = self.state.lock();
- while state.srcresult.is_ok() && state.in_item.is_some() {
- self.cond.wait(&mut state);
- }
-
if state.srcresult.is_err() {
return false;
}
gst::trace!(CAT, imp: self, "Queueing {:?}", query);
- state.in_item = Some(Item::Query(std::ptr::NonNull::from(query), sender));
+ state
+ .queue
+ .push_back(Item::Query(std::ptr::NonNull::from(query), sender));
self.cond.notify_all();
drop(state);
@@ -774,7 +778,7 @@ impl LiveSync {
}
}
- while state.srcresult.is_ok() && state.in_item.is_some() {
+ while state.srcresult.is_ok() && state.buffer_queued {
self.cond.wait(&mut state);
}
state.srcresult?;
@@ -819,9 +823,10 @@ impl LiveSync {
}
}
+ // At this stage we should really really have a segment
+ let segment = state.in_segment.as_ref().ok_or(gst::FlowError::Error)?;
+
if state.single_segment {
- // At this stage we should really really have a segment
- let segment = state.segment.as_ref().ok_or(gst::FlowError::Error)?;
let dts = segment
.to_running_time_full(buf_mut.dts())
.map(|r| r + SEGMENT_OFFSET)
@@ -855,7 +860,7 @@ impl LiveSync {
buf_mut.set_flags(gst::BufferFlags::DISCONT);
}
- let mut timestamp = state.ts_range(buf_mut);
+ let mut timestamp = state.ts_range(buf_mut, segment);
let lateness = self.buffer_is_backwards(&state, timestamp);
match lateness {
BufferLateness::OnTime => {}
@@ -889,12 +894,13 @@ impl LiveSync {
buf_mut.set_pts(prev.pts().map(|t| t + prev_duration));
buf_mut.set_flags(gst::BufferFlags::GAP);
- timestamp = state.ts_range(buf_mut);
+ timestamp = state.ts_range(buf_mut, state.out_segment.as_ref().unwrap());
}
}
gst::trace!(CAT, imp: self, "Queueing {:?} ({:?})", buffer, lateness);
- state.in_item = Some(Item::Buffer(buffer, lateness));
+ state.queue.push_back(Item::Buffer(buffer, lateness));
+ state.buffer_queued = true;
state.in_timestamp = timestamp;
state.num_in += 1;
self.cond.notify_all();
@@ -958,31 +964,54 @@ impl LiveSync {
fn src_loop_inner(&self) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.lock();
while state.srcresult.is_ok()
- && (!state.playing || (state.in_item.is_none() && state.out_buffer.is_none()))
+ && (!state.playing || (state.queue.is_empty() && state.out_buffer.is_none()))
{
self.cond.wait(&mut state);
}
state.srcresult?;
- gst::trace!(CAT, imp: self, "Unqueueing {:?}", state.in_item);
- let in_buffer = match state.in_item.take() {
+ let in_item = state.queue.pop_front();
+ gst::trace!(CAT, imp: self, "Unqueueing {:?}", in_item);
+
+ let in_buffer = match in_item {
None => None,
Some(Item::Buffer(buffer, lateness)) => {
if self.buffer_is_early(&state, state.in_timestamp) {
// Try this buffer again on the next iteration
- state.in_item = Some(Item::Buffer(buffer, lateness));
+ state.queue.push_front(Item::Buffer(buffer, lateness));
None
} else {
+ state.buffer_queued = false;
Some((buffer, lateness))
}
}
Some(Item::Event(event)) => {
+ let mut push = true;
+
+ match event.view() {
+ gst::EventView::Segment(e) => {
+ let segment = e.segment().downcast_ref().unwrap();
+ state.pending_segment = Some(segment.clone());
+ push = false;
+ }
+
+ gst::EventView::Caps(e) => {
+ state.pending_caps = Some(e.caps_owned());
+ state.update_fallback_duration();
+ push = false;
+ }
+
+ _ => {}
+ }
+
self.cond.notify_all();
drop(state);
- self.srcpad.push_event(event);
+ if push {
+ self.srcpad.push_event(event);
+ }
return Ok(gst::FlowSuccess::Ok);
}
@@ -999,19 +1028,18 @@ impl LiveSync {
}
};
- let (duplicate, caps) = if let Some((buffer, lateness)) = in_buffer {
- let caps = state.in_caps.take();
-
+ let duplicate;
+ let mut caps = None;
+ let mut segment = None;
+ if let Some((buffer, lateness)) = in_buffer {
state.out_buffer = Some(buffer);
state.out_timestamp = state.in_timestamp;
- if caps.is_some() {
- state.out_audio_info = state.in_audio_info.clone();
- }
+ caps = state.pending_caps.take();
+ segment = state.pending_segment.take();
+ duplicate = lateness != BufferLateness::OnTime;
self.cond.notify_all();
-
- (lateness != BufferLateness::OnTime, caps)
} else {
// Work around borrow checker
let State {
@@ -1045,8 +1073,11 @@ impl LiveSync {
buffer.set_flags(gst::BufferFlags::GAP);
buffer.unset_flags(gst::BufferFlags::DISCONT);
- state.out_timestamp = state.ts_range(state.out_buffer.as_ref().unwrap());
- (true, None)
+ state.out_timestamp = state.ts_range(
+ state.out_buffer.as_ref().unwrap(),
+ state.out_segment.as_ref().unwrap(),
+ );
+ duplicate = true;
};
let buffer = state.out_buffer.clone().unwrap();
@@ -1060,29 +1091,37 @@ impl LiveSync {
let event = gst::event::Caps::new(&caps);
MutexGuard::unlocked(&mut state, || self.srcpad.push_event(event));
state.srcresult?;
+
+ state.out_audio_info = caps
+ .structure(0)
+ .unwrap()
+ .has_name("audio/x-raw")
+ .then(|| gst_audio::AudioInfo::from_caps(&caps).unwrap());
}
- if !state.sent_segment {
- let event = if state.single_segment {
+ if let Some(segment) = segment {
+ if !state.single_segment {
+ gst::debug!(CAT, imp: self, "Forwarding segment: {:?}", segment);
+
+ let event = gst::event::Segment::new(&segment);
+ MutexGuard::unlocked(&mut state, || self.srcpad.push_event(event));
+ state.srcresult?;
+ } else if state.out_segment.is_none() {
// Create live segment
- let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
- segment.set_start(sync_ts + SEGMENT_OFFSET);
- segment.set_base(sync_ts);
- segment.set_time(sync_ts);
- segment.set_position(sync_ts + SEGMENT_OFFSET);
-
- gst::debug!(CAT, imp: self, "Sending new segment: {:?}", segment);
- gst::event::Segment::new(&segment)
- } else {
- let segment = state.segment.as_ref().unwrap();
+ let mut live_segment = gst::FormattedSegment::<gst::ClockTime>::new();
+ live_segment.set_start(sync_ts + SEGMENT_OFFSET);
+ live_segment.set_base(sync_ts);
+ live_segment.set_time(sync_ts);
+ live_segment.set_position(sync_ts + SEGMENT_OFFSET);
- gst::debug!(CAT, imp: self, "Forwarding segment: {:?}", segment);
- gst::event::Segment::new(segment)
- };
+ gst::debug!(CAT, imp: self, "Sending new segment: {:?}", live_segment);
- MutexGuard::unlocked(&mut state, || self.srcpad.push_event(event));
- state.srcresult?;
- state.sent_segment = true;
+ let event = gst::event::Segment::new(&live_segment);
+ MutexGuard::unlocked(&mut state, || self.srcpad.push_event(event));
+ state.srcresult?;
+ }
+
+ state.out_segment = Some(segment);
}
{