diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2020-07-02 16:53:30 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-07-02 16:53:30 +0300 |
commit | 172e2a0a922edc4168aadb398b71cf20161ba3c3 (patch) | |
tree | 5940084943cec648770adc5806d9ef06f3935339 /internal/praefect/replicator.go | |
parent | 715cfcc7c66040149447ecdfb2400d0494c88b47 (diff) |
Praefect: replication jobs health ping
The replication process could take an unpredictable amount of time.
To distinguish between stale jobs (the process is finished, but
the entry was not updated) and time-consuming jobs, the health ping
was introduced. It updates the timestamp of the event in the database
so it can be used to identify whether replication is in progress
for this event.
Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/2873
Diffstat (limited to 'internal/praefect/replicator.go')
-rw-r--r-- | internal/praefect/replicator.go | 121 |
1 files changed, 77 insertions, 44 deletions
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index d6354161c..a4d2a54da 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -396,57 +396,17 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora var totalEvents int shard, err := r.nodeManager.GetShard(virtualStorage) - if err == nil { + if err != nil { + logger.WithError(err).Error("error when getting primary and secondaries") + } else { targetNodes := shard.Secondaries if shard.IsReadOnly { targetNodes = append(targetNodes, shard.Primary) } for _, target := range targetNodes { - events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), 10) - if err != nil { - logger.WithField(logWithReplTarget, target.GetStorage()).WithError(err).Error("failed to dequeue replication events") - continue - } - - totalEvents += len(events) - - eventIDsByState := map[datastore.JobState][]uint64{} - for _, event := range events { - if err := r.processReplicationEvent(ctx, event, shard, target.GetConnection()); err != nil { - logger.WithFields(logrus.Fields{ - logWithReplJobID: event.ID, - logWithReplVirtual: event.Job.VirtualStorage, - logWithReplTarget: event.Job.TargetNodeStorage, - logWithReplSource: event.Job.SourceNodeStorage, - logWithReplChange: event.Job.Change, - logWithReplPath: event.Job.RelativePath, - logWithCorrID: getCorrelationID(event.Meta), - }).WithError(err).Error("replication job failed") - if event.Attempt <= 0 { - eventIDsByState[datastore.JobStateDead] = append(eventIDsByState[datastore.JobStateDead], event.ID) - } else { - eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID) - } - continue - } - eventIDsByState[datastore.JobStateCompleted] = append(eventIDsByState[datastore.JobStateCompleted], event.ID) - } - for state, eventIDs := range eventIDsByState { - ackIDs, err := r.queue.Acknowledge(ctx, state, eventIDs) - if err != nil { - 
logger.WithField("state", state).WithField("event_ids", eventIDs).WithError(err).Error("failed to acknowledge replication events") - continue - } - - notAckIDs := subtractUint64(ackIDs, eventIDs) - if len(notAckIDs) > 0 { - logger.WithField("state", state).WithField("event_ids", notAckIDs).WithError(err).Error("replication events were not acknowledged") - } - } + totalEvents += r.handleNode(ctx, logger, shard, virtualStorage, target) } - } else { - logger.WithError(err).Error("error when getting primary and secondaries") } if totalEvents == 0 { @@ -463,6 +423,79 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora } } +func (r ReplMgr) handleNode(ctx context.Context, logger logrus.FieldLogger, shard nodes.Shard, virtualStorage string, target nodes.Node) int { + events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), 10) + if err != nil { + logger.WithField(logWithReplTarget, target.GetStorage()).WithError(err).Error("failed to dequeue replication events") + return 0 + } + + if len(events) == 0 { + return 0 + } + + stopHealthUpdate := r.startHealthUpdate(ctx, logger, events) + defer stopHealthUpdate() + + eventIDsByState := map[datastore.JobState][]uint64{} + for _, event := range events { + state := r.handleNodeEvent(ctx, logger, shard, target, event) + eventIDsByState[state] = append(eventIDsByState[state], event.ID) + } + + for state, eventIDs := range eventIDsByState { + ackIDs, err := r.queue.Acknowledge(ctx, state, eventIDs) + if err != nil { + logger.WithField("state", state).WithField("event_ids", eventIDs).WithError(err).Error("failed to acknowledge replication events") + continue + } + + notAckIDs := subtractUint64(ackIDs, eventIDs) + if len(notAckIDs) > 0 { + logger.WithField("state", state).WithField("event_ids", notAckIDs).WithError(err).Error("replication events were not acknowledged") + } + } + + return len(events) +} + +func (r ReplMgr) startHealthUpdate(ctx context.Context, logger logrus.FieldLogger, 
events []datastore.ReplicationEvent) context.CancelFunc { + healthUpdateCtx, healthUpdateCancel := context.WithCancel(ctx) + go func() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + if err := r.queue.StartHealthUpdate(healthUpdateCtx, ticker.C, events); err != nil { + ids := make([]uint64, len(events)) + for i, event := range events { + ids[i] = event.ID + } + + logger.WithField("event_ids", ids).WithError(err).Error("health update loop") + } + }() + + return healthUpdateCancel +} + +func (r ReplMgr) handleNodeEvent(ctx context.Context, logger logrus.FieldLogger, shard nodes.Shard, target nodes.Node, event datastore.ReplicationEvent) datastore.JobState { + err := r.processReplicationEvent(ctx, event, shard, target.GetConnection()) + if err == nil { + return datastore.JobStateCompleted + } + + logger.WithFields(logrus.Fields{ + logWithReplJobID: event.ID, + logWithCorrID: getCorrelationID(event.Meta), + }).WithError(err).Error("replication job failed") + + if event.Attempt <= 0 { + return datastore.JobStateDead + } + + return datastore.JobStateFailed +} + func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.ReplicationEvent, shard nodes.Shard, targetCC *grpc.ClientConn) error { source, err := shard.GetNode(event.Job.SourceNodeStorage) if err != nil { |