Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorToon Claes <toon@gitlab.com>2020-12-15 20:37:35 +0300
committerToon Claes <toon@gitlab.com>2020-12-15 20:37:35 +0300
commit84f04b8241de21a20cdcd42b4575c1894074e1d6 (patch)
tree745ebb04108983afb16859e8f5c0c99057bc8196
parentdf62df25a02c8d72660c7fd72eb49c1e8c5ca03f (diff)
parentbb6f1bb506a3e3dc3ce3cef5f2a65036f8e5a4d4 (diff)
Merge branch 'ps-notifications-payload-cleanup' into 'master'
Cleanup redundant data from notification events Closes #3309 See merge request gitlab-org/gitaly!2893
-rw-r--r--changelogs/unreleased/ps-notifications-payload-cleanup.yml5
-rw-r--r--internal/praefect/datastore/listener_postgres_test.go86
-rw-r--r--internal/praefect/datastore/migrations/20201208163237_cleanup_notifiactions_payload.go74
-rw-r--r--internal/praefect/datastore/storage_provider.go34
-rw-r--r--internal/praefect/datastore/storage_provider_test.go25
5 files changed, 151 insertions, 73 deletions
diff --git a/changelogs/unreleased/ps-notifications-payload-cleanup.yml b/changelogs/unreleased/ps-notifications-payload-cleanup.yml
new file mode 100644
index 000000000..2a2e17cb5
--- /dev/null
+++ b/changelogs/unreleased/ps-notifications-payload-cleanup.yml
@@ -0,0 +1,5 @@
+---
+title: Cleanup redundant data from notification events
+merge_request: 2893
+author:
+type: changed
diff --git a/internal/praefect/datastore/listener_postgres_test.go b/internal/praefect/datastore/listener_postgres_test.go
index d5618c8d7..d402368d4 100644
--- a/internal/praefect/datastore/listener_postgres_test.go
+++ b/internal/praefect/datastore/listener_postgres_test.go
@@ -3,8 +3,10 @@
package datastore
import (
+ "encoding/json"
"errors"
"fmt"
+ "sort"
"strings"
"sync"
"testing"
@@ -348,6 +350,22 @@ func TestPostgresListener_Listen(t *testing.T) {
})
}
+func requireEqualNotificationEntries(t *testing.T, d string, entries []notificationEntry) {
+ t.Helper()
+
+ var nes []notificationEntry
+ require.NoError(t, json.NewDecoder(strings.NewReader(d)).Decode(&nes))
+
+ for _, es := range [][]notificationEntry{entries, nes} {
+ for _, e := range es {
+ sort.Strings(e.RelativePaths)
+ }
+ sort.Slice(es, func(i, j int) bool { return es[i].VirtualStorage < es[j].VirtualStorage })
+ }
+
+ require.EqualValues(t, entries, nes)
+}
+
func TestPostgresListener_Listen_repositories_delete(t *testing.T) {
db := getDB(t)
@@ -361,7 +379,8 @@ func TestPostgresListener_Listen_repositories_delete(t *testing.T) {
INSERT INTO repositories
VALUES ('praefect-1', '/path/to/repo/1', 1),
('praefect-1', '/path/to/repo/2', 1),
- ('praefect-1', '/path/to/repo/3', 0)`)
+ ('praefect-1', '/path/to/repo/3', 0),
+ ('praefect-2', '/path/to/repo/1', 1)`)
require.NoError(t, err)
},
func(t *testing.T) {
@@ -370,16 +389,10 @@ func TestPostgresListener_Listen_repositories_delete(t *testing.T) {
},
func(t *testing.T, n glsql.Notification) {
require.Equal(t, channel, n.Channel)
- require.JSONEq(t, `
- {
- "old": [
- {"virtual_storage":"praefect-1","relative_path":"/path/to/repo/1","generation":1,"primary":null},
- {"virtual_storage":"praefect-1","relative_path":"/path/to/repo/2","generation":1,"primary":null}
- ],
- "new" : null
- }`,
- n.Payload,
- )
+ requireEqualNotificationEntries(t, n.Payload, []notificationEntry{
+ {VirtualStorage: "praefect-1", RelativePaths: []string{"/path/to/repo/1", "/path/to/repo/2"}},
+ {VirtualStorage: "praefect-2", RelativePaths: []string{"/path/to/repo/1"}},
+ })
},
)
}
@@ -403,16 +416,7 @@ func TestPostgresListener_Listen_storage_repositories_insert(t *testing.T) {
},
func(t *testing.T, n glsql.Notification) {
require.Equal(t, channel, n.Channel)
- require.JSONEq(t, `
- {
- "old":null,
- "new":[
- {"virtual_storage":"praefect-1","relative_path":"/path/to/repo","storage":"gitaly-1","generation":0,"assigned":true},
- {"virtual_storage":"praefect-1","relative_path":"/path/to/repo","storage":"gitaly-2","generation":0,"assigned":true}
- ]
- }`,
- n.Payload,
- )
+ requireEqualNotificationEntries(t, n.Payload, []notificationEntry{{VirtualStorage: "praefect-1", RelativePaths: []string{"/path/to/repo"}}})
},
)
}
@@ -435,14 +439,25 @@ func TestPostgresListener_Listen_storage_repositories_update(t *testing.T) {
},
func(t *testing.T, n glsql.Notification) {
require.Equal(t, channel, n.Channel)
- require.JSONEq(t, `
- {
- "old" : [{"virtual_storage":"praefect-1","relative_path":"/path/to/repo","storage":"gitaly-1","generation":0,"assigned":true}],
- "new" : [{"virtual_storage":"praefect-1","relative_path":"/path/to/repo","storage":"gitaly-1","generation":1,"assigned":true}]
- }`,
- n.Payload,
- )
+ requireEqualNotificationEntries(t, n.Payload, []notificationEntry{{VirtualStorage: "praefect-1", RelativePaths: []string{"/path/to/repo"}}})
+ },
+ )
+}
+
+func TestPostgresListener_Listen_storage_empty_notification(t *testing.T) {
+ db := getDB(t)
+
+ const channel = "storage_repositories_updates"
+
+ testListener(
+ t,
+ channel,
+ func(t *testing.T) {},
+ func(t *testing.T) {
+ _, err := db.DB.Exec(`UPDATE storage_repositories SET generation = 1`)
+ require.NoError(t, err)
},
+ nil, // no notification events expected
)
}
@@ -467,13 +482,7 @@ func TestPostgresListener_Listen_storage_repositories_delete(t *testing.T) {
},
func(t *testing.T, n glsql.Notification) {
require.Equal(t, channel, n.Channel)
- require.JSONEq(t, `
- {
- "old" : [{"virtual_storage":"praefect-1","relative_path":"/path/to/repo","storage":"gitaly-1","generation":0,"assigned":true}],
- "new" : null
- }`,
- n.Payload,
- )
+ requireEqualNotificationEntries(t, n.Payload, []notificationEntry{{VirtualStorage: "praefect-1", RelativePaths: []string{"/path/to/repo"}}})
},
)
}
@@ -515,9 +524,16 @@ func testListener(t *testing.T, channel string, setup func(t *testing.T), trigge
select {
case <-time.After(time.Second):
+ if verifier == nil {
+ // no notifications expected
+ return
+ }
require.FailNow(t, "no notifications for too long period")
case <-receivedChan:
}
+ if verifier == nil {
+ require.Failf(t, "no notifications expected", "received: %v", notification)
+ }
verifier(t, notification)
}
diff --git a/internal/praefect/datastore/migrations/20201208163237_cleanup_notifiactions_payload.go b/internal/praefect/datastore/migrations/20201208163237_cleanup_notifiactions_payload.go
new file mode 100644
index 000000000..a1a79b56b
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20201208163237_cleanup_notifiactions_payload.go
@@ -0,0 +1,74 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+ m := &migrate.Migration{
+ Id: "20201208163237_cleanup_notifiactions_payload",
+ Up: []string{
+ `-- +migrate StatementBegin
+ CREATE OR REPLACE FUNCTION notify_on_change() RETURNS TRIGGER AS $$
+ DECLARE
+ msg JSONB;
+ BEGIN
+ CASE TG_OP
+ WHEN 'INSERT' THEN
+ SELECT JSON_AGG(obj) INTO msg
+ FROM (
+ SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(DISTINCT relative_path)) AS obj
+ FROM NEW
+ GROUP BY virtual_storage
+ ) t;
+ WHEN 'UPDATE' THEN
+ SELECT JSON_AGG(obj) INTO msg
+ FROM (
+ SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(DISTINCT relative_path)) AS obj
+ FROM NEW
+ FULL JOIN OLD USING (virtual_storage, relative_path)
+ GROUP BY virtual_storage
+ ) t;
+ WHEN 'DELETE' THEN
+ SELECT JSON_AGG(obj) INTO msg
+ FROM (
+ SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(DISTINCT relative_path)) AS obj
+ FROM OLD
+ GROUP BY virtual_storage
+ ) t;
+ END CASE;
+
+ CASE WHEN JSONB_ARRAY_LENGTH(msg) > 0 THEN
+ PERFORM PG_NOTIFY(TG_ARGV[TG_NARGS-1], msg::TEXT);
+ ELSE END CASE;
+
+ RETURN NULL;
+ END;
+ $$ LANGUAGE plpgsql;
+ -- +migrate StatementEnd`,
+ },
+ Down: []string{
+ `-- +migrate StatementBegin
+ CREATE OR REPLACE FUNCTION notify_on_change() RETURNS TRIGGER AS $$
+ DECLARE
+ old_val JSON DEFAULT NULL;
+ new_val JSON DEFAULT NULL;
+ BEGIN
+ CASE TG_OP
+ WHEN 'INSERT' THEN
+ SELECT JSON_AGG(ROW_TO_JSON(t.*)) INTO new_val FROM NEW AS t;
+ WHEN 'UPDATE' THEN
+ SELECT JSON_AGG(ROW_TO_JSON(t.*)) INTO old_val FROM OLD AS t;
+ SELECT JSON_AGG(ROW_TO_JSON(t.*)) INTO new_val FROM NEW AS t;
+ WHEN 'DELETE' THEN
+ SELECT JSON_AGG(ROW_TO_JSON(t.*)) INTO old_val FROM OLD AS t;
+ END CASE;
+
+ PERFORM PG_NOTIFY(TG_ARGV[TG_NARGS-1], JSON_BUILD_OBJECT('old', old_val, 'new', new_val)::TEXT);
+ RETURN NULL;
+ END;
+ $$ LANGUAGE plpgsql;
+ -- +migrate StatementEnd`,
+ },
+ }
+
+ allMigrations = append(allMigrations, m)
+}
diff --git a/internal/praefect/datastore/storage_provider.go b/internal/praefect/datastore/storage_provider.go
index 4b52c0304..fe8e46804 100644
--- a/internal/praefect/datastore/storage_provider.go
+++ b/internal/praefect/datastore/storage_provider.go
@@ -106,41 +106,27 @@ func NewCachingStorageProvider(logger logrus.FieldLogger, sp SecondariesProvider
return csp, nil
}
-type (
- notificationEntry struct {
- VirtualStorage string `json:"virtual_storage"`
- RelativePath string `json:"relative_path"`
- }
-
- changeNotification struct {
- Old []notificationEntry `json:"old"`
- New []notificationEntry `json:"new"`
- }
-)
+type notificationEntry struct {
+ VirtualStorage string `json:"virtual_storage"`
+ RelativePaths []string `json:"relative_paths"`
+}
func (c *CachingStorageProvider) Notification(n glsql.Notification) {
- var change changeNotification
- if err := json.NewDecoder(strings.NewReader(n.Payload)).Decode(&change); err != nil {
+ var changes []notificationEntry
+ if err := json.NewDecoder(strings.NewReader(n.Payload)).Decode(&changes); err != nil {
c.disableCaching() // as we can't update cache properly we should disable it
c.callbackLogger.WithError(err).WithField("channel", n.Channel).Error("received payload can't be processed, cache disabled")
return
}
- entries := map[string][]string{}
- for _, notificationEntries := range [][]notificationEntry{change.Old, change.New} {
- for _, entry := range notificationEntries {
- entries[entry.VirtualStorage] = append(entries[entry.VirtualStorage], entry.RelativePath)
- }
- }
-
- for virtualStorage, relativePaths := range entries {
- cache, found := c.getCache(virtualStorage)
+ for _, entry := range changes {
+ cache, found := c.getCache(entry.VirtualStorage)
if !found {
- c.callbackLogger.WithError(errNotExistingVirtualStorage).WithField("virtual_storage", virtualStorage).Error("cache not found")
+ c.callbackLogger.WithError(errNotExistingVirtualStorage).WithField("virtual_storage", entry.VirtualStorage).Error("cache not found")
continue
}
- for _, relativePath := range relativePaths {
+ for _, relativePath := range entry.RelativePaths {
cache.Remove(relativePath)
}
}
diff --git a/internal/praefect/datastore/storage_provider_test.go b/internal/praefect/datastore/storage_provider_test.go
index 45b422ad0..aa22ba81c 100644
--- a/internal/praefect/datastore/storage_provider_test.go
+++ b/internal/praefect/datastore/storage_provider_test.go
@@ -2,7 +2,7 @@ package datastore
import (
"context"
- "errors"
+ "encoding/json"
"runtime"
"strings"
"sync"
@@ -213,15 +213,14 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages1)
// invalid payload disables caching
- cache.Notification(glsql.Notification{Channel: "notification_channel_1", Payload: ``})
+ notification := glsql.Notification{Channel: "notification_channel_1", Payload: `_`}
+ cache.Notification(notification)
+ expErr := json.Unmarshal([]byte(notification.Payload), new(struct{}))
// second access omits cached data as caching should be disabled
storages2 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1")
require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages2)
- // valid payload enables caching again
- cache.Notification(glsql.Notification{Channel: "notification_channel_2", Payload: `{}`})
-
// third access retrieves data and caches it
storages3 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1")
require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages3)
@@ -235,7 +234,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
assert.Equal(t, logrus.Fields{
"channel": "notification_channel_1",
"component": "caching_storage_provider",
- "error": errors.New("EOF"),
+ "error": expErr,
}, logHook.LastEntry().Data)
assert.Equal(t, logrus.ErrorLevel, logHook.LastEntry().Level)
@@ -271,12 +270,10 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
// notification evicts entries for '/repo/path/2' from the cache
cache.Notification(glsql.Notification{Payload: `
- {
- "old":[
- {"virtual_storage": "bad", "relative_path": "/repo/path/1"}
- ],
- "new":[{"virtual_storage": "vs", "relative_path": "/repo/path/2"}]
- }`},
+ [
+ {"virtual_storage": "bad", "relative_paths": ["/repo/path/1"]},
+ {"virtual_storage": "vs", "relative_paths": ["/repo/path/2"]}
+ ]`},
)
// second access re-uses cached data for '/repo/path/1'
@@ -342,8 +339,8 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
require.NoError(t, err)
cache.Connected()
- nf1 := glsql.Notification{Payload: `{"new":[{"virtual_storage":"vs","relative_path":"/repo/path/1"}]}`}
- nf2 := glsql.Notification{Payload: `{"new":[{"virtual_storage":"vs","relative_path":"/repo/path/2"}]}`}
+ nf1 := glsql.Notification{Payload: `[{"virtual_storage": "vs", "relative_paths": ["/repo/path/1"]}]`}
+ nf2 := glsql.Notification{Payload: `[{"virtual_storage": "vs", "relative_paths": ["/repo/path/2"]}]`}
var operations []func()
for i := 0; i < 100; i++ {