gitlab.com/gitlab-org/gitaly.git
author     Pavlo Strokov <pstrokov@gitlab.com>  2020-07-15 13:33:50 +0300
committer  Pavlo Strokov <pstrokov@gitlab.com>  2020-07-15 13:33:50 +0300
commit     b20de222e88a56b6d2697285745a3273f4ac5fa3 (patch)
tree       04b00512a77af8afd5a74291d983b49cd6c42f1e
parent     ea5ad320cd282aa17267887e40de22ae01b6aa03 (diff)
Praefect: handling of stale replication jobs
New background job to move stale replication events to the next state: 'failed' or 'dead'. An event is considered stale if its processing takes too long without any health pings. A health ping is recorded in the 'triggered_at' column of the 'replication_queue_job_lock' table, which only has an entry while a replication event is in the 'in_progress' state. The check runs periodically, starting when the service starts, and covers all storages at once. It stops when the passed-in context is cancelled or expires.

Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/2873
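To make the rule concrete: an 'in_progress' job is stale when its last health ping ('triggered_at') is older than the configured threshold, and a stale job becomes 'failed' if it still has processing attempts left, otherwise 'dead'. The sketch below restates that rule in Go purely for illustration; the real check is implemented in SQL in queue.go further down, and both helper names are hypothetical.

package main

import (
	"fmt"
	"time"
)

// isStale reports whether an 'in_progress' job whose last health ping was
// recorded at triggeredAt should be treated as stale after staleAfter.
// Hypothetical helper: the actual check runs in SQL against the
// 'triggered_at' column of 'replication_queue_job_lock'.
func isStale(now, triggeredAt time.Time, staleAfter time.Duration) bool {
	return now.Sub(triggeredAt) > staleAfter
}

// nextState mirrors the CASE expression used by AcknowledgeStale: a stale
// job with attempts left becomes 'failed', otherwise 'dead'.
func nextState(attemptsLeft int) string {
	if attemptsLeft >= 1 {
		return "failed"
	}
	return "dead"
}

func main() {
	now := time.Now()
	fmt.Println(isStale(now, now.Add(-2*time.Minute), time.Minute)) // true: no ping for 2 minutes
	fmt.Println(nextState(2), nextState(0))                         // failed dead
}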
-rw-r--r--  changelogs/unreleased/ps-process-stale-replication.yml    5
-rw-r--r--  cmd/praefect/main.go                                       1
-rw-r--r--  internal/praefect/datastore/memory.go                     20
-rw-r--r--  internal/praefect/datastore/queue.go                      45
-rw-r--r--  internal/praefect/datastore/queue_test.go                112
-rw-r--r--  internal/praefect/replicator.go                           27
-rw-r--r--  internal/praefect/replicator_test.go                      41
7 files changed, 250 insertions(+), 1 deletion(-)
diff --git a/changelogs/unreleased/ps-process-stale-replication.yml b/changelogs/unreleased/ps-process-stale-replication.yml
new file mode 100644
index 000000000..5f7a43006
--- /dev/null
+++ b/changelogs/unreleased/ps-process-stale-replication.yml
@@ -0,0 +1,5 @@
+---
+title: 'Praefect: handling of stale replication jobs'
+merge_request: 2322
+author:
+type: added
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 0f42edc77..efd65e91d 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -316,6 +316,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
}
repl.ProcessBacklog(ctx, praefect.ExpBackoffFunc(1*time.Second, 5*time.Second))
+ repl.ProcessStale(ctx, 30*time.Second, time.Minute)
return b.Wait(conf.GracefulStopTimeout.Duration())
}
diff --git a/internal/praefect/datastore/memory.go b/internal/praefect/datastore/memory.go
index be3cfc37c..d2c981e5c 100644
--- a/internal/praefect/datastore/memory.go
+++ b/internal/praefect/datastore/memory.go
@@ -263,6 +263,12 @@ func (s *memoryReplicationEventQueue) StartHealthUpdate(context.Context, <-chan
return nil
}
+func (s *memoryReplicationEventQueue) AcknowledgeStale(context.Context, time.Duration) error {
+ // this implementation is not affected by stale replication events because it has no information
+ // about job processing after a restart of the application
+ return nil
+}
+
// remove deletes i-th element from the queue and from the in-flight tracking map.
// It doesn't check 'i' for the out of range and must be called with lock protection.
func (s *memoryReplicationEventQueue) remove(i int) {
@@ -286,6 +292,8 @@ type ReplicationEventQueueInterceptor interface {
OnAcknowledge(func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error))
// OnStartHealthUpdate allows to set action that would be executed each time when `StartHealthUpdate` method called.
OnStartHealthUpdate(func(context.Context, <-chan time.Time, []ReplicationEvent) error)
+ // OnAcknowledgeStale allows setting an action that is executed each time the `AcknowledgeStale` method is called.
+ OnAcknowledgeStale(func(context.Context, time.Duration) error)
}
// NewReplicationEventQueueInterceptor returns interception over `ReplicationEventQueue` interface.
@@ -299,6 +307,7 @@ type replicationEventQueueInterceptor struct {
onDequeue func(context.Context, string, string, int, ReplicationEventQueue) ([]ReplicationEvent, error)
onAcknowledge func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error)
onStartHealthUpdate func(context.Context, <-chan time.Time, []ReplicationEvent) error
+ onAcknowledgeStale func(context.Context, time.Duration) error
}
func (i *replicationEventQueueInterceptor) OnEnqueue(action func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error)) {
@@ -317,6 +326,10 @@ func (i *replicationEventQueueInterceptor) OnStartHealthUpdate(action func(conte
i.onStartHealthUpdate = action
}
+func (i *replicationEventQueueInterceptor) OnAcknowledgeStale(action func(context.Context, time.Duration) error) {
+ i.onAcknowledgeStale = action
+}
+
func (i *replicationEventQueueInterceptor) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) {
if i.onEnqueue != nil {
return i.onEnqueue(ctx, event, i.ReplicationEventQueue)
@@ -344,3 +357,10 @@ func (i *replicationEventQueueInterceptor) StartHealthUpdate(ctx context.Context
}
return i.ReplicationEventQueue.StartHealthUpdate(ctx, trigger, events)
}
+
+func (i *replicationEventQueueInterceptor) AcknowledgeStale(ctx context.Context, staleAfter time.Duration) error {
+ if i.onAcknowledgeStale != nil {
+ return i.onAcknowledgeStale(ctx, staleAfter)
+ }
+ return i.ReplicationEventQueue.AcknowledgeStale(ctx, staleAfter)
+}
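The interceptor extension above is what makes the new method observable in tests: a callback registered via OnAcknowledgeStale sees every AcknowledgeStale call before it would reach the wrapped queue. A minimal sketch of that pattern follows (it mirrors TestReplMgr_ProcessStale added in replicator_test.go below; the package and test names here are made up):

package praefect_test

import (
	"context"
	"testing"
	"time"

	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
)

// Sketch only: count AcknowledgeStale calls through the interceptor without
// touching a real queue (the wrapped queue is nil and is never reached
// because the registered callback handles the call).
func TestAcknowledgeStaleInterceptorSketch(t *testing.T) {
	queue := datastore.NewReplicationEventQueueInterceptor(nil)

	var calls int
	queue.OnAcknowledgeStale(func(ctx context.Context, staleAfter time.Duration) error {
		calls++
		return nil // return an error here to exercise the error-logging path instead
	})

	if err := queue.AcknowledgeStale(context.Background(), time.Minute); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if calls != 1 {
		t.Fatalf("expected 1 call, got %d", calls)
	}
}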
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index e64a84e92..8d1d6e941 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -38,6 +38,11 @@ type ReplicationEventQueue interface {
// The health update will be executed on each new entry received from trigger channel passed in.
// It is a blocking call that is managed by the passed in context.
StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error
+ // AcknowledgeStale moves replication events that have been in 'in_progress' state for too long
+ // (longer than staleAfter) into the next state:
+ // 'failed' - in case the event still has attempts left to be executed
+ // 'dead' - in case the event has no attempts left to be executed
+ AcknowledgeStale(ctx context.Context, staleAfter time.Duration) error
}
func allowToAck(state JobState) error {
@@ -442,3 +447,43 @@ func (rq PostgresReplicationEventQueue) StartHealthUpdate(ctx context.Context, t
}
}
}
+
+// AcknowledgeStale moves replication events that have been in 'in_progress' state for too long
+// (longer than staleAfter) into the next state:
+// 'failed' - in case the event still has attempts left to be executed
+// 'dead' - in case the event has no attempts left to be executed
+// A job is considered 'in_progress' if it has a corresponding entry in the 'replication_queue_job_lock' table.
+// When a job moves from 'in_progress' to another state, its entry in the 'replication_queue_job_lock' table is
+// removed and the entry in 'replication_queue_lock' is updated if needed (the lock is released).
+func (rq PostgresReplicationEventQueue) AcknowledgeStale(ctx context.Context, staleAfter time.Duration) error {
+ query := `
+ WITH stale_job_lock AS (
+ DELETE FROM replication_queue_job_lock WHERE triggered_at < NOW() - INTERVAL '1 MILLISECOND' * $1
+ RETURNING job_id, lock_id
+ )
+ , update_job AS (
+ UPDATE replication_queue AS queue
+ SET state = (CASE WHEN attempt >= 1 THEN 'failed' ELSE 'dead' END)::REPLICATION_JOB_STATE
+ FROM stale_job_lock
+ WHERE stale_job_lock.job_id = queue.id
+ RETURNING queue.id, queue.lock_id
+ )
+ UPDATE replication_queue_lock
+ SET acquired = FALSE
+ WHERE id IN (
+ SELECT existing.lock_id
+ FROM (SELECT lock_id, COUNT(*) AS amount FROM stale_job_lock GROUP BY lock_id) AS removed
+ JOIN (
+ SELECT lock_id, COUNT(*) AS amount
+ FROM replication_queue_job_lock
+ WHERE lock_id IN (SELECT lock_id FROM stale_job_lock)
+ GROUP BY lock_id
+ ) AS existing ON removed.lock_id = existing.lock_id AND removed.amount = existing.amount
+ )`
+ _, err := rq.qc.ExecContext(ctx, query, staleAfter.Milliseconds())
+ if err != nil {
+ return fmt.Errorf("exec acknowledge stale: %w", err)
+ }
+
+ return nil
+}
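The statement above works in three steps: the first CTE deletes the stale rows from 'replication_queue_job_lock', the second moves the matching 'replication_queue' rows to 'failed' or 'dead', and the final UPDATE releases the 'replication_queue_lock' entries for which every outstanding job lock was just removed. Callers only see the single method; a one-off invocation against any ReplicationEventQueue implementation might look like the sketch below (the wrapper function and its timeout are assumptions, not part of this change):

package example

import (
	"context"
	"time"

	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
)

// acknowledgeStaleOnce is an illustrative one-off call to the new method:
// jobs that have been 'in_progress' for longer than staleAfter without a
// health ping are moved to 'failed' (attempts left) or 'dead' (none left).
func acknowledgeStaleOnce(queue datastore.ReplicationEventQueue, staleAfter time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	return queue.AcknowledgeStale(ctx, staleAfter)
}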
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index 06cdfebd3..c8be612e8 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -975,6 +975,118 @@ func TestPostgresReplicationEventQueue_StartHealthUpdate(t *testing.T) {
})
}
+func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) {
+ db := getDB(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ source := PostgresReplicationEventQueue{qc: db}
+
+ eventType := ReplicationEvent{Job: ReplicationJob{
+ Change: UpdateRepo,
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ VirtualStorage: "praefect-1",
+ }}
+
+ eventType1 := eventType
+
+ eventType2 := eventType
+ eventType2.Job.VirtualStorage = "praefect-2"
+
+ eventType3 := eventType2
+ eventType3.Job.RelativePath = "/project/path-2"
+ eventType3.Job.TargetNodeStorage = "gitaly-2"
+
+ eventType4 := eventType3
+ eventType4.Job.TargetNodeStorage = "gitaly-3"
+
+ t.Run("no stale jobs yet", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ event, err := source.Enqueue(ctx, eventType1)
+ require.NoError(t, err)
+
+ devents, err := source.Dequeue(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, 1)
+ require.NoError(t, err)
+
+ // the events were triggered just now (< 1 sec ago), so nothing is considered stale
+ require.NoError(t, source.AcknowledgeStale(ctx, time.Second))
+ requireEvents(t, ctx, db, devents)
+ })
+
+ t.Run("jobs considered stale only at 'in_progress' state", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ // move event to 'ready' state
+ event1, err := source.Enqueue(ctx, eventType1)
+ require.NoError(t, err)
+
+ // move event to 'failed' state
+ event2, err := source.Enqueue(ctx, eventType2)
+ require.NoError(t, err)
+ devents2, err := source.Dequeue(ctx, event2.Job.VirtualStorage, event2.Job.TargetNodeStorage, 1)
+ require.NoError(t, err)
+ require.Equal(t, event2.ID, devents2[0].ID)
+ _, err = source.Acknowledge(ctx, JobStateFailed, []uint64{devents2[0].ID})
+ require.NoError(t, err)
+
+ // move event to 'dead' state
+ event3, err := source.Enqueue(ctx, eventType3)
+ require.NoError(t, err)
+ devents3, err := source.Dequeue(ctx, event3.Job.VirtualStorage, event3.Job.TargetNodeStorage, 1)
+ require.NoError(t, err)
+ require.Equal(t, event3.ID, devents3[0].ID)
+ _, err = source.Acknowledge(ctx, JobStateDead, []uint64{devents3[0].ID})
+ require.NoError(t, err)
+
+ event4, err := source.Enqueue(ctx, eventType4)
+ require.NoError(t, err)
+ devents4, err := source.Dequeue(ctx, event4.Job.VirtualStorage, event4.Job.TargetNodeStorage, 1)
+ require.NoError(t, err)
+
+ require.NoError(t, source.AcknowledgeStale(ctx, time.Microsecond))
+
+ devents2[0].State = JobStateFailed
+ devents3[0].State = JobStateDead
+ devents4[0].Attempt = 2
+ devents4[0].State = JobStateFailed
+ requireEvents(t, ctx, db, []ReplicationEvent{event1, devents2[0], devents3[0], devents4[0]})
+ })
+
+ t.Run("stale jobs updated for all virtual storages and storages at once", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ var exp []ReplicationEvent
+ for _, eventType := range []ReplicationEvent{eventType1, eventType2, eventType3} {
+ event, err := source.Enqueue(ctx, eventType)
+ require.NoError(t, err)
+ devents, err := source.Dequeue(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, 1)
+ require.NoError(t, err)
+ exp = append(exp, devents...)
+ }
+
+ for event, i := exp[0], 0; i < 2; i++ { // consume all processing attempts to verify that state will be changed to 'dead'
+ _, err := source.Acknowledge(ctx, JobStateFailed, []uint64{event.ID})
+ require.NoError(t, err)
+ _, err = source.Dequeue(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, 1)
+ require.NoError(t, err)
+ }
+
+ require.NoError(t, source.AcknowledgeStale(ctx, time.Microsecond))
+
+ exp[0].State = JobStateDead
+ exp[0].Attempt = 0
+ for i := range exp[1:] {
+ exp[1+i].State = JobStateFailed
+ }
+
+ requireEvents(t, ctx, db, exp)
+ })
+}
+
func requireEvents(t *testing.T, ctx context.Context, db glsql.DB, expected []ReplicationEvent) {
t.Helper()
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index a52faad23..82de3ca28 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -331,7 +331,7 @@ func WithReplicator(r Replicator) ReplMgrOpt {
const (
logWithReplJobID = "replication_job_id"
logWithReplTarget = "replication_job_target"
- logWithCorrID = "replication_correlation_id"
+ logWithCorrID = "correlation_id"
)
type backoff func() time.Duration
@@ -376,6 +376,31 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFunc) {
}
}
+// ProcessStale starts a background process to acknowledge stale replication jobs.
+// It processes jobs until ctx is done; the returned channel is closed once the background goroutine exits.
+func (r ReplMgr) ProcessStale(ctx context.Context, checkPeriod, staleAfter time.Duration) chan struct{} {
+ done := make(chan struct{})
+
+ go func() {
+ defer close(done)
+
+ t := time.NewTimer(checkPeriod)
+ for {
+ select {
+ case <-t.C:
+ if err := r.queue.AcknowledgeStale(ctx, staleAfter); err != nil {
+ r.log.WithError(err).Error("background periodical acknowledgement for stale replication jobs")
+ }
+ t.Reset(checkPeriod)
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
+ return done
+}
+
func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStorage string) {
logger := r.log.WithField("virtual_storage", virtualStorage)
backoff, reset := b()
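ProcessStale returns a channel that is closed once its goroutine exits, so a caller can tie the periodic check to service shutdown. cmd/praefect/main.go above starts it with a 30-second check period and a one-minute staleness threshold and discards the channel; the sketch below additionally waits for the goroutine to stop, which is an illustrative extension rather than part of this change.

package example

import (
	"context"
	"time"

	"gitlab.com/gitlab-org/gitaly/internal/praefect"
)

// runStaleCheck starts the periodic stale-job check with the same parameters
// used in cmd/praefect/main.go (check every 30s, treat jobs without a health
// ping for 1m as stale) and blocks until the check has stopped. The wrapper
// itself is hypothetical.
func runStaleCheck(ctx context.Context, mgr praefect.ReplMgr) {
	done := mgr.ProcessStale(ctx, 30*time.Second, time.Minute)
	<-done // closed after ctx is cancelled and the goroutine returns
}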
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 0987a4043..8a8af2b30 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -14,6 +14,9 @@ import (
"github.com/golang/protobuf/proto"
"github.com/prometheus/client_golang/prometheus/testutil"
+ "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config"
@@ -871,3 +874,41 @@ func TestSubtractUint64(t *testing.T) {
})
}
}
+
+func TestReplMgr_ProcessStale(t *testing.T) {
+ logger := testhelper.DiscardTestLogger(t)
+ hook := test.NewLocal(logger)
+
+ queue := datastore.NewReplicationEventQueueInterceptor(nil)
+ mgr := NewReplMgr(logger.WithField("test", t.Name()), nil, queue, nil)
+
+ var counter int32
+ queue.OnAcknowledgeStale(func(ctx context.Context, duration time.Duration) error {
+ counter++
+ if counter > 2 {
+ return assert.AnError
+ }
+ return nil
+ })
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ ctx, cancel = context.WithTimeout(ctx, 350*time.Millisecond)
+ defer cancel()
+
+ done := mgr.ProcessStale(ctx, 100*time.Millisecond, time.Second)
+
+ select {
+ case <-time.After(time.Second):
+ require.FailNow(t, "execution got stuck")
+ case <-done:
+ }
+
+ require.Equal(t, int32(3), counter)
+ require.Len(t, hook.Entries, 1)
+ require.Equal(t, logrus.ErrorLevel, hook.LastEntry().Level)
+ require.Equal(t, "background periodical acknowledgement for stale replication jobs", hook.LastEntry().Message)
+ require.Equal(t, "replication_manager", hook.LastEntry().Data["component"])
+ require.Equal(t, assert.AnError, hook.LastEntry().Data["error"])
+}