Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--internal/backup/pipeline.go11
-rw-r--r--internal/backup/pipeline_test.go16
-rw-r--r--internal/cli/gitalybackup/restore.go35
3 files changed, 46 insertions, 16 deletions
diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go
index 72e5d9740..a10962cad 100644
--- a/internal/backup/pipeline.go
+++ b/internal/backup/pipeline.go
@@ -196,7 +196,7 @@ type Pipeline struct {
pipelineError error
cmdErrors *commandErrors
- processedRepos map[string][]*gitalypb.Repository
+ processedRepos map[string]map[*gitalypb.Repository]struct{}
processedReposMu sync.Mutex
}
@@ -212,7 +212,7 @@ func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) {
done: make(chan struct{}),
workersByStorage: make(map[string]chan *contextCommand),
cmdErrors: &commandErrors{},
- processedRepos: make(map[string][]*gitalypb.Repository),
+ processedRepos: make(map[string]map[*gitalypb.Repository]struct{}),
}
for _, opt := range opts {
@@ -270,7 +270,7 @@ func (p *Pipeline) Handle(ctx context.Context, cmd Command) {
}
// Done waits for any in progress jobs to complete then reports any accumulated errors
-func (p *Pipeline) Done() (processedRepos map[string][]*gitalypb.Repository, err error) {
+func (p *Pipeline) Done() (processedRepos map[string]map[*gitalypb.Repository]struct{}, err error) {
close(p.done)
p.workerWg.Wait()
@@ -345,7 +345,10 @@ func (p *Pipeline) processCommand(ctx context.Context, cmd Command) {
storageName := cmd.Repository().StorageName
p.processedReposMu.Lock()
- p.processedRepos[storageName] = append(p.processedRepos[storageName], cmd.Repository())
+ if _, ok := p.processedRepos[storageName]; !ok {
+ p.processedRepos[storageName] = make(map[*gitalypb.Repository]struct{})
+ }
+ p.processedRepos[storageName][cmd.Repository()] = struct{}{}
p.processedReposMu.Unlock()
log.Info(fmt.Sprintf("completed %s", cmd.Name()))
diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go
index 22dd65a83..2dfae66d4 100644
--- a/internal/backup/pipeline_test.go
+++ b/internal/backup/pipeline_test.go
@@ -331,13 +331,17 @@ func TestPipelineError(t *testing.T) {
func TestPipelineProcessedRepos(t *testing.T) {
strategy := MockStrategy{}
- repos := map[string][]*gitalypb.Repository{
+ repos := map[string]map[*gitalypb.Repository]struct{}{
"storage1": {
- {RelativePath: "a.git", StorageName: "storage1"},
- {RelativePath: "b.git", StorageName: "storage1"},
+ &gitalypb.Repository{RelativePath: "a.git", StorageName: "storage1"}: struct{}{},
+ &gitalypb.Repository{RelativePath: "b.git", StorageName: "storage1"}: struct{}{},
+ },
+ "storage2": {
+ &gitalypb.Repository{RelativePath: "c.git", StorageName: "storage2"}: struct{}{},
+ },
+ "storage3": {
+ &gitalypb.Repository{RelativePath: "d.git", StorageName: "storage3"}: struct{}{},
},
- "storage2": {{RelativePath: "c.git", StorageName: "storage2"}},
- "storage3": {{RelativePath: "d.git", StorageName: "storage3"}},
}
p, err := NewPipeline(testhelper.SharedLogger(t))
@@ -345,7 +349,7 @@ func TestPipelineProcessedRepos(t *testing.T) {
ctx := testhelper.Context(t)
for _, v := range repos {
- for _, repo := range v {
+ for repo := range v {
p.Handle(ctx, NewRestoreCommand(strategy, RestoreRequest{Repository: repo}))
}
}
diff --git a/internal/cli/gitalybackup/restore.go b/internal/cli/gitalybackup/restore.go
index 7a304f2ed..defffa223 100644
--- a/internal/cli/gitalybackup/restore.go
+++ b/internal/cli/gitalybackup/restore.go
@@ -135,15 +135,18 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin
manager = backup.NewManager(sink, locator, pool)
}
+ // Get the set of existing repositories keyed by storage. We'll later use this to determine any
+ // dangling repos that should be removed.
+ existingRepos := make(map[string][]*gitalypb.Repository)
for _, storageName := range cmd.removeAllRepositories {
- err := manager.RemoveAllRepositories(ctx, &backup.RemoveAllRepositoriesRequest{
+ repos, err := manager.ListRepositories(ctx, &backup.ListRepositoriesRequest{
StorageName: storageName,
})
if err != nil {
- // Treat RemoveAll failures as soft failures until we can determine
- // how often it fails.
- logger.WithError(err).WithField("storage_name", storageName).Warn("failed to remove all repositories")
+ logger.WithError(err).WithField("storage_name", storageName).Warn("failed to list repositories")
}
+
+ existingRepos[storageName] = repos
}
var opts []backup.PipelineOption
@@ -157,7 +160,6 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin
decoder := json.NewDecoder(stdin)
for {
-
var req restoreRequest
if err := decoder.Decode(&req); errors.Is(err, io.EOF) {
break
@@ -179,8 +181,29 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin
}))
}
- if _, err := pipeline.Done(); err != nil {
+ restoredRepos, err := pipeline.Done()
+ if err != nil {
return fmt.Errorf("restore: %w", err)
}
+
+ var removalErrors []error
+ for storageName, repos := range existingRepos {
+ for _, repo := range repos {
+ if dangling := restoredRepos[storageName][repo]; dangling == struct{}{} {
+ // If we have dangling repos (those which exist in the storage but
+ // weren't part of the restore), they need to be deleted so the
+ // state of repos in Gitaly matches that in the Rails DB.
+ if err := manager.RemoveRepository(ctx, &backup.RemoveRepositoryRequest{Repo: repo}); err != nil {
+ removalErrors = append(removalErrors, fmt.Errorf("storage_name %q relative_path %q: %w", storageName, repo.RelativePath, err))
+ }
+ }
+ }
+ }
+
+ if len(removalErrors) > 0 {
+ return fmt.Errorf("remove dangling repositories: %d failures encountered: %w",
+ len(removalErrors), errors.Join(removalErrors...))
+ }
+
return nil
}