Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/GStreamer/gst-plugins-good.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/gst
diff options
context:
space:
mode:
authorMathieu Duponchelle <mathieu@centricular.com>2020-10-06 04:13:30 +0300
committerGStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org>2020-10-09 01:22:18 +0300
commitcff42d4c26ddf268fd50973aa6d086bc18694768 (patch)
tree6622a3f65a61014ed299e366a1815bc02043c129 /gst
parent7c9a5e86fe3a504811aad9af7c86b64d7ea5a8d7 (diff)
rtpmanager: implement SMPTE 2022-1 FEC decoder
+ improve integration of FEC decoders in rtpbin Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/-/merge_requests/753>
Diffstat (limited to 'gst')
-rw-r--r--gst/rtpmanager/gstrtpbin.c301
-rw-r--r--gst/rtpmanager/gstrtpbin.h3
-rw-r--r--gst/rtpmanager/gstrtpmanager.c5
-rw-r--r--gst/rtpmanager/gstrtpst2022-1-fecdec.c995
-rw-r--r--gst/rtpmanager/gstrtpst2022-1-fecdec.h37
-rw-r--r--gst/rtpmanager/meson.build1
6 files changed, 1334 insertions, 8 deletions
diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c
index 1fb98ffc7..444a66de5 100644
--- a/gst/rtpmanager/gstrtpbin.c
+++ b/gst/rtpmanager/gstrtpbin.c
@@ -165,6 +165,23 @@ static GstStaticPadTemplate rtpbin_recv_rtp_sink_template =
GST_STATIC_CAPS ("application/x-rtp;application/x-srtp")
);
+/**
+ * GstRtpBin!recv_fec_sink_%u_%u:
+ *
+ * Sink template for receiving Forward Error Correction packets,
+ * in the form recv_fec_sink_<session_idx>_<fec_stream_idx>
+ *
+ * See #GstRTPST_2022_1_FecDec for example usage
+ *
+ * Since: 1.20
+ */
+static GstStaticPadTemplate rtpbin_recv_fec_sink_template =
+GST_STATIC_PAD_TEMPLATE ("recv_fec_sink_%u_%u",
+ GST_PAD_SINK,
+ GST_PAD_REQUEST,
+ GST_STATIC_CAPS ("application/x-rtp")
+ );
+
static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template =
GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%u",
GST_PAD_SINK,
@@ -344,6 +361,7 @@ enum
PROP_MAX_STREAMS,
PROP_MAX_TS_OFFSET_ADJUSTMENT,
PROP_MAX_TS_OFFSET,
+ PROP_FEC_DECODERS,
};
#define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type())
@@ -377,6 +395,7 @@ static void payload_type_change (GstElement * element, guint pt,
GstRtpBinSession * session);
static void remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
static void remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
+static void remove_recv_fec (GstRtpBin * rtpbin, GstRtpBinSession * session);
static void remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
static void remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
static void free_client (GstRtpBinClient * client, GstRtpBin * bin);
@@ -486,6 +505,10 @@ struct _GstRtpBinSession
GstPad *send_rtp_src_ghost;
GstPad *send_rtcp_src;
GstPad *send_rtcp_src_ghost;
+
+ GSList *recv_fec_sinks;
+ GSList *recv_fec_sink_ghosts;
+ GstElement *fec_decoder;
};
/* Manages the RTP streams that come from one client and should therefore be
@@ -517,6 +540,12 @@ find_session_by_id (GstRtpBin * rtpbin, gint id)
return NULL;
}
+static gboolean
+pad_is_recv_fec (GstRtpBinSession * session, GstPad * pad)
+{
+ return g_slist_find (session->recv_fec_sink_ghosts, pad) != NULL;
+}
+
/* find a session with the given request pad. Must be called with RTP_BIN_LOCK */
static GstRtpBinSession *
find_session_by_pad (GstRtpBin * rtpbin, GstPad * pad)
@@ -528,8 +557,8 @@ find_session_by_pad (GstRtpBin * rtpbin, GstPad * pad)
if ((sess->recv_rtp_sink_ghost == pad) ||
(sess->recv_rtcp_sink_ghost == pad) ||
- (sess->send_rtp_sink_ghost == pad)
- || (sess->send_rtcp_src_ghost == pad))
+ (sess->send_rtp_sink_ghost == pad) ||
+ (sess->send_rtcp_src_ghost == pad) || pad_is_recv_fec (sess, pad))
return sess;
}
return NULL;
@@ -850,6 +879,7 @@ free_session (GstRtpBinSession * sess, GstRtpBin * bin)
remove_recv_rtp (bin, sess);
remove_recv_rtcp (bin, sess);
+ remove_recv_fec (bin, sess);
remove_send_rtp (bin, sess);
remove_rtcp (bin, sess);
@@ -2640,6 +2670,24 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass)
"changed to 0 (no limit)", 0, G_MAXINT64, DEFAULT_MAX_TS_OFFSET,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ /**
+ * GstRtpBin:fec-decoders:
+ *
+ * Used to provide a factory used to build the FEC decoder for a
+ * given session, as a command line alternative to
+ * #GstRtpBin::request-fec-decoder.
+ *
+ * Expects a GstStructure in the form session_id (gint) -> factory (string)
+ *
+ * Since: 1.20
+ */
+ g_object_class_install_property (gobject_class, PROP_FEC_DECODERS,
+ g_param_spec_boxed ("fec-decoders", "Fec Decoders",
+ "GstStructure mapping from session index to FEC decoder "
+ "factory, eg "
+ "fec-decoders='fec,0=\"rtpst2022-1-fecdec\\ size-time\\=1000000000\";'",
+ GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
gstelement_class->request_new_pad =
GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad);
@@ -2649,6 +2697,8 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass)
gst_element_class_add_static_pad_template (gstelement_class,
&rtpbin_recv_rtp_sink_template);
gst_element_class_add_static_pad_template (gstelement_class,
+ &rtpbin_recv_fec_sink_template);
+ gst_element_class_add_static_pad_template (gstelement_class,
&rtpbin_recv_rtcp_sink_template);
gst_element_class_add_static_pad_template (gstelement_class,
&rtpbin_send_rtp_sink_template);
@@ -2726,6 +2776,8 @@ gst_rtp_bin_init (GstRtpBin * rtpbin)
cname = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
rtpbin->sdes = gst_structure_new ("application/x-rtp-source-sdes",
"cname", G_TYPE_STRING, cname, "tool", G_TYPE_STRING, "GStreamer", NULL);
+ rtpbin->fec_decoders =
+ gst_structure_new_empty ("application/x-rtp-fec-decoders");
g_free (cname);
}
@@ -2756,6 +2808,9 @@ gst_rtp_bin_finalize (GObject * object)
if (rtpbin->sdes)
gst_structure_free (rtpbin->sdes);
+ if (rtpbin->fec_decoders)
+ gst_structure_free (rtpbin->fec_decoders);
+
g_mutex_clear (&rtpbin->priv->bin_lock);
g_mutex_clear (&rtpbin->priv->dyn_lock);
@@ -2788,6 +2843,25 @@ gst_rtp_bin_set_sdes_struct (GstRtpBin * bin, const GstStructure * sdes)
GST_RTP_BIN_UNLOCK (bin);
}
+static void
+gst_rtp_bin_set_fec_decoders_struct (GstRtpBin * bin,
+ const GstStructure * decoders)
+{
+ if (decoders == NULL)
+ return;
+
+ GST_RTP_BIN_LOCK (bin);
+
+ GST_OBJECT_LOCK (bin);
+ if (bin->fec_decoders)
+ gst_structure_free (bin->fec_decoders);
+ bin->fec_decoders = gst_structure_copy (decoders);
+
+ GST_OBJECT_UNLOCK (bin);
+
+ GST_RTP_BIN_UNLOCK (bin);
+}
+
static GstStructure *
gst_rtp_bin_get_sdes_struct (GstRtpBin * bin)
{
@@ -2800,6 +2874,18 @@ gst_rtp_bin_get_sdes_struct (GstRtpBin * bin)
return result;
}
+static GstStructure *
+gst_rtp_bin_get_fec_decoders_struct (GstRtpBin * bin)
+{
+ GstStructure *result;
+
+ GST_OBJECT_LOCK (bin);
+ result = gst_structure_copy (bin->fec_decoders);
+ GST_OBJECT_UNLOCK (bin);
+
+ return result;
+}
+
static void
gst_rtp_bin_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
@@ -2963,6 +3049,9 @@ gst_rtp_bin_set_property (GObject * object, guint prop_id,
rtpbin->max_ts_offset = g_value_get_int64 (value);
rtpbin->max_ts_offset_is_set = TRUE;
break;
+ case PROP_FEC_DECODERS:
+ gst_rtp_bin_set_fec_decoders_struct (rtpbin, g_value_get_boxed (value));
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@@ -3057,6 +3146,9 @@ gst_rtp_bin_get_property (GObject * object, guint prop_id,
case PROP_MAX_TS_OFFSET:
g_value_set_int64 (value, rtpbin->max_ts_offset);
break;
+ case PROP_FEC_DECODERS:
+ g_value_take_boxed (value, gst_rtp_bin_get_fec_decoders_struct (rtpbin));
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@@ -3343,6 +3435,48 @@ copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
return TRUE;
}
+static gboolean
+ensure_fec_decoder (GstRtpBin * rtpbin, GstRtpBinSession * session)
+{
+ const gchar *factory;
+ gchar *sess_id_str;
+
+ if (session->fec_decoder)
+ goto done;
+
+ sess_id_str = g_strdup_printf ("%u", session->id);
+ factory = gst_structure_get_string (rtpbin->fec_decoders, sess_id_str);
+ g_free (sess_id_str);
+
+ /* First try the property */
+ if (factory) {
+ GError *err = NULL;
+
+ session->fec_decoder =
+ gst_parse_bin_from_description_full (factory, TRUE, NULL,
+ GST_PARSE_FLAG_NO_SINGLE_ELEMENT_BINS | GST_PARSE_FLAG_FATAL_ERRORS,
+ &err);
+ if (!session->fec_decoder) {
+ GST_ERROR_OBJECT (rtpbin, "Failed to build decoder from factory: %s",
+ err->message);
+ }
+
+ bin_manage_element (session->bin, session->fec_decoder);
+ session->elements =
+ g_slist_prepend (session->elements, session->fec_decoder);
+ GST_INFO_OBJECT (rtpbin, "Built FEC decoder: %" GST_PTR_FORMAT
+ " for session %u", session->fec_decoder, session->id);
+ }
+
+ /* Fallback to the signal */
+ if (!session->fec_decoder)
+ session->fec_decoder =
+ session_request_element (session, SIGNAL_REQUEST_FEC_DECODER);
+
+done:
+ return session->fec_decoder != NULL;
+}
+
static void
expose_recv_src_pad (GstRtpBin * rtpbin, GstPad * pad, GstRtpBinStream * stream,
guint8 pt)
@@ -3354,11 +3488,9 @@ expose_recv_src_pad (GstRtpBin * rtpbin, GstPad * pad, GstRtpBinStream * stream,
gst_object_ref (pad);
- if (stream->session->storage) {
- GstElement *fec_decoder =
- session_request_element (stream->session, SIGNAL_REQUEST_FEC_DECODER);
-
- if (fec_decoder) {
+ if (stream->session->storage && !stream->session->fec_decoder) {
+ if (ensure_fec_decoder (rtpbin, stream->session)) {
+ GstElement *fec_decoder = stream->session->fec_decoder;
GstPad *sinkpad, *srcpad;
GstPadLinkReturn ret;
@@ -3594,6 +3726,15 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
padname = g_strdup_printf ("src_%u", ssrc);
srcpad = gst_element_get_static_pad (element, padname);
g_free (padname);
+
+ if (session->fec_decoder) {
+ sinkpad = gst_element_get_static_pad (session->fec_decoder, "sink");
+ gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
+ gst_object_unref (sinkpad);
+ gst_object_unref (srcpad);
+ srcpad = gst_element_get_static_pad (session->fec_decoder, "src");
+ }
+
sinkpad = gst_element_get_static_pad (stream->buffer, "sink");
gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
gst_object_unref (sinkpad);
@@ -3934,6 +4075,41 @@ remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
}
static GstPad *
+complete_session_fec (GstRtpBin * rtpbin, GstRtpBinSession * session,
+ guint fec_idx)
+{
+ gchar *padname;
+ GstPad *ret;
+
+ if (!ensure_fec_decoder (rtpbin, session))
+ goto no_decoder;
+
+ GST_DEBUG_OBJECT (rtpbin, "getting FEC sink pad");
+ padname = g_strdup_printf ("fec_%u", fec_idx);
+ ret = gst_element_get_request_pad (session->fec_decoder, padname);
+ g_free (padname);
+
+ if (ret == NULL)
+ goto pad_failed;
+
+ session->recv_fec_sinks = g_slist_prepend (session->recv_fec_sinks, ret);
+
+ return ret;
+
+pad_failed:
+ {
+ g_warning ("rtpbin: failed to get decoder fec pad");
+ return NULL;
+ }
+no_decoder:
+ {
+ g_warning ("rtpbin: failed to build FEC decoder for session %u",
+ session->id);
+ return NULL;
+ }
+}
+
+static GstPad *
complete_session_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session,
guint sessid)
{
@@ -4076,6 +4252,66 @@ create_error:
}
}
+static GstPad *
+create_recv_fec (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
+{
+ guint sessid, fec_idx;
+ GstRtpBinSession *session;
+ GstPad *decsink = NULL;
+ GstPad *ghost;
+
+ /* first get the session number */
+ if (name == NULL
+ || sscanf (name, "recv_fec_sink_%u_%u", &sessid, &fec_idx) != 2)
+ goto no_name;
+
+ if (fec_idx > 1)
+ goto invalid_idx;
+
+ GST_DEBUG_OBJECT (rtpbin, "finding session %u", sessid);
+
+ /* get or create the session */
+ session = find_session_by_id (rtpbin, sessid);
+ if (!session) {
+ GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid);
+ /* create session now */
+ session = create_session (rtpbin, sessid);
+ if (session == NULL)
+ goto create_error;
+ }
+
+ decsink = complete_session_fec (rtpbin, session, fec_idx);
+ if (!decsink)
+ goto create_error;
+
+ ghost = gst_ghost_pad_new_from_template (name, decsink, templ);
+ session->recv_fec_sink_ghosts =
+ g_slist_prepend (session->recv_fec_sink_ghosts, ghost);
+ gst_object_unref (decsink);
+ gst_pad_set_active (ghost, TRUE);
+ gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), ghost);
+
+ return ghost;
+
+ /* ERRORS */
+no_name:
+ {
+ g_warning ("rtpbin: cannot find session id for pad: %s",
+ GST_STR_NULL (name));
+ return NULL;
+ }
+invalid_idx:
+ {
+ g_warning ("rtpbin: invalid FEC index: %s", GST_STR_NULL (name));
+ return NULL;
+ }
+create_error:
+ {
+ /* create_session already warned */
+ return NULL;
+ }
+}
+
static void
remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
{
@@ -4097,6 +4333,49 @@ remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
}
}
+static void
+remove_recv_fec_for_pad (GstRtpBin * rtpbin, GstRtpBinSession * session,
+ GstPad * ghost)
+{
+ GSList *item;
+ GstPad *target;
+
+ target = gst_ghost_pad_get_target (GST_GHOST_PAD (ghost));
+
+ if (target) {
+ item = g_slist_find (session->recv_fec_sinks, target);
+ if (item) {
+ gst_element_release_request_pad (session->fec_decoder, item->data);
+ session->recv_fec_sinks =
+ g_slist_delete_link (session->recv_fec_sinks, item);
+ }
+ gst_object_unref (target);
+ }
+
+ item = g_slist_find (session->recv_fec_sink_ghosts, ghost);
+ if (item)
+ session->recv_fec_sink_ghosts =
+ g_slist_delete_link (session->recv_fec_sink_ghosts, item);
+
+ gst_pad_set_active (ghost, FALSE);
+ gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), ghost);
+}
+
+static void
+remove_recv_fec (GstRtpBin * rtpbin, GstRtpBinSession * session)
+{
+ GSList *copy;
+ GSList *tmp;
+
+ copy = g_slist_copy (session->recv_fec_sink_ghosts);
+
+ for (tmp = copy; tmp; tmp = tmp->next) {
+ remove_recv_fec_for_pad (rtpbin, session, (GstPad *) tmp->data);
+ }
+
+ g_slist_free (copy);
+}
+
static gboolean
complete_session_src (GstRtpBin * rtpbin, GstRtpBinSession * session)
{
@@ -4699,6 +4978,9 @@ gst_rtp_bin_request_new_pad (GstElement * element,
} else if (templ == gst_element_class_get_pad_template (klass,
"send_rtcp_src_%u")) {
result = create_send_rtcp (rtpbin, templ, pad_name);
+ } else if (templ == gst_element_class_get_pad_template (klass,
+ "recv_fec_sink_%u_%u")) {
+ result = create_recv_fec (rtpbin, templ, pad_name);
} else
goto wrong_template;
@@ -4743,13 +5025,16 @@ gst_rtp_bin_release_pad (GstElement * element, GstPad * pad)
remove_send_rtp (rtpbin, session);
} else if (session->send_rtcp_src_ghost == pad) {
remove_rtcp (rtpbin, session);
+ } else if (pad_is_recv_fec (session, pad)) {
+ remove_recv_fec_for_pad (rtpbin, session, pad);
}
/* no more request pads, free the complete session */
if (session->recv_rtp_sink_ghost == NULL
&& session->recv_rtcp_sink_ghost == NULL
&& session->send_rtp_sink_ghost == NULL
- && session->send_rtcp_src_ghost == NULL) {
+ && session->send_rtcp_src_ghost == NULL
+ && session->recv_fec_sink_ghosts == NULL) {
GST_DEBUG_OBJECT (rtpbin, "no more pads for session %p", session);
rtpbin->sessions = g_slist_remove (rtpbin->sessions, session);
free_session (session, rtpbin);
diff --git a/gst/rtpmanager/gstrtpbin.h b/gst/rtpmanager/gstrtpbin.h
index fcea7cec8..58de860a4 100644
--- a/gst/rtpmanager/gstrtpbin.h
+++ b/gst/rtpmanager/gstrtpbin.h
@@ -88,6 +88,9 @@ struct _GstRtpBin {
/* the default SDES items for sessions */
GstStructure *sdes;
+ /* the default FEC decoder factories for sessions */
+ GstStructure *fec_decoders;
+
/*< private >*/
GstRtpBinPrivate *priv;
};
diff --git a/gst/rtpmanager/gstrtpmanager.c b/gst/rtpmanager/gstrtpmanager.c
index 4ba624fba..91b6b653a 100644
--- a/gst/rtpmanager/gstrtpmanager.c
+++ b/gst/rtpmanager/gstrtpmanager.c
@@ -32,6 +32,7 @@
#include "gstrtpdtmfmux.h"
#include "gstrtpmux.h"
#include "gstrtpfunnel.h"
+#include "gstrtpst2022-1-fecdec.h"
static gboolean
plugin_init (GstPlugin * plugin)
@@ -74,6 +75,10 @@ plugin_init (GstPlugin * plugin)
GST_TYPE_RTP_FUNNEL))
return FALSE;
+ if (!gst_element_register (plugin, "rtpst2022-1-fecdec", GST_RANK_NONE,
+ GST_TYPE_RTPST_2022_1_FECDEC))
+ return FALSE;
+
return TRUE;
}
diff --git a/gst/rtpmanager/gstrtpst2022-1-fecdec.c b/gst/rtpmanager/gstrtpst2022-1-fecdec.c
new file mode 100644
index 000000000..7f45b53f8
--- /dev/null
+++ b/gst/rtpmanager/gstrtpst2022-1-fecdec.c
@@ -0,0 +1,995 @@
+/* GStreamer
+ * Copyright (C) <2020> Mathieu Duponchelle <mathieu@centricular.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * SECTION:element-rtpst2022-1-fecdec
+ * @see_also: #element-rtpst2022-1-fecenc
+ *
+ * This element takes as input a media stream and up to two FEC
+ * streams as described in SMPTE 2022-1: Forward Error Correction
+ * for Real-Time Video/Audio Transport Over IP Networks, and makes
+ * use of the FEC packets to recover media packets that may have
+ * gotten lost.
+ *
+ * ## Design
+ *
+ * The approach picked for this element is to proactively reconstruct missing
+ * packets as soon as possible. When a FEC packet arrives, the element
+ * immediately checks whether a media packet in the row / column it protects
+ * can be reconstructed.
+ *
+ * Similarly, when a media packet comes in, the element checks whether it has
+ * already received a corresponding packet in both the column and row the packet
+ * belongs to, and if so goes through the first step listed above.
+ *
+ * This process is repeated recursively, allowing for recoveries over one
+ * dimension to unblock recoveries over the other.
+ *
+ * In perfect networking conditions, this incurs next to no overhead as FEC
+ * packets will arrive after the media packets, causing no reconstruction to
+ * take place, just a few checks upon chaining.
+ *
+ * ## sender / receiver example
+ *
+ * ``` shell
+ * gst-launch-1.0 \
+ * rtpbin name=rtp fec-encoders='fec,0="rtpst2022-1-fecenc\ rows\=5\ columns\=5";' \
+ * uridecodebin uri=file:///path/to/video/file ! x264enc key-int-max=60 tune=zerolatency ! \
+ * queue ! mpegtsmux ! rtpmp2tpay ssrc=0 ! rtp.send_rtp_sink_0 \
+ * rtp.send_rtp_src_0 ! udpsink host=127.0.0.1 port=5000 \
+ * rtp.send_fec_src_0_0 ! udpsink host=127.0.0.1 port=5002 async=false \
+ * rtp.send_fec_src_0_1 ! udpsink host=127.0.0.1 port=5004 async=false
+ * ```
+ *
+ * ``` shell
+ * gst-launch-1.0 \
+ * rtpbin latency=500 fec-decoders='fec,0="rtpst2022-1-fecdec\ size-time\=1000000000";' name=rtp \
+ * udpsrc address=127.0.0.1 port=5002 caps="application/x-rtp, payload=96" ! queue ! rtp.recv_fec_sink_0_0 \
+ * udpsrc address=127.0.0.1 port=5004 caps="application/x-rtp, payload=96" ! queue ! rtp.recv_fec_sink_0_1 \
+ * udpsrc address=127.0.0.1 port=5000 caps="application/x-rtp, media=video, clock-rate=90000, encoding-name=mp2t, payload=33" ! \
+ * queue ! netsim drop-probability=0.05 ! rtp.recv_rtp_sink_0 \
+ * rtp. ! decodebin ! videoconvert ! queue ! autovideosink
+ * ```
+ *
+ * With the above command line, as the media packet size is constant,
+ * the fec overhead can be approximated to the number of fec packets
+ * per 2-d matrix of media packet, here 10 fec packets for each 25
+ * media packets.
+ *
+ * Increasing the number of rows and columns will decrease the overhead,
+ * but obviously increase the likelihood of recovery failure for lost
+ * packets on the receiver side.
+ *
+ * Since: 1.20
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gst/base/base.h>
+#include <gst/rtp/gstrtpbuffer.h>
+
+#include "gstrtpst2022-1-fecdec.h"
+
+GST_DEBUG_CATEGORY_STATIC (gst_rtpst_2022_1_fecdec_debug);
+#define GST_CAT_DEFAULT gst_rtpst_2022_1_fecdec_debug
+
+#define DEFAULT_SIZE_TIME (GST_SECOND)
+
+typedef struct
+{
+ guint16 seq;
+ GstBuffer *buffer;
+} Item;
+
+static GstFlowReturn store_media_item (GstRTPST_2022_1_FecDec * dec,
+ GstRTPBuffer * rtp, Item * item);
+
+static void
+free_item (Item * item)
+{
+ gst_buffer_unref (item->buffer);
+ item->buffer = NULL;
+ g_free (item);
+}
+
+static gint
+cmp_items (Item * a, Item * b, gpointer unused)
+{
+ return gst_rtp_buffer_compare_seqnum (b->seq, a->seq);
+}
+
+enum
+{
+ PROP_0,
+ PROP_SIZE_TIME,
+};
+
+struct _GstRTPST_2022_1_FecDecClass
+{
+ GstElementClass class;
+};
+
+struct _GstRTPST_2022_1_FecDec
+{
+ GstElement element;
+
+ GstPad *srcpad;
+ GstPad *sinkpad;
+ GList *fec_sinkpads;
+
+ /* All the following field are protected by the OBJECT_LOCK */
+ GSequence *packets;
+ GHashTable *column_fec_packets;
+ GSequence *fec_packets[2];
+ /* N columns */
+ guint l;
+ /* N rows */
+ guint d;
+
+ GstClockTime size_time;
+ GstClockTime max_arrival_time;
+ GstClockTime max_fec_arrival_time[2];
+};
+
+#define RTP_CAPS "application/x-rtp"
+
+typedef struct
+{
+ guint16 seq;
+ guint16 len;
+ guint8 E;
+ guint8 pt;
+ guint32 mask;
+ guint32 timestamp;
+ guint8 N;
+ guint8 D;
+ guint8 type;
+ guint8 index;
+ guint8 offset;
+ guint8 NA;
+ guint8 seq_ext;
+ guint8 *payload;
+ guint payload_len;
+} Rtp2DFecHeader;
+
+static GstStaticPadTemplate fec_sink_template =
+GST_STATIC_PAD_TEMPLATE ("fec_%u",
+ GST_PAD_SINK,
+ GST_PAD_REQUEST,
+ GST_STATIC_CAPS (RTP_CAPS));
+
+static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
+ GST_PAD_SINK,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS (RTP_CAPS));
+
+static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
+ GST_PAD_SRC,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS (RTP_CAPS));
+
+#define gst_rtpst_2022_1_fecdec_parent_class parent_class
+G_DEFINE_TYPE (GstRTPST_2022_1_FecDec, gst_rtpst_2022_1_fecdec,
+ GST_TYPE_ELEMENT);
+
+static void
+trim_items (GstRTPST_2022_1_FecDec * dec)
+{
+ GSequenceIter *tmp_iter, *iter = NULL;
+
+ for (tmp_iter = g_sequence_get_begin_iter (dec->packets);
+ tmp_iter; tmp_iter = g_sequence_iter_next (tmp_iter)) {
+ Item *item;
+
+ if (g_sequence_iter_is_end (tmp_iter))
+ break;
+
+ item = g_sequence_get (tmp_iter);
+
+ if (dec->max_arrival_time - GST_BUFFER_DTS_OR_PTS (item->buffer) <
+ dec->size_time)
+ break;
+
+ iter = tmp_iter;
+ }
+
+ if (iter) {
+ Item *item = g_sequence_get (iter);
+ GST_TRACE_OBJECT (dec,
+ "Trimming packets up to %" GST_TIME_FORMAT " (seq: %u)",
+ GST_TIME_ARGS (GST_BUFFER_DTS_OR_PTS (item->buffer)), item->seq);
+ g_sequence_remove_range (g_sequence_get_begin_iter (dec->packets), iter);
+ }
+}
+
+static void
+trim_fec_items (GstRTPST_2022_1_FecDec * dec, guint D)
+{
+ GSequenceIter *tmp_iter, *iter = NULL;
+
+ for (tmp_iter = g_sequence_get_begin_iter (dec->fec_packets[D]);
+ tmp_iter; tmp_iter = g_sequence_iter_next (tmp_iter)) {
+ Item *item;
+
+ if (g_sequence_iter_is_end (tmp_iter))
+ break;
+
+ item = g_sequence_get (tmp_iter);
+
+ if (dec->max_fec_arrival_time[D] - GST_BUFFER_DTS_OR_PTS (item->buffer) <
+ dec->size_time)
+ break;
+
+ if (!D) {
+ guint i;
+ guint16 seq;
+
+ for (i = 0; i < dec->d; i++) {
+ seq = item->seq + i * dec->l;
+ g_hash_table_remove (dec->column_fec_packets, GUINT_TO_POINTER (seq));
+ }
+ }
+
+ iter = tmp_iter;
+ }
+
+ if (iter) {
+ Item *item = g_sequence_get (iter);
+ GST_TRACE_OBJECT (dec,
+ "Trimming %s FEC packets up to %" GST_TIME_FORMAT " (seq: %u)",
+ D ? "row" : "column",
+ GST_TIME_ARGS (GST_BUFFER_DTS_OR_PTS (item->buffer)), item->seq);
+ g_sequence_remove_range (g_sequence_get_begin_iter (dec->fec_packets[D]),
+ iter);
+ }
+}
+
+static Item *
+lookup_media_packet (GstRTPST_2022_1_FecDec * dec, guint16 seqnum)
+{
+ GSequenceIter *iter;
+ Item *ret = NULL;
+ Item dummy = { seqnum, NULL };
+
+ iter =
+ g_sequence_lookup (dec->packets, &dummy, (GCompareDataFunc) cmp_items,
+ NULL);
+
+ if (iter)
+ ret = g_sequence_get (iter);
+
+ return ret;
+}
+
+static gboolean
+parse_header (Rtp2DFecHeader * fec, guint8 * data, guint len)
+{
+ gboolean ret = FALSE;
+ GstBitReader bits;
+
+ if (len < 16)
+ goto done;
+
+ gst_bit_reader_init (&bits, data, len);
+
+ fec->seq = gst_bit_reader_get_bits_uint16_unchecked (&bits, 16);
+ fec->len = gst_bit_reader_get_bits_uint16_unchecked (&bits, 16);
+ fec->E = gst_bit_reader_get_bits_uint8_unchecked (&bits, 1);
+ fec->pt = gst_bit_reader_get_bits_uint8_unchecked (&bits, 7);
+ fec->mask = gst_bit_reader_get_bits_uint32_unchecked (&bits, 24);
+ fec->timestamp = gst_bit_reader_get_bits_uint32_unchecked (&bits, 32);
+ fec->N = gst_bit_reader_get_bits_uint8_unchecked (&bits, 1);
+ fec->D = gst_bit_reader_get_bits_uint8_unchecked (&bits, 1);
+ fec->type = gst_bit_reader_get_bits_uint8_unchecked (&bits, 3);
+ fec->index = gst_bit_reader_get_bits_uint8_unchecked (&bits, 3);
+ fec->offset = gst_bit_reader_get_bits_uint8_unchecked (&bits, 8);
+ fec->NA = gst_bit_reader_get_bits_uint8_unchecked (&bits, 8);
+ fec->seq_ext = gst_bit_reader_get_bits_uint8_unchecked (&bits, 8);
+ fec->payload = data + 16;
+ fec->payload_len = len - 16;
+
+ ret = TRUE;
+
+done:
+ return ret;
+}
+
+static Item *
+get_row_fec (GstRTPST_2022_1_FecDec * dec, guint16 seqnum)
+{
+ GSequenceIter *iter;
+ Item *ret = NULL;
+ Item dummy = { 0, };
+
+ if (dec->l == G_MAXUINT)
+ goto done;
+
+ /* Potential underflow is intended */
+ dummy.seq = seqnum - dec->l;
+
+ iter =
+ g_sequence_search (dec->fec_packets[1], &dummy,
+ (GCompareDataFunc) cmp_items, NULL);
+
+ if (!g_sequence_iter_is_end (iter)) {
+ gint seqdiff;
+ ret = g_sequence_get (iter);
+
+ seqdiff = gst_rtp_buffer_compare_seqnum (ret->seq, seqnum);
+
+ /* Now check whether the fec packet does apply */
+ if (seqdiff < 0 || seqdiff >= dec->l)
+ ret = NULL;
+ }
+
+done:
+ return ret;
+}
+
+static Item *
+get_column_fec (GstRTPST_2022_1_FecDec * dec, guint16 seqnum)
+{
+ Item *ret = NULL;
+
+ if (dec->l == G_MAXUINT || dec->d == G_MAXUINT)
+ goto done;
+
+ ret =
+ g_hash_table_lookup (dec->column_fec_packets, GUINT_TO_POINTER (seqnum));
+
+done:
+ return ret;
+}
+
+static void
+_xor_mem (guint8 * restrict dst, const guint8 * restrict src, gsize length)
+{
+ guint i;
+
+ for (i = 0; i < (length / sizeof (guint64)); ++i) {
+#if G_BYTE_ORDER == G_LITTLE_ENDIAN
+ GST_WRITE_UINT64_LE (dst,
+ GST_READ_UINT64_LE (dst) ^ GST_READ_UINT64_LE (src));
+#else
+ GST_WRITE_UINT64_BE (dst,
+ GST_READ_UINT64_BE (dst) ^ GST_READ_UINT64_BE (src));
+#endif
+ dst += sizeof (guint64);
+ src += sizeof (guint64);
+ }
+ for (i = 0; i < (length % sizeof (guint64)); ++i)
+ dst[i] ^= src[i];
+}
+
+static GstFlowReturn
+xor_items (GstRTPST_2022_1_FecDec * dec, Rtp2DFecHeader * fec, GList * packets,
+ guint16 seqnum)
+{
+ guint8 *xored;
+ guint32 xored_timestamp;
+ guint8 xored_pt;
+ guint16 xored_payload_len;
+ Item *item;
+ GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
+ GList *tmp;
+ GstFlowReturn ret = GST_FLOW_OK;
+ GstBuffer *buffer;
+
+ /* Figure out the recovered packet length first */
+ xored_payload_len = fec->len;
+ for (tmp = packets; tmp; tmp = tmp->next) {
+ GstRTPBuffer media_rtp = GST_RTP_BUFFER_INIT;
+ Item *item = (Item *) tmp->data;
+
+ gst_rtp_buffer_map (item->buffer, GST_MAP_READ, &media_rtp);
+ xored_payload_len ^= gst_rtp_buffer_get_payload_len (&media_rtp);
+ gst_rtp_buffer_unmap (&media_rtp);
+ }
+
+ if (xored_payload_len > fec->payload_len) {
+ GST_WARNING_OBJECT (dec, "FEC payload len %u < length recovery %u",
+ fec->payload_len, xored_payload_len);
+ goto done;
+ }
+
+ item = g_malloc0 (sizeof (Item));
+ item->seq = seqnum;
+ item->buffer = gst_rtp_buffer_new_allocate (xored_payload_len, 0, 0);
+ gst_rtp_buffer_map (item->buffer, GST_MAP_WRITE, &rtp);
+
+ xored = gst_rtp_buffer_get_payload (&rtp);
+ memcpy (xored, fec->payload, xored_payload_len);
+ xored_timestamp = fec->timestamp;
+ xored_pt = fec->pt;
+
+ for (tmp = packets; tmp; tmp = tmp->next) {
+ GstRTPBuffer media_rtp = GST_RTP_BUFFER_INIT;
+ Item *item = (Item *) tmp->data;
+
+ gst_rtp_buffer_map (item->buffer, GST_MAP_READ, &media_rtp);
+ _xor_mem (xored, gst_rtp_buffer_get_payload (&media_rtp),
+ gst_rtp_buffer_get_payload_len (&media_rtp));
+ xored_timestamp ^= gst_rtp_buffer_get_timestamp (&media_rtp);
+ xored_pt ^= gst_rtp_buffer_get_payload_type (&media_rtp);
+
+ gst_rtp_buffer_unmap (&media_rtp);
+ }
+
+ GST_DEBUG_OBJECT (dec,
+ "Recovered buffer through %s FEC with seqnum %u, payload len %u and timestamp %u",
+ fec->D ? "row" : "column", seqnum, xored_payload_len, xored_timestamp);
+
+ GST_BUFFER_DTS (item->buffer) = dec->max_arrival_time;
+
+ gst_rtp_buffer_set_timestamp (&rtp, xored_timestamp);
+ gst_rtp_buffer_set_seq (&rtp, seqnum);
+ gst_rtp_buffer_set_payload_type (&rtp, xored_pt);
+
+ gst_rtp_buffer_unmap (&rtp);
+
+ /* Store a ref on item->buffer as store_media_item may
+ * recurse and call this method again, potentially releasing
+ * the object lock and leaving our item unprotected in
+ * dec->packets
+ */
+ buffer = gst_buffer_ref (item->buffer);
+
+ /* It is right that we should celebrate,
+ * for your brother was dead, and is alive again */
+ gst_rtp_buffer_map (item->buffer, GST_MAP_READ, &rtp);
+ ret = store_media_item (dec, &rtp, item);
+ gst_rtp_buffer_unmap (&rtp);
+
+ if (ret == GST_FLOW_OK) {
+ /* Unlocking here is safe */
+ GST_OBJECT_UNLOCK (dec);
+ ret = gst_pad_push (dec->srcpad, buffer);
+ GST_OBJECT_LOCK (dec);
+ } else {
+ gst_buffer_unref (buffer);
+ }
+
+done:
+ return ret;
+}
+
+/* Returns a flow value if we should discard the packet, GST_FLOW_CUSTOM_SUCCESS otherwise */
+static GstFlowReturn
+check_fec (GstRTPST_2022_1_FecDec * dec, Rtp2DFecHeader * fec)
+{
+ GList *packets = NULL;
+ gint missing_seq = -1;
+ guint n_packets = 0;
+ guint required_n_packets;
+ GstFlowReturn ret = GST_FLOW_OK;
+
+ if (fec->D) {
+ guint i = 0;
+
+ required_n_packets = dec->l;
+
+ for (i = 0; i < dec->l; i++) {
+ Item *item = lookup_media_packet (dec, fec->seq + i);
+
+ if (item) {
+ packets = g_list_prepend (packets, item);
+ n_packets += 1;
+ } else {
+ missing_seq = fec->seq + i;
+ }
+ }
+ } else {
+ guint i = 0;
+
+ required_n_packets = dec->d;
+
+ for (i = 0; i < dec->d; i++) {
+ Item *item = lookup_media_packet (dec, fec->seq + i * dec->l);
+
+ if (item) {
+ packets = g_list_prepend (packets, item);
+ n_packets += 1;
+ } else {
+ missing_seq = fec->seq + i * dec->l;
+ }
+ }
+ }
+
+ if (n_packets == required_n_packets) {
+ g_assert (missing_seq == -1);
+ GST_LOG_OBJECT (dec,
+ "All media packets present, we can discard that FEC packet");
+ } else if (n_packets + 1 == required_n_packets) {
+ g_assert (missing_seq != -1);
+ ret = xor_items (dec, fec, packets, missing_seq);
+ GST_LOG_OBJECT (dec, "We have enough info to reconstruct %u", missing_seq);
+ } else {
+ ret = GST_FLOW_CUSTOM_SUCCESS;
+ GST_LOG_OBJECT (dec, "Too many media packets missing, storing FEC packet");
+ }
+ g_list_free (packets);
+
+ return ret;
+}
+
+static GstFlowReturn
+check_fec_item (GstRTPST_2022_1_FecDec * dec, Item * item)
+{
+ Rtp2DFecHeader fec;
+ GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
+ guint payload_len;
+ guint8 *payload;
+ GstFlowReturn ret;
+
+ gst_rtp_buffer_map (item->buffer, GST_MAP_READ, &rtp);
+
+ payload_len = gst_rtp_buffer_get_payload_len (&rtp);
+ payload = gst_rtp_buffer_get_payload (&rtp);
+
+ parse_header (&fec, payload, payload_len);
+
+ ret = check_fec (dec, &fec);
+
+ gst_rtp_buffer_unmap (&rtp);
+
+ return ret;
+}
+
+static GstFlowReturn
+store_media_item (GstRTPST_2022_1_FecDec * dec, GstRTPBuffer * rtp, Item * item)
+{
+ GstFlowReturn ret = GST_FLOW_OK;
+ Item *fec_item;
+ guint16 seq;
+
+ seq = gst_rtp_buffer_get_seq (rtp);
+
+ g_sequence_insert_sorted (dec->packets, item, (GCompareDataFunc) cmp_items,
+ NULL);
+
+ if ((fec_item = get_row_fec (dec, seq))) {
+ ret = check_fec_item (dec, fec_item);
+ if (ret == GST_FLOW_CUSTOM_SUCCESS)
+ ret = GST_FLOW_OK;
+ }
+
+ if (ret == GST_FLOW_OK && (fec_item = get_column_fec (dec, seq))) {
+ ret = check_fec_item (dec, fec_item);
+ if (ret == GST_FLOW_CUSTOM_SUCCESS)
+ ret = GST_FLOW_OK;
+ }
+
+ return ret;
+}
+
+static GstFlowReturn
+store_media (GstRTPST_2022_1_FecDec * dec, GstRTPBuffer * rtp,
+ GstBuffer * buffer)
+{
+ Item *item;
+ guint16 seq;
+
+ seq = gst_rtp_buffer_get_seq (rtp);
+ item = g_malloc0 (sizeof (Item));
+ item->buffer = gst_buffer_ref (buffer);
+ item->seq = seq;
+
+ return store_media_item (dec, rtp, item);
+}
+
+static GstFlowReturn
+gst_rtpst_2022_1_fecdec_sink_chain_fec (GstPad * pad, GstObject * parent,
+ GstBuffer * buffer)
+{
+ GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (parent);
+ Rtp2DFecHeader fec = { 0, };
+ guint payload_len;
+ guint8 *payload;
+ GstFlowReturn ret = GST_FLOW_OK;
+ Item *item;
+ GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
+
+ GST_OBJECT_LOCK (dec);
+
+ if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)) {
+ GST_WARNING_OBJECT (pad, "Chained FEC buffer isn't valid RTP");
+ goto discard;
+ }
+
+ payload_len = gst_rtp_buffer_get_payload_len (&rtp);
+ payload = gst_rtp_buffer_get_payload (&rtp);
+
+ if (!parse_header (&fec, payload, payload_len)) {
+ GST_WARNING_OBJECT (pad, "Failed to parse FEC header (payload len: %d)",
+ payload_len);
+ GST_MEMDUMP_OBJECT (pad, "Invalid payload", payload, payload_len);
+ goto discard;
+ }
+
+ GST_TRACE_OBJECT
+ (pad,
+ "Handling FEC buffer with SNBase / N / D / NA / offset %u / %u / %u / %u / %u",
+ fec.seq, fec.N, fec.D, fec.NA, fec.offset);
+
+ if (fec.D) {
+ if (dec->l == G_MAXUINT) {
+ dec->l = fec.NA;
+ } else if (fec.NA != dec->l) {
+ GST_WARNING_OBJECT (dec, "2D FEC dimensionality cannot change");
+ goto discard;
+ }
+
+ if (fec.offset != 1) {
+ GST_WARNING_OBJECT (pad, "offset must be 1 for row FEC packets");
+ goto discard;
+ }
+ } else {
+ if (dec->d == G_MAXUINT) {
+ dec->d = fec.NA;
+ } else if (fec.NA != dec->d) {
+ GST_WARNING_OBJECT (dec, "2D FEC dimensionality cannot change");
+ goto discard;
+ }
+
+ if (dec->l == G_MAXUINT) {
+ dec->l = fec.offset;
+ } else if (fec.offset != dec->l) {
+ GST_WARNING_OBJECT (dec, "2D FEC dimensionality cannot change");
+ goto discard;
+ }
+ }
+
+ dec->max_fec_arrival_time[fec.D] = GST_BUFFER_DTS_OR_PTS (buffer);
+ trim_fec_items (dec, fec.D);
+
+ ret = check_fec (dec, &fec);
+
+ if (ret == GST_FLOW_CUSTOM_SUCCESS) {
+ item = g_malloc0 (sizeof (Item));
+ item->buffer = buffer;
+ item->seq = fec.seq;
+
+ if (!fec.D) {
+ guint i;
+ guint16 seq;
+
+ for (i = 0; i < dec->d; i++) {
+ seq = fec.seq + i * dec->l;
+ g_hash_table_insert (dec->column_fec_packets, GUINT_TO_POINTER (seq),
+ item);
+ }
+ }
+ g_sequence_insert_sorted (dec->fec_packets[fec.D], item,
+ (GCompareDataFunc) cmp_items, NULL);
+ ret = GST_FLOW_OK;
+ } else {
+ goto discard;
+ }
+
+ gst_rtp_buffer_unmap (&rtp);
+
+done:
+ GST_OBJECT_UNLOCK (dec);
+ return ret;
+
+discard:
+ if (rtp.buffer != NULL)
+ gst_rtp_buffer_unmap (&rtp);
+
+ gst_buffer_unref (buffer);
+
+ goto done;
+}
+
+static GstFlowReturn
+gst_rtpst_2022_1_fecdec_sink_chain (GstPad * pad, GstObject * parent,
+ GstBuffer * buffer)
+{
+ GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (parent);
+ GstFlowReturn ret = GST_FLOW_OK;
+ GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
+
+ if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)) {
+ GST_WARNING_OBJECT (pad, "Chained buffer isn't valid RTP");
+ goto error;
+ }
+
+ GST_OBJECT_LOCK (dec);
+ dec->max_arrival_time =
+ MAX (dec->max_arrival_time, GST_BUFFER_DTS_OR_PTS (buffer));
+ trim_items (dec);
+ ret = store_media (dec, &rtp, buffer);
+ GST_OBJECT_UNLOCK (dec);
+
+ gst_rtp_buffer_unmap (&rtp);
+
+ if (ret == GST_FLOW_OK)
+ ret = gst_pad_push (dec->srcpad, buffer);
+
+done:
+ return ret;
+
+error:
+ gst_buffer_unref (buffer);
+ goto done;
+}
+
+static gboolean
+gst_rtpst_2022_1_fecdec_src_event (GstPad * pad, GstObject * parent,
+ GstEvent * event)
+{
+ gboolean handled = FALSE;
+ gboolean ret = TRUE;
+
+ if (!handled) {
+ gst_pad_event_default (pad, parent, event);
+ }
+
+ return ret;
+}
+
+/* Takes the object lock */
+static void
+gst_rtpst_2022_1_fecdec_reset (GstRTPST_2022_1_FecDec * dec, gboolean allocate)
+{
+ guint i;
+
+ GST_OBJECT_LOCK (dec);
+
+ if (dec->packets) {
+ g_sequence_free (dec->packets);
+ dec->packets = NULL;
+ }
+
+ if (dec->column_fec_packets) {
+ g_hash_table_unref (dec->column_fec_packets);
+ dec->column_fec_packets = NULL;
+ }
+
+ if (allocate) {
+ dec->packets = g_sequence_new ((GDestroyNotify) free_item);
+ dec->column_fec_packets = g_hash_table_new (g_direct_hash, g_direct_equal);
+ }
+
+ for (i = 0; i < 2; i++) {
+ if (dec->fec_packets[i]) {
+ g_sequence_free (dec->fec_packets[i]);
+ dec->fec_packets[i] = NULL;
+ }
+
+ if (allocate)
+ dec->fec_packets[i] = g_sequence_new ((GDestroyNotify) free_item);
+ }
+
+ dec->d = G_MAXUINT;
+ dec->l = G_MAXUINT;
+
+ GST_OBJECT_UNLOCK (dec);
+}
+
+static GstStateChangeReturn
+gst_rtpst_2022_1_fecdec_change_state (GstElement * element,
+ GstStateChange transition)
+{
+ GstStateChangeReturn ret;
+ GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (element);
+
+ switch (transition) {
+ case GST_STATE_CHANGE_READY_TO_PAUSED:
+ gst_rtpst_2022_1_fecdec_reset (dec, TRUE);
+ break;
+ case GST_STATE_CHANGE_PAUSED_TO_READY:
+ gst_rtpst_2022_1_fecdec_reset (dec, FALSE);
+ break;
+ default:
+ break;
+ }
+
+ ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+
+ return ret;
+}
+
+static void
+gst_rtpst_2022_1_fecdec_finalize (GObject * object)
+{
+ GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (object);
+
+ gst_rtpst_2022_1_fecdec_reset (dec, FALSE);
+
+ G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+static void
+gst_rtpst_2022_1_fecdec_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (object);
+
+ switch (prop_id) {
+ case PROP_SIZE_TIME:
+ dec->size_time = g_value_get_uint64 (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_rtpst_2022_1_fecdec_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (object);
+
+ switch (prop_id) {
+ case PROP_SIZE_TIME:
+ g_value_set_uint64 (value, dec->size_time);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static gboolean
+gst_2d_fec_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+ GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (parent);
+ gboolean ret;
+
+ if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
+ gst_rtpst_2022_1_fecdec_reset (dec, TRUE);
+
+ ret = gst_pad_event_default (pad, parent, event);
+
+ return ret;
+}
+
+static GstIterator *
+gst_rtpst_2022_1_fecdec_iterate_linked_pads (GstPad * pad, GstObject * parent)
+{
+ GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (parent);
+ GstPad *otherpad = NULL;
+ GstIterator *it = NULL;
+ GValue val = { 0, };
+
+ if (pad == dec->srcpad)
+ otherpad = dec->sinkpad;
+ else if (pad == dec->sinkpad)
+ otherpad = dec->srcpad;
+
+ if (otherpad) {
+ g_value_init (&val, GST_TYPE_PAD);
+ g_value_set_object (&val, otherpad);
+ it = gst_iterator_new_single (GST_TYPE_PAD, &val);
+ g_value_unset (&val);
+ }
+
+ return it;
+}
+
+static GstPad *
+gst_rtpst_2022_1_fecdec_request_new_pad (GstElement * element,
+ GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
+{
+ GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (element);
+ GstPad *sinkpad = NULL;
+
+ GST_DEBUG_OBJECT (element, "requesting pad");
+
+ if (g_list_length (dec->fec_sinkpads) > 1) {
+ GST_ERROR_OBJECT (dec, "not accepting more than two fec streams");
+ goto done;
+ }
+
+ sinkpad = gst_pad_new_from_template (templ, name);
+ gst_pad_set_chain_function (sinkpad, gst_rtpst_2022_1_fecdec_sink_chain_fec);
+ gst_element_add_pad (GST_ELEMENT (dec), sinkpad);
+ gst_pad_set_iterate_internal_links_function (sinkpad,
+ GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_iterate_linked_pads));
+
+ gst_pad_set_active (sinkpad, TRUE);
+
+ GST_DEBUG_OBJECT (element, "requested pad %s:%s",
+ GST_DEBUG_PAD_NAME (sinkpad));
+
+done:
+ return sinkpad;
+}
+
+static void
+gst_rtpst_2022_1_fecdec_release_pad (GstElement * element, GstPad * pad)
+{
+ GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (element);
+
+ GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
+
+ dec->fec_sinkpads = g_list_remove (dec->fec_sinkpads, pad);
+
+ gst_pad_set_active (pad, FALSE);
+ gst_element_remove_pad (GST_ELEMENT_CAST (dec), pad);
+}
+
+static void
+gst_rtpst_2022_1_fecdec_class_init (GstRTPST_2022_1_FecDecClass * klass)
+{
+ GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+ GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
+
+ gobject_class->set_property =
+ GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_set_property);
+ gobject_class->get_property =
+ GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_get_property);
+ gobject_class->finalize =
+ GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_finalize);
+
+ g_object_class_install_property (gobject_class, PROP_SIZE_TIME,
+ g_param_spec_uint64 ("size-time", "Storage size (in ns)",
+ "The amount of data to store (in ns, 0-disable)", 0,
+ G_MAXUINT64, DEFAULT_SIZE_TIME,
+ G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
+
+ gstelement_class->change_state =
+ GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_change_state);
+ gstelement_class->request_new_pad =
+ GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_request_new_pad);
+ gstelement_class->release_pad =
+ GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_release_pad);
+
+ gst_element_class_set_static_metadata (gstelement_class,
+ "SMPTE 2022-1 FEC decoder", "SMPTE 2022-1 FEC decoding",
+ "performs FEC as described by SMPTE 2022-1",
+ "Mathieu Duponchelle <mathieu@centricular.com>");
+
+ gst_element_class_add_static_pad_template (gstelement_class, &sink_template);
+ gst_element_class_add_static_pad_template (gstelement_class,
+ &fec_sink_template);
+ gst_element_class_add_static_pad_template (gstelement_class, &src_template);
+
+ GST_DEBUG_CATEGORY_INIT (gst_rtpst_2022_1_fecdec_debug,
+ "rtpst2022-1-fecdec", 0, "SMPTE 2022-1 FEC decoder element");
+}
+
+static void
+gst_rtpst_2022_1_fecdec_init (GstRTPST_2022_1_FecDec * dec)
+{
+ dec->srcpad = gst_pad_new_from_static_template (&src_template, "src");
+ GST_PAD_SET_PROXY_CAPS (dec->srcpad);
+ gst_pad_use_fixed_caps (dec->srcpad);
+ gst_pad_set_event_function (dec->srcpad,
+ GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_src_event));
+ gst_pad_set_iterate_internal_links_function (dec->srcpad,
+ GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_iterate_linked_pads));
+ gst_element_add_pad (GST_ELEMENT (dec), dec->srcpad);
+
+ dec->sinkpad = gst_pad_new_from_static_template (&sink_template, "sink");
+ GST_PAD_SET_PROXY_CAPS (dec->sinkpad);
+ gst_pad_set_chain_function (dec->sinkpad, gst_rtpst_2022_1_fecdec_sink_chain);
+ gst_pad_set_event_function (dec->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_2d_fec_sink_event));
+ gst_pad_set_iterate_internal_links_function (dec->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_iterate_linked_pads));
+ gst_element_add_pad (GST_ELEMENT (dec), dec->sinkpad);
+
+ dec->d = G_MAXUINT;
+ dec->l = G_MAXUINT;
+}
diff --git a/gst/rtpmanager/gstrtpst2022-1-fecdec.h b/gst/rtpmanager/gstrtpst2022-1-fecdec.h
new file mode 100644
index 000000000..104b1578e
--- /dev/null
+++ b/gst/rtpmanager/gstrtpst2022-1-fecdec.h
@@ -0,0 +1,37 @@
+/* GStreamer
+ * Copyright (C) <2020> Mathieu Duponchelle <mathieu@centricular.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __GST_RTPST_2022_1_FECDEC_H__
+#define __GST_RTPST_2022_1_FECDEC_H__
+
+#include <gst/gst.h>
+
+G_BEGIN_DECLS
+
+typedef struct _GstRTPST_2022_1_FecDecClass GstRTPST_2022_1_FecDecClass;
+typedef struct _GstRTPST_2022_1_FecDec GstRTPST_2022_1_FecDec;
+
+#define GST_TYPE_RTPST_2022_1_FECDEC (gst_rtpst_2022_1_fecdec_get_type())
+#define GST_RTPST_2022_1_FECDEC_CAST(obj) ((GstRTPST_2022_1_FecDec *)(obj))
+
+GType gst_rtpst_2022_1_fecdec_get_type (void);
+
+G_END_DECLS
+
+#endif /* __GST_RTPST_2022_1_FECDEC_H__ */
diff --git a/gst/rtpmanager/meson.build b/gst/rtpmanager/meson.build
index 118a1e1ea..5cb6084cc 100644
--- a/gst/rtpmanager/meson.build
+++ b/gst/rtpmanager/meson.build
@@ -17,6 +17,7 @@ rtpmanager_sources = [
'rtptwcc.c',
'gstrtpsession.c',
'gstrtpfunnel.c',
+ 'gstrtpst2022-1-fecdec.c'
]
gstrtpmanager = library('gstrtpmanager',