Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2021-11-23 23:31:37 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2021-11-23 23:31:37 +0300
commit9a2edd07ff933e1844a2380a437c84be53336ca2 (patch)
treed0bf08dd24bfa6837685bff08ec8d54bcdc0edd5
parentfe9eb6b44c555fe2c3ccecbe2c989eeebc0adb8e (diff)
parentb6fb5c332c131df79668ae600aa34e42189a12d3 (diff)
Merge branch 'ps-backport-sub-cmds-14-4' into '14-4-stable'
list-untracked-repositories: Praefect sub-command to show untracked repositories See merge request gitlab-org/gitaly!4115
-rw-r--r--cmd/praefect/subcmd.go21
-rw-r--r--cmd/praefect/subcmd_list_untracked_repositories.go172
-rw-r--r--cmd/praefect/subcmd_list_untracked_repositories_test.go135
-rw-r--r--cmd/praefect/subcmd_remove_repository_test.go11
-rw-r--r--cmd/praefect/subcmd_track_repository_test.go4
-rw-r--r--internal/praefect/repocleaner/repository.go41
6 files changed, 352 insertions, 32 deletions
diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go
index 70721aa92..a6cf21196 100644
--- a/cmd/praefect/subcmd.go
+++ b/cmd/praefect/subcmd.go
@@ -24,16 +24,17 @@ type subcmd interface {
}
var subcommands = map[string]subcmd{
- "sql-ping": &sqlPingSubcommand{},
- "sql-migrate": &sqlMigrateSubcommand{},
- "dial-nodes": &dialNodesSubcommand{},
- "sql-migrate-down": &sqlMigrateDownSubcommand{},
- "sql-migrate-status": &sqlMigrateStatusSubcommand{},
- "dataloss": newDatalossSubcommand(),
- "accept-dataloss": &acceptDatalossSubcommand{},
- "set-replication-factor": newSetReplicatioFactorSubcommand(os.Stdout),
- removeRepositoryCmdName: newRemoveRepository(logger),
- trackRepositoryCmdName: newTrackRepository(logger),
+ "sql-ping": &sqlPingSubcommand{},
+ "sql-migrate": &sqlMigrateSubcommand{},
+ "dial-nodes": &dialNodesSubcommand{},
+ "sql-migrate-down": &sqlMigrateDownSubcommand{},
+ "sql-migrate-status": &sqlMigrateStatusSubcommand{},
+ "dataloss": newDatalossSubcommand(),
+ "accept-dataloss": &acceptDatalossSubcommand{},
+ "set-replication-factor": newSetReplicatioFactorSubcommand(os.Stdout),
+ removeRepositoryCmdName: newRemoveRepository(logger),
+ trackRepositoryCmdName: newTrackRepository(logger),
+ listUntrackedRepositoriesName: newListUntrackedRepositories(logger, os.Stdout),
}
// subCommand returns an exit code, to be fed into os.Exit.
diff --git a/cmd/praefect/subcmd_list_untracked_repositories.go b/cmd/praefect/subcmd_list_untracked_repositories.go
new file mode 100644
index 000000000..d76eb4328
--- /dev/null
+++ b/cmd/praefect/subcmd_list_untracked_repositories.go
@@ -0,0 +1,172 @@
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "flag"
+ "fmt"
+ "io"
+
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/repocleaner"
+ "gitlab.com/gitlab-org/labkit/correlation"
+ "google.golang.org/grpc/metadata"
+)
+
+const (
+ listUntrackedRepositoriesName = "list-untracked-repositories"
+)
+
+var errNoConnectionToGitalies = errors.New("no connection established to gitaly nodes")
+
+type listUntrackedRepositories struct {
+ logger logrus.FieldLogger
+ delimiter string
+ out io.Writer
+}
+
+func newListUntrackedRepositories(logger logrus.FieldLogger, out io.Writer) *listUntrackedRepositories {
+ return &listUntrackedRepositories{logger: logger, out: out}
+}
+
+func (cmd *listUntrackedRepositories) FlagSet() *flag.FlagSet {
+ fs := flag.NewFlagSet(listUntrackedRepositoriesName, flag.ExitOnError)
+ fs.StringVar(&cmd.delimiter, "delimiter", "\n", "string used as a delimiter in output")
+ fs.Usage = func() {
+ _, _ = printfErr("Description:\n" +
+ " This command checks if all repositories on all gitaly nodes tracked by praefect.\n" +
+ " If repository is found on the disk, but it is not known to praefect the location of\n" +
+ " that repository will be written into stdout stream in JSON format.\n")
+ fs.PrintDefaults()
+ _, _ = printfErr("NOTE:\n" +
+ " All errors and log messages directed to the stderr stream.\n" +
+ " The output is produced as the new data appears, it doesn't wait\n" +
+ " for the completion of the processing to produce the result.\n")
+ }
+ return fs
+}
+
+func (cmd listUntrackedRepositories) Exec(flags *flag.FlagSet, cfg config.Config) error {
+ if flags.NArg() > 0 {
+ return unexpectedPositionalArgsError{Command: flags.Name()}
+ }
+
+ ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID())
+ ctx = metadata.AppendToOutgoingContext(ctx, "client_name", listUntrackedRepositoriesName)
+
+ logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx))
+ logger.Debugf("starting %s command", cmd.FlagSet().Name())
+
+ logger.Debug("dialing to gitaly nodes...")
+ nodeSet, err := dialGitalyStorages(cfg)
+ if err != nil {
+ return fmt.Errorf("dial nodes: %w", err)
+ }
+ defer nodeSet.Close()
+ logger.Debug("connected to gitaly nodes")
+
+ logger.Debug("connecting to praefect database...")
+ db, err := glsql.OpenDB(cfg.DB)
+ if err != nil {
+ return fmt.Errorf("connect to database: %w", err)
+ }
+ defer func() { _ = db.Close() }()
+ logger.Debug("connected to praefect database")
+
+ walker := repocleaner.NewWalker(nodeSet.Connections(), 16)
+ reporter := reportUntrackedRepositories{
+ ctx: ctx,
+ checker: datastore.NewStorageCleanup(db),
+ delimiter: cmd.delimiter,
+ out: cmd.out,
+ }
+ for _, vs := range cfg.VirtualStorages {
+ for _, node := range vs.Nodes {
+ logger.Debugf("check %q/%q storage repositories", vs.Name, node.Storage)
+ if err := walker.ExecOnRepositories(ctx, vs.Name, node.Storage, reporter.Report); err != nil {
+ return fmt.Errorf("exec on %q/%q: %w", vs.Name, node.Storage, err)
+ }
+ }
+ }
+ logger.Debug("completed")
+ return nil
+}
+
+func dialGitalyStorages(cfg config.Config) (praefect.NodeSet, error) {
+ nodeSet := praefect.NodeSet{}
+ for _, vs := range cfg.VirtualStorages {
+ for _, node := range vs.Nodes {
+ conn, err := subCmdDial(node.Address, node.Token)
+ if err != nil {
+ return nil, fmt.Errorf("dial with %q gitaly at %q", node.Storage, node.Address)
+ }
+ if _, found := nodeSet[vs.Name]; !found {
+ nodeSet[vs.Name] = map[string]praefect.Node{}
+ }
+ nodeSet[vs.Name][node.Storage] = praefect.Node{
+ Storage: node.Storage,
+ Address: node.Address,
+ Token: node.Token,
+ Connection: conn,
+ }
+ }
+ }
+ if len(nodeSet.Connections()) == 0 {
+ return nil, errNoConnectionToGitalies
+ }
+ return nodeSet, nil
+}
+
+type reportUntrackedRepositories struct {
+ ctx context.Context
+ checker *datastore.StorageCleanup
+ out io.Writer
+ delimiter string
+}
+
+// Report method accepts a list of repositories, checks if they exist in the praefect database
+// and writes JSON serialized location of each untracked repository using the configured delimiter
+// and writer.
+func (r *reportUntrackedRepositories) Report(paths []datastore.RepositoryClusterPath) error {
+ if len(paths) == 0 {
+ return nil
+ }
+
+ replicaRelPaths := make([]string, len(paths))
+ for i, path := range paths {
+ replicaRelPaths[i] = path.RelativeReplicaPath
+ }
+
+ missing, err := r.checker.DoesntExist(r.ctx, paths[0].VirtualStorage, paths[0].Storage, replicaRelPaths)
+ if err != nil {
+ return fmt.Errorf("existence check: %w", err)
+ }
+
+ for _, repoClusterPath := range missing {
+ d, err := serializeRepositoryClusterPath(repoClusterPath)
+ if err != nil {
+ return fmt.Errorf("serialize %+v: %w", repoClusterPath, err)
+ }
+ if _, err := r.out.Write(d); err != nil {
+ return fmt.Errorf("write serialized data to output: %w", err)
+ }
+ if _, err := r.out.Write([]byte(r.delimiter)); err != nil {
+ return fmt.Errorf("write serialized data to output: %w", err)
+ }
+ }
+
+ return nil
+}
+
+func serializeRepositoryClusterPath(path datastore.RepositoryClusterPath) ([]byte, error) {
+ return json.Marshal(map[string]string{
+ "virtual_storage": path.VirtualStorage,
+ "storage": path.Storage,
+ "relative_path": path.RelativeReplicaPath,
+ })
+}
diff --git a/cmd/praefect/subcmd_list_untracked_repositories_test.go b/cmd/praefect/subcmd_list_untracked_repositories_test.go
new file mode 100644
index 000000000..aa8c4ec7c
--- /dev/null
+++ b/cmd/praefect/subcmd_list_untracked_repositories_test.go
@@ -0,0 +1,135 @@
+package main
+
+import (
+ "bytes"
+ "context"
+ "flag"
+ "fmt"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/client"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+)
+
+func TestListUntrackedRepositories_FlagSet(t *testing.T) {
+ t.Parallel()
+ cmd := &listUntrackedRepositories{}
+ for _, tc := range []struct {
+ desc string
+ args []string
+ exp []interface{}
+ }{
+ {
+ desc: "custom value",
+ args: []string{"--delimiter", ","},
+ exp: []interface{}{","},
+ },
+ {
+ desc: "default value",
+ args: nil,
+ exp: []interface{}{"\n"},
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ fs := cmd.FlagSet()
+ require.NoError(t, fs.Parse(tc.args))
+ require.ElementsMatch(t, tc.exp, []interface{}{cmd.delimiter})
+ })
+ }
+}
+
+func TestListUntrackedRepositories_Exec(t *testing.T) {
+ t.Parallel()
+ g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1"))
+ g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2"))
+
+ // Repositories not managed by praefect.
+ repo1, _ := gittest.InitRepo(t, g1Cfg, g1Cfg.Storages[0])
+ repo2, _ := gittest.InitRepo(t, g1Cfg, g1Cfg.Storages[0])
+ repo3, _ := gittest.InitRepo(t, g2Cfg, g2Cfg.Storages[0])
+
+ g1Addr := testserver.RunGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+ g2Addr := testserver.RunGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+
+ db := glsql.NewDB(t)
+ dbConf := glsql.GetDBConfig(t, db.Name)
+
+ conf := config.Config{
+ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "praefect",
+ Nodes: []*config.Node{
+ {Storage: g1Cfg.Storages[0].Name, Address: g1Addr},
+ {Storage: g2Cfg.Storages[0].Name, Address: g2Addr},
+ },
+ },
+ },
+ DB: dbConf,
+ }
+
+ starterConfigs, err := getStarterConfigs(conf)
+ require.NoError(t, err)
+ bootstrapper := bootstrap.NewNoop()
+ go func() {
+ assert.NoError(t, run(starterConfigs, conf, bootstrapper, prometheus.NewRegistry(), prometheus.NewRegistry()))
+ }()
+
+ cc, err := client.Dial("unix://"+conf.SocketPath, nil)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, cc.Close()) }()
+ repoClient := gitalypb.NewRepositoryServiceClient(cc)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ praefectStorage := conf.VirtualStorages[0].Name
+
+ // Repository managed by praefect, exists on gitaly-1 and gitaly-2.
+ createRepo(t, ctx, repoClient, praefectStorage, "path/to/test/repo")
+ out := &bytes.Buffer{}
+ cmd := newListUntrackedRepositories(testhelper.NewTestLogger(t), out)
+ require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+
+ exp := []string{
+ fmt.Sprintf(`{"relative_path":%q,"storage":"gitaly-1","virtual_storage":"praefect"}`, repo1.RelativePath),
+ fmt.Sprintf(`{"relative_path":%q,"storage":"gitaly-1","virtual_storage":"praefect"}`, repo2.RelativePath),
+ fmt.Sprintf(`{"relative_path":%q,"storage":"gitaly-2","virtual_storage":"praefect"}`, repo3.RelativePath),
+ "", // an empty extra element required as each line ends with "delimiter" and strings.Split returns all parts
+ }
+ require.ElementsMatch(t, exp, strings.Split(out.String(), "\n"))
+
+ bootstrapper.Terminate()
+}
+
+func createRepo(t *testing.T, ctx context.Context, repoClient gitalypb.RepositoryServiceClient, storageName, relativePath string) *gitalypb.Repository {
+ t.Helper()
+ repo := &gitalypb.Repository{
+ StorageName: storageName,
+ RelativePath: relativePath,
+ }
+ for i := 0; true; i++ {
+ _, err := repoClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{Repository: repo})
+ if err != nil {
+ require.Regexp(t, "(no healthy nodes)|(no such file or directory)|(connection refused)", err.Error())
+ require.Less(t, i, 100, "praefect doesn't serve for too long")
+ time.Sleep(50 * time.Millisecond)
+ } else {
+ break
+ }
+ }
+ return repo
+}
diff --git a/cmd/praefect/subcmd_remove_repository_test.go b/cmd/praefect/subcmd_remove_repository_test.go
index bb56dee3c..10d0424b2 100644
--- a/cmd/praefect/subcmd_remove_repository_test.go
+++ b/cmd/praefect/subcmd_remove_repository_test.go
@@ -67,9 +67,7 @@ func TestRemoveRepository_Exec_invalidArgs(t *testing.T) {
t.Run("praefect address is not set in config ", func(t *testing.T) {
cmd := removeRepository{virtualStorage: "stub", relativePath: "stub", logger: testhelper.NewTestLogger(t)}
db := glsql.NewDB(t)
- var database string
- require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database))
- dbConf := glsql.GetDBConfig(t, database)
+ dbConf := glsql.GetDBConfig(t, db.Name)
err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{DB: dbConf})
require.EqualError(t, err, "get praefect address from config: no Praefect address configured")
})
@@ -84,9 +82,7 @@ func TestRemoveRepository_Exec(t *testing.T) {
g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
db := glsql.NewDB(t)
- var database string
- require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database))
- dbConf := glsql.GetDBConfig(t, database)
+ dbConf := glsql.GetDBConfig(t, db.Name)
conf := config.Config{
SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
@@ -108,10 +104,8 @@ func TestRemoveRepository_Exec(t *testing.T) {
starterConfigs, err := getStarterConfigs(conf)
require.NoError(t, err)
- stopped := make(chan struct{})
bootstrapper := bootstrap.NewNoop()
go func() {
- defer close(stopped)
assert.NoError(t, run(starterConfigs, conf, bootstrapper, prometheus.NewRegistry(), prometheus.NewRegistry()))
}()
@@ -231,7 +225,6 @@ func TestRemoveRepository_Exec(t *testing.T) {
})
bootstrapper.Terminate()
- <-stopped
}
func requireNoDatabaseInfo(t *testing.T, db glsql.DB, cmd *removeRepository) {
diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go
index c03329eb1..4f03576ed 100644
--- a/cmd/praefect/subcmd_track_repository_test.go
+++ b/cmd/praefect/subcmd_track_repository_test.go
@@ -83,9 +83,7 @@ func TestAddRepository_Exec(t *testing.T) {
g1Addr := g1Srv.Address()
db := glsql.NewDB(t)
- var database string
- require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database))
- dbConf := glsql.GetDBConfig(t, database)
+ dbConf := glsql.GetDBConfig(t, db.Name)
virtualStorageName := "praefect"
conf := config.Config{
diff --git a/internal/praefect/repocleaner/repository.go b/internal/praefect/repocleaner/repository.go
index fbf3b841b..ad15319bd 100644
--- a/internal/praefect/repocleaner/repository.go
+++ b/internal/praefect/repocleaner/repository.go
@@ -42,6 +42,7 @@ type Runner struct {
logger logrus.FieldLogger
healthChecker praefect.HealthChecker
conns praefect.Connections
+ walker *Walker
stateOwner StateOwner
acquirer Acquirer
action Action
@@ -64,6 +65,7 @@ func NewRunner(cfg Cfg, logger logrus.FieldLogger, healthChecker praefect.Health
logger: logger.WithField("component", "repocleaner.repository_existence"),
healthChecker: healthChecker,
conns: conns,
+ walker: NewWalker(conns, cfg.RepositoriesInBatch),
stateOwner: stateOwner,
acquirer: acquirer,
action: action,
@@ -127,7 +129,7 @@ func (gs *Runner) run(ctx context.Context) {
}
logger = gs.loggerWith(clusterPath.VirtualStorage, clusterPath.Storage)
- err = gs.execOnRepositories(ctx, clusterPath.VirtualStorage, clusterPath.Storage, func(paths []datastore.RepositoryClusterPath) {
+ err = gs.walker.ExecOnRepositories(ctx, clusterPath.VirtualStorage, clusterPath.Storage, func(paths []datastore.RepositoryClusterPath) error {
relativePaths := make([]string, len(paths))
for i, path := range paths {
relativePaths[i] = path.RelativeReplicaPath
@@ -135,13 +137,15 @@ func (gs *Runner) run(ctx context.Context) {
notExisting, err := gs.stateOwner.DoesntExist(ctx, clusterPath.VirtualStorage, clusterPath.Storage, relativePaths)
if err != nil {
logger.WithError(err).WithField("repositories", paths).Error("failed to check existence")
- return
+ return nil
}
if err := gs.action.Perform(ctx, notExisting); err != nil {
logger.WithError(err).WithField("existence", notExisting).Error("perform action")
- return
+ return nil
}
+
+ return nil
})
if err != nil {
logger.WithError(err).Error("failed to exec action on repositories")
@@ -153,8 +157,21 @@ func (gs *Runner) loggerWith(virtualStorage, storage string) logrus.FieldLogger
return gs.logger.WithFields(logrus.Fields{"virtual_storage": virtualStorage, "storage": storage})
}
-func (gs *Runner) execOnRepositories(ctx context.Context, virtualStorage, storage string, action func([]datastore.RepositoryClusterPath)) error {
- gclient, err := gs.getInternalGitalyClient(virtualStorage, storage)
+// Walker allows walk by the repositories of the gitaly storage.
+type Walker struct {
+ conns praefect.Connections
+ batchSize int
+}
+
+// NewWalker returns a new *Walker instance.
+func NewWalker(conns praefect.Connections, batchSize int) *Walker {
+ return &Walker{conns: conns, batchSize: batchSize}
+}
+
+// ExecOnRepositories runs through all the repositories on a Gitaly storage and executes the provided action.
+// The processing is done in batches to reduce cost of operations.
+func (wr *Walker) ExecOnRepositories(ctx context.Context, virtualStorage, storage string, action func([]datastore.RepositoryClusterPath) error) error {
+ gclient, err := wr.getInternalGitalyClient(virtualStorage, storage)
if err != nil {
return fmt.Errorf("setup gitaly client: %w", err)
}
@@ -164,7 +181,7 @@ func (gs *Runner) execOnRepositories(ctx context.Context, virtualStorage, storag
return fmt.Errorf("unable to walk repos: %w", err)
}
- batch := make([]datastore.RepositoryClusterPath, 0, gs.cfg.RepositoriesInBatch)
+ batch := make([]datastore.RepositoryClusterPath, 0, wr.batchSize)
for {
res, err := resp.Recv()
if err != nil {
@@ -183,18 +200,22 @@ func (gs *Runner) execOnRepositories(ctx context.Context, virtualStorage, storag
})
if len(batch) == cap(batch) {
- action(batch)
+ if err := action(batch); err != nil {
+ return err
+ }
batch = batch[:0]
}
}
if len(batch) > 0 {
- action(batch)
+ if err := action(batch); err != nil {
+ return err
+ }
}
return nil
}
-func (gs *Runner) getInternalGitalyClient(virtualStorage, storage string) (gitalypb.InternalGitalyClient, error) {
- conn, found := gs.conns[virtualStorage][storage]
+func (wr *Walker) getInternalGitalyClient(virtualStorage, storage string) (gitalypb.InternalGitalyClient, error) {
+ conn, found := wr.conns[virtualStorage][storage]
if !found {
return nil, fmt.Errorf("no connection to the gitaly node %q/%q", virtualStorage, storage)
}