Welcome to the mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2020-04-08 12:18:22 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2020-04-08 12:18:22 +0300
commitd0d6fc55a933d165910bb9afce3333872adb3aa8 (patch)
tree3f5677122406c7acfd516805463b1cffe48ccf30
parent0887bf430d11fdf8e9b54070e85d5dd2e7cce6ea (diff)
parentb96785b8dfa2bced487bf2f4b69c489dd97b1cdb (diff)
Merge branch 'smh-mem-datastore-dead-jobs' into 'master'
Track dead jobs in praefect in-memory replication job queue See merge request gitlab-org/gitaly!2004
-rw-r--r--internal/praefect/datastore/memory.go71
-rw-r--r--internal/praefect/datastore/memory_test.go131
-rw-r--r--internal/praefect/datastore/queue.go8
3 files changed, 195 insertions, 15 deletions
diff --git a/internal/praefect/datastore/memory.go b/internal/praefect/datastore/memory.go
index 4f6f72938..64826d78d 100644
--- a/internal/praefect/datastore/memory.go
+++ b/internal/praefect/datastore/memory.go
@@ -2,22 +2,37 @@ package datastore
import (
"context"
+ "errors"
"fmt"
"sync"
"time"
)
+var (
+ errDeadAckedAsFailed = errors.New("job acknowledged as failed with no attempts left, should be 'dead'")
+)
+
// NewMemoryReplicationEventQueue return in-memory implementation of the ReplicationEventQueue.
func NewMemoryReplicationEventQueue() ReplicationEventQueue {
- return &memoryReplicationEventQueue{dequeued: map[uint64]struct{}{}}
+ return &memoryReplicationEventQueue{
+ dequeued: map[uint64]struct{}{},
+ maxDeadJobs: 1000,
+ }
+}
+
+type deadJob struct {
+ createdAt time.Time
+ relativePath string
}
// memoryReplicationEventQueue implements queue interface with in-memory implementation of storage
type memoryReplicationEventQueue struct {
sync.RWMutex
- seq uint64 // used to generate unique identifiers for events
- queued []ReplicationEvent // all new events stored as queue
- dequeued map[uint64]struct{} // all events dequeued, but not yet acknowledged
+ seq uint64 // used to generate unique identifiers for events
+ queued []ReplicationEvent // all new events stored as queue
+ dequeued map[uint64]struct{} // all events dequeued, but not yet acknowledged
+ maxDeadJobs int // maximum amount of dead jobs to hold in memory
+ deadJobs []deadJob // dead jobs stored for reporting purposes
}
// nextID returns a new sequential ID for new events.
@@ -50,11 +65,10 @@ func (s *memoryReplicationEventQueue) Dequeue(_ context.Context, nodeStorage str
for i := 0; i < len(s.queued); i++ {
event := s.queued[i]
- hasMoreAttempts := event.Attempt > 0
isForTargetStorage := event.Job.TargetNodeStorage == nodeStorage
isReadyOrFailed := event.State == JobStateReady || event.State == JobStateFailed
- if hasMoreAttempts && isForTargetStorage && isReadyOrFailed {
+ if isForTargetStorage && isReadyOrFailed {
updatedAt := time.Now().UTC()
event.Attempt--
event.State = JobStateInProgress
@@ -101,6 +115,10 @@ func (s *memoryReplicationEventQueue) Acknowledge(_ context.Context, state JobSt
return nil, fmt.Errorf("event not in progress, can't be acknowledged: %d [%s]", s.queued[i].ID, s.queued[i].State)
}
+ if s.queued[i].Attempt == 0 && state == JobStateFailed {
+ return nil, errDeadAckedAsFailed
+ }
+
updatedAt := time.Now().UTC()
s.queued[i].State = state
s.queued[i].UpdatedAt = &updatedAt
@@ -110,12 +128,7 @@ func (s *memoryReplicationEventQueue) Acknowledge(_ context.Context, state JobSt
switch state {
case JobStateCompleted, JobStateCancelled, JobStateDead:
// this event is fully processed and could be removed
- s.remove(i)
- case JobStateFailed:
- if s.queued[i].Attempt == 0 {
- // out of luck for this replication event, remove from queue as no more attempts available
- s.remove(i)
- }
+ s.remove(i, state)
}
break
}
@@ -124,9 +137,39 @@ func (s *memoryReplicationEventQueue) Acknowledge(_ context.Context, state JobSt
return result, nil
}
-// remove deletes i-th element from slice and from tracking map.
+// CountDeadReplicationJobs returns the dead replication job counts by relative path within the given timerange.
+// The timerange beginning is inclusive and ending is exclusive. The in-memory queue stores only the most recent
+// 1000 dead jobs.
+func (s *memoryReplicationEventQueue) CountDeadReplicationJobs(ctx context.Context, from, to time.Time) (map[string]int64, error) {
+ s.RLock()
+ defer s.RUnlock()
+
+ from = from.Add(-time.Nanosecond)
+ dead := map[string]int64{}
+ for _, job := range s.deadJobs {
+ if job.createdAt.After(from) && job.createdAt.Before(to) {
+ dead[job.relativePath]++
+ }
+ }
+
+ return dead, nil
+}
+
+// remove deletes i-th element from the queue and from the in-flight tracking map.
// It doesn't check 'i' for the out of range and must be called with lock protection.
-func (s *memoryReplicationEventQueue) remove(i int) {
+// If state is JobStateDead, the event will be added to the dead job tracker.
+func (s *memoryReplicationEventQueue) remove(i int, state JobState) {
+ if state == JobStateDead {
+ if len(s.deadJobs) >= s.maxDeadJobs {
+ s.deadJobs = s.deadJobs[1:]
+ }
+
+ s.deadJobs = append(s.deadJobs, deadJob{
+ createdAt: s.queued[i].CreatedAt,
+ relativePath: s.queued[i].Job.RelativePath,
+ })
+ }
+
delete(s.dequeued, s.queued[i].ID)
s.queued = append(s.queued[:i], s.queued[i+1:]...)
}
diff --git a/internal/praefect/datastore/memory_test.go b/internal/praefect/datastore/memory_test.go
index 1f33ff200..274e3bb85 100644
--- a/internal/praefect/datastore/memory_test.go
+++ b/internal/praefect/datastore/memory_test.go
@@ -1,13 +1,138 @@
package datastore
import (
+ "fmt"
"sync"
"testing"
+ "time"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
)
+func ContractTestCountDeadReplicationJobs(t *testing.T, q ReplicationEventQueue) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ const target = "target"
+ ackJobsToDeath := func(t *testing.T) {
+ t.Helper()
+
+ for {
+ jobs, err := q.Dequeue(ctx, target, 1)
+ require.NoError(t, err)
+ if len(jobs) == 0 {
+ break
+ }
+
+ for _, job := range jobs {
+ state := JobStateFailed
+ if job.Attempt == 0 {
+ state = JobStateDead
+ }
+
+ _, err := q.Acknowledge(ctx, state, []uint64{job.ID})
+ require.NoError(t, err)
+ }
+ }
+ }
+
+ // add some other job states to the datastore to ensure they are not counted
+ for relPath, state := range map[string]JobState{"repo/completed-job": JobStateCompleted, "repo/cancelled-job": JobStateCancelled} {
+ _, err := q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: relPath, TargetNodeStorage: target}})
+ require.NoError(t, err)
+
+ jobs, err := q.Dequeue(ctx, target, 1)
+ require.NoError(t, err)
+
+ _, err = q.Acknowledge(ctx, state, []uint64{jobs[0].ID})
+ require.NoError(t, err)
+ }
+
+ beforeOldest := time.Now()
+ _, err := q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: "old", TargetNodeStorage: target}})
+ require.NoError(t, err)
+ afterOldest := time.Now()
+
+ dead, err := q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now())
+ require.NoError(t, err)
+ require.Empty(t, dead, "should not include ready jobs")
+
+ jobs, err := q.Dequeue(ctx, target, 1)
+ require.NoError(t, err)
+
+ _, err = q.Acknowledge(ctx, JobStateFailed, []uint64{jobs[0].ID})
+ require.NoError(t, err)
+
+ dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now())
+ require.NoError(t, err)
+ require.Empty(t, dead, "should not include failed jobs")
+
+ ackJobsToDeath(t)
+ dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now())
+ require.NoError(t, err)
+ require.Equal(t, map[string]int64{"old": 1}, dead, "should include dead job")
+
+ _, err = q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: "new", TargetNodeStorage: target}})
+ require.NoError(t, err)
+ afterMiddle := time.Now()
+
+ ackJobsToDeath(t)
+ dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now())
+ require.NoError(t, err)
+ require.Equal(t, map[string]int64{"old": 1, "new": 1}, dead, "should include both dead jobs")
+
+ time.Sleep(time.Millisecond)
+ _, err = q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: "new", TargetNodeStorage: target}})
+ require.NoError(t, err)
+
+ ackJobsToDeath(t)
+ dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now())
+ require.NoError(t, err)
+ require.Equal(t, map[string]int64{"old": 1, "new": 2}, dead, "dead job are grouped by relative path")
+
+ dead, err = q.CountDeadReplicationJobs(ctx, afterOldest, afterMiddle)
+ require.NoError(t, err)
+ require.Equal(t, map[string]int64{"new": 1}, dead, "should only count the in-between dead job")
+}
+
+func TestMemoryCountDeadReplicationJobs(t *testing.T) {
+ ContractTestCountDeadReplicationJobs(t, NewMemoryReplicationEventQueue())
+}
+
+func TestMemoryCountDeadReplicationJobsLimit(t *testing.T) {
+ q := NewMemoryReplicationEventQueue().(*memoryReplicationEventQueue)
+ q.maxDeadJobs = 2
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ const target = "target"
+
+ beforeAll := time.Now()
+ for i := 0; i < q.maxDeadJobs+1; i++ {
+ job, err := q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: fmt.Sprintf("job-%d", i), TargetNodeStorage: target}})
+ require.NoError(t, err)
+
+ for i := 0; i < job.Attempt; i++ {
+ _, err := q.Dequeue(ctx, target, 1)
+ require.NoError(t, err)
+
+ state := JobStateFailed
+ if i == job.Attempt-1 {
+ state = JobStateDead
+ }
+
+ _, err = q.Acknowledge(ctx, state, []uint64{job.ID})
+ require.NoError(t, err)
+ }
+ }
+
+ dead, err := q.CountDeadReplicationJobs(ctx, beforeAll, time.Now())
+ require.NoError(t, err)
+ require.Equal(t, map[string]int64{"job-1": 1, "job-2": 1}, dead, "should only include the last two dead jobs")
+}
+
func TestMemoryReplicationEventQueue(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
@@ -124,7 +249,11 @@ func TestMemoryReplicationEventQueue(t *testing.T) {
}
require.Equal(t, expAttempt3, dequeuedAttempt3[0])
- acknowledgedAttempt3, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{event1.ID})
+ ackFailedNoAttemptsLeft, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{event1.ID})
+ require.Error(t, errDeadAckedAsFailed, err)
+ require.Empty(t, ackFailedNoAttemptsLeft)
+
+ acknowledgedAttempt3, err := queue.Acknowledge(ctx, JobStateDead, []uint64{event1.ID})
require.NoError(t, err)
require.Equal(t, []uint64{event1.ID}, acknowledgedAttempt3, "one event must be acknowledged")
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index f9fbd41c5..6de79e6ce 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -8,6 +8,7 @@ import (
"fmt"
"time"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
)
@@ -21,6 +22,9 @@ type ReplicationEventQueue interface {
// It only updates events that are in 'in_progress' state.
// It returns list of ids that was actually acknowledged.
Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error)
+ // CountDeadReplicationJobs returns the dead replication job counts by relative path within the
+ // given timerange. The timerange beginning is inclusive and ending is exclusive.
+ CountDeadReplicationJobs(ctx context.Context, from, to time.Time) (map[string]int64, error)
}
func allowToAck(state JobState) error {
@@ -148,6 +152,10 @@ type PostgresReplicationEventQueue struct {
qc glsql.Querier
}
+func (rq PostgresReplicationEventQueue) CountDeadReplicationJobs(ctx context.Context, from, to time.Time) (map[string]int64, error) {
+ return nil, helper.Unimplemented
+}
+
func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) {
query := `
WITH insert_lock AS (