Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Toon Claes <toon@gitlab.com> 2022-07-15 17:20:46 +0300
committer Toon Claes <toon@gitlab.com> 2022-08-30 16:14:48 +0300
commit 60e82502f2282fb83a02862434c5b72d51eb88c9 (patch)
tree 8ad4375f7d30851be449a0489af28d50ab4e8228
parent 325ef97d3218f1680399bcc78eb86034d6209a29 (diff)
datastore: Do not enqueue duplicate update jobs [toon-dedup-rep-q-jobs]
The reconciler schedules replication jobs for repositories that are not up-to-date. It creates a new event each time it notices a repo is outdated. We've seen situations where the job queue gets filled up with a large number of replication jobs. This is extremely inefficient and makes the table grow to unmanageable sizes.

This change will avoid creation of a new update job when there is already one there that's not being processed yet for the same repo.

Changelog: performance
Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/3940
-rw-r--r-- internal/praefect/datastore/collector_test.go 3
-rw-r--r-- internal/praefect/datastore/queue.go 31
-rw-r--r-- internal/praefect/datastore/queue_test.go 150
-rw-r--r-- internal/praefect/replicator_test.go 5
4 files changed, 109 insertions, 80 deletions
diff --git a/internal/praefect/datastore/collector_test.go b/internal/praefect/datastore/collector_test.go
index 09b2a0fa9..3044f2ae7 100644
--- a/internal/praefect/datastore/collector_test.go
+++ b/internal/praefect/datastore/collector_test.go
@@ -274,10 +274,11 @@ func TestRepositoryStoreCollector_ReplicationQueueDepth(t *testing.T) {
_, err := queue.Enqueue(ctx, ReplicationEvent{
Job: ReplicationJob{
Change: UpdateRepo,
- RelativePath: "/project/path-1",
+ RelativePath: fmt.Sprintf("/project/path-%d", i),
TargetNodeStorage: nodes[1],
SourceNodeStorage: nodes[0],
VirtualStorage: virtualStorage,
+ RepositoryID: int64(i),
Params: nil,
},
})
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index 2943974ce..52d7acea4 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -12,6 +12,8 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore/glsql"
)
+var errDuplicateWorkNotEnqueued = errors.New("duplicate work not enqueued")
+
// ReplicationEventQueue allows to put new events to the persistent queue and retrieve them back.
type ReplicationEventQueue interface {
// Enqueue puts provided event into the persistent queue.
@@ -20,8 +22,8 @@ type ReplicationEventQueue interface {
Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error)
// Acknowledge updates previously dequeued events with the new state and releases resources acquired for it.
// It updates events that are in 'in_progress' state to the state that is passed in.
- // It also updates state of similar events (scheduled fot the same repository with same change from the same source)
- // that are in 'ready' state and created before the target event was dequeue for the processing if the new state is
+ // It also updates state of similar events (scheduled for the same repository with same change from the same source)
+ // that are in 'ready' state and created before the target event was dequeued for the processing if the new state is
// 'completed'. Otherwise it won't be changed.
// It returns sub-set of passed in ids that were updated.
Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error)
@@ -66,7 +68,7 @@ type ReplicationJob struct {
Params Params `json:"params"`
}
-//nolint: stylecheck // This is unintentionally missing documentation.
+// Scan a value of json data into the ReplicationJob
func (job *ReplicationJob) Scan(value interface{}) error {
if value == nil {
return nil
@@ -80,7 +82,7 @@ func (job *ReplicationJob) Scan(value interface{}) error {
return json.Unmarshal(d, job)
}
-//nolint: stylecheck // This is unintentionally missing documentation.
+// Value transforms the ReplicationJob into json
func (job ReplicationJob) Value() (driver.Value, error) {
data, err := json.Marshal(job)
if err != nil {
@@ -208,11 +210,12 @@ type PostgresReplicationEventQueue struct {
qc glsql.Querier
}
-//nolint: stylecheck // This is unintentionally missing documentation.
+// Enqueue puts provided event into the persistent queue.
func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) {
// When `Enqueue` method is called:
// 1. Insertion of the new record into `replication_queue_lock` table, so we are ensured all events have
- // a corresponding <lock>. If a record already exists it won't be inserted again.
+ // a corresponding <lock>. If a record already exists it won't be inserted again, but locked for update to
+ // block concurrent Enqueue calls.
// 2. Insertion of the new record into the `replication_queue` table with the defaults listed above,
// the job, the meta and corresponding <lock> used in `replication_queue_lock` table for the `lock_id` column.
@@ -226,6 +229,15 @@ func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event Repli
INSERT INTO replication_queue(lock_id, job, meta)
SELECT insert_lock.id, $4, $5
FROM insert_lock
+ WHERE NOT EXISTS (
+ SELECT
+ FROM replication_queue AS q
+ WHERE q.state = 'ready'
+ AND q.job->>'virtual_storage' = $4::json->>'virtual_storage'
+ AND q.job->>'relative_path' = $4::json->>'relative_path'
+ AND q.job->>'target_node_storage' = $4::json->>'target_node_storage'
+ AND q.job->>'change' = $4::json->>'change'
+ )
RETURNING id, state, created_at, updated_at, lock_id, attempt, job, meta`
// this will always return a single row result (because of lock uniqueness) or an error
rows, err := rq.qc.QueryContext(ctx, query, event.Job.VirtualStorage, event.Job.TargetNodeStorage, event.Job.RelativePath, event.Job, event.Meta)
@@ -237,11 +249,14 @@ func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event Repli
if err != nil {
return ReplicationEvent{}, fmt.Errorf("scan: %w", err)
}
+ if len(events) == 0 {
+ return ReplicationEvent{}, errDuplicateWorkNotEnqueued
+ }
return events[0], nil
}
-//nolint: stylecheck // This is unintentionally missing documentation.
+// Dequeue retrieves events from the persistent queue using provided limitations and filters.
func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error) {
// When `Dequeue` method is called:
// 1. Events with attempts left that are either in `ready` or `failed` state are candidates for dequeuing.
@@ -322,7 +337,7 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor
return res, nil
}
-//nolint: stylecheck // This is unintentionally missing documentation.
+// Acknowledge updates previously dequeued events with the new state and releases resources acquired for it.
func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error) {
// When `Acknowledge` method is called:
// 1. The list of event `id`s and corresponding <lock>s retrieved from `replication_queue` table as passed in by the
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index 6aac7ca4d..f07e55f00 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -300,25 +300,10 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
requireLocks(t, ctx, db, []LockRow{expLock1}) // expected a new lock for new event
db.RequireRowsInTable(t, "replication_queue_job_lock", 0)
- event2, err := queue.Enqueue(ctx, eventType1) // repeat of the same event
- require.NoError(t, err)
-
- expEvent2 := ReplicationEvent{
- ID: event2.ID,
- State: "ready",
- Attempt: 3,
- LockID: "praefect-0|gitaly-1|/project/path-1",
- Job: ReplicationJob{
- Change: UpdateRepo,
- RelativePath: "/project/path-1",
- TargetNodeStorage: "gitaly-1",
- SourceNodeStorage: "gitaly-0",
- VirtualStorage: "praefect-0",
- Params: nil,
- },
- }
+ _, err = queue.Enqueue(ctx, eventType1) // repeat of the same event
+ require.Error(t, err, errDuplicateWorkNotEnqueued)
- requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2})
+ requireEvents(t, ctx, db, []ReplicationEvent{expEvent1})
requireLocks(t, ctx, db, []LockRow{expLock1}) // expected still one the same lock for repeated event
event3, err := queue.Enqueue(ctx, eventType2) // event for another target
@@ -339,7 +324,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
},
}
- requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2, expEvent3})
+ requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent3})
requireLocks(t, ctx, db, []LockRow{expLock1, expLock2}) // the new lock for another target repeated event
event4, err := queue.Enqueue(ctx, eventType3) // event for another repo
@@ -360,10 +345,45 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
},
}
- requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2, expEvent3, expEvent4})
+ requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent3, expEvent4})
requireLocks(t, ctx, db, []LockRow{expLock1, expLock2, expLock3}) // the new lock for same target but for another repo
db.RequireRowsInTable(t, "replication_queue_job_lock", 0) // there is no fetches it must be empty
+
+ dequeued, err := queue.Dequeue(ctx, "praefect-0", "gitaly-1", 1)
+ require.NoError(t, err)
+ require.Len(t, dequeued, 1)
+ require.Equal(t, event1.ID, dequeued[0].ID)
+
+ expEvent1.State = JobStateInProgress
+ expEvent1.Attempt = 2
+ expLock1.Acquired = true
+
+ requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent3, expEvent4})
+ requireLocks(t, ctx, db, []LockRow{expLock1, expLock2, expLock3})
+
+ event5, err := queue.Enqueue(ctx, eventType1) // event for in-progress repo
+ require.NoError(t, err)
+
+ expEvent5 := ReplicationEvent{
+ ID: event5.ID,
+ State: "ready",
+ Attempt: 3,
+ LockID: "praefect-0|gitaly-1|/project/path-1",
+ Job: ReplicationJob{
+ Change: UpdateRepo,
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ VirtualStorage: "praefect-0",
+ Params: nil,
+ },
+ }
+
+ requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent3, expEvent4, expEvent5})
+ requireLocks(t, ctx, db, []LockRow{expLock1, expLock2, expLock3})
+
+ db.RequireRowsInTable(t, "replication_queue_job_lock", 1)
}
func TestPostgresReplicationEventQueue_Dequeue(t *testing.T) {
@@ -464,7 +484,7 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) {
}
// events to fill in the queue
- events := []ReplicationEvent{eventType1, eventType1, eventType2, eventType1, eventType3, eventType4}
+ events := []ReplicationEvent{eventType1, eventType2, eventType3, eventType4}
for i := range events {
var err error
events[i], err = queue.Enqueue(ctx, events[i])
@@ -472,11 +492,11 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) {
}
// first request to deque
- expectedEvents1 := []ReplicationEvent{events[0], events[2], events[4]}
+ expectedEvents1 := []ReplicationEvent{events[0], events[1], events[2]}
expectedJobLocks1 := []JobLockRow{
{JobID: events[0].ID, LockID: "praefect|gitaly-1|/project/path-1"},
- {JobID: events[2].ID, LockID: "praefect|gitaly-1|/project/path-1"},
- {JobID: events[4].ID, LockID: "praefect|gitaly-1|/project/path-2"},
+ {JobID: events[1].ID, LockID: "praefect|gitaly-1|/project/path-1"},
+ {JobID: events[2].ID, LockID: "praefect|gitaly-1|/project/path-2"},
}
// we expect only first two types of events by limiting count to 3
@@ -500,11 +520,11 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) {
// second request to deque
// there must be only last event fetched from the queue
- expectedEvents2 := []ReplicationEvent{events[5]}
+ expectedEvents2 := []ReplicationEvent{events[3]}
expectedEvents2[0].State = JobStateInProgress
expectedEvents2[0].Attempt = 2
- expectedJobLocks2 := []JobLockRow{{JobID: 6, LockID: "backup|gitaly-1|/project/path-1"}}
+ expectedJobLocks2 := []JobLockRow{{JobID: 4, LockID: "backup|gitaly-1|/project/path-1"}}
dequeuedEvents2, err := queue.Dequeue(ctx, "backup", "gitaly-1", 100500)
require.NoError(t, err)
@@ -550,10 +570,8 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test
},
}
- for i := 0; i < 2; i++ {
- _, err := queue.Enqueue(ctx, eventType1)
- require.NoError(t, err, "failed to fill in event queue")
- }
+ _, err := queue.Enqueue(ctx, eventType1)
+ require.NoError(t, err, "failed to fill in event queue")
dequeuedEvents1, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 1)
require.NoError(t, err)
@@ -564,10 +582,8 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test
})
requireJobLocks(t, ctx, db, []JobLockRow{{JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"}})
- for i := 0; i < 2; i++ {
- _, err := queue.Enqueue(ctx, eventType2)
- require.NoError(t, err, "failed to fill in event queue")
- }
+ _, err = queue.Enqueue(ctx, eventType2)
+ require.NoError(t, err, "failed to fill in event queue")
dequeuedEvents2, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 1)
require.NoError(t, err)
@@ -578,7 +594,7 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test
})
requireJobLocks(t, ctx, db, []JobLockRow{
{JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"},
- {JobID: 3, LockID: "praefect|gitaly-1|/project/path-2"},
+ {JobID: 2, LockID: "praefect|gitaly-1|/project/path-2"},
})
}
@@ -675,7 +691,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
},
}
- events := []ReplicationEvent{eventType1, eventType1, eventType2, eventType1, eventType3, eventType2, eventType4} // events to fill in the queue
+ events := []ReplicationEvent{eventType1, eventType2, eventType3, eventType4}
for i := range events {
var err error
events[i], err = queue.Enqueue(ctx, events[i])
@@ -688,29 +704,29 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
require.Len(t, dequeuedEvents1, 3)
requireLocks(t, ctx, db, []LockRow{
{ID: events[0].LockID, Acquired: true},
+ {ID: events[1].LockID, Acquired: true},
{ID: events[2].LockID, Acquired: true},
- {ID: events[4].LockID, Acquired: true},
- {ID: events[6].LockID, Acquired: false},
+ {ID: events[3].LockID, Acquired: false},
})
requireJobLocks(t, ctx, db, []JobLockRow{
{JobID: events[0].ID, LockID: events[0].LockID},
+ {JobID: events[1].ID, LockID: events[1].LockID},
{JobID: events[2].ID, LockID: events[2].LockID},
- {JobID: events[4].ID, LockID: events[4].LockID},
})
// release lock for events of second type
- acknowledge1, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{events[2].ID})
+ acknowledge1, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{events[1].ID})
require.NoError(t, err)
- require.Equal(t, []uint64{3}, acknowledge1)
+ require.Equal(t, []uint64{2}, acknowledge1)
requireLocks(t, ctx, db, []LockRow{
{ID: events[0].LockID, Acquired: true},
- {ID: events[2].LockID, Acquired: false},
- {ID: events[4].LockID, Acquired: true},
- {ID: events[6].LockID, Acquired: false},
+ {ID: events[1].LockID, Acquired: false},
+ {ID: events[2].LockID, Acquired: true},
+ {ID: events[3].LockID, Acquired: false},
})
requireJobLocks(t, ctx, db, []JobLockRow{
{JobID: events[0].ID, LockID: events[0].LockID},
- {JobID: events[4].ID, LockID: events[4].LockID},
+ {JobID: events[2].ID, LockID: events[2].LockID},
})
dequeuedEvents2, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 3)
@@ -718,14 +734,14 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
require.Len(t, dequeuedEvents2, 1, "expected: events of type 2 ('failed' will be fetched for retry)")
requireLocks(t, ctx, db, []LockRow{
{ID: events[0].LockID, Acquired: true},
+ {ID: events[1].LockID, Acquired: true},
{ID: events[2].LockID, Acquired: true},
- {ID: events[4].LockID, Acquired: true},
- {ID: events[6].LockID, Acquired: false},
+ {ID: events[3].LockID, Acquired: false},
})
requireJobLocks(t, ctx, db, []JobLockRow{
{JobID: events[0].ID, LockID: events[0].LockID},
+ {JobID: events[1].ID, LockID: events[1].LockID},
{JobID: events[2].ID, LockID: events[2].LockID},
- {JobID: events[4].ID, LockID: events[4].LockID},
})
// creation of the new event that is equal to those already dequeue and processed
@@ -734,47 +750,47 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
_, err = queue.Enqueue(ctx, eventType1)
require.NoError(t, err)
- acknowledge2, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[0].ID, events[4].ID})
+ acknowledge2, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[0].ID, events[2].ID})
require.NoError(t, err)
- require.Equal(t, []uint64{events[0].ID, events[4].ID}, acknowledge2)
+ require.Equal(t, []uint64{events[0].ID, events[2].ID}, acknowledge2)
requireLocks(t, ctx, db, []LockRow{
{ID: events[0].LockID, Acquired: false},
- {ID: events[2].LockID, Acquired: true},
- {ID: events[4].LockID, Acquired: false},
- {ID: events[6].LockID, Acquired: false},
+ {ID: events[1].LockID, Acquired: true},
+ {ID: events[2].LockID, Acquired: false},
+ {ID: events[3].LockID, Acquired: false},
})
requireJobLocks(t, ctx, db, []JobLockRow{
- {JobID: events[2].ID, LockID: events[2].LockID},
+ {JobID: events[1].ID, LockID: events[1].LockID},
})
- db.RequireRowsInTable(t, "replication_queue", 4)
+ db.RequireRowsInTable(t, "replication_queue", 3)
dequeuedEvents3, err := queue.Dequeue(ctx, "praefect", "gitaly-2", 3)
require.NoError(t, err)
require.Len(t, dequeuedEvents3, 1, "expected: event of type 4")
requireLocks(t, ctx, db, []LockRow{
{ID: events[0].LockID, Acquired: false},
- {ID: events[2].LockID, Acquired: true},
- {ID: events[4].LockID, Acquired: false},
- {ID: events[6].LockID, Acquired: true},
+ {ID: events[1].LockID, Acquired: true},
+ {ID: events[2].LockID, Acquired: false},
+ {ID: events[3].LockID, Acquired: true},
})
requireJobLocks(t, ctx, db, []JobLockRow{
- {JobID: events[2].ID, LockID: events[2].LockID},
- {JobID: events[6].ID, LockID: events[6].LockID},
+ {JobID: events[1].ID, LockID: events[1].LockID},
+ {JobID: events[3].ID, LockID: events[3].LockID},
})
- acknowledged3, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[2].ID, events[6].ID})
+ acknowledged3, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[1].ID, events[3].ID})
require.NoError(t, err)
- require.Equal(t, []uint64{events[2].ID, events[6].ID}, acknowledged3)
+ require.Equal(t, []uint64{events[1].ID, events[3].ID}, acknowledged3)
requireLocks(t, ctx, db, []LockRow{
{ID: events[0].LockID, Acquired: false},
+ {ID: events[1].LockID, Acquired: false},
{ID: events[2].LockID, Acquired: false},
- {ID: events[4].LockID, Acquired: false},
- {ID: events[6].LockID, Acquired: false},
+ {ID: events[3].LockID, Acquired: false},
})
requireJobLocks(t, ctx, db, nil)
db.RequireRowsInTable(t, "replication_queue", 1)
- newEvent, err := queue.Enqueue(ctx, eventType1)
+ newEvent, err := queue.Enqueue(ctx, eventType2)
require.NoError(t, err)
acknowledge4, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{newEvent.ID})
@@ -787,9 +803,9 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
require.Equal(t, "ready", newEventState, "no way to acknowledge event that is not in in_progress state(was not dequeued)")
requireLocks(t, ctx, db, []LockRow{
{ID: events[0].LockID, Acquired: false},
+ {ID: events[1].LockID, Acquired: false},
{ID: events[2].LockID, Acquired: false},
- {ID: events[4].LockID, Acquired: false},
- {ID: events[6].LockID, Acquired: false},
+ {ID: events[3].LockID, Acquired: false},
})
requireJobLocks(t, ctx, db, nil)
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index b9349a0bc..795ac9234 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -490,7 +490,7 @@ func TestProcessBacklog_Success(t *testing.T) {
ackIDs, err := queue.Acknowledge(ctx, state, ids)
if len(ids) > 0 {
assert.Equal(t, datastore.JobStateCompleted, state, "no fails expected")
- assert.Equal(t, []uint64{1, 3, 4}, ids, "all jobs must be processed at once")
+ assert.Equal(t, []uint64{1, 2, 3}, ids, "all jobs must be processed at once")
}
return ackIDs, err
})
@@ -517,9 +517,6 @@ func TestProcessBacklog_Success(t *testing.T) {
_, err := queueInterceptor.Enqueue(ctx, eventType1)
require.NoError(t, err)
- _, err = queueInterceptor.Enqueue(ctx, eventType1)
- require.NoError(t, err)
-
renameTo1 := filepath.Join(testRepo.GetRelativePath(), "..", filepath.Base(testRepo.GetRelativePath())+"-mv1")
fullNewPath1 := filepath.Join(backupCfg.Storages[0].Path, renameTo1)