gitlab.com/gitlab-org/gitaly.git
author     Pavlo Strokov <pstrokov@gitlab.com>   2020-07-02 16:53:30 +0300
committer  Sami Hiltunen <shiltunen@gitlab.com>  2020-07-02 16:53:30 +0300
commit     172e2a0a922edc4168aadb398b71cf20161ba3c3 (patch)
tree       5940084943cec648770adc5806d9ef06f3935339 /internal/praefect/replicator.go
parent     715cfcc7c66040149447ecdfb2400d0494c88b47 (diff)
Praefect: replication jobs health ping
The replication process can take an unpredictable amount of time. To distinguish stale jobs (the process finished, but the entry was never updated) from genuinely long-running jobs, a health ping is introduced: it periodically updates the event's timestamp in the database, so that timestamp can be used to tell whether replication is still in progress for the event. Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/2873
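As a rough sketch of that mechanism (not the queue implementation shipped in this commit): assuming a Postgres-backed queue that tracks in-flight jobs in a lock table with a triggered_at timestamp, a health updater could look something like the following. The table and column names (replication_queue_job_lock, triggered_at, job_id) and the helper name healthPing are placeholders for illustration.

package sketch

import (
	"context"
	"database/sql"
	"time"

	"github.com/lib/pq"
)

// healthPing bumps the timestamp of the given replication events every time the
// trigger channel fires and stops when the context is cancelled. Table and
// column names are assumptions for illustration, not the actual Gitaly schema.
func healthPing(ctx context.Context, db *sql.DB, trigger <-chan time.Time, eventIDs []uint64) error {
	ids := make([]int64, len(eventIDs))
	for i, id := range eventIDs {
		ids[i] = int64(id)
	}

	for {
		select {
		case <-ctx.Done():
			// The worker finished (or gave up); acknowledgement takes over from here.
			return nil
		case <-trigger:
			// A fresh timestamp marks these jobs as still being worked on, so readers
			// can tell an in-progress job from a stale one whose worker died.
			if _, err := db.ExecContext(ctx,
				`UPDATE replication_queue_job_lock SET triggered_at = NOW() WHERE job_id = ANY($1)`,
				pq.Array(ids),
			); err != nil {
				return err
			}
		}
	}
}

In the diff below, the new startHealthUpdate helper drives this kind of loop from the worker side: it hands a ticker channel to the queue's StartHealthUpdate and cancels the context once the dequeued events have been processed and acknowledged.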
Diffstat (limited to 'internal/praefect/replicator.go')
-rw-r--r--  internal/praefect/replicator.go | 121
1 file changed, 77 insertions(+), 44 deletions(-)
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index d6354161c..a4d2a54da 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -396,57 +396,17 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
 
 		var totalEvents int
 		shard, err := r.nodeManager.GetShard(virtualStorage)
-		if err == nil {
+		if err != nil {
+			logger.WithError(err).Error("error when getting primary and secondaries")
+		} else {
 			targetNodes := shard.Secondaries
 			if shard.IsReadOnly {
 				targetNodes = append(targetNodes, shard.Primary)
 			}
 
 			for _, target := range targetNodes {
-				events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), 10)
-				if err != nil {
-					logger.WithField(logWithReplTarget, target.GetStorage()).WithError(err).Error("failed to dequeue replication events")
-					continue
-				}
-
-				totalEvents += len(events)
-
-				eventIDsByState := map[datastore.JobState][]uint64{}
-				for _, event := range events {
-					if err := r.processReplicationEvent(ctx, event, shard, target.GetConnection()); err != nil {
-						logger.WithFields(logrus.Fields{
-							logWithReplJobID:   event.ID,
-							logWithReplVirtual: event.Job.VirtualStorage,
-							logWithReplTarget:  event.Job.TargetNodeStorage,
-							logWithReplSource:  event.Job.SourceNodeStorage,
-							logWithReplChange:  event.Job.Change,
-							logWithReplPath:    event.Job.RelativePath,
-							logWithCorrID:      getCorrelationID(event.Meta),
-						}).WithError(err).Error("replication job failed")
-						if event.Attempt <= 0 {
-							eventIDsByState[datastore.JobStateDead] = append(eventIDsByState[datastore.JobStateDead], event.ID)
-						} else {
-							eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID)
-						}
-						continue
-					}
-					eventIDsByState[datastore.JobStateCompleted] = append(eventIDsByState[datastore.JobStateCompleted], event.ID)
-				}
-				for state, eventIDs := range eventIDsByState {
-					ackIDs, err := r.queue.Acknowledge(ctx, state, eventIDs)
-					if err != nil {
-						logger.WithField("state", state).WithField("event_ids", eventIDs).WithError(err).Error("failed to acknowledge replication events")
-						continue
-					}
-
-					notAckIDs := subtractUint64(ackIDs, eventIDs)
-					if len(notAckIDs) > 0 {
-						logger.WithField("state", state).WithField("event_ids", notAckIDs).WithError(err).Error("replication events were not acknowledged")
-					}
-				}
+				totalEvents += r.handleNode(ctx, logger, shard, virtualStorage, target)
 			}
-		} else {
-			logger.WithError(err).Error("error when getting primary and secondaries")
 		}
 
 		if totalEvents == 0 {
@@ -463,6 +423,79 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
 	}
 }
 
+func (r ReplMgr) handleNode(ctx context.Context, logger logrus.FieldLogger, shard nodes.Shard, virtualStorage string, target nodes.Node) int {
+	events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), 10)
+	if err != nil {
+		logger.WithField(logWithReplTarget, target.GetStorage()).WithError(err).Error("failed to dequeue replication events")
+		return 0
+	}
+
+	if len(events) == 0 {
+		return 0
+	}
+
+	stopHealthUpdate := r.startHealthUpdate(ctx, logger, events)
+	defer stopHealthUpdate()
+
+	eventIDsByState := map[datastore.JobState][]uint64{}
+	for _, event := range events {
+		state := r.handleNodeEvent(ctx, logger, shard, target, event)
+		eventIDsByState[state] = append(eventIDsByState[state], event.ID)
+	}
+
+	for state, eventIDs := range eventIDsByState {
+		ackIDs, err := r.queue.Acknowledge(ctx, state, eventIDs)
+		if err != nil {
+			logger.WithField("state", state).WithField("event_ids", eventIDs).WithError(err).Error("failed to acknowledge replication events")
+			continue
+		}
+
+		notAckIDs := subtractUint64(ackIDs, eventIDs)
+		if len(notAckIDs) > 0 {
+			logger.WithField("state", state).WithField("event_ids", notAckIDs).WithError(err).Error("replication events were not acknowledged")
+		}
+	}
+
+	return len(events)
+}
+
+func (r ReplMgr) startHealthUpdate(ctx context.Context, logger logrus.FieldLogger, events []datastore.ReplicationEvent) context.CancelFunc {
+	healthUpdateCtx, healthUpdateCancel := context.WithCancel(ctx)
+	go func() {
+		ticker := time.NewTicker(5 * time.Second)
+		defer ticker.Stop()
+
+		if err := r.queue.StartHealthUpdate(healthUpdateCtx, ticker.C, events); err != nil {
+			ids := make([]uint64, len(events))
+			for i, event := range events {
+				ids[i] = event.ID
+			}
+
+			logger.WithField("event_ids", ids).WithError(err).Error("health update loop")
+		}
+	}()
+
+	return healthUpdateCancel
+}
+
+func (r ReplMgr) handleNodeEvent(ctx context.Context, logger logrus.FieldLogger, shard nodes.Shard, target nodes.Node, event datastore.ReplicationEvent) datastore.JobState {
+	err := r.processReplicationEvent(ctx, event, shard, target.GetConnection())
+	if err == nil {
+		return datastore.JobStateCompleted
+	}
+
+	logger.WithFields(logrus.Fields{
+		logWithReplJobID: event.ID,
+		logWithCorrID:    getCorrelationID(event.Meta),
+	}).WithError(err).Error("replication job failed")
+
+	if event.Attempt <= 0 {
+		return datastore.JobStateDead
+	}
+
+	return datastore.JobStateFailed
+}
+
 func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.ReplicationEvent, shard nodes.Shard, targetCC *grpc.ClientConn) error {
 	source, err := shard.GetNode(event.Job.SourceNodeStorage)
 	if err != nil {