diff options
author | Paul Okstad <pokstad@gitlab.com> | 2020-03-19 07:00:27 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2020-03-19 07:00:27 +0300 |
commit | e5980ffdbb7e9d5dcb13648248de0630fb201a56 (patch) | |
tree | e3d26bae9f34f403dfaa4268aa39cc1565f5ec83 | |
parent | 9f5a7f456d3f62d608dd59a2a19a25cc2e7f3f34 (diff) | |
parent | 20ba20be8be4fc97f7ba5b1c3b25615349bdce9e (diff) |
Merge branch 'ps-in-memory-replication-event-queue' into 'master'
Praefect: in-memory replication event queue
See merge request gitlab-org/gitaly!1908
-rw-r--r-- | internal/praefect/datastore/memory.go | 133 | ||||
-rw-r--r-- | internal/praefect/datastore/memory_test.go | 208 | ||||
-rw-r--r-- | internal/praefect/datastore/queue.go | 13 |
3 files changed, 354 insertions, 0 deletions
diff --git a/internal/praefect/datastore/memory.go b/internal/praefect/datastore/memory.go new file mode 100644 index 000000000..e5408e6f0 --- /dev/null +++ b/internal/praefect/datastore/memory.go @@ -0,0 +1,133 @@ +package datastore + +import ( + "context" + "fmt" + "sync" + "time" +) + +// NewMemoryReplicationEventQueue return in-memory implementation of the ReplicationEventQueue. +func NewMemoryReplicationEventQueue() ReplicationEventQueue { + return &memoryReplicationEventQueue{dequeued: map[uint64]struct{}{}} +} + +// memoryReplicationEventQueue implements queue interface with in-memory implementation of storage +type memoryReplicationEventQueue struct { + sync.RWMutex + seq uint64 // used to generate unique identifiers for events + queued []ReplicationEvent // all new events stored as queue + dequeued map[uint64]struct{} // all events dequeued, but not yet acknowledged +} + +// nextID returns a new sequential ID for new events. +// Needs to be called with lock protection. +func (s *memoryReplicationEventQueue) nextID() uint64 { + s.seq++ + return s.seq +} + +func (s *memoryReplicationEventQueue) Enqueue(_ context.Context, event ReplicationEvent) (ReplicationEvent, error) { + event.Attempt = 3 + event.State = JobStateReady + event.CreatedAt = time.Now().UTC() + // event.LockID is unnecessary with an in memory data store as it is intended to synchronize multiple praefect instances + + s.Lock() + defer s.Unlock() + event.ID = s.nextID() + s.queued = append(s.queued, event) + return event, nil +} + +func (s *memoryReplicationEventQueue) Dequeue(_ context.Context, nodeStorage string, count int) ([]ReplicationEvent, error) { + s.Lock() + defer s.Unlock() + + var result []ReplicationEvent + for i := 0; i < len(s.queued); i++ { + event := s.queued[i] + + hasMoreAttempts := event.Attempt > 0 + isForTargetStorage := event.Job.TargetNodeStorage == nodeStorage + isReadyOrFailed := event.State == JobStateReady || event.State == JobStateFailed + + if hasMoreAttempts && isForTargetStorage && isReadyOrFailed { + updatedAt := time.Now().UTC() + event.Attempt-- + event.State = JobStateInProgress + event.UpdatedAt = &updatedAt + + s.queued[i] = event + s.dequeued[event.ID] = struct{}{} + result = append(result, event) + + if len(result) >= count { + break + } + } + } + + return result, nil +} + +func (s *memoryReplicationEventQueue) Acknowledge(_ context.Context, state JobState, ids []uint64) ([]uint64, error) { + if len(ids) == 0 { + return nil, nil + } + + if err := allowToAck(state); err != nil { + return nil, err + } + + s.Lock() + defer s.Unlock() + + var result []uint64 + for _, id := range ids { + if _, found := s.dequeued[id]; !found { + // event was not dequeued from the queue, so it can't be acknowledged + continue + } + + for i := 0; i < len(s.queued); i++ { + if s.queued[i].ID != id { + continue + } + + if s.queued[i].State != JobStateInProgress { + return nil, fmt.Errorf("event not in progress, can't be acknowledged: %d [%s]", s.queued[i].ID, s.queued[i].State) + } + + updatedAt := time.Now().UTC() + s.queued[i].State = state + s.queued[i].UpdatedAt = &updatedAt + + result = append(result, id) + + switch state { + case JobStateCompleted: + // this event is fully processed and could be removed + s.remove(i) + case JobStateFailed: + if s.queued[i].Attempt == 0 { + // out of luck for this replication event, remove from queue as no more attempts available + s.remove(i) + } + case JobStateCancelled: + // out of luck for this replication event, remove from queue as no more attempts available + s.remove(i) + } + break + } + } + + return result, nil +} + +// remove deletes i-th element from slice and from tracking map. +// It doesn't check 'i' for the out of range and must be called with lock protection. +func (s *memoryReplicationEventQueue) remove(i int) { + delete(s.dequeued, s.queued[i].ID) + s.queued = append(s.queued[:i], s.queued[i+1:]...) +} diff --git a/internal/praefect/datastore/memory_test.go b/internal/praefect/datastore/memory_test.go new file mode 100644 index 000000000..c55f9df50 --- /dev/null +++ b/internal/praefect/datastore/memory_test.go @@ -0,0 +1,208 @@ +package datastore + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" +) + +func TestMemoryReplicationEventQueue(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + queue := NewMemoryReplicationEventQueue() + + noEvents, err := queue.Dequeue(ctx, "storage-1", 100500) + require.NoError(t, err) + require.Empty(t, noEvents, "no events as queue is empty") + + noAcknowledged, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{1, 2, 3}) + require.NoError(t, err) + require.Empty(t, noAcknowledged, "no acknowledged ids as queue is empty") + + job1 := ReplicationJob{ + Change: UpdateRepo, + RelativePath: "/project/path-1", + TargetNodeStorage: "storage-1", + SourceNodeStorage: "storage-0", + Params: nil, + } + + eventType1 := ReplicationEvent{Job: job1} + + event1, err := queue.Enqueue(ctx, eventType1) + require.NoError(t, err) + + expEvent1 := ReplicationEvent{ + ID: 1, + State: JobStateReady, + Attempt: 3, + LockID: "", + CreatedAt: event1.CreatedAt, // it is a hack to have same time for both + Job: job1, + } + require.Equal(t, expEvent1, event1) + + notAcknowledged1, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{event1.ID}) + require.NoError(t, err) + require.Empty(t, notAcknowledged1, "no acknowledged ids as events were not dequeued") + + job2 := ReplicationJob{ + Change: UpdateRepo, + RelativePath: "/project/path-1", + TargetNodeStorage: "storage-2", + SourceNodeStorage: "storage-0", + Params: nil, + } + eventType2 := ReplicationEvent{Job: job2} + + event2, err := queue.Enqueue(ctx, eventType2) + require.NoError(t, err) + + expEvent2 := ReplicationEvent{ + ID: 2, + State: JobStateReady, + Attempt: 3, + LockID: "", + CreatedAt: event2.CreatedAt, // it is a hack to have same time for both + Job: job2, + } + require.Equal(t, expEvent2, event2) + + dequeuedAttempt1, err := queue.Dequeue(ctx, "storage-1", 100500) + require.NoError(t, err) + require.Len(t, dequeuedAttempt1, 1, "only single event must be fetched for this storage") + + expAttempt1 := ReplicationEvent{ + ID: 1, + State: JobStateInProgress, + Attempt: 2, + LockID: "", + CreatedAt: event1.CreatedAt, // it is a hack to have same time for both + UpdatedAt: dequeuedAttempt1[0].UpdatedAt, // it is a hack to have same time for both + Job: job1, + } + require.Equal(t, expAttempt1, dequeuedAttempt1[0]) + + acknowledgedAttempt1, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{event1.ID, event2.ID}) + require.NoError(t, err) + require.Equal(t, []uint64{event1.ID}, acknowledgedAttempt1, "one event must be acknowledged") + + dequeuedAttempt2, err := queue.Dequeue(ctx, "storage-1", 100500) + require.NoError(t, err) + require.Len(t, dequeuedAttempt2, 1, "only single event must be fetched for this storage") + + expAttempt2 := ReplicationEvent{ + ID: 1, + State: JobStateInProgress, + Attempt: 1, + LockID: "", + CreatedAt: event1.CreatedAt, // it is a hack to have same time for both + UpdatedAt: dequeuedAttempt2[0].UpdatedAt, // it is a hack to have same time for both + Job: job1, + } + require.Equal(t, expAttempt2, dequeuedAttempt2[0]) + + acknowledgedAttempt2, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{event1.ID}) + require.NoError(t, err) + require.Equal(t, []uint64{event1.ID}, acknowledgedAttempt2, "one event must be acknowledged") + + dequeuedAttempt3, err := queue.Dequeue(ctx, "storage-1", 100500) + require.NoError(t, err) + require.Len(t, dequeuedAttempt3, 1, "only single event must be fetched for this storage") + + expAttempt3 := ReplicationEvent{ + ID: 1, + State: JobStateInProgress, + Attempt: 0, + LockID: "", + CreatedAt: event1.CreatedAt, // it is a hack to have same time for both + UpdatedAt: dequeuedAttempt3[0].UpdatedAt, // it is a hack to have same time for both + Job: job1, + } + require.Equal(t, expAttempt3, dequeuedAttempt3[0]) + + acknowledgedAttempt3, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{event1.ID}) + require.NoError(t, err) + require.Equal(t, []uint64{event1.ID}, acknowledgedAttempt3, "one event must be acknowledged") + + dequeuedAttempt4, err := queue.Dequeue(ctx, "storage-1", 100500) + require.NoError(t, err) + require.Empty(t, dequeuedAttempt4, "all attempts to process job were used") + + dequeuedAttempt5, err := queue.Dequeue(ctx, "storage-2", 100500) + require.NoError(t, err) + require.Len(t, dequeuedAttempt5, 1, "only single event must be fetched for this storage") + + expAttempt5 := ReplicationEvent{ + ID: 2, + State: JobStateInProgress, + Attempt: 2, + LockID: "", + CreatedAt: event2.CreatedAt, // it is a hack to have same time for both + UpdatedAt: dequeuedAttempt5[0].UpdatedAt, // it is a hack to have same time for both + Job: job2, + } + require.Equal(t, expAttempt5, dequeuedAttempt5[0]) + + acknowledgedAttempt5, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{event2.ID}) + require.NoError(t, err) + require.Equal(t, []uint64{event2.ID}, acknowledgedAttempt5, "one event must be acknowledged") + + dequeuedAttempt6, err := queue.Dequeue(ctx, "storage-2", 100500) + require.NoError(t, err) + require.Empty(t, dequeuedAttempt6, "all jobs marked as completed for this storage") +} + +func TestMemoryReplicationEventQueue_ConcurrentAccess(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + queue := NewMemoryReplicationEventQueue() + + job1 := ReplicationJob{ + Change: UpdateRepo, + RelativePath: "/project/path-1", + TargetNodeStorage: "storage-1", + SourceNodeStorage: "storage-0", + } + + job2 := ReplicationJob{ + Change: UpdateRepo, + RelativePath: "/project/path-1", + TargetNodeStorage: "storage-2", + SourceNodeStorage: "storage-0", + } + + eventType1 := ReplicationEvent{Job: job1} + eventType2 := ReplicationEvent{Job: job2} + + var checkScenario = func(wg *sync.WaitGroup, event ReplicationEvent, state JobState) { + defer wg.Done() + + created, err := queue.Enqueue(ctx, event) + require.NoError(t, err) + + dequeued, err := queue.Dequeue(ctx, created.Job.TargetNodeStorage, 100500) + require.NoError(t, err) + require.Len(t, dequeued, 1) + require.Equal(t, created.Job, dequeued[0].Job) + + ackIDs, err := queue.Acknowledge(ctx, state, []uint64{created.ID}) + require.NoError(t, err) + require.Len(t, ackIDs, 1) + require.Equal(t, created.ID, ackIDs[0]) + + nothing, err := queue.Dequeue(ctx, created.Job.TargetNodeStorage, 100500) + require.NoError(t, err) + require.Len(t, nothing, 0) + } + + wg := &sync.WaitGroup{} + wg.Add(2) + go checkScenario(wg, eventType1, JobStateCompleted) + go checkScenario(wg, eventType2, JobStateCancelled) + wg.Wait() +} diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index 60888af73..42453f3a4 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -23,6 +23,15 @@ type ReplicationEventQueue interface { Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error) } +func allowToAck(state JobState) error { + switch state { + case JobStateCompleted, JobStateFailed, JobStateCancelled: + return nil + default: + return fmt.Errorf("event state is not supported: %q", state) + } +} + // ReplicationJob is a persistent representation of the replication job. type ReplicationJob struct { Change ChangeType `json:"change"` @@ -214,6 +223,10 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J return nil, nil } + if err := allowToAck(state); err != nil { + return nil, err + } + params := glsql.NewParamsAssembler() query := ` WITH existing AS ( |