diff options
author | Toon Claes <toon@gitlab.com> | 2022-07-15 17:20:46 +0300 |
---|---|---|
committer | Toon Claes <toon@gitlab.com> | 2022-08-30 16:14:48 +0300 |
commit | 60e82502f2282fb83a02862434c5b72d51eb88c9 (patch) | |
tree | 8ad4375f7d30851be449a0489af28d50ab4e8228 | |
parent | 325ef97d3218f1680399bcc78eb86034d6209a29 (diff) |
datastore: Do not enqueue duplicate update jobs (toon-dedup-rep-q-jobs)
The reconciler schedules replication jobs for repositories that are not
up-to-date. It creates a new event each time it notices a repo is
outdated. We've seen situations where the job queue gets filled up with
a large number of replication jobs. This is extremely inefficient and
makes the table grow to unmanageable sizes.
This change avoids creating a new update job when one is already
queued for the same repo and has not started being processed yet.
Changelog: performance
Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/3940
-rw-r--r-- | internal/praefect/datastore/collector_test.go | 3 | ||||
-rw-r--r-- | internal/praefect/datastore/queue.go | 31 | ||||
-rw-r--r-- | internal/praefect/datastore/queue_test.go | 150 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 5 |
4 files changed, 109 insertions, 80 deletions
diff --git a/internal/praefect/datastore/collector_test.go b/internal/praefect/datastore/collector_test.go index 09b2a0fa9..3044f2ae7 100644 --- a/internal/praefect/datastore/collector_test.go +++ b/internal/praefect/datastore/collector_test.go @@ -274,10 +274,11 @@ func TestRepositoryStoreCollector_ReplicationQueueDepth(t *testing.T) { _, err := queue.Enqueue(ctx, ReplicationEvent{ Job: ReplicationJob{ Change: UpdateRepo, - RelativePath: "/project/path-1", + RelativePath: fmt.Sprintf("/project/path-%d", i), TargetNodeStorage: nodes[1], SourceNodeStorage: nodes[0], VirtualStorage: virtualStorage, + RepositoryID: int64(i), Params: nil, }, }) diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index 2943974ce..52d7acea4 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -12,6 +12,8 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore/glsql" ) +var errDuplicateWorkNotEnqueued = errors.New("duplicate work not enqueued") + // ReplicationEventQueue allows to put new events to the persistent queue and retrieve them back. type ReplicationEventQueue interface { // Enqueue puts provided event into the persistent queue. @@ -20,8 +22,8 @@ type ReplicationEventQueue interface { Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error) // Acknowledge updates previously dequeued events with the new state and releases resources acquired for it. // It updates events that are in 'in_progress' state to the state that is passed in. 
- // It also updates state of similar events (scheduled fot the same repository with same change from the same source) - // that are in 'ready' state and created before the target event was dequeue for the processing if the new state is + // It also updates state of similar events (scheduled for the same repository with same change from the same source) + // that are in 'ready' state and created before the target event was dequeued for the processing if the new state is // 'completed'. Otherwise it won't be changed. // It returns sub-set of passed in ids that were updated. Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error) @@ -66,7 +68,7 @@ type ReplicationJob struct { Params Params `json:"params"` } -//nolint: stylecheck // This is unintentionally missing documentation. +// Scan a value of json data into the ReplicationJob func (job *ReplicationJob) Scan(value interface{}) error { if value == nil { return nil @@ -80,7 +82,7 @@ func (job *ReplicationJob) Scan(value interface{}) error { return json.Unmarshal(d, job) } -//nolint: stylecheck // This is unintentionally missing documentation. +// Value transforms the ReplicationJob into json func (job ReplicationJob) Value() (driver.Value, error) { data, err := json.Marshal(job) if err != nil { @@ -208,11 +210,12 @@ type PostgresReplicationEventQueue struct { qc glsql.Querier } -//nolint: stylecheck // This is unintentionally missing documentation. +// Enqueue puts provided event into the persistent queue. func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) { // When `Enqueue` method is called: // 1. Insertion of the new record into `replication_queue_lock` table, so we are ensured all events have - // a corresponding <lock>. If a record already exists it won't be inserted again. + // a corresponding <lock>. If a record already exists it won't be inserted again, but locked for update to + // block concurrent Enqueue calls. 
// 2. Insertion of the new record into the `replication_queue` table with the defaults listed above, // the job, the meta and corresponding <lock> used in `replication_queue_lock` table for the `lock_id` column. @@ -226,6 +229,15 @@ func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event Repli INSERT INTO replication_queue(lock_id, job, meta) SELECT insert_lock.id, $4, $5 FROM insert_lock + WHERE NOT EXISTS ( + SELECT + FROM replication_queue AS q + WHERE q.state = 'ready' + AND q.job->>'virtual_storage' = $4::json->>'virtual_storage' + AND q.job->>'relative_path' = $4::json->>'relative_path' + AND q.job->>'target_node_storage' = $4::json->>'target_node_storage' + AND q.job->>'change' = $4::json->>'change' + ) RETURNING id, state, created_at, updated_at, lock_id, attempt, job, meta` // this will always return a single row result (because of lock uniqueness) or an error rows, err := rq.qc.QueryContext(ctx, query, event.Job.VirtualStorage, event.Job.TargetNodeStorage, event.Job.RelativePath, event.Job, event.Meta) @@ -237,11 +249,14 @@ func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event Repli if err != nil { return ReplicationEvent{}, fmt.Errorf("scan: %w", err) } + if len(events) == 0 { + return ReplicationEvent{}, errDuplicateWorkNotEnqueued + } return events[0], nil } -//nolint: stylecheck // This is unintentionally missing documentation. +// Dequeue retrieves events from the persistent queue using provided limitations and filters. func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error) { // When `Dequeue` method is called: // 1. Events with attempts left that are either in `ready` or `failed` state are candidates for dequeuing. @@ -322,7 +337,7 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor return res, nil } -//nolint: stylecheck // This is unintentionally missing documentation. 
+// Acknowledge updates previously dequeued events with the new state and releases resources acquired for it. func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error) { // When `Acknowledge` method is called: // 1. The list of event `id`s and corresponding <lock>s retrieved from `replication_queue` table as passed in by the diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go index 6aac7ca4d..f07e55f00 100644 --- a/internal/praefect/datastore/queue_test.go +++ b/internal/praefect/datastore/queue_test.go @@ -300,25 +300,10 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { requireLocks(t, ctx, db, []LockRow{expLock1}) // expected a new lock for new event db.RequireRowsInTable(t, "replication_queue_job_lock", 0) - event2, err := queue.Enqueue(ctx, eventType1) // repeat of the same event - require.NoError(t, err) - - expEvent2 := ReplicationEvent{ - ID: event2.ID, - State: "ready", - Attempt: 3, - LockID: "praefect-0|gitaly-1|/project/path-1", - Job: ReplicationJob{ - Change: UpdateRepo, - RelativePath: "/project/path-1", - TargetNodeStorage: "gitaly-1", - SourceNodeStorage: "gitaly-0", - VirtualStorage: "praefect-0", - Params: nil, - }, - } + _, err = queue.Enqueue(ctx, eventType1) // repeat of the same event + require.Error(t, err, errDuplicateWorkNotEnqueued) - requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2}) + requireEvents(t, ctx, db, []ReplicationEvent{expEvent1}) requireLocks(t, ctx, db, []LockRow{expLock1}) // expected still one the same lock for repeated event event3, err := queue.Enqueue(ctx, eventType2) // event for another target @@ -339,7 +324,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { }, } - requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2, expEvent3}) + requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent3}) requireLocks(t, ctx, db, 
[]LockRow{expLock1, expLock2}) // the new lock for another target repeated event event4, err := queue.Enqueue(ctx, eventType3) // event for another repo @@ -360,10 +345,45 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { }, } - requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2, expEvent3, expEvent4}) + requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent3, expEvent4}) requireLocks(t, ctx, db, []LockRow{expLock1, expLock2, expLock3}) // the new lock for same target but for another repo db.RequireRowsInTable(t, "replication_queue_job_lock", 0) // there is no fetches it must be empty + + dequeued, err := queue.Dequeue(ctx, "praefect-0", "gitaly-1", 1) + require.NoError(t, err) + require.Len(t, dequeued, 1) + require.Equal(t, event1.ID, dequeued[0].ID) + + expEvent1.State = JobStateInProgress + expEvent1.Attempt = 2 + expLock1.Acquired = true + + requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent3, expEvent4}) + requireLocks(t, ctx, db, []LockRow{expLock1, expLock2, expLock3}) + + event5, err := queue.Enqueue(ctx, eventType1) // event for in-progress repo + require.NoError(t, err) + + expEvent5 := ReplicationEvent{ + ID: event5.ID, + State: "ready", + Attempt: 3, + LockID: "praefect-0|gitaly-1|/project/path-1", + Job: ReplicationJob{ + Change: UpdateRepo, + RelativePath: "/project/path-1", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + VirtualStorage: "praefect-0", + Params: nil, + }, + } + + requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent3, expEvent4, expEvent5}) + requireLocks(t, ctx, db, []LockRow{expLock1, expLock2, expLock3}) + + db.RequireRowsInTable(t, "replication_queue_job_lock", 1) } func TestPostgresReplicationEventQueue_Dequeue(t *testing.T) { @@ -464,7 +484,7 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) { } // events to fill in the queue - events := []ReplicationEvent{eventType1, eventType1, eventType2, eventType1, eventType3, 
eventType4} + events := []ReplicationEvent{eventType1, eventType2, eventType3, eventType4} for i := range events { var err error events[i], err = queue.Enqueue(ctx, events[i]) @@ -472,11 +492,11 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) { } // first request to deque - expectedEvents1 := []ReplicationEvent{events[0], events[2], events[4]} + expectedEvents1 := []ReplicationEvent{events[0], events[1], events[2]} expectedJobLocks1 := []JobLockRow{ {JobID: events[0].ID, LockID: "praefect|gitaly-1|/project/path-1"}, - {JobID: events[2].ID, LockID: "praefect|gitaly-1|/project/path-1"}, - {JobID: events[4].ID, LockID: "praefect|gitaly-1|/project/path-2"}, + {JobID: events[1].ID, LockID: "praefect|gitaly-1|/project/path-1"}, + {JobID: events[2].ID, LockID: "praefect|gitaly-1|/project/path-2"}, } // we expect only first two types of events by limiting count to 3 @@ -500,11 +520,11 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) { // second request to deque // there must be only last event fetched from the queue - expectedEvents2 := []ReplicationEvent{events[5]} + expectedEvents2 := []ReplicationEvent{events[3]} expectedEvents2[0].State = JobStateInProgress expectedEvents2[0].Attempt = 2 - expectedJobLocks2 := []JobLockRow{{JobID: 6, LockID: "backup|gitaly-1|/project/path-1"}} + expectedJobLocks2 := []JobLockRow{{JobID: 4, LockID: "backup|gitaly-1|/project/path-1"}} dequeuedEvents2, err := queue.Dequeue(ctx, "backup", "gitaly-1", 100500) require.NoError(t, err) @@ -550,10 +570,8 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test }, } - for i := 0; i < 2; i++ { - _, err := queue.Enqueue(ctx, eventType1) - require.NoError(t, err, "failed to fill in event queue") - } + _, err := queue.Enqueue(ctx, eventType1) + require.NoError(t, err, "failed to fill in event queue") dequeuedEvents1, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 1) require.NoError(t, err) @@ -564,10 +582,8 @@ func 
TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test }) requireJobLocks(t, ctx, db, []JobLockRow{{JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"}}) - for i := 0; i < 2; i++ { - _, err := queue.Enqueue(ctx, eventType2) - require.NoError(t, err, "failed to fill in event queue") - } + _, err = queue.Enqueue(ctx, eventType2) + require.NoError(t, err, "failed to fill in event queue") dequeuedEvents2, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 1) require.NoError(t, err) @@ -578,7 +594,7 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test }) requireJobLocks(t, ctx, db, []JobLockRow{ {JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"}, - {JobID: 3, LockID: "praefect|gitaly-1|/project/path-2"}, + {JobID: 2, LockID: "praefect|gitaly-1|/project/path-2"}, }) } @@ -675,7 +691,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { }, } - events := []ReplicationEvent{eventType1, eventType1, eventType2, eventType1, eventType3, eventType2, eventType4} // events to fill in the queue + events := []ReplicationEvent{eventType1, eventType2, eventType3, eventType4} for i := range events { var err error events[i], err = queue.Enqueue(ctx, events[i]) @@ -688,29 +704,29 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { require.Len(t, dequeuedEvents1, 3) requireLocks(t, ctx, db, []LockRow{ {ID: events[0].LockID, Acquired: true}, + {ID: events[1].LockID, Acquired: true}, {ID: events[2].LockID, Acquired: true}, - {ID: events[4].LockID, Acquired: true}, - {ID: events[6].LockID, Acquired: false}, + {ID: events[3].LockID, Acquired: false}, }) requireJobLocks(t, ctx, db, []JobLockRow{ {JobID: events[0].ID, LockID: events[0].LockID}, + {JobID: events[1].ID, LockID: events[1].LockID}, {JobID: events[2].ID, LockID: events[2].LockID}, - {JobID: events[4].ID, LockID: events[4].LockID}, }) // release lock for events of second type - acknowledge1, err := queue.Acknowledge(ctx, 
JobStateFailed, []uint64{events[2].ID}) + acknowledge1, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{events[1].ID}) require.NoError(t, err) - require.Equal(t, []uint64{3}, acknowledge1) + require.Equal(t, []uint64{2}, acknowledge1) requireLocks(t, ctx, db, []LockRow{ {ID: events[0].LockID, Acquired: true}, - {ID: events[2].LockID, Acquired: false}, - {ID: events[4].LockID, Acquired: true}, - {ID: events[6].LockID, Acquired: false}, + {ID: events[1].LockID, Acquired: false}, + {ID: events[2].LockID, Acquired: true}, + {ID: events[3].LockID, Acquired: false}, }) requireJobLocks(t, ctx, db, []JobLockRow{ {JobID: events[0].ID, LockID: events[0].LockID}, - {JobID: events[4].ID, LockID: events[4].LockID}, + {JobID: events[2].ID, LockID: events[2].LockID}, }) dequeuedEvents2, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 3) @@ -718,14 +734,14 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { require.Len(t, dequeuedEvents2, 1, "expected: events of type 2 ('failed' will be fetched for retry)") requireLocks(t, ctx, db, []LockRow{ {ID: events[0].LockID, Acquired: true}, + {ID: events[1].LockID, Acquired: true}, {ID: events[2].LockID, Acquired: true}, - {ID: events[4].LockID, Acquired: true}, - {ID: events[6].LockID, Acquired: false}, + {ID: events[3].LockID, Acquired: false}, }) requireJobLocks(t, ctx, db, []JobLockRow{ {JobID: events[0].ID, LockID: events[0].LockID}, + {JobID: events[1].ID, LockID: events[1].LockID}, {JobID: events[2].ID, LockID: events[2].LockID}, - {JobID: events[4].ID, LockID: events[4].LockID}, }) // creation of the new event that is equal to those already dequeue and processed @@ -734,47 +750,47 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { _, err = queue.Enqueue(ctx, eventType1) require.NoError(t, err) - acknowledge2, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[0].ID, events[4].ID}) + acknowledge2, err := queue.Acknowledge(ctx, JobStateCompleted, 
[]uint64{events[0].ID, events[2].ID}) require.NoError(t, err) - require.Equal(t, []uint64{events[0].ID, events[4].ID}, acknowledge2) + require.Equal(t, []uint64{events[0].ID, events[2].ID}, acknowledge2) requireLocks(t, ctx, db, []LockRow{ {ID: events[0].LockID, Acquired: false}, - {ID: events[2].LockID, Acquired: true}, - {ID: events[4].LockID, Acquired: false}, - {ID: events[6].LockID, Acquired: false}, + {ID: events[1].LockID, Acquired: true}, + {ID: events[2].LockID, Acquired: false}, + {ID: events[3].LockID, Acquired: false}, }) requireJobLocks(t, ctx, db, []JobLockRow{ - {JobID: events[2].ID, LockID: events[2].LockID}, + {JobID: events[1].ID, LockID: events[1].LockID}, }) - db.RequireRowsInTable(t, "replication_queue", 4) + db.RequireRowsInTable(t, "replication_queue", 3) dequeuedEvents3, err := queue.Dequeue(ctx, "praefect", "gitaly-2", 3) require.NoError(t, err) require.Len(t, dequeuedEvents3, 1, "expected: event of type 4") requireLocks(t, ctx, db, []LockRow{ {ID: events[0].LockID, Acquired: false}, - {ID: events[2].LockID, Acquired: true}, - {ID: events[4].LockID, Acquired: false}, - {ID: events[6].LockID, Acquired: true}, + {ID: events[1].LockID, Acquired: true}, + {ID: events[2].LockID, Acquired: false}, + {ID: events[3].LockID, Acquired: true}, }) requireJobLocks(t, ctx, db, []JobLockRow{ - {JobID: events[2].ID, LockID: events[2].LockID}, - {JobID: events[6].ID, LockID: events[6].LockID}, + {JobID: events[1].ID, LockID: events[1].LockID}, + {JobID: events[3].ID, LockID: events[3].LockID}, }) - acknowledged3, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[2].ID, events[6].ID}) + acknowledged3, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[1].ID, events[3].ID}) require.NoError(t, err) - require.Equal(t, []uint64{events[2].ID, events[6].ID}, acknowledged3) + require.Equal(t, []uint64{events[1].ID, events[3].ID}, acknowledged3) requireLocks(t, ctx, db, []LockRow{ {ID: events[0].LockID, Acquired: false}, + {ID: 
events[1].LockID, Acquired: false}, {ID: events[2].LockID, Acquired: false}, - {ID: events[4].LockID, Acquired: false}, - {ID: events[6].LockID, Acquired: false}, + {ID: events[3].LockID, Acquired: false}, }) requireJobLocks(t, ctx, db, nil) db.RequireRowsInTable(t, "replication_queue", 1) - newEvent, err := queue.Enqueue(ctx, eventType1) + newEvent, err := queue.Enqueue(ctx, eventType2) require.NoError(t, err) acknowledge4, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{newEvent.ID}) @@ -787,9 +803,9 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { require.Equal(t, "ready", newEventState, "no way to acknowledge event that is not in in_progress state(was not dequeued)") requireLocks(t, ctx, db, []LockRow{ {ID: events[0].LockID, Acquired: false}, + {ID: events[1].LockID, Acquired: false}, {ID: events[2].LockID, Acquired: false}, - {ID: events[4].LockID, Acquired: false}, - {ID: events[6].LockID, Acquired: false}, + {ID: events[3].LockID, Acquired: false}, }) requireJobLocks(t, ctx, db, nil) } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index b9349a0bc..795ac9234 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -490,7 +490,7 @@ func TestProcessBacklog_Success(t *testing.T) { ackIDs, err := queue.Acknowledge(ctx, state, ids) if len(ids) > 0 { assert.Equal(t, datastore.JobStateCompleted, state, "no fails expected") - assert.Equal(t, []uint64{1, 3, 4}, ids, "all jobs must be processed at once") + assert.Equal(t, []uint64{1, 2, 3}, ids, "all jobs must be processed at once") } return ackIDs, err }) @@ -517,9 +517,6 @@ func TestProcessBacklog_Success(t *testing.T) { _, err := queueInterceptor.Enqueue(ctx, eventType1) require.NoError(t, err) - _, err = queueInterceptor.Enqueue(ctx, eventType1) - require.NoError(t, err) - renameTo1 := filepath.Join(testRepo.GetRelativePath(), "..", filepath.Base(testRepo.GetRelativePath())+"-mv1") fullNewPath1 
:= filepath.Join(backupCfg.Storages[0].Path, renameTo1) |