Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrançois Laignel <francois@centricular.com>2023-09-18 17:59:27 +0300
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>2023-10-12 18:45:58 +0300
commitc021e2b69f905a53c6eb80a7a52cf84e309b2649 (patch)
tree9939de8a7fb9fc179ac3b09a5c96f1cc40631f7c /net/webrtc
parent42008fb8953e9a3e379ff221a6ff2f190849374c (diff)
net/webrtcsink: don't miss ice candidates
During `on_remote_description_set()` processing, current session is removed from the sessions `HashMap`. If an ice candidate is submitted to `handle_ice()` by that time, the session can't be found and the candidate is ignored. This commit wraps the Session in the sessions `HashMap` so an entry is kept while `on_remote_description_set()` is running. Incoming candidates received by `handle_ice()` will be processed immediately or enqueued and handled when the session is restored by `on_remote_description_set()`. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1325>
Diffstat (limited to 'net/webrtc')
-rw-r--r--net/webrtc/src/webrtcsink/imp.rs195
1 files changed, 176 insertions, 19 deletions
diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs
index f6b83ae6c..ca4b393c4 100644
--- a/net/webrtc/src/webrtcsink/imp.rs
+++ b/net/webrtc/src/webrtcsink/imp.rs
@@ -277,10 +277,141 @@ struct SignallerSignals {
shutdown: glib::SignalHandlerId,
}
+struct IceCandidate {
+ sdp_m_line_index: u32,
+ candidate: String,
+}
+
+/// Wrapper around `Session`.
+///
+/// This makes it possible for the `Session` to be taken out of the `State`,
+/// without removing the entry in the `sessions` `HashMap`, thus allowing
+/// the `State` lock to be released, e.g. before calling a `Signal`.
+///
+/// Taking the `Session`, replaces it with a placeholder which can enqueue
+/// items (currently ICE candidates) received while the `Session` is taken.
+/// In which case, the enqueued items will be processed when the `Session` is
+/// restored.
+enum SessionWrapper {
+ /// The `Session` is available in the `SessionWrapper`.
+ InPlace(Session),
+ /// The `Session` was taken out the `SessionWrapper`.
+ Taken(Vec<IceCandidate>),
+}
+
+impl SessionWrapper {
+ /// Unwraps a reference to the `Session` of this `SessionWrapper`.
+ ///
+ /// # Panics
+ ///
+ /// Panics is the `Session` was taken.
+ fn unwrap(&self) -> &Session {
+ match self {
+ SessionWrapper::InPlace(session) => session,
+ _ => panic!("Session is not In Place"),
+ }
+ }
+
+ /// Unwraps a mutable reference to the `Session` of this `SessionWrapper`.
+ ///
+ /// # Panics
+ ///
+ /// Panics is the `Session` was taken.
+ fn unwrap_mut(&mut self) -> &mut Session {
+ match self {
+ SessionWrapper::InPlace(session) => session,
+ _ => panic!("Session is not In Place"),
+ }
+ }
+
+ /// Consumes the `SessionWrapper`, returning the wrapped `Session`.
+ ///
+ /// # Panics
+ ///
+ /// Panics is the `Session` was taken.
+ fn into_inner(self) -> Session {
+ match self {
+ SessionWrapper::InPlace(session) => session,
+ _ => panic!("Session is not In Place"),
+ }
+ }
+
+ /// Takes the `Session` out of this `SessionWrapper`, leaving it in the `Taken` state.
+ ///
+ /// # Panics
+ ///
+ /// Panics is the `Session` was taken.
+ fn take(&mut self) -> Session {
+ use SessionWrapper::*;
+ match std::mem::replace(self, Taken(Vec::new())) {
+ InPlace(session) => session,
+ _ => panic!("Session is not In Place"),
+ }
+ }
+
+ /// Restores a `Session` to this `SessionWrapper`.
+ ///
+ /// Processes any pending items enqueued while the `Session` was taken.
+ ///
+ /// # Panics
+ ///
+ /// Panics is the `Session` is already in place.
+ fn restore(&mut self, session: Session) {
+ let SessionWrapper::Taken(ref cands) = self else {
+ panic!("Session is already in place");
+ };
+
+ if !cands.is_empty() {
+ gst::trace!(
+ CAT,
+ "handling {} pending ice candidates for session {}",
+ cands.len(),
+ session.id,
+ );
+ for cand in cands {
+ session.webrtcbin.emit_by_name::<()>(
+ "add-ice-candidate",
+ &[&cand.sdp_m_line_index, &cand.candidate],
+ );
+ }
+ }
+
+ *self = SessionWrapper::InPlace(session);
+ }
+
+ /// Adds an ICE candidate to this `SessionWrapper`.
+ ///
+ /// If the `Session` is in place, the ICE candidate is added immediately,
+ /// otherwise, it will be added when the `Session` is restored.
+ fn add_ice_candidate(&mut self, session_id: &str, sdp_m_line_index: u32, candidate: &str) {
+ match self {
+ SessionWrapper::InPlace(session) => {
+ gst::trace!(CAT, "adding ice candidate for session {session_id}");
+ session
+ .webrtcbin
+ .emit_by_name::<()>("add-ice-candidate", &[&sdp_m_line_index, &candidate]);
+ }
+ SessionWrapper::Taken(cands) => {
+ gst::trace!(CAT, "queuing ice candidate for session {session_id}");
+ cands.push(IceCandidate {
+ sdp_m_line_index,
+ candidate: candidate.to_string(),
+ });
+ }
+ }
+ }
+}
+
+impl From<Session> for SessionWrapper {
+ fn from(session: Session) -> Self {
+ SessionWrapper::InPlace(session)
+ }
+}
+
/* Our internal state */
struct State {
signaller_state: SignallerState,
- sessions: HashMap<String, Session>,
+ sessions: HashMap<String, SessionWrapper>,
codecs: BTreeMap<i32, Codec>,
/// Used to abort codec discovery
codecs_abort_handles: Vec<futures::future::AbortHandle>,
@@ -957,7 +1088,8 @@ impl State {
}
fn end_session(&mut self, session_id: &str) -> Option<Session> {
- if let Some(mut session) = self.sessions.remove(session_id) {
+ if let Some(session) = self.sessions.remove(session_id) {
+ let mut session = session.into_inner();
self.finalize_session(&mut session);
Some(session)
} else {
@@ -1674,6 +1806,7 @@ impl BaseWebRTCSink {
if let Some(session) = state.sessions.get(session_id) {
session
+ .unwrap()
.webrtcbin
.emit_by_name::<()>("set-local-description", &[&offer, &None::<gst::Promise>]);
drop(state);
@@ -1693,7 +1826,8 @@ impl BaseWebRTCSink {
drop(settings);
let mut state = self.state.lock().unwrap();
- if let Some(mut session) = state.sessions.remove(session_id) {
+ if let Some(session) = state.sessions.get_mut(session_id) {
+ let mut session = session.take();
let sdp = answer.sdp();
session.sdp = Some(sdp.to_owned());
@@ -1705,13 +1839,19 @@ impl BaseWebRTCSink {
.and_then(|format| format.parse::<i32>().ok());
}
+ // FIXME I think the intention was to drop(state) and re-acquire the lock after the call
+
session
.webrtcbin
.emit_by_name::<()>("set-local-description", &[&answer, &None::<gst::Promise>]);
let session_id = session.id.clone();
- state.sessions.insert(session.id.clone(), session);
+ if let Some(session_wrapper) = state.sessions.get_mut(&session_id) {
+ session_wrapper.restore(session);
+ } else {
+ gst::warning!(CAT, "Session {session_id} was removed");
+ }
drop(state);
signaller.send_sdp(&session_id, &answer);
@@ -1776,6 +1916,7 @@ impl BaseWebRTCSink {
});
session
+ .unwrap()
.webrtcbin
.emit_by_name::<()>("create-answer", &[&None::<gst::Structure>, &promise]);
}
@@ -1910,6 +2051,7 @@ impl BaseWebRTCSink {
gst::debug!(CAT, obj: element, "Negotiating for session {}", session_id);
if let Some(session) = state.sessions.get(session_id) {
+ let session = session.unwrap();
gst::trace!(CAT, "WebRTC pads: {:?}", session.webrtc_pads);
if let Some(offer) = offer {
@@ -2211,7 +2353,7 @@ impl BaseWebRTCSink {
let state = this.state.lock().unwrap();
if let Some(session) = state.sessions.get(&session_id_clone) {
- for webrtc_pad in session.webrtc_pads.values() {
+ for webrtc_pad in session.unwrap().webrtc_pads.values() {
if let Some(srcpad) = webrtc_pad.pad.peer() {
srcpad.send_event(
gst_video::UpstreamForceKeyUnitEvent::builder()
@@ -2277,7 +2419,7 @@ impl BaseWebRTCSink {
let element = element.expect("on-new-ssrc emitted when webrtcsink has been disposed?");
let mut state = element.imp().state.lock().unwrap();
if let Some(session) = state.sessions.get_mut(&session_id_str) {
-
+ let session = session.unwrap_mut();
if session.stats_sigid.is_none() {
let session_id_str = session_id_str.clone();
let element = element.downgrade();
@@ -2357,7 +2499,9 @@ impl BaseWebRTCSink {
}
});
- state.sessions.insert(session_id.to_string(), session);
+ state
+ .sessions
+ .insert(session_id.to_string(), session.into());
let mut streams: Vec<InputStream> = state.streams.values().cloned().collect();
@@ -2437,12 +2581,12 @@ impl BaseWebRTCSink {
{
let mut state = this.state.lock().unwrap();
- if let Some(mut session) = state.sessions.remove(&session_id) {
+ if let Some(session) = state.sessions.get_mut(&session_id) {
+ let session = session.unwrap_mut();
session.webrtc_pads = webrtc_pads;
if offer_clone.is_some() {
session.codecs = Some(codecs);
}
- state.sessions.insert(session_id.to_owned(), session);
}
}
@@ -2532,6 +2676,7 @@ impl BaseWebRTCSink {
) {
let mut state = element.imp().state.lock().unwrap();
if let Some(session) = state.sessions.get_mut(session_id) {
+ let session = session.unwrap_mut();
if let Some(congestion_controller) = session.congestion_controller.as_mut() {
congestion_controller.loss_control(element, stats, &mut session.encoders);
}
@@ -2552,6 +2697,7 @@ impl BaseWebRTCSink {
let mut state = element.imp().state.lock().unwrap();
if let Some(session) = state.sessions.get_mut(&session_id) {
+ let session = session.unwrap_mut();
if let Some(congestion_controller) = session.congestion_controller.as_mut() {
congestion_controller.delay_control(&element, stats, &mut session.encoders,);
}
@@ -2573,7 +2719,7 @@ impl BaseWebRTCSink {
let mut state = element.imp().state.lock().unwrap();
if let Some(session) = state.sessions.get_mut(session_id) {
- session.rtprtxsend = Some(rtprtxsend);
+ session.unwrap_mut().rtprtxsend = Some(rtprtxsend);
}
}
@@ -2582,6 +2728,8 @@ impl BaseWebRTCSink {
let mut state = element.imp().state.lock().unwrap();
if let Some(session) = state.sessions.get_mut(session_id) {
+ let session = session.unwrap_mut();
+
let n_encoders = session.encoders.len();
let fec_ratio = {
@@ -2616,7 +2764,9 @@ impl BaseWebRTCSink {
let mut remove = false;
let codecs = state.codecs.clone();
- if let Some(mut session) = state.sessions.remove(&session_id) {
+ if let Some(session) = state.sessions.get_mut(&session_id) {
+ let mut session = session.take();
+
for webrtc_pad in session.webrtc_pads.clone().values() {
let transceiver = webrtc_pad
.pad
@@ -2691,14 +2841,17 @@ impl BaseWebRTCSink {
}));
if remove {
+ let _ = state.sessions.remove(&session_id);
state.finalize_session(&mut session);
drop(state);
let settings = self.settings.lock().unwrap();
let signaller = settings.signaller.clone();
drop(settings);
signaller.end_session(&session_id);
+ } else if let Some(session_wrapper) = state.sessions.get_mut(&session_id) {
+ session_wrapper.restore(session);
} else {
- state.sessions.insert(session.id.clone(), session);
+ gst::warning!(CAT, "Session {session_id} was removed");
}
}
}
@@ -2711,7 +2864,7 @@ impl BaseWebRTCSink {
_sdp_mid: Option<String>,
candidate: &str,
) {
- let state = self.state.lock().unwrap();
+ let mut state = self.state.lock().unwrap();
let sdp_m_line_index = match sdp_m_line_index {
Some(sdp_m_line_index) => sdp_m_line_index,
@@ -2721,11 +2874,8 @@ impl BaseWebRTCSink {
}
};
- if let Some(session) = state.sessions.get(session_id) {
- gst::trace!(CAT, "adding ice candidate for session {}", session_id);
- session
- .webrtcbin
- .emit_by_name::<()>("add-ice-candidate", &[&sdp_m_line_index, &candidate]);
+ if let Some(session_wrapper) = state.sessions.get_mut(session_id) {
+ session_wrapper.add_ice_candidate(session_id, sdp_m_line_index, candidate);
} else {
gst::warning!(CAT, "No consumer with ID {session_id}");
}
@@ -2740,6 +2890,8 @@ impl BaseWebRTCSink {
let mut state = self.state.lock().unwrap();
if let Some(session) = state.sessions.get_mut(session_id) {
+ let session = session.unwrap_mut();
+
let sdp = desc.sdp();
session.sdp = Some(sdp.to_owned());
@@ -3100,7 +3252,12 @@ impl BaseWebRTCSink {
.unwrap()
.sessions
.iter()
- .map(|(name, consumer)| (name.as_str(), consumer.gather_stats().to_send_value())),
+ .map(|(name, consumer)| {
+ (
+ name.as_str(),
+ consumer.unwrap().gather_stats().to_send_value(),
+ )
+ }),
)
}