diff options
author | Paul Okstad <pokstad@gitlab.com> | 2020-01-15 20:22:45 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2020-01-15 20:22:45 +0300 |
commit | b9b01d4ca58c023e609de1f04efb868daadc2e63 (patch) | |
tree | 69fe3ec56cf6bb6c8102dfb4681cd32ae1ed22f3 | |
parent | 4d4ac3c1a974ba11164809f20fb4c24853b8012a (diff) | |
parent | 490a639bfb875d507d112242b7b1bdbbd2fe231b (diff) |
Merge branch 'jc-replicator-backoff' into 'master'
Add exponential backoff to replication manager
Closes #2349
See merge request gitlab-org/gitaly!1746
-rw-r--r-- | changelogs/unreleased/jc-replicator-backoff.yml | 5 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 49 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 21 |
3 files changed, 59 insertions, 16 deletions
```diff
diff --git a/changelogs/unreleased/jc-replicator-backoff.yml b/changelogs/unreleased/jc-replicator-backoff.yml
new file mode 100644
index 000000000..26dfd3e49
--- /dev/null
+++ b/changelogs/unreleased/jc-replicator-backoff.yml
@@ -0,0 +1,5 @@
+---
+title: Add exponential backoff to replication manager
+merge_request: 1746
+author:
+type: changed
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 7061e2f6b..2695858e5 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -238,41 +238,46 @@ func (r ReplMgr) ScheduleReplication(ctx context.Context, repo models.Repository
 }
 
 const (
-	jobFetchInterval = 10 * time.Millisecond
 	logWithReplJobID = "replication_job_id"
 	logWithReplSource = "replication_job_source"
 	logWithReplTarget = "replication_job_target"
 )
 
+func expBackoff(start time.Duration, max time.Duration) (backoff func() time.Duration, reset func()) {
+	duration := start
+	factor := 2
+
+	return func() time.Duration {
+			defer func() {
+				duration *= time.Duration(factor)
+				if (duration) >= max {
+					duration = max
+				}
+			}()
+			return duration
+		}, func() {
+			duration = start
+		}
+}
+
 // ProcessBacklog will process queued jobs. It will block while processing jobs.
 func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
+	backoff, reset := expBackoff(10*time.Millisecond, 1*time.Second)
+
 	for {
 		nodes, err := r.datastore.GetStorageNodes()
 		if err != nil {
 			return nil
 		}
 
+		var totalJobs int
 		for _, node := range nodes {
 			jobs, err := r.datastore.GetJobs(datastore.JobStateReady, node.Storage, 10)
 			if err != nil {
 				return err
 			}
 
-			if len(jobs) == 0 {
-				r.log.WithFields(logrus.Fields{
-					"node_storage": node.Storage,
-					"recheck_interval": jobFetchInterval,
-				}).Trace("no jobs")
-
-				select {
-				// TODO: exponential backoff when no queries are returned
-				case <-time.After(jobFetchInterval):
-					continue
-
-				case <-ctx.Done():
-					return ctx.Err()
-				}
-			}
+			totalJobs += len(jobs)
 
 			for _, job := range jobs {
 				r.log.WithFields(logrus.Fields{
@@ -284,6 +289,18 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
 				r.processReplJob(ctx, job)
 			}
 		}
+
+		if totalJobs == 0 {
+			select {
+			case <-time.After(backoff()):
+				continue
+
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+		}
+
+		reset()
 	}
 }
 
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index b90381d74..96d1eb8ad 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -7,6 +7,7 @@ import (
 	"os"
 	"path/filepath"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/require"
 	gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
@@ -184,6 +185,26 @@ func TestConfirmReplication(t *testing.T) {
 	require.False(t, equal)
 }
 
+func TestBackoff(t *testing.T) {
+	start := 1 * time.Microsecond
+	max := 6 * time.Microsecond
+	expectedBackoffs := []time.Duration{
+		1 * time.Microsecond,
+		2 * time.Microsecond,
+		4 * time.Microsecond,
+		6 * time.Microsecond,
+		6 * time.Microsecond,
+		6 * time.Microsecond,
+	}
+	b, reset := expBackoff(start, max)
+	for _, expectedBackoff := range expectedBackoffs {
+		require.Equal(t, expectedBackoff, b())
+	}
+
+	reset()
+	require.Equal(t, start, b())
+}
+
 func runFullGitalyServer(t *testing.T) (*grpc.Server, string) {
 	server := serverPkg.NewInsecure(RubyServer)
 	serverSocketPath := testhelper.GetTemporaryGitalySocketFileName()
```