Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZeger-Jan van de Weg <git@zjvandeweg.nl>2020-06-17 17:47:08 +0300
committerZeger-Jan van de Weg <git@zjvandeweg.nl>2020-06-17 17:47:08 +0300
commit15031e59ac28fe540191d450feaf0684688c0b28 (patch)
tree2f7dd300ad179137cc9e3be89a73efa3cfae67da
parentde39c6d66d3c419e72b43821708142707bc4e750 (diff)
parent26bae69b6bc2ddbcfa94811eb939441acabffe76 (diff)
Merge branch 'ps-dequeue-with-skip-in-progress' into 'master'
Replication not working on Praefect Closes #2801 See merge request gitlab-org/gitaly!2281
-rw-r--r--changelogs/unreleased/ps-dequeue-with-skip-in-progress.yml5
-rw-r--r--internal/praefect/datastore/queue.go26
-rw-r--r--internal/praefect/datastore/queue_test.go89
3 files changed, 97 insertions, 23 deletions
diff --git a/changelogs/unreleased/ps-dequeue-with-skip-in-progress.yml b/changelogs/unreleased/ps-dequeue-with-skip-in-progress.yml
new file mode 100644
index 000000000..86c773e93
--- /dev/null
+++ b/changelogs/unreleased/ps-dequeue-with-skip-in-progress.yml
@@ -0,0 +1,5 @@
+---
+title: Replication not working on Praefect
+merge_request: 2281
+author:
+type: fixed
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index 200a0e7d1..7edc4b006 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -188,12 +188,13 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor
SELECT id
FROM replication_queue_lock AS repo_lock
WHERE repo_lock.acquired = FALSE AND repo_lock.id IN (
- SELECT lock_id
- FROM replication_queue
- WHERE attempt > 0
- AND state IN ('ready', 'failed')
- AND job->>'virtual_storage' = $1
- AND job->>'target_node_storage' = $2
+ SELECT rq.lock_id
+ FROM replication_queue rq
+ WHERE rq.attempt > 0
+ AND rq.state IN ('ready', 'failed')
+ AND rq.job->>'virtual_storage' = $1
+ AND rq.job->>'target_node_storage' = $2
+ AND NOT EXISTS (SELECT 1 FROM replication_queue_job_lock WHERE lock_id = rq.lock_id)
ORDER BY created_at
LIMIT $3 FOR UPDATE
)
@@ -208,12 +209,13 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor
WHERE queue.lock_id IN (SELECT id FROM to_lock)
AND state NOT IN ('in_progress', 'cancelled', 'completed')
AND queue.id IN (
- SELECT id
- FROM replication_queue
- WHERE attempt > 0
- AND state IN ('ready', 'failed')
- AND job->>'virtual_storage' = $1
- AND job->>'target_node_storage' = $2
+ SELECT rq.id
+ FROM replication_queue rq
+ WHERE rq.attempt > 0
+ AND rq.state IN ('ready', 'failed')
+ AND rq.job->>'virtual_storage' = $1
+ AND rq.job->>'target_node_storage' = $2
+ AND NOT EXISTS (SELECT 1 FROM replication_queue_job_lock WHERE lock_id = rq.lock_id)
ORDER BY created_at
LIMIT $3
)
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index a60b03f5c..9f4c458d1 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -345,6 +345,72 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) {
requireEvents(t, ctx, db, expectedEvents)
}
+func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *testing.T) {
+ db := getDB(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ queue := PostgresReplicationEventQueue{db.DB}
+
+ eventType1 := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: UpdateRepo,
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ VirtualStorage: "praefect",
+ Params: nil,
+ },
+ }
+
+ eventType2 := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: UpdateRepo,
+ RelativePath: "/project/path-2",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ VirtualStorage: "praefect",
+ Params: nil,
+ },
+ }
+
+ var eventsType1 []ReplicationEvent
+ for i := 0; i < 2; i++ {
+ event, err := queue.Enqueue(ctx, eventType1)
+ require.NoError(t, err, "failed to fill in event queue")
+ eventsType1 = append(eventsType1, event)
+ }
+
+ dequeuedEvents1, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 1)
+ require.NoError(t, err)
+ require.Len(t, dequeuedEvents1, 1)
+ requireLocks(t, ctx, db, []LockRow{
+ // there is only one single lock for all fetched events because of their 'repo' and 'target' combination
+ {ID: "praefect|gitaly-1|/project/path-1", Acquired: true},
+ })
+ requireJobLocks(t, ctx, db, []JobLockRow{{JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"}})
+
+ var eventsType2 []ReplicationEvent
+ for i := 0; i < 2; i++ {
+ event, err := queue.Enqueue(ctx, eventType2)
+ require.NoError(t, err, "failed to fill in event queue")
+ eventsType2 = append(eventsType2, event)
+ }
+
+ dequeuedEvents2, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 1)
+ require.NoError(t, err)
+ require.Len(t, dequeuedEvents2, 1)
+ requireLocks(t, ctx, db, []LockRow{
+ {ID: "praefect|gitaly-1|/project/path-1", Acquired: true},
+ {ID: "praefect|gitaly-1|/project/path-2", Acquired: true},
+ })
+ requireJobLocks(t, ctx, db, []JobLockRow{
+ {JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"},
+ {JobID: 3, LockID: "praefect|gitaly-1|/project/path-2"},
+ })
+}
+
func TestPostgresReplicationEventQueue_Acknowledge(t *testing.T) {
db := getDB(t)
@@ -479,7 +545,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
dequeuedEvents2, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 3)
require.NoError(t, err)
- require.Len(t, dequeuedEvents2, 2, "expected: event of type 3 and of type 2 ('failed' will be fetched for retry)")
+ require.Len(t, dequeuedEvents2, 3, "expected: events of type 2 and of type 3 ('failed' will be fetched for retry)")
requireLocks(t, ctx, db, []LockRow{
{ID: "praefect|gitaly-1|/project/path-1", Acquired: true},
{ID: "praefect|gitaly-1|/project/path-2", Acquired: true},
@@ -489,8 +555,9 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
requireJobLocks(t, ctx, db, []JobLockRow{
{JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"},
{JobID: 2, LockID: "praefect|gitaly-1|/project/path-1"},
- {JobID: 5, LockID: "praefect|gitaly-1|/project/path-3"},
{JobID: 3, LockID: "praefect|gitaly-1|/project/path-2"},
+ {JobID: 5, LockID: "praefect|gitaly-1|/project/path-3"},
+ {JobID: 6, LockID: "praefect|gitaly-1|/project/path-2"},
})
acknowledge2, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{1, 3})
@@ -498,13 +565,14 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
require.Equal(t, []uint64{1, 3}, acknowledge2)
requireLocks(t, ctx, db, []LockRow{
{ID: "praefect|gitaly-1|/project/path-1", Acquired: true},
- {ID: "praefect|gitaly-1|/project/path-2", Acquired: false},
+ {ID: "praefect|gitaly-1|/project/path-2", Acquired: true},
{ID: "praefect|gitaly-1|/project/path-3", Acquired: true},
{ID: "praefect|gitaly-2|/project/path-1", Acquired: false},
})
requireJobLocks(t, ctx, db, []JobLockRow{
{JobID: 2, LockID: "praefect|gitaly-1|/project/path-1"},
{JobID: 5, LockID: "praefect|gitaly-1|/project/path-3"},
+ {JobID: 6, LockID: "praefect|gitaly-1|/project/path-2"},
})
dequeuedEvents3, err := queue.Dequeue(ctx, "praefect", "gitaly-2", 3)
@@ -512,19 +580,20 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
require.Len(t, dequeuedEvents3, 1, "expected: event of type 4")
requireLocks(t, ctx, db, []LockRow{
{ID: "praefect|gitaly-1|/project/path-1", Acquired: true},
- {ID: "praefect|gitaly-1|/project/path-2", Acquired: false},
+ {ID: "praefect|gitaly-1|/project/path-2", Acquired: true},
{ID: "praefect|gitaly-1|/project/path-3", Acquired: true},
{ID: "praefect|gitaly-2|/project/path-1", Acquired: true},
})
requireJobLocks(t, ctx, db, []JobLockRow{
{JobID: 2, LockID: "praefect|gitaly-1|/project/path-1"},
{JobID: 5, LockID: "praefect|gitaly-1|/project/path-3"},
+ {JobID: 6, LockID: "praefect|gitaly-1|/project/path-2"},
{JobID: 7, LockID: "praefect|gitaly-2|/project/path-1"},
})
- acknowledged3, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{2, 5, 7})
+ acknowledged3, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{2, 5, 6, 7})
require.NoError(t, err)
- require.Equal(t, []uint64{2, 5, 7}, acknowledged3)
+ require.Equal(t, []uint64{2, 5, 6, 7}, acknowledged3)
requireLocks(t, ctx, db, []LockRow{
{ID: "praefect|gitaly-1|/project/path-1", Acquired: false},
{ID: "praefect|gitaly-1|/project/path-2", Acquired: false},
@@ -535,16 +604,15 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
dequeuedEvents4, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 100500)
require.NoError(t, err)
- require.Len(t, dequeuedEvents4, 2, "expected: event of type 4")
+ require.Len(t, dequeuedEvents4, 1, "expected: event of type 1")
requireLocks(t, ctx, db, []LockRow{
{ID: "praefect|gitaly-1|/project/path-1", Acquired: true},
- {ID: "praefect|gitaly-1|/project/path-2", Acquired: true},
+ {ID: "praefect|gitaly-1|/project/path-2", Acquired: false},
{ID: "praefect|gitaly-1|/project/path-3", Acquired: false},
{ID: "praefect|gitaly-2|/project/path-1", Acquired: false},
})
requireJobLocks(t, ctx, db, []JobLockRow{
{JobID: 4, LockID: "praefect|gitaly-1|/project/path-1"},
- {JobID: 6, LockID: "praefect|gitaly-1|/project/path-2"},
})
newEvent, err := queue.Enqueue(ctx, eventType1)
@@ -558,13 +626,12 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
require.Equal(t, "ready", newEventState, "no way to acknowledge event that is not in in_progress state(was not dequeued)")
requireLocks(t, ctx, db, []LockRow{
{ID: "praefect|gitaly-1|/project/path-1", Acquired: true},
- {ID: "praefect|gitaly-1|/project/path-2", Acquired: true},
+ {ID: "praefect|gitaly-1|/project/path-2", Acquired: false},
{ID: "praefect|gitaly-1|/project/path-3", Acquired: false},
{ID: "praefect|gitaly-2|/project/path-1", Acquired: false},
})
requireJobLocks(t, ctx, db, []JobLockRow{
{JobID: 4, LockID: "praefect|gitaly-1|/project/path-1"},
- {JobID: 6, LockID: "praefect|gitaly-1|/project/path-2"},
})
}