gitlab.com/gitlab-org/gitaly.git

author    Sami Hiltunen <shiltunen@gitlab.com>  2020-07-07 17:42:22 +0300
committer Sami Hiltunen <shiltunen@gitlab.com>  2020-07-22 16:19:12 +0300
commit    3beb442788cf5f8bc1aa74de745d4f5de76e175b (patch)
tree      1d26334a4834f46d6d768bd74690decac598332d /internal/praefect/replicator.go
parent    521bb978da8780aca690136e78a3ad388726c8ad (diff)
repository state tracking
Tracking the expected and the actual repository states within a virtual storage is currently done by searching through the replication queue. This requires many variables to be taken into account, such as timings between different jobs and the job history of source nodes. To make the tracking easier, this commit adds two tables to record the latest state of repositories across the cluster:

1. The `repositories` table contains the expected state of a repository within a virtual storage.

2. The `storage_repositories` table contains the state of the repository on a given storage that is part of a virtual storage.

Cross-referencing `storage_repositories` with `repositories` makes it straightforward to figure out which repositories are in the expected state. If a repository on a storage is not in the expected state, appropriate corrective actions can be scheduled by diffing the expected record against the record of the stale storage.

Each repository has a generation number which increases monotonically with each write. The generation number can be used to deduce whether the repository has the latest changes. It guarantees the repository is at least on the generation stored, but the repository may be on a later generation if an update was only partially applied. To prevent the generation number from referring to outdated data, repository downgrades are rejected. Generation numbers are propagated via replication jobs, which likewise guarantee the repository will be at least on the generation included in the job.

After the upgrade there won't be any repositories in the tables, and there might be replication jobs which do not have a generation number. To account for this, the downgrade protection is only applied to repositories which have a stored generation number, ensuring replication jobs that already exist during a cluster upgrade are still accepted. As an upgraded primary receives new writes, the repository entries will be added to the tables and replication jobs with correct generation numbers will be scheduled.
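As a rough illustration only, the following minimal Go sketch mimics the downgrade protection described above. It is not the Praefect implementation: the real logic lives in the datastore package and is backed by the `repositories` and `storage_repositories` tables in Postgres, and the `store` type, its method, and the repository paths below are made up for this example.

// Minimal sketch of the generation bookkeeping and downgrade protection
// described in the commit message. Illustrative only; not the Praefect code.
package main

import (
	"errors"
	"fmt"
)

var errDowngradeAttempted = errors.New("downgrade attempted")

// key identifies a repository replica on one storage within a virtual storage.
type key struct{ virtualStorage, relativePath, storage string }

// store keeps the latest known generation of each replica, mimicking the role
// of the storage_repositories table.
type store struct {
	generations map[key]int
}

// setGeneration records the generation a replica is on. Downgrades are
// rejected, but only for repositories which already have a stored generation,
// so jobs scheduled before the upgrade are still accepted.
func (s *store) setGeneration(virtualStorage, relativePath, storage string, generation int) error {
	k := key{virtualStorage, relativePath, storage}
	if current, ok := s.generations[k]; ok && generation < current {
		return errDowngradeAttempted
	}
	s.generations[k] = generation
	return nil
}

func main() {
	s := &store{generations: map[key]int{}}

	// A write bumps the generation and the replication job carries it along.
	fmt.Println(s.setGeneration("default", "@hashed/ab/cd/abcd.git", "gitaly-1", 3)) // <nil>

	// An older job arriving later may not move the replica backwards.
	fmt.Println(s.setGeneration("default", "@hashed/ab/cd/abcd.git", "gitaly-1", 1)) // downgrade attempted
}

Replicas without a stored generation are accepted unconditionally, which matches the upgrade behaviour described in the last paragraph above.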
Diffstat (limited to 'internal/praefect/replicator.go')
-rw-r--r--  internal/praefect/replicator.go  63
1 file changed, 55 insertions(+), 8 deletions(-)
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 82de3ca28..d126798a1 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -38,6 +38,7 @@ type Replicator interface {
type defaultReplicator struct {
log *logrus.Entry
+ rs datastore.RepositoryStore
}
func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.ReplicationEvent, sourceCC, targetCC *grpc.ClientConn) error {
@@ -51,6 +52,17 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli
RelativePath: event.Job.RelativePath,
}
+ generation, err := dr.rs.GetReplicatedGeneration(ctx, event.Job.VirtualStorage, event.Job.RelativePath, event.Job.SourceNodeStorage, event.Job.TargetNodeStorage)
+ if err != nil {
+ // Later generation might have already been replicated by an earlier replication job. If that's the case,
+ // we'll simply acknowledge the job. This also prevents accidental downgrades from happening.
+ if errors.Is(err, datastore.DowngradeAttemptedError{}) {
+ return nil
+ }
+
+ return fmt.Errorf("get replicated generation: %w", err)
+ }
+
targetRepositoryClient := gitalypb.NewRepositoryServiceClient(targetCC)
if _, err := targetRepositoryClient.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{
@@ -101,6 +113,15 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli
}).Error("checksums do not match")
}
+ if generation != datastore.GenerationUnknown {
+ return dr.rs.SetGeneration(ctx,
+ event.Job.VirtualStorage,
+ event.Job.RelativePath,
+ event.Job.TargetNodeStorage,
+ generation,
+ )
+ }
+
return nil
}
@@ -112,11 +133,23 @@ func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.Replica
repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
- _, err := repoSvcClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{
+ if _, err := repoSvcClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{
Repository: targetRepo,
- })
+ }); err != nil {
+ return err
+ }
- return err
+ // If the repository was deleted but this fails, we'll know by the repository not having a record in the virtual
+ // storage but having one for the storage. We can later retry the deletion.
+ if err := dr.rs.DeleteRepository(ctx, event.Job.VirtualStorage, event.Job.RelativePath, event.Job.TargetNodeStorage); err != nil {
+ if !errors.Is(err, datastore.RepositoryNotExistsError{}) {
+ return err
+ }
+
+ dr.log.WithError(err).Info("replicated repository delete does not have a store entry")
+ }
+
+ return nil
}
func (dr defaultReplicator) Rename(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
@@ -137,12 +170,26 @@ func (dr defaultReplicator) Rename(ctx context.Context, event datastore.Replicat
return fmt.Errorf("parameter 'RelativePath' has unexpected type: %T", relativePath)
}
- _, err := repoSvcClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{
+ if _, err := repoSvcClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{
Repository: targetRepo,
RelativePath: relativePath,
- })
+ }); err != nil {
+ return err
+ }
- return err
+ // If the repository was moved but this fails, we'll have a stale record on the storage but it is missing from the
+ // virtual storage. We can later schedule a deletion to fix the situation. The newly named repository's record
+ // will be present once a replication job arrives for it.
+ if err := dr.rs.RenameRepository(ctx,
+ event.Job.VirtualStorage, event.Job.RelativePath, event.Job.TargetNodeStorage, relativePath); err != nil {
+ if !errors.Is(err, datastore.RepositoryNotExistsError{}) {
+ return err
+ }
+
+ dr.log.WithError(err).Info("replicated repository rename does not have a store entry")
+ }
+
+ return nil
}
func (dr defaultReplicator) GarbageCollect(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
@@ -279,12 +326,12 @@ func WithDelayMetric(h prommetrics.HistogramVec) func(*ReplMgr) {
// NewReplMgr initializes a replication manager with the provided dependencies
// and options
-func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.ReplicationEventQueue, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr {
+func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr {
r := ReplMgr{
log: log.WithField("component", "replication_manager"),
queue: queue,
whitelist: map[string]struct{}{},
- replicator: defaultReplicator{log},
+ replicator: defaultReplicator{log, rs},
virtualStorages: virtualStorages,
nodeManager: nodeMgr,
replInFlightMetric: prometheus.NewGaugeVec(
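For callers of the replication manager, the only wiring change implied by this diff is the extra `rs datastore.RepositoryStore` argument to NewReplMgr. The helper below is a hypothetical sketch of that caller-side change; only the NewReplMgr parameter order is taken from the diff, while the package name and the helper itself are assumptions.

// Hypothetical caller-side wiring; assumed to live inside the Gitaly module,
// since the praefect packages are internal.
package example

import (
	"github.com/sirupsen/logrus"

	"gitlab.com/gitlab-org/gitaly/internal/praefect"
	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
	"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
)

// buildReplMgr shows that the RepositoryStore is now a required dependency of
// the replication manager; everything else is passed through unchanged.
func buildReplMgr(
	log *logrus.Entry,
	virtualStorages []string,
	queue datastore.ReplicationEventQueue,
	rs datastore.RepositoryStore,
	nodeMgr nodes.Manager,
) praefect.ReplMgr {
	return praefect.NewReplMgr(log, virtualStorages, queue, rs, nodeMgr)
}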