author    | Pavlo Strokov <pstrokov@gitlab.com> | 2020-07-15 13:33:50 +0300
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2020-07-15 13:33:50 +0300
commit    | b20de222e88a56b6d2697285745a3273f4ac5fa3
tree      | 04b00512a77af8afd5a74291d983b49cd6c42f1e
parent    | ea5ad320cd282aa17267887e40de22ae01b6aa03
Praefect: handling of stale replication jobs
New background job that moves stale replication events to the next
state: 'failed' or 'dead'. An event is considered stale if its
processing takes too long without any health-pings.
A health-ping is reflected by the 'triggered_at' column of the
'replication_queue_job_lock' table, which only has an entry while
the replication event is in the 'in_progress' state.
The check runs periodically and starts together with the service.
It covers all storages at once and stops when the passed-in
context is cancelled/expired.
Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/2873
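
The shape of the check is easier to see outside the raw diff. Below is a minimal, self-contained sketch of the loop the commit message describes, distilled from the `ReplMgr.ProcessStale` implementation in the diff that follows. The package name `replication`, the `staleAcknowledger` interface, and the `processStale` function are illustrative stand-ins; the real method on the queue is `ReplicationEventQueue.AcknowledgeStale`, and Praefect starts the real loop with a 30-second check period and a one-minute staleness threshold.

```go
// Sketch only: a distilled version of the periodic stale-job check added by this commit.
package replication

import (
	"context"
	"log"
	"time"
)

// staleAcknowledger stands in for the AcknowledgeStale method added to ReplicationEventQueue.
type staleAcknowledger interface {
	AcknowledgeStale(ctx context.Context, staleAfter time.Duration) error
}

// processStale runs the check every checkPeriod until ctx is cancelled or expires.
// A single AcknowledgeStale call covers all virtual storages and storages at once.
func processStale(ctx context.Context, queue staleAcknowledger, checkPeriod, staleAfter time.Duration) {
	t := time.NewTimer(checkPeriod)
	defer t.Stop()

	for {
		select {
		case <-t.C:
			if err := queue.AcknowledgeStale(ctx, staleAfter); err != nil {
				log.Printf("acknowledging stale replication jobs: %v", err)
			}
			t.Reset(checkPeriod)
		case <-ctx.Done():
			return
		}
	}
}
```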
-rw-r--r-- | changelogs/unreleased/ps-process-stale-replication.yml |   5
-rw-r--r-- | cmd/praefect/main.go                                    |   1
-rw-r--r-- | internal/praefect/datastore/memory.go                   |  20
-rw-r--r-- | internal/praefect/datastore/queue.go                    |  45
-rw-r--r-- | internal/praefect/datastore/queue_test.go               | 112
-rw-r--r-- | internal/praefect/replicator.go                         |  27
-rw-r--r-- | internal/praefect/replicator_test.go                    |  41
7 files changed, 250 insertions, 1 deletions
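
As a reading aid before the diff itself: the core of the change is the `AcknowledgeStale` query in internal/praefect/datastore/queue.go, and the decision it encodes can be restated as plain Go. The snippet below is hypothetical and not part of the commit; the package name `stalerule`, the helpers `isStale` and `nextState`, and the local `JobState` type are stand-ins for the types defined in the datastore package shown in the diff.

```go
// Hypothetical restatement of the rule the AcknowledgeStale SQL below encodes.
package stalerule

import "time"

// JobState is a local stand-in for datastore.JobState.
type JobState string

const (
	JobStateFailed JobState = "failed"
	JobStateDead   JobState = "dead"
)

// isStale mirrors the DELETE condition
//   triggered_at < NOW() - INTERVAL '1 MILLISECOND' * $1
// where $1 is staleAfter.Milliseconds(): the job's last health-ping is older than staleAfter.
func isStale(triggeredAt, now time.Time, staleAfter time.Duration) bool {
	return triggeredAt.Before(now.Add(-staleAfter))
}

// nextState mirrors the UPDATE expression
//   CASE WHEN attempt >= 1 THEN 'failed' ELSE 'dead' END
// Attempt is decremented when a job is dequeued, so it counts the attempts that remain.
func nextState(attempt int) JobState {
	if attempt >= 1 {
		return JobStateFailed // retries remain, the job can be picked up again
	}
	return JobStateDead // no retries left, the job is abandoned
}
```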
diff --git a/changelogs/unreleased/ps-process-stale-replication.yml b/changelogs/unreleased/ps-process-stale-replication.yml
new file mode 100644
index 000000000..5f7a43006
--- /dev/null
+++ b/changelogs/unreleased/ps-process-stale-replication.yml
@@ -0,0 +1,5 @@
+---
+title: 'Praefect: handling of stale replication jobs'
+merge_request: 2322
+author:
+type: added
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 0f42edc77..efd65e91d 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -316,6 +316,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
 	}
 
 	repl.ProcessBacklog(ctx, praefect.ExpBackoffFunc(1*time.Second, 5*time.Second))
+	repl.ProcessStale(ctx, 30*time.Second, time.Minute)
 
 	return b.Wait(conf.GracefulStopTimeout.Duration())
 }
diff --git a/internal/praefect/datastore/memory.go b/internal/praefect/datastore/memory.go
index be3cfc37c..d2c981e5c 100644
--- a/internal/praefect/datastore/memory.go
+++ b/internal/praefect/datastore/memory.go
@@ -263,6 +263,12 @@ func (s *memoryReplicationEventQueue) StartHealthUpdate(context.Context, <-chan
 	return nil
 }
 
+func (s *memoryReplicationEventQueue) AcknowledgeStale(context.Context, time.Duration) error {
+	// this implementation has no problem of stale replication events as it has no information about
+	// job processing after restart of the application
+	return nil
+}
+
 // remove deletes i-th element from the queue and from the in-flight tracking map.
 // It doesn't check 'i' for the out of range and must be called with lock protection.
 func (s *memoryReplicationEventQueue) remove(i int) {
@@ -286,6 +292,8 @@ type ReplicationEventQueueInterceptor interface {
 	OnAcknowledge(func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error))
 	// OnStartHealthUpdate allows to set action that would be executed each time when `StartHealthUpdate` method called.
 	OnStartHealthUpdate(func(context.Context, <-chan time.Time, []ReplicationEvent) error)
+	// OnAcknowledgeStale allows to set action that would be executed each time when `AcknowledgeStale` method called.
+	OnAcknowledgeStale(func(context.Context, time.Duration) error)
 }
 
 // NewReplicationEventQueueInterceptor returns interception over `ReplicationEventQueue` interface.
@@ -299,6 +307,7 @@ type replicationEventQueueInterceptor struct {
 	onDequeue           func(context.Context, string, string, int, ReplicationEventQueue) ([]ReplicationEvent, error)
 	onAcknowledge       func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error)
 	onStartHealthUpdate func(context.Context, <-chan time.Time, []ReplicationEvent) error
+	onAcknowledgeStale  func(context.Context, time.Duration) error
 }
 
 func (i *replicationEventQueueInterceptor) OnEnqueue(action func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error)) {
@@ -317,6 +326,10 @@ func (i *replicationEventQueueInterceptor) OnStartHealthUpdate(action func(conte
 	i.onStartHealthUpdate = action
 }
 
+func (i *replicationEventQueueInterceptor) OnAcknowledgeStale(action func(context.Context, time.Duration) error) {
+	i.onAcknowledgeStale = action
+}
+
 func (i *replicationEventQueueInterceptor) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) {
 	if i.onEnqueue != nil {
 		return i.onEnqueue(ctx, event, i.ReplicationEventQueue)
@@ -344,3 +357,10 @@ func (i *replicationEventQueueInterceptor) StartHealthUpdate(ctx context.Context
 	}
 	return i.ReplicationEventQueue.StartHealthUpdate(ctx, trigger, events)
 }
+
+func (i *replicationEventQueueInterceptor) AcknowledgeStale(ctx context.Context, staleAfter time.Duration) error {
+	if i.onAcknowledgeStale != nil {
+		return i.onAcknowledgeStale(ctx, staleAfter)
+	}
+	return i.ReplicationEventQueue.AcknowledgeStale(ctx, staleAfter)
+}
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index e64a84e92..8d1d6e941 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -38,6 +38,11 @@ type ReplicationEventQueue interface {
 	// The health update will be executed on each new entry received from trigger channel passed in.
 	// It is a blocking call that is managed by the passed in context.
 	StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error
+	// AcknowledgeStale moves replication events that are 'in_progress' state for too long (more than staleAfter)
+	// into the next state:
+	// 'failed' - in case it has more attempts to be executed
+	// 'dead' - in case it has no more attempts to be executed
+	AcknowledgeStale(ctx context.Context, staleAfter time.Duration) error
 }
 
 func allowToAck(state JobState) error {
@@ -442,3 +447,43 @@ func (rq PostgresReplicationEventQueue) StartHealthUpdate(ctx context.Context, t
 		}
 	}
 }
+
+// AcknowledgeStale moves replication events that are 'in_progress' state for too long (more then staleAfter)
+// into the next state:
+// 'failed' - in case it has more attempts to be executed
+// 'dead' - in case it has no more attempts to be executed
+// The job considered 'in_progress' if it has corresponding entry in the 'replication_queue_job_lock' table.
+// When moving from 'in_progress' to other state the entry from 'replication_queue_job_lock' table will be
+// removed and entry in the 'replication_queue_lock' will be updated if needed (release of the lock).
+func (rq PostgresReplicationEventQueue) AcknowledgeStale(ctx context.Context, staleAfter time.Duration) error {
+	query := `
+		WITH stale_job_lock AS (
+			DELETE FROM replication_queue_job_lock WHERE triggered_at < NOW() - INTERVAL '1 MILLISECOND' * $1
+			RETURNING job_id, lock_id
+		)
+		, update_job AS (
+			UPDATE replication_queue AS queue
+			SET state = (CASE WHEN attempt >= 1 THEN 'failed' ELSE 'dead' END)::REPLICATION_JOB_STATE
+			FROM stale_job_lock
+			WHERE stale_job_lock.job_id = queue.id
+			RETURNING queue.id, queue.lock_id
+		)
+		UPDATE replication_queue_lock
+		SET acquired = FALSE
+		WHERE id IN (
+			SELECT existing.lock_id
+			FROM (SELECT lock_id, COUNT(*) AS amount FROM stale_job_lock GROUP BY lock_id) AS removed
+			JOIN (
+				SELECT lock_id, COUNT(*) AS amount
+				FROM replication_queue_job_lock
+				WHERE lock_id IN (SELECT lock_id FROM stale_job_lock)
+				GROUP BY lock_id
+			) AS existing ON removed.lock_id = existing.lock_id AND removed.amount = existing.amount
+		)`
+	_, err := rq.qc.ExecContext(ctx, query, staleAfter.Milliseconds())
+	if err != nil {
+		return fmt.Errorf("exec acknowledge stale: %w", err)
+	}
+
+	return nil
+}
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index 06cdfebd3..c8be612e8 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -975,6 +975,118 @@ func TestPostgresReplicationEventQueue_StartHealthUpdate(t *testing.T) {
 	})
 }
 
+func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) {
+	db := getDB(t)
+
+	ctx, cancel := testhelper.Context()
+	defer cancel()
+
+	source := PostgresReplicationEventQueue{qc: db}
+
+	eventType := ReplicationEvent{Job: ReplicationJob{
+		Change:            UpdateRepo,
+		RelativePath:      "/project/path-1",
+		TargetNodeStorage: "gitaly-1",
+		SourceNodeStorage: "gitaly-0",
+		VirtualStorage:    "praefect-1",
+	}}
+
+	eventType1 := eventType
+
+	eventType2 := eventType
+	eventType2.Job.VirtualStorage = "praefect-2"
+
+	eventType3 := eventType2
+	eventType3.Job.RelativePath = "/project/path-2"
+	eventType3.Job.TargetNodeStorage = "gitaly-2"
+
+	eventType4 := eventType3
+	eventType4.Job.TargetNodeStorage = "gitaly-3"
+
+	t.Run("no stale jobs yet", func(t *testing.T) {
+		db.TruncateAll(t)
+
+		event, err := source.Enqueue(ctx, eventType1)
+		require.NoError(t, err)
+
+		devents, err := source.Dequeue(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, 1)
+		require.NoError(t, err)
+
+		// events triggered just now (< 1 sec ago), so nothing considered stale
+		require.NoError(t, source.AcknowledgeStale(ctx, time.Second))
+		requireEvents(t, ctx, db, devents)
+	})
+
+	t.Run("jobs considered stale only at 'in_progress' state", func(t *testing.T) {
+		db.TruncateAll(t)
+
+		// move event to 'ready' state
+		event1, err := source.Enqueue(ctx, eventType1)
+		require.NoError(t, err)
+
+		// move event to 'failed' state
+		event2, err := source.Enqueue(ctx, eventType2)
+		require.NoError(t, err)
+		devents2, err := source.Dequeue(ctx, event2.Job.VirtualStorage, event2.Job.TargetNodeStorage, 1)
+		require.NoError(t, err)
+		require.Equal(t, event2.ID, devents2[0].ID)
+		_, err = source.Acknowledge(ctx, JobStateFailed, []uint64{devents2[0].ID})
+		require.NoError(t, err)
+
+		// move event to 'dead' state
+		event3, err := source.Enqueue(ctx, eventType3)
+		require.NoError(t, err)
+		devents3, err := source.Dequeue(ctx, event3.Job.VirtualStorage, event3.Job.TargetNodeStorage, 1)
+		require.NoError(t, err)
+		require.Equal(t, event3.ID, devents3[0].ID)
+		_, err = source.Acknowledge(ctx, JobStateDead, []uint64{devents3[0].ID})
+		require.NoError(t, err)
+
+		event4, err := source.Enqueue(ctx, eventType4)
+		require.NoError(t, err)
+		devents4, err := source.Dequeue(ctx, event4.Job.VirtualStorage, event4.Job.TargetNodeStorage, 1)
+		require.NoError(t, err)
+
+		require.NoError(t, source.AcknowledgeStale(ctx, time.Microsecond))
+
+		devents2[0].State = JobStateFailed
+		devents3[0].State = JobStateDead
+		devents4[0].Attempt = 2
+		devents4[0].State = JobStateFailed
+		requireEvents(t, ctx, db, []ReplicationEvent{event1, devents2[0], devents3[0], devents4[0]})
+	})
+
+	t.Run("stale jobs updated for all virtual storages and storages at once", func(t *testing.T) {
+		db.TruncateAll(t)
+
+		var exp []ReplicationEvent
+		for _, eventType := range []ReplicationEvent{eventType1, eventType2, eventType3} {
+			event, err := source.Enqueue(ctx, eventType)
+			require.NoError(t, err)
+			devents, err := source.Dequeue(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, 1)
+			require.NoError(t, err)
+			exp = append(exp, devents...)
+		}
+
+		for event, i := exp[0], 0; i < 2; i++ { // consume all processing attempts to verify that state will be changed to 'dead'
+			_, err := source.Acknowledge(ctx, JobStateFailed, []uint64{event.ID})
+			require.NoError(t, err)
+			_, err = source.Dequeue(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, 1)
+			require.NoError(t, err)
+		}
+
+		require.NoError(t, source.AcknowledgeStale(ctx, time.Microsecond))
+
+		exp[0].State = JobStateDead
+		exp[0].Attempt = 0
+		for i := range exp[1:] {
+			exp[1+i].State = JobStateFailed
+		}
+
+		requireEvents(t, ctx, db, exp)
+	})
+}
+
 func requireEvents(t *testing.T, ctx context.Context, db glsql.DB, expected []ReplicationEvent) {
 	t.Helper()
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index a52faad23..82de3ca28 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -331,7 +331,7 @@ func WithReplicator(r Replicator) ReplMgrOpt {
 const (
 	logWithReplJobID  = "replication_job_id"
 	logWithReplTarget = "replication_job_target"
-	logWithCorrID     = "replication_correlation_id"
+	logWithCorrID     = "correlation_id"
 )
 
 type backoff func() time.Duration
@@ -376,6 +376,31 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFunc) {
 	}
 }
 
+// ProcessStale starts a background process to acknowledge stale replication jobs.
+// It will process jobs until ctx is Done.
+func (r ReplMgr) ProcessStale(ctx context.Context, checkPeriod, staleAfter time.Duration) chan struct{} {
+	done := make(chan struct{})
+
+	go func() {
+		defer close(done)
+
+		t := time.NewTimer(checkPeriod)
+		for {
+			select {
+			case <-t.C:
+				if err := r.queue.AcknowledgeStale(ctx, staleAfter); err != nil {
+					r.log.WithError(err).Error("background periodical acknowledgement for stale replication jobs")
+				}
+				t.Reset(checkPeriod)
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	return done
+}
+
 func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStorage string) {
 	logger := r.log.WithField("virtual_storage", virtualStorage)
 	backoff, reset := b()
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 0987a4043..8a8af2b30 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -14,6 +14,9 @@ import (
 	"github.com/golang/protobuf/proto"
 	"github.com/prometheus/client_golang/prometheus/testutil"
+	"github.com/sirupsen/logrus"
+	"github.com/sirupsen/logrus/hooks/test"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
 	gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config"
@@ -871,3 +874,41 @@ func TestSubtractUint64(t *testing.T) {
 		})
 	}
 }
+
+func TestReplMgr_ProcessStale(t *testing.T) {
+	logger := testhelper.DiscardTestLogger(t)
+	hook := test.NewLocal(logger)
+
+	queue := datastore.NewReplicationEventQueueInterceptor(nil)
+	mgr := NewReplMgr(logger.WithField("test", t.Name()), nil, queue, nil)
+
+	var counter int32
+	queue.OnAcknowledgeStale(func(ctx context.Context, duration time.Duration) error {
+		counter++
+		if counter > 2 {
+			return assert.AnError
+		}
+		return nil
+	})
+
+	ctx, cancel := testhelper.Context()
+	defer cancel()
+
+	ctx, cancel = context.WithTimeout(ctx, 350*time.Millisecond)
+	defer cancel()
+
+	done := mgr.ProcessStale(ctx, 100*time.Millisecond, time.Second)
+
+	select {
+	case <-time.After(time.Second):
+		require.FailNow(t, "execution had stuck")
+	case <-done:
+	}
+
+	require.Equal(t, int32(3), counter)
+	require.Len(t, hook.Entries, 1)
+	require.Equal(t, logrus.ErrorLevel, hook.LastEntry().Level)
+	require.Equal(t, "background periodical acknowledgement for stale replication jobs", hook.LastEntry().Message)
+	require.Equal(t, "replication_manager", hook.LastEntry().Data["component"])
+	require.Equal(t, assert.AnError, hook.LastEntry().Data["error"])
+}