Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2020-03-27 07:25:52 +0300
committerPaul Okstad <pokstad@gitlab.com>2020-03-27 07:25:52 +0300
commita98b52ec08cffd61cd41bcc53761d54ef360ecee (patch)
tree914cad6d1e7fc7aa0d25ddc9d6e6c4ba448ba3e3 /internal/praefect/replicator.go
parent5cad7eb778160c81aa9a8755ec991030266bb4d5 (diff)
Praefect: replication event queue as a primary storage of events
The replication storage interface is switched to `ReplicationEventQueue`. The `gitaly_replication_queue` table is extended with a `meta` column, introduced as a container for meta information such as the correlation ID, etc. `memoryReplicationEventQueue` now populates the `LockID` field to produce the same result as the SQL implementation. `ReplicationEventQueueInterceptor` is introduced for testing purposes and as an interceptor for metrics, etc. The `slice` package is created to collect common operations on different kinds of slices (`Uint64` is the first one). Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/2166
Diffstat (limited to 'internal/praefect/replicator.go')
-rw-r--r--internal/praefect/replicator.go161
1 files changed, 98 insertions, 63 deletions
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 9fa23eeb6..d1afaf8a5 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -9,6 +9,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
@@ -250,6 +251,8 @@ const (
logWithReplJobID = "replication_job_id"
logWithReplSource = "replication_job_source"
logWithReplTarget = "replication_job_target"
+ logWithReplChange = "replication_job_change"
+ logWithReplPath = "replication_job_path"
logWithCorrID = "replication_correlation_id"
)
@@ -279,10 +282,6 @@ func ExpBackoffFunc(start time.Duration, max time.Duration) BackoffFunc {
}
}
-const (
- maxAttempts = 3
-)
-
func (r ReplMgr) getPrimaryAndSecondaries() (primary nodes.Node, secondaries []nodes.Node, err error) {
shard, err := r.nodeManager.GetShard(r.virtualStorage)
if err != nil {
@@ -302,78 +301,99 @@ func (r ReplMgr) getPrimaryAndSecondaries() (primary nodes.Node, secondaries []n
return primary, secondaries, nil
}
+// createReplJob converts a `ReplicationEvent` into a `ReplJob`, looking up the target and source storage nodes in the datastore; a failed lookup is returned as the error together with a zero-value `ReplJob`.
+// It is an intermediate solution until `ReplJob` is removed and the code is adapted to `ReplicationEvent`.
+func (r ReplMgr) createReplJob(event datastore.ReplicationEvent) (datastore.ReplJob, error) {
+ targetNode, err := r.datastore.GetStorageNode(event.Job.TargetNodeStorage)
+ if err != nil {
+ return datastore.ReplJob{}, err
+ }
+
+ sourceNode, err := r.datastore.GetStorageNode(event.Job.SourceNodeStorage)
+ if err != nil {
+ return datastore.ReplJob{}, err
+ }
+
+ var correlationID string
+ if val, found := event.Meta[metadatahandler.CorrelationIDKey]; found {
+ correlationID, _ = val.(string) // a non-string value leaves correlationID empty
+ }
+
+ replJob := datastore.ReplJob{
+ Attempts: event.Attempt,
+ Change: event.Job.Change,
+ ID: event.ID,
+ TargetNode: targetNode,
+ SourceNode: sourceNode,
+ RelativePath: event.Job.RelativePath,
+ Params: event.Job.Params,
+ CorrelationID: correlationID,
+ }
+
+ return replJob, nil
+}
+
// ProcessBacklog will process queued jobs. It will block while processing jobs.
func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFunc) error {
backoff, reset := b()
for {
- var totalJobs int
+ var totalEvents int
primary, secondaries, err := r.getPrimaryAndSecondaries()
if err == nil {
for _, secondary := range secondaries {
- jobs, err := r.datastore.GetJobs([]datastore.JobState{datastore.JobStateReady, datastore.JobStateFailed}, secondary.GetStorage(), 10)
+ events, err := r.datastore.Dequeue(ctx, secondary.GetStorage(), 10)
if err != nil {
- return err
+ r.log.WithField(logWithReplTarget, secondary.GetStorage()).WithError(err).Error("failed to dequeue replication events")
+ continue
}
- totalJobs += len(jobs)
+ totalEvents += len(events)
- type replicatedKey struct {
- change datastore.ChangeType
- repoPath, source, target string
- }
- reposReplicated := make(map[replicatedKey]struct{})
-
- for _, job := range jobs {
- if job.Attempts >= maxAttempts {
- if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateDead); err != nil {
- r.log.WithError(err).Error("error when updating replication job status to cancelled")
- }
+ eventIDsByState := map[datastore.JobState][]uint64{}
+ for _, event := range events {
+ job, err := r.createReplJob(event)
+ if err != nil {
+ r.log.WithField("event", event).WithError(err).Error("failed to restore replication job")
+ eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID)
continue
}
-
- var replicationKey replicatedKey
- switch job.Change {
- // this optimization could be done only for Update and Delete replication jobs as we treat them as idempotent
- // Update - there is no much profit from executing multiple fetches for the same target from the same source one by one
- // Delete - there is no way how we could remove already removed repository
- // that is why those Jobs needs to be tracked and marked as Cancelled (removed from queue without execution).
- case datastore.UpdateRepo, datastore.DeleteRepo:
- replicationKey = replicatedKey{
- change: job.Change,
- repoPath: job.RelativePath,
- source: job.SourceNode.Storage,
- target: job.TargetNode.Storage,
- }
-
- if _, ok := reposReplicated[replicationKey]; ok {
- if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateCancelled); err != nil {
- r.log.WithError(err).Error("error when updating replication job status to cancelled")
- }
- continue
- }
- }
-
- if err = r.processReplJob(ctx, job, primary.GetConnection(), secondary.GetConnection()); err != nil {
+ if err := r.processReplJob(ctx, job, primary.GetConnection(), secondary.GetConnection()); err != nil {
r.log.WithFields(logrus.Fields{
- logWithReplJobID: job.ID,
- "from_storage": job.SourceNode.Storage,
- "to_storage": job.TargetNode.Storage,
+ logWithReplJobID: job.ID,
+ logWithReplTarget: job.TargetNode.Storage,
+ logWithReplSource: job.SourceNode.Storage,
+ logWithReplChange: job.Change,
+ logWithReplPath: job.RelativePath,
+ logWithCorrID: job.CorrelationID,
}).WithError(err).Error("replication job failed")
- if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateFailed); err != nil {
- r.log.WithError(err).Error("error when updating replication job status to failed")
+ if job.Attempts == 0 {
+ eventIDsByState[datastore.JobStateDead] = append(eventIDsByState[datastore.JobStateDead], event.ID)
+ } else {
+ eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID)
}
continue
}
+ eventIDsByState[datastore.JobStateCompleted] = append(eventIDsByState[datastore.JobStateCompleted], event.ID)
+ }
+ for state, eventIDs := range eventIDsByState {
+ ackIDs, err := r.datastore.Acknowledge(ctx, state, eventIDs)
+ if err != nil {
+ r.log.WithField("state", state).WithField("event_ids", eventIDs).WithError(err).Error("failed to acknowledge replication events")
+ continue
+ }
- reposReplicated[replicationKey] = struct{}{}
+ notAckIDs := subtractUint64(ackIDs, eventIDs)
+ if len(notAckIDs) > 0 {
+ r.log.WithField("state", state).WithField("event_ids", notAckIDs).WithError(err).Error("replication events were not acknowledged")
+ }
}
}
} else {
r.log.WithError(err).WithField("virtual_storage", r.virtualStorage).Error("error when getting primary and secondaries")
}
- if totalJobs == 0 {
+ if totalEvents == 0 {
select {
case <-time.After(backoff()):
continue
@@ -396,18 +416,10 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sour
WithField(logWithReplJobID, job.ID).
WithField(logWithReplSource, job.SourceNode).
WithField(logWithReplTarget, job.TargetNode).
+ WithField(logWithReplPath, job.RelativePath).
+ WithField(logWithReplChange, job.Change).
WithField(logWithCorrID, job.CorrelationID)
- if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateInProgress); err != nil {
- l.WithError(err).Error("unable to update replication job to in progress")
- return err
- }
-
- if err := r.datastore.IncrReplJobAttempts(job.ID); err != nil {
- l.WithError(err).Error("unable to increment replication job attempts")
- return err
- }
-
var replCtx context.Context
var cancel func()
@@ -451,9 +463,32 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sour
replDuration := time.Since(replStart)
r.replLatencyMetric.Observe(float64(replDuration) / float64(time.Second))
- if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateCompleted); err != nil {
- r.log.WithError(err).Error("error when updating replication job status to complete")
+ return nil
+}
+
+// subtractUint64 returns a new slice holding, in their original order, all elements of l that are not present in r; it returns nil when l is empty (or when every element is excluded) and a copy of l when r is empty.
+func subtractUint64(l, r []uint64) []uint64 {
+ if len(l) == 0 {
+ return nil
}
- return nil
+ if len(r) == 0 {
+ result := make([]uint64, len(l))
+ copy(result, l)
+ return result
+ }
+
+ excludeSet := make(map[uint64]struct{}, len(l)) // NOTE(review): capacity hint is len(l) although the set is filled from r; confirm whether len(r) was intended
+ for _, v := range r {
+ excludeSet[v] = struct{}{}
+ }
+
+ var result []uint64
+ for _, v := range l {
+ if _, found := excludeSet[v]; !found {
+ result = append(result, v)
+ }
+ }
+
+ return result
}