Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Pavlo Strokov <pstrokov@gitlab.com> 2021-10-01 15:43:14 +0300
committer Pavlo Strokov <pstrokov@gitlab.com> 2021-10-08 10:46:19 +0300
commit 86b813312e32540e7b7b57cda7ffd4be32b09397 (patch)
tree aa2f2e00a8e12947fe2492212e38416ef511e8aa
parent 12f3206cf9a836b84e22be7dfe815d254a500e64 (diff)
list-untracked-repositories: New praefect sub-command
Praefect is extended with a new sub-command, 'list-untracked-repositories'. It walks over all repositories on all accessible gitaly nodes and checks whether each repository exists in the praefect database. If a repository doesn't exist in the database, information about its location is printed to STDOUT in JSON format, terminated with a newline (the terminator is configurable with the --delimiter flag). STDERR is used for log messages and errors, if any.

Closes: https://gitlab.com/gitlab-org/gitaly/-/issues/3792
Changelog: added
-rw-r--r-- cmd/praefect/subcmd.go 21
-rw-r--r-- cmd/praefect/subcmd_list_untracked_repositories.go 172
-rw-r--r-- cmd/praefect/subcmd_list_untracked_repositories_test.go 139
3 files changed, 322 insertions, 10 deletions
diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go
index 70721aa92..a6cf21196 100644
--- a/cmd/praefect/subcmd.go
+++ b/cmd/praefect/subcmd.go
@@ -24,16 +24,17 @@ type subcmd interface {
}
var subcommands = map[string]subcmd{
- "sql-ping": &sqlPingSubcommand{},
- "sql-migrate": &sqlMigrateSubcommand{},
- "dial-nodes": &dialNodesSubcommand{},
- "sql-migrate-down": &sqlMigrateDownSubcommand{},
- "sql-migrate-status": &sqlMigrateStatusSubcommand{},
- "dataloss": newDatalossSubcommand(),
- "accept-dataloss": &acceptDatalossSubcommand{},
- "set-replication-factor": newSetReplicatioFactorSubcommand(os.Stdout),
- removeRepositoryCmdName: newRemoveRepository(logger),
- trackRepositoryCmdName: newTrackRepository(logger),
+ "sql-ping": &sqlPingSubcommand{},
+ "sql-migrate": &sqlMigrateSubcommand{},
+ "dial-nodes": &dialNodesSubcommand{},
+ "sql-migrate-down": &sqlMigrateDownSubcommand{},
+ "sql-migrate-status": &sqlMigrateStatusSubcommand{},
+ "dataloss": newDatalossSubcommand(),
+ "accept-dataloss": &acceptDatalossSubcommand{},
+ "set-replication-factor": newSetReplicatioFactorSubcommand(os.Stdout),
+ removeRepositoryCmdName: newRemoveRepository(logger),
+ trackRepositoryCmdName: newTrackRepository(logger),
+ listUntrackedRepositoriesName: newListUntrackedRepositories(logger, os.Stdout),
}
// subCommand returns an exit code, to be fed into os.Exit.
diff --git a/cmd/praefect/subcmd_list_untracked_repositories.go b/cmd/praefect/subcmd_list_untracked_repositories.go
new file mode 100644
index 000000000..d76eb4328
--- /dev/null
+++ b/cmd/praefect/subcmd_list_untracked_repositories.go
@@ -0,0 +1,172 @@
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "flag"
+ "fmt"
+ "io"
+
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/repocleaner"
+ "gitlab.com/gitlab-org/labkit/correlation"
+ "google.golang.org/grpc/metadata"
+)
+
+const (
+ // listUntrackedRepositoriesName is the name the sub-command is registered under and
+ // the value sent as "client_name" metadata on its outgoing requests.
+ listUntrackedRepositoriesName = "list-untracked-repositories"
+)
+
+// errNoConnectionToGitalies is returned by dialGitalyStorages when the configuration
+// yields no connections to gitaly nodes.
+var errNoConnectionToGitalies = errors.New("no connection established to gitaly nodes")
+
+// listUntrackedRepositories implements the 'list-untracked-repositories' sub-command.
+type listUntrackedRepositories struct {
+ // logger receives the debug messages emitted while the command runs.
+ logger logrus.FieldLogger
+ // delimiter is written after each reported repository; FlagSet binds it to the
+ // --delimiter flag with "\n" as the default.
+ delimiter string
+ // out is the writer the JSON records are written to.
+ out io.Writer
+}
+
+// newListUntrackedRepositories builds the sub-command with the given logger and output
+// writer. The delimiter stays empty until FlagSet binds it to the --delimiter flag.
+func newListUntrackedRepositories(logger logrus.FieldLogger, out io.Writer) *listUntrackedRepositories {
+	cmd := new(listUntrackedRepositories)
+	cmd.logger = logger
+	cmd.out = out
+	return cmd
+}
+
+func (cmd *listUntrackedRepositories) FlagSet() *flag.FlagSet {
+ fs := flag.NewFlagSet(listUntrackedRepositoriesName, flag.ExitOnError)
+ fs.StringVar(&cmd.delimiter, "delimiter", "\n", "string used as a delimiter in output")
+ fs.Usage = func() {
+ _, _ = printfErr("Description:\n" +
+ " This command checks if all repositories on all gitaly nodes tracked by praefect.\n" +
+ " If repository is found on the disk, but it is not known to praefect the location of\n" +
+ " that repository will be written into stdout stream in JSON format.\n")
+ fs.PrintDefaults()
+ _, _ = printfErr("NOTE:\n" +
+ " All errors and log messages directed to the stderr stream.\n" +
+ " The output is produced as the new data appears, it doesn't wait\n" +
+ " for the completion of the processing to produce the result.\n")
+ }
+ return fs
+}
+
+// Exec runs the sub-command: it dials all configured gitaly nodes, connects to the
+// praefect database and walks the repositories of every configured storage, reporting
+// those the database doesn't know about to cmd.out.
+//
+// It returns unexpectedPositionalArgsError when positional arguments were passed, and a
+// wrapped error when dialing the nodes, opening the database or processing a storage
+// fails.
+func (cmd listUntrackedRepositories) Exec(flags *flag.FlagSet, cfg config.Config) error {
+	if flags.NArg() > 0 {
+		return unexpectedPositionalArgsError{Command: flags.Name()}
+	}
+
+	ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID())
+	ctx = metadata.AppendToOutgoingContext(ctx, "client_name", listUntrackedRepositoriesName)
+
+	logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx))
+	// Log the name constant instead of calling cmd.FlagSet().Name(): FlagSet registers
+	// the --delimiter flag again and flag.StringVar stores the default value ("\n") into
+	// cmd.delimiter, which silently discarded the delimiter parsed from the command line.
+	logger.Debugf("starting %s command", listUntrackedRepositoriesName)
+
+	logger.Debug("dialing to gitaly nodes...")
+	nodeSet, err := dialGitalyStorages(cfg)
+	if err != nil {
+		return fmt.Errorf("dial nodes: %w", err)
+	}
+	defer nodeSet.Close()
+	logger.Debug("connected to gitaly nodes")
+
+	logger.Debug("connecting to praefect database...")
+	db, err := glsql.OpenDB(cfg.DB)
+	if err != nil {
+		return fmt.Errorf("connect to database: %w", err)
+	}
+	// The command is about to exit; a failure to close the database is not actionable.
+	defer func() { _ = db.Close() }()
+	logger.Debug("connected to praefect database")
+
+	// NOTE(review): 16 is presumably the walker's batch size - confirm against repocleaner.
+	walker := repocleaner.NewWalker(nodeSet.Connections(), 16)
+	reporter := reportUntrackedRepositories{
+		ctx:       ctx,
+		checker:   datastore.NewStorageCleanup(db),
+		delimiter: cmd.delimiter,
+		out:       cmd.out,
+	}
+	for _, vs := range cfg.VirtualStorages {
+		for _, node := range vs.Nodes {
+			logger.Debugf("check %q/%q storage repositories", vs.Name, node.Storage)
+			if err := walker.ExecOnRepositories(ctx, vs.Name, node.Storage, reporter.Report); err != nil {
+				return fmt.Errorf("exec on %q/%q: %w", vs.Name, node.Storage, err)
+			}
+		}
+	}
+	logger.Debug("completed")
+	return nil
+}
+
+// dialGitalyStorages dials every gitaly node of every virtual storage in cfg and returns
+// the resulting nodes grouped by virtual storage name and storage name.
+//
+// If dialing any node fails, the connections established so far are closed and the
+// dial error is returned wrapped. errNoConnectionToGitalies is returned when the
+// configuration yields no connections at all.
+func dialGitalyStorages(cfg config.Config) (praefect.NodeSet, error) {
+	nodeSet := praefect.NodeSet{}
+	for _, vs := range cfg.VirtualStorages {
+		for _, node := range vs.Nodes {
+			conn, err := subCmdDial(node.Address, node.Token)
+			if err != nil {
+				// Don't leak the connections dialed before the failure.
+				nodeSet.Close()
+				return nil, fmt.Errorf("dial with %q gitaly at %q: %w", node.Storage, node.Address, err)
+			}
+			if _, found := nodeSet[vs.Name]; !found {
+				nodeSet[vs.Name] = map[string]praefect.Node{}
+			}
+			nodeSet[vs.Name][node.Storage] = praefect.Node{
+				Storage:    node.Storage,
+				Address:    node.Address,
+				Token:      node.Token,
+				Connection: conn,
+			}
+		}
+	}
+	if len(nodeSet.Connections()) == 0 {
+		return nil, errNoConnectionToGitalies
+	}
+	return nodeSet, nil
+}
+
+// reportUntrackedRepositories checks repositories against the praefect database and
+// writes the ones that are missing from it to out.
+type reportUntrackedRepositories struct {
+ // ctx is passed to the existence check performed by Report.
+ ctx context.Context
+ // checker is asked which repositories don't exist in the praefect database.
+ checker *datastore.StorageCleanup
+ // out receives one JSON record per untracked repository.
+ out io.Writer
+ // delimiter is written to out after each JSON record.
+ delimiter string
+}
+
+// Report takes a batch of repositories, asks the checker which of them are missing from
+// the praefect database and writes each missing one to the configured writer as a JSON
+// document followed by the configured delimiter. The virtual storage and storage of the
+// first element are used for the whole batch. An empty batch is a no-op.
+func (r *reportUntrackedRepositories) Report(paths []datastore.RepositoryClusterPath) error {
+	if len(paths) == 0 {
+		return nil
+	}
+
+	relativePaths := make([]string, 0, len(paths))
+	for idx := range paths {
+		relativePaths = append(relativePaths, paths[idx].RelativeReplicaPath)
+	}
+
+	first := paths[0]
+	untracked, err := r.checker.DoesntExist(r.ctx, first.VirtualStorage, first.Storage, relativePaths)
+	if err != nil {
+		return fmt.Errorf("existence check: %w", err)
+	}
+
+	for _, location := range untracked {
+		encoded, err := serializeRepositoryClusterPath(location)
+		if err != nil {
+			return fmt.Errorf("serialize %+v: %w", location, err)
+		}
+
+		// The record and its delimiter are written separately, in that order.
+		for _, chunk := range [][]byte{encoded, []byte(r.delimiter)} {
+			if _, err := r.out.Write(chunk); err != nil {
+				return fmt.Errorf("write serialized data to output: %w", err)
+			}
+		}
+	}
+
+	return nil
+}
+
+// serializeRepositoryClusterPath encodes the location of a repository as a JSON object
+// with the "virtual_storage", "storage" and "relative_path" keys.
+func serializeRepositoryClusterPath(path datastore.RepositoryClusterPath) ([]byte, error) {
+	location := make(map[string]string, 3)
+	location["relative_path"] = path.RelativeReplicaPath
+	location["storage"] = path.Storage
+	location["virtual_storage"] = path.VirtualStorage
+	return json.Marshal(location)
+}
diff --git a/cmd/praefect/subcmd_list_untracked_repositories_test.go b/cmd/praefect/subcmd_list_untracked_repositories_test.go
new file mode 100644
index 000000000..561dc1457
--- /dev/null
+++ b/cmd/praefect/subcmd_list_untracked_repositories_test.go
@@ -0,0 +1,139 @@
+package main
+
+import (
+ "bytes"
+ "context"
+ "flag"
+ "fmt"
+ "strings"
+ "syscall"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/client"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+)
+
+// TestListUntrackedRepositories_FlagSet checks that parsing the flag set stores either
+// the value of the --delimiter flag or its default in the command.
+func TestListUntrackedRepositories_FlagSet(t *testing.T) {
+	t.Parallel()
+
+	testCases := []struct {
+		desc              string
+		args              []string
+		expectedDelimiter string
+	}{
+		{desc: "custom value", args: []string{"--delimiter", ","}, expectedDelimiter: ","},
+		{desc: "default value", args: nil, expectedDelimiter: "\n"},
+	}
+
+	// The command is shared between the sub-tests; each one builds a fresh flag set,
+	// which resets the delimiter to its default before parsing.
+	cmd := &listUntrackedRepositories{}
+	for _, tc := range testCases {
+		t.Run(tc.desc, func(t *testing.T) {
+			require.NoError(t, cmd.FlagSet().Parse(tc.args))
+			require.Equal(t, tc.expectedDelimiter, cmd.delimiter)
+		})
+	}
+}
+
+// TestListUntrackedRepositories_Exec runs the sub-command against two gitaly servers and
+// a praefect started in-process, and checks that exactly the repositories created
+// directly on the gitaly storages are reported on the output.
+func TestListUntrackedRepositories_Exec(t *testing.T) {
+ t.Parallel()
+ g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1"))
+ g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2"))
+
+ // Repositories not managed by praefect.
+ repo1, _ := gittest.InitRepo(t, g1Cfg, g1Cfg.Storages[0])
+ repo2, _ := gittest.InitRepo(t, g1Cfg, g1Cfg.Storages[0])
+ repo3, _ := gittest.InitRepo(t, g2Cfg, g2Cfg.Storages[0])
+
+ g1Addr := testserver.RunGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+ g2Addr := testserver.RunGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+
+ // Look up the name of the test database so a matching DB config can be built.
+ db := glsql.NewDB(t)
+ var database string
+ require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database))
+ dbConf := glsql.GetDBConfig(t, database)
+
+ // A single virtual storage backed by both gitaly servers.
+ conf := config.Config{
+ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "praefect",
+ Nodes: []*config.Node{
+ {Storage: g1Cfg.Storages[0].Name, Address: g1Addr},
+ {Storage: g2Cfg.Storages[0].Name, Address: g2Addr},
+ },
+ },
+ },
+ DB: dbConf,
+ }
+
+ // Run praefect in the background; it is expected to return once the SIGTERM sent at
+ // the end of the test is received.
+ starterConfigs, err := getStarterConfigs(conf)
+ require.NoError(t, err)
+ stopped := make(chan struct{})
+ go func() {
+ defer close(stopped)
+ err := run(starterConfigs, conf)
+ assert.EqualError(t, err, `received signal "terminated"`)
+ }()
+
+ cc, err := client.Dial("unix://"+conf.SocketPath, nil)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, cc.Close()) }()
+ repoClient := gitalypb.NewRepositoryServiceClient(cc)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ praefectStorage := conf.VirtualStorages[0].Name
+
+ // Repository managed by praefect, exists on gitaly-1 and gitaly-2.
+ createRepo(t, ctx, repoClient, praefectStorage, "path/to/test/repo")
+ out := &bytes.Buffer{}
+ cmd := newListUntrackedRepositories(testhelper.NewTestLogger(t), out)
+ require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+
+ // Only the three repositories created directly on the storages are expected; the one
+ // created through praefect must not be listed.
+ exp := []string{
+ fmt.Sprintf(`{"relative_path":%q,"storage":"gitaly-1","virtual_storage":"praefect"}`, repo1.RelativePath),
+ fmt.Sprintf(`{"relative_path":%q,"storage":"gitaly-1","virtual_storage":"praefect"}`, repo2.RelativePath),
+ fmt.Sprintf(`{"relative_path":%q,"storage":"gitaly-2","virtual_storage":"praefect"}`, repo3.RelativePath),
+ "", // an empty extra element required as each line ends with "delimiter" and strings.Split returns all parts
+ }
+ require.ElementsMatch(t, exp, strings.Split(out.String(), "\n"))
+
+ // Stop the praefect started above and wait for its goroutine to finish.
+ require.NoError(t, syscall.Kill(syscall.Getpid(), syscall.SIGTERM))
+ <-stopped
+}
+
+// createRepo creates a repository with the given storage name and relative path through
+// repoClient and returns it. While the call fails with one of the errors the test
+// tolerates it is retried every 50ms; any other error, or exhausting 100 retries,
+// fails the test.
+func createRepo(t *testing.T, ctx context.Context, repoClient gitalypb.RepositoryServiceClient, storageName, relativePath string) *gitalypb.Repository {
+	t.Helper()
+
+	repo := &gitalypb.Repository{StorageName: storageName, RelativePath: relativePath}
+	for attempt := 0; ; attempt++ {
+		_, err := repoClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{Repository: repo})
+		if err == nil {
+			return repo
+		}
+
+		require.Regexp(t, "(no healthy nodes)|(no such file or directory)|(connection refused)", err.Error())
+		require.Less(t, attempt, 100, "praefect doesn't serve for too long")
+		time.Sleep(50 * time.Millisecond)
+	}
+}