diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2021-11-23 00:10:22 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2021-11-23 00:10:22 +0300 |
commit | 5b5447202878a42c9afb7b8e844eca73012d7157 (patch) | |
tree | 6346dfa76abac389620dfd6645113ddbe9581184 | |
parent | 650cb6e64c077ac89f8e8c4f175f602b504ef143 (diff) | |
parent | 8e6a8c7b7fda4c79392effa05530c9d421487475 (diff) |
Merge branch 'ps-backport-sub-cmds-14-3' into '14-3-stable'
Backport praefect sub-commands to 14.3
See merge request gitlab-org/gitaly!4114
-rw-r--r-- | cmd/praefect/subcmd.go | 26 | ||||
-rw-r--r-- | cmd/praefect/subcmd_accept_dataloss.go | 2 | ||||
-rw-r--r-- | cmd/praefect/subcmd_dataloss.go | 2 | ||||
-rw-r--r-- | cmd/praefect/subcmd_list_untracked_repositories.go | 172 | ||||
-rw-r--r-- | cmd/praefect/subcmd_list_untracked_repositories_test.go | 135 | ||||
-rw-r--r-- | cmd/praefect/subcmd_pingnodes.go | 2 | ||||
-rw-r--r-- | cmd/praefect/subcmd_remove_repository.go | 5 | ||||
-rw-r--r-- | cmd/praefect/subcmd_remove_repository_test.go | 57 | ||||
-rw-r--r-- | cmd/praefect/subcmd_set_replication_factor.go | 2 | ||||
-rw-r--r-- | cmd/praefect/subcmd_track_repository.go | 236 | ||||
-rw-r--r-- | cmd/praefect/subcmd_track_repository_test.go | 231 | ||||
-rw-r--r-- | internal/praefect/datastore/storage_cleanup.go | 85 | ||||
-rw-r--r-- | internal/praefect/repocleaner/init_test.go | 21 | ||||
-rw-r--r-- | internal/praefect/repocleaner/repository.go | 77 | ||||
-rw-r--r-- | internal/praefect/repocleaner/repository_test.go | 114 |
15 files changed, 1120 insertions, 47 deletions
diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go index ca11658fe..a232835db 100644 --- a/cmd/praefect/subcmd.go +++ b/cmd/praefect/subcmd.go @@ -23,16 +23,20 @@ type subcmd interface { Exec(flags *flag.FlagSet, config config.Config) error } +const defaultDialTimeout = 30 * time.Second + var subcommands = map[string]subcmd{ - "sql-ping": &sqlPingSubcommand{}, - "sql-migrate": &sqlMigrateSubcommand{}, - "dial-nodes": &dialNodesSubcommand{}, - "sql-migrate-down": &sqlMigrateDownSubcommand{}, - "sql-migrate-status": &sqlMigrateStatusSubcommand{}, - "dataloss": newDatalossSubcommand(), - "accept-dataloss": &acceptDatalossSubcommand{}, - "set-replication-factor": newSetReplicatioFactorSubcommand(os.Stdout), - removeRepositoryCmdName: newRemoveRepository(logger), + "sql-ping": &sqlPingSubcommand{}, + "sql-migrate": &sqlMigrateSubcommand{}, + "dial-nodes": &dialNodesSubcommand{}, + "sql-migrate-down": &sqlMigrateDownSubcommand{}, + "sql-migrate-status": &sqlMigrateStatusSubcommand{}, + "dataloss": newDatalossSubcommand(), + "accept-dataloss": &acceptDatalossSubcommand{}, + "set-replication-factor": newSetReplicatioFactorSubcommand(os.Stdout), + removeRepositoryCmdName: newRemoveRepository(logger), + trackRepositoryCmdName: newTrackRepository(logger), + listUntrackedRepositoriesName: newListUntrackedRepositories(logger, os.Stdout), } // subCommand returns an exit code, to be fed into os.Exit. @@ -149,8 +153,8 @@ func printfErr(format string, a ...interface{}) (int, error) { return fmt.Fprintf(os.Stderr, format, a...) } -func subCmdDial(addr, token string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { - ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second) +func subCmdDial(ctx context.Context, addr, token string, timeout time.Duration, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() opts = append(opts, diff --git a/cmd/praefect/subcmd_accept_dataloss.go b/cmd/praefect/subcmd_accept_dataloss.go index 0c4ea015a..e037df424 100644 --- a/cmd/praefect/subcmd_accept_dataloss.go +++ b/cmd/praefect/subcmd_accept_dataloss.go @@ -51,7 +51,7 @@ func (cmd *acceptDatalossSubcommand) Exec(flags *flag.FlagSet, cfg config.Config return err } - conn, err := subCmdDial(nodeAddr, cfg.Auth.Token) + conn, err := subCmdDial(context.Background(), nodeAddr, cfg.Auth.Token, defaultDialTimeout) if err != nil { return fmt.Errorf("error dialing: %w", err) } diff --git a/cmd/praefect/subcmd_dataloss.go b/cmd/praefect/subcmd_dataloss.go index 06b493d71..75a51c266 100644 --- a/cmd/praefect/subcmd_dataloss.go +++ b/cmd/praefect/subcmd_dataloss.go @@ -65,7 +65,7 @@ func (cmd *datalossSubcommand) Exec(flags *flag.FlagSet, cfg config.Config) erro return err } - conn, err := subCmdDial(nodeAddr, cfg.Auth.Token) + conn, err := subCmdDial(context.Background(), nodeAddr, cfg.Auth.Token, defaultDialTimeout) if err != nil { return fmt.Errorf("error dialing: %v", err) } diff --git a/cmd/praefect/subcmd_list_untracked_repositories.go b/cmd/praefect/subcmd_list_untracked_repositories.go new file mode 100644 index 000000000..bff6140a0 --- /dev/null +++ b/cmd/praefect/subcmd_list_untracked_repositories.go @@ -0,0 +1,172 @@ +package main + +import ( + "context" + "encoding/json" + "errors" + "flag" + "fmt" + "io" + + "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/repocleaner" + "gitlab.com/gitlab-org/labkit/correlation" + "google.golang.org/grpc/metadata" +) + +const ( + listUntrackedRepositoriesName = "list-untracked-repositories" +) + +var errNoConnectionToGitalies = errors.New("no connection established to gitaly nodes") + +type listUntrackedRepositories struct { + logger logrus.FieldLogger + delimiter string + out io.Writer +} + +func newListUntrackedRepositories(logger logrus.FieldLogger, out io.Writer) *listUntrackedRepositories { + return &listUntrackedRepositories{logger: logger, out: out} +} + +func (cmd *listUntrackedRepositories) FlagSet() *flag.FlagSet { + fs := flag.NewFlagSet(listUntrackedRepositoriesName, flag.ExitOnError) + fs.StringVar(&cmd.delimiter, "delimiter", "\n", "string used as a delimiter in output") + fs.Usage = func() { + _, _ = printfErr("Description:\n" + + " This command checks if all repositories on all gitaly nodes tracked by praefect.\n" + + " If repository is found on the disk, but it is not known to praefect the location of\n" + + " that repository will be written into stdout stream in JSON format.\n") + fs.PrintDefaults() + _, _ = printfErr("NOTE:\n" + + " All errors and log messages directed to the stderr stream.\n" + + " The output is produced as the new data appears, it doesn't wait\n" + + " for the completion of the processing to produce the result.\n") + } + return fs +} + +func (cmd listUntrackedRepositories) Exec(flags *flag.FlagSet, cfg config.Config) error { + if flags.NArg() > 0 { + return unexpectedPositionalArgsError{Command: flags.Name()} + } + + ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID()) + ctx = metadata.AppendToOutgoingContext(ctx, "client_name", listUntrackedRepositoriesName) + + logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx)) + logger.Debugf("starting %s command", cmd.FlagSet().Name()) + + logger.Debug("dialing to gitaly nodes...") + nodeSet, err := dialGitalyStorages(cfg) + if err != nil { + return fmt.Errorf("dial nodes: %w", err) + } + defer nodeSet.Close() + logger.Debug("connected to gitaly nodes") + + logger.Debug("connecting to praefect database...") + db, err := glsql.OpenDB(cfg.DB) + if err != nil { + return fmt.Errorf("connect to database: %w", err) + } + defer func() { _ = db.Close() }() + logger.Debug("connected to praefect database") + + walker := repocleaner.NewWalker(nodeSet.Connections(), 16) + reporter := reportUntrackedRepositories{ + ctx: ctx, + checker: datastore.NewStorageCleanup(db), + delimiter: cmd.delimiter, + out: cmd.out, + } + for _, vs := range cfg.VirtualStorages { + for _, node := range vs.Nodes { + logger.Debugf("check %q/%q storage repositories", vs.Name, node.Storage) + if err := walker.ExecOnRepositories(ctx, vs.Name, node.Storage, reporter.Report); err != nil { + return fmt.Errorf("exec on %q/%q: %w", vs.Name, node.Storage, err) + } + } + } + logger.Debug("completed") + return nil +} + +func dialGitalyStorages(cfg config.Config) (praefect.NodeSet, error) { + nodeSet := praefect.NodeSet{} + for _, vs := range cfg.VirtualStorages { + for _, node := range vs.Nodes { + conn, err := subCmdDial(context.Background(), node.Address, node.Token, defaultDialTimeout) + if err != nil { + return nil, fmt.Errorf("dial with %q gitaly at %q", node.Storage, node.Address) + } + if _, found := nodeSet[vs.Name]; !found { + nodeSet[vs.Name] = map[string]praefect.Node{} + } + nodeSet[vs.Name][node.Storage] = praefect.Node{ + Storage: node.Storage, + Address: node.Address, + Token: node.Token, + Connection: conn, + } + } + } + if len(nodeSet.Connections()) == 0 { + return nil, errNoConnectionToGitalies + } + return nodeSet, nil +} + +type reportUntrackedRepositories struct { + ctx context.Context + checker *datastore.StorageCleanup + out io.Writer + delimiter string +} + +// Report method accepts a list of repositories, checks if they exist in the praefect database +// and writes JSON serialized location of each untracked repository using the configured delimiter +// and writer. +func (r *reportUntrackedRepositories) Report(paths []datastore.RepositoryClusterPath) error { + if len(paths) == 0 { + return nil + } + + replicaRelPaths := make([]string, len(paths)) + for i, path := range paths { + replicaRelPaths[i] = path.RelativePath + } + + missing, err := r.checker.DoesntExist(r.ctx, paths[0].VirtualStorage, paths[0].Storage, replicaRelPaths) + if err != nil { + return fmt.Errorf("existence check: %w", err) + } + + for _, repoClusterPath := range missing { + d, err := serializeRepositoryClusterPath(repoClusterPath) + if err != nil { + return fmt.Errorf("serialize %+v: %w", repoClusterPath, err) + } + if _, err := r.out.Write(d); err != nil { + return fmt.Errorf("write serialized data to output: %w", err) + } + if _, err := r.out.Write([]byte(r.delimiter)); err != nil { + return fmt.Errorf("write serialized data to output: %w", err) + } + } + + return nil +} + +func serializeRepositoryClusterPath(path datastore.RepositoryClusterPath) ([]byte, error) { + return json.Marshal(map[string]string{ + "virtual_storage": path.VirtualStorage, + "storage": path.Storage, + "relative_path": path.RelativePath, + }) +} diff --git a/cmd/praefect/subcmd_list_untracked_repositories_test.go b/cmd/praefect/subcmd_list_untracked_repositories_test.go new file mode 100644 index 000000000..a24300ebe --- /dev/null +++ b/cmd/praefect/subcmd_list_untracked_repositories_test.go @@ -0,0 +1,135 @@ +package main + +import ( + "bytes" + "context" + "database/sql" + "flag" + "fmt" + "strings" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/client" + "gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" +) + +func TestListUntrackedRepositories_FlagSet(t *testing.T) { + t.Parallel() + cmd := &listUntrackedRepositories{} + for _, tc := range []struct { + desc string + args []string + exp []interface{} + }{ + { + desc: "custom value", + args: []string{"--delimiter", ","}, + exp: []interface{}{","}, + }, + { + desc: "default value", + args: nil, + exp: []interface{}{"\n"}, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + fs := cmd.FlagSet() + require.NoError(t, fs.Parse(tc.args)) + require.ElementsMatch(t, tc.exp, []interface{}{cmd.delimiter}) + }) + } +} + +func TestListUntrackedRepositories_Exec(t *testing.T) { + t.Parallel() + g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1")) + g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2")) + + // Repositories not managed by praefect. + repo1, _ := gittest.InitRepo(t, g1Cfg, g1Cfg.Storages[0]) + repo2, _ := gittest.InitRepo(t, g1Cfg, g1Cfg.Storages[0]) + repo3, _ := gittest.InitRepo(t, g2Cfg, g2Cfg.Storages[0]) + + g1Addr := testserver.RunGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) + g2Addr := testserver.RunGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) + + db := glsql.NewDB(t) + conf := config.Config{ + SocketPath: testhelper.GetTemporaryGitalySocketFileName(t), + VirtualStorages: []*config.VirtualStorage{ + { + Name: "praefect", + Nodes: []*config.Node{ + {Storage: g1Cfg.Storages[0].Name, Address: g1Addr}, + {Storage: g2Cfg.Storages[0].Name, Address: g2Addr}, + }, + }, + }, + DB: glsql.GetDBConfig(t, db.Name), + } + + starterConfigs, err := getStarterConfigs(conf) + require.NoError(t, err) + bootstrapper := bootstrap.NewNoop() + go func() { + assert.NoError(t, run(starterConfigs, conf, bootstrapper, prometheus.NewRegistry(), prometheus.NewRegistry())) + }() + + cc, err := client.Dial("unix://"+conf.SocketPath, nil) + require.NoError(t, err) + defer func() { require.NoError(t, cc.Close()) }() + repoClient := gitalypb.NewRepositoryServiceClient(cc) + + ctx, cancel := testhelper.Context() + defer cancel() + + praefectStorage := conf.VirtualStorages[0].Name + + // Repository managed by praefect, exists on gitaly-1 and gitaly-2. + createRepo(t, ctx, repoClient, praefectStorage, "@hashed/path/to/test/repo", db.DB) + out := &bytes.Buffer{} + cmd := newListUntrackedRepositories(testhelper.NewTestLogger(t), out) + require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) + + exp := []string{ + fmt.Sprintf(`{"relative_path":%q,"storage":"gitaly-1","virtual_storage":"praefect"}`, repo1.RelativePath), + fmt.Sprintf(`{"relative_path":%q,"storage":"gitaly-1","virtual_storage":"praefect"}`, repo2.RelativePath), + fmt.Sprintf(`{"relative_path":%q,"storage":"gitaly-2","virtual_storage":"praefect"}`, repo3.RelativePath), + "", // an empty extra element required as each line ends with "delimiter" and strings.Split returns all parts + } + require.ElementsMatch(t, exp, strings.Split(out.String(), "\n")) + + bootstrapper.Terminate() +} + +func createRepo(t *testing.T, ctx context.Context, repoClient gitalypb.RepositoryServiceClient, storageName, relativePath string, db *sql.DB) *gitalypb.Repository { + t.Helper() + repo := &gitalypb.Repository{ + StorageName: storageName, + RelativePath: relativePath, + } + for i := 0; true; i++ { + _, err := repoClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{Repository: repo}) + if err != nil { + require.Regexp(t, "(no healthy nodes)|(no such file or directory)|(connection refused)", err.Error()) + require.Less(t, i, 100, "praefect doesn't serve for too long") + time.Sleep(50 * time.Millisecond) + } else { + break + } + } + + return repo +} diff --git a/cmd/praefect/subcmd_pingnodes.go b/cmd/praefect/subcmd_pingnodes.go index 0b0873484..4bdf0a76e 100644 --- a/cmd/praefect/subcmd_pingnodes.go +++ b/cmd/praefect/subcmd_pingnodes.go @@ -88,7 +88,7 @@ func (s *dialNodesSubcommand) Exec(flags *flag.FlagSet, conf config.Config) erro } func (npr *nodePing) dial() (*grpc.ClientConn, error) { - return subCmdDial(npr.address, npr.token) + return subCmdDial(context.Background(), npr.address, npr.token, defaultDialTimeout) } func (npr *nodePing) healthCheck(cc *grpc.ClientConn) (grpc_health_v1.HealthCheckResponse_ServingStatus, error) { diff --git a/cmd/praefect/subcmd_remove_repository.go b/cmd/praefect/subcmd_remove_repository.go index 016a10daa..1a2034221 100644 --- a/cmd/praefect/subcmd_remove_repository.go +++ b/cmd/praefect/subcmd_remove_repository.go @@ -28,10 +28,11 @@ type removeRepository struct { logger logrus.FieldLogger virtualStorage string relativePath string + timeout time.Duration } func newRemoveRepository(logger logrus.FieldLogger) *removeRepository { - return &removeRepository{logger: logger} + return &removeRepository{logger: logger, timeout: defaultDialTimeout} } func (cmd *removeRepository) FlagSet() *flag.FlagSet { @@ -136,7 +137,7 @@ func (cmd *removeRepository) removeRepositoryFromDatabase(ctx context.Context, d } func (cmd *removeRepository) removeRepository(ctx context.Context, repo *gitalypb.Repository, addr, token string) (bool, error) { - conn, err := subCmdDial(addr, token) + conn, err := subCmdDial(ctx, addr, token, cmd.timeout) if err != nil { return false, fmt.Errorf("error dialing: %w", err) } diff --git a/cmd/praefect/subcmd_remove_repository_test.go b/cmd/praefect/subcmd_remove_repository_test.go index bb56dee3c..2b7dbf590 100644 --- a/cmd/praefect/subcmd_remove_repository_test.go +++ b/cmd/praefect/subcmd_remove_repository_test.go @@ -67,9 +67,7 @@ func TestRemoveRepository_Exec_invalidArgs(t *testing.T) { t.Run("praefect address is not set in config ", func(t *testing.T) { cmd := removeRepository{virtualStorage: "stub", relativePath: "stub", logger: testhelper.NewTestLogger(t)} db := glsql.NewDB(t) - var database string - require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database)) - dbConf := glsql.GetDBConfig(t, database) + dbConf := glsql.GetDBConfig(t, db.Name) err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{DB: dbConf}) require.EqualError(t, err, "get praefect address from config: no Praefect address configured") }) @@ -84,9 +82,7 @@ func TestRemoveRepository_Exec(t *testing.T) { g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) db := glsql.NewDB(t) - var database string - require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database)) - dbConf := glsql.GetDBConfig(t, database) + dbConf := glsql.GetDBConfig(t, db.Name) conf := config.Config{ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t), @@ -108,10 +104,8 @@ func TestRemoveRepository_Exec(t *testing.T) { starterConfigs, err := getStarterConfigs(conf) require.NoError(t, err) - stopped := make(chan struct{}) bootstrapper := bootstrap.NewNoop() go func() { - defer close(stopped) assert.NoError(t, run(starterConfigs, conf, bootstrapper, prometheus.NewRegistry(), prometheus.NewRegistry())) }() @@ -123,33 +117,35 @@ func TestRemoveRepository_Exec(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - createRepo := func(t *testing.T, storageName, relativePath string) *gitalypb.Repository { - t.Helper() - repo := &gitalypb.Repository{ - StorageName: storageName, - RelativePath: relativePath, - } - for i := 0; true; i++ { - _, err := repoClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{Repository: repo}) - if err != nil { - require.Regexp(t, "(no healthy nodes)|(no such file or directory)|(connection refused)", err.Error()) - require.Less(t, i, 100, "praefect doesn't serve for too long") - time.Sleep(50 * time.Millisecond) - } else { - break - } - } - return repo - } + //createRepo := func(t *testing.T, storageName, relativePath string) *gitalypb.Repository { + // t.Helper() + // repo := &gitalypb.Repository{ + // StorageName: storageName, + // RelativePath: relativePath, + // } + // for i := 0; true; i++ { + // _, err := repoClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{Repository: repo}) + // if err != nil { + // require.Regexp(t, "(no healthy nodes)|(no such file or directory)|(connection refused)", err.Error()) + // require.Less(t, i, 100, "praefect doesn't serve for too long") + // time.Sleep(50 * time.Millisecond) + // } else { + // break + // } + // } + // + // return repo + //} praefectStorage := conf.VirtualStorages[0].Name t.Run("ok", func(t *testing.T) { - repo := createRepo(t, praefectStorage, "path/to/test/repo") + repo := createRepo(t, ctx, repoClient, praefectStorage, "path/to/test/repo", db.DB) cmd := &removeRepository{ logger: testhelper.NewTestLogger(t), virtualStorage: repo.StorageName, relativePath: repo.RelativePath, + timeout: defaultDialTimeout, } require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) @@ -165,7 +161,7 @@ func TestRemoveRepository_Exec(t *testing.T) { }) t.Run("no info about repository on praefect", func(t *testing.T) { - repo := createRepo(t, praefectStorage, "path/to/test/repo") + repo := createRepo(t, ctx, repoClient, praefectStorage, "path/to/test/repo", db.DB) repoStore := datastore.NewPostgresRepositoryStore(db.DB, nil) require.NoError(t, repoStore.DeleteRepository( ctx, repo.StorageName, repo.RelativePath, []string{g1Cfg.Storages[0].Name, g2Cfg.Storages[0].Name}, @@ -177,6 +173,7 @@ func TestRemoveRepository_Exec(t *testing.T) { logger: logrus.NewEntry(logger), virtualStorage: praefectStorage, relativePath: repo.RelativePath, + timeout: defaultDialTimeout, } require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) var found bool @@ -194,7 +191,7 @@ func TestRemoveRepository_Exec(t *testing.T) { }) t.Run("one of gitalies is out of service", func(t *testing.T) { - repo := createRepo(t, praefectStorage, "path/to/test/repo") + repo := createRepo(t, ctx, repoClient, praefectStorage, "path/to/test/repo", db.DB) g2Srv.Shutdown() logger := testhelper.NewTestLogger(t) @@ -203,6 +200,7 @@ func TestRemoveRepository_Exec(t *testing.T) { logger: logrus.NewEntry(logger), virtualStorage: praefectStorage, relativePath: repo.RelativePath, + timeout: time.Second, } for { @@ -231,7 +229,6 @@ func TestRemoveRepository_Exec(t *testing.T) { }) bootstrapper.Terminate() - <-stopped } func requireNoDatabaseInfo(t *testing.T, db glsql.DB, cmd *removeRepository) { diff --git a/cmd/praefect/subcmd_set_replication_factor.go b/cmd/praefect/subcmd_set_replication_factor.go index ff2cc4562..60c29203e 100644 --- a/cmd/praefect/subcmd_set_replication_factor.go +++ b/cmd/praefect/subcmd_set_replication_factor.go @@ -48,7 +48,7 @@ func (cmd *setReplicationFactorSubcommand) Exec(flags *flag.FlagSet, cfg config. return err } - conn, err := subCmdDial(nodeAddr, cfg.Auth.Token) + conn, err := subCmdDial(context.Background(), nodeAddr, cfg.Auth.Token, defaultDialTimeout) if err != nil { return fmt.Errorf("error dialing: %w", err) } diff --git a/cmd/praefect/subcmd_track_repository.go b/cmd/praefect/subcmd_track_repository.go new file mode 100644 index 000000000..1f27b7646 --- /dev/null +++ b/cmd/praefect/subcmd_track_repository.go @@ -0,0 +1,236 @@ +package main + +import ( + "context" + "database/sql" + "errors" + "flag" + "fmt" + "math/rand" + "time" + + "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "gitlab.com/gitlab-org/labkit/correlation" + "google.golang.org/grpc/metadata" +) + +const ( + trackRepositoryCmdName = "track-repository" +) + +type trackRepository struct { + logger logrus.FieldLogger + virtualStorage string + relativePath string + authoritativeStorage string +} + +var errAuthoritativeRepositoryNotExist = errors.New("authoritative repository does not exist") + +func newTrackRepository(logger logrus.FieldLogger) *trackRepository { + return &trackRepository{logger: logger} +} + +func (cmd *trackRepository) FlagSet() *flag.FlagSet { + fs := flag.NewFlagSet(trackRepositoryCmdName, flag.ExitOnError) + fs.StringVar(&cmd.virtualStorage, paramVirtualStorage, "", "name of the repository's virtual storage") + fs.StringVar(&cmd.relativePath, paramRelativePath, "", "relative path to the repository") + fs.StringVar(&cmd.authoritativeStorage, paramAuthoritativeStorage, "", "storage with the repository to consider as authoritative") + fs.Usage = func() { + _, _ = printfErr("Description:\n" + + " This command adds a given repository to be tracked by Praefect.\n" + + " It checks if the repository exists on disk on the authoritative storage, " + + " and whether database records are absent from tracking the repository.") + fs.PrintDefaults() + } + return fs +} + +func (cmd trackRepository) Exec(flags *flag.FlagSet, cfg config.Config) error { + switch { + case flags.NArg() > 0: + return unexpectedPositionalArgsError{Command: flags.Name()} + case cmd.virtualStorage == "": + return requiredParameterError(paramVirtualStorage) + case cmd.relativePath == "": + return requiredParameterError(paramRelativePath) + case cmd.authoritativeStorage == "": + if cfg.Failover.ElectionStrategy == config.ElectionStrategyPerRepository { + return requiredParameterError(paramAuthoritativeStorage) + } + } + + db, err := glsql.OpenDB(cfg.DB) + if err != nil { + return fmt.Errorf("connect to database: %w", err) + } + defer func() { _ = db.Close() }() + + ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID()) + logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx)) + + return cmd.exec(ctx, logger, db, cfg) +} + +const trackRepoErrorPrefix = "attempting to track repository in praefect database" + +func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, db *sql.DB, cfg config.Config) error { + logger.WithFields(logrus.Fields{ + "virtual_storage": cmd.virtualStorage, + "relative_path": cmd.relativePath, + "authoritative_storage": cmd.authoritativeStorage, + }).Debug("track repository") + + var primary string + var secondaries []string + var variableReplicationFactorEnabled, savePrimary bool + if cfg.Failover.ElectionStrategy == config.ElectionStrategyPerRepository { + savePrimary = true + primary = cmd.authoritativeStorage + + for _, vs := range cfg.VirtualStorages { + if vs.Name == cmd.virtualStorage { + for _, node := range vs.Nodes { + if node.Storage == cmd.authoritativeStorage { + continue + } + secondaries = append(secondaries, node.Storage) + } + } + } + + r := rand.New(rand.NewSource(time.Now().UnixNano())) + replicationFactor := cfg.DefaultReplicationFactors()[cmd.virtualStorage] + + if replicationFactor > 0 { + variableReplicationFactorEnabled = true + // Select random secondaries according to the default replication factor. + r.Shuffle(len(secondaries), func(i, j int) { + secondaries[i], secondaries[j] = secondaries[j], secondaries[i] + }) + + secondaries = secondaries[:replicationFactor-1] + } + } else { + savePrimary = false + if err := db.QueryRowContext(ctx, `SELECT node_name FROM shard_primaries WHERE shard_name = $1 AND demoted = 'false'`, cmd.virtualStorage).Scan(&primary); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return fmt.Errorf("%s: no primaries found", trackRepoErrorPrefix) + } + return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err) + } + } + + authoritativeRepoExists, err := cmd.authoritativeRepositoryExists(ctx, cfg, primary) + if err != nil { + return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err) + } + + if !authoritativeRepoExists { + return fmt.Errorf("%s: %w", trackRepoErrorPrefix, errAuthoritativeRepositoryNotExist) + } + + if err := cmd.trackRepository( + ctx, + datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()), + primary, + secondaries, + savePrimary, + variableReplicationFactorEnabled, + ); err != nil { + return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err) + } + + logger.Debug("finished adding new repository to be tracked in praefect database.") + + return nil +} + +func (cmd *trackRepository) trackRepository( + ctx context.Context, + ds *datastore.PostgresRepositoryStore, + primary string, + secondaries []string, + savePrimary bool, + variableReplicationFactorEnabled bool, +) error { + repositoryID, err := ds.ReserveRepositoryID(ctx, cmd.virtualStorage, cmd.relativePath) + if err != nil { + if errors.Is(err, commonerr.ErrRepositoryAlreadyExists) { + cmd.logger.Print("repository is already tracked in praefect database") + return nil + } + + return fmt.Errorf("ReserveRepositoryID: %w", err) + } + + if err := ds.CreateRepository( + ctx, + repositoryID, + cmd.virtualStorage, + cmd.relativePath, + primary, + nil, + secondaries, + savePrimary, + variableReplicationFactorEnabled, + ); err != nil { + var repoExistsError datastore.RepositoryExistsError + if errors.As(err, &repoExistsError) { + cmd.logger.Print("repository is already tracked in praefect database") + return nil + } + + return fmt.Errorf("CreateRepository: %w", err) + } + + return nil +} + +func (cmd *trackRepository) repositoryExists(ctx context.Context, repo *gitalypb.Repository, addr, token string) (bool, error) { + conn, err := subCmdDial(ctx, addr, token, defaultDialTimeout) + if err != nil { + return false, fmt.Errorf("error dialing: %w", err) + } + defer func() { _ = conn.Close() }() + + ctx = metadata.AppendToOutgoingContext(ctx, "client_name", trackRepositoryCmdName) + repositoryClient := gitalypb.NewRepositoryServiceClient(conn) + res, err := repositoryClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{Repository: repo}) + if err != nil { + return false, err + } + + return res.GetExists(), nil +} + +func (cmd *trackRepository) authoritativeRepositoryExists(ctx context.Context, cfg config.Config, nodeName string) (bool, error) { + for _, vs := range cfg.VirtualStorages { + if vs.Name != cmd.virtualStorage { + continue + } + + for _, node := range vs.Nodes { + if node.Storage == nodeName { + logger.Debugf("check if repository %q exists on gitaly %q at %q", cmd.relativePath, node.Storage, node.Address) + repo := &gitalypb.Repository{ + StorageName: node.Storage, + RelativePath: cmd.relativePath, + } + exists, err := cmd.repositoryExists(ctx, repo, node.Address, node.Token) + if err != nil { + logger.WithError(err).Warnf("checking if repository exists %q, %q", node.Storage, cmd.relativePath) + return false, nil + } + return exists, nil + } + } + return false, fmt.Errorf("node %q not found", cmd.authoritativeStorage) + } + return false, fmt.Errorf("virtual storage %q not found", cmd.virtualStorage) +} diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go new file mode 100644 index 000000000..619c57778 --- /dev/null +++ b/cmd/praefect/subcmd_track_repository_test.go @@ -0,0 +1,231 @@ +package main + +import ( + "errors" + "flag" + "fmt" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/client" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/promtest" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" +) + +func TestTrackRepository_FlagSet(t *testing.T) { + t.Parallel() + cmd := &trackRepository{} + fs := cmd.FlagSet() + require.NoError(t, fs.Parse([]string{"--virtual-storage", "vs", "--repository", "repo", "--authoritative-storage", "storage-0"})) + require.Equal(t, "vs", cmd.virtualStorage) + require.Equal(t, "repo", cmd.relativePath) + require.Equal(t, "storage-0", cmd.authoritativeStorage) +} + +func TestTrackRepository_Exec_invalidArgs(t *testing.T) { + t.Parallel() + t.Run("not all flag values processed", func(t *testing.T) { + cmd := trackRepository{} + flagSet := flag.NewFlagSet("cmd", flag.PanicOnError) + require.NoError(t, flagSet.Parse([]string{"stub"})) + err := cmd.Exec(flagSet, config.Config{}) + require.EqualError(t, err, "cmd doesn't accept positional arguments") + }) + + t.Run("virtual-storage is not set", func(t *testing.T) { + cmd := trackRepository{} + err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{}) + require.EqualError(t, err, `"virtual-storage" is a required parameter`) + }) + + t.Run("repository is not set", func(t *testing.T) { + cmd := trackRepository{virtualStorage: "stub"} + err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{}) + require.EqualError(t, err, `"repository" is a required parameter`) + }) + + t.Run("authoritative-storage is not set", func(t *testing.T) { + cmd := trackRepository{virtualStorage: "stub", relativePath: "path/to/repo"} + err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}}) + require.EqualError(t, err, `"authoritative-storage" is a required parameter`) + }) + + t.Run("db connection error", func(t *testing.T) { + cmd := trackRepository{virtualStorage: "stub", relativePath: "stub", authoritativeStorage: "storage-0"} + cfg := config.Config{DB: config.DB{Host: "stub", SSLMode: "disable"}} + err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), cfg) + require.Error(t, err) + require.Contains(t, err.Error(), "connect to database: dial tcp: lookup stub") + }) +} + +func TestTrackRepository_Exec(t *testing.T) { + t.Parallel() + g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1")) + g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2")) + + g1Srv := testserver.StartGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) + g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) + defer g2Srv.Shutdown() + defer g1Srv.Shutdown() + + g1Addr := g1Srv.Address() + + db := glsql.NewDB(t) + dbConf := glsql.GetDBConfig(t, db.Name) + + virtualStorageName := "praefect" + conf := config.Config{ + SocketPath: testhelper.GetTemporaryGitalySocketFileName(t), + VirtualStorages: []*config.VirtualStorage{ + { + Name: virtualStorageName, + Nodes: []*config.Node{ + {Storage: g1Cfg.Storages[0].Name, Address: g1Addr}, + {Storage: g2Cfg.Storages[0].Name, Address: g2Srv.Address()}, + }, + DefaultReplicationFactor: 2, + }, + }, + DB: dbConf, + } + + gitalyCC, err := client.Dial(g1Addr, nil) + require.NoError(t, err) + defer func() { require.NoError(t, gitalyCC.Close()) }() + ctx, cancel := testhelper.Context() + defer cancel() + + gitaly1RepositoryClient := gitalypb.NewRepositoryServiceClient(gitalyCC) + + createRepoThroughGitaly1 := func(relativePath string) error { + _, err := gitaly1RepositoryClient.CreateRepository( + ctx, + &gitalypb.CreateRepositoryRequest{ + Repository: &gitalypb.Repository{ + StorageName: g1Cfg.Storages[0].Name, + RelativePath: relativePath, + }, + }) + return err + } + + testCases := map[string]struct { + failoverConfig config.Failover + authoritativeStorage string + }{ + "sql election": { + failoverConfig: config.Failover{ + Enabled: true, + ElectionStrategy: config.ElectionStrategySQL, + }, + authoritativeStorage: "", + }, + "per repository election": { + failoverConfig: config.Failover{ + Enabled: true, + ElectionStrategy: config.ElectionStrategyPerRepository, + }, + authoritativeStorage: g1Cfg.Storages[0].Name, + }, + } + + logger := testhelper.NewTestLogger(t) + for tn, tc := range testCases { + t.Run(tn, func(t *testing.T) { + addCmdConf := conf + addCmdConf.Failover = tc.failoverConfig + + t.Run("ok", func(t *testing.T) { + nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), addCmdConf, db.DB, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil) + require.NoError(t, err) + nodeMgr.Start(0, time.Hour) + + relativePath := fmt.Sprintf("path/to/test/repo_%s", tn) + repoDS := datastore.NewPostgresRepositoryStore(db.DB, conf.StorageNames()) + + require.NoError(t, createRepoThroughGitaly1(relativePath)) + + rmRepoCmd := &removeRepository{ + logger: logger, + virtualStorage: virtualStorageName, + relativePath: relativePath, + } + + require.NoError(t, rmRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) + + // create the repo on Gitaly without Praefect knowing + require.NoError(t, createRepoThroughGitaly1(relativePath)) + require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath)) + require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath)) + + addRepoCmd := &trackRepository{ + logger: logger, + virtualStorage: virtualStorageName, + relativePath: relativePath, + authoritativeStorage: tc.authoritativeStorage, + } + + require.NoError(t, addRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf)) + as := datastore.NewAssignmentStore(db.DB, conf.StorageNames()) + + assignments, err := as.GetHostAssignments(ctx, virtualStorageName, relativePath) + require.NoError(t, err) + require.Len(t, assignments, 2) + assert.Contains(t, assignments, g1Cfg.Storages[0].Name) + assert.Contains(t, assignments, g2Cfg.Storages[0].Name) + + exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, relativePath) + require.NoError(t, err) + assert.True(t, exists) + }) + + t.Run("repository does not exist", func(t *testing.T) { + relativePath := fmt.Sprintf("path/to/test/repo_1_%s", tn) + + cmd := &trackRepository{ + logger: testhelper.NewTestLogger(t), + virtualStorage: "praefect", + relativePath: relativePath, + authoritativeStorage: tc.authoritativeStorage, + } + + assert.True(t, errors.Is(cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf), errAuthoritativeRepositoryNotExist)) + }) + + t.Run("records already exist", func(t *testing.T) { + relativePath := fmt.Sprintf("path/to/test/repo_2_%s", tn) + + require.NoError(t, createRepoThroughGitaly1(relativePath)) + require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath)) + require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath)) + + ds := datastore.NewPostgresRepositoryStore(db.DB, conf.StorageNames()) + id, err := ds.ReserveRepositoryID(ctx, virtualStorageName, relativePath) + require.NoError(t, err) + require.NoError(t, ds.CreateRepository(ctx, id, virtualStorageName, relativePath, g1Cfg.Storages[0].Name, nil, nil, true, true)) + + cmd := &trackRepository{ + logger: testhelper.NewTestLogger(t), + virtualStorage: virtualStorageName, + relativePath: relativePath, + authoritativeStorage: tc.authoritativeStorage, + } + + assert.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf)) + }) + }) + } +} diff --git a/internal/praefect/datastore/storage_cleanup.go b/internal/praefect/datastore/storage_cleanup.go new file mode 100644 index 000000000..455ddef2b --- /dev/null +++ b/internal/praefect/datastore/storage_cleanup.go @@ -0,0 +1,85 @@ +package datastore + +import ( + "context" + "database/sql" + "fmt" + + "github.com/lib/pq" +) + +// RepositoryClusterPath identifies location of the repository in the cluster. +type RepositoryClusterPath struct { + ClusterPath + // RelativePath relative path to the repository on the disk. + RelativePath string +} + +// NewRepositoryClusterPath initializes and returns RepositoryClusterPath. +func NewRepositoryClusterPath(virtualStorage, storage, relativePath string) RepositoryClusterPath { + return RepositoryClusterPath{ + ClusterPath: ClusterPath{ + VirtualStorage: virtualStorage, + Storage: storage, + }, + RelativePath: relativePath, + } +} + +// ClusterPath represents path on the cluster to the storage. +type ClusterPath struct { + // VirtualStorage is the name of the virtual storage. + VirtualStorage string + // Storage is the name of the gitaly storage. + Storage string +} + +// NewStorageCleanup initialises and returns a new instance of the StorageCleanup. +func NewStorageCleanup(db *sql.DB) *StorageCleanup { + return &StorageCleanup{db: db} +} + +// StorageCleanup provides methods on the database for the repository cleanup operation. +type StorageCleanup struct { + db *sql.DB +} + +// DoesntExist returns RepositoryClusterPath for each repository that doesn't exist in the database +// by querying repositories and storage_repositories tables. +func (ss *StorageCleanup) DoesntExist(ctx context.Context, virtualStorage, storage string, relativePath []string) ([]RepositoryClusterPath, error) { + if len(relativePath) == 0 { + return nil, nil + } + + rows, err := ss.db.QueryContext( + ctx, + `SELECT $1 AS virtual_storage, $2 AS storage, UNNEST($3::TEXT[]) AS relative_path + EXCEPT ( + SELECT virtual_storage, storage, relative_path + FROM repositories + JOIN storage_repositories USING (virtual_storage, relative_path) + WHERE virtual_storage = $1 AND storage = $2 AND relative_path = ANY($3) + )`, + virtualStorage, storage, pq.StringArray(relativePath), + ) + if err != nil { + return nil, fmt.Errorf("query: %w", err) + } + defer func() { _ = rows.Close() }() + + var res []RepositoryClusterPath + for rows.Next() { + var curr RepositoryClusterPath + if err := rows.Scan(&curr.VirtualStorage, &curr.Storage, &curr.RelativePath); err != nil { + return nil, fmt.Errorf("scan: %w", err) + } + res = append(res, curr) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("loop: %w", err) + } + if err := rows.Close(); err != nil { + return nil, fmt.Errorf("close: %w", err) + } + return res, nil +} diff --git a/internal/praefect/repocleaner/init_test.go b/internal/praefect/repocleaner/init_test.go new file mode 100644 index 000000000..0b99e6d32 --- /dev/null +++ b/internal/praefect/repocleaner/init_test.go @@ -0,0 +1,21 @@ +package repocleaner + +import ( + "os" + "testing" + + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" +) + +func TestMain(m *testing.M) { + os.Exit(testMain(m)) +} + +func testMain(m *testing.M) int { + defer testhelper.MustHaveNoChildProcess() + + cleanup := testhelper.Configure() + defer cleanup() + + return m.Run() +} diff --git a/internal/praefect/repocleaner/repository.go b/internal/praefect/repocleaner/repository.go new file mode 100644 index 000000000..86ea69bd1 --- /dev/null +++ b/internal/praefect/repocleaner/repository.go @@ -0,0 +1,77 @@ +package repocleaner + +import ( + "context" + "errors" + "fmt" + "io" + + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" +) + +// Walker allows iterating over the repositories of the gitaly storage. +type Walker struct { + conns praefect.Connections + batchSize int +} + +// NewWalker returns a new *Walker instance. +func NewWalker(conns praefect.Connections, batchSize int) *Walker { + return &Walker{conns: conns, batchSize: batchSize} +} + +// ExecOnRepositories runs through all the repositories on a Gitaly storage and executes the provided action. +// The processing is done in batches to reduce cost of operations. +func (wr *Walker) ExecOnRepositories(ctx context.Context, virtualStorage, storage string, action func([]datastore.RepositoryClusterPath) error) error { + gclient, err := wr.getInternalGitalyClient(virtualStorage, storage) + if err != nil { + return fmt.Errorf("setup gitaly client: %w", err) + } + + resp, err := gclient.WalkRepos(ctx, &gitalypb.WalkReposRequest{StorageName: storage}) + if err != nil { + return fmt.Errorf("unable to walk repos: %w", err) + } + + batch := make([]datastore.RepositoryClusterPath, 0, wr.batchSize) + for { + res, err := resp.Recv() + if err != nil { + if !errors.Is(err, io.EOF) { + return fmt.Errorf("failure on walking repos: %w", err) + } + break + } + + batch = append(batch, datastore.RepositoryClusterPath{ + ClusterPath: datastore.ClusterPath{ + VirtualStorage: virtualStorage, + Storage: storage, + }, + RelativePath: res.RelativePath, + }) + + if len(batch) == cap(batch) { + if err := action(batch); err != nil { + return err + } + batch = batch[:0] + } + } + if len(batch) > 0 { + if err := action(batch); err != nil { + return err + } + } + return nil +} + +func (wr *Walker) getInternalGitalyClient(virtualStorage, storage string) (gitalypb.InternalGitalyClient, error) { + conn, found := wr.conns[virtualStorage][storage] + if !found { + return nil, fmt.Errorf("no connection to the gitaly node %q/%q", virtualStorage, storage) + } + return gitalypb.NewInternalGitalyClient(conn), nil +} diff --git a/internal/praefect/repocleaner/repository_test.go b/internal/praefect/repocleaner/repository_test.go new file mode 100644 index 000000000..6d8b7bd19 --- /dev/null +++ b/internal/praefect/repocleaner/repository_test.go @@ -0,0 +1,114 @@ +package repocleaner + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/backchannel" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/transaction" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" +) + +func TestWalker_ExecOnRepositories(t *testing.T) { + const ( + repo1RelPath = "repo-1.git" + repo2RelPath = "repo-2.git" + repo3RelPath = "repo-3.git" + + storage1 = "gitaly-1" + + virtualStorage = "praefect" + ) + + gCfg := testcfg.Build(t, testcfg.WithStorages(storage1)) + gAddr := testserver.RunGitalyServer(t, gCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) + + conf := config.Config{ + SocketPath: testhelper.GetTemporaryGitalySocketFileName(t), + VirtualStorages: []*config.VirtualStorage{ + { + Name: virtualStorage, + Nodes: []*config.Node{ + {Storage: gCfg.Storages[0].Name, Address: gAddr}, + }, + }, + }, + } + + gittest.CloneRepo(t, gCfg, gCfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo1RelPath}) + gittest.CloneRepo(t, gCfg, gCfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo2RelPath}) + gittest.CloneRepo(t, gCfg, gCfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo3RelPath}) + + ctx, cancel := testhelper.Context() + defer cancel() + + entry := testhelper.NewTestLogger(t).WithContext(ctx) + clientHandshaker := backchannel.NewClientHandshaker(entry, praefect.NewBackchannelServerFactory(entry, transaction.NewServer(nil))) + nodeSet, err := praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, clientHandshaker) + require.NoError(t, err) + defer nodeSet.Close() + + for _, tc := range []struct { + desc string + batchSize int + exp [][]datastore.RepositoryClusterPath + expErr error + }{ + { + desc: "multiple batches", + batchSize: 2, + exp: [][]datastore.RepositoryClusterPath{ + { + {ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo1RelPath}, + {ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo2RelPath}, + }, + { + {ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo3RelPath}, + }, + }, + }, + { + desc: "single batch", + batchSize: 10, + exp: [][]datastore.RepositoryClusterPath{ + { + {ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo1RelPath}, + {ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo2RelPath}, + {ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo3RelPath}, + }, + }, + }, + { + desc: "terminates on error", + batchSize: 1, + exp: [][]datastore.RepositoryClusterPath{ + { + {ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo1RelPath}, + }, + }, + expErr: assert.AnError, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + walker := NewWalker(nodeSet.Connections(), tc.batchSize) + var iteration int + err = walker.ExecOnRepositories(ctx, conf.VirtualStorages[0].Name, storage1, func(paths []datastore.RepositoryClusterPath) error { + require.Less(t, iteration, len(tc.exp)) + expected := tc.exp[iteration] + iteration++ + assert.ElementsMatch(t, paths, expected) + return tc.expErr + }) + require.Equal(t, tc.expErr, err) + }) + } +} |