gitlab.com/gitlab-org/gitaly.git
 changelogs/unreleased/jc-repl-job-errors.yml  |   5
 cmd/praefect/main.go                          |   5
 internal/praefect/coordinator.go              |   8
 internal/praefect/datastore/datastore.go      |  33
 internal/praefect/datastore/datastore_test.go |   2
 internal/praefect/helper_test.go              |   8
 internal/praefect/replicator.go               | 116
 internal/praefect/replicator_test.go          | 116
 8 files changed, 245 insertions(+), 48 deletions(-)
diff --git a/changelogs/unreleased/jc-repl-job-errors.yml b/changelogs/unreleased/jc-repl-job-errors.yml
new file mode 100644
index 000000000..2ad142455
--- /dev/null
+++ b/changelogs/unreleased/jc-repl-job-errors.yml
@@ -0,0 +1,5 @@
+---
+title: Praefect replicator to mark job as failed and retry failed jobs
+merge_request: 1804
+author:
+type: changed
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index a0868f36d..e74e09c05 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -33,6 +33,7 @@ import (
"fmt"
"os"
"strings"
+ "time"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/bootstrap"
@@ -190,7 +191,9 @@ func run(cfgs []starter.Config, conf config.Config) error {
}
go func() { serverErrors <- b.Wait() }()
- go func() { serverErrors <- repl.ProcessBacklog(ctx) }()
+ go func() {
+ serverErrors <- repl.ProcessBacklog(ctx, praefect.ExpBackoffFunc(1*time.Second, 5*time.Minute))
+ }()
go coordinator.FailoverRotation()
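With this wiring, ProcessBacklog polls with an exponential delay that starts at one second and is capped at five minutes. A standalone sketch of the schedule this produces, re-deriving the doubling-then-cap logic that the replicator.go hunk further down introduces (illustrative only, not part of the change):

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		d, max := 1*time.Second, 5*time.Minute
		for i := 0; i < 11; i++ {
			fmt.Println(d) // 1s, 2s, 4s, ..., 4m16s, then 5m0s repeatedly
			d *= 2
			if d >= max {
				d = max
			}
		}
	}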
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 22c0a83a8..3ed5ad57c 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -205,10 +205,10 @@ func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository, primary
return func() {
for _, jobID := range jobIDs {
- if err := c.datastore.UpdateReplJob(jobID, datastore.JobStateReady); err != nil {
- // TODO: in case of error the job remains in queue in 'pending' state and leads to:
- // - additional memory consumption
- // - stale state of one of the git data stores
+ // TODO: in case of error the job remains in queue in 'pending' state and leads to:
+ // - additional memory consumption
+ // - stale state of one of the git data stores
+ if err := c.datastore.UpdateReplJobState(jobID, datastore.JobStateReady); err != nil {
c.log.WithField("job_id", jobID).WithError(err).Errorf("error when updating replication job to %d", datastore.JobStateReady)
}
}
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index eb3cb0087..bc677aa33 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -36,6 +36,11 @@ const (
// JobStateCancelled indicates the job was cancelled. This can occur if the
// job is no longer relevant (e.g. a node is moved out of a repository)
JobStateCancelled
+ // JobStateFailed indicates the job did not succeed. The Replicator will retry
+ // failed jobs.
+ JobStateFailed
+ // JobStateDead indicates the job was retried up to the maximum retries
+ JobStateDead
)
// ChangeType indicates what kind of change the replication is propagating
@@ -57,6 +62,7 @@ type ReplJob struct {
TargetNode, SourceNode models.Node // which node to replicate to?
RelativePath string // source for replication
State JobState
+ Attempts int
}
// replJobs provides sort manipulation behavior
@@ -104,8 +110,10 @@ type ReplJobsDatastore interface {
// ID's for the created jobs will be returned upon success.
CreateReplicaReplJobs(relativePath string, primary models.Node, secondaries []models.Node, change ChangeType) ([]uint64, error)
- // UpdateReplJob updates the state of an existing replication job
- UpdateReplJob(jobID uint64, newState JobState) error
+ // UpdateReplJobState updates the state of an existing replication job
+ UpdateReplJobState(jobID uint64, newState JobState) error
+
+ IncrReplJobAttempts(jobID uint64) error
}
type jobRecord struct {
@@ -113,6 +121,7 @@ type jobRecord struct {
relativePath string // project's relative path
targetNodeStorage, sourceNodeStorage string
state JobState
+ attempts int
}
// MemoryDatastore is a simple datastore that isn't persisted to disk. It is
@@ -283,6 +292,7 @@ func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (Re
SourceNode: sourceNode,
State: record.state,
TargetNode: targetNode,
+ Attempts: record.attempts,
}, nil
}
@@ -319,8 +329,8 @@ func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, primary mo
return jobIDs, nil
}
-// UpdateReplJob updates an existing replication job's state
-func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error {
+// UpdateReplJobState updates an existing replication job's state
+func (md *MemoryDatastore) UpdateReplJobState(jobID uint64, newState JobState) error {
md.jobs.Lock()
defer md.jobs.Unlock()
@@ -339,3 +349,18 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error
md.jobs.records[jobID] = job
return nil
}
+
+// IncrReplJobAttempts increments an existing replication job's attempts counter
+func (md *MemoryDatastore) IncrReplJobAttempts(jobID uint64) error {
+ md.jobs.Lock()
+ defer md.jobs.Unlock()
+
+ job, ok := md.jobs.records[jobID]
+ if !ok {
+ return fmt.Errorf("job ID %d does not exist", jobID)
+ }
+
+ job.attempts++
+ md.jobs.records[jobID] = job
+ return nil
+}
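Together with the coordinator change above, the job lifecycle becomes: jobs are created pending, promoted to ready, set in_progress while replicating, and finish as complete, failed (retried until maxAttempts), dead, or cancelled. A note on the state filter used further down: ProcessBacklog asks for datastore.JobStateReady|datastore.JobStateFailed in a single GetJobs call, which only works if JobState values are bit flags. A minimal sketch of that pattern under that assumption (the const block above starts mid-hunk, so the earlier flag declarations are reconstructed here, not quoted, and stateMatches is a hypothetical helper):

	package datastore

	// JobState is assumed to be a bitmask describing the state of a replication job.
	type JobState uint8

	const (
		JobStatePending JobState = 1 << iota
		JobStateReady
		JobStateInProgress
		JobStateComplete
		JobStateCancelled
		JobStateFailed
		JobStateDead
	)

	// stateMatches reports whether state is included in the requested flag set,
	// the per-record check a GetJobs implementation would apply.
	func stateMatches(state, flags JobState) bool {
		return state&flags != 0
	}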
diff --git a/internal/praefect/datastore/datastore_test.go b/internal/praefect/datastore/datastore_test.go
index ffae23dd1..81e03e2cf 100644
--- a/internal/praefect/datastore/datastore_test.go
+++ b/internal/praefect/datastore/datastore_test.go
@@ -69,7 +69,7 @@ var operations = []struct {
{
desc: "mark replication job done",
opFn: func(t *testing.T, ds Datastore) {
- err := ds.UpdateReplJob(1, JobStateComplete)
+ err := ds.UpdateReplJobState(1, JobStateComplete)
require.NoError(t, err)
},
},
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 6e8791afb..953bb88fe 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -159,6 +159,12 @@ func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[st
return mock.NewSimpleServiceClient(cc), prf, cleanup
}
+func noopBackoffFunc() (backoff, backoffReset) {
+ return func() time.Duration {
+ return 0
+ }, func() {}
+}
+
// runPraefectServerWithGitaly runs a praefect server with actual Gitaly nodes
// requires exactly 1 virtual storage
func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
@@ -205,7 +211,7 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client
prf.RegisterServices()
go func() { errQ <- prf.Serve(listener, false) }()
- go func() { errQ <- replmgr.ProcessBacklog(ctx) }()
+ go func() { errQ <- replmgr.ProcessBacklog(ctx, noopBackoffFunc) }()
// dial client to praefect
cc := dialLocalPort(t, port, false)
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 2f3a48ea3..f0e7abc5f 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -218,50 +218,87 @@ const (
logWithReplTarget = "replication_job_target"
)
-func expBackoff(start time.Duration, max time.Duration) (backoff func() time.Duration, reset func()) {
- duration := start
- factor := 2
-
- return func() time.Duration {
- defer func() {
- duration *= time.Duration(factor)
- if (duration) >= max {
- duration = max
- }
- }()
- return duration
- }, func() {
- duration = start
- }
+type backoff func() time.Duration
+type backoffReset func()
+
+// BackoffFunc is a function that in turn provides a pair of functions, backoff and backoffReset
+type BackoffFunc func() (backoff, backoffReset)
+
+// ExpBackoffFunc generates a BackoffFunc based on the start and max time durations
+func ExpBackoffFunc(start time.Duration, max time.Duration) BackoffFunc {
+ return func() (backoff, backoffReset) {
+ const factor = 2
+ duration := start
+
+ return func() time.Duration {
+ defer func() {
+ duration *= time.Duration(factor)
+ if (duration) >= max {
+ duration = max
+ }
+ }()
+ return duration
+ }, func() {
+ duration = start
+ }
+ }
}
+const (
+ maxAttempts = 3
+)
+
// ProcessBacklog will process queued jobs. It will block while processing jobs.
-func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
- backoff, reset := expBackoff(10*time.Millisecond, 1*time.Second)
+func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFunc) error {
+ backoff, reset := b()
for {
nodes, err := r.datastore.GetStorageNodes()
if err != nil {
- return nil
+ r.log.WithError(err).Error("error when getting storage nodes")
+ return err
}
var totalJobs int
for _, node := range nodes {
- jobs, err := r.datastore.GetJobs(datastore.JobStateReady, node.Storage, 10)
+ jobs, err := r.datastore.GetJobs(datastore.JobStateReady|datastore.JobStateFailed, node.Storage, 10)
if err != nil {
- return err
+ r.log.WithField("storage", node.Storage).WithError(err).Error("error when retrieving jobs for replication")
+ continue
}
totalJobs += len(jobs)
+ reposReplicated := make(map[string]struct{})
for _, job := range jobs {
- r.log.WithFields(logrus.Fields{
- logWithReplJobID: job.ID,
- "from_storage": job.SourceNode.Storage,
- "to_storage": job.TargetNode.Storage,
- "relative_path": job.RelativePath,
- }).Info("processing replication job")
- r.processReplJob(ctx, job)
+ if job.Attempts >= maxAttempts {
+ if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateDead); err != nil {
+ r.log.WithError(err).Error("error when updating replication job status to cancelled")
+ }
+ continue
+ }
+
+ if _, ok := reposReplicated[job.RelativePath]; ok {
+ if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateCancelled); err != nil {
+ r.log.WithError(err).Error("error when updating replication job status to cancelled")
+ }
+ continue
+ }
+
+ if err := r.processReplJob(ctx, job); err != nil {
+ r.log.WithFields(logrus.Fields{
+ logWithReplJobID: job.ID,
+ "from_storage": job.SourceNode.Storage,
+ "to_storage": job.TargetNode.Storage,
+ }).WithError(err).Error("replication job failed")
+
+ if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateFailed); err != nil {
+ r.log.WithError(err).Error("error when updating replication job status to failed")
+ continue
+ }
+ }
+
+ reposReplicated[job.RelativePath] = struct{}{}
}
}
@@ -283,33 +320,38 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
// is a crutch in this situation. Ideally, we need to update state somewhere
// with information regarding the replication failure. See follow up issue:
// https://gitlab.com/gitlab-org/gitaly/issues/2138
-func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob) {
+func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob) error {
l := r.log.
WithField(logWithReplJobID, job.ID).
WithField(logWithReplSource, job.SourceNode).
WithField(logWithReplTarget, job.TargetNode)
- if err := r.datastore.UpdateReplJob(job.ID, datastore.JobStateInProgress); err != nil {
+ if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateInProgress); err != nil {
l.WithError(err).Error("unable to update replication job to in progress")
- return
+ return err
+ }
+
+ if err := r.datastore.IncrReplJobAttempts(job.ID); err != nil {
+ l.WithError(err).Error("unable to increment replication job attempts")
+ return err
}
targetCC, err := r.clientConnections.GetConnection(job.TargetNode.Storage)
if err != nil {
l.WithError(err).Error("unable to obtain client connection for secondary node in replication job")
- return
+ return err
}
sourceCC, err := r.clientConnections.GetConnection(job.SourceNode.Storage)
if err != nil {
l.WithError(err).Error("unable to obtain client connection for primary node in replication job")
- return
+ return err
}
injectedCtx, err := helper.InjectGitalyServers(ctx, job.SourceNode.Storage, job.SourceNode.Address, job.SourceNode.Token)
if err != nil {
l.WithError(err).Error("unable to inject Gitaly servers into context for replication job")
- return
+ return err
}
replStart := time.Now()
@@ -326,13 +368,15 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob) {
}
if err != nil {
l.WithError(err).Error("unable to replicate")
- return
+ return err
}
replDuration := time.Since(replStart)
r.replLatencyMetric.Observe(float64(replDuration) / float64(time.Second))
- if err := r.datastore.UpdateReplJob(job.ID, datastore.JobStateComplete); err != nil {
- l.WithError(err).Error("error when updating replication job status to complete")
+ if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateComplete); err != nil {
+ r.log.WithError(err).Error("error when updating replication job status to complete")
}
+
+ return nil
}
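The tail of ProcessBacklog's for-loop (how totalJobs drives the delay) falls outside these hunks. A sketch of the conventional consumption of the (backoff, reset) pair, assuming the loop sleeps when a pass finds no jobs and resets the delay otherwise; pollLoop and poll are illustrative names, not part of the change. The sketch must live in package praefect: backoff and backoffReset are unexported, so custom BackoffFunc values (like noopBackoffFunc in helper_test.go above) can only be built inside the package or via exported constructors such as ExpBackoffFunc.

	package praefect

	import (
		"context"
		"time"
	)

	func pollLoop(ctx context.Context, b BackoffFunc, poll func() int) error {
		backoff, reset := b()
		for {
			if totalJobs := poll(); totalJobs > 0 {
				reset() // work was found; the next idle wait restarts at `start`
				continue
			}
			select {
			case <-time.After(backoff()): // idle: wait out the next, longer interval
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}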
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 444579923..30bf4ae08 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -190,6 +190,120 @@ func TestConfirmReplication(t *testing.T) {
require.False(t, equal)
}
+func TestProcessBacklog(t *testing.T) {
+ srv, srvSocketPath := runFullGitalyServer(t)
+ defer srv.Stop()
+
+ testRepo, _, cleanupFn := testhelper.NewTestRepo(t)
+ defer cleanupFn()
+
+ backupStorageName := "backup"
+
+ backupDir, err := ioutil.TempDir(testhelper.GitlabTestStoragePath(), backupStorageName)
+ require.NoError(t, err)
+
+ defer os.RemoveAll(backupDir)
+
+ oldStorages := gitaly_config.Config.Storages
+ defer func() {
+ gitaly_config.Config.Storages = oldStorages
+ }()
+
+ gitaly_config.Config.Storages = append(gitaly_config.Config.Storages, gitaly_config.Storage{
+ Name: backupStorageName,
+ Path: backupDir,
+ },
+ gitaly_config.Storage{
+ Name: "default",
+ Path: testhelper.GitlabTestStoragePath(),
+ },
+ )
+
+ primary := models.Node{
+ Storage: "default",
+ Address: srvSocketPath,
+ Token: gitaly_config.Config.Auth.Token,
+ DefaultPrimary: true,
+ }
+
+ secondary := models.Node{
+ Storage: backupStorageName,
+ Address: srvSocketPath,
+ Token: gitaly_config.Config.Auth.Token,
+ }
+
+ config := config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "default",
+ Nodes: []*models.Node{
+ &primary,
+ &secondary,
+ },
+ },
+ },
+ }
+
+ ds := datastore.NewInMemory(config)
+ ids, err := ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary, []models.Node{secondary}, datastore.UpdateRepo)
+ require.NoError(t, err)
+ require.Len(t, ids, 1)
+
+ require.NoError(t, ds.UpdateReplJobState(ids[0], datastore.JobStateReady))
+
+ clientCC := conn.NewClientConnections()
+ require.NoError(t, clientCC.RegisterNode("default", srvSocketPath, gitaly_config.Config.Auth.Token))
+
+ replMgr := NewReplMgr(backupStorageName, gitaly_log.Default(), ds, clientCC)
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ go replMgr.ProcessBacklog(ctx, noopBackoffFunc)
+
+ timeLimit := time.NewTimer(5 * time.Second)
+ ticker := time.NewTicker(10 * time.Millisecond)
+
+ // the job will fail to process because the client connection for "backup" is not registered. It should fail maxAttempts times
+ // and be marked dead.
+TestJobGetsCancelled:
+ for {
+ select {
+ case <-ticker.C:
+ replJobs, err := ds.GetJobs(datastore.JobStateDead, "backup", 10)
+ require.NoError(t, err)
+ if len(replJobs) == 1 {
+ //success
+ break TestJobGetsCancelled
+ }
+ case <-timeLimit.C:
+ t.Fatal("time limit expired for job to complete")
+ }
+ }
+
+ require.NoError(t, clientCC.RegisterNode("backup", srvSocketPath, gitaly_config.Config.Auth.Token))
+ ids, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary, []models.Node{secondary}, datastore.UpdateRepo)
+ require.NoError(t, err)
+ require.Len(t, ids, 1)
+ require.NoError(t, ds.UpdateReplJobState(ids[0], datastore.JobStateReady))
+ timeLimit.Reset(5 * time.Second)
+
+ // Once the node is registered, and we try the job again it should succeed
+TestJobSucceeds:
+ for {
+ select {
+ case <-ticker.C:
+ replJobs, err := ds.GetJobs(datastore.JobStateFailed|datastore.JobStateInProgress|datastore.JobStateReady, "backup", 10)
+ require.NoError(t, err)
+ if len(replJobs) == 0 {
+ //success
+ break TestJobSucceeds
+ }
+ case <-timeLimit.C:
+ t.Error("time limit expired for job to complete")
+ }
+ }
+}
+
func TestBackoff(t *testing.T) {
start := 1 * time.Microsecond
max := 6 * time.Microsecond
@@ -201,7 +315,7 @@ func TestBackoff(t *testing.T) {
6 * time.Microsecond,
6 * time.Microsecond,
}
- b, reset := expBackoff(start, max)
+ b, reset := ExpBackoffFunc(start, max)()
for _, expectedBackoff := range expectedBackoffs {
require.Equal(t, expectedBackoff, b())
}
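Given a start of 1µs and a max of 6µs, the expected sequence follows directly from the doubling-then-cap logic: 1, 2, 4, then 8 clamped to 6, and 6 thereafter. A natural follow-on assertion, hypothetical here since it falls outside the hunk, would exercise the reset half of the pair:

	reset()
	require.Equal(t, start, b())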