Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Fargher <jfargher@gitlab.com>2023-02-10 04:22:35 +0300
committerJames Fargher <jfargher@gitlab.com>2023-02-10 04:22:35 +0300
commit4c174fcfc448a5126d537e0fac81f183f8ecee89 (patch)
treec022167e790f117f221f9680769e1b425b929cf9
parenta4acadc5e16ef6c37a807aac6466051a30ec6846 (diff)
datastore: Clear replication queue on DeleteAllRepositories
-rw-r--r--internal/praefect/datastore/repository_store.go19
-rw-r--r--internal/praefect/datastore/repository_store_test.go75
2 files changed, 93 insertions, 1 deletions
diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go
index 71fe9016e..5a317c689 100644
--- a/internal/praefect/datastore/repository_store.go
+++ b/internal/praefect/datastore/repository_store.go
@@ -524,8 +524,25 @@ GROUP BY replica_path
//nolint:revive // This is unintentionally missing documentation.
func (rs *PostgresRepositoryStore) DeleteAllRepositories(ctx context.Context, virtualStorage string) error {
_, err := rs.db.ExecContext(ctx, `
+WITH delete_jobs AS (
+ DELETE FROM replication_queue
+ WHERE job->>'virtual_storage' = $1
+ RETURNING id
+),
+
+delete_job_locks AS (
+ DELETE FROM replication_queue_job_lock
+ USING delete_jobs
+ WHERE job_id = delete_jobs.id
+),
+
+delete_locks AS (
+ DELETE FROM replication_queue_lock
+ WHERE id LIKE $1 || '|%|%'
+)
+
DELETE FROM repositories
-WHERE virtual_storage = $1
+WHERE virtual_storage = $1;
`, virtualStorage)
if err != nil {
return err
diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go
index c3410ee3c..d1d3f0204 100644
--- a/internal/praefect/datastore/repository_store_test.go
+++ b/internal/praefect/datastore/repository_store_test.go
@@ -622,12 +622,81 @@ func TestRepositoryStore_Postgres(t *testing.T) {
})
t.Run("DeleteAllRepositories", func(t *testing.T) {
+ queue := PostgresReplicationEventQueue{db}
rs := newRepositoryStore(t, nil)
require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "repository-1", "replica-path-1", "storage-1", nil, nil, false, false))
require.NoError(t, rs.CreateRepository(ctx, 2, "virtual-storage-2", "repository-1", "replica-path-2", "storage-1", []string{"storage-2"}, nil, false, false))
require.NoError(t, rs.CreateRepository(ctx, 3, "virtual-storage-2", "repository-2", "replica-path-3", "storage-1", nil, nil, false, false))
+ events := []ReplicationEvent{
+ {
+ Job: ReplicationJob{
+ Change: UpdateRepo,
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ VirtualStorage: "virtual-storage-1",
+ },
+ },
+ {
+ Job: ReplicationJob{
+ Change: UpdateRepo,
+ RelativePath: "/project/path-2",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ VirtualStorage: "virtual-storage-1",
+ },
+ },
+ {
+ Job: ReplicationJob{
+ Change: UpdateRepo,
+ RelativePath: "/project/path-3",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ VirtualStorage: "virtual-storage-2",
+ },
+ },
+ {
+ Job: ReplicationJob{
+ Change: UpdateRepo,
+ RelativePath: "/project/path-4",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ VirtualStorage: "virtual-storage-2",
+ },
+ },
+ }
+
+ for i := range events {
+ var err error
+ events[i], err = queue.Enqueue(ctx, events[i])
+ require.NoError(t, err, "failed to fill in event queue")
+ }
+
+ expectedEvents := []ReplicationEvent{events[2]}
+ expectedJobLocks := []JobLockRow{
+ {JobID: events[2].ID, LockID: "virtual-storage-2|gitaly-1|/project/path-3"},
+ }
+
+ dequeuedEvents, err := queue.Dequeue(ctx, "virtual-storage-2", "gitaly-1", 1)
+ require.NoError(t, err)
+ require.Len(t, dequeuedEvents, len(expectedEvents))
+ for i := range dequeuedEvents {
+ dequeuedEvents[i].UpdatedAt = nil // it is not possible to determine update_at value as it is generated on UPDATE in database
+ expectedEvents[i].State = JobStateInProgress
+ expectedEvents[i].Attempt--
+ }
+ require.Equal(t, expectedEvents, dequeuedEvents)
+
+ requireLocks(t, ctx, db, []LockRow{
+ {ID: "virtual-storage-1|gitaly-1|/project/path-1", Acquired: false},
+ {ID: "virtual-storage-1|gitaly-1|/project/path-2", Acquired: false},
+ {ID: "virtual-storage-2|gitaly-1|/project/path-3", Acquired: true},
+ {ID: "virtual-storage-2|gitaly-1|/project/path-4", Acquired: false},
+ })
+ requireJobLocks(t, ctx, db, expectedJobLocks)
+
requireState(t, ctx, db,
virtualStorageState{
"virtual-storage-1": {
@@ -658,6 +727,12 @@ func TestRepositoryStore_Postgres(t *testing.T) {
require.NoError(t, rs.DeleteAllRepositories(ctx, "virtual-storage-2"))
+ requireLocks(t, ctx, db, []LockRow{
+ {ID: "virtual-storage-1|gitaly-1|/project/path-1", Acquired: false},
+ {ID: "virtual-storage-1|gitaly-1|/project/path-2", Acquired: false},
+ })
+ requireJobLocks(t, ctx, db, []JobLockRow{})
+
requireState(t, ctx, db,
virtualStorageState{
"virtual-storage-1": {