diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2020-04-01 19:18:35 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-04-06 17:50:25 +0300 |
commit | b96785b8dfa2bced487bf2f4b69c489dd97b1cdb (patch) | |
tree | aa64f6dd12717b909d4706dc0efb08c90c312bcf | |
parent | 07e565d52335026ac206843d340898eb46222a2e (diff) |
track dead jobs in praefect in-memory replication job queue
Adds dead job tracking for the praefect in-memory replication job queue.
This will be used by the praefect dataloss command, which will fetch dead
replication jobs.
As the dead replication jobs are kept in memory, only the latest
1000 jobs are retained in order to keep the memory consumption in check.
-rw-r--r-- | internal/praefect/datastore/memory.go | 71 | ||||
-rw-r--r-- | internal/praefect/datastore/memory_test.go | 131 | ||||
-rw-r--r-- | internal/praefect/datastore/queue.go | 8 |
3 files changed, 195 insertions, 15 deletions
diff --git a/internal/praefect/datastore/memory.go b/internal/praefect/datastore/memory.go index 4f6f72938..64826d78d 100644 --- a/internal/praefect/datastore/memory.go +++ b/internal/praefect/datastore/memory.go @@ -2,22 +2,37 @@ package datastore import ( "context" + "errors" "fmt" "sync" "time" ) +var ( + errDeadAckedAsFailed = errors.New("job acknowledged as failed with no attempts left, should be 'dead'") +) + // NewMemoryReplicationEventQueue return in-memory implementation of the ReplicationEventQueue. func NewMemoryReplicationEventQueue() ReplicationEventQueue { - return &memoryReplicationEventQueue{dequeued: map[uint64]struct{}{}} + return &memoryReplicationEventQueue{ + dequeued: map[uint64]struct{}{}, + maxDeadJobs: 1000, + } +} + +type deadJob struct { + createdAt time.Time + relativePath string } // memoryReplicationEventQueue implements queue interface with in-memory implementation of storage type memoryReplicationEventQueue struct { sync.RWMutex - seq uint64 // used to generate unique identifiers for events - queued []ReplicationEvent // all new events stored as queue - dequeued map[uint64]struct{} // all events dequeued, but not yet acknowledged + seq uint64 // used to generate unique identifiers for events + queued []ReplicationEvent // all new events stored as queue + dequeued map[uint64]struct{} // all events dequeued, but not yet acknowledged + maxDeadJobs int // maximum amount of dead jobs to hold in memory + deadJobs []deadJob // dead jobs stored for reporting purposes } // nextID returns a new sequential ID for new events. 
@@ -50,11 +65,10 @@ func (s *memoryReplicationEventQueue) Dequeue(_ context.Context, nodeStorage str for i := 0; i < len(s.queued); i++ { event := s.queued[i] - hasMoreAttempts := event.Attempt > 0 isForTargetStorage := event.Job.TargetNodeStorage == nodeStorage isReadyOrFailed := event.State == JobStateReady || event.State == JobStateFailed - if hasMoreAttempts && isForTargetStorage && isReadyOrFailed { + if isForTargetStorage && isReadyOrFailed { updatedAt := time.Now().UTC() event.Attempt-- event.State = JobStateInProgress @@ -101,6 +115,10 @@ func (s *memoryReplicationEventQueue) Acknowledge(_ context.Context, state JobSt return nil, fmt.Errorf("event not in progress, can't be acknowledged: %d [%s]", s.queued[i].ID, s.queued[i].State) } + if s.queued[i].Attempt == 0 && state == JobStateFailed { + return nil, errDeadAckedAsFailed + } + updatedAt := time.Now().UTC() s.queued[i].State = state s.queued[i].UpdatedAt = &updatedAt @@ -110,12 +128,7 @@ func (s *memoryReplicationEventQueue) Acknowledge(_ context.Context, state JobSt switch state { case JobStateCompleted, JobStateCancelled, JobStateDead: // this event is fully processed and could be removed - s.remove(i) - case JobStateFailed: - if s.queued[i].Attempt == 0 { - // out of luck for this replication event, remove from queue as no more attempts available - s.remove(i) - } + s.remove(i, state) } break } @@ -124,9 +137,39 @@ func (s *memoryReplicationEventQueue) Acknowledge(_ context.Context, state JobSt return result, nil } -// remove deletes i-th element from slice and from tracking map. +// CountDeadReplicationJobs returns the dead replication job counts by relative path within the given timerange. +// The timerange beginning is inclusive and ending is exclusive. The in-memory queue stores only the most recent +// 1000 dead jobs. 
+func (s *memoryReplicationEventQueue) CountDeadReplicationJobs(ctx context.Context, from, to time.Time) (map[string]int64, error) { + s.RLock() + defer s.RUnlock() + + from = from.Add(-time.Nanosecond) + dead := map[string]int64{} + for _, job := range s.deadJobs { + if job.createdAt.After(from) && job.createdAt.Before(to) { + dead[job.relativePath]++ + } + } + + return dead, nil +} + +// remove deletes i-th element from the queue and from the in-flight tracking map. // It doesn't check 'i' for the out of range and must be called with lock protection. -func (s *memoryReplicationEventQueue) remove(i int) { +// If state is JobStateDead, the event will be added to the dead job tracker. +func (s *memoryReplicationEventQueue) remove(i int, state JobState) { + if state == JobStateDead { + if len(s.deadJobs) >= s.maxDeadJobs { + s.deadJobs = s.deadJobs[1:] + } + + s.deadJobs = append(s.deadJobs, deadJob{ + createdAt: s.queued[i].CreatedAt, + relativePath: s.queued[i].Job.RelativePath, + }) + } + delete(s.dequeued, s.queued[i].ID) s.queued = append(s.queued[:i], s.queued[i+1:]...) 
} diff --git a/internal/praefect/datastore/memory_test.go b/internal/praefect/datastore/memory_test.go index 1f33ff200..274e3bb85 100644 --- a/internal/praefect/datastore/memory_test.go +++ b/internal/praefect/datastore/memory_test.go @@ -1,13 +1,138 @@ package datastore import ( + "fmt" "sync" "testing" + "time" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/testhelper" ) +func ContractTestCountDeadReplicationJobs(t *testing.T, q ReplicationEventQueue) { + ctx, cancel := testhelper.Context() + defer cancel() + + const target = "target" + ackJobsToDeath := func(t *testing.T) { + t.Helper() + + for { + jobs, err := q.Dequeue(ctx, target, 1) + require.NoError(t, err) + if len(jobs) == 0 { + break + } + + for _, job := range jobs { + state := JobStateFailed + if job.Attempt == 0 { + state = JobStateDead + } + + _, err := q.Acknowledge(ctx, state, []uint64{job.ID}) + require.NoError(t, err) + } + } + } + + // add some other job states to the datastore to ensure they are not counted + for relPath, state := range map[string]JobState{"repo/completed-job": JobStateCompleted, "repo/cancelled-job": JobStateCancelled} { + _, err := q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: relPath, TargetNodeStorage: target}}) + require.NoError(t, err) + + jobs, err := q.Dequeue(ctx, target, 1) + require.NoError(t, err) + + _, err = q.Acknowledge(ctx, state, []uint64{jobs[0].ID}) + require.NoError(t, err) + } + + beforeOldest := time.Now() + _, err := q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: "old", TargetNodeStorage: target}}) + require.NoError(t, err) + afterOldest := time.Now() + + dead, err := q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now()) + require.NoError(t, err) + require.Empty(t, dead, "should not include ready jobs") + + jobs, err := q.Dequeue(ctx, target, 1) + require.NoError(t, err) + + _, err = q.Acknowledge(ctx, JobStateFailed, []uint64{jobs[0].ID}) + require.NoError(t, err) + + dead, err = 
q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now()) + require.NoError(t, err) + require.Empty(t, dead, "should not include failed jobs") + + ackJobsToDeath(t) + dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now()) + require.NoError(t, err) + require.Equal(t, map[string]int64{"old": 1}, dead, "should include dead job") + + _, err = q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: "new", TargetNodeStorage: target}}) + require.NoError(t, err) + afterMiddle := time.Now() + + ackJobsToDeath(t) + dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now()) + require.NoError(t, err) + require.Equal(t, map[string]int64{"old": 1, "new": 1}, dead, "should include both dead jobs") + + time.Sleep(time.Millisecond) + _, err = q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: "new", TargetNodeStorage: target}}) + require.NoError(t, err) + + ackJobsToDeath(t) + dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now()) + require.NoError(t, err) + require.Equal(t, map[string]int64{"old": 1, "new": 2}, dead, "dead job are grouped by relative path") + + dead, err = q.CountDeadReplicationJobs(ctx, afterOldest, afterMiddle) + require.NoError(t, err) + require.Equal(t, map[string]int64{"new": 1}, dead, "should only count the in-between dead job") +} + +func TestMemoryCountDeadReplicationJobs(t *testing.T) { + ContractTestCountDeadReplicationJobs(t, NewMemoryReplicationEventQueue()) +} + +func TestMemoryCountDeadReplicationJobsLimit(t *testing.T) { + q := NewMemoryReplicationEventQueue().(*memoryReplicationEventQueue) + q.maxDeadJobs = 2 + + ctx, cancel := testhelper.Context() + defer cancel() + + const target = "target" + + beforeAll := time.Now() + for i := 0; i < q.maxDeadJobs+1; i++ { + job, err := q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: fmt.Sprintf("job-%d", i), TargetNodeStorage: target}}) + require.NoError(t, err) + + for i := 0; i < job.Attempt; i++ { + _, err := q.Dequeue(ctx, 
target, 1) + require.NoError(t, err) + + state := JobStateFailed + if i == job.Attempt-1 { + state = JobStateDead + } + + _, err = q.Acknowledge(ctx, state, []uint64{job.ID}) + require.NoError(t, err) + } + } + + dead, err := q.CountDeadReplicationJobs(ctx, beforeAll, time.Now()) + require.NoError(t, err) + require.Equal(t, map[string]int64{"job-1": 1, "job-2": 1}, dead, "should only include the last two dead jobs") +} + func TestMemoryReplicationEventQueue(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() @@ -124,7 +249,11 @@ func TestMemoryReplicationEventQueue(t *testing.T) { } require.Equal(t, expAttempt3, dequeuedAttempt3[0]) - acknowledgedAttempt3, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{event1.ID}) + ackFailedNoAttemptsLeft, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{event1.ID}) + require.Error(t, errDeadAckedAsFailed, err) + require.Empty(t, ackFailedNoAttemptsLeft) + + acknowledgedAttempt3, err := queue.Acknowledge(ctx, JobStateDead, []uint64{event1.ID}) require.NoError(t, err) require.Equal(t, []uint64{event1.ID}, acknowledgedAttempt3, "one event must be acknowledged") diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index f9fbd41c5..6de79e6ce 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -8,6 +8,7 @@ import ( "fmt" "time" + "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql" ) @@ -21,6 +22,9 @@ type ReplicationEventQueue interface { // It only updates events that are in 'in_progress' state. // It returns list of ids that was actually acknowledged. Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error) + // CountDeadReplicationJobs returns the dead replication job counts by relative path within the + // given timerange. The timerange beginning is inclusive and ending is exclusive. 
+ CountDeadReplicationJobs(ctx context.Context, from, to time.Time) (map[string]int64, error) } func allowToAck(state JobState) error { @@ -148,6 +152,10 @@ type PostgresReplicationEventQueue struct { qc glsql.Querier } +func (rq PostgresReplicationEventQueue) CountDeadReplicationJobs(ctx context.Context, from, to time.Time) (map[string]int64, error) { + return nil, helper.Unimplemented +} + func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) { query := ` WITH insert_lock AS ( |