Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'internal/backup/pipeline.go')
-rw-r--r--internal/backup/pipeline.go17
1 files changed, 13 insertions, 4 deletions
diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go
index 5672493ac..8000f61af 100644
--- a/internal/backup/pipeline.go
+++ b/internal/backup/pipeline.go
@@ -181,6 +181,9 @@ type Pipeline struct {
pipelineError error
cmdErrors *commandErrors
+
+ processedRepos map[string][]*gitalypb.Repository
+ processedReposMu sync.Mutex
}
// NewPipeline creates a pipeline that executes backup and restore jobs.
@@ -195,6 +198,7 @@ func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) {
done: make(chan struct{}),
workersByStorage: make(map[string]chan *contextCommand),
cmdErrors: &commandErrors{},
+ processedRepos: make(map[string][]*gitalypb.Repository),
}
for _, opt := range opts {
@@ -252,19 +256,19 @@ func (p *Pipeline) Handle(ctx context.Context, cmd Command) {
}
// Done waits for any in progress jobs to complete then reports any accumulated errors
-func (p *Pipeline) Done() error {
+func (p *Pipeline) Done() (processedRepos map[string][]*gitalypb.Repository, err error) {
close(p.done)
p.workerWg.Wait()
if p.pipelineError != nil {
- return fmt.Errorf("pipeline: %w", p.pipelineError)
+ return nil, fmt.Errorf("pipeline: %w", p.pipelineError)
}
if len(p.cmdErrors.errs) > 0 {
- return fmt.Errorf("pipeline: %w", p.cmdErrors)
+ return nil, fmt.Errorf("pipeline: %w", p.cmdErrors)
}
- return nil
+ return p.processedRepos, nil
}
// getWorker finds the channel associated with a storage. When no channel is
@@ -325,6 +329,11 @@ func (p *Pipeline) processCommand(ctx context.Context, cmd Command) {
return
}
+ storageName := cmd.Repository().StorageName
+ p.processedReposMu.Lock()
+ p.processedRepos[storageName] = append(p.processedRepos[storageName], cmd.Repository())
+ p.processedReposMu.Unlock()
+
log.Info(fmt.Sprintf("completed %s", cmd.Name()))
}