Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2020-12-14 17:58:42 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2020-12-15 12:45:16 +0300
commitbb6f1bb506a3e3dc3ce3cef5f2a65036f8e5a4d4 (patch)
treebb11f30b468aef371226cfd48962cf8c24764318 /internal/praefect/datastore
parente97131a42ed6db84edda3a3cc585eb2004f8b503 (diff)
Handle empty notification messages
The triggers may be called with no actual change done by the operation, like updating of the non-existing rows. In this case existing trigger will generate a notification with empty payload. This payload can't be properly handled by the listener and will cause cache disabling. To deal with it we add check in the trigger logic to verify if there are any actual changes done to the state. This change also includes structural changes into the query but preserve the same logic and format of the payload. Closes: https://gitlab.com/gitlab-org/gitaly/-/issues/3309
Diffstat (limited to 'internal/praefect/datastore')
-rw-r--r--internal/praefect/datastore/listener_postgres_test.go24
-rw-r--r--internal/praefect/datastore/migrations/20201208163237_cleanup_notifiactions_payload.go36
-rw-r--r--internal/praefect/datastore/storage_provider.go1
-rw-r--r--internal/praefect/datastore/storage_provider_test.go8
4 files changed, 46 insertions, 23 deletions
diff --git a/internal/praefect/datastore/listener_postgres_test.go b/internal/praefect/datastore/listener_postgres_test.go
index 79a53d741..d402368d4 100644
--- a/internal/praefect/datastore/listener_postgres_test.go
+++ b/internal/praefect/datastore/listener_postgres_test.go
@@ -444,6 +444,23 @@ func TestPostgresListener_Listen_storage_repositories_update(t *testing.T) {
)
}
+func TestPostgresListener_Listen_storage_empty_notification(t *testing.T) {
+ db := getDB(t)
+
+ const channel = "storage_repositories_updates"
+
+ testListener(
+ t,
+ channel,
+ func(t *testing.T) {},
+ func(t *testing.T) {
+ _, err := db.DB.Exec(`UPDATE storage_repositories SET generation = 1`)
+ require.NoError(t, err)
+ },
+ nil, // no notification events expected
+ )
+}
+
func TestPostgresListener_Listen_storage_repositories_delete(t *testing.T) {
db := getDB(t)
@@ -507,9 +524,16 @@ func testListener(t *testing.T, channel string, setup func(t *testing.T), trigge
select {
case <-time.After(time.Second):
+ if verifier == nil {
+ // no notifications expected
+ return
+ }
require.FailNow(t, "no notifications for too long period")
case <-receivedChan:
}
+ if verifier == nil {
+ require.Failf(t, "no notifications expected", "received: %v", notification)
+ }
verifier(t, notification)
}
diff --git a/internal/praefect/datastore/migrations/20201208163237_cleanup_notifiactions_payload.go b/internal/praefect/datastore/migrations/20201208163237_cleanup_notifiactions_payload.go
index 6213bef6f..a1a79b56b 100644
--- a/internal/praefect/datastore/migrations/20201208163237_cleanup_notifiactions_payload.go
+++ b/internal/praefect/datastore/migrations/20201208163237_cleanup_notifiactions_payload.go
@@ -9,40 +9,38 @@ func init() {
`-- +migrate StatementBegin
CREATE OR REPLACE FUNCTION notify_on_change() RETURNS TRIGGER AS $$
DECLARE
- msg TEXT DEFAULT '';
+ msg JSONB;
BEGIN
- CASE TG_OP
+ CASE TG_OP
WHEN 'INSERT' THEN
- SELECT JSON_AGG(obj)::TEXT INTO msg
+ SELECT JSON_AGG(obj) INTO msg
FROM (
- SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(relative_path)) AS obj
- FROM (SELECT DISTINCT virtual_storage, relative_path FROM NEW) t
+ SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(DISTINCT relative_path)) AS obj
+ FROM NEW
GROUP BY virtual_storage
) t;
WHEN 'UPDATE' THEN
- SELECT JSON_AGG(obj)::TEXT INTO msg
+ SELECT JSON_AGG(obj) INTO msg
FROM (
- SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(relative_path)) AS obj
- FROM (
- SELECT virtual_storage, relative_path
- FROM (SELECT DISTINCT virtual_storage, relative_path FROM NEW) t1
- UNION
- SELECT virtual_storage, relative_path
- FROM (SELECT DISTINCT virtual_storage, relative_path FROM OLD) t2
- ) t
+ SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(DISTINCT relative_path)) AS obj
+ FROM NEW
+ FULL JOIN OLD USING (virtual_storage, relative_path)
GROUP BY virtual_storage
) t;
WHEN 'DELETE' THEN
- SELECT JSON_AGG(obj)::TEXT INTO msg
+ SELECT JSON_AGG(obj) INTO msg
FROM (
- SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(relative_path)) AS obj
- FROM (SELECT DISTINCT virtual_storage, relative_path FROM OLD) t
+ SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(DISTINCT relative_path)) AS obj
+ FROM OLD
GROUP BY virtual_storage
) t;
END CASE;
- PERFORM PG_NOTIFY(TG_ARGV[TG_NARGS-1], msg);
- RETURN NULL;
+ CASE WHEN JSONB_ARRAY_LENGTH(msg) > 0 THEN
+ PERFORM PG_NOTIFY(TG_ARGV[TG_NARGS-1], msg::TEXT);
+ ELSE END CASE;
+
+ RETURN NULL;
END;
$$ LANGUAGE plpgsql;
-- +migrate StatementEnd`,
diff --git a/internal/praefect/datastore/storage_provider.go b/internal/praefect/datastore/storage_provider.go
index 6bb3909d1..fe8e46804 100644
--- a/internal/praefect/datastore/storage_provider.go
+++ b/internal/praefect/datastore/storage_provider.go
@@ -112,7 +112,6 @@ type notificationEntry struct {
}
func (c *CachingStorageProvider) Notification(n glsql.Notification) {
- // it is a new format of the notification message
var changes []notificationEntry
if err := json.NewDecoder(strings.NewReader(n.Payload)).Decode(&changes); err != nil {
c.disableCaching() // as we can't update cache properly we should disable it
diff --git a/internal/praefect/datastore/storage_provider_test.go b/internal/praefect/datastore/storage_provider_test.go
index 4a9894914..aa22ba81c 100644
--- a/internal/praefect/datastore/storage_provider_test.go
+++ b/internal/praefect/datastore/storage_provider_test.go
@@ -2,7 +2,7 @@ package datastore
import (
"context"
- "errors"
+ "encoding/json"
"runtime"
"strings"
"sync"
@@ -213,7 +213,9 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages1)
// invalid payload disables caching
- cache.Notification(glsql.Notification{Channel: "notification_channel_1", Payload: ``})
+ notification := glsql.Notification{Channel: "notification_channel_1", Payload: `_`}
+ cache.Notification(notification)
+ expErr := json.Unmarshal([]byte(notification.Payload), new(struct{}))
// second access omits cached data as caching should be disabled
storages2 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1")
@@ -232,7 +234,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
assert.Equal(t, logrus.Fields{
"channel": "notification_channel_1",
"component": "caching_storage_provider",
- "error": errors.New("EOF"),
+ "error": expErr,
}, logHook.LastEntry().Data)
assert.Equal(t, logrus.ErrorLevel, logHook.LastEntry().Level)