author     John Cai <jcai@gitlab.com>    2021-11-01 18:14:47 +0300
committer  John Cai <jcai@gitlab.com>    2021-11-01 18:14:47 +0300
commit     0595ee8948842829e01fe10d6b7a8d27fc66dec4 (patch)
tree       fab37c034c4cc6f036e2123d78d78633f967b3d4
parent     feb673d3328239cf669770406cfd0db188d347f9 (diff)
parent     b5724b09d133865ab808348fba7cd23c9e0a5ec4 (diff)
Merge branch 'ps-praefect-cmd-port-14-0' into '14-0-stable'
Backport praefect sub-commands to 14.0
See merge request gitlab-org/gitaly!4018
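For orientation: the three backported commands (remove-repository, track-repository, list-untracked-repositories) all follow praefect's existing sub-command pattern visible in the cmd/praefect/subcmd.go hunks below: implement the subcmd interface and register the instance in the subcommands map. Here is a minimal sketch of that pattern with a hypothetical "example" command; the Exec signature, unexpectedPositionalArgsError, and the config import path are taken from the diff, while the command itself is purely illustrative and assumes it lives in package main of cmd/praefect.

```go
package main

import (
	"flag"

	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
)

// exampleSubcommand is a hypothetical sub-command demonstrating the
// pattern used by the backported commands: declare flags in FlagSet()
// and do the actual work in Exec().
type exampleSubcommand struct {
	virtualStorage string
}

func (cmd *exampleSubcommand) FlagSet() *flag.FlagSet {
	fs := flag.NewFlagSet("example", flag.ExitOnError)
	fs.StringVar(&cmd.virtualStorage, "virtual-storage", "", "name of the virtual storage")
	return fs
}

func (cmd *exampleSubcommand) Exec(flags *flag.FlagSet, cfg config.Config) error {
	// The real commands reject positional arguments the same way.
	if flags.NArg() > 0 {
		return unexpectedPositionalArgsError{Command: flags.Name()}
	}
	// A real command would typically dial praefect here via
	// subCmdDial(ctx, addr, token, defaultDialTimeout) and issue RPCs.
	return nil
}
```

Registration is then a single entry in the subcommands map, alongside the three entries this merge adds.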
22 files changed, 1848 insertions, 67 deletions
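Beyond the new commands, the refactoring below serves testability: run() in cmd/praefect/main.go now receives a bootstrap.Listener and a prometheus.Registerer instead of creating them internally, and Bootstrap.StopAction is replaced by a stopAction argument to Wait(). A rough sketch of how the new tests drive a full praefect through the Noop bootstrapper, assuming it sits in package main of cmd/praefect; run, getStarterConfigs, bootstrap.NewNoop, and prometheus.NewRegistry all appear in the diff, while the helper itself is hypothetical.

```go
package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
)

// runPraefectForTest starts praefect without signal handling or graceful
// upgrades and returns a function that shuts it down again.
func runPraefectForTest(conf config.Config) (terminate func(), err error) {
	starterConfigs, err := getStarterConfigs(conf)
	if err != nil {
		return nil, err
	}

	b := bootstrap.NewNoop()
	stopped := make(chan struct{})
	go func() {
		defer close(stopped)
		// An isolated registry avoids double-registration panics when
		// several praefect instances run in one test binary.
		_ = run(starterConfigs, conf, b, prometheus.NewRegistry())
	}()

	return func() {
		b.Terminate() // unblocks Noop.Wait, which then invokes srvFactory.GracefulStop
		<-stopped
	}, nil
}
```

This mirrors the wiring in the new subcmd_remove_repository_test.go and subcmd_list_untracked_repositories_test.go files below.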
diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go index dd4243003..111ac7d71 100644 --- a/cmd/gitaly/main.go +++ b/cmd/gitaly/main.go @@ -188,8 +188,6 @@ func run(cfg config.Cfg) error { return fmt.Errorf("linguist instance creation: %w", err) } - b.StopAction = gitalyServerFactory.GracefulStop - rubySrv := rubyserver.New(cfg) if err := rubySrv.Start(); err != nil { return fmt.Errorf("initialize gitaly-ruby: %v", err) @@ -291,5 +289,5 @@ func run(cfg config.Cfg) error { } }() - return b.Wait(cfg.GracefulRestartTimeout.Duration()) + return b.Wait(cfg.GracefulRestartTimeout.Duration(), gitalyServerFactory.GracefulStop) } diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index fde28f0ef..4314b7732 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -167,7 +167,12 @@ func main() { logger.Fatalf("%s", err) } - if err := run(starterConfigs, conf); err != nil { + b, err := bootstrap.New() + if err != nil { + logger.Fatalf("unable to create a bootstrap: %v", err) + } + + if err := run(starterConfigs, conf, b, prometheus.DefaultRegisterer); err != nil { logger.Fatalf("%v", err) } } @@ -210,18 +215,18 @@ func configure(conf config.Config) { sentry.ConfigureSentry(version.GetVersion(), conf.Sentry) } -func run(cfgs []starter.Config, conf config.Config) error { - nodeLatencyHistogram, err := metrics.RegisterNodeLatency(conf.Prometheus) +func run(cfgs []starter.Config, conf config.Config, b bootstrap.Listener, promreg prometheus.Registerer) error { + nodeLatencyHistogram, err := metrics.RegisterNodeLatency(conf.Prometheus, promreg) if err != nil { return err } - delayMetric, err := metrics.RegisterReplicationDelay(conf.Prometheus) + delayMetric, err := metrics.RegisterReplicationDelay(conf.Prometheus, promreg) if err != nil { return err } - latencyMetric, err := metrics.RegisterReplicationLatency(conf.Prometheus) + latencyMetric, err := metrics.RegisterReplicationLatency(conf.Prometheus, promreg) if err != nil { return err } @@ -408,7 +413,7 @@ func run(cfgs []starter.Config, conf config.Config) error { ) metricsCollectors = append(metricsCollectors, transactionManager, coordinator, repl) if db != nil { - prometheus.MustRegister( + promreg.MustRegister( datastore.NewRepositoryStoreCollector( logger, conf.VirtualStorageNames(), @@ -417,14 +422,8 @@ func run(cfgs []starter.Config, conf config.Config) error { ), ) } - prometheus.MustRegister(metricsCollectors...) - - b, err := bootstrap.New() - if err != nil { - return fmt.Errorf("unable to create a bootstrap: %v", err) - } + promreg.MustRegister(metricsCollectors...) 
- b.StopAction = srvFactory.GracefulStop for _, cfg := range cfgs { srv, err := srvFactory.Create(cfg.IsSecure()) if err != nil { @@ -507,7 +506,7 @@ func run(cfgs []starter.Config, conf config.Config) error { } } - return b.Wait(conf.GracefulStopTimeout.Duration()) + return b.Wait(conf.GracefulStopTimeout.Duration(), srvFactory.GracefulStop) } func getStarterConfigs(conf config.Config) ([]starter.Config, error) { diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go index 70ddff3c1..08a11c21d 100644 --- a/cmd/praefect/subcmd.go +++ b/cmd/praefect/subcmd.go @@ -22,17 +22,22 @@ type subcmd interface { Exec(flags *flag.FlagSet, config config.Config) error } +const defaultDialTimeout = 30 * time.Second + var ( subcommands = map[string]subcmd{ - "sql-ping": &sqlPingSubcommand{}, - "sql-migrate": &sqlMigrateSubcommand{}, - "dial-nodes": &dialNodesSubcommand{}, - "reconcile": &reconcileSubcommand{}, - "sql-migrate-down": &sqlMigrateDownSubcommand{}, - "sql-migrate-status": &sqlMigrateStatusSubcommand{}, - "dataloss": newDatalossSubcommand(), - "accept-dataloss": &acceptDatalossSubcommand{}, - "set-replication-factor": newSetReplicatioFactorSubcommand(os.Stdout), + "sql-ping": &sqlPingSubcommand{}, + "sql-migrate": &sqlMigrateSubcommand{}, + "dial-nodes": &dialNodesSubcommand{}, + "reconcile": &reconcileSubcommand{}, + "sql-migrate-down": &sqlMigrateDownSubcommand{}, + "sql-migrate-status": &sqlMigrateStatusSubcommand{}, + "dataloss": newDatalossSubcommand(), + "accept-dataloss": &acceptDatalossSubcommand{}, + "set-replication-factor": newSetReplicatioFactorSubcommand(os.Stdout), + removeRepositoryCmdName: newRemoveRepository(logger), + trackRepositoryCmdName: newTrackRepository(logger), + listUntrackedRepositoriesName: newListUntrackedRepositories(logger, os.Stdout), } ) @@ -137,8 +142,8 @@ func printfErr(format string, a ...interface{}) (int, error) { return fmt.Fprintf(os.Stderr, format, a...) 
} -func subCmdDial(addr, token string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { - ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second) +func subCmdDial(ctx context.Context, addr, token string, timeout time.Duration, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() opts = append(opts, diff --git a/cmd/praefect/subcmd_accept_dataloss.go b/cmd/praefect/subcmd_accept_dataloss.go index 0c4ea015a..e037df424 100644 --- a/cmd/praefect/subcmd_accept_dataloss.go +++ b/cmd/praefect/subcmd_accept_dataloss.go @@ -51,7 +51,7 @@ func (cmd *acceptDatalossSubcommand) Exec(flags *flag.FlagSet, cfg config.Config return err } - conn, err := subCmdDial(nodeAddr, cfg.Auth.Token) + conn, err := subCmdDial(context.Background(), nodeAddr, cfg.Auth.Token, defaultDialTimeout) if err != nil { return fmt.Errorf("error dialing: %w", err) } diff --git a/cmd/praefect/subcmd_dataloss.go b/cmd/praefect/subcmd_dataloss.go index 037edc886..89a1d595f 100644 --- a/cmd/praefect/subcmd_dataloss.go +++ b/cmd/praefect/subcmd_dataloss.go @@ -67,7 +67,7 @@ func (cmd *datalossSubcommand) Exec(flags *flag.FlagSet, cfg config.Config) erro return err } - conn, err := subCmdDial(nodeAddr, cfg.Auth.Token) + conn, err := subCmdDial(context.Background(), nodeAddr, cfg.Auth.Token, defaultDialTimeout) if err != nil { return fmt.Errorf("error dialing: %v", err) } diff --git a/cmd/praefect/subcmd_list_untracked_repositories.go b/cmd/praefect/subcmd_list_untracked_repositories.go new file mode 100644 index 000000000..bff6140a0 --- /dev/null +++ b/cmd/praefect/subcmd_list_untracked_repositories.go @@ -0,0 +1,172 @@ +package main + +import ( + "context" + "encoding/json" + "errors" + "flag" + "fmt" + "io" + + "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/repocleaner" + "gitlab.com/gitlab-org/labkit/correlation" + "google.golang.org/grpc/metadata" +) + +const ( + listUntrackedRepositoriesName = "list-untracked-repositories" +) + +var errNoConnectionToGitalies = errors.New("no connection established to gitaly nodes") + +type listUntrackedRepositories struct { + logger logrus.FieldLogger + delimiter string + out io.Writer +} + +func newListUntrackedRepositories(logger logrus.FieldLogger, out io.Writer) *listUntrackedRepositories { + return &listUntrackedRepositories{logger: logger, out: out} +} + +func (cmd *listUntrackedRepositories) FlagSet() *flag.FlagSet { + fs := flag.NewFlagSet(listUntrackedRepositoriesName, flag.ExitOnError) + fs.StringVar(&cmd.delimiter, "delimiter", "\n", "string used as a delimiter in output") + fs.Usage = func() { + _, _ = printfErr("Description:\n" + + " This command checks if all repositories on all gitaly nodes tracked by praefect.\n" + + " If repository is found on the disk, but it is not known to praefect the location of\n" + + " that repository will be written into stdout stream in JSON format.\n") + fs.PrintDefaults() + _, _ = printfErr("NOTE:\n" + + " All errors and log messages directed to the stderr stream.\n" + + " The output is produced as the new data appears, it doesn't wait\n" + + " for the completion of the processing to produce the result.\n") + } + return fs +} + +func (cmd listUntrackedRepositories) Exec(flags *flag.FlagSet, 
cfg config.Config) error { + if flags.NArg() > 0 { + return unexpectedPositionalArgsError{Command: flags.Name()} + } + + ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID()) + ctx = metadata.AppendToOutgoingContext(ctx, "client_name", listUntrackedRepositoriesName) + + logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx)) + logger.Debugf("starting %s command", cmd.FlagSet().Name()) + + logger.Debug("dialing to gitaly nodes...") + nodeSet, err := dialGitalyStorages(cfg) + if err != nil { + return fmt.Errorf("dial nodes: %w", err) + } + defer nodeSet.Close() + logger.Debug("connected to gitaly nodes") + + logger.Debug("connecting to praefect database...") + db, err := glsql.OpenDB(cfg.DB) + if err != nil { + return fmt.Errorf("connect to database: %w", err) + } + defer func() { _ = db.Close() }() + logger.Debug("connected to praefect database") + + walker := repocleaner.NewWalker(nodeSet.Connections(), 16) + reporter := reportUntrackedRepositories{ + ctx: ctx, + checker: datastore.NewStorageCleanup(db), + delimiter: cmd.delimiter, + out: cmd.out, + } + for _, vs := range cfg.VirtualStorages { + for _, node := range vs.Nodes { + logger.Debugf("check %q/%q storage repositories", vs.Name, node.Storage) + if err := walker.ExecOnRepositories(ctx, vs.Name, node.Storage, reporter.Report); err != nil { + return fmt.Errorf("exec on %q/%q: %w", vs.Name, node.Storage, err) + } + } + } + logger.Debug("completed") + return nil +} + +func dialGitalyStorages(cfg config.Config) (praefect.NodeSet, error) { + nodeSet := praefect.NodeSet{} + for _, vs := range cfg.VirtualStorages { + for _, node := range vs.Nodes { + conn, err := subCmdDial(context.Background(), node.Address, node.Token, defaultDialTimeout) + if err != nil { + return nil, fmt.Errorf("dial with %q gitaly at %q", node.Storage, node.Address) + } + if _, found := nodeSet[vs.Name]; !found { + nodeSet[vs.Name] = map[string]praefect.Node{} + } + nodeSet[vs.Name][node.Storage] = praefect.Node{ + Storage: node.Storage, + Address: node.Address, + Token: node.Token, + Connection: conn, + } + } + } + if len(nodeSet.Connections()) == 0 { + return nil, errNoConnectionToGitalies + } + return nodeSet, nil +} + +type reportUntrackedRepositories struct { + ctx context.Context + checker *datastore.StorageCleanup + out io.Writer + delimiter string +} + +// Report method accepts a list of repositories, checks if they exist in the praefect database +// and writes JSON serialized location of each untracked repository using the configured delimiter +// and writer. 
+func (r *reportUntrackedRepositories) Report(paths []datastore.RepositoryClusterPath) error { + if len(paths) == 0 { + return nil + } + + replicaRelPaths := make([]string, len(paths)) + for i, path := range paths { + replicaRelPaths[i] = path.RelativePath + } + + missing, err := r.checker.DoesntExist(r.ctx, paths[0].VirtualStorage, paths[0].Storage, replicaRelPaths) + if err != nil { + return fmt.Errorf("existence check: %w", err) + } + + for _, repoClusterPath := range missing { + d, err := serializeRepositoryClusterPath(repoClusterPath) + if err != nil { + return fmt.Errorf("serialize %+v: %w", repoClusterPath, err) + } + if _, err := r.out.Write(d); err != nil { + return fmt.Errorf("write serialized data to output: %w", err) + } + if _, err := r.out.Write([]byte(r.delimiter)); err != nil { + return fmt.Errorf("write serialized data to output: %w", err) + } + } + + return nil +} + +func serializeRepositoryClusterPath(path datastore.RepositoryClusterPath) ([]byte, error) { + return json.Marshal(map[string]string{ + "virtual_storage": path.VirtualStorage, + "storage": path.Storage, + "relative_path": path.RelativePath, + }) +} diff --git a/cmd/praefect/subcmd_list_untracked_repositories_test.go b/cmd/praefect/subcmd_list_untracked_repositories_test.go new file mode 100644 index 000000000..95cd2e072 --- /dev/null +++ b/cmd/praefect/subcmd_list_untracked_repositories_test.go @@ -0,0 +1,143 @@ +// +build postgres + +package main + +import ( + "bytes" + "context" + "flag" + "fmt" + "strings" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/client" + "gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" +) + +func TestListUntrackedRepositories_FlagSet(t *testing.T) { + cmd := &listUntrackedRepositories{} + for _, tc := range []struct { + desc string + args []string + exp []interface{} + }{ + { + desc: "custom value", + args: []string{"--delimiter", ","}, + exp: []interface{}{","}, + }, + { + desc: "default value", + args: nil, + exp: []interface{}{"\n"}, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + fs := cmd.FlagSet() + require.NoError(t, fs.Parse(tc.args)) + require.ElementsMatch(t, tc.exp, []interface{}{cmd.delimiter}) + }) + } +} + +func TestListUntrackedRepositories_Exec(t *testing.T) { + g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1")) + g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2")) + + // Repositories not managed by praefect. 
+ repo1, _, _ := gittest.InitBareRepoAt(t, g1Cfg, g1Cfg.Storages[0]) + repo2, _, _ := gittest.InitBareRepoAt(t, g1Cfg, g1Cfg.Storages[0]) + repo3, _, _ := gittest.InitBareRepoAt(t, g2Cfg, g2Cfg.Storages[0]) + + g1Addr := testserver.RunGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) + g2Addr := testserver.RunGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) + + conf := config.Config{ + SocketPath: testhelper.GetTemporaryGitalySocketFileName(t), + VirtualStorages: []*config.VirtualStorage{ + { + Name: "praefect", + Nodes: []*config.Node{ + {Storage: g1Cfg.Storages[0].Name, Address: g1Addr}, + {Storage: g2Cfg.Storages[0].Name, Address: g2Addr}, + }, + }, + }, + DB: glsql.GetDBConfig(t, "cmd_praefect"), + } + + db := getDB(t) + // Mark virtual storages as imported to prevent the background importer job from running. + db.MustExec( + t, + `INSERT INTO virtual_storages(virtual_storage, repositories_imported) VALUES ($1, true)`, + conf.VirtualStorages[0].Name, + ) + + starterConfigs, err := getStarterConfigs(conf) + require.NoError(t, err) + stopped := make(chan struct{}) + bootstrapper := bootstrap.NewNoop() + go func() { + defer close(stopped) + assert.NoError(t, run(starterConfigs, conf, bootstrapper, prometheus.NewRegistry())) + }() + + cc, err := client.Dial("unix://"+conf.SocketPath, nil) + require.NoError(t, err) + defer func() { require.NoError(t, cc.Close()) }() + repoClient := gitalypb.NewRepositoryServiceClient(cc) + + ctx, cancel := testhelper.Context() + defer cancel() + + praefectStorage := conf.VirtualStorages[0].Name + + // Repository managed by praefect, exists on gitaly-1 and gitaly-2. + createRepo(t, ctx, repoClient, praefectStorage, "path/to/test/repo") + out := &bytes.Buffer{} + cmd := newListUntrackedRepositories(testhelper.NewTestLogger(t), out) + require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) + + exp := []string{ + fmt.Sprintf(`{"relative_path":%q,"storage":"gitaly-1","virtual_storage":"praefect"}`, repo1.RelativePath), + fmt.Sprintf(`{"relative_path":%q,"storage":"gitaly-1","virtual_storage":"praefect"}`, repo2.RelativePath), + fmt.Sprintf(`{"relative_path":%q,"storage":"gitaly-2","virtual_storage":"praefect"}`, repo3.RelativePath), + "", // an empty extra element required as each line ends with "delimiter" and strings.Split returns all parts + } + require.ElementsMatch(t, exp, strings.Split(out.String(), "\n")) + + bootstrapper.Terminate() + <-stopped +} + +func createRepo(t *testing.T, ctx context.Context, repoClient gitalypb.RepositoryServiceClient, storageName, relativePath string) *gitalypb.Repository { + t.Helper() + repo := &gitalypb.Repository{ + StorageName: storageName, + RelativePath: relativePath, + } + for i := 0; true; i++ { + _, err := repoClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{Repository: repo}) + if err != nil { + require.Regexp(t, "(no healthy nodes)|(no such file or directory)|(connection refused)", err.Error()) + require.Less(t, i, 100, "praefect doesn't serve for too long") + time.Sleep(50 * time.Millisecond) + } else { + break + } + } + return repo +} diff --git a/cmd/praefect/subcmd_pingnodes.go b/cmd/praefect/subcmd_pingnodes.go index 0b0873484..4bdf0a76e 100644 --- a/cmd/praefect/subcmd_pingnodes.go +++ b/cmd/praefect/subcmd_pingnodes.go @@ -88,7 +88,7 @@ func (s *dialNodesSubcommand) Exec(flags *flag.FlagSet, conf config.Config) erro } func (npr *nodePing) dial() (*grpc.ClientConn, error) { - return subCmdDial(npr.address, 
npr.token) + return subCmdDial(context.Background(), npr.address, npr.token, defaultDialTimeout) } func (npr *nodePing) healthCheck(cc *grpc.ClientConn) (grpc_health_v1.HealthCheckResponse_ServingStatus, error) { diff --git a/cmd/praefect/subcmd_reconcile.go b/cmd/praefect/subcmd_reconcile.go index 47917501f..8e21789b8 100644 --- a/cmd/praefect/subcmd_reconcile.go +++ b/cmd/praefect/subcmd_reconcile.go @@ -61,6 +61,8 @@ func getNodeAddress(cfg config.Config) (string, error) { return "unix:" + cfg.SocketPath, nil case cfg.ListenAddr != "": return "tcp://" + cfg.ListenAddr, nil + case cfg.TLSListenAddr != "": + return "tls://" + cfg.TLSListenAddr, nil default: return "", errors.New("no Praefect address configured") } @@ -76,7 +78,7 @@ func (nr nodeReconciler) reconcile() error { return err } - cc, err := subCmdDial(nodeAddr, nr.conf.Auth.Token) + cc, err := subCmdDial(context.Background(), nodeAddr, nr.conf.Auth.Token, defaultDialTimeout) if err != nil { return err } diff --git a/cmd/praefect/subcmd_remove_repository.go b/cmd/praefect/subcmd_remove_repository.go new file mode 100644 index 000000000..426f51e1a --- /dev/null +++ b/cmd/praefect/subcmd_remove_repository.go @@ -0,0 +1,221 @@ +package main + +import ( + "context" + "database/sql" + "errors" + "flag" + "fmt" + "strings" + "sync" + "time" + + "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "gitlab.com/gitlab-org/labkit/correlation" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" +) + +const ( + removeRepositoryCmdName = "remove-repository" +) + +type removeRepository struct { + logger logrus.FieldLogger + virtualStorage string + relativePath string + timeout time.Duration +} + +func newRemoveRepository(logger logrus.FieldLogger) *removeRepository { + return &removeRepository{logger: logger, timeout: defaultDialTimeout} +} + +func (cmd *removeRepository) FlagSet() *flag.FlagSet { + fs := flag.NewFlagSet(removeRepositoryCmdName, flag.ExitOnError) + fs.StringVar(&cmd.virtualStorage, paramVirtualStorage, "", "name of the repository's virtual storage") + fs.StringVar(&cmd.relativePath, paramRelativePath, "", "relative path to the repository") + fs.Usage = func() { + _, _ = printfErr("Description:\n" + + " This command removes all state associated with a given repository from the Gitaly Cluster.\n" + + " This includes both on-disk repositories on all relevant Gitaly nodes as well as any potential\n" + + " database state as tracked by Praefect.\n") + fs.PrintDefaults() + _, _ = printfErr("NOTE:\n" + + " It may happen that parts of the repository continue to exist after this command, either because\n" + + " of an error that happened during deletion or because of in-flight RPC calls targeting the repository.\n" + + " It is safe and recommended to re-run this command in such a case.\n") + } + return fs +} + +func (cmd removeRepository) Exec(flags *flag.FlagSet, cfg config.Config) error { + switch { + case flags.NArg() > 0: + return unexpectedPositionalArgsError{Command: flags.Name()} + case cmd.virtualStorage == "": + return requiredParameterError(paramVirtualStorage) + case cmd.relativePath == "": + return requiredParameterError(paramRelativePath) + } + + db, err := glsql.OpenDB(cfg.DB) + if err != nil { + return fmt.Errorf("connect to database: %w", err) + } + defer func() { _ = db.Close() }() + + 
ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID()) + logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx)) + + return cmd.exec(ctx, logger, db, cfg) +} + +func (cmd *removeRepository) exec(ctx context.Context, logger logrus.FieldLogger, db *sql.DB, cfg config.Config) error { + // Remove repository explicitly from all storages and clean up database info. + // This prevents creation of the new replication events. + logger.WithFields(logrus.Fields{ + "virtual_storage": cmd.virtualStorage, + "relative_path": cmd.relativePath, + }).Debug("remove repository") + + addr, err := getNodeAddress(cfg) + if err != nil { + return fmt.Errorf("get praefect address from config: %w", err) + } + + logger.Debugf("remove repository info from praefect database %q", addr) + removed, err := cmd.removeRepositoryFromDatabase(ctx, db) + if err != nil { + return fmt.Errorf("remove repository info from praefect database: %w", err) + } + if !removed { + logger.Warn("praefect database has no info about the repository") + } + logger.Debug("removal of the repository info from praefect database completed") + + logger.Debug("remove replication events") + ticker := helper.NewTimerTicker(time.Second) + defer ticker.Stop() + if err := cmd.removeReplicationEvents(ctx, logger, db, ticker); err != nil { + return fmt.Errorf("remove scheduled replication events: %w", err) + } + logger.Debug("replication events removal completed") + // We should try to remove repository from each of gitaly nodes. + logger.Debug("remove repository directly by each gitaly node") + cmd.removeRepositoryForEachGitaly(ctx, cfg, logger) + logger.Debug("direct repository removal by each gitaly node completed") + + return nil +} + +func (cmd *removeRepository) removeRepositoryFromDatabase(ctx context.Context, db *sql.DB) (bool, error) { + var removed bool + if err := db.QueryRowContext( + ctx, + `WITH remove_storages_info AS ( + DELETE FROM storage_repositories + WHERE virtual_storage = $1 AND relative_path = $2 + ) + DELETE FROM repositories + WHERE virtual_storage = $1 AND relative_path = $2 + RETURNING TRUE`, + cmd.virtualStorage, + cmd.relativePath, + ).Scan(&removed); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return false, nil + } + return false, fmt.Errorf("query row: %w", err) + } + return removed, nil +} + +func (cmd *removeRepository) removeRepository(ctx context.Context, repo *gitalypb.Repository, addr, token string) (bool, error) { + conn, err := subCmdDial(ctx, addr, token, cmd.timeout) + if err != nil { + return false, fmt.Errorf("error dialing: %w", err) + } + defer func() { _ = conn.Close() }() + + ctx = metadata.AppendToOutgoingContext(ctx, "client_name", removeRepositoryCmdName) + repositoryClient := gitalypb.NewRepositoryServiceClient(conn) + if _, err := repositoryClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{Repository: repo}); err != nil { + s, ok := status.FromError(err) + if !ok { + return false, fmt.Errorf("RemoveRepository: %w", err) + } + if !strings.Contains(s.Message(), fmt.Sprintf("get primary: repository %q/%q not found", cmd.virtualStorage, cmd.relativePath)) { + return false, fmt.Errorf("RemoveRepository: %w", err) + } + return false, nil + } + return true, nil +} + +func (cmd *removeRepository) removeReplicationEvents(ctx context.Context, logger logrus.FieldLogger, db *sql.DB, ticker helper.Ticker) error { + // Wait for the completion of the repository replication jobs. 
+ // As some of them could be a repository creation jobs we need to remove those newly created + // repositories after replication finished. + start := time.Now() + for found := true; found; { + ticker.Reset() + <-ticker.C() + if int(time.Since(start).Seconds())%5 == 0 { + logger.Debug("awaiting for the repository in_progress replication jobs to complete...") + } + row := db.QueryRowContext( + ctx, + `WITH remove_replication_jobs AS ( + DELETE FROM replication_queue + WHERE job->>'virtual_storage' = $1 + AND job->>'relative_path' = $2 + -- Do not remove ongoing replication events as we need to wait + -- for their completion. + AND state != 'in_progress' + ) + SELECT EXISTS( + SELECT + FROM replication_queue + WHERE job->>'virtual_storage' = $1 + AND job->>'relative_path' = $2 + AND state = 'in_progress')`, + cmd.virtualStorage, + cmd.relativePath, + ) + if err := row.Scan(&found); err != nil { + return fmt.Errorf("scan in progress jobs: %w", err) + } + } + return nil +} + +func (cmd *removeRepository) removeRepositoryForEachGitaly(ctx context.Context, cfg config.Config, logger logrus.FieldLogger) { + for _, vs := range cfg.VirtualStorages { + if vs.Name == cmd.virtualStorage { + var wg sync.WaitGroup + for i := 0; i < len(vs.Nodes); i++ { + wg.Add(1) + go func(node *config.Node) { + defer wg.Done() + logger.Debugf("remove repository with gitaly %q at %q", node.Storage, node.Address) + repo := &gitalypb.Repository{ + StorageName: node.Storage, + RelativePath: cmd.relativePath, + } + _, err := cmd.removeRepository(ctx, repo, node.Address, node.Token) + if err != nil { + logger.WithError(err).Warnf("repository removal failed for gitaly %q", node.Storage) + } + logger.Debugf("repository removal call to gitaly %q completed", node.Storage) + }(vs.Nodes[i]) + } + wg.Wait() + break + } + } +} diff --git a/cmd/praefect/subcmd_remove_repository_test.go b/cmd/praefect/subcmd_remove_repository_test.go new file mode 100644 index 000000000..eb5d0ed2a --- /dev/null +++ b/cmd/praefect/subcmd_remove_repository_test.go @@ -0,0 +1,328 @@ +// +build postgres + +package main + +import ( + "flag" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/client" + "gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" +) + +func TestRemoveRepository_FlagSet(t *testing.T) { + cmd := &removeRepository{} + fs := cmd.FlagSet() + require.NoError(t, fs.Parse([]string{"--virtual-storage", "vs", "--repository", "repo"})) + require.Equal(t, "vs", cmd.virtualStorage) + require.Equal(t, "repo", cmd.relativePath) +} + +func TestRemoveRepository_Exec_invalidArgs(t *testing.T) { + t.Run("not all flag values processed", func(t *testing.T) { + cmd := removeRepository{} + flagSet := flag.NewFlagSet("cmd", flag.PanicOnError) + require.NoError(t, 
flagSet.Parse([]string{"stub"})) + err := cmd.Exec(flagSet, config.Config{}) + require.EqualError(t, err, "cmd doesn't accept positional arguments") + }) + + t.Run("virtual-storage is not set", func(t *testing.T) { + cmd := removeRepository{} + err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{}) + require.EqualError(t, err, `"virtual-storage" is a required parameter`) + }) + + t.Run("repository is not set", func(t *testing.T) { + cmd := removeRepository{virtualStorage: "stub"} + err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{}) + require.EqualError(t, err, `"repository" is a required parameter`) + }) + + t.Run("db connection error", func(t *testing.T) { + cmd := removeRepository{virtualStorage: "stub", relativePath: "stub"} + cfg := config.Config{DB: config.DB{Host: "stub", SSLMode: "disable"}} + err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), cfg) + require.Error(t, err) + require.Contains(t, err.Error(), "connect to database: dial tcp: lookup stub") + }) + + t.Run("praefect address is not set in config ", func(t *testing.T) { + cmd := removeRepository{virtualStorage: "stub", relativePath: "stub", logger: testhelper.NewTestLogger(t)} + dbConf := glsql.GetDBConfig(t, "cmd_praefect") + err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{DB: dbConf}) + require.EqualError(t, err, "get praefect address from config: no Praefect address configured") + }) +} + +func TestRemoveRepository_Exec(t *testing.T) { + g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1")) + g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2")) + + g1Addr := testserver.RunGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) + g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) + + db := getDB(t) + dbConf := glsql.GetDBConfig(t, "cmd_praefect") + + conf := config.Config{ + SocketPath: testhelper.GetTemporaryGitalySocketFileName(t), + VirtualStorages: []*config.VirtualStorage{ + { + Name: "praefect", + Nodes: []*config.Node{ + {Storage: g1Cfg.Storages[0].Name, Address: g1Addr}, + {Storage: g2Cfg.Storages[0].Name, Address: g2Srv.Address()}, + }, + }, + }, + DB: dbConf, + Failover: config.Failover{ + Enabled: true, + ElectionStrategy: config.ElectionStrategyPerRepository, + }, + } + + starterConfigs, err := getStarterConfigs(conf) + require.NoError(t, err) + stopped := make(chan struct{}) + bootstrapper := bootstrap.NewNoop() + go func() { + defer close(stopped) + assert.NoError(t, run(starterConfigs, conf, bootstrapper, prometheus.NewRegistry())) + }() + + cc, err := client.Dial("unix://"+conf.SocketPath, nil) + require.NoError(t, err) + defer func() { require.NoError(t, cc.Close()) }() + repoClient := gitalypb.NewRepositoryServiceClient(cc) + + ctx, cancel := testhelper.Context() + defer cancel() + + praefectStorage := conf.VirtualStorages[0].Name + + t.Run("ok", func(t *testing.T) { + repo := createRepo(t, ctx, repoClient, praefectStorage, t.Name()) + cmd := &removeRepository{ + logger: testhelper.NewTestLogger(t), + virtualStorage: repo.StorageName, + relativePath: repo.RelativePath, + timeout: defaultDialTimeout, + } + require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) + + require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath)) + require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath)) + + var repositoryRowExists bool + require.NoError(t, db.QueryRow( + `SELECT EXISTS(SELECT FROM 
repositories WHERE virtual_storage = $1 AND relative_path = $2)`, + cmd.virtualStorage, cmd.relativePath, + ).Scan(&repositoryRowExists)) + require.False(t, repositoryRowExists) + }) + + t.Run("no info about repository on praefect", func(t *testing.T) { + repo := createRepo(t, ctx, repoClient, praefectStorage, t.Name()) + repoStore := datastore.NewPostgresRepositoryStore(db.DB, nil) + require.NoError(t, repoStore.DeleteRepository( + ctx, repo.StorageName, repo.RelativePath, g1Cfg.Storages[0].Name, + )) + require.NoError(t, repoStore.DeleteRepository( + ctx, repo.StorageName, repo.RelativePath, g2Cfg.Storages[0].Name, + )) + + logger := testhelper.NewTestLogger(t) + loggerHook := test.NewLocal(logger) + cmd := &removeRepository{ + logger: logrus.NewEntry(logger), + virtualStorage: praefectStorage, + relativePath: repo.RelativePath, + timeout: defaultDialTimeout, + } + require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) + var found bool + for _, entry := range loggerHook.AllEntries() { + if strings.Contains(entry.Message, "praefect database has no info about the repository") { + found = true + } + } + require.True(t, found, "no expected message in the log") + + require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath)) + require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath)) + + requireNoDatabaseInfo(t, db, cmd) + }) + + t.Run("one of gitalies is out of service", func(t *testing.T) { + repo := createRepo(t, ctx, repoClient, praefectStorage, t.Name()) + g2Srv.Shutdown() + + logger := testhelper.NewTestLogger(t) + loggerHook := test.NewLocal(logger) + cmd := &removeRepository{ + logger: logrus.NewEntry(logger), + virtualStorage: praefectStorage, + relativePath: repo.RelativePath, + timeout: time.Second, + } + + for { + err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf) + if err == nil { + break + } + regexp := "(transport: Error while dialing dial unix /" + strings.TrimPrefix(g2Srv.Address(), "unix:/") + ")|(primary gitaly is not healthy)" + require.Regexp(t, regexp, err.Error()) + time.Sleep(time.Second) + } + + var found bool + for _, entry := range loggerHook.AllEntries() { + if strings.Contains(entry.Message, `repository removal failed for gitaly "gitaly-2"`) { + found = true + break + } + } + require.True(t, found, "no expected message in the log") + + require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath)) + require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath)) + + requireNoDatabaseInfo(t, db, cmd) + }) + + bootstrapper.Terminate() + <-stopped +} + +func requireNoDatabaseInfo(t *testing.T, db glsql.DB, cmd *removeRepository) { + t.Helper() + var repositoryRowExists bool + require.NoError(t, db.QueryRow( + `SELECT EXISTS(SELECT FROM repositories WHERE virtual_storage = $1 AND relative_path = $2)`, + cmd.virtualStorage, cmd.relativePath, + ).Scan(&repositoryRowExists)) + require.False(t, repositoryRowExists) + var storageRowExists bool + require.NoError(t, db.QueryRow( + `SELECT EXISTS(SELECT FROM storage_repositories WHERE virtual_storage = $1 AND relative_path = $2)`, + cmd.virtualStorage, cmd.relativePath, + ).Scan(&storageRowExists)) + require.False(t, storageRowExists) +} + +func TestRemoveRepository_removeReplicationEvents(t *testing.T) { + const ( + virtualStorage = "praefect" + relativePath = "relative_path/to/repo.git" + ) + + ctx, cancel := testhelper.Context() + defer cancel() + + db := getDB(t) + + queue := 
datastore.NewPostgresReplicationEventQueue(db) + + // Set replication event in_progress. + inProgressEvent, err := queue.Enqueue(ctx, datastore.ReplicationEvent{ + Job: datastore.ReplicationJob{ + Change: datastore.CreateRepo, + VirtualStorage: virtualStorage, + TargetNodeStorage: "gitaly-2", + RelativePath: relativePath, + }, + }) + require.NoError(t, err) + inProgress1, err := queue.Dequeue(ctx, virtualStorage, "gitaly-2", 10) + require.NoError(t, err) + require.Len(t, inProgress1, 1) + + // New event - events in the 'ready' state should be removed. + _, err = queue.Enqueue(ctx, datastore.ReplicationEvent{ + Job: datastore.ReplicationJob{ + Change: datastore.UpdateRepo, + VirtualStorage: virtualStorage, + TargetNodeStorage: "gitaly-3", + SourceNodeStorage: "gitaly-1", + RelativePath: relativePath, + }, + }) + require.NoError(t, err) + + // Failed event - should be removed as well. + failedEvent, err := queue.Enqueue(ctx, datastore.ReplicationEvent{ + Job: datastore.ReplicationJob{ + Change: datastore.UpdateRepo, + VirtualStorage: virtualStorage, + TargetNodeStorage: "gitaly-4", + SourceNodeStorage: "gitaly-0", + RelativePath: relativePath, + }, + }) + require.NoError(t, err) + inProgress2, err := queue.Dequeue(ctx, virtualStorage, "gitaly-4", 10) + require.NoError(t, err) + require.Len(t, inProgress2, 1) + // Acknowledge with failed status, so it will remain in the database for the next processing + // attempt or until it is deleted by the 'removeReplicationEvents' method. + acks2, err := queue.Acknowledge(ctx, datastore.JobStateFailed, []uint64{inProgress2[0].ID}) + require.NoError(t, err) + require.Equal(t, []uint64{inProgress2[0].ID}, acks2) + + ticker := helper.NewTimerTicker(time.Millisecond) + defer ticker.Stop() + + errChan := make(chan error, 1) + go func() { + cmd := &removeRepository{virtualStorage: virtualStorage, relativePath: relativePath} + errChan <- cmd.removeReplicationEvents(ctx, testhelper.NewTestLogger(t), db.DB, ticker) + }() + go func() { + // We acknowledge in_progress job, so it unblocks the waiting loop. + acks, err := queue.Acknowledge(ctx, datastore.JobStateCompleted, []uint64{inProgressEvent.ID}) + if assert.NoError(t, err) { + assert.Equal(t, []uint64{inProgress1[0].ID}, acks) + } + }() + + for checkChan, exists := errChan, true; exists; { + select { + case err := <-checkChan: + require.NoError(t, err) + close(errChan) + checkChan = nil + default: + } + // Wait until job removed + row := db.QueryRow(`SELECT EXISTS(SELECT FROM replication_queue WHERE id = $1)`, failedEvent.ID) + require.NoError(t, row.Scan(&exists)) + } + // Once there are no in_progress jobs anymore the method returns. + require.NoError(t, <-errChan) + + var notExists bool + row := db.QueryRow(`SELECT NOT EXISTS(SELECT FROM replication_queue)`) + require.NoError(t, row.Scan(¬Exists)) + require.True(t, notExists) +} diff --git a/cmd/praefect/subcmd_set_replication_factor.go b/cmd/praefect/subcmd_set_replication_factor.go index ff2cc4562..60c29203e 100644 --- a/cmd/praefect/subcmd_set_replication_factor.go +++ b/cmd/praefect/subcmd_set_replication_factor.go @@ -48,7 +48,7 @@ func (cmd *setReplicationFactorSubcommand) Exec(flags *flag.FlagSet, cfg config. 
return err } - conn, err := subCmdDial(nodeAddr, cfg.Auth.Token) + conn, err := subCmdDial(context.Background(), nodeAddr, cfg.Auth.Token, defaultDialTimeout) if err != nil { return fmt.Errorf("error dialing: %w", err) } diff --git a/cmd/praefect/subcmd_track_repository.go b/cmd/praefect/subcmd_track_repository.go new file mode 100644 index 000000000..169bfcf16 --- /dev/null +++ b/cmd/praefect/subcmd_track_repository.go @@ -0,0 +1,224 @@ +package main + +import ( + "context" + "database/sql" + "errors" + "flag" + "fmt" + "math/rand" + "time" + + "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "gitlab.com/gitlab-org/labkit/correlation" + "google.golang.org/grpc/metadata" +) + +const ( + trackRepositoryCmdName = "track-repository" +) + +type trackRepository struct { + logger logrus.FieldLogger + virtualStorage string + relativePath string + authoritativeStorage string +} + +var errAuthoritativeRepositoryNotExist = errors.New("authoritative repository does not exist") + +func newTrackRepository(logger logrus.FieldLogger) *trackRepository { + return &trackRepository{logger: logger} +} + +func (cmd *trackRepository) FlagSet() *flag.FlagSet { + fs := flag.NewFlagSet(trackRepositoryCmdName, flag.ExitOnError) + fs.StringVar(&cmd.virtualStorage, paramVirtualStorage, "", "name of the repository's virtual storage") + fs.StringVar(&cmd.relativePath, paramRelativePath, "", "relative path to the repository") + fs.StringVar(&cmd.authoritativeStorage, paramAuthoritativeStorage, "", "storage with the repository to consider as authoritative") + fs.Usage = func() { + _, _ = printfErr("Description:\n" + + " This command adds a given repository to be tracked by Praefect.\n" + + " It checks if the repository exists on disk on the authoritative storage, " + + " and whether database records are absent from tracking the repository.") + fs.PrintDefaults() + } + return fs +} + +func (cmd trackRepository) Exec(flags *flag.FlagSet, cfg config.Config) error { + switch { + case flags.NArg() > 0: + return unexpectedPositionalArgsError{Command: flags.Name()} + case cmd.virtualStorage == "": + return requiredParameterError(paramVirtualStorage) + case cmd.relativePath == "": + return requiredParameterError(paramRelativePath) + case cmd.authoritativeStorage == "": + if cfg.Failover.ElectionStrategy == config.ElectionStrategyPerRepository { + return requiredParameterError(paramAuthoritativeStorage) + } + } + + db, err := glsql.OpenDB(cfg.DB) + if err != nil { + return fmt.Errorf("connect to database: %w", err) + } + defer func() { _ = db.Close() }() + + ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID()) + logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx)) + + return cmd.exec(ctx, logger, db, cfg) +} + +const trackRepoErrorPrefix = "attempting to track repository in praefect database" + +func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, db *sql.DB, cfg config.Config) error { + logger.WithFields(logrus.Fields{ + "virtual_storage": cmd.virtualStorage, + "relative_path": cmd.relativePath, + "authoritative_storage": cmd.authoritativeStorage, + }).Debug("track repository") + + var primary string + var secondaries []string + var variableReplicationFactorEnabled, savePrimary bool + if 
cfg.Failover.ElectionStrategy == config.ElectionStrategyPerRepository { + savePrimary = true + primary = cmd.authoritativeStorage + + for _, vs := range cfg.VirtualStorages { + if vs.Name == cmd.virtualStorage { + for _, node := range vs.Nodes { + if node.Storage == cmd.authoritativeStorage { + continue + } + secondaries = append(secondaries, node.Storage) + } + } + } + + r := rand.New(rand.NewSource(time.Now().UnixNano())) + replicationFactor := cfg.DefaultReplicationFactors()[cmd.virtualStorage] + + if replicationFactor > 0 { + variableReplicationFactorEnabled = true + // Select random secondaries according to the default replication factor. + r.Shuffle(len(secondaries), func(i, j int) { + secondaries[i], secondaries[j] = secondaries[j], secondaries[i] + }) + + secondaries = secondaries[:replicationFactor-1] + } + } else { + savePrimary = false + if err := db.QueryRowContext(ctx, `SELECT node_name FROM shard_primaries WHERE shard_name = $1 AND demoted = 'false'`, cmd.virtualStorage).Scan(&primary); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return fmt.Errorf("%s: no primaries found", trackRepoErrorPrefix) + } + return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err) + } + } + + authoritativeRepoExists, err := cmd.authoritativeRepositoryExists(ctx, cfg, primary) + if err != nil { + return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err) + } + + if !authoritativeRepoExists { + return fmt.Errorf("%s: %w", trackRepoErrorPrefix, errAuthoritativeRepositoryNotExist) + } + + if err := cmd.trackRepository( + ctx, + datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()), + primary, + secondaries, + savePrimary, + variableReplicationFactorEnabled, + ); err != nil { + return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err) + } + + logger.Debug("finished adding new repository to be tracked in praefect database.") + + return nil +} + +func (cmd *trackRepository) trackRepository( + ctx context.Context, + ds *datastore.PostgresRepositoryStore, + primary string, + secondaries []string, + savePrimary bool, + variableReplicationFactorEnabled bool, +) error { + if err := ds.CreateRepository( + ctx, + cmd.virtualStorage, + cmd.relativePath, + primary, + nil, + secondaries, + savePrimary, + variableReplicationFactorEnabled, + ); err != nil { + var repoExistsError datastore.RepositoryExistsError + if errors.As(err, &repoExistsError) { + cmd.logger.Print("repository is already tracked in praefect database") + return nil + } + + return fmt.Errorf("CreateRepository: %w", err) + } + + return nil +} + +func (cmd *trackRepository) repositoryExists(ctx context.Context, repo *gitalypb.Repository, addr, token string) (bool, error) { + conn, err := subCmdDial(ctx, addr, token, defaultDialTimeout) + if err != nil { + return false, fmt.Errorf("error dialing: %w", err) + } + defer func() { _ = conn.Close() }() + + ctx = metadata.AppendToOutgoingContext(ctx, "client_name", trackRepositoryCmdName) + repositoryClient := gitalypb.NewRepositoryServiceClient(conn) + res, err := repositoryClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{Repository: repo}) + if err != nil { + return false, err + } + + return res.GetExists(), nil +} + +func (cmd *trackRepository) authoritativeRepositoryExists(ctx context.Context, cfg config.Config, nodeName string) (bool, error) { + for _, vs := range cfg.VirtualStorages { + if vs.Name != cmd.virtualStorage { + continue + } + + for _, node := range vs.Nodes { + if node.Storage == nodeName { + logger.Debugf("check if repository %q exists on gitaly %q at %q", 
cmd.relativePath, node.Storage, node.Address) + repo := &gitalypb.Repository{ + StorageName: node.Storage, + RelativePath: cmd.relativePath, + } + exists, err := cmd.repositoryExists(ctx, repo, node.Address, node.Token) + if err != nil { + logger.WithError(err).Warnf("checking if repository exists %q, %q", node.Storage, cmd.relativePath) + return false, nil + } + return exists, nil + } + } + return false, fmt.Errorf("node %q not found", cmd.authoritativeStorage) + } + return false, fmt.Errorf("virtual storage %q not found", cmd.virtualStorage) +} diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go new file mode 100644 index 000000000..4dca21993 --- /dev/null +++ b/cmd/praefect/subcmd_track_repository_test.go @@ -0,0 +1,230 @@ +//go:build postgres +// +build postgres + +package main + +import ( + "errors" + "flag" + "fmt" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/client" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/promtest" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" +) + +func TestTrackRepository_FlagSet(t *testing.T) { + cmd := &trackRepository{} + fs := cmd.FlagSet() + require.NoError(t, fs.Parse([]string{"--virtual-storage", "vs", "--repository", "repo", "--authoritative-storage", "storage-0"})) + require.Equal(t, "vs", cmd.virtualStorage) + require.Equal(t, "repo", cmd.relativePath) + require.Equal(t, "storage-0", cmd.authoritativeStorage) +} + +func TestTrackRepository_Exec_invalidArgs(t *testing.T) { + t.Run("not all flag values processed", func(t *testing.T) { + cmd := trackRepository{} + flagSet := flag.NewFlagSet("cmd", flag.PanicOnError) + require.NoError(t, flagSet.Parse([]string{"stub"})) + err := cmd.Exec(flagSet, config.Config{}) + require.EqualError(t, err, "cmd doesn't accept positional arguments") + }) + + t.Run("virtual-storage is not set", func(t *testing.T) { + cmd := trackRepository{} + err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{}) + require.EqualError(t, err, `"virtual-storage" is a required parameter`) + }) + + t.Run("repository is not set", func(t *testing.T) { + cmd := trackRepository{virtualStorage: "stub"} + err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{}) + require.EqualError(t, err, `"repository" is a required parameter`) + }) + + t.Run("authoritative-storage is not set", func(t *testing.T) { + cmd := trackRepository{virtualStorage: "stub", relativePath: "path/to/repo"} + err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}}) + require.EqualError(t, err, `"authoritative-storage" is a required parameter`) + }) + + t.Run("db connection error", func(t *testing.T) { + cmd := trackRepository{virtualStorage: "stub", relativePath: "stub", authoritativeStorage: 
"storage-0"} + cfg := config.Config{DB: config.DB{Host: "stub", SSLMode: "disable"}} + err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), cfg) + require.Error(t, err) + require.Contains(t, err.Error(), "connect to database: dial tcp: lookup stub") + }) +} + +func TestTrackRepository_Exec(t *testing.T) { + g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1")) + g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2")) + + g1Srv := testserver.StartGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) + g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) + defer g2Srv.Shutdown() + defer g1Srv.Shutdown() + + g1Addr := g1Srv.Address() + + db := getDB(t) + dbConf := glsql.GetDBConfig(t, "cmd_praefect") + + virtualStorageName := "praefect" + conf := config.Config{ + SocketPath: testhelper.GetTemporaryGitalySocketFileName(t), + VirtualStorages: []*config.VirtualStorage{ + { + Name: virtualStorageName, + Nodes: []*config.Node{ + {Storage: g1Cfg.Storages[0].Name, Address: g1Addr}, + {Storage: g2Cfg.Storages[0].Name, Address: g2Srv.Address()}, + }, + DefaultReplicationFactor: 2, + }, + }, + DB: dbConf, + } + + gitalyCC, err := client.Dial(g1Addr, nil) + require.NoError(t, err) + defer func() { require.NoError(t, gitalyCC.Close()) }() + ctx, cancel := testhelper.Context() + defer cancel() + + gitaly1RepositoryClient := gitalypb.NewRepositoryServiceClient(gitalyCC) + + createRepoThroughGitaly1 := func(relativePath string) error { + _, err := gitaly1RepositoryClient.CreateRepository( + ctx, + &gitalypb.CreateRepositoryRequest{ + Repository: &gitalypb.Repository{ + StorageName: g1Cfg.Storages[0].Name, + RelativePath: relativePath, + }, + }) + return err + } + + testCases := map[string]struct { + failoverConfig config.Failover + authoritativeStorage string + }{ + "sql election": { + failoverConfig: config.Failover{ + Enabled: true, + ElectionStrategy: config.ElectionStrategySQL, + }, + authoritativeStorage: "", + }, + "per repository election": { + failoverConfig: config.Failover{ + Enabled: true, + ElectionStrategy: config.ElectionStrategyPerRepository, + }, + authoritativeStorage: g1Cfg.Storages[0].Name, + }, + } + + logger := testhelper.NewTestLogger(t) + for tn, tc := range testCases { + t.Run(tn, func(t *testing.T) { + addCmdConf := conf + addCmdConf.Failover = tc.failoverConfig + + t.Run("ok", func(t *testing.T) { + nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), addCmdConf, db.DB, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil) + require.NoError(t, err) + nodeMgr.Start(0, time.Hour) + + relativePath := fmt.Sprintf("path/to/test/repo_%s", tn) + repoDS := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + + require.NoError(t, createRepoThroughGitaly1(relativePath)) + + rmRepoCmd := &removeRepository{ + logger: logger, + virtualStorage: virtualStorageName, + relativePath: relativePath, + } + + require.NoError(t, rmRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) + + // create the repo on Gitaly without Praefect knowing + require.NoError(t, createRepoThroughGitaly1(relativePath)) + require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath)) + require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath)) + + addRepoCmd := &trackRepository{ + logger: logger, + virtualStorage: virtualStorageName, + relativePath: relativePath, + authoritativeStorage: tc.authoritativeStorage, + } + + require.NoError(t, 
addRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf)) + as := datastore.NewAssignmentStore(db, conf.StorageNames()) + + assignments, err := as.GetHostAssignments(ctx, virtualStorageName, relativePath) + require.NoError(t, err) + require.Len(t, assignments, 2) + assert.Contains(t, assignments, g1Cfg.Storages[0].Name) + assert.Contains(t, assignments, g2Cfg.Storages[0].Name) + + exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, relativePath) + require.NoError(t, err) + assert.True(t, exists) + }) + + t.Run("repository does not exist", func(t *testing.T) { + relativePath := fmt.Sprintf("path/to/test/repo_1_%s", tn) + + cmd := &trackRepository{ + logger: testhelper.NewTestLogger(t), + virtualStorage: "praefect", + relativePath: relativePath, + authoritativeStorage: tc.authoritativeStorage, + } + + assert.True(t, errors.Is(cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf), errAuthoritativeRepositoryNotExist)) + }) + + t.Run("records already exist", func(t *testing.T) { + relativePath := fmt.Sprintf("path/to/test/repo_2_%s", tn) + + require.NoError(t, createRepoThroughGitaly1(relativePath)) + require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath)) + require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath)) + + ds := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + require.NoError(t, err) + require.NoError(t, ds.CreateRepository(ctx, virtualStorageName, relativePath, g1Cfg.Storages[0].Name, nil, nil, true, true)) + + cmd := &trackRepository{ + logger: testhelper.NewTestLogger(t), + virtualStorage: virtualStorageName, + relativePath: relativePath, + authoritativeStorage: tc.authoritativeStorage, + } + + assert.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf)) + }) + }) + } +} diff --git a/internal/bootstrap/bootstrap.go b/internal/bootstrap/bootstrap.go index ef414184d..2d741e45b 100644 --- a/internal/bootstrap/bootstrap.go +++ b/internal/bootstrap/bootstrap.go @@ -22,11 +22,18 @@ const ( socketReusePortWarning = "Unable to set SO_REUSEPORT: zero downtime upgrades will not work" ) +// Listener is an interface of the bootstrap manager. +type Listener interface { + // RegisterStarter adds starter to the pool. + RegisterStarter(starter Starter) + // Start starts all registered starters to accept connections. + Start() error + // Wait terminates all registered starters. + Wait(gracefulTimeout time.Duration, stopAction func()) error +} + // Bootstrap handles graceful upgrades type Bootstrap struct { - // StopAction will be invoked during a graceful stop. It must wait until the shutdown is completed - StopAction func() - upgrader upgrader listenFunc ListenFunc errChan chan error @@ -151,7 +158,8 @@ func (b *Bootstrap) Start() error { // Wait will signal process readiness to the parent and than wait for an exit condition // SIGTERM, SIGINT and a runtime error will trigger an immediate shutdown // in case of an upgrade there will be a grace period to complete the ongoing requests -func (b *Bootstrap) Wait(gracefulTimeout time.Duration) error { +// stopAction will be invoked during a graceful stop. It must wait until the shutdown is completed. +func (b *Bootstrap) Wait(gracefulTimeout time.Duration, stopAction func()) error { signals := []os.Signal{syscall.SIGTERM, syscall.SIGINT} immediateShutdown := make(chan os.Signal, len(signals)) signal.Notify(immediateShutdown, signals...) 
@@ -167,7 +175,7 @@ func (b *Bootstrap) Wait(gracefulTimeout time.Duration) error { // the new process signaled its readiness and we started a graceful stop // however no further upgrades can be started until this process is running // we set a grace period and then we force a termination. - waitError := b.waitGracePeriod(gracefulTimeout, immediateShutdown) + waitError := b.waitGracePeriod(gracefulTimeout, immediateShutdown, stopAction) err = fmt.Errorf("graceful upgrade: %v", waitError) case s := <-immediateShutdown: @@ -178,13 +186,13 @@ func (b *Bootstrap) Wait(gracefulTimeout time.Duration) error { return err } -func (b *Bootstrap) waitGracePeriod(gracefulTimeout time.Duration, kill <-chan os.Signal) error { +func (b *Bootstrap) waitGracePeriod(gracefulTimeout time.Duration, kill <-chan os.Signal, stopAction func()) error { log.WithField("graceful_timeout", gracefulTimeout).Warn("starting grace period") allServersDone := make(chan struct{}) go func() { - if b.StopAction != nil { - b.StopAction() + if stopAction != nil { + stopAction() } close(allServersDone) }() @@ -208,3 +216,51 @@ func (b *Bootstrap) listen(network, path string) (net.Listener, error) { return b.listenFunc(network, path) } + +// Noop is a bootstrapper that does no additional configurations. +type Noop struct { + starters []Starter + shutdown chan struct{} + errChan chan error +} + +// NewNoop returns initialized instance of the *Noop. +func NewNoop() *Noop { + return &Noop{shutdown: make(chan struct{})} +} + +// RegisterStarter adds starter to the pool. +func (n *Noop) RegisterStarter(starter Starter) { + n.starters = append(n.starters, starter) +} + +// Start starts all registered starters to accept connections. +func (n *Noop) Start() error { + n.errChan = make(chan error, len(n.starters)) + + for _, start := range n.starters { + if err := start(net.Listen, n.errChan); err != nil { + return err + } + } + return nil +} + +// Wait terminates all registered starters. +func (n *Noop) Wait(_ time.Duration, stopAction func()) error { + select { + case <-n.shutdown: + if stopAction != nil { + stopAction() + } + case err := <-n.errChan: + return err + } + + return nil +} + +// Terminate unblocks Wait method and executes stopAction call-back passed into it. 
diff --git a/internal/bootstrap/bootstrap_test.go b/internal/bootstrap/bootstrap_test.go
index 6af4bade1..d0a1562ca 100644
--- a/internal/bootstrap/bootstrap_test.go
+++ b/internal/bootstrap/bootstrap_test.go
@@ -104,10 +104,13 @@ func waitWithTimeout(t *testing.T, waitCh <-chan error, timeout time.Duration) e
 }
 
 func TestImmediateTerminationOnSocketError(t *testing.T) {
-	b, server := makeBootstrap(t)
+	ctx, cancel := testhelper.Context()
+	defer cancel()
+
+	b, server, stopAction := makeBootstrap(t, ctx)
 
 	waitCh := make(chan error)
-	go func() { waitCh <- b.Wait(2 * time.Second) }()
+	go func() { waitCh <- b.Wait(2*time.Second, stopAction) }()
 
 	require.NoError(t, server.listeners["tcp"].Close(), "Closing first listener")
 
@@ -119,12 +122,15 @@ func TestImmediateTerminationOnSignal(t *testing.T) {
 	for _, sig := range []syscall.Signal{syscall.SIGTERM, syscall.SIGINT} {
 		t.Run(sig.String(), func(t *testing.T) {
-			b, server := makeBootstrap(t)
+			ctx, cancel := testhelper.Context()
+			defer cancel()
+
+			b, server, stopAction := makeBootstrap(t, ctx)
 
 			done := server.slowRequest(3 * time.Minute)
 
 			waitCh := make(chan error)
-			go func() { waitCh <- b.Wait(2 * time.Second) }()
+			go func() { waitCh <- b.Wait(2*time.Second, stopAction) }()
 
 			// make sure we are inside b.Wait() or we'll kill the test suite
 			time.Sleep(100 * time.Millisecond)
@@ -146,9 +152,12 @@ func TestImmediateTerminationOnSignal(t *testing.T) {
 }
 
 func TestGracefulTerminationStuck(t *testing.T) {
-	b, server := makeBootstrap(t)
+	ctx, cancel := testhelper.Context()
+	defer cancel()
+
+	b, server, stopAction := makeBootstrap(t, ctx)
 
-	err := testGracefulUpdate(t, server, b, 3*time.Second, 2*time.Second, nil)
+	err := testGracefulUpdate(t, server, b, 3*time.Second, 2*time.Second, nil, stopAction)
 	require.Contains(t, err.Error(), "grace period expired")
 }
 
@@ -158,22 +167,27 @@ func TestGracefulTerminationWithSignals(t *testing.T) {
 
 	for _, sig := range []syscall.Signal{syscall.SIGTERM, syscall.SIGINT} {
 		t.Run(sig.String(), func(t *testing.T) {
-			b, server := makeBootstrap(t)
+			ctx, cancel := testhelper.Context()
+			defer cancel()
+			b, server, stopAction := makeBootstrap(t, ctx)
 
 			err := testGracefulUpdate(t, server, b, 1*time.Second, 2*time.Second, func() {
 				require.NoError(t, self.Signal(sig))
-			})
+			}, stopAction)
 			require.Contains(t, err.Error(), "force shutdown")
 		})
 	}
 }
 
 func TestGracefulTerminationServerErrors(t *testing.T) {
-	b, server := makeBootstrap(t)
+	ctx, cancel := testhelper.Context()
+	defer cancel()
+
+	b, server, _ := makeBootstrap(t, ctx)
 
 	done := make(chan error, 1)
 	// This is a simulation of receiving a listener error during waitGracePeriod
-	b.StopAction = func() {
+	stopAction := func() {
 		// we close the unix listener in order to test that the shutdown will not fail, but it keeps waiting for the TCP request
 		require.NoError(t, server.listeners["unix"].Close())
 
@@ -185,19 +199,21 @@ func TestGracefulTerminationServerErrors(t *testing.T) {
 		require.NoError(t, server.server.Shutdown(context.Background()))
 	}
 
-	err := testGracefulUpdate(t, server, b, 3*time.Second, 2*time.Second, nil)
+	err := testGracefulUpdate(t, server, b, 3*time.Second, 2*time.Second, nil, stopAction)
 	require.Contains(t, err.Error(), "grace period expired")
 	require.NoError(t, <-done)
 }
 
 func TestGracefulTermination(t *testing.T) {
-	b, server := makeBootstrap(t)
+	ctx, cancel := testhelper.Context()
+	defer cancel()
+	b, server, _ := makeBootstrap(t, ctx)
 
 	// Using server.Close we bypass the graceful shutdown, faking a completed shutdown
-	b.StopAction = func() { server.server.Close() }
+	stopAction := func() { server.server.Close() }
 
-	err := testGracefulUpdate(t, server, b, 1*time.Second, 2*time.Second, nil)
+	err := testGracefulUpdate(t, server, b, 1*time.Second, 2*time.Second, nil, stopAction)
 	require.Contains(t, err.Error(), "completed")
 }
 
@@ -219,9 +235,9 @@ func TestPortReuse(t *testing.T) {
 	require.NoError(t, l.Close())
 }
 
-func testGracefulUpdate(t *testing.T, server *testServer, b *Bootstrap, waitTimeout, gracefulWait time.Duration, duringGracePeriodCallback func()) error {
+func testGracefulUpdate(t *testing.T, server *testServer, b *Bootstrap, waitTimeout, gracefulWait time.Duration, duringGracePeriodCallback func(), stopAction func()) error {
 	waitCh := make(chan error)
-	go func() { waitCh <- b.Wait(gracefulWait) }()
+	go func() { waitCh <- b.Wait(gracefulWait, stopAction) }()
 
 	// Start a slow request to keep the old server from shutting down immediately.
 	req := server.slowRequest(2 * gracefulWait)
@@ -251,7 +267,7 @@ func testGracefulUpdate(t *testing.T, server *testServer, b *Bootstrap, waitTime
 	return waitErr
 }
 
-func makeBootstrap(t *testing.T) (*Bootstrap, *testServer) {
+func makeBootstrap(t *testing.T, ctx context.Context) (*Bootstrap, *testServer, func()) {
 	mux := http.NewServeMux()
 	mux.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
 		w.WriteHeader(200)
@@ -271,8 +287,6 @@ func makeBootstrap(t *testing.T) (*Bootstrap, *testServer) {
 	b, err := _new(u, net.Listen, false)
 	require.NoError(t, err)
 
-	b.StopAction = func() { require.NoError(t, s.Shutdown(context.Background())) }
-
 	listeners := make(map[string]net.Listener)
 	start := func(network, address string) Starter {
 		return func(listen ListenFunc, errors chan<- error) error {
@@ -312,7 +326,7 @@ func makeBootstrap(t *testing.T) (*Bootstrap, *testServer) {
 		server:    &s,
 		listeners: listeners,
 		url:       url,
-	}
+	}, func() { require.NoError(t, s.Shutdown(context.Background())) }
 }
 
 func testAllListeners(t *testing.T, listeners map[string]net.Listener) {
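The Noop bootstrapper introduced above lets tests run the same starter wiring without the upgrade machinery. A sketch of its intended lifecycle, assuming someStarter is any valid Starter; this is illustrative usage, not code from the change:

b := bootstrap.NewNoop()
b.RegisterStarter(someStarter) // illustrative starter

if err := b.Start(); err != nil {
	t.Fatal(err)
}

waitCh := make(chan error, 1)
go func() {
	// The timeout is ignored by Noop.Wait; only Terminate or a starter error unblocks it.
	waitCh <- b.Wait(time.Second, func() { /* graceful stop; must block until shutdown is done */ })
}()

b.Terminate()
require.NoError(t, <-waitCh)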
diff --git a/internal/praefect/datastore/storage_cleanup.go b/internal/praefect/datastore/storage_cleanup.go
new file mode 100644
index 000000000..455ddef2b
--- /dev/null
+++ b/internal/praefect/datastore/storage_cleanup.go
@@ -0,0 +1,85 @@
+package datastore
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+
+	"github.com/lib/pq"
+)
+
+// RepositoryClusterPath identifies the location of a repository in the cluster.
+type RepositoryClusterPath struct {
+	ClusterPath
+	// RelativePath is the relative path to the repository on the disk.
+	RelativePath string
+}
+
+// NewRepositoryClusterPath initializes and returns a RepositoryClusterPath.
+func NewRepositoryClusterPath(virtualStorage, storage, relativePath string) RepositoryClusterPath {
+	return RepositoryClusterPath{
+		ClusterPath: ClusterPath{
+			VirtualStorage: virtualStorage,
+			Storage:        storage,
+		},
+		RelativePath: relativePath,
+	}
+}
+
+// ClusterPath represents the path on the cluster to a storage.
+type ClusterPath struct {
+	// VirtualStorage is the name of the virtual storage.
+	VirtualStorage string
+	// Storage is the name of the gitaly storage.
+	Storage string
+}
+
+// NewStorageCleanup initializes and returns a new instance of the StorageCleanup.
+func NewStorageCleanup(db *sql.DB) *StorageCleanup {
+	return &StorageCleanup{db: db}
+}
+
+// StorageCleanup provides methods on the database for the repository cleanup operation.
+type StorageCleanup struct {
+	db *sql.DB
+}
+
+// DoesntExist returns a RepositoryClusterPath for each repository that doesn't exist in the database,
+// determined by querying the repositories and storage_repositories tables.
+func (ss *StorageCleanup) DoesntExist(ctx context.Context, virtualStorage, storage string, relativePath []string) ([]RepositoryClusterPath, error) {
+	if len(relativePath) == 0 {
+		return nil, nil
+	}
+
+	rows, err := ss.db.QueryContext(
+		ctx,
+		`SELECT $1 AS virtual_storage, $2 AS storage, UNNEST($3::TEXT[]) AS relative_path
+		EXCEPT (
+			SELECT virtual_storage, storage, relative_path
+			FROM repositories
+			JOIN storage_repositories USING (virtual_storage, relative_path)
+			WHERE virtual_storage = $1 AND storage = $2 AND relative_path = ANY($3)
+		)`,
+		virtualStorage, storage, pq.StringArray(relativePath),
+	)
+	if err != nil {
+		return nil, fmt.Errorf("query: %w", err)
+	}
+	defer func() { _ = rows.Close() }()
+
+	var res []RepositoryClusterPath
+	for rows.Next() {
+		var curr RepositoryClusterPath
+		if err := rows.Scan(&curr.VirtualStorage, &curr.Storage, &curr.RelativePath); err != nil {
+			return nil, fmt.Errorf("scan: %w", err)
+		}
+		res = append(res, curr)
+	}
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("loop: %w", err)
+	}
+	if err := rows.Close(); err != nil {
+		return nil, fmt.Errorf("close: %w", err)
+	}
+	return res, nil
+}
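DoesntExist builds the candidate set from the passed-in path array via UNNEST and subtracts every row recorded in the repositories and storage_repositories tables, so only unknown paths come back. A hedged usage sketch; db, ctx, and the path list are illustrative stand-ins:

cleanup := datastore.NewStorageCleanup(db)

// Paths found on disk for one storage; only those without a database record are returned.
onDisk := []string{"@hashed/aa/bb/repo-1.git", "@hashed/cc/dd/repo-2.git"}
untracked, err := cleanup.DoesntExist(ctx, "praefect", "gitaly-1", onDisk)
if err != nil {
	return fmt.Errorf("check untracked: %w", err)
}
for _, repo := range untracked {
	// Each entry names a repository that exists on disk but is absent from the database.
	fmt.Printf("untracked: %s/%s/%s\n", repo.VirtualStorage, repo.Storage, repo.RelativePath)
}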
NewRepositoryClusterPath("vs", "stub", "path/x"), + }, + }, + { + desc: "doesn't exist because of virtual storage", + virtualStorage: "stub", + storage: "g1", + relativeReplicaPaths: []string{"path/x"}, + out: []RepositoryClusterPath{ + NewRepositoryClusterPath("stub", "g1", "path/x"), + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + res, err := storageCleanup.DoesntExist(ctx, tc.virtualStorage, tc.storage, tc.relativeReplicaPaths) + require.NoError(t, err) + require.ElementsMatch(t, tc.out, res) + }) + } +} diff --git a/internal/praefect/metrics/prometheus.go b/internal/praefect/metrics/prometheus.go index 64e4eee52..8e98bd84b 100644 --- a/internal/praefect/metrics/prometheus.go +++ b/internal/praefect/metrics/prometheus.go @@ -9,7 +9,7 @@ import ( // RegisterReplicationDelay creates and registers a prometheus histogram // to observe replication delay times -func RegisterReplicationDelay(conf promconfig.Config) (metrics.HistogramVec, error) { +func RegisterReplicationDelay(conf promconfig.Config, registerer prometheus.Registerer) (metrics.HistogramVec, error) { replicationDelay := prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "gitaly", @@ -20,12 +20,12 @@ func RegisterReplicationDelay(conf promconfig.Config) (metrics.HistogramVec, err []string{"type"}, ) - return replicationDelay, prometheus.Register(replicationDelay) + return replicationDelay, registerer.Register(replicationDelay) } // RegisterReplicationLatency creates and registers a prometheus histogram // to observe replication latency times -func RegisterReplicationLatency(conf promconfig.Config) (metrics.HistogramVec, error) { +func RegisterReplicationLatency(conf promconfig.Config, registerer prometheus.Registerer) (metrics.HistogramVec, error) { replicationLatency := prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "gitaly", @@ -36,12 +36,12 @@ func RegisterReplicationLatency(conf promconfig.Config) (metrics.HistogramVec, e []string{"type"}, ) - return replicationLatency, prometheus.Register(replicationLatency) + return replicationLatency, registerer.Register(replicationLatency) } // RegisterNodeLatency creates and registers a prometheus histogram to // observe internal node latency -func RegisterNodeLatency(conf promconfig.Config) (metrics.HistogramVec, error) { +func RegisterNodeLatency(conf promconfig.Config, registerer prometheus.Registerer) (metrics.HistogramVec, error) { nodeLatency := prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "gitaly", @@ -51,7 +51,7 @@ func RegisterNodeLatency(conf promconfig.Config) (metrics.HistogramVec, error) { }, []string{"gitaly_storage"}, ) - return nodeLatency, prometheus.Register(nodeLatency) + return nodeLatency, registerer.Register(nodeLatency) } var MethodTypeCounter = promauto.NewCounterVec( diff --git a/internal/praefect/repocleaner/init_test.go b/internal/praefect/repocleaner/init_test.go new file mode 100644 index 000000000..0b99e6d32 --- /dev/null +++ b/internal/praefect/repocleaner/init_test.go @@ -0,0 +1,21 @@ +package repocleaner + +import ( + "os" + "testing" + + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" +) + +func TestMain(m *testing.M) { + os.Exit(testMain(m)) +} + +func testMain(m *testing.M) int { + defer testhelper.MustHaveNoChildProcess() + + cleanup := testhelper.Configure() + defer cleanup() + + return m.Run() +} diff --git a/internal/praefect/repocleaner/repository.go b/internal/praefect/repocleaner/repository.go new file mode 100644 index 000000000..86ea69bd1 --- /dev/null +++ 
diff --git a/internal/praefect/repocleaner/init_test.go b/internal/praefect/repocleaner/init_test.go
new file mode 100644
index 000000000..0b99e6d32
--- /dev/null
+++ b/internal/praefect/repocleaner/init_test.go
@@ -0,0 +1,21 @@
+package repocleaner
+
+import (
+	"os"
+	"testing"
+
+	"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+)
+
+func TestMain(m *testing.M) {
+	os.Exit(testMain(m))
+}
+
+func testMain(m *testing.M) int {
+	defer testhelper.MustHaveNoChildProcess()
+
+	cleanup := testhelper.Configure()
+	defer cleanup()
+
+	return m.Run()
+}
diff --git a/internal/praefect/repocleaner/repository.go b/internal/praefect/repocleaner/repository.go
new file mode 100644
index 000000000..86ea69bd1
--- /dev/null
+++ b/internal/praefect/repocleaner/repository.go
@@ -0,0 +1,77 @@
+package repocleaner
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+
+	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+	"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+)
+
+// Walker allows iterating over the repositories of a gitaly storage.
+type Walker struct {
+	conns     praefect.Connections
+	batchSize int
+}
+
+// NewWalker returns a new *Walker instance.
+func NewWalker(conns praefect.Connections, batchSize int) *Walker {
+	return &Walker{conns: conns, batchSize: batchSize}
+}
+
+// ExecOnRepositories runs through all the repositories on a Gitaly storage and executes the provided action.
+// The processing is done in batches to reduce the cost of operations.
+func (wr *Walker) ExecOnRepositories(ctx context.Context, virtualStorage, storage string, action func([]datastore.RepositoryClusterPath) error) error {
+	gclient, err := wr.getInternalGitalyClient(virtualStorage, storage)
+	if err != nil {
+		return fmt.Errorf("setup gitaly client: %w", err)
+	}
+
+	resp, err := gclient.WalkRepos(ctx, &gitalypb.WalkReposRequest{StorageName: storage})
+	if err != nil {
+		return fmt.Errorf("unable to walk repos: %w", err)
+	}
+
+	batch := make([]datastore.RepositoryClusterPath, 0, wr.batchSize)
+	for {
+		res, err := resp.Recv()
+		if err != nil {
+			if !errors.Is(err, io.EOF) {
+				return fmt.Errorf("failure on walking repos: %w", err)
+			}
+			break
+		}
+
+		batch = append(batch, datastore.RepositoryClusterPath{
+			ClusterPath: datastore.ClusterPath{
+				VirtualStorage: virtualStorage,
+				Storage:        storage,
+			},
+			RelativePath: res.RelativePath,
+		})
+
+		if len(batch) == cap(batch) {
+			if err := action(batch); err != nil {
+				return err
+			}
+			batch = batch[:0]
+		}
+	}
+	if len(batch) > 0 {
+		if err := action(batch); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (wr *Walker) getInternalGitalyClient(virtualStorage, storage string) (gitalypb.InternalGitalyClient, error) {
+	conn, found := wr.conns[virtualStorage][storage]
+	if !found {
+		return nil, fmt.Errorf("no connection to the gitaly node %q/%q", virtualStorage, storage)
+	}
+	return gitalypb.NewInternalGitalyClient(conn), nil
+}
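ExecOnRepositories streams repository paths from a storage over the internal WalkRepos RPC and hands them to the callback in slices of at most batchSize entries. A usage sketch; the connections map and the storage names are illustrative:

walker := repocleaner.NewWalker(conns, 100)

err := walker.ExecOnRepositories(ctx, "praefect", "gitaly-1",
	func(batch []datastore.RepositoryClusterPath) error {
		// Each invocation receives at most 100 repositories streamed from WalkRepos.
		for _, repo := range batch {
			fmt.Println(repo.RelativePath)
		}
		return nil // returning a non-nil error stops the walk early
	})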
diff --git a/internal/praefect/repocleaner/repository_test.go b/internal/praefect/repocleaner/repository_test.go
new file mode 100644
index 000000000..6cb3727d4
--- /dev/null
+++ b/internal/praefect/repocleaner/repository_test.go
@@ -0,0 +1,114 @@
+package repocleaner
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/backchannel"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/transaction"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+	"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
+)
+
+func TestWalker_ExecOnRepositories(t *testing.T) {
+	const (
+		repo1RelPath = "repo-1.git"
+		repo2RelPath = "repo-2.git"
+		repo3RelPath = "repo-3.git"
+
+		storage1 = "gitaly-1"
+
+		virtualStorage = "praefect"
+	)
+
+	gCfg := testcfg.Build(t, testcfg.WithStorages(storage1))
+	gAddr := testserver.RunGitalyServer(t, gCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+
+	conf := config.Config{
+		SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
+		VirtualStorages: []*config.VirtualStorage{
+			{
+				Name: virtualStorage,
+				Nodes: []*config.Node{
+					{Storage: gCfg.Storages[0].Name, Address: gAddr},
+				},
+			},
+		},
+	}
+
+	gittest.CloneRepoAtStorage(t, gCfg, gCfg.Storages[0], repo1RelPath)
+	gittest.CloneRepoAtStorage(t, gCfg, gCfg.Storages[0], repo2RelPath)
+	gittest.CloneRepoAtStorage(t, gCfg, gCfg.Storages[0], repo3RelPath)
+
+	ctx, cancel := testhelper.Context()
+	defer cancel()
+
+	entry := testhelper.NewTestLogger(t).WithContext(ctx)
+	clientHandshaker := backchannel.NewClientHandshaker(entry, praefect.NewBackchannelServerFactory(entry, transaction.NewServer(nil)))
+	nodeSet, err := praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, clientHandshaker)
+	require.NoError(t, err)
+	defer nodeSet.Close()
+
+	for _, tc := range []struct {
+		desc      string
+		batchSize int
+		exp       [][]datastore.RepositoryClusterPath
+		expErr    error
+	}{
+		{
+			desc:      "multiple batches",
+			batchSize: 2,
+			exp: [][]datastore.RepositoryClusterPath{
+				{
+					{ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo1RelPath},
+					{ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo2RelPath},
+				},
+				{
+					{ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo3RelPath},
+				},
+			},
+		},
+		{
+			desc:      "single batch",
+			batchSize: 10,
+			exp: [][]datastore.RepositoryClusterPath{
+				{
+					{ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo1RelPath},
+					{ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo2RelPath},
+					{ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo3RelPath},
+				},
+			},
+		},
+		{
+			desc:      "terminates on error",
+			batchSize: 1,
+			exp: [][]datastore.RepositoryClusterPath{
+				{
+					{ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo1RelPath},
+				},
+			},
+			expErr: assert.AnError,
+		},
+	} {
+		t.Run(tc.desc, func(t *testing.T) {
+			walker := NewWalker(nodeSet.Connections(), tc.batchSize)
+			var iteration int
+			err = walker.ExecOnRepositories(ctx, conf.VirtualStorages[0].Name, storage1, func(paths []datastore.RepositoryClusterPath) error {
+				require.Less(t, iteration, len(tc.exp))
+				expected := tc.exp[iteration]
+				iteration++
+				assert.ElementsMatch(t, paths, expected)
+				return tc.expErr
+			})
+			require.Equal(t, tc.expErr, err)
+		})
+	}
+}
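Taken together, the walker and the storage-cleanup store suggest a natural composition: walk a storage in batches and flag repositories that have no database record. The pairing below is an illustrative sketch, not code from this change; walker, cleanup, and logger are assumed to be set up as in the earlier snippets:

err := walker.ExecOnRepositories(ctx, virtualStorage, storage,
	func(batch []datastore.RepositoryClusterPath) error {
		paths := make([]string, 0, len(batch))
		for _, entry := range batch {
			paths = append(paths, entry.RelativePath)
		}

		// Only repositories present on disk but absent from the database come back.
		untracked, err := cleanup.DoesntExist(ctx, virtualStorage, storage, paths)
		if err != nil {
			return err
		}
		for _, repo := range untracked {
			logger.WithField("relative_path", repo.RelativePath).Warn("repository is not tracked by praefect")
		}
		return nil
	})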