diff options
author | Sebastian Dröge <sebastian@centricular.com> | 2018-04-12 09:59:39 +0300 |
---|---|---|
committer | Sebastian Dröge <sebastian@centricular.com> | 2018-11-05 14:36:47 +0300 |
commit | 2dfca38977b41e8394fc00841ce9ffed01b31a24 (patch) | |
tree | 69442622b06d0d7b98080b51aa57af1b47b165df | |
parent | b56e1a987363611ab3887d965502df10f3889747 (diff) |
threadshare: Turn the single-threaded executor until no futures are left to be run before waiting
Otherwise in e.g. a pipeline like
ts-udpsrc ! ts-queue ! fakesink
the first turn would only get a packet and queue it up, then we would
wait due to throttling and only then we would forward the packet from
the queue (but not poll the socket again), wait again due to throttling
and only then poll and get the next packet.
See https://github.com/tokio-rs/tokio/issues/310
-rw-r--r-- | gst-plugin-threadshare/Cargo.toml | 10 | ||||
-rw-r--r-- | gst-plugin-threadshare/src/iocontext.rs | 9 |
2 files changed, 13 insertions, 6 deletions
diff --git a/gst-plugin-threadshare/Cargo.toml b/gst-plugin-threadshare/Cargo.toml index f95e4dddd..e42c2c7e5 100644 --- a/gst-plugin-threadshare/Cargo.toml +++ b/gst-plugin-threadshare/Cargo.toml @@ -10,11 +10,11 @@ gstreamer-sys = { git = "https://github.com/sdroege/gstreamer-sys" } glib = { git = "https://github.com/gtk-rs/glib" } gstreamer = { git = "https://github.com/sdroege/gstreamer-rs" } gst-plugin = { git = "https://github.com/sdroege/gst-plugin-rs" } -tokio = { git = "https://github.com/tokio-rs/tokio" } -tokio-reactor = { git = "https://github.com/tokio-rs/tokio" } -tokio-executor = { git = "https://github.com/tokio-rs/tokio" } -tokio-timer = { git = "https://github.com/tokio-rs/tokio" } -tokio-threadpool = { git = "https://github.com/tokio-rs/tokio" } +tokio = { git = "https://github.com/sdroege/tokio", branch = "fair-turn" } +tokio-reactor = { git = "https://github.com/sdroege/tokio", branch = "fair-turn" } +tokio-executor = { git = "https://github.com/sdroege/tokio", branch = "fair-turn" } +tokio-timer = { git = "https://github.com/sdroege/tokio", branch = "fair-turn" } +tokio-threadpool = { git = "https://github.com/sdroege/tokio", branch = "fair-turn" } futures = "0.1" lazy_static = "1.0" either = "1.0" diff --git a/gst-plugin-threadshare/src/iocontext.rs b/gst-plugin-threadshare/src/iocontext.rs index f82f30a5f..b992f5a99 100644 --- a/gst-plugin-threadshare/src/iocontext.rs +++ b/gst-plugin-threadshare/src/iocontext.rs @@ -134,6 +134,8 @@ impl IOContextRunner { ::tokio_reactor::with_default(&handle, &mut enter, |mut enter| { ::tokio_timer::with_default(&timer_handle, &mut enter, |enter| loop { + use tokio::executor::current_thread::Turn; + let now = time::Instant::now(); if self.shutdown.load(atomic::Ordering::SeqCst) > RUNNING { @@ -148,7 +150,11 @@ impl IOContextRunner { } gst_trace!(CONTEXT_CAT, "Turning current thread '{}'", self.name); - current_thread.enter(enter).turn(None).unwrap(); + while let Turn(true) = current_thread + .enter(enter) + .turn(Some(time::Duration::from_millis(0))) + .unwrap() + {} gst_trace!(CONTEXT_CAT, "Turned current thread '{}'", self.name); let elapsed = now.elapsed(); @@ -277,6 +283,7 @@ impl IOContext { let shutdown = IOContextRunner::start(name, wait, reactor); + // FIXME: The executor threads are not throttled at all, only the reactor let mut pool_builder = thread_pool::Builder::new(); pool_builder .around_worker(move |w, enter| { |