Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2020-12-08 19:39:54 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2020-12-14 17:41:09 +0300
commite97131a42ed6db84edda3a3cc585eb2004f8b503 (patch)
treea47640233da5440b9a1b2c28d5cd690475166777 /internal/praefect/datastore
parentc6d679391328796f4679542ab651b39de75e7a23 (diff)
Cleanup redundant data from notification events
Postgres sends notification messages to the Praefect using triggers on 'repositories' and 'storage_repositories' tables changes. The payload is constructed from the data that is subject of the change: old and new states. The notifications are used to invalidate in-memory cached values of up to date storages used by the reads distribution. Most of the data from the notification payload is redundant and only two fields are used: relative_path and virtual_storage. This change cleans up the payload by including only required fields. It also changes the structure of the notification message to reduce redundancy as the payload of the notification message is limited by the Postgres instance and big payloads could generate an error 'pq: payload string too long'. Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/3309
Diffstat (limited to 'internal/praefect/datastore')
-rw-r--r--internal/praefect/datastore/listener_postgres_test.go62
-rw-r--r--internal/praefect/datastore/migrations/20201208163237_cleanup_notifiactions_payload.go76
-rw-r--r--internal/praefect/datastore/storage_provider.go35
-rw-r--r--internal/praefect/datastore/storage_provider_test.go17
4 files changed, 120 insertions, 70 deletions
diff --git a/internal/praefect/datastore/listener_postgres_test.go b/internal/praefect/datastore/listener_postgres_test.go
index d5618c8d7..79a53d741 100644
--- a/internal/praefect/datastore/listener_postgres_test.go
+++ b/internal/praefect/datastore/listener_postgres_test.go
@@ -3,8 +3,10 @@
package datastore
import (
+ "encoding/json"
"errors"
"fmt"
+ "sort"
"strings"
"sync"
"testing"
@@ -348,6 +350,22 @@ func TestPostgresListener_Listen(t *testing.T) {
})
}
+func requireEqualNotificationEntries(t *testing.T, d string, entries []notificationEntry) {
+ t.Helper()
+
+ var nes []notificationEntry
+ require.NoError(t, json.NewDecoder(strings.NewReader(d)).Decode(&nes))
+
+ for _, es := range [][]notificationEntry{entries, nes} {
+ for _, e := range es {
+ sort.Strings(e.RelativePaths)
+ }
+ sort.Slice(es, func(i, j int) bool { return es[i].VirtualStorage < es[j].VirtualStorage })
+ }
+
+ require.EqualValues(t, entries, nes)
+}
+
func TestPostgresListener_Listen_repositories_delete(t *testing.T) {
db := getDB(t)
@@ -361,7 +379,8 @@ func TestPostgresListener_Listen_repositories_delete(t *testing.T) {
INSERT INTO repositories
VALUES ('praefect-1', '/path/to/repo/1', 1),
('praefect-1', '/path/to/repo/2', 1),
- ('praefect-1', '/path/to/repo/3', 0)`)
+ ('praefect-1', '/path/to/repo/3', 0),
+ ('praefect-2', '/path/to/repo/1', 1)`)
require.NoError(t, err)
},
func(t *testing.T) {
@@ -370,16 +389,10 @@ func TestPostgresListener_Listen_repositories_delete(t *testing.T) {
},
func(t *testing.T, n glsql.Notification) {
require.Equal(t, channel, n.Channel)
- require.JSONEq(t, `
- {
- "old": [
- {"virtual_storage":"praefect-1","relative_path":"/path/to/repo/1","generation":1,"primary":null},
- {"virtual_storage":"praefect-1","relative_path":"/path/to/repo/2","generation":1,"primary":null}
- ],
- "new" : null
- }`,
- n.Payload,
- )
+ requireEqualNotificationEntries(t, n.Payload, []notificationEntry{
+ {VirtualStorage: "praefect-1", RelativePaths: []string{"/path/to/repo/1", "/path/to/repo/2"}},
+ {VirtualStorage: "praefect-2", RelativePaths: []string{"/path/to/repo/1"}},
+ })
},
)
}
@@ -403,16 +416,7 @@ func TestPostgresListener_Listen_storage_repositories_insert(t *testing.T) {
},
func(t *testing.T, n glsql.Notification) {
require.Equal(t, channel, n.Channel)
- require.JSONEq(t, `
- {
- "old":null,
- "new":[
- {"virtual_storage":"praefect-1","relative_path":"/path/to/repo","storage":"gitaly-1","generation":0,"assigned":true},
- {"virtual_storage":"praefect-1","relative_path":"/path/to/repo","storage":"gitaly-2","generation":0,"assigned":true}
- ]
- }`,
- n.Payload,
- )
+ requireEqualNotificationEntries(t, n.Payload, []notificationEntry{{VirtualStorage: "praefect-1", RelativePaths: []string{"/path/to/repo"}}})
},
)
}
@@ -435,13 +439,7 @@ func TestPostgresListener_Listen_storage_repositories_update(t *testing.T) {
},
func(t *testing.T, n glsql.Notification) {
require.Equal(t, channel, n.Channel)
- require.JSONEq(t, `
- {
- "old" : [{"virtual_storage":"praefect-1","relative_path":"/path/to/repo","storage":"gitaly-1","generation":0,"assigned":true}],
- "new" : [{"virtual_storage":"praefect-1","relative_path":"/path/to/repo","storage":"gitaly-1","generation":1,"assigned":true}]
- }`,
- n.Payload,
- )
+ requireEqualNotificationEntries(t, n.Payload, []notificationEntry{{VirtualStorage: "praefect-1", RelativePaths: []string{"/path/to/repo"}}})
},
)
}
@@ -467,13 +465,7 @@ func TestPostgresListener_Listen_storage_repositories_delete(t *testing.T) {
},
func(t *testing.T, n glsql.Notification) {
require.Equal(t, channel, n.Channel)
- require.JSONEq(t, `
- {
- "old" : [{"virtual_storage":"praefect-1","relative_path":"/path/to/repo","storage":"gitaly-1","generation":0,"assigned":true}],
- "new" : null
- }`,
- n.Payload,
- )
+ requireEqualNotificationEntries(t, n.Payload, []notificationEntry{{VirtualStorage: "praefect-1", RelativePaths: []string{"/path/to/repo"}}})
},
)
}
diff --git a/internal/praefect/datastore/migrations/20201208163237_cleanup_notifiactions_payload.go b/internal/praefect/datastore/migrations/20201208163237_cleanup_notifiactions_payload.go
new file mode 100644
index 000000000..6213bef6f
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20201208163237_cleanup_notifiactions_payload.go
@@ -0,0 +1,76 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+ m := &migrate.Migration{
+ Id: "20201208163237_cleanup_notifiactions_payload",
+ Up: []string{
+ `-- +migrate StatementBegin
+ CREATE OR REPLACE FUNCTION notify_on_change() RETURNS TRIGGER AS $$
+ DECLARE
+ msg TEXT DEFAULT '';
+ BEGIN
+ CASE TG_OP
+ WHEN 'INSERT' THEN
+ SELECT JSON_AGG(obj)::TEXT INTO msg
+ FROM (
+ SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(relative_path)) AS obj
+ FROM (SELECT DISTINCT virtual_storage, relative_path FROM NEW) t
+ GROUP BY virtual_storage
+ ) t;
+ WHEN 'UPDATE' THEN
+ SELECT JSON_AGG(obj)::TEXT INTO msg
+ FROM (
+ SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(relative_path)) AS obj
+ FROM (
+ SELECT virtual_storage, relative_path
+ FROM (SELECT DISTINCT virtual_storage, relative_path FROM NEW) t1
+ UNION
+ SELECT virtual_storage, relative_path
+ FROM (SELECT DISTINCT virtual_storage, relative_path FROM OLD) t2
+ ) t
+ GROUP BY virtual_storage
+ ) t;
+ WHEN 'DELETE' THEN
+ SELECT JSON_AGG(obj)::TEXT INTO msg
+ FROM (
+ SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(relative_path)) AS obj
+ FROM (SELECT DISTINCT virtual_storage, relative_path FROM OLD) t
+ GROUP BY virtual_storage
+ ) t;
+ END CASE;
+
+ PERFORM PG_NOTIFY(TG_ARGV[TG_NARGS-1], msg);
+ RETURN NULL;
+ END;
+ $$ LANGUAGE plpgsql;
+ -- +migrate StatementEnd`,
+ },
+ Down: []string{
+ `-- +migrate StatementBegin
+ CREATE OR REPLACE FUNCTION notify_on_change() RETURNS TRIGGER AS $$
+ DECLARE
+ old_val JSON DEFAULT NULL;
+ new_val JSON DEFAULT NULL;
+ BEGIN
+ CASE TG_OP
+ WHEN 'INSERT' THEN
+ SELECT JSON_AGG(ROW_TO_JSON(t.*)) INTO new_val FROM NEW AS t;
+ WHEN 'UPDATE' THEN
+ SELECT JSON_AGG(ROW_TO_JSON(t.*)) INTO old_val FROM OLD AS t;
+ SELECT JSON_AGG(ROW_TO_JSON(t.*)) INTO new_val FROM NEW AS t;
+ WHEN 'DELETE' THEN
+ SELECT JSON_AGG(ROW_TO_JSON(t.*)) INTO old_val FROM OLD AS t;
+ END CASE;
+
+ PERFORM PG_NOTIFY(TG_ARGV[TG_NARGS-1], JSON_BUILD_OBJECT('old', old_val, 'new', new_val)::TEXT);
+ RETURN NULL;
+ END;
+ $$ LANGUAGE plpgsql;
+ -- +migrate StatementEnd`,
+ },
+ }
+
+ allMigrations = append(allMigrations, m)
+}
diff --git a/internal/praefect/datastore/storage_provider.go b/internal/praefect/datastore/storage_provider.go
index 4b52c0304..6bb3909d1 100644
--- a/internal/praefect/datastore/storage_provider.go
+++ b/internal/praefect/datastore/storage_provider.go
@@ -106,41 +106,28 @@ func NewCachingStorageProvider(logger logrus.FieldLogger, sp SecondariesProvider
return csp, nil
}
-type (
- notificationEntry struct {
- VirtualStorage string `json:"virtual_storage"`
- RelativePath string `json:"relative_path"`
- }
-
- changeNotification struct {
- Old []notificationEntry `json:"old"`
- New []notificationEntry `json:"new"`
- }
-)
+type notificationEntry struct {
+ VirtualStorage string `json:"virtual_storage"`
+ RelativePaths []string `json:"relative_paths"`
+}
func (c *CachingStorageProvider) Notification(n glsql.Notification) {
- var change changeNotification
- if err := json.NewDecoder(strings.NewReader(n.Payload)).Decode(&change); err != nil {
+ // it is a new format of the notification message
+ var changes []notificationEntry
+ if err := json.NewDecoder(strings.NewReader(n.Payload)).Decode(&changes); err != nil {
c.disableCaching() // as we can't update cache properly we should disable it
c.callbackLogger.WithError(err).WithField("channel", n.Channel).Error("received payload can't be processed, cache disabled")
return
}
- entries := map[string][]string{}
- for _, notificationEntries := range [][]notificationEntry{change.Old, change.New} {
- for _, entry := range notificationEntries {
- entries[entry.VirtualStorage] = append(entries[entry.VirtualStorage], entry.RelativePath)
- }
- }
-
- for virtualStorage, relativePaths := range entries {
- cache, found := c.getCache(virtualStorage)
+ for _, entry := range changes {
+ cache, found := c.getCache(entry.VirtualStorage)
if !found {
- c.callbackLogger.WithError(errNotExistingVirtualStorage).WithField("virtual_storage", virtualStorage).Error("cache not found")
+ c.callbackLogger.WithError(errNotExistingVirtualStorage).WithField("virtual_storage", entry.VirtualStorage).Error("cache not found")
continue
}
- for _, relativePath := range relativePaths {
+ for _, relativePath := range entry.RelativePaths {
cache.Remove(relativePath)
}
}
diff --git a/internal/praefect/datastore/storage_provider_test.go b/internal/praefect/datastore/storage_provider_test.go
index 45b422ad0..4a9894914 100644
--- a/internal/praefect/datastore/storage_provider_test.go
+++ b/internal/praefect/datastore/storage_provider_test.go
@@ -219,9 +219,6 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
storages2 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1")
require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages2)
- // valid payload enables caching again
- cache.Notification(glsql.Notification{Channel: "notification_channel_2", Payload: `{}`})
-
// third access retrieves data and caches it
storages3 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1")
require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages3)
@@ -271,12 +268,10 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
// notification evicts entries for '/repo/path/2' from the cache
cache.Notification(glsql.Notification{Payload: `
- {
- "old":[
- {"virtual_storage": "bad", "relative_path": "/repo/path/1"}
- ],
- "new":[{"virtual_storage": "vs", "relative_path": "/repo/path/2"}]
- }`},
+ [
+ {"virtual_storage": "bad", "relative_paths": ["/repo/path/1"]},
+ {"virtual_storage": "vs", "relative_paths": ["/repo/path/2"]}
+ ]`},
)
// second access re-uses cached data for '/repo/path/1'
@@ -342,8 +337,8 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
require.NoError(t, err)
cache.Connected()
- nf1 := glsql.Notification{Payload: `{"new":[{"virtual_storage":"vs","relative_path":"/repo/path/1"}]}`}
- nf2 := glsql.Notification{Payload: `{"new":[{"virtual_storage":"vs","relative_path":"/repo/path/2"}]}`}
+ nf1 := glsql.Notification{Payload: `[{"virtual_storage": "vs", "relative_paths": ["/repo/path/1"]}]`}
+ nf2 := glsql.Notification{Payload: `[{"virtual_storage": "vs", "relative_paths": ["/repo/path/2"]}]`}
var operations []func()
for i := 0; i < 100; i++ {