Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Zeger-Jan van de Weg <git@zjvandeweg.nl> 2019-05-10 19:35:51 +0300
committer: Zeger-Jan van de Weg <git@zjvandeweg.nl> 2019-05-10 19:35:51 +0300
commit: ef4a5d9721ceeb5ab09486856442ce6863563ab5 (patch)
tree: c186ff7cf62cfe46e82f418df1d7a3d73f8fd3f1
parent: 33fd67aad42c81ae224fde1dfdc6bda5e8153fcf (diff)
parent: 28266fbc780ac7a2e85f36fe160cdb050b8be65f (diff)
Merge branch 'po-fix-datastore-update' into 'master'
Fix replication job state changing

See merge request gitlab-org/gitaly!1236
-rw-r--r-- changelogs/unreleased/po-fix-datastore-update.yml | 5
-rw-r--r-- internal/praefect/datastore.go | 49
-rw-r--r-- internal/praefect/datastore_test.go | 1
-rw-r--r-- internal/praefect/replicator.go | 25
-rw-r--r-- internal/praefect/replicator_test.go | 15
5 files changed, 65 insertions, 30 deletions
diff --git a/changelogs/unreleased/po-fix-datastore-update.yml b/changelogs/unreleased/po-fix-datastore-update.yml
new file mode 100644
index 000000000..871b77ebf
--- /dev/null
+++ b/changelogs/unreleased/po-fix-datastore-update.yml
@@ -0,0 +1,5 @@
+---
+title: Fix replication job state changing
+merge_request: 1236
+author:
+type: fixed
diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go
index 388435441..fb5b2daea 100644
--- a/internal/praefect/datastore.go
+++ b/internal/praefect/datastore.go
@@ -8,6 +8,7 @@ package praefect
import (
"errors"
"fmt"
+ "sort"
"sync"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
@@ -41,6 +42,17 @@ type ReplJob struct {
State JobState
}
+// replJobs provides sort manipulation behavior
+type replJobs []ReplJob
+
+func (rjs replJobs) Len() int { return len(rjs) }
+func (rjs replJobs) Swap(i, j int) { rjs[i], rjs[j] = rjs[j], rjs[i] }
+
+// byJobID provides a comparator for sorting jobs
+type byJobID struct{ replJobs }
+
+func (b byJobID) Less(i, j int) bool { return b.replJobs[i].ID < b.replJobs[j].ID }
+
// Datastore is a data persistence abstraction for all of Praefect's
// persistence needs
type Datastore interface {
@@ -100,7 +112,8 @@ type MemoryDatastore struct {
jobs *struct {
sync.RWMutex
- records []jobRecord // all jobs indexed by ID
+ next uint64
+ records map[uint64]jobRecord // all jobs indexed by ID
}
}
@@ -115,8 +128,12 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore {
},
jobs: &struct {
sync.RWMutex
- records []jobRecord // all jobs indexed by ID
- }{},
+ next uint64
+ records map[uint64]jobRecord // all jobs indexed by ID
+ }{
+ next: 0,
+ records: map[uint64]jobRecord{},
+ },
}
secondaries := make([]string, len(cfg.SecondaryServers))
@@ -134,11 +151,12 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore {
// initialize replication job queue to replicate all whitelisted repos
// to every secondary server
for _, secondary := range cfg.SecondaryServers {
- m.jobs.records = append(m.jobs.records, jobRecord{
+ m.jobs.next++
+ m.jobs.records[m.jobs.next] = jobRecord{
state: JobStateReady,
target: secondary.Name,
relativePath: relativePath,
- })
+ }
}
}
@@ -205,12 +223,14 @@ func (md *MemoryDatastore) GetIncompleteJobs(storage string, count int) ([]ReplJ
}
}
+ sort.Sort(byJobID{results})
+
return results, nil
}
// replJobFromRecord constructs a replication job from a record and by cross
// referencing the current shard for the project being replicated
-func (md *MemoryDatastore) replJobFromRecord(index int, record jobRecord) (ReplJob, error) {
+func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (ReplJob, error) {
shard, ok := md.getShard(record.relativePath)
if !ok {
return ReplJob{}, fmt.Errorf(
@@ -220,7 +240,7 @@ func (md *MemoryDatastore) replJobFromRecord(index int, record jobRecord) (ReplJ
}
return ReplJob{
- ID: uint64(index + 1),
+ ID: jobID,
Source: Repository{
RelativePath: record.relativePath,
Storage: shard.primary,
@@ -258,11 +278,12 @@ func (md *MemoryDatastore) CreateSecondaryReplJobs(source Repository) ([]uint64,
for _, secondary := range shard.secondaries {
nextID := uint64(len(md.jobs.records) + 1)
- md.jobs.records = append(md.jobs.records, jobRecord{
+ md.jobs.next++
+ md.jobs.records[md.jobs.next] = jobRecord{
target: secondary,
state: JobStatePending,
relativePath: source.RelativePath,
- })
+ }
jobIDs = append(jobIDs, nextID)
}
@@ -275,18 +296,18 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error
md.jobs.Lock()
defer md.jobs.Unlock()
- if jobID == 0 || jobID > uint64(len(md.jobs.records)) {
+ job, ok := md.jobs.records[jobID]
+ if !ok {
return fmt.Errorf("job ID %d does not exist", jobID)
}
- index := jobID - 1
-
if newState == JobStateComplete || newState == JobStateCancelled {
// remove the job to avoid filling up memory with unneeded job records
- md.jobs.records = append(md.jobs.records[:index], md.jobs.records[index+1:]...)
+ delete(md.jobs.records, jobID)
return nil
}
- md.jobs.records[index].state = newState
+ job.state = newState
+ md.jobs.records[jobID] = job
return nil
}
diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore_test.go
index 8356fede2..b41f6736b 100644
--- a/internal/praefect/datastore_test.go
+++ b/internal/praefect/datastore_test.go
@@ -59,7 +59,6 @@ var operations = []struct {
desc: "fetch inserted replication jobs after primary mapped",
opFn: func(t *testing.T, ds praefect.Datastore) {
jobs, err := ds.GetIncompleteJobs(stor2, 10)
-
require.NoError(t, err)
require.Len(t, jobs, 1)
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index fa05f62b9..723726567 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -96,6 +96,11 @@ func (r ReplMgr) ScheduleReplication(ctx context.Context, repo Repository) error
return nil
}
+const (
+ jobFetchInterval = 10 * time.Millisecond
+ logWithReplJobID = "replication-job-ID"
+)
+
// ProcessBacklog will process queued jobs. It will block while processing jobs.
func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
since := time.Time{}
@@ -107,10 +112,12 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
}
if len(jobs) == 0 {
+ r.log.Debugf("no jobs for %s, checking again in %s", r.storage, jobFetchInterval)
+
select {
// TODO: exponential backoff when no queries are returned
- case <-time.After(10 * time.Millisecond):
+ case <-time.After(jobFetchInterval):
continue
case <-ctx.Done():
@@ -122,14 +129,24 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
r.log.Debugf("fetched replication jobs: %#v", jobs)
for _, job := range jobs {
- r.log.Infof("processing replication job %#v", job)
+ r.log.WithField(logWithReplJobID, job.ID).
+ Infof("processing replication job %#v", job)
node, err := r.coordinator.GetStorageNode(job.Target)
if err != nil {
return err
}
- err = r.replicator.Replicate(ctx, job.Source, node)
- if err != nil {
+ if err := r.jobsStore.UpdateReplJob(job.ID, JobStateInProgress); err != nil {
+ return err
+ }
+
+ if err := r.replicator.Replicate(ctx, job.Source, node); err != nil {
+ return err
+ }
+
+ r.log.WithField(logWithReplJobID, job.ID).
+ Info("completed replication")
+ if err := r.jobsStore.UpdateReplJob(job.ID, JobStateComplete); err != nil {
return err
}
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index dc5566cb3..1f1955e81 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -69,29 +69,22 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) {
}()
success := make(chan struct{})
- expectJobs := len(cfg.Whitelist) * len(cfg.SecondaryServers)
go func() {
// we expect one job per whitelisted repo with each backend server
- for i := 0; i < expectJobs; i++ {
+ for i := 0; i < len(cfg.Whitelist); i++ {
result := <-resultsCh
assert.Contains(t, cfg.Whitelist, result.source.RelativePath)
- assert.Contains(t,
- []string{
- cfg.SecondaryServers[0].Name,
- cfg.SecondaryServers[1].Name,
- },
- result.target.Storage,
- )
+ assert.Equal(t, result.target.Storage, cfg.SecondaryServers[1].Name)
+ assert.Equal(t, result.source.Storage, cfg.PrimaryServer.Name)
}
cancel()
+ require.EqualError(t, <-errQ, context.Canceled.Error())
success <- struct{}{}
}()
- require.EqualError(t, <-errQ, context.Canceled.Error())
-
select {
case <-success: