diff options
author | Toon Claes <toon@gitlab.com> | 2020-12-15 20:37:35 +0300 |
---|---|---|
committer | Toon Claes <toon@gitlab.com> | 2020-12-15 20:37:35 +0300 |
commit | 84f04b8241de21a20cdcd42b4575c1894074e1d6 (patch) | |
tree | 745ebb04108983afb16859e8f5c0c99057bc8196 | |
parent | df62df25a02c8d72660c7fd72eb49c1e8c5ca03f (diff) | |
parent | bb6f1bb506a3e3dc3ce3cef5f2a65036f8e5a4d4 (diff) |
Merge branch 'ps-notifications-payload-cleanup' into 'master'
Cleanup redundant data from notification events
Closes #3309
See merge request gitlab-org/gitaly!2893
5 files changed, 151 insertions, 73 deletions
diff --git a/changelogs/unreleased/ps-notifications-payload-cleanup.yml b/changelogs/unreleased/ps-notifications-payload-cleanup.yml new file mode 100644 index 000000000..2a2e17cb5 --- /dev/null +++ b/changelogs/unreleased/ps-notifications-payload-cleanup.yml @@ -0,0 +1,5 @@ +--- +title: Cleanup redundant data from notification events +merge_request: 2893 +author: +type: changed diff --git a/internal/praefect/datastore/listener_postgres_test.go b/internal/praefect/datastore/listener_postgres_test.go index d5618c8d7..d402368d4 100644 --- a/internal/praefect/datastore/listener_postgres_test.go +++ b/internal/praefect/datastore/listener_postgres_test.go @@ -3,8 +3,10 @@ package datastore import ( + "encoding/json" "errors" "fmt" + "sort" "strings" "sync" "testing" @@ -348,6 +350,22 @@ func TestPostgresListener_Listen(t *testing.T) { }) } +func requireEqualNotificationEntries(t *testing.T, d string, entries []notificationEntry) { + t.Helper() + + var nes []notificationEntry + require.NoError(t, json.NewDecoder(strings.NewReader(d)).Decode(&nes)) + + for _, es := range [][]notificationEntry{entries, nes} { + for _, e := range es { + sort.Strings(e.RelativePaths) + } + sort.Slice(es, func(i, j int) bool { return es[i].VirtualStorage < es[j].VirtualStorage }) + } + + require.EqualValues(t, entries, nes) +} + func TestPostgresListener_Listen_repositories_delete(t *testing.T) { db := getDB(t) @@ -361,7 +379,8 @@ func TestPostgresListener_Listen_repositories_delete(t *testing.T) { INSERT INTO repositories VALUES ('praefect-1', '/path/to/repo/1', 1), ('praefect-1', '/path/to/repo/2', 1), - ('praefect-1', '/path/to/repo/3', 0)`) + ('praefect-1', '/path/to/repo/3', 0), + ('praefect-2', '/path/to/repo/1', 1)`) require.NoError(t, err) }, func(t *testing.T) { @@ -370,16 +389,10 @@ func TestPostgresListener_Listen_repositories_delete(t *testing.T) { }, func(t *testing.T, n glsql.Notification) { require.Equal(t, channel, n.Channel) - require.JSONEq(t, ` - { - "old": [ - {"virtual_storage":"praefect-1","relative_path":"/path/to/repo/1","generation":1,"primary":null}, - {"virtual_storage":"praefect-1","relative_path":"/path/to/repo/2","generation":1,"primary":null} - ], - "new" : null - }`, - n.Payload, - ) + requireEqualNotificationEntries(t, n.Payload, []notificationEntry{ + {VirtualStorage: "praefect-1", RelativePaths: []string{"/path/to/repo/1", "/path/to/repo/2"}}, + {VirtualStorage: "praefect-2", RelativePaths: []string{"/path/to/repo/1"}}, + }) }, ) } @@ -403,16 +416,7 @@ func TestPostgresListener_Listen_storage_repositories_insert(t *testing.T) { }, func(t *testing.T, n glsql.Notification) { require.Equal(t, channel, n.Channel) - require.JSONEq(t, ` - { - "old":null, - "new":[ - {"virtual_storage":"praefect-1","relative_path":"/path/to/repo","storage":"gitaly-1","generation":0,"assigned":true}, - {"virtual_storage":"praefect-1","relative_path":"/path/to/repo","storage":"gitaly-2","generation":0,"assigned":true} - ] - }`, - n.Payload, - ) + requireEqualNotificationEntries(t, n.Payload, []notificationEntry{{VirtualStorage: "praefect-1", RelativePaths: []string{"/path/to/repo"}}}) }, ) } @@ -435,14 +439,25 @@ func TestPostgresListener_Listen_storage_repositories_update(t *testing.T) { }, func(t *testing.T, n glsql.Notification) { require.Equal(t, channel, n.Channel) - require.JSONEq(t, ` - { - "old" : [{"virtual_storage":"praefect-1","relative_path":"/path/to/repo","storage":"gitaly-1","generation":0,"assigned":true}], - "new" : [{"virtual_storage":"praefect-1","relative_path":"/path/to/repo","storage":"gitaly-1","generation":1,"assigned":true}] - }`, - n.Payload, - ) + requireEqualNotificationEntries(t, n.Payload, []notificationEntry{{VirtualStorage: "praefect-1", RelativePaths: []string{"/path/to/repo"}}}) + }, + ) +} + +func TestPostgresListener_Listen_storage_empty_notification(t *testing.T) { + db := getDB(t) + + const channel = "storage_repositories_updates" + + testListener( + t, + channel, + func(t *testing.T) {}, + func(t *testing.T) { + _, err := db.DB.Exec(`UPDATE storage_repositories SET generation = 1`) + require.NoError(t, err) }, + nil, // no notification events expected ) } @@ -467,13 +482,7 @@ func TestPostgresListener_Listen_storage_repositories_delete(t *testing.T) { }, func(t *testing.T, n glsql.Notification) { require.Equal(t, channel, n.Channel) - require.JSONEq(t, ` - { - "old" : [{"virtual_storage":"praefect-1","relative_path":"/path/to/repo","storage":"gitaly-1","generation":0,"assigned":true}], - "new" : null - }`, - n.Payload, - ) + requireEqualNotificationEntries(t, n.Payload, []notificationEntry{{VirtualStorage: "praefect-1", RelativePaths: []string{"/path/to/repo"}}}) }, ) } @@ -515,9 +524,16 @@ func testListener(t *testing.T, channel string, setup func(t *testing.T), trigge select { case <-time.After(time.Second): + if verifier == nil { + // no notifications expected + return + } require.FailNow(t, "no notifications for too long period") case <-receivedChan: } + if verifier == nil { + require.Failf(t, "no notifications expected", "received: %v", notification) + } verifier(t, notification) } diff --git a/internal/praefect/datastore/migrations/20201208163237_cleanup_notifiactions_payload.go b/internal/praefect/datastore/migrations/20201208163237_cleanup_notifiactions_payload.go new file mode 100644 index 000000000..a1a79b56b --- /dev/null +++ b/internal/praefect/datastore/migrations/20201208163237_cleanup_notifiactions_payload.go @@ -0,0 +1,74 @@ +package migrations + +import migrate "github.com/rubenv/sql-migrate" + +func init() { + m := &migrate.Migration{ + Id: "20201208163237_cleanup_notifiactions_payload", + Up: []string{ + `-- +migrate StatementBegin + CREATE OR REPLACE FUNCTION notify_on_change() RETURNS TRIGGER AS $$ + DECLARE + msg JSONB; + BEGIN + CASE TG_OP + WHEN 'INSERT' THEN + SELECT JSON_AGG(obj) INTO msg + FROM ( + SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(DISTINCT relative_path)) AS obj + FROM NEW + GROUP BY virtual_storage + ) t; + WHEN 'UPDATE' THEN + SELECT JSON_AGG(obj) INTO msg + FROM ( + SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(DISTINCT relative_path)) AS obj + FROM NEW + FULL JOIN OLD USING (virtual_storage, relative_path) + GROUP BY virtual_storage + ) t; + WHEN 'DELETE' THEN + SELECT JSON_AGG(obj) INTO msg + FROM ( + SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(DISTINCT relative_path)) AS obj + FROM OLD + GROUP BY virtual_storage + ) t; + END CASE; + + CASE WHEN JSONB_ARRAY_LENGTH(msg) > 0 THEN + PERFORM PG_NOTIFY(TG_ARGV[TG_NARGS-1], msg::TEXT); + ELSE END CASE; + + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + -- +migrate StatementEnd`, + }, + Down: []string{ + `-- +migrate StatementBegin + CREATE OR REPLACE FUNCTION notify_on_change() RETURNS TRIGGER AS $$ + DECLARE + old_val JSON DEFAULT NULL; + new_val JSON DEFAULT NULL; + BEGIN + CASE TG_OP + WHEN 'INSERT' THEN + SELECT JSON_AGG(ROW_TO_JSON(t.*)) INTO new_val FROM NEW AS t; + WHEN 'UPDATE' THEN + SELECT JSON_AGG(ROW_TO_JSON(t.*)) INTO old_val FROM OLD AS t; + SELECT JSON_AGG(ROW_TO_JSON(t.*)) INTO new_val FROM NEW AS t; + WHEN 'DELETE' THEN + SELECT JSON_AGG(ROW_TO_JSON(t.*)) INTO old_val FROM OLD AS t; + END CASE; + + PERFORM PG_NOTIFY(TG_ARGV[TG_NARGS-1], JSON_BUILD_OBJECT('old', old_val, 'new', new_val)::TEXT); + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + -- +migrate StatementEnd`, + }, + } + + allMigrations = append(allMigrations, m) +} diff --git a/internal/praefect/datastore/storage_provider.go b/internal/praefect/datastore/storage_provider.go index 4b52c0304..fe8e46804 100644 --- a/internal/praefect/datastore/storage_provider.go +++ b/internal/praefect/datastore/storage_provider.go @@ -106,41 +106,27 @@ func NewCachingStorageProvider(logger logrus.FieldLogger, sp SecondariesProvider return csp, nil } -type ( - notificationEntry struct { - VirtualStorage string `json:"virtual_storage"` - RelativePath string `json:"relative_path"` - } - - changeNotification struct { - Old []notificationEntry `json:"old"` - New []notificationEntry `json:"new"` - } -) +type notificationEntry struct { + VirtualStorage string `json:"virtual_storage"` + RelativePaths []string `json:"relative_paths"` +} func (c *CachingStorageProvider) Notification(n glsql.Notification) { - var change changeNotification - if err := json.NewDecoder(strings.NewReader(n.Payload)).Decode(&change); err != nil { + var changes []notificationEntry + if err := json.NewDecoder(strings.NewReader(n.Payload)).Decode(&changes); err != nil { c.disableCaching() // as we can't update cache properly we should disable it c.callbackLogger.WithError(err).WithField("channel", n.Channel).Error("received payload can't be processed, cache disabled") return } - entries := map[string][]string{} - for _, notificationEntries := range [][]notificationEntry{change.Old, change.New} { - for _, entry := range notificationEntries { - entries[entry.VirtualStorage] = append(entries[entry.VirtualStorage], entry.RelativePath) - } - } - - for virtualStorage, relativePaths := range entries { - cache, found := c.getCache(virtualStorage) + for _, entry := range changes { + cache, found := c.getCache(entry.VirtualStorage) if !found { - c.callbackLogger.WithError(errNotExistingVirtualStorage).WithField("virtual_storage", virtualStorage).Error("cache not found") + c.callbackLogger.WithError(errNotExistingVirtualStorage).WithField("virtual_storage", entry.VirtualStorage).Error("cache not found") continue } - for _, relativePath := range relativePaths { + for _, relativePath := range entry.RelativePaths { cache.Remove(relativePath) } } diff --git a/internal/praefect/datastore/storage_provider_test.go b/internal/praefect/datastore/storage_provider_test.go index 45b422ad0..aa22ba81c 100644 --- a/internal/praefect/datastore/storage_provider_test.go +++ b/internal/praefect/datastore/storage_provider_test.go @@ -2,7 +2,7 @@ package datastore import ( "context" - "errors" + "encoding/json" "runtime" "strings" "sync" @@ -213,15 +213,14 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages1) // invalid payload disables caching - cache.Notification(glsql.Notification{Channel: "notification_channel_1", Payload: ``}) + notification := glsql.Notification{Channel: "notification_channel_1", Payload: `_`} + cache.Notification(notification) + expErr := json.Unmarshal([]byte(notification.Payload), new(struct{})) // second access omits cached data as caching should be disabled storages2 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1") require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages2) - // valid payload enables caching again - cache.Notification(glsql.Notification{Channel: "notification_channel_2", Payload: `{}`}) - // third access retrieves data and caches it storages3 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1") require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages3) @@ -235,7 +234,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { assert.Equal(t, logrus.Fields{ "channel": "notification_channel_1", "component": "caching_storage_provider", - "error": errors.New("EOF"), + "error": expErr, }, logHook.LastEntry().Data) assert.Equal(t, logrus.ErrorLevel, logHook.LastEntry().Level) @@ -271,12 +270,10 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { // notification evicts entries for '/repo/path/2' from the cache cache.Notification(glsql.Notification{Payload: ` - { - "old":[ - {"virtual_storage": "bad", "relative_path": "/repo/path/1"} - ], - "new":[{"virtual_storage": "vs", "relative_path": "/repo/path/2"}] - }`}, + [ + {"virtual_storage": "bad", "relative_paths": ["/repo/path/1"]}, + {"virtual_storage": "vs", "relative_paths": ["/repo/path/2"]} + ]`}, ) // second access re-uses cached data for '/repo/path/1' @@ -342,8 +339,8 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { require.NoError(t, err) cache.Connected() - nf1 := glsql.Notification{Payload: `{"new":[{"virtual_storage":"vs","relative_path":"/repo/path/1"}]}`} - nf2 := glsql.Notification{Payload: `{"new":[{"virtual_storage":"vs","relative_path":"/repo/path/2"}]}`} + nf1 := glsql.Notification{Payload: `[{"virtual_storage": "vs", "relative_paths": ["/repo/path/1"]}]`} + nf2 := glsql.Notification{Payload: `[{"virtual_storage": "vs", "relative_paths": ["/repo/path/2"]}]`} var operations []func() for i := 0; i < 100; i++ { |