Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/GStreamer/gstreamer.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Waters <matthew@centricular.com>2022-11-10 06:31:43 +0300
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>2022-11-11 13:13:27 +0300
commit5ca39884207042ef542835847486101fc2249612 (patch)
tree35c350082fcf2379d99fd3f7420a8435e00a72ea
parenta34e380e2ed6ae5d35611169a74c0deb61be4739 (diff)
webrtc/datachannel: handle error messages from appsrc/sink
Fixes a possible race where closing a data channel may produce e.g. not-linked errors. Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/3381>
-rw-r--r--subprojects/gst-plugins-bad/ext/webrtc/gstwebrtcbin.c18
-rw-r--r--subprojects/gst-plugins-bad/ext/webrtc/webrtcdatachannel.c215
-rw-r--r--subprojects/gst-plugins-bad/ext/webrtc/webrtcdatachannel.h4
3 files changed, 164 insertions, 73 deletions
diff --git a/subprojects/gst-plugins-bad/ext/webrtc/gstwebrtcbin.c b/subprojects/gst-plugins-bad/ext/webrtc/gstwebrtcbin.c
index dd16885238..5d59ee6959 100644
--- a/subprojects/gst-plugins-bad/ext/webrtc/gstwebrtcbin.c
+++ b/subprojects/gst-plugins-bad/ext/webrtc/gstwebrtcbin.c
@@ -2407,11 +2407,11 @@ _on_sctpdec_pad_added (GstElement * sctpdec, GstPad * pad,
g_signal_emit (webrtc, gst_webrtc_bin_signals[PREPARE_DATA_CHANNEL_SIGNAL],
0, channel, FALSE);
- gst_bin_add (GST_BIN (webrtc), channel->appsrc);
- gst_bin_add (GST_BIN (webrtc), channel->appsink);
+ gst_bin_add (GST_BIN (webrtc), channel->src_bin);
+ gst_bin_add (GST_BIN (webrtc), channel->sink_bin);
- gst_element_sync_state_with_parent (channel->appsrc);
- gst_element_sync_state_with_parent (channel->appsink);
+ gst_element_sync_state_with_parent (channel->src_bin);
+ gst_element_sync_state_with_parent (channel->sink_bin);
webrtc_data_channel_link_to_sctp (channel, webrtc->priv->sctp_transport);
@@ -2422,7 +2422,7 @@ _on_sctpdec_pad_added (GstElement * sctpdec, GstPad * pad,
g_signal_connect (channel, "notify::ready-state",
G_CALLBACK (_on_data_channel_ready_state), webrtc);
- sink_pad = gst_element_get_static_pad (channel->appsink, "sink");
+ sink_pad = gst_element_get_static_pad (channel->sink_bin, "sink");
if (gst_pad_link (pad, sink_pad) != GST_PAD_LINK_OK)
GST_WARNING_OBJECT (channel, "Failed to link sctp pad %s with channel %"
GST_PTR_FORMAT, GST_PAD_NAME (pad), channel);
@@ -6968,11 +6968,11 @@ gst_webrtc_bin_create_data_channel (GstWebRTCBin * webrtc, const gchar * label,
g_signal_emit (webrtc, gst_webrtc_bin_signals[PREPARE_DATA_CHANNEL_SIGNAL], 0,
ret, TRUE);
- gst_bin_add (GST_BIN (webrtc), ret->appsrc);
- gst_bin_add (GST_BIN (webrtc), ret->appsink);
+ gst_bin_add (GST_BIN (webrtc), ret->src_bin);
+ gst_bin_add (GST_BIN (webrtc), ret->sink_bin);
- gst_element_sync_state_with_parent (ret->appsrc);
- gst_element_sync_state_with_parent (ret->appsink);
+ gst_element_sync_state_with_parent (ret->src_bin);
+ gst_element_sync_state_with_parent (ret->sink_bin);
ret = gst_object_ref (ret);
ret->webrtcbin = webrtc;
diff --git a/subprojects/gst-plugins-bad/ext/webrtc/webrtcdatachannel.c b/subprojects/gst-plugins-bad/ext/webrtc/webrtcdatachannel.c
index bb2b023618..0260c61721 100644
--- a/subprojects/gst-plugins-bad/ext/webrtc/webrtcdatachannel.c
+++ b/subprojects/gst-plugins-bad/ext/webrtc/webrtcdatachannel.c
@@ -44,6 +44,141 @@
#define GST_CAT_DEFAULT webrtc_data_channel_debug
GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
+static void _close_procedure (WebRTCDataChannel * channel, gpointer user_data);
+
+typedef void (*ChannelTask) (GstWebRTCDataChannel * channel,
+ gpointer user_data);
+
+struct task
+{
+ GstWebRTCDataChannel *channel;
+ ChannelTask func;
+ gpointer user_data;
+ GDestroyNotify notify;
+};
+
+static GstStructure *
+_execute_task (GstWebRTCBin * webrtc, struct task *task)
+{
+ if (task->func)
+ task->func (task->channel, task->user_data);
+
+ return NULL;
+}
+
+static void
+_free_task (struct task *task)
+{
+ gst_object_unref (task->channel);
+
+ if (task->notify)
+ task->notify (task->user_data);
+ g_free (task);
+}
+
+static void
+_channel_enqueue_task (WebRTCDataChannel * channel, ChannelTask func,
+ gpointer user_data, GDestroyNotify notify)
+{
+ struct task *task = g_new0 (struct task, 1);
+
+ task->channel = gst_object_ref (channel);
+ task->func = func;
+ task->user_data = user_data;
+ task->notify = notify;
+
+ gst_webrtc_bin_enqueue_task (channel->webrtcbin,
+ (GstWebRTCBinFunc) _execute_task, task, (GDestroyNotify) _free_task,
+ NULL);
+}
+
+static void
+_channel_store_error (WebRTCDataChannel * channel, GError * error)
+{
+ GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
+ if (error) {
+ GST_WARNING_OBJECT (channel, "Error: %s",
+ error ? error->message : "Unknown");
+ if (!channel->stored_error)
+ channel->stored_error = error;
+ else
+ g_clear_error (&error);
+ }
+ GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
+}
+
+struct _WebRTCErrorIgnoreBin
+{
+ GstBin bin;
+
+ WebRTCDataChannel *data_channel;
+};
+
+G_DEFINE_TYPE (WebRTCErrorIgnoreBin, webrtc_error_ignore_bin, GST_TYPE_BIN);
+
+static void
+webrtc_error_ignore_bin_handle_message (GstBin * bin, GstMessage * message)
+{
+ WebRTCErrorIgnoreBin *self = WEBRTC_ERROR_IGNORE_BIN (bin);
+
+ switch (GST_MESSAGE_TYPE (message)) {
+ case GST_MESSAGE_ERROR:{
+ GError *error = NULL;
+ gst_message_parse_error (message, &error, NULL);
+ GST_DEBUG_OBJECT (bin, "handling error message from internal element");
+ _channel_store_error (self->data_channel, error);
+ _channel_enqueue_task (self->data_channel, (ChannelTask) _close_procedure,
+ NULL, NULL);
+ break;
+ }
+ default:
+ GST_BIN_CLASS (webrtc_error_ignore_bin_parent_class)->handle_message (bin,
+ message);
+ break;
+ }
+}
+
+static void
+webrtc_error_ignore_bin_class_init (WebRTCErrorIgnoreBinClass * klass)
+{
+ GstBinClass *bin_class = (GstBinClass *) klass;
+
+ bin_class->handle_message = webrtc_error_ignore_bin_handle_message;
+}
+
+static void
+webrtc_error_ignore_bin_init (WebRTCErrorIgnoreBin * bin)
+{
+}
+
+static GstElement *
+webrtc_error_ignore_bin_new (WebRTCDataChannel * data_channel,
+ GstElement * other)
+{
+ WebRTCErrorIgnoreBin *self;
+ GstPad *pad;
+
+ self = g_object_new (webrtc_error_ignore_bin_get_type (), NULL);
+ self->data_channel = data_channel;
+
+ gst_bin_add (GST_BIN (self), other);
+
+ pad = gst_element_get_static_pad (other, "src");
+ if (pad) {
+ GstPad *ghost_pad = gst_ghost_pad_new ("src", pad);
+ gst_element_add_pad (GST_ELEMENT (self), ghost_pad);
+ gst_clear_object (&pad);
+ }
+ pad = gst_element_get_static_pad (other, "sink");
+ if (pad) {
+ GstPad *ghost_pad = gst_ghost_pad_new ("sink", pad);
+ gst_element_add_pad (GST_ELEMENT (self), ghost_pad);
+ gst_clear_object (&pad);
+ }
+
+ return (GstElement *) self;
+}
+
#define webrtc_data_channel_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (WebRTCDataChannel, webrtc_data_channel,
GST_TYPE_WEBRTC_DATA_CHANNEL,
@@ -213,67 +348,6 @@ construct_ack_packet (WebRTCDataChannel * channel)
return buf;
}
-typedef void (*ChannelTask) (GstWebRTCDataChannel * channel,
- gpointer user_data);
-
-struct task
-{
- GstWebRTCDataChannel *channel;
- ChannelTask func;
- gpointer user_data;
- GDestroyNotify notify;
-};
-
-static GstStructure *
-_execute_task (GstWebRTCBin * webrtc, struct task *task)
-{
- if (task->func)
- task->func (task->channel, task->user_data);
-
- return NULL;
-}
-
-static void
-_free_task (struct task *task)
-{
- gst_object_unref (task->channel);
-
- if (task->notify)
- task->notify (task->user_data);
- g_free (task);
-}
-
-static void
-_channel_enqueue_task (WebRTCDataChannel * channel, ChannelTask func,
- gpointer user_data, GDestroyNotify notify)
-{
- struct task *task = g_new0 (struct task, 1);
-
- task->channel = gst_object_ref (channel);
- task->func = func;
- task->user_data = user_data;
- task->notify = notify;
-
- gst_webrtc_bin_enqueue_task (channel->webrtcbin,
- (GstWebRTCBinFunc) _execute_task, task, (GDestroyNotify) _free_task,
- NULL);
-}
-
-static void
-_channel_store_error (WebRTCDataChannel * channel, GError * error)
-{
- GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
- if (error) {
- GST_WARNING_OBJECT (channel, "Error: %s",
- error ? error->message : "Unknown");
- if (!channel->stored_error)
- channel->stored_error = error;
- else
- g_clear_error (&error);
- }
- GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
-}
-
static void
_emit_on_open (WebRTCDataChannel * channel, gpointer user_data)
{
@@ -290,6 +364,10 @@ _transport_closed (WebRTCDataChannel * channel)
error = channel->stored_error;
channel->stored_error = NULL;
+ GST_TRACE_OBJECT (channel, "transport closed, peer closed %u error %p "
+ "buffered %" G_GUINT64_FORMAT, channel->peer_closed, error,
+ channel->parent.buffered_amount);
+
both_sides_closed =
channel->peer_closed && channel->parent.buffered_amount <= 0;
if (both_sides_closed || error) {
@@ -314,7 +392,7 @@ _close_sctp_stream (WebRTCDataChannel * channel, gpointer user_data)
GST_INFO_OBJECT (channel, "Closing outgoing SCTP stream %i label \"%s\"",
channel->parent.id, channel->parent.label);
- pad = gst_element_get_static_pad (channel->appsrc, "src");
+ pad = gst_element_get_static_pad (channel->src_bin, "src");
peer = gst_pad_get_peer (pad);
gst_object_unref (pad);
@@ -322,6 +400,7 @@ _close_sctp_stream (WebRTCDataChannel * channel, gpointer user_data)
GstElement *sctpenc = gst_pad_get_parent_element (peer);
if (sctpenc) {
+ GST_TRACE_OBJECT (channel, "removing sctpenc pad %" GST_PTR_FORMAT, peer);
gst_element_release_request_pad (sctpenc, peer);
gst_object_unref (sctpenc);
}
@@ -484,6 +563,8 @@ _parse_control_packet (WebRTCDataChannel * channel, guint8 * data,
if (ret != GST_FLOW_OK) {
g_set_error (error, GST_WEBRTC_ERROR,
GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Could not send ack packet");
+ GST_WARNING_OBJECT (channel, "push returned %i, %s", ret,
+ gst_flow_get_name (ret));
return ret;
}
@@ -800,6 +881,8 @@ webrtc_data_channel_send_data (GstWebRTCDataChannel * base_channel,
} else {
g_set_error (error, GST_WEBRTC_ERROR,
GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Failed to send data");
+ GST_WARNING_OBJECT (channel, "push returned %i, %s", ret,
+ gst_flow_get_name (ret));
GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
channel->parent.buffered_amount -= size;
@@ -1001,6 +1084,8 @@ gst_webrtc_data_channel_constructed (GObject * object)
channel->src_probe = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_DATA_BOTH,
(GstPadProbeCallback) on_appsrc_data, channel, NULL);
+ channel->src_bin = webrtc_error_ignore_bin_new (channel, channel->appsrc);
+
channel->appsink = gst_element_factory_make ("appsink", NULL);
gst_object_ref_sink (channel->appsink);
g_object_set (channel->appsink, "sync", FALSE, "async", FALSE, "caps", caps,
@@ -1008,6 +1093,8 @@ gst_webrtc_data_channel_constructed (GObject * object)
gst_app_sink_set_callbacks (GST_APP_SINK (channel->appsink), &sink_callbacks,
channel, NULL);
+ channel->sink_bin = webrtc_error_ignore_bin_new (channel, channel->appsink);
+
gst_object_unref (pad);
gst_caps_unref (caps);
}
@@ -1078,7 +1165,7 @@ _data_channel_set_sctp_transport (WebRTCDataChannel * channel,
GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
if (channel->sctp_transport)
g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel);
- GST_TRACE ("%p set sctp %p", channel, sctp);
+ GST_TRACE_OBJECT (channel, "set sctp %p", sctp);
gst_object_replace ((GstObject **) & channel->sctp_transport,
GST_OBJECT (sctp));
@@ -1106,7 +1193,7 @@ webrtc_data_channel_link_to_sctp (WebRTCDataChannel * channel,
_data_channel_set_sctp_transport (channel, sctp_transport);
pad_name = g_strdup_printf ("sink_%u", id);
- if (!gst_element_link_pads (channel->appsrc, "src",
+ if (!gst_element_link_pads (channel->src_bin, "src",
channel->sctp_transport->sctpenc, pad_name))
g_warn_if_reached ();
g_free (pad_name);
diff --git a/subprojects/gst-plugins-bad/ext/webrtc/webrtcdatachannel.h b/subprojects/gst-plugins-bad/ext/webrtc/webrtcdatachannel.h
index a0b38a7ad2..dd65a66ae3 100644
--- a/subprojects/gst-plugins-bad/ext/webrtc/webrtcdatachannel.h
+++ b/subprojects/gst-plugins-bad/ext/webrtc/webrtcdatachannel.h
@@ -46,7 +46,9 @@ struct _WebRTCDataChannel
GstWebRTCDataChannel parent;
WebRTCSCTPTransport *sctp_transport;
+ GstElement *src_bin;
GstElement *appsrc;
+ GstElement *sink_bin;
GstElement *appsink;
GstWebRTCBin *webrtcbin;
@@ -70,6 +72,8 @@ G_GNUC_INTERNAL
void webrtc_data_channel_link_to_sctp (WebRTCDataChannel *channel,
WebRTCSCTPTransport *sctp_transport);
+G_DECLARE_FINAL_TYPE (WebRTCErrorIgnoreBin, webrtc_error_ignore_bin, WEBRTC, ERROR_IGNORE_BIN, GstBin);
+
G_END_DECLS
#endif /* __WEBRTC_DATA_CHANNEL_H__ */