Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSeungha Yang <seungha@centricular.com>2020-11-27 15:40:40 +0300
committerSebastian Dröge <slomo@coaxion.net>2020-12-01 14:07:34 +0300
commit58786fa0b5f122f2ccd60277739fb1450b3a71f7 (patch)
tree5f8582167da0b6c1ce7be3eb76d7db6401b20448 /video/closedcaption
parent8b8380992faed4388560f411e7ad0fd0537a12b9 (diff)
sccparse: Add support for seeking
Only pull mode can support seeking for now and reverse playback is not implemented yet. Note that this restriction is the same as that of mccparse.
Diffstat (limited to 'video/closedcaption')
-rw-r--r--video/closedcaption/src/scc_parse/imp.rs711
-rw-r--r--video/closedcaption/src/scc_parse/parser.rs6
-rw-r--r--video/closedcaption/tests/scc_parse.rs82
3 files changed, 743 insertions, 56 deletions
diff --git a/video/closedcaption/src/scc_parse/imp.rs b/video/closedcaption/src/scc_parse/imp.rs
index 8fc00054d..ebe3c08f8 100644
--- a/video/closedcaption/src/scc_parse/imp.rs
+++ b/video/closedcaption/src/scc_parse/imp.rs
@@ -20,8 +20,13 @@ use glib::subclass;
use glib::subclass::prelude::*;
use gst::prelude::*;
use gst::subclass::prelude::*;
-use gst::{gst_debug, gst_element_error, gst_error, gst_fixme, gst_log, gst_trace, gst_warning};
+use gst::{
+ gst_debug, gst_element_error, gst_error, gst_fixme, gst_info, gst_log, gst_loggable_error,
+ gst_trace, gst_warning,
+};
+use std::cmp;
+use std::convert::TryInto;
use std::sync::{Mutex, MutexGuard};
use once_cell::sync::Lazy;
@@ -39,6 +44,28 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
});
#[derive(Debug)]
+struct PullState {
+ need_stream_start: bool,
+ stream_id: String,
+ offset: u64,
+ duration: gst::ClockTime,
+}
+
+impl PullState {
+ fn new(element: &super::SccParse, pad: &gst::Pad) -> Self {
+ Self {
+ need_stream_start: true,
+ stream_id: pad
+ .create_stream_id(element, Some("src"))
+ .unwrap()
+ .to_string(),
+ offset: 0,
+ duration: gst::CLOCK_TIME_NONE,
+ }
+ }
+}
+
+#[derive(Debug)]
struct State {
reader: LineReader<gst::MappedBuffer<gst::buffer::Readable>>,
parser: SccParser,
@@ -47,6 +74,16 @@ struct State {
framerate: Option<gst::Fraction>,
last_position: gst::ClockTime,
last_timecode: Option<gst_video::ValidVideoTimeCode>,
+ segment: gst::FormattedSegment<gst::ClockTime>,
+
+ // Pull mode
+ pull: Option<PullState>,
+
+ // seeking
+ seeking: bool,
+ discont: bool,
+ seek_seqnum: Option<gst::Seqnum>,
+ need_flush_stop: bool,
}
impl Default for State {
@@ -59,10 +96,43 @@ impl Default for State {
framerate: None,
last_position: gst::CLOCK_TIME_NONE,
last_timecode: None,
+ segment: gst::FormattedSegment::<gst::ClockTime>::new(),
+ pull: None,
+ seeking: false,
+ discont: false,
+ seek_seqnum: None,
+ need_flush_stop: false,
}
}
}
+fn parse_timecode(
+ framerate: gst::Fraction,
+ tc: &TimeCode,
+) -> Result<gst_video::ValidVideoTimeCode, gst::FlowError> {
+ use std::convert::TryFrom;
+
+ let timecode = gst_video::VideoTimeCode::new(
+ framerate,
+ None,
+ if tc.drop_frame {
+ gst_video::VideoTimeCodeFlags::DROP_FRAME
+ } else {
+ gst_video::VideoTimeCodeFlags::empty()
+ },
+ tc.hours,
+ tc.minutes,
+ tc.seconds,
+ tc.frames,
+ 0,
+ );
+
+ match gst_video::ValidVideoTimeCode::try_from(timecode).map_err(|_| gst::FlowError::Error) {
+ Ok(timecode) => Ok(timecode),
+ Err(timecode) => Err(timecode),
+ }
+}
+
impl State {
#[allow(clippy::type_complexity)]
fn get_line(
@@ -84,28 +154,11 @@ impl State {
fn handle_timecode(
&mut self,
- tc: TimeCode,
+ tc: &TimeCode,
framerate: gst::Fraction,
element: &super::SccParse,
) -> Result<gst_video::ValidVideoTimeCode, gst::FlowError> {
- use std::convert::TryFrom;
-
- let timecode = gst_video::VideoTimeCode::new(
- framerate,
- None,
- if tc.drop_frame {
- gst_video::VideoTimeCodeFlags::DROP_FRAME
- } else {
- gst_video::VideoTimeCodeFlags::empty()
- },
- tc.hours,
- tc.minutes,
- tc.seconds,
- tc.frames,
- 0,
- );
-
- match gst_video::ValidVideoTimeCode::try_from(timecode) {
+ match parse_timecode(framerate, &tc) {
Ok(timecode) => Ok(timecode),
Err(timecode) => {
let last_timecode =
@@ -176,6 +229,61 @@ impl State {
.unwrap_or(gst::CLOCK_TIME_NONE),
);
}
+
+ fn create_events(
+ &mut self,
+ element: &super::SccParse,
+ framerate: Option<gst::Fraction>,
+ ) -> Vec<gst::Event> {
+ let mut events = Vec::new();
+
+ if self.need_flush_stop {
+ let mut b = gst::event::FlushStop::builder(true);
+
+ if let Some(seek_seqnum) = self.seek_seqnum {
+ b = b.seqnum(seek_seqnum);
+ }
+
+ events.push(b.build());
+ self.need_flush_stop = false;
+ }
+
+ if let Some(pull) = &mut self.pull {
+ if pull.need_stream_start {
+ events.push(gst::event::StreamStart::new(&pull.stream_id));
+ pull.need_stream_start = false;
+ }
+ }
+
+ if let Some(framerate) = framerate {
+ if self.framerate != Some(framerate) {
+ self.framerate = Some(framerate);
+
+ let caps = gst::Caps::builder("closedcaption/x-cea-608")
+ .field("format", &"raw")
+ .field("framerate", &framerate)
+ .build();
+ self.framerate = Some(framerate);
+
+ events.push(gst::event::Caps::new(&caps));
+ gst_info!(CAT, obj: element, "Caps changed to {:?}", &caps);
+ }
+ }
+
+ if self.need_segment {
+ let mut b = gst::event::Segment::builder(&self.segment);
+
+ if let Some(seek_seqnum) = self.seek_seqnum {
+ b = b.seqnum(seek_seqnum);
+ }
+
+ events.push(b.build());
+ self.need_segment = false;
+ }
+
+ events.extend(self.pending_events.drain(..));
+ events
+ }
}
pub struct SccParse {
@@ -228,7 +336,12 @@ impl SccParse {
break Err(gst::FlowError::Error);
}
- Ok(None) => break Ok(gst::FlowSuccess::Ok),
+ Ok(None) => {
+ if drain && state.pull.is_some() {
+ break Err(gst::FlowError::Eos);
+ }
+ break Ok(gst::FlowSuccess::Ok);
+ }
}
}
}
@@ -256,27 +369,52 @@ impl SccParse {
gst::Fraction::new(30, 1)
};
- let mut events = Vec::new();
+ let mut timecode = state.handle_timecode(&tc, framerate, element)?;
+ let start_time = gst::ClockTime::from(timecode.nsec_since_daily_jam());
+ let segment_start = state.segment.get_start();
+ let clip_buffers = if state.seeking {
+ // If we are in the middle of seeking, check whether this line
+ // contains start frame, and if so, unset seeking flag
+ let num_bufs = (data.len() / 2) as i64;
+ let mut end_timecode = parse_timecode(framerate, &tc).unwrap();
+ // add one more frame here so that add duration of the last frame
+ end_timecode.add_frames(num_bufs + 1);
+ let stop_time = gst::ClockTime::from(end_timecode.nsec_since_daily_jam());
+
+ gst_trace!(
+ CAT,
+ obj: element,
+ "Checking inside of segment, line start {} line stop {} segment start {} num bufs {}",
+ start_time,
+ stop_time,
+ segment_start,
+ num_bufs,
+ );
- if Some(framerate) != state.framerate {
- let caps = gst::Caps::builder("closedcaption/x-cea-608")
- .field("format", &"raw")
- .field("framerate", &framerate)
- .build();
- events.push(gst::event::Caps::new(&caps));
- state.framerate = Some(framerate);
- }
+ if stop_time > segment_start {
+ state.seeking = false;
+ state.discont = true;
+ state.need_flush_stop = true;
+ }
- if state.need_segment {
- let segment = gst::FormattedSegment::<gst::format::Time>::new();
- events.push(gst::event::Segment::new(&segment));
- state.need_segment = false;
- }
+ // Still need to scan lines to find the first buffer
+ if state.seeking {
+ drop(state);
+ return Ok(self.state.lock().unwrap());
+ }
- events.extend(state.pending_events.drain(..));
+ true
+ } else {
+ false
+ };
- let mut timecode = state.handle_timecode(tc, framerate, element)?;
- let mut buffers = gst::BufferList::new_sized(data.len() / 2);
+ let mut buffers = if clip_buffers {
+ gst::BufferList::new()
+ } else {
+ gst::BufferList::new_sized(data.len() / 2)
+ };
+
+ let mut send_eos = false;
for d in data.chunks_exact(2) {
let mut buffer = gst::Buffer::with_size(d.len()).unwrap();
{
@@ -286,13 +424,38 @@ impl SccParse {
state.add_buffer_metadata(&mut buffer, &timecode, framerate, element);
timecode.increment_frame();
+
+ if clip_buffers {
+ let end_time = buffer.get_pts() + buffer.get_duration();
+ if end_time < segment_start {
+ gst_trace!(
+ CAT,
+ obj: element,
+ "Skip segment clipped buffer {:?}",
+ buffer,
+ );
+
+ continue;
+ }
+ }
+
+ send_eos = state.segment.get_stop().is_some()
+ && buffer.get_pts() + buffer.get_duration() >= state.segment.get_stop();
+
let buffers = buffers.get_mut().unwrap();
buffers.add(buffer);
+
+ // Terminate loop once we found EOS boundary buffer
+ if send_eos {
+ break;
+ }
}
// Update the last_timecode to the current one
state.last_timecode = Some(timecode);
+ let events = state.create_events(element, Some(framerate));
+
// Drop our state mutex while we push out buffers or events
drop(state);
@@ -306,9 +469,295 @@ impl SccParse {
err
})?;
+ if send_eos {
+ return Err(gst::FlowError::Eos);
+ }
+
Ok(self.state.lock().unwrap())
}
+ fn sink_activate(
+ &self,
+ pad: &gst::Pad,
+ element: &super::SccParse,
+ ) -> Result<(), gst::LoggableError> {
+ let mode = {
+ let mut query = gst::query::Scheduling::new();
+ let mut state = self.state.lock().unwrap();
+
+ state.pull = None;
+
+ if !pad.peer_query(&mut query) {
+ gst_debug!(CAT, obj: pad, "Scheduling query failed on peer");
+ gst::PadMode::Push
+ } else if query
+ .has_scheduling_mode_with_flags(gst::PadMode::Pull, gst::SchedulingFlags::SEEKABLE)
+ {
+ gst_debug!(CAT, obj: pad, "Activating in Pull mode");
+
+ state.pull = Some(PullState::new(element, &self.srcpad));
+
+ gst::PadMode::Pull
+ } else {
+ gst_debug!(CAT, obj: pad, "Activating in Push mode");
+ gst::PadMode::Push
+ }
+ };
+
+ pad.activate_mode(mode, true)?;
+ Ok(())
+ }
+
+ fn start_task(&self, element: &super::SccParse) -> Result<(), gst::LoggableError> {
+ let element_weak = element.downgrade();
+ let pad_weak = self.sinkpad.downgrade();
+ let res = self.sinkpad.start_task(move || {
+ let element = match element_weak.upgrade() {
+ Some(element) => element,
+ None => {
+ if let Some(pad) = pad_weak.upgrade() {
+ pad.pause_task().unwrap();
+ }
+ return;
+ }
+ };
+
+ let parse = Self::from_instance(&element);
+ parse.loop_fn(&element);
+ });
+ if res.is_err() {
+ return Err(gst_loggable_error!(CAT, "Failed to start pad task"));
+ }
+ Ok(())
+ }
+
+ fn sink_activatemode(
+ &self,
+ _pad: &gst::Pad,
+ element: &super::SccParse,
+ mode: gst::PadMode,
+ active: bool,
+ ) -> Result<(), gst::LoggableError> {
+ if mode == gst::PadMode::Pull {
+ if active {
+ self.start_task(element)?;
+ } else {
+ let _ = self.sinkpad.stop_task();
+ }
+ }
+
+ Ok(())
+ }
+
+ fn scan_duration(
+ &self,
+ element: &super::SccParse,
+ ) -> Result<Option<gst_video::ValidVideoTimeCode>, gst::LoggableError> {
+ gst_debug!(CAT, obj: element, "Scanning duration");
+
+ /* First let's query the bytes duration upstream */
+ let mut q = gst::query::Duration::new(gst::Format::Bytes);
+
+ if !self.sinkpad.peer_query(&mut q) {
+ return Err(gst_loggable_error!(
+ CAT,
+ "Failed to query upstream duration"
+ ));
+ }
+
+ let size = match q.get_result().try_into().unwrap() {
+ gst::format::Bytes(Some(size)) => size,
+ gst::format::Bytes(None) => {
+ return Err(gst_loggable_error!(
+ CAT,
+ "Failed to query upstream duration"
+ ));
+ }
+ };
+
+ let mut offset = size;
+ let mut buffers = Vec::new();
+ let mut last_tc = None;
+
+ loop {
+ let scan_size = cmp::min(offset, 4096);
+
+ offset -= scan_size;
+
+ match self.sinkpad.pull_range(offset, scan_size as u32) {
+ Ok(buffer) => {
+ buffers.push(buffer);
+ }
+ Err(flow) => {
+ return Err(gst_loggable_error!(
+ CAT,
+ "Failed to pull buffer while scanning duration: {:?}",
+ flow
+ ));
+ }
+ }
+
+ let mut reader = LineReader::new();
+ let mut parser = SccParser::new_scan_captions();
+
+ for buf in buffers.iter().rev() {
+ let buf = buf
+ .clone()
+ .into_mapped_buffer_readable()
+ .map_err(|_| gst_loggable_error!(CAT, "Failed to map buffer readable"))?;
+
+ reader.push(buf);
+ }
+
+ while let Some(line) = reader.get_line_with_drain(true) {
+ if let Ok(SccLine::Caption(tc, data)) =
+ parser.parse_line(line).map_err(|err| (line, err))
+ {
+ let framerate = if tc.drop_frame {
+ gst::Fraction::new(30000, 1001)
+ } else {
+ gst::Fraction::new(30, 1)
+ };
+
+ if let Ok(mut timecode) = parse_timecode(framerate, &tc) {
+ /* We're looking for the total duration */
+ timecode.add_frames((data.len() / 2) as i64 + 1);
+ last_tc = Some(timecode);
+ }
+ }
+ }
+
+ if last_tc.is_some() || offset == 0 {
+ gst_debug!(
+ CAT,
+ obj: element,
+ "Duration scan done, last_tc: {:?}",
+ last_tc
+ );
+ break (Ok(last_tc));
+ }
+ }
+ }
+
+ fn push_eos(&self, element: &super::SccParse) {
+ let mut state = self.state.lock().unwrap();
+
+ if state.seeking {
+ state.need_flush_stop = true;
+ }
+
+ let mut events = state.create_events(element, None);
+ let mut eos_event = gst::event::Eos::builder();
+
+ if let Some(seek_seqnum) = state.seek_seqnum {
+ eos_event = eos_event.seqnum(seek_seqnum);
+ }
+
+ events.push(eos_event.build());
+
+ // Drop our state mutex while we push out events
+ drop(state);
+
+ for event in events {
+ gst_debug!(CAT, obj: element, "Pushing event {:?}", event);
+ self.srcpad.push_event(event);
+ }
+ }
+
+ fn loop_fn(&self, element: &super::SccParse) {
+ let mut state = self.state.lock().unwrap();
+ let State {
+ ref framerate,
+ ref mut pull,
+ ..
+ } = *state;
+ let mut pull = pull.as_mut().unwrap();
+ let scan_duration = framerate.is_none() && pull.duration.is_none();
+ let offset = pull.offset;
+
+ pull.offset += 4096;
+
+ drop(state);
+
+ let buffer = match self.sinkpad.pull_range(offset, 4096) {
+ Ok(buffer) => Some(buffer),
+ Err(gst::FlowError::Eos) => None,
+ Err(gst::FlowError::Flushing) => {
+ gst_debug!(CAT, obj: &self.sinkpad, "Pausing after pulling buffer, reason: flushing");
+
+ self.sinkpad.pause_task().unwrap();
+ return;
+ }
+ Err(flow) => {
+ gst_error!(CAT, obj: &self.sinkpad, "Failed to pull, reason: {:?}", flow);
+
+ gst_element_error!(
+ element,
+ gst::StreamError::Failed,
+ ["Streaming stopped, failed to pull buffer"]
+ );
+
+ self.sinkpad.pause_task().unwrap();
+ return;
+ }
+ };
+
+ match self.handle_buffer(element, buffer) {
+ Ok(_) => {
+ if scan_duration {
+ match self.scan_duration(element) {
+ Ok(Some(tc)) => {
+ let mut state = self.state.lock().unwrap();
+ let mut pull = state.pull.as_mut().unwrap();
+ pull.duration = tc.nsec_since_daily_jam().into();
+ }
+ Ok(None) => {
+ let mut state = self.state.lock().unwrap();
+ let mut pull = state.pull.as_mut().unwrap();
+ pull.duration = 0.into();
+ }
+ Err(err) => {
+ err.log();
+
+ gst_element_error!(
+ element,
+ gst::StreamError::Decode,
+ ["Failed to scan duration"]
+ );
+
+ self.sinkpad.pause_task().unwrap();
+ }
+ }
+ }
+ }
+ Err(flow) => {
+ match flow {
+ gst::FlowError::Flushing => {
+ gst_debug!(CAT, obj: element, "Pausing after flow {:?}", flow);
+ }
+ gst::FlowError::Eos => {
+ self.push_eos(element);
+
+ gst_debug!(CAT, obj: element, "Pausing after flow {:?}", flow);
+ }
+ _ => {
+ self.push_eos(element);
+
+ gst_error!(CAT, obj: element, "Pausing after flow {:?}", flow);
+
+ gst_element_error!(
+ element,
+ gst::StreamError::Failed,
+ ["Streaming stopped, reason: {:?}", flow]
+ );
+ }
+ }
+
+ self.sinkpad.pause_task().unwrap();
+ }
+ }
+ }
+
fn sink_chain(
&self,
pad: &gst::Pad,
@@ -320,6 +769,23 @@ impl SccParse {
self.handle_buffer(element, Some(buffer))
}
+ fn flush(&self, mut state: MutexGuard<State>) -> MutexGuard<State> {
+ state.reader.clear();
+ state.parser.reset();
+ if let Some(pull) = &mut state.pull {
+ pull.offset = 0;
+ }
+ state.segment = gst::FormattedSegment::<gst::ClockTime>::new();
+ state.need_segment = true;
+ state.pending_events.clear();
+ state.last_position = 0.into();
+ state.last_timecode = None;
+
+ drop(state);
+
+ self.state.lock().unwrap()
+ }
+
fn sink_event(&self, pad: &gst::Pad, element: &super::SccParse, event: gst::Event) -> bool {
use gst::EventView;
@@ -337,13 +803,9 @@ impl SccParse {
true
}
EventView::FlushStop(_) => {
- let mut state = self.state.lock().unwrap();
- state.reader.clear();
- state.parser.reset();
- state.need_segment = true;
- state.pending_events.clear();
- state.last_position = gst::ClockTime::from_seconds(0);
- state.last_timecode = None;
+ let state = self.state.lock().unwrap();
+ let state = self.flush(state);
+ drop(state);
pad.event_default(Some(element), event)
}
@@ -370,15 +832,105 @@ impl SccParse {
}
}
+ fn perform_seek(&self, event: &gst::event::Seek, element: &super::SccParse) -> bool {
+ let mut state = self.state.lock().unwrap();
+
+ if state.pull.is_none() {
+ gst_error!(CAT, obj: element, "seeking is only supported in pull mode");
+ return false;
+ }
+
+ let (rate, flags, start_type, start, stop_type, stop) = event.get();
+
+ let mut start: gst::ClockTime = match start.try_into() {
+ Ok(start) => start,
+ Err(_) => {
+ gst_error!(CAT, obj: element, "seek has invalid format");
+ return false;
+ }
+ };
+
+ let mut stop: gst::ClockTime = match stop.try_into() {
+ Ok(stop) => stop,
+ Err(_) => {
+ gst_error!(CAT, obj: element, "seek has invalid format");
+ return false;
+ }
+ };
+
+ if !flags.contains(gst::SeekFlags::FLUSH) {
+ gst_error!(CAT, obj: element, "only flushing seeks are supported");
+ return false;
+ }
+
+ if start_type == gst::SeekType::End || stop_type == gst::SeekType::End {
+ gst_error!(CAT, obj: element, "Relative seeks are not supported");
+ return false;
+ }
+
+ let pull = state.pull.as_ref().unwrap();
+
+ if start_type == gst::SeekType::Set {
+ start = start.min(pull.duration).unwrap_or(start);
+ }
+
+ if stop_type == gst::SeekType::Set {
+ stop = stop.min(pull.duration).unwrap_or(stop);
+ }
+
+ state.seeking = true;
+ let seek_seqnum = event.get_seqnum();
+ state.seek_seqnum = Some(seek_seqnum);
+
+ let event = gst::event::FlushStart::builder()
+ .seqnum(seek_seqnum)
+ .build();
+
+ gst_debug!(CAT, obj: element, "Sending event {:?} upstream", event);
+ self.sinkpad.push_event(event);
+
+ let event = gst::event::FlushStart::builder()
+ .seqnum(seek_seqnum)
+ .build();
+
+ gst_debug!(CAT, obj: element, "Pushing event {:?}", event);
+ self.srcpad.push_event(event);
+
+ self.sinkpad.pause_task().unwrap();
+
+ state = self.flush(state);
+
+ let event = gst::event::FlushStop::builder(true)
+ .seqnum(seek_seqnum)
+ .build();
+
+ /* Drop our state while we push a serialized event upstream */
+ drop(state);
+
+ gst_debug!(CAT, obj: element, "Sending event {:?} upstream", event);
+ self.sinkpad.push_event(event);
+
+ state = self.state.lock().unwrap();
+
+ state
+ .segment
+ .do_seek(rate, flags, start_type, start, stop_type, stop);
+
+ match self.start_task(element) {
+ Err(error) => {
+ error.log();
+ false
+ }
+ _ => true,
+ }
+ }
+
fn src_event(&self, pad: &gst::Pad, element: &super::SccParse, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(CAT, obj: pad, "Handling event {:?}", event);
match event.view() {
- EventView::Seek(_) => {
- gst_log!(CAT, obj: pad, "Dropping seek event");
- false
- }
+ EventView::Seek(e) => self.perform_seek(&e, element),
_ => pad.event_default(Some(element), event),
}
}
@@ -395,14 +947,24 @@ impl SccParse {
match query.view_mut() {
QueryView::Seeking(mut q) => {
- // We don't support any seeking at all
+ let state = self.state.lock().unwrap();
+
let fmt = q.get_format();
- q.set(
- false,
- gst::GenericFormattedValue::Other(fmt, -1),
- gst::GenericFormattedValue::Other(fmt, -1),
- );
- true
+
+ if fmt == gst::Format::Time {
+ if let Some(pull) = state.pull.as_ref() {
+ q.set(
+ true,
+ gst::GenericFormattedValue::Time(0.into()),
+ gst::GenericFormattedValue::Time(pull.duration),
+ );
+ true
+ } else {
+ false
+ }
+ } else {
+ false
+ }
}
QueryView::Position(ref mut q) => {
// For Time answer ourselfs, otherwise forward
@@ -414,6 +976,24 @@ impl SccParse {
self.sinkpad.peer_query(query)
}
}
+ QueryView::Duration(ref mut q) => {
+ // For Time answer ourselfs, otherwise forward
+ let state = self.state.lock().unwrap();
+ if q.get_format() == gst::Format::Time {
+ if let Some(pull) = state.pull.as_ref() {
+ if pull.duration.is_some() {
+ q.set(state.pull.as_ref().unwrap().duration);
+ true
+ } else {
+ false
+ }
+ } else {
+ false
+ }
+ } else {
+ self.sinkpad.peer_query(query)
+ }
+ }
_ => pad.query_default(Some(element), query),
}
}
@@ -431,6 +1011,25 @@ impl ObjectSubclass for SccParse {
fn with_class(klass: &Self::Class) -> Self {
let templ = klass.get_pad_template("sink").unwrap();
let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink"))
+ .activate_function(|pad, parent| {
+ SccParse::catch_panic_pad_function(
+ parent,
+ || Err(gst_loggable_error!(CAT, "Panic activating sink pad")),
+ |parse, element| parse.sink_activate(pad, element),
+ )
+ })
+ .activatemode_function(|pad, parent, mode, active| {
+ SccParse::catch_panic_pad_function(
+ parent,
+ || {
+ Err(gst_loggable_error!(
+ CAT,
+ "Panic activating sink pad with mode"
+ ))
+ },
+ |parse, element| parse.sink_activatemode(pad, element, mode, active),
+ )
+ })
.chain_function(|pad, parent, buffer| {
SccParse::catch_panic_pad_function(
parent,
diff --git a/video/closedcaption/src/scc_parse/parser.rs b/video/closedcaption/src/scc_parse/parser.rs
index b1b77cd79..53ccd4e8f 100644
--- a/video/closedcaption/src/scc_parse/parser.rs
+++ b/video/closedcaption/src/scc_parse/parser.rs
@@ -141,6 +141,12 @@ impl SccParser {
}
}
+ pub fn new_scan_captions() -> Self {
+ Self {
+ state: State::CaptionOrEmpty,
+ }
+ }
+
pub fn reset(&mut self) {
self.state = State::Header;
}
diff --git a/video/closedcaption/tests/scc_parse.rs b/video/closedcaption/tests/scc_parse.rs
index 3e0a449f8..b960866eb 100644
--- a/video/closedcaption/tests/scc_parse.rs
+++ b/video/closedcaption/tests/scc_parse.rs
@@ -17,10 +17,12 @@
// Boston, MA 02110-1335, USA.
use gst::prelude::*;
+use gst::EventView;
use gst_video::{ValidVideoTimeCode, VideoTimeCode};
use pretty_assertions::assert_eq;
use rand::{Rng, SeedableRng};
use std::collections::VecDeque;
+use std::path::PathBuf;
fn init() {
use std::sync::Once;
@@ -199,3 +201,83 @@ fn test_timecodes() {
.build()
);
}
+
+#[test]
+fn test_pull() {
+ init();
+
+ let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
+ path.push("tests/dn2018-1217.scc");
+
+ let mut h = gst_check::Harness::new_parse(&format!("filesrc location={:?} ! sccparse", path));
+
+ h.play();
+
+ /* Let's first pull until EOS */
+ loop {
+ let mut done = false;
+
+ while h.events_in_queue() != 0 {
+ let event = h.pull_event();
+
+ if let Ok(event) = event {
+ match event.view() {
+ EventView::Eos(_) => {
+ done = true;
+ break;
+ }
+ _ => (),
+ }
+ }
+ }
+
+ while h.buffers_in_queue() != 0 {
+ let _ = h.pull();
+ }
+
+ if done {
+ break;
+ }
+ }
+
+ /* Now seek and check that we receive buffers with appropriate PTS */
+ h.push_upstream_event(gst::event::Seek::new(
+ 1.0,
+ gst::SeekFlags::FLUSH,
+ gst::SeekType::Set,
+ gst::GenericFormattedValue::Time(18 * gst::SECOND),
+ gst::SeekType::Set,
+ gst::GenericFormattedValue::Time(19 * gst::SECOND),
+ ));
+
+ loop {
+ let mut done = false;
+
+ while h.buffers_in_queue() != 0 {
+ if let Ok(buffer) = h.pull() {
+ let pts = buffer.get_pts();
+ let end_time = pts + buffer.get_duration();
+
+ assert!(end_time >= 18 * gst::SECOND && pts < 19 * gst::SECOND);
+ }
+ }
+
+ while h.events_in_queue() != 0 {
+ let event = h.pull_event();
+
+ if let Ok(event) = event {
+ match event.view() {
+ EventView::Eos(_) => {
+ done = true;
+ break;
+ }
+ _ => (),
+ }
+ }
+ }
+
+ if done {
+ break;
+ }
+ }
+}