diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2020-03-27 07:25:52 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2020-03-27 07:25:52 +0300 |
commit | a98b52ec08cffd61cd41bcc53761d54ef360ecee (patch) | |
tree | 914cad6d1e7fc7aa0d25ddc9d6e6c4ba448ba3e3 /internal/praefect/replicator.go | |
parent | 5cad7eb778160c81aa9a8755ec991030266bb4d5 (diff) |
Praefect: replication event queue as a primary storage of events
Replication storage interface switched to `ReplicationEventQueue`.
The `gitaly_replication_queue` table was extended with a `meta`
column, introduced as a container for meta information such as the
correlation ID.
`memoryReplicationEventQueue` now populates the `LockID` field to
produce the same result as the SQL implementation.
`ReplicationEventQueueInterceptor` introduced for testing purposes
as well as an interceptor for metrics, etc.
The `slice` package was created to assemble common operations on
different kinds of slices (`Uint64` is the first one).
Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/2166
Diffstat (limited to 'internal/praefect/replicator.go')
-rw-r--r-- | internal/praefect/replicator.go | 161 |
1 files changed, 98 insertions, 63 deletions
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 9fa23eeb6..d1afaf8a5 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -9,6 +9,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" @@ -250,6 +251,8 @@ const ( logWithReplJobID = "replication_job_id" logWithReplSource = "replication_job_source" logWithReplTarget = "replication_job_target" + logWithReplChange = "replication_job_change" + logWithReplPath = "replication_job_path" logWithCorrID = "replication_correlation_id" ) @@ -279,10 +282,6 @@ func ExpBackoffFunc(start time.Duration, max time.Duration) BackoffFunc { } } -const ( - maxAttempts = 3 -) - func (r ReplMgr) getPrimaryAndSecondaries() (primary nodes.Node, secondaries []nodes.Node, err error) { shard, err := r.nodeManager.GetShard(r.virtualStorage) if err != nil { @@ -302,78 +301,99 @@ func (r ReplMgr) getPrimaryAndSecondaries() (primary nodes.Node, secondaries []n return primary, secondaries, nil } +// createReplJob converts `ReplicationEvent` into `ReplJob`. +// It is intermediate solution until `ReplJob` removed and code not adopted to `ReplicationEvent`. 
+func (r ReplMgr) createReplJob(event datastore.ReplicationEvent) (datastore.ReplJob, error) { + targetNode, err := r.datastore.GetStorageNode(event.Job.TargetNodeStorage) + if err != nil { + return datastore.ReplJob{}, err + } + + sourceNode, err := r.datastore.GetStorageNode(event.Job.SourceNodeStorage) + if err != nil { + return datastore.ReplJob{}, err + } + + var correlationID string + if val, found := event.Meta[metadatahandler.CorrelationIDKey]; found { + correlationID, _ = val.(string) + } + + replJob := datastore.ReplJob{ + Attempts: event.Attempt, + Change: event.Job.Change, + ID: event.ID, + TargetNode: targetNode, + SourceNode: sourceNode, + RelativePath: event.Job.RelativePath, + Params: event.Job.Params, + CorrelationID: correlationID, + } + + return replJob, nil +} + // ProcessBacklog will process queued jobs. It will block while processing jobs. func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFunc) error { backoff, reset := b() for { - var totalJobs int + var totalEvents int primary, secondaries, err := r.getPrimaryAndSecondaries() if err == nil { for _, secondary := range secondaries { - jobs, err := r.datastore.GetJobs([]datastore.JobState{datastore.JobStateReady, datastore.JobStateFailed}, secondary.GetStorage(), 10) + events, err := r.datastore.Dequeue(ctx, secondary.GetStorage(), 10) if err != nil { - return err + r.log.WithField(logWithReplTarget, secondary.GetStorage()).WithError(err).Error("failed to dequeue replication events") + continue } - totalJobs += len(jobs) + totalEvents += len(events) - type replicatedKey struct { - change datastore.ChangeType - repoPath, source, target string - } - reposReplicated := make(map[replicatedKey]struct{}) - - for _, job := range jobs { - if job.Attempts >= maxAttempts { - if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateDead); err != nil { - r.log.WithError(err).Error("error when updating replication job status to cancelled") - } + eventIDsByState := 
map[datastore.JobState][]uint64{} + for _, event := range events { + job, err := r.createReplJob(event) + if err != nil { + r.log.WithField("event", event).WithError(err).Error("failed to restore replication job") + eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID) continue } - - var replicationKey replicatedKey - switch job.Change { - // this optimization could be done only for Update and Delete replication jobs as we treat them as idempotent - // Update - there is no much profit from executing multiple fetches for the same target from the same source one by one - // Delete - there is no way how we could remove already removed repository - // that is why those Jobs needs to be tracked and marked as Cancelled (removed from queue without execution). - case datastore.UpdateRepo, datastore.DeleteRepo: - replicationKey = replicatedKey{ - change: job.Change, - repoPath: job.RelativePath, - source: job.SourceNode.Storage, - target: job.TargetNode.Storage, - } - - if _, ok := reposReplicated[replicationKey]; ok { - if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateCancelled); err != nil { - r.log.WithError(err).Error("error when updating replication job status to cancelled") - } - continue - } - } - - if err = r.processReplJob(ctx, job, primary.GetConnection(), secondary.GetConnection()); err != nil { + if err := r.processReplJob(ctx, job, primary.GetConnection(), secondary.GetConnection()); err != nil { r.log.WithFields(logrus.Fields{ - logWithReplJobID: job.ID, - "from_storage": job.SourceNode.Storage, - "to_storage": job.TargetNode.Storage, + logWithReplJobID: job.ID, + logWithReplTarget: job.TargetNode.Storage, + logWithReplSource: job.SourceNode.Storage, + logWithReplChange: job.Change, + logWithReplPath: job.RelativePath, + logWithCorrID: job.CorrelationID, }).WithError(err).Error("replication job failed") - if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateFailed); err != nil { 
- r.log.WithError(err).Error("error when updating replication job status to failed") + if job.Attempts == 0 { + eventIDsByState[datastore.JobStateDead] = append(eventIDsByState[datastore.JobStateDead], event.ID) + } else { + eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID) } continue } + eventIDsByState[datastore.JobStateCompleted] = append(eventIDsByState[datastore.JobStateCompleted], event.ID) + } + for state, eventIDs := range eventIDsByState { + ackIDs, err := r.datastore.Acknowledge(ctx, state, eventIDs) + if err != nil { + r.log.WithField("state", state).WithField("event_ids", eventIDs).WithError(err).Error("failed to acknowledge replication events") + continue + } - reposReplicated[replicationKey] = struct{}{} + notAckIDs := subtractUint64(ackIDs, eventIDs) + if len(notAckIDs) > 0 { + r.log.WithField("state", state).WithField("event_ids", notAckIDs).WithError(err).Error("replication events were not acknowledged") + } } } } else { r.log.WithError(err).WithField("virtual_storage", r.virtualStorage).Error("error when getting primary and secondaries") } - if totalJobs == 0 { + if totalEvents == 0 { select { case <-time.After(backoff()): continue @@ -396,18 +416,10 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sour WithField(logWithReplJobID, job.ID). WithField(logWithReplSource, job.SourceNode). WithField(logWithReplTarget, job.TargetNode). + WithField(logWithReplPath, job.RelativePath). + WithField(logWithReplChange, job.Change). 
WithField(logWithCorrID, job.CorrelationID) - if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateInProgress); err != nil { - l.WithError(err).Error("unable to update replication job to in progress") - return err - } - - if err := r.datastore.IncrReplJobAttempts(job.ID); err != nil { - l.WithError(err).Error("unable to increment replication job attempts") - return err - } - var replCtx context.Context var cancel func() @@ -451,9 +463,32 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sour replDuration := time.Since(replStart) r.replLatencyMetric.Observe(float64(replDuration) / float64(time.Second)) - if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateCompleted); err != nil { - r.log.WithError(err).Error("error when updating replication job status to complete") + return nil +} + +// subtractUint64 returns new slice that has all elements from left that does not exist at right. +func subtractUint64(l, r []uint64) []uint64 { + if len(l) == 0 { + return nil } - return nil + if len(r) == 0 { + result := make([]uint64, len(l)) + copy(result, l) + return result + } + + excludeSet := make(map[uint64]struct{}, len(l)) + for _, v := range r { + excludeSet[v] = struct{}{} + } + + var result []uint64 + for _, v := range l { + if _, found := excludeSet[v]; !found { + result = append(result, v) + } + } + + return result } |