Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2020-03-11 13:33:07 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2020-03-11 13:33:07 +0300
commitb58a50ae0993cdfbb9aa39abdae2deb8c9f478ab (patch)
tree449ce0288bc847ccb34149ca5dbfed9d599a54b1
parentaa910b0b6fbc3f005a9924dee731123dfca2e48d (diff)
parent4341cd464913a32da78c19806243c0e25c54f917 (diff)
Merge branch 'pa-aline-replication-job-status-type' into 'master'
Praefect: use enum values for job states See merge request gitlab-org/gitaly!1906
-rw-r--r--changelogs/unreleased/pa-aline-replication-job-status-type.yml5
-rw-r--r--internal/praefect/coordinator.go4
-rw-r--r--internal/praefect/coordinator_test.go4
-rw-r--r--internal/praefect/datastore/datastore.go77
-rw-r--r--internal/praefect/datastore/datastore_test.go8
-rw-r--r--internal/praefect/datastore/queue.go18
-rw-r--r--internal/praefect/datastore/queue_test.go60
-rw-r--r--internal/praefect/replicator.go6
-rw-r--r--internal/praefect/replicator_test.go6
9 files changed, 92 insertions, 96 deletions
diff --git a/changelogs/unreleased/pa-aline-replication-job-status-type.yml b/changelogs/unreleased/pa-aline-replication-job-status-type.yml
new file mode 100644
index 000000000..2c03a5a93
--- /dev/null
+++ b/changelogs/unreleased/pa-aline-replication-job-status-type.yml
@@ -0,0 +1,5 @@
+---
+title: 'Praefect: use enum values for job states'
+merge_request: 1906
+author:
+type: changed
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index e4cf73589..ae4ebcb61 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -24,7 +24,7 @@ func getReplicationDetails(methodName string, m proto.Message) (datastore.Change
case "/gitaly.RepositoryService/RenameRepository":
req, ok := m.(*gitalypb.RenameRepositoryRequest)
if !ok {
- return 0, nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m)
+ return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m)
}
return datastore.RenameRepo, datastore.Params{"RelativePath": req.RelativePath}, nil
default:
@@ -204,7 +204,7 @@ func (c *Coordinator) createReplicaJobs(
// - additional memory consumption
// - stale state of one of the git data stores
if err := c.datastore.UpdateReplJobState(jobID, datastore.JobStateReady); err != nil {
- c.log.WithField("job_id", jobID).WithError(err).Errorf("error when updating replication job to %d", datastore.JobStateReady)
+ c.log.WithField("job_id", jobID).WithError(err).Errorf("error when updating replication job to %q", datastore.JobStateReady)
}
}
}, nil
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 9cd4d6916..1ef4e4520 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -93,7 +93,7 @@ func TestStreamDirector(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "praefect-internal-1", rewrittenRepo.GetStorageName(), "stream director should have rewritten the storage name")
- jobs, err := ds.GetJobs(datastore.JobStatePending, "praefect-internal-2", 10)
+ jobs, err := ds.GetJobs([]datastore.JobState{datastore.JobStatePending}, "praefect-internal-2", 10)
require.NoError(t, err)
require.Len(t, jobs, 1)
@@ -116,7 +116,7 @@ func TestStreamDirector(t *testing.T) {
streamParams.RequestFinalizer()
- jobs, err = coordinator.datastore.GetJobs(datastore.JobStateReady, "praefect-internal-2", 10)
+ jobs, err = coordinator.datastore.GetJobs([]datastore.JobState{datastore.JobStateReady}, "praefect-internal-2", 10)
require.NoError(t, err)
require.Len(t, jobs, 1)
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index a16743a13..13af7212f 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -15,57 +15,47 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
-var (
- // ErrPrimaryNotSet indicates the primary has not been set in the datastore
- ErrPrimaryNotSet = errors.New("primary is not set")
-)
-
// JobState is an enum that indicates the state of a job
-type JobState uint8
+type JobState string
+
+func (js JobState) String() string {
+ return string(js)
+}
const (
// JobStatePending is the initial job state when it is not yet ready to run
- // and may indicate recovery from a failure prior to the ready-state
- JobStatePending JobState = 1 << iota
- // JobStateReady indicates the job is now ready to proceed
- JobStateReady
- // JobStateInProgress indicates the job is being processed by a worker
- JobStateInProgress
- // JobStateComplete indicates the job is now complete
- JobStateComplete
+ // and may indicate recovery from a failure prior to the ready-state.
+ JobStatePending = JobState("pending")
+ // JobStateReady indicates the job is now ready to proceed.
+ JobStateReady = JobState("ready")
+ // JobStateInProgress indicates the job is being processed by a worker.
+ JobStateInProgress = JobState("in_progress")
+ // JobStateCompleted indicates the job is now complete.
+ JobStateCompleted = JobState("completed")
// JobStateCancelled indicates the job was cancelled. This can occur if the
- // job is no longer relevant (e.g. a node is moved out of a repository)
- JobStateCancelled
+ // job is no longer relevant (e.g. a node is moved out of a repository).
+ JobStateCancelled = JobState("cancelled")
// JobStateFailed indicates the job did not succeed. The Replicator will retry
// failed jobs.
- JobStateFailed
- // JobStateDead indicates the job was retried up to the maximum retries
- JobStateDead
+ JobStateFailed = JobState("failed")
+ // JobStateDead indicates the job was retried up to the maximum retries.
+ JobStateDead = JobState("dead")
)
// ChangeType indicates what kind of change the replication is propagating
-type ChangeType int
+type ChangeType string
const (
// UpdateRepo is when a replication updates a repository in place
- UpdateRepo ChangeType = iota + 1
+ UpdateRepo = ChangeType("update")
// DeleteRepo is when a replication deletes a repo
- DeleteRepo
+ DeleteRepo = ChangeType("delete")
// RenameRepo is when a replication renames repo
- RenameRepo
+ RenameRepo = ChangeType("rename")
)
func (ct ChangeType) String() string {
- switch ct {
- case UpdateRepo:
- return "update"
- case DeleteRepo:
- return "delete"
- case RenameRepo:
- return "rename"
- default:
- return "UNDEFINED"
- }
+ return string(ct)
}
// Params represent additional information required to process event after fetching it from storage.
@@ -123,7 +113,7 @@ type ReplJobsDatastore interface {
// GetJobs fetches a list of chronologically ordered replication
// jobs for the given storage replica. The returned list will be at most
// count-length.
- GetJobs(flag JobState, nodeStorage string, count int) ([]ReplJob, error)
+ GetJobs(states []JobState, nodeStorage string, count int) ([]ReplJob, error)
// CreateReplicaReplJobs will create replication jobs for each secondary
// replica of a repository known to the datastore. A set of replication job
@@ -264,12 +254,13 @@ func (md *MemoryDatastore) GetStorageNodes() ([]models.Node, error) {
return storageNodes, nil
}
-// ErrReplicasMissing indicates the repository does not have any backup
-// replicas
-var ErrReplicasMissing = errors.New("repository missing secondary replicas")
-
// GetJobs is a more general method to retrieve jobs of a certain state from the datastore
-func (md *MemoryDatastore) GetJobs(state JobState, targetNodeStorage string, count int) ([]ReplJob, error) {
+func (md *MemoryDatastore) GetJobs(states []JobState, targetNodeStorage string, count int) ([]ReplJob, error) {
+ statesSet := make(map[JobState]bool, len(states))
+ for _, state := range states {
+ statesSet[state] = true
+ }
+
md.jobs.RLock()
defer md.jobs.RUnlock()
@@ -277,7 +268,7 @@ func (md *MemoryDatastore) GetJobs(state JobState, targetNodeStorage string, cou
for i, record := range md.jobs.records {
// state is a bitmap that is a combination of one or more JobStates
- if record.state&state != 0 && record.targetNodeStorage == targetNodeStorage {
+ if statesSet[record.state] && record.targetNodeStorage == targetNodeStorage {
job, err := md.replJobFromRecord(i, record)
if err != nil {
return nil, err
@@ -319,10 +310,6 @@ func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (Re
}, nil
}
-// ErrInvalidReplTarget indicates a targetStorage repository cannot be chosen because
-// it fails preconditions for being replicatable
-var ErrInvalidReplTarget = errors.New("targetStorage repository fails preconditions for replication")
-
// CreateReplicaReplJobs creates a replication job for each secondary that
// backs the specified repository. Upon success, the job IDs will be returned.
func (md *MemoryDatastore) CreateReplicaReplJobs(
@@ -369,7 +356,7 @@ func (md *MemoryDatastore) UpdateReplJobState(jobID uint64, newState JobState) e
return fmt.Errorf("job ID %d does not exist", jobID)
}
- if newState == JobStateComplete || newState == JobStateCancelled {
+ if newState == JobStateCompleted || newState == JobStateCancelled {
// remove the job to avoid filling up memory with unneeded job records
delete(md.jobs.records, jobID)
return nil
diff --git a/internal/praefect/datastore/datastore_test.go b/internal/praefect/datastore/datastore_test.go
index f4663bb66..f6fcddfd5 100644
--- a/internal/praefect/datastore/datastore_test.go
+++ b/internal/praefect/datastore/datastore_test.go
@@ -36,7 +36,7 @@ var operations = []struct {
{
desc: "query an empty datastore",
opFn: func(t *testing.T, ds Datastore) {
- jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.Storage, 1)
+ jobs, err := ds.GetJobs([]JobState{JobStatePending, JobStateReady}, stor1.Storage, 1)
require.NoError(t, err)
require.Len(t, jobs, 0)
},
@@ -58,7 +58,7 @@ var operations = []struct {
{
desc: "fetch inserted replication jobs",
opFn: func(t *testing.T, ds Datastore) {
- jobs, err := ds.GetJobs(JobStatePending, stor2.Storage, 10)
+ jobs, err := ds.GetJobs([]JobState{JobStatePending}, stor2.Storage, 10)
require.NoError(t, err)
require.Len(t, jobs, 2)
@@ -88,14 +88,14 @@ var operations = []struct {
{
desc: "mark Update replication job as done",
opFn: func(t *testing.T, ds Datastore) {
- err := ds.UpdateReplJobState(1, JobStateComplete)
+ err := ds.UpdateReplJobState(1, JobStateCompleted)
require.NoError(t, err)
},
},
{
desc: "try fetching pending replication jobs",
opFn: func(t *testing.T, ds Datastore) {
- jobs, err := ds.GetJobs(JobStatePending, stor2.Storage, 1)
+ jobs, err := ds.GetJobs([]JobState{JobStatePending}, stor2.Storage, 1)
require.NoError(t, err)
require.Len(t, jobs, 1)
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index db0c0edf4..60888af73 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -17,15 +17,19 @@ type ReplicationEventQueue interface {
Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error)
// Dequeue retrieves events from the persistent queue using provided limitations and filters.
Dequeue(ctx context.Context, nodeStorage string, count int) ([]ReplicationEvent, error)
+ // Acknowledge updates previously dequeued events with new state releasing resources acquired for it.
+ // It only updates events that are in 'in_progress' state.
+ // It returns list of ids that was actually acknowledged.
+ Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error)
}
// ReplicationJob is a persistent representation of the replication job.
type ReplicationJob struct {
- Change string `json:"change"`
- RelativePath string `json:"relative_path"`
- TargetNodeStorage string `json:"target_node_storage"`
- SourceNodeStorage string `json:"source_node_storage"`
- Params Params `json:"params"`
+ Change ChangeType `json:"change"`
+ RelativePath string `json:"relative_path"`
+ TargetNodeStorage string `json:"target_node_storage"`
+ SourceNodeStorage string `json:"source_node_storage"`
+ Params Params `json:"params"`
}
func (job *ReplicationJob) Scan(value interface{}) error {
@@ -48,7 +52,7 @@ func (job ReplicationJob) Value() (driver.Value, error) {
// ReplicationEvent is a persistent representation of the replication event.
type ReplicationEvent struct {
ID uint64
- State string
+ State JobState
Attempt int
LockID string
CreatedAt time.Time
@@ -205,7 +209,7 @@ ORDER BY id
// Acknowledge updates previously dequeued events with new state releasing resources acquired for it.
// It only updates events that are in 'in_progress' state.
// It returns list of ids that was actually acknowledged.
-func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state string, ids []uint64) ([]uint64, error) {
+func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error) {
if len(ids) == 0 {
return nil, nil
}
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index 3294ec68a..749c45ca8 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -22,7 +22,7 @@ func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) {
eventType := ReplicationEvent{
Job: ReplicationJob{
- Change: UpdateRepo.String(),
+ Change: UpdateRepo,
RelativePath: "/project/path-1",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "gitaly-0",
@@ -38,11 +38,11 @@ func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) {
expEvent := ReplicationEvent{
ID: 1,
- State: "ready",
+ State: JobStateReady,
Attempt: 3,
LockID: "gitaly-1|/project/path-1",
Job: ReplicationJob{
- Change: "update",
+ Change: UpdateRepo,
RelativePath: "/project/path-1",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "gitaly-0",
@@ -66,7 +66,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
eventType1 := ReplicationEvent{
Job: ReplicationJob{
- Change: UpdateRepo.String(),
+ Change: UpdateRepo,
RelativePath: "/project/path-1",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "gitaly-0",
@@ -76,7 +76,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
eventType2 := ReplicationEvent{
Job: ReplicationJob{
- Change: RenameRepo.String(),
+ Change: RenameRepo,
RelativePath: "/project/path-1",
TargetNodeStorage: "gitaly-2",
SourceNodeStorage: "",
@@ -86,7 +86,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
eventType3 := ReplicationEvent{
Job: ReplicationJob{
- Change: UpdateRepo.String(),
+ Change: UpdateRepo,
RelativePath: "/project/path-2",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "gitaly-0",
@@ -107,7 +107,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
Attempt: 3,
LockID: "gitaly-1|/project/path-1",
Job: ReplicationJob{
- Change: "update",
+ Change: UpdateRepo,
RelativePath: "/project/path-1",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "gitaly-0",
@@ -128,7 +128,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
Attempt: 3,
LockID: "gitaly-1|/project/path-1",
Job: ReplicationJob{
- Change: "update",
+ Change: UpdateRepo,
RelativePath: "/project/path-1",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "gitaly-0",
@@ -144,11 +144,11 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
expEvent3 := ReplicationEvent{
ID: event3.ID,
- State: "ready",
+ State: JobStateReady,
Attempt: 3,
LockID: "gitaly-2|/project/path-1",
Job: ReplicationJob{
- Change: "rename",
+ Change: RenameRepo,
RelativePath: "/project/path-1",
TargetNodeStorage: "gitaly-2",
SourceNodeStorage: "",
@@ -164,11 +164,11 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
expEvent4 := ReplicationEvent{
ID: event4.ID,
- State: "ready",
+ State: JobStateReady,
Attempt: 3,
LockID: "gitaly-1|/project/path-2",
Job: ReplicationJob{
- Change: "update",
+ Change: UpdateRepo,
RelativePath: "/project/path-2",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "gitaly-0",
@@ -192,7 +192,7 @@ func TestPostgresReplicationEventQueue_Dequeue(t *testing.T) {
event := ReplicationEvent{
Job: ReplicationJob{
- Change: UpdateRepo.String(),
+ Change: UpdateRepo,
RelativePath: "/project/path-1",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "gitaly-0",
@@ -208,7 +208,7 @@ func TestPostgresReplicationEventQueue_Dequeue(t *testing.T) {
require.Len(t, noEvents, 0, "there must be no events dequeued for not existing storage")
expectedEvent := event
- expectedEvent.State = "in_progress"
+ expectedEvent.State = JobStateInProgress
expectedEvent.Attempt = 2
expectedLock := LockRow{ID: event.LockID, Acquired: true} // as we deque events we acquire lock for processing
@@ -239,7 +239,7 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) {
eventType1 := ReplicationEvent{
Job: ReplicationJob{
- Change: UpdateRepo.String(),
+ Change: UpdateRepo,
RelativePath: "/project/path-1",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "gitaly-0",
@@ -249,7 +249,7 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) {
eventType2 := ReplicationEvent{
Job: ReplicationJob{
- Change: DeleteRepo.String(),
+ Change: DeleteRepo,
RelativePath: "/project/path-1",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "",
@@ -259,7 +259,7 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) {
eventType3 := ReplicationEvent{
Job: ReplicationJob{
- Change: RenameRepo.String(),
+ Change: RenameRepo,
RelativePath: "/project/path-2",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "gitaly-0",
@@ -281,7 +281,7 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) {
expectedJobLocks1 := make([]JobLockRow, limitFirstN)
for i := range expectedEvents1 {
expectedEvents1[i] = events[i]
- expectedEvents1[i].State = "in_progress"
+ expectedEvents1[i].State = JobStateInProgress
expectedEvents1[i].Attempt = 2
expectedJobLocks1[i].JobID = expectedEvents1[i].ID
@@ -307,7 +307,7 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) {
// there must be only last event fetched from the queue
expectedEvents2 := []ReplicationEvent{events[len(events)-1]}
- expectedEvents2[0].State = "in_progress"
+ expectedEvents2[0].State = JobStateInProgress
expectedEvents2[0].Attempt = 2
expectedJobLocks2 := []JobLockRow{{JobID: 5, LockID: "gitaly-1|/project/path-2"}}
@@ -342,7 +342,7 @@ func TestPostgresReplicationEventQueue_Acknowledge(t *testing.T) {
event := ReplicationEvent{
Job: ReplicationJob{
- Change: UpdateRepo.String(),
+ Change: UpdateRepo,
RelativePath: "/project/path-1",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "gitaly-0",
@@ -360,11 +360,11 @@ func TestPostgresReplicationEventQueue_Acknowledge(t *testing.T) {
requireLocks(t, ctx, db, []LockRow{{ID: event.LockID, Acquired: true}})
requireJobLocks(t, ctx, db, []JobLockRow{{JobID: event.ID, LockID: event.LockID}})
- acknowledged, err := queue.Acknowledge(ctx, "completed", []uint64{actual[0].ID, 100500})
+ acknowledged, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{actual[0].ID, 100500})
require.NoError(t, err)
require.Equal(t, []uint64{actual[0].ID}, acknowledged)
- event.State = "completed"
+ event.State = JobStateCompleted
event.Attempt = 2
requireEvents(t, ctx, db, []ReplicationEvent{event})
// lock must be released as the event was acknowledged and there are no other events left protected under this lock
@@ -383,7 +383,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
eventType1 := ReplicationEvent{
Job: ReplicationJob{
- Change: UpdateRepo.String(),
+ Change: UpdateRepo,
RelativePath: "/project/path-1",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "gitaly-0",
@@ -393,7 +393,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
eventType2 := ReplicationEvent{
Job: ReplicationJob{
- Change: DeleteRepo.String(),
+ Change: DeleteRepo,
RelativePath: "/project/path-2",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "",
@@ -403,7 +403,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
eventType3 := ReplicationEvent{
Job: ReplicationJob{
- Change: UpdateRepo.String(),
+ Change: UpdateRepo,
RelativePath: "/project/path-3",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "gitaly-0",
@@ -413,7 +413,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
eventType4 := ReplicationEvent{
Job: ReplicationJob{
- Change: UpdateRepo.String(),
+ Change: UpdateRepo,
RelativePath: "/project/path-1",
TargetNodeStorage: "gitaly-2",
SourceNodeStorage: "gitaly-0",
@@ -445,7 +445,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
})
// release lock for events of second type
- acknowledge1, err := queue.Acknowledge(ctx, "failed", []uint64{3})
+ acknowledge1, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{3})
require.NoError(t, err)
require.Equal(t, []uint64{3}, acknowledge1)
requireLocks(t, ctx, db, []LockRow{
@@ -475,7 +475,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
{JobID: 3, LockID: "gitaly-1|/project/path-2"},
})
- acknowledge2, err := queue.Acknowledge(ctx, "completed", []uint64{1, 3})
+ acknowledge2, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{1, 3})
require.NoError(t, err)
require.Equal(t, []uint64{1, 3}, acknowledge2)
requireLocks(t, ctx, db, []LockRow{
@@ -504,7 +504,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
{JobID: 7, LockID: "gitaly-2|/project/path-1"},
})
- acknowledged3, err := queue.Acknowledge(ctx, "completed", []uint64{2, 5, 7})
+ acknowledged3, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{2, 5, 7})
require.NoError(t, err)
require.Equal(t, []uint64{2, 5, 7}, acknowledged3)
requireLocks(t, ctx, db, []LockRow{
@@ -532,7 +532,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
newEvent, err := queue.Enqueue(ctx, eventType1)
require.NoError(t, err)
- acknowledge4, err := queue.Acknowledge(ctx, "completed", []uint64{newEvent.ID})
+ acknowledge4, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{newEvent.ID})
require.NoError(t, err)
require.Equal(t, ([]uint64)(nil), acknowledge4) // event that was not dequeued can't be acknowledged
var newEventState string
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 769d3a56c..70ca77537 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -305,7 +305,7 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFunc) error {
primary, secondaries, err := r.getPrimaryAndSecondaries()
if err == nil {
for _, secondary := range secondaries {
- jobs, err := r.datastore.GetJobs(datastore.JobStateReady|datastore.JobStateFailed, secondary.GetStorage(), 10)
+ jobs, err := r.datastore.GetJobs([]datastore.JobState{datastore.JobStateReady, datastore.JobStateFailed}, secondary.GetStorage(), 10)
if err != nil {
return err
}
@@ -429,7 +429,7 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sour
case datastore.RenameRepo:
err = r.replicator.Rename(injectedCtx, job, targetCC)
default:
- err = fmt.Errorf("unknown replication change type encountered: %d", job.Change)
+ err = fmt.Errorf("unknown replication change type encountered: %q", job.Change)
}
if err != nil {
l.WithError(err).Error("unable to replicate")
@@ -439,7 +439,7 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sour
replDuration := time.Since(replStart)
r.replLatencyMetric.Observe(float64(replDuration) / float64(time.Second))
- if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateComplete); err != nil {
+ if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateCompleted); err != nil {
r.log.WithError(err).Error("error when updating replication job status to complete")
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index ca124a7e0..5110c49c8 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -127,7 +127,7 @@ func TestProcessReplicationJob(t *testing.T) {
_, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, secondaryStorages, datastore.UpdateRepo, nil)
require.NoError(t, err)
- jobs, err := ds.GetJobs(datastore.JobStateReady|datastore.JobStatePending, backupStorageName, 1)
+ jobs, err := ds.GetJobs([]datastore.JobState{datastore.JobStateReady, datastore.JobStatePending}, backupStorageName, 1)
require.NoError(t, err)
require.Len(t, jobs, 1)
@@ -295,7 +295,7 @@ TestJobGetsCancelled:
for {
select {
case <-ticker.C:
- replJobs, err := ds.GetJobs(datastore.JobStateDead, "backup", 10)
+ replJobs, err := ds.GetJobs([]datastore.JobState{datastore.JobStateDead}, "backup", 10)
require.NoError(t, err)
if len(replJobs) == 1 {
//success
@@ -426,7 +426,7 @@ TestJobSucceeds:
for {
select {
case <-ticker.C:
- replJobs, err := ds.GetJobs(datastore.JobStateFailed|datastore.JobStateInProgress|datastore.JobStateReady|datastore.JobStateDead, "backup", 10)
+ replJobs, err := ds.GetJobs([]datastore.JobState{datastore.JobStateFailed, datastore.JobStateInProgress, datastore.JobStateReady, datastore.JobStateDead}, "backup", 10)
require.NoError(t, err)
if len(replJobs) == 0 {
//success