diff options
Diffstat (limited to 'internal/backup/pipeline.go')
-rw-r--r-- | internal/backup/pipeline.go | 17 |
1 files changed, 13 insertions, 4 deletions
diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index 5672493ac..8000f61af 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -181,6 +181,9 @@ type Pipeline struct { pipelineError error cmdErrors *commandErrors + + processedRepos map[string][]*gitalypb.Repository + processedReposMu sync.Mutex } // NewPipeline creates a pipeline that executes backup and restore jobs. @@ -195,6 +198,7 @@ func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) { done: make(chan struct{}), workersByStorage: make(map[string]chan *contextCommand), cmdErrors: &commandErrors{}, + processedRepos: make(map[string][]*gitalypb.Repository), } for _, opt := range opts { @@ -252,19 +256,19 @@ func (p *Pipeline) Handle(ctx context.Context, cmd Command) { } // Done waits for any in progress jobs to complete then reports any accumulated errors -func (p *Pipeline) Done() error { +func (p *Pipeline) Done() (processedRepos map[string][]*gitalypb.Repository, err error) { close(p.done) p.workerWg.Wait() if p.pipelineError != nil { - return fmt.Errorf("pipeline: %w", p.pipelineError) + return nil, fmt.Errorf("pipeline: %w", p.pipelineError) } if len(p.cmdErrors.errs) > 0 { - return fmt.Errorf("pipeline: %w", p.cmdErrors) + return nil, fmt.Errorf("pipeline: %w", p.cmdErrors) } - return nil + return p.processedRepos, nil } // getWorker finds the channel associated with a storage. When no channel is @@ -325,6 +329,11 @@ func (p *Pipeline) processCommand(ctx context.Context, cmd Command) { return } + storageName := cmd.Repository().StorageName + p.processedReposMu.Lock() + p.processedRepos[storageName] = append(p.processedRepos[storageName], cmd.Repository()) + p.processedReposMu.Unlock() + log.Info(fmt.Sprintf("completed %s", cmd.Name())) } |