diff options
author | James Fargher <jfargher@gitlab.com> | 2023-02-10 04:22:35 +0300 |
---|---|---|
committer | James Fargher <jfargher@gitlab.com> | 2023-02-10 04:22:35 +0300 |
commit | 4c174fcfc448a5126d537e0fac81f183f8ecee89 (patch) | |
tree | c022167e790f117f221f9680769e1b425b929cf9 | |
parent | a4acadc5e16ef6c37a807aac6466051a30ec6846 (diff) |
datastore: Clear replication queue on DeleteAllRepositories
-rw-r--r-- | internal/praefect/datastore/repository_store.go | 19 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store_test.go | 75 |
2 files changed, 93 insertions, 1 deletions
diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index 71fe9016e..5a317c689 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -524,8 +524,25 @@ GROUP BY replica_path //nolint:revive // This is unintentionally missing documentation. func (rs *PostgresRepositoryStore) DeleteAllRepositories(ctx context.Context, virtualStorage string) error { _, err := rs.db.ExecContext(ctx, ` +WITH delete_jobs AS ( + DELETE FROM replication_queue + WHERE job->>'virtual_storage' = $1 + RETURNING id +), + +delete_job_locks AS ( + DELETE FROM replication_queue_job_lock + USING delete_jobs + WHERE job_id = delete_jobs.id +), + +delete_locks AS ( + DELETE FROM replication_queue_lock + WHERE id LIKE $1 || '|%|%' +) + DELETE FROM repositories -WHERE virtual_storage = $1 +WHERE virtual_storage = $1; `, virtualStorage) if err != nil { return err diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index c3410ee3c..d1d3f0204 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -622,12 +622,81 @@ func TestRepositoryStore_Postgres(t *testing.T) { }) t.Run("DeleteAllRepositories", func(t *testing.T) { + queue := PostgresReplicationEventQueue{db} rs := newRepositoryStore(t, nil) require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "repository-1", "replica-path-1", "storage-1", nil, nil, false, false)) require.NoError(t, rs.CreateRepository(ctx, 2, "virtual-storage-2", "repository-1", "replica-path-2", "storage-1", []string{"storage-2"}, nil, false, false)) require.NoError(t, rs.CreateRepository(ctx, 3, "virtual-storage-2", "repository-2", "replica-path-3", "storage-1", nil, nil, false, false)) + events := []ReplicationEvent{ + { + Job: ReplicationJob{ + Change: UpdateRepo, + RelativePath: "/project/path-1", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + VirtualStorage: "virtual-storage-1", + }, + }, + { + Job: ReplicationJob{ + Change: UpdateRepo, + RelativePath: "/project/path-2", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + VirtualStorage: "virtual-storage-1", + }, + }, + { + Job: ReplicationJob{ + Change: UpdateRepo, + RelativePath: "/project/path-3", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + VirtualStorage: "virtual-storage-2", + }, + }, + { + Job: ReplicationJob{ + Change: UpdateRepo, + RelativePath: "/project/path-4", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + VirtualStorage: "virtual-storage-2", + }, + }, + } + + for i := range events { + var err error + events[i], err = queue.Enqueue(ctx, events[i]) + require.NoError(t, err, "failed to fill in event queue") + } + + expectedEvents := []ReplicationEvent{events[2]} + expectedJobLocks := []JobLockRow{ + {JobID: events[2].ID, LockID: "virtual-storage-2|gitaly-1|/project/path-3"}, + } + + dequeuedEvents, err := queue.Dequeue(ctx, "virtual-storage-2", "gitaly-1", 1) + require.NoError(t, err) + require.Len(t, dequeuedEvents, len(expectedEvents)) + for i := range dequeuedEvents { + dequeuedEvents[i].UpdatedAt = nil // it is not possible to determine update_at value as it is generated on UPDATE in database + expectedEvents[i].State = JobStateInProgress + expectedEvents[i].Attempt-- + } + require.Equal(t, expectedEvents, dequeuedEvents) + + requireLocks(t, ctx, db, []LockRow{ + {ID: "virtual-storage-1|gitaly-1|/project/path-1", Acquired: false}, + {ID: "virtual-storage-1|gitaly-1|/project/path-2", Acquired: false}, + {ID: "virtual-storage-2|gitaly-1|/project/path-3", Acquired: true}, + {ID: "virtual-storage-2|gitaly-1|/project/path-4", Acquired: false}, + }) + requireJobLocks(t, ctx, db, expectedJobLocks) + requireState(t, ctx, db, virtualStorageState{ "virtual-storage-1": { @@ -658,6 +727,12 @@ func TestRepositoryStore_Postgres(t *testing.T) { require.NoError(t, rs.DeleteAllRepositories(ctx, "virtual-storage-2")) + requireLocks(t, ctx, db, []LockRow{ + {ID: "virtual-storage-1|gitaly-1|/project/path-1", Acquired: false}, + {ID: "virtual-storage-1|gitaly-1|/project/path-2", Acquired: false}, + }) + requireJobLocks(t, ctx, db, []JobLockRow{}) + requireState(t, ctx, db, virtualStorageState{ "virtual-storage-1": { |