Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorToon Claes <toon@gitlab.com>2021-09-27 15:32:26 +0300
committerToon Claes <toon@gitlab.com>2021-09-27 15:32:26 +0300
commit947aaa9aab0737ec4dd8abc65d53ca824aac71ba (patch)
treeb3c386fe1f073653a4dc00c9e66e1dcc5efae451 /internal
parent80b9699ea44b716e74edf32130a6a3966cb59cae (diff)
parent2869d08ce029ef6cb11c19bad71c5f924dd275dd (diff)
Merge branch 'ps-graceful-backlog-stop' into 'master'
replication: Graceful stop of the replication processing loop Closes #2703 See merge request gitlab-org/gitaly!3885
Diffstat (limited to 'internal')
-rw-r--r--internal/praefect/coordinator_test.go48
-rw-r--r--internal/praefect/datastore/memory.go185
-rw-r--r--internal/praefect/helper_test.go5
-rw-r--r--internal/praefect/replicator.go13
-rw-r--r--internal/praefect/replicator_test.go95
-rw-r--r--internal/praefect/server_test.go37
6 files changed, 231 insertions, 152 deletions
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 35e4b54a5..9df1dcf76 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -157,14 +157,7 @@ func TestStreamDirectorMutator(t *testing.T) {
},
},
}
-
- var replEventWait sync.WaitGroup
-
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
- queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
- defer replEventWait.Done()
- return queue.Enqueue(ctx, event)
- })
+ db := glsql.NewDB(t)
targetRepo := gitalypb.Repository{
StorageName: "praefect",
@@ -195,7 +188,7 @@ func TestStreamDirectorMutator(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
- tx := glsql.NewDB(t).Begin(t)
+ tx := db.Begin(t)
defer tx.Rollback(t)
rs := datastore.NewPostgresRepositoryStore(tx, conf.StorageNames())
@@ -205,6 +198,11 @@ func TestStreamDirectorMutator(t *testing.T) {
}
testhelper.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()})
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
+ queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
+ assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created")
+ return queue.Enqueue(ctx, event)
+ })
coordinator := NewCoordinator(
queueInterceptor,
@@ -254,11 +252,14 @@ func TestStreamDirectorMutator(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "praefect-internal-1", rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name")
- replEventWait.Add(1) // expected only one event to be created
// this call creates new events in the queue and simulates usual flow of the update operation
require.NoError(t, streamParams.RequestFinalizer())
- replEventWait.Wait() // wait until event persisted (async operation)
+ // wait until event persisted (async operation)
+ require.NoError(t, queueInterceptor.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool {
+ return len(i.GetEnqueuedResult()) == 1
+ }))
+
events, err := queueInterceptor.Dequeue(ctx, "praefect", "praefect-internal-2", 10)
require.NoError(t, err)
require.Len(t, events, 1)
@@ -832,13 +833,6 @@ func TestStreamDirector_repo_creation(t *testing.T) {
},
}
- var replEventWait sync.WaitGroup
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
- queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
- defer replEventWait.Done()
- return queue.Enqueue(ctx, event)
- })
-
rewrittenStorage := primaryNode.Storage
targetRepo := gitalypb.Repository{
StorageName: "praefect",
@@ -926,6 +920,7 @@ func TestStreamDirector_repo_creation(t *testing.T) {
}
txMgr := transactions.NewManager(conf)
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
coordinator := NewCoordinator(
queueInterceptor,
@@ -967,8 +962,6 @@ func TestStreamDirector_repo_creation(t *testing.T) {
require.NoError(t, err)
require.Equal(t, rewrittenStorage, rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name")
- replEventWait.Add(1)
-
vote := voting.VoteFromData([]byte{})
require.NoError(t, txMgr.VoteTransaction(ctx, 1, "praefect-internal-1", vote))
require.NoError(t, txMgr.VoteTransaction(ctx, 1, "praefect-internal-2", vote))
@@ -977,7 +970,10 @@ func TestStreamDirector_repo_creation(t *testing.T) {
err = streamParams.RequestFinalizer()
require.NoError(t, err)
- replEventWait.Wait() // wait until event persisted (async operation)
+ // wait until event persisted (async operation)
+ require.NoError(t, queueInterceptor.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool {
+ return len(i.GetEnqueuedResult()) == 1
+ }))
var expectedEvents, actualEvents []datastore.ReplicationEvent
for _, target := range []string{unhealthySecondaryNode.Storage} {
@@ -1063,14 +1059,11 @@ func TestAbsentCorrelationID(t *testing.T) {
},
}
- var replEventWait sync.WaitGroup
-
queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
- defer replEventWait.Done()
+ assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created")
return queue.Enqueue(ctx, event)
})
-
targetRepo := gitalypb.Repository{
StorageName: "praefect",
RelativePath: "/path/to/hashed/storage",
@@ -1109,11 +1102,12 @@ func TestAbsentCorrelationID(t *testing.T) {
require.NoError(t, err)
require.Equal(t, primaryAddress, streamParams.Primary().Conn.Target())
- replEventWait.Add(1) // expected only one event to be created
// must be run as it adds replication events to the queue
require.NoError(t, streamParams.RequestFinalizer())
- replEventWait.Wait() // wait until event persisted (async operation)
+ require.NoError(t, queueInterceptor.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool {
+ return len(i.GetEnqueuedResult()) == 1
+ }))
jobs, err := queueInterceptor.Dequeue(ctx, conf.VirtualStorages[0].Name, conf.VirtualStorages[0].Nodes[1].Storage, 1)
require.NoError(t, err)
require.Len(t, jobs, 1)
diff --git a/internal/praefect/datastore/memory.go b/internal/praefect/datastore/memory.go
index 9c8139d48..a57507ffe 100644
--- a/internal/praefect/datastore/memory.go
+++ b/internal/praefect/datastore/memory.go
@@ -216,87 +216,204 @@ func (s *memoryReplicationEventQueue) defineDest(event ReplicationEvent) eventDe
return eventDestination{virtual: event.Job.VirtualStorage, storage: event.Job.TargetNodeStorage, relativePath: event.Job.RelativePath}
}
-// ReplicationEventQueueInterceptor allows to register interceptors for `ReplicationEventQueue` interface.
-type ReplicationEventQueueInterceptor interface {
- // ReplicationEventQueue actual implementation.
- ReplicationEventQueue
- // OnEnqueue allows to set action that would be executed each time when `Enqueue` method called.
- OnEnqueue(func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error))
- // OnDequeue allows to set action that would be executed each time when `Dequeue` method called.
- OnDequeue(func(context.Context, string, string, int, ReplicationEventQueue) ([]ReplicationEvent, error))
- // OnAcknowledge allows to set action that would be executed each time when `Acknowledge` method called.
- OnAcknowledge(func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error))
- // OnStartHealthUpdate allows to set action that would be executed each time when `StartHealthUpdate` method called.
- OnStartHealthUpdate(func(context.Context, <-chan time.Time, []ReplicationEvent) error)
- // OnAcknowledgeStale allows to set action that would be executed each time when `AcknowledgeStale` method called.
- OnAcknowledgeStale(func(context.Context, time.Duration) error)
+// NewReplicationEventQueueInterceptor returns interception over `ReplicationEventQueue` interface.
+func NewReplicationEventQueueInterceptor(queue ReplicationEventQueue) *ReplicationEventQueueInterceptor {
+ return &ReplicationEventQueueInterceptor{
+ ReplicationEventQueue: queue,
+ }
}
-// NewReplicationEventQueueInterceptor returns interception over `ReplicationEventQueue` interface.
-func NewReplicationEventQueueInterceptor(queue ReplicationEventQueue) ReplicationEventQueueInterceptor {
- return &replicationEventQueueInterceptor{ReplicationEventQueue: queue}
+// DequeParams is the list of parameters used for Dequeue method call.
+type DequeParams struct {
+ VirtualStorage, NodeStorage string
+ Count int
}
-type replicationEventQueueInterceptor struct {
+// AcknowledgeParams is the list of parameters used for Acknowledge method call.
+type AcknowledgeParams struct {
+ State JobState
+ IDs []uint64
+}
+
+// ReplicationEventQueueInterceptor allows to register interceptors for `ReplicationEventQueue` interface.
+// It also provides additional methods to get info about incoming and outgoing data from the underlying
+// queue.
+// NOTE: it should be used for testing purposes only as it persists data in memory and doesn't clean it up.
+type ReplicationEventQueueInterceptor struct {
+ mtx sync.Mutex
ReplicationEventQueue
onEnqueue func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error)
onDequeue func(context.Context, string, string, int, ReplicationEventQueue) ([]ReplicationEvent, error)
onAcknowledge func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error)
onStartHealthUpdate func(context.Context, <-chan time.Time, []ReplicationEvent) error
onAcknowledgeStale func(context.Context, time.Duration) error
+
+ enqueue []ReplicationEvent
+ enqueueResult []ReplicationEvent
+ dequeue []DequeParams
+ dequeueResult [][]ReplicationEvent
+ acknowledge []AcknowledgeParams
+ acknowledgeResult [][]uint64
}
-func (i *replicationEventQueueInterceptor) OnEnqueue(action func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error)) {
+// OnEnqueue allows to set action that would be executed each time when `Enqueue` method called.
+func (i *ReplicationEventQueueInterceptor) OnEnqueue(action func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error)) {
i.onEnqueue = action
}
-func (i *replicationEventQueueInterceptor) OnDequeue(action func(context.Context, string, string, int, ReplicationEventQueue) ([]ReplicationEvent, error)) {
+// OnDequeue allows to set action that would be executed each time when `Dequeue` method called.
+func (i *ReplicationEventQueueInterceptor) OnDequeue(action func(context.Context, string, string, int, ReplicationEventQueue) ([]ReplicationEvent, error)) {
i.onDequeue = action
}
-func (i *replicationEventQueueInterceptor) OnAcknowledge(action func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error)) {
+// OnAcknowledge allows to set action that would be executed each time when `Acknowledge` method called.
+func (i *ReplicationEventQueueInterceptor) OnAcknowledge(action func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error)) {
i.onAcknowledge = action
}
-func (i *replicationEventQueueInterceptor) OnStartHealthUpdate(action func(context.Context, <-chan time.Time, []ReplicationEvent) error) {
+// OnStartHealthUpdate allows to set action that would be executed each time when `StartHealthUpdate` method called.
+func (i *ReplicationEventQueueInterceptor) OnStartHealthUpdate(action func(context.Context, <-chan time.Time, []ReplicationEvent) error) {
i.onStartHealthUpdate = action
}
-func (i *replicationEventQueueInterceptor) OnAcknowledgeStale(action func(context.Context, time.Duration) error) {
+// OnAcknowledgeStale allows to set action that would be executed each time when `AcknowledgeStale` method called.
+func (i *ReplicationEventQueueInterceptor) OnAcknowledgeStale(action func(context.Context, time.Duration) error) {
i.onAcknowledgeStale = action
}
-func (i *replicationEventQueueInterceptor) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) {
+// Enqueue intercepts call to the Enqueue method of the underlying implementation or a call back.
+// It populates storage of incoming and outgoing parameters before and after method call.
+func (i *ReplicationEventQueueInterceptor) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) {
+ i.mtx.Lock()
+ i.enqueue = append(i.enqueue, event)
+ i.mtx.Unlock()
+
+ var enqEvent ReplicationEvent
+ var err error
+
if i.onEnqueue != nil {
- return i.onEnqueue(ctx, event, i.ReplicationEventQueue)
+ enqEvent, err = i.onEnqueue(ctx, event, i.ReplicationEventQueue)
+ } else {
+ enqEvent, err = i.ReplicationEventQueue.Enqueue(ctx, event)
}
- return i.ReplicationEventQueue.Enqueue(ctx, event)
+
+ i.mtx.Lock()
+ i.enqueueResult = append(i.enqueueResult, enqEvent)
+ i.mtx.Unlock()
+ return enqEvent, err
}
-func (i *replicationEventQueueInterceptor) Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error) {
+// Dequeue intercepts call to the Dequeue method of the underlying implementation or a call back.
+// It populates storage of incoming and outgoing parameters before and after method call.
+func (i *ReplicationEventQueueInterceptor) Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error) {
+ i.mtx.Lock()
+ i.dequeue = append(i.dequeue, DequeParams{VirtualStorage: virtualStorage, NodeStorage: nodeStorage, Count: count})
+ i.mtx.Unlock()
+
+ var deqEvents []ReplicationEvent
+ var err error
+
if i.onDequeue != nil {
- return i.onDequeue(ctx, virtualStorage, nodeStorage, count, i.ReplicationEventQueue)
+ deqEvents, err = i.onDequeue(ctx, virtualStorage, nodeStorage, count, i.ReplicationEventQueue)
+ } else {
+ deqEvents, err = i.ReplicationEventQueue.Dequeue(ctx, virtualStorage, nodeStorage, count)
}
- return i.ReplicationEventQueue.Dequeue(ctx, virtualStorage, nodeStorage, count)
+
+ i.mtx.Lock()
+ i.dequeueResult = append(i.dequeueResult, deqEvents)
+ i.mtx.Unlock()
+ return deqEvents, err
}
-func (i *replicationEventQueueInterceptor) Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error) {
+// Acknowledge intercepts call to the Acknowledge method of the underlying implementation or a call back.
+// It populates storage of incoming and outgoing parameters before and after method call.
+func (i *ReplicationEventQueueInterceptor) Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error) {
+ i.mtx.Lock()
+ i.acknowledge = append(i.acknowledge, AcknowledgeParams{State: state, IDs: ids})
+ i.mtx.Unlock()
+
+ var ackIDs []uint64
+ var err error
+
if i.onAcknowledge != nil {
- return i.onAcknowledge(ctx, state, ids, i.ReplicationEventQueue)
+ ackIDs, err = i.onAcknowledge(ctx, state, ids, i.ReplicationEventQueue)
+ } else {
+ ackIDs, err = i.ReplicationEventQueue.Acknowledge(ctx, state, ids)
}
- return i.ReplicationEventQueue.Acknowledge(ctx, state, ids)
+
+ i.mtx.Lock()
+ i.acknowledgeResult = append(i.acknowledgeResult, ackIDs)
+ i.mtx.Unlock()
+ return ackIDs, err
}
-func (i *replicationEventQueueInterceptor) StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error {
+// StartHealthUpdate intercepts call to the StartHealthUpdate method of the underlying implementation or a call back.
+func (i *ReplicationEventQueueInterceptor) StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error {
if i.onStartHealthUpdate != nil {
return i.onStartHealthUpdate(ctx, trigger, events)
}
return i.ReplicationEventQueue.StartHealthUpdate(ctx, trigger, events)
}
-func (i *replicationEventQueueInterceptor) AcknowledgeStale(ctx context.Context, staleAfter time.Duration) error {
+// AcknowledgeStale intercepts call to the AcknowledgeStale method of the underlying implementation or a call back.
+func (i *ReplicationEventQueueInterceptor) AcknowledgeStale(ctx context.Context, staleAfter time.Duration) error {
if i.onAcknowledgeStale != nil {
return i.onAcknowledgeStale(ctx, staleAfter)
}
return i.ReplicationEventQueue.AcknowledgeStale(ctx, staleAfter)
}
+
+// GetEnqueued returns a list of events used for Enqueue method or a call-back invocation.
+func (i *ReplicationEventQueueInterceptor) GetEnqueued() []ReplicationEvent {
+ i.mtx.Lock()
+ defer i.mtx.Unlock()
+ return i.enqueue
+}
+
+// GetEnqueuedResult returns a list of events returned by Enqueue method or a call-back invocation.
+func (i *ReplicationEventQueueInterceptor) GetEnqueuedResult() []ReplicationEvent {
+ i.mtx.Lock()
+ defer i.mtx.Unlock()
+ return i.enqueueResult
+}
+
+// GetDequeued returns a list of parameters used for Dequeue method or a call-back invocation.
+func (i *ReplicationEventQueueInterceptor) GetDequeued() []DequeParams {
+ i.mtx.Lock()
+ defer i.mtx.Unlock()
+ return i.dequeue
+}
+
+// GetDequeuedResult returns a list of events returned after Dequeue method or a call-back invocation.
+func (i *ReplicationEventQueueInterceptor) GetDequeuedResult() [][]ReplicationEvent {
+ i.mtx.Lock()
+ defer i.mtx.Unlock()
+ return i.dequeueResult
+}
+
+// GetAcknowledge returns a list of parameters used for Acknowledge method or a call-back invocation.
+func (i *ReplicationEventQueueInterceptor) GetAcknowledge() []AcknowledgeParams {
+ i.mtx.Lock()
+ defer i.mtx.Unlock()
+ return i.acknowledge
+}
+
+// GetAcknowledgeResult returns a list of results returned after Acknowledge method or a call-back invocation.
+func (i *ReplicationEventQueueInterceptor) GetAcknowledgeResult() [][]uint64 {
+ i.mtx.Lock()
+ defer i.mtx.Unlock()
+ return i.acknowledgeResult
+}
+
+// Wait checks the condition in a loop, sleeping between checks, until it returns true or the deadline is exceeded.
+// The error is returned only in case the deadline is exceeded.
+func (i *ReplicationEventQueueInterceptor) Wait(deadline time.Duration, condition func(i *ReplicationEventQueueInterceptor) bool) error {
+ dead := time.Now().Add(deadline)
+ for !condition(i) {
+ if dead.Before(time.Now()) {
+ return context.DeadlineExceeded
+ }
+ time.Sleep(time.Millisecond * 100)
+ }
+ return nil
+}
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 0c559047d..e6e8f53ee 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -200,7 +200,10 @@ func runPraefectServer(t testing.TB, ctx context.Context, conf config.Config, op
errQ := make(chan error)
ctx, cancel := context.WithCancel(ctx)
- go func() { errQ <- prf.Serve(listener) }()
+ go func() {
+ errQ <- prf.Serve(listener)
+ close(errQ)
+ }()
replMgrDone := startProcessBacklog(ctx, replmgr)
// dial client to praefect
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 8d3cbdf4f..b770cfef7 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -11,6 +11,7 @@ import (
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/repository"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
@@ -569,10 +570,14 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
logger.Info("processing started")
+ // We should make a graceful shutdown of the processing loop and don't want to interrupt
+ // in-flight operations. That is why we suppress cancellation on the provided context.
+ appCtx := ctx
+ ctx = helper.SuppressCancellation(ctx)
for {
select {
- case <-ctx.Done():
- logger.WithError(ctx.Err()).Info("processing stopped")
+ case <-appCtx.Done():
+ logger.WithError(appCtx.Err()).Info("processing stopped")
return // processing must be stopped
default:
// proceed with processing
@@ -593,8 +598,8 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
select {
case <-time.After(backoff()):
continue
- case <-ctx.Done():
- logger.WithError(ctx.Err()).Info("processing stopped")
+ case <-appCtx.Done():
+ logger.WithError(appCtx.Err()).Info("processing stopped")
return
}
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 33fab625c..798005284 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -679,41 +679,6 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
defer cancel()
queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
- processed := make(chan struct{})
-
- dequeues := 0
- queueInterceptor.OnDequeue(func(ctx context.Context, virtual, target string, count int, queue datastore.ReplicationEventQueue) ([]datastore.ReplicationEvent, error) {
- events, err := queue.Dequeue(ctx, virtual, target, count)
- if len(events) > 0 {
- dequeues++
- }
- return events, err
- })
-
- completedAcks := 0
- failedAcks := 0
- deadAcks := 0
-
- queueInterceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
- switch state {
- case datastore.JobStateCompleted:
- require.Equal(t, []uint64{1}, ids)
- completedAcks++
- case datastore.JobStateFailed:
- require.Equal(t, []uint64{2}, ids)
- failedAcks++
- case datastore.JobStateDead:
- require.Equal(t, []uint64{2}, ids)
- deadAcks++
- default:
- require.FailNow(t, "acknowledge is not expected", state)
- }
- ackIDs, err := queue.Acknowledge(ctx, state, ids)
- if completedAcks+failedAcks+deadAcks == 4 {
- close(processed)
- }
- return ackIDs, err
- })
// this job exists to verify that replication works
okJob := datastore.ReplicationJob{
@@ -750,19 +715,30 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
)
replMgrDone := startProcessBacklog(ctx, replMgr)
- select {
- case <-processed:
- cancel()
- case <-time.After(60 * time.Second):
- // strongly depends on the processing capacity
- t.Fatal("time limit expired for job to complete")
+ require.NoError(t, queueInterceptor.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool {
+ return len(i.GetAcknowledgeResult()) == 4
+ }))
+ cancel()
+ <-replMgrDone
+
+ var dequeueCalledEffectively int
+ for _, res := range queueInterceptor.GetDequeuedResult() {
+ if len(res) > 0 {
+ dequeueCalledEffectively++
+ }
}
+ require.Equal(t, 3, dequeueCalledEffectively, "expected 1 deque to get [okJob, failJob] and 2 more for [failJob] only")
- require.Equal(t, 3, dequeues, "expected 1 deque to get [okJob, failJob] and 2 more for [failJob] only")
- require.Equal(t, 2, failedAcks)
- require.Equal(t, 1, deadAcks)
- require.Equal(t, 1, completedAcks)
- <-replMgrDone
+ expAcks := map[datastore.JobState][]uint64{
+ datastore.JobStateFailed: {2, 2},
+ datastore.JobStateDead: {2},
+ datastore.JobStateCompleted: {1},
+ }
+ acks := map[datastore.JobState][]uint64{}
+ for _, ack := range queueInterceptor.GetAcknowledge() {
+ acks[ack.State] = append(acks[ack.State], ack.IDs...)
+ }
+ require.Equal(t, expAcks, acks)
}
func TestProcessBacklog_Success(t *testing.T) {
@@ -803,21 +779,18 @@ func TestProcessBacklog_Success(t *testing.T) {
defer cancel()
queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
-
- processed := make(chan struct{})
queueInterceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
ackIDs, err := queue.Acknowledge(ctx, state, ids)
if len(ids) > 0 {
- require.Equal(t, datastore.JobStateCompleted, state, "no fails expected")
- require.Equal(t, []uint64{1, 3, 4}, ids, "all jobs must be processed at once")
- close(processed)
+ assert.Equal(t, datastore.JobStateCompleted, state, "no fails expected")
+ assert.Equal(t, []uint64{1, 3, 4}, ids, "all jobs must be processed at once")
}
return ackIDs, err
})
var healthUpdated int32
queueInterceptor.OnStartHealthUpdate(func(ctx context.Context, trigger <-chan time.Time, events []datastore.ReplicationEvent) error {
- require.Len(t, events, 3)
+ assert.Len(t, events, 3)
atomic.AddInt32(&healthUpdated, 1)
return nil
})
@@ -892,18 +865,18 @@ func TestProcessBacklog_Success(t *testing.T) {
)
replMgrDone := startProcessBacklog(ctx, replMgr)
- select {
- case <-processed:
- require.EqualValues(t, 1, atomic.LoadInt32(&healthUpdated), "health update should be called")
- cancel()
- case <-time.After(30 * time.Second):
- // strongly depends on the processing capacity
- t.Fatal("time limit expired for job to complete")
- }
+ require.NoError(t, queueInterceptor.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool {
+ var ids []uint64
+ for _, params := range i.GetAcknowledge() {
+ ids = append(ids, params.IDs...)
+ }
+ return len(ids) == 3
+ }))
+ cancel()
+ <-replMgrDone
require.NoDirExists(t, fullNewPath1, "repository must be moved from %q to the new location", fullNewPath1)
require.True(t, storage.IsGitDirectory(fullNewPath2), "repository must exist at new last RenameRepository location")
- <-replMgrDone
}
func TestReplMgrProcessBacklog_OnlyHealthyNodes(t *testing.T) {
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 275f21903..e824cb600 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -534,20 +534,7 @@ func TestRemoveRepository(t *testing.T) {
verifyReposExistence(t, codes.OK)
- // TODO: once https://gitlab.com/gitlab-org/gitaly/-/issues/2703 is done and the replication manager supports
- // graceful shutdown, we can remove this code that waits for jobs to be complete
queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
- jobsDoneCh := make(chan struct{}, 2)
- queueInterceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
- defer func() {
- if state == datastore.JobStateCompleted {
- jobsDoneCh <- struct{}{}
- }
- }()
-
- return queue.Acknowledge(ctx, state, ids)
- })
-
repoStore := defaultRepoStore(praefectCfg)
txMgr := defaultTxMgr(praefectCfg)
nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), praefectCfg, nil,
@@ -579,9 +566,15 @@ func TestRemoveRepository(t *testing.T) {
})
require.NoError(t, err)
- for i := 0; i < cap(jobsDoneCh); i++ {
- <-jobsDoneCh
- }
+ require.NoError(t, queueInterceptor.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool {
+ var compl int
+ for _, ack := range i.GetAcknowledge() {
+ if ack.State == datastore.JobStateCompleted {
+ compl++
+ }
+ }
+ return compl == 2
+ }))
verifyReposExistence(t, codes.NotFound)
}
@@ -632,15 +625,7 @@ func TestRenameRepository(t *testing.T) {
repoPaths[i] = filepath.Join(gitalyCfg.Storages[0].Path, relativePath)
}
- var canCheckRepo sync.WaitGroup
- canCheckRepo.Add(2)
-
evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
- evq.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
- defer canCheckRepo.Done()
- return queue.Acknowledge(ctx, state, ids)
- })
-
ctx, cancel := testhelper.Context()
defer cancel()
@@ -677,7 +662,9 @@ func TestRenameRepository(t *testing.T) {
// wait until replication jobs propagate changes to other storages
// as we don't know which one will be used to check because of reads distribution
- canCheckRepo.Wait()
+ require.NoError(t, evq.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool {
+ return len(i.GetAcknowledge()) == 2
+ }))
for _, oldLocation := range repoPaths {
pollUntilRemoved(t, oldLocation, time.After(10*time.Second))