diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2022-04-07 15:54:58 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2022-04-13 12:45:39 +0300 |
commit | 5c9feb4e8e3175f1f80aae2f452e9e676133d14f (patch) | |
tree | e7a79ee911b9866352b006dff3dc1aa08cb7cf0a | |
parent | 9413ca591ebe30dcb133c86d0ec53f6bc2fc30bb (diff) |
Ignore verification columns for read-only cache updatessmh-verification-trigger-only-generation
Read-only cache receives invalidations on record updates via triggers
in Postgres. Currently the notifications are sent for any modification
to the records. The verification related columns are not relevant to
the operation of the cache so this commit ignores the changes to the
columns in the triggers.
Changelog: changed
-rw-r--r-- | _support/praefect-schema.sql | 6 | ||||
-rw-r--r-- | internal/praefect/datastore/listener_test.go | 51 | ||||
-rw-r--r-- | internal/praefect/datastore/migrations/20220407111624_ignore_verification_in_triggers.go | 93 |
3 files changed, 147 insertions, 3 deletions
diff --git a/_support/praefect-schema.sql b/_support/praefect-schema.sql index 78316e80c..083abf664 100644 --- a/_support/praefect-schema.sql +++ b/_support/praefect-schema.sql @@ -72,8 +72,10 @@ CREATE FUNCTION public.notify_on_change() RETURNS trigger SELECT JSON_AGG(obj) INTO msg FROM ( SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(DISTINCT relative_path)) AS obj - FROM NEW - FULL JOIN OLD USING (virtual_storage, relative_path) + FROM NEW AS new_value + FULL JOIN OLD AS old_value USING (virtual_storage, relative_path) + WHERE ( COALESCE(new_value.generation, -1) != COALESCE(old_value.generation, -1) ) + OR ( new_value.relative_path != old_value.relative_path ) GROUP BY virtual_storage ) t; WHEN 'DELETE' THEN diff --git a/internal/praefect/datastore/listener_test.go b/internal/praefect/datastore/listener_test.go index 087e2d131..d7274361f 100644 --- a/internal/praefect/datastore/listener_test.go +++ b/internal/praefect/datastore/listener_test.go @@ -309,7 +309,7 @@ func TestListener_Listen_storage_repositories_insert(t *testing.T) { ) } -func TestListener_Listen_storage_repositories_update(t *testing.T) { +func TestListener_Listen_storage_repositories_generation_update(t *testing.T) { t.Parallel() db := testdb.New(t) dbConf := testdb.GetConfig(t, db.Name) @@ -335,6 +335,55 @@ func TestListener_Listen_storage_repositories_update(t *testing.T) { ) } +func TestListener_Listen_storage_repositories_relative_path_update(t *testing.T) { + t.Parallel() + db := testdb.New(t) + dbConf := testdb.GetConfig(t, db.Name) + ctx := testhelper.Context(t) + + verifyListener( + t, + ctx, + dbConf, + StorageRepositoriesUpdatesChannel, + func(t *testing.T) { + rs := NewPostgresRepositoryStore(db, nil) + require.NoError(t, rs.CreateRepository(ctx, 1, "praefect-1", "/path/to/repo", "replica-path", "gitaly-1", nil, nil, true, false)) + }, + func(t *testing.T) { + _, err := db.DB.Exec(`UPDATE storage_repositories SET relative_path = 'updated-relative-path'`) + require.NoError(t, err) + }, + func(t *testing.T, n glsql.Notification) { + require.Equal(t, StorageRepositoriesUpdatesChannel, n.Channel) + requireEqualNotificationEntries(t, n.Payload, []notificationEntry{{VirtualStorage: "praefect-1", RelativePaths: []string{"/path/to/repo", "updated-relative-path"}}}) + }, + ) +} + +func TestListener_Listen_storage_repositories_verification_updates_ignored(t *testing.T) { + t.Parallel() + db := testdb.New(t) + dbConf := testdb.GetConfig(t, db.Name) + ctx := testhelper.Context(t) + + verifyListener( + t, + ctx, + dbConf, + StorageRepositoriesUpdatesChannel, + func(t *testing.T) { + rs := NewPostgresRepositoryStore(db, nil) + require.NoError(t, rs.CreateRepository(ctx, 1, "praefect-1", "/path/to/repo", "replica-path", "gitaly-1", nil, nil, true, false)) + }, + func(t *testing.T) { + _, err := db.DB.Exec(`UPDATE storage_repositories SET verified_at = now(), verification_leased_until = now()`) + require.NoError(t, err) + }, + nil, // no notification events expected + ) +} + func TestListener_Listen_storage_empty_notification(t *testing.T) { t.Parallel() db := testdb.New(t) diff --git a/internal/praefect/datastore/migrations/20220407111624_ignore_verification_in_triggers.go b/internal/praefect/datastore/migrations/20220407111624_ignore_verification_in_triggers.go new file mode 100644 index 000000000..81010ef34 --- /dev/null +++ b/internal/praefect/datastore/migrations/20220407111624_ignore_verification_in_triggers.go @@ -0,0 +1,93 @@ +package migrations + +import migrate "github.com/rubenv/sql-migrate" + +func init() { + m := &migrate.Migration{ + Id: "20220407111624_ignore_verification_in_triggers", + Up: []string{ + `-- +migrate StatementBegin + CREATE OR REPLACE FUNCTION notify_on_change() RETURNS TRIGGER AS $$ + DECLARE + msg JSONB; + BEGIN + CASE TG_OP + WHEN 'INSERT' THEN + SELECT JSON_AGG(obj) INTO msg + FROM ( + SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(DISTINCT relative_path)) AS obj + FROM NEW + GROUP BY virtual_storage + ) t; + WHEN 'UPDATE' THEN + SELECT JSON_AGG(obj) INTO msg + FROM ( + SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(DISTINCT relative_path)) AS obj + FROM NEW AS new_value + FULL JOIN OLD AS old_value USING (virtual_storage, relative_path) + WHERE ( COALESCE(new_value.generation, -1) != COALESCE(old_value.generation, -1) ) + OR ( new_value.relative_path != old_value.relative_path ) + GROUP BY virtual_storage + ) t; + WHEN 'DELETE' THEN + SELECT JSON_AGG(obj) INTO msg + FROM ( + SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(DISTINCT relative_path)) AS obj + FROM OLD + GROUP BY virtual_storage + ) t; + END CASE; + + CASE WHEN JSONB_ARRAY_LENGTH(msg) > 0 THEN + PERFORM PG_NOTIFY(TG_ARGV[TG_NARGS-1], msg::TEXT); + ELSE END CASE; + + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + -- +migrate StatementEnd`, + }, + Down: []string{ + `-- +migrate StatementBegin + CREATE OR REPLACE FUNCTION notify_on_change() RETURNS TRIGGER AS $$ + DECLARE + msg JSONB; + BEGIN + CASE TG_OP + WHEN 'INSERT' THEN + SELECT JSON_AGG(obj) INTO msg + FROM ( + SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(DISTINCT relative_path)) AS obj + FROM NEW + GROUP BY virtual_storage + ) t; + WHEN 'UPDATE' THEN + SELECT JSON_AGG(obj) INTO msg + FROM ( + SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(DISTINCT relative_path)) AS obj + FROM NEW + FULL JOIN OLD USING (virtual_storage, relative_path) + GROUP BY virtual_storage + ) t; + WHEN 'DELETE' THEN + SELECT JSON_AGG(obj) INTO msg + FROM ( + SELECT JSONB_BUILD_OBJECT('virtual_storage', virtual_storage, 'relative_paths', ARRAY_AGG(DISTINCT relative_path)) AS obj + FROM OLD + GROUP BY virtual_storage + ) t; + END CASE; + + CASE WHEN JSONB_ARRAY_LENGTH(msg) > 0 THEN + PERFORM PG_NOTIFY(TG_ARGV[TG_NARGS-1], msg::TEXT); + ELSE END CASE; + + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + -- +migrate StatementEnd`, + }, + } + + allMigrations = append(allMigrations, m) +} |