diff options
Diffstat (limited to 'internal/backup')
-rw-r--r-- | internal/backup/pipeline.go | 20 | ||||
-rw-r--r-- | internal/backup/pipeline_test.go | 29 |
2 files changed, 35 insertions, 14 deletions
diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index 72e5d9740..bfb298350 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -12,6 +12,10 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) +// repositoryKey uniquely identifies a repository, and is used here to key the +// map of processed repos. +type repositoryKey string + // Strategy used to create/restore backups type Strategy interface { Create(context.Context, *CreateRequest) error @@ -196,7 +200,7 @@ type Pipeline struct { pipelineError error cmdErrors *commandErrors - processedRepos map[string][]*gitalypb.Repository + processedRepos map[string]map[repositoryKey]struct{} processedReposMu sync.Mutex } @@ -212,7 +216,7 @@ func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) { done: make(chan struct{}), workersByStorage: make(map[string]chan *contextCommand), cmdErrors: &commandErrors{}, - processedRepos: make(map[string][]*gitalypb.Repository), + processedRepos: make(map[string]map[repositoryKey]struct{}), } for _, opt := range opts { @@ -270,7 +274,7 @@ func (p *Pipeline) Handle(ctx context.Context, cmd Command) { } // Done waits for any in progress jobs to complete then reports any accumulated errors -func (p *Pipeline) Done() (processedRepos map[string][]*gitalypb.Repository, err error) { +func (p *Pipeline) Done() (processedRepos map[string]map[repositoryKey]struct{}, err error) { close(p.done) p.workerWg.Wait() @@ -345,7 +349,10 @@ func (p *Pipeline) processCommand(ctx context.Context, cmd Command) { storageName := cmd.Repository().StorageName p.processedReposMu.Lock() - p.processedRepos[storageName] = append(p.processedRepos[storageName], cmd.Repository()) + if _, ok := p.processedRepos[storageName]; !ok { + p.processedRepos[storageName] = make(map[repositoryKey]struct{}) + } + p.processedRepos[storageName][NewRepositoryKey(cmd.Repository())] = struct{}{} p.processedReposMu.Unlock() log.Info(fmt.Sprintf("completed %s", cmd.Name())) @@ -386,3 +393,8 @@ func (p *Pipeline) releaseWorkerSlot() { } <-p.totalWorkers } + +// NewRepositoryKey returns a unique identifier for the provided repo. +func NewRepositoryKey(repo *gitalypb.Repository) repositoryKey { + return repositoryKey(repo.StorageName + "-" + repo.RelativePath) +} diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go index 22dd65a83..06d071f4b 100644 --- a/internal/backup/pipeline_test.go +++ b/internal/backup/pipeline_test.go @@ -331,26 +331,35 @@ func TestPipelineError(t *testing.T) { func TestPipelineProcessedRepos(t *testing.T) { strategy := MockStrategy{} - repos := map[string][]*gitalypb.Repository{ + repos := []*gitalypb.Repository{ + {RelativePath: "a.git", StorageName: "storage1"}, + {RelativePath: "b.git", StorageName: "storage1"}, + {RelativePath: "c.git", StorageName: "storage2"}, + {RelativePath: "d.git", StorageName: "storage3"}, + } + + expectedProcessedRepos := map[string]map[repositoryKey]struct{}{ "storage1": { - {RelativePath: "a.git", StorageName: "storage1"}, - {RelativePath: "b.git", StorageName: "storage1"}, + "storage1-a.git": {}, + "storage1-b.git": {}, + }, + "storage2": { + "storage2-c.git": {}, + }, + "storage3": { + "storage3-d.git": {}, }, - "storage2": {{RelativePath: "c.git", StorageName: "storage2"}}, - "storage3": {{RelativePath: "d.git", StorageName: "storage3"}}, } p, err := NewPipeline(testhelper.SharedLogger(t)) require.NoError(t, err) ctx := testhelper.Context(t) - for _, v := range repos { - for _, repo := range v { - p.Handle(ctx, NewRestoreCommand(strategy, RestoreRequest{Repository: repo})) - } + for _, repo := range repos { + p.Handle(ctx, NewRestoreCommand(strategy, RestoreRequest{Repository: repo})) } processedRepos, err := p.Done() require.NoError(t, err) - require.EqualValues(t, repos, processedRepos) + require.EqualValues(t, expectedProcessedRepos, processedRepos) } |