diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2020-12-08 19:39:54 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2020-12-14 17:41:09 +0300 |
commit | e97131a42ed6db84edda3a3cc585eb2004f8b503 (patch) | |
tree | a47640233da5440b9a1b2c28d5cd690475166777 /internal/praefect/datastore | |
parent | c6d679391328796f4679542ab651b39de75e7a23 (diff) |
Clean up redundant data from notification events
Postgres sends notification messages to Praefect using triggers
on changes to the 'repositories' and 'storage_repositories' tables.
The payload is constructed from the data that is the subject of the change:
the old and new states. The notifications are used to invalidate in-memory
cached values of up-to-date storages used for read distribution. Most
of the data in the notification payload is redundant, and only two fields
are used: relative_path and virtual_storage.
This change cleans up the payload by including only the required fields.
It also changes the structure of the notification message to reduce
redundancy, because the size of the notification payload is limited by the
Postgres instance and big payloads can generate the error 'pq: payload
string too long'.
Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/3309
Diffstat (limited to 'internal/praefect/datastore')
4 files changed, 120 insertions, 70 deletions
diff --git a/internal/praefect/datastore/listener_postgres_test.go b/internal/praefect/datastore/listener_postgres_test.go index d5618c8d7..79a53d741 100644 --- a/internal/praefect/datastore/listener_postgres_test.go +++ b/internal/praefect/datastore/listener_postgres_test.go @@ -3,8 +3,10 @@ package datastore import ( + "encoding/json" "errors" "fmt" + "sort" "strings" "sync" "testing" @@ -348,6 +350,22 @@ func TestPostgresListener_Listen(t *testing.T) { }) } +func requireEqualNotificationEntries(t *testing.T, d string, entries []notificationEntry) { + t.Helper() + + var nes []notificationEntry + require.NoError(t, json.NewDecoder(strings.NewReader(d)).Decode(&nes)) + + for _, es := range [][]notificationEntry{entries, nes} { + for _, e := range es { + sort.Strings(e.RelativePaths) + } + sort.Slice(es, func(i, j int) bool { return es[i].VirtualStorage < es[j].VirtualStorage }) + } + + require.EqualValues(t, entries, nes) +} + func TestPostgresListener_Listen_repositories_delete(t *testing.T) { db := getDB(t) @@ -361,7 +379,8 @@ func TestPostgresListener_Listen_repositories_delete(t *testing.T) { INSERT INTO repositories VALUES ('praefect-1', '/path/to/repo/1', 1), ('praefect-1', '/path/to/repo/2', 1), - ('praefect-1', '/path/to/repo/3', 0)`) + ('praefect-1', '/path/to/repo/3', 0), + ('praefect-2', '/path/to/repo/1', 1)`) require.NoError(t, err) }, func(t *testing.T) { @@ -370,16 +389,10 @@ func TestPostgresListener_Listen_repositories_delete(t *testing.T) { }, func(t *testing.T, n glsql.Notification) { require.Equal(t, channel, n.Channel) - require.JSONEq(t, ` - { - "old": [ - {"virtual_storage":"praefect-1","relative_path":"/path/to/repo/1","generation":1,"primary":null}, - {"virtual_storage":"praefect-1","relative_path":"/path/to/repo/2","generation":1,"primary":null} - ], - "new" : null - }`, - n.Payload, - ) + requireEqualNotificationEntries(t, n.Payload, []notificationEntry{ + {VirtualStorage: "praefect-1", RelativePaths: []string{"/path/to/repo/1", 
"/path/to/repo/2"}}, + {VirtualStorage: "praefect-2", RelativePaths: []string{"/path/to/repo/1"}}, + }) }, ) } @@ -403,16 +416,7 @@ func TestPostgresListener_Listen_storage_repositories_insert(t *testing.T) { }, func(t *testing.T, n glsql.Notification) { require.Equal(t, channel, n.Channel) - require.JSONEq(t, ` - { - "old":null, - "new":[ - {"virtual_storage":"praefect-1","relative_path":"/path/to/repo","storage":"gitaly-1","generation":0,"assigned":true}, - {"virtual_storage":"praefect-1","relative_path":"/path/to/repo","storage":"gitaly-2","generation":0,"assigned":true} - ] - }`, - n.Payload, - ) + requireEqualNotificationEntries(t, n.Payload, []notificationEntry{{VirtualStorage: "praefect-1", RelativePaths: []string{"/path/to/repo"}}}) }, ) } @@ -435,13 +439,7 @@ func TestPostgresListener_Listen_storage_repositories_update(t *testing.T) { }, func(t *testing.T, n glsql.Notification) { require.Equal(t, channel, n.Channel) - require.JSONEq(t, ` - { - "old" : [{"virtual_storage":"praefect-1","relative_path":"/path/to/repo","storage":"gitaly-1","generation":0,"assigned":true}], - "new" : [{"virtual_storage":"praefect-1","relative_path":"/path/to/repo","storage":"gitaly-1","generation":1,"assigned":true}] - }`, - n.Payload, - ) + requireEqualNotificationEntries(t, n.Payload, []notificationEntry{{VirtualStorage: "praefect-1", RelativePaths: []string{"/path/to/repo"}}}) }, ) } @@ -467,13 +465,7 @@ func TestPostgresListener_Listen_storage_repositories_delete(t *testing.T) { }, func(t *testing.T, n glsql.Notification) { require.Equal(t, channel, n.Channel) - require.JSONEq(t, ` - { - "old" : [{"virtual_storage":"praefect-1","relative_path":"/path/to/repo","storage":"gitaly-1","generation":0,"assigned":true}], - "new" : null - }`, - n.Payload, - ) + requireEqualNotificationEntries(t, n.Payload, []notificationEntry{{VirtualStorage: "praefect-1", RelativePaths: []string{"/path/to/repo"}}}) }, ) } diff --git 
a/internal/praefect/datastore/migrations/20201208163237_cleanup_notifiactions_payload.go b/internal/praefect/datastore/migrations/20201208163237_cleanup_notifiactions_payload.go new file mode 100644 index 000000000..6213bef6f --- /dev/null +++ b/internal/praefect/datastore/migrations/20201208163237_cleanup_notifiactions_payload.go @@ -0,0 +1,76 @@ +package migrations + +import migrate "github.com/rubenv/sql-migrate" + +func init() { + m := &migrate.Migration{ + Id: "20201208163237_cleanup_notifiactions_payload", + Up: []string{ + `-- +migrate StatementBegin + CREATE OR REPLACE FUNCTION notify_on_change() RETURNS TRIGGER AS $$ + DECLARE + msg TEXT DEFAULT ''; + BEGIN + CASE TG_OP + WHEN 'INSERT' THEN + SELECT JSON_AGG(obj)::TEXT INTO msg + FROM ( + SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(relative_path)) AS obj + FROM (SELECT DISTINCT virtual_storage, relative_path FROM NEW) t + GROUP BY virtual_storage + ) t; + WHEN 'UPDATE' THEN + SELECT JSON_AGG(obj)::TEXT INTO msg + FROM ( + SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(relative_path)) AS obj + FROM ( + SELECT virtual_storage, relative_path + FROM (SELECT DISTINCT virtual_storage, relative_path FROM NEW) t1 + UNION + SELECT virtual_storage, relative_path + FROM (SELECT DISTINCT virtual_storage, relative_path FROM OLD) t2 + ) t + GROUP BY virtual_storage + ) t; + WHEN 'DELETE' THEN + SELECT JSON_AGG(obj)::TEXT INTO msg + FROM ( + SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(relative_path)) AS obj + FROM (SELECT DISTINCT virtual_storage, relative_path FROM OLD) t + GROUP BY virtual_storage + ) t; + END CASE; + + PERFORM PG_NOTIFY(TG_ARGV[TG_NARGS-1], msg); + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + -- +migrate StatementEnd`, + }, + Down: []string{ + `-- +migrate StatementBegin + CREATE OR REPLACE FUNCTION notify_on_change() RETURNS TRIGGER AS $$ + DECLARE + old_val JSON 
DEFAULT NULL; + new_val JSON DEFAULT NULL; + BEGIN + CASE TG_OP + WHEN 'INSERT' THEN + SELECT JSON_AGG(ROW_TO_JSON(t.*)) INTO new_val FROM NEW AS t; + WHEN 'UPDATE' THEN + SELECT JSON_AGG(ROW_TO_JSON(t.*)) INTO old_val FROM OLD AS t; + SELECT JSON_AGG(ROW_TO_JSON(t.*)) INTO new_val FROM NEW AS t; + WHEN 'DELETE' THEN + SELECT JSON_AGG(ROW_TO_JSON(t.*)) INTO old_val FROM OLD AS t; + END CASE; + + PERFORM PG_NOTIFY(TG_ARGV[TG_NARGS-1], JSON_BUILD_OBJECT('old', old_val, 'new', new_val)::TEXT); + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + -- +migrate StatementEnd`, + }, + } + + allMigrations = append(allMigrations, m) +} diff --git a/internal/praefect/datastore/storage_provider.go b/internal/praefect/datastore/storage_provider.go index 4b52c0304..6bb3909d1 100644 --- a/internal/praefect/datastore/storage_provider.go +++ b/internal/praefect/datastore/storage_provider.go @@ -106,41 +106,28 @@ func NewCachingStorageProvider(logger logrus.FieldLogger, sp SecondariesProvider return csp, nil } -type ( - notificationEntry struct { - VirtualStorage string `json:"virtual_storage"` - RelativePath string `json:"relative_path"` - } - - changeNotification struct { - Old []notificationEntry `json:"old"` - New []notificationEntry `json:"new"` - } -) +type notificationEntry struct { + VirtualStorage string `json:"virtual_storage"` + RelativePaths []string `json:"relative_paths"` +} func (c *CachingStorageProvider) Notification(n glsql.Notification) { - var change changeNotification - if err := json.NewDecoder(strings.NewReader(n.Payload)).Decode(&change); err != nil { + // it is a new format of the notification message + var changes []notificationEntry + if err := json.NewDecoder(strings.NewReader(n.Payload)).Decode(&changes); err != nil { c.disableCaching() // as we can't update cache properly we should disable it c.callbackLogger.WithError(err).WithField("channel", n.Channel).Error("received payload can't be processed, cache disabled") return } - entries := map[string][]string{} - 
for _, notificationEntries := range [][]notificationEntry{change.Old, change.New} { - for _, entry := range notificationEntries { - entries[entry.VirtualStorage] = append(entries[entry.VirtualStorage], entry.RelativePath) - } - } - - for virtualStorage, relativePaths := range entries { - cache, found := c.getCache(virtualStorage) + for _, entry := range changes { + cache, found := c.getCache(entry.VirtualStorage) if !found { - c.callbackLogger.WithError(errNotExistingVirtualStorage).WithField("virtual_storage", virtualStorage).Error("cache not found") + c.callbackLogger.WithError(errNotExistingVirtualStorage).WithField("virtual_storage", entry.VirtualStorage).Error("cache not found") continue } - for _, relativePath := range relativePaths { + for _, relativePath := range entry.RelativePaths { cache.Remove(relativePath) } } diff --git a/internal/praefect/datastore/storage_provider_test.go b/internal/praefect/datastore/storage_provider_test.go index 45b422ad0..4a9894914 100644 --- a/internal/praefect/datastore/storage_provider_test.go +++ b/internal/praefect/datastore/storage_provider_test.go @@ -219,9 +219,6 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { storages2 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1") require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages2) - // valid payload enables caching again - cache.Notification(glsql.Notification{Channel: "notification_channel_2", Payload: `{}`}) - // third access retrieves data and caches it storages3 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1") require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages3) @@ -271,12 +268,10 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { // notification evicts entries for '/repo/path/2' from the cache cache.Notification(glsql.Notification{Payload: ` - { - "old":[ - {"virtual_storage": "bad", "relative_path": "/repo/path/1"} - ], - "new":[{"virtual_storage": "vs", "relative_path": "/repo/path/2"}] - }`}, + [ + 
{"virtual_storage": "bad", "relative_paths": ["/repo/path/1"]}, + {"virtual_storage": "vs", "relative_paths": ["/repo/path/2"]} + ]`}, ) // second access re-uses cached data for '/repo/path/1' @@ -342,8 +337,8 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { require.NoError(t, err) cache.Connected() - nf1 := glsql.Notification{Payload: `{"new":[{"virtual_storage":"vs","relative_path":"/repo/path/1"}]}`} - nf2 := glsql.Notification{Payload: `{"new":[{"virtual_storage":"vs","relative_path":"/repo/path/2"}]}`} + nf1 := glsql.Notification{Payload: `[{"virtual_storage": "vs", "relative_paths": ["/repo/path/1"]}]`} + nf2 := glsql.Notification{Payload: `[{"virtual_storage": "vs", "relative_paths": ["/repo/path/2"]}]`} var operations []func() for i := 0; i < 100; i++ { |