diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2020-07-20 16:48:28 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-07-20 17:21:46 +0300 |
commit | b14362fa95c48966924106d19f0abafbe1fc2a11 (patch) | |
tree | f23f7f0565f485e842a9fed5b3370c5e5975c6fb | |
parent | 893137c05e065ab96edd21af1a5ad8f6745b4a08 (diff) |
pick the newest job for processing when collapsing jobssmh-collapse-jobs-pick-oldest
Currently Praefect picks the oldest job for processing when collapsing
jobs. When a job is completed, other jobs with the same
(lock_id, change_type) are also acknowledged as complete, avoiding
redundant work by collapsing replication jobs that would perform
the same change.
Repository generation tracking was introduced in 7bbc7cc4. As the
generation numbers are propagated in replication jobs, skipping newer
jobs means we are not acknowleding higher generations we've replicated.
This leads to repositories being considered outdated even if they're not.
To avoid this while still keeping the benefits of job collapsing, this
commit changes the collapsing to prefer newest jobs for a given repo
while still maintaining the queueuing order between repositories.
For the following queue of jobs:
J1/R1 -> J2/R2 -> J3/R2 -> J4/R1 -> J5/R3
Dequeuing two events should yield:
J4 -> J3
The queueing order of the repositories is maintained while for a given
repository, the latest job is picked.
Additionally, this commit only acknowledges jobs older than the completed
job as newer jobs might contain higher generation numbers.
Source node is not considered anymore when acknowledging a job as replicating
first from Node A and then from Node B is redundant as Node B's changes
overwrite Node A's.
-rw-r--r-- | internal/praefect/datastore/queue.go | 26 | ||||
-rw-r--r-- | internal/praefect/datastore/queue_test.go | 38 |
2 files changed, 32 insertions, 32 deletions
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index 8d1d6e941..ceb0fabcf 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -206,18 +206,20 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor FOR UPDATE SKIP LOCKED ) , candidate AS ( - SELECT id - FROM replication_queue - WHERE id IN ( - SELECT DISTINCT FIRST_VALUE(queue.id) OVER (PARTITION BY lock_id, job->>'change' ORDER BY queue.created_at) + SELECT collapse_to AS id + FROM ( + SELECT + row_number() OVER repo_change = 1 AS first_in_partition, + max(queue.id) OVER repo_change AS collapse_to FROM replication_queue AS queue JOIN lock ON queue.lock_id = lock.id - WHERE queue.state IN ('ready', 'failed' ) + WHERE queue.state IN ('ready', 'failed') AND NOT EXISTS (SELECT 1 FROM replication_queue_job_lock WHERE lock_id = queue.lock_id) - ) - ORDER BY created_at + WINDOW repo_change AS (PARTITION BY lock_id, job->>'change') + ORDER BY created_at + ) AS repo_changes + WHERE first_in_partition LIMIT $3 - FOR UPDATE ) , job AS ( UPDATE replication_queue AS queue @@ -241,8 +243,7 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor WHERE lock.id = tracked.lock_id ) SELECT id, state, created_at, updated_at, lock_id, attempt, job, meta - FROM job - ORDER BY id` + FROM job` rows, err := rq.qc.QueryContext(ctx, query, virtualStorage, nodeStorage, count) if err != nil { return nil, fmt.Errorf("query: %w", err) @@ -272,7 +273,7 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J query := ` WITH existing AS ( - SELECT id, lock_id, updated_at, job + SELECT id, lock_id, created_at, updated_at, job FROM replication_queue WHERE id = ANY($1) AND state = 'in_progress' @@ -295,10 +296,9 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J WHERE existing.id = queue.id OR ( queue.state = 'ready' - AND queue.created_at < existing.updated_at + AND queue.created_at < existing.created_at AND queue.lock_id = existing.lock_id AND queue.job->>'change' = existing.job->>'change' - AND queue.job->>'source_node_storage' = existing.job->>'source_node_storage' ) RETURNING queue.id, queue.lock_id ) diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go index c8be612e8..502ab9c0d 100644 --- a/internal/praefect/datastore/queue_test.go +++ b/internal/praefect/datastore/queue_test.go @@ -300,10 +300,10 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) { } // first request to deque - expectedEvents1 := []ReplicationEvent{events[0], events[2], events[4]} + expectedEvents1 := []ReplicationEvent{events[3], events[2], events[4]} expectedJobLocks1 := []JobLockRow{ - {JobID: events[0].ID, LockID: "praefect|gitaly-1|/project/path-1"}, {JobID: events[2].ID, LockID: "praefect|gitaly-1|/project/path-1"}, + {JobID: events[3].ID, LockID: "praefect|gitaly-1|/project/path-1"}, {JobID: events[4].ID, LockID: "praefect|gitaly-1|/project/path-2"}, } @@ -393,7 +393,7 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test // there is only one single lock for all fetched events because of their 'repo' and 'target' combination {ID: "praefect|gitaly-1|/project/path-1", Acquired: true}, }) - requireJobLocks(t, ctx, db, []JobLockRow{{JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"}}) + requireJobLocks(t, ctx, db, []JobLockRow{{JobID: 2, LockID: "praefect|gitaly-1|/project/path-1"}}) var eventsType2 []ReplicationEvent for i := 0; i < 2; i++ { @@ -410,8 +410,8 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test {ID: "praefect|gitaly-1|/project/path-2", Acquired: true}, }) requireJobLocks(t, ctx, db, []JobLockRow{ - {JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"}, - {JobID: 3, LockID: "praefect|gitaly-1|/project/path-2"}, + {JobID: 2, LockID: "praefect|gitaly-1|/project/path-1"}, + {JobID: 4, LockID: "praefect|gitaly-1|/project/path-2"}, }) } @@ -527,15 +527,15 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { {ID: events[6].LockID, Acquired: false}, }) requireJobLocks(t, ctx, db, []JobLockRow{ - {JobID: events[0].ID, LockID: events[0].LockID}, - {JobID: events[2].ID, LockID: events[2].LockID}, + {JobID: events[3].ID, LockID: events[3].LockID}, {JobID: events[4].ID, LockID: events[4].LockID}, + {JobID: events[5].ID, LockID: events[5].LockID}, }) // release lock for events of second type - acknowledge1, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{events[2].ID}) + acknowledge1, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{events[5].ID}) require.NoError(t, err) - require.Equal(t, []uint64{3}, acknowledge1) + require.Equal(t, []uint64{6}, acknowledge1) requireLocks(t, ctx, db, []LockRow{ {ID: events[0].LockID, Acquired: true}, {ID: events[2].LockID, Acquired: false}, @@ -543,7 +543,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { {ID: events[6].LockID, Acquired: false}, }) requireJobLocks(t, ctx, db, []JobLockRow{ - {JobID: events[0].ID, LockID: events[0].LockID}, + {JobID: events[3].ID, LockID: events[3].LockID}, {JobID: events[4].ID, LockID: events[4].LockID}, }) @@ -557,9 +557,9 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { {ID: events[6].LockID, Acquired: false}, }) requireJobLocks(t, ctx, db, []JobLockRow{ - {JobID: events[0].ID, LockID: events[0].LockID}, - {JobID: events[2].ID, LockID: events[2].LockID}, + {JobID: events[3].ID, LockID: events[3].LockID}, {JobID: events[4].ID, LockID: events[4].LockID}, + {JobID: events[5].ID, LockID: events[5].LockID}, }) // creation of the new event that is equal to those already dequeue and processed @@ -568,9 +568,9 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { _, err = queue.Enqueue(ctx, eventType1) require.NoError(t, err) - acknowledge2, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[0].ID, events[4].ID}) + acknowledge2, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[3].ID, events[4].ID}) require.NoError(t, err) - require.Equal(t, []uint64{events[0].ID, events[4].ID}, acknowledge2) + require.Equal(t, []uint64{events[3].ID, events[4].ID}, acknowledge2) requireLocks(t, ctx, db, []LockRow{ {ID: events[0].LockID, Acquired: false}, {ID: events[2].LockID, Acquired: true}, @@ -578,7 +578,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { {ID: events[6].LockID, Acquired: false}, }) requireJobLocks(t, ctx, db, []JobLockRow{ - {JobID: events[2].ID, LockID: events[2].LockID}, + {JobID: events[5].ID, LockID: events[5].LockID}, }) dequeuedEvents3, err := queue.Dequeue(ctx, "praefect", "gitaly-2", 3) @@ -591,13 +591,13 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { {ID: events[6].LockID, Acquired: true}, }) requireJobLocks(t, ctx, db, []JobLockRow{ - {JobID: events[2].ID, LockID: events[2].LockID}, + {JobID: events[5].ID, LockID: events[5].LockID}, {JobID: events[6].ID, LockID: events[6].LockID}, }) - acknowledged3, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[2].ID, events[6].ID}) + acknowledged3, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[5].ID, events[6].ID}) require.NoError(t, err) - require.Equal(t, []uint64{events[2].ID, events[6].ID}, acknowledged3) + require.Equal(t, []uint64{events[5].ID, events[6].ID}, acknowledged3) requireLocks(t, ctx, db, []LockRow{ {ID: events[0].LockID, Acquired: false}, {ID: events[2].LockID, Acquired: false}, @@ -1147,7 +1147,7 @@ func requireJobLocks(t *testing.T, ctx context.Context, db glsql.DB, expected [] for i := range actual { actual[i].TriggeredAt = time.Time{} } - require.ElementsMatch(t, expected, actual) + require.Equal(t, expected, actual) } func fetchJobLocks(t *testing.T, ctx context.Context, db glsql.DB) []JobLockRow { |