gitlab.com/gitlab-org/gitaly.git
author    jramsay <jcai@gitlab.com> 2020-01-24 21:53:04 +0300
committer John Cai <jcai@gitlab.com> 2020-02-11 19:19:49 +0300
commit    7252714913c4fb81234f4bf1f2ff5a31ba255e6b (patch)
tree      a87fddc64b9674f935aa340d656853fd77b67941 /internal/praefect/replicator.go
parent    974010a7802b73674d06288dda327dff69fb7e82 (diff)
Praefect replicator to mark jobs as failed and retry failed jobs

The replicator will mark jobs as failed so that it can retry them. In this first iteration we take a naive approach and have the replicator retry all failed jobs.
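This change also gives ProcessBacklog a BackoffFunc parameter, so callers now supply the retry pacing that was previously hard-coded. A minimal sketch of the new call site, assuming a caller outside the praefect package with an already-constructed ReplMgr named replMgr and a logrus logger (neither appears in this diff):

// Sketch only: replMgr and logger are assumed; the 10ms/1s values
// mirror what the removed expBackoff call hard-coded.
if err := replMgr.ProcessBacklog(ctx, praefect.ExpBackoffFunc(10*time.Millisecond, 1*time.Second)); err != nil {
	logger.WithError(err).Error("replication backlog processing failed")
}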
Diffstat (limited to 'internal/praefect/replicator.go')
-rw-r--r--  internal/praefect/replicator.go | 116
1 file changed, 80 insertions(+), 36 deletions(-)
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 2f3a48ea3..f0e7abc5f 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -218,50 +218,87 @@ const (
logWithReplTarget = "replication_job_target"
)
-func expBackoff(start time.Duration, max time.Duration) (backoff func() time.Duration, reset func()) {
- duration := start
- factor := 2
-
- return func() time.Duration {
- defer func() {
- duration *= time.Duration(factor)
- if (duration) >= max {
- duration = max
- }
- }()
- return duration
- }, func() {
- duration = start
- }
+type backoff func() time.Duration
+type backoffReset func()
+
+// BackoffFunc is a function that in turn provides a pair of functions, backoff and backoffReset
+type BackoffFunc func() (backoff, backoffReset)
+
+// ExpBackoffFunc generates a BackoffFunc based on the given start and max durations
+func ExpBackoffFunc(start time.Duration, max time.Duration) BackoffFunc {
+ return func() (backoff, backoffReset) {
+ const factor = 2
+ duration := start
+
+ return func() time.Duration {
+ defer func() {
+ duration *= time.Duration(factor)
+ if duration >= max {
+ duration = max
+ }
+ }()
+ return duration
+ }, func() {
+ duration = start
+ }
+ }
}
+const (
+ maxAttempts = 3
+)
+
// ProcessBacklog will process queued jobs. It will block while processing jobs.
-func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
- backoff, reset := expBackoff(10*time.Millisecond, 1*time.Second)
+func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFunc) error {
+ backoff, reset := b()
for {
nodes, err := r.datastore.GetStorageNodes()
if err != nil {
- return nil
+ r.log.WithError(err).Error("error when getting storage nodes")
+ return err
}
var totalJobs int
for _, node := range nodes {
- jobs, err := r.datastore.GetJobs(datastore.JobStateReady, node.Storage, 10)
+ jobs, err := r.datastore.GetJobs(datastore.JobStateReady|datastore.JobStateFailed, node.Storage, 10)
if err != nil {
- return err
+ r.log.WithField("storage", node.Storage).WithError(err).Error("error when retrieving jobs for replication")
+ continue
}
totalJobs += len(jobs)
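+ // reposReplicated records repositories already handled in this pass;
+ // later jobs for the same relative path are cancelled below.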
+ reposReplicated := make(map[string]struct{})
for _, job := range jobs {
- r.log.WithFields(logrus.Fields{
- logWithReplJobID: job.ID,
- "from_storage": job.SourceNode.Storage,
- "to_storage": job.TargetNode.Storage,
- "relative_path": job.RelativePath,
- }).Info("processing replication job")
- r.processReplJob(ctx, job)
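+ // A job that has exhausted maxAttempts is marked dead rather than retried again.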
+ if job.Attempts >= maxAttempts {
+ if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateDead); err != nil {
+ r.log.WithError(err).Error("error when updating replication job status to cancelled")
+ }
+ continue
+ }
+
+ if _, ok := reposReplicated[job.RelativePath]; ok {
+ if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateCancelled); err != nil {
+ r.log.WithError(err).Error("error when updating replication job status to cancelled")
+ }
+ continue
+ }
+
+ if err := r.processReplJob(ctx, job); err != nil {
+ r.log.WithFields(logrus.Fields{
+ logWithReplJobID: job.ID,
+ "from_storage": job.SourceNode.Storage,
+ "to_storage": job.TargetNode.Storage,
+ }).WithError(err).Error("replication job failed")
+
+ if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateFailed); err != nil {
+ r.log.WithError(err).Error("error when updating replication job status to failed")
+ continue
+ }
+ }
+
+ reposReplicated[job.RelativePath] = struct{}{}
}
}
@@ -283,33 +320,38 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
// is a crutch in this situation. Ideally, we need to update state somewhere
// with information regarding the replication failure. See follow up issue:
// https://gitlab.com/gitlab-org/gitaly/issues/2138
-func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob) {
+func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob) error {
l := r.log.
WithField(logWithReplJobID, job.ID).
WithField(logWithReplSource, job.SourceNode).
WithField(logWithReplTarget, job.TargetNode)
- if err := r.datastore.UpdateReplJob(job.ID, datastore.JobStateInProgress); err != nil {
+ if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateInProgress); err != nil {
l.WithError(err).Error("unable to update replication job to in progress")
- return
+ return err
+ }
+
+ if err := r.datastore.IncrReplJobAttempts(job.ID); err != nil {
+ l.WithError(err).Error("unable to increment replication job attempts")
+ return err
}
targetCC, err := r.clientConnections.GetConnection(job.TargetNode.Storage)
if err != nil {
l.WithError(err).Error("unable to obtain client connection for secondary node in replication job")
- return
+ return err
}
sourceCC, err := r.clientConnections.GetConnection(job.SourceNode.Storage)
if err != nil {
l.WithError(err).Error("unable to obtain client connection for primary node in replication job")
- return
+ return err
}
injectedCtx, err := helper.InjectGitalyServers(ctx, job.SourceNode.Storage, job.SourceNode.Address, job.SourceNode.Token)
if err != nil {
l.WithError(err).Error("unable to inject Gitaly servers into context for replication job")
- return
+ return err
}
replStart := time.Now()
@@ -326,13 +368,15 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob) {
}
if err != nil {
l.WithError(err).Error("unable to replicate")
- return
+ return err
}
replDuration := time.Since(replStart)
r.replLatencyMetric.Observe(float64(replDuration) / float64(time.Second))
- if err := r.datastore.UpdateReplJob(job.ID, datastore.JobStateComplete); err != nil {
- l.WithError(err).Error("error when updating replication job status to complete")
+ if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateComplete); err != nil {
+ r.log.WithError(err).Error("error when updating replication job status to complete")
}
+
+ return nil
}
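For illustration, a runnable sketch (not part of this commit) exercising ExpBackoffFunc on its own; the import path assumes the snippet lives inside the gitaly module, since internal packages cannot be imported from outside it:

package main

import (
	"fmt"
	"time"

	"gitlab.com/gitlab-org/gitaly/internal/praefect"
)

func main() {
	// Each call to the BackoffFunc yields an independent backoff/reset pair.
	backoff, reset := praefect.ExpBackoffFunc(10*time.Millisecond, 1*time.Second)()

	for i := 0; i < 8; i++ {
		fmt.Println(backoff()) // 10ms, 20ms, 40ms, ..., capped at 1s
	}

	reset()
	fmt.Println(backoff()) // back to 10ms
}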