Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2020-01-10 03:42:07 +0300
committerJohn Cai <jcai@gitlab.com>2020-01-15 03:42:13 +0300
commit490a639bfb875d507d112242b7b1bdbbd2fe231b (patch)
treea45b5c279fbce049140df0817e6840b63eb95446 /internal/praefect/replicator.go
parentb44242703d8e1975bd2b64f09e816ea8349e8314 (diff)
Add exponential backoff to replication manager
Diffstat (limited to 'internal/praefect/replicator.go')
-rw-r--r--internal/praefect/replicator.go49
1 files changed, 33 insertions, 16 deletions
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 7061e2f6b..2695858e5 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -238,41 +238,46 @@ func (r ReplMgr) ScheduleReplication(ctx context.Context, repo models.Repository
}
const (
- jobFetchInterval = 10 * time.Millisecond
logWithReplJobID = "replication_job_id"
logWithReplSource = "replication_job_source"
logWithReplTarget = "replication_job_target"
)
+func expBackoff(start time.Duration, max time.Duration) (backoff func() time.Duration, reset func()) {
+ duration := start
+ factor := 2
+
+ return func() time.Duration {
+ defer func() {
+ duration *= time.Duration(factor)
+ if (duration) >= max {
+ duration = max
+ }
+ }()
+ return duration
+ }, func() {
+ duration = start
+ }
+}
+
// ProcessBacklog will process queued jobs. It will block while processing jobs.
func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
+ backoff, reset := expBackoff(10*time.Millisecond, 1*time.Second)
+
for {
nodes, err := r.datastore.GetStorageNodes()
if err != nil {
return nil
}
+ var totalJobs int
for _, node := range nodes {
jobs, err := r.datastore.GetJobs(datastore.JobStateReady, node.Storage, 10)
if err != nil {
return err
}
- if len(jobs) == 0 {
- r.log.WithFields(logrus.Fields{
- "node_storage": node.Storage,
- "recheck_interval": jobFetchInterval,
- }).Trace("no jobs")
-
- select {
- // TODO: exponential backoff when no queries are returned
- case <-time.After(jobFetchInterval):
- continue
-
- case <-ctx.Done():
- return ctx.Err()
- }
- }
+ totalJobs += len(jobs)
for _, job := range jobs {
r.log.WithFields(logrus.Fields{
@@ -284,6 +289,18 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
r.processReplJob(ctx, job)
}
}
+
+ if totalJobs == 0 {
+ select {
+ case <-time.After(backoff()):
+ continue
+
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ }
+
+ reset()
}
}