Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Pavlo Strokov <pstrokov@gitlab.com> 2020-06-17 17:47:07 +0300
committer: Zeger-Jan van de Weg <git@zjvandeweg.nl> 2020-06-17 17:47:07 +0300
commit: 26bae69b6bc2ddbcfa94811eb939441acabffe76 (patch)
tree: 29b2252b14345040a1b7ba1e47f06e2224bd92cd
parent: a1bddf693fa258a3f8d97810fcfcfa07d43b15c0 (diff)
Replication not working on Praefect
Replications for different repositories on the same storage must be dequeued independently of each other. It is safe to run N replication jobs at the same time on the same storage for different repositories. 'in_progress' replications should not block other replication entries from being consumed from the queue. Closes: https://gitlab.com/gitlab-org/gitaly/-/issues/2801
-rw-r--r-- changelogs/unreleased/ps-dequeue-with-skip-in-progress.yml 5
-rw-r--r-- internal/praefect/datastore/queue.go 26
-rw-r--r-- internal/praefect/datastore/queue_test.go 89
3 files changed, 97 insertions, 23 deletions
diff --git a/changelogs/unreleased/ps-dequeue-with-skip-in-progress.yml b/changelogs/unreleased/ps-dequeue-with-skip-in-progress.yml
new file mode 100644
index 000000000..86c773e93
--- /dev/null
+++ b/changelogs/unreleased/ps-dequeue-with-skip-in-progress.yml
@@ -0,0 +1,5 @@
+---
+title: Replication not working on Praefect
+merge_request: 2281
+author:
+type: fixed
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index 0662b4981..e4453cae7 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -229,12 +229,13 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor
SELECT id
FROM replication_queue_lock AS repo_lock
WHERE repo_lock.acquired = FALSE AND repo_lock.id IN (
- SELECT lock_id
- FROM replication_queue
- WHERE attempt > 0
- AND state IN ('ready', 'failed')
- AND job->>'virtual_storage' = $1
- AND job->>'target_node_storage' = $2
+ SELECT rq.lock_id
+ FROM replication_queue rq
+ WHERE rq.attempt > 0
+ AND rq.state IN ('ready', 'failed')
+ AND rq.job->>'virtual_storage' = $1
+ AND rq.job->>'target_node_storage' = $2
+ AND NOT EXISTS (SELECT 1 FROM replication_queue_job_lock WHERE lock_id = rq.lock_id)
ORDER BY created_at
LIMIT $3 FOR UPDATE
)
@@ -249,12 +250,13 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor
WHERE queue.lock_id IN (SELECT id FROM to_lock)
AND state NOT IN ('in_progress', 'cancelled', 'completed')
AND queue.id IN (
- SELECT id
- FROM replication_queue
- WHERE attempt > 0
- AND state IN ('ready', 'failed')
- AND job->>'virtual_storage' = $1
- AND job->>'target_node_storage' = $2
+ SELECT rq.id
+ FROM replication_queue rq
+ WHERE rq.attempt > 0
+ AND rq.state IN ('ready', 'failed')
+ AND rq.job->>'virtual_storage' = $1
+ AND rq.job->>'target_node_storage' = $2
+ AND NOT EXISTS (SELECT 1 FROM replication_queue_job_lock WHERE lock_id = rq.lock_id)
ORDER BY created_at
LIMIT $3
)
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index 696b0b29c..4e6f14df4 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -349,6 +349,72 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) {
requireEvents(t, ctx, db, expectedEvents)
}
+func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *testing.T) {
+ db := getDB(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ queue := PostgresReplicationEventQueue{db.DB}
+
+ eventType1 := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: UpdateRepo,
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ VirtualStorage: "praefect",
+ Params: nil,
+ },
+ }
+
+ eventType2 := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: UpdateRepo,
+ RelativePath: "/project/path-2",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ VirtualStorage: "praefect",
+ Params: nil,
+ },
+ }
+
+ var eventsType1 []ReplicationEvent
+ for i := 0; i < 2; i++ {
+ event, err := queue.Enqueue(ctx, eventType1)
+ require.NoError(t, err, "failed to fill in event queue")
+ eventsType1 = append(eventsType1, event)
+ }
+
+ dequeuedEvents1, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 1)
+ require.NoError(t, err)
+ require.Len(t, dequeuedEvents1, 1)
+ requireLocks(t, ctx, db, []LockRow{
+ // there is only one single lock for all fetched events because of their 'repo' and 'target' combination
+ {ID: "praefect|gitaly-1|/project/path-1", Acquired: true},
+ })
+ requireJobLocks(t, ctx, db, []JobLockRow{{JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"}})
+
+ var eventsType2 []ReplicationEvent
+ for i := 0; i < 2; i++ {
+ event, err := queue.Enqueue(ctx, eventType2)
+ require.NoError(t, err, "failed to fill in event queue")
+ eventsType2 = append(eventsType2, event)
+ }
+
+ dequeuedEvents2, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 1)
+ require.NoError(t, err)
+ require.Len(t, dequeuedEvents2, 1)
+ requireLocks(t, ctx, db, []LockRow{
+ {ID: "praefect|gitaly-1|/project/path-1", Acquired: true},
+ {ID: "praefect|gitaly-1|/project/path-2", Acquired: true},
+ })
+ requireJobLocks(t, ctx, db, []JobLockRow{
+ {JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"},
+ {JobID: 3, LockID: "praefect|gitaly-1|/project/path-2"},
+ })
+}
+
func TestPostgresReplicationEventQueue_Acknowledge(t *testing.T) {
db := getDB(t)
@@ -483,7 +549,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
dequeuedEvents2, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 3)
require.NoError(t, err)
- require.Len(t, dequeuedEvents2, 2, "expected: event of type 3 and of type 2 ('failed' will be fetched for retry)")
+ require.Len(t, dequeuedEvents2, 3, "expected: events of type 2 and of type 3 ('failed' will be fetched for retry)")
requireLocks(t, ctx, db, []LockRow{
{ID: "praefect|gitaly-1|/project/path-1", Acquired: true},
{ID: "praefect|gitaly-1|/project/path-2", Acquired: true},
@@ -493,8 +559,9 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
requireJobLocks(t, ctx, db, []JobLockRow{
{JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"},
{JobID: 2, LockID: "praefect|gitaly-1|/project/path-1"},
- {JobID: 5, LockID: "praefect|gitaly-1|/project/path-3"},
{JobID: 3, LockID: "praefect|gitaly-1|/project/path-2"},
+ {JobID: 5, LockID: "praefect|gitaly-1|/project/path-3"},
+ {JobID: 6, LockID: "praefect|gitaly-1|/project/path-2"},
})
acknowledge2, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{1, 3})
@@ -502,13 +569,14 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
require.Equal(t, []uint64{1, 3}, acknowledge2)
requireLocks(t, ctx, db, []LockRow{
{ID: "praefect|gitaly-1|/project/path-1", Acquired: true},
- {ID: "praefect|gitaly-1|/project/path-2", Acquired: false},
+ {ID: "praefect|gitaly-1|/project/path-2", Acquired: true},
{ID: "praefect|gitaly-1|/project/path-3", Acquired: true},
{ID: "praefect|gitaly-2|/project/path-1", Acquired: false},
})
requireJobLocks(t, ctx, db, []JobLockRow{
{JobID: 2, LockID: "praefect|gitaly-1|/project/path-1"},
{JobID: 5, LockID: "praefect|gitaly-1|/project/path-3"},
+ {JobID: 6, LockID: "praefect|gitaly-1|/project/path-2"},
})
dequeuedEvents3, err := queue.Dequeue(ctx, "praefect", "gitaly-2", 3)
@@ -516,19 +584,20 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
require.Len(t, dequeuedEvents3, 1, "expected: event of type 4")
requireLocks(t, ctx, db, []LockRow{
{ID: "praefect|gitaly-1|/project/path-1", Acquired: true},
- {ID: "praefect|gitaly-1|/project/path-2", Acquired: false},
+ {ID: "praefect|gitaly-1|/project/path-2", Acquired: true},
{ID: "praefect|gitaly-1|/project/path-3", Acquired: true},
{ID: "praefect|gitaly-2|/project/path-1", Acquired: true},
})
requireJobLocks(t, ctx, db, []JobLockRow{
{JobID: 2, LockID: "praefect|gitaly-1|/project/path-1"},
{JobID: 5, LockID: "praefect|gitaly-1|/project/path-3"},
+ {JobID: 6, LockID: "praefect|gitaly-1|/project/path-2"},
{JobID: 7, LockID: "praefect|gitaly-2|/project/path-1"},
})
- acknowledged3, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{2, 5, 7})
+ acknowledged3, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{2, 5, 6, 7})
require.NoError(t, err)
- require.Equal(t, []uint64{2, 5, 7}, acknowledged3)
+ require.Equal(t, []uint64{2, 5, 6, 7}, acknowledged3)
requireLocks(t, ctx, db, []LockRow{
{ID: "praefect|gitaly-1|/project/path-1", Acquired: false},
{ID: "praefect|gitaly-1|/project/path-2", Acquired: false},
@@ -539,16 +608,15 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
dequeuedEvents4, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 100500)
require.NoError(t, err)
- require.Len(t, dequeuedEvents4, 2, "expected: event of type 4")
+ require.Len(t, dequeuedEvents4, 1, "expected: event of type 1")
requireLocks(t, ctx, db, []LockRow{
{ID: "praefect|gitaly-1|/project/path-1", Acquired: true},
- {ID: "praefect|gitaly-1|/project/path-2", Acquired: true},
+ {ID: "praefect|gitaly-1|/project/path-2", Acquired: false},
{ID: "praefect|gitaly-1|/project/path-3", Acquired: false},
{ID: "praefect|gitaly-2|/project/path-1", Acquired: false},
})
requireJobLocks(t, ctx, db, []JobLockRow{
{JobID: 4, LockID: "praefect|gitaly-1|/project/path-1"},
- {JobID: 6, LockID: "praefect|gitaly-1|/project/path-2"},
})
newEvent, err := queue.Enqueue(ctx, eventType1)
@@ -562,13 +630,12 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
require.Equal(t, "ready", newEventState, "no way to acknowledge event that is not in in_progress state(was not dequeued)")
requireLocks(t, ctx, db, []LockRow{
{ID: "praefect|gitaly-1|/project/path-1", Acquired: true},
- {ID: "praefect|gitaly-1|/project/path-2", Acquired: true},
+ {ID: "praefect|gitaly-1|/project/path-2", Acquired: false},
{ID: "praefect|gitaly-1|/project/path-3", Acquired: false},
{ID: "praefect|gitaly-2|/project/path-1", Acquired: false},
})
requireJobLocks(t, ctx, db, []JobLockRow{
{JobID: 4, LockID: "praefect|gitaly-1|/project/path-1"},
- {JobID: 6, LockID: "praefect|gitaly-1|/project/path-2"},
})
}