diff options
author | Sebastian Dröge <sebastian@centricular.com> | 2021-04-06 20:56:55 +0300 |
---|---|---|
committer | Sebastian Dröge <sebastian@centricular.com> | 2021-04-06 21:17:56 +0300 |
commit | defe732ae0ccc61cfb2f98f0768bba459dbf08af (patch) | |
tree | 032a6e67461c1b5718045b79aabd1a73dc82ca9c | |
parent | 96b10c158fc7429ddeae11207c9a3cead3504d19 (diff) |
aggregator: Release pads' peeked buffer when removing the pad or finalizing it
The peeked buffer was always reset after calling ::aggregate() but under
no other circumstances. If a pad was removed after peeking and before
::aggregate() returned then the peeked buffer would be leaked.
This can easily happen if pads are removed from the aggregator from a
pad probe downstream of the source pad but still in the source pad's
streaming thread.
Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/784>
-rw-r--r-- | libs/gst/base/gstaggregator.c | 2 | ||||
-rw-r--r-- | tests/check/libs/aggregator.c | 39 |
2 files changed, 41 insertions, 0 deletions
diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index c55f4cce03..f4ee40c869 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -1938,6 +1938,7 @@ gst_aggregator_release_pad (GstElement * element, GstPad * pad) SRC_LOCK (self); gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE); + gst_buffer_replace (&aggpad->priv->peeked_buffer, NULL); gst_element_remove_pad (element, pad); self->priv->has_peer_latency = FALSE; @@ -3199,6 +3200,7 @@ gst_aggregator_pad_finalize (GObject * object) { GstAggregatorPad *pad = (GstAggregatorPad *) object; + gst_buffer_replace (&pad->priv->peeked_buffer, NULL); g_cond_clear (&pad->priv->event_cond); g_mutex_clear (&pad->priv->flush_lock); g_mutex_clear (&pad->priv->lock); diff --git a/tests/check/libs/aggregator.c b/tests/check/libs/aggregator.c index 24df76865a..73c12422cf 100644 --- a/tests/check/libs/aggregator.c +++ b/tests/check/libs/aggregator.c @@ -59,6 +59,7 @@ struct _GstTestAggregator guint64 timestamp; gboolean gap_expected; gboolean do_flush_on_aggregate; + gboolean do_remove_pad_on_aggregate; }; struct _GstTestAggregatorClass @@ -113,6 +114,14 @@ gst_test_aggregator_aggregate (GstAggregator * aggregator, gboolean timeout) fail_unless (buf == popped_buf); gst_buffer_unref (buf); gst_buffer_unref (popped_buf); + } else if (testagg->do_remove_pad_on_aggregate) { + buf = gst_aggregator_pad_peek_buffer (pad); + + GST_DEBUG_OBJECT (pad, "Removing pad on aggregate"); + + gst_buffer_unref (buf); + gst_element_release_request_pad (GST_ELEMENT (aggregator), + GST_PAD (pad)); } else { gst_aggregator_pad_drop_buffer (pad); } @@ -1304,6 +1313,35 @@ GST_START_TEST (test_flush_on_aggregate) GST_END_TEST; +GST_START_TEST (test_remove_pad_on_aggregate) +{ + GThread *thread1, *thread2; + ChainData data1 = { 0, }; + ChainData data2 = { 0, }; + TestData test = { 0, }; + + _test_data_init (&test, FALSE); + ((GstTestAggregator *) test.aggregator)->do_remove_pad_on_aggregate = TRUE; + _chain_data_init (&data1, test.aggregator, gst_buffer_new (), NULL); + _chain_data_init (&data2, test.aggregator, gst_buffer_new (), NULL); + + thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL); + thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL); + + g_main_loop_run (test.ml); + g_source_remove (test.timeout_id); + + /* these will return immediately as when the data is popped the threads are + * unlocked and will terminate */ + g_thread_join (thread1); + g_thread_join (thread2); + + _chain_data_clear (&data1); + _chain_data_clear (&data2); + _test_data_clear (&test); +} + +GST_END_TEST; static Suite * gst_aggregator_suite (void) @@ -1333,6 +1371,7 @@ gst_aggregator_suite (void) tcase_add_test (general, test_add_remove); tcase_add_test (general, test_change_state_intensive); tcase_add_test (general, test_flush_on_aggregate); + tcase_add_test (general, test_remove_pad_on_aggregate); return suite; } |