diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2021-02-15 15:36:02 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2021-02-15 15:36:02 +0300 |
commit | b0d84984e93588c1d7f9d73bf0fce082f61d00f1 (patch) | |
tree | ed41a52adbef72c5ed7b27728e9dd95388f7e61d /internal/praefect/replicator.go | |
parent | 632f04c7badfaa36984310099798c01d1a42b906 (diff) |
fix flaky test TestReplMgr_ProcessBacklog
TestReplMgr_ProcessBacklog is asserting log statements outputted
by ReplMgr when processing jobs. The test is currently waiting until
the context is canceled before checking the log assertions. The
context gets canceled when acknowleding a job in the queue. Not all
log statements are printed out at that point yet, namely the
'replication job processing finished' would only be printed after
acknowledging the job.
This commit fixes the race by making ProcessBacklog block until all
goroutines launched from there have returned. The flaky test case
block on the call, while rest of the code using retains the existing
behavior of running it asynchronously.
Diffstat (limited to 'internal/praefect/replicator.go')
-rw-r--r-- | internal/praefect/replicator.go | 14 |
1 files changed, 12 insertions, 2 deletions
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 039c4c4ee..9cc09f68a 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sync" "time" "github.com/prometheus/client_golang/prometheus" @@ -405,11 +406,20 @@ func getCorrelationID(params datastore.Params) string { } // ProcessBacklog starts processing of queued jobs. -// It will be processing jobs until ctx is Done. +// It will be processing jobs until ctx is Done. ProcessBacklog +// blocks until all backlog processing goroutines have returned func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFunc) { + var wg sync.WaitGroup + for _, virtualStorage := range r.virtualStorages { - go r.processBacklog(ctx, b, virtualStorage) + wg.Add(1) + go func(virtualStorage string) { + defer wg.Done() + r.processBacklog(ctx, b, virtualStorage) + }(virtualStorage) } + + wg.Wait() } // ProcessStale starts a background process to acknowledge stale replication jobs. |