Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--internal/backup/pipeline.go17
-rw-r--r--internal/backup/pipeline_test.go37
-rw-r--r--internal/cli/gitalybackup/create.go2
-rw-r--r--internal/cli/gitalybackup/restore.go3
4 files changed, 49 insertions, 10 deletions
diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go
index 5672493ac..8000f61af 100644
--- a/internal/backup/pipeline.go
+++ b/internal/backup/pipeline.go
@@ -181,6 +181,9 @@ type Pipeline struct {
pipelineError error
cmdErrors *commandErrors
+
+ processedRepos map[string][]*gitalypb.Repository
+ processedReposMu sync.Mutex
}
// NewPipeline creates a pipeline that executes backup and restore jobs.
@@ -195,6 +198,7 @@ func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) {
done: make(chan struct{}),
workersByStorage: make(map[string]chan *contextCommand),
cmdErrors: &commandErrors{},
+ processedRepos: make(map[string][]*gitalypb.Repository),
}
for _, opt := range opts {
@@ -252,19 +256,19 @@ func (p *Pipeline) Handle(ctx context.Context, cmd Command) {
}
// Done waits for any in progress jobs to complete then reports any accumulated errors
-func (p *Pipeline) Done() error {
+func (p *Pipeline) Done() (processedRepos map[string][]*gitalypb.Repository, err error) {
close(p.done)
p.workerWg.Wait()
if p.pipelineError != nil {
- return fmt.Errorf("pipeline: %w", p.pipelineError)
+ return nil, fmt.Errorf("pipeline: %w", p.pipelineError)
}
if len(p.cmdErrors.errs) > 0 {
- return fmt.Errorf("pipeline: %w", p.cmdErrors)
+ return nil, fmt.Errorf("pipeline: %w", p.cmdErrors)
}
- return nil
+ return p.processedRepos, nil
}
// getWorker finds the channel associated with a storage. When no channel is
@@ -325,6 +329,11 @@ func (p *Pipeline) processCommand(ctx context.Context, cmd Command) {
return
}
+ storageName := cmd.Repository().StorageName
+ p.processedReposMu.Lock()
+ p.processedRepos[storageName] = append(p.processedRepos[storageName], cmd.Repository())
+ p.processedReposMu.Unlock()
+
log.Info(fmt.Sprintf("completed %s", cmd.Name()))
}
diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go
index 04c539a5f..8545c905e 100644
--- a/internal/backup/pipeline_test.go
+++ b/internal/backup/pipeline_test.go
@@ -98,7 +98,8 @@ func TestPipeline(t *testing.T) {
p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "storage1"}}))
p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "storage2"}}))
}
- require.NoError(t, p.Done())
+ _, err = p.Done()
+ require.NoError(t, err)
})
}
})
@@ -115,7 +116,8 @@ func TestPipeline(t *testing.T) {
p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "default"}}))
- require.EqualError(t, p.Done(), "pipeline: context canceled")
+ _, err = p.Done()
+ require.EqualError(t, err, "pipeline: context canceled")
})
}
@@ -222,7 +224,7 @@ func testPipeline(t *testing.T, init func() *Pipeline) {
require.Equal(t, tc.level, logEntry.Level)
}
- err := p.Done()
+ _, err := p.Done()
if tc.level == logrus.ErrorLevel {
require.EqualError(t, err, "pipeline: 1 failures encountered:\n - c.git: assert.AnError general error for testing\n")
@@ -258,7 +260,7 @@ func testPipeline(t *testing.T, init func() *Pipeline) {
for _, cmd := range commands {
p.Handle(ctx, cmd)
}
- err := p.Done()
+ _, err := p.Done()
require.EqualError(t, err, "pipeline: 1 failures encountered:\n - c.git: assert.AnError general error for testing\n")
})
}
@@ -309,3 +311,30 @@ func TestPipelineError(t *testing.T) {
})
}
}
+
+func TestPipelineProcessedRepos(t *testing.T) {
+ strategy := MockStrategy{}
+
+ repos := map[string][]*gitalypb.Repository{
+ "storage1": {
+ {RelativePath: "a.git", StorageName: "storage1"},
+ {RelativePath: "b.git", StorageName: "storage1"},
+ },
+ "storage2": {{RelativePath: "c.git", StorageName: "storage2"}},
+ "storage3": {{RelativePath: "d.git", StorageName: "storage3"}},
+ }
+
+ p, err := NewPipeline(testhelper.SharedLogger(t))
+ require.NoError(t, err)
+
+ ctx := testhelper.Context(t)
+ for _, v := range repos {
+ for _, repo := range v {
+ p.Handle(ctx, NewRestoreCommand(strategy, RestoreRequest{Repository: repo}))
+ }
+ }
+
+ processedRepos, err := p.Done()
+ require.NoError(t, err)
+ require.EqualValues(t, repos, processedRepos)
+}
diff --git a/internal/cli/gitalybackup/create.go b/internal/cli/gitalybackup/create.go
index 4ad653490..be7e75cdb 100644
--- a/internal/cli/gitalybackup/create.go
+++ b/internal/cli/gitalybackup/create.go
@@ -174,7 +174,7 @@ func (cmd *createSubcommand) run(ctx context.Context, logger log.Logger, stdin i
}))
}
- if err := pipeline.Done(); err != nil {
+ if _, err := pipeline.Done(); err != nil {
return fmt.Errorf("create: %w", err)
}
return nil
diff --git a/internal/cli/gitalybackup/restore.go b/internal/cli/gitalybackup/restore.go
index de9e2cd3d..7a304f2ed 100644
--- a/internal/cli/gitalybackup/restore.go
+++ b/internal/cli/gitalybackup/restore.go
@@ -157,6 +157,7 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin
decoder := json.NewDecoder(stdin)
for {
+
var req restoreRequest
if err := decoder.Decode(&req); errors.Is(err, io.EOF) {
break
@@ -178,7 +179,7 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin
}))
}
- if err := pipeline.Done(); err != nil {
+ if _, err := pipeline.Done(); err != nil {
return fmt.Errorf("restore: %w", err)
}
return nil