From 592cd8528e30f1e4c0810b1889854aa38b208418 Mon Sep 17 00:00:00 2001 From: James Liu Date: Thu, 14 Dec 2023 11:51:00 +1100 Subject: backup: Only remove "dangling" repositories Instead of invoking the RemoveAll() RPC prior to the restore, we instead compare the set of existing repositories to the set of repositories contained in the backup being restored. The difference between the two sets -- the set of "dangling" repositories -- are removed individually. This is done to ensure the state of repos in Gitaly matches the worldview held by the Rails DB after a GitLab instance restore. Also fixes the existing tests so that all repositories are created with `gittest.CreateRepository` and are thus visible when we later query `ListRepositories` in the restore logic. --- internal/backup/pipeline.go | 20 +++++++++++--- internal/backup/pipeline_test.go | 29 +++++++++++++------- internal/cli/gitalybackup/restore.go | 35 +++++++++++++++++++----- internal/cli/gitalybackup/restore_test.go | 45 ++++++++++++++++++++----------- 4 files changed, 93 insertions(+), 36 deletions(-) diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index 72e5d9740..bfb298350 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -12,6 +12,10 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) +// repositoryKey uniquely identifies a repository, and is used here to key the +// map of processed repos. +type repositoryKey string + // Strategy used to create/restore backups type Strategy interface { Create(context.Context, *CreateRequest) error @@ -196,7 +200,7 @@ type Pipeline struct { pipelineError error cmdErrors *commandErrors - processedRepos map[string][]*gitalypb.Repository + processedRepos map[string]map[repositoryKey]struct{} processedReposMu sync.Mutex } @@ -212,7 +216,7 @@ func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) { done: make(chan struct{}), workersByStorage: make(map[string]chan *contextCommand), cmdErrors: &commandErrors{}, - processedRepos: make(map[string][]*gitalypb.Repository), + processedRepos: make(map[string]map[repositoryKey]struct{}), } for _, opt := range opts { @@ -270,7 +274,7 @@ func (p *Pipeline) Handle(ctx context.Context, cmd Command) { } // Done waits for any in progress jobs to complete then reports any accumulated errors -func (p *Pipeline) Done() (processedRepos map[string][]*gitalypb.Repository, err error) { +func (p *Pipeline) Done() (processedRepos map[string]map[repositoryKey]struct{}, err error) { close(p.done) p.workerWg.Wait() @@ -345,7 +349,10 @@ func (p *Pipeline) processCommand(ctx context.Context, cmd Command) { storageName := cmd.Repository().StorageName p.processedReposMu.Lock() - p.processedRepos[storageName] = append(p.processedRepos[storageName], cmd.Repository()) + if _, ok := p.processedRepos[storageName]; !ok { + p.processedRepos[storageName] = make(map[repositoryKey]struct{}) + } + p.processedRepos[storageName][NewRepositoryKey(cmd.Repository())] = struct{}{} p.processedReposMu.Unlock() log.Info(fmt.Sprintf("completed %s", cmd.Name())) @@ -386,3 +393,8 @@ func (p *Pipeline) releaseWorkerSlot() { } <-p.totalWorkers } + +// NewRepositoryKey returns a unique identifier for the provided repo. +func NewRepositoryKey(repo *gitalypb.Repository) repositoryKey { + return repositoryKey(repo.StorageName + "-" + repo.RelativePath) +} diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go index 22dd65a83..06d071f4b 100644 --- a/internal/backup/pipeline_test.go +++ b/internal/backup/pipeline_test.go @@ -331,26 +331,35 @@ func TestPipelineError(t *testing.T) { func TestPipelineProcessedRepos(t *testing.T) { strategy := MockStrategy{} - repos := map[string][]*gitalypb.Repository{ + repos := []*gitalypb.Repository{ + {RelativePath: "a.git", StorageName: "storage1"}, + {RelativePath: "b.git", StorageName: "storage1"}, + {RelativePath: "c.git", StorageName: "storage2"}, + {RelativePath: "d.git", StorageName: "storage3"}, + } + + expectedProcessedRepos := map[string]map[repositoryKey]struct{}{ "storage1": { - {RelativePath: "a.git", StorageName: "storage1"}, - {RelativePath: "b.git", StorageName: "storage1"}, + "storage1-a.git": {}, + "storage1-b.git": {}, + }, + "storage2": { + "storage2-c.git": {}, + }, + "storage3": { + "storage3-d.git": {}, }, - "storage2": {{RelativePath: "c.git", StorageName: "storage2"}}, - "storage3": {{RelativePath: "d.git", StorageName: "storage3"}}, } p, err := NewPipeline(testhelper.SharedLogger(t)) require.NoError(t, err) ctx := testhelper.Context(t) - for _, v := range repos { - for _, repo := range v { - p.Handle(ctx, NewRestoreCommand(strategy, RestoreRequest{Repository: repo})) - } + for _, repo := range repos { + p.Handle(ctx, NewRestoreCommand(strategy, RestoreRequest{Repository: repo})) } processedRepos, err := p.Done() require.NoError(t, err) - require.EqualValues(t, repos, processedRepos) + require.EqualValues(t, expectedProcessedRepos, processedRepos) } diff --git a/internal/cli/gitalybackup/restore.go b/internal/cli/gitalybackup/restore.go index 7a304f2ed..7a3833f80 100644 --- a/internal/cli/gitalybackup/restore.go +++ b/internal/cli/gitalybackup/restore.go @@ -135,15 +135,18 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin manager = backup.NewManager(sink, locator, pool) } + // Get the set of existing repositories keyed by storage. We'll later use this to determine any + // dangling repos that should be removed. + existingRepos := make(map[string][]*gitalypb.Repository) for _, storageName := range cmd.removeAllRepositories { - err := manager.RemoveAllRepositories(ctx, &backup.RemoveAllRepositoriesRequest{ + repos, err := manager.ListRepositories(ctx, &backup.ListRepositoriesRequest{ StorageName: storageName, }) if err != nil { - // Treat RemoveAll failures as soft failures until we can determine - // how often it fails. - logger.WithError(err).WithField("storage_name", storageName).Warn("failed to remove all repositories") + logger.WithError(err).WithField("storage_name", storageName).Warn("failed to list repositories") } + + existingRepos[storageName] = repos } var opts []backup.PipelineOption @@ -157,7 +160,6 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin decoder := json.NewDecoder(stdin) for { - var req restoreRequest if err := decoder.Decode(&req); errors.Is(err, io.EOF) { break @@ -179,8 +181,29 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin })) } - if _, err := pipeline.Done(); err != nil { + restoredRepos, err := pipeline.Done() + if err != nil { return fmt.Errorf("restore: %w", err) } + + var removalErrors []error + for storageName, repos := range existingRepos { + for _, repo := range repos { + if _, ok := restoredRepos[storageName][backup.NewRepositoryKey(repo)]; !ok { + // If we have dangling repos (those which exist in the storage but + // weren't part of the restore), they need to be deleted so the + // state of repos in Gitaly matches that in the Rails DB. + if err := manager.RemoveRepository(ctx, &backup.RemoveRepositoryRequest{Repo: repo}); err != nil { + removalErrors = append(removalErrors, fmt.Errorf("storage_name %q relative_path %q: %w", storageName, repo.RelativePath, err)) + } + } + } + } + + if len(removalErrors) > 0 { + return fmt.Errorf("remove dangling repositories: %d failures encountered: %w", + len(removalErrors), errors.Join(removalErrors...)) + } + return nil } diff --git a/internal/cli/gitalybackup/restore_test.go b/internal/cli/gitalybackup/restore_test.go index 6d235525c..033420d8f 100644 --- a/internal/cli/gitalybackup/restore_test.go +++ b/internal/cli/gitalybackup/restore_test.go @@ -40,24 +40,32 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) cfg.SocketPath = testserver.RunGitalyServer(t, cfg, setup.RegisterAll) + // This is an example of a "dangling" repository (one that was created after a backup was taken) that should be + // removed after the backup is restored. existingRepo, existRepoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ RelativePath: "existing_repo", }) gittest.WriteCommit(t, cfg, existRepoPath, gittest.WithBranch(git.DefaultBranch)) - path := testhelper.TempDir(t) - existingRepoBundlePath := filepath.Join(path, existingRepo.RelativePath+".bundle") - existingRepoRefPath := filepath.Join(path, existingRepo.RelativePath+".refs") + // The backupDir contains the artifacts that would've been created as part of a backup. + backupDir := testhelper.TempDir(t) + existingRepoBundlePath := filepath.Join(backupDir, existingRepo.RelativePath+".bundle") + existingRepoRefPath := filepath.Join(backupDir, existingRepo.RelativePath+".refs") gittest.Exec(t, cfg, "-C", existRepoPath, "bundle", "create", existingRepoBundlePath, "--all") require.NoError(t, os.WriteFile(existingRepoRefPath, gittest.Exec(t, cfg, "-C", existRepoPath, "show-ref"), perm.SharedFile)) + // These repos are the ones being restored, and should exist after the restore. var repos []*gitalypb.Repository for i := 0; i < 2; i++ { - repo := gittest.InitRepoDir(t, cfg.Storages[0].Path, fmt.Sprintf("repo-%d", i)) - repoBundlePath := filepath.Join(path, repo.RelativePath+".bundle") - repoRefPath := filepath.Join(path, repo.RelativePath+".refs") + repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + RelativePath: fmt.Sprintf("repo-%d", i), + Storage: cfg.Storages[0], + }) + + repoBundlePath := filepath.Join(backupDir, repo.RelativePath+".bundle") testhelper.CopyFile(t, existingRepoBundlePath, repoBundlePath) + repoRefPath := filepath.Join(backupDir, repo.RelativePath+".refs") testhelper.CopyFile(t, existingRepoRefPath, repoRefPath) repos = append(repos, repo) } @@ -81,7 +89,7 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) progname, "restore", "--path", - path, + backupDir, "--parallel", strconv.Itoa(runtime.NumCPU()), "--parallel-storage", @@ -101,9 +109,10 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) require.NoDirExists(t, existRepoPath) + // Ensure the repos were restored correctly. for _, repo := range repos { repoPath := filepath.Join(cfg.Storages[0].Path, gittest.GetReplicaPath(t, ctx, cfg, repo)) - bundlePath := filepath.Join(path, repo.RelativePath+".bundle") + bundlePath := filepath.Join(backupDir, repo.RelativePath+".bundle") output := gittest.Exec(t, cfg, "-C", repoPath, "bundle", "verify", bundlePath) require.Contains(t, string(output), "The bundle records a complete history") @@ -122,8 +131,8 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) ctx := testhelper.Context(t) - path := testhelper.TempDir(t) - backupSink, err := backup.ResolveSink(ctx, path) + backupDir := testhelper.TempDir(t) + backupSink, err := backup.ResolveSink(ctx, backupDir) require.NoError(t, err) backupLocator, err := backup.ResolveLocator("pointer", backupSink) @@ -142,18 +151,22 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) }) gittest.WriteCommit(t, cfg, existRepoPath, gittest.WithBranch(git.DefaultBranch)) - existingRepoBundlePath := filepath.Join(path, existingRepo.RelativePath+".bundle") - existingRepoRefPath := filepath.Join(path, existingRepo.RelativePath+".refs") + existingRepoBundlePath := filepath.Join(backupDir, existingRepo.RelativePath+".bundle") + existingRepoRefPath := filepath.Join(backupDir, existingRepo.RelativePath+".refs") gittest.Exec(t, cfg, "-C", existRepoPath, "bundle", "create", existingRepoBundlePath, "--all") require.NoError(t, os.WriteFile(existingRepoRefPath, gittest.Exec(t, cfg, "-C", existRepoPath, "show-ref"), perm.SharedFile)) var repos []*gitalypb.Repository for i := 0; i < 2; i++ { - repo := gittest.InitRepoDir(t, cfg.Storages[0].Path, fmt.Sprintf("repo-%d", i)) - repoBundlePath := filepath.Join(path, repo.RelativePath+".bundle") - repoRefPath := filepath.Join(path, repo.RelativePath+".refs") + repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + RelativePath: fmt.Sprintf("repo-%d", i), + Storage: cfg.Storages[0], + }) + + repoBundlePath := filepath.Join(backupDir, repo.RelativePath+".bundle") testhelper.CopyFile(t, existingRepoBundlePath, repoBundlePath) + repoRefPath := filepath.Join(backupDir, repo.RelativePath+".refs") testhelper.CopyFile(t, existingRepoRefPath, repoRefPath) repos = append(repos, repo) } @@ -199,7 +212,7 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) for _, repo := range repos { repoPath := filepath.Join(cfg.Storages[0].Path, gittest.GetReplicaPath(t, ctx, cfg, repo)) - bundlePath := filepath.Join(path, repo.RelativePath+".bundle") + bundlePath := filepath.Join(backupDir, repo.RelativePath+".bundle") output := gittest.Exec(t, cfg, "-C", repoPath, "bundle", "verify", bundlePath) require.Contains(t, string(output), "The bundle records a complete history") -- cgit v1.2.3