Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--generic/sodium/examples/decrypt_example.rs2
-rw-r--r--generic/sodium/examples/encrypt_example.rs2
-rw-r--r--generic/sodium/src/decrypter/imp.rs6
-rw-r--r--generic/sodium/src/encrypter/imp.rs6
-rw-r--r--generic/sodium/tests/decrypter.rs6
-rw-r--r--generic/threadshare/src/appsrc/imp.rs23
-rw-r--r--generic/threadshare/src/dataqueue.rs16
-rw-r--r--generic/threadshare/src/inputselector/imp.rs40
-rw-r--r--generic/threadshare/src/jitterbuffer/imp.rs210
-rw-r--r--generic/threadshare/src/jitterbuffer/jitterbuffer.rs61
-rw-r--r--generic/threadshare/src/proxy/imp.rs29
-rw-r--r--generic/threadshare/src/queue/imp.rs27
-rw-r--r--generic/threadshare/src/runtime/executor.rs26
-rw-r--r--generic/threadshare/src/runtime/task.rs51
-rw-r--r--generic/threadshare/src/socket.rs14
-rw-r--r--generic/threadshare/src/tcpclientsrc/imp.rs16
-rw-r--r--generic/threadshare/src/udpsink/imp.rs51
-rw-r--r--generic/threadshare/src/udpsrc/imp.rs18
-rw-r--r--generic/threadshare/tests/pad.rs7
-rw-r--r--generic/threadshare/tests/proxy.rs6
-rw-r--r--generic/threadshare/tests/queue.rs2
-rw-r--r--generic/threadshare/tests/tcpclientsrc.rs2
22 files changed, 344 insertions, 277 deletions
diff --git a/generic/sodium/examples/decrypt_example.rs b/generic/sodium/examples/decrypt_example.rs
index 1a8a199a6..d1f335eaa 100644
--- a/generic/sodium/examples/decrypt_example.rs
+++ b/generic/sodium/examples/decrypt_example.rs
@@ -124,7 +124,7 @@ fn main() -> Result<(), Box<dyn Error>> {
.expect("Unable to set the pipeline to the `Playing` state");
let bus = pipeline.bus().unwrap();
- for msg in bus.iter_timed(gst::CLOCK_TIME_NONE) {
+ for msg in bus.iter_timed(gst::ClockTime::NONE) {
use gst::MessageView;
match msg.view() {
MessageView::Error(err) => {
diff --git a/generic/sodium/examples/encrypt_example.rs b/generic/sodium/examples/encrypt_example.rs
index 29aa586f0..ea125ddc3 100644
--- a/generic/sodium/examples/encrypt_example.rs
+++ b/generic/sodium/examples/encrypt_example.rs
@@ -121,7 +121,7 @@ fn main() -> Result<(), Box<dyn Error>> {
pipeline.set_state(gst::State::Playing)?;
let bus = pipeline.bus().unwrap();
- for msg in bus.iter_timed(gst::CLOCK_TIME_NONE) {
+ for msg in bus.iter_timed(gst::ClockTime::NONE) {
use gst::MessageView;
match msg.view() {
MessageView::Error(err) => {
diff --git a/generic/sodium/src/decrypter/imp.rs b/generic/sodium/src/decrypter/imp.rs
index ce8a840a9..428b252c1 100644
--- a/generic/sodium/src/decrypter/imp.rs
+++ b/generic/sodium/src/decrypter/imp.rs
@@ -322,8 +322,8 @@ impl Decrypter {
}
let size = match peer_query.result().try_into().unwrap() {
- gst::format::Bytes(Some(size)) => size,
- gst::format::Bytes(None) => {
+ Some(gst::format::Bytes(size)) => size,
+ None => {
gst_error!(CAT, "Failed to query upstream duration");
return false;
@@ -348,7 +348,7 @@ impl Decrypter {
let size = size - total_chunks * box_::MACBYTES as u64;
gst_debug!(CAT, obj: pad, "Setting duration bytes: {}", size);
- q.set(gst::format::Bytes::from(size));
+ q.set(gst::format::Bytes(size));
true
}
diff --git a/generic/sodium/src/encrypter/imp.rs b/generic/sodium/src/encrypter/imp.rs
index 3b5bde2af..ac1f1f402 100644
--- a/generic/sodium/src/encrypter/imp.rs
+++ b/generic/sodium/src/encrypter/imp.rs
@@ -299,8 +299,8 @@ impl Encrypter {
}
let size = match peer_query.result().try_into().unwrap() {
- gst::format::Bytes(Some(size)) => size,
- gst::format::Bytes(None) => {
+ Some(gst::format::Bytes(size)) => size,
+ None => {
gst_error!(CAT, "Failed to query upstream duration");
return false;
@@ -324,7 +324,7 @@ impl Encrypter {
let size = size + crate::HEADERS_SIZE as u64;
gst_debug!(CAT, obj: pad, "Setting duration bytes: {}", size);
- q.set(gst::format::Bytes::from(size));
+ q.set(gst::format::Bytes(size));
true
}
diff --git a/generic/sodium/tests/decrypter.rs b/generic/sodium/tests/decrypter.rs
index 8feafaefd..25f1afeb5 100644
--- a/generic/sodium/tests/decrypter.rs
+++ b/generic/sodium/tests/decrypter.rs
@@ -119,7 +119,7 @@ fn test_pipeline() {
.expect("Unable to set the pipeline to the `Playing` state");
let bus = pipeline.bus().unwrap();
- for msg in bus.iter_timed(gst::CLOCK_TIME_NONE) {
+ for msg in bus.iter_timed(gst::ClockTime::NONE) {
use gst::MessageView;
match msg.view() {
MessageView::Error(err) => {
@@ -200,11 +200,11 @@ fn test_pull_range() {
assert_eq!(seekable, true);
assert_eq!(
start,
- gst::GenericFormattedValue::Bytes(gst::format::Bytes(Some(0)))
+ gst::GenericFormattedValue::Bytes(Some(gst::format::Bytes(0)))
);
assert_eq!(
stop,
- gst::GenericFormattedValue::Bytes(gst::format::Bytes(Some(6043)))
+ gst::GenericFormattedValue::Bytes(Some(gst::format::Bytes(6043)))
);
// do pulls
diff --git a/generic/threadshare/src/appsrc/imp.rs b/generic/threadshare/src/appsrc/imp.rs
index bb0f622e8..2fdf6cff8 100644
--- a/generic/threadshare/src/appsrc/imp.rs
+++ b/generic/threadshare/src/appsrc/imp.rs
@@ -31,13 +31,15 @@ use once_cell::sync::Lazy;
use std::convert::TryInto;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
+use std::time::Duration;
use std::u32;
use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSrc, PadSrcRef, PadSrcWeak, Task, TaskState};
const DEFAULT_CONTEXT: &str = "";
-const DEFAULT_CONTEXT_WAIT: u32 = 0;
+// FIXME use Duration::ZERO when MSVC >= 1.53.2
+const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0);
const DEFAULT_CAPS: Option<gst::Caps> = None;
const DEFAULT_MAX_BUFFERS: u32 = 10;
const DEFAULT_DO_TIMESTAMP: bool = false;
@@ -45,7 +47,7 @@ const DEFAULT_DO_TIMESTAMP: bool = false;
#[derive(Debug, Clone)]
struct Settings {
context: String,
- context_wait: u32,
+ context_wait: Duration,
caps: Option<gst::Caps>,
max_buffers: u32,
do_timestamp: bool,
@@ -223,7 +225,7 @@ impl PadSrcHandler for AppSrcPadHandler {
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
let ret = match query.view_mut() {
QueryView::Latency(ref mut q) => {
- q.set(true, 0.into(), gst::CLOCK_TIME_NONE);
+ q.set(true, gst::ClockTime::ZERO, gst::ClockTime::NONE);
true
}
QueryView::Scheduling(ref mut q) => {
@@ -389,8 +391,11 @@ impl AppSrc {
let now = clock.time();
let buffer = buffer.make_mut();
- buffer.set_dts(now - base_time);
- buffer.set_pts(gst::CLOCK_TIME_NONE);
+ buffer.set_dts(
+ now.zip(base_time)
+ .and_then(|(now, base_time)| now.checked_sub(base_time)),
+ );
+ buffer.set_pts(None);
} else {
gst_error!(CAT, obj: element, "Don't have a clock yet");
return false;
@@ -540,7 +545,7 @@ impl ObjectImpl for AppSrc {
"Throttle poll loop to run at most once every this many ms",
0,
1000,
- DEFAULT_CONTEXT_WAIT,
+ DEFAULT_CONTEXT_WAIT.as_millis() as u32,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_uint(
@@ -620,7 +625,9 @@ impl ObjectImpl for AppSrc {
.unwrap_or_else(|| "".into());
}
"context-wait" => {
- settings.context_wait = value.get().expect("type checked upstream");
+ settings.context_wait = Duration::from_millis(
+ value.get::<u32>().expect("type checked upstream").into(),
+ );
}
"caps" => {
settings.caps = value.get().expect("type checked upstream");
@@ -639,7 +646,7 @@ impl ObjectImpl for AppSrc {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"context" => settings.context.to_value(),
- "context-wait" => settings.context_wait.to_value(),
+ "context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
"caps" => settings.caps.to_value(),
"max-buffers" => settings.max_buffers.to_value(),
"do-timestamp" => settings.do_timestamp.to_value(),
diff --git a/generic/threadshare/src/dataqueue.rs b/generic/threadshare/src/dataqueue.rs
index 263a25cf8..48f9d31da 100644
--- a/generic/threadshare/src/dataqueue.rs
+++ b/generic/threadshare/src/dataqueue.rs
@@ -25,7 +25,7 @@ use once_cell::sync::Lazy;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
-use std::{u32, u64};
+use std::u32;
static DATA_QUEUE_CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
@@ -54,12 +54,10 @@ impl DataQueueItem {
}
}
- fn timestamp(&self) -> Option<u64> {
+ fn timestamp(&self) -> Option<gst::ClockTime> {
match *self {
- DataQueueItem::Buffer(ref buffer) => buffer.dts_or_pts().0,
- DataQueueItem::BufferList(ref list) => {
- list.iter().filter_map(|b| b.dts_or_pts().0).next()
- }
+ DataQueueItem::Buffer(ref buffer) => buffer.dts_or_pts(),
+ DataQueueItem::BufferList(ref list) => list.iter().find_map(|b| b.dts_or_pts()),
DataQueueItem::Event(_) => None,
}
}
@@ -86,7 +84,7 @@ struct DataQueueInner {
cur_size_bytes: u32,
max_size_buffers: Option<u32>,
max_size_bytes: Option<u32>,
- max_size_time: Option<u64>,
+ max_size_time: Option<gst::ClockTime>,
pending_handle: Option<AbortHandle>,
}
@@ -105,7 +103,7 @@ impl DataQueue {
src_pad: &gst::Pad,
max_size_buffers: Option<u32>,
max_size_bytes: Option<u32>,
- max_size_time: Option<u64>,
+ max_size_time: impl Into<Option<gst::ClockTime>>,
) -> DataQueue {
DataQueue(Arc::new(StdMutex::new(DataQueueInner {
element: element.clone(),
@@ -116,7 +114,7 @@ impl DataQueue {
cur_size_bytes: 0,
max_size_buffers,
max_size_bytes,
- max_size_time,
+ max_size_time: max_size_time.into(),
pending_handle: None,
})))
}
diff --git a/generic/threadshare/src/inputselector/imp.rs b/generic/threadshare/src/inputselector/imp.rs
index d5dd63702..080816fd2 100644
--- a/generic/threadshare/src/inputselector/imp.rs
+++ b/generic/threadshare/src/inputselector/imp.rs
@@ -35,12 +35,13 @@ use crate::runtime::prelude::*;
use crate::runtime::{self, PadSink, PadSinkRef, PadSrc, PadSrcRef};
const DEFAULT_CONTEXT: &str = "";
-const DEFAULT_CONTEXT_WAIT: u32 = 0;
+// FIXME use Duration::ZERO when MSVC >= 1.53.2
+const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0);
#[derive(Debug, Clone)]
struct Settings {
context: String,
- context_wait: u32,
+ context_wait: Duration,
}
impl Default for Settings {
@@ -74,14 +75,20 @@ struct InputSelectorPadSinkHandler(Arc<Mutex<InputSelectorPadSinkHandlerInner>>)
impl InputSelectorPadSinkHandler {
/* Wait until specified time */
- async fn sync(&self, element: &super::InputSelector, running_time: gst::ClockTime) {
+ async fn sync(
+ &self,
+ element: &super::InputSelector,
+ running_time: impl Into<Option<gst::ClockTime>>,
+ ) {
let now = element.current_running_time();
- if let Some(delay) = running_time
- .saturating_sub(now)
- .and_then(|delay| delay.nseconds())
+ match running_time
+ .into()
+ .zip(now)
+ .and_then(|(running_time, now)| running_time.checked_sub(now))
{
- runtime::time::delay_for(Duration::from_nanos(delay)).await;
+ Some(delay) => runtime::time::delay_for(delay.into()).await,
+ None => runtime::executor::yield_now().await,
}
}
@@ -289,8 +296,8 @@ impl PadSrcHandler for InputSelectorPadSrcHandler {
match query.view_mut() {
QueryView::Latency(ref mut q) => {
let mut ret = true;
- let mut min_latency = 0.into();
- let mut max_latency = gst::ClockTime::none();
+ let mut min_latency = gst::ClockTime::ZERO;
+ let mut max_latency = gst::ClockTime::NONE;
let pads = {
let pads = inputselector.pads.lock().unwrap();
pads.sink_pads
@@ -307,8 +314,11 @@ impl PadSrcHandler for InputSelectorPadSrcHandler {
if ret {
let (live, min, max) = peer_query.result();
if live {
- min_latency = min.max(min_latency).unwrap_or(min_latency);
- max_latency = max.min(max_latency).unwrap_or(max);
+ min_latency = min.max(min_latency);
+ max_latency = max
+ .zip(max_latency)
+ .map(|(max, max_latency)| max.min(max_latency))
+ .or(max);
}
}
}
@@ -424,7 +434,7 @@ impl ObjectImpl for InputSelector {
"Throttle poll loop to run at most once every this many ms",
0,
1000,
- DEFAULT_CONTEXT_WAIT,
+ DEFAULT_CONTEXT_WAIT.as_millis() as u32,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_object(
@@ -457,7 +467,9 @@ impl ObjectImpl for InputSelector {
}
"context-wait" => {
let mut settings = self.settings.lock().unwrap();
- settings.context_wait = value.get().expect("type checked upstream");
+ settings.context_wait = Duration::from_millis(
+ value.get::<u32>().expect("type checked upstream").into(),
+ );
}
"active-pad" => {
let pad = value
@@ -501,7 +513,7 @@ impl ObjectImpl for InputSelector {
}
"context-wait" => {
let settings = self.settings.lock().unwrap();
- settings.context_wait.to_value()
+ (settings.context_wait.as_millis() as u32).to_value()
}
"active-pad" => {
let state = self.state.lock().unwrap();
diff --git a/generic/threadshare/src/jitterbuffer/imp.rs b/generic/threadshare/src/jitterbuffer/imp.rs
index e89d939fb..ef3880d62 100644
--- a/generic/threadshare/src/jitterbuffer/imp.rs
+++ b/generic/threadshare/src/jitterbuffer/imp.rs
@@ -39,27 +39,27 @@ use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task
use super::jitterbuffer::{RTPJitterBuffer, RTPJitterBufferItem, RTPPacketRateCtx};
-const DEFAULT_LATENCY_MS: u32 = 200;
+const DEFAULT_LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(200);
const DEFAULT_DO_LOST: bool = false;
const DEFAULT_MAX_DROPOUT_TIME: u32 = 60000;
const DEFAULT_MAX_MISORDER_TIME: u32 = 2000;
const DEFAULT_CONTEXT: &str = "";
-const DEFAULT_CONTEXT_WAIT: u32 = 0;
+const DEFAULT_CONTEXT_WAIT: gst::ClockTime = gst::ClockTime::ZERO;
#[derive(Debug, Clone)]
struct Settings {
- latency_ms: u32,
+ latency: gst::ClockTime,
do_lost: bool,
max_dropout_time: u32,
max_misorder_time: u32,
context: String,
- context_wait: u32,
+ context_wait: gst::ClockTime,
}
impl Default for Settings {
fn default() -> Self {
Settings {
- latency_ms: DEFAULT_LATENCY_MS,
+ latency: DEFAULT_LATENCY,
do_lost: DEFAULT_DO_LOST,
max_dropout_time: DEFAULT_MAX_DROPOUT_TIME,
max_misorder_time: DEFAULT_MAX_MISORDER_TIME,
@@ -108,7 +108,7 @@ impl PartialEq for GapPacket {
struct SinkHandlerInner {
packet_rate_ctx: RTPPacketRateCtx,
ips_rtptime: Option<u32>,
- ips_pts: gst::ClockTime,
+ ips_pts: Option<gst::ClockTime>,
gap_packets: BTreeSet<GapPacket>,
@@ -123,7 +123,7 @@ impl Default for SinkHandlerInner {
SinkHandlerInner {
packet_rate_ctx: RTPPacketRateCtx::new(),
ips_rtptime: None,
- ips_pts: gst::CLOCK_TIME_NONE,
+ ips_pts: None,
gap_packets: BTreeSet::new(),
last_pt: None,
last_in_seqnum: None,
@@ -155,16 +155,16 @@ impl SinkHandler {
state.discont = true;
state.last_popped_seqnum = None;
- state.last_popped_pts = gst::CLOCK_TIME_NONE;
+ state.last_popped_pts = None;
inner.last_in_seqnum = None;
inner.last_rtptime = None;
- state.earliest_pts = gst::CLOCK_TIME_NONE;
+ state.earliest_pts = None;
state.earliest_seqnum = None;
inner.ips_rtptime = None;
- inner.ips_pts = gst::CLOCK_TIME_NONE;
+ inner.ips_pts = None;
mem::replace(&mut inner.gap_packets, BTreeSet::new())
}
@@ -208,14 +208,17 @@ impl SinkHandler {
inner: &mut SinkHandlerInner,
state: &mut State,
rtptime: u32,
- pts: gst::ClockTime,
+ pts: impl Into<Option<gst::ClockTime>>,
) {
if inner.ips_rtptime != Some(rtptime) {
- if inner.ips_pts.is_some() && pts.is_some() {
- let new_packet_spacing = pts - inner.ips_pts;
+ let pts = pts.into();
+ let new_packet_spacing = inner
+ .ips_pts
+ .zip(pts)
+ .and_then(|(ips_pts, pts)| pts.checked_sub(ips_pts));
+ if let Some(new_packet_spacing) = new_packet_spacing {
let old_packet_spacing = state.packet_spacing;
- assert!(old_packet_spacing.is_some());
if old_packet_spacing > new_packet_spacing {
state.packet_spacing = (new_packet_spacing + 3 * old_packet_spacing) / 4;
} else if !old_packet_spacing.is_zero() {
@@ -421,7 +424,7 @@ impl SinkHandler {
return Ok(gst::FlowSuccess::Ok);
}
}
- inner.ips_pts = gst::CLOCK_TIME_NONE;
+ inner.ips_pts = None;
inner.ips_rtptime = None;
}
@@ -441,7 +444,7 @@ impl SinkHandler {
inner.last_in_seqnum = Some(seq);
let jb_item = if estimated_dts {
- RTPJitterBufferItem::new(buffer, gst::CLOCK_TIME_NONE, pts, Some(seq), rtptime)
+ RTPJitterBufferItem::new(buffer, gst::ClockTime::NONE, pts, Some(seq), rtptime)
} else {
RTPJitterBufferItem::new(buffer, dts, pts, Some(seq), rtptime)
};
@@ -463,15 +466,16 @@ impl SinkHandler {
inner.last_rtptime = Some(rtptime);
- if state.earliest_pts.is_none()
- || (pts.is_some()
- && (pts < state.earliest_pts
- || (pts == state.earliest_pts
- && state
- .earliest_seqnum
- .map(|earliest_seqnum| seq > earliest_seqnum)
- .unwrap_or(false))))
- {
+ let must_update = match (state.earliest_pts, pts) {
+ (None, _) => true,
+ (Some(earliest_pts), Some(pts)) if pts < earliest_pts => true,
+ (Some(earliest_pts), Some(pts)) if pts == earliest_pts => state
+ .earliest_seqnum
+ .map_or(false, |earliest_seqnum| seq > earliest_seqnum),
+ _ => false,
+ };
+
+ if must_update {
state.earliest_pts = pts;
state.earliest_seqnum = Some(seq);
}
@@ -515,10 +519,7 @@ impl SinkHandler {
let (latency, context_wait) = {
let settings = jb.settings.lock().unwrap();
- (
- settings.latency_ms as u64 * gst::MSECOND,
- settings.context_wait as u64 * gst::MSECOND,
- )
+ (settings.latency, settings.context_wait)
};
// Reschedule if needed
@@ -527,13 +528,15 @@ impl SinkHandler {
.next_wakeup(&element, &state, latency, context_wait);
if let Some((next_wakeup, _)) = next_wakeup {
if let Some((previous_next_wakeup, ref abort_handle)) = state.wait_handle {
- if previous_next_wakeup.is_none() || previous_next_wakeup > next_wakeup {
+ if previous_next_wakeup.is_none()
+ || next_wakeup.map_or(false, |next| previous_next_wakeup.unwrap() > next)
+ {
gst_debug!(
CAT,
obj: pad,
"Rescheduling for new item {} < {}",
- next_wakeup,
- previous_next_wakeup
+ next_wakeup.display(),
+ previous_next_wakeup.display(),
);
abort_handle.abort();
state.wait_handle = None;
@@ -666,16 +669,13 @@ impl SrcHandler {
state: &mut State,
element: &super::JitterBuffer,
seqnum: u16,
- pts: gst::ClockTime,
+ pts: impl Into<Option<gst::ClockTime>>,
discont: &mut bool,
) -> Vec<gst::Event> {
- let (latency_ns, do_lost) = {
+ let (latency, do_lost) = {
let jb = JitterBuffer::from_instance(element);
let settings = jb.settings.lock().unwrap();
- (
- settings.latency_ms as u64 * gst::MSECOND.nseconds().unwrap(),
- settings.do_lost,
- )
+ (settings.latency, settings.do_lost)
};
let mut events = vec![];
@@ -697,30 +697,24 @@ impl SrcHandler {
let gap = gst_rtp::compare_seqnum(lost_seqnum, seqnum) as i64;
if gap > 0 {
- let interval =
- pts.nseconds().unwrap() as i64 - state.last_popped_pts.nseconds().unwrap() as i64;
let gap = gap as u64;
- let spacing = if interval >= 0 {
- interval as u64 / (gap + 1)
- } else {
- 0
- };
+ // FIXME reason why we can expect Some for the 2 lines below
+ let mut last_popped_pts = state.last_popped_pts.unwrap();
+ let interval = pts.into().unwrap().saturating_sub(last_popped_pts);
+ let spacing = interval / (gap as u64 + 1);
*discont = true;
- if state.equidistant > 0 && gap > 1 && gap * spacing > latency_ns {
- let n_packets = gap - latency_ns / spacing;
+ if state.equidistant > 0 && gap > 1 && gap * spacing > latency {
+ let n_packets = gap - latency.nseconds() / spacing.nseconds();
if do_lost {
let s = gst::Structure::new(
"GstRTPPacketLost",
&[
("seqnum", &(lost_seqnum as u32)),
- (
- "timestamp",
- &(state.last_popped_pts + gst::ClockTime(Some(spacing))),
- ),
- ("duration", &(n_packets * spacing)),
+ ("timestamp", &(last_popped_pts + spacing)),
+ ("duration", &(n_packets * spacing).nseconds()),
("retry", &0),
],
);
@@ -729,15 +723,20 @@ impl SrcHandler {
}
lost_seqnum = lost_seqnum.wrapping_add(n_packets as u16);
- state.last_popped_pts += gst::ClockTime(Some(n_packets * spacing));
+ last_popped_pts += n_packets * spacing;
+ state.last_popped_pts = Some(last_popped_pts);
state.stats.num_lost += n_packets;
}
while lost_seqnum != seqnum {
- let timestamp = state.last_popped_pts + gst::ClockTime(Some(spacing));
- let duration = if state.equidistant > 0 { spacing } else { 0 };
+ let timestamp = last_popped_pts + spacing;
+ let duration = if state.equidistant > 0 {
+ spacing
+ } else {
+ gst::ClockTime::ZERO
+ };
- state.last_popped_pts = timestamp;
+ state.last_popped_pts = Some(timestamp);
if do_lost {
let s = gst::Structure::new(
@@ -745,7 +744,8 @@ impl SrcHandler {
&[
("seqnum", &(lost_seqnum as u32)),
("timestamp", &timestamp),
- ("duration", &duration),
+ // FIXME would probably make sense being a ClockTime
+ ("duration", &duration.nseconds()),
("retry", &0),
],
);
@@ -819,8 +819,8 @@ impl SrcHandler {
};
state.last_popped_pts = buffer.pts();
- if let Some(pts) = state.last_popped_pts.nseconds() {
- state.position = pts.into();
+ if state.last_popped_pts.is_some() {
+ state.position = state.last_popped_pts;
}
state.last_popped_seqnum = seq;
@@ -845,22 +845,26 @@ impl SrcHandler {
state: &State,
latency: gst::ClockTime,
context_wait: gst::ClockTime,
- ) -> (gst::ClockTime, Option<(gst::ClockTime, Duration)>) {
+ ) -> (
+ Option<gst::ClockTime>,
+ Option<(Option<gst::ClockTime>, Duration)>,
+ ) {
let now = element.current_running_time();
gst_debug!(
CAT,
obj: element,
"Now is {}, EOS {}, earliest pts is {}, packet_spacing {} and latency {}",
- now,
+ now.display(),
state.eos,
- state.earliest_pts,
+ state.earliest_pts.display(),
state.packet_spacing,
latency
);
if state.eos {
gst_debug!(CAT, obj: element, "EOS, not waiting");
+ // FIXME use Duration::ZERO when MSVC >= 1.53.2
return (now, Some((now, Duration::from_nanos(0))));
}
@@ -868,23 +872,25 @@ impl SrcHandler {
return (now, None);
}
- let next_wakeup = state.earliest_pts + latency - state.packet_spacing - context_wait / 2;
+ let next_wakeup = state
+ .earliest_pts
+ .map(|earliest_pts| earliest_pts + latency - state.packet_spacing - context_wait / 2);
let delay = next_wakeup
- .saturating_sub(now)
- .unwrap_or_else(gst::ClockTime::zero)
- .nseconds()
- .unwrap();
+ .zip(now)
+ .map_or(gst::ClockTime::ZERO, |(next_wakeup, now)| {
+ next_wakeup.saturating_sub(now)
+ });
gst_debug!(
CAT,
obj: element,
"Next wakeup at {} with delay {}",
- next_wakeup,
+ next_wakeup.display(),
delay
);
- (now, Some((next_wakeup, Duration::from_nanos(delay))))
+ (now, Some((next_wakeup, delay.into())))
}
}
@@ -954,8 +960,8 @@ impl PadSrcHandler for SrcHandler {
if ret {
let settings = jb.settings.lock().unwrap();
let (_, mut min_latency, _) = peer_query.result();
- min_latency += (settings.latency_ms as u64) * gst::SECOND;
- let max_latency = gst::CLOCK_TIME_NONE;
+ min_latency += settings.latency;
+ let max_latency = gst::ClockTime::NONE;
q.set(true, min_latency, max_latency);
}
@@ -999,7 +1005,7 @@ struct State {
jbuf: glib::SendUniqueCell<RTPJitterBuffer>,
last_res: Result<gst::FlowSuccess, gst::FlowError>,
- position: gst::ClockTime,
+ position: Option<gst::ClockTime>,
segment: gst::FormattedSegment<gst::ClockTime>,
clock_rate: Option<u32>,
@@ -1011,14 +1017,14 @@ struct State {
eos: bool,
last_popped_seqnum: Option<u16>,
- last_popped_pts: gst::ClockTime,
+ last_popped_pts: Option<gst::ClockTime>,
stats: Stats,
- earliest_pts: gst::ClockTime,
+ earliest_pts: Option<gst::ClockTime>,
earliest_seqnum: Option<u16>,
- wait_handle: Option<(gst::ClockTime, AbortHandle)>,
+ wait_handle: Option<(Option<gst::ClockTime>, AbortHandle)>,
}
impl Default for State {
@@ -1027,23 +1033,23 @@ impl Default for State {
jbuf: glib::SendUniqueCell::new(RTPJitterBuffer::new()).unwrap(),
last_res: Ok(gst::FlowSuccess::Ok),
- position: gst::CLOCK_TIME_NONE,
+ position: None,
segment: gst::FormattedSegment::<gst::ClockTime>::new(),
clock_rate: None,
- packet_spacing: gst::ClockTime::zero(),
+ packet_spacing: gst::ClockTime::ZERO,
equidistant: 0,
discont: true,
eos: false,
last_popped_seqnum: None,
- last_popped_pts: gst::CLOCK_TIME_NONE,
+ last_popped_pts: None,
stats: Stats::default(),
- earliest_pts: gst::CLOCK_TIME_NONE,
+ earliest_pts: None,
earliest_seqnum: None,
wait_handle: None,
@@ -1093,10 +1099,7 @@ impl TaskImpl for JitterBufferTask {
let jb = JitterBuffer::from_instance(&self.element);
let (latency, context_wait) = {
let settings = jb.settings.lock().unwrap();
- (
- settings.latency_ms as u64 * gst::MSECOND,
- settings.context_wait as u64 * gst::MSECOND,
- )
+ (settings.latency, settings.context_wait)
};
loop {
@@ -1110,6 +1113,7 @@ impl TaskImpl for JitterBufferTask {
);
let (delay_fut, abort_handle) = match next_wakeup {
+ // FIXME use Duration::ZERO when MSVC >= 1.53.2
Some((_, delay)) if delay == Duration::from_nanos(0) => (None, None),
_ => {
let (delay_fut, abort_handle) = abortable(async move {
@@ -1123,8 +1127,7 @@ impl TaskImpl for JitterBufferTask {
};
});
- let next_wakeup =
- next_wakeup.map(|w| w.0).unwrap_or(gst::CLOCK_TIME_NONE);
+ let next_wakeup = next_wakeup.and_then(|w| w.0);
(Some(delay_fut), Some((next_wakeup, abort_handle)))
}
};
@@ -1158,12 +1161,15 @@ impl TaskImpl for JitterBufferTask {
CAT,
obj: &self.element,
"Woke up at {}, earliest_pts {}",
- now,
- state.earliest_pts
+ now.display(),
+ state.earliest_pts.display()
);
if let Some((next_wakeup, _)) = next_wakeup {
- if next_wakeup > now {
+ if next_wakeup
+ .zip(now)
+ .map_or(false, |(next_wakeup, now)| next_wakeup > now)
+ {
// Reschedule and wait a bit longer in the next iteration
return Ok(());
}
@@ -1198,8 +1204,8 @@ impl TaskImpl for JitterBufferTask {
latency,
context_wait,
);
- if let Some((next_wakeup, _)) = next_wakeup {
- if next_wakeup > now {
+ if let Some((Some(next_wakeup), _)) = next_wakeup {
+ if now.map_or(false, |now| next_wakeup > now) {
// Reschedule and wait a bit longer in the next iteration
return Ok(());
}
@@ -1281,7 +1287,7 @@ impl JitterBuffer {
let context = {
let settings = self.settings.lock().unwrap();
- Context::acquire(&settings.context, settings.context_wait).unwrap()
+ Context::acquire(&settings.context, settings.context_wait.into()).unwrap()
};
self.task
@@ -1367,7 +1373,7 @@ impl ObjectImpl for JitterBuffer {
"Throttle poll loop to run at most once every this many ms",
0,
1000,
- DEFAULT_CONTEXT_WAIT,
+ DEFAULT_CONTEXT_WAIT.mseconds() as u32,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_uint(
@@ -1376,7 +1382,7 @@ impl ObjectImpl for JitterBuffer {
"Amount of ms to buffer",
0,
std::u32::MAX,
- DEFAULT_LATENCY_MS,
+ DEFAULT_LATENCY.mseconds() as u32,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_boolean(
@@ -1446,14 +1452,16 @@ impl ObjectImpl for JitterBuffer {
) {
match pspec.name() {
"latency" => {
- let latency_ms = {
+ let latency = {
let mut settings = self.settings.lock().unwrap();
- settings.latency_ms = value.get().expect("type checked upstream");
- settings.latency_ms as u64
+ settings.latency = gst::ClockTime::from_mseconds(
+ value.get::<u32>().expect("type checked upstream").into(),
+ );
+ settings.latency
};
let state = self.state.lock().unwrap();
- state.jbuf.borrow().set_delay(latency_ms * gst::MSECOND);
+ state.jbuf.borrow().set_delay(latency);
let _ = obj.post_message(gst::message::Latency::builder().src(obj).build());
}
@@ -1478,7 +1486,9 @@ impl ObjectImpl for JitterBuffer {
}
"context-wait" => {
let mut settings = self.settings.lock().unwrap();
- settings.context_wait = value.get().expect("type checked upstream");
+ settings.context_wait = gst::ClockTime::from_mseconds(
+ value.get::<u32>().expect("type checked upstream").into(),
+ );
}
_ => unimplemented!(),
}
@@ -1488,7 +1498,7 @@ impl ObjectImpl for JitterBuffer {
match pspec.name() {
"latency" => {
let settings = self.settings.lock().unwrap();
- settings.latency_ms.to_value()
+ settings.latency.mseconds().to_value()
}
"do-lost" => {
let settings = self.settings.lock().unwrap();
@@ -1520,7 +1530,7 @@ impl ObjectImpl for JitterBuffer {
}
"context-wait" => {
let settings = self.settings.lock().unwrap();
- settings.context_wait.to_value()
+ (settings.context_wait.mseconds() as u32).to_value()
}
_ => unimplemented!(),
}
diff --git a/generic/threadshare/src/jitterbuffer/jitterbuffer.rs b/generic/threadshare/src/jitterbuffer/jitterbuffer.rs
index 67e244026..3785ac0d7 100644
--- a/generic/threadshare/src/jitterbuffer/jitterbuffer.rs
+++ b/generic/threadshare/src/jitterbuffer/jitterbuffer.rs
@@ -72,8 +72,8 @@ unsafe impl Send for RTPJitterBufferItem {}
impl RTPJitterBufferItem {
pub fn new(
buffer: gst::Buffer,
- dts: gst::ClockTime,
- pts: gst::ClockTime,
+ dts: impl Into<Option<gst::ClockTime>>,
+ pts: impl Into<Option<gst::ClockTime>>,
seqnum: Option<u16>,
rtptime: u32,
) -> RTPJitterBufferItem {
@@ -89,8 +89,8 @@ impl RTPJitterBufferItem {
next: ptr::null_mut(),
prev: ptr::null_mut(),
r#type: 0,
- dts: dts.into_glib(),
- pts: pts.into_glib(),
+ dts: dts.into().into_glib(),
+ pts: pts.into().into_glib(),
seqnum: seqnum.map(|s| s as u32).unwrap_or(std::u32::MAX),
count: 1,
rtptime,
@@ -113,24 +113,24 @@ impl RTPJitterBufferItem {
}
}
- pub fn dts(&self) -> gst::ClockTime {
+ pub fn dts(&self) -> Option<gst::ClockTime> {
unsafe {
let item = self.0.as_ref().expect("Invalid wrapper");
if item.as_ref().dts == gst::ffi::GST_CLOCK_TIME_NONE {
- gst::CLOCK_TIME_NONE
+ None
} else {
- gst::ClockTime(Some(item.as_ref().dts))
+ Some(gst::ClockTime::from_nseconds(item.as_ref().dts))
}
}
}
- pub fn pts(&self) -> gst::ClockTime {
+ pub fn pts(&self) -> Option<gst::ClockTime> {
unsafe {
let item = self.0.as_ref().expect("Invalid wrapper");
if item.as_ref().pts == gst::ffi::GST_CLOCK_TIME_NONE {
- gst::CLOCK_TIME_NONE
+ None
} else {
- gst::ClockTime(Some(item.as_ref().pts))
+ Some(gst::ClockTime::from_nseconds(item.as_ref().pts))
}
}
}
@@ -235,7 +235,10 @@ impl RTPJitterBuffer {
#[allow(dead_code)]
pub fn delay(&self) -> gst::ClockTime {
- unsafe { from_glib(ffi::rtp_jitter_buffer_get_delay(self.to_glib_none().0)) }
+ unsafe {
+ try_from_glib(ffi::rtp_jitter_buffer_get_delay(self.to_glib_none().0))
+ .expect("undefined delay")
+ }
}
pub fn set_delay(&self, delay: gst::ClockTime) {
@@ -253,29 +256,23 @@ impl RTPJitterBuffer {
pub fn calculate_pts(
&self,
- dts: gst::ClockTime,
+ dts: impl Into<Option<gst::ClockTime>>,
estimated_dts: bool,
rtptime: u32,
- base_time: gst::ClockTime,
+ base_time: impl Into<Option<gst::ClockTime>>,
gap: i32,
is_rtx: bool,
- ) -> gst::ClockTime {
+ ) -> Option<gst::ClockTime> {
unsafe {
- let pts = ffi::rtp_jitter_buffer_calculate_pts(
+ from_glib(ffi::rtp_jitter_buffer_calculate_pts(
self.to_glib_none().0,
- dts.into_glib(),
+ dts.into().into_glib(),
estimated_dts.into_glib(),
rtptime,
- base_time.into_glib(),
+ base_time.into().into_glib(),
gap,
is_rtx.into_glib(),
- );
-
- if pts == gst::ffi::GST_CLOCK_TIME_NONE {
- gst::CLOCK_TIME_NONE
- } else {
- pts.into()
- }
+ ))
}
}
@@ -297,7 +294,7 @@ impl RTPJitterBuffer {
}
}
- pub fn find_earliest(&self) -> (gst::ClockTime, Option<u16>) {
+ pub fn find_earliest(&self) -> (Option<gst::ClockTime>, Option<u16>) {
unsafe {
let mut pts = mem::MaybeUninit::uninit();
let mut seqnum = mem::MaybeUninit::uninit();
@@ -307,7 +304,7 @@ impl RTPJitterBuffer {
pts.as_mut_ptr(),
seqnum.as_mut_ptr(),
);
- let pts = pts.assume_init();
+ let pts = from_glib(pts.assume_init());
let seqnum = seqnum.assume_init();
let seqnum = if seqnum == std::u32::MAX {
@@ -316,11 +313,7 @@ impl RTPJitterBuffer {
Some(seqnum as u16)
};
- if pts == gst::ffi::GST_CLOCK_TIME_NONE {
- (gst::CLOCK_TIME_NONE, seqnum)
- } else {
- (pts.into(), seqnum)
- }
+ (pts, seqnum)
}
}
@@ -340,11 +333,11 @@ impl RTPJitterBuffer {
}
}
- pub fn peek(&self) -> (gst::ClockTime, Option<u16>) {
+ pub fn peek(&self) -> (Option<gst::ClockTime>, Option<u16>) {
unsafe {
let item = ffi::rtp_jitter_buffer_peek(self.to_glib_none().0);
if item.is_null() {
- (gst::CLOCK_TIME_NONE, None)
+ (None, None)
} else {
let seqnum = (*item).seqnum;
let seqnum = if seqnum == std::u32::MAX {
@@ -352,7 +345,7 @@ impl RTPJitterBuffer {
} else {
Some(seqnum as u16)
};
- ((*item).pts.into(), seqnum)
+ (from_glib((*item).pts), seqnum)
}
}
}
diff --git a/generic/threadshare/src/proxy/imp.rs b/generic/threadshare/src/proxy/imp.rs
index 4ee74019d..b24fe5802 100644
--- a/generic/threadshare/src/proxy/imp.rs
+++ b/generic/threadshare/src/proxy/imp.rs
@@ -30,6 +30,7 @@ use std::collections::{HashMap, VecDeque};
use std::sync::Mutex as StdMutex;
use std::sync::MutexGuard as StdMutexGuard;
use std::sync::{Arc, Weak};
+use std::time::Duration;
use std::{u32, u64};
use crate::runtime::prelude::*;
@@ -50,9 +51,10 @@ const DEFAULT_PROXY_CONTEXT: &str = "";
const DEFAULT_MAX_SIZE_BUFFERS: u32 = 200;
const DEFAULT_MAX_SIZE_BYTES: u32 = 1024 * 1024;
-const DEFAULT_MAX_SIZE_TIME: u64 = gst::SECOND_VAL;
+const DEFAULT_MAX_SIZE_TIME: gst::ClockTime = gst::ClockTime::SECOND;
const DEFAULT_CONTEXT: &str = "";
-const DEFAULT_CONTEXT_WAIT: u32 = 0;
+// FIXME use Duration::ZERO when MSVC >= 1.53.2
+const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0);
#[derive(Debug, Clone)]
struct SettingsSink {
@@ -71,9 +73,9 @@ impl Default for SettingsSink {
struct SettingsSrc {
max_size_buffers: u32,
max_size_bytes: u32,
- max_size_time: u64,
+ max_size_time: gst::ClockTime,
context: String,
- context_wait: u32,
+ context_wait: Duration,
proxy_context: String,
}
@@ -810,7 +812,7 @@ impl PadSrcHandler for ProxySrcPadHandler {
gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", query);
let ret = match query.view_mut() {
QueryView::Latency(ref mut q) => {
- q.set(true, 0.into(), gst::CLOCK_TIME_NONE);
+ q.set(true, gst::ClockTime::ZERO, gst::ClockTime::NONE);
true
}
QueryView::Scheduling(ref mut q) => {
@@ -1031,7 +1033,7 @@ impl ProxySrc {
} else {
Some(settings.max_size_bytes)
},
- if settings.max_size_time == 0 {
+ if settings.max_size_time.is_zero() {
None
} else {
Some(settings.max_size_time)
@@ -1141,7 +1143,7 @@ impl ObjectImpl for ProxySrc {
"Throttle poll loop to run at most once every this many ms",
0,
1000,
- DEFAULT_CONTEXT_WAIT,
+ DEFAULT_CONTEXT_WAIT.as_millis() as u32,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_string(
@@ -1175,7 +1177,7 @@ impl ObjectImpl for ProxySrc {
"Maximum number of nanoseconds to queue (0=unlimited)",
0,
u64::MAX - 1,
- DEFAULT_MAX_SIZE_TIME,
+ DEFAULT_MAX_SIZE_TIME.nseconds(),
glib::ParamFlags::READWRITE,
),
]
@@ -1200,7 +1202,8 @@ impl ObjectImpl for ProxySrc {
settings.max_size_bytes = value.get().expect("type checked upstream");
}
"max-size-time" => {
- settings.max_size_time = value.get().expect("type checked upstream");
+ settings.max_size_time =
+ gst::ClockTime::from_nseconds(value.get().expect("type checked upstream"));
}
"context" => {
settings.context = value
@@ -1209,7 +1212,9 @@ impl ObjectImpl for ProxySrc {
.unwrap_or_else(|| "".into());
}
"context-wait" => {
- settings.context_wait = value.get().expect("type checked upstream");
+ settings.context_wait = Duration::from_millis(
+ value.get::<u32>().expect("type checked upstream").into(),
+ );
}
"proxy-context" => {
settings.proxy_context = value
@@ -1226,9 +1231,9 @@ impl ObjectImpl for ProxySrc {
match pspec.name() {
"max-size-buffers" => settings.max_size_buffers.to_value(),
"max-size-bytes" => settings.max_size_bytes.to_value(),
- "max-size-time" => settings.max_size_time.to_value(),
+ "max-size-time" => settings.max_size_time.nseconds().to_value(),
"context" => settings.context.to_value(),
- "context-wait" => settings.context_wait.to_value(),
+ "context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
"proxy-context" => settings.proxy_context.to_value(),
_ => unimplemented!(),
}
diff --git a/generic/threadshare/src/queue/imp.rs b/generic/threadshare/src/queue/imp.rs
index cc62a0fa6..b4501cb02 100644
--- a/generic/threadshare/src/queue/imp.rs
+++ b/generic/threadshare/src/queue/imp.rs
@@ -28,6 +28,7 @@ use once_cell::sync::Lazy;
use std::collections::VecDeque;
use std::sync::Mutex as StdMutex;
+use std::time::Duration;
use std::{u32, u64};
use crate::runtime::prelude::*;
@@ -37,17 +38,18 @@ use crate::dataqueue::{DataQueue, DataQueueItem};
const DEFAULT_MAX_SIZE_BUFFERS: u32 = 200;
const DEFAULT_MAX_SIZE_BYTES: u32 = 1024 * 1024;
-const DEFAULT_MAX_SIZE_TIME: u64 = gst::SECOND_VAL;
+const DEFAULT_MAX_SIZE_TIME: gst::ClockTime = gst::ClockTime::SECOND;
const DEFAULT_CONTEXT: &str = "";
-const DEFAULT_CONTEXT_WAIT: u32 = 0;
+// FIXME use Duration::ZERO when MSVC >= 1.53.2
+const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0);
#[derive(Debug, Clone)]
struct Settings {
max_size_buffers: u32,
max_size_bytes: u32,
- max_size_time: u64,
+ max_size_time: gst::ClockTime,
context: String,
- context_wait: u32,
+ context_wait: Duration,
}
impl Default for Settings {
@@ -628,7 +630,7 @@ impl Queue {
} else {
Some(settings.max_size_bytes)
},
- if settings.max_size_time == 0 {
+ if settings.max_size_time.is_zero() {
None
} else {
Some(settings.max_size_time)
@@ -729,7 +731,7 @@ impl ObjectImpl for Queue {
"Throttle poll loop to run at most once every this many ms",
0,
1000,
- DEFAULT_CONTEXT_WAIT,
+ DEFAULT_CONTEXT_WAIT.as_millis() as u32,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_uint(
@@ -756,7 +758,7 @@ impl ObjectImpl for Queue {
"Maximum number of nanoseconds to queue (0=unlimited)",
0,
u64::MAX - 1,
- DEFAULT_MAX_SIZE_TIME,
+ DEFAULT_MAX_SIZE_TIME.nseconds(),
glib::ParamFlags::READWRITE,
),
]
@@ -781,7 +783,8 @@ impl ObjectImpl for Queue {
settings.max_size_bytes = value.get().expect("type checked upstream");
}
"max-size-time" => {
- settings.max_size_time = value.get().expect("type checked upstream");
+ settings.max_size_time =
+ gst::ClockTime::from_nseconds(value.get().expect("type checked upstream"));
}
"context" => {
settings.context = value
@@ -790,7 +793,9 @@ impl ObjectImpl for Queue {
.unwrap_or_else(|| "".into());
}
"context-wait" => {
- settings.context_wait = value.get().expect("type checked upstream");
+ settings.context_wait = Duration::from_millis(
+ value.get::<u32>().expect("type checked upstream").into(),
+ );
}
_ => unimplemented!(),
}
@@ -801,9 +806,9 @@ impl ObjectImpl for Queue {
match pspec.name() {
"max-size-buffers" => settings.max_size_buffers.to_value(),
"max-size-bytes" => settings.max_size_bytes.to_value(),
- "max-size-time" => settings.max_size_time.to_value(),
+ "max-size-time" => settings.max_size_time.nseconds().to_value(),
"context" => settings.context.to_value(),
- "context-wait" => settings.context_wait.to_value(),
+ "context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
_ => unimplemented!(),
}
}
diff --git a/generic/threadshare/src/runtime/executor.rs b/generic/threadshare/src/runtime/executor.rs
index fe3552904..445c56eb2 100644
--- a/generic/threadshare/src/runtime/executor.rs
+++ b/generic/threadshare/src/runtime/executor.rs
@@ -166,7 +166,7 @@ struct ContextThread {
}
impl ContextThread {
- fn start(name: &str, wait: u32) -> Context {
+ fn start(name: &str, wait: Duration) -> Context {
let context_thread = ContextThread { name: name.into() };
let (context_sender, context_receiver) = sync_mpsc::channel();
let join = thread::spawn(move || {
@@ -187,14 +187,14 @@ impl ContextThread {
context
}
- fn spawn(&self, wait: u32, context_sender: sync_mpsc::Sender<Context>) {
+ fn spawn(&self, wait: Duration, context_sender: sync_mpsc::Sender<Context>) {
gst_debug!(RUNTIME_CAT, "Started context thread '{}'", self.name);
let mut runtime = tokio::runtime::Builder::new()
.basic_scheduler()
.thread_name(self.name.clone())
.enable_all()
- .max_throttling(Duration::from_millis(wait as u64))
+ .max_throttling(wait)
.build()
.expect("Couldn't build the runtime");
@@ -406,7 +406,7 @@ impl PartialEq for Context {
impl Eq for Context {}
impl Context {
- pub fn acquire(context_name: &str, wait: u32) -> Result<Self, io::Error> {
+ pub fn acquire(context_name: &str, wait: Duration) -> Result<Self, io::Error> {
assert_ne!(context_name, "DUMMY");
let mut contexts = CONTEXTS.lock().unwrap();
@@ -693,16 +693,16 @@ mod tests {
type Item = i32;
- const SLEEP_DURATION_MS: u32 = 2;
- const SLEEP_DURATION: Duration = Duration::from_millis(SLEEP_DURATION_MS as u64);
- const DELAY: Duration = Duration::from_millis(SLEEP_DURATION_MS as u64 * 10);
+ const SLEEP_DURATION_MS: u64 = 2;
+ const SLEEP_DURATION: Duration = Duration::from_millis(SLEEP_DURATION_MS);
+ const DELAY: Duration = Duration::from_millis(SLEEP_DURATION_MS * 10);
#[tokio::test]
async fn drain_sub_tasks() {
// Setup
gst::init().unwrap();
- let context = Context::acquire("drain_sub_tasks", SLEEP_DURATION_MS).unwrap();
+ let context = Context::acquire("drain_sub_tasks", SLEEP_DURATION).unwrap();
let join_handle = context.spawn(async move {
let (sender, mut receiver) = mpsc::channel(1);
@@ -755,7 +755,7 @@ mod tests {
async fn block_on_within_tokio() {
gst::init().unwrap();
- let context = Context::acquire("block_on_within_tokio", SLEEP_DURATION_MS).unwrap();
+ let context = Context::acquire("block_on_within_tokio", SLEEP_DURATION).unwrap();
let bytes_sent = crate::runtime::executor::block_on(context.spawn(async {
let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
@@ -781,7 +781,7 @@ mod tests {
fn block_on_from_sync() {
gst::init().unwrap();
- let context = Context::acquire("block_on_from_sync", SLEEP_DURATION_MS).unwrap();
+ let context = Context::acquire("block_on_from_sync", SLEEP_DURATION).unwrap();
let bytes_sent = crate::runtime::executor::block_on(context.spawn(async {
let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5001);
@@ -807,7 +807,7 @@ mod tests {
fn block_on_from_context() {
gst::init().unwrap();
- let context = Context::acquire("block_on_from_context", SLEEP_DURATION_MS).unwrap();
+ let context = Context::acquire("block_on_from_context", SLEEP_DURATION).unwrap();
let join_handle = context.spawn(async {
crate::runtime::executor::block_on(async {
crate::runtime::time::delay_for(DELAY).await;
@@ -821,7 +821,7 @@ mod tests {
async fn enter_context_from_tokio() {
gst::init().unwrap();
- let context = Context::acquire("enter_context_from_tokio", SLEEP_DURATION_MS).unwrap();
+ let context = Context::acquire("enter_context_from_tokio", SLEEP_DURATION).unwrap();
let mut socket = context
.enter(|| {
let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5002);
@@ -849,7 +849,7 @@ mod tests {
fn enter_context_from_sync() {
gst::init().unwrap();
- let context = Context::acquire("enter_context_from_sync", SLEEP_DURATION_MS).unwrap();
+ let context = Context::acquire("enter_context_from_sync", SLEEP_DURATION).unwrap();
let mut socket = context
.enter(|| {
let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5003);
diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs
index d2528e6c6..7c2ff2568 100644
--- a/generic/threadshare/src/runtime/task.rs
+++ b/generic/threadshare/src/runtime/task.rs
@@ -1252,7 +1252,7 @@ mod tests {
}
}
- let context = Context::acquire("task_iterate", 2).unwrap();
+ let context = Context::acquire("task_iterate", Duration::from_millis(2)).unwrap();
let task = Task::default();
@@ -1519,7 +1519,7 @@ mod tests {
}
}
- let context = Context::acquire("prepare_error", 2).unwrap();
+ let context = Context::acquire("prepare_error", Duration::from_millis(2)).unwrap();
let task = Task::default();
@@ -1604,7 +1604,7 @@ mod tests {
}
}
- let context = Context::acquire("prepare_start_ok", 2).unwrap();
+ let context = Context::acquire("prepare_start_ok", Duration::from_millis(2)).unwrap();
let task = Task::default();
@@ -1612,7 +1612,9 @@ mod tests {
task.prepare(TaskPrepareTest { prepare_receiver }, context.clone())
.unwrap();
- let start_ctx = Context::acquire("prepare_start_ok_requester", 0).unwrap();
+ // FIXME use Duration::ZERO when MSVC >= 1.53.2
+ let start_ctx =
+ Context::acquire("prepare_start_ok_requester", Duration::from_nanos(0)).unwrap();
let task_clone = task.clone();
let (ready_sender, ready_receiver) = oneshot::channel();
let start_handle = start_ctx.spawn(async move {
@@ -1720,7 +1722,7 @@ mod tests {
}
}
- let context = Context::acquire("prepare_start_error", 2).unwrap();
+ let context = Context::acquire("prepare_start_error", Duration::from_millis(2)).unwrap();
let task = Task::default();
@@ -1735,7 +1737,9 @@ mod tests {
)
.unwrap();
- let start_ctx = Context::acquire("prepare_start_error_requester", 0).unwrap();
+ // FIXME use Duration::ZERO when MSVC >= 1.53.2
+ let start_ctx =
+ Context::acquire("prepare_start_error_requester", Duration::from_nanos(0)).unwrap();
let task_clone = task.clone();
let (ready_sender, ready_receiver) = oneshot::channel();
let start_handle = start_ctx.spawn(async move {
@@ -1808,7 +1812,7 @@ mod tests {
}
}
- let context = Context::acquire("pause_start", 2).unwrap();
+ let context = Context::acquire("pause_start", Duration::from_millis(2)).unwrap();
let task = Task::default();
@@ -1901,7 +1905,7 @@ mod tests {
}
}
- let context = Context::acquire("successive_pause_start", 2).unwrap();
+ let context = Context::acquire("successive_pause_start", Duration::from_millis(2)).unwrap();
let task = Task::default();
@@ -1962,7 +1966,7 @@ mod tests {
}
}
- let context = Context::acquire("flush_regular_sync", 2).unwrap();
+ let context = Context::acquire("flush_regular_sync", Duration::from_millis(2)).unwrap();
let task = Task::default();
@@ -2049,7 +2053,8 @@ mod tests {
}
}
- let context = Context::acquire("flush_regular_different_context", 2).unwrap();
+ let context =
+ Context::acquire("flush_regular_different_context", Duration::from_millis(2)).unwrap();
let task = Task::default();
@@ -2067,7 +2072,11 @@ mod tests {
gst_debug!(RUNTIME_CAT, "flush_regular_different_context: start");
task.start().unwrap();
- let oob_context = Context::acquire("flush_regular_different_context_oob", 2).unwrap();
+ let oob_context = Context::acquire(
+ "flush_regular_different_context_oob",
+ Duration::from_millis(2),
+ )
+ .unwrap();
let task_clone = task.clone();
let flush_handle = oob_context.spawn(async move {
@@ -2134,7 +2143,8 @@ mod tests {
}
}
- let context = Context::acquire("flush_regular_same_context", 2).unwrap();
+ let context =
+ Context::acquire("flush_regular_same_context", Duration::from_millis(2)).unwrap();
let task = Task::default();
@@ -2218,7 +2228,7 @@ mod tests {
}
}
- let context = Context::acquire("flush_from_loop", 2).unwrap();
+ let context = Context::acquire("flush_from_loop", Duration::from_millis(2)).unwrap();
let task = Task::default();
@@ -2291,7 +2301,7 @@ mod tests {
}
}
- let context = Context::acquire("pause_from_loop", 2).unwrap();
+ let context = Context::acquire("pause_from_loop", Duration::from_millis(2)).unwrap();
let task = Task::default();
@@ -2357,7 +2367,7 @@ mod tests {
}
}
- let context = Context::acquire("trigger_from_action", 2).unwrap();
+ let context = Context::acquire("trigger_from_action", Duration::from_millis(2)).unwrap();
let task = Task::default();
@@ -2427,7 +2437,7 @@ mod tests {
}
}
- let context = Context::acquire("pause_flush_start", 2).unwrap();
+ let context = Context::acquire("pause_flush_start", Duration::from_millis(2)).unwrap();
let task = Task::default();
@@ -2538,7 +2548,7 @@ mod tests {
}
}
- let context = Context::acquire("pause_flushing_start", 2).unwrap();
+ let context = Context::acquire("pause_flushing_start", Duration::from_millis(2)).unwrap();
let task = Task::default();
@@ -2629,7 +2639,7 @@ mod tests {
}
}
- let context = Context::acquire("flush_concurrent_start", 2).unwrap();
+ let context = Context::acquire("flush_concurrent_start", Duration::from_millis(2)).unwrap();
let task = Task::default();
@@ -2644,7 +2654,8 @@ mod tests {
)
.unwrap();
- let oob_context = Context::acquire("flush_concurrent_start_oob", 2).unwrap();
+ let oob_context =
+ Context::acquire("flush_concurrent_start_oob", Duration::from_millis(2)).unwrap();
let task_clone = task.clone();
task.pause().unwrap();
@@ -2746,7 +2757,7 @@ mod tests {
}
}
- let context = Context::acquire("start_timer", 2).unwrap();
+ let context = Context::acquire("start_timer", Duration::from_millis(2)).unwrap();
let task = Task::default();
diff --git a/generic/threadshare/src/socket.rs b/generic/threadshare/src/socket.rs
index 169a62d31..8d8da5caa 100644
--- a/generic/threadshare/src/socket.rs
+++ b/generic/threadshare/src/socket.rs
@@ -143,19 +143,25 @@ impl<T: SocketRead> Socket<T> {
Ok((len, saddr)) => {
let dts = if T::DO_TIMESTAMP {
let time = self.clock.as_ref().unwrap().time();
- let running_time = time - self.base_time.unwrap();
+ let running_time = time
+ .zip(self.base_time)
+ // TODO Do we want None if time < base_time
+ // or do we expect Some?
+ .and_then(|(time, base_time)| time.checked_sub(base_time));
+ // FIXME maybe we should check if running_time.is_none
+ // so as to display another message
gst_debug!(
SOCKET_CAT,
obj: &self.element,
"Read {} bytes at {} (clock {})",
len,
- running_time,
- time
+ running_time.display(),
+ time.display(),
);
running_time
} else {
gst_debug!(SOCKET_CAT, obj: &self.element, "Read {} bytes", len);
- gst::CLOCK_TIME_NONE
+ gst::ClockTime::NONE
};
let mut buffer = self.mapped_buffer.take().unwrap().into_buffer();
diff --git a/generic/threadshare/src/tcpclientsrc/imp.rs b/generic/threadshare/src/tcpclientsrc/imp.rs
index 2d5808973..748f5eadb 100644
--- a/generic/threadshare/src/tcpclientsrc/imp.rs
+++ b/generic/threadshare/src/tcpclientsrc/imp.rs
@@ -31,6 +31,7 @@ use std::io;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
+use std::time::Duration;
use std::u16;
use std::u32;
@@ -47,7 +48,8 @@ const DEFAULT_PORT: i32 = 4953;
const DEFAULT_CAPS: Option<gst::Caps> = None;
const DEFAULT_BLOCKSIZE: u32 = 4096;
const DEFAULT_CONTEXT: &str = "";
-const DEFAULT_CONTEXT_WAIT: u32 = 0;
+// FIXME use Duration::ZERO when MSVC >= 1.53.2
+const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0);
#[derive(Debug, Clone)]
struct Settings {
@@ -56,7 +58,7 @@ struct Settings {
caps: Option<gst::Caps>,
blocksize: u32,
context: String,
- context_wait: u32,
+ context_wait: Duration,
}
impl Default for Settings {
@@ -224,7 +226,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
let ret = match query.view_mut() {
QueryView::Latency(ref mut q) => {
- q.set(false, 0.into(), gst::CLOCK_TIME_NONE);
+ q.set(false, gst::ClockTime::ZERO, gst::ClockTime::NONE);
true
}
QueryView::Scheduling(ref mut q) => {
@@ -580,7 +582,7 @@ impl ObjectImpl for TcpClientSrc {
"Throttle poll loop to run at most once every this many ms",
0,
1000,
- DEFAULT_CONTEXT_WAIT,
+ DEFAULT_CONTEXT_WAIT.as_millis() as u32,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_string(
@@ -649,7 +651,9 @@ impl ObjectImpl for TcpClientSrc {
.unwrap_or_else(|| "".into());
}
"context-wait" => {
- settings.context_wait = value.get().expect("type checked upstream");
+ settings.context_wait = Duration::from_millis(
+ value.get::<u32>().expect("type checked upstream").into(),
+ );
}
_ => unimplemented!(),
}
@@ -663,7 +667,7 @@ impl ObjectImpl for TcpClientSrc {
"caps" => settings.caps.to_value(),
"blocksize" => settings.blocksize.to_value(),
"context" => settings.context.to_value(),
- "context-wait" => settings.context_wait.to_value(),
+ "context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
_ => unimplemented!(),
}
}
diff --git a/generic/threadshare/src/udpsink/imp.rs b/generic/threadshare/src/udpsink/imp.rs
index 2a6f96b74..2a9d54b2f 100644
--- a/generic/threadshare/src/udpsink/imp.rs
+++ b/generic/threadshare/src/udpsink/imp.rs
@@ -62,7 +62,8 @@ const DEFAULT_TTL_MC: u32 = 1;
const DEFAULT_QOS_DSCP: i32 = -1;
const DEFAULT_CLIENTS: &str = "";
const DEFAULT_CONTEXT: &str = "";
-const DEFAULT_CONTEXT_WAIT: u32 = 0;
+// FIXME use Duration::ZERO when MSVC >= 1.53.2
+const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0);
#[derive(Debug, Clone)]
struct Settings {
@@ -81,7 +82,7 @@ struct Settings {
ttl_mc: u32,
qos_dscp: i32,
context: String,
- context_wait: u32,
+ context_wait: Duration,
}
impl Default for Settings {
@@ -125,7 +126,7 @@ enum TaskItem {
struct UdpSinkPadHandlerInner {
sync: bool,
segment: Option<gst::Segment>,
- latency: gst::ClockTime,
+ latency: Option<gst::ClockTime>,
socket: Arc<Mutex<Option<tokio::net::UdpSocket>>>,
socket_v6: Arc<Mutex<Option<tokio::net::UdpSocket>>>,
#[allow(clippy::rc_buffer)]
@@ -141,7 +142,7 @@ impl UdpSinkPadHandlerInner {
UdpSinkPadHandlerInner {
sync: DEFAULT_SYNC,
segment: None,
- latency: gst::CLOCK_TIME_NONE,
+ latency: None,
socket: Arc::new(Mutex::new(None)),
socket_v6: Arc::new(Mutex::new(None)),
clients: Arc::new(vec![SocketAddr::new(
@@ -217,7 +218,7 @@ impl UdpSinkPadHandler {
}
fn set_latency(&self, latency: gst::ClockTime) {
- self.0.write().unwrap().latency = latency;
+ self.0.write().unwrap().latency = Some(latency);
}
fn prepare(&self) {
@@ -405,15 +406,17 @@ impl UdpSinkPadHandler {
) = {
let mut inner = self.0.write().unwrap();
let do_sync = inner.sync;
- let mut rtime: gst::ClockTime = 0.into();
+ let mut rtime = gst::ClockTime::NONE;
if let Some(segment) = &inner.segment {
- if let Some(segment) = segment.downcast_ref::<gst::format::Time>() {
- rtime = segment.to_running_time(buffer.pts());
- if inner.latency.is_some() {
- rtime += inner.latency;
- }
- }
+ rtime = segment
+ .downcast_ref::<gst::format::Time>()
+ .and_then(|segment| {
+ segment
+ .to_running_time(buffer.pts())
+ .zip(inner.latency)
+ .map(|(rtime, latency)| rtime + latency)
+ });
}
let clients_to_configure = mem::replace(&mut inner.clients_to_configure, vec![]);
@@ -519,14 +522,20 @@ impl UdpSinkPadHandler {
}
/* Wait until specified time */
- async fn sync(&self, element: &super::UdpSink, running_time: gst::ClockTime) {
+ async fn sync(
+ &self,
+ element: &super::UdpSink,
+ running_time: impl Into<Option<gst::ClockTime>>,
+ ) {
let now = element.current_running_time();
- if let Some(delay) = running_time
- .saturating_sub(now)
- .and_then(|delay| delay.nseconds())
+ match running_time
+ .into()
+ .zip(now)
+ .and_then(|(running_time, now)| running_time.checked_sub(now))
{
- runtime::time::delay_for(Duration::from_nanos(delay)).await;
+ Some(delay) => runtime::time::delay_for(delay.into()).await,
+ None => runtime::executor::yield_now().await,
}
}
@@ -980,7 +989,7 @@ impl ObjectImpl for UdpSink {
"Throttle poll loop to run at most once every this many ms",
0,
1000,
- DEFAULT_CONTEXT_WAIT,
+ DEFAULT_CONTEXT_WAIT.as_millis() as u32,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_boolean(
@@ -1257,7 +1266,9 @@ impl ObjectImpl for UdpSink {
.unwrap_or_else(|| "".into());
}
"context-wait" => {
- settings.context_wait = value.get().expect("type checked upstream");
+ settings.context_wait = Duration::from_millis(
+ value.get::<u32>().expect("type checked upstream").into(),
+ );
}
_ => unimplemented!(),
}
@@ -1309,7 +1320,7 @@ impl ObjectImpl for UdpSink {
clients.join(",").to_value()
}
"context" => settings.context.to_value(),
- "context-wait" => settings.context_wait.to_value(),
+ "context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
_ => unimplemented!(),
}
}
diff --git a/generic/threadshare/src/udpsrc/imp.rs b/generic/threadshare/src/udpsrc/imp.rs
index 347cc4445..8facdb493 100644
--- a/generic/threadshare/src/udpsrc/imp.rs
+++ b/generic/threadshare/src/udpsrc/imp.rs
@@ -32,6 +32,7 @@ use std::io;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
+use std::time::Duration;
use std::u16;
use crate::runtime::prelude::*;
@@ -47,7 +48,8 @@ const DEFAULT_MTU: u32 = 1492;
const DEFAULT_SOCKET: Option<GioSocketWrapper> = None;
const DEFAULT_USED_SOCKET: Option<GioSocketWrapper> = None;
const DEFAULT_CONTEXT: &str = "";
-const DEFAULT_CONTEXT_WAIT: u32 = 0;
+// FIXME use Duration::ZERO when MSVC >= 1.53.2
+const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0);
const DEFAULT_RETRIEVE_SENDER_ADDRESS: bool = true;
#[derive(Debug, Clone)]
@@ -60,7 +62,7 @@ struct Settings {
socket: Option<GioSocketWrapper>,
used_socket: Option<GioSocketWrapper>,
context: String,
- context_wait: u32,
+ context_wait: Duration,
retrieve_sender_address: bool,
}
@@ -237,7 +239,7 @@ impl PadSrcHandler for UdpSrcPadHandler {
let ret = match query.view_mut() {
QueryView::Latency(ref mut q) => {
- q.set(true, 0.into(), gst::CLOCK_TIME_NONE);
+ q.set(true, gst::ClockTime::ZERO, gst::ClockTime::NONE);
true
}
QueryView::Scheduling(ref mut q) => {
@@ -301,7 +303,7 @@ impl TaskImpl for UdpSrcTask {
async move {
gst_log!(CAT, obj: &self.element, "Starting task");
self.socket
- .set_clock(self.element.clock(), Some(self.element.base_time()));
+ .set_clock(self.element.clock(), self.element.base_time());
gst_log!(CAT, obj: &self.element, "Task started");
Ok(())
}
@@ -721,7 +723,7 @@ impl ObjectImpl for UdpSrc {
"Throttle poll loop to run at most once every this many ms",
0,
1000,
- DEFAULT_CONTEXT_WAIT,
+ DEFAULT_CONTEXT_WAIT.as_millis() as u32,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_string(
@@ -836,7 +838,9 @@ impl ObjectImpl for UdpSrc {
.unwrap_or_else(|| "".into());
}
"context-wait" => {
- settings.context_wait = value.get().expect("type checked upstream");
+ settings.context_wait = Duration::from_millis(
+ value.get::<u32>().expect("type checked upstream").into(),
+ );
}
"retrieve-sender-address" => {
settings.retrieve_sender_address = value.get().expect("type checked upstream");
@@ -864,7 +868,7 @@ impl ObjectImpl for UdpSrc {
.map(GioSocketWrapper::as_socket)
.to_value(),
"context" => settings.context.to_value(),
- "context-wait" => settings.context_wait.to_value(),
+ "context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
"retrieve-sender-address" => settings.retrieve_sender_address.to_value(),
_ => unimplemented!(),
}
diff --git a/generic/threadshare/tests/pad.rs b/generic/threadshare/tests/pad.rs
index 686a6f484..1c7b84c60 100644
--- a/generic/threadshare/tests/pad.rs
+++ b/generic/threadshare/tests/pad.rs
@@ -34,6 +34,7 @@ use once_cell::sync::Lazy;
use std::boxed::Box;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex as StdMutex;
+use std::time::Duration;
use gstthreadshare::runtime::prelude::*;
use gstthreadshare::runtime::{
@@ -41,7 +42,7 @@ use gstthreadshare::runtime::{
};
const DEFAULT_CONTEXT: &str = "";
-const THROTTLING_DURATION: u32 = 2;
+const THROTTLING_DURATION: Duration = Duration::from_millis(2);
fn init() {
use std::sync::Once;
@@ -968,7 +969,7 @@ fn src_tsqueue_sink_nominal() {
.set_property("context", &format!("{}_queue", name))
.unwrap();
ts_queue
- .set_property("context-wait", &THROTTLING_DURATION)
+ .set_property("context-wait", &(THROTTLING_DURATION.as_millis() as u32))
.unwrap();
let (pipeline, src_element, _sink_element, receiver) = setup(name, Some(ts_queue), None);
@@ -1007,7 +1008,7 @@ fn src_tsproxy_sink_nominal() {
.set_property("context", &format!("{}_context", name))
.unwrap();
ts_proxy_src
- .set_property("context-wait", &THROTTLING_DURATION)
+ .set_property("context-wait", &(THROTTLING_DURATION.as_millis() as u32))
.unwrap();
let (pipeline, src_element, _sink_element, receiver) =
diff --git a/generic/threadshare/tests/proxy.rs b/generic/threadshare/tests/proxy.rs
index 5c2b28df8..87c1e1b17 100644
--- a/generic/threadshare/tests/proxy.rs
+++ b/generic/threadshare/tests/proxy.rs
@@ -76,7 +76,7 @@ fn test_push() {
let mut eos = false;
let bus = pipeline.bus().unwrap();
- while let Some(msg) = bus.timed_pop(5 * gst::SECOND) {
+ while let Some(msg) = bus.timed_pop(5 * gst::ClockTime::SECOND) {
use gst::MessageView;
match msg.view() {
MessageView::Eos(..) => {
@@ -123,8 +123,8 @@ fn test_from_pipeline_to_pipeline() {
pipe_1.set_state(gst::State::Paused).unwrap();
pipe_2.set_state(gst::State::Paused).unwrap();
- let _ = pipe_1.state(gst::CLOCK_TIME_NONE);
- let _ = pipe_2.state(gst::CLOCK_TIME_NONE);
+ let _ = pipe_1.state(gst::ClockTime::NONE);
+ let _ = pipe_2.state(gst::ClockTime::NONE);
pipe_1.set_state(gst::State::Null).unwrap();
diff --git a/generic/threadshare/tests/queue.rs b/generic/threadshare/tests/queue.rs
index f107ff50e..f74fb4cc7 100644
--- a/generic/threadshare/tests/queue.rs
+++ b/generic/threadshare/tests/queue.rs
@@ -71,7 +71,7 @@ fn test_push() {
let mut eos = false;
let bus = pipeline.bus().unwrap();
- while let Some(msg) = bus.timed_pop(5 * gst::SECOND) {
+ while let Some(msg) = bus.timed_pop(5 * gst::ClockTime::SECOND) {
use gst::MessageView;
match msg.view() {
MessageView::Eos(..) => {
diff --git a/generic/threadshare/tests/tcpclientsrc.rs b/generic/threadshare/tests/tcpclientsrc.rs
index c60417bfe..0a8f68d27 100644
--- a/generic/threadshare/tests/tcpclientsrc.rs
+++ b/generic/threadshare/tests/tcpclientsrc.rs
@@ -95,7 +95,7 @@ fn test_push() {
let mut eos = false;
let bus = pipeline.bus().unwrap();
- while let Some(msg) = bus.timed_pop(5 * gst::SECOND) {
+ while let Some(msg) = bus.timed_pop(5 * gst::ClockTime::SECOND) {
use gst::MessageView;
match msg.view() {
MessageView::Eos(..) => {