diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2022-02-15 23:18:49 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2022-02-23 13:36:05 +0300 |
commit | a08910de6b5dbc8cfd2e3f559288a60cee0b2a84 (patch) | |
tree | 5f5edbfeb9efa89c807e97234ea2cf2352e73365 | |
parent | 57b3f3a83d55e655ca59a7e677f506ebf41c654b (diff) |
migrations: Migrate existing events and add constraint
The set of newly added columns in the previous commit
remains empty or contains default values. We do update
for the replication_queue table where we populate columns
with the values extracted from the 'job' column corresponding
to the JSON field names. If some old replication events have
no repository_id field set we try to extract it from the
repositories table. We consider all replication events
with unset repository_id (0 value) as too old and remove
them, otherwise we won't be able to create a foreign key
constraint. The foreign key between replication_queue and
repositories tables is used to clean up replication events
once the repository is removed.
To make replication_queue_job_lock table more self-managed
we re-create replication_queue_job_lock_job_id_fkey and
replication_queue_job_lock_lock_id_fkey with cascade removal.
It gives us confidence that the rows are removed from the
table in case corresponding rows are removed from the
replication_queue or replication_queue_lock tables.
If there were in_progress events the rows for those will be
removed from the replication_queue and replication_queue_job_lock
tables, but the rows in the replication_queue_lock will
remain with 'acquired' column set to 'true'. Because the
repositories no longer exist we remove rows from the
replication_queue_lock as they won't be used anymore. And to
prevent existence of the unused rows in the
replication_queue_lock table we create a trigger on the
repositories table to delete rows from replication_queue_lock
table after repository row removal.
Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/3974
3 files changed, 323 insertions, 101 deletions
diff --git a/internal/praefect/datastore/collector_test.go b/internal/praefect/datastore/collector_test.go index 3091c8321..d59f2112f 100644 --- a/internal/praefect/datastore/collector_test.go +++ b/internal/praefect/datastore/collector_test.go @@ -269,15 +269,17 @@ func TestRepositoryStoreCollector_ReplicationQueueDepth(t *testing.T) { readyJobs := 5 for virtualStorage, nodes := range storageNames { for i := 0; i < readyJobs; i++ { + job := ReplicationJob{ + Change: UpdateRepo, + RelativePath: "/project/path-1", + TargetNodeStorage: nodes[1], + SourceNodeStorage: nodes[0], + VirtualStorage: virtualStorage, + Params: nil, + } + insertRepository(t, db, ctx, job.VirtualStorage, job.RelativePath, job.SourceNodeStorage) _, err := queue.Enqueue(ctx, ReplicationEvent{ - Job: ReplicationJob{ - Change: UpdateRepo, - RelativePath: "/project/path-1", - TargetNodeStorage: nodes[1], - SourceNodeStorage: nodes[0], - VirtualStorage: virtualStorage, - Params: nil, - }, + Job: job, }) require.NoError(t, err) } diff --git a/internal/praefect/datastore/migrations/20220211161203_json_job_to_columns.go b/internal/praefect/datastore/migrations/20220211161203_json_job_to_columns.go new file mode 100644 index 000000000..0336ae058 --- /dev/null +++ b/internal/praefect/datastore/migrations/20220211161203_json_job_to_columns.go @@ -0,0 +1,140 @@ +package migrations + +import migrate "github.com/rubenv/sql-migrate" + +func init() { + m := &migrate.Migration{ + Id: "20220211161203_json_job_to_columns", + Up: []string{ + // 1. The replication_queue was extended with set of the new columns to cover + // job JSON struct. This query update all existing rows to fulfil newly + // added columns with data from the job column. + `UPDATE replication_queue SET + change = (queue.job->>'change')::REPLICATION_JOB_TYPE, + repository_id = COALESCE( + -- For the old jobs the repository_id may be 'null' as we didn't have migration that + -- populates those with the value from the repositories table. 
+ -- The repository_id also may be a 0 in case the job was created before repositories table was fulfilled + -- and event was created by reconciler. + CASE WHEN (queue.job->>'repository_id')::BIGINT = 0 THEN NULL ELSE (queue.job->>'repository_id')::BIGINT END, + (SELECT repositories.repository_id FROM repositories WHERE repositories.virtual_storage = queue.job->>'virtual_storage' AND repositories.relative_path = queue.job->>'relative_path'), + -- If repository_id doesn't exist we still need to fill this column otherwise it may fail on scan into struct. + 0), + replica_path = COALESCE(queue.job->>'replica_path', ''), + relative_path = queue.job->>'relative_path', + target_node_storage = queue.job->>'target_node_storage', + source_node_storage = COALESCE(queue.job->>'source_node_storage', ''), + virtual_storage = queue.job->>'virtual_storage', + params = (queue.job->>'params')::JSONB + FROM replication_queue AS queue + LEFT JOIN repositories ON + queue.job->>'virtual_storage' = repositories.virtual_storage AND + queue.job->>'relative_path' = repositories.relative_path + WHERE replication_queue.id = queue.id`, + + // 2. Drop existing foreign key as it does nothing on removal of the referencable row. + `ALTER TABLE replication_queue_job_lock + DROP CONSTRAINT replication_queue_job_lock_job_id_fkey`, + + // 3. And re-create it with the cascade deletion. + `ALTER TABLE replication_queue_job_lock + ADD CONSTRAINT replication_queue_job_lock_job_id_fkey + FOREIGN KEY (job_id) + REFERENCES replication_queue(id) + ON DELETE CASCADE`, + + // 4. Drop existing foreign key as it does nothing on removal of the referencable row. + `ALTER TABLE replication_queue_job_lock DROP CONSTRAINT replication_queue_job_lock_lock_id_fkey`, + + // 5. And re-create it with the cascade deletion. + `ALTER TABLE replication_queue_job_lock + ADD CONSTRAINT replication_queue_job_lock_lock_id_fkey + FOREIGN KEY (lock_id) + REFERENCES replication_queue_lock(id) + ON DELETE CASCADE`, + + // 6. 
The replication events without repository_id are too old and should be + // removed for the foreign key constraint to be created on the repositories table. + `DELETE FROM replication_queue + WHERE repository_id = 0`, + + // 7. In case we removed some in_progress replication events we need to remove + // corresponding rows from replication_queue_lock because repository doesn't + // exist anymore and we don't need a lock row for it. + `DELETE FROM replication_queue_lock + WHERE acquired + AND NOT EXISTS(SELECT FROM replication_queue_job_lock WHERE lock_id = id)`, + + // 8. Once the repository is removed (row deleted from repositories table) it + // can't be used anymore. And there is no reason to process any remaining + // replication events. If gitaly node was out of service and repository + // deletion was scheduled as replication job this job will be removed as well + // but https://gitlab.com/gitlab-org/gitaly/-/issues/3719 should deal with it. + // And to automatically cleanup remaining replication events we create a foreign + // key with cascade removal. + `ALTER TABLE replication_queue + ADD CONSTRAINT replication_queue_repository_id_fkey + FOREIGN KEY (repository_id) + REFERENCES repositories(repository_id) + ON DELETE CASCADE`, + + // 9. If repository is removed there is nothing that cleans up replication_queue_lock + // table.The table is used to sync run of the replication jobs and rows in it + // created before rows in the replication_queue table that is why we can't use + // foreign key to remove orphaned lock rows. We rely on the trigger that removes + // rows from replication_queue_lock once the record is removed from the repositories table. + `-- +migrate StatementBegin + CREATE FUNCTION remove_queue_lock_on_repository_removal() RETURNS TRIGGER AS $$ + BEGIN + DELETE FROM replication_queue_lock + WHERE id LIKE (OLD.virtual_storage || '|%|' || OLD.relative_path); + RETURN NULL; + END; + $$ LANGUAGE plpgsql;`, + + // 10. 
Activates a trigger to remove rows from the replication_queue_lock once the row + // is deleted from the repositories table . + `CREATE TRIGGER remove_queue_lock_on_repository_removal AFTER DELETE ON repositories + FOR EACH ROW EXECUTE PROCEDURE remove_queue_lock_on_repository_removal()`, + }, + Down: []string{ + // 10. + `DROP TRIGGER remove_queue_lock_on_repository_removal ON repositories`, + + // 9. + `DROP FUNCTION remove_queue_lock_on_repository_removal`, + + // 8. + `ALTER TABLE replication_queue + DROP CONSTRAINT replication_queue_repository_id_fkey`, + + // 7. We can't restore deleted rows, nothing to do here. + + // 6. We can't restore deleted rows, nothing to do here. + + // 5. + `ALTER TABLE replication_queue_job_lock DROP CONSTRAINT replication_queue_job_lock_lock_id_fkey`, + + // 4. Re-create foreign key with the default options. + `ALTER TABLE replication_queue_job_lock + ADD CONSTRAINT replication_queue_job_lock_lock_id_fkey + FOREIGN KEY (lock_id) + REFERENCES replication_queue_lock(id)`, + + // 3. + `ALTER TABLE replication_queue_job_lock + DROP CONSTRAINT replication_queue_job_lock_job_id_fkey`, + + // 2. + `ALTER TABLE replication_queue_job_lock + ADD CONSTRAINT replication_queue_job_lock_job_id_fkey + FOREIGN KEY (job_id) + REFERENCES replication_queue(id)`, + + // 1. We don't know what is the set of rows we updated that is why we can't reset + // them back, so nothing to do here. 
+ }, + } + + allMigrations = append(allMigrations, m) +} diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go index 08884c3ba..96407f125 100644 --- a/internal/praefect/datastore/queue_test.go +++ b/internal/praefect/datastore/queue_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" ) @@ -111,6 +112,7 @@ func TestPostgresReplicationEventQueue_DeleteReplicaUniqueIndex(t *testing.T) { ctx := testhelper.Context(t) if tc.existingJob != nil { + insertRepository(t, db, ctx, tc.existingJob.Job.VirtualStorage, tc.existingJob.Job.RelativePath, "stub") _, err := db.ExecContext(ctx, ` INSERT INTO replication_queue (state, job) VALUES ($1, $2) @@ -118,14 +120,16 @@ func TestPostgresReplicationEventQueue_DeleteReplicaUniqueIndex(t *testing.T) { require.NoError(t, err) } + job := ReplicationJob{ + Change: DeleteReplica, + VirtualStorage: "praefect", + RelativePath: "relative-path", + TargetNodeStorage: "gitaly-1", + } + insertRepository(t, db, ctx, job.VirtualStorage, job.RelativePath, "stub") _, err := NewPostgresReplicationEventQueue(db).Enqueue(ctx, ReplicationEvent{ State: JobStateReady, - Job: ReplicationJob{ - Change: DeleteReplica, - VirtualStorage: "praefect", - RelativePath: "relative-path", - TargetNodeStorage: "gitaly-1", - }, + Job: job, }) if tc.succeeds { @@ -155,6 +159,7 @@ func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) { Params: nil, }, } + insertRepository(t, db, ctx, eventType.Job.VirtualStorage, eventType.Job.RelativePath, eventType.Job.SourceNodeStorage) actualEvent, err := queue.Enqueue(ctx, eventType) // initial event require.NoError(t, err) @@ -188,90 +193,74 @@ func TestPostgresReplicationEventQueue_Enqueue_triggerPopulatesColumns(t *testin db := 
testdb.New(t) ctx := testhelper.Context(t) - type action func(t *testing.T, tx *testdb.TxWrapper, event ReplicationEvent) ReplicationEvent + t.Run("no repository record exists", func(t *testing.T) { + tx := db.Begin(t) + defer tx.Rollback(t) - for _, tc := range []struct { - desc string - job ReplicationJob - beforeEnqueue action - afterEnqueue action - }{ - { - desc: "repository id embedded into the job", - job: ReplicationJob{ - Change: UpdateRepo, - RepositoryID: 1, - RelativePath: "/project/path-1", - ReplicaPath: "relative/project/path-1", - TargetNodeStorage: "gitaly-1", - SourceNodeStorage: "gitaly-0", - VirtualStorage: "praefect", - Params: nil, - }, - }, - { - desc: "repository id extracted from repositories table", - job: ReplicationJob{ - Change: RenameRepo, - RelativePath: "/project/path-1", - ReplicaPath: "relative/project/path-1", - TargetNodeStorage: "gitaly-1", - SourceNodeStorage: "gitaly-0", - VirtualStorage: "praefect", - Params: Params{"RelativePath": "new/path"}, - }, - beforeEnqueue: func(t *testing.T, tx *testdb.TxWrapper, event ReplicationEvent) ReplicationEvent { - const query = ` - INSERT INTO repositories(virtual_storage, relative_path, generation, "primary", replica_path) - VALUES ($1, $2, $3, $4, $5) - RETURNING repository_id` - var repositoryID int64 - err := tx.QueryRowContext(ctx, query, event.Job.VirtualStorage, event.Job.RelativePath, 1, event.Job.SourceNodeStorage, event.Job.ReplicaPath). - Scan(&repositoryID) - require.NoError(t, err, "create repository record to have it as a source of repository id") - return event - }, - afterEnqueue: func(t *testing.T, tx *testdb.TxWrapper, event ReplicationEvent) ReplicationEvent { - const query = `SELECT repository_id FROM repositories WHERE virtual_storage = $1 AND relative_path = $2` - err := tx.QueryRowContext(ctx, query, event.Job.VirtualStorage, event.Job.RelativePath). 
- Scan(&event.Job.RepositoryID) - require.NoError(t, err, "create repository record to have it as a source of repository id") - return event - }, - }, - { - desc: "repository id doesn't exist", - job: ReplicationJob{ - Change: RenameRepo, - RelativePath: "/project/path-1", - ReplicaPath: "relative/project/path-1", - TargetNodeStorage: "gitaly-1", - SourceNodeStorage: "gitaly-0", - VirtualStorage: "praefect", - Params: Params{"RelativePath": "new/path"}, - }, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - tx := db.Begin(t) - defer tx.Rollback(t) + job := ReplicationJob{ + Change: UpdateRepo, + RepositoryID: 1, + RelativePath: "/project/path-1", + ReplicaPath: "relative/project/path-1", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + VirtualStorage: "praefect", + Params: nil, + } + queue := NewPostgresReplicationEventQueue(tx) + event := ReplicationEvent{Job: job} + _, err := queue.Enqueue(ctx, event) + ok := glsql.IsForeignKeyViolation(err, "replication_queue_repository_id_fkey") + require.Truef(t, ok, "returned error is not expected: %+v", err) + }) - queue := NewPostgresReplicationEventQueue(tx) - event := ReplicationEvent{Job: tc.job} - if tc.beforeEnqueue != nil { - event = tc.beforeEnqueue(t, tx, event) - } - enqueued, err := queue.Enqueue(ctx, event) - require.NoError(t, err) + t.Run("repository id not set on job, but found in repositories table", func(t *testing.T) { + tx := db.Begin(t) + defer tx.Rollback(t) - if tc.afterEnqueue != nil { - enqueued = tc.afterEnqueue(t, tx, enqueued) - } + job := ReplicationJob{ + Change: RenameRepo, + RelativePath: "/project/path-1", + ReplicaPath: "relative/project/path-1", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + VirtualStorage: "praefect", + Params: Params{"RelativePath": "new/path"}, + } + repositoryID := insertRepository(t, tx, ctx, job.VirtualStorage, job.RelativePath, job.SourceNodeStorage) + queue := NewPostgresReplicationEventQueue(tx) + event := 
ReplicationEvent{Job: job} + enqueued, err := queue.Enqueue(ctx, event) + require.NoError(t, err) - job := extractReplicationJob(t, ctx, tx, enqueued.ID) - require.Equal(t, enqueued.Job, job) - }) - } + actual := extractReplicationJob(t, ctx, tx, enqueued.ID) + job.RepositoryID = repositoryID + require.Equal(t, job, actual) + }) + + t.Run("repository id set on job", func(t *testing.T) { + tx := db.Begin(t) + defer tx.Rollback(t) + + job := ReplicationJob{ + Change: RenameRepo, + RelativePath: "/project/path-1", + ReplicaPath: "relative/project/path-1", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + VirtualStorage: "praefect", + Params: Params{"RelativePath": "new/path"}, + } + job.RepositoryID = insertRepository(t, tx, ctx, job.VirtualStorage, job.RelativePath, job.SourceNodeStorage) + + queue := NewPostgresReplicationEventQueue(tx) + event := ReplicationEvent{Job: job} + enqueued, err := queue.Enqueue(ctx, event) + require.NoError(t, err) + + actual := extractReplicationJob(t, ctx, tx, enqueued.ID) + require.Equal(t, job, actual) + }) } func extractReplicationJob(t *testing.T, ctx context.Context, tx *testdb.TxWrapper, id uint64) ReplicationJob { @@ -291,16 +280,19 @@ func extractReplicationJob(t *testing.T, ctx context.Context, tx *testdb.TxWrapp func TestPostgresReplicationEventQueue_DeleteReplicaInfiniteAttempts(t *testing.T) { t.Parallel() - queue := NewPostgresReplicationEventQueue(testdb.New(t)) + db := testdb.New(t) + queue := NewPostgresReplicationEventQueue(db) ctx := testhelper.Context(t) + job := ReplicationJob{ + Change: DeleteReplica, + RelativePath: "/project/path-1", + TargetNodeStorage: "gitaly-1", + VirtualStorage: "praefect", + } + job.RepositoryID = insertRepository(t, db, ctx, job.VirtualStorage, job.RelativePath, "stub") actualEvent, err := queue.Enqueue(ctx, ReplicationEvent{ - Job: ReplicationJob{ - Change: DeleteReplica, - RelativePath: "/project/path-1", - TargetNodeStorage: "gitaly-1", - VirtualStorage: "praefect", - 
}, + Job: job, }) require.NoError(t, err) @@ -310,6 +302,7 @@ func TestPostgresReplicationEventQueue_DeleteReplicaInfiniteAttempts(t *testing. Attempt: 3, LockID: "praefect|gitaly-1|/project/path-1", Job: ReplicationJob{ + RepositoryID: job.RepositoryID, Change: DeleteReplica, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-1", @@ -378,6 +371,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { }, } + eventType1.Job.RepositoryID = insertRepository(t, db, ctx, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage) event1, err := queue.Enqueue(ctx, eventType1) // initial event require.NoError(t, err) @@ -392,6 +386,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { LockID: "praefect-0|gitaly-1|/project/path-1", Job: ReplicationJob{ Change: UpdateRepo, + RepositoryID: eventType1.Job.RepositoryID, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-1", SourceNodeStorage: "gitaly-0", @@ -414,6 +409,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { LockID: "praefect-0|gitaly-1|/project/path-1", Job: ReplicationJob{ Change: UpdateRepo, + RepositoryID: eventType1.Job.RepositoryID, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-1", SourceNodeStorage: "gitaly-0", @@ -446,6 +442,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2, expEvent3}) requireLocks(t, ctx, db, []LockRow{expLock1, expLock2}) // the new lock for another target repeated event + eventType3.Job.RepositoryID = insertRepository(t, db, ctx, eventType3.Job.VirtualStorage, eventType3.Job.RelativePath, eventType3.Job.SourceNodeStorage) event4, err := queue.Enqueue(ctx, eventType3) // event for another repo require.NoError(t, err) @@ -456,6 +453,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { LockID: "praefect-1|gitaly-1|/project/path-2", Job: 
ReplicationJob{ Change: UpdateRepo, + RepositoryID: eventType3.Job.RepositoryID, RelativePath: "/project/path-2", TargetNodeStorage: "gitaly-1", SourceNodeStorage: "gitaly-0", @@ -487,6 +485,7 @@ func TestPostgresReplicationEventQueue_Dequeue(t *testing.T) { Params: nil, }, } + event.Job.RepositoryID = insertRepository(t, db, ctx, event.Job.VirtualStorage, event.Job.RelativePath, event.Job.SourceNodeStorage) event, err := queue.Enqueue(ctx, event) require.NoError(t, err, "failed to fill in event queue") @@ -570,6 +569,8 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) { // events to fill in the queue events := []ReplicationEvent{eventType1, eventType1, eventType2, eventType1, eventType3, eventType4} for i := range events { + events[i].Job.RepositoryID = insertRepository(t, db, ctx, events[i].Job.VirtualStorage, events[i].Job.RelativePath, events[i].Job.SourceNodeStorage) + var err error events[i], err = queue.Enqueue(ctx, events[i]) require.NoError(t, err, "failed to fill in event queue") @@ -655,6 +656,7 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test } for i := 0; i < 2; i++ { + eventType1.Job.RepositoryID = insertRepository(t, db, ctx, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage) _, err := queue.Enqueue(ctx, eventType1) require.NoError(t, err, "failed to fill in event queue") } @@ -669,6 +671,7 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test requireJobLocks(t, ctx, db, []JobLockRow{{JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"}}) for i := 0; i < 2; i++ { + eventType2.Job.RepositoryID = insertRepository(t, db, ctx, eventType2.Job.VirtualStorage, eventType2.Job.RelativePath, eventType2.Job.SourceNodeStorage) _, err := queue.Enqueue(ctx, eventType2) require.NoError(t, err, "failed to fill in event queue") } @@ -704,7 +707,9 @@ func TestPostgresReplicationEventQueue_Acknowledge(t *testing.T) { }, } + repositoryID := 
insertRepository(t, db, ctx, event.Job.VirtualStorage, event.Job.RelativePath, event.Job.SourceNodeStorage) event, err := queue.Enqueue(ctx, event) + event.Job.RepositoryID = repositoryID require.NoError(t, err, "failed to fill in event queue") actual, err := queue.Dequeue(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, 100) @@ -781,6 +786,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { events := []ReplicationEvent{eventType1, eventType1, eventType2, eventType1, eventType3, eventType2, eventType4} // events to fill in the queue for i := range events { + events[i].Job.RepositoryID = insertRepository(t, db, ctx, events[i].Job.VirtualStorage, events[i].Job.RelativePath, events[i].Job.SourceNodeStorage) var err error events[i], err = queue.Enqueue(ctx, events[i]) require.NoError(t, err, "failed to fill in event queue") @@ -988,6 +994,7 @@ func TestPostgresReplicationEventQueue_StartHealthUpdate(t *testing.T) { queue := NewPostgresReplicationEventQueue(db) events := []ReplicationEvent{eventType1, eventType2, eventType3, eventType4} for i := range events { + events[i].Job.RepositoryID = insertRepository(t, db, ctx, events[i].Job.VirtualStorage, events[i].Job.RelativePath, events[i].Job.SourceNodeStorage) var err error events[i], err = queue.Enqueue(ctx, events[i]) require.NoError(t, err, "failed to fill in event queue") @@ -1062,6 +1069,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { db.TruncateAll(t) source := NewPostgresReplicationEventQueue(db) + insertRepository(t, db, ctx, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage) event, err := source.Enqueue(ctx, eventType1) require.NoError(t, err) @@ -1079,10 +1087,12 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { db.TruncateAll(t) source := NewPostgresReplicationEventQueue(db) + insertRepository(t, db, ctx, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, 
eventType1.Job.SourceNodeStorage) // move event to 'ready' state event1, err := source.Enqueue(ctx, eventType1) require.NoError(t, err) + insertRepository(t, db, ctx, eventType2.Job.VirtualStorage, eventType2.Job.RelativePath, eventType2.Job.SourceNodeStorage) // move event to 'failed' state event2, err := source.Enqueue(ctx, eventType2) require.NoError(t, err) @@ -1092,6 +1102,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { _, err = source.Acknowledge(ctx, JobStateFailed, []uint64{devents2[0].ID}) require.NoError(t, err) + insertRepository(t, db, ctx, eventType3.Job.VirtualStorage, eventType3.Job.RelativePath, eventType3.Job.SourceNodeStorage) // move event to 'dead' state event3, err := source.Enqueue(ctx, eventType3) require.NoError(t, err) @@ -1101,6 +1112,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { _, err = source.Acknowledge(ctx, JobStateDead, []uint64{devents3[0].ID}) require.NoError(t, err) + insertRepository(t, db, ctx, eventType4.Job.VirtualStorage, eventType4.Job.RelativePath, eventType4.Job.SourceNodeStorage) event4, err := source.Enqueue(ctx, eventType4) require.NoError(t, err) devents4, err := source.Dequeue(ctx, event4.Job.VirtualStorage, event4.Job.TargetNodeStorage, 1) @@ -1122,6 +1134,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { var events []ReplicationEvent for _, eventType := range []ReplicationEvent{eventType1, eventType2, eventType3} { + insertRepository(t, db, ctx, eventType.Job.VirtualStorage, eventType.Job.RelativePath, eventType.Job.SourceNodeStorage) event, err := source.Enqueue(ctx, eventType) require.NoError(t, err) devents, err := source.Dequeue(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, 1) @@ -1151,6 +1164,73 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { }) } +func TestLockRowIsRemovedOnceRepositoryIsRemoved(t *testing.T) { + t.Parallel() + db := testdb.New(t) + ctx := testhelper.Context(t) + + 
const ( + virtualStorage = "praefect" + relativePath = "/project/path-1" + primaryStorage = "gitaly-0" + ) + + enqueueJobs := []ReplicationJob{ + // This event will be moved to in_progress state and lock will be with acquired=true. + { + Change: UpdateRepo, + RepositoryID: 1, + RelativePath: relativePath, + ReplicaPath: "relative/project/path-1", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: primaryStorage, + VirtualStorage: virtualStorage, + Params: nil, + }, + // This event is for another storage, but for the same repository. + { + Change: UpdateRepo, + RepositoryID: 1, + RelativePath: relativePath, + ReplicaPath: "relative/project/path-2", + TargetNodeStorage: "gitaly-2", + SourceNodeStorage: primaryStorage, + VirtualStorage: virtualStorage, + Params: nil, + }, + } + repositoryID := insertRepository(t, db, ctx, virtualStorage, relativePath, primaryStorage) + + queue := NewPostgresReplicationEventQueue(db) + for _, job := range enqueueJobs { + event := ReplicationEvent{Job: job} + _, err := queue.Enqueue(ctx, event) + require.NoError(t, err) + } + _, err := queue.Dequeue(ctx, enqueueJobs[0].VirtualStorage, enqueueJobs[0].TargetNodeStorage, 1) + require.NoError(t, err, "pickup one job to change it's status and create additional rows") + + _, err = db.ExecContext(ctx, `DELETE FROM repositories where repository_id = $1`, repositoryID) + require.NoError(t, err) + + db.RequireRowsInTable(t, "replication_queue_lock", 0) +} + +func insertRepository(t *testing.T, db glsql.Querier, ctx context.Context, virtualStorage, relativePath, primary string) int64 { + t.Helper() + const query = ` + INSERT INTO repositories(virtual_storage, relative_path, generation, "primary") + VALUES ($1, $2, $3, $4) + ON CONFLICT (virtual_storage, relative_path) + DO UPDATE SET relative_path = excluded.relative_path + RETURNING repository_id` + var repositoryID int64 + err := db.QueryRowContext(ctx, query, virtualStorage, relativePath, 1, primary). 
+ Scan(&repositoryID) + require.NoError(t, err, "create repository record") + return repositoryID +} + func requireEvents(t *testing.T, ctx context.Context, db testdb.DB, expected []ReplicationEvent) { t.Helper() |