From 95bbcbcca4a2f344a171dba71f30071850f0fba6 Mon Sep 17 00:00:00 2001 From: James Liu Date: Tue, 12 Dec 2023 14:12:04 +1100 Subject: backup: Track repos that have been processed Adds a map to the Pipeline to track repos that have been restored or backed up. A mutex is used to synchronise access to the map, as entries are appended by goroutines operating in the workers. The signature of Done() is modified to return the map, and is intentionally ignored in the actual backup and restore logic for now. A subsequent commit will utilise the map for restore operations. --- internal/backup/pipeline.go | 17 +++++++++++++---- internal/backup/pipeline_test.go | 37 ++++++++++++++++++++++++++++++++---- internal/cli/gitalybackup/create.go | 2 +- internal/cli/gitalybackup/restore.go | 3 ++- 4 files changed, 49 insertions(+), 10 deletions(-) diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index 5672493ac..8000f61af 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -181,6 +181,9 @@ type Pipeline struct { pipelineError error cmdErrors *commandErrors + + processedRepos map[string][]*gitalypb.Repository + processedReposMu sync.Mutex } // NewPipeline creates a pipeline that executes backup and restore jobs. @@ -195,6 +198,7 @@ func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) { done: make(chan struct{}), workersByStorage: make(map[string]chan *contextCommand), cmdErrors: &commandErrors{}, + processedRepos: make(map[string][]*gitalypb.Repository), } for _, opt := range opts { @@ -252,19 +256,19 @@ func (p *Pipeline) Handle(ctx context.Context, cmd Command) { } // Done waits for any in progress jobs to complete then reports any accumulated errors -func (p *Pipeline) Done() error { +func (p *Pipeline) Done() (processedRepos map[string][]*gitalypb.Repository, err error) { close(p.done) p.workerWg.Wait() if p.pipelineError != nil { - return fmt.Errorf("pipeline: %w", p.pipelineError) + return nil, fmt.Errorf("pipeline: %w", p.pipelineError) } if len(p.cmdErrors.errs) > 0 { - return fmt.Errorf("pipeline: %w", p.cmdErrors) + return nil, fmt.Errorf("pipeline: %w", p.cmdErrors) } - return nil + return p.processedRepos, nil } // getWorker finds the channel associated with a storage. When no channel is @@ -325,6 +329,11 @@ func (p *Pipeline) processCommand(ctx context.Context, cmd Command) { return } + storageName := cmd.Repository().StorageName + p.processedReposMu.Lock() + p.processedRepos[storageName] = append(p.processedRepos[storageName], cmd.Repository()) + p.processedReposMu.Unlock() + log.Info(fmt.Sprintf("completed %s", cmd.Name())) } diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go index 04c539a5f..8545c905e 100644 --- a/internal/backup/pipeline_test.go +++ b/internal/backup/pipeline_test.go @@ -98,7 +98,8 @@ func TestPipeline(t *testing.T) { p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "storage1"}})) p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "storage2"}})) } - require.NoError(t, p.Done()) + _, err = p.Done() + require.NoError(t, err) }) } }) @@ -115,7 +116,8 @@ func TestPipeline(t *testing.T) { p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "default"}})) - require.EqualError(t, p.Done(), "pipeline: context canceled") + _, err = p.Done() + require.EqualError(t, err, "pipeline: context canceled") }) } @@ -222,7 +224,7 @@ func testPipeline(t *testing.T, init func() *Pipeline) { require.Equal(t, tc.level, logEntry.Level) } - err := p.Done() + _, err := p.Done() if tc.level == logrus.ErrorLevel { require.EqualError(t, err, "pipeline: 1 failures encountered:\n - c.git: assert.AnError general error for testing\n") @@ -258,7 +260,7 @@ func testPipeline(t *testing.T, init func() *Pipeline) { for _, cmd := range commands { p.Handle(ctx, cmd) } - err := p.Done() + _, err := p.Done() require.EqualError(t, err, "pipeline: 1 failures encountered:\n - c.git: assert.AnError general error for testing\n") }) } @@ -309,3 +311,30 @@ func TestPipelineError(t *testing.T) { }) } } + +func TestPipelineProcessedRepos(t *testing.T) { + strategy := MockStrategy{} + + repos := map[string][]*gitalypb.Repository{ + "storage1": { + {RelativePath: "a.git", StorageName: "storage1"}, + {RelativePath: "b.git", StorageName: "storage1"}, + }, + "storage2": {{RelativePath: "c.git", StorageName: "storage2"}}, + "storage3": {{RelativePath: "d.git", StorageName: "storage3"}}, + } + + p, err := NewPipeline(testhelper.SharedLogger(t)) + require.NoError(t, err) + + ctx := testhelper.Context(t) + for _, v := range repos { + for _, repo := range v { + p.Handle(ctx, NewRestoreCommand(strategy, RestoreRequest{Repository: repo})) + } + } + + processedRepos, err := p.Done() + require.NoError(t, err) + require.EqualValues(t, repos, processedRepos) +} diff --git a/internal/cli/gitalybackup/create.go b/internal/cli/gitalybackup/create.go index 4ad653490..be7e75cdb 100644 --- a/internal/cli/gitalybackup/create.go +++ b/internal/cli/gitalybackup/create.go @@ -174,7 +174,7 @@ func (cmd *createSubcommand) run(ctx context.Context, logger log.Logger, stdin i })) } - if err := pipeline.Done(); err != nil { + if _, err := pipeline.Done(); err != nil { return fmt.Errorf("create: %w", err) } return nil diff --git a/internal/cli/gitalybackup/restore.go b/internal/cli/gitalybackup/restore.go index de9e2cd3d..7a304f2ed 100644 --- a/internal/cli/gitalybackup/restore.go +++ b/internal/cli/gitalybackup/restore.go @@ -157,6 +157,7 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin decoder := json.NewDecoder(stdin) for { + var req restoreRequest if err := decoder.Decode(&req); errors.Is(err, io.EOF) { break @@ -178,7 +179,7 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin })) } - if err := pipeline.Done(); err != nil { + if _, err := pipeline.Done(); err != nil { return fmt.Errorf("restore: %w", err) } return nil -- cgit v1.2.3