author    | Toon Claes <toon@gitlab.com> | 2023-04-17 15:22:59 +0300
committer | Toon Claes <toon@gitlab.com> | 2023-04-17 15:22:59 +0300
commit    | 978f7d2b58620b95cba50b77885f9a8d261da752 (patch)
tree      | fca7b7cab6d79402b016c04294b16e1c060abfbc
parent    | 748f7b786b7eb29221c4ac7113c7adfce7674ec2 (diff)
parent    | 86652606c7a5a0df0026969c07846a8b5d271193 (diff)
Merge branch 'ps-praefect-cli-upgrade' into 'master'
praefect: Start switch to the new cli implementation
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5616
Merged-by: Toon Claes <toon@gitlab.com>
Approved-by: John Cai <jcai@gitlab.com>
Approved-by: Toon Claes <toon@gitlab.com>
Reviewed-by: Toon Claes <toon@gitlab.com>
Reviewed-by: Pavlo Strokov <pstrokov@gitlab.com>
Co-authored-by: Pavlo Strokov <pstrokov@gitlab.com>
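
At its core, this merge replaces praefect's hand-rolled `flag` parsing with a `github.com/urfave/cli/v2` app, while keeping the historical `praefect -config FILE` invocation working by pointing the app's root `Action` at the same function as the new `serve` sub-command. A minimal, runnable sketch of that backward-compatibility pattern, assuming only the `urfave/cli/v2` dependency — the flag handling is condensed and this is not the exact Gitaly code:

```go
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/urfave/cli/v2"
)

func serveAction(ctx *cli.Context) error {
	// Both `praefect -config FILE` and `praefect serve -config FILE`
	// land here, so existing invocations keep working during the migration.
	cfg := ctx.String("config")
	if cfg == "" {
		// The flag can't be marked required globally because some
		// sub-commands (e.g. `configuration validate`) don't need it.
		_ = cli.ShowCommandHelp(ctx, "serve")
		return cli.Exit(`required flag "config" not set`, 2)
	}
	fmt.Printf("serving with configuration from %s\n", cfg)
	return nil
}

func main() {
	app := &cli.App{
		Name:  "praefect",
		Usage: "a gitaly proxy",
		// The root action doubles as `serve` for backwards compatibility.
		Action: serveAction,
		Commands: []*cli.Command{
			{Name: "serve", Usage: "launch the server daemon", Action: serveAction},
		},
		Flags: []cli.Flag{
			&cli.StringFlag{Name: "config", Usage: "load configuration from `FILE`"},
		},
	}
	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}
```

The diff itself notes why `DefaultCommand: "serve"` was not used instead: sub-commands not yet migrated to the cli framework could then no longer be dispatched through the root action.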
-rw-r--r-- | cmd/praefect/main.go                                        |  11
-rw-r--r-- | internal/cli/praefect/main.go                               | 660
-rw-r--r-- | internal/cli/praefect/main_test.go                          |   4
-rw-r--r-- | internal/cli/praefect/serve.go                              | 606
-rw-r--r-- | internal/cli/praefect/subcmd_configuration.go               |  15
-rw-r--r-- | internal/cli/praefect/subcmd_configuration_validate.go      |  34
-rw-r--r-- | internal/cli/praefect/subcmd_configuration_validate_test.go |  22
7 files changed, 742 insertions, 610 deletions
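
One detail worth noting before the diff itself: sub-commands still dispatched outside the cli framework would otherwise be invisible in `--help`, so the new `helpTextTemplate` splices their names into `cli.AppHelpTemplate`. A standalone sketch of that splice — the `legacy` names here are illustrative, and the anchor string assumes the template shape of the `urfave/cli/v2` version vendored at this commit:

```go
package main

import (
	"log"
	"os"
	"sort"
	"strings"

	"github.com/urfave/cli/v2"
)

// legacyHelpTemplate appends the names of sub-commands that are still handled
// outside the cli framework, so `--help` lists them alongside registered ones.
func legacyHelpTemplate(legacy []string) string {
	sort.Strings(legacy)
	const anchor = `COMMANDS:{{template "visibleCommandCategoryTemplate" .}}{{end}}`
	return strings.Replace(
		cli.AppHelpTemplate,
		anchor,
		anchor+"\n   "+strings.Join(legacy, "\n   "),
		1,
	)
}

func main() {
	app := &cli.App{
		Name:  "praefect",
		Usage: "a gitaly proxy",
		// "sql-ping" and "dial-nodes" stand in for the real list of
		// unmigrated sub-commands returned by subcommands() in the diff.
		CustomAppHelpTemplate: legacyHelpTemplate([]string{"sql-ping", "dial-nodes"}),
	}
	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}
```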
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 027aca865..cc7e88b93 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -1,7 +1,14 @@ package main -import "gitlab.com/gitlab-org/gitaly/v15/internal/cli/praefect" +import ( + "log" + "os" + + cli "gitlab.com/gitlab-org/gitaly/v15/internal/cli/praefect" +) func main() { - praefect.Main() + if err := cli.NewApp().Run(os.Args); err != nil { + log.Fatal(err) + } } diff --git a/internal/cli/praefect/main.go b/internal/cli/praefect/main.go index 2fa7b5822..c142713d6 100644 --- a/internal/cli/praefect/main.go +++ b/internal/cli/praefect/main.go @@ -57,614 +57,92 @@ package praefect import ( - "context" - "database/sql" - "errors" - "flag" "fmt" - "math/rand" - "net/http" + "log" "os" - "runtime/debug" "sort" "strings" - "time" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/sirupsen/logrus" - "gitlab.com/gitlab-org/gitaly/v15/internal/backchannel" - "gitlab.com/gitlab-org/gitaly/v15/internal/bootstrap" - "gitlab.com/gitlab-org/gitaly/v15/internal/bootstrap/starter" - "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/sentry" - "gitlab.com/gitlab-org/gitaly/v15/internal/helper" - "gitlab.com/gitlab-org/gitaly/v15/internal/log" - "gitlab.com/gitlab-org/gitaly/v15/internal/praefect" - "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore/glsql" - "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/metrics" - "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes" - "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes/tracker" - "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/protoregistry" - "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/reconciler" - "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/repocleaner" - "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/service" - "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/service/transaction" - "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/transactions" - "gitlab.com/gitlab-org/gitaly/v15/internal/sidechannel" + "github.com/urfave/cli/v2" "gitlab.com/gitlab-org/gitaly/v15/internal/version" - "gitlab.com/gitlab-org/labkit/monitoring" - "gitlab.com/gitlab-org/labkit/tracing" ) -var ( - flagConfig = flag.String("config", "", "Location for the config.toml") - flagVersion = flag.Bool("version", false, "Print version and exit") - - errNoConfigFile = errors.New("the config flag must be passed") -) - -const progname = "praefect" - -// Main is an entry point of the praefect binary. -func Main() { - logger := log.Default() - flag.Usage = func() { - cmds := []string{configurationCmdName} - for k := range subcommands(logger) { - cmds = append(cmds, k) - } - sort.Strings(cmds) - - printfErr("Usage of %s:\n", progname) - flag.PrintDefaults() - printfErr(" subcommand (optional)\n") - printfErr("\tOne of %s\n", strings.Join(cmds, ", ")) - } - flag.Parse() - - // If invoked with -version - if *flagVersion { - fmt.Println(version.GetVersionString("Praefect")) - os.Exit(0) - } - - args := flag.Args() - // The configuration sub-command differs from all other sub-commands because it doesn't - // require a valid configuration to run. It expects configuration to be provided - // on STDIN instead of file. 
- if len(args) > 0 && args[0] == configurationCmdName { - if len(args) == 2 && args[1] == validateCmdName { - os.Exit(validateConfiguration(os.Stdin, os.Stdout, os.Stderr)) - } - flag.Usage() - os.Exit(1) - } - - conf, err := initConfig(logger) - if err != nil { - printfErr("%s: configuration error: %v\n", progname, err) - os.Exit(1) - } - - conf.ConfigureLogger() - - if len(args) > 0 { - os.Exit(subCommand(conf, logger, args[0], args[1:])) - } - - configure(conf) - - logger.WithField("version", version.GetVersionString("Praefect")).Info("Starting " + progname) - - starterConfigs, err := getStarterConfigs(conf) - if err != nil { - logger.Fatalf("%s", err) - } - - promreg := prometheus.DefaultRegisterer - b, err := bootstrap.New(promauto.With(promreg).NewCounterVec( - prometheus.CounterOpts{ - Name: "gitaly_praefect_connections_total", - Help: "Total number of connections to Praefect", - }, - []string{"type"}, - )) - if err != nil { - logger.Fatalf("unable to create a bootstrap: %v", err) - } - - dbPromRegistry := prometheus.NewRegistry() - - if err := run(starterConfigs, conf, logger, b, promreg, dbPromRegistry); err != nil { - logger.Fatalf("%v", err) +func init() { + // Override the version printer so the output format matches what Praefect + // used before the introduction of the CLI toolkit. + cli.VersionPrinter = func(ctx *cli.Context) { + fmt.Fprintln(ctx.App.Writer, version.GetVersionString("Praefect")) } } -func initConfig(logger *logrus.Entry) (config.Config, error) { - var conf config.Config +const ( + progname = "praefect" - if *flagConfig == "" { - return conf, errNoConfigFile - } - - conf, err := config.FromFile(*flagConfig) - if err != nil { - return conf, fmt.Errorf("error reading config file: %w", err) - } - - if err := conf.Validate(); err != nil { - return config.Config{}, err - } - - if !conf.AllowLegacyElectors { - conf.Failover.ElectionStrategy = config.ElectionStrategyPerRepository - } + configFlagName = "config" +) - if !conf.Failover.Enabled && conf.Failover.ElectionStrategy != "" { - logger.WithField("election_strategy", conf.Failover.ElectionStrategy).Warn( - "ignoring configured election strategy as failover is disabled") +// NewApp returns a new praefect app. +func NewApp() *cli.App { + return &cli.App{ + Name: progname, + Usage: "a gitaly proxy", + Version: version.GetVersionString("Praefect"), + // serveAction is also here in the root to keep the CLI backwards compatible with + // the previous way to launch Praefect with just `praefect -config FILE`. + // We may want to deprecate this eventually. + // + // The 'DefaultCommand: "serve"' setting can't be used here because it won't be + // possible to invoke sub-command not yet registered. + Action: serveAction, + Commands: []*cli.Command{ + newServeCommand(), + newConfigurationCommand(), + }, + Flags: []cli.Flag{ + &cli.StringFlag{ + // We can't mark it required, because it is not for all sub-commands. + // We need it as it is used by majority of the sub-commands and + // because of the existing format of commands invocation. + Name: configFlagName, + Usage: "load configuration from `FILE`", + }, + }, + CustomAppHelpTemplate: helpTextTemplate(), } - - return conf, nil } -func configure(conf config.Config) { - tracing.Initialize(tracing.WithServiceName(progname)) - - if conf.PrometheusListenAddr != "" { - conf.Prometheus.Configure() - } - - sentry.ConfigureSentry(version.GetVersion(), conf.Sentry) +// mustProvideConfigFlag extracts value of the 'config' flag and returns it. 
+// If flag is not set the help for the command will be printed and terminated with exit code 2. +func mustProvideConfigFlag(ctx *cli.Context, command string) string { + pathToConfigFile := ctx.String(configFlagName) + if pathToConfigFile == "" { + // We can't make 'config' flag required for all commands, but we still want the + // same output to be printed if it is not provided. + // It should be removed after migration to the `praefect CMD -config FILE` + // where we can mark it as required for each sub-command. + _ = cli.ShowCommandHelp(ctx, command) + log.Printf("Required flag %q not set\n", configFlagName) + os.Exit(2) + } + + return pathToConfigFile } -func run( - cfgs []starter.Config, - conf config.Config, - logger *logrus.Entry, - b bootstrap.Listener, - promreg prometheus.Registerer, - dbPromRegistry interface { - prometheus.Registerer - prometheus.Gatherer - }, -) error { - nodeLatencyHistogram, err := metrics.RegisterNodeLatency(conf.Prometheus, promreg) - if err != nil { - return err - } - - delayMetric, err := metrics.RegisterReplicationDelay(conf.Prometheus, promreg) - if err != nil { - return err - } - - latencyMetric, err := metrics.RegisterReplicationLatency(conf.Prometheus, promreg) - if err != nil { - return err - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var db *sql.DB - if conf.NeedsSQL() { - logger.Infof("establishing database connection to %s:%d ...", conf.DB.Host, conf.DB.Port) - dbConn, closedb, err := initDatabase(ctx, logger, conf) - if err != nil { - return err - } - defer closedb() - db = dbConn - logger.Info("database connection established") - } - - var queue datastore.ReplicationEventQueue - var rs datastore.RepositoryStore - var csg datastore.ConsistentStoragesGetter - var metricsCollectors []prometheus.Collector - - if conf.MemoryQueueEnabled { - queue = datastore.NewMemoryReplicationEventQueue(conf) - rs = datastore.MockRepositoryStore{} - csg = rs - logger.Info("reads distribution caching is disabled for in memory storage") - } else { - queue = datastore.NewPostgresReplicationEventQueue(db) - rs = datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - - dsn := glsql.DSN(conf.DB, true) - if dsn == "" { - csg = rs - logger.Info("reads distribution caching is disabled because direct connection to Postgres is not set") - } else { - storagesCached, err := datastore.NewCachingConsistentStoragesGetter(logger, rs, conf.VirtualStorageNames()) - if err != nil { - return fmt.Errorf("caching storage provider: %w", err) - } - - resilientListenerTicker := helper.NewTimerTicker(5 * time.Second) - notificationsListener := datastore.NewResilientListener(conf.DB, resilientListenerTicker, logger) - go func() { - err := notificationsListener.Listen(ctx, storagesCached, datastore.StorageRepositoriesUpdatesChannel, datastore.RepositoriesUpdatesChannel) - if err != nil && !errors.Is(err, context.Canceled) { - logger.WithError(err).Error("notifications listener terminated") - } - }() - - metricsCollectors = append(metricsCollectors, storagesCached, notificationsListener) - csg = storagesCached - logger.Info("reads distribution caching is enabled by configuration") - } - } - - var errTracker tracker.ErrorTracker - - if conf.Failover.Enabled { - thresholdsConfigured, err := conf.Failover.ErrorThresholdsConfigured() - if err != nil { - return err - } - - if thresholdsConfigured { - errorWindowFunction, err := tracker.NewErrorWindowFunction(conf.Failover) - if err != nil { - return err - } - - errTracker, err = 
tracker.NewErrors(ctx, errorWindowFunction, conf.Failover.ReadErrorThresholdCount, conf.Failover.WriteErrorThresholdCount) - if err != nil { - return err - } - } - } - - transactionManager := transactions.NewManager(conf) - sidechannelRegistry := sidechannel.NewRegistry() - - backchannelCfg := backchannel.DefaultConfiguration() - backchannelCfg.AcceptBacklog = int(conf.Yamux.AcceptBacklog) - backchannelCfg.MaximumStreamWindowSizeBytes = conf.Yamux.MaximumStreamWindowSizeBytes - clientHandshaker := backchannel.NewClientHandshaker( - logger, - praefect.NewBackchannelServerFactory(logger, transaction.NewServer(transactionManager), sidechannelRegistry), - backchannelCfg, - ) - - assignmentStore := praefect.NewDisabledAssignmentStore(conf.StorageNames()) - var ( - nodeManager nodes.Manager - healthChecker praefect.HealthChecker - nodeSet praefect.NodeSet - router praefect.Router - primaryGetter praefect.PrimaryGetter +func helpTextTemplate() string { + var cmds []string + for k := range subcommands(nil) { + cmds = append(cmds, k) + } + sort.Strings(cmds) + + // Because not all sub-commands are registered with the new approach they won't be shown + // with the -help. To have them in the output we inject a simple list of their names into + // the template to have them presented. + return strings.Replace( + cli.AppHelpTemplate, + `COMMANDS:{{template "visibleCommandCategoryTemplate" .}}{{end}}`, + `COMMANDS:{{template "visibleCommandCategoryTemplate" .}}{{end}}`+ + "\n "+strings.Join(cmds, "\n "), + 1, ) - if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository { - nodeSet, err = praefect.DialNodes( - ctx, - conf.VirtualStorages, - protoregistry.GitalyProtoPreregistered, - errTracker, - clientHandshaker, - sidechannelRegistry, - ) - if err != nil { - return fmt.Errorf("dial nodes: %w", err) - } - defer nodeSet.Close() - - healthManager := nodes.NewHealthManager(logger, db, nodes.GeneratePraefectName(conf, logger), nodeSet.HealthClients()) - go func() { - if err := healthManager.Run(ctx, helper.NewTimerTicker(time.Second)); err != nil { - logger.WithError(err).Error("health manager exited") - } - }() - - healthChecker = healthManager - - // Wait for the first health check to complete so the Praefect doesn't start serving RPC - // before the router is ready with the health status of the nodes. 
- <-healthManager.Updated() - - elector := nodes.NewPerRepositoryElector(db) - - primaryGetter = elector - assignmentStore = datastore.NewAssignmentStore(db, conf.StorageNames()) - - router = praefect.NewPerRepositoryRouter( - nodeSet.Connections(), - elector, - healthManager, - praefect.NewLockedRandom(rand.New(rand.NewSource(time.Now().UnixNano()))), - csg, - assignmentStore, - rs, - conf.DefaultReplicationFactors(), - ) - - if conf.BackgroundVerification.VerificationInterval > 0 { - logger.WithField("config", conf.BackgroundVerification).Info("background verifier started") - verifier := praefect.NewMetadataVerifier( - logger, - db, - nodeSet.Connections(), - healthManager, - conf.BackgroundVerification.VerificationInterval.Duration(), - conf.BackgroundVerification.DeleteInvalidRecords, - ) - promreg.MustRegister(verifier) - - go func() { - if err := verifier.Run(ctx, helper.NewTimerTicker(2*time.Second)); err != nil { - logger.WithError(err).Error("metadata verifier finished") - } - }() - - go func() { - if err := verifier.RunExpiredLeaseReleaser(ctx, helper.NewTimerTicker(10*time.Second)); err != nil { - logger.WithError(err).Error("expired verification lease releaser finished") - } - }() - } else { - logger.Info("background verifier is disabled") - } - } else { - if conf.Failover.Enabled { - logger.WithField("election_strategy", conf.Failover.ElectionStrategy).Warn( - "Deprecated election stategy in use, migrate to repository specific primary nodes following https://docs.gitlab.com/ee/administration/gitaly/praefect.html#migrate-to-repository-specific-primary-gitaly-nodes. The other election strategies are scheduled for removal in GitLab 14.0.") - } - - nodeMgr, err := nodes.NewManager(logger, conf, db, csg, nodeLatencyHistogram, protoregistry.GitalyProtoPreregistered, errTracker, clientHandshaker, sidechannelRegistry) - if err != nil { - return err - } - - healthChecker = praefect.HealthChecker(nodeMgr) - nodeSet = praefect.NodeSetFromNodeManager(nodeMgr) - router = praefect.NewNodeManagerRouter(nodeMgr, rs) - primaryGetter = nodeMgr - nodeManager = nodeMgr - - nodeMgr.Start(conf.Failover.BootstrapInterval.Duration(), conf.Failover.MonitorInterval.Duration()) - defer nodeMgr.Stop() - } - - logger.Infof("election strategy: %q", conf.Failover.ElectionStrategy) - logger.Info("background started: gitaly nodes health monitoring") - - var ( - // top level server dependencies - coordinator = praefect.NewCoordinator( - queue, - rs, - router, - transactionManager, - conf, - protoregistry.GitalyProtoPreregistered, - ) - - repl = praefect.NewReplMgr( - logger, - conf.StorageNames(), - queue, - rs, - healthChecker, - nodeSet, - praefect.WithDelayMetric(delayMetric), - praefect.WithLatencyMetric(latencyMetric), - praefect.WithDequeueBatchSize(conf.Replication.BatchSize), - praefect.WithParallelStorageProcessingWorkers(conf.Replication.ParallelStorageProcessingWorkers), - ) - srvFactory = praefect.NewServerFactory( - conf, - logger, - coordinator.StreamDirector, - nodeManager, - transactionManager, - queue, - rs, - assignmentStore, - protoregistry.GitalyProtoPreregistered, - nodeSet.Connections(), - primaryGetter, - service.ReadinessChecks(), - ) - ) - metricsCollectors = append(metricsCollectors, transactionManager, coordinator, repl) - if db != nil { - dbMetricCollectors := []prometheus.Collector{ - datastore.NewRepositoryStoreCollector(logger, conf.VirtualStorageNames(), db, conf.Prometheus.ScrapeTimeout.Duration()), - datastore.NewQueueDepthCollector(logger, db, 
conf.Prometheus.ScrapeTimeout.Duration()), - } - - if conf.BackgroundVerification.VerificationInterval > 0 { - dbMetricCollectors = append(dbMetricCollectors, datastore.NewVerificationQueueDepthCollector( - logger, - db, - conf.Prometheus.ScrapeTimeout.Duration(), - conf.BackgroundVerification.VerificationInterval.Duration(), - conf.StorageNames(), - )) - } - - // Eventually, database-related metrics will always be exported via a separate - // endpoint such that it's possible to set a different scraping interval and thus to - // reduce database load. For now though, we register the metrics twice, once for the - // standard and once for the database-specific endpoint. This is done to ensure a - // transitory period where deployments can be moved to the new endpoint without - // causing breakage if they still use the old endpoint. - dbPromRegistry.MustRegister(dbMetricCollectors...) - if !conf.PrometheusExcludeDatabaseFromDefaultMetrics { - promreg.MustRegister(dbMetricCollectors...) - } - } - promreg.MustRegister(metricsCollectors...) - - for _, cfg := range cfgs { - srv, err := srvFactory.Create(cfg.IsSecure()) - if err != nil { - return fmt.Errorf("create gRPC server: %w", err) - } - defer srv.Stop() - - b.RegisterStarter(starter.New(cfg, srv)) - } - - if conf.PrometheusListenAddr != "" { - logger.WithField("address", conf.PrometheusListenAddr).Info("Starting prometheus listener") - - b.RegisterStarter(func(listen bootstrap.ListenFunc, _ chan<- error, _ *prometheus.CounterVec) error { - l, err := listen(starter.TCP, conf.PrometheusListenAddr) - if err != nil { - return err - } - - serveMux := http.NewServeMux() - serveMux.Handle("/db_metrics", promhttp.HandlerFor(dbPromRegistry, promhttp.HandlerOpts{})) - - go func() { - opts := []monitoring.Option{ - monitoring.WithListener(l), - monitoring.WithServeMux(serveMux), - } - - if buildInfo, ok := debug.ReadBuildInfo(); ok { - opts = append(opts, monitoring.WithGoBuildInformation(buildInfo)) - } - - if err := monitoring.Start(opts...); err != nil { - logger.WithError(err).Errorf("Unable to start prometheus listener: %v", conf.PrometheusListenAddr) - } - }() - - return nil - }) - } - - if err := b.Start(); err != nil { - return fmt.Errorf("unable to start the bootstrap: %w", err) - } - for _, cfg := range cfgs { - logger.WithFields(logrus.Fields{"schema": cfg.Name, "address": cfg.Addr}).Info("listening") - } - - go repl.ProcessBacklog(ctx, praefect.ExpBackoffFactory{Start: time.Second, Max: 5 * time.Second}) - - staleTicker := helper.NewTimerTicker(30 * time.Second) - defer staleTicker.Stop() - - logger.Info("background started: processing of the replication events") - repl.ProcessStale(ctx, staleTicker, time.Minute) - logger.Info("background started: processing of the stale replication events") - - if interval := conf.Reconciliation.SchedulingInterval.Duration(); interval > 0 { - if conf.MemoryQueueEnabled { - logger.Warn("Disabled automatic reconciliation as it is only implemented using SQL queue and in-memory queue is configured.") - } else { - r := reconciler.NewReconciler( - logger, - db, - healthChecker, - conf.StorageNames(), - conf.Reconciliation.HistogramBuckets, - ) - promreg.MustRegister(r) - go func() { - if err := r.Run(ctx, helper.NewTimerTicker(interval)); err != nil { - logger.WithError(err).Error("reconciler finished execution") - } - }() - } - } - - if interval := conf.RepositoriesCleanup.RunInterval.Duration(); interval > 0 { - if db != nil { - go func() { - storageSync := datastore.NewStorageCleanup(db) - cfg := 
repocleaner.Cfg{ - RunInterval: conf.RepositoriesCleanup.RunInterval.Duration(), - LivenessInterval: 30 * time.Second, - RepositoriesInBatch: int(conf.RepositoriesCleanup.RepositoriesInBatch), - } - repoCleaner := repocleaner.NewRunner(cfg, logger, healthChecker, nodeSet.Connections(), storageSync, storageSync, repocleaner.NewLogWarnAction(logger)) - if err := repoCleaner.Run(ctx, helper.NewTimerTicker(conf.RepositoriesCleanup.CheckInterval.Duration())); err != nil && !errors.Is(context.Canceled, err) { - logger.WithError(err).Error("repository cleaner finished execution") - } else { - logger.Info("repository cleaner finished execution") - } - }() - } else { - logger.Warn("Repository cleanup background task disabled as there is no database connection configured.") - } - } else { - logger.Warn(`Repository cleanup background task disabled as "repositories_cleanup.run_interval" is not set or 0.`) - } - - gracefulStopTicker := helper.NewTimerTicker(conf.GracefulStopTimeout.Duration()) - defer gracefulStopTicker.Stop() - - return b.Wait(gracefulStopTicker, srvFactory.GracefulStop) -} - -func getStarterConfigs(conf config.Config) ([]starter.Config, error) { - var cfgs []starter.Config - unique := map[string]struct{}{} - for schema, addr := range map[string]string{ - starter.TCP: conf.ListenAddr, - starter.TLS: conf.TLSListenAddr, - starter.Unix: conf.SocketPath, - } { - if addr == "" { - continue - } - - addrConf, err := starter.ParseEndpoint(addr) - if err != nil { - // address doesn't include schema - if !errors.Is(err, starter.ErrEmptySchema) { - return nil, err - } - addrConf = starter.Config{Name: schema, Addr: addr} - } - addrConf.HandoverOnUpgrade = true - - if _, found := unique[addrConf.Addr]; found { - return nil, fmt.Errorf("same address can't be used for different schemas %q", addr) - } - unique[addrConf.Addr] = struct{}{} - - cfgs = append(cfgs, addrConf) - } - - if len(cfgs) == 0 { - return nil, errors.New("no listening addresses were provided, unable to start") - } - - return cfgs, nil -} - -func initDatabase(ctx context.Context, logger *logrus.Entry, conf config.Config) (*sql.DB, func(), error) { - openDBCtx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() - db, err := glsql.OpenDB(openDBCtx, conf.DB) - if err != nil { - logger.WithError(err).Error("SQL connection open failed") - return nil, nil, err - } - - closedb := func() { - if err := db.Close(); err != nil { - logger.WithError(err).Error("SQL connection close failed") - } - } - - if err := datastore.CheckPostgresVersion(db); err != nil { - closedb() - return nil, nil, err - } - - return db, closedb, nil } diff --git a/internal/cli/praefect/main_test.go b/internal/cli/praefect/main_test.go index 787035155..57f657a72 100644 --- a/internal/cli/praefect/main_test.go +++ b/internal/cli/praefect/main_test.go @@ -22,7 +22,7 @@ func TestMain(m *testing.M) { } func TestNoConfigFlag(t *testing.T) { - _, err := initConfig(testhelper.NewDiscardingLogEntry(t)) + _, err := initConfig(testhelper.NewDiscardingLogEntry(t), "") assert.Equal(t, err, errNoConfigFile) } @@ -226,7 +226,7 @@ func TestExcludeDatabaseMetricsFromDefaultMetrics(t *testing.T) { go func() { defer close(stopped) logger := testhelper.NewDiscardingLogEntry(t) - assert.NoError(t, run(starterConfigs, conf, logger, bootstrapper, metricRegisterer, dbMetricsRegisterer)) + assert.NoError(t, server(starterConfigs, conf, logger, bootstrapper, metricRegisterer, dbMetricsRegisterer)) }() bootstrapper.Terminate() diff --git a/internal/cli/praefect/serve.go 
b/internal/cli/praefect/serve.go new file mode 100644 index 000000000..2761c8eda --- /dev/null +++ b/internal/cli/praefect/serve.go @@ -0,0 +1,606 @@ +package praefect + +import ( + "context" + "database/sql" + "errors" + "fmt" + "math/rand" + "net/http" + "os" + "runtime/debug" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/sirupsen/logrus" + "github.com/urfave/cli/v2" + "gitlab.com/gitlab-org/gitaly/v15/internal/backchannel" + "gitlab.com/gitlab-org/gitaly/v15/internal/bootstrap" + "gitlab.com/gitlab-org/gitaly/v15/internal/bootstrap/starter" + "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/sentry" + "gitlab.com/gitlab-org/gitaly/v15/internal/helper" + "gitlab.com/gitlab-org/gitaly/v15/internal/log" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/metrics" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes/tracker" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/reconciler" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/repocleaner" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/service" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/service/transaction" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/transactions" + "gitlab.com/gitlab-org/gitaly/v15/internal/sidechannel" + "gitlab.com/gitlab-org/gitaly/v15/internal/version" + "gitlab.com/gitlab-org/labkit/monitoring" + "gitlab.com/gitlab-org/labkit/tracing" +) + +var errNoConfigFile = errors.New("the config flag must be passed") + +func newServeCommand() *cli.Command { + return &cli.Command{ + Name: "serve", + Usage: "launch the server daemon", + Action: serveAction, + } +} + +func serveAction(ctx *cli.Context) error { + logger := log.Default() + // In order to support execution of all sub-commands not yet migrated to use a new cli + // implementation the invocation is done manually here. + subCmd := ctx.Args().First() + if subCmd != "" { + // It doesn't make difference if we provide command name to the invocation below + // or not as there won't be any output printed, because sub-commands are not yet + // registered. + pathToConfigFile := mustProvideConfigFlag(ctx, "") + conf, err := initConfig(logger, pathToConfigFile) + if err != nil { + return cli.Exit(fmt.Errorf("configuration error: %w", err), 1) + } + os.Exit(subCommand(conf, logger, subCmd, ctx.Args().Slice()[1:])) + } + + // The ctx.Command.Name can't be used here because if `praefect -config FILE` is used + // it will be set to 'praefect' instead of 'serve'. 
+ pathToConfigFile := mustProvideConfigFlag(ctx, "serve") + + logger.Infof("Starting %s", version.GetVersionString("Praefect")) + + if err := run(ctx.App.Name, logger, pathToConfigFile); err != nil { + logger.WithError(err).Error("Praefect shutdown") + return cli.Exit("", 1) + } + + logger.Info("Praefect shutdown") + + return nil +} + +func run(appName string, logger *logrus.Entry, configPath string) error { + conf, err := initConfig(logger, configPath) + if err != nil { + return cli.Exit(fmt.Errorf("configuration error: %w", err), 1) + } + + conf.ConfigureLogger() + configure(appName, conf) + + starterConfigs, err := getStarterConfigs(conf) + if err != nil { + return cli.Exit(err, 1) + } + + promreg := prometheus.DefaultRegisterer + b, err := bootstrap.New(promauto.With(promreg).NewCounterVec( + prometheus.CounterOpts{ + Name: "gitaly_praefect_connections_total", + Help: "Total number of connections to Praefect", + }, + []string{"type"}, + )) + if err != nil { + return cli.Exit(fmt.Errorf("unable to create a bootstrap: %w", err), 1) + } + + dbPromRegistry := prometheus.NewRegistry() + + if err := server(starterConfigs, conf, logger, b, promreg, dbPromRegistry); err != nil { + return cli.Exit(err, 1) + } + + return nil +} + +func initConfig(logger *logrus.Entry, path string) (config.Config, error) { + var conf config.Config + + if path == "" { + return conf, errNoConfigFile + } + + conf, err := config.FromFile(path) + if err != nil { + return conf, fmt.Errorf("error reading config file: %w", err) + } + + if err := conf.Validate(); err != nil { + return config.Config{}, err + } + + if !conf.AllowLegacyElectors { + conf.Failover.ElectionStrategy = config.ElectionStrategyPerRepository + } + + if !conf.Failover.Enabled && conf.Failover.ElectionStrategy != "" { + logger.WithField("election_strategy", conf.Failover.ElectionStrategy).Warn( + "ignoring configured election strategy as failover is disabled") + } + + return conf, nil +} + +func configure(appName string, conf config.Config) { + tracing.Initialize(tracing.WithServiceName(appName)) + + if conf.PrometheusListenAddr != "" { + conf.Prometheus.Configure() + } + + sentry.ConfigureSentry(version.GetVersion(), conf.Sentry) +} + +func server( + cfgs []starter.Config, + conf config.Config, + logger *logrus.Entry, + b bootstrap.Listener, + promreg prometheus.Registerer, + dbPromRegistry interface { + prometheus.Registerer + prometheus.Gatherer + }, +) error { + nodeLatencyHistogram, err := metrics.RegisterNodeLatency(conf.Prometheus, promreg) + if err != nil { + return err + } + + delayMetric, err := metrics.RegisterReplicationDelay(conf.Prometheus, promreg) + if err != nil { + return err + } + + latencyMetric, err := metrics.RegisterReplicationLatency(conf.Prometheus, promreg) + if err != nil { + return err + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var db *sql.DB + if conf.NeedsSQL() { + logger.Infof("establishing database connection to %s:%d ...", conf.DB.Host, conf.DB.Port) + dbConn, closedb, err := initDatabase(ctx, logger, conf) + if err != nil { + return err + } + defer closedb() + db = dbConn + logger.Info("database connection established") + } + + var queue datastore.ReplicationEventQueue + var rs datastore.RepositoryStore + var csg datastore.ConsistentStoragesGetter + var metricsCollectors []prometheus.Collector + + if conf.MemoryQueueEnabled { + queue = datastore.NewMemoryReplicationEventQueue(conf) + rs = datastore.MockRepositoryStore{} + csg = rs + logger.Info("reads distribution caching is 
disabled for in memory storage") + } else { + queue = datastore.NewPostgresReplicationEventQueue(db) + rs = datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + + dsn := glsql.DSN(conf.DB, true) + if dsn == "" { + csg = rs + logger.Info("reads distribution caching is disabled because direct connection to Postgres is not set") + } else { + storagesCached, err := datastore.NewCachingConsistentStoragesGetter(logger, rs, conf.VirtualStorageNames()) + if err != nil { + return fmt.Errorf("caching storage provider: %w", err) + } + + resilientListenerTicker := helper.NewTimerTicker(5 * time.Second) + notificationsListener := datastore.NewResilientListener(conf.DB, resilientListenerTicker, logger) + go func() { + err := notificationsListener.Listen(ctx, storagesCached, datastore.StorageRepositoriesUpdatesChannel, datastore.RepositoriesUpdatesChannel) + if err != nil && !errors.Is(err, context.Canceled) { + logger.WithError(err).Error("notifications listener terminated") + } + }() + + metricsCollectors = append(metricsCollectors, storagesCached, notificationsListener) + csg = storagesCached + logger.Info("reads distribution caching is enabled by configuration") + } + } + + var errTracker tracker.ErrorTracker + + if conf.Failover.Enabled { + thresholdsConfigured, err := conf.Failover.ErrorThresholdsConfigured() + if err != nil { + return err + } + + if thresholdsConfigured { + errorWindowFunction, err := tracker.NewErrorWindowFunction(conf.Failover) + if err != nil { + return err + } + + errTracker, err = tracker.NewErrors(ctx, errorWindowFunction, conf.Failover.ReadErrorThresholdCount, conf.Failover.WriteErrorThresholdCount) + if err != nil { + return err + } + } + } + + transactionManager := transactions.NewManager(conf) + sidechannelRegistry := sidechannel.NewRegistry() + + backchannelCfg := backchannel.DefaultConfiguration() + backchannelCfg.AcceptBacklog = int(conf.Yamux.AcceptBacklog) + backchannelCfg.MaximumStreamWindowSizeBytes = conf.Yamux.MaximumStreamWindowSizeBytes + clientHandshaker := backchannel.NewClientHandshaker( + logger, + praefect.NewBackchannelServerFactory(logger, transaction.NewServer(transactionManager), sidechannelRegistry), + backchannelCfg, + ) + + assignmentStore := praefect.NewDisabledAssignmentStore(conf.StorageNames()) + var ( + nodeManager nodes.Manager + healthChecker praefect.HealthChecker + nodeSet praefect.NodeSet + router praefect.Router + primaryGetter praefect.PrimaryGetter + ) + if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository { + nodeSet, err = praefect.DialNodes( + ctx, + conf.VirtualStorages, + protoregistry.GitalyProtoPreregistered, + errTracker, + clientHandshaker, + sidechannelRegistry, + ) + if err != nil { + return fmt.Errorf("dial nodes: %w", err) + } + defer nodeSet.Close() + + healthManager := nodes.NewHealthManager(logger, db, nodes.GeneratePraefectName(conf, logger), nodeSet.HealthClients()) + go func() { + if err := healthManager.Run(ctx, helper.NewTimerTicker(time.Second)); err != nil { + logger.WithError(err).Error("health manager exited") + } + }() + + healthChecker = healthManager + + // Wait for the first health check to complete so the Praefect doesn't start serving RPC + // before the router is ready with the health status of the nodes. 
+ <-healthManager.Updated() + + elector := nodes.NewPerRepositoryElector(db) + + primaryGetter = elector + assignmentStore = datastore.NewAssignmentStore(db, conf.StorageNames()) + + router = praefect.NewPerRepositoryRouter( + nodeSet.Connections(), + elector, + healthManager, + praefect.NewLockedRandom(rand.New(rand.NewSource(time.Now().UnixNano()))), + csg, + assignmentStore, + rs, + conf.DefaultReplicationFactors(), + ) + + if conf.BackgroundVerification.VerificationInterval > 0 { + logger.WithField("config", conf.BackgroundVerification).Info("background verifier started") + verifier := praefect.NewMetadataVerifier( + logger, + db, + nodeSet.Connections(), + healthManager, + conf.BackgroundVerification.VerificationInterval.Duration(), + conf.BackgroundVerification.DeleteInvalidRecords, + ) + promreg.MustRegister(verifier) + + go func() { + if err := verifier.Run(ctx, helper.NewTimerTicker(2*time.Second)); err != nil { + logger.WithError(err).Error("metadata verifier finished") + } + }() + + go func() { + if err := verifier.RunExpiredLeaseReleaser(ctx, helper.NewTimerTicker(10*time.Second)); err != nil { + logger.WithError(err).Error("expired verification lease releaser finished") + } + }() + } else { + logger.Info("background verifier is disabled") + } + } else { + if conf.Failover.Enabled { + logger.WithField("election_strategy", conf.Failover.ElectionStrategy).Warn( + "Deprecated election stategy in use, migrate to repository specific primary nodes following https://docs.gitlab.com/ee/administration/gitaly/praefect.html#migrate-to-repository-specific-primary-gitaly-nodes. The other election strategies are scheduled for removal in GitLab 14.0.") + } + + nodeMgr, err := nodes.NewManager(logger, conf, db, csg, nodeLatencyHistogram, protoregistry.GitalyProtoPreregistered, errTracker, clientHandshaker, sidechannelRegistry) + if err != nil { + return err + } + + healthChecker = praefect.HealthChecker(nodeMgr) + nodeSet = praefect.NodeSetFromNodeManager(nodeMgr) + router = praefect.NewNodeManagerRouter(nodeMgr, rs) + primaryGetter = nodeMgr + nodeManager = nodeMgr + + nodeMgr.Start(conf.Failover.BootstrapInterval.Duration(), conf.Failover.MonitorInterval.Duration()) + defer nodeMgr.Stop() + } + + logger.Infof("election strategy: %q", conf.Failover.ElectionStrategy) + logger.Info("background started: gitaly nodes health monitoring") + + var ( + // top level server dependencies + coordinator = praefect.NewCoordinator( + queue, + rs, + router, + transactionManager, + conf, + protoregistry.GitalyProtoPreregistered, + ) + + repl = praefect.NewReplMgr( + logger, + conf.StorageNames(), + queue, + rs, + healthChecker, + nodeSet, + praefect.WithDelayMetric(delayMetric), + praefect.WithLatencyMetric(latencyMetric), + praefect.WithDequeueBatchSize(conf.Replication.BatchSize), + praefect.WithParallelStorageProcessingWorkers(conf.Replication.ParallelStorageProcessingWorkers), + ) + srvFactory = praefect.NewServerFactory( + conf, + logger, + coordinator.StreamDirector, + nodeManager, + transactionManager, + queue, + rs, + assignmentStore, + protoregistry.GitalyProtoPreregistered, + nodeSet.Connections(), + primaryGetter, + service.ReadinessChecks(), + ) + ) + metricsCollectors = append(metricsCollectors, transactionManager, coordinator, repl) + if db != nil { + dbMetricCollectors := []prometheus.Collector{ + datastore.NewRepositoryStoreCollector(logger, conf.VirtualStorageNames(), db, conf.Prometheus.ScrapeTimeout.Duration()), + datastore.NewQueueDepthCollector(logger, db, 
conf.Prometheus.ScrapeTimeout.Duration()), + } + + if conf.BackgroundVerification.VerificationInterval > 0 { + dbMetricCollectors = append(dbMetricCollectors, datastore.NewVerificationQueueDepthCollector( + logger, + db, + conf.Prometheus.ScrapeTimeout.Duration(), + conf.BackgroundVerification.VerificationInterval.Duration(), + conf.StorageNames(), + )) + } + + // Eventually, database-related metrics will always be exported via a separate + // endpoint such that it's possible to set a different scraping interval and thus to + // reduce database load. For now though, we register the metrics twice, once for the + // standard and once for the database-specific endpoint. This is done to ensure a + // transitory period where deployments can be moved to the new endpoint without + // causing breakage if they still use the old endpoint. + dbPromRegistry.MustRegister(dbMetricCollectors...) + if !conf.PrometheusExcludeDatabaseFromDefaultMetrics { + promreg.MustRegister(dbMetricCollectors...) + } + } + promreg.MustRegister(metricsCollectors...) + + for _, cfg := range cfgs { + srv, err := srvFactory.Create(cfg.IsSecure()) + if err != nil { + return fmt.Errorf("create gRPC server: %w", err) + } + defer srv.Stop() + + b.RegisterStarter(starter.New(cfg, srv)) + } + + if conf.PrometheusListenAddr != "" { + logger.WithField("address", conf.PrometheusListenAddr).Info("Starting prometheus listener") + + b.RegisterStarter(func(listen bootstrap.ListenFunc, _ chan<- error, _ *prometheus.CounterVec) error { + l, err := listen(starter.TCP, conf.PrometheusListenAddr) + if err != nil { + return err + } + + serveMux := http.NewServeMux() + serveMux.Handle("/db_metrics", promhttp.HandlerFor(dbPromRegistry, promhttp.HandlerOpts{})) + + go func() { + opts := []monitoring.Option{ + monitoring.WithListener(l), + monitoring.WithServeMux(serveMux), + } + + if buildInfo, ok := debug.ReadBuildInfo(); ok { + opts = append(opts, monitoring.WithGoBuildInformation(buildInfo)) + } + + if err := monitoring.Start(opts...); err != nil { + logger.WithError(err).Errorf("Unable to start prometheus listener: %v", conf.PrometheusListenAddr) + } + }() + + return nil + }) + } + + if err := b.Start(); err != nil { + return fmt.Errorf("unable to start the bootstrap: %w", err) + } + for _, cfg := range cfgs { + logger.WithFields(logrus.Fields{"schema": cfg.Name, "address": cfg.Addr}).Info("listening") + } + + go repl.ProcessBacklog(ctx, praefect.ExpBackoffFactory{Start: time.Second, Max: 5 * time.Second}) + + staleTicker := helper.NewTimerTicker(30 * time.Second) + defer staleTicker.Stop() + + logger.Info("background started: processing of the replication events") + repl.ProcessStale(ctx, staleTicker, time.Minute) + logger.Info("background started: processing of the stale replication events") + + if interval := conf.Reconciliation.SchedulingInterval.Duration(); interval > 0 { + if conf.MemoryQueueEnabled { + logger.Warn("Disabled automatic reconciliation as it is only implemented using SQL queue and in-memory queue is configured.") + } else { + r := reconciler.NewReconciler( + logger, + db, + healthChecker, + conf.StorageNames(), + conf.Reconciliation.HistogramBuckets, + ) + promreg.MustRegister(r) + go func() { + if err := r.Run(ctx, helper.NewTimerTicker(interval)); err != nil { + logger.WithError(err).Error("reconciler finished execution") + } + }() + } + } + + if interval := conf.RepositoriesCleanup.RunInterval.Duration(); interval > 0 { + if db != nil { + go func() { + storageSync := datastore.NewStorageCleanup(db) + cfg := 
repocleaner.Cfg{ + RunInterval: conf.RepositoriesCleanup.RunInterval.Duration(), + LivenessInterval: 30 * time.Second, + RepositoriesInBatch: int(conf.RepositoriesCleanup.RepositoriesInBatch), + } + repoCleaner := repocleaner.NewRunner(cfg, logger, healthChecker, nodeSet.Connections(), storageSync, storageSync, repocleaner.NewLogWarnAction(logger)) + if err := repoCleaner.Run(ctx, helper.NewTimerTicker(conf.RepositoriesCleanup.CheckInterval.Duration())); err != nil && !errors.Is(context.Canceled, err) { + logger.WithError(err).Error("repository cleaner finished execution") + } else { + logger.Info("repository cleaner finished execution") + } + }() + } else { + logger.Warn("Repository cleanup background task disabled as there is no database connection configured.") + } + } else { + logger.Warn(`Repository cleanup background task disabled as "repositories_cleanup.run_interval" is not set or 0.`) + } + + gracefulStopTicker := helper.NewTimerTicker(conf.GracefulStopTimeout.Duration()) + defer gracefulStopTicker.Stop() + + return b.Wait(gracefulStopTicker, srvFactory.GracefulStop) +} + +func getStarterConfigs(conf config.Config) ([]starter.Config, error) { + var cfgs []starter.Config + unique := map[string]struct{}{} + for schema, addr := range map[string]string{ + starter.TCP: conf.ListenAddr, + starter.TLS: conf.TLSListenAddr, + starter.Unix: conf.SocketPath, + } { + if addr == "" { + continue + } + + addrConf, err := starter.ParseEndpoint(addr) + if err != nil { + // address doesn't include schema + if !errors.Is(err, starter.ErrEmptySchema) { + return nil, err + } + addrConf = starter.Config{Name: schema, Addr: addr} + } + addrConf.HandoverOnUpgrade = true + + if _, found := unique[addrConf.Addr]; found { + return nil, fmt.Errorf("same address can't be used for different schemas %q", addr) + } + unique[addrConf.Addr] = struct{}{} + + cfgs = append(cfgs, addrConf) + } + + if len(cfgs) == 0 { + return nil, errors.New("no listening addresses were provided, unable to start") + } + + return cfgs, nil +} + +func initDatabase(ctx context.Context, logger *logrus.Entry, conf config.Config) (*sql.DB, func(), error) { + openDBCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + db, err := glsql.OpenDB(openDBCtx, conf.DB) + if err != nil { + logger.WithError(err).Error("SQL connection open failed") + return nil, nil, err + } + + closedb := func() { + if err := db.Close(); err != nil { + logger.WithError(err).Error("SQL connection close failed") + } + } + + if err := datastore.CheckPostgresVersion(db); err != nil { + closedb() + return nil, nil, err + } + + return db, closedb, nil +} diff --git a/internal/cli/praefect/subcmd_configuration.go b/internal/cli/praefect/subcmd_configuration.go new file mode 100644 index 000000000..4f35ed6ef --- /dev/null +++ b/internal/cli/praefect/subcmd_configuration.go @@ -0,0 +1,15 @@ +package praefect + +import "github.com/urfave/cli/v2" + +const configurationCmdName = "configuration" + +func newConfigurationCommand() *cli.Command { + return &cli.Command{ + Name: configurationCmdName, + Usage: "manages configuration", + Subcommands: []*cli.Command{ + newConfigurationValidateCommand(), + }, + } +} diff --git a/internal/cli/praefect/subcmd_configuration_validate.go b/internal/cli/praefect/subcmd_configuration_validate.go index e9ad741c3..2b3da6d07 100644 --- a/internal/cli/praefect/subcmd_configuration_validate.go +++ b/internal/cli/praefect/subcmd_configuration_validate.go @@ -3,21 +3,39 @@ package praefect import ( "io" - 
"github.com/sirupsen/logrus" + "github.com/urfave/cli/v2" "gitlab.com/gitlab-org/gitaly/v15/cmd" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config" ) -const ( - configurationCmdName = "configuration" - validateCmdName = "validate" - validationErrorCode = 2 -) +const validationErrorCode = 2 + +func newConfigurationValidateCommand() *cli.Command { + return &cli.Command{ + Name: "validate", + Usage: "validates configuration", + Description: "The command accepts configuration in toml format on STDIN. It applies " + + "validation rules to the received configuration and returns all the found " + + "validation errors in JSON format back on STDOUT.", + Action: configurationValidateAction, + } +} + +func configurationValidateAction(ctx *cli.Context) error { + if ctx.Args().Present() { + _ = cli.ShowSubcommandHelp(ctx) + return cli.Exit("invalid argument(s)", 1) + } + + if code := validateConfiguration(ctx.App.Reader, ctx.App.Writer, ctx.App.ErrWriter); code != 0 { + return cli.Exit("", code) + } + + return nil +} // validateConfiguration checks if provided configuration is valid. func validateConfiguration(reader io.Reader, outWriter, errWriter io.Writer) int { - logrus.SetLevel(logrus.ErrorLevel) - cfg, err := config.FromReader(reader) if err != nil { if cmd.WriteTomlReadError(err, outWriter, errWriter) { diff --git a/internal/cli/praefect/subcmd_configuration_validate_test.go b/internal/cli/praefect/subcmd_configuration_validate_test.go index 74152ff63..9cb1364b2 100644 --- a/internal/cli/praefect/subcmd_configuration_validate_test.go +++ b/internal/cli/praefect/subcmd_configuration_validate_test.go @@ -62,14 +62,22 @@ election_strategy = invalid`) exitCode: 1, args: []string{"arg1", "arg2"}, stdin: func(t *testing.T) io.Reader { return nil }, - stderr: `Usage of praefect: - -config string - Location for the config.toml - -version - Print version and exit - subcommand (optional) - One of accept-dataloss, check, configuration, dataloss, dial-nodes, list-storages, list-untracked-repositories, metadata, remove-repository, set-replication-factor, sql-migrate, sql-migrate-down, sql-migrate-status, sql-ping, track-repositories, track-repository, verify + stdout: `NAME: + praefect configuration validate - validates configuration + +USAGE: + praefect configuration validate command [command options] [arguments...] + +DESCRIPTION: + The command accepts configuration in toml format on STDIN. It applies validation rules to the received configuration and returns all the found validation errors in JSON format back on STDOUT. + +COMMANDS: + help, h Shows a list of commands or help for one command + +OPTIONS: + --help, -h show help `, + stderr: "invalid argument(s)\n", }, { name: "validation failures", |