Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2020-10-23 16:24:57 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2020-10-23 16:24:57 +0300
commit7615df420eaa7c455030df1e6bb35481283d0981 (patch)
tree141d99222606ef3a6908644e6219337773675133
parente76ac4c43dfc5586082ba22f359c6b3fdc21004d (diff)
Receiving notifications on changes in database
As we are on the road of adding cache of up to date storages per repository we need to know when we should invalidate cache entries in it. In order to archive this we are going to listen for the changes made to repositories and storage_repositories tables. The notification contains information about old and new state of the row. The notification is issued for each changed row. Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/3053
-rw-r--r--changelogs/unreleased/ps-listen-table-changes.yml5
-rw-r--r--internal/praefect/datastore/glsql/postgres.go2
-rw-r--r--internal/praefect/datastore/listener_postgres.go2
-rw-r--r--internal/praefect/datastore/listener_postgres_test.go173
-rw-r--r--internal/praefect/datastore/migrations/20201006125956_trigger_repository_update_generation.go65
5 files changed, 247 insertions, 0 deletions
diff --git a/changelogs/unreleased/ps-listen-table-changes.yml b/changelogs/unreleased/ps-listen-table-changes.yml
new file mode 100644
index 000000000..e75b43c66
--- /dev/null
+++ b/changelogs/unreleased/ps-listen-table-changes.yml
@@ -0,0 +1,5 @@
+---
+title: Receiving notifications on changes in database
+merge_request: 2631
+author:
+type: added
diff --git a/internal/praefect/datastore/glsql/postgres.go b/internal/praefect/datastore/glsql/postgres.go
index 465275299..3e0a05700 100644
--- a/internal/praefect/datastore/glsql/postgres.go
+++ b/internal/praefect/datastore/glsql/postgres.go
@@ -47,6 +47,8 @@ type ListenHandler interface {
Notification(payload string)
// Disconnect would be triggered once a connection to remote service is lost.
Disconnect()
+ // Connected would be triggered once a connection to remote service is established.
+ Connected()
}
// Listener listens for events that occur in the system.
diff --git a/internal/praefect/datastore/listener_postgres.go b/internal/praefect/datastore/listener_postgres.go
index 49cb8c13a..4c122e9c4 100644
--- a/internal/praefect/datastore/listener_postgres.go
+++ b/internal/praefect/datastore/listener_postgres.go
@@ -122,6 +122,8 @@ func (pgl *PostgresListener) initListener(handler glsql.ListenHandler) error {
pgl.reconnectTotal.WithLabelValues(listenerEventTypeToString(eventType)).Inc()
switch eventType {
+ case pq.ListenerEventConnected:
+ dontpanic.Try(handler.Connected)
case pq.ListenerEventDisconnected:
dontpanic.Try(handler.Disconnect)
if disconnectThreshold() {
diff --git a/internal/praefect/datastore/listener_postgres_test.go b/internal/praefect/datastore/listener_postgres_test.go
index 5a740b562..96fe82ed2 100644
--- a/internal/praefect/datastore/listener_postgres_test.go
+++ b/internal/praefect/datastore/listener_postgres_test.go
@@ -68,6 +68,7 @@ func TestNewPostgresListener(t *testing.T) {
type mockListenHandler struct {
OnNotification func(string)
OnDisconnect func()
+ OnConnect func()
}
func (mlh mockListenHandler) Notification(v string) {
@@ -82,6 +83,12 @@ func (mlh mockListenHandler) Disconnect() {
}
}
+func (mlh mockListenHandler) Connected() {
+ if mlh.OnConnect != nil {
+ mlh.OnConnect()
+ }
+}
+
func TestPostgresListener_Listen(t *testing.T) {
db := getDB(t)
@@ -400,3 +407,169 @@ func TestThreshold(t *testing.T) {
require.True(t, thresholdReached())
})
}
+
+func TestPostgresListener_Listen_repositories_delete(t *testing.T) {
+ db := getDB(t)
+
+ testListener(
+ t,
+ "repositories_updates",
+ func(t *testing.T) {
+ _, err := db.DB.Exec(`
+ INSERT INTO repositories
+ VALUES ('praefect-1', '/path/to/repo/1', 1),
+ ('praefect-1', '/path/to/repo/2', 1),
+ ('praefect-1', '/path/to/repo/3', 0)`)
+ require.NoError(t, err)
+ },
+ func(t *testing.T) {
+ _, err := db.DB.Exec(`DELETE FROM repositories WHERE generation > 0`)
+ require.NoError(t, err)
+ },
+ func(t *testing.T, payload string) {
+ require.JSONEq(t, `
+ {
+ "old": [
+ {"virtual_storage":"praefect-1","relative_path":"/path/to/repo/1","generation":1},
+ {"virtual_storage":"praefect-1","relative_path":"/path/to/repo/2","generation":1}
+ ],
+ "new" : null
+ }`,
+ payload,
+ )
+ },
+ )
+}
+
+func TestPostgresListener_Listen_storage_repositories_insert(t *testing.T) {
+ db := getDB(t)
+
+ testListener(
+ t,
+ "storage_repositories_updates",
+ func(t *testing.T) {},
+ func(t *testing.T) {
+ _, err := db.DB.Exec(`
+ INSERT INTO storage_repositories
+ VALUES ('praefect-1', '/path/to/repo', 'gitaly-1', 0),
+ ('praefect-1', '/path/to/repo', 'gitaly-2', 0)`,
+ )
+ require.NoError(t, err)
+ },
+ func(t *testing.T, payload string) {
+ require.JSONEq(t, `
+ {
+ "old":null,
+ "new":[
+ {"virtual_storage":"praefect-1","relative_path":"/path/to/repo","storage":"gitaly-1","generation":0},
+ {"virtual_storage":"praefect-1","relative_path":"/path/to/repo","storage":"gitaly-2","generation":0}
+ ]
+ }`,
+ payload,
+ )
+ },
+ )
+}
+
+func TestPostgresListener_Listen_storage_repositories_update(t *testing.T) {
+ db := getDB(t)
+
+ testListener(
+ t,
+ "storage_repositories_updates",
+ func(t *testing.T) {
+ _, err := db.DB.Exec(`INSERT INTO storage_repositories VALUES ('praefect-1', '/path/to/repo', 'gitaly-1', 0)`)
+ require.NoError(t, err)
+ },
+ func(t *testing.T) {
+ _, err := db.DB.Exec(`UPDATE storage_repositories SET generation = generation + 1`)
+ require.NoError(t, err)
+ },
+ func(t *testing.T, payload string) {
+ require.JSONEq(t, `
+ {
+ "old" : [{"virtual_storage":"praefect-1","relative_path":"/path/to/repo","storage":"gitaly-1","generation":0}],
+ "new" : [{"virtual_storage":"praefect-1","relative_path":"/path/to/repo","storage":"gitaly-1","generation":1}]
+ }`,
+ payload,
+ )
+ },
+ )
+}
+
+func TestPostgresListener_Listen_storage_repositories_delete(t *testing.T) {
+ db := getDB(t)
+
+ testListener(
+ t,
+ "storage_repositories_updates",
+ func(t *testing.T) {
+ _, err := db.DB.Exec(`
+ INSERT INTO storage_repositories (virtual_storage, relative_path, storage, generation)
+ VALUES ('praefect-1', '/path/to/repo', 'gitaly-1', 0)`,
+ )
+ require.NoError(t, err)
+ },
+ func(t *testing.T) {
+ _, err := db.DB.Exec(`DELETE FROM storage_repositories`)
+ require.NoError(t, err)
+ },
+ func(t *testing.T, payload string) {
+ require.JSONEq(t, `
+ {
+ "old" : [{"virtual_storage":"praefect-1","relative_path":"/path/to/repo","storage":"gitaly-1","generation":0}],
+ "new" : null
+ }`,
+ payload,
+ )
+ },
+ )
+}
+
+func testListener(t *testing.T, channel string, setup func(t *testing.T), trigger func(t *testing.T), verifier func(t *testing.T, payload string)) {
+ setup(t)
+
+ opts := DefaultPostgresListenerOpts
+ opts.Addr = getDBConfig(t).ToPQString(true)
+ opts.Channel = channel
+
+ pgl, err := NewPostgresListener(opts)
+ require.NoError(t, err)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ readyChan := make(chan struct{})
+ receivedChan := make(chan struct{})
+ var payload string
+
+ callback := func(pld string) {
+ select {
+ case <-receivedChan:
+ return
+ default:
+ payload = pld
+ close(receivedChan)
+ }
+ }
+
+ go func() {
+ require.NoError(t, pgl.Listen(ctx, mockListenHandler{OnNotification: callback, OnConnect: func() { close(readyChan) }}))
+ }()
+
+ select {
+ case <-time.After(time.Second):
+ require.FailNow(t, "no connection for too long period")
+ case <-readyChan:
+ }
+
+ trigger(t)
+
+ select {
+ case <-time.After(time.Second):
+ require.FailNow(t, "no notifications for too long period")
+ case <-receivedChan:
+ }
+
+ verifier(t, payload)
+}
diff --git a/internal/praefect/datastore/migrations/20201006125956_trigger_repository_update_generation.go b/internal/praefect/datastore/migrations/20201006125956_trigger_repository_update_generation.go
new file mode 100644
index 000000000..a1c5bf979
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20201006125956_trigger_repository_update_generation.go
@@ -0,0 +1,65 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+ m := &migrate.Migration{
+ Id: "20201006125956_trigger_repository_update_generation",
+ Up: []string{
+ `-- +migrate StatementBegin
+ CREATE OR REPLACE FUNCTION notify_on_change() RETURNS TRIGGER AS $$
+ DECLARE
+ old_val JSON DEFAULT NULL;
+ new_val JSON DEFAULT NULL;
+ BEGIN
+ CASE TG_OP
+ WHEN 'INSERT' THEN
+ SELECT JSON_AGG(ROW_TO_JSON(t.*)) INTO new_val FROM NEW AS t;
+ WHEN 'UPDATE' THEN
+ SELECT JSON_AGG(ROW_TO_JSON(t.*)) INTO old_val FROM OLD AS t;
+ SELECT JSON_AGG(ROW_TO_JSON(t.*)) INTO new_val FROM NEW AS t;
+ WHEN 'DELETE' THEN
+ SELECT JSON_AGG(ROW_TO_JSON(t.*)) INTO old_val FROM OLD AS t;
+ END CASE;
+
+ PERFORM PG_NOTIFY(TG_ARGV[TG_NARGS-1], JSON_BUILD_OBJECT('old', old_val, 'new', new_val)::TEXT);
+ RETURN NULL;
+ END;
+ $$ LANGUAGE plpgsql;
+ -- +migrate StatementEnd`,
+
+ // for repositories table
+ `CREATE TRIGGER notify_on_delete AFTER DELETE ON repositories
+ REFERENCING OLD TABLE AS OLD
+ FOR EACH STATEMENT
+ EXECUTE FUNCTION notify_on_change('repositories_updates')`,
+
+ // for storage_repositories table
+ `CREATE TRIGGER notify_on_insert AFTER INSERT ON storage_repositories
+ REFERENCING NEW TABLE AS NEW
+ FOR EACH STATEMENT
+ EXECUTE FUNCTION notify_on_change('storage_repositories_updates')`,
+
+ `CREATE TRIGGER notify_on_update AFTER UPDATE ON storage_repositories
+ REFERENCING OLD TABLE AS OLD NEW TABLE AS NEW
+ FOR EACH STATEMENT
+ EXECUTE FUNCTION notify_on_change('storage_repositories_updates')`,
+
+ `CREATE TRIGGER notify_on_delete AFTER DELETE ON storage_repositories
+ REFERENCING OLD TABLE AS OLD
+ FOR EACH STATEMENT
+ EXECUTE FUNCTION notify_on_change('storage_repositories_updates')`,
+ },
+ Down: []string{
+ `DROP TRIGGER IF EXISTS notify_on_delete ON repositories`,
+
+ `DROP TRIGGER IF EXISTS notify_on_insert ON storage_repositories`,
+ `DROP TRIGGER IF EXISTS notify_on_update ON storage_repositories`,
+ `DROP TRIGGER IF EXISTS notify_on_delete ON storage_repositories`,
+
+ `DROP FUNCTION IF EXISTS notify_on_change`,
+ },
+ }
+
+ allMigrations = append(allMigrations, m)
+}