diff options
author | Zeger-Jan van de Weg <git@zjvandeweg.nl> | 2019-05-10 19:35:51 +0300 |
---|---|---|
committer | Zeger-Jan van de Weg <git@zjvandeweg.nl> | 2019-05-10 19:35:51 +0300 |
commit | ef4a5d9721ceeb5ab09486856442ce6863563ab5 (patch) | |
tree | c186ff7cf62cfe46e82f418df1d7a3d73f8fd3f1 | |
parent | 33fd67aad42c81ae224fde1dfdc6bda5e8153fcf (diff) | |
parent | 28266fbc780ac7a2e85f36fe160cdb050b8be65f (diff) |
Merge branch 'po-fix-datastore-update' into 'master'
Fix replication job state changing
See merge request gitlab-org/gitaly!1236
-rw-r--r-- | changelogs/unreleased/po-fix-datastore-update.yml | 5 |
-rw-r--r-- | internal/praefect/datastore.go | 49 |
-rw-r--r-- | internal/praefect/datastore_test.go | 1 |
-rw-r--r-- | internal/praefect/replicator.go | 25 |
-rw-r--r-- | internal/praefect/replicator_test.go | 15 |
5 files changed, 65 insertions, 30 deletions
diff --git a/changelogs/unreleased/po-fix-datastore-update.yml b/changelogs/unreleased/po-fix-datastore-update.yml new file mode 100644 index 000000000..871b77ebf --- /dev/null +++ b/changelogs/unreleased/po-fix-datastore-update.yml @@ -0,0 +1,5 @@ +--- +title: Fix replication job state changing +merge_request: 1236 +author: +type: fixed diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go index 388435441..fb5b2daea 100644 --- a/internal/praefect/datastore.go +++ b/internal/praefect/datastore.go @@ -8,6 +8,7 @@ package praefect import ( "errors" "fmt" + "sort" "sync" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" @@ -41,6 +42,17 @@ type ReplJob struct { State JobState } +// replJobs provides sort manipulation behavior +type replJobs []ReplJob + +func (rjs replJobs) Len() int { return len(rjs) } +func (rjs replJobs) Swap(i, j int) { rjs[i], rjs[j] = rjs[j], rjs[i] } + +// byJobID provides a comparator for sorting jobs +type byJobID struct{ replJobs } + +func (b byJobID) Less(i, j int) bool { return b.replJobs[i].ID < b.replJobs[j].ID } + // Datastore is a data persistence abstraction for all of Praefect's // persistence needs type Datastore interface { @@ -100,7 +112,8 @@ type MemoryDatastore struct { jobs *struct { sync.RWMutex - records []jobRecord // all jobs indexed by ID + next uint64 + records map[uint64]jobRecord // all jobs indexed by ID } } @@ -115,8 +128,12 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore { }, jobs: &struct { sync.RWMutex - records []jobRecord // all jobs indexed by ID - }{}, + next uint64 + records map[uint64]jobRecord // all jobs indexed by ID + }{ + next: 0, + records: map[uint64]jobRecord{}, + }, } secondaries := make([]string, len(cfg.SecondaryServers)) @@ -134,11 +151,12 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore { // initialize replication job queue to replicate all whitelisted repos // to every secondary server for _, secondary := range cfg.SecondaryServers { - 
m.jobs.records = append(m.jobs.records, jobRecord{ + m.jobs.next++ + m.jobs.records[m.jobs.next] = jobRecord{ state: JobStateReady, target: secondary.Name, relativePath: relativePath, - }) + } } } @@ -205,12 +223,14 @@ func (md *MemoryDatastore) GetIncompleteJobs(storage string, count int) ([]ReplJ } } + sort.Sort(byJobID{results}) + return results, nil } // replJobFromRecord constructs a replication job from a record and by cross // referencing the current shard for the project being replicated -func (md *MemoryDatastore) replJobFromRecord(index int, record jobRecord) (ReplJob, error) { +func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (ReplJob, error) { shard, ok := md.getShard(record.relativePath) if !ok { return ReplJob{}, fmt.Errorf( @@ -220,7 +240,7 @@ func (md *MemoryDatastore) replJobFromRecord(index int, record jobRecord) (ReplJ } return ReplJob{ - ID: uint64(index + 1), + ID: jobID, Source: Repository{ RelativePath: record.relativePath, Storage: shard.primary, @@ -258,11 +278,12 @@ func (md *MemoryDatastore) CreateSecondaryReplJobs(source Repository) ([]uint64, for _, secondary := range shard.secondaries { nextID := uint64(len(md.jobs.records) + 1) - md.jobs.records = append(md.jobs.records, jobRecord{ + md.jobs.next++ + md.jobs.records[md.jobs.next] = jobRecord{ target: secondary, state: JobStatePending, relativePath: source.RelativePath, - }) + } jobIDs = append(jobIDs, nextID) } @@ -275,18 +296,18 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error md.jobs.Lock() defer md.jobs.Unlock() - if jobID == 0 || jobID > uint64(len(md.jobs.records)) { + job, ok := md.jobs.records[jobID] + if !ok { return fmt.Errorf("job ID %d does not exist", jobID) } - index := jobID - 1 - if newState == JobStateComplete || newState == JobStateCancelled { // remove the job to avoid filling up memory with unneeded job records - md.jobs.records = append(md.jobs.records[:index], md.jobs.records[index+1:]...) 
+ delete(md.jobs.records, jobID) return nil } - md.jobs.records[index].state = newState + job.state = newState + md.jobs.records[jobID] = job return nil } diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore_test.go index 8356fede2..b41f6736b 100644 --- a/internal/praefect/datastore_test.go +++ b/internal/praefect/datastore_test.go @@ -59,7 +59,6 @@ var operations = []struct { desc: "fetch inserted replication jobs after primary mapped", opFn: func(t *testing.T, ds praefect.Datastore) { jobs, err := ds.GetIncompleteJobs(stor2, 10) - require.NoError(t, err) require.Len(t, jobs, 1) diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index fa05f62b9..723726567 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -96,6 +96,11 @@ func (r ReplMgr) ScheduleReplication(ctx context.Context, repo Repository) error return nil } +const ( + jobFetchInterval = 10 * time.Millisecond + logWithReplJobID = "replication-job-ID" +) + // ProcessBacklog will process queued jobs. It will block while processing jobs. func (r ReplMgr) ProcessBacklog(ctx context.Context) error { since := time.Time{} @@ -107,10 +112,12 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error { } if len(jobs) == 0 { + r.log.Debugf("no jobs for %s, checking again in %s", r.storage, jobFetchInterval) + select { // TODO: exponential backoff when no queries are returned - case <-time.After(10 * time.Millisecond): + case <-time.After(jobFetchInterval): continue case <-ctx.Done(): @@ -122,14 +129,24 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error { r.log.Debugf("fetched replication jobs: %#v", jobs) for _, job := range jobs { - r.log.Infof("processing replication job %#v", job) + r.log.WithField(logWithReplJobID, job.ID). 
+ Infof("processing replication job %#v", job) node, err := r.coordinator.GetStorageNode(job.Target) if err != nil { return err } - err = r.replicator.Replicate(ctx, job.Source, node) - if err != nil { + if err := r.jobsStore.UpdateReplJob(job.ID, JobStateInProgress); err != nil { + return err + } + + if err := r.replicator.Replicate(ctx, job.Source, node); err != nil { + return err + } + + r.log.WithField(logWithReplJobID, job.ID). + Info("completed replication") + if err := r.jobsStore.UpdateReplJob(job.ID, JobStateComplete); err != nil { return err } } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index dc5566cb3..1f1955e81 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -69,29 +69,22 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) { }() success := make(chan struct{}) - expectJobs := len(cfg.Whitelist) * len(cfg.SecondaryServers) go func() { // we expect one job per whitelisted repo with each backend server - for i := 0; i < expectJobs; i++ { + for i := 0; i < len(cfg.Whitelist); i++ { result := <-resultsCh assert.Contains(t, cfg.Whitelist, result.source.RelativePath) - assert.Contains(t, - []string{ - cfg.SecondaryServers[0].Name, - cfg.SecondaryServers[1].Name, - }, - result.target.Storage, - ) + assert.Equal(t, result.target.Storage, cfg.SecondaryServers[1].Name) + assert.Equal(t, result.source.Storage, cfg.PrimaryServer.Name) } cancel() + require.EqualError(t, <-errQ, context.Canceled.Error()) success <- struct{}{} }() - require.EqualError(t, <-errQ, context.Canceled.Error()) - select { case <-success: |