Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2021-10-13 16:31:09 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2021-11-05 12:27:36 +0300
commit8f34243e3c9f87d30a3d3f6ba3e8174717a19892 (patch)
tree2b8316e59d673b79256c8cc859fc174278743d33 /internal/praefect/replicator.go
parent2288d675a169c0c45445be2afdfcc9074f6470e7 (diff)
Use replica path when processing replication jobs
Praefect begins generating relative paths in 14.6. In order to ensure replication RPCs end up targeting the correct repositories on the disk, this commit changes the replicator to use the stored replica path for the various RPCs. Renames and deletes are handled in the old manner as they won't be produced anymore from 14.6 onwards and are left in place only for backwards compatibility. The rest of the replication operations now target the stored replica path instead of the relative path the clients use to access the repository. As the replication jobs do not yet carry the replica path in them, the field is filled from the database. A later commit will begin including the replica path in the replication jobs, which will then allow for removing GetReplicaPath.
Diffstat (limited to 'internal/praefect/replicator.go')
-rw-r--r--internal/praefect/replicator.go65
1 files changed, 54 insertions, 11 deletions
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index dc896cd31..147b1186e 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -56,12 +56,12 @@ type defaultReplicator struct {
func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.ReplicationEvent, sourceCC, targetCC *grpc.ClientConn) error {
targetRepository := &gitalypb.Repository{
StorageName: event.Job.TargetNodeStorage,
- RelativePath: event.Job.RelativePath,
+ RelativePath: event.Job.ReplicaPath,
}
sourceRepository := &gitalypb.Repository{
StorageName: event.Job.SourceNodeStorage,
- RelativePath: event.Job.RelativePath,
+ RelativePath: event.Job.ReplicaPath,
}
logger := dr.log.WithFields(logrus.Fields{
@@ -146,7 +146,7 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli
func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
StorageName: event.Job.TargetNodeStorage,
- RelativePath: event.Job.RelativePath,
+ RelativePath: event.Job.ReplicaPath,
}
repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
@@ -217,7 +217,7 @@ func (dr defaultReplicator) Rename(ctx context.Context, event datastore.Replicat
func (dr defaultReplicator) GarbageCollect(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
StorageName: event.Job.TargetNodeStorage,
- RelativePath: event.Job.RelativePath,
+ RelativePath: event.Job.ReplicaPath,
}
createBitmap, err := event.Job.Params.GetBool("CreateBitmap")
@@ -246,7 +246,7 @@ func (dr defaultReplicator) GarbageCollect(ctx context.Context, event datastore.
func (dr defaultReplicator) RepackIncremental(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
StorageName: event.Job.TargetNodeStorage,
- RelativePath: event.Job.RelativePath,
+ RelativePath: event.Job.ReplicaPath,
}
repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
@@ -261,7 +261,7 @@ func (dr defaultReplicator) RepackIncremental(ctx context.Context, event datasto
func (dr defaultReplicator) Cleanup(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
StorageName: event.Job.TargetNodeStorage,
- RelativePath: event.Job.RelativePath,
+ RelativePath: event.Job.ReplicaPath,
}
repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
@@ -276,7 +276,7 @@ func (dr defaultReplicator) Cleanup(ctx context.Context, event datastore.Replica
func (dr defaultReplicator) PackRefs(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
StorageName: event.Job.TargetNodeStorage,
- RelativePath: event.Job.RelativePath,
+ RelativePath: event.Job.ReplicaPath,
}
allRefs, err := event.Job.Params.GetBool("AllRefs")
@@ -299,7 +299,7 @@ func (dr defaultReplicator) PackRefs(ctx context.Context, event datastore.Replic
func (dr defaultReplicator) WriteCommitGraph(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
StorageName: event.Job.TargetNodeStorage,
- RelativePath: event.Job.RelativePath,
+ RelativePath: event.Job.ReplicaPath,
}
val, found := event.Job.Params["SplitStrategy"]
@@ -335,7 +335,7 @@ func (dr defaultReplicator) WriteCommitGraph(ctx context.Context, event datastor
func (dr defaultReplicator) MidxRepack(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
StorageName: event.Job.TargetNodeStorage,
- RelativePath: event.Job.RelativePath,
+ RelativePath: event.Job.ReplicaPath,
}
repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
@@ -352,7 +352,7 @@ func (dr defaultReplicator) MidxRepack(ctx context.Context, event datastore.Repl
func (dr defaultReplicator) OptimizeRepository(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
StorageName: event.Job.TargetNodeStorage,
- RelativePath: event.Job.RelativePath,
+ RelativePath: event.Job.ReplicaPath,
}
repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
@@ -369,7 +369,7 @@ func (dr defaultReplicator) OptimizeRepository(ctx context.Context, event datast
func (dr defaultReplicator) RepackFull(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
StorageName: event.Job.TargetNodeStorage,
- RelativePath: event.Job.RelativePath,
+ RelativePath: event.Job.ReplicaPath,
}
createBitmap, err := event.Job.Params.GetBool("CreateBitmap")
@@ -403,6 +403,7 @@ type ReplMgr struct {
replJobTimeout time.Duration
dequeueBatchSize uint
parallelStorageProcessingWorkers uint
+ repositoryStore datastore.RepositoryStore
}
// ReplMgrOpt allows a replicator to be configured with additional options
@@ -457,6 +458,7 @@ func NewReplMgr(log *logrus.Entry, storageNames map[string][]string, queue datas
replDelayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}),
dequeueBatchSize: config.DefaultReplicationConfig().BatchSize,
parallelStorageProcessingWorkers: 1,
+ repositoryStore: rs,
}
for _, opt := range opts {
@@ -740,6 +742,42 @@ func (r ReplMgr) handleNodeEvent(ctx context.Context, logger logrus.FieldLogger,
return newState
}
+// backfillReplicaPath returns the replica path that should be used when processing the replication job.
+// As of 14.5, not all jobs are guaranteed to have a replica path on them yet. There are a few special-cased
+// jobs which won't be scheduled anymore in 14.6 and thus do not need to use replica paths; for those, the
+// relative path from the job is returned. For every other job, the replica path is looked up from the
+// repository store by the job's repository ID, and any lookup error is returned wrapped.
+func (r ReplMgr) backfillReplicaPath(ctx context.Context, event datastore.ReplicationEvent) (string, error) {
+ switch {
+ // The reconciler scheduled DeleteReplica jobs which are missing repository ID. 14.5 has
+ // dropped this logic and doesn't leave orphaned records any more as Praefect has a walker
+ // to identify stale replicas. Any jobs still in flight have been scheduled prior to 14.4 and
+ // should be handled in the old manner.
+ case event.Job.Change == datastore.DeleteReplica && event.Job.RepositoryID == 0:
+ fallthrough
+ // 14.5 also doesn't schedule DeleteRepo jobs. Any jobs are again old jobs in-flight.
+ // The repository ID in delete jobs scheduled in 14.4 won't be present anymore at the time the
+ // replication job is being executed, as the 'repositories' record is deleted. Given that,
+ // it's not possible to get the replica path. In 14.4, Praefect intercepts deletes and handles
+ // them without scheduling replication jobs. The 'delete' jobs still in flight are handled as before
+ // for backwards compatibility.
+ case event.Job.Change == datastore.DeleteRepo:
+ fallthrough
+ // RenameRepo doesn't need to use repository ID as the RenameRepository RPC
+ // call will be intercepted in 14.6 by Praefect to perform an atomic rename in
+ // the database. Any jobs still in flight are from 14.5 and older, and should be
+ // handled in the old manner. We'll use the relative path from the replication job
+ // for the backwards compatible handling.
+ case event.Job.Change == datastore.RenameRepo:
+ return event.Job.RelativePath, nil
+ // All remaining jobs get their replica path from the repository store, keyed by repository ID.
+ default:
+ replicaPath, err := r.repositoryStore.GetReplicaPath(ctx, event.Job.RepositoryID)
+ if err != nil {
+ return "", fmt.Errorf("get replica path: %w", err)
+ }
+
+ return replicaPath, nil
+ }
+}
+
func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
var cancel func()
@@ -759,6 +797,11 @@ func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.Re
defer inFlightGauge.Dec()
var err error
+ event.Job.ReplicaPath, err = r.backfillReplicaPath(ctx, event)
+ if err != nil {
+ return fmt.Errorf("choose replica path: %w", err)
+ }
+
switch event.Job.Change {
case datastore.UpdateRepo:
source, ok := r.nodes[event.Job.VirtualStorage][event.Job.SourceNodeStorage]