diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2021-09-30 22:28:51 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2021-10-08 10:46:18 +0300 |
commit | 00a4efe81d711f172347d3acbe893b360dab3c88 (patch) | |
tree | c801c8a9a7680c5f8d1da571dfa43ae178841642 | |
parent | 982f7e5cbc4449e19d6fce3cdf8d47715548e470 (diff) |
list-untracked-repositories: Extract ExecOnRepositories method for re-use
The ExecOnRepositories method is extracted into a new Walker type.
It will be re-used for the new 'list-untracked-repositories' sub-command.
-rw-r--r-- | internal/praefect/repocleaner/repository.go | 27 |
1 files changed, 21 insertions, 6 deletions
diff --git a/internal/praefect/repocleaner/repository.go b/internal/praefect/repocleaner/repository.go index fbf3b841b..ad6e053dc 100644 --- a/internal/praefect/repocleaner/repository.go +++ b/internal/praefect/repocleaner/repository.go @@ -42,6 +42,7 @@ type Runner struct { logger logrus.FieldLogger healthChecker praefect.HealthChecker conns praefect.Connections + walker *Walker stateOwner StateOwner acquirer Acquirer action Action @@ -64,6 +65,7 @@ func NewRunner(cfg Cfg, logger logrus.FieldLogger, healthChecker praefect.Health logger: logger.WithField("component", "repocleaner.repository_existence"), healthChecker: healthChecker, conns: conns, + walker: NewWalker(conns, cfg.RepositoriesInBatch), stateOwner: stateOwner, acquirer: acquirer, action: action, @@ -127,7 +129,7 @@ func (gs *Runner) run(ctx context.Context) { } logger = gs.loggerWith(clusterPath.VirtualStorage, clusterPath.Storage) - err = gs.execOnRepositories(ctx, clusterPath.VirtualStorage, clusterPath.Storage, func(paths []datastore.RepositoryClusterPath) { + err = gs.walker.ExecOnRepositories(ctx, clusterPath.VirtualStorage, clusterPath.Storage, func(paths []datastore.RepositoryClusterPath) { relativePaths := make([]string, len(paths)) for i, path := range paths { relativePaths[i] = path.RelativeReplicaPath @@ -153,8 +155,21 @@ func (gs *Runner) loggerWith(virtualStorage, storage string) logrus.FieldLogger return gs.logger.WithFields(logrus.Fields{"virtual_storage": virtualStorage, "storage": storage}) } -func (gs *Runner) execOnRepositories(ctx context.Context, virtualStorage, storage string, action func([]datastore.RepositoryClusterPath)) error { - gclient, err := gs.getInternalGitalyClient(virtualStorage, storage) +// Walker allows walk by the repositories of the gitaly storage. +type Walker struct { + conns praefect.Connections + batchSize int +} + +// NewWalker returns a new *Walker instance. 
+func NewWalker(conns praefect.Connections, batchSize int) *Walker { + return &Walker{conns: conns, batchSize: batchSize} +} + +// ExecOnRepositories runs through all the repositories on a gitaly storage and executes the provided action. +// The processing is done in batches to reduce cost of operations. +func (wr *Walker) ExecOnRepositories(ctx context.Context, virtualStorage, storage string, action func([]datastore.RepositoryClusterPath)) error { + gclient, err := wr.getInternalGitalyClient(virtualStorage, storage) if err != nil { return fmt.Errorf("setup gitaly client: %w", err) } @@ -164,7 +179,7 @@ func (gs *Runner) execOnRepositories(ctx context.Context, virtualStorage, storag return fmt.Errorf("unable to walk repos: %w", err) } - batch := make([]datastore.RepositoryClusterPath, 0, gs.cfg.RepositoriesInBatch) + batch := make([]datastore.RepositoryClusterPath, 0, wr.batchSize) for { res, err := resp.Recv() if err != nil { @@ -193,8 +208,8 @@ func (gs *Runner) execOnRepositories(ctx context.Context, virtualStorage, storag return nil } -func (gs *Runner) getInternalGitalyClient(virtualStorage, storage string) (gitalypb.InternalGitalyClient, error) { - conn, found := gs.conns[virtualStorage][storage] +func (wr *Walker) getInternalGitalyClient(virtualStorage, storage string) (gitalypb.InternalGitalyClient, error) { + conn, found := wr.conns[virtualStorage][storage] if !found { return nil, fmt.Errorf("no connection to the gitaly node %q/%q", virtualStorage, storage) } |