diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2020-07-07 17:42:22 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-07-22 16:19:12 +0300 |
commit | 3beb442788cf5f8bc1aa74de745d4f5de76e175b (patch) | |
tree | 1d26334a4834f46d6d768bd74690decac598332d /internal/praefect/replicator.go | |
parent | 521bb978da8780aca690136e78a3ad388726c8ad (diff) |
repository state tracking
Tracking the expected and the actual repository states within a virtual
storage is currently done by searching through the replication queue. This
requires many variables to be taken into account such as timings between
different jobs and the job history of source nodes. To make the tracking
easier, this commit adds two tables to record the latest state of
repositories across the cluster:
1. `repositories` table contains the expected state of a repository
within a virtual storage.
2. `storage_repositories` table contains the state of the repository
on a given storage that is part of a virtual storage.
Cross-referencing `storage_repositories` with `repositories` makes it
straightforward to figure out repositories which are in the expected
state. If a repository on a storage is not in the expected state,
appropriate corrective actions can be scheduled by diffing the expected
record with the record of the stale storage.
Each repository has a generation number which increases monotonically
for each write. The generation number can be used to deduce whether
the repository has the latest changes or not. The generation number
guarantees the repository is at least on the generation stored but it
may also be on a later generation if an update was partially applied.
To prevent the generation number from referring to outdated data,
repository downgrades are rejected. Generation numbers get propagated
via replication jobs which again guarantee the repository will be at
least on the generation included in the job.
After the upgrade, there won't be any repositories in the tables and
there might be replication jobs which do not have a generation number.
To account for this, the downgrade protection is only applied to
repositories which have a stored generation number, ensuring existing
replication jobs during cluster upgrade are still accepted. As an
upgraded primary receives new writes, the repository entries will be
added to the tables and replication jobs with correct generation numbers
scheduled.
Diffstat (limited to 'internal/praefect/replicator.go')
-rw-r--r-- | internal/praefect/replicator.go | 63 |
1 files changed, 55 insertions, 8 deletions
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 82de3ca28..d126798a1 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -38,6 +38,7 @@ type Replicator interface { type defaultReplicator struct { log *logrus.Entry + rs datastore.RepositoryStore } func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.ReplicationEvent, sourceCC, targetCC *grpc.ClientConn) error { @@ -51,6 +52,17 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli RelativePath: event.Job.RelativePath, } + generation, err := dr.rs.GetReplicatedGeneration(ctx, event.Job.VirtualStorage, event.Job.RelativePath, event.Job.SourceNodeStorage, event.Job.TargetNodeStorage) + if err != nil { + // Later generation might have already been replicated by an earlier replication job. If that's the case, + // we'll simply acknowledge the job. This also prevents accidental downgrades from happening. + if errors.Is(err, datastore.DowngradeAttemptedError{}) { + return nil + } + + return fmt.Errorf("get replicated generation: %w", err) + } + targetRepositoryClient := gitalypb.NewRepositoryServiceClient(targetCC) if _, err := targetRepositoryClient.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{ @@ -101,6 +113,15 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli }).Error("checksums do not match") } + if generation != datastore.GenerationUnknown { + return dr.rs.SetGeneration(ctx, + event.Job.VirtualStorage, + event.Job.RelativePath, + event.Job.TargetNodeStorage, + generation, + ) + } + return nil } @@ -112,11 +133,23 @@ func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.Replica repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) - _, err := repoSvcClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{ + if _, err := repoSvcClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{ Repository: targetRepo, - 
}) + }); err != nil { + return err + } - return err + // If the repository was deleted but this fails, we'll know by the repository not having a record in the virtual + // storage but having one for the storage. We can later retry the deletion. + if err := dr.rs.DeleteRepository(ctx, event.Job.VirtualStorage, event.Job.RelativePath, event.Job.TargetNodeStorage); err != nil { + if !errors.Is(err, datastore.RepositoryNotExistsError{}) { + return err + } + + dr.log.WithError(err).Info("replicated repository delete does not have a store entry") + } + + return nil } func (dr defaultReplicator) Rename(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { @@ -137,12 +170,26 @@ func (dr defaultReplicator) Rename(ctx context.Context, event datastore.Replicat return fmt.Errorf("parameter 'RelativePath' has unexpected type: %T", relativePath) } - _, err := repoSvcClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{ + if _, err := repoSvcClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{ Repository: targetRepo, RelativePath: relativePath, - }) + }); err != nil { + return err + } - return err + // If the repository was moved but this fails, we'll have a stale record on the storage but it is missing from the + // virtual storage. We can later schedule a deletion to fix the situation. The newly named repository's record + // will be present once a replication job arrives for it. 
+ if err := dr.rs.RenameRepository(ctx, + event.Job.VirtualStorage, event.Job.RelativePath, event.Job.TargetNodeStorage, relativePath); err != nil { + if !errors.Is(err, datastore.RepositoryNotExistsError{}) { + return err + } + + dr.log.WithError(err).Info("replicated repository rename does not have a store entry") + } + + return nil } func (dr defaultReplicator) GarbageCollect(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { @@ -279,12 +326,12 @@ func WithDelayMetric(h prommetrics.HistogramVec) func(*ReplMgr) { // NewReplMgr initializes a replication manager with the provided dependencies // and options -func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.ReplicationEventQueue, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr { +func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr { r := ReplMgr{ log: log.WithField("component", "replication_manager"), queue: queue, whitelist: map[string]struct{}{}, - replicator: defaultReplicator{log}, + replicator: defaultReplicator{log, rs}, virtualStorages: virtualStorages, nodeManager: nodeMgr, replInFlightMetric: prometheus.NewGaugeVec( |