gitlab.com/gitlab-org/gitaly.git
author     Sami Hiltunen <shiltunen@gitlab.com>   2020-04-01 19:18:35 +0300
committer  Sami Hiltunen <shiltunen@gitlab.com>   2020-04-06 17:50:25 +0300
commit     b96785b8dfa2bced487bf2f4b69c489dd97b1cdb (patch)
tree       aa64f6dd12717b909d4706dc0efb08c90c312bcf
parent     07e565d52335026ac206843d340898eb46222a2e (diff)
track dead jobs in praefect in-memory replication job queue
Adds dead job tracking to the praefect in-memory replication job queue. This will be used by the praefect dataloss command, which fetches dead replication jobs. As the dead replication jobs are kept in memory, only the latest 1000 jobs are retained in order to keep memory consumption in check.
-rw-r--r--  internal/praefect/datastore/memory.go       |  71
-rw-r--r--  internal/praefect/datastore/memory_test.go  | 131
-rw-r--r--  internal/praefect/datastore/queue.go        |   8
3 files changed, 195 insertions(+), 15 deletions(-)
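
Before the diff itself, a sketch of the intended consumer: the dataloss command is expected to read these counts through CountDeadReplicationJobs. The snippet below is a minimal, hypothetical illustration of that call pattern, not code from this patch; reportDeadJobs and the 24-hour window are invented for the example.

package main

import (
	"context"
	"fmt"
	"time"

	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
)

// reportDeadJobs is a hypothetical helper showing how a dataloss-style
// command could surface the per-repository dead job counts.
func reportDeadJobs(ctx context.Context, q datastore.ReplicationEventQueue, from, to time.Time) error {
	dead, err := q.CountDeadReplicationJobs(ctx, from, to)
	if err != nil {
		return err
	}

	for relativePath, count := range dead {
		fmt.Printf("%s: %d dead replication job(s)\n", relativePath, count)
	}
	return nil
}

func main() {
	q := datastore.NewMemoryReplicationEventQueue()

	// 'from' is inclusive and 'to' is exclusive, matching the interface contract.
	to := time.Now()
	from := to.Add(-24 * time.Hour)

	if err := reportDeadJobs(context.Background(), q, from, to); err != nil {
		fmt.Println("error:", err)
	}
}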
diff --git a/internal/praefect/datastore/memory.go b/internal/praefect/datastore/memory.go
index 4f6f72938..64826d78d 100644
--- a/internal/praefect/datastore/memory.go
+++ b/internal/praefect/datastore/memory.go
@@ -2,22 +2,37 @@ package datastore
import (
"context"
+ "errors"
"fmt"
"sync"
"time"
)
+var (
+ errDeadAckedAsFailed = errors.New("job acknowledged as failed with no attempts left, should be 'dead'")
+)
+
// NewMemoryReplicationEventQueue returns an in-memory implementation of the ReplicationEventQueue.
func NewMemoryReplicationEventQueue() ReplicationEventQueue {
- return &memoryReplicationEventQueue{dequeued: map[uint64]struct{}{}}
+ return &memoryReplicationEventQueue{
+ dequeued: map[uint64]struct{}{},
+ maxDeadJobs: 1000,
+ }
+}
+
+type deadJob struct {
+ createdAt time.Time
+ relativePath string
}
// memoryReplicationEventQueue implements the queue interface with in-memory storage
type memoryReplicationEventQueue struct {
sync.RWMutex
- seq uint64 // used to generate unique identifiers for events
- queued []ReplicationEvent // all new events stored as queue
- dequeued map[uint64]struct{} // all events dequeued, but not yet acknowledged
+ seq uint64 // used to generate unique identifiers for events
+ queued []ReplicationEvent // all new events stored as queue
+ dequeued map[uint64]struct{} // all events dequeued, but not yet acknowledged
+ maxDeadJobs int // maximum amount of dead jobs to hold in memory
+ deadJobs []deadJob // dead jobs stored for reporting purposes
}
// nextID returns a new sequential ID for new events.
@@ -50,11 +65,10 @@ func (s *memoryReplicationEventQueue) Dequeue(_ context.Context, nodeStorage str
for i := 0; i < len(s.queued); i++ {
event := s.queued[i]
- hasMoreAttempts := event.Attempt > 0
isForTargetStorage := event.Job.TargetNodeStorage == nodeStorage
isReadyOrFailed := event.State == JobStateReady || event.State == JobStateFailed
- if hasMoreAttempts && isForTargetStorage && isReadyOrFailed {
+ if isForTargetStorage && isReadyOrFailed {
updatedAt := time.Now().UTC()
event.Attempt--
event.State = JobStateInProgress
@@ -101,6 +115,10 @@ func (s *memoryReplicationEventQueue) Acknowledge(_ context.Context, state JobSt
return nil, fmt.Errorf("event not in progress, can't be acknowledged: %d [%s]", s.queued[i].ID, s.queued[i].State)
}
+ if s.queued[i].Attempt == 0 && state == JobStateFailed {
+ return nil, errDeadAckedAsFailed
+ }
+
updatedAt := time.Now().UTC()
s.queued[i].State = state
s.queued[i].UpdatedAt = &updatedAt
@@ -110,12 +128,7 @@ func (s *memoryReplicationEventQueue) Acknowledge(_ context.Context, state JobSt
switch state {
case JobStateCompleted, JobStateCancelled, JobStateDead:
// this event is fully processed and could be removed
- s.remove(i)
- case JobStateFailed:
- if s.queued[i].Attempt == 0 {
- // out of luck for this replication event, remove from queue as no more attempts available
- s.remove(i)
- }
+ s.remove(i, state)
}
break
}
@@ -124,9 +137,39 @@ func (s *memoryReplicationEventQueue) Acknowledge(_ context.Context, state JobSt
return result, nil
}
-// remove deletes i-th element from slice and from tracking map.
+// CountDeadReplicationJobs returns the dead replication job counts by relative path within the given timerange.
+// The timerange beginning is inclusive and ending is exclusive. The in-memory queue stores only the most recent
+// 1000 dead jobs.
+func (s *memoryReplicationEventQueue) CountDeadReplicationJobs(ctx context.Context, from, to time.Time) (map[string]int64, error) {
+ s.RLock()
+ defer s.RUnlock()
+
+ from = from.Add(-time.Nanosecond)
+ dead := map[string]int64{}
+ for _, job := range s.deadJobs {
+ if job.createdAt.After(from) && job.createdAt.Before(to) {
+ dead[job.relativePath]++
+ }
+ }
+
+ return dead, nil
+}
+
+// remove deletes i-th element from the queue and from the in-flight tracking map.
// It doesn't bounds-check 'i' and must be called while holding the lock.
-func (s *memoryReplicationEventQueue) remove(i int) {
+// If state is JobStateDead, the event will be added to the dead job tracker.
+func (s *memoryReplicationEventQueue) remove(i int, state JobState) {
+ if state == JobStateDead {
+ if len(s.deadJobs) >= s.maxDeadJobs {
+ s.deadJobs = s.deadJobs[1:]
+ }
+
+ s.deadJobs = append(s.deadJobs, deadJob{
+ createdAt: s.queued[i].CreatedAt,
+ relativePath: s.queued[i].Job.RelativePath,
+ })
+ }
+
delete(s.dequeued, s.queued[i].ID)
s.queued = append(s.queued[:i], s.queued[i+1:]...)
}
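
Note the trimming in remove above: deadJobs acts as a bounded FIFO, evicting the oldest entry once maxDeadJobs is reached. A self-contained sketch of the same pattern, with illustrative names that are not from the patch:

package main

import "fmt"

// appendCapped appends v to buf, evicting the oldest element first when the
// buffer already holds max entries, mirroring the dead job trimming above.
func appendCapped(buf []string, v string, max int) []string {
	if len(buf) >= max {
		buf = buf[1:]
	}
	return append(buf, v)
}

func main() {
	var dead []string
	for _, job := range []string{"job-0", "job-1", "job-2"} {
		dead = appendCapped(dead, job, 2)
	}
	fmt.Println(dead) // [job-1 job-2]: job-0 was evicted
}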
diff --git a/internal/praefect/datastore/memory_test.go b/internal/praefect/datastore/memory_test.go
index 1f33ff200..274e3bb85 100644
--- a/internal/praefect/datastore/memory_test.go
+++ b/internal/praefect/datastore/memory_test.go
@@ -1,13 +1,138 @@
package datastore
import (
+ "fmt"
"sync"
"testing"
+ "time"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
)
+func ContractTestCountDeadReplicationJobs(t *testing.T, q ReplicationEventQueue) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ const target = "target"
+ ackJobsToDeath := func(t *testing.T) {
+ t.Helper()
+
+ for {
+ jobs, err := q.Dequeue(ctx, target, 1)
+ require.NoError(t, err)
+ if len(jobs) == 0 {
+ break
+ }
+
+ for _, job := range jobs {
+ state := JobStateFailed
+ if job.Attempt == 0 {
+ state = JobStateDead
+ }
+
+ _, err := q.Acknowledge(ctx, state, []uint64{job.ID})
+ require.NoError(t, err)
+ }
+ }
+ }
+
+ // add some other job states to the datastore to ensure they are not counted
+ for relPath, state := range map[string]JobState{"repo/completed-job": JobStateCompleted, "repo/cancelled-job": JobStateCancelled} {
+ _, err := q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: relPath, TargetNodeStorage: target}})
+ require.NoError(t, err)
+
+ jobs, err := q.Dequeue(ctx, target, 1)
+ require.NoError(t, err)
+
+ _, err = q.Acknowledge(ctx, state, []uint64{jobs[0].ID})
+ require.NoError(t, err)
+ }
+
+ beforeOldest := time.Now()
+ _, err := q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: "old", TargetNodeStorage: target}})
+ require.NoError(t, err)
+ afterOldest := time.Now()
+
+ dead, err := q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now())
+ require.NoError(t, err)
+ require.Empty(t, dead, "should not include ready jobs")
+
+ jobs, err := q.Dequeue(ctx, target, 1)
+ require.NoError(t, err)
+
+ _, err = q.Acknowledge(ctx, JobStateFailed, []uint64{jobs[0].ID})
+ require.NoError(t, err)
+
+ dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now())
+ require.NoError(t, err)
+ require.Empty(t, dead, "should not include failed jobs")
+
+ ackJobsToDeath(t)
+ dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now())
+ require.NoError(t, err)
+ require.Equal(t, map[string]int64{"old": 1}, dead, "should include dead job")
+
+ _, err = q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: "new", TargetNodeStorage: target}})
+ require.NoError(t, err)
+ afterMiddle := time.Now()
+
+ ackJobsToDeath(t)
+ dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now())
+ require.NoError(t, err)
+ require.Equal(t, map[string]int64{"old": 1, "new": 1}, dead, "should include both dead jobs")
+
+ time.Sleep(time.Millisecond)
+ _, err = q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: "new", TargetNodeStorage: target}})
+ require.NoError(t, err)
+
+ ackJobsToDeath(t)
+ dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now())
+ require.NoError(t, err)
+ require.Equal(t, map[string]int64{"old": 1, "new": 2}, dead, "dead jobs are grouped by relative path")
+
+ dead, err = q.CountDeadReplicationJobs(ctx, afterOldest, afterMiddle)
+ require.NoError(t, err)
+ require.Equal(t, map[string]int64{"new": 1}, dead, "should only count the in-between dead job")
+}
+
+func TestMemoryCountDeadReplicationJobs(t *testing.T) {
+ ContractTestCountDeadReplicationJobs(t, NewMemoryReplicationEventQueue())
+}
+
+func TestMemoryCountDeadReplicationJobsLimit(t *testing.T) {
+ q := NewMemoryReplicationEventQueue().(*memoryReplicationEventQueue)
+ q.maxDeadJobs = 2
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ const target = "target"
+
+ beforeAll := time.Now()
+ for i := 0; i < q.maxDeadJobs+1; i++ {
+ job, err := q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: fmt.Sprintf("job-%d", i), TargetNodeStorage: target}})
+ require.NoError(t, err)
+
+ for i := 0; i < job.Attempt; i++ {
+ _, err := q.Dequeue(ctx, target, 1)
+ require.NoError(t, err)
+
+ state := JobStateFailed
+ if i == job.Attempt-1 {
+ state = JobStateDead
+ }
+
+ _, err = q.Acknowledge(ctx, state, []uint64{job.ID})
+ require.NoError(t, err)
+ }
+ }
+
+ dead, err := q.CountDeadReplicationJobs(ctx, beforeAll, time.Now())
+ require.NoError(t, err)
+ require.Equal(t, map[string]int64{"job-1": 1, "job-2": 1}, dead, "should only include the last two dead jobs")
+}
+
func TestMemoryReplicationEventQueue(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
@@ -124,7 +249,11 @@ func TestMemoryReplicationEventQueue(t *testing.T) {
}
require.Equal(t, expAttempt3, dequeuedAttempt3[0])
- acknowledgedAttempt3, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{event1.ID})
+ ackFailedNoAttemptsLeft, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{event1.ID})
+ require.Equal(t, errDeadAckedAsFailed, err)
+ require.Empty(t, ackFailedNoAttemptsLeft)
+
+ acknowledgedAttempt3, err := queue.Acknowledge(ctx, JobStateDead, []uint64{event1.ID})
require.NoError(t, err)
require.Equal(t, []uint64{event1.ID}, acknowledgedAttempt3, "one event must be acknowledged")
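
Since ContractTestCountDeadReplicationJobs takes the queue as a parameter, the same contract should be reusable against the Postgres implementation once it gains a real CountDeadReplicationJobs. A hypothetical wiring, assuming a test database helper that is not part of this patch:

// Assumed follow-up test: getDB is a hypothetical helper returning a
// glsql.Querier backed by a test database; it does not exist in this patch.
func TestPostgresCountDeadReplicationJobs(t *testing.T) {
	db := getDB(t)
	ContractTestCountDeadReplicationJobs(t, PostgresReplicationEventQueue{qc: db})
}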
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index f9fbd41c5..6de79e6ce 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -8,6 +8,7 @@ import (
"fmt"
"time"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
)
@@ -21,6 +22,9 @@ type ReplicationEventQueue interface {
// It only updates events that are in 'in_progress' state.
// It returns the list of ids that were actually acknowledged.
Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error)
+ // CountDeadReplicationJobs returns the dead replication job counts by relative path within the
+ // given timerange. The timerange beginning is inclusive and ending is exclusive.
+ CountDeadReplicationJobs(ctx context.Context, from, to time.Time) (map[string]int64, error)
}
func allowToAck(state JobState) error {
@@ -148,6 +152,10 @@ type PostgresReplicationEventQueue struct {
qc glsql.Querier
}
+func (rq PostgresReplicationEventQueue) CountDeadReplicationJobs(ctx context.Context, from, to time.Time) (map[string]int64, error) {
+ return nil, helper.Unimplemented
+}
+
func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) {
query := `
WITH insert_lock AS (