Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2020-05-26 20:00:29 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2020-05-27 10:49:49 +0300
commitc7cfeabd068e737b80a3302e02fe5cddeeb0ef3b (patch)
tree93b6be8089ce14108c387dc2f3a461e53ef11562 /internal/praefect/replicator.go
parenta1fe1986c75312d557b73f13fe35587462c4ca5f (diff)
Praefect: same storage name can't be used for different virtual storages
In order to exclude usage of Datastore struct as not actual anymore migration to replication queue is node and set of virtual storages is provided from outside in time of creation of replication manager. Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/2613
Diffstat (limited to 'internal/praefect/replicator.go')
-rw-r--r--internal/praefect/replicator.go186
1 files changed, 68 insertions, 118 deletions
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index fe6a438ff..721edab29 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -23,32 +23,32 @@ import (
// Replicator performs the actual replication logic between two nodes
type Replicator interface {
// Replicate propagates changes from the source to the target
- Replicate(ctx context.Context, job datastore.ReplJob, source, target *grpc.ClientConn) error
+ Replicate(ctx context.Context, event datastore.ReplicationEvent, source, target *grpc.ClientConn) error
// Destroy will remove the target repo on the specified target connection
- Destroy(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error
+ Destroy(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
// Rename will rename(move) the target repo on the specified target connection
- Rename(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error
+ Rename(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
// GarbageCollect will run gc on the target repository
- GarbageCollect(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error
+ GarbageCollect(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
// RepackFull will do a full repack on the target repository
- RepackFull(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error
+ RepackFull(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
// RepackIncremental will do an incremental repack on the target repository
- RepackIncremental(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error
+ RepackIncremental(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
}
type defaultReplicator struct {
log *logrus.Entry
}
-func (dr defaultReplicator) Replicate(ctx context.Context, job datastore.ReplJob, sourceCC, targetCC *grpc.ClientConn) error {
+func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.ReplicationEvent, sourceCC, targetCC *grpc.ClientConn) error {
targetRepository := &gitalypb.Repository{
- StorageName: job.TargetNode.Storage,
- RelativePath: job.RelativePath,
+ StorageName: event.Job.TargetNodeStorage,
+ RelativePath: event.Job.RelativePath,
}
sourceRepository := &gitalypb.Repository{
- StorageName: job.SourceNode.Storage,
- RelativePath: job.RelativePath,
+ StorageName: event.Job.SourceNodeStorage,
+ RelativePath: event.Job.RelativePath,
}
targetRepositoryClient := gitalypb.NewRepositoryServiceClient(targetCC)
@@ -110,10 +110,10 @@ func (dr defaultReplicator) Replicate(ctx context.Context, job datastore.ReplJob
return nil
}
-func (dr defaultReplicator) Destroy(ctx context.Context, job datastore.ReplJob, targetCC *grpc.ClientConn) error {
+func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
- StorageName: job.TargetNode.Storage,
- RelativePath: job.RelativePath,
+ StorageName: event.Job.TargetNodeStorage,
+ RelativePath: event.Job.RelativePath,
}
repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
@@ -125,15 +125,15 @@ func (dr defaultReplicator) Destroy(ctx context.Context, job datastore.ReplJob,
return err
}
-func (dr defaultReplicator) Rename(ctx context.Context, job datastore.ReplJob, targetCC *grpc.ClientConn) error {
+func (dr defaultReplicator) Rename(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
- StorageName: job.TargetNode.Storage,
- RelativePath: job.RelativePath,
+ StorageName: event.Job.TargetNodeStorage,
+ RelativePath: event.Job.RelativePath,
}
repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
- val, found := job.Params["RelativePath"]
+ val, found := event.Job.Params["RelativePath"]
if !found {
return errors.New("no 'RelativePath' parameter for rename")
}
@@ -151,13 +151,13 @@ func (dr defaultReplicator) Rename(ctx context.Context, job datastore.ReplJob, t
return err
}
-func (dr defaultReplicator) GarbageCollect(ctx context.Context, job datastore.ReplJob, targetCC *grpc.ClientConn) error {
+func (dr defaultReplicator) GarbageCollect(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
- StorageName: job.TargetNode.Storage,
- RelativePath: job.RelativePath,
+ StorageName: event.Job.TargetNodeStorage,
+ RelativePath: event.Job.RelativePath,
}
- val, found := job.Params["CreateBitmap"]
+ val, found := event.Job.Params["CreateBitmap"]
if !found {
return errors.New("no 'CreateBitmap' parameter for garbage collect")
}
@@ -176,10 +176,10 @@ func (dr defaultReplicator) GarbageCollect(ctx context.Context, job datastore.Re
return err
}
-func (dr defaultReplicator) RepackIncremental(ctx context.Context, job datastore.ReplJob, targetCC *grpc.ClientConn) error {
+func (dr defaultReplicator) RepackIncremental(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
- StorageName: job.TargetNode.Storage,
- RelativePath: job.RelativePath,
+ StorageName: event.Job.TargetNodeStorage,
+ RelativePath: event.Job.RelativePath,
}
repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
@@ -191,13 +191,13 @@ func (dr defaultReplicator) RepackIncremental(ctx context.Context, job datastore
return err
}
-func (dr defaultReplicator) RepackFull(ctx context.Context, job datastore.ReplJob, targetCC *grpc.ClientConn) error {
+func (dr defaultReplicator) RepackFull(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
- StorageName: job.TargetNode.Storage,
- RelativePath: job.RelativePath,
+ StorageName: event.Job.TargetNodeStorage,
+ RelativePath: event.Job.RelativePath,
}
- val, found := job.Params["CreateBitmap"]
+ val, found := event.Job.Params["CreateBitmap"]
if !found {
return errors.New("no 'CreateBitmap' parameter for repack full")
}
@@ -254,7 +254,7 @@ func (dr defaultReplicator) confirmChecksums(ctx context.Context, primaryClient,
// ReplMgr is a replication manager for handling replication jobs
type ReplMgr struct {
log *logrus.Entry
- datastore datastore.Datastore
+ queue datastore.ReplicationEventQueue
nodeManager nodes.Manager
virtualStorages []string // replicas this replicator is responsible for
replicator Replicator // does the actual replication logic
@@ -292,13 +292,13 @@ func WithDelayMetric(h prommetrics.HistogramVec) func(*ReplMgr) {
// NewReplMgr initializes a replication manager with the provided dependencies
// and options
-func NewReplMgr(log *logrus.Entry, datastore datastore.Datastore, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr {
+func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.ReplicationEventQueue, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr {
r := ReplMgr{
log: log.WithField("component", "replication_manager"),
- datastore: datastore,
+ queue: queue,
whitelist: map[string]struct{}{},
replicator: defaultReplicator{log},
- virtualStorages: datastore.VirtualStorages(),
+ virtualStorages: virtualStorages,
nodeManager: nodeMgr,
replLatencyMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}),
replDelayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}),
@@ -364,38 +364,12 @@ func ExpBackoffFunc(start time.Duration, max time.Duration) BackoffFunc {
}
}
-// createReplJob converts `ReplicationEvent` into `ReplJob`.
-// It is intermediate solution until `ReplJob` removed and code not adopted to `ReplicationEvent`.
-func (r ReplMgr) createReplJob(event datastore.ReplicationEvent) (datastore.ReplJob, error) {
- targetNode, err := r.datastore.GetStorageNode(event.Job.TargetNodeStorage)
- if err != nil {
- return datastore.ReplJob{}, err
- }
-
- sourceNode, err := r.datastore.GetStorageNode(event.Job.SourceNodeStorage)
- if err != nil {
- return datastore.ReplJob{}, err
- }
-
- var correlationID string
- if val, found := event.Meta[metadatahandler.CorrelationIDKey]; found {
+func getCorrelationID(params datastore.Params) string {
+ correlationID := ""
+ if val, found := params[metadatahandler.CorrelationIDKey]; found {
correlationID, _ = val.(string)
}
-
- replJob := datastore.ReplJob{
- Attempts: event.Attempt,
- Change: event.Job.Change,
- ID: event.ID,
- VirtualStorage: event.Job.VirtualStorage,
- TargetNode: targetNode,
- SourceNode: sourceNode,
- RelativePath: event.Job.RelativePath,
- Params: event.Job.Params,
- CorrelationID: correlationID,
- CreatedAt: event.CreatedAt,
- }
-
- return replJob, nil
+ return correlationID
}
// ProcessBacklog starts processing of queued jobs.
@@ -428,7 +402,7 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
}
for _, target := range targetNodes {
- events, err := r.datastore.Dequeue(ctx, virtualStorage, target.GetStorage(), 10)
+ events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), 10)
if err != nil {
logger.WithField(logWithReplTarget, target.GetStorage()).WithError(err).Error("failed to dequeue replication events")
continue
@@ -438,31 +412,17 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
eventIDsByState := map[datastore.JobState][]uint64{}
for _, event := range events {
- job, err := r.createReplJob(event)
- if err != nil {
- logger.WithField("event", event).WithError(err).Error("failed to restore replication job")
- eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID)
- continue
- }
-
- source, err := shard.GetNode(job.SourceNode.Storage)
- if err != nil {
- logger.WithField("event", event).WithError(err).Error("failed to get source node for replication job")
- eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID)
- continue
- }
-
- if err := r.processReplJob(ctx, job, source.GetConnection(), target.GetConnection()); err != nil {
+ if err := r.processReplJob(ctx, event, shard, target.GetConnection()); err != nil {
logger.WithFields(logrus.Fields{
- logWithReplJobID: job.ID,
- logWithReplVirtual: job.VirtualStorage,
- logWithReplTarget: job.TargetNode.Storage,
- logWithReplSource: job.SourceNode.Storage,
- logWithReplChange: job.Change,
- logWithReplPath: job.RelativePath,
- logWithCorrID: job.CorrelationID,
+ logWithReplJobID: event.ID,
+ logWithReplVirtual: event.Job.VirtualStorage,
+ logWithReplTarget: event.Job.TargetNodeStorage,
+ logWithReplSource: event.Job.SourceNodeStorage,
+ logWithReplChange: event.Job.Change,
+ logWithReplPath: event.Job.RelativePath,
+ logWithCorrID: getCorrelationID(event.Meta),
}).WithError(err).Error("replication job failed")
- if job.Attempts == 0 {
+ if event.Attempt <= 0 {
eventIDsByState[datastore.JobStateDead] = append(eventIDsByState[datastore.JobStateDead], event.ID)
} else {
eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID)
@@ -472,7 +432,7 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
eventIDsByState[datastore.JobStateCompleted] = append(eventIDsByState[datastore.JobStateCompleted], event.ID)
}
for state, eventIDs := range eventIDsByState {
- ackIDs, err := r.datastore.Acknowledge(ctx, state, eventIDs)
+ ackIDs, err := r.queue.Acknowledge(ctx, state, eventIDs)
if err != nil {
logger.WithField("state", state).WithField("event_ids", eventIDs).WithError(err).Error("failed to acknowledge replication events")
continue
@@ -502,15 +462,13 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
}
}
-func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sourceCC, targetCC *grpc.ClientConn) error {
- l := r.log.
- WithField(logWithReplJobID, job.ID).
- WithField(logWithReplVirtual, job.VirtualStorage).
- WithField(logWithReplSource, job.SourceNode).
- WithField(logWithReplTarget, job.TargetNode).
- WithField(logWithReplPath, job.RelativePath).
- WithField(logWithReplChange, job.Change).
- WithField(logWithCorrID, job.CorrelationID)
+func (r ReplMgr) processReplJob(ctx context.Context, event datastore.ReplicationEvent, shard nodes.Shard, targetCC *grpc.ClientConn) error {
+ source, err := shard.GetNode(event.Job.SourceNodeStorage)
+ if err != nil {
+ return fmt.Errorf("get source node: %w", err)
+ }
+
+ cid := getCorrelationID(event.Meta)
var replCtx context.Context
var cancel func()
@@ -522,48 +480,40 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sour
}
defer cancel()
- injectedCtx, err := helper.InjectGitalyServers(replCtx, job.SourceNode.Storage, job.SourceNode.Address, job.SourceNode.Token)
+ injectedCtx, err := helper.InjectGitalyServers(replCtx, event.Job.SourceNodeStorage, source.GetAddress(), source.GetToken())
if err != nil {
- l.WithError(err).Error("unable to inject Gitaly servers into context for replication job")
- return err
- }
-
- if job.CorrelationID == "" {
- l.Warn("replication job missing correlation ID")
+ return fmt.Errorf("inject Gitaly servers into context: %w", err)
}
- injectedCtx = grpccorrelation.InjectToOutgoingContext(injectedCtx, job.CorrelationID)
+ injectedCtx = grpccorrelation.InjectToOutgoingContext(injectedCtx, cid)
replStart := time.Now()
- replDelay := replStart.Sub(job.CreatedAt)
- r.replDelayMetric.WithLabelValues(job.Change.String()).Observe(replDelay.Seconds())
+ r.replDelayMetric.WithLabelValues(event.Job.Change.String()).Observe(replStart.Sub(event.CreatedAt).Seconds())
r.replQueueMetric.Inc()
defer r.replQueueMetric.Dec()
- switch job.Change {
+ switch event.Job.Change {
case datastore.UpdateRepo:
- err = r.replicator.Replicate(injectedCtx, job, sourceCC, targetCC)
+ err = r.replicator.Replicate(injectedCtx, event, source.GetConnection(), targetCC)
case datastore.DeleteRepo:
- err = r.replicator.Destroy(injectedCtx, job, targetCC)
+ err = r.replicator.Destroy(injectedCtx, event, targetCC)
case datastore.RenameRepo:
- err = r.replicator.Rename(injectedCtx, job, targetCC)
+ err = r.replicator.Rename(injectedCtx, event, targetCC)
case datastore.GarbageCollect:
- err = r.replicator.GarbageCollect(injectedCtx, job, targetCC)
+ err = r.replicator.GarbageCollect(injectedCtx, event, targetCC)
case datastore.RepackFull:
- err = r.replicator.RepackFull(injectedCtx, job, targetCC)
+ err = r.replicator.RepackFull(injectedCtx, event, targetCC)
case datastore.RepackIncremental:
- err = r.replicator.RepackIncremental(injectedCtx, job, targetCC)
+ err = r.replicator.RepackIncremental(injectedCtx, event, targetCC)
default:
- err = fmt.Errorf("unknown replication change type encountered: %q", job.Change)
+ err = fmt.Errorf("unknown replication change type encountered: %q", event.Job.Change)
}
if err != nil {
- l.WithError(err).Error("unable to replicate")
return err
}
- replDuration := time.Since(replStart)
- r.replLatencyMetric.WithLabelValues(job.Change.String()).Observe(replDuration.Seconds())
+ r.replLatencyMetric.WithLabelValues(event.Job.Change.String()).Observe(time.Since(replStart).Seconds())
return nil
}