diff options
-rw-r--r-- | internal/backup/pipeline.go | 11 | ||||
-rw-r--r-- | internal/backup/pipeline_test.go | 16 | ||||
-rw-r--r-- | internal/cli/gitalybackup/restore.go | 35 |
3 files changed, 46 insertions, 16 deletions
diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index 72e5d9740..a10962cad 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -196,7 +196,7 @@ type Pipeline struct { pipelineError error cmdErrors *commandErrors - processedRepos map[string][]*gitalypb.Repository + processedRepos map[string]map[*gitalypb.Repository]struct{} processedReposMu sync.Mutex } @@ -212,7 +212,7 @@ func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) { done: make(chan struct{}), workersByStorage: make(map[string]chan *contextCommand), cmdErrors: &commandErrors{}, - processedRepos: make(map[string][]*gitalypb.Repository), + processedRepos: make(map[string]map[*gitalypb.Repository]struct{}), } for _, opt := range opts { @@ -270,7 +270,7 @@ func (p *Pipeline) Handle(ctx context.Context, cmd Command) { } // Done waits for any in progress jobs to complete then reports any accumulated errors -func (p *Pipeline) Done() (processedRepos map[string][]*gitalypb.Repository, err error) { +func (p *Pipeline) Done() (processedRepos map[string]map[*gitalypb.Repository]struct{}, err error) { close(p.done) p.workerWg.Wait() @@ -345,7 +345,10 @@ func (p *Pipeline) processCommand(ctx context.Context, cmd Command) { storageName := cmd.Repository().StorageName p.processedReposMu.Lock() - p.processedRepos[storageName] = append(p.processedRepos[storageName], cmd.Repository()) + if _, ok := p.processedRepos[storageName]; !ok { + p.processedRepos[storageName] = make(map[*gitalypb.Repository]struct{}) + } + p.processedRepos[storageName][cmd.Repository()] = struct{}{} p.processedReposMu.Unlock() log.Info(fmt.Sprintf("completed %s", cmd.Name())) diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go index 22dd65a83..2dfae66d4 100644 --- a/internal/backup/pipeline_test.go +++ b/internal/backup/pipeline_test.go @@ -331,13 +331,17 @@ func TestPipelineError(t *testing.T) { func TestPipelineProcessedRepos(t *testing.T) { strategy := MockStrategy{} - repos := map[string][]*gitalypb.Repository{ + repos := map[string]map[*gitalypb.Repository]struct{}{ "storage1": { - {RelativePath: "a.git", StorageName: "storage1"}, - {RelativePath: "b.git", StorageName: "storage1"}, + &gitalypb.Repository{RelativePath: "a.git", StorageName: "storage1"}: struct{}{}, + &gitalypb.Repository{RelativePath: "b.git", StorageName: "storage1"}: struct{}{}, + }, + "storage2": { + &gitalypb.Repository{RelativePath: "c.git", StorageName: "storage2"}: struct{}{}, + }, + "storage3": { + &gitalypb.Repository{RelativePath: "d.git", StorageName: "storage3"}: struct{}{}, }, - "storage2": {{RelativePath: "c.git", StorageName: "storage2"}}, - "storage3": {{RelativePath: "d.git", StorageName: "storage3"}}, } p, err := NewPipeline(testhelper.SharedLogger(t)) @@ -345,7 +349,7 @@ func TestPipelineProcessedRepos(t *testing.T) { ctx := testhelper.Context(t) for _, v := range repos { - for _, repo := range v { + for repo := range v { p.Handle(ctx, NewRestoreCommand(strategy, RestoreRequest{Repository: repo})) } } diff --git a/internal/cli/gitalybackup/restore.go b/internal/cli/gitalybackup/restore.go index 7a304f2ed..defffa223 100644 --- a/internal/cli/gitalybackup/restore.go +++ b/internal/cli/gitalybackup/restore.go @@ -135,15 +135,18 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin manager = backup.NewManager(sink, locator, pool) } + // Get the set of existing repositories keyed by storage. We'll later use this to determine any + // dangling repos that should be removed. + existingRepos := make(map[string][]*gitalypb.Repository) for _, storageName := range cmd.removeAllRepositories { - err := manager.RemoveAllRepositories(ctx, &backup.RemoveAllRepositoriesRequest{ + repos, err := manager.ListRepositories(ctx, &backup.ListRepositoriesRequest{ StorageName: storageName, }) if err != nil { - // Treat RemoveAll failures as soft failures until we can determine - // how often it fails. - logger.WithError(err).WithField("storage_name", storageName).Warn("failed to remove all repositories") + logger.WithError(err).WithField("storage_name", storageName).Warn("failed to list repositories") } + + existingRepos[storageName] = repos } var opts []backup.PipelineOption @@ -157,7 +160,6 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin decoder := json.NewDecoder(stdin) for { - var req restoreRequest if err := decoder.Decode(&req); errors.Is(err, io.EOF) { break @@ -179,8 +181,29 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin })) } - if _, err := pipeline.Done(); err != nil { + restoredRepos, err := pipeline.Done() + if err != nil { return fmt.Errorf("restore: %w", err) } + + var removalErrors []error + for storageName, repos := range existingRepos { + for _, repo := range repos { + if dangling := restoredRepos[storageName][repo]; dangling == struct{}{} { + // If we have dangling repos (those which exist in the storage but + // weren't part of the restore), they need to be deleted so the + // state of repos in Gitaly matches that in the Rails DB. + if err := manager.RemoveRepository(ctx, &backup.RemoveRepositoryRequest{Repo: repo}); err != nil { + removalErrors = append(removalErrors, fmt.Errorf("storage_name %q relative_path %q: %w", storageName, repo.RelativePath, err)) + } + } + } + } + + if len(removalErrors) > 0 { + return fmt.Errorf("remove dangling repositories: %d failures encountered: %w", + len(removalErrors), errors.Join(removalErrors...)) + } + return nil } |