Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Liu <jliu@gitlab.com>2023-12-12 06:12:04 +0300
committerJames Liu <jliu@gitlab.com>2024-01-15 05:40:53 +0300
commit95bbcbcca4a2f344a171dba71f30071850f0fba6 (patch)
tree8a278f10cdfd6ac2c83b21a8583a037dfcba3167
parent96b75e53b61c476029d510270daa1d5dfc444a09 (diff)
backup: Track repos that have been processed
Adds a map to the Pipeline to track repos that have been restored or backed up. A mutex is used to synchronise access to the map, as entries are appended by goroutines operating in the workers. The signature of Done() is modified to return the map, and is intentionally ignored in the actual backup and restore logic for now. A subsequent commit will utilise the map for restore operations.
-rw-r--r--internal/backup/pipeline.go17
-rw-r--r--internal/backup/pipeline_test.go37
-rw-r--r--internal/cli/gitalybackup/create.go2
-rw-r--r--internal/cli/gitalybackup/restore.go3
4 files changed, 49 insertions, 10 deletions
diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go
index 5672493ac..8000f61af 100644
--- a/internal/backup/pipeline.go
+++ b/internal/backup/pipeline.go
@@ -181,6 +181,9 @@ type Pipeline struct {
pipelineError error
cmdErrors *commandErrors
+
+ processedRepos map[string][]*gitalypb.Repository
+ processedReposMu sync.Mutex
}
// NewPipeline creates a pipeline that executes backup and restore jobs.
@@ -195,6 +198,7 @@ func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) {
done: make(chan struct{}),
workersByStorage: make(map[string]chan *contextCommand),
cmdErrors: &commandErrors{},
+ processedRepos: make(map[string][]*gitalypb.Repository),
}
for _, opt := range opts {
@@ -252,19 +256,19 @@ func (p *Pipeline) Handle(ctx context.Context, cmd Command) {
}
// Done waits for any in progress jobs to complete then reports any accumulated errors
-func (p *Pipeline) Done() error {
+func (p *Pipeline) Done() (processedRepos map[string][]*gitalypb.Repository, err error) {
close(p.done)
p.workerWg.Wait()
if p.pipelineError != nil {
- return fmt.Errorf("pipeline: %w", p.pipelineError)
+ return nil, fmt.Errorf("pipeline: %w", p.pipelineError)
}
if len(p.cmdErrors.errs) > 0 {
- return fmt.Errorf("pipeline: %w", p.cmdErrors)
+ return nil, fmt.Errorf("pipeline: %w", p.cmdErrors)
}
- return nil
+ return p.processedRepos, nil
}
// getWorker finds the channel associated with a storage. When no channel is
@@ -325,6 +329,11 @@ func (p *Pipeline) processCommand(ctx context.Context, cmd Command) {
return
}
+ storageName := cmd.Repository().StorageName
+ p.processedReposMu.Lock()
+ p.processedRepos[storageName] = append(p.processedRepos[storageName], cmd.Repository())
+ p.processedReposMu.Unlock()
+
log.Info(fmt.Sprintf("completed %s", cmd.Name()))
}
diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go
index 04c539a5f..8545c905e 100644
--- a/internal/backup/pipeline_test.go
+++ b/internal/backup/pipeline_test.go
@@ -98,7 +98,8 @@ func TestPipeline(t *testing.T) {
p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "storage1"}}))
p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "storage2"}}))
}
- require.NoError(t, p.Done())
+ _, err = p.Done()
+ require.NoError(t, err)
})
}
})
@@ -115,7 +116,8 @@ func TestPipeline(t *testing.T) {
p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "default"}}))
- require.EqualError(t, p.Done(), "pipeline: context canceled")
+ _, err = p.Done()
+ require.EqualError(t, err, "pipeline: context canceled")
})
}
@@ -222,7 +224,7 @@ func testPipeline(t *testing.T, init func() *Pipeline) {
require.Equal(t, tc.level, logEntry.Level)
}
- err := p.Done()
+ _, err := p.Done()
if tc.level == logrus.ErrorLevel {
require.EqualError(t, err, "pipeline: 1 failures encountered:\n - c.git: assert.AnError general error for testing\n")
@@ -258,7 +260,7 @@ func testPipeline(t *testing.T, init func() *Pipeline) {
for _, cmd := range commands {
p.Handle(ctx, cmd)
}
- err := p.Done()
+ _, err := p.Done()
require.EqualError(t, err, "pipeline: 1 failures encountered:\n - c.git: assert.AnError general error for testing\n")
})
}
@@ -309,3 +311,30 @@ func TestPipelineError(t *testing.T) {
})
}
}
+
+func TestPipelineProcessedRepos(t *testing.T) {
+ strategy := MockStrategy{}
+
+ repos := map[string][]*gitalypb.Repository{
+ "storage1": {
+ {RelativePath: "a.git", StorageName: "storage1"},
+ {RelativePath: "b.git", StorageName: "storage1"},
+ },
+ "storage2": {{RelativePath: "c.git", StorageName: "storage2"}},
+ "storage3": {{RelativePath: "d.git", StorageName: "storage3"}},
+ }
+
+ p, err := NewPipeline(testhelper.SharedLogger(t))
+ require.NoError(t, err)
+
+ ctx := testhelper.Context(t)
+ for _, v := range repos {
+ for _, repo := range v {
+ p.Handle(ctx, NewRestoreCommand(strategy, RestoreRequest{Repository: repo}))
+ }
+ }
+
+ processedRepos, err := p.Done()
+ require.NoError(t, err)
+ require.EqualValues(t, repos, processedRepos)
+}
diff --git a/internal/cli/gitalybackup/create.go b/internal/cli/gitalybackup/create.go
index 4ad653490..be7e75cdb 100644
--- a/internal/cli/gitalybackup/create.go
+++ b/internal/cli/gitalybackup/create.go
@@ -174,7 +174,7 @@ func (cmd *createSubcommand) run(ctx context.Context, logger log.Logger, stdin i
}))
}
- if err := pipeline.Done(); err != nil {
+ if _, err := pipeline.Done(); err != nil {
return fmt.Errorf("create: %w", err)
}
return nil
diff --git a/internal/cli/gitalybackup/restore.go b/internal/cli/gitalybackup/restore.go
index de9e2cd3d..7a304f2ed 100644
--- a/internal/cli/gitalybackup/restore.go
+++ b/internal/cli/gitalybackup/restore.go
@@ -157,6 +157,7 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin
decoder := json.NewDecoder(stdin)
for {
+
var req restoreRequest
if err := decoder.Decode(&req); errors.Is(err, io.EOF) {
break
@@ -178,7 +179,7 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin
}))
}
- if err := pipeline.Done(); err != nil {
+ if _, err := pipeline.Done(); err != nil {
return fmt.Errorf("restore: %w", err)
}
return nil