diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2020-12-14 17:58:42 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2020-12-15 12:45:16 +0300 |
commit | bb6f1bb506a3e3dc3ce3cef5f2a65036f8e5a4d4 (patch) | |
tree | bb11f30b468aef371226cfd48962cf8c24764318 /internal/praefect/datastore | |
parent | e97131a42ed6db84edda3a3cc585eb2004f8b503 (diff) |
Handle empty notification messages
The triggers may be called with no actual change done by the
operation, like updating of the non-existing rows. In this case
existing trigger will generate a notification with empty payload.
This payload can't be properly handled by the listener and will
cause cache disabling.
To deal with it we add check in the trigger logic to
verify if there are any actual changes done to the state.
This change also includes structural changes into the query but
preserve the same logic and format of the payload.
Closes: https://gitlab.com/gitlab-org/gitaly/-/issues/3309
Diffstat (limited to 'internal/praefect/datastore')
4 files changed, 46 insertions, 23 deletions
diff --git a/internal/praefect/datastore/listener_postgres_test.go b/internal/praefect/datastore/listener_postgres_test.go index 79a53d741..d402368d4 100644 --- a/internal/praefect/datastore/listener_postgres_test.go +++ b/internal/praefect/datastore/listener_postgres_test.go @@ -444,6 +444,23 @@ func TestPostgresListener_Listen_storage_repositories_update(t *testing.T) { ) } +func TestPostgresListener_Listen_storage_empty_notification(t *testing.T) { + db := getDB(t) + + const channel = "storage_repositories_updates" + + testListener( + t, + channel, + func(t *testing.T) {}, + func(t *testing.T) { + _, err := db.DB.Exec(`UPDATE storage_repositories SET generation = 1`) + require.NoError(t, err) + }, + nil, // no notification events expected + ) +} + func TestPostgresListener_Listen_storage_repositories_delete(t *testing.T) { db := getDB(t) @@ -507,9 +524,16 @@ func testListener(t *testing.T, channel string, setup func(t *testing.T), trigge select { case <-time.After(time.Second): + if verifier == nil { + // no notifications expected + return + } require.FailNow(t, "no notifications for too long period") case <-receivedChan: } + if verifier == nil { + require.Failf(t, "no notifications expected", "received: %v", notification) + } verifier(t, notification) } diff --git a/internal/praefect/datastore/migrations/20201208163237_cleanup_notifiactions_payload.go b/internal/praefect/datastore/migrations/20201208163237_cleanup_notifiactions_payload.go index 6213bef6f..a1a79b56b 100644 --- a/internal/praefect/datastore/migrations/20201208163237_cleanup_notifiactions_payload.go +++ b/internal/praefect/datastore/migrations/20201208163237_cleanup_notifiactions_payload.go @@ -9,40 +9,38 @@ func init() { `-- +migrate StatementBegin CREATE OR REPLACE FUNCTION notify_on_change() RETURNS TRIGGER AS $$ DECLARE - msg TEXT DEFAULT ''; + msg JSONB; BEGIN - CASE TG_OP + CASE TG_OP WHEN 'INSERT' THEN - SELECT JSON_AGG(obj)::TEXT INTO msg + SELECT JSON_AGG(obj) INTO msg FROM ( - SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(relative_path)) AS obj - FROM (SELECT DISTINCT virtual_storage, relative_path FROM NEW) t + SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(DISTINCT relative_path)) AS obj + FROM NEW GROUP BY virtual_storage ) t; WHEN 'UPDATE' THEN - SELECT JSON_AGG(obj)::TEXT INTO msg + SELECT JSON_AGG(obj) INTO msg FROM ( - SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(relative_path)) AS obj - FROM ( - SELECT virtual_storage, relative_path - FROM (SELECT DISTINCT virtual_storage, relative_path FROM NEW) t1 - UNION - SELECT virtual_storage, relative_path - FROM (SELECT DISTINCT virtual_storage, relative_path FROM OLD) t2 - ) t + SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(DISTINCT relative_path)) AS obj + FROM NEW + FULL JOIN OLD USING (virtual_storage, relative_path) GROUP BY virtual_storage ) t; WHEN 'DELETE' THEN - SELECT JSON_AGG(obj)::TEXT INTO msg + SELECT JSON_AGG(obj) INTO msg FROM ( - SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(relative_path)) AS obj - FROM (SELECT DISTINCT virtual_storage, relative_path FROM OLD) t + SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(DISTINCT relative_path)) AS obj + FROM OLD GROUP BY virtual_storage ) t; END CASE; - PERFORM PG_NOTIFY(TG_ARGV[TG_NARGS-1], msg); - RETURN NULL; + CASE WHEN JSONB_ARRAY_LENGTH(msg) > 0 THEN + PERFORM PG_NOTIFY(TG_ARGV[TG_NARGS-1], msg::TEXT); + ELSE END CASE; + + RETURN NULL; END; $$ LANGUAGE plpgsql; -- +migrate StatementEnd`, diff --git a/internal/praefect/datastore/storage_provider.go b/internal/praefect/datastore/storage_provider.go index 6bb3909d1..fe8e46804 100644 --- a/internal/praefect/datastore/storage_provider.go +++ b/internal/praefect/datastore/storage_provider.go @@ -112,7 +112,6 @@ type notificationEntry struct { } func (c *CachingStorageProvider) Notification(n glsql.Notification) { - // it is a new format of the notification message var changes []notificationEntry if err := json.NewDecoder(strings.NewReader(n.Payload)).Decode(&changes); err != nil { c.disableCaching() // as we can't update cache properly we should disable it diff --git a/internal/praefect/datastore/storage_provider_test.go b/internal/praefect/datastore/storage_provider_test.go index 4a9894914..aa22ba81c 100644 --- a/internal/praefect/datastore/storage_provider_test.go +++ b/internal/praefect/datastore/storage_provider_test.go @@ -2,7 +2,7 @@ package datastore import ( "context" - "errors" + "encoding/json" "runtime" "strings" "sync" @@ -213,7 +213,9 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages1) // invalid payload disables caching - cache.Notification(glsql.Notification{Channel: "notification_channel_1", Payload: ``}) + notification := glsql.Notification{Channel: "notification_channel_1", Payload: `_`} + cache.Notification(notification) + expErr := json.Unmarshal([]byte(notification.Payload), new(struct{})) // second access omits cached data as caching should be disabled storages2 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1") @@ -232,7 +234,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { assert.Equal(t, logrus.Fields{ "channel": "notification_channel_1", "component": "caching_storage_provider", - "error": errors.New("EOF"), + "error": expErr, }, logHook.LastEntry().Data) assert.Equal(t, logrus.ErrorLevel, logHook.LastEntry().Level) |