Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2021-09-21 16:59:35 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2021-10-05 15:44:44 +0300
commit7589ebf9bedf761f43f72194c07010ebf8fc661e (patch)
treea77da976394b9372fddb3be6746bb5ad3fe91e1b
parent40e3e5e971f2043c947b1bc347090c6c0e7d63df (diff)
replication: Backoff function provider
To support parallel processing of the replication events we need to refactor usage of the backoff function. Now it is a factory that creates backoff functions on demand. With that we could create a backoff function for each goroutine, so they are independent one from another.
-rw-r--r--cmd/praefect/main.go2
-rw-r--r--internal/praefect/helper_test.go6
-rw-r--r--internal/praefect/replicator.go64
-rw-r--r--internal/praefect/replicator_test.go6
4 files changed, 44 insertions, 34 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 6470c52c9..abed88189 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -426,7 +426,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
return fmt.Errorf("unable to start the bootstrap: %v", err)
}
- go repl.ProcessBacklog(ctx, praefect.ExpBackoffFunc(1*time.Second, 5*time.Second))
+ go repl.ProcessBacklog(ctx, praefect.ExpBackoffFactory{Start: time.Second, Max: 5 * time.Second})
logger.Info("background started: processing of the replication events")
repl.ProcessStale(ctx, 30*time.Second, time.Minute)
logger.Info("background started: processing of the stale replication events")
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index e6e8f53ee..625a29697 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -54,7 +54,9 @@ func testConfig(backends int) config.Config {
return cfg
}
-func noopBackoffFunc() (backoff, backoffReset) {
+type noopBackoffFactory struct{}
+
+func (noopBackoffFactory) Create() (Backoff, BackoffReset) {
return func() time.Duration {
return 0
}, func() {}
@@ -287,7 +289,7 @@ func startProcessBacklog(ctx context.Context, replMgr ReplMgr) <-chan struct{} {
done := make(chan struct{})
go func() {
defer close(done)
- replMgr.ProcessBacklog(ctx, noopBackoffFunc)
+ replMgr.ProcessBacklog(ctx, noopBackoffFactory{})
}()
return done
}
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index b770cfef7..71fa2a8f1 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -486,34 +486,36 @@ const (
logWithVirtualStorage = "virtual_storage"
)
-type (
- backoff func() time.Duration
- backoffReset func()
-)
+// ExpBackoffFactory creates exponentially growing durations.
+type ExpBackoffFactory struct {
+ Start, Max time.Duration
+}
-// BackoffFunc is a function that n turn provides a pair of functions backoff and backoffReset
-type BackoffFunc func() (backoff, backoffReset)
-
-// ExpBackoffFunc generates a backoffFunc based off of start and max time durations
-func ExpBackoffFunc(start time.Duration, max time.Duration) BackoffFunc {
- return func() (backoff, backoffReset) {
- const factor = 2
- duration := start
-
- return func() time.Duration {
- defer func() {
- duration *= time.Duration(factor)
- if (duration) >= max {
- duration = max
- }
- }()
- return duration
- }, func() {
- duration = start
- }
- }
+// Create returns a backoff function based on Start and Max time durations.
+func (b ExpBackoffFactory) Create() (Backoff, BackoffReset) {
+ const factor = 2
+ duration := b.Start
+
+ return func() time.Duration {
+ defer func() {
+ duration *= time.Duration(factor)
+ if (duration) >= b.Max {
+ duration = b.Max
+ }
+ }()
+ return duration
+ }, func() {
+ duration = b.Start
+ }
}
+type (
+ // Backoff returns next backoff.
+ Backoff func() time.Duration
+ // BackoffReset resets backoff provider.
+ BackoffReset func()
+)
+
func getCorrelationID(params datastore.Params) string {
correlationID := ""
if val, found := params[metadatahandler.CorrelationIDKey]; found {
@@ -522,10 +524,16 @@ func getCorrelationID(params datastore.Params) string {
return correlationID
}
+// BackoffFactory creates backoff function and a reset pair for it.
+type BackoffFactory interface {
+ // Create return new backoff provider and a reset function for it.
+ Create() (Backoff, BackoffReset)
+}
+
// ProcessBacklog starts processing of queued jobs.
// It will be processing jobs until ctx is Done. ProcessBacklog
// blocks until all backlog processing goroutines have returned
-func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFunc) {
+func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFactory) {
var wg sync.WaitGroup
for _, virtualStorage := range r.virtualStorages {
@@ -564,9 +572,9 @@ func (r ReplMgr) ProcessStale(ctx context.Context, checkPeriod, staleAfter time.
return done
}
-func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStorage string) {
+func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFactory, virtualStorage string) {
logger := r.log.WithField(logWithVirtualStorage, virtualStorage)
- backoff, reset := b()
+ backoff, reset := b.Create()
logger.Info("processing started")
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 798005284..d52e86f92 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -178,7 +178,7 @@ func TestReplMgr_ProcessBacklog(t *testing.T) {
WithDelayMetric(&mockReplicationDelayHistogramVec),
)
- replMgr.ProcessBacklog(ctx, ExpBackoffFunc(time.Hour, 0))
+ replMgr.ProcessBacklog(ctx, ExpBackoffFactory{Start: time.Hour, Max: 0})
logEntries := loggerHook.AllEntries()
require.True(t, len(logEntries) > 3, "expected at least 4 log entries to be present")
@@ -1024,7 +1024,7 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
<-replMgrDone
}
-func TestBackoff(t *testing.T) {
+func TestBackoffFactory(t *testing.T) {
start := 1 * time.Microsecond
max := 6 * time.Microsecond
expectedBackoffs := []time.Duration{
@@ -1035,7 +1035,7 @@ func TestBackoff(t *testing.T) {
6 * time.Microsecond,
6 * time.Microsecond,
}
- b, reset := ExpBackoffFunc(start, max)()
+ b, reset := ExpBackoffFactory{Start: start, Max: max}.Create()
for _, expectedBackoff := range expectedBackoffs {
require.Equal(t, expectedBackoff, b())
}