Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2020-03-19 07:00:27 +0300
committerPaul Okstad <pokstad@gitlab.com>2020-03-19 07:00:27 +0300
commit20ba20be8be4fc97f7ba5b1c3b25615349bdce9e (patch)
tree9961b3d4965e63c268fbdcc76d1e619a1b719614
parentcbc7f2b1aa58f252bb466cf98a860a74f10c6d5e (diff)
Praefect: in-memory replication event queue
This is a replacement for the existing in-memory storage. It implements the new `ReplicationEventQueue` interface and is required as the next step towards including SQL storage. Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/2166
-rw-r--r--internal/praefect/datastore/memory.go133
-rw-r--r--internal/praefect/datastore/memory_test.go208
-rw-r--r--internal/praefect/datastore/queue.go13
3 files changed, 354 insertions, 0 deletions
diff --git a/internal/praefect/datastore/memory.go b/internal/praefect/datastore/memory.go
new file mode 100644
index 000000000..e5408e6f0
--- /dev/null
+++ b/internal/praefect/datastore/memory.go
@@ -0,0 +1,133 @@
+package datastore
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "time"
+)
+
+// NewMemoryReplicationEventQueue returns an in-memory implementation of the ReplicationEventQueue.
+// The tracking map of dequeued events is allocated up front so it can be written to right away.
+func NewMemoryReplicationEventQueue() ReplicationEventQueue {
+	queue := &memoryReplicationEventQueue{}
+	queue.dequeued = make(map[uint64]struct{})
+	return queue
+}
+
+// memoryReplicationEventQueue is an implementation of the queue interface that keeps all events in memory.
+// The embedded mutex guards every field below; methods take the write lock before touching them.
+type memoryReplicationEventQueue struct {
+ sync.RWMutex
+ seq uint64 // last issued event ID; incremented to generate unique identifiers for events
+ queued []ReplicationEvent // events in insertion order; an event stays here until it is removed on acknowledgment
+ dequeued map[uint64]struct{} // IDs of all events dequeued, but not yet acknowledged
+}
+
+// nextID advances the internal sequence by one and returns the new value,
+// which serves as the ID of a newly enqueued event.
+// The caller must hold the lock.
+func (s *memoryReplicationEventQueue) nextID() uint64 {
+	next := s.seq + 1
+	s.seq = next
+	return next
+}
+
+// Enqueue stamps the event with its initial attempt count, the ready state and
+// the creation time, assigns it the next sequential ID and appends it to the queue.
+// It returns the event as it was stored; the error is always nil.
+func (s *memoryReplicationEventQueue) Enqueue(_ context.Context, event ReplicationEvent) (ReplicationEvent, error) {
+	// event.LockID is unnecessary with an in memory data store as it is intended to synchronize multiple praefect instances
+	event.Attempt, event.State, event.CreatedAt = 3, JobStateReady, time.Now().UTC()
+
+	s.Lock()
+	event.ID = s.nextID()
+	s.queued = append(s.queued, event)
+	s.Unlock()
+
+	return event, nil
+}
+
+// Dequeue returns at most count events targeted at nodeStorage that are in the
+// ready or failed state and still have attempts left. Events are picked in the
+// order they were enqueued. Each returned event has its attempt counter
+// decremented, is switched to the in-progress state, gets a fresh UpdatedAt and
+// is tracked as dequeued until it is acknowledged.
+// A non-positive count yields no events. The error is always nil.
+func (s *memoryReplicationEventQueue) Dequeue(_ context.Context, nodeStorage string, count int) ([]ReplicationEvent, error) {
+	if count <= 0 {
+		// nothing was requested; without this guard the loop below would still
+		// hand out one event, because the limit is only checked after appending
+		return nil, nil
+	}
+
+	s.Lock()
+	defer s.Unlock()
+
+	var result []ReplicationEvent
+	for i := range s.queued {
+		// work on the stored element directly so the changes are visible in the queue
+		event := &s.queued[i]
+
+		if event.Attempt <= 0 || event.Job.TargetNodeStorage != nodeStorage {
+			continue
+		}
+		if event.State != JobStateReady && event.State != JobStateFailed {
+			continue
+		}
+
+		updatedAt := time.Now().UTC()
+		event.Attempt--
+		event.State = JobStateInProgress
+		event.UpdatedAt = &updatedAt
+
+		s.dequeued[event.ID] = struct{}{}
+		result = append(result, *event)
+
+		if len(result) >= count {
+			break
+		}
+	}
+
+	return result, nil
+}
+
+// Acknowledge moves the events with the given ids into state and returns the
+// ids that were actually acknowledged. Only events that were dequeued and not
+// yet acknowledged are considered; all other ids are silently skipped.
+//
+// Completed and cancelled events are removed from the queue. Failed events are
+// removed only when they have no attempts left; otherwise they stay queued so
+// they can be dequeued again.
+//
+// An error is returned if state is not accepted by allowToAck, or if a tracked
+// event is unexpectedly not in the in-progress state.
+func (s *memoryReplicationEventQueue) Acknowledge(_ context.Context, state JobState, ids []uint64) ([]uint64, error) {
+	if len(ids) == 0 {
+		return nil, nil
+	}
+
+	if err := allowToAck(state); err != nil {
+		return nil, err
+	}
+
+	s.Lock()
+	defer s.Unlock()
+
+	var result []uint64
+	for _, id := range ids {
+		if _, found := s.dequeued[id]; !found {
+			// event was not dequeued from the queue, so it can't be acknowledged
+			continue
+		}
+
+		for i := range s.queued {
+			if s.queued[i].ID != id {
+				continue
+			}
+
+			if s.queued[i].State != JobStateInProgress {
+				return nil, fmt.Errorf("event not in progress, can't be acknowledged: %d [%s]", s.queued[i].ID, s.queued[i].State)
+			}
+
+			updatedAt := time.Now().UTC()
+			s.queued[i].State = state
+			s.queued[i].UpdatedAt = &updatedAt
+
+			result = append(result, id)
+
+			// The event is acknowledged now, so it must stop being tracked as
+			// "dequeued, but not yet acknowledged" even when it stays in the
+			// queue for another attempt. Otherwise a repeated id would hit the
+			// state check above and abort the whole call instead of being skipped.
+			delete(s.dequeued, id)
+
+			switch state {
+			case JobStateCompleted, JobStateCancelled:
+				// this event is fully processed (or given up on) and could be removed
+				s.remove(i)
+			case JobStateFailed:
+				if s.queued[i].Attempt == 0 {
+					// out of luck for this replication event, remove from queue as no more attempts available
+					s.remove(i)
+				}
+			}
+			// IDs are unique and the slice may have just been modified: stop scanning
+			break
+		}
+	}
+
+	return result, nil
+}
+
+// remove deletes i-th element from slice and from tracking map.
+// It doesn't check 'i' for the out of range and must be called with lock protection.
+func (s *memoryReplicationEventQueue) remove(i int) {
+	delete(s.dequeued, s.queued[i].ID)
+
+	last := len(s.queued) - 1
+	copy(s.queued[i:], s.queued[i+1:])
+	// Shifting leaves a stale copy of the last event in the backing array.
+	// Zero it so the pointers it holds (e.g. UpdatedAt) can be garbage collected.
+	s.queued[last] = ReplicationEvent{}
+	s.queued = s.queued[:last]
+}
diff --git a/internal/praefect/datastore/memory_test.go b/internal/praefect/datastore/memory_test.go
new file mode 100644
index 000000000..c55f9df50
--- /dev/null
+++ b/internal/praefect/datastore/memory_test.go
@@ -0,0 +1,208 @@
+package datastore
+
+import (
+ "reflect"
+ "sync"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+)
+
+// TestMemoryReplicationEventQueue drives a single in-memory queue through the
+// enqueue -> dequeue -> acknowledge cycle for two storages and checks the event
+// returned at every step. The steps share the queue state, so their order matters.
+func TestMemoryReplicationEventQueue(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ queue := NewMemoryReplicationEventQueue()
+
+ // operations on an empty queue return nothing and do not fail
+ noEvents, err := queue.Dequeue(ctx, "storage-1", 100500)
+ require.NoError(t, err)
+ require.Empty(t, noEvents, "no events as queue is empty")
+
+ noAcknowledged, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{1, 2, 3})
+ require.NoError(t, err)
+ require.Empty(t, noAcknowledged, "no acknowledged ids as queue is empty")
+
+ // first event: targeted at storage-1
+ job1 := ReplicationJob{
+ Change: UpdateRepo,
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "storage-1",
+ SourceNodeStorage: "storage-0",
+ Params: nil,
+ }
+
+ eventType1 := ReplicationEvent{Job: job1}
+
+ event1, err := queue.Enqueue(ctx, eventType1)
+ require.NoError(t, err)
+
+ // a freshly enqueued event gets the first ID, the ready state and 3 attempts
+ expEvent1 := ReplicationEvent{
+ ID: 1,
+ State: JobStateReady,
+ Attempt: 3,
+ LockID: "",
+ CreatedAt: event1.CreatedAt, // it is a hack to have same time for both
+ Job: job1,
+ }
+ require.Equal(t, expEvent1, event1)
+
+ // an event that was enqueued but never dequeued can't be acknowledged
+ notAcknowledged1, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{event1.ID})
+ require.NoError(t, err)
+ require.Empty(t, notAcknowledged1, "no acknowledged ids as events were not dequeued")
+
+ // second event: same repository, but targeted at storage-2
+ job2 := ReplicationJob{
+ Change: UpdateRepo,
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "storage-2",
+ SourceNodeStorage: "storage-0",
+ Params: nil,
+ }
+ eventType2 := ReplicationEvent{Job: job2}
+
+ event2, err := queue.Enqueue(ctx, eventType2)
+ require.NoError(t, err)
+
+ expEvent2 := ReplicationEvent{
+ ID: 2,
+ State: JobStateReady,
+ Attempt: 3,
+ LockID: "",
+ CreatedAt: event2.CreatedAt, // it is a hack to have same time for both
+ Job: job2,
+ }
+ require.Equal(t, expEvent2, event2)
+
+ // attempt 1 for storage-1: only event1 is returned, in progress, with one attempt consumed
+ dequeuedAttempt1, err := queue.Dequeue(ctx, "storage-1", 100500)
+ require.NoError(t, err)
+ require.Len(t, dequeuedAttempt1, 1, "only single event must be fetched for this storage")
+
+ expAttempt1 := ReplicationEvent{
+ ID: 1,
+ State: JobStateInProgress,
+ Attempt: 2,
+ LockID: "",
+ CreatedAt: event1.CreatedAt, // it is a hack to have same time for both
+ UpdatedAt: dequeuedAttempt1[0].UpdatedAt, // it is a hack to have same time for both
+ Job: job1,
+ }
+ require.Equal(t, expAttempt1, dequeuedAttempt1[0])
+
+ // event2 was not dequeued yet, so only event1 is acknowledged as failed
+ acknowledgedAttempt1, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{event1.ID, event2.ID})
+ require.NoError(t, err)
+ require.Equal(t, []uint64{event1.ID}, acknowledgedAttempt1, "one event must be acknowledged")
+
+ // attempt 2 for storage-1: the failed event is handed out again
+ dequeuedAttempt2, err := queue.Dequeue(ctx, "storage-1", 100500)
+ require.NoError(t, err)
+ require.Len(t, dequeuedAttempt2, 1, "only single event must be fetched for this storage")
+
+ expAttempt2 := ReplicationEvent{
+ ID: 1,
+ State: JobStateInProgress,
+ Attempt: 1,
+ LockID: "",
+ CreatedAt: event1.CreatedAt, // it is a hack to have same time for both
+ UpdatedAt: dequeuedAttempt2[0].UpdatedAt, // it is a hack to have same time for both
+ Job: job1,
+ }
+ require.Equal(t, expAttempt2, dequeuedAttempt2[0])
+
+ acknowledgedAttempt2, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{event1.ID})
+ require.NoError(t, err)
+ require.Equal(t, []uint64{event1.ID}, acknowledgedAttempt2, "one event must be acknowledged")
+
+ // attempt 3 for storage-1: the last available attempt
+ dequeuedAttempt3, err := queue.Dequeue(ctx, "storage-1", 100500)
+ require.NoError(t, err)
+ require.Len(t, dequeuedAttempt3, 1, "only single event must be fetched for this storage")
+
+ expAttempt3 := ReplicationEvent{
+ ID: 1,
+ State: JobStateInProgress,
+ Attempt: 0,
+ LockID: "",
+ CreatedAt: event1.CreatedAt, // it is a hack to have same time for both
+ UpdatedAt: dequeuedAttempt3[0].UpdatedAt, // it is a hack to have same time for both
+ Job: job1,
+ }
+ require.Equal(t, expAttempt3, dequeuedAttempt3[0])
+
+ acknowledgedAttempt3, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{event1.ID})
+ require.NoError(t, err)
+ require.Equal(t, []uint64{event1.ID}, acknowledgedAttempt3, "one event must be acknowledged")
+
+ // after three failed attempts nothing is left for storage-1
+ dequeuedAttempt4, err := queue.Dequeue(ctx, "storage-1", 100500)
+ require.NoError(t, err)
+ require.Empty(t, dequeuedAttempt4, "all attempts to process job were used")
+
+ // storage-2: event2 was untouched so far, this is its first attempt
+ dequeuedAttempt5, err := queue.Dequeue(ctx, "storage-2", 100500)
+ require.NoError(t, err)
+ require.Len(t, dequeuedAttempt5, 1, "only single event must be fetched for this storage")
+
+ expAttempt5 := ReplicationEvent{
+ ID: 2,
+ State: JobStateInProgress,
+ Attempt: 2,
+ LockID: "",
+ CreatedAt: event2.CreatedAt, // it is a hack to have same time for both
+ UpdatedAt: dequeuedAttempt5[0].UpdatedAt, // it is a hack to have same time for both
+ Job: job2,
+ }
+ require.Equal(t, expAttempt5, dequeuedAttempt5[0])
+
+ // a completed event is not handed out again
+ acknowledgedAttempt5, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{event2.ID})
+ require.NoError(t, err)
+ require.Equal(t, []uint64{event2.ID}, acknowledgedAttempt5, "one event must be acknowledged")
+
+ dequeuedAttempt6, err := queue.Dequeue(ctx, "storage-2", 100500)
+ require.NoError(t, err)
+ require.Empty(t, dequeuedAttempt6, "all jobs marked as completed for this storage")
+}
+
+func TestMemoryReplicationEventQueue_ConcurrentAccess(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ queue := NewMemoryReplicationEventQueue()
+
+ job1 := ReplicationJob{
+ Change: UpdateRepo,
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "storage-1",
+ SourceNodeStorage: "storage-0",
+ }
+
+ job2 := ReplicationJob{
+ Change: UpdateRepo,
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "storage-2",
+ SourceNodeStorage: "storage-0",
+ }
+
+ eventType1 := ReplicationEvent{Job: job1}
+ eventType2 := ReplicationEvent{Job: job2}
+
+ var checkScenario = func(wg *sync.WaitGroup, event ReplicationEvent, state JobState) {
+ defer wg.Done()
+
+ created, err := queue.Enqueue(ctx, event)
+ require.NoError(t, err)
+
+ dequeued, err := queue.Dequeue(ctx, created.Job.TargetNodeStorage, 100500)
+ require.NoError(t, err)
+ require.Len(t, dequeued, 1)
+ require.Equal(t, created.Job, dequeued[0].Job)
+
+ ackIDs, err := queue.Acknowledge(ctx, state, []uint64{created.ID})
+ require.NoError(t, err)
+ require.Len(t, ackIDs, 1)
+ require.Equal(t, created.ID, ackIDs[0])
+
+ nothing, err := queue.Dequeue(ctx, created.Job.TargetNodeStorage, 100500)
+ require.NoError(t, err)
+ require.Len(t, nothing, 0)
+ }
+
+ wg := &sync.WaitGroup{}
+ wg.Add(2)
+ go checkScenario(wg, eventType1, JobStateCompleted)
+ go checkScenario(wg, eventType2, JobStateCancelled)
+ wg.Wait()
+}
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index 60888af73..42453f3a4 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -23,6 +23,15 @@ type ReplicationEventQueue interface {
Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error)
}
+// allowToAck reports whether events may be acknowledged with the given state.
+// It returns nil for the completed, failed and cancelled states and an error
+// for any other state.
+func allowToAck(state JobState) error {
+	if state == JobStateCompleted || state == JobStateFailed || state == JobStateCancelled {
+		return nil
+	}
+
+	return fmt.Errorf("event state is not supported: %q", state)
+}
+
// ReplicationJob is a persistent representation of the replication job.
type ReplicationJob struct {
Change ChangeType `json:"change"`
@@ -214,6 +223,10 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J
return nil, nil
}
+ if err := allowToAck(state); err != nil {
+ return nil, err
+ }
+
params := glsql.NewParamsAssembler()
query := `
WITH existing AS (