diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2020-07-02 16:53:30 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-07-02 16:53:30 +0300 |
commit | a5341cd00664f011d3365cdeba462f04d6e214ed (patch) | |
tree | 6b9eebc1f0ca8e9acd8d3a679a4c803ef671649c | |
parent | c3948afe4501a7b2f94d553063c3c76dc6bc11f0 (diff) | |
parent | 172e2a0a922edc4168aadb398b71cf20161ba3c3 (diff) |
Merge branch 'ps-replication-health-ping' into 'master'
Praefect: replication jobs health ping
See merge request gitlab-org/gitaly!2321
-rw-r--r-- | changelogs/unreleased/ps-replication-health-ping.yml | 5 |
-rw-r--r-- | internal/praefect/datastore/glsql/postgres.go | 1 |
-rw-r--r-- | internal/praefect/datastore/memory.go | 26 |
-rw-r--r-- | internal/praefect/datastore/queue.go | 53 |
-rw-r--r-- | internal/praefect/datastore/queue_test.go | 153 |
-rw-r--r-- | internal/praefect/replicator.go | 121 |
-rw-r--r-- | internal/praefect/replicator_test.go | 9 |
7 files changed, 316 insertions, 52 deletions
diff --git a/changelogs/unreleased/ps-replication-health-ping.yml b/changelogs/unreleased/ps-replication-health-ping.yml new file mode 100644 index 000000000..a03ebeee1 --- /dev/null +++ b/changelogs/unreleased/ps-replication-health-ping.yml @@ -0,0 +1,5 @@ +--- +title: 'Praefect: replication jobs health ping' +merge_request: 2321 +author: +type: added diff --git a/internal/praefect/datastore/glsql/postgres.go b/internal/praefect/datastore/glsql/postgres.go index 6e3346a24..2d3f36150 100644 --- a/internal/praefect/datastore/glsql/postgres.go +++ b/internal/praefect/datastore/glsql/postgres.go @@ -40,6 +40,7 @@ func Migrate(db *sql.DB, ignoreUnknown bool) (int, error) { type Querier interface { QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row + ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) } // TxQuery runs operations inside transaction and commits|rollbacks on Done. diff --git a/internal/praefect/datastore/memory.go b/internal/praefect/datastore/memory.go index 7ad900b80..ed122e21a 100644 --- a/internal/praefect/datastore/memory.go +++ b/internal/praefect/datastore/memory.go @@ -219,6 +219,12 @@ func (s *memoryReplicationEventQueue) GetUpToDateStorages(_ context.Context, vir return result, nil } +// StartHealthUpdate does nothing as it has no sense in terms of in-memory implementation as +// all information about events will be lost after restart. +func (s *memoryReplicationEventQueue) StartHealthUpdate(context.Context, <-chan time.Time, []ReplicationEvent) error { + return nil +} + // remove deletes i-th element from the queue and from the in-flight tracking map. // It doesn't check 'i' for the out of range and must be called with lock protection. 
func (s *memoryReplicationEventQueue) remove(i int) { @@ -240,6 +246,8 @@ type ReplicationEventQueueInterceptor interface { OnDequeue(func(context.Context, string, string, int, ReplicationEventQueue) ([]ReplicationEvent, error)) // OnAcknowledge allows to set action that would be executed each time when `Acknowledge` method called. OnAcknowledge(func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error)) + // OnStartHealthUpdate allows to set action that would be executed each time when `StartHealthUpdate` method called. + OnStartHealthUpdate(func(context.Context, <-chan time.Time, []ReplicationEvent) error) } // NewReplicationEventQueueInterceptor returns interception over `ReplicationEventQueue` interface. @@ -249,9 +257,10 @@ func NewReplicationEventQueueInterceptor(queue ReplicationEventQueue) Replicatio type replicationEventQueueInterceptor struct { ReplicationEventQueue - onEnqueue func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error) - onDequeue func(context.Context, string, string, int, ReplicationEventQueue) ([]ReplicationEvent, error) - onAcknowledge func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error) + onEnqueue func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error) + onDequeue func(context.Context, string, string, int, ReplicationEventQueue) ([]ReplicationEvent, error) + onAcknowledge func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error) + onStartHealthUpdate func(context.Context, <-chan time.Time, []ReplicationEvent) error } func (i *replicationEventQueueInterceptor) OnEnqueue(action func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error)) { @@ -266,6 +275,10 @@ func (i *replicationEventQueueInterceptor) OnAcknowledge(action func(context.Con i.onAcknowledge = action } +func (i *replicationEventQueueInterceptor) OnStartHealthUpdate(action func(context.Context, <-chan 
time.Time, []ReplicationEvent) error) { + i.onStartHealthUpdate = action +} + func (i *replicationEventQueueInterceptor) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) { if i.onEnqueue != nil { return i.onEnqueue(ctx, event, i.ReplicationEventQueue) @@ -286,3 +299,10 @@ func (i *replicationEventQueueInterceptor) Acknowledge(ctx context.Context, stat } return i.ReplicationEventQueue.Acknowledge(ctx, state, ids) } + +func (i *replicationEventQueueInterceptor) StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error { + if i.onStartHealthUpdate != nil { + return i.onStartHealthUpdate(ctx, trigger, events) + } + return i.ReplicationEventQueue.StartHealthUpdate(ctx, trigger, events) +} diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index 7edc4b006..516573f26 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -5,9 +5,11 @@ import ( "database/sql" "database/sql/driver" "encoding/json" + "errors" "fmt" "time" + "github.com/lib/pq" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql" ) @@ -28,6 +30,11 @@ type ReplicationEventQueue interface { // GetUpToDateStorages returns list of target storages where latest replication job is in 'completed' state. // It returns no results if there is no up to date storages or there were no replication events yet. GetUpToDateStorages(ctx context.Context, virtualStorage, repoPath string) ([]string, error) + // StartHealthUpdate starts periodical update of the event's health identifier. + // The events with fresh health identifier won't be considered as stale. + // The health update will be executed on each new entry received from trigger channel passed in. + // It is a blocking call that is managed by the passed in context. 
+ StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error } func allowToAck(state JobState) error { @@ -374,3 +381,49 @@ func (rq PostgresReplicationEventQueue) GetUpToDateStorages(ctx context.Context, return storages.Values(), nil } + +// StartHealthUpdate starts periodical update of the event's health identifier. +// The events with fresh health identifier won't be considered as stale. +// The health update will be executed on each new entry received from trigger channel passed in. +// It is a blocking call that is managed by the passed in context. +func (rq PostgresReplicationEventQueue) StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error { + if len(events) == 0 { + return nil + } + + jobIDs := make(pq.Int64Array, len(events)) + lockIDs := make(pq.StringArray, len(events)) + for i := range events { + jobIDs[i] = int64(events[i].ID) + lockIDs[i] = events[i].LockID + } + + query := ` + UPDATE replication_queue_job_lock + SET triggered_at = NOW() AT TIME ZONE 'UTC' + WHERE (job_id, lock_id) IN (SELECT UNNEST($1::BIGINT[]), UNNEST($2::TEXT[]))` + + for { + select { + case <-ctx.Done(): + return nil + case <-trigger: + res, err := rq.qc.ExecContext(ctx, query, jobIDs, lockIDs) + if err != nil { + if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) { + return err + } + return nil + } + + affected, err := res.RowsAffected() + if err != nil { + return err + } + + if affected == 0 { + return nil + } + } + } +} diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go index 9f4c458d1..f63049b29 100644 --- a/internal/praefect/datastore/queue_test.go +++ b/internal/praefect/datastore/queue_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql" 
"gitlab.com/gitlab-org/gitaly/internal/testhelper" @@ -853,6 +854,138 @@ func TestPostgresReplicationEventQueue_GetUpToDateStorages(t *testing.T) { }) } +func TestPostgresReplicationEventQueue_StartHealthUpdate(t *testing.T) { + db := getDB(t) + + eventType1 := ReplicationEvent{Job: ReplicationJob{ + Change: UpdateRepo, + VirtualStorage: "vs-1", + TargetNodeStorage: "s-1", + SourceNodeStorage: "s-0", + RelativePath: "/path/1", + }} + + eventType2 := eventType1 + eventType2.Job.RelativePath = "/path/2" + + eventType3 := eventType1 + eventType3.Job.VirtualStorage = "vs-2" + + eventType4 := eventType1 + eventType4.Job.TargetNodeStorage = "s-2" + + t.Run("no events is valid", func(t *testing.T) { + // 'qc' is not initialized, so the test will fail if there will be an attempt to make SQL operation + queue := PostgresReplicationEventQueue{} + + ctx, cancel := testhelper.Context() + defer cancel() + + require.NoError(t, queue.StartHealthUpdate(ctx, nil, nil)) + }) + + t.Run("can be terminated by the passed in context", func(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + // 'qc' is not initialized, so the test will fail if there will be an attempt to make SQL operation + queue := PostgresReplicationEventQueue{} + cancel() + require.NoError(t, queue.StartHealthUpdate(ctx, nil, []ReplicationEvent{eventType1})) + }) + + t.Run("stops after first error", func(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + qc, err := db.DB.BeginTx(ctx, nil) + require.NoError(t, err) + require.NoError(t, qc.Rollback()) + + // 'qc' is initialized with invalid connection (transaction is finished), so operations on it will fail + queue := PostgresReplicationEventQueue{qc: qc} + + trigger := make(chan time.Time, 1) + trigger <- time.Time{} + + require.Error(t, queue.StartHealthUpdate(ctx, trigger, []ReplicationEvent{eventType1})) + }) + + t.Run("stops if nothing to update (extended coverage)", func(t *testing.T) { + db.TruncateAll(t) + + ctx, 
cancel := testhelper.Context() + defer cancel() + + done := make(chan struct{}) + queue := PostgresReplicationEventQueue{qc: db} + go func() { + trigger := make(chan time.Time) + close(trigger) + + defer close(done) + assert.NoError(t, queue.StartHealthUpdate(ctx, trigger, []ReplicationEvent{eventType1})) + }() + + select { + case <-done: + return // happy path + case <-time.After(time.Second): + require.FailNow(t, "method should return almost immediately as there is nothing to process") + } + }) + + t.Run("triggers all passed in events", func(t *testing.T) { + db.TruncateAll(t) + + ctx, cancel := testhelper.Context() + defer cancel() + + queue := PostgresReplicationEventQueue{qc: db} + events := []ReplicationEvent{eventType1, eventType2, eventType3, eventType4} + for i := range events { + var err error + events[i], err = queue.Enqueue(ctx, events[i]) + require.NoError(t, err, "failed to fill in event queue") + } + + dequeuedEventsToTrigger, err := queue.Dequeue(ctx, eventType1.Job.VirtualStorage, eventType1.Job.TargetNodeStorage, 10) + require.NoError(t, err) + require.Len(t, dequeuedEventsToTrigger, 2, "eventType3 and eventType4 should not be fetched") + ids := []uint64{dequeuedEventsToTrigger[0].ID, dequeuedEventsToTrigger[1].ID} + + dequeuedEventsUntriggered, err := queue.Dequeue(ctx, eventType3.Job.VirtualStorage, eventType3.Job.TargetNodeStorage, 10) + require.NoError(t, err) + require.Len(t, dequeuedEventsUntriggered, 1, "only eventType3 should be fetched") + + initialJobLocks := fetchJobLocks(t, ctx, db) + + trigger := make(chan time.Time, 1) + go func() { + trigger <- time.Time{} + assert.NoError(t, queue.StartHealthUpdate(ctx, trigger, dequeuedEventsToTrigger)) + }() + + time.Sleep(time.Millisecond) // we should sleep as the processing is too fast and won't give different time + trigger <- time.Time{} // once this consumed we are sure that the previous update has been executed + + updatedJobLocks := fetchJobLocks(t, ctx, db) + for i := range 
initialJobLocks { + if updatedJobLocks[i].JobID == dequeuedEventsUntriggered[0].ID { + require.Equal(t, initialJobLocks[i].TriggeredAt, updatedJobLocks[i].TriggeredAt, "no update expected as it was not submitted") + } else { + require.True(t, updatedJobLocks[i].TriggeredAt.After(initialJobLocks[i].TriggeredAt)) + } + } + + ackIDs, err := queue.Acknowledge(ctx, JobStateFailed, ids) + require.NoError(t, err) + require.ElementsMatch(t, ackIDs, ids) + + require.Len(t, fetchJobLocks(t, ctx, db), 1, "bindings should be removed after acknowledgment") + }) +} + func requireEvents(t *testing.T, ctx context.Context, db glsql.DB, expected []ReplicationEvent) { t.Helper() @@ -909,17 +1042,27 @@ type JobLockRow struct { func requireJobLocks(t *testing.T, ctx context.Context, db glsql.DB, expected []JobLockRow) { t.Helper() - sqlStmt := `SELECT job_id, lock_id FROM replication_queue_job_lock ORDER BY triggered_at` + actual := fetchJobLocks(t, ctx, db) + for i := range actual { + actual[i].TriggeredAt = time.Time{} + } + require.ElementsMatch(t, expected, actual) +} + +func fetchJobLocks(t *testing.T, ctx context.Context, db glsql.DB) []JobLockRow { + t.Helper() + sqlStmt := `SELECT job_id, lock_id, triggered_at FROM replication_queue_job_lock ORDER BY job_id` rows, err := db.QueryContext(ctx, sqlStmt) require.NoError(t, err) defer func() { require.NoError(t, rows.Close(), "completion of result fetching") }() - var actual []JobLockRow + var entries []JobLockRow for rows.Next() { var entry JobLockRow - require.NoError(t, rows.Scan(&entry.JobID, &entry.LockID), "failed to scan entry") - actual = append(actual, entry) + require.NoError(t, rows.Scan(&entry.JobID, &entry.LockID, &entry.TriggeredAt), "failed to scan entry") + entries = append(entries, entry) } require.NoError(t, rows.Err(), "completion of result loop scan") - require.ElementsMatch(t, expected, actual) + + return entries } diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 
d6354161c..a4d2a54da 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -396,57 +396,17 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora var totalEvents int shard, err := r.nodeManager.GetShard(virtualStorage) - if err == nil { + if err != nil { + logger.WithError(err).Error("error when getting primary and secondaries") + } else { targetNodes := shard.Secondaries if shard.IsReadOnly { targetNodes = append(targetNodes, shard.Primary) } for _, target := range targetNodes { - events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), 10) - if err != nil { - logger.WithField(logWithReplTarget, target.GetStorage()).WithError(err).Error("failed to dequeue replication events") - continue - } - - totalEvents += len(events) - - eventIDsByState := map[datastore.JobState][]uint64{} - for _, event := range events { - if err := r.processReplicationEvent(ctx, event, shard, target.GetConnection()); err != nil { - logger.WithFields(logrus.Fields{ - logWithReplJobID: event.ID, - logWithReplVirtual: event.Job.VirtualStorage, - logWithReplTarget: event.Job.TargetNodeStorage, - logWithReplSource: event.Job.SourceNodeStorage, - logWithReplChange: event.Job.Change, - logWithReplPath: event.Job.RelativePath, - logWithCorrID: getCorrelationID(event.Meta), - }).WithError(err).Error("replication job failed") - if event.Attempt <= 0 { - eventIDsByState[datastore.JobStateDead] = append(eventIDsByState[datastore.JobStateDead], event.ID) - } else { - eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID) - } - continue - } - eventIDsByState[datastore.JobStateCompleted] = append(eventIDsByState[datastore.JobStateCompleted], event.ID) - } - for state, eventIDs := range eventIDsByState { - ackIDs, err := r.queue.Acknowledge(ctx, state, eventIDs) - if err != nil { - logger.WithField("state", state).WithField("event_ids", eventIDs).WithError(err).Error("failed to 
acknowledge replication events") - continue - } - - notAckIDs := subtractUint64(ackIDs, eventIDs) - if len(notAckIDs) > 0 { - logger.WithField("state", state).WithField("event_ids", notAckIDs).WithError(err).Error("replication events were not acknowledged") - } - } + totalEvents += r.handleNode(ctx, logger, shard, virtualStorage, target) } - } else { - logger.WithError(err).Error("error when getting primary and secondaries") } if totalEvents == 0 { @@ -463,6 +423,79 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora } } +func (r ReplMgr) handleNode(ctx context.Context, logger logrus.FieldLogger, shard nodes.Shard, virtualStorage string, target nodes.Node) int { + events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), 10) + if err != nil { + logger.WithField(logWithReplTarget, target.GetStorage()).WithError(err).Error("failed to dequeue replication events") + return 0 + } + + if len(events) == 0 { + return 0 + } + + stopHealthUpdate := r.startHealthUpdate(ctx, logger, events) + defer stopHealthUpdate() + + eventIDsByState := map[datastore.JobState][]uint64{} + for _, event := range events { + state := r.handleNodeEvent(ctx, logger, shard, target, event) + eventIDsByState[state] = append(eventIDsByState[state], event.ID) + } + + for state, eventIDs := range eventIDsByState { + ackIDs, err := r.queue.Acknowledge(ctx, state, eventIDs) + if err != nil { + logger.WithField("state", state).WithField("event_ids", eventIDs).WithError(err).Error("failed to acknowledge replication events") + continue + } + + notAckIDs := subtractUint64(ackIDs, eventIDs) + if len(notAckIDs) > 0 { + logger.WithField("state", state).WithField("event_ids", notAckIDs).WithError(err).Error("replication events were not acknowledged") + } + } + + return len(events) +} + +func (r ReplMgr) startHealthUpdate(ctx context.Context, logger logrus.FieldLogger, events []datastore.ReplicationEvent) context.CancelFunc { + healthUpdateCtx, healthUpdateCancel := 
context.WithCancel(ctx) + go func() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + if err := r.queue.StartHealthUpdate(healthUpdateCtx, ticker.C, events); err != nil { + ids := make([]uint64, len(events)) + for i, event := range events { + ids[i] = event.ID + } + + logger.WithField("event_ids", ids).WithError(err).Error("health update loop") + } + }() + + return healthUpdateCancel +} + +func (r ReplMgr) handleNodeEvent(ctx context.Context, logger logrus.FieldLogger, shard nodes.Shard, target nodes.Node, event datastore.ReplicationEvent) datastore.JobState { + err := r.processReplicationEvent(ctx, event, shard, target.GetConnection()) + if err == nil { + return datastore.JobStateCompleted + } + + logger.WithFields(logrus.Fields{ + logWithReplJobID: event.ID, + logWithCorrID: getCorrelationID(event.Meta), + }).WithError(err).Error("replication job failed") + + if event.Attempt <= 0 { + return datastore.JobStateDead + } + + return datastore.JobStateFailed +} + func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.ReplicationEvent, shard nodes.Shard, targetCC *grpc.ClientConn) error { source, err := shard.GetNode(event.Job.SourceNodeStorage) if err != nil { diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 2ae4dd1dc..93954bd3a 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -7,6 +7,7 @@ import ( "net" "os" "path/filepath" + "sync/atomic" "testing" "time" @@ -572,6 +573,13 @@ func TestProcessBacklog_Success(t *testing.T) { return ackIDs, err }) + var healthUpdated int32 + queueInterceptor.OnStartHealthUpdate(func(ctx context.Context, trigger <-chan time.Time, events []datastore.ReplicationEvent) error { + require.Len(t, events, 4) + atomic.AddInt32(&healthUpdated, 1) + return nil + }) + // Update replication job eventType1 := datastore.ReplicationEvent{ Job: datastore.ReplicationJob{ @@ -636,6 +644,7 @@ func 
TestProcessBacklog_Success(t *testing.T) { select { case <-processed: + require.EqualValues(t, 1, atomic.LoadInt32(&healthUpdated), "health update should be called") case <-time.After(30 * time.Second): // strongly depends on the processing capacity t.Fatal("time limit expired for job to complete") |