diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2020-06-17 17:47:07 +0300 |
---|---|---|
committer | Zeger-Jan van de Weg <git@zjvandeweg.nl> | 2020-06-17 17:47:07 +0300 |
commit | 26bae69b6bc2ddbcfa94811eb939441acabffe76 (patch) | |
tree | 29b2252b14345040a1b7ba1e47f06e2224bd92cd | |
parent | a1bddf693fa258a3f8d97810fcfcfa07d43b15c0 (diff) |
Replication not working on Praefect
Replications for different repositories on the same storage
must be dequeued independently of each other.
It is safe to run N replication jobs at the same time on the
same storage for different repositories.
'in_progress' replications should not block other replication
entries from being consumed from the queue.
Closes: https://gitlab.com/gitlab-org/gitaly/-/issues/2801
-rw-r--r-- | changelogs/unreleased/ps-dequeue-with-skip-in-progress.yml | 5 | ||||
-rw-r--r-- | internal/praefect/datastore/queue.go | 26 | ||||
-rw-r--r-- | internal/praefect/datastore/queue_test.go | 89 |
3 files changed, 97 insertions, 23 deletions
diff --git a/changelogs/unreleased/ps-dequeue-with-skip-in-progress.yml b/changelogs/unreleased/ps-dequeue-with-skip-in-progress.yml new file mode 100644 index 000000000..86c773e93 --- /dev/null +++ b/changelogs/unreleased/ps-dequeue-with-skip-in-progress.yml @@ -0,0 +1,5 @@ +--- +title: Replication not working on Praefect +merge_request: 2281 +author: +type: fixed diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index 0662b4981..e4453cae7 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -229,12 +229,13 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor SELECT id FROM replication_queue_lock AS repo_lock WHERE repo_lock.acquired = FALSE AND repo_lock.id IN ( - SELECT lock_id - FROM replication_queue - WHERE attempt > 0 - AND state IN ('ready', 'failed') - AND job->>'virtual_storage' = $1 - AND job->>'target_node_storage' = $2 + SELECT rq.lock_id + FROM replication_queue rq + WHERE rq.attempt > 0 + AND rq.state IN ('ready', 'failed') + AND rq.job->>'virtual_storage' = $1 + AND rq.job->>'target_node_storage' = $2 + AND NOT EXISTS (SELECT 1 FROM replication_queue_job_lock WHERE lock_id = rq.lock_id) ORDER BY created_at LIMIT $3 FOR UPDATE ) @@ -249,12 +250,13 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor WHERE queue.lock_id IN (SELECT id FROM to_lock) AND state NOT IN ('in_progress', 'cancelled', 'completed') AND queue.id IN ( - SELECT id - FROM replication_queue - WHERE attempt > 0 - AND state IN ('ready', 'failed') - AND job->>'virtual_storage' = $1 - AND job->>'target_node_storage' = $2 + SELECT rq.id + FROM replication_queue rq + WHERE rq.attempt > 0 + AND rq.state IN ('ready', 'failed') + AND rq.job->>'virtual_storage' = $1 + AND rq.job->>'target_node_storage' = $2 + AND NOT EXISTS (SELECT 1 FROM replication_queue_job_lock WHERE lock_id = rq.lock_id) ORDER BY created_at LIMIT $3 ) diff --git 
a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go index 696b0b29c..4e6f14df4 100644 --- a/internal/praefect/datastore/queue_test.go +++ b/internal/praefect/datastore/queue_test.go @@ -349,6 +349,72 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) { requireEvents(t, ctx, db, expectedEvents) } +func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *testing.T) { + db := getDB(t) + + ctx, cancel := testhelper.Context() + defer cancel() + + queue := PostgresReplicationEventQueue{db.DB} + + eventType1 := ReplicationEvent{ + Job: ReplicationJob{ + Change: UpdateRepo, + RelativePath: "/project/path-1", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + VirtualStorage: "praefect", + Params: nil, + }, + } + + eventType2 := ReplicationEvent{ + Job: ReplicationJob{ + Change: UpdateRepo, + RelativePath: "/project/path-2", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + VirtualStorage: "praefect", + Params: nil, + }, + } + + var eventsType1 []ReplicationEvent + for i := 0; i < 2; i++ { + event, err := queue.Enqueue(ctx, eventType1) + require.NoError(t, err, "failed to fill in event queue") + eventsType1 = append(eventsType1, event) + } + + dequeuedEvents1, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 1) + require.NoError(t, err) + require.Len(t, dequeuedEvents1, 1) + requireLocks(t, ctx, db, []LockRow{ + // there is only one single lock for all fetched events because of their 'repo' and 'target' combination + {ID: "praefect|gitaly-1|/project/path-1", Acquired: true}, + }) + requireJobLocks(t, ctx, db, []JobLockRow{{JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"}}) + + var eventsType2 []ReplicationEvent + for i := 0; i < 2; i++ { + event, err := queue.Enqueue(ctx, eventType2) + require.NoError(t, err, "failed to fill in event queue") + eventsType2 = append(eventsType2, event) + } + + dequeuedEvents2, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 
1) + require.NoError(t, err) + require.Len(t, dequeuedEvents2, 1) + requireLocks(t, ctx, db, []LockRow{ + {ID: "praefect|gitaly-1|/project/path-1", Acquired: true}, + {ID: "praefect|gitaly-1|/project/path-2", Acquired: true}, + }) + requireJobLocks(t, ctx, db, []JobLockRow{ + {JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"}, + {JobID: 3, LockID: "praefect|gitaly-1|/project/path-2"}, + }) +} + func TestPostgresReplicationEventQueue_Acknowledge(t *testing.T) { db := getDB(t) @@ -483,7 +549,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { dequeuedEvents2, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 3) require.NoError(t, err) - require.Len(t, dequeuedEvents2, 2, "expected: event of type 3 and of type 2 ('failed' will be fetched for retry)") + require.Len(t, dequeuedEvents2, 3, "expected: events of type 2 and of type 3 ('failed' will be fetched for retry)") requireLocks(t, ctx, db, []LockRow{ {ID: "praefect|gitaly-1|/project/path-1", Acquired: true}, {ID: "praefect|gitaly-1|/project/path-2", Acquired: true}, @@ -493,8 +559,9 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { requireJobLocks(t, ctx, db, []JobLockRow{ {JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"}, {JobID: 2, LockID: "praefect|gitaly-1|/project/path-1"}, - {JobID: 5, LockID: "praefect|gitaly-1|/project/path-3"}, {JobID: 3, LockID: "praefect|gitaly-1|/project/path-2"}, + {JobID: 5, LockID: "praefect|gitaly-1|/project/path-3"}, + {JobID: 6, LockID: "praefect|gitaly-1|/project/path-2"}, }) acknowledge2, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{1, 3}) @@ -502,13 +569,14 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { require.Equal(t, []uint64{1, 3}, acknowledge2) requireLocks(t, ctx, db, []LockRow{ {ID: "praefect|gitaly-1|/project/path-1", Acquired: true}, - {ID: "praefect|gitaly-1|/project/path-2", Acquired: false}, + {ID: "praefect|gitaly-1|/project/path-2", Acquired: true}, {ID: 
"praefect|gitaly-1|/project/path-3", Acquired: true}, {ID: "praefect|gitaly-2|/project/path-1", Acquired: false}, }) requireJobLocks(t, ctx, db, []JobLockRow{ {JobID: 2, LockID: "praefect|gitaly-1|/project/path-1"}, {JobID: 5, LockID: "praefect|gitaly-1|/project/path-3"}, + {JobID: 6, LockID: "praefect|gitaly-1|/project/path-2"}, }) dequeuedEvents3, err := queue.Dequeue(ctx, "praefect", "gitaly-2", 3) @@ -516,19 +584,20 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { require.Len(t, dequeuedEvents3, 1, "expected: event of type 4") requireLocks(t, ctx, db, []LockRow{ {ID: "praefect|gitaly-1|/project/path-1", Acquired: true}, - {ID: "praefect|gitaly-1|/project/path-2", Acquired: false}, + {ID: "praefect|gitaly-1|/project/path-2", Acquired: true}, {ID: "praefect|gitaly-1|/project/path-3", Acquired: true}, {ID: "praefect|gitaly-2|/project/path-1", Acquired: true}, }) requireJobLocks(t, ctx, db, []JobLockRow{ {JobID: 2, LockID: "praefect|gitaly-1|/project/path-1"}, {JobID: 5, LockID: "praefect|gitaly-1|/project/path-3"}, + {JobID: 6, LockID: "praefect|gitaly-1|/project/path-2"}, {JobID: 7, LockID: "praefect|gitaly-2|/project/path-1"}, }) - acknowledged3, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{2, 5, 7}) + acknowledged3, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{2, 5, 6, 7}) require.NoError(t, err) - require.Equal(t, []uint64{2, 5, 7}, acknowledged3) + require.Equal(t, []uint64{2, 5, 6, 7}, acknowledged3) requireLocks(t, ctx, db, []LockRow{ {ID: "praefect|gitaly-1|/project/path-1", Acquired: false}, {ID: "praefect|gitaly-1|/project/path-2", Acquired: false}, @@ -539,16 +608,15 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { dequeuedEvents4, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 100500) require.NoError(t, err) - require.Len(t, dequeuedEvents4, 2, "expected: event of type 4") + require.Len(t, dequeuedEvents4, 1, "expected: event of type 1") requireLocks(t, ctx, db, 
[]LockRow{ {ID: "praefect|gitaly-1|/project/path-1", Acquired: true}, - {ID: "praefect|gitaly-1|/project/path-2", Acquired: true}, + {ID: "praefect|gitaly-1|/project/path-2", Acquired: false}, {ID: "praefect|gitaly-1|/project/path-3", Acquired: false}, {ID: "praefect|gitaly-2|/project/path-1", Acquired: false}, }) requireJobLocks(t, ctx, db, []JobLockRow{ {JobID: 4, LockID: "praefect|gitaly-1|/project/path-1"}, - {JobID: 6, LockID: "praefect|gitaly-1|/project/path-2"}, }) newEvent, err := queue.Enqueue(ctx, eventType1) @@ -562,13 +630,12 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { require.Equal(t, "ready", newEventState, "no way to acknowledge event that is not in in_progress state(was not dequeued)") requireLocks(t, ctx, db, []LockRow{ {ID: "praefect|gitaly-1|/project/path-1", Acquired: true}, - {ID: "praefect|gitaly-1|/project/path-2", Acquired: true}, + {ID: "praefect|gitaly-1|/project/path-2", Acquired: false}, {ID: "praefect|gitaly-1|/project/path-3", Acquired: false}, {ID: "praefect|gitaly-2|/project/path-1", Acquired: false}, }) requireJobLocks(t, ctx, db, []JobLockRow{ {JobID: 4, LockID: "praefect|gitaly-1|/project/path-1"}, - {JobID: 6, LockID: "praefect|gitaly-1|/project/path-2"}, }) } |