gitlab.com/gitlab-org/gitaly.git

author    Sami Hiltunen <shiltunen@gitlab.com>  2021-11-23 00:10:22 +0300
committer Sami Hiltunen <shiltunen@gitlab.com>  2021-11-23 00:10:22 +0300
commit    5b5447202878a42c9afb7b8e844eca73012d7157 (patch)
tree      6346dfa76abac389620dfd6645113ddbe9581184
parent    650cb6e64c077ac89f8e8c4f175f602b504ef143 (diff)
parent    8e6a8c7b7fda4c79392effa05530c9d421487475 (diff)

Merge branch 'ps-backport-sub-cmds-14-3' into '14-3-stable'

Backport praefect sub-commands to 14.3

See merge request gitlab-org/gitaly!4114
-rw-r--r--  cmd/praefect/subcmd.go                                   |  26
-rw-r--r--  cmd/praefect/subcmd_accept_dataloss.go                   |   2
-rw-r--r--  cmd/praefect/subcmd_dataloss.go                          |   2
-rw-r--r--  cmd/praefect/subcmd_list_untracked_repositories.go       | 172
-rw-r--r--  cmd/praefect/subcmd_list_untracked_repositories_test.go  | 135
-rw-r--r--  cmd/praefect/subcmd_pingnodes.go                         |   2
-rw-r--r--  cmd/praefect/subcmd_remove_repository.go                 |   5
-rw-r--r--  cmd/praefect/subcmd_remove_repository_test.go            |  57
-rw-r--r--  cmd/praefect/subcmd_set_replication_factor.go            |   2
-rw-r--r--  cmd/praefect/subcmd_track_repository.go                  | 236
-rw-r--r--  cmd/praefect/subcmd_track_repository_test.go             | 231
-rw-r--r--  internal/praefect/datastore/storage_cleanup.go           |  85
-rw-r--r--  internal/praefect/repocleaner/init_test.go               |  21
-rw-r--r--  internal/praefect/repocleaner/repository.go              |  77
-rw-r--r--  internal/praefect/repocleaner/repository_test.go         | 114
15 files changed, 1120 insertions(+), 47 deletions(-)
diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go
index ca11658fe..a232835db 100644
--- a/cmd/praefect/subcmd.go
+++ b/cmd/praefect/subcmd.go
@@ -23,16 +23,20 @@ type subcmd interface {
Exec(flags *flag.FlagSet, config config.Config) error
}
+const defaultDialTimeout = 30 * time.Second
+
var subcommands = map[string]subcmd{
- "sql-ping": &sqlPingSubcommand{},
- "sql-migrate": &sqlMigrateSubcommand{},
- "dial-nodes": &dialNodesSubcommand{},
- "sql-migrate-down": &sqlMigrateDownSubcommand{},
- "sql-migrate-status": &sqlMigrateStatusSubcommand{},
- "dataloss": newDatalossSubcommand(),
- "accept-dataloss": &acceptDatalossSubcommand{},
- "set-replication-factor": newSetReplicatioFactorSubcommand(os.Stdout),
- removeRepositoryCmdName: newRemoveRepository(logger),
+ "sql-ping": &sqlPingSubcommand{},
+ "sql-migrate": &sqlMigrateSubcommand{},
+ "dial-nodes": &dialNodesSubcommand{},
+ "sql-migrate-down": &sqlMigrateDownSubcommand{},
+ "sql-migrate-status": &sqlMigrateStatusSubcommand{},
+ "dataloss": newDatalossSubcommand(),
+ "accept-dataloss": &acceptDatalossSubcommand{},
+ "set-replication-factor": newSetReplicatioFactorSubcommand(os.Stdout),
+ removeRepositoryCmdName: newRemoveRepository(logger),
+ trackRepositoryCmdName: newTrackRepository(logger),
+ listUntrackedRepositoriesName: newListUntrackedRepositories(logger, os.Stdout),
}
// subCommand returns an exit code, to be fed into os.Exit.
@@ -149,8 +153,8 @@ func printfErr(format string, a ...interface{}) (int, error) {
return fmt.Fprintf(os.Stderr, format, a...)
}
-func subCmdDial(addr, token string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
- ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
+func subCmdDial(ctx context.Context, addr, token string, timeout time.Duration, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
+ ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
opts = append(opts,
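
The hunk above changes subCmdDial so the caller supplies the parent context and the timeout instead of the previous hard-coded context.TODO() with 30 seconds. A minimal, self-contained sketch of that pattern; dialWithTimeout and the socket address are illustrative and not part of this merge request:

package main

import (
	"context"
	"fmt"
	"time"
)

// dialWithTimeout mirrors the new subCmdDial shape: the parent context and the
// timeout come from the caller, so a subcommand can pass defaultDialTimeout or
// something shorter, as removeRepository does further down in this diff.
func dialWithTimeout(ctx context.Context, addr string, timeout time.Duration, dial func(context.Context, string) error) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	return dial(ctx, addr)
}

func main() {
	err := dialWithTimeout(context.Background(), "unix:///run/gitaly.socket", 30*time.Second,
		func(ctx context.Context, addr string) error {
			select {
			case <-time.After(10 * time.Millisecond): // stand-in for the real gRPC dial
				return nil
			case <-ctx.Done():
				return ctx.Err()
			}
		})
	fmt.Println("dial error:", err)
}
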
diff --git a/cmd/praefect/subcmd_accept_dataloss.go b/cmd/praefect/subcmd_accept_dataloss.go
index 0c4ea015a..e037df424 100644
--- a/cmd/praefect/subcmd_accept_dataloss.go
+++ b/cmd/praefect/subcmd_accept_dataloss.go
@@ -51,7 +51,7 @@ func (cmd *acceptDatalossSubcommand) Exec(flags *flag.FlagSet, cfg config.Config
return err
}
- conn, err := subCmdDial(nodeAddr, cfg.Auth.Token)
+ conn, err := subCmdDial(context.Background(), nodeAddr, cfg.Auth.Token, defaultDialTimeout)
if err != nil {
return fmt.Errorf("error dialing: %w", err)
}
diff --git a/cmd/praefect/subcmd_dataloss.go b/cmd/praefect/subcmd_dataloss.go
index 06b493d71..75a51c266 100644
--- a/cmd/praefect/subcmd_dataloss.go
+++ b/cmd/praefect/subcmd_dataloss.go
@@ -65,7 +65,7 @@ func (cmd *datalossSubcommand) Exec(flags *flag.FlagSet, cfg config.Config) erro
return err
}
- conn, err := subCmdDial(nodeAddr, cfg.Auth.Token)
+ conn, err := subCmdDial(context.Background(), nodeAddr, cfg.Auth.Token, defaultDialTimeout)
if err != nil {
return fmt.Errorf("error dialing: %v", err)
}
diff --git a/cmd/praefect/subcmd_list_untracked_repositories.go b/cmd/praefect/subcmd_list_untracked_repositories.go
new file mode 100644
index 000000000..bff6140a0
--- /dev/null
+++ b/cmd/praefect/subcmd_list_untracked_repositories.go
@@ -0,0 +1,172 @@
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "flag"
+ "fmt"
+ "io"
+
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/repocleaner"
+ "gitlab.com/gitlab-org/labkit/correlation"
+ "google.golang.org/grpc/metadata"
+)
+
+const (
+ listUntrackedRepositoriesName = "list-untracked-repositories"
+)
+
+var errNoConnectionToGitalies = errors.New("no connection established to gitaly nodes")
+
+type listUntrackedRepositories struct {
+ logger logrus.FieldLogger
+ delimiter string
+ out io.Writer
+}
+
+func newListUntrackedRepositories(logger logrus.FieldLogger, out io.Writer) *listUntrackedRepositories {
+ return &listUntrackedRepositories{logger: logger, out: out}
+}
+
+func (cmd *listUntrackedRepositories) FlagSet() *flag.FlagSet {
+ fs := flag.NewFlagSet(listUntrackedRepositoriesName, flag.ExitOnError)
+ fs.StringVar(&cmd.delimiter, "delimiter", "\n", "string used as a delimiter in output")
+ fs.Usage = func() {
+ _, _ = printfErr("Description:\n" +
+ " This command checks if all repositories on all gitaly nodes tracked by praefect.\n" +
+ " If repository is found on the disk, but it is not known to praefect the location of\n" +
+ " that repository will be written into stdout stream in JSON format.\n")
+ fs.PrintDefaults()
+ _, _ = printfErr("NOTE:\n" +
+ " All errors and log messages directed to the stderr stream.\n" +
+ " The output is produced as the new data appears, it doesn't wait\n" +
+ " for the completion of the processing to produce the result.\n")
+ }
+ return fs
+}
+
+func (cmd listUntrackedRepositories) Exec(flags *flag.FlagSet, cfg config.Config) error {
+ if flags.NArg() > 0 {
+ return unexpectedPositionalArgsError{Command: flags.Name()}
+ }
+
+ ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID())
+ ctx = metadata.AppendToOutgoingContext(ctx, "client_name", listUntrackedRepositoriesName)
+
+ logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx))
+ logger.Debugf("starting %s command", cmd.FlagSet().Name())
+
+ logger.Debug("dialing to gitaly nodes...")
+ nodeSet, err := dialGitalyStorages(cfg)
+ if err != nil {
+ return fmt.Errorf("dial nodes: %w", err)
+ }
+ defer nodeSet.Close()
+ logger.Debug("connected to gitaly nodes")
+
+ logger.Debug("connecting to praefect database...")
+ db, err := glsql.OpenDB(cfg.DB)
+ if err != nil {
+ return fmt.Errorf("connect to database: %w", err)
+ }
+ defer func() { _ = db.Close() }()
+ logger.Debug("connected to praefect database")
+
+ walker := repocleaner.NewWalker(nodeSet.Connections(), 16)
+ reporter := reportUntrackedRepositories{
+ ctx: ctx,
+ checker: datastore.NewStorageCleanup(db),
+ delimiter: cmd.delimiter,
+ out: cmd.out,
+ }
+ for _, vs := range cfg.VirtualStorages {
+ for _, node := range vs.Nodes {
+ logger.Debugf("check %q/%q storage repositories", vs.Name, node.Storage)
+ if err := walker.ExecOnRepositories(ctx, vs.Name, node.Storage, reporter.Report); err != nil {
+ return fmt.Errorf("exec on %q/%q: %w", vs.Name, node.Storage, err)
+ }
+ }
+ }
+ logger.Debug("completed")
+ return nil
+}
+
+func dialGitalyStorages(cfg config.Config) (praefect.NodeSet, error) {
+ nodeSet := praefect.NodeSet{}
+ for _, vs := range cfg.VirtualStorages {
+ for _, node := range vs.Nodes {
+ conn, err := subCmdDial(context.Background(), node.Address, node.Token, defaultDialTimeout)
+ if err != nil {
+ return nil, fmt.Errorf("dial with %q gitaly at %q", node.Storage, node.Address)
+ }
+ if _, found := nodeSet[vs.Name]; !found {
+ nodeSet[vs.Name] = map[string]praefect.Node{}
+ }
+ nodeSet[vs.Name][node.Storage] = praefect.Node{
+ Storage: node.Storage,
+ Address: node.Address,
+ Token: node.Token,
+ Connection: conn,
+ }
+ }
+ }
+ if len(nodeSet.Connections()) == 0 {
+ return nil, errNoConnectionToGitalies
+ }
+ return nodeSet, nil
+}
+
+type reportUntrackedRepositories struct {
+ ctx context.Context
+ checker *datastore.StorageCleanup
+ out io.Writer
+ delimiter string
+}
+
+// Report method accepts a list of repositories, checks if they exist in the praefect database
+// and writes JSON serialized location of each untracked repository using the configured delimiter
+// and writer.
+func (r *reportUntrackedRepositories) Report(paths []datastore.RepositoryClusterPath) error {
+ if len(paths) == 0 {
+ return nil
+ }
+
+ replicaRelPaths := make([]string, len(paths))
+ for i, path := range paths {
+ replicaRelPaths[i] = path.RelativePath
+ }
+
+ missing, err := r.checker.DoesntExist(r.ctx, paths[0].VirtualStorage, paths[0].Storage, replicaRelPaths)
+ if err != nil {
+ return fmt.Errorf("existence check: %w", err)
+ }
+
+ for _, repoClusterPath := range missing {
+ d, err := serializeRepositoryClusterPath(repoClusterPath)
+ if err != nil {
+ return fmt.Errorf("serialize %+v: %w", repoClusterPath, err)
+ }
+ if _, err := r.out.Write(d); err != nil {
+ return fmt.Errorf("write serialized data to output: %w", err)
+ }
+ if _, err := r.out.Write([]byte(r.delimiter)); err != nil {
+ return fmt.Errorf("write serialized data to output: %w", err)
+ }
+ }
+
+ return nil
+}
+
+func serializeRepositoryClusterPath(path datastore.RepositoryClusterPath) ([]byte, error) {
+ return json.Marshal(map[string]string{
+ "virtual_storage": path.VirtualStorage,
+ "storage": path.Storage,
+ "relative_path": path.RelativePath,
+ })
+}
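
serializeRepositoryClusterPath above emits one flat JSON object per untracked repository, and the reporter writes it followed by the configured delimiter. A runnable sketch of that output format with illustrative values, assuming the same map-based json.Marshal call:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// json.Marshal sorts map keys, so the fields come out in the order the
	// test below expects: relative_path, storage, virtual_storage.
	line, err := json.Marshal(map[string]string{
		"virtual_storage": "praefect",
		"storage":         "gitaly-1",
		"relative_path":   "@hashed/path/to/repo.git", // illustrative path
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(line))
	// Output: {"relative_path":"@hashed/path/to/repo.git","storage":"gitaly-1","virtual_storage":"praefect"}
}
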
diff --git a/cmd/praefect/subcmd_list_untracked_repositories_test.go b/cmd/praefect/subcmd_list_untracked_repositories_test.go
new file mode 100644
index 000000000..a24300ebe
--- /dev/null
+++ b/cmd/praefect/subcmd_list_untracked_repositories_test.go
@@ -0,0 +1,135 @@
+package main
+
+import (
+ "bytes"
+ "context"
+ "database/sql"
+ "flag"
+ "fmt"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/client"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+)
+
+func TestListUntrackedRepositories_FlagSet(t *testing.T) {
+ t.Parallel()
+ cmd := &listUntrackedRepositories{}
+ for _, tc := range []struct {
+ desc string
+ args []string
+ exp []interface{}
+ }{
+ {
+ desc: "custom value",
+ args: []string{"--delimiter", ","},
+ exp: []interface{}{","},
+ },
+ {
+ desc: "default value",
+ args: nil,
+ exp: []interface{}{"\n"},
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ fs := cmd.FlagSet()
+ require.NoError(t, fs.Parse(tc.args))
+ require.ElementsMatch(t, tc.exp, []interface{}{cmd.delimiter})
+ })
+ }
+}
+
+func TestListUntrackedRepositories_Exec(t *testing.T) {
+ t.Parallel()
+ g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1"))
+ g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2"))
+
+ // Repositories not managed by praefect.
+ repo1, _ := gittest.InitRepo(t, g1Cfg, g1Cfg.Storages[0])
+ repo2, _ := gittest.InitRepo(t, g1Cfg, g1Cfg.Storages[0])
+ repo3, _ := gittest.InitRepo(t, g2Cfg, g2Cfg.Storages[0])
+
+ g1Addr := testserver.RunGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+ g2Addr := testserver.RunGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+
+ db := glsql.NewDB(t)
+ conf := config.Config{
+ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "praefect",
+ Nodes: []*config.Node{
+ {Storage: g1Cfg.Storages[0].Name, Address: g1Addr},
+ {Storage: g2Cfg.Storages[0].Name, Address: g2Addr},
+ },
+ },
+ },
+ DB: glsql.GetDBConfig(t, db.Name),
+ }
+
+ starterConfigs, err := getStarterConfigs(conf)
+ require.NoError(t, err)
+ bootstrapper := bootstrap.NewNoop()
+ go func() {
+ assert.NoError(t, run(starterConfigs, conf, bootstrapper, prometheus.NewRegistry(), prometheus.NewRegistry()))
+ }()
+
+ cc, err := client.Dial("unix://"+conf.SocketPath, nil)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, cc.Close()) }()
+ repoClient := gitalypb.NewRepositoryServiceClient(cc)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ praefectStorage := conf.VirtualStorages[0].Name
+
+ // Repository managed by praefect, exists on gitaly-1 and gitaly-2.
+ createRepo(t, ctx, repoClient, praefectStorage, "@hashed/path/to/test/repo", db.DB)
+ out := &bytes.Buffer{}
+ cmd := newListUntrackedRepositories(testhelper.NewTestLogger(t), out)
+ require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+
+ exp := []string{
+ fmt.Sprintf(`{"relative_path":%q,"storage":"gitaly-1","virtual_storage":"praefect"}`, repo1.RelativePath),
+ fmt.Sprintf(`{"relative_path":%q,"storage":"gitaly-1","virtual_storage":"praefect"}`, repo2.RelativePath),
+ fmt.Sprintf(`{"relative_path":%q,"storage":"gitaly-2","virtual_storage":"praefect"}`, repo3.RelativePath),
+ "", // an empty extra element required as each line ends with "delimiter" and strings.Split returns all parts
+ }
+ require.ElementsMatch(t, exp, strings.Split(out.String(), "\n"))
+
+ bootstrapper.Terminate()
+}
+
+func createRepo(t *testing.T, ctx context.Context, repoClient gitalypb.RepositoryServiceClient, storageName, relativePath string, db *sql.DB) *gitalypb.Repository {
+ t.Helper()
+ repo := &gitalypb.Repository{
+ StorageName: storageName,
+ RelativePath: relativePath,
+ }
+ for i := 0; true; i++ {
+ _, err := repoClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{Repository: repo})
+ if err != nil {
+ require.Regexp(t, "(no healthy nodes)|(no such file or directory)|(connection refused)", err.Error())
+ require.Less(t, i, 100, "praefect doesn't serve for too long")
+ time.Sleep(50 * time.Millisecond)
+ } else {
+ break
+ }
+ }
+
+ return repo
+}
diff --git a/cmd/praefect/subcmd_pingnodes.go b/cmd/praefect/subcmd_pingnodes.go
index 0b0873484..4bdf0a76e 100644
--- a/cmd/praefect/subcmd_pingnodes.go
+++ b/cmd/praefect/subcmd_pingnodes.go
@@ -88,7 +88,7 @@ func (s *dialNodesSubcommand) Exec(flags *flag.FlagSet, conf config.Config) erro
}
func (npr *nodePing) dial() (*grpc.ClientConn, error) {
- return subCmdDial(npr.address, npr.token)
+ return subCmdDial(context.Background(), npr.address, npr.token, defaultDialTimeout)
}
func (npr *nodePing) healthCheck(cc *grpc.ClientConn) (grpc_health_v1.HealthCheckResponse_ServingStatus, error) {
diff --git a/cmd/praefect/subcmd_remove_repository.go b/cmd/praefect/subcmd_remove_repository.go
index 016a10daa..1a2034221 100644
--- a/cmd/praefect/subcmd_remove_repository.go
+++ b/cmd/praefect/subcmd_remove_repository.go
@@ -28,10 +28,11 @@ type removeRepository struct {
logger logrus.FieldLogger
virtualStorage string
relativePath string
+ timeout time.Duration
}
func newRemoveRepository(logger logrus.FieldLogger) *removeRepository {
- return &removeRepository{logger: logger}
+ return &removeRepository{logger: logger, timeout: defaultDialTimeout}
}
func (cmd *removeRepository) FlagSet() *flag.FlagSet {
@@ -136,7 +137,7 @@ func (cmd *removeRepository) removeRepositoryFromDatabase(ctx context.Context, d
}
func (cmd *removeRepository) removeRepository(ctx context.Context, repo *gitalypb.Repository, addr, token string) (bool, error) {
- conn, err := subCmdDial(addr, token)
+ conn, err := subCmdDial(ctx, addr, token, cmd.timeout)
if err != nil {
return false, fmt.Errorf("error dialing: %w", err)
}
diff --git a/cmd/praefect/subcmd_remove_repository_test.go b/cmd/praefect/subcmd_remove_repository_test.go
index bb56dee3c..2b7dbf590 100644
--- a/cmd/praefect/subcmd_remove_repository_test.go
+++ b/cmd/praefect/subcmd_remove_repository_test.go
@@ -67,9 +67,7 @@ func TestRemoveRepository_Exec_invalidArgs(t *testing.T) {
t.Run("praefect address is not set in config ", func(t *testing.T) {
cmd := removeRepository{virtualStorage: "stub", relativePath: "stub", logger: testhelper.NewTestLogger(t)}
db := glsql.NewDB(t)
- var database string
- require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database))
- dbConf := glsql.GetDBConfig(t, database)
+ dbConf := glsql.GetDBConfig(t, db.Name)
err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{DB: dbConf})
require.EqualError(t, err, "get praefect address from config: no Praefect address configured")
})
@@ -84,9 +82,7 @@ func TestRemoveRepository_Exec(t *testing.T) {
g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
db := glsql.NewDB(t)
- var database string
- require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database))
- dbConf := glsql.GetDBConfig(t, database)
+ dbConf := glsql.GetDBConfig(t, db.Name)
conf := config.Config{
SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
@@ -108,10 +104,8 @@ func TestRemoveRepository_Exec(t *testing.T) {
starterConfigs, err := getStarterConfigs(conf)
require.NoError(t, err)
- stopped := make(chan struct{})
bootstrapper := bootstrap.NewNoop()
go func() {
- defer close(stopped)
assert.NoError(t, run(starterConfigs, conf, bootstrapper, prometheus.NewRegistry(), prometheus.NewRegistry()))
}()
@@ -123,33 +117,35 @@ func TestRemoveRepository_Exec(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- createRepo := func(t *testing.T, storageName, relativePath string) *gitalypb.Repository {
- t.Helper()
- repo := &gitalypb.Repository{
- StorageName: storageName,
- RelativePath: relativePath,
- }
- for i := 0; true; i++ {
- _, err := repoClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{Repository: repo})
- if err != nil {
- require.Regexp(t, "(no healthy nodes)|(no such file or directory)|(connection refused)", err.Error())
- require.Less(t, i, 100, "praefect doesn't serve for too long")
- time.Sleep(50 * time.Millisecond)
- } else {
- break
- }
- }
- return repo
- }
+ //createRepo := func(t *testing.T, storageName, relativePath string) *gitalypb.Repository {
+ // t.Helper()
+ // repo := &gitalypb.Repository{
+ // StorageName: storageName,
+ // RelativePath: relativePath,
+ // }
+ // for i := 0; true; i++ {
+ // _, err := repoClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{Repository: repo})
+ // if err != nil {
+ // require.Regexp(t, "(no healthy nodes)|(no such file or directory)|(connection refused)", err.Error())
+ // require.Less(t, i, 100, "praefect doesn't serve for too long")
+ // time.Sleep(50 * time.Millisecond)
+ // } else {
+ // break
+ // }
+ // }
+ //
+ // return repo
+ //}
praefectStorage := conf.VirtualStorages[0].Name
t.Run("ok", func(t *testing.T) {
- repo := createRepo(t, praefectStorage, "path/to/test/repo")
+ repo := createRepo(t, ctx, repoClient, praefectStorage, "path/to/test/repo", db.DB)
cmd := &removeRepository{
logger: testhelper.NewTestLogger(t),
virtualStorage: repo.StorageName,
relativePath: repo.RelativePath,
+ timeout: defaultDialTimeout,
}
require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
@@ -165,7 +161,7 @@ func TestRemoveRepository_Exec(t *testing.T) {
})
t.Run("no info about repository on praefect", func(t *testing.T) {
- repo := createRepo(t, praefectStorage, "path/to/test/repo")
+ repo := createRepo(t, ctx, repoClient, praefectStorage, "path/to/test/repo", db.DB)
repoStore := datastore.NewPostgresRepositoryStore(db.DB, nil)
require.NoError(t, repoStore.DeleteRepository(
ctx, repo.StorageName, repo.RelativePath, []string{g1Cfg.Storages[0].Name, g2Cfg.Storages[0].Name},
@@ -177,6 +173,7 @@ func TestRemoveRepository_Exec(t *testing.T) {
logger: logrus.NewEntry(logger),
virtualStorage: praefectStorage,
relativePath: repo.RelativePath,
+ timeout: defaultDialTimeout,
}
require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
var found bool
@@ -194,7 +191,7 @@ func TestRemoveRepository_Exec(t *testing.T) {
})
t.Run("one of gitalies is out of service", func(t *testing.T) {
- repo := createRepo(t, praefectStorage, "path/to/test/repo")
+ repo := createRepo(t, ctx, repoClient, praefectStorage, "path/to/test/repo", db.DB)
g2Srv.Shutdown()
logger := testhelper.NewTestLogger(t)
@@ -203,6 +200,7 @@ func TestRemoveRepository_Exec(t *testing.T) {
logger: logrus.NewEntry(logger),
virtualStorage: praefectStorage,
relativePath: repo.RelativePath,
+ timeout: time.Second,
}
for {
@@ -231,7 +229,6 @@ func TestRemoveRepository_Exec(t *testing.T) {
})
bootstrapper.Terminate()
- <-stopped
}
func requireNoDatabaseInfo(t *testing.T, db glsql.DB, cmd *removeRepository) {
diff --git a/cmd/praefect/subcmd_set_replication_factor.go b/cmd/praefect/subcmd_set_replication_factor.go
index ff2cc4562..60c29203e 100644
--- a/cmd/praefect/subcmd_set_replication_factor.go
+++ b/cmd/praefect/subcmd_set_replication_factor.go
@@ -48,7 +48,7 @@ func (cmd *setReplicationFactorSubcommand) Exec(flags *flag.FlagSet, cfg config.
return err
}
- conn, err := subCmdDial(nodeAddr, cfg.Auth.Token)
+ conn, err := subCmdDial(context.Background(), nodeAddr, cfg.Auth.Token, defaultDialTimeout)
if err != nil {
return fmt.Errorf("error dialing: %w", err)
}
diff --git a/cmd/praefect/subcmd_track_repository.go b/cmd/praefect/subcmd_track_repository.go
new file mode 100644
index 000000000..1f27b7646
--- /dev/null
+++ b/cmd/praefect/subcmd_track_repository.go
@@ -0,0 +1,236 @@
+package main
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+ "flag"
+ "fmt"
+ "math/rand"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/labkit/correlation"
+ "google.golang.org/grpc/metadata"
+)
+
+const (
+ trackRepositoryCmdName = "track-repository"
+)
+
+type trackRepository struct {
+ logger logrus.FieldLogger
+ virtualStorage string
+ relativePath string
+ authoritativeStorage string
+}
+
+var errAuthoritativeRepositoryNotExist = errors.New("authoritative repository does not exist")
+
+func newTrackRepository(logger logrus.FieldLogger) *trackRepository {
+ return &trackRepository{logger: logger}
+}
+
+func (cmd *trackRepository) FlagSet() *flag.FlagSet {
+ fs := flag.NewFlagSet(trackRepositoryCmdName, flag.ExitOnError)
+ fs.StringVar(&cmd.virtualStorage, paramVirtualStorage, "", "name of the repository's virtual storage")
+ fs.StringVar(&cmd.relativePath, paramRelativePath, "", "relative path to the repository")
+ fs.StringVar(&cmd.authoritativeStorage, paramAuthoritativeStorage, "", "storage with the repository to consider as authoritative")
+ fs.Usage = func() {
+ _, _ = printfErr("Description:\n" +
+ " This command adds a given repository to be tracked by Praefect.\n" +
+ " It checks if the repository exists on disk on the authoritative storage, " +
+ " and whether database records are absent from tracking the repository.")
+ fs.PrintDefaults()
+ }
+ return fs
+}
+
+func (cmd trackRepository) Exec(flags *flag.FlagSet, cfg config.Config) error {
+ switch {
+ case flags.NArg() > 0:
+ return unexpectedPositionalArgsError{Command: flags.Name()}
+ case cmd.virtualStorage == "":
+ return requiredParameterError(paramVirtualStorage)
+ case cmd.relativePath == "":
+ return requiredParameterError(paramRelativePath)
+ case cmd.authoritativeStorage == "":
+ if cfg.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
+ return requiredParameterError(paramAuthoritativeStorage)
+ }
+ }
+
+ db, err := glsql.OpenDB(cfg.DB)
+ if err != nil {
+ return fmt.Errorf("connect to database: %w", err)
+ }
+ defer func() { _ = db.Close() }()
+
+ ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID())
+ logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx))
+
+ return cmd.exec(ctx, logger, db, cfg)
+}
+
+const trackRepoErrorPrefix = "attempting to track repository in praefect database"
+
+func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, db *sql.DB, cfg config.Config) error {
+ logger.WithFields(logrus.Fields{
+ "virtual_storage": cmd.virtualStorage,
+ "relative_path": cmd.relativePath,
+ "authoritative_storage": cmd.authoritativeStorage,
+ }).Debug("track repository")
+
+ var primary string
+ var secondaries []string
+ var variableReplicationFactorEnabled, savePrimary bool
+ if cfg.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
+ savePrimary = true
+ primary = cmd.authoritativeStorage
+
+ for _, vs := range cfg.VirtualStorages {
+ if vs.Name == cmd.virtualStorage {
+ for _, node := range vs.Nodes {
+ if node.Storage == cmd.authoritativeStorage {
+ continue
+ }
+ secondaries = append(secondaries, node.Storage)
+ }
+ }
+ }
+
+ r := rand.New(rand.NewSource(time.Now().UnixNano()))
+ replicationFactor := cfg.DefaultReplicationFactors()[cmd.virtualStorage]
+
+ if replicationFactor > 0 {
+ variableReplicationFactorEnabled = true
+ // Select random secondaries according to the default replication factor.
+ r.Shuffle(len(secondaries), func(i, j int) {
+ secondaries[i], secondaries[j] = secondaries[j], secondaries[i]
+ })
+
+ secondaries = secondaries[:replicationFactor-1]
+ }
+ } else {
+ savePrimary = false
+ if err := db.QueryRowContext(ctx, `SELECT node_name FROM shard_primaries WHERE shard_name = $1 AND demoted = 'false'`, cmd.virtualStorage).Scan(&primary); err != nil {
+ if errors.Is(err, sql.ErrNoRows) {
+ return fmt.Errorf("%s: no primaries found", trackRepoErrorPrefix)
+ }
+ return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
+ }
+ }
+
+ authoritativeRepoExists, err := cmd.authoritativeRepositoryExists(ctx, cfg, primary)
+ if err != nil {
+ return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
+ }
+
+ if !authoritativeRepoExists {
+ return fmt.Errorf("%s: %w", trackRepoErrorPrefix, errAuthoritativeRepositoryNotExist)
+ }
+
+ if err := cmd.trackRepository(
+ ctx,
+ datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()),
+ primary,
+ secondaries,
+ savePrimary,
+ variableReplicationFactorEnabled,
+ ); err != nil {
+ return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
+ }
+
+ logger.Debug("finished adding new repository to be tracked in praefect database.")
+
+ return nil
+}
+
+func (cmd *trackRepository) trackRepository(
+ ctx context.Context,
+ ds *datastore.PostgresRepositoryStore,
+ primary string,
+ secondaries []string,
+ savePrimary bool,
+ variableReplicationFactorEnabled bool,
+) error {
+ repositoryID, err := ds.ReserveRepositoryID(ctx, cmd.virtualStorage, cmd.relativePath)
+ if err != nil {
+ if errors.Is(err, commonerr.ErrRepositoryAlreadyExists) {
+ cmd.logger.Print("repository is already tracked in praefect database")
+ return nil
+ }
+
+ return fmt.Errorf("ReserveRepositoryID: %w", err)
+ }
+
+ if err := ds.CreateRepository(
+ ctx,
+ repositoryID,
+ cmd.virtualStorage,
+ cmd.relativePath,
+ primary,
+ nil,
+ secondaries,
+ savePrimary,
+ variableReplicationFactorEnabled,
+ ); err != nil {
+ var repoExistsError datastore.RepositoryExistsError
+ if errors.As(err, &repoExistsError) {
+ cmd.logger.Print("repository is already tracked in praefect database")
+ return nil
+ }
+
+ return fmt.Errorf("CreateRepository: %w", err)
+ }
+
+ return nil
+}
+
+func (cmd *trackRepository) repositoryExists(ctx context.Context, repo *gitalypb.Repository, addr, token string) (bool, error) {
+ conn, err := subCmdDial(ctx, addr, token, defaultDialTimeout)
+ if err != nil {
+ return false, fmt.Errorf("error dialing: %w", err)
+ }
+ defer func() { _ = conn.Close() }()
+
+ ctx = metadata.AppendToOutgoingContext(ctx, "client_name", trackRepositoryCmdName)
+ repositoryClient := gitalypb.NewRepositoryServiceClient(conn)
+ res, err := repositoryClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{Repository: repo})
+ if err != nil {
+ return false, err
+ }
+
+ return res.GetExists(), nil
+}
+
+func (cmd *trackRepository) authoritativeRepositoryExists(ctx context.Context, cfg config.Config, nodeName string) (bool, error) {
+ for _, vs := range cfg.VirtualStorages {
+ if vs.Name != cmd.virtualStorage {
+ continue
+ }
+
+ for _, node := range vs.Nodes {
+ if node.Storage == nodeName {
+ logger.Debugf("check if repository %q exists on gitaly %q at %q", cmd.relativePath, node.Storage, node.Address)
+ repo := &gitalypb.Repository{
+ StorageName: node.Storage,
+ RelativePath: cmd.relativePath,
+ }
+ exists, err := cmd.repositoryExists(ctx, repo, node.Address, node.Token)
+ if err != nil {
+ logger.WithError(err).Warnf("checking if repository exists %q, %q", node.Storage, cmd.relativePath)
+ return false, nil
+ }
+ return exists, nil
+ }
+ }
+ return false, fmt.Errorf("node %q not found", cmd.authoritativeStorage)
+ }
+ return false, fmt.Errorf("virtual storage %q not found", cmd.virtualStorage)
+}
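
Under per-repository elections, trackRepository.exec shuffles the candidate secondaries and keeps replicationFactor-1 of them, with the authoritative storage providing the remaining copy. A self-contained sketch of that selection step; the node names and replication factor are illustrative:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	secondaries := []string{"gitaly-2", "gitaly-3", "gitaly-4"}
	replicationFactor := 2

	// Same shuffle-then-truncate approach as in trackRepository.exec.
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	r.Shuffle(len(secondaries), func(i, j int) {
		secondaries[i], secondaries[j] = secondaries[j], secondaries[i]
	})
	secondaries = secondaries[:replicationFactor-1]

	fmt.Println("randomly assigned secondaries:", secondaries)
}
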
diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go
new file mode 100644
index 000000000..619c57778
--- /dev/null
+++ b/cmd/praefect/subcmd_track_repository_test.go
@@ -0,0 +1,231 @@
+package main
+
+import (
+ "errors"
+ "flag"
+ "fmt"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/client"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/promtest"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+)
+
+func TestTrackRepository_FlagSet(t *testing.T) {
+ t.Parallel()
+ cmd := &trackRepository{}
+ fs := cmd.FlagSet()
+ require.NoError(t, fs.Parse([]string{"--virtual-storage", "vs", "--repository", "repo", "--authoritative-storage", "storage-0"}))
+ require.Equal(t, "vs", cmd.virtualStorage)
+ require.Equal(t, "repo", cmd.relativePath)
+ require.Equal(t, "storage-0", cmd.authoritativeStorage)
+}
+
+func TestTrackRepository_Exec_invalidArgs(t *testing.T) {
+ t.Parallel()
+ t.Run("not all flag values processed", func(t *testing.T) {
+ cmd := trackRepository{}
+ flagSet := flag.NewFlagSet("cmd", flag.PanicOnError)
+ require.NoError(t, flagSet.Parse([]string{"stub"}))
+ err := cmd.Exec(flagSet, config.Config{})
+ require.EqualError(t, err, "cmd doesn't accept positional arguments")
+ })
+
+ t.Run("virtual-storage is not set", func(t *testing.T) {
+ cmd := trackRepository{}
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{})
+ require.EqualError(t, err, `"virtual-storage" is a required parameter`)
+ })
+
+ t.Run("repository is not set", func(t *testing.T) {
+ cmd := trackRepository{virtualStorage: "stub"}
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{})
+ require.EqualError(t, err, `"repository" is a required parameter`)
+ })
+
+ t.Run("authoritative-storage is not set", func(t *testing.T) {
+ cmd := trackRepository{virtualStorage: "stub", relativePath: "path/to/repo"}
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}})
+ require.EqualError(t, err, `"authoritative-storage" is a required parameter`)
+ })
+
+ t.Run("db connection error", func(t *testing.T) {
+ cmd := trackRepository{virtualStorage: "stub", relativePath: "stub", authoritativeStorage: "storage-0"}
+ cfg := config.Config{DB: config.DB{Host: "stub", SSLMode: "disable"}}
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), cfg)
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "connect to database: dial tcp: lookup stub")
+ })
+}
+
+func TestTrackRepository_Exec(t *testing.T) {
+ t.Parallel()
+ g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1"))
+ g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2"))
+
+ g1Srv := testserver.StartGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+ g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+ defer g2Srv.Shutdown()
+ defer g1Srv.Shutdown()
+
+ g1Addr := g1Srv.Address()
+
+ db := glsql.NewDB(t)
+ dbConf := glsql.GetDBConfig(t, db.Name)
+
+ virtualStorageName := "praefect"
+ conf := config.Config{
+ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: virtualStorageName,
+ Nodes: []*config.Node{
+ {Storage: g1Cfg.Storages[0].Name, Address: g1Addr},
+ {Storage: g2Cfg.Storages[0].Name, Address: g2Srv.Address()},
+ },
+ DefaultReplicationFactor: 2,
+ },
+ },
+ DB: dbConf,
+ }
+
+ gitalyCC, err := client.Dial(g1Addr, nil)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, gitalyCC.Close()) }()
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ gitaly1RepositoryClient := gitalypb.NewRepositoryServiceClient(gitalyCC)
+
+ createRepoThroughGitaly1 := func(relativePath string) error {
+ _, err := gitaly1RepositoryClient.CreateRepository(
+ ctx,
+ &gitalypb.CreateRepositoryRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: g1Cfg.Storages[0].Name,
+ RelativePath: relativePath,
+ },
+ })
+ return err
+ }
+
+ testCases := map[string]struct {
+ failoverConfig config.Failover
+ authoritativeStorage string
+ }{
+ "sql election": {
+ failoverConfig: config.Failover{
+ Enabled: true,
+ ElectionStrategy: config.ElectionStrategySQL,
+ },
+ authoritativeStorage: "",
+ },
+ "per repository election": {
+ failoverConfig: config.Failover{
+ Enabled: true,
+ ElectionStrategy: config.ElectionStrategyPerRepository,
+ },
+ authoritativeStorage: g1Cfg.Storages[0].Name,
+ },
+ }
+
+ logger := testhelper.NewTestLogger(t)
+ for tn, tc := range testCases {
+ t.Run(tn, func(t *testing.T) {
+ addCmdConf := conf
+ addCmdConf.Failover = tc.failoverConfig
+
+ t.Run("ok", func(t *testing.T) {
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), addCmdConf, db.DB, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
+ require.NoError(t, err)
+ nodeMgr.Start(0, time.Hour)
+
+ relativePath := fmt.Sprintf("path/to/test/repo_%s", tn)
+ repoDS := datastore.NewPostgresRepositoryStore(db.DB, conf.StorageNames())
+
+ require.NoError(t, createRepoThroughGitaly1(relativePath))
+
+ rmRepoCmd := &removeRepository{
+ logger: logger,
+ virtualStorage: virtualStorageName,
+ relativePath: relativePath,
+ }
+
+ require.NoError(t, rmRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+
+ // create the repo on Gitaly without Praefect knowing
+ require.NoError(t, createRepoThroughGitaly1(relativePath))
+ require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath))
+ require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath))
+
+ addRepoCmd := &trackRepository{
+ logger: logger,
+ virtualStorage: virtualStorageName,
+ relativePath: relativePath,
+ authoritativeStorage: tc.authoritativeStorage,
+ }
+
+ require.NoError(t, addRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf))
+ as := datastore.NewAssignmentStore(db.DB, conf.StorageNames())
+
+ assignments, err := as.GetHostAssignments(ctx, virtualStorageName, relativePath)
+ require.NoError(t, err)
+ require.Len(t, assignments, 2)
+ assert.Contains(t, assignments, g1Cfg.Storages[0].Name)
+ assert.Contains(t, assignments, g2Cfg.Storages[0].Name)
+
+ exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, relativePath)
+ require.NoError(t, err)
+ assert.True(t, exists)
+ })
+
+ t.Run("repository does not exist", func(t *testing.T) {
+ relativePath := fmt.Sprintf("path/to/test/repo_1_%s", tn)
+
+ cmd := &trackRepository{
+ logger: testhelper.NewTestLogger(t),
+ virtualStorage: "praefect",
+ relativePath: relativePath,
+ authoritativeStorage: tc.authoritativeStorage,
+ }
+
+ assert.True(t, errors.Is(cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf), errAuthoritativeRepositoryNotExist))
+ })
+
+ t.Run("records already exist", func(t *testing.T) {
+ relativePath := fmt.Sprintf("path/to/test/repo_2_%s", tn)
+
+ require.NoError(t, createRepoThroughGitaly1(relativePath))
+ require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath))
+ require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath))
+
+ ds := datastore.NewPostgresRepositoryStore(db.DB, conf.StorageNames())
+ id, err := ds.ReserveRepositoryID(ctx, virtualStorageName, relativePath)
+ require.NoError(t, err)
+ require.NoError(t, ds.CreateRepository(ctx, id, virtualStorageName, relativePath, g1Cfg.Storages[0].Name, nil, nil, true, true))
+
+ cmd := &trackRepository{
+ logger: testhelper.NewTestLogger(t),
+ virtualStorage: virtualStorageName,
+ relativePath: relativePath,
+ authoritativeStorage: tc.authoritativeStorage,
+ }
+
+ assert.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf))
+ })
+ })
+ }
+}
diff --git a/internal/praefect/datastore/storage_cleanup.go b/internal/praefect/datastore/storage_cleanup.go
new file mode 100644
index 000000000..455ddef2b
--- /dev/null
+++ b/internal/praefect/datastore/storage_cleanup.go
@@ -0,0 +1,85 @@
+package datastore
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+
+ "github.com/lib/pq"
+)
+
+// RepositoryClusterPath identifies location of the repository in the cluster.
+type RepositoryClusterPath struct {
+ ClusterPath
+ // RelativePath relative path to the repository on the disk.
+ RelativePath string
+}
+
+// NewRepositoryClusterPath initializes and returns RepositoryClusterPath.
+func NewRepositoryClusterPath(virtualStorage, storage, relativePath string) RepositoryClusterPath {
+ return RepositoryClusterPath{
+ ClusterPath: ClusterPath{
+ VirtualStorage: virtualStorage,
+ Storage: storage,
+ },
+ RelativePath: relativePath,
+ }
+}
+
+// ClusterPath represents path on the cluster to the storage.
+type ClusterPath struct {
+ // VirtualStorage is the name of the virtual storage.
+ VirtualStorage string
+ // Storage is the name of the gitaly storage.
+ Storage string
+}
+
+// NewStorageCleanup initialises and returns a new instance of the StorageCleanup.
+func NewStorageCleanup(db *sql.DB) *StorageCleanup {
+ return &StorageCleanup{db: db}
+}
+
+// StorageCleanup provides methods on the database for the repository cleanup operation.
+type StorageCleanup struct {
+ db *sql.DB
+}
+
+// DoesntExist returns RepositoryClusterPath for each repository that doesn't exist in the database
+// by querying repositories and storage_repositories tables.
+func (ss *StorageCleanup) DoesntExist(ctx context.Context, virtualStorage, storage string, relativePath []string) ([]RepositoryClusterPath, error) {
+ if len(relativePath) == 0 {
+ return nil, nil
+ }
+
+ rows, err := ss.db.QueryContext(
+ ctx,
+ `SELECT $1 AS virtual_storage, $2 AS storage, UNNEST($3::TEXT[]) AS relative_path
+ EXCEPT (
+ SELECT virtual_storage, storage, relative_path
+ FROM repositories
+ JOIN storage_repositories USING (virtual_storage, relative_path)
+ WHERE virtual_storage = $1 AND storage = $2 AND relative_path = ANY($3)
+ )`,
+ virtualStorage, storage, pq.StringArray(relativePath),
+ )
+ if err != nil {
+ return nil, fmt.Errorf("query: %w", err)
+ }
+ defer func() { _ = rows.Close() }()
+
+ var res []RepositoryClusterPath
+ for rows.Next() {
+ var curr RepositoryClusterPath
+ if err := rows.Scan(&curr.VirtualStorage, &curr.Storage, &curr.RelativePath); err != nil {
+ return nil, fmt.Errorf("scan: %w", err)
+ }
+ res = append(res, curr)
+ }
+ if err := rows.Err(); err != nil {
+ return nil, fmt.Errorf("loop: %w", err)
+ }
+ if err := rows.Close(); err != nil {
+ return nil, fmt.Errorf("close: %w", err)
+ }
+ return res, nil
+}
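
DoesntExist pushes the set difference into Postgres: the UNNEST-ed candidate paths EXCEPT everything already present in repositories/storage_repositories. The same semantics expressed in plain Go as a reference for what the query returns; doesntExist is a hypothetical in-memory helper, not code from this change:

package main

import "fmt"

// doesntExist returns every candidate relative path that the database does not
// track, which is what the EXCEPT query above computes server-side.
func doesntExist(candidates []string, tracked map[string]bool) []string {
	var missing []string
	for _, p := range candidates {
		if !tracked[p] {
			missing = append(missing, p)
		}
	}
	return missing
}

func main() {
	onDisk := []string{"repo-1.git", "repo-2.git", "repo-3.git"}
	inDatabase := map[string]bool{"repo-2.git": true}
	fmt.Println(doesntExist(onDisk, inDatabase)) // [repo-1.git repo-3.git]
}
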
diff --git a/internal/praefect/repocleaner/init_test.go b/internal/praefect/repocleaner/init_test.go
new file mode 100644
index 000000000..0b99e6d32
--- /dev/null
+++ b/internal/praefect/repocleaner/init_test.go
@@ -0,0 +1,21 @@
+package repocleaner
+
+import (
+ "os"
+ "testing"
+
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+)
+
+func TestMain(m *testing.M) {
+ os.Exit(testMain(m))
+}
+
+func testMain(m *testing.M) int {
+ defer testhelper.MustHaveNoChildProcess()
+
+ cleanup := testhelper.Configure()
+ defer cleanup()
+
+ return m.Run()
+}
diff --git a/internal/praefect/repocleaner/repository.go b/internal/praefect/repocleaner/repository.go
new file mode 100644
index 000000000..86ea69bd1
--- /dev/null
+++ b/internal/praefect/repocleaner/repository.go
@@ -0,0 +1,77 @@
+package repocleaner
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+)
+
+// Walker allows iterating over the repositories of the gitaly storage.
+type Walker struct {
+ conns praefect.Connections
+ batchSize int
+}
+
+// NewWalker returns a new *Walker instance.
+func NewWalker(conns praefect.Connections, batchSize int) *Walker {
+ return &Walker{conns: conns, batchSize: batchSize}
+}
+
+// ExecOnRepositories runs through all the repositories on a Gitaly storage and executes the provided action.
+// The processing is done in batches to reduce cost of operations.
+func (wr *Walker) ExecOnRepositories(ctx context.Context, virtualStorage, storage string, action func([]datastore.RepositoryClusterPath) error) error {
+ gclient, err := wr.getInternalGitalyClient(virtualStorage, storage)
+ if err != nil {
+ return fmt.Errorf("setup gitaly client: %w", err)
+ }
+
+ resp, err := gclient.WalkRepos(ctx, &gitalypb.WalkReposRequest{StorageName: storage})
+ if err != nil {
+ return fmt.Errorf("unable to walk repos: %w", err)
+ }
+
+ batch := make([]datastore.RepositoryClusterPath, 0, wr.batchSize)
+ for {
+ res, err := resp.Recv()
+ if err != nil {
+ if !errors.Is(err, io.EOF) {
+ return fmt.Errorf("failure on walking repos: %w", err)
+ }
+ break
+ }
+
+ batch = append(batch, datastore.RepositoryClusterPath{
+ ClusterPath: datastore.ClusterPath{
+ VirtualStorage: virtualStorage,
+ Storage: storage,
+ },
+ RelativePath: res.RelativePath,
+ })
+
+ if len(batch) == cap(batch) {
+ if err := action(batch); err != nil {
+ return err
+ }
+ batch = batch[:0]
+ }
+ }
+ if len(batch) > 0 {
+ if err := action(batch); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (wr *Walker) getInternalGitalyClient(virtualStorage, storage string) (gitalypb.InternalGitalyClient, error) {
+ conn, found := wr.conns[virtualStorage][storage]
+ if !found {
+ return nil, fmt.Errorf("no connection to the gitaly node %q/%q", virtualStorage, storage)
+ }
+ return gitalypb.NewInternalGitalyClient(conn), nil
+}
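
ExecOnRepositories streams WalkRepos responses and hands them to the action callback in fixed-size batches, flushing whatever remains once the stream ends. The batching itself, extracted into a runnable sketch; execInBatches is a hypothetical helper using string items in place of RepositoryClusterPath:

package main

import "fmt"

// execInBatches mirrors the loop in Walker.ExecOnRepositories: fill a batch up
// to its capacity, run the action, reset, and flush the remainder at the end.
func execInBatches(items []string, batchSize int, action func([]string) error) error {
	batch := make([]string, 0, batchSize)
	for _, item := range items {
		batch = append(batch, item)
		if len(batch) == cap(batch) {
			if err := action(batch); err != nil {
				return err
			}
			batch = batch[:0]
		}
	}
	if len(batch) > 0 {
		return action(batch)
	}
	return nil
}

func main() {
	repos := []string{"repo-1.git", "repo-2.git", "repo-3.git"}
	_ = execInBatches(repos, 2, func(batch []string) error {
		fmt.Println("batch:", batch)
		return nil
	})
}
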
diff --git a/internal/praefect/repocleaner/repository_test.go b/internal/praefect/repocleaner/repository_test.go
new file mode 100644
index 000000000..6d8b7bd19
--- /dev/null
+++ b/internal/praefect/repocleaner/repository_test.go
@@ -0,0 +1,114 @@
+package repocleaner
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/backchannel"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/transaction"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
+)
+
+func TestWalker_ExecOnRepositories(t *testing.T) {
+ const (
+ repo1RelPath = "repo-1.git"
+ repo2RelPath = "repo-2.git"
+ repo3RelPath = "repo-3.git"
+
+ storage1 = "gitaly-1"
+
+ virtualStorage = "praefect"
+ )
+
+ gCfg := testcfg.Build(t, testcfg.WithStorages(storage1))
+ gAddr := testserver.RunGitalyServer(t, gCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+
+ conf := config.Config{
+ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: virtualStorage,
+ Nodes: []*config.Node{
+ {Storage: gCfg.Storages[0].Name, Address: gAddr},
+ },
+ },
+ },
+ }
+
+ gittest.CloneRepo(t, gCfg, gCfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo1RelPath})
+ gittest.CloneRepo(t, gCfg, gCfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo2RelPath})
+ gittest.CloneRepo(t, gCfg, gCfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo3RelPath})
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ entry := testhelper.NewTestLogger(t).WithContext(ctx)
+ clientHandshaker := backchannel.NewClientHandshaker(entry, praefect.NewBackchannelServerFactory(entry, transaction.NewServer(nil)))
+ nodeSet, err := praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, clientHandshaker)
+ require.NoError(t, err)
+ defer nodeSet.Close()
+
+ for _, tc := range []struct {
+ desc string
+ batchSize int
+ exp [][]datastore.RepositoryClusterPath
+ expErr error
+ }{
+ {
+ desc: "multiple batches",
+ batchSize: 2,
+ exp: [][]datastore.RepositoryClusterPath{
+ {
+ {ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo1RelPath},
+ {ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo2RelPath},
+ },
+ {
+ {ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo3RelPath},
+ },
+ },
+ },
+ {
+ desc: "single batch",
+ batchSize: 10,
+ exp: [][]datastore.RepositoryClusterPath{
+ {
+ {ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo1RelPath},
+ {ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo2RelPath},
+ {ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo3RelPath},
+ },
+ },
+ },
+ {
+ desc: "terminates on error",
+ batchSize: 1,
+ exp: [][]datastore.RepositoryClusterPath{
+ {
+ {ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo1RelPath},
+ },
+ },
+ expErr: assert.AnError,
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ walker := NewWalker(nodeSet.Connections(), tc.batchSize)
+ var iteration int
+ err = walker.ExecOnRepositories(ctx, conf.VirtualStorages[0].Name, storage1, func(paths []datastore.RepositoryClusterPath) error {
+ require.Less(t, iteration, len(tc.exp))
+ expected := tc.exp[iteration]
+ iteration++
+ assert.ElementsMatch(t, paths, expected)
+ return tc.expErr
+ })
+ require.Equal(t, tc.expErr, err)
+ })
+ }
+}