diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2021-09-21 16:59:35 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2021-10-05 15:44:44 +0300 |
commit | 7589ebf9bedf761f43f72194c07010ebf8fc661e (patch) | |
tree | a77da976394b9372fddb3be6746bb5ad3fe91e1b | |
parent | 40e3e5e971f2043c947b1bc347090c6c0e7d63df (diff) |
replication: Backoff function provider
To support parallel processing of the replication events
we need to refactor usage of the backoff function. Now it
is a factory that creates backoff functions on demand.
With that we could create a backoff function for each
goroutine, so they are independent one from another.
-rw-r--r-- | cmd/praefect/main.go | 2 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 6 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 64 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 6 |
4 files changed, 44 insertions, 34 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 6470c52c9..abed88189 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -426,7 +426,7 @@ func run(cfgs []starter.Config, conf config.Config) error { return fmt.Errorf("unable to start the bootstrap: %v", err) } - go repl.ProcessBacklog(ctx, praefect.ExpBackoffFunc(1*time.Second, 5*time.Second)) + go repl.ProcessBacklog(ctx, praefect.ExpBackoffFactory{Start: time.Second, Max: 5 * time.Second}) logger.Info("background started: processing of the replication events") repl.ProcessStale(ctx, 30*time.Second, time.Minute) logger.Info("background started: processing of the stale replication events") diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index e6e8f53ee..625a29697 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -54,7 +54,9 @@ func testConfig(backends int) config.Config { return cfg } -func noopBackoffFunc() (backoff, backoffReset) { +type noopBackoffFactory struct{} + +func (noopBackoffFactory) Create() (Backoff, BackoffReset) { return func() time.Duration { return 0 }, func() {} @@ -287,7 +289,7 @@ func startProcessBacklog(ctx context.Context, replMgr ReplMgr) <-chan struct{} { done := make(chan struct{}) go func() { defer close(done) - replMgr.ProcessBacklog(ctx, noopBackoffFunc) + replMgr.ProcessBacklog(ctx, noopBackoffFactory{}) }() return done } diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index b770cfef7..71fa2a8f1 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -486,34 +486,36 @@ const ( logWithVirtualStorage = "virtual_storage" ) -type ( - backoff func() time.Duration - backoffReset func() -) +// ExpBackoffFactory creates exponentially growing durations. +type ExpBackoffFactory struct { + Start, Max time.Duration +} -// BackoffFunc is a function that n turn provides a pair of functions backoff and backoffReset -type BackoffFunc func() (backoff, backoffReset) - -// ExpBackoffFunc generates a backoffFunc based off of start and max time durations -func ExpBackoffFunc(start time.Duration, max time.Duration) BackoffFunc { - return func() (backoff, backoffReset) { - const factor = 2 - duration := start - - return func() time.Duration { - defer func() { - duration *= time.Duration(factor) - if (duration) >= max { - duration = max - } - }() - return duration - }, func() { - duration = start - } - } +// Create returns a backoff function based on Start and Max time durations. +func (b ExpBackoffFactory) Create() (Backoff, BackoffReset) { + const factor = 2 + duration := b.Start + + return func() time.Duration { + defer func() { + duration *= time.Duration(factor) + if (duration) >= b.Max { + duration = b.Max + } + }() + return duration + }, func() { + duration = b.Start + } } +type ( + // Backoff returns next backoff. + Backoff func() time.Duration + // BackoffReset resets backoff provider. + BackoffReset func() +) + func getCorrelationID(params datastore.Params) string { correlationID := "" if val, found := params[metadatahandler.CorrelationIDKey]; found { @@ -522,10 +524,16 @@ func getCorrelationID(params datastore.Params) string { return correlationID } +// BackoffFactory creates backoff function and a reset pair for it. +type BackoffFactory interface { + // Create return new backoff provider and a reset function for it. + Create() (Backoff, BackoffReset) +} + // ProcessBacklog starts processing of queued jobs. // It will be processing jobs until ctx is Done. ProcessBacklog // blocks until all backlog processing goroutines have returned -func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFunc) { +func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFactory) { var wg sync.WaitGroup for _, virtualStorage := range r.virtualStorages { @@ -564,9 +572,9 @@ func (r ReplMgr) ProcessStale(ctx context.Context, checkPeriod, staleAfter time. return done } -func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStorage string) { +func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFactory, virtualStorage string) { logger := r.log.WithField(logWithVirtualStorage, virtualStorage) - backoff, reset := b() + backoff, reset := b.Create() logger.Info("processing started") diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 798005284..d52e86f92 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -178,7 +178,7 @@ func TestReplMgr_ProcessBacklog(t *testing.T) { WithDelayMetric(&mockReplicationDelayHistogramVec), ) - replMgr.ProcessBacklog(ctx, ExpBackoffFunc(time.Hour, 0)) + replMgr.ProcessBacklog(ctx, ExpBackoffFactory{Start: time.Hour, Max: 0}) logEntries := loggerHook.AllEntries() require.True(t, len(logEntries) > 3, "expected at least 4 log entries to be present") @@ -1024,7 +1024,7 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) { <-replMgrDone } -func TestBackoff(t *testing.T) { +func TestBackoffFactory(t *testing.T) { start := 1 * time.Microsecond max := 6 * time.Microsecond expectedBackoffs := []time.Duration{ @@ -1035,7 +1035,7 @@ func TestBackoff(t *testing.T) { 6 * time.Microsecond, 6 * time.Microsecond, } - b, reset := ExpBackoffFunc(start, max)() + b, reset := ExpBackoffFactory{Start: start, Max: max}.Create() for _, expectedBackoff := range expectedBackoffs { require.Equal(t, expectedBackoff, b()) } |