diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2020-04-08 12:18:22 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-04-08 12:18:22 +0300 |
commit | d0d6fc55a933d165910bb9afce3333872adb3aa8 (patch) | |
tree | 3f5677122406c7acfd516805463b1cffe48ccf30 | |
parent | 0887bf430d11fdf8e9b54070e85d5dd2e7cce6ea (diff) | |
parent | b96785b8dfa2bced487bf2f4b69c489dd97b1cdb (diff) |
Merge branch 'smh-mem-datastore-dead-jobs' into 'master'
Track dead jobs in praefect in-memory replication job queue
See merge request gitlab-org/gitaly!2004
-rw-r--r-- | internal/praefect/datastore/memory.go | 71 | ||||
-rw-r--r-- | internal/praefect/datastore/memory_test.go | 131 | ||||
-rw-r--r-- | internal/praefect/datastore/queue.go | 8 |
3 files changed, 195 insertions, 15 deletions
diff --git a/internal/praefect/datastore/memory.go b/internal/praefect/datastore/memory.go index 4f6f72938..64826d78d 100644 --- a/internal/praefect/datastore/memory.go +++ b/internal/praefect/datastore/memory.go @@ -2,22 +2,37 @@ package datastore import ( "context" + "errors" "fmt" "sync" "time" ) +var ( + errDeadAckedAsFailed = errors.New("job acknowledged as failed with no attempts left, should be 'dead'") +) + // NewMemoryReplicationEventQueue return in-memory implementation of the ReplicationEventQueue. func NewMemoryReplicationEventQueue() ReplicationEventQueue { - return &memoryReplicationEventQueue{dequeued: map[uint64]struct{}{}} + return &memoryReplicationEventQueue{ + dequeued: map[uint64]struct{}{}, + maxDeadJobs: 1000, + } +} + +type deadJob struct { + createdAt time.Time + relativePath string } // memoryReplicationEventQueue implements queue interface with in-memory implementation of storage type memoryReplicationEventQueue struct { sync.RWMutex - seq uint64 // used to generate unique identifiers for events - queued []ReplicationEvent // all new events stored as queue - dequeued map[uint64]struct{} // all events dequeued, but not yet acknowledged + seq uint64 // used to generate unique identifiers for events + queued []ReplicationEvent // all new events stored as queue + dequeued map[uint64]struct{} // all events dequeued, but not yet acknowledged + maxDeadJobs int // maximum amount of dead jobs to hold in memory + deadJobs []deadJob // dead jobs stored for reporting purposes } // nextID returns a new sequential ID for new events. @@ -50,11 +65,10 @@ func (s *memoryReplicationEventQueue) Dequeue(_ context.Context, nodeStorage str for i := 0; i < len(s.queued); i++ { event := s.queued[i] - hasMoreAttempts := event.Attempt > 0 isForTargetStorage := event.Job.TargetNodeStorage == nodeStorage isReadyOrFailed := event.State == JobStateReady || event.State == JobStateFailed - if hasMoreAttempts && isForTargetStorage && isReadyOrFailed { + if isForTargetStorage && isReadyOrFailed { updatedAt := time.Now().UTC() event.Attempt-- event.State = JobStateInProgress @@ -101,6 +115,10 @@ func (s *memoryReplicationEventQueue) Acknowledge(_ context.Context, state JobSt return nil, fmt.Errorf("event not in progress, can't be acknowledged: %d [%s]", s.queued[i].ID, s.queued[i].State) } + if s.queued[i].Attempt == 0 && state == JobStateFailed { + return nil, errDeadAckedAsFailed + } + updatedAt := time.Now().UTC() s.queued[i].State = state s.queued[i].UpdatedAt = &updatedAt @@ -110,12 +128,7 @@ func (s *memoryReplicationEventQueue) Acknowledge(_ context.Context, state JobSt switch state { case JobStateCompleted, JobStateCancelled, JobStateDead: // this event is fully processed and could be removed - s.remove(i) - case JobStateFailed: - if s.queued[i].Attempt == 0 { - // out of luck for this replication event, remove from queue as no more attempts available - s.remove(i) - } + s.remove(i, state) } break } @@ -124,9 +137,39 @@ func (s *memoryReplicationEventQueue) Acknowledge(_ context.Context, state JobSt return result, nil } -// remove deletes i-th element from slice and from tracking map. +// CountDeadReplicationJobs returns the dead replication job counts by relative path within the given timerange. +// The timerange beginning is inclusive and ending is exclusive. The in-memory queue stores only the most recent +// 1000 dead jobs. +func (s *memoryReplicationEventQueue) CountDeadReplicationJobs(ctx context.Context, from, to time.Time) (map[string]int64, error) { + s.RLock() + defer s.RUnlock() + + from = from.Add(-time.Nanosecond) + dead := map[string]int64{} + for _, job := range s.deadJobs { + if job.createdAt.After(from) && job.createdAt.Before(to) { + dead[job.relativePath]++ + } + } + + return dead, nil +} + +// remove deletes i-th element from the queue and from the in-flight tracking map. // It doesn't check 'i' for the out of range and must be called with lock protection. -func (s *memoryReplicationEventQueue) remove(i int) { +// If state is JobStateDead, the event will be added to the dead job tracker. +func (s *memoryReplicationEventQueue) remove(i int, state JobState) { + if state == JobStateDead { + if len(s.deadJobs) >= s.maxDeadJobs { + s.deadJobs = s.deadJobs[1:] + } + + s.deadJobs = append(s.deadJobs, deadJob{ + createdAt: s.queued[i].CreatedAt, + relativePath: s.queued[i].Job.RelativePath, + }) + } + delete(s.dequeued, s.queued[i].ID) s.queued = append(s.queued[:i], s.queued[i+1:]...) } diff --git a/internal/praefect/datastore/memory_test.go b/internal/praefect/datastore/memory_test.go index 1f33ff200..274e3bb85 100644 --- a/internal/praefect/datastore/memory_test.go +++ b/internal/praefect/datastore/memory_test.go @@ -1,13 +1,138 @@ package datastore import ( + "fmt" "sync" "testing" + "time" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/testhelper" ) +func ContractTestCountDeadReplicationJobs(t *testing.T, q ReplicationEventQueue) { + ctx, cancel := testhelper.Context() + defer cancel() + + const target = "target" + ackJobsToDeath := func(t *testing.T) { + t.Helper() + + for { + jobs, err := q.Dequeue(ctx, target, 1) + require.NoError(t, err) + if len(jobs) == 0 { + break + } + + for _, job := range jobs { + state := JobStateFailed + if job.Attempt == 0 { + state = JobStateDead + } + + _, err := q.Acknowledge(ctx, state, []uint64{job.ID}) + require.NoError(t, err) + } + } + } + + // add some other job states to the datastore to ensure they are not counted + for relPath, state := range map[string]JobState{"repo/completed-job": JobStateCompleted, "repo/cancelled-job": JobStateCancelled} { + _, err := q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: relPath, TargetNodeStorage: target}}) + require.NoError(t, err) + + jobs, err := q.Dequeue(ctx, target, 1) + require.NoError(t, err) + + _, err = q.Acknowledge(ctx, state, []uint64{jobs[0].ID}) + require.NoError(t, err) + } + + beforeOldest := time.Now() + _, err := q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: "old", TargetNodeStorage: target}}) + require.NoError(t, err) + afterOldest := time.Now() + + dead, err := q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now()) + require.NoError(t, err) + require.Empty(t, dead, "should not include ready jobs") + + jobs, err := q.Dequeue(ctx, target, 1) + require.NoError(t, err) + + _, err = q.Acknowledge(ctx, JobStateFailed, []uint64{jobs[0].ID}) + require.NoError(t, err) + + dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now()) + require.NoError(t, err) + require.Empty(t, dead, "should not include failed jobs") + + ackJobsToDeath(t) + dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now()) + require.NoError(t, err) + require.Equal(t, map[string]int64{"old": 1}, dead, "should include dead job") + + _, err = q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: "new", TargetNodeStorage: target}}) + require.NoError(t, err) + afterMiddle := time.Now() + + ackJobsToDeath(t) + dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now()) + require.NoError(t, err) + require.Equal(t, map[string]int64{"old": 1, "new": 1}, dead, "should include both dead jobs") + + time.Sleep(time.Millisecond) + _, err = q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: "new", TargetNodeStorage: target}}) + require.NoError(t, err) + + ackJobsToDeath(t) + dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now()) + require.NoError(t, err) + require.Equal(t, map[string]int64{"old": 1, "new": 2}, dead, "dead job are grouped by relative path") + + dead, err = q.CountDeadReplicationJobs(ctx, afterOldest, afterMiddle) + require.NoError(t, err) + require.Equal(t, map[string]int64{"new": 1}, dead, "should only count the in-between dead job") +} + +func TestMemoryCountDeadReplicationJobs(t *testing.T) { + ContractTestCountDeadReplicationJobs(t, NewMemoryReplicationEventQueue()) +} + +func TestMemoryCountDeadReplicationJobsLimit(t *testing.T) { + q := NewMemoryReplicationEventQueue().(*memoryReplicationEventQueue) + q.maxDeadJobs = 2 + + ctx, cancel := testhelper.Context() + defer cancel() + + const target = "target" + + beforeAll := time.Now() + for i := 0; i < q.maxDeadJobs+1; i++ { + job, err := q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: fmt.Sprintf("job-%d", i), TargetNodeStorage: target}}) + require.NoError(t, err) + + for i := 0; i < job.Attempt; i++ { + _, err := q.Dequeue(ctx, target, 1) + require.NoError(t, err) + + state := JobStateFailed + if i == job.Attempt-1 { + state = JobStateDead + } + + _, err = q.Acknowledge(ctx, state, []uint64{job.ID}) + require.NoError(t, err) + } + } + + dead, err := q.CountDeadReplicationJobs(ctx, beforeAll, time.Now()) + require.NoError(t, err) + require.Equal(t, map[string]int64{"job-1": 1, "job-2": 1}, dead, "should only include the last two dead jobs") +} + func TestMemoryReplicationEventQueue(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() @@ -124,7 +249,11 @@ func TestMemoryReplicationEventQueue(t *testing.T) { } require.Equal(t, expAttempt3, dequeuedAttempt3[0]) - acknowledgedAttempt3, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{event1.ID}) + ackFailedNoAttemptsLeft, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{event1.ID}) + require.Error(t, errDeadAckedAsFailed, err) + require.Empty(t, ackFailedNoAttemptsLeft) + + acknowledgedAttempt3, err := queue.Acknowledge(ctx, JobStateDead, []uint64{event1.ID}) require.NoError(t, err) require.Equal(t, []uint64{event1.ID}, acknowledgedAttempt3, "one event must be acknowledged") diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index f9fbd41c5..6de79e6ce 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -8,6 +8,7 @@ import ( "fmt" "time" + "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql" ) @@ -21,6 +22,9 @@ type ReplicationEventQueue interface { // It only updates events that are in 'in_progress' state. // It returns list of ids that was actually acknowledged. Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error) + // CountDeadReplicationJobs returns the dead replication job counts by relative path within the + // given timerange. The timerange beginning is inclusive and ending is exclusive. + CountDeadReplicationJobs(ctx context.Context, from, to time.Time) (map[string]int64, error) } func allowToAck(state JobState) error { @@ -148,6 +152,10 @@ type PostgresReplicationEventQueue struct { qc glsql.Querier } +func (rq PostgresReplicationEventQueue) CountDeadReplicationJobs(ctx context.Context, from, to time.Time) (map[string]int64, error) { + return nil, helper.Unimplemented +} + func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) { query := ` WITH insert_lock AS ( |