diff options
author | Mathieu Duponchelle <mathieu@centricular.com> | 2020-10-06 04:13:30 +0300 |
---|---|---|
committer | GStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org> | 2020-10-09 01:22:18 +0300 |
commit | cff42d4c26ddf268fd50973aa6d086bc18694768 (patch) | |
tree | 6622a3f65a61014ed299e366a1815bc02043c129 /gst | |
parent | 7c9a5e86fe3a504811aad9af7c86b64d7ea5a8d7 (diff) |
rtpmanager: implement SMPTE 2022-1 FEC decoder
+ improve integration of FEC decoders in rtpbin
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/-/merge_requests/753>
Diffstat (limited to 'gst')
-rw-r--r-- | gst/rtpmanager/gstrtpbin.c | 301 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpbin.h | 3 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpmanager.c | 5 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpst2022-1-fecdec.c | 995 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpst2022-1-fecdec.h | 37 | ||||
-rw-r--r-- | gst/rtpmanager/meson.build | 1 |
6 files changed, 1334 insertions, 8 deletions
diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index 1fb98ffc7..444a66de5 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -165,6 +165,23 @@ static GstStaticPadTemplate rtpbin_recv_rtp_sink_template = GST_STATIC_CAPS ("application/x-rtp;application/x-srtp") ); +/** + * GstRtpBin!recv_fec_sink_%u_%u: + * + * Sink template for receiving Forward Error Correction packets, + * in the form recv_fec_sink_<session_idx>_<fec_stream_idx> + * + * See #GstRTPST_2022_1_FecDec for example usage + * + * Since: 1.20 + */ +static GstStaticPadTemplate rtpbin_recv_fec_sink_template = +GST_STATIC_PAD_TEMPLATE ("recv_fec_sink_%u_%u", + GST_PAD_SINK, + GST_PAD_REQUEST, + GST_STATIC_CAPS ("application/x-rtp") + ); + static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template = GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%u", GST_PAD_SINK, @@ -344,6 +361,7 @@ enum PROP_MAX_STREAMS, PROP_MAX_TS_OFFSET_ADJUSTMENT, PROP_MAX_TS_OFFSET, + PROP_FEC_DECODERS, }; #define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type()) @@ -377,6 +395,7 @@ static void payload_type_change (GstElement * element, guint pt, GstRtpBinSession * session); static void remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session); static void remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session); +static void remove_recv_fec (GstRtpBin * rtpbin, GstRtpBinSession * session); static void remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session); static void remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session); static void free_client (GstRtpBinClient * client, GstRtpBin * bin); @@ -486,6 +505,10 @@ struct _GstRtpBinSession GstPad *send_rtp_src_ghost; GstPad *send_rtcp_src; GstPad *send_rtcp_src_ghost; + + GSList *recv_fec_sinks; + GSList *recv_fec_sink_ghosts; + GstElement *fec_decoder; }; /* Manages the RTP streams that come from one client and should therefore be @@ -517,6 +540,12 @@ find_session_by_id (GstRtpBin * rtpbin, gint id) return NULL; } +static gboolean +pad_is_recv_fec (GstRtpBinSession * session, GstPad * pad) +{ + return g_slist_find (session->recv_fec_sink_ghosts, pad) != NULL; +} + /* find a session with the given request pad. Must be called with RTP_BIN_LOCK */ static GstRtpBinSession * find_session_by_pad (GstRtpBin * rtpbin, GstPad * pad) @@ -528,8 +557,8 @@ find_session_by_pad (GstRtpBin * rtpbin, GstPad * pad) if ((sess->recv_rtp_sink_ghost == pad) || (sess->recv_rtcp_sink_ghost == pad) || - (sess->send_rtp_sink_ghost == pad) - || (sess->send_rtcp_src_ghost == pad)) + (sess->send_rtp_sink_ghost == pad) || + (sess->send_rtcp_src_ghost == pad) || pad_is_recv_fec (sess, pad)) return sess; } return NULL; @@ -850,6 +879,7 @@ free_session (GstRtpBinSession * sess, GstRtpBin * bin) remove_recv_rtp (bin, sess); remove_recv_rtcp (bin, sess); + remove_recv_fec (bin, sess); remove_send_rtp (bin, sess); remove_rtcp (bin, sess); @@ -2640,6 +2670,24 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) "changed to 0 (no limit)", 0, G_MAXINT64, DEFAULT_MAX_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** + * GstRtpBin:fec-decoders: + * + * Used to provide a factory used to build the FEC decoder for a + * given session, as a command line alternative to + * #GstRtpBin::request-fec-decoder. + * + * Expects a GstStructure in the form session_id (gint) -> factory (string) + * + * Since: 1.20 + */ + g_object_class_install_property (gobject_class, PROP_FEC_DECODERS, + g_param_spec_boxed ("fec-decoders", "Fec Decoders", + "GstStructure mapping from session index to FEC decoder " + "factory, eg " + "fec-decoders='fec,0=\"rtpst2022-1-fecdec\\ size-time\\=1000000000\";'", + GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state); gstelement_class->request_new_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad); @@ -2649,6 +2697,8 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) gst_element_class_add_static_pad_template (gstelement_class, &rtpbin_recv_rtp_sink_template); gst_element_class_add_static_pad_template (gstelement_class, + &rtpbin_recv_fec_sink_template); + gst_element_class_add_static_pad_template (gstelement_class, &rtpbin_recv_rtcp_sink_template); gst_element_class_add_static_pad_template (gstelement_class, &rtpbin_send_rtp_sink_template); @@ -2726,6 +2776,8 @@ gst_rtp_bin_init (GstRtpBin * rtpbin) cname = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ()); rtpbin->sdes = gst_structure_new ("application/x-rtp-source-sdes", "cname", G_TYPE_STRING, cname, "tool", G_TYPE_STRING, "GStreamer", NULL); + rtpbin->fec_decoders = + gst_structure_new_empty ("application/x-rtp-fec-decoders"); g_free (cname); } @@ -2756,6 +2808,9 @@ gst_rtp_bin_finalize (GObject * object) if (rtpbin->sdes) gst_structure_free (rtpbin->sdes); + if (rtpbin->fec_decoders) + gst_structure_free (rtpbin->fec_decoders); + g_mutex_clear (&rtpbin->priv->bin_lock); g_mutex_clear (&rtpbin->priv->dyn_lock); @@ -2788,6 +2843,25 @@ gst_rtp_bin_set_sdes_struct (GstRtpBin * bin, const GstStructure * sdes) GST_RTP_BIN_UNLOCK (bin); } +static void +gst_rtp_bin_set_fec_decoders_struct (GstRtpBin * bin, + const GstStructure * decoders) +{ + if (decoders == NULL) + return; + + GST_RTP_BIN_LOCK (bin); + + GST_OBJECT_LOCK (bin); + if (bin->fec_decoders) + gst_structure_free (bin->fec_decoders); + bin->fec_decoders = gst_structure_copy (decoders); + + GST_OBJECT_UNLOCK (bin); + + GST_RTP_BIN_UNLOCK (bin); +} + static GstStructure * gst_rtp_bin_get_sdes_struct (GstRtpBin * bin) { @@ -2800,6 +2874,18 @@ gst_rtp_bin_get_sdes_struct (GstRtpBin * bin) return result; } +static GstStructure * +gst_rtp_bin_get_fec_decoders_struct (GstRtpBin * bin) +{ + GstStructure *result; + + GST_OBJECT_LOCK (bin); + result = gst_structure_copy (bin->fec_decoders); + GST_OBJECT_UNLOCK (bin); + + return result; +} + static void gst_rtp_bin_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) @@ -2963,6 +3049,9 @@ gst_rtp_bin_set_property (GObject * object, guint prop_id, rtpbin->max_ts_offset = g_value_get_int64 (value); rtpbin->max_ts_offset_is_set = TRUE; break; + case PROP_FEC_DECODERS: + gst_rtp_bin_set_fec_decoders_struct (rtpbin, g_value_get_boxed (value)); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -3057,6 +3146,9 @@ gst_rtp_bin_get_property (GObject * object, guint prop_id, case PROP_MAX_TS_OFFSET: g_value_set_int64 (value, rtpbin->max_ts_offset); break; + case PROP_FEC_DECODERS: + g_value_take_boxed (value, gst_rtp_bin_get_fec_decoders_struct (rtpbin)); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -3343,6 +3435,48 @@ copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data) return TRUE; } +static gboolean +ensure_fec_decoder (GstRtpBin * rtpbin, GstRtpBinSession * session) +{ + const gchar *factory; + gchar *sess_id_str; + + if (session->fec_decoder) + goto done; + + sess_id_str = g_strdup_printf ("%u", session->id); + factory = gst_structure_get_string (rtpbin->fec_decoders, sess_id_str); + g_free (sess_id_str); + + /* First try the property */ + if (factory) { + GError *err = NULL; + + session->fec_decoder = + gst_parse_bin_from_description_full (factory, TRUE, NULL, + GST_PARSE_FLAG_NO_SINGLE_ELEMENT_BINS | GST_PARSE_FLAG_FATAL_ERRORS, + &err); + if (!session->fec_decoder) { + GST_ERROR_OBJECT (rtpbin, "Failed to build decoder from factory: %s", + err->message); + } + + bin_manage_element (session->bin, session->fec_decoder); + session->elements = + g_slist_prepend (session->elements, session->fec_decoder); + GST_INFO_OBJECT (rtpbin, "Built FEC decoder: %" GST_PTR_FORMAT + " for session %u", session->fec_decoder, session->id); + } + + /* Fallback to the signal */ + if (!session->fec_decoder) + session->fec_decoder = + session_request_element (session, SIGNAL_REQUEST_FEC_DECODER); + +done: + return session->fec_decoder != NULL; +} + static void expose_recv_src_pad (GstRtpBin * rtpbin, GstPad * pad, GstRtpBinStream * stream, guint8 pt) @@ -3354,11 +3488,9 @@ expose_recv_src_pad (GstRtpBin * rtpbin, GstPad * pad, GstRtpBinStream * stream, gst_object_ref (pad); - if (stream->session->storage) { - GstElement *fec_decoder = - session_request_element (stream->session, SIGNAL_REQUEST_FEC_DECODER); - - if (fec_decoder) { + if (stream->session->storage && !stream->session->fec_decoder) { + if (ensure_fec_decoder (rtpbin, stream->session)) { + GstElement *fec_decoder = stream->session->fec_decoder; GstPad *sinkpad, *srcpad; GstPadLinkReturn ret; @@ -3594,6 +3726,15 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad, padname = g_strdup_printf ("src_%u", ssrc); srcpad = gst_element_get_static_pad (element, padname); g_free (padname); + + if (session->fec_decoder) { + sinkpad = gst_element_get_static_pad (session->fec_decoder, "sink"); + gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING); + gst_object_unref (sinkpad); + gst_object_unref (srcpad); + srcpad = gst_element_get_static_pad (session->fec_decoder, "src"); + } + sinkpad = gst_element_get_static_pad (stream->buffer, "sink"); gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING); gst_object_unref (sinkpad); @@ -3934,6 +4075,41 @@ remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session) } static GstPad * +complete_session_fec (GstRtpBin * rtpbin, GstRtpBinSession * session, + guint fec_idx) +{ + gchar *padname; + GstPad *ret; + + if (!ensure_fec_decoder (rtpbin, session)) + goto no_decoder; + + GST_DEBUG_OBJECT (rtpbin, "getting FEC sink pad"); + padname = g_strdup_printf ("fec_%u", fec_idx); + ret = gst_element_get_request_pad (session->fec_decoder, padname); + g_free (padname); + + if (ret == NULL) + goto pad_failed; + + session->recv_fec_sinks = g_slist_prepend (session->recv_fec_sinks, ret); + + return ret; + +pad_failed: + { + g_warning ("rtpbin: failed to get decoder fec pad"); + return NULL; + } +no_decoder: + { + g_warning ("rtpbin: failed to build FEC decoder for session %u", + session->id); + return NULL; + } +} + +static GstPad * complete_session_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session, guint sessid) { @@ -4076,6 +4252,66 @@ create_error: } } +static GstPad * +create_recv_fec (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) +{ + guint sessid, fec_idx; + GstRtpBinSession *session; + GstPad *decsink = NULL; + GstPad *ghost; + + /* first get the session number */ + if (name == NULL + || sscanf (name, "recv_fec_sink_%u_%u", &sessid, &fec_idx) != 2) + goto no_name; + + if (fec_idx > 1) + goto invalid_idx; + + GST_DEBUG_OBJECT (rtpbin, "finding session %u", sessid); + + /* get or create the session */ + session = find_session_by_id (rtpbin, sessid); + if (!session) { + GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid); + /* create session now */ + session = create_session (rtpbin, sessid); + if (session == NULL) + goto create_error; + } + + decsink = complete_session_fec (rtpbin, session, fec_idx); + if (!decsink) + goto create_error; + + ghost = gst_ghost_pad_new_from_template (name, decsink, templ); + session->recv_fec_sink_ghosts = + g_slist_prepend (session->recv_fec_sink_ghosts, ghost); + gst_object_unref (decsink); + gst_pad_set_active (ghost, TRUE); + gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), ghost); + + return ghost; + + /* ERRORS */ +no_name: + { + g_warning ("rtpbin: cannot find session id for pad: %s", + GST_STR_NULL (name)); + return NULL; + } +invalid_idx: + { + g_warning ("rtpbin: invalid FEC index: %s", GST_STR_NULL (name)); + return NULL; + } +create_error: + { + /* create_session already warned */ + return NULL; + } +} + static void remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session) { @@ -4097,6 +4333,49 @@ remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session) } } +static void +remove_recv_fec_for_pad (GstRtpBin * rtpbin, GstRtpBinSession * session, + GstPad * ghost) +{ + GSList *item; + GstPad *target; + + target = gst_ghost_pad_get_target (GST_GHOST_PAD (ghost)); + + if (target) { + item = g_slist_find (session->recv_fec_sinks, target); + if (item) { + gst_element_release_request_pad (session->fec_decoder, item->data); + session->recv_fec_sinks = + g_slist_delete_link (session->recv_fec_sinks, item); + } + gst_object_unref (target); + } + + item = g_slist_find (session->recv_fec_sink_ghosts, ghost); + if (item) + session->recv_fec_sink_ghosts = + g_slist_delete_link (session->recv_fec_sink_ghosts, item); + + gst_pad_set_active (ghost, FALSE); + gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), ghost); +} + +static void +remove_recv_fec (GstRtpBin * rtpbin, GstRtpBinSession * session) +{ + GSList *copy; + GSList *tmp; + + copy = g_slist_copy (session->recv_fec_sink_ghosts); + + for (tmp = copy; tmp; tmp = tmp->next) { + remove_recv_fec_for_pad (rtpbin, session, (GstPad *) tmp->data); + } + + g_slist_free (copy); +} + static gboolean complete_session_src (GstRtpBin * rtpbin, GstRtpBinSession * session) { @@ -4699,6 +4978,9 @@ gst_rtp_bin_request_new_pad (GstElement * element, } else if (templ == gst_element_class_get_pad_template (klass, "send_rtcp_src_%u")) { result = create_send_rtcp (rtpbin, templ, pad_name); + } else if (templ == gst_element_class_get_pad_template (klass, + "recv_fec_sink_%u_%u")) { + result = create_recv_fec (rtpbin, templ, pad_name); } else goto wrong_template; @@ -4743,13 +5025,16 @@ gst_rtp_bin_release_pad (GstElement * element, GstPad * pad) remove_send_rtp (rtpbin, session); } else if (session->send_rtcp_src_ghost == pad) { remove_rtcp (rtpbin, session); + } else if (pad_is_recv_fec (session, pad)) { + remove_recv_fec_for_pad (rtpbin, session, pad); } /* no more request pads, free the complete session */ if (session->recv_rtp_sink_ghost == NULL && session->recv_rtcp_sink_ghost == NULL && session->send_rtp_sink_ghost == NULL - && session->send_rtcp_src_ghost == NULL) { + && session->send_rtcp_src_ghost == NULL + && session->recv_fec_sink_ghosts == NULL) { GST_DEBUG_OBJECT (rtpbin, "no more pads for session %p", session); rtpbin->sessions = g_slist_remove (rtpbin->sessions, session); free_session (session, rtpbin); diff --git a/gst/rtpmanager/gstrtpbin.h b/gst/rtpmanager/gstrtpbin.h index fcea7cec8..58de860a4 100644 --- a/gst/rtpmanager/gstrtpbin.h +++ b/gst/rtpmanager/gstrtpbin.h @@ -88,6 +88,9 @@ struct _GstRtpBin { /* the default SDES items for sessions */ GstStructure *sdes; + /* the default FEC decoder factories for sessions */ + GstStructure *fec_decoders; + /*< private >*/ GstRtpBinPrivate *priv; }; diff --git a/gst/rtpmanager/gstrtpmanager.c b/gst/rtpmanager/gstrtpmanager.c index 4ba624fba..91b6b653a 100644 --- a/gst/rtpmanager/gstrtpmanager.c +++ b/gst/rtpmanager/gstrtpmanager.c @@ -32,6 +32,7 @@ #include "gstrtpdtmfmux.h" #include "gstrtpmux.h" #include "gstrtpfunnel.h" +#include "gstrtpst2022-1-fecdec.h" static gboolean plugin_init (GstPlugin * plugin) @@ -74,6 +75,10 @@ plugin_init (GstPlugin * plugin) GST_TYPE_RTP_FUNNEL)) return FALSE; + if (!gst_element_register (plugin, "rtpst2022-1-fecdec", GST_RANK_NONE, + GST_TYPE_RTPST_2022_1_FECDEC)) + return FALSE; + return TRUE; } diff --git a/gst/rtpmanager/gstrtpst2022-1-fecdec.c b/gst/rtpmanager/gstrtpst2022-1-fecdec.c new file mode 100644 index 000000000..7f45b53f8 --- /dev/null +++ b/gst/rtpmanager/gstrtpst2022-1-fecdec.c @@ -0,0 +1,995 @@ +/* GStreamer + * Copyright (C) <2020> Mathieu Duponchelle <mathieu@centricular.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-rtpst2022-1-fecdec + * @see_also: #element-rtpst2022-1-fecenc + * + * This element takes as input a media stream and up to two FEC + * streams as described in SMPTE 2022-1: Forward Error Correction + * for Real-Time Video/Audio Transport Over IP Networks, and makes + * use of the FEC packets to recover media packets that may have + * gotten lost. + * + * ## Design + * + * The approach picked for this element is to proactively reconstruct missing + * packets as soon as possible. When a FEC packet arrives, the element + * immediately checks whether a media packet in the row / column it protects + * can be reconstructed. + * + * Similarly, when a media packet comes in, the element checks whether it has + * already received a corresponding packet in both the column and row the packet + * belongs to, and if so goes through the first step listed above. + * + * This process is repeated recursively, allowing for recoveries over one + * dimension to unblock recoveries over the other. + * + * In perfect networking conditions, this incurs next to no overhead as FEC + * packets will arrive after the media packets, causing no reconstruction to + * take place, just a few checks upon chaining. + * + * ## sender / receiver example + * + * ``` shell + * gst-launch-1.0 \ + * rtpbin name=rtp fec-encoders='fec,0="rtpst2022-1-fecenc\ rows\=5\ columns\=5";' \ + * uridecodebin uri=file:///path/to/video/file ! x264enc key-int-max=60 tune=zerolatency ! \ + * queue ! mpegtsmux ! rtpmp2tpay ssrc=0 ! rtp.send_rtp_sink_0 \ + * rtp.send_rtp_src_0 ! udpsink host=127.0.0.1 port=5000 \ + * rtp.send_fec_src_0_0 ! udpsink host=127.0.0.1 port=5002 async=false \ + * rtp.send_fec_src_0_1 ! udpsink host=127.0.0.1 port=5004 async=false + * ``` + * + * ``` shell + * gst-launch-1.0 \ + * rtpbin latency=500 fec-decoders='fec,0="rtpst2022-1-fecdec\ size-time\=1000000000";' name=rtp \ + * udpsrc address=127.0.0.1 port=5002 caps="application/x-rtp, payload=96" ! queue ! rtp.recv_fec_sink_0_0 \ + * udpsrc address=127.0.0.1 port=5004 caps="application/x-rtp, payload=96" ! queue ! rtp.recv_fec_sink_0_1 \ + * udpsrc address=127.0.0.1 port=5000 caps="application/x-rtp, media=video, clock-rate=90000, encoding-name=mp2t, payload=33" ! \ + * queue ! netsim drop-probability=0.05 ! rtp.recv_rtp_sink_0 \ + * rtp. ! decodebin ! videoconvert ! queue ! autovideosink + * ``` + * + * With the above command line, as the media packet size is constant, + * the fec overhead can be approximated to the number of fec packets + * per 2-d matrix of media packet, here 10 fec packets for each 25 + * media packets. + * + * Increasing the number of rows and columns will decrease the overhead, + * but obviously increase the likelihood of recovery failure for lost + * packets on the receiver side. + * + * Since: 1.20 + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gst/base/base.h> +#include <gst/rtp/gstrtpbuffer.h> + +#include "gstrtpst2022-1-fecdec.h" + +GST_DEBUG_CATEGORY_STATIC (gst_rtpst_2022_1_fecdec_debug); +#define GST_CAT_DEFAULT gst_rtpst_2022_1_fecdec_debug + +#define DEFAULT_SIZE_TIME (GST_SECOND) + +typedef struct +{ + guint16 seq; + GstBuffer *buffer; +} Item; + +static GstFlowReturn store_media_item (GstRTPST_2022_1_FecDec * dec, + GstRTPBuffer * rtp, Item * item); + +static void +free_item (Item * item) +{ + gst_buffer_unref (item->buffer); + item->buffer = NULL; + g_free (item); +} + +static gint +cmp_items (Item * a, Item * b, gpointer unused) +{ + return gst_rtp_buffer_compare_seqnum (b->seq, a->seq); +} + +enum +{ + PROP_0, + PROP_SIZE_TIME, +}; + +struct _GstRTPST_2022_1_FecDecClass +{ + GstElementClass class; +}; + +struct _GstRTPST_2022_1_FecDec +{ + GstElement element; + + GstPad *srcpad; + GstPad *sinkpad; + GList *fec_sinkpads; + + /* All the following field are protected by the OBJECT_LOCK */ + GSequence *packets; + GHashTable *column_fec_packets; + GSequence *fec_packets[2]; + /* N columns */ + guint l; + /* N rows */ + guint d; + + GstClockTime size_time; + GstClockTime max_arrival_time; + GstClockTime max_fec_arrival_time[2]; +}; + +#define RTP_CAPS "application/x-rtp" + +typedef struct +{ + guint16 seq; + guint16 len; + guint8 E; + guint8 pt; + guint32 mask; + guint32 timestamp; + guint8 N; + guint8 D; + guint8 type; + guint8 index; + guint8 offset; + guint8 NA; + guint8 seq_ext; + guint8 *payload; + guint payload_len; +} Rtp2DFecHeader; + +static GstStaticPadTemplate fec_sink_template = +GST_STATIC_PAD_TEMPLATE ("fec_%u", + GST_PAD_SINK, + GST_PAD_REQUEST, + GST_STATIC_CAPS (RTP_CAPS)); + +static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS (RTP_CAPS)); + +static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS (RTP_CAPS)); + +#define gst_rtpst_2022_1_fecdec_parent_class parent_class +G_DEFINE_TYPE (GstRTPST_2022_1_FecDec, gst_rtpst_2022_1_fecdec, + GST_TYPE_ELEMENT); + +static void +trim_items (GstRTPST_2022_1_FecDec * dec) +{ + GSequenceIter *tmp_iter, *iter = NULL; + + for (tmp_iter = g_sequence_get_begin_iter (dec->packets); + tmp_iter; tmp_iter = g_sequence_iter_next (tmp_iter)) { + Item *item; + + if (g_sequence_iter_is_end (tmp_iter)) + break; + + item = g_sequence_get (tmp_iter); + + if (dec->max_arrival_time - GST_BUFFER_DTS_OR_PTS (item->buffer) < + dec->size_time) + break; + + iter = tmp_iter; + } + + if (iter) { + Item *item = g_sequence_get (iter); + GST_TRACE_OBJECT (dec, + "Trimming packets up to %" GST_TIME_FORMAT " (seq: %u)", + GST_TIME_ARGS (GST_BUFFER_DTS_OR_PTS (item->buffer)), item->seq); + g_sequence_remove_range (g_sequence_get_begin_iter (dec->packets), iter); + } +} + +static void +trim_fec_items (GstRTPST_2022_1_FecDec * dec, guint D) +{ + GSequenceIter *tmp_iter, *iter = NULL; + + for (tmp_iter = g_sequence_get_begin_iter (dec->fec_packets[D]); + tmp_iter; tmp_iter = g_sequence_iter_next (tmp_iter)) { + Item *item; + + if (g_sequence_iter_is_end (tmp_iter)) + break; + + item = g_sequence_get (tmp_iter); + + if (dec->max_fec_arrival_time[D] - GST_BUFFER_DTS_OR_PTS (item->buffer) < + dec->size_time) + break; + + if (!D) { + guint i; + guint16 seq; + + for (i = 0; i < dec->d; i++) { + seq = item->seq + i * dec->l; + g_hash_table_remove (dec->column_fec_packets, GUINT_TO_POINTER (seq)); + } + } + + iter = tmp_iter; + } + + if (iter) { + Item *item = g_sequence_get (iter); + GST_TRACE_OBJECT (dec, + "Trimming %s FEC packets up to %" GST_TIME_FORMAT " (seq: %u)", + D ? "row" : "column", + GST_TIME_ARGS (GST_BUFFER_DTS_OR_PTS (item->buffer)), item->seq); + g_sequence_remove_range (g_sequence_get_begin_iter (dec->fec_packets[D]), + iter); + } +} + +static Item * +lookup_media_packet (GstRTPST_2022_1_FecDec * dec, guint16 seqnum) +{ + GSequenceIter *iter; + Item *ret = NULL; + Item dummy = { seqnum, NULL }; + + iter = + g_sequence_lookup (dec->packets, &dummy, (GCompareDataFunc) cmp_items, + NULL); + + if (iter) + ret = g_sequence_get (iter); + + return ret; +} + +static gboolean +parse_header (Rtp2DFecHeader * fec, guint8 * data, guint len) +{ + gboolean ret = FALSE; + GstBitReader bits; + + if (len < 16) + goto done; + + gst_bit_reader_init (&bits, data, len); + + fec->seq = gst_bit_reader_get_bits_uint16_unchecked (&bits, 16); + fec->len = gst_bit_reader_get_bits_uint16_unchecked (&bits, 16); + fec->E = gst_bit_reader_get_bits_uint8_unchecked (&bits, 1); + fec->pt = gst_bit_reader_get_bits_uint8_unchecked (&bits, 7); + fec->mask = gst_bit_reader_get_bits_uint32_unchecked (&bits, 24); + fec->timestamp = gst_bit_reader_get_bits_uint32_unchecked (&bits, 32); + fec->N = gst_bit_reader_get_bits_uint8_unchecked (&bits, 1); + fec->D = gst_bit_reader_get_bits_uint8_unchecked (&bits, 1); + fec->type = gst_bit_reader_get_bits_uint8_unchecked (&bits, 3); + fec->index = gst_bit_reader_get_bits_uint8_unchecked (&bits, 3); + fec->offset = gst_bit_reader_get_bits_uint8_unchecked (&bits, 8); + fec->NA = gst_bit_reader_get_bits_uint8_unchecked (&bits, 8); + fec->seq_ext = gst_bit_reader_get_bits_uint8_unchecked (&bits, 8); + fec->payload = data + 16; + fec->payload_len = len - 16; + + ret = TRUE; + +done: + return ret; +} + +static Item * +get_row_fec (GstRTPST_2022_1_FecDec * dec, guint16 seqnum) +{ + GSequenceIter *iter; + Item *ret = NULL; + Item dummy = { 0, }; + + if (dec->l == G_MAXUINT) + goto done; + + /* Potential underflow is intended */ + dummy.seq = seqnum - dec->l; + + iter = + g_sequence_search (dec->fec_packets[1], &dummy, + (GCompareDataFunc) cmp_items, NULL); + + if (!g_sequence_iter_is_end (iter)) { + gint seqdiff; + ret = g_sequence_get (iter); + + seqdiff = gst_rtp_buffer_compare_seqnum (ret->seq, seqnum); + + /* Now check whether the fec packet does apply */ + if (seqdiff < 0 || seqdiff >= dec->l) + ret = NULL; + } + +done: + return ret; +} + +static Item * +get_column_fec (GstRTPST_2022_1_FecDec * dec, guint16 seqnum) +{ + Item *ret = NULL; + + if (dec->l == G_MAXUINT || dec->d == G_MAXUINT) + goto done; + + ret = + g_hash_table_lookup (dec->column_fec_packets, GUINT_TO_POINTER (seqnum)); + +done: + return ret; +} + +static void +_xor_mem (guint8 * restrict dst, const guint8 * restrict src, gsize length) +{ + guint i; + + for (i = 0; i < (length / sizeof (guint64)); ++i) { +#if G_BYTE_ORDER == G_LITTLE_ENDIAN + GST_WRITE_UINT64_LE (dst, + GST_READ_UINT64_LE (dst) ^ GST_READ_UINT64_LE (src)); +#else + GST_WRITE_UINT64_BE (dst, + GST_READ_UINT64_BE (dst) ^ GST_READ_UINT64_BE (src)); +#endif + dst += sizeof (guint64); + src += sizeof (guint64); + } + for (i = 0; i < (length % sizeof (guint64)); ++i) + dst[i] ^= src[i]; +} + +static GstFlowReturn +xor_items (GstRTPST_2022_1_FecDec * dec, Rtp2DFecHeader * fec, GList * packets, + guint16 seqnum) +{ + guint8 *xored; + guint32 xored_timestamp; + guint8 xored_pt; + guint16 xored_payload_len; + Item *item; + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + GList *tmp; + GstFlowReturn ret = GST_FLOW_OK; + GstBuffer *buffer; + + /* Figure out the recovered packet length first */ + xored_payload_len = fec->len; + for (tmp = packets; tmp; tmp = tmp->next) { + GstRTPBuffer media_rtp = GST_RTP_BUFFER_INIT; + Item *item = (Item *) tmp->data; + + gst_rtp_buffer_map (item->buffer, GST_MAP_READ, &media_rtp); + xored_payload_len ^= gst_rtp_buffer_get_payload_len (&media_rtp); + gst_rtp_buffer_unmap (&media_rtp); + } + + if (xored_payload_len > fec->payload_len) { + GST_WARNING_OBJECT (dec, "FEC payload len %u < length recovery %u", + fec->payload_len, xored_payload_len); + goto done; + } + + item = g_malloc0 (sizeof (Item)); + item->seq = seqnum; + item->buffer = gst_rtp_buffer_new_allocate (xored_payload_len, 0, 0); + gst_rtp_buffer_map (item->buffer, GST_MAP_WRITE, &rtp); + + xored = gst_rtp_buffer_get_payload (&rtp); + memcpy (xored, fec->payload, xored_payload_len); + xored_timestamp = fec->timestamp; + xored_pt = fec->pt; + + for (tmp = packets; tmp; tmp = tmp->next) { + GstRTPBuffer media_rtp = GST_RTP_BUFFER_INIT; + Item *item = (Item *) tmp->data; + + gst_rtp_buffer_map (item->buffer, GST_MAP_READ, &media_rtp); + _xor_mem (xored, gst_rtp_buffer_get_payload (&media_rtp), + gst_rtp_buffer_get_payload_len (&media_rtp)); + xored_timestamp ^= gst_rtp_buffer_get_timestamp (&media_rtp); + xored_pt ^= gst_rtp_buffer_get_payload_type (&media_rtp); + + gst_rtp_buffer_unmap (&media_rtp); + } + + GST_DEBUG_OBJECT (dec, + "Recovered buffer through %s FEC with seqnum %u, payload len %u and timestamp %u", + fec->D ? "row" : "column", seqnum, xored_payload_len, xored_timestamp); + + GST_BUFFER_DTS (item->buffer) = dec->max_arrival_time; + + gst_rtp_buffer_set_timestamp (&rtp, xored_timestamp); + gst_rtp_buffer_set_seq (&rtp, seqnum); + gst_rtp_buffer_set_payload_type (&rtp, xored_pt); + + gst_rtp_buffer_unmap (&rtp); + + /* Store a ref on item->buffer as store_media_item may + * recurse and call this method again, potentially releasing + * the object lock and leaving our item unprotected in + * dec->packets + */ + buffer = gst_buffer_ref (item->buffer); + + /* It is right that we should celebrate, + * for your brother was dead, and is alive again */ + gst_rtp_buffer_map (item->buffer, GST_MAP_READ, &rtp); + ret = store_media_item (dec, &rtp, item); + gst_rtp_buffer_unmap (&rtp); + + if (ret == GST_FLOW_OK) { + /* Unlocking here is safe */ + GST_OBJECT_UNLOCK (dec); + ret = gst_pad_push (dec->srcpad, buffer); + GST_OBJECT_LOCK (dec); + } else { + gst_buffer_unref (buffer); + } + +done: + return ret; +} + +/* Returns a flow value if we should discard the packet, GST_FLOW_CUSTOM_SUCCESS otherwise */ +static GstFlowReturn +check_fec (GstRTPST_2022_1_FecDec * dec, Rtp2DFecHeader * fec) +{ + GList *packets = NULL; + gint missing_seq = -1; + guint n_packets = 0; + guint required_n_packets; + GstFlowReturn ret = GST_FLOW_OK; + + if (fec->D) { + guint i = 0; + + required_n_packets = dec->l; + + for (i = 0; i < dec->l; i++) { + Item *item = lookup_media_packet (dec, fec->seq + i); + + if (item) { + packets = g_list_prepend (packets, item); + n_packets += 1; + } else { + missing_seq = fec->seq + i; + } + } + } else { + guint i = 0; + + required_n_packets = dec->d; + + for (i = 0; i < dec->d; i++) { + Item *item = lookup_media_packet (dec, fec->seq + i * dec->l); + + if (item) { + packets = g_list_prepend (packets, item); + n_packets += 1; + } else { + missing_seq = fec->seq + i * dec->l; + } + } + } + + if (n_packets == required_n_packets) { + g_assert (missing_seq == -1); + GST_LOG_OBJECT (dec, + "All media packets present, we can discard that FEC packet"); + } else if (n_packets + 1 == required_n_packets) { + g_assert (missing_seq != -1); + ret = xor_items (dec, fec, packets, missing_seq); + GST_LOG_OBJECT (dec, "We have enough info to reconstruct %u", missing_seq); + } else { + ret = GST_FLOW_CUSTOM_SUCCESS; + GST_LOG_OBJECT (dec, "Too many media packets missing, storing FEC packet"); + } + g_list_free (packets); + + return ret; +} + +static GstFlowReturn +check_fec_item (GstRTPST_2022_1_FecDec * dec, Item * item) +{ + Rtp2DFecHeader fec; + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + guint payload_len; + guint8 *payload; + GstFlowReturn ret; + + gst_rtp_buffer_map (item->buffer, GST_MAP_READ, &rtp); + + payload_len = gst_rtp_buffer_get_payload_len (&rtp); + payload = gst_rtp_buffer_get_payload (&rtp); + + parse_header (&fec, payload, payload_len); + + ret = check_fec (dec, &fec); + + gst_rtp_buffer_unmap (&rtp); + + return ret; +} + +static GstFlowReturn +store_media_item (GstRTPST_2022_1_FecDec * dec, GstRTPBuffer * rtp, Item * item) +{ + GstFlowReturn ret = GST_FLOW_OK; + Item *fec_item; + guint16 seq; + + seq = gst_rtp_buffer_get_seq (rtp); + + g_sequence_insert_sorted (dec->packets, item, (GCompareDataFunc) cmp_items, + NULL); + + if ((fec_item = get_row_fec (dec, seq))) { + ret = check_fec_item (dec, fec_item); + if (ret == GST_FLOW_CUSTOM_SUCCESS) + ret = GST_FLOW_OK; + } + + if (ret == GST_FLOW_OK && (fec_item = get_column_fec (dec, seq))) { + ret = check_fec_item (dec, fec_item); + if (ret == GST_FLOW_CUSTOM_SUCCESS) + ret = GST_FLOW_OK; + } + + return ret; +} + +static GstFlowReturn +store_media (GstRTPST_2022_1_FecDec * dec, GstRTPBuffer * rtp, + GstBuffer * buffer) +{ + Item *item; + guint16 seq; + + seq = gst_rtp_buffer_get_seq (rtp); + item = g_malloc0 (sizeof (Item)); + item->buffer = gst_buffer_ref (buffer); + item->seq = seq; + + return store_media_item (dec, rtp, item); +} + +static GstFlowReturn +gst_rtpst_2022_1_fecdec_sink_chain_fec (GstPad * pad, GstObject * parent, + GstBuffer * buffer) +{ + GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (parent); + Rtp2DFecHeader fec = { 0, }; + guint payload_len; + guint8 *payload; + GstFlowReturn ret = GST_FLOW_OK; + Item *item; + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + + GST_OBJECT_LOCK (dec); + + if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)) { + GST_WARNING_OBJECT (pad, "Chained FEC buffer isn't valid RTP"); + goto discard; + } + + payload_len = gst_rtp_buffer_get_payload_len (&rtp); + payload = gst_rtp_buffer_get_payload (&rtp); + + if (!parse_header (&fec, payload, payload_len)) { + GST_WARNING_OBJECT (pad, "Failed to parse FEC header (payload len: %d)", + payload_len); + GST_MEMDUMP_OBJECT (pad, "Invalid payload", payload, payload_len); + goto discard; + } + + GST_TRACE_OBJECT + (pad, + "Handling FEC buffer with SNBase / N / D / NA / offset %u / %u / %u / %u / %u", + fec.seq, fec.N, fec.D, fec.NA, fec.offset); + + if (fec.D) { + if (dec->l == G_MAXUINT) { + dec->l = fec.NA; + } else if (fec.NA != dec->l) { + GST_WARNING_OBJECT (dec, "2D FEC dimensionality cannot change"); + goto discard; + } + + if (fec.offset != 1) { + GST_WARNING_OBJECT (pad, "offset must be 1 for row FEC packets"); + goto discard; + } + } else { + if (dec->d == G_MAXUINT) { + dec->d = fec.NA; + } else if (fec.NA != dec->d) { + GST_WARNING_OBJECT (dec, "2D FEC dimensionality cannot change"); + goto discard; + } + + if (dec->l == G_MAXUINT) { + dec->l = fec.offset; + } else if (fec.offset != dec->l) { + GST_WARNING_OBJECT (dec, "2D FEC dimensionality cannot change"); + goto discard; + } + } + + dec->max_fec_arrival_time[fec.D] = GST_BUFFER_DTS_OR_PTS (buffer); + trim_fec_items (dec, fec.D); + + ret = check_fec (dec, &fec); + + if (ret == GST_FLOW_CUSTOM_SUCCESS) { + item = g_malloc0 (sizeof (Item)); + item->buffer = buffer; + item->seq = fec.seq; + + if (!fec.D) { + guint i; + guint16 seq; + + for (i = 0; i < dec->d; i++) { + seq = fec.seq + i * dec->l; + g_hash_table_insert (dec->column_fec_packets, GUINT_TO_POINTER (seq), + item); + } + } + g_sequence_insert_sorted (dec->fec_packets[fec.D], item, + (GCompareDataFunc) cmp_items, NULL); + ret = GST_FLOW_OK; + } else { + goto discard; + } + + gst_rtp_buffer_unmap (&rtp); + +done: + GST_OBJECT_UNLOCK (dec); + return ret; + +discard: + if (rtp.buffer != NULL) + gst_rtp_buffer_unmap (&rtp); + + gst_buffer_unref (buffer); + + goto done; +} + +static GstFlowReturn +gst_rtpst_2022_1_fecdec_sink_chain (GstPad * pad, GstObject * parent, + GstBuffer * buffer) +{ + GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (parent); + GstFlowReturn ret = GST_FLOW_OK; + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + + if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)) { + GST_WARNING_OBJECT (pad, "Chained buffer isn't valid RTP"); + goto error; + } + + GST_OBJECT_LOCK (dec); + dec->max_arrival_time = + MAX (dec->max_arrival_time, GST_BUFFER_DTS_OR_PTS (buffer)); + trim_items (dec); + ret = store_media (dec, &rtp, buffer); + GST_OBJECT_UNLOCK (dec); + + gst_rtp_buffer_unmap (&rtp); + + if (ret == GST_FLOW_OK) + ret = gst_pad_push (dec->srcpad, buffer); + +done: + return ret; + +error: + gst_buffer_unref (buffer); + goto done; +} + +static gboolean +gst_rtpst_2022_1_fecdec_src_event (GstPad * pad, GstObject * parent, + GstEvent * event) +{ + gboolean handled = FALSE; + gboolean ret = TRUE; + + if (!handled) { + gst_pad_event_default (pad, parent, event); + } + + return ret; +} + +/* Takes the object lock */ +static void +gst_rtpst_2022_1_fecdec_reset (GstRTPST_2022_1_FecDec * dec, gboolean allocate) +{ + guint i; + + GST_OBJECT_LOCK (dec); + + if (dec->packets) { + g_sequence_free (dec->packets); + dec->packets = NULL; + } + + if (dec->column_fec_packets) { + g_hash_table_unref (dec->column_fec_packets); + dec->column_fec_packets = NULL; + } + + if (allocate) { + dec->packets = g_sequence_new ((GDestroyNotify) free_item); + dec->column_fec_packets = g_hash_table_new (g_direct_hash, g_direct_equal); + } + + for (i = 0; i < 2; i++) { + if (dec->fec_packets[i]) { + g_sequence_free (dec->fec_packets[i]); + dec->fec_packets[i] = NULL; + } + + if (allocate) + dec->fec_packets[i] = g_sequence_new ((GDestroyNotify) free_item); + } + + dec->d = G_MAXUINT; + dec->l = G_MAXUINT; + + GST_OBJECT_UNLOCK (dec); +} + +static GstStateChangeReturn +gst_rtpst_2022_1_fecdec_change_state (GstElement * element, + GstStateChange transition) +{ + GstStateChangeReturn ret; + GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (element); + + switch (transition) { + case GST_STATE_CHANGE_READY_TO_PAUSED: + gst_rtpst_2022_1_fecdec_reset (dec, TRUE); + break; + case GST_STATE_CHANGE_PAUSED_TO_READY: + gst_rtpst_2022_1_fecdec_reset (dec, FALSE); + break; + default: + break; + } + + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + + return ret; +} + +static void +gst_rtpst_2022_1_fecdec_finalize (GObject * object) +{ + GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (object); + + gst_rtpst_2022_1_fecdec_reset (dec, FALSE); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static void +gst_rtpst_2022_1_fecdec_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (object); + + switch (prop_id) { + case PROP_SIZE_TIME: + dec->size_time = g_value_get_uint64 (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_rtpst_2022_1_fecdec_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (object); + + switch (prop_id) { + case PROP_SIZE_TIME: + g_value_set_uint64 (value, dec->size_time); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static gboolean +gst_2d_fec_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) +{ + GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (parent); + gboolean ret; + + if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP) + gst_rtpst_2022_1_fecdec_reset (dec, TRUE); + + ret = gst_pad_event_default (pad, parent, event); + + return ret; +} + +static GstIterator * +gst_rtpst_2022_1_fecdec_iterate_linked_pads (GstPad * pad, GstObject * parent) +{ + GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (parent); + GstPad *otherpad = NULL; + GstIterator *it = NULL; + GValue val = { 0, }; + + if (pad == dec->srcpad) + otherpad = dec->sinkpad; + else if (pad == dec->sinkpad) + otherpad = dec->srcpad; + + if (otherpad) { + g_value_init (&val, GST_TYPE_PAD); + g_value_set_object (&val, otherpad); + it = gst_iterator_new_single (GST_TYPE_PAD, &val); + g_value_unset (&val); + } + + return it; +} + +static GstPad * +gst_rtpst_2022_1_fecdec_request_new_pad (GstElement * element, + GstPadTemplate * templ, const gchar * name, const GstCaps * caps) +{ + GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (element); + GstPad *sinkpad = NULL; + + GST_DEBUG_OBJECT (element, "requesting pad"); + + if (g_list_length (dec->fec_sinkpads) > 1) { + GST_ERROR_OBJECT (dec, "not accepting more than two fec streams"); + goto done; + } + + sinkpad = gst_pad_new_from_template (templ, name); + gst_pad_set_chain_function (sinkpad, gst_rtpst_2022_1_fecdec_sink_chain_fec); + gst_element_add_pad (GST_ELEMENT (dec), sinkpad); + gst_pad_set_iterate_internal_links_function (sinkpad, + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_iterate_linked_pads)); + + gst_pad_set_active (sinkpad, TRUE); + + GST_DEBUG_OBJECT (element, "requested pad %s:%s", + GST_DEBUG_PAD_NAME (sinkpad)); + +done: + return sinkpad; +} + +static void +gst_rtpst_2022_1_fecdec_release_pad (GstElement * element, GstPad * pad) +{ + GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (element); + + GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad)); + + dec->fec_sinkpads = g_list_remove (dec->fec_sinkpads, pad); + + gst_pad_set_active (pad, FALSE); + gst_element_remove_pad (GST_ELEMENT_CAST (dec), pad); +} + +static void +gst_rtpst_2022_1_fecdec_class_init (GstRTPST_2022_1_FecDecClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); + + gobject_class->set_property = + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_set_property); + gobject_class->get_property = + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_get_property); + gobject_class->finalize = + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_finalize); + + g_object_class_install_property (gobject_class, PROP_SIZE_TIME, + g_param_spec_uint64 ("size-time", "Storage size (in ns)", + "The amount of data to store (in ns, 0-disable)", 0, + G_MAXUINT64, DEFAULT_SIZE_TIME, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS)); + + gstelement_class->change_state = + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_change_state); + gstelement_class->request_new_pad = + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_request_new_pad); + gstelement_class->release_pad = + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_release_pad); + + gst_element_class_set_static_metadata (gstelement_class, + "SMPTE 2022-1 FEC decoder", "SMPTE 2022-1 FEC decoding", + "performs FEC as described by SMPTE 2022-1", + "Mathieu Duponchelle <mathieu@centricular.com>"); + + gst_element_class_add_static_pad_template (gstelement_class, &sink_template); + gst_element_class_add_static_pad_template (gstelement_class, + &fec_sink_template); + gst_element_class_add_static_pad_template (gstelement_class, &src_template); + + GST_DEBUG_CATEGORY_INIT (gst_rtpst_2022_1_fecdec_debug, + "rtpst2022-1-fecdec", 0, "SMPTE 2022-1 FEC decoder element"); +} + +static void +gst_rtpst_2022_1_fecdec_init (GstRTPST_2022_1_FecDec * dec) +{ + dec->srcpad = gst_pad_new_from_static_template (&src_template, "src"); + GST_PAD_SET_PROXY_CAPS (dec->srcpad); + gst_pad_use_fixed_caps (dec->srcpad); + gst_pad_set_event_function (dec->srcpad, + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_src_event)); + gst_pad_set_iterate_internal_links_function (dec->srcpad, + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_iterate_linked_pads)); + gst_element_add_pad (GST_ELEMENT (dec), dec->srcpad); + + dec->sinkpad = gst_pad_new_from_static_template (&sink_template, "sink"); + GST_PAD_SET_PROXY_CAPS (dec->sinkpad); + gst_pad_set_chain_function (dec->sinkpad, gst_rtpst_2022_1_fecdec_sink_chain); + gst_pad_set_event_function (dec->sinkpad, + GST_DEBUG_FUNCPTR (gst_2d_fec_sink_event)); + gst_pad_set_iterate_internal_links_function (dec->sinkpad, + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_iterate_linked_pads)); + gst_element_add_pad (GST_ELEMENT (dec), dec->sinkpad); + + dec->d = G_MAXUINT; + dec->l = G_MAXUINT; +} diff --git a/gst/rtpmanager/gstrtpst2022-1-fecdec.h b/gst/rtpmanager/gstrtpst2022-1-fecdec.h new file mode 100644 index 000000000..104b1578e --- /dev/null +++ b/gst/rtpmanager/gstrtpst2022-1-fecdec.h @@ -0,0 +1,37 @@ +/* GStreamer + * Copyright (C) <2020> Mathieu Duponchelle <mathieu@centricular.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_RTPST_2022_1_FECDEC_H__ +#define __GST_RTPST_2022_1_FECDEC_H__ + +#include <gst/gst.h> + +G_BEGIN_DECLS + +typedef struct _GstRTPST_2022_1_FecDecClass GstRTPST_2022_1_FecDecClass; +typedef struct _GstRTPST_2022_1_FecDec GstRTPST_2022_1_FecDec; + +#define GST_TYPE_RTPST_2022_1_FECDEC (gst_rtpst_2022_1_fecdec_get_type()) +#define GST_RTPST_2022_1_FECDEC_CAST(obj) ((GstRTPST_2022_1_FecDec *)(obj)) + +GType gst_rtpst_2022_1_fecdec_get_type (void); + +G_END_DECLS + +#endif /* __GST_RTPST_2022_1_FECDEC_H__ */ diff --git a/gst/rtpmanager/meson.build b/gst/rtpmanager/meson.build index 118a1e1ea..5cb6084cc 100644 --- a/gst/rtpmanager/meson.build +++ b/gst/rtpmanager/meson.build @@ -17,6 +17,7 @@ rtpmanager_sources = [ 'rtptwcc.c', 'gstrtpsession.c', 'gstrtpfunnel.c', + 'gstrtpst2022-1-fecdec.c' ] gstrtpmanager = library('gstrtpmanager', |