Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2020-07-20 16:48:28 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2020-07-20 17:21:46 +0300
commitb14362fa95c48966924106d19f0abafbe1fc2a11 (patch)
treef23f7f0565f485e842a9fed5b3370c5e5975c6fb
parent893137c05e065ab96edd21af1a5ad8f6745b4a08 (diff)
pick the newest job for processing when collapsing jobssmh-collapse-jobs-pick-oldest
Currently Praefect picks the oldest job for processing when collapsing jobs. When a job is completed, other jobs with the same (lock_id, change_type) are also acknowledged as complete, avoiding redundant work by collapsing replication jobs that would perform the same change. Repository generation tracking was introduced in 7bbc7cc4. As the generation numbers are propagated in replication jobs, skipping newer jobs means we are not acknowleding higher generations we've replicated. This leads to repositories being considered outdated even if they're not. To avoid this while still keeping the benefits of job collapsing, this commit changes the collapsing to prefer newest jobs for a given repo while still maintaining the queueuing order between repositories. For the following queue of jobs: J1/R1 -> J2/R2 -> J3/R2 -> J4/R1 -> J5/R3 Dequeuing two events should yield: J4 -> J3 The queueing order of the repositories is maintained while for a given repository, the latest job is picked. Additionally, this commit only acknowledges jobs older than the completed job as newer jobs might contain higher generation numbers. Source node is not considered anymore when acknowledging a job as replicating first from Node A and then from Node B is redundant as Node B's changes overwrite Node A's.
-rw-r--r--internal/praefect/datastore/queue.go26
-rw-r--r--internal/praefect/datastore/queue_test.go38
2 files changed, 32 insertions, 32 deletions
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index 8d1d6e941..ceb0fabcf 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -206,18 +206,20 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor
FOR UPDATE SKIP LOCKED
)
, candidate AS (
- SELECT id
- FROM replication_queue
- WHERE id IN (
- SELECT DISTINCT FIRST_VALUE(queue.id) OVER (PARTITION BY lock_id, job->>'change' ORDER BY queue.created_at)
+ SELECT collapse_to AS id
+ FROM (
+ SELECT
+ row_number() OVER repo_change = 1 AS first_in_partition,
+ max(queue.id) OVER repo_change AS collapse_to
FROM replication_queue AS queue
JOIN lock ON queue.lock_id = lock.id
- WHERE queue.state IN ('ready', 'failed' )
+ WHERE queue.state IN ('ready', 'failed')
AND NOT EXISTS (SELECT 1 FROM replication_queue_job_lock WHERE lock_id = queue.lock_id)
- )
- ORDER BY created_at
+ WINDOW repo_change AS (PARTITION BY lock_id, job->>'change')
+ ORDER BY created_at
+ ) AS repo_changes
+ WHERE first_in_partition
LIMIT $3
- FOR UPDATE
)
, job AS (
UPDATE replication_queue AS queue
@@ -241,8 +243,7 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor
WHERE lock.id = tracked.lock_id
)
SELECT id, state, created_at, updated_at, lock_id, attempt, job, meta
- FROM job
- ORDER BY id`
+ FROM job`
rows, err := rq.qc.QueryContext(ctx, query, virtualStorage, nodeStorage, count)
if err != nil {
return nil, fmt.Errorf("query: %w", err)
@@ -272,7 +273,7 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J
query := `
WITH existing AS (
- SELECT id, lock_id, updated_at, job
+ SELECT id, lock_id, created_at, updated_at, job
FROM replication_queue
WHERE id = ANY($1)
AND state = 'in_progress'
@@ -295,10 +296,9 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J
WHERE existing.id = queue.id
OR (
queue.state = 'ready'
- AND queue.created_at < existing.updated_at
+ AND queue.created_at < existing.created_at
AND queue.lock_id = existing.lock_id
AND queue.job->>'change' = existing.job->>'change'
- AND queue.job->>'source_node_storage' = existing.job->>'source_node_storage'
)
RETURNING queue.id, queue.lock_id
)
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index c8be612e8..502ab9c0d 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -300,10 +300,10 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) {
}
// first request to deque
- expectedEvents1 := []ReplicationEvent{events[0], events[2], events[4]}
+ expectedEvents1 := []ReplicationEvent{events[3], events[2], events[4]}
expectedJobLocks1 := []JobLockRow{
- {JobID: events[0].ID, LockID: "praefect|gitaly-1|/project/path-1"},
{JobID: events[2].ID, LockID: "praefect|gitaly-1|/project/path-1"},
+ {JobID: events[3].ID, LockID: "praefect|gitaly-1|/project/path-1"},
{JobID: events[4].ID, LockID: "praefect|gitaly-1|/project/path-2"},
}
@@ -393,7 +393,7 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test
// there is only one single lock for all fetched events because of their 'repo' and 'target' combination
{ID: "praefect|gitaly-1|/project/path-1", Acquired: true},
})
- requireJobLocks(t, ctx, db, []JobLockRow{{JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"}})
+ requireJobLocks(t, ctx, db, []JobLockRow{{JobID: 2, LockID: "praefect|gitaly-1|/project/path-1"}})
var eventsType2 []ReplicationEvent
for i := 0; i < 2; i++ {
@@ -410,8 +410,8 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test
{ID: "praefect|gitaly-1|/project/path-2", Acquired: true},
})
requireJobLocks(t, ctx, db, []JobLockRow{
- {JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"},
- {JobID: 3, LockID: "praefect|gitaly-1|/project/path-2"},
+ {JobID: 2, LockID: "praefect|gitaly-1|/project/path-1"},
+ {JobID: 4, LockID: "praefect|gitaly-1|/project/path-2"},
})
}
@@ -527,15 +527,15 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
{ID: events[6].LockID, Acquired: false},
})
requireJobLocks(t, ctx, db, []JobLockRow{
- {JobID: events[0].ID, LockID: events[0].LockID},
- {JobID: events[2].ID, LockID: events[2].LockID},
+ {JobID: events[3].ID, LockID: events[3].LockID},
{JobID: events[4].ID, LockID: events[4].LockID},
+ {JobID: events[5].ID, LockID: events[5].LockID},
})
// release lock for events of second type
- acknowledge1, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{events[2].ID})
+ acknowledge1, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{events[5].ID})
require.NoError(t, err)
- require.Equal(t, []uint64{3}, acknowledge1)
+ require.Equal(t, []uint64{6}, acknowledge1)
requireLocks(t, ctx, db, []LockRow{
{ID: events[0].LockID, Acquired: true},
{ID: events[2].LockID, Acquired: false},
@@ -543,7 +543,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
{ID: events[6].LockID, Acquired: false},
})
requireJobLocks(t, ctx, db, []JobLockRow{
- {JobID: events[0].ID, LockID: events[0].LockID},
+ {JobID: events[3].ID, LockID: events[3].LockID},
{JobID: events[4].ID, LockID: events[4].LockID},
})
@@ -557,9 +557,9 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
{ID: events[6].LockID, Acquired: false},
})
requireJobLocks(t, ctx, db, []JobLockRow{
- {JobID: events[0].ID, LockID: events[0].LockID},
- {JobID: events[2].ID, LockID: events[2].LockID},
+ {JobID: events[3].ID, LockID: events[3].LockID},
{JobID: events[4].ID, LockID: events[4].LockID},
+ {JobID: events[5].ID, LockID: events[5].LockID},
})
// creation of the new event that is equal to those already dequeue and processed
@@ -568,9 +568,9 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
_, err = queue.Enqueue(ctx, eventType1)
require.NoError(t, err)
- acknowledge2, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[0].ID, events[4].ID})
+ acknowledge2, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[3].ID, events[4].ID})
require.NoError(t, err)
- require.Equal(t, []uint64{events[0].ID, events[4].ID}, acknowledge2)
+ require.Equal(t, []uint64{events[3].ID, events[4].ID}, acknowledge2)
requireLocks(t, ctx, db, []LockRow{
{ID: events[0].LockID, Acquired: false},
{ID: events[2].LockID, Acquired: true},
@@ -578,7 +578,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
{ID: events[6].LockID, Acquired: false},
})
requireJobLocks(t, ctx, db, []JobLockRow{
- {JobID: events[2].ID, LockID: events[2].LockID},
+ {JobID: events[5].ID, LockID: events[5].LockID},
})
dequeuedEvents3, err := queue.Dequeue(ctx, "praefect", "gitaly-2", 3)
@@ -591,13 +591,13 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
{ID: events[6].LockID, Acquired: true},
})
requireJobLocks(t, ctx, db, []JobLockRow{
- {JobID: events[2].ID, LockID: events[2].LockID},
+ {JobID: events[5].ID, LockID: events[5].LockID},
{JobID: events[6].ID, LockID: events[6].LockID},
})
- acknowledged3, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[2].ID, events[6].ID})
+ acknowledged3, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[5].ID, events[6].ID})
require.NoError(t, err)
- require.Equal(t, []uint64{events[2].ID, events[6].ID}, acknowledged3)
+ require.Equal(t, []uint64{events[5].ID, events[6].ID}, acknowledged3)
requireLocks(t, ctx, db, []LockRow{
{ID: events[0].LockID, Acquired: false},
{ID: events[2].LockID, Acquired: false},
@@ -1147,7 +1147,7 @@ func requireJobLocks(t *testing.T, ctx context.Context, db glsql.DB, expected []
for i := range actual {
actual[i].TriggeredAt = time.Time{}
}
- require.ElementsMatch(t, expected, actual)
+ require.Equal(t, expected, actual)
}
func fetchJobLocks(t *testing.T, ctx context.Context, db glsql.DB) []JobLockRow {