Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Okstad <pokstad@gitlab.com>2020-01-15 20:22:45 +0300
committerPaul Okstad <pokstad@gitlab.com>2020-01-15 20:22:45 +0300
commitb9b01d4ca58c023e609de1f04efb868daadc2e63 (patch)
tree69fe3ec56cf6bb6c8102dfb4681cd32ae1ed22f3
parent4d4ac3c1a974ba11164809f20fb4c24853b8012a (diff)
parent490a639bfb875d507d112242b7b1bdbbd2fe231b (diff)
Merge branch 'jc-replicator-backoff' into 'master'
Add exponential backoff to replication manager Closes #2349 See merge request gitlab-org/gitaly!1746
-rw-r--r--changelogs/unreleased/jc-replicator-backoff.yml5
-rw-r--r--internal/praefect/replicator.go49
-rw-r--r--internal/praefect/replicator_test.go21
3 files changed, 59 insertions, 16 deletions
diff --git a/changelogs/unreleased/jc-replicator-backoff.yml b/changelogs/unreleased/jc-replicator-backoff.yml
new file mode 100644
index 000000000..26dfd3e49
--- /dev/null
+++ b/changelogs/unreleased/jc-replicator-backoff.yml
@@ -0,0 +1,5 @@
+---
+title: Add exponential backoff to replication manager
+merge_request: 1746
+author:
+type: changed
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 7061e2f6b..2695858e5 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -238,41 +238,46 @@ func (r ReplMgr) ScheduleReplication(ctx context.Context, repo models.Repository
}
const (
- jobFetchInterval = 10 * time.Millisecond
logWithReplJobID = "replication_job_id"
logWithReplSource = "replication_job_source"
logWithReplTarget = "replication_job_target"
)
+func expBackoff(start time.Duration, max time.Duration) (backoff func() time.Duration, reset func()) {
+ duration := start
+ factor := 2
+
+ return func() time.Duration {
+ defer func() {
+ duration *= time.Duration(factor)
+ if (duration) >= max {
+ duration = max
+ }
+ }()
+ return duration
+ }, func() {
+ duration = start
+ }
+}
+
// ProcessBacklog will process queued jobs. It will block while processing jobs.
func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
+ backoff, reset := expBackoff(10*time.Millisecond, 1*time.Second)
+
for {
nodes, err := r.datastore.GetStorageNodes()
if err != nil {
return nil
}
+ var totalJobs int
for _, node := range nodes {
jobs, err := r.datastore.GetJobs(datastore.JobStateReady, node.Storage, 10)
if err != nil {
return err
}
- if len(jobs) == 0 {
- r.log.WithFields(logrus.Fields{
- "node_storage": node.Storage,
- "recheck_interval": jobFetchInterval,
- }).Trace("no jobs")
-
- select {
- // TODO: exponential backoff when no queries are returned
- case <-time.After(jobFetchInterval):
- continue
-
- case <-ctx.Done():
- return ctx.Err()
- }
- }
+ totalJobs += len(jobs)
for _, job := range jobs {
r.log.WithFields(logrus.Fields{
@@ -284,6 +289,18 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
r.processReplJob(ctx, job)
}
}
+
+ if totalJobs == 0 {
+ select {
+ case <-time.After(backoff()):
+ continue
+
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ }
+
+ reset()
}
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index b90381d74..96d1eb8ad 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"testing"
+ "time"
"github.com/stretchr/testify/require"
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
@@ -184,6 +185,26 @@ func TestConfirmReplication(t *testing.T) {
require.False(t, equal)
}
+func TestBackoff(t *testing.T) {
+ start := 1 * time.Microsecond
+ max := 6 * time.Microsecond
+ expectedBackoffs := []time.Duration{
+ 1 * time.Microsecond,
+ 2 * time.Microsecond,
+ 4 * time.Microsecond,
+ 6 * time.Microsecond,
+ 6 * time.Microsecond,
+ 6 * time.Microsecond,
+ }
+ b, reset := expBackoff(start, max)
+ for _, expectedBackoff := range expectedBackoffs {
+ require.Equal(t, expectedBackoff, b())
+ }
+
+ reset()
+ require.Equal(t, start, b())
+}
+
func runFullGitalyServer(t *testing.T) (*grpc.Server, string) {
server := serverPkg.NewInsecure(RubyServer)
serverSocketPath := testhelper.GetTemporaryGitalySocketFileName()