diff options
Diffstat (limited to 'internal/backup/pipeline.go')
-rw-r--r-- | internal/backup/pipeline.go | 34 |
1 files changed, 8 insertions, 26 deletions
diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index 3eb91de91..5672493ac 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -16,8 +16,7 @@ import ( type Strategy interface { Create(context.Context, *CreateRequest) error Restore(context.Context, *RestoreRequest) error - ListRepositories(context.Context, *ListRepositoriesRequest) ([]*gitalypb.Repository, error) - RemoveRepository(context.Context, *RemoveRepositoryRequest) error + RemoveAllRepositories(context.Context, *RemoveAllRepositoriesRequest) error } // CreateRequest is the request to create a backup @@ -53,14 +52,9 @@ type RestoreRequest struct { BackupID string } -// RemoveRepositoryRequest is a request to remove an individual repository from its storage. -type RemoveRepositoryRequest struct { - Server storage.ServerInfo - Repo *gitalypb.Repository -} - -// ListRepositoriesRequest is the request to list repositories in a given storage. -type ListRepositoriesRequest struct { +// RemoveAllRepositoriesRequest is the request to remove all repositories in the specified +// storage name. +type RemoveAllRepositoriesRequest struct { Server storage.ServerInfo StorageName string } @@ -187,9 +181,6 @@ type Pipeline struct { pipelineError error cmdErrors *commandErrors - - processedRepos map[string]map[*gitalypb.Repository]struct{} - processedReposMu sync.Mutex } // NewPipeline creates a pipeline that executes backup and restore jobs. @@ -204,7 +195,6 @@ func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) { done: make(chan struct{}), workersByStorage: make(map[string]chan *contextCommand), cmdErrors: &commandErrors{}, - processedRepos: make(map[string]map[*gitalypb.Repository]struct{}), } for _, opt := range opts { @@ -262,19 +252,19 @@ func (p *Pipeline) Handle(ctx context.Context, cmd Command) { } // Done waits for any in progress jobs to complete then reports any accumulated errors -func (p *Pipeline) Done() (processedRepos map[string]map[*gitalypb.Repository]struct{}, err error) { +func (p *Pipeline) Done() error { close(p.done) p.workerWg.Wait() if p.pipelineError != nil { - return nil, fmt.Errorf("pipeline: %w", p.pipelineError) + return fmt.Errorf("pipeline: %w", p.pipelineError) } if len(p.cmdErrors.errs) > 0 { - return nil, fmt.Errorf("pipeline: %w", p.cmdErrors) + return fmt.Errorf("pipeline: %w", p.cmdErrors) } - return p.processedRepos, nil + return nil } // getWorker finds the channel associated with a storage. When no channel is @@ -335,14 +325,6 @@ func (p *Pipeline) processCommand(ctx context.Context, cmd Command) { return } - storageName := cmd.Repository().StorageName - p.processedReposMu.Lock() - if _, ok := p.processedRepos[storageName]; !ok { - p.processedRepos[storageName] = make(map[*gitalypb.Repository]struct{}) - } - p.processedRepos[storageName][cmd.Repository()] = struct{}{} - p.processedReposMu.Unlock() - log.Info(fmt.Sprintf("completed %s", cmd.Name())) } |