diff options
author | Toon Claes <toon@gitlab.com> | 2021-09-27 15:32:26 +0300 |
---|---|---|
committer | Toon Claes <toon@gitlab.com> | 2021-09-27 15:32:26 +0300 |
commit | 947aaa9aab0737ec4dd8abc65d53ca824aac71ba (patch) | |
tree | b3c386fe1f073653a4dc00c9e66e1dcc5efae451 /internal | |
parent | 80b9699ea44b716e74edf32130a6a3966cb59cae (diff) | |
parent | 2869d08ce029ef6cb11c19bad71c5f924dd275dd (diff) |
Merge branch 'ps-graceful-backlog-stop' into 'master'
replication: Graceful stop of the replication processing loop
Closes #2703
See merge request gitlab-org/gitaly!3885
Diffstat (limited to 'internal')
-rw-r--r-- | internal/praefect/coordinator_test.go | 48 | ||||
-rw-r--r-- | internal/praefect/datastore/memory.go | 185 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 5 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 13 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 95 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 37 |
6 files changed, 231 insertions, 152 deletions
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 35e4b54a5..9df1dcf76 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -157,14 +157,7 @@ func TestStreamDirectorMutator(t *testing.T) { }, }, } - - var replEventWait sync.WaitGroup - - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))) - queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { - defer replEventWait.Done() - return queue.Enqueue(ctx, event) - }) + db := glsql.NewDB(t) targetRepo := gitalypb.Repository{ StorageName: "praefect", @@ -195,7 +188,7 @@ func TestStreamDirectorMutator(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - tx := glsql.NewDB(t).Begin(t) + tx := db.Begin(t) defer tx.Rollback(t) rs := datastore.NewPostgresRepositoryStore(tx, conf.StorageNames()) @@ -205,6 +198,11 @@ func TestStreamDirectorMutator(t *testing.T) { } testhelper.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()}) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) + queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { + assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created") + return queue.Enqueue(ctx, event) + }) coordinator := NewCoordinator( queueInterceptor, @@ -254,11 +252,14 @@ func TestStreamDirectorMutator(t *testing.T) { require.NoError(t, err) require.Equal(t, "praefect-internal-1", rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name") - replEventWait.Add(1) // expected only one event to be created // this call creates new events in the queue and simulates usual flow of the update operation require.NoError(t, streamParams.RequestFinalizer()) - replEventWait.Wait() // wait until event persisted (async operation) + // wait until event persisted (async operation) + require.NoError(t, queueInterceptor.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool { + return len(i.GetEnqueuedResult()) == 1 + })) + events, err := queueInterceptor.Dequeue(ctx, "praefect", "praefect-internal-2", 10) require.NoError(t, err) require.Len(t, events, 1) @@ -832,13 +833,6 @@ func TestStreamDirector_repo_creation(t *testing.T) { }, } - var replEventWait sync.WaitGroup - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) - queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { - defer replEventWait.Done() - return queue.Enqueue(ctx, event) - }) - rewrittenStorage := primaryNode.Storage targetRepo := gitalypb.Repository{ StorageName: "praefect", @@ -926,6 +920,7 @@ func TestStreamDirector_repo_creation(t *testing.T) { } txMgr := transactions.NewManager(conf) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) coordinator := NewCoordinator( queueInterceptor, @@ -967,8 +962,6 @@ func TestStreamDirector_repo_creation(t *testing.T) { require.NoError(t, err) require.Equal(t, rewrittenStorage, rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name") - replEventWait.Add(1) - vote := voting.VoteFromData([]byte{}) require.NoError(t, txMgr.VoteTransaction(ctx, 1, "praefect-internal-1", vote)) require.NoError(t, txMgr.VoteTransaction(ctx, 1, "praefect-internal-2", vote)) @@ -977,7 +970,10 @@ func TestStreamDirector_repo_creation(t *testing.T) { err = streamParams.RequestFinalizer() require.NoError(t, err) - replEventWait.Wait() // wait until event persisted (async operation) + // wait until event persisted (async operation) + require.NoError(t, queueInterceptor.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool { + return len(i.GetEnqueuedResult()) == 1 + })) var expectedEvents, actualEvents []datastore.ReplicationEvent for _, target := range []string{unhealthySecondaryNode.Storage} { @@ -1063,14 +1059,11 @@ func TestAbsentCorrelationID(t *testing.T) { }, } - var replEventWait sync.WaitGroup - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))) queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { - defer replEventWait.Done() + assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created") return queue.Enqueue(ctx, event) }) - targetRepo := gitalypb.Repository{ StorageName: "praefect", RelativePath: "/path/to/hashed/storage", @@ -1109,11 +1102,12 @@ func TestAbsentCorrelationID(t *testing.T) { require.NoError(t, err) require.Equal(t, primaryAddress, streamParams.Primary().Conn.Target()) - replEventWait.Add(1) // expected only one event to be created // must be run as it adds replication events to the queue require.NoError(t, streamParams.RequestFinalizer()) - replEventWait.Wait() // wait until event persisted (async operation) + require.NoError(t, queueInterceptor.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool { + return len(i.GetEnqueuedResult()) == 1 + })) jobs, err := queueInterceptor.Dequeue(ctx, conf.VirtualStorages[0].Name, conf.VirtualStorages[0].Nodes[1].Storage, 1) require.NoError(t, err) require.Len(t, jobs, 1) diff --git a/internal/praefect/datastore/memory.go b/internal/praefect/datastore/memory.go index 9c8139d48..a57507ffe 100644 --- a/internal/praefect/datastore/memory.go +++ b/internal/praefect/datastore/memory.go @@ -216,87 +216,204 @@ func (s *memoryReplicationEventQueue) defineDest(event ReplicationEvent) eventDe return eventDestination{virtual: event.Job.VirtualStorage, storage: event.Job.TargetNodeStorage, relativePath: event.Job.RelativePath} } -// ReplicationEventQueueInterceptor allows to register interceptors for `ReplicationEventQueue` interface. -type ReplicationEventQueueInterceptor interface { - // ReplicationEventQueue actual implementation. - ReplicationEventQueue - // OnEnqueue allows to set action that would be executed each time when `Enqueue` method called. - OnEnqueue(func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error)) - // OnDequeue allows to set action that would be executed each time when `Dequeue` method called. - OnDequeue(func(context.Context, string, string, int, ReplicationEventQueue) ([]ReplicationEvent, error)) - // OnAcknowledge allows to set action that would be executed each time when `Acknowledge` method called. - OnAcknowledge(func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error)) - // OnStartHealthUpdate allows to set action that would be executed each time when `StartHealthUpdate` method called. - OnStartHealthUpdate(func(context.Context, <-chan time.Time, []ReplicationEvent) error) - // OnAcknowledgeStale allows to set action that would be executed each time when `AcknowledgeStale` method called. - OnAcknowledgeStale(func(context.Context, time.Duration) error) +// NewReplicationEventQueueInterceptor returns interception over `ReplicationEventQueue` interface. +func NewReplicationEventQueueInterceptor(queue ReplicationEventQueue) *ReplicationEventQueueInterceptor { + return &ReplicationEventQueueInterceptor{ + ReplicationEventQueue: queue, + } } -// NewReplicationEventQueueInterceptor returns interception over `ReplicationEventQueue` interface. -func NewReplicationEventQueueInterceptor(queue ReplicationEventQueue) ReplicationEventQueueInterceptor { - return &replicationEventQueueInterceptor{ReplicationEventQueue: queue} +// DequeParams is the list of parameters used for Dequeue method call. +type DequeParams struct { + VirtualStorage, NodeStorage string + Count int } -type replicationEventQueueInterceptor struct { +// AcknowledgeParams is the list of parameters used for Acknowledge method call. +type AcknowledgeParams struct { + State JobState + IDs []uint64 +} + +// ReplicationEventQueueInterceptor allows to register interceptors for `ReplicationEventQueue` interface. +// It also provides additional methods to get info about incoming and outgoing data from the underling +// queue. +// NOTE: it should be used for testing purposes only as it persists data in memory and doesn't clean it up. +type ReplicationEventQueueInterceptor struct { + mtx sync.Mutex ReplicationEventQueue onEnqueue func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error) onDequeue func(context.Context, string, string, int, ReplicationEventQueue) ([]ReplicationEvent, error) onAcknowledge func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error) onStartHealthUpdate func(context.Context, <-chan time.Time, []ReplicationEvent) error onAcknowledgeStale func(context.Context, time.Duration) error + + enqueue []ReplicationEvent + enqueueResult []ReplicationEvent + dequeue []DequeParams + dequeueResult [][]ReplicationEvent + acknowledge []AcknowledgeParams + acknowledgeResult [][]uint64 } -func (i *replicationEventQueueInterceptor) OnEnqueue(action func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error)) { +// OnEnqueue allows to set action that would be executed each time when `Enqueue` method called. +func (i *ReplicationEventQueueInterceptor) OnEnqueue(action func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error)) { i.onEnqueue = action } -func (i *replicationEventQueueInterceptor) OnDequeue(action func(context.Context, string, string, int, ReplicationEventQueue) ([]ReplicationEvent, error)) { +// OnDequeue allows to set action that would be executed each time when `Dequeue` method called. +func (i *ReplicationEventQueueInterceptor) OnDequeue(action func(context.Context, string, string, int, ReplicationEventQueue) ([]ReplicationEvent, error)) { i.onDequeue = action } -func (i *replicationEventQueueInterceptor) OnAcknowledge(action func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error)) { +// OnAcknowledge allows to set action that would be executed each time when `Acknowledge` method called. +func (i *ReplicationEventQueueInterceptor) OnAcknowledge(action func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error)) { i.onAcknowledge = action } -func (i *replicationEventQueueInterceptor) OnStartHealthUpdate(action func(context.Context, <-chan time.Time, []ReplicationEvent) error) { +// OnStartHealthUpdate allows to set action that would be executed each time when `StartHealthUpdate` method called. +func (i *ReplicationEventQueueInterceptor) OnStartHealthUpdate(action func(context.Context, <-chan time.Time, []ReplicationEvent) error) { i.onStartHealthUpdate = action } -func (i *replicationEventQueueInterceptor) OnAcknowledgeStale(action func(context.Context, time.Duration) error) { +// OnAcknowledgeStale allows to set action that would be executed each time when `AcknowledgeStale` method called. +func (i *ReplicationEventQueueInterceptor) OnAcknowledgeStale(action func(context.Context, time.Duration) error) { i.onAcknowledgeStale = action } -func (i *replicationEventQueueInterceptor) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) { +// Enqueue intercepts call to the Enqueue method of the underling implementation or a call back. +// It populates storage of incoming and outgoing parameters before and after method call. +func (i *ReplicationEventQueueInterceptor) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) { + i.mtx.Lock() + i.enqueue = append(i.enqueue, event) + i.mtx.Unlock() + + var enqEvent ReplicationEvent + var err error + if i.onEnqueue != nil { - return i.onEnqueue(ctx, event, i.ReplicationEventQueue) + enqEvent, err = i.onEnqueue(ctx, event, i.ReplicationEventQueue) + } else { + enqEvent, err = i.ReplicationEventQueue.Enqueue(ctx, event) } - return i.ReplicationEventQueue.Enqueue(ctx, event) + + i.mtx.Lock() + i.enqueueResult = append(i.enqueueResult, enqEvent) + i.mtx.Unlock() + return enqEvent, err } -func (i *replicationEventQueueInterceptor) Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error) { +// Dequeue intercepts call to the Dequeue method of the underling implementation or a call back. +// It populates storage of incoming and outgoing parameters before and after method call. +func (i *ReplicationEventQueueInterceptor) Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error) { + i.mtx.Lock() + i.dequeue = append(i.dequeue, DequeParams{VirtualStorage: virtualStorage, NodeStorage: nodeStorage, Count: count}) + i.mtx.Unlock() + + var deqEvents []ReplicationEvent + var err error + if i.onDequeue != nil { - return i.onDequeue(ctx, virtualStorage, nodeStorage, count, i.ReplicationEventQueue) + deqEvents, err = i.onDequeue(ctx, virtualStorage, nodeStorage, count, i.ReplicationEventQueue) + } else { + deqEvents, err = i.ReplicationEventQueue.Dequeue(ctx, virtualStorage, nodeStorage, count) } - return i.ReplicationEventQueue.Dequeue(ctx, virtualStorage, nodeStorage, count) + + i.mtx.Lock() + i.dequeueResult = append(i.dequeueResult, deqEvents) + i.mtx.Unlock() + return deqEvents, err } -func (i *replicationEventQueueInterceptor) Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error) { +// Acknowledge intercepts call to the Acknowledge method of the underling implementation or a call back. +// It populates storage of incoming and outgoing parameters before and after method call. +func (i *ReplicationEventQueueInterceptor) Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error) { + i.mtx.Lock() + i.acknowledge = append(i.acknowledge, AcknowledgeParams{State: state, IDs: ids}) + i.mtx.Unlock() + + var ackIDs []uint64 + var err error + if i.onAcknowledge != nil { - return i.onAcknowledge(ctx, state, ids, i.ReplicationEventQueue) + ackIDs, err = i.onAcknowledge(ctx, state, ids, i.ReplicationEventQueue) + } else { + ackIDs, err = i.ReplicationEventQueue.Acknowledge(ctx, state, ids) } - return i.ReplicationEventQueue.Acknowledge(ctx, state, ids) + + i.mtx.Lock() + i.acknowledgeResult = append(i.acknowledgeResult, ackIDs) + i.mtx.Unlock() + return ackIDs, err } -func (i *replicationEventQueueInterceptor) StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error { +// StartHealthUpdate intercepts call to the StartHealthUpdate method of the underling implementation or a call back. +func (i *ReplicationEventQueueInterceptor) StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error { if i.onStartHealthUpdate != nil { return i.onStartHealthUpdate(ctx, trigger, events) } return i.ReplicationEventQueue.StartHealthUpdate(ctx, trigger, events) } -func (i *replicationEventQueueInterceptor) AcknowledgeStale(ctx context.Context, staleAfter time.Duration) error { +// AcknowledgeStale intercepts call to the AcknowledgeStale method of the underling implementation or a call back. +func (i *ReplicationEventQueueInterceptor) AcknowledgeStale(ctx context.Context, staleAfter time.Duration) error { if i.onAcknowledgeStale != nil { return i.onAcknowledgeStale(ctx, staleAfter) } return i.ReplicationEventQueue.AcknowledgeStale(ctx, staleAfter) } + +// GetEnqueued returns a list of events used for Enqueue method or a call-back invocation. +func (i *ReplicationEventQueueInterceptor) GetEnqueued() []ReplicationEvent { + i.mtx.Lock() + defer i.mtx.Unlock() + return i.enqueue +} + +// GetEnqueuedResult returns a list of events returned by Enqueue method or a call-back invocation. +func (i *ReplicationEventQueueInterceptor) GetEnqueuedResult() []ReplicationEvent { + i.mtx.Lock() + defer i.mtx.Unlock() + return i.enqueueResult +} + +// GetDequeued returns a list of parameters used for Dequeue method or a call-back invocation. +func (i *ReplicationEventQueueInterceptor) GetDequeued() []DequeParams { + i.mtx.Lock() + defer i.mtx.Unlock() + return i.dequeue +} + +// GetDequeuedResult returns a list of events returned after Dequeue method or a call-back invocation. +func (i *ReplicationEventQueueInterceptor) GetDequeuedResult() [][]ReplicationEvent { + i.mtx.Lock() + defer i.mtx.Unlock() + return i.dequeueResult +} + +// GetAcknowledge returns a list of parameters used for Acknowledge method or a call-back invocation. +func (i *ReplicationEventQueueInterceptor) GetAcknowledge() []AcknowledgeParams { + i.mtx.Lock() + defer i.mtx.Unlock() + return i.acknowledge +} + +// GetAcknowledgeResult returns a list of results returned after Acknowledge method or a call-back invocation. +func (i *ReplicationEventQueueInterceptor) GetAcknowledgeResult() [][]uint64 { + i.mtx.Lock() + defer i.mtx.Unlock() + return i.acknowledgeResult +} + +// Wait checks the condition in a loop with await until it returns true or deadline is exceeded. +// The error is returned only in case the deadline is exceeded. +func (i *ReplicationEventQueueInterceptor) Wait(deadline time.Duration, condition func(i *ReplicationEventQueueInterceptor) bool) error { + dead := time.Now().Add(deadline) + for !condition(i) { + if dead.Before(time.Now()) { + return context.DeadlineExceeded + } + time.Sleep(time.Millisecond * 100) + } + return nil +} diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index 0c559047d..e6e8f53ee 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -200,7 +200,10 @@ func runPraefectServer(t testing.TB, ctx context.Context, conf config.Config, op errQ := make(chan error) ctx, cancel := context.WithCancel(ctx) - go func() { errQ <- prf.Serve(listener) }() + go func() { + errQ <- prf.Serve(listener) + close(errQ) + }() replMgrDone := startProcessBacklog(ctx, replmgr) // dial client to praefect diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 8d3cbdf4f..b770cfef7 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -11,6 +11,7 @@ import ( "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/repository" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" @@ -569,10 +570,14 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora logger.Info("processing started") + // We should make a graceful shutdown of the processing loop and don't want to interrupt + // in-flight operations. That is why we suppress cancellation on the provided context. + appCtx := ctx + ctx = helper.SuppressCancellation(ctx) for { select { - case <-ctx.Done(): - logger.WithError(ctx.Err()).Info("processing stopped") + case <-appCtx.Done(): + logger.WithError(appCtx.Err()).Info("processing stopped") return // processing must be stopped default: // proceed with processing @@ -593,8 +598,8 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora select { case <-time.After(backoff()): continue - case <-ctx.Done(): - logger.WithError(ctx.Err()).Info("processing stopped") + case <-appCtx.Done(): + logger.WithError(appCtx.Err()).Info("processing stopped") return } } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 33fab625c..798005284 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -679,41 +679,6 @@ func TestProcessBacklog_FailedJobs(t *testing.T) { defer cancel() queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))) - processed := make(chan struct{}) - - dequeues := 0 - queueInterceptor.OnDequeue(func(ctx context.Context, virtual, target string, count int, queue datastore.ReplicationEventQueue) ([]datastore.ReplicationEvent, error) { - events, err := queue.Dequeue(ctx, virtual, target, count) - if len(events) > 0 { - dequeues++ - } - return events, err - }) - - completedAcks := 0 - failedAcks := 0 - deadAcks := 0 - - queueInterceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) { - switch state { - case datastore.JobStateCompleted: - require.Equal(t, []uint64{1}, ids) - completedAcks++ - case datastore.JobStateFailed: - require.Equal(t, []uint64{2}, ids) - failedAcks++ - case datastore.JobStateDead: - require.Equal(t, []uint64{2}, ids) - deadAcks++ - default: - require.FailNow(t, "acknowledge is not expected", state) - } - ackIDs, err := queue.Acknowledge(ctx, state, ids) - if completedAcks+failedAcks+deadAcks == 4 { - close(processed) - } - return ackIDs, err - }) // this job exists to verify that replication works okJob := datastore.ReplicationJob{ @@ -750,19 +715,30 @@ func TestProcessBacklog_FailedJobs(t *testing.T) { ) replMgrDone := startProcessBacklog(ctx, replMgr) - select { - case <-processed: - cancel() - case <-time.After(60 * time.Second): - // strongly depends on the processing capacity - t.Fatal("time limit expired for job to complete") + require.NoError(t, queueInterceptor.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool { + return len(i.GetAcknowledgeResult()) == 4 + })) + cancel() + <-replMgrDone + + var dequeueCalledEffectively int + for _, res := range queueInterceptor.GetDequeuedResult() { + if len(res) > 0 { + dequeueCalledEffectively++ + } } + require.Equal(t, 3, dequeueCalledEffectively, "expected 1 deque to get [okJob, failJob] and 2 more for [failJob] only") - require.Equal(t, 3, dequeues, "expected 1 deque to get [okJob, failJob] and 2 more for [failJob] only") - require.Equal(t, 2, failedAcks) - require.Equal(t, 1, deadAcks) - require.Equal(t, 1, completedAcks) - <-replMgrDone + expAcks := map[datastore.JobState][]uint64{ + datastore.JobStateFailed: {2, 2}, + datastore.JobStateDead: {2}, + datastore.JobStateCompleted: {1}, + } + acks := map[datastore.JobState][]uint64{} + for _, ack := range queueInterceptor.GetAcknowledge() { + acks[ack.State] = append(acks[ack.State], ack.IDs...) + } + require.Equal(t, expAcks, acks) } func TestProcessBacklog_Success(t *testing.T) { @@ -803,21 +779,18 @@ func TestProcessBacklog_Success(t *testing.T) { defer cancel() queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))) - - processed := make(chan struct{}) queueInterceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) { ackIDs, err := queue.Acknowledge(ctx, state, ids) if len(ids) > 0 { - require.Equal(t, datastore.JobStateCompleted, state, "no fails expected") - require.Equal(t, []uint64{1, 3, 4}, ids, "all jobs must be processed at once") - close(processed) + assert.Equal(t, datastore.JobStateCompleted, state, "no fails expected") + assert.Equal(t, []uint64{1, 3, 4}, ids, "all jobs must be processed at once") } return ackIDs, err }) var healthUpdated int32 queueInterceptor.OnStartHealthUpdate(func(ctx context.Context, trigger <-chan time.Time, events []datastore.ReplicationEvent) error { - require.Len(t, events, 3) + assert.Len(t, events, 3) atomic.AddInt32(&healthUpdated, 1) return nil }) @@ -892,18 +865,18 @@ func TestProcessBacklog_Success(t *testing.T) { ) replMgrDone := startProcessBacklog(ctx, replMgr) - select { - case <-processed: - require.EqualValues(t, 1, atomic.LoadInt32(&healthUpdated), "health update should be called") - cancel() - case <-time.After(30 * time.Second): - // strongly depends on the processing capacity - t.Fatal("time limit expired for job to complete") - } + require.NoError(t, queueInterceptor.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool { + var ids []uint64 + for _, params := range i.GetAcknowledge() { + ids = append(ids, params.IDs...) + } + return len(ids) == 3 + })) + cancel() + <-replMgrDone require.NoDirExists(t, fullNewPath1, "repository must be moved from %q to the new location", fullNewPath1) require.True(t, storage.IsGitDirectory(fullNewPath2), "repository must exist at new last RenameRepository location") - <-replMgrDone } func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) { diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 275f21903..e824cb600 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -534,20 +534,7 @@ func TestRemoveRepository(t *testing.T) { verifyReposExistence(t, codes.OK) - // TODO: once https://gitlab.com/gitlab-org/gitaly/-/issues/2703 is done and the replication manager supports - // graceful shutdown, we can remove this code that waits for jobs to be complete queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))) - jobsDoneCh := make(chan struct{}, 2) - queueInterceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) { - defer func() { - if state == datastore.JobStateCompleted { - jobsDoneCh <- struct{}{} - } - }() - - return queue.Acknowledge(ctx, state, ids) - }) - repoStore := defaultRepoStore(praefectCfg) txMgr := defaultTxMgr(praefectCfg) nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), praefectCfg, nil, @@ -579,9 +566,15 @@ func TestRemoveRepository(t *testing.T) { }) require.NoError(t, err) - for i := 0; i < cap(jobsDoneCh); i++ { - <-jobsDoneCh - } + require.NoError(t, queueInterceptor.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool { + var compl int + for _, ack := range i.GetAcknowledge() { + if ack.State == datastore.JobStateCompleted { + compl++ + } + } + return compl == 2 + })) verifyReposExistence(t, codes.NotFound) } @@ -632,15 +625,7 @@ func TestRenameRepository(t *testing.T) { repoPaths[i] = filepath.Join(gitalyCfg.Storages[0].Path, relativePath) } - var canCheckRepo sync.WaitGroup - canCheckRepo.Add(2) - evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))) - evq.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) { - defer canCheckRepo.Done() - return queue.Acknowledge(ctx, state, ids) - }) - ctx, cancel := testhelper.Context() defer cancel() @@ -677,7 +662,9 @@ func TestRenameRepository(t *testing.T) { // wait until replication jobs propagate changes to other storages // as we don't know which one will be used to check because of reads distribution - canCheckRepo.Wait() + require.NoError(t, evq.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool { + return len(i.GetAcknowledge()) == 2 + })) for _, oldLocation := range repoPaths { pollUntilRemoved(t, oldLocation, time.After(10*time.Second)) |