diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-03-11 13:33:07 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-03-11 13:33:07 +0300 |
commit | b58a50ae0993cdfbb9aa39abdae2deb8c9f478ab (patch) | |
tree | 449ce0288bc847ccb34149ca5dbfed9d599a54b1 | |
parent | aa910b0b6fbc3f005a9924dee731123dfca2e48d (diff) | |
parent | 4341cd464913a32da78c19806243c0e25c54f917 (diff) |
Merge branch 'pa-aline-replication-job-status-type' into 'master'
Praefect: use enum values for job states
See merge request gitlab-org/gitaly!1906
-rw-r--r-- | changelogs/unreleased/pa-aline-replication-job-status-type.yml | 5 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 4 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 4 | ||||
-rw-r--r-- | internal/praefect/datastore/datastore.go | 77 | ||||
-rw-r--r-- | internal/praefect/datastore/datastore_test.go | 8 | ||||
-rw-r--r-- | internal/praefect/datastore/queue.go | 18 | ||||
-rw-r--r-- | internal/praefect/datastore/queue_test.go | 60 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 6 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 6 |
9 files changed, 92 insertions, 96 deletions
diff --git a/changelogs/unreleased/pa-aline-replication-job-status-type.yml b/changelogs/unreleased/pa-aline-replication-job-status-type.yml new file mode 100644 index 000000000..2c03a5a93 --- /dev/null +++ b/changelogs/unreleased/pa-aline-replication-job-status-type.yml @@ -0,0 +1,5 @@ +--- +title: 'Praefect: use enum values for job states' +merge_request: 1906 +author: +type: changed diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index e4cf73589..ae4ebcb61 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -24,7 +24,7 @@ func getReplicationDetails(methodName string, m proto.Message) (datastore.Change case "/gitaly.RepositoryService/RenameRepository": req, ok := m.(*gitalypb.RenameRepositoryRequest) if !ok { - return 0, nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m) + return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m) } return datastore.RenameRepo, datastore.Params{"RelativePath": req.RelativePath}, nil default: @@ -204,7 +204,7 @@ func (c *Coordinator) createReplicaJobs( // - additional memory consumption // - stale state of one of the git data stores if err := c.datastore.UpdateReplJobState(jobID, datastore.JobStateReady); err != nil { - c.log.WithField("job_id", jobID).WithError(err).Errorf("error when updating replication job to %d", datastore.JobStateReady) + c.log.WithField("job_id", jobID).WithError(err).Errorf("error when updating replication job to %q", datastore.JobStateReady) } } }, nil diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 9cd4d6916..1ef4e4520 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -93,7 +93,7 @@ func TestStreamDirector(t *testing.T) { require.NoError(t, err) require.Equal(t, "praefect-internal-1", rewrittenRepo.GetStorageName(), "stream director should have rewritten the storage name") - jobs, err := ds.GetJobs(datastore.JobStatePending, "praefect-internal-2", 10) + jobs, err := ds.GetJobs([]datastore.JobState{datastore.JobStatePending}, "praefect-internal-2", 10) require.NoError(t, err) require.Len(t, jobs, 1) @@ -116,7 +116,7 @@ func TestStreamDirector(t *testing.T) { streamParams.RequestFinalizer() - jobs, err = coordinator.datastore.GetJobs(datastore.JobStateReady, "praefect-internal-2", 10) + jobs, err = coordinator.datastore.GetJobs([]datastore.JobState{datastore.JobStateReady}, "praefect-internal-2", 10) require.NoError(t, err) require.Len(t, jobs, 1) diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go index a16743a13..13af7212f 100644 --- a/internal/praefect/datastore/datastore.go +++ b/internal/praefect/datastore/datastore.go @@ -15,57 +15,47 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/models" ) -var ( - // ErrPrimaryNotSet indicates the primary has not been set in the datastore - ErrPrimaryNotSet = errors.New("primary is not set") -) - // JobState is an enum that indicates the state of a job -type JobState uint8 +type JobState string + +func (js JobState) String() string { + return string(js) +} const ( // JobStatePending is the initial job state when it is not yet ready to run - // and may indicate recovery from a failure prior to the ready-state - JobStatePending JobState = 1 << iota - // JobStateReady indicates the job is now ready to proceed - JobStateReady - // JobStateInProgress indicates the job is being processed by a worker - JobStateInProgress - // JobStateComplete indicates the job is now complete - JobStateComplete + // and may indicate recovery from a failure prior to the ready-state. + JobStatePending = JobState("pending") + // JobStateReady indicates the job is now ready to proceed. + JobStateReady = JobState("ready") + // JobStateInProgress indicates the job is being processed by a worker. + JobStateInProgress = JobState("in_progress") + // JobStateCompleted indicates the job is now complete. + JobStateCompleted = JobState("completed") // JobStateCancelled indicates the job was cancelled. This can occur if the - // job is no longer relevant (e.g. a node is moved out of a repository) - JobStateCancelled + // job is no longer relevant (e.g. a node is moved out of a repository). + JobStateCancelled = JobState("cancelled") // JobStateFailed indicates the job did not succeed. The Replicator will retry // failed jobs. - JobStateFailed - // JobStateDead indicates the job was retried up to the maximum retries - JobStateDead + JobStateFailed = JobState("failed") + // JobStateDead indicates the job was retried up to the maximum retries. + JobStateDead = JobState("dead") ) // ChangeType indicates what kind of change the replication is propagating -type ChangeType int +type ChangeType string const ( // UpdateRepo is when a replication updates a repository in place - UpdateRepo ChangeType = iota + 1 + UpdateRepo = ChangeType("update") // DeleteRepo is when a replication deletes a repo - DeleteRepo + DeleteRepo = ChangeType("delete") // RenameRepo is when a replication renames repo - RenameRepo + RenameRepo = ChangeType("rename") ) func (ct ChangeType) String() string { - switch ct { - case UpdateRepo: - return "update" - case DeleteRepo: - return "delete" - case RenameRepo: - return "rename" - default: - return "UNDEFINED" - } + return string(ct) } // Params represent additional information required to process event after fetching it from storage. @@ -123,7 +113,7 @@ type ReplJobsDatastore interface { // GetJobs fetches a list of chronologically ordered replication // jobs for the given storage replica. The returned list will be at most // count-length. - GetJobs(flag JobState, nodeStorage string, count int) ([]ReplJob, error) + GetJobs(states []JobState, nodeStorage string, count int) ([]ReplJob, error) // CreateReplicaReplJobs will create replication jobs for each secondary // replica of a repository known to the datastore. A set of replication job @@ -264,12 +254,13 @@ func (md *MemoryDatastore) GetStorageNodes() ([]models.Node, error) { return storageNodes, nil } -// ErrReplicasMissing indicates the repository does not have any backup -// replicas -var ErrReplicasMissing = errors.New("repository missing secondary replicas") - // GetJobs is a more general method to retrieve jobs of a certain state from the datastore -func (md *MemoryDatastore) GetJobs(state JobState, targetNodeStorage string, count int) ([]ReplJob, error) { +func (md *MemoryDatastore) GetJobs(states []JobState, targetNodeStorage string, count int) ([]ReplJob, error) { + statesSet := make(map[JobState]bool, len(states)) + for _, state := range states { + statesSet[state] = true + } + md.jobs.RLock() defer md.jobs.RUnlock() @@ -277,7 +268,7 @@ func (md *MemoryDatastore) GetJobs(state JobState, targetNodeStorage string, cou for i, record := range md.jobs.records { // state is a bitmap that is a combination of one or more JobStates - if record.state&state != 0 && record.targetNodeStorage == targetNodeStorage { + if statesSet[record.state] && record.targetNodeStorage == targetNodeStorage { job, err := md.replJobFromRecord(i, record) if err != nil { return nil, err @@ -319,10 +310,6 @@ func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (Re }, nil } -// ErrInvalidReplTarget indicates a targetStorage repository cannot be chosen because -// it fails preconditions for being replicatable -var ErrInvalidReplTarget = errors.New("targetStorage repository fails preconditions for replication") - // CreateReplicaReplJobs creates a replication job for each secondary that // backs the specified repository. Upon success, the job IDs will be returned. func (md *MemoryDatastore) CreateReplicaReplJobs( @@ -369,7 +356,7 @@ func (md *MemoryDatastore) UpdateReplJobState(jobID uint64, newState JobState) e return fmt.Errorf("job ID %d does not exist", jobID) } - if newState == JobStateComplete || newState == JobStateCancelled { + if newState == JobStateCompleted || newState == JobStateCancelled { // remove the job to avoid filling up memory with unneeded job records delete(md.jobs.records, jobID) return nil diff --git a/internal/praefect/datastore/datastore_test.go b/internal/praefect/datastore/datastore_test.go index f4663bb66..f6fcddfd5 100644 --- a/internal/praefect/datastore/datastore_test.go +++ b/internal/praefect/datastore/datastore_test.go @@ -36,7 +36,7 @@ var operations = []struct { { desc: "query an empty datastore", opFn: func(t *testing.T, ds Datastore) { - jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.Storage, 1) + jobs, err := ds.GetJobs([]JobState{JobStatePending, JobStateReady}, stor1.Storage, 1) require.NoError(t, err) require.Len(t, jobs, 0) }, @@ -58,7 +58,7 @@ var operations = []struct { { desc: "fetch inserted replication jobs", opFn: func(t *testing.T, ds Datastore) { - jobs, err := ds.GetJobs(JobStatePending, stor2.Storage, 10) + jobs, err := ds.GetJobs([]JobState{JobStatePending}, stor2.Storage, 10) require.NoError(t, err) require.Len(t, jobs, 2) @@ -88,14 +88,14 @@ var operations = []struct { { desc: "mark Update replication job as done", opFn: func(t *testing.T, ds Datastore) { - err := ds.UpdateReplJobState(1, JobStateComplete) + err := ds.UpdateReplJobState(1, JobStateCompleted) require.NoError(t, err) }, }, { desc: "try fetching pending replication jobs", opFn: func(t *testing.T, ds Datastore) { - jobs, err := ds.GetJobs(JobStatePending, stor2.Storage, 1) + jobs, err := ds.GetJobs([]JobState{JobStatePending}, stor2.Storage, 1) require.NoError(t, err) require.Len(t, jobs, 1) diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index db0c0edf4..60888af73 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -17,15 +17,19 @@ type ReplicationEventQueue interface { Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) // Dequeue retrieves events from the persistent queue using provided limitations and filters. Dequeue(ctx context.Context, nodeStorage string, count int) ([]ReplicationEvent, error) + // Acknowledge updates previously dequeued events with new state releasing resources acquired for it. + // It only updates events that are in 'in_progress' state. + // It returns list of ids that was actually acknowledged. + Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error) } // ReplicationJob is a persistent representation of the replication job. type ReplicationJob struct { - Change string `json:"change"` - RelativePath string `json:"relative_path"` - TargetNodeStorage string `json:"target_node_storage"` - SourceNodeStorage string `json:"source_node_storage"` - Params Params `json:"params"` + Change ChangeType `json:"change"` + RelativePath string `json:"relative_path"` + TargetNodeStorage string `json:"target_node_storage"` + SourceNodeStorage string `json:"source_node_storage"` + Params Params `json:"params"` } func (job *ReplicationJob) Scan(value interface{}) error { @@ -48,7 +52,7 @@ func (job ReplicationJob) Value() (driver.Value, error) { // ReplicationEvent is a persistent representation of the replication event. type ReplicationEvent struct { ID uint64 - State string + State JobState Attempt int LockID string CreatedAt time.Time @@ -205,7 +209,7 @@ ORDER BY id // Acknowledge updates previously dequeued events with new state releasing resources acquired for it. // It only updates events that are in 'in_progress' state. // It returns list of ids that was actually acknowledged. -func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state string, ids []uint64) ([]uint64, error) { +func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error) { if len(ids) == 0 { return nil, nil } diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go index 3294ec68a..749c45ca8 100644 --- a/internal/praefect/datastore/queue_test.go +++ b/internal/praefect/datastore/queue_test.go @@ -22,7 +22,7 @@ func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) { eventType := ReplicationEvent{ Job: ReplicationJob{ - Change: UpdateRepo.String(), + Change: UpdateRepo, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-1", SourceNodeStorage: "gitaly-0", @@ -38,11 +38,11 @@ func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) { expEvent := ReplicationEvent{ ID: 1, - State: "ready", + State: JobStateReady, Attempt: 3, LockID: "gitaly-1|/project/path-1", Job: ReplicationJob{ - Change: "update", + Change: UpdateRepo, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-1", SourceNodeStorage: "gitaly-0", @@ -66,7 +66,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { eventType1 := ReplicationEvent{ Job: ReplicationJob{ - Change: UpdateRepo.String(), + Change: UpdateRepo, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-1", SourceNodeStorage: "gitaly-0", @@ -76,7 +76,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { eventType2 := ReplicationEvent{ Job: ReplicationJob{ - Change: RenameRepo.String(), + Change: RenameRepo, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-2", SourceNodeStorage: "", @@ -86,7 +86,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { eventType3 := ReplicationEvent{ Job: ReplicationJob{ - Change: UpdateRepo.String(), + Change: UpdateRepo, RelativePath: "/project/path-2", TargetNodeStorage: "gitaly-1", SourceNodeStorage: "gitaly-0", @@ -107,7 +107,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { Attempt: 3, LockID: "gitaly-1|/project/path-1", Job: ReplicationJob{ - Change: "update", + Change: UpdateRepo, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-1", SourceNodeStorage: "gitaly-0", @@ -128,7 +128,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { Attempt: 3, LockID: "gitaly-1|/project/path-1", Job: ReplicationJob{ - Change: "update", + Change: UpdateRepo, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-1", SourceNodeStorage: "gitaly-0", @@ -144,11 +144,11 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { expEvent3 := ReplicationEvent{ ID: event3.ID, - State: "ready", + State: JobStateReady, Attempt: 3, LockID: "gitaly-2|/project/path-1", Job: ReplicationJob{ - Change: "rename", + Change: RenameRepo, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-2", SourceNodeStorage: "", @@ -164,11 +164,11 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { expEvent4 := ReplicationEvent{ ID: event4.ID, - State: "ready", + State: JobStateReady, Attempt: 3, LockID: "gitaly-1|/project/path-2", Job: ReplicationJob{ - Change: "update", + Change: UpdateRepo, RelativePath: "/project/path-2", TargetNodeStorage: "gitaly-1", SourceNodeStorage: "gitaly-0", @@ -192,7 +192,7 @@ func TestPostgresReplicationEventQueue_Dequeue(t *testing.T) { event := ReplicationEvent{ Job: ReplicationJob{ - Change: UpdateRepo.String(), + Change: UpdateRepo, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-1", SourceNodeStorage: "gitaly-0", @@ -208,7 +208,7 @@ func TestPostgresReplicationEventQueue_Dequeue(t *testing.T) { require.Len(t, noEvents, 0, "there must be no events dequeued for not existing storage") expectedEvent := event - expectedEvent.State = "in_progress" + expectedEvent.State = JobStateInProgress expectedEvent.Attempt = 2 expectedLock := LockRow{ID: event.LockID, Acquired: true} // as we deque events we acquire lock for processing @@ -239,7 +239,7 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) { eventType1 := ReplicationEvent{ Job: ReplicationJob{ - Change: UpdateRepo.String(), + Change: UpdateRepo, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-1", SourceNodeStorage: "gitaly-0", @@ -249,7 +249,7 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) { eventType2 := ReplicationEvent{ Job: ReplicationJob{ - Change: DeleteRepo.String(), + Change: DeleteRepo, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-1", SourceNodeStorage: "", @@ -259,7 +259,7 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) { eventType3 := ReplicationEvent{ Job: ReplicationJob{ - Change: RenameRepo.String(), + Change: RenameRepo, RelativePath: "/project/path-2", TargetNodeStorage: "gitaly-1", SourceNodeStorage: "gitaly-0", @@ -281,7 +281,7 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) { expectedJobLocks1 := make([]JobLockRow, limitFirstN) for i := range expectedEvents1 { expectedEvents1[i] = events[i] - expectedEvents1[i].State = "in_progress" + expectedEvents1[i].State = JobStateInProgress expectedEvents1[i].Attempt = 2 expectedJobLocks1[i].JobID = expectedEvents1[i].ID @@ -307,7 +307,7 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) { // there must be only last event fetched from the queue expectedEvents2 := []ReplicationEvent{events[len(events)-1]} - expectedEvents2[0].State = "in_progress" + expectedEvents2[0].State = JobStateInProgress expectedEvents2[0].Attempt = 2 expectedJobLocks2 := []JobLockRow{{JobID: 5, LockID: "gitaly-1|/project/path-2"}} @@ -342,7 +342,7 @@ func TestPostgresReplicationEventQueue_Acknowledge(t *testing.T) { event := ReplicationEvent{ Job: ReplicationJob{ - Change: UpdateRepo.String(), + Change: UpdateRepo, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-1", SourceNodeStorage: "gitaly-0", @@ -360,11 +360,11 @@ func TestPostgresReplicationEventQueue_Acknowledge(t *testing.T) { requireLocks(t, ctx, db, []LockRow{{ID: event.LockID, Acquired: true}}) requireJobLocks(t, ctx, db, []JobLockRow{{JobID: event.ID, LockID: event.LockID}}) - acknowledged, err := queue.Acknowledge(ctx, "completed", []uint64{actual[0].ID, 100500}) + acknowledged, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{actual[0].ID, 100500}) require.NoError(t, err) require.Equal(t, []uint64{actual[0].ID}, acknowledged) - event.State = "completed" + event.State = JobStateCompleted event.Attempt = 2 requireEvents(t, ctx, db, []ReplicationEvent{event}) // lock must be released as the event was acknowledged and there are no other events left protected under this lock @@ -383,7 +383,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { eventType1 := ReplicationEvent{ Job: ReplicationJob{ - Change: UpdateRepo.String(), + Change: UpdateRepo, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-1", SourceNodeStorage: "gitaly-0", @@ -393,7 +393,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { eventType2 := ReplicationEvent{ Job: ReplicationJob{ - Change: DeleteRepo.String(), + Change: DeleteRepo, RelativePath: "/project/path-2", TargetNodeStorage: "gitaly-1", SourceNodeStorage: "", @@ -403,7 +403,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { eventType3 := ReplicationEvent{ Job: ReplicationJob{ - Change: UpdateRepo.String(), + Change: UpdateRepo, RelativePath: "/project/path-3", TargetNodeStorage: "gitaly-1", SourceNodeStorage: "gitaly-0", @@ -413,7 +413,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { eventType4 := ReplicationEvent{ Job: ReplicationJob{ - Change: UpdateRepo.String(), + Change: UpdateRepo, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-2", SourceNodeStorage: "gitaly-0", @@ -445,7 +445,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { }) // release lock for events of second type - acknowledge1, err := queue.Acknowledge(ctx, "failed", []uint64{3}) + acknowledge1, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{3}) require.NoError(t, err) require.Equal(t, []uint64{3}, acknowledge1) requireLocks(t, ctx, db, []LockRow{ @@ -475,7 +475,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { {JobID: 3, LockID: "gitaly-1|/project/path-2"}, }) - acknowledge2, err := queue.Acknowledge(ctx, "completed", []uint64{1, 3}) + acknowledge2, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{1, 3}) require.NoError(t, err) require.Equal(t, []uint64{1, 3}, acknowledge2) requireLocks(t, ctx, db, []LockRow{ @@ -504,7 +504,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { {JobID: 7, LockID: "gitaly-2|/project/path-1"}, }) - acknowledged3, err := queue.Acknowledge(ctx, "completed", []uint64{2, 5, 7}) + acknowledged3, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{2, 5, 7}) require.NoError(t, err) require.Equal(t, []uint64{2, 5, 7}, acknowledged3) requireLocks(t, ctx, db, []LockRow{ @@ -532,7 +532,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { newEvent, err := queue.Enqueue(ctx, eventType1) require.NoError(t, err) - acknowledge4, err := queue.Acknowledge(ctx, "completed", []uint64{newEvent.ID}) + acknowledge4, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{newEvent.ID}) require.NoError(t, err) require.Equal(t, ([]uint64)(nil), acknowledge4) // event that was not dequeued can't be acknowledged var newEventState string diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 769d3a56c..70ca77537 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -305,7 +305,7 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFunc) error { primary, secondaries, err := r.getPrimaryAndSecondaries() if err == nil { for _, secondary := range secondaries { - jobs, err := r.datastore.GetJobs(datastore.JobStateReady|datastore.JobStateFailed, secondary.GetStorage(), 10) + jobs, err := r.datastore.GetJobs([]datastore.JobState{datastore.JobStateReady, datastore.JobStateFailed}, secondary.GetStorage(), 10) if err != nil { return err } @@ -429,7 +429,7 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sour case datastore.RenameRepo: err = r.replicator.Rename(injectedCtx, job, targetCC) default: - err = fmt.Errorf("unknown replication change type encountered: %d", job.Change) + err = fmt.Errorf("unknown replication change type encountered: %q", job.Change) } if err != nil { l.WithError(err).Error("unable to replicate") @@ -439,7 +439,7 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sour replDuration := time.Since(replStart) r.replLatencyMetric.Observe(float64(replDuration) / float64(time.Second)) - if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateComplete); err != nil { + if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateCompleted); err != nil { r.log.WithError(err).Error("error when updating replication job status to complete") } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index ca124a7e0..5110c49c8 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -127,7 +127,7 @@ func TestProcessReplicationJob(t *testing.T) { _, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, secondaryStorages, datastore.UpdateRepo, nil) require.NoError(t, err) - jobs, err := ds.GetJobs(datastore.JobStateReady|datastore.JobStatePending, backupStorageName, 1) + jobs, err := ds.GetJobs([]datastore.JobState{datastore.JobStateReady, datastore.JobStatePending}, backupStorageName, 1) require.NoError(t, err) require.Len(t, jobs, 1) @@ -295,7 +295,7 @@ TestJobGetsCancelled: for { select { case <-ticker.C: - replJobs, err := ds.GetJobs(datastore.JobStateDead, "backup", 10) + replJobs, err := ds.GetJobs([]datastore.JobState{datastore.JobStateDead}, "backup", 10) require.NoError(t, err) if len(replJobs) == 1 { //success @@ -426,7 +426,7 @@ TestJobSucceeds: for { select { case <-ticker.C: - replJobs, err := ds.GetJobs(datastore.JobStateFailed|datastore.JobStateInProgress|datastore.JobStateReady|datastore.JobStateDead, "backup", 10) + replJobs, err := ds.GetJobs([]datastore.JobState{datastore.JobStateFailed, datastore.JobStateInProgress, datastore.JobStateReady, datastore.JobStateDead}, "backup", 10) require.NoError(t, err) if len(replJobs) == 0 { //success |