diff options
Diffstat (limited to 'internal/backup/pipeline.go')
-rw-r--r-- | internal/backup/pipeline.go | 43 |
1 files changed, 35 insertions, 8 deletions
diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index 5672493ac..feb0eaf96 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -12,11 +12,16 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) +// repositoryKey uniquely identifies a repository, and is used here to key the +// map of processed repos. +type repositoryKey string + // Strategy used to create/restore backups type Strategy interface { Create(context.Context, *CreateRequest) error Restore(context.Context, *RestoreRequest) error - RemoveAllRepositories(context.Context, *RemoveAllRepositoriesRequest) error + ListRepositories(context.Context, *ListRepositoriesRequest) ([]*gitalypb.Repository, error) + RemoveRepository(context.Context, *RemoveRepositoryRequest) error } // CreateRequest is the request to create a backup @@ -52,9 +57,14 @@ type RestoreRequest struct { BackupID string } -// RemoveAllRepositoriesRequest is the request to remove all repositories in the specified -// storage name. -type RemoveAllRepositoriesRequest struct { +// RemoveRepositoryRequest is a request to remove an individual repository from its storage. +type RemoveRepositoryRequest struct { + Server storage.ServerInfo + Repo *gitalypb.Repository +} + +// ListRepositoriesRequest is the request to list repositories in a given storage. +type ListRepositoriesRequest struct { Server storage.ServerInfo StorageName string } @@ -181,6 +191,9 @@ type Pipeline struct { pipelineError error cmdErrors *commandErrors + + processedRepos map[string]map[repositoryKey]struct{} + processedReposMu sync.Mutex } // NewPipeline creates a pipeline that executes backup and restore jobs. @@ -195,6 +208,7 @@ func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) { done: make(chan struct{}), workersByStorage: make(map[string]chan *contextCommand), cmdErrors: &commandErrors{}, + processedRepos: make(map[string]map[repositoryKey]struct{}), } for _, opt := range opts { @@ -252,19 +266,19 @@ func (p *Pipeline) Handle(ctx context.Context, cmd Command) { } // Done waits for any in progress jobs to complete then reports any accumulated errors -func (p *Pipeline) Done() error { +func (p *Pipeline) Done() (processedRepos map[string]map[repositoryKey]struct{}, err error) { close(p.done) p.workerWg.Wait() if p.pipelineError != nil { - return fmt.Errorf("pipeline: %w", p.pipelineError) + return nil, fmt.Errorf("pipeline: %w", p.pipelineError) } if len(p.cmdErrors.errs) > 0 { - return fmt.Errorf("pipeline: %w", p.cmdErrors) + return nil, fmt.Errorf("pipeline: %w", p.cmdErrors) } - return nil + return p.processedRepos, nil } // getWorker finds the channel associated with a storage. When no channel is @@ -325,6 +339,14 @@ func (p *Pipeline) processCommand(ctx context.Context, cmd Command) { return } + storageName := cmd.Repository().StorageName + p.processedReposMu.Lock() + if _, ok := p.processedRepos[storageName]; !ok { + p.processedRepos[storageName] = make(map[repositoryKey]struct{}) + } + p.processedRepos[storageName][NewRepositoryKey(cmd.Repository())] = struct{}{} + p.processedReposMu.Unlock() + log.Info(fmt.Sprintf("completed %s", cmd.Name())) } @@ -363,3 +385,8 @@ func (p *Pipeline) releaseWorkerSlot() { } <-p.totalWorkers } + +// NewRepositoryKey returns a unique identifier for the provided repo. +func NewRepositoryKey(repo *gitalypb.Repository) repositoryKey { + return repositoryKey(repo.StorageName + "-" + repo.RelativePath) +} |