diff options
-rw-r--r-- | internal/backup/pipeline.go | 17 | ||||
-rw-r--r-- | internal/backup/pipeline_test.go | 37 | ||||
-rw-r--r-- | internal/cli/gitalybackup/create.go | 2 | ||||
-rw-r--r-- | internal/cli/gitalybackup/restore.go | 3 |
4 files changed, 49 insertions, 10 deletions
diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index 5672493ac..8000f61af 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -181,6 +181,9 @@ type Pipeline struct { pipelineError error cmdErrors *commandErrors + + processedRepos map[string][]*gitalypb.Repository + processedReposMu sync.Mutex } // NewPipeline creates a pipeline that executes backup and restore jobs. @@ -195,6 +198,7 @@ func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) { done: make(chan struct{}), workersByStorage: make(map[string]chan *contextCommand), cmdErrors: &commandErrors{}, + processedRepos: make(map[string][]*gitalypb.Repository), } for _, opt := range opts { @@ -252,19 +256,19 @@ func (p *Pipeline) Handle(ctx context.Context, cmd Command) { } // Done waits for any in progress jobs to complete then reports any accumulated errors -func (p *Pipeline) Done() error { +func (p *Pipeline) Done() (processedRepos map[string][]*gitalypb.Repository, err error) { close(p.done) p.workerWg.Wait() if p.pipelineError != nil { - return fmt.Errorf("pipeline: %w", p.pipelineError) + return nil, fmt.Errorf("pipeline: %w", p.pipelineError) } if len(p.cmdErrors.errs) > 0 { - return fmt.Errorf("pipeline: %w", p.cmdErrors) + return nil, fmt.Errorf("pipeline: %w", p.cmdErrors) } - return nil + return p.processedRepos, nil } // getWorker finds the channel associated with a storage. When no channel is @@ -325,6 +329,11 @@ func (p *Pipeline) processCommand(ctx context.Context, cmd Command) { return } + storageName := cmd.Repository().StorageName + p.processedReposMu.Lock() + p.processedRepos[storageName] = append(p.processedRepos[storageName], cmd.Repository()) + p.processedReposMu.Unlock() + log.Info(fmt.Sprintf("completed %s", cmd.Name())) } diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go index 04c539a5f..8545c905e 100644 --- a/internal/backup/pipeline_test.go +++ b/internal/backup/pipeline_test.go @@ -98,7 +98,8 @@ func TestPipeline(t *testing.T) { p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "storage1"}})) p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "storage2"}})) } - require.NoError(t, p.Done()) + _, err = p.Done() + require.NoError(t, err) }) } }) @@ -115,7 +116,8 @@ func TestPipeline(t *testing.T) { p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "default"}})) - require.EqualError(t, p.Done(), "pipeline: context canceled") + _, err = p.Done() + require.EqualError(t, err, "pipeline: context canceled") }) } @@ -222,7 +224,7 @@ func testPipeline(t *testing.T, init func() *Pipeline) { require.Equal(t, tc.level, logEntry.Level) } - err := p.Done() + _, err := p.Done() if tc.level == logrus.ErrorLevel { require.EqualError(t, err, "pipeline: 1 failures encountered:\n - c.git: assert.AnError general error for testing\n") @@ -258,7 +260,7 @@ func testPipeline(t *testing.T, init func() *Pipeline) { for _, cmd := range commands { p.Handle(ctx, cmd) } - err := p.Done() + _, err := p.Done() require.EqualError(t, err, "pipeline: 1 failures encountered:\n - c.git: assert.AnError general error for testing\n") }) } @@ -309,3 +311,30 @@ func TestPipelineError(t *testing.T) { }) } } + +func TestPipelineProcessedRepos(t *testing.T) { + strategy := MockStrategy{} + + repos := map[string][]*gitalypb.Repository{ + "storage1": { + {RelativePath: "a.git", StorageName: "storage1"}, + {RelativePath: "b.git", StorageName: "storage1"}, + }, + "storage2": {{RelativePath: "c.git", StorageName: "storage2"}}, + "storage3": {{RelativePath: "d.git", StorageName: "storage3"}}, + } + + p, err := NewPipeline(testhelper.SharedLogger(t)) + require.NoError(t, err) + + ctx := testhelper.Context(t) + for _, v := range repos { + for _, repo := range v { + p.Handle(ctx, NewRestoreCommand(strategy, RestoreRequest{Repository: repo})) + } + } + + processedRepos, err := p.Done() + require.NoError(t, err) + require.EqualValues(t, repos, processedRepos) +} diff --git a/internal/cli/gitalybackup/create.go b/internal/cli/gitalybackup/create.go index 4ad653490..be7e75cdb 100644 --- a/internal/cli/gitalybackup/create.go +++ b/internal/cli/gitalybackup/create.go @@ -174,7 +174,7 @@ func (cmd *createSubcommand) run(ctx context.Context, logger log.Logger, stdin i })) } - if err := pipeline.Done(); err != nil { + if _, err := pipeline.Done(); err != nil { return fmt.Errorf("create: %w", err) } return nil diff --git a/internal/cli/gitalybackup/restore.go b/internal/cli/gitalybackup/restore.go index de9e2cd3d..7a304f2ed 100644 --- a/internal/cli/gitalybackup/restore.go +++ b/internal/cli/gitalybackup/restore.go @@ -157,6 +157,7 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin decoder := json.NewDecoder(stdin) for { + var req restoreRequest if err := decoder.Decode(&req); errors.Is(err, io.EOF) { break @@ -178,7 +179,7 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin })) } - if err := pipeline.Done(); err != nil { + if _, err := pipeline.Done(); err != nil { return fmt.Errorf("restore: %w", err) } return nil |