gitlab.com/gitlab-org/gitaly.git
author    Pavlo Strokov <pstrokov@gitlab.com>  2022-02-15 23:18:49 +0300
committer Pavlo Strokov <pstrokov@gitlab.com>  2022-02-23 13:36:05 +0300
commit    a08910de6b5dbc8cfd2e3f559288a60cee0b2a84 (patch)
tree      5f5edbfeb9efa89c807e97234ea2cf2352e73365
parent    57b3f3a83d55e655ca59a7e677f506ebf41c654b (diff)
migrations: Migrate existing events and add constraint
The set of columns newly added in the previous commit remains empty or contains default values. We update the replication_queue table, populating those columns with the values extracted from the JSON fields of the same name in the 'job' column. If an old replication event has no repository_id set we try to extract it from the repositories table. All replication events with an unset repository_id (0 value) are considered too old and are removed, otherwise we wouldn't be able to create the foreign key constraint. The foreign key between the replication_queue and repositories tables is used to clean up replication events once the repository is removed.

To make the replication_queue_job_lock table more self-managed we re-create replication_queue_job_lock_job_id_fkey and replication_queue_job_lock_lock_id_fkey with cascading removal. This gives us confidence that rows are removed from the table whenever the corresponding rows are removed from the replication_queue or replication_queue_lock tables.

If there were in_progress events, their rows are removed from the replication_queue and replication_queue_job_lock tables, but the rows in replication_queue_lock would remain with the 'acquired' column set to 'true'. Because the repositories no longer exist, we remove these rows from replication_queue_lock as they won't be used anymore. To prevent such unused rows from accumulating in the replication_queue_lock table in the future, we create a trigger on the repositories table that deletes rows from the replication_queue_lock table after a repository row is removed.

Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/3974
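As an illustration of the cleanup chain this migration establishes, deleting a repository now cascades through every queue table. The table, constraint, and trigger names below are the ones created by the migration; the row values are hypothetical:

    -- Hypothetical starting state:
    --   repositories:               repository_id = 42, virtual_storage = 'praefect',
    --                               relative_path = '/project/path-1'
    --   replication_queue:          id = 7, repository_id = 42, state = 'in_progress'
    --   replication_queue_job_lock: job_id = 7, lock_id = 'praefect|gitaly-1|/project/path-1'
    --   replication_queue_lock:     id = 'praefect|gitaly-1|/project/path-1', acquired = TRUE

    DELETE FROM repositories WHERE repository_id = 42;

    -- replication_queue_repository_id_fkey (ON DELETE CASCADE) removes queue row 7;
    -- replication_queue_job_lock_job_id_fkey (ON DELETE CASCADE) removes its job lock row;
    -- the remove_queue_lock_on_repository_removal trigger removes the lock row whose id
    -- matches OLD.virtual_storage || '|%|' || OLD.relative_path.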
-rw-r--r--  internal/praefect/datastore/collector_test.go                                  18
-rw-r--r--  internal/praefect/datastore/migrations/20220211161203_json_job_to_columns.go  140
-rw-r--r--  internal/praefect/datastore/queue_test.go                                     266
3 files changed, 323 insertions(+), 101 deletions(-)
diff --git a/internal/praefect/datastore/collector_test.go b/internal/praefect/datastore/collector_test.go
index 3091c8321..d59f2112f 100644
--- a/internal/praefect/datastore/collector_test.go
+++ b/internal/praefect/datastore/collector_test.go
@@ -269,15 +269,17 @@ func TestRepositoryStoreCollector_ReplicationQueueDepth(t *testing.T) {
readyJobs := 5
for virtualStorage, nodes := range storageNames {
for i := 0; i < readyJobs; i++ {
+ job := ReplicationJob{
+ Change: UpdateRepo,
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: nodes[1],
+ SourceNodeStorage: nodes[0],
+ VirtualStorage: virtualStorage,
+ Params: nil,
+ }
+ insertRepository(t, db, ctx, job.VirtualStorage, job.RelativePath, job.SourceNodeStorage)
_, err := queue.Enqueue(ctx, ReplicationEvent{
- Job: ReplicationJob{
- Change: UpdateRepo,
- RelativePath: "/project/path-1",
- TargetNodeStorage: nodes[1],
- SourceNodeStorage: nodes[0],
- VirtualStorage: virtualStorage,
- Params: nil,
- },
+ Job: job,
})
require.NoError(t, err)
}
diff --git a/internal/praefect/datastore/migrations/20220211161203_json_job_to_columns.go b/internal/praefect/datastore/migrations/20220211161203_json_job_to_columns.go
new file mode 100644
index 000000000..0336ae058
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20220211161203_json_job_to_columns.go
@@ -0,0 +1,140 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+ m := &migrate.Migration{
+ Id: "20220211161203_json_job_to_columns",
+ Up: []string{
+ // 1. The replication_queue table was extended with a set of new columns that
+ // cover the job JSON struct. This query updates all existing rows, filling
+ // the newly added columns with data from the job column.
+ `UPDATE replication_queue SET
+ change = (queue.job->>'change')::REPLICATION_JOB_TYPE,
+ repository_id = COALESCE(
+ -- For old jobs the repository_id may be 'null' because there was no migration
+ -- that populated it with the value from the repositories table.
+ -- The repository_id may also be 0 if the job was created by the reconciler
+ -- before the repositories table was populated.
+ CASE WHEN (queue.job->>'repository_id')::BIGINT = 0 THEN NULL ELSE (queue.job->>'repository_id')::BIGINT END,
+ (SELECT repositories.repository_id FROM repositories WHERE repositories.virtual_storage = queue.job->>'virtual_storage' AND repositories.relative_path = queue.job->>'relative_path'),
+ -- If the repository_id doesn't exist we still need to fill this column, otherwise scanning the row into a struct may fail.
+ 0),
+ replica_path = COALESCE(queue.job->>'replica_path', ''),
+ relative_path = queue.job->>'relative_path',
+ target_node_storage = queue.job->>'target_node_storage',
+ source_node_storage = COALESCE(queue.job->>'source_node_storage', ''),
+ virtual_storage = queue.job->>'virtual_storage',
+ params = (queue.job->>'params')::JSONB
+ FROM replication_queue AS queue
+ LEFT JOIN repositories ON
+ queue.job->>'virtual_storage' = repositories.virtual_storage AND
+ queue.job->>'relative_path' = repositories.relative_path
+ WHERE replication_queue.id = queue.id`,
+
+ // 2. Drop the existing foreign key as it does nothing on removal of the referenced row.
+ `ALTER TABLE replication_queue_job_lock
+ DROP CONSTRAINT replication_queue_job_lock_job_id_fkey`,
+
+ // 3. And re-create it with cascading deletion.
+ `ALTER TABLE replication_queue_job_lock
+ ADD CONSTRAINT replication_queue_job_lock_job_id_fkey
+ FOREIGN KEY (job_id)
+ REFERENCES replication_queue(id)
+ ON DELETE CASCADE`,
+
+ // 4. Drop the existing foreign key as it does nothing on removal of the referenced row.
+ `ALTER TABLE replication_queue_job_lock DROP CONSTRAINT replication_queue_job_lock_lock_id_fkey`,
+
+ // 5. And re-create it with cascading deletion.
+ `ALTER TABLE replication_queue_job_lock
+ ADD CONSTRAINT replication_queue_job_lock_lock_id_fkey
+ FOREIGN KEY (lock_id)
+ REFERENCES replication_queue_lock(id)
+ ON DELETE CASCADE`,
+
+ // 6. Replication events without a repository_id are too old and must be removed
+ // so that the foreign key referencing the repositories table can be created.
+ `DELETE FROM replication_queue
+ WHERE repository_id = 0`,
+
+ // 7. If we removed some in_progress replication events we also need to remove the
+ // corresponding rows from replication_queue_lock, because the repository no longer
+ // exists and we don't need a lock row for it.
+ `DELETE FROM replication_queue_lock
+ WHERE acquired
+ AND NOT EXISTS(SELECT FROM replication_queue_job_lock WHERE lock_id = id)`,
+
+ // 8. Once a repository is removed (its row is deleted from the repositories
+ // table) it can't be used anymore, and there is no reason to process any
+ // remaining replication events for it. If a gitaly node was out of service and
+ // the repository deletion was scheduled as a replication job, that job will be
+ // removed as well, but https://gitlab.com/gitlab-org/gitaly/-/issues/3719
+ // should deal with it. To automatically clean up the remaining replication
+ // events we create a foreign key with cascading removal.
+ `ALTER TABLE replication_queue
+ ADD CONSTRAINT replication_queue_repository_id_fkey
+ FOREIGN KEY (repository_id)
+ REFERENCES repositories(repository_id)
+ ON DELETE CASCADE`,
+
+ // 9. When a repository is removed, nothing cleans up the replication_queue_lock
+ // table. It is used to synchronize runs of the replication jobs, and its rows are
+ // created before the rows in the replication_queue table, which is why we can't
+ // use a foreign key to remove orphaned lock rows. Instead we rely on a trigger that
+ // removes rows from replication_queue_lock once the repositories row is removed.
+ `-- +migrate StatementBegin
+ CREATE FUNCTION remove_queue_lock_on_repository_removal() RETURNS TRIGGER AS $$
+ BEGIN
+ DELETE FROM replication_queue_lock
+ WHERE id LIKE (OLD.virtual_storage || '|%|' || OLD.relative_path);
+ RETURN NULL;
+ END;
+ $$ LANGUAGE plpgsql;`,
+
+ // 10. Activate the trigger that removes rows from replication_queue_lock once
+ // the corresponding row is deleted from the repositories table.
+ `CREATE TRIGGER remove_queue_lock_on_repository_removal AFTER DELETE ON repositories
+ FOR EACH ROW EXECUTE PROCEDURE remove_queue_lock_on_repository_removal()`,
+ },
+ Down: []string{
+ // 10.
+ `DROP TRIGGER remove_queue_lock_on_repository_removal ON repositories`,
+
+ // 9.
+ `DROP FUNCTION remove_queue_lock_on_repository_removal`,
+
+ // 8.
+ `ALTER TABLE replication_queue
+ DROP CONSTRAINT replication_queue_repository_id_fkey`,
+
+ // 7. We can't restore deleted rows, nothing to do here.
+
+ // 6. We can't restore deleted rows, nothing to do here.
+
+ // 5.
+ `ALTER TABLE replication_queue_job_lock DROP CONSTRAINT replication_queue_job_lock_lock_id_fkey`,
+
+ // 4. Re-create foreign key with the default options.
+ `ALTER TABLE replication_queue_job_lock
+ ADD CONSTRAINT replication_queue_job_lock_lock_id_fkey
+ FOREIGN KEY (lock_id)
+ REFERENCES replication_queue_lock(id)`,
+
+ // 3.
+ `ALTER TABLE replication_queue_job_lock
+ DROP CONSTRAINT replication_queue_job_lock_job_id_fkey`,
+
+ // 2.
+ `ALTER TABLE replication_queue_job_lock
+ ADD CONSTRAINT replication_queue_job_lock_job_id_fkey
+ FOREIGN KEY (job_id)
+ REFERENCES replication_queue(id)`,
+
+ // 1. We don't know which set of rows we updated, so we can't reset them back;
+ // nothing to do here.
+ },
+ }
+
+ allMigrations = append(allMigrations, m)
+}
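A note on the trigger's LIKE pattern: replication_queue_lock ids are composed as virtual_storage|target_storage|relative_path (e.g. 'praefect|gitaly-1|/project/path-1' in the tests below), so the '%' wildcard between the pipes matches the lock rows of every target storage for the removed repository. A minimal sketch, assuming that id format:

    SELECT 'praefect|gitaly-1|/project/path-1'
           LIKE ('praefect' || '|%|' || '/project/path-1');  -- true
    SELECT 'praefect|gitaly-2|/project/path-1'
           LIKE ('praefect' || '|%|' || '/project/path-1');  -- true: another storage, same repo
    SELECT 'praefect|gitaly-1|/project/path-2'
           LIKE ('praefect' || '|%|' || '/project/path-1');  -- false: different repository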
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index 08884c3ba..96407f125 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
)
@@ -111,6 +112,7 @@ func TestPostgresReplicationEventQueue_DeleteReplicaUniqueIndex(t *testing.T) {
ctx := testhelper.Context(t)
if tc.existingJob != nil {
+ insertRepository(t, db, ctx, tc.existingJob.Job.VirtualStorage, tc.existingJob.Job.RelativePath, "stub")
_, err := db.ExecContext(ctx, `
INSERT INTO replication_queue (state, job)
VALUES ($1, $2)
@@ -118,14 +120,16 @@ func TestPostgresReplicationEventQueue_DeleteReplicaUniqueIndex(t *testing.T) {
require.NoError(t, err)
}
+ job := ReplicationJob{
+ Change: DeleteReplica,
+ VirtualStorage: "praefect",
+ RelativePath: "relative-path",
+ TargetNodeStorage: "gitaly-1",
+ }
+ insertRepository(t, db, ctx, job.VirtualStorage, job.RelativePath, "stub")
_, err := NewPostgresReplicationEventQueue(db).Enqueue(ctx, ReplicationEvent{
State: JobStateReady,
- Job: ReplicationJob{
- Change: DeleteReplica,
- VirtualStorage: "praefect",
- RelativePath: "relative-path",
- TargetNodeStorage: "gitaly-1",
- },
+ Job: job,
})
if tc.succeeds {
@@ -155,6 +159,7 @@ func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) {
Params: nil,
},
}
+ insertRepository(t, db, ctx, eventType.Job.VirtualStorage, eventType.Job.RelativePath, eventType.Job.SourceNodeStorage)
actualEvent, err := queue.Enqueue(ctx, eventType) // initial event
require.NoError(t, err)
@@ -188,90 +193,74 @@ func TestPostgresReplicationEventQueue_Enqueue_triggerPopulatesColumns(t *testin
db := testdb.New(t)
ctx := testhelper.Context(t)
- type action func(t *testing.T, tx *testdb.TxWrapper, event ReplicationEvent) ReplicationEvent
+ t.Run("no repository record exists", func(t *testing.T) {
+ tx := db.Begin(t)
+ defer tx.Rollback(t)
- for _, tc := range []struct {
- desc string
- job ReplicationJob
- beforeEnqueue action
- afterEnqueue action
- }{
- {
- desc: "repository id embedded into the job",
- job: ReplicationJob{
- Change: UpdateRepo,
- RepositoryID: 1,
- RelativePath: "/project/path-1",
- ReplicaPath: "relative/project/path-1",
- TargetNodeStorage: "gitaly-1",
- SourceNodeStorage: "gitaly-0",
- VirtualStorage: "praefect",
- Params: nil,
- },
- },
- {
- desc: "repository id extracted from repositories table",
- job: ReplicationJob{
- Change: RenameRepo,
- RelativePath: "/project/path-1",
- ReplicaPath: "relative/project/path-1",
- TargetNodeStorage: "gitaly-1",
- SourceNodeStorage: "gitaly-0",
- VirtualStorage: "praefect",
- Params: Params{"RelativePath": "new/path"},
- },
- beforeEnqueue: func(t *testing.T, tx *testdb.TxWrapper, event ReplicationEvent) ReplicationEvent {
- const query = `
- INSERT INTO repositories(virtual_storage, relative_path, generation, "primary", replica_path)
- VALUES ($1, $2, $3, $4, $5)
- RETURNING repository_id`
- var repositoryID int64
- err := tx.QueryRowContext(ctx, query, event.Job.VirtualStorage, event.Job.RelativePath, 1, event.Job.SourceNodeStorage, event.Job.ReplicaPath).
- Scan(&repositoryID)
- require.NoError(t, err, "create repository record to have it as a source of repository id")
- return event
- },
- afterEnqueue: func(t *testing.T, tx *testdb.TxWrapper, event ReplicationEvent) ReplicationEvent {
- const query = `SELECT repository_id FROM repositories WHERE virtual_storage = $1 AND relative_path = $2`
- err := tx.QueryRowContext(ctx, query, event.Job.VirtualStorage, event.Job.RelativePath).
- Scan(&event.Job.RepositoryID)
- require.NoError(t, err, "create repository record to have it as a source of repository id")
- return event
- },
- },
- {
- desc: "repository id doesn't exist",
- job: ReplicationJob{
- Change: RenameRepo,
- RelativePath: "/project/path-1",
- ReplicaPath: "relative/project/path-1",
- TargetNodeStorage: "gitaly-1",
- SourceNodeStorage: "gitaly-0",
- VirtualStorage: "praefect",
- Params: Params{"RelativePath": "new/path"},
- },
- },
- } {
- t.Run(tc.desc, func(t *testing.T) {
- tx := db.Begin(t)
- defer tx.Rollback(t)
+ job := ReplicationJob{
+ Change: UpdateRepo,
+ RepositoryID: 1,
+ RelativePath: "/project/path-1",
+ ReplicaPath: "relative/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ VirtualStorage: "praefect",
+ Params: nil,
+ }
+ queue := NewPostgresReplicationEventQueue(tx)
+ event := ReplicationEvent{Job: job}
+ _, err := queue.Enqueue(ctx, event)
+ ok := glsql.IsForeignKeyViolation(err, "replication_queue_repository_id_fkey")
+ require.Truef(t, ok, "returned error is not expected: %+v", err)
+ })
- queue := NewPostgresReplicationEventQueue(tx)
- event := ReplicationEvent{Job: tc.job}
- if tc.beforeEnqueue != nil {
- event = tc.beforeEnqueue(t, tx, event)
- }
- enqueued, err := queue.Enqueue(ctx, event)
- require.NoError(t, err)
+ t.Run("repository id not set on job, but found in repositories table", func(t *testing.T) {
+ tx := db.Begin(t)
+ defer tx.Rollback(t)
- if tc.afterEnqueue != nil {
- enqueued = tc.afterEnqueue(t, tx, enqueued)
- }
+ job := ReplicationJob{
+ Change: RenameRepo,
+ RelativePath: "/project/path-1",
+ ReplicaPath: "relative/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ VirtualStorage: "praefect",
+ Params: Params{"RelativePath": "new/path"},
+ }
+ repositoryID := insertRepository(t, tx, ctx, job.VirtualStorage, job.RelativePath, job.SourceNodeStorage)
+ queue := NewPostgresReplicationEventQueue(tx)
+ event := ReplicationEvent{Job: job}
+ enqueued, err := queue.Enqueue(ctx, event)
+ require.NoError(t, err)
- job := extractReplicationJob(t, ctx, tx, enqueued.ID)
- require.Equal(t, enqueued.Job, job)
- })
- }
+ actual := extractReplicationJob(t, ctx, tx, enqueued.ID)
+ job.RepositoryID = repositoryID
+ require.Equal(t, job, actual)
+ })
+
+ t.Run("repository id set on job", func(t *testing.T) {
+ tx := db.Begin(t)
+ defer tx.Rollback(t)
+
+ job := ReplicationJob{
+ Change: RenameRepo,
+ RelativePath: "/project/path-1",
+ ReplicaPath: "relative/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ VirtualStorage: "praefect",
+ Params: Params{"RelativePath": "new/path"},
+ }
+ job.RepositoryID = insertRepository(t, tx, ctx, job.VirtualStorage, job.RelativePath, job.SourceNodeStorage)
+
+ queue := NewPostgresReplicationEventQueue(tx)
+ event := ReplicationEvent{Job: job}
+ enqueued, err := queue.Enqueue(ctx, event)
+ require.NoError(t, err)
+
+ actual := extractReplicationJob(t, ctx, tx, enqueued.ID)
+ require.Equal(t, job, actual)
+ })
}
func extractReplicationJob(t *testing.T, ctx context.Context, tx *testdb.TxWrapper, id uint64) ReplicationJob {
@@ -291,16 +280,19 @@ func extractReplicationJob(t *testing.T, ctx context.Context, tx *testdb.TxWrapp
func TestPostgresReplicationEventQueue_DeleteReplicaInfiniteAttempts(t *testing.T) {
t.Parallel()
- queue := NewPostgresReplicationEventQueue(testdb.New(t))
+ db := testdb.New(t)
+ queue := NewPostgresReplicationEventQueue(db)
ctx := testhelper.Context(t)
+ job := ReplicationJob{
+ Change: DeleteReplica,
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ VirtualStorage: "praefect",
+ }
+ job.RepositoryID = insertRepository(t, db, ctx, job.VirtualStorage, job.RelativePath, "stub")
actualEvent, err := queue.Enqueue(ctx, ReplicationEvent{
- Job: ReplicationJob{
- Change: DeleteReplica,
- RelativePath: "/project/path-1",
- TargetNodeStorage: "gitaly-1",
- VirtualStorage: "praefect",
- },
+ Job: job,
})
require.NoError(t, err)
@@ -310,6 +302,7 @@ func TestPostgresReplicationEventQueue_DeleteReplicaInfiniteAttempts(t *testing.
Attempt: 3,
LockID: "praefect|gitaly-1|/project/path-1",
Job: ReplicationJob{
+ RepositoryID: job.RepositoryID,
Change: DeleteReplica,
RelativePath: "/project/path-1",
TargetNodeStorage: "gitaly-1",
@@ -378,6 +371,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
},
}
+ eventType1.Job.RepositoryID = insertRepository(t, db, ctx, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage)
event1, err := queue.Enqueue(ctx, eventType1) // initial event
require.NoError(t, err)
@@ -392,6 +386,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
LockID: "praefect-0|gitaly-1|/project/path-1",
Job: ReplicationJob{
Change: UpdateRepo,
+ RepositoryID: eventType1.Job.RepositoryID,
RelativePath: "/project/path-1",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "gitaly-0",
@@ -414,6 +409,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
LockID: "praefect-0|gitaly-1|/project/path-1",
Job: ReplicationJob{
Change: UpdateRepo,
+ RepositoryID: eventType1.Job.RepositoryID,
RelativePath: "/project/path-1",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "gitaly-0",
@@ -446,6 +442,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2, expEvent3})
requireLocks(t, ctx, db, []LockRow{expLock1, expLock2}) // the new lock for another target repeated event
+ eventType3.Job.RepositoryID = insertRepository(t, db, ctx, eventType3.Job.VirtualStorage, eventType3.Job.RelativePath, eventType3.Job.SourceNodeStorage)
event4, err := queue.Enqueue(ctx, eventType3) // event for another repo
require.NoError(t, err)
@@ -456,6 +453,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
LockID: "praefect-1|gitaly-1|/project/path-2",
Job: ReplicationJob{
Change: UpdateRepo,
+ RepositoryID: eventType3.Job.RepositoryID,
RelativePath: "/project/path-2",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "gitaly-0",
@@ -487,6 +485,7 @@ func TestPostgresReplicationEventQueue_Dequeue(t *testing.T) {
Params: nil,
},
}
+ event.Job.RepositoryID = insertRepository(t, db, ctx, event.Job.VirtualStorage, event.Job.RelativePath, event.Job.SourceNodeStorage)
event, err := queue.Enqueue(ctx, event)
require.NoError(t, err, "failed to fill in event queue")
@@ -570,6 +569,8 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) {
// events to fill in the queue
events := []ReplicationEvent{eventType1, eventType1, eventType2, eventType1, eventType3, eventType4}
for i := range events {
+ events[i].Job.RepositoryID = insertRepository(t, db, ctx, events[i].Job.VirtualStorage, events[i].Job.RelativePath, events[i].Job.SourceNodeStorage)
+
var err error
events[i], err = queue.Enqueue(ctx, events[i])
require.NoError(t, err, "failed to fill in event queue")
@@ -655,6 +656,7 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test
}
for i := 0; i < 2; i++ {
+ eventType1.Job.RepositoryID = insertRepository(t, db, ctx, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage)
_, err := queue.Enqueue(ctx, eventType1)
require.NoError(t, err, "failed to fill in event queue")
}
@@ -669,6 +671,7 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test
requireJobLocks(t, ctx, db, []JobLockRow{{JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"}})
for i := 0; i < 2; i++ {
+ eventType2.Job.RepositoryID = insertRepository(t, db, ctx, eventType2.Job.VirtualStorage, eventType2.Job.RelativePath, eventType2.Job.SourceNodeStorage)
_, err := queue.Enqueue(ctx, eventType2)
require.NoError(t, err, "failed to fill in event queue")
}
@@ -704,7 +707,9 @@ func TestPostgresReplicationEventQueue_Acknowledge(t *testing.T) {
},
}
+ repositoryID := insertRepository(t, db, ctx, event.Job.VirtualStorage, event.Job.RelativePath, event.Job.SourceNodeStorage)
event, err := queue.Enqueue(ctx, event)
+ event.Job.RepositoryID = repositoryID
require.NoError(t, err, "failed to fill in event queue")
actual, err := queue.Dequeue(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, 100)
@@ -781,6 +786,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
events := []ReplicationEvent{eventType1, eventType1, eventType2, eventType1, eventType3, eventType2, eventType4} // events to fill in the queue
for i := range events {
+ events[i].Job.RepositoryID = insertRepository(t, db, ctx, events[i].Job.VirtualStorage, events[i].Job.RelativePath, events[i].Job.SourceNodeStorage)
var err error
events[i], err = queue.Enqueue(ctx, events[i])
require.NoError(t, err, "failed to fill in event queue")
@@ -988,6 +994,7 @@ func TestPostgresReplicationEventQueue_StartHealthUpdate(t *testing.T) {
queue := NewPostgresReplicationEventQueue(db)
events := []ReplicationEvent{eventType1, eventType2, eventType3, eventType4}
for i := range events {
+ events[i].Job.RepositoryID = insertRepository(t, db, ctx, events[i].Job.VirtualStorage, events[i].Job.RelativePath, events[i].Job.SourceNodeStorage)
var err error
events[i], err = queue.Enqueue(ctx, events[i])
require.NoError(t, err, "failed to fill in event queue")
@@ -1062,6 +1069,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) {
db.TruncateAll(t)
source := NewPostgresReplicationEventQueue(db)
+ insertRepository(t, db, ctx, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage)
event, err := source.Enqueue(ctx, eventType1)
require.NoError(t, err)
@@ -1079,10 +1087,12 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) {
db.TruncateAll(t)
source := NewPostgresReplicationEventQueue(db)
+ insertRepository(t, db, ctx, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage)
// move event to 'ready' state
event1, err := source.Enqueue(ctx, eventType1)
require.NoError(t, err)
+ insertRepository(t, db, ctx, eventType2.Job.VirtualStorage, eventType2.Job.RelativePath, eventType2.Job.SourceNodeStorage)
// move event to 'failed' state
event2, err := source.Enqueue(ctx, eventType2)
require.NoError(t, err)
@@ -1092,6 +1102,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) {
_, err = source.Acknowledge(ctx, JobStateFailed, []uint64{devents2[0].ID})
require.NoError(t, err)
+ insertRepository(t, db, ctx, eventType3.Job.VirtualStorage, eventType3.Job.RelativePath, eventType3.Job.SourceNodeStorage)
// move event to 'dead' state
event3, err := source.Enqueue(ctx, eventType3)
require.NoError(t, err)
@@ -1101,6 +1112,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) {
_, err = source.Acknowledge(ctx, JobStateDead, []uint64{devents3[0].ID})
require.NoError(t, err)
+ insertRepository(t, db, ctx, eventType4.Job.VirtualStorage, eventType4.Job.RelativePath, eventType4.Job.SourceNodeStorage)
event4, err := source.Enqueue(ctx, eventType4)
require.NoError(t, err)
devents4, err := source.Dequeue(ctx, event4.Job.VirtualStorage, event4.Job.TargetNodeStorage, 1)
@@ -1122,6 +1134,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) {
var events []ReplicationEvent
for _, eventType := range []ReplicationEvent{eventType1, eventType2, eventType3} {
+ insertRepository(t, db, ctx, eventType.Job.VirtualStorage, eventType.Job.RelativePath, eventType.Job.SourceNodeStorage)
event, err := source.Enqueue(ctx, eventType)
require.NoError(t, err)
devents, err := source.Dequeue(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, 1)
@@ -1151,6 +1164,73 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) {
})
}
+func TestLockRowIsRemovedOnceRepositoryIsRemoved(t *testing.T) {
+ t.Parallel()
+ db := testdb.New(t)
+ ctx := testhelper.Context(t)
+
+ const (
+ virtualStorage = "praefect"
+ relativePath = "/project/path-1"
+ primaryStorage = "gitaly-0"
+ )
+
+ enqueueJobs := []ReplicationJob{
+ // This event will be moved to the in_progress state and its lock row will have acquired=true.
+ {
+ Change: UpdateRepo,
+ RepositoryID: 1,
+ RelativePath: relativePath,
+ ReplicaPath: "relative/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: primaryStorage,
+ VirtualStorage: virtualStorage,
+ Params: nil,
+ },
+ // This event is for another storage, but for the same repository.
+ {
+ Change: UpdateRepo,
+ RepositoryID: 1,
+ RelativePath: relativePath,
+ ReplicaPath: "relative/project/path-2",
+ TargetNodeStorage: "gitaly-2",
+ SourceNodeStorage: primaryStorage,
+ VirtualStorage: virtualStorage,
+ Params: nil,
+ },
+ }
+ repositoryID := insertRepository(t, db, ctx, virtualStorage, relativePath, primaryStorage)
+
+ queue := NewPostgresReplicationEventQueue(db)
+ for _, job := range enqueueJobs {
+ event := ReplicationEvent{Job: job}
+ _, err := queue.Enqueue(ctx, event)
+ require.NoError(t, err)
+ }
+ _, err := queue.Dequeue(ctx, enqueueJobs[0].VirtualStorage, enqueueJobs[0].TargetNodeStorage, 1)
+ require.NoError(t, err, "pickup one job to change it's status and create additional rows")
+
+ _, err = db.ExecContext(ctx, `DELETE FROM repositories WHERE repository_id = $1`, repositoryID)
+ require.NoError(t, err)
+
+ db.RequireRowsInTable(t, "replication_queue_lock", 0)
+}
+
+func insertRepository(t *testing.T, db glsql.Querier, ctx context.Context, virtualStorage, relativePath, primary string) int64 {
+ t.Helper()
+ const query = `
+ INSERT INTO repositories(virtual_storage, relative_path, generation, "primary")
+ VALUES ($1, $2, $3, $4)
+ ON CONFLICT (virtual_storage, relative_path)
+ DO UPDATE SET relative_path = excluded.relative_path
+ RETURNING repository_id`
+ var repositoryID int64
+ err := db.QueryRowContext(ctx, query, virtualStorage, relativePath, 1, primary).
+ Scan(&repositoryID)
+ require.NoError(t, err, "create repository record")
+ return repositoryID
+}
+
func requireEvents(t *testing.T, ctx context.Context, db testdb.DB, expected []ReplicationEvent) {
t.Helper()