diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2021-11-23 23:31:37 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2021-11-23 23:31:37 +0300 |
commit | 9a2edd07ff933e1844a2380a437c84be53336ca2 (patch) | |
tree | d0bf08dd24bfa6837685bff08ec8d54bcdc0edd5 | |
parent | fe9eb6b44c555fe2c3ccecbe2c989eeebc0adb8e (diff) | |
parent | b6fb5c332c131df79668ae600aa34e42189a12d3 (diff) |
Merge branch 'ps-backport-sub-cmds-14-4' into '14-4-stable'
list-untracked-repositories: Praefect sub-command to show untracked repositories
See merge request gitlab-org/gitaly!4115
-rw-r--r-- | cmd/praefect/subcmd.go | 21 | ||||
-rw-r--r-- | cmd/praefect/subcmd_list_untracked_repositories.go | 172 | ||||
-rw-r--r-- | cmd/praefect/subcmd_list_untracked_repositories_test.go | 135 | ||||
-rw-r--r-- | cmd/praefect/subcmd_remove_repository_test.go | 11 | ||||
-rw-r--r-- | cmd/praefect/subcmd_track_repository_test.go | 4 | ||||
-rw-r--r-- | internal/praefect/repocleaner/repository.go | 41 |
6 files changed, 352 insertions, 32 deletions
diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go index 70721aa92..a6cf21196 100644 --- a/cmd/praefect/subcmd.go +++ b/cmd/praefect/subcmd.go @@ -24,16 +24,17 @@ type subcmd interface { } var subcommands = map[string]subcmd{ - "sql-ping": &sqlPingSubcommand{}, - "sql-migrate": &sqlMigrateSubcommand{}, - "dial-nodes": &dialNodesSubcommand{}, - "sql-migrate-down": &sqlMigrateDownSubcommand{}, - "sql-migrate-status": &sqlMigrateStatusSubcommand{}, - "dataloss": newDatalossSubcommand(), - "accept-dataloss": &acceptDatalossSubcommand{}, - "set-replication-factor": newSetReplicatioFactorSubcommand(os.Stdout), - removeRepositoryCmdName: newRemoveRepository(logger), - trackRepositoryCmdName: newTrackRepository(logger), + "sql-ping": &sqlPingSubcommand{}, + "sql-migrate": &sqlMigrateSubcommand{}, + "dial-nodes": &dialNodesSubcommand{}, + "sql-migrate-down": &sqlMigrateDownSubcommand{}, + "sql-migrate-status": &sqlMigrateStatusSubcommand{}, + "dataloss": newDatalossSubcommand(), + "accept-dataloss": &acceptDatalossSubcommand{}, + "set-replication-factor": newSetReplicatioFactorSubcommand(os.Stdout), + removeRepositoryCmdName: newRemoveRepository(logger), + trackRepositoryCmdName: newTrackRepository(logger), + listUntrackedRepositoriesName: newListUntrackedRepositories(logger, os.Stdout), } // subCommand returns an exit code, to be fed into os.Exit. diff --git a/cmd/praefect/subcmd_list_untracked_repositories.go b/cmd/praefect/subcmd_list_untracked_repositories.go new file mode 100644 index 000000000..d76eb4328 --- /dev/null +++ b/cmd/praefect/subcmd_list_untracked_repositories.go @@ -0,0 +1,172 @@ +package main + +import ( + "context" + "encoding/json" + "errors" + "flag" + "fmt" + "io" + + "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/repocleaner" + "gitlab.com/gitlab-org/labkit/correlation" + "google.golang.org/grpc/metadata" +) + +const ( + listUntrackedRepositoriesName = "list-untracked-repositories" +) + +var errNoConnectionToGitalies = errors.New("no connection established to gitaly nodes") + +type listUntrackedRepositories struct { + logger logrus.FieldLogger + delimiter string + out io.Writer +} + +func newListUntrackedRepositories(logger logrus.FieldLogger, out io.Writer) *listUntrackedRepositories { + return &listUntrackedRepositories{logger: logger, out: out} +} + +func (cmd *listUntrackedRepositories) FlagSet() *flag.FlagSet { + fs := flag.NewFlagSet(listUntrackedRepositoriesName, flag.ExitOnError) + fs.StringVar(&cmd.delimiter, "delimiter", "\n", "string used as a delimiter in output") + fs.Usage = func() { + _, _ = printfErr("Description:\n" + + " This command checks if all repositories on all gitaly nodes tracked by praefect.\n" + + " If repository is found on the disk, but it is not known to praefect the location of\n" + + " that repository will be written into stdout stream in JSON format.\n") + fs.PrintDefaults() + _, _ = printfErr("NOTE:\n" + + " All errors and log messages directed to the stderr stream.\n" + + " The output is produced as the new data appears, it doesn't wait\n" + + " for the completion of the processing to produce the result.\n") + } + return fs +} + +func (cmd listUntrackedRepositories) Exec(flags *flag.FlagSet, cfg config.Config) error { + if flags.NArg() > 0 { + return unexpectedPositionalArgsError{Command: flags.Name()} + } + + ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID()) + ctx = metadata.AppendToOutgoingContext(ctx, "client_name", listUntrackedRepositoriesName) + + logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx)) + logger.Debugf("starting %s command", cmd.FlagSet().Name()) + + logger.Debug("dialing to gitaly nodes...") + nodeSet, err := dialGitalyStorages(cfg) + if err != nil { + return fmt.Errorf("dial nodes: %w", err) + } + defer nodeSet.Close() + logger.Debug("connected to gitaly nodes") + + logger.Debug("connecting to praefect database...") + db, err := glsql.OpenDB(cfg.DB) + if err != nil { + return fmt.Errorf("connect to database: %w", err) + } + defer func() { _ = db.Close() }() + logger.Debug("connected to praefect database") + + walker := repocleaner.NewWalker(nodeSet.Connections(), 16) + reporter := reportUntrackedRepositories{ + ctx: ctx, + checker: datastore.NewStorageCleanup(db), + delimiter: cmd.delimiter, + out: cmd.out, + } + for _, vs := range cfg.VirtualStorages { + for _, node := range vs.Nodes { + logger.Debugf("check %q/%q storage repositories", vs.Name, node.Storage) + if err := walker.ExecOnRepositories(ctx, vs.Name, node.Storage, reporter.Report); err != nil { + return fmt.Errorf("exec on %q/%q: %w", vs.Name, node.Storage, err) + } + } + } + logger.Debug("completed") + return nil +} + +func dialGitalyStorages(cfg config.Config) (praefect.NodeSet, error) { + nodeSet := praefect.NodeSet{} + for _, vs := range cfg.VirtualStorages { + for _, node := range vs.Nodes { + conn, err := subCmdDial(node.Address, node.Token) + if err != nil { + return nil, fmt.Errorf("dial with %q gitaly at %q", node.Storage, node.Address) + } + if _, found := nodeSet[vs.Name]; !found { + nodeSet[vs.Name] = map[string]praefect.Node{} + } + nodeSet[vs.Name][node.Storage] = praefect.Node{ + Storage: node.Storage, + Address: node.Address, + Token: node.Token, + Connection: conn, + } + } + } + if len(nodeSet.Connections()) == 0 { + return nil, errNoConnectionToGitalies + } + return nodeSet, nil +} + +type reportUntrackedRepositories struct { + ctx context.Context + checker *datastore.StorageCleanup + out io.Writer + delimiter string +} + +// Report method accepts a list of repositories, checks if they exist in the praefect database +// and writes JSON serialized location of each untracked repository using the configured delimiter +// and writer. +func (r *reportUntrackedRepositories) Report(paths []datastore.RepositoryClusterPath) error { + if len(paths) == 0 { + return nil + } + + replicaRelPaths := make([]string, len(paths)) + for i, path := range paths { + replicaRelPaths[i] = path.RelativeReplicaPath + } + + missing, err := r.checker.DoesntExist(r.ctx, paths[0].VirtualStorage, paths[0].Storage, replicaRelPaths) + if err != nil { + return fmt.Errorf("existence check: %w", err) + } + + for _, repoClusterPath := range missing { + d, err := serializeRepositoryClusterPath(repoClusterPath) + if err != nil { + return fmt.Errorf("serialize %+v: %w", repoClusterPath, err) + } + if _, err := r.out.Write(d); err != nil { + return fmt.Errorf("write serialized data to output: %w", err) + } + if _, err := r.out.Write([]byte(r.delimiter)); err != nil { + return fmt.Errorf("write serialized data to output: %w", err) + } + } + + return nil +} + +func serializeRepositoryClusterPath(path datastore.RepositoryClusterPath) ([]byte, error) { + return json.Marshal(map[string]string{ + "virtual_storage": path.VirtualStorage, + "storage": path.Storage, + "relative_path": path.RelativeReplicaPath, + }) +} diff --git a/cmd/praefect/subcmd_list_untracked_repositories_test.go b/cmd/praefect/subcmd_list_untracked_repositories_test.go new file mode 100644 index 000000000..aa8c4ec7c --- /dev/null +++ b/cmd/praefect/subcmd_list_untracked_repositories_test.go @@ -0,0 +1,135 @@ +package main + +import ( + "bytes" + "context" + "flag" + "fmt" + "strings" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/client" + "gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" +) + +func TestListUntrackedRepositories_FlagSet(t *testing.T) { + t.Parallel() + cmd := &listUntrackedRepositories{} + for _, tc := range []struct { + desc string + args []string + exp []interface{} + }{ + { + desc: "custom value", + args: []string{"--delimiter", ","}, + exp: []interface{}{","}, + }, + { + desc: "default value", + args: nil, + exp: []interface{}{"\n"}, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + fs := cmd.FlagSet() + require.NoError(t, fs.Parse(tc.args)) + require.ElementsMatch(t, tc.exp, []interface{}{cmd.delimiter}) + }) + } +} + +func TestListUntrackedRepositories_Exec(t *testing.T) { + t.Parallel() + g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1")) + g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2")) + + // Repositories not managed by praefect. + repo1, _ := gittest.InitRepo(t, g1Cfg, g1Cfg.Storages[0]) + repo2, _ := gittest.InitRepo(t, g1Cfg, g1Cfg.Storages[0]) + repo3, _ := gittest.InitRepo(t, g2Cfg, g2Cfg.Storages[0]) + + g1Addr := testserver.RunGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) + g2Addr := testserver.RunGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) + + db := glsql.NewDB(t) + dbConf := glsql.GetDBConfig(t, db.Name) + + conf := config.Config{ + SocketPath: testhelper.GetTemporaryGitalySocketFileName(t), + VirtualStorages: []*config.VirtualStorage{ + { + Name: "praefect", + Nodes: []*config.Node{ + {Storage: g1Cfg.Storages[0].Name, Address: g1Addr}, + {Storage: g2Cfg.Storages[0].Name, Address: g2Addr}, + }, + }, + }, + DB: dbConf, + } + + starterConfigs, err := getStarterConfigs(conf) + require.NoError(t, err) + bootstrapper := bootstrap.NewNoop() + go func() { + assert.NoError(t, run(starterConfigs, conf, bootstrapper, prometheus.NewRegistry(), prometheus.NewRegistry())) + }() + + cc, err := client.Dial("unix://"+conf.SocketPath, nil) + require.NoError(t, err) + defer func() { require.NoError(t, cc.Close()) }() + repoClient := gitalypb.NewRepositoryServiceClient(cc) + + ctx, cancel := testhelper.Context() + defer cancel() + + praefectStorage := conf.VirtualStorages[0].Name + + // Repository managed by praefect, exists on gitaly-1 and gitaly-2. + createRepo(t, ctx, repoClient, praefectStorage, "path/to/test/repo") + out := &bytes.Buffer{} + cmd := newListUntrackedRepositories(testhelper.NewTestLogger(t), out) + require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) + + exp := []string{ + fmt.Sprintf(`{"relative_path":%q,"storage":"gitaly-1","virtual_storage":"praefect"}`, repo1.RelativePath), + fmt.Sprintf(`{"relative_path":%q,"storage":"gitaly-1","virtual_storage":"praefect"}`, repo2.RelativePath), + fmt.Sprintf(`{"relative_path":%q,"storage":"gitaly-2","virtual_storage":"praefect"}`, repo3.RelativePath), + "", // an empty extra element required as each line ends with "delimiter" and strings.Split returns all parts + } + require.ElementsMatch(t, exp, strings.Split(out.String(), "\n")) + + bootstrapper.Terminate() +} + +func createRepo(t *testing.T, ctx context.Context, repoClient gitalypb.RepositoryServiceClient, storageName, relativePath string) *gitalypb.Repository { + t.Helper() + repo := &gitalypb.Repository{ + StorageName: storageName, + RelativePath: relativePath, + } + for i := 0; true; i++ { + _, err := repoClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{Repository: repo}) + if err != nil { + require.Regexp(t, "(no healthy nodes)|(no such file or directory)|(connection refused)", err.Error()) + require.Less(t, i, 100, "praefect doesn't serve for too long") + time.Sleep(50 * time.Millisecond) + } else { + break + } + } + return repo +} diff --git a/cmd/praefect/subcmd_remove_repository_test.go b/cmd/praefect/subcmd_remove_repository_test.go index bb56dee3c..10d0424b2 100644 --- a/cmd/praefect/subcmd_remove_repository_test.go +++ b/cmd/praefect/subcmd_remove_repository_test.go @@ -67,9 +67,7 @@ func TestRemoveRepository_Exec_invalidArgs(t *testing.T) { t.Run("praefect address is not set in config ", func(t *testing.T) { cmd := removeRepository{virtualStorage: "stub", relativePath: "stub", logger: testhelper.NewTestLogger(t)} db := glsql.NewDB(t) - var database string - require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database)) - dbConf := glsql.GetDBConfig(t, database) + dbConf := glsql.GetDBConfig(t, db.Name) err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{DB: dbConf}) require.EqualError(t, err, "get praefect address from config: no Praefect address configured") }) @@ -84,9 +82,7 @@ func TestRemoveRepository_Exec(t *testing.T) { g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) db := glsql.NewDB(t) - var database string - require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database)) - dbConf := glsql.GetDBConfig(t, database) + dbConf := glsql.GetDBConfig(t, db.Name) conf := config.Config{ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t), @@ -108,10 +104,8 @@ func TestRemoveRepository_Exec(t *testing.T) { starterConfigs, err := getStarterConfigs(conf) require.NoError(t, err) - stopped := make(chan struct{}) bootstrapper := bootstrap.NewNoop() go func() { - defer close(stopped) assert.NoError(t, run(starterConfigs, conf, bootstrapper, prometheus.NewRegistry(), prometheus.NewRegistry())) }() @@ -231,7 +225,6 @@ func TestRemoveRepository_Exec(t *testing.T) { }) bootstrapper.Terminate() - <-stopped } func requireNoDatabaseInfo(t *testing.T, db glsql.DB, cmd *removeRepository) { diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go index c03329eb1..4f03576ed 100644 --- a/cmd/praefect/subcmd_track_repository_test.go +++ b/cmd/praefect/subcmd_track_repository_test.go @@ -83,9 +83,7 @@ func TestAddRepository_Exec(t *testing.T) { g1Addr := g1Srv.Address() db := glsql.NewDB(t) - var database string - require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database)) - dbConf := glsql.GetDBConfig(t, database) + dbConf := glsql.GetDBConfig(t, db.Name) virtualStorageName := "praefect" conf := config.Config{ diff --git a/internal/praefect/repocleaner/repository.go b/internal/praefect/repocleaner/repository.go index fbf3b841b..ad15319bd 100644 --- a/internal/praefect/repocleaner/repository.go +++ b/internal/praefect/repocleaner/repository.go @@ -42,6 +42,7 @@ type Runner struct { logger logrus.FieldLogger healthChecker praefect.HealthChecker conns praefect.Connections + walker *Walker stateOwner StateOwner acquirer Acquirer action Action @@ -64,6 +65,7 @@ func NewRunner(cfg Cfg, logger logrus.FieldLogger, healthChecker praefect.Health logger: logger.WithField("component", "repocleaner.repository_existence"), healthChecker: healthChecker, conns: conns, + walker: NewWalker(conns, cfg.RepositoriesInBatch), stateOwner: stateOwner, acquirer: acquirer, action: action, @@ -127,7 +129,7 @@ func (gs *Runner) run(ctx context.Context) { } logger = gs.loggerWith(clusterPath.VirtualStorage, clusterPath.Storage) - err = gs.execOnRepositories(ctx, clusterPath.VirtualStorage, clusterPath.Storage, func(paths []datastore.RepositoryClusterPath) { + err = gs.walker.ExecOnRepositories(ctx, clusterPath.VirtualStorage, clusterPath.Storage, func(paths []datastore.RepositoryClusterPath) error { relativePaths := make([]string, len(paths)) for i, path := range paths { relativePaths[i] = path.RelativeReplicaPath @@ -135,13 +137,15 @@ func (gs *Runner) run(ctx context.Context) { notExisting, err := gs.stateOwner.DoesntExist(ctx, clusterPath.VirtualStorage, clusterPath.Storage, relativePaths) if err != nil { logger.WithError(err).WithField("repositories", paths).Error("failed to check existence") - return + return nil } if err := gs.action.Perform(ctx, notExisting); err != nil { logger.WithError(err).WithField("existence", notExisting).Error("perform action") - return + return nil } + + return nil }) if err != nil { logger.WithError(err).Error("failed to exec action on repositories") @@ -153,8 +157,21 @@ func (gs *Runner) loggerWith(virtualStorage, storage string) logrus.FieldLogger return gs.logger.WithFields(logrus.Fields{"virtual_storage": virtualStorage, "storage": storage}) } -func (gs *Runner) execOnRepositories(ctx context.Context, virtualStorage, storage string, action func([]datastore.RepositoryClusterPath)) error { - gclient, err := gs.getInternalGitalyClient(virtualStorage, storage) +// Walker allows walk by the repositories of the gitaly storage. +type Walker struct { + conns praefect.Connections + batchSize int +} + +// NewWalker returns a new *Walker instance. +func NewWalker(conns praefect.Connections, batchSize int) *Walker { + return &Walker{conns: conns, batchSize: batchSize} +} + +// ExecOnRepositories runs through all the repositories on a Gitaly storage and executes the provided action. +// The processing is done in batches to reduce cost of operations. +func (wr *Walker) ExecOnRepositories(ctx context.Context, virtualStorage, storage string, action func([]datastore.RepositoryClusterPath) error) error { + gclient, err := wr.getInternalGitalyClient(virtualStorage, storage) if err != nil { return fmt.Errorf("setup gitaly client: %w", err) } @@ -164,7 +181,7 @@ func (gs *Runner) execOnRepositories(ctx context.Context, virtualStorage, storag return fmt.Errorf("unable to walk repos: %w", err) } - batch := make([]datastore.RepositoryClusterPath, 0, gs.cfg.RepositoriesInBatch) + batch := make([]datastore.RepositoryClusterPath, 0, wr.batchSize) for { res, err := resp.Recv() if err != nil { @@ -183,18 +200,22 @@ func (gs *Runner) execOnRepositories(ctx context.Context, virtualStorage, storag }) if len(batch) == cap(batch) { - action(batch) + if err := action(batch); err != nil { + return err + } batch = batch[:0] } } if len(batch) > 0 { - action(batch) + if err := action(batch); err != nil { + return err + } } return nil } -func (gs *Runner) getInternalGitalyClient(virtualStorage, storage string) (gitalypb.InternalGitalyClient, error) { - conn, found := gs.conns[virtualStorage][storage] +func (wr *Walker) getInternalGitalyClient(virtualStorage, storage string) (gitalypb.InternalGitalyClient, error) { + conn, found := wr.conns[virtualStorage][storage] if !found { return nil, fmt.Errorf("no connection to the gitaly node %q/%q", virtualStorage, storage) } |