Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'internal/backup')
-rw-r--r--internal/backup/pipeline.go20
-rw-r--r--internal/backup/pipeline_test.go29
2 files changed, 35 insertions, 14 deletions
diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go
index 72e5d9740..bfb298350 100644
--- a/internal/backup/pipeline.go
+++ b/internal/backup/pipeline.go
@@ -12,6 +12,10 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)
+// repositoryKey uniquely identifies a repository, and is used here to key the
+// map of processed repos.
+type repositoryKey string
+
// Strategy used to create/restore backups
type Strategy interface {
Create(context.Context, *CreateRequest) error
@@ -196,7 +200,7 @@ type Pipeline struct {
pipelineError error
cmdErrors *commandErrors
- processedRepos map[string][]*gitalypb.Repository
+ processedRepos map[string]map[repositoryKey]struct{}
processedReposMu sync.Mutex
}
@@ -212,7 +216,7 @@ func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) {
done: make(chan struct{}),
workersByStorage: make(map[string]chan *contextCommand),
cmdErrors: &commandErrors{},
- processedRepos: make(map[string][]*gitalypb.Repository),
+ processedRepos: make(map[string]map[repositoryKey]struct{}),
}
for _, opt := range opts {
@@ -270,7 +274,7 @@ func (p *Pipeline) Handle(ctx context.Context, cmd Command) {
}
// Done waits for any in progress jobs to complete then reports any accumulated errors
-func (p *Pipeline) Done() (processedRepos map[string][]*gitalypb.Repository, err error) {
+func (p *Pipeline) Done() (processedRepos map[string]map[repositoryKey]struct{}, err error) {
close(p.done)
p.workerWg.Wait()
@@ -345,7 +349,10 @@ func (p *Pipeline) processCommand(ctx context.Context, cmd Command) {
storageName := cmd.Repository().StorageName
p.processedReposMu.Lock()
- p.processedRepos[storageName] = append(p.processedRepos[storageName], cmd.Repository())
+ if _, ok := p.processedRepos[storageName]; !ok {
+ p.processedRepos[storageName] = make(map[repositoryKey]struct{})
+ }
+ p.processedRepos[storageName][NewRepositoryKey(cmd.Repository())] = struct{}{}
p.processedReposMu.Unlock()
log.Info(fmt.Sprintf("completed %s", cmd.Name()))
@@ -386,3 +393,8 @@ func (p *Pipeline) releaseWorkerSlot() {
}
<-p.totalWorkers
}
+
+// NewRepositoryKey returns a unique identifier for the provided repo.
+func NewRepositoryKey(repo *gitalypb.Repository) repositoryKey {
+ return repositoryKey(repo.StorageName + "-" + repo.RelativePath)
+}
diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go
index 22dd65a83..06d071f4b 100644
--- a/internal/backup/pipeline_test.go
+++ b/internal/backup/pipeline_test.go
@@ -331,26 +331,35 @@ func TestPipelineError(t *testing.T) {
func TestPipelineProcessedRepos(t *testing.T) {
strategy := MockStrategy{}
- repos := map[string][]*gitalypb.Repository{
+ repos := []*gitalypb.Repository{
+ {RelativePath: "a.git", StorageName: "storage1"},
+ {RelativePath: "b.git", StorageName: "storage1"},
+ {RelativePath: "c.git", StorageName: "storage2"},
+ {RelativePath: "d.git", StorageName: "storage3"},
+ }
+
+ expectedProcessedRepos := map[string]map[repositoryKey]struct{}{
"storage1": {
- {RelativePath: "a.git", StorageName: "storage1"},
- {RelativePath: "b.git", StorageName: "storage1"},
+ "storage1-a.git": {},
+ "storage1-b.git": {},
+ },
+ "storage2": {
+ "storage2-c.git": {},
+ },
+ "storage3": {
+ "storage3-d.git": {},
},
- "storage2": {{RelativePath: "c.git", StorageName: "storage2"}},
- "storage3": {{RelativePath: "d.git", StorageName: "storage3"}},
}
p, err := NewPipeline(testhelper.SharedLogger(t))
require.NoError(t, err)
ctx := testhelper.Context(t)
- for _, v := range repos {
- for _, repo := range v {
- p.Handle(ctx, NewRestoreCommand(strategy, RestoreRequest{Repository: repo}))
- }
+ for _, repo := range repos {
+ p.Handle(ctx, NewRestoreCommand(strategy, RestoreRequest{Repository: repo}))
}
processedRepos, err := p.Done()
require.NoError(t, err)
- require.EqualValues(t, repos, processedRepos)
+ require.EqualValues(t, expectedProcessedRepos, processedRepos)
}