gitlab.com/gitlab-org/gitaly.git
author     Toon Claes <toon@gitlab.com>    2023-04-17 15:22:59 +0300
committer  Toon Claes <toon@gitlab.com>    2023-04-17 15:22:59 +0300
commit     978f7d2b58620b95cba50b77885f9a8d261da752 (patch)
tree       fca7b7cab6d79402b016c04294b16e1c060abfbc
parent     748f7b786b7eb29221c4ac7113c7adfce7674ec2 (diff)
parent     86652606c7a5a0df0026969c07846a8b5d271193 (diff)
Merge branch 'ps-praefect-cli-upgrade' into 'master'

praefect: Start switch to the new cli implementation

See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5616

Merged-by: Toon Claes <toon@gitlab.com>
Approved-by: John Cai <jcai@gitlab.com>
Approved-by: Toon Claes <toon@gitlab.com>
Reviewed-by: Toon Claes <toon@gitlab.com>
Reviewed-by: Pavlo Strokov <pstrokov@gitlab.com>
Co-authored-by: Pavlo Strokov <pstrokov@gitlab.com>
-rw-r--r--  cmd/praefect/main.go                                          |  11
-rw-r--r--  internal/cli/praefect/main.go                                 | 660
-rw-r--r--  internal/cli/praefect/main_test.go                            |   4
-rw-r--r--  internal/cli/praefect/serve.go                                | 606
-rw-r--r--  internal/cli/praefect/subcmd_configuration.go                 |  15
-rw-r--r--  internal/cli/praefect/subcmd_configuration_validate.go        |  34
-rw-r--r--  internal/cli/praefect/subcmd_configuration_validate_test.go   |  22
7 files changed, 742 insertions(+), 610 deletions(-)
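
This merge replaces Praefect's hand-rolled flag parsing with the github.com/urfave/cli/v2 toolkit while keeping the historical `praefect -config FILE` invocation working. The following is a minimal, self-contained sketch of that pattern, assuming only the public urfave/cli/v2 API; the program name, messages, and serveAction body are illustrative and are not Praefect's actual code.

```go
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/urfave/cli/v2"
)

// serveAction stands in for the real server start-up; it only checks the flag.
func serveAction(ctx *cli.Context) error {
	path := ctx.String("config")
	if path == "" {
		return cli.Exit("the config flag must be passed", 2)
	}
	fmt.Println("serving with configuration from", path)
	return nil
}

func main() {
	app := &cli.App{
		Name:  "example",
		Usage: "demo of a root action doubling as the serve command",
		Flags: []cli.Flag{
			// Not marked Required because not every sub-command needs it.
			&cli.StringFlag{Name: "config", Usage: "load configuration from `FILE`"},
		},
		// The root action keeps the legacy `example -config FILE` form working,
		// while `example serve -config FILE` is the new, explicit invocation.
		Action: serveAction,
		Commands: []*cli.Command{
			{Name: "serve", Usage: "launch the server daemon", Action: serveAction},
		},
	}
	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}
```

Both `example -config cfg.toml` and `example serve -config cfg.toml` reach serveAction, which mirrors how NewApp is wired in the diff below.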
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 027aca865..cc7e88b93 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -1,7 +1,14 @@
package main
-import "gitlab.com/gitlab-org/gitaly/v15/internal/cli/praefect"
+import (
+ "log"
+ "os"
+
+ cli "gitlab.com/gitlab-org/gitaly/v15/internal/cli/praefect"
+)
func main() {
- praefect.Main()
+ if err := cli.NewApp().Run(os.Args); err != nil {
+ log.Fatal(err)
+ }
}
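
The new entrypoint delegates everything to the praefect package's cli.App. As a hedged aside on how errors leave that Run call (this describes urfave/cli/v2's default exit handling, which the diff does not show): an action that returns cli.Exit has its message printed and the process terminated inside Run, so the log.Fatal above only reports ordinary errors.

```go
package main

import (
	"errors"
	"log"
	"os"

	"github.com/urfave/cli/v2"
)

func main() {
	app := &cli.App{
		Name:  "exitdemo",
		Flags: []cli.Flag{&cli.BoolFlag{Name: "fail-hard"}},
		Action: func(ctx *cli.Context) error {
			if ctx.Bool("fail-hard") {
				// Handled inside Run: the message goes to stderr and the
				// process exits with code 3 before Run returns.
				return cli.Exit("fatal condition", 3)
			}
			// Returned from Run and reported by log.Fatal below (exit code 1).
			return errors.New("ordinary error")
		},
	}
	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}
```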
diff --git a/internal/cli/praefect/main.go b/internal/cli/praefect/main.go
index 2fa7b5822..c142713d6 100644
--- a/internal/cli/praefect/main.go
+++ b/internal/cli/praefect/main.go
@@ -57,614 +57,92 @@
package praefect
import (
- "context"
- "database/sql"
- "errors"
- "flag"
"fmt"
- "math/rand"
- "net/http"
+ "log"
"os"
- "runtime/debug"
"sort"
"strings"
- "time"
- "github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/promauto"
- "github.com/prometheus/client_golang/prometheus/promhttp"
- "github.com/sirupsen/logrus"
- "gitlab.com/gitlab-org/gitaly/v15/internal/backchannel"
- "gitlab.com/gitlab-org/gitaly/v15/internal/bootstrap"
- "gitlab.com/gitlab-org/gitaly/v15/internal/bootstrap/starter"
- "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/sentry"
- "gitlab.com/gitlab-org/gitaly/v15/internal/helper"
- "gitlab.com/gitlab-org/gitaly/v15/internal/log"
- "gitlab.com/gitlab-org/gitaly/v15/internal/praefect"
- "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore/glsql"
- "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/metrics"
- "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes"
- "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes/tracker"
- "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/protoregistry"
- "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/reconciler"
- "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/repocleaner"
- "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/service"
- "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/service/transaction"
- "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/transactions"
- "gitlab.com/gitlab-org/gitaly/v15/internal/sidechannel"
+ "github.com/urfave/cli/v2"
"gitlab.com/gitlab-org/gitaly/v15/internal/version"
- "gitlab.com/gitlab-org/labkit/monitoring"
- "gitlab.com/gitlab-org/labkit/tracing"
)
-var (
- flagConfig = flag.String("config", "", "Location for the config.toml")
- flagVersion = flag.Bool("version", false, "Print version and exit")
-
- errNoConfigFile = errors.New("the config flag must be passed")
-)
-
-const progname = "praefect"
-
-// Main is an entry point of the praefect binary.
-func Main() {
- logger := log.Default()
- flag.Usage = func() {
- cmds := []string{configurationCmdName}
- for k := range subcommands(logger) {
- cmds = append(cmds, k)
- }
- sort.Strings(cmds)
-
- printfErr("Usage of %s:\n", progname)
- flag.PrintDefaults()
- printfErr(" subcommand (optional)\n")
- printfErr("\tOne of %s\n", strings.Join(cmds, ", "))
- }
- flag.Parse()
-
- // If invoked with -version
- if *flagVersion {
- fmt.Println(version.GetVersionString("Praefect"))
- os.Exit(0)
- }
-
- args := flag.Args()
- // The configuration sub-command differs from all other sub-commands because it doesn't
- // require a valid configuration to run. It expects configuration to be provided
- // on STDIN instead of file.
- if len(args) > 0 && args[0] == configurationCmdName {
- if len(args) == 2 && args[1] == validateCmdName {
- os.Exit(validateConfiguration(os.Stdin, os.Stdout, os.Stderr))
- }
- flag.Usage()
- os.Exit(1)
- }
-
- conf, err := initConfig(logger)
- if err != nil {
- printfErr("%s: configuration error: %v\n", progname, err)
- os.Exit(1)
- }
-
- conf.ConfigureLogger()
-
- if len(args) > 0 {
- os.Exit(subCommand(conf, logger, args[0], args[1:]))
- }
-
- configure(conf)
-
- logger.WithField("version", version.GetVersionString("Praefect")).Info("Starting " + progname)
-
- starterConfigs, err := getStarterConfigs(conf)
- if err != nil {
- logger.Fatalf("%s", err)
- }
-
- promreg := prometheus.DefaultRegisterer
- b, err := bootstrap.New(promauto.With(promreg).NewCounterVec(
- prometheus.CounterOpts{
- Name: "gitaly_praefect_connections_total",
- Help: "Total number of connections to Praefect",
- },
- []string{"type"},
- ))
- if err != nil {
- logger.Fatalf("unable to create a bootstrap: %v", err)
- }
-
- dbPromRegistry := prometheus.NewRegistry()
-
- if err := run(starterConfigs, conf, logger, b, promreg, dbPromRegistry); err != nil {
- logger.Fatalf("%v", err)
+func init() {
+ // Override the version printer so the output format matches what Praefect
+ // used before the introduction of the CLI toolkit.
+ cli.VersionPrinter = func(ctx *cli.Context) {
+ fmt.Fprintln(ctx.App.Writer, version.GetVersionString("Praefect"))
}
}
-func initConfig(logger *logrus.Entry) (config.Config, error) {
- var conf config.Config
+const (
+ progname = "praefect"
- if *flagConfig == "" {
- return conf, errNoConfigFile
- }
-
- conf, err := config.FromFile(*flagConfig)
- if err != nil {
- return conf, fmt.Errorf("error reading config file: %w", err)
- }
-
- if err := conf.Validate(); err != nil {
- return config.Config{}, err
- }
-
- if !conf.AllowLegacyElectors {
- conf.Failover.ElectionStrategy = config.ElectionStrategyPerRepository
- }
+ configFlagName = "config"
+)
- if !conf.Failover.Enabled && conf.Failover.ElectionStrategy != "" {
- logger.WithField("election_strategy", conf.Failover.ElectionStrategy).Warn(
- "ignoring configured election strategy as failover is disabled")
+// NewApp returns a new praefect app.
+func NewApp() *cli.App {
+ return &cli.App{
+ Name: progname,
+ Usage: "a gitaly proxy",
+ Version: version.GetVersionString("Praefect"),
+ // serveAction is also here in the root to keep the CLI backwards compatible with
+ // the previous way to launch Praefect with just `praefect -config FILE`.
+ // We may want to deprecate this eventually.
+ //
+ // The 'DefaultCommand: "serve"' setting can't be used here because it won't be
+ // possible to invoke sub-command not yet registered.
+ Action: serveAction,
+ Commands: []*cli.Command{
+ newServeCommand(),
+ newConfigurationCommand(),
+ },
+ Flags: []cli.Flag{
+ &cli.StringFlag{
+ // We can't mark it required, because it is not for all sub-commands.
+ // We need it as it is used by majority of the sub-commands and
+ // because of the existing format of commands invocation.
+ Name: configFlagName,
+ Usage: "load configuration from `FILE`",
+ },
+ },
+ CustomAppHelpTemplate: helpTextTemplate(),
}
-
- return conf, nil
}
-func configure(conf config.Config) {
- tracing.Initialize(tracing.WithServiceName(progname))
-
- if conf.PrometheusListenAddr != "" {
- conf.Prometheus.Configure()
- }
-
- sentry.ConfigureSentry(version.GetVersion(), conf.Sentry)
+// mustProvideConfigFlag extracts value of the 'config' flag and returns it.
+// If flag is not set the help for the command will be printed and terminated with exit code 2.
+func mustProvideConfigFlag(ctx *cli.Context, command string) string {
+ pathToConfigFile := ctx.String(configFlagName)
+ if pathToConfigFile == "" {
+ // We can't make 'config' flag required for all commands, but we still want the
+ // same output to be printed if it is not provided.
+ // It should be removed after migration to the `praefect CMD -config FILE`
+ // where we can mark it as required for each sub-command.
+ _ = cli.ShowCommandHelp(ctx, command)
+ log.Printf("Required flag %q not set\n", configFlagName)
+ os.Exit(2)
+ }
+
+ return pathToConfigFile
}
-func run(
- cfgs []starter.Config,
- conf config.Config,
- logger *logrus.Entry,
- b bootstrap.Listener,
- promreg prometheus.Registerer,
- dbPromRegistry interface {
- prometheus.Registerer
- prometheus.Gatherer
- },
-) error {
- nodeLatencyHistogram, err := metrics.RegisterNodeLatency(conf.Prometheus, promreg)
- if err != nil {
- return err
- }
-
- delayMetric, err := metrics.RegisterReplicationDelay(conf.Prometheus, promreg)
- if err != nil {
- return err
- }
-
- latencyMetric, err := metrics.RegisterReplicationLatency(conf.Prometheus, promreg)
- if err != nil {
- return err
- }
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- var db *sql.DB
- if conf.NeedsSQL() {
- logger.Infof("establishing database connection to %s:%d ...", conf.DB.Host, conf.DB.Port)
- dbConn, closedb, err := initDatabase(ctx, logger, conf)
- if err != nil {
- return err
- }
- defer closedb()
- db = dbConn
- logger.Info("database connection established")
- }
-
- var queue datastore.ReplicationEventQueue
- var rs datastore.RepositoryStore
- var csg datastore.ConsistentStoragesGetter
- var metricsCollectors []prometheus.Collector
-
- if conf.MemoryQueueEnabled {
- queue = datastore.NewMemoryReplicationEventQueue(conf)
- rs = datastore.MockRepositoryStore{}
- csg = rs
- logger.Info("reads distribution caching is disabled for in memory storage")
- } else {
- queue = datastore.NewPostgresReplicationEventQueue(db)
- rs = datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
-
- dsn := glsql.DSN(conf.DB, true)
- if dsn == "" {
- csg = rs
- logger.Info("reads distribution caching is disabled because direct connection to Postgres is not set")
- } else {
- storagesCached, err := datastore.NewCachingConsistentStoragesGetter(logger, rs, conf.VirtualStorageNames())
- if err != nil {
- return fmt.Errorf("caching storage provider: %w", err)
- }
-
- resilientListenerTicker := helper.NewTimerTicker(5 * time.Second)
- notificationsListener := datastore.NewResilientListener(conf.DB, resilientListenerTicker, logger)
- go func() {
- err := notificationsListener.Listen(ctx, storagesCached, datastore.StorageRepositoriesUpdatesChannel, datastore.RepositoriesUpdatesChannel)
- if err != nil && !errors.Is(err, context.Canceled) {
- logger.WithError(err).Error("notifications listener terminated")
- }
- }()
-
- metricsCollectors = append(metricsCollectors, storagesCached, notificationsListener)
- csg = storagesCached
- logger.Info("reads distribution caching is enabled by configuration")
- }
- }
-
- var errTracker tracker.ErrorTracker
-
- if conf.Failover.Enabled {
- thresholdsConfigured, err := conf.Failover.ErrorThresholdsConfigured()
- if err != nil {
- return err
- }
-
- if thresholdsConfigured {
- errorWindowFunction, err := tracker.NewErrorWindowFunction(conf.Failover)
- if err != nil {
- return err
- }
-
- errTracker, err = tracker.NewErrors(ctx, errorWindowFunction, conf.Failover.ReadErrorThresholdCount, conf.Failover.WriteErrorThresholdCount)
- if err != nil {
- return err
- }
- }
- }
-
- transactionManager := transactions.NewManager(conf)
- sidechannelRegistry := sidechannel.NewRegistry()
-
- backchannelCfg := backchannel.DefaultConfiguration()
- backchannelCfg.AcceptBacklog = int(conf.Yamux.AcceptBacklog)
- backchannelCfg.MaximumStreamWindowSizeBytes = conf.Yamux.MaximumStreamWindowSizeBytes
- clientHandshaker := backchannel.NewClientHandshaker(
- logger,
- praefect.NewBackchannelServerFactory(logger, transaction.NewServer(transactionManager), sidechannelRegistry),
- backchannelCfg,
- )
-
- assignmentStore := praefect.NewDisabledAssignmentStore(conf.StorageNames())
- var (
- nodeManager nodes.Manager
- healthChecker praefect.HealthChecker
- nodeSet praefect.NodeSet
- router praefect.Router
- primaryGetter praefect.PrimaryGetter
+func helpTextTemplate() string {
+ var cmds []string
+ for k := range subcommands(nil) {
+ cmds = append(cmds, k)
+ }
+ sort.Strings(cmds)
+
+ // Because not all sub-commands are registered with the new approach they won't be shown
+ // with the -help. To have them in the output we inject a simple list of their names into
+ // the template to have them presented.
+ return strings.Replace(
+ cli.AppHelpTemplate,
+ `COMMANDS:{{template "visibleCommandCategoryTemplate" .}}{{end}}`,
+ `COMMANDS:{{template "visibleCommandCategoryTemplate" .}}{{end}}`+
+ "\n "+strings.Join(cmds, "\n "),
+ 1,
)
- if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
- nodeSet, err = praefect.DialNodes(
- ctx,
- conf.VirtualStorages,
- protoregistry.GitalyProtoPreregistered,
- errTracker,
- clientHandshaker,
- sidechannelRegistry,
- )
- if err != nil {
- return fmt.Errorf("dial nodes: %w", err)
- }
- defer nodeSet.Close()
-
- healthManager := nodes.NewHealthManager(logger, db, nodes.GeneratePraefectName(conf, logger), nodeSet.HealthClients())
- go func() {
- if err := healthManager.Run(ctx, helper.NewTimerTicker(time.Second)); err != nil {
- logger.WithError(err).Error("health manager exited")
- }
- }()
-
- healthChecker = healthManager
-
- // Wait for the first health check to complete so the Praefect doesn't start serving RPC
- // before the router is ready with the health status of the nodes.
- <-healthManager.Updated()
-
- elector := nodes.NewPerRepositoryElector(db)
-
- primaryGetter = elector
- assignmentStore = datastore.NewAssignmentStore(db, conf.StorageNames())
-
- router = praefect.NewPerRepositoryRouter(
- nodeSet.Connections(),
- elector,
- healthManager,
- praefect.NewLockedRandom(rand.New(rand.NewSource(time.Now().UnixNano()))),
- csg,
- assignmentStore,
- rs,
- conf.DefaultReplicationFactors(),
- )
-
- if conf.BackgroundVerification.VerificationInterval > 0 {
- logger.WithField("config", conf.BackgroundVerification).Info("background verifier started")
- verifier := praefect.NewMetadataVerifier(
- logger,
- db,
- nodeSet.Connections(),
- healthManager,
- conf.BackgroundVerification.VerificationInterval.Duration(),
- conf.BackgroundVerification.DeleteInvalidRecords,
- )
- promreg.MustRegister(verifier)
-
- go func() {
- if err := verifier.Run(ctx, helper.NewTimerTicker(2*time.Second)); err != nil {
- logger.WithError(err).Error("metadata verifier finished")
- }
- }()
-
- go func() {
- if err := verifier.RunExpiredLeaseReleaser(ctx, helper.NewTimerTicker(10*time.Second)); err != nil {
- logger.WithError(err).Error("expired verification lease releaser finished")
- }
- }()
- } else {
- logger.Info("background verifier is disabled")
- }
- } else {
- if conf.Failover.Enabled {
- logger.WithField("election_strategy", conf.Failover.ElectionStrategy).Warn(
- "Deprecated election stategy in use, migrate to repository specific primary nodes following https://docs.gitlab.com/ee/administration/gitaly/praefect.html#migrate-to-repository-specific-primary-gitaly-nodes. The other election strategies are scheduled for removal in GitLab 14.0.")
- }
-
- nodeMgr, err := nodes.NewManager(logger, conf, db, csg, nodeLatencyHistogram, protoregistry.GitalyProtoPreregistered, errTracker, clientHandshaker, sidechannelRegistry)
- if err != nil {
- return err
- }
-
- healthChecker = praefect.HealthChecker(nodeMgr)
- nodeSet = praefect.NodeSetFromNodeManager(nodeMgr)
- router = praefect.NewNodeManagerRouter(nodeMgr, rs)
- primaryGetter = nodeMgr
- nodeManager = nodeMgr
-
- nodeMgr.Start(conf.Failover.BootstrapInterval.Duration(), conf.Failover.MonitorInterval.Duration())
- defer nodeMgr.Stop()
- }
-
- logger.Infof("election strategy: %q", conf.Failover.ElectionStrategy)
- logger.Info("background started: gitaly nodes health monitoring")
-
- var (
- // top level server dependencies
- coordinator = praefect.NewCoordinator(
- queue,
- rs,
- router,
- transactionManager,
- conf,
- protoregistry.GitalyProtoPreregistered,
- )
-
- repl = praefect.NewReplMgr(
- logger,
- conf.StorageNames(),
- queue,
- rs,
- healthChecker,
- nodeSet,
- praefect.WithDelayMetric(delayMetric),
- praefect.WithLatencyMetric(latencyMetric),
- praefect.WithDequeueBatchSize(conf.Replication.BatchSize),
- praefect.WithParallelStorageProcessingWorkers(conf.Replication.ParallelStorageProcessingWorkers),
- )
- srvFactory = praefect.NewServerFactory(
- conf,
- logger,
- coordinator.StreamDirector,
- nodeManager,
- transactionManager,
- queue,
- rs,
- assignmentStore,
- protoregistry.GitalyProtoPreregistered,
- nodeSet.Connections(),
- primaryGetter,
- service.ReadinessChecks(),
- )
- )
- metricsCollectors = append(metricsCollectors, transactionManager, coordinator, repl)
- if db != nil {
- dbMetricCollectors := []prometheus.Collector{
- datastore.NewRepositoryStoreCollector(logger, conf.VirtualStorageNames(), db, conf.Prometheus.ScrapeTimeout.Duration()),
- datastore.NewQueueDepthCollector(logger, db, conf.Prometheus.ScrapeTimeout.Duration()),
- }
-
- if conf.BackgroundVerification.VerificationInterval > 0 {
- dbMetricCollectors = append(dbMetricCollectors, datastore.NewVerificationQueueDepthCollector(
- logger,
- db,
- conf.Prometheus.ScrapeTimeout.Duration(),
- conf.BackgroundVerification.VerificationInterval.Duration(),
- conf.StorageNames(),
- ))
- }
-
- // Eventually, database-related metrics will always be exported via a separate
- // endpoint such that it's possible to set a different scraping interval and thus to
- // reduce database load. For now though, we register the metrics twice, once for the
- // standard and once for the database-specific endpoint. This is done to ensure a
- // transitory period where deployments can be moved to the new endpoint without
- // causing breakage if they still use the old endpoint.
- dbPromRegistry.MustRegister(dbMetricCollectors...)
- if !conf.PrometheusExcludeDatabaseFromDefaultMetrics {
- promreg.MustRegister(dbMetricCollectors...)
- }
- }
- promreg.MustRegister(metricsCollectors...)
-
- for _, cfg := range cfgs {
- srv, err := srvFactory.Create(cfg.IsSecure())
- if err != nil {
- return fmt.Errorf("create gRPC server: %w", err)
- }
- defer srv.Stop()
-
- b.RegisterStarter(starter.New(cfg, srv))
- }
-
- if conf.PrometheusListenAddr != "" {
- logger.WithField("address", conf.PrometheusListenAddr).Info("Starting prometheus listener")
-
- b.RegisterStarter(func(listen bootstrap.ListenFunc, _ chan<- error, _ *prometheus.CounterVec) error {
- l, err := listen(starter.TCP, conf.PrometheusListenAddr)
- if err != nil {
- return err
- }
-
- serveMux := http.NewServeMux()
- serveMux.Handle("/db_metrics", promhttp.HandlerFor(dbPromRegistry, promhttp.HandlerOpts{}))
-
- go func() {
- opts := []monitoring.Option{
- monitoring.WithListener(l),
- monitoring.WithServeMux(serveMux),
- }
-
- if buildInfo, ok := debug.ReadBuildInfo(); ok {
- opts = append(opts, monitoring.WithGoBuildInformation(buildInfo))
- }
-
- if err := monitoring.Start(opts...); err != nil {
- logger.WithError(err).Errorf("Unable to start prometheus listener: %v", conf.PrometheusListenAddr)
- }
- }()
-
- return nil
- })
- }
-
- if err := b.Start(); err != nil {
- return fmt.Errorf("unable to start the bootstrap: %w", err)
- }
- for _, cfg := range cfgs {
- logger.WithFields(logrus.Fields{"schema": cfg.Name, "address": cfg.Addr}).Info("listening")
- }
-
- go repl.ProcessBacklog(ctx, praefect.ExpBackoffFactory{Start: time.Second, Max: 5 * time.Second})
-
- staleTicker := helper.NewTimerTicker(30 * time.Second)
- defer staleTicker.Stop()
-
- logger.Info("background started: processing of the replication events")
- repl.ProcessStale(ctx, staleTicker, time.Minute)
- logger.Info("background started: processing of the stale replication events")
-
- if interval := conf.Reconciliation.SchedulingInterval.Duration(); interval > 0 {
- if conf.MemoryQueueEnabled {
- logger.Warn("Disabled automatic reconciliation as it is only implemented using SQL queue and in-memory queue is configured.")
- } else {
- r := reconciler.NewReconciler(
- logger,
- db,
- healthChecker,
- conf.StorageNames(),
- conf.Reconciliation.HistogramBuckets,
- )
- promreg.MustRegister(r)
- go func() {
- if err := r.Run(ctx, helper.NewTimerTicker(interval)); err != nil {
- logger.WithError(err).Error("reconciler finished execution")
- }
- }()
- }
- }
-
- if interval := conf.RepositoriesCleanup.RunInterval.Duration(); interval > 0 {
- if db != nil {
- go func() {
- storageSync := datastore.NewStorageCleanup(db)
- cfg := repocleaner.Cfg{
- RunInterval: conf.RepositoriesCleanup.RunInterval.Duration(),
- LivenessInterval: 30 * time.Second,
- RepositoriesInBatch: int(conf.RepositoriesCleanup.RepositoriesInBatch),
- }
- repoCleaner := repocleaner.NewRunner(cfg, logger, healthChecker, nodeSet.Connections(), storageSync, storageSync, repocleaner.NewLogWarnAction(logger))
- if err := repoCleaner.Run(ctx, helper.NewTimerTicker(conf.RepositoriesCleanup.CheckInterval.Duration())); err != nil && !errors.Is(context.Canceled, err) {
- logger.WithError(err).Error("repository cleaner finished execution")
- } else {
- logger.Info("repository cleaner finished execution")
- }
- }()
- } else {
- logger.Warn("Repository cleanup background task disabled as there is no database connection configured.")
- }
- } else {
- logger.Warn(`Repository cleanup background task disabled as "repositories_cleanup.run_interval" is not set or 0.`)
- }
-
- gracefulStopTicker := helper.NewTimerTicker(conf.GracefulStopTimeout.Duration())
- defer gracefulStopTicker.Stop()
-
- return b.Wait(gracefulStopTicker, srvFactory.GracefulStop)
-}
-
-func getStarterConfigs(conf config.Config) ([]starter.Config, error) {
- var cfgs []starter.Config
- unique := map[string]struct{}{}
- for schema, addr := range map[string]string{
- starter.TCP: conf.ListenAddr,
- starter.TLS: conf.TLSListenAddr,
- starter.Unix: conf.SocketPath,
- } {
- if addr == "" {
- continue
- }
-
- addrConf, err := starter.ParseEndpoint(addr)
- if err != nil {
- // address doesn't include schema
- if !errors.Is(err, starter.ErrEmptySchema) {
- return nil, err
- }
- addrConf = starter.Config{Name: schema, Addr: addr}
- }
- addrConf.HandoverOnUpgrade = true
-
- if _, found := unique[addrConf.Addr]; found {
- return nil, fmt.Errorf("same address can't be used for different schemas %q", addr)
- }
- unique[addrConf.Addr] = struct{}{}
-
- cfgs = append(cfgs, addrConf)
- }
-
- if len(cfgs) == 0 {
- return nil, errors.New("no listening addresses were provided, unable to start")
- }
-
- return cfgs, nil
-}
-
-func initDatabase(ctx context.Context, logger *logrus.Entry, conf config.Config) (*sql.DB, func(), error) {
- openDBCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
- defer cancel()
- db, err := glsql.OpenDB(openDBCtx, conf.DB)
- if err != nil {
- logger.WithError(err).Error("SQL connection open failed")
- return nil, nil, err
- }
-
- closedb := func() {
- if err := db.Close(); err != nil {
- logger.WithError(err).Error("SQL connection close failed")
- }
- }
-
- if err := datastore.CheckPostgresVersion(db); err != nil {
- closedb()
- return nil, nil, err
- }
-
- return db, closedb, nil
}
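
The tail of main.go above splices the names of not-yet-migrated sub-commands into the stock help output via CustomAppHelpTemplate. The sketch below isolates that technique; it assumes the anchor string matches the AppHelpTemplate shipped by the cli version in use (exactly as the diff itself does), and the legacy command names are placeholders rather than Praefect's real list.

```go
package main

import (
	"os"
	"sort"
	"strings"

	"github.com/urfave/cli/v2"
)

// legacyHelpTemplate appends extra command names to the COMMANDS section of
// the default app help template so they show up in -help output even though
// they are not registered as cli.Commands.
func legacyHelpTemplate(legacy []string) string {
	sort.Strings(legacy)
	const anchor = `COMMANDS:{{template "visibleCommandCategoryTemplate" .}}{{end}}`
	return strings.Replace(
		cli.AppHelpTemplate,
		anchor,
		anchor+"\n   "+strings.Join(legacy, "\n   "),
		1,
	)
}

func main() {
	app := &cli.App{
		Name:                  "helpdemo",
		Usage:                 "demo of extending the app help template",
		CustomAppHelpTemplate: legacyHelpTemplate([]string{"sql-ping", "dataloss"}),
	}
	_ = app.Run(os.Args) // try: helpdemo -help
}
```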
diff --git a/internal/cli/praefect/main_test.go b/internal/cli/praefect/main_test.go
index 787035155..57f657a72 100644
--- a/internal/cli/praefect/main_test.go
+++ b/internal/cli/praefect/main_test.go
@@ -22,7 +22,7 @@ func TestMain(m *testing.M) {
}
func TestNoConfigFlag(t *testing.T) {
- _, err := initConfig(testhelper.NewDiscardingLogEntry(t))
+ _, err := initConfig(testhelper.NewDiscardingLogEntry(t), "")
assert.Equal(t, err, errNoConfigFile)
}
@@ -226,7 +226,7 @@ func TestExcludeDatabaseMetricsFromDefaultMetrics(t *testing.T) {
go func() {
defer close(stopped)
logger := testhelper.NewDiscardingLogEntry(t)
- assert.NoError(t, run(starterConfigs, conf, logger, bootstrapper, metricRegisterer, dbMetricsRegisterer))
+ assert.NoError(t, server(starterConfigs, conf, logger, bootstrapper, metricRegisterer, dbMetricsRegisterer))
}()
bootstrapper.Terminate()
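
Because NewApp returns a plain *cli.App, the new wiring can be exercised in-process rather than by spawning the binary. A hedged test sketch follows; it assumes the VersionPrinter override from main.go and uses testify's assert, and it is illustrative rather than part of the Gitaly test suite.

```go
package praefect

import (
	"bytes"
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestVersionFlagSketch(t *testing.T) {
	var stdout bytes.Buffer

	app := NewApp()
	// Capture output instead of writing to the process's stdout.
	app.Writer = &stdout

	// urfave/cli handles -version itself; the VersionPrinter installed in
	// init() keeps the historical Praefect version output format.
	assert.NoError(t, app.Run([]string{"praefect", "-version"}))
	assert.Contains(t, stdout.String(), "Praefect")
}
```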
diff --git a/internal/cli/praefect/serve.go b/internal/cli/praefect/serve.go
new file mode 100644
index 000000000..2761c8eda
--- /dev/null
+++ b/internal/cli/praefect/serve.go
@@ -0,0 +1,606 @@
+package praefect
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+ "fmt"
+ "math/rand"
+ "net/http"
+ "os"
+ "runtime/debug"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+ "github.com/sirupsen/logrus"
+ "github.com/urfave/cli/v2"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/backchannel"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/bootstrap"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/bootstrap/starter"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/sentry"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/log"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/metrics"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes/tracker"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/reconciler"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/repocleaner"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/service"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/service/transaction"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/transactions"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/sidechannel"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/version"
+ "gitlab.com/gitlab-org/labkit/monitoring"
+ "gitlab.com/gitlab-org/labkit/tracing"
+)
+
+var errNoConfigFile = errors.New("the config flag must be passed")
+
+func newServeCommand() *cli.Command {
+ return &cli.Command{
+ Name: "serve",
+ Usage: "launch the server daemon",
+ Action: serveAction,
+ }
+}
+
+func serveAction(ctx *cli.Context) error {
+ logger := log.Default()
+ // In order to support execution of all sub-commands not yet migrated to use a new cli
+ // implementation the invocation is done manually here.
+ subCmd := ctx.Args().First()
+ if subCmd != "" {
+ // It doesn't make difference if we provide command name to the invocation below
+ // or not as there won't be any output printed, because sub-commands are not yet
+ // registered.
+ pathToConfigFile := mustProvideConfigFlag(ctx, "")
+ conf, err := initConfig(logger, pathToConfigFile)
+ if err != nil {
+ return cli.Exit(fmt.Errorf("configuration error: %w", err), 1)
+ }
+ os.Exit(subCommand(conf, logger, subCmd, ctx.Args().Slice()[1:]))
+ }
+
+ // The ctx.Command.Name can't be used here because if `praefect -config FILE` is used
+ // it will be set to 'praefect' instead of 'serve'.
+ pathToConfigFile := mustProvideConfigFlag(ctx, "serve")
+
+ logger.Infof("Starting %s", version.GetVersionString("Praefect"))
+
+ if err := run(ctx.App.Name, logger, pathToConfigFile); err != nil {
+ logger.WithError(err).Error("Praefect shutdown")
+ return cli.Exit("", 1)
+ }
+
+ logger.Info("Praefect shutdown")
+
+ return nil
+}
+
+func run(appName string, logger *logrus.Entry, configPath string) error {
+ conf, err := initConfig(logger, configPath)
+ if err != nil {
+ return cli.Exit(fmt.Errorf("configuration error: %w", err), 1)
+ }
+
+ conf.ConfigureLogger()
+ configure(appName, conf)
+
+ starterConfigs, err := getStarterConfigs(conf)
+ if err != nil {
+ return cli.Exit(err, 1)
+ }
+
+ promreg := prometheus.DefaultRegisterer
+ b, err := bootstrap.New(promauto.With(promreg).NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitaly_praefect_connections_total",
+ Help: "Total number of connections to Praefect",
+ },
+ []string{"type"},
+ ))
+ if err != nil {
+ return cli.Exit(fmt.Errorf("unable to create a bootstrap: %w", err), 1)
+ }
+
+ dbPromRegistry := prometheus.NewRegistry()
+
+ if err := server(starterConfigs, conf, logger, b, promreg, dbPromRegistry); err != nil {
+ return cli.Exit(err, 1)
+ }
+
+ return nil
+}
+
+func initConfig(logger *logrus.Entry, path string) (config.Config, error) {
+ var conf config.Config
+
+ if path == "" {
+ return conf, errNoConfigFile
+ }
+
+ conf, err := config.FromFile(path)
+ if err != nil {
+ return conf, fmt.Errorf("error reading config file: %w", err)
+ }
+
+ if err := conf.Validate(); err != nil {
+ return config.Config{}, err
+ }
+
+ if !conf.AllowLegacyElectors {
+ conf.Failover.ElectionStrategy = config.ElectionStrategyPerRepository
+ }
+
+ if !conf.Failover.Enabled && conf.Failover.ElectionStrategy != "" {
+ logger.WithField("election_strategy", conf.Failover.ElectionStrategy).Warn(
+ "ignoring configured election strategy as failover is disabled")
+ }
+
+ return conf, nil
+}
+
+func configure(appName string, conf config.Config) {
+ tracing.Initialize(tracing.WithServiceName(appName))
+
+ if conf.PrometheusListenAddr != "" {
+ conf.Prometheus.Configure()
+ }
+
+ sentry.ConfigureSentry(version.GetVersion(), conf.Sentry)
+}
+
+func server(
+ cfgs []starter.Config,
+ conf config.Config,
+ logger *logrus.Entry,
+ b bootstrap.Listener,
+ promreg prometheus.Registerer,
+ dbPromRegistry interface {
+ prometheus.Registerer
+ prometheus.Gatherer
+ },
+) error {
+ nodeLatencyHistogram, err := metrics.RegisterNodeLatency(conf.Prometheus, promreg)
+ if err != nil {
+ return err
+ }
+
+ delayMetric, err := metrics.RegisterReplicationDelay(conf.Prometheus, promreg)
+ if err != nil {
+ return err
+ }
+
+ latencyMetric, err := metrics.RegisterReplicationLatency(conf.Prometheus, promreg)
+ if err != nil {
+ return err
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ var db *sql.DB
+ if conf.NeedsSQL() {
+ logger.Infof("establishing database connection to %s:%d ...", conf.DB.Host, conf.DB.Port)
+ dbConn, closedb, err := initDatabase(ctx, logger, conf)
+ if err != nil {
+ return err
+ }
+ defer closedb()
+ db = dbConn
+ logger.Info("database connection established")
+ }
+
+ var queue datastore.ReplicationEventQueue
+ var rs datastore.RepositoryStore
+ var csg datastore.ConsistentStoragesGetter
+ var metricsCollectors []prometheus.Collector
+
+ if conf.MemoryQueueEnabled {
+ queue = datastore.NewMemoryReplicationEventQueue(conf)
+ rs = datastore.MockRepositoryStore{}
+ csg = rs
+ logger.Info("reads distribution caching is disabled for in memory storage")
+ } else {
+ queue = datastore.NewPostgresReplicationEventQueue(db)
+ rs = datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+
+ dsn := glsql.DSN(conf.DB, true)
+ if dsn == "" {
+ csg = rs
+ logger.Info("reads distribution caching is disabled because direct connection to Postgres is not set")
+ } else {
+ storagesCached, err := datastore.NewCachingConsistentStoragesGetter(logger, rs, conf.VirtualStorageNames())
+ if err != nil {
+ return fmt.Errorf("caching storage provider: %w", err)
+ }
+
+ resilientListenerTicker := helper.NewTimerTicker(5 * time.Second)
+ notificationsListener := datastore.NewResilientListener(conf.DB, resilientListenerTicker, logger)
+ go func() {
+ err := notificationsListener.Listen(ctx, storagesCached, datastore.StorageRepositoriesUpdatesChannel, datastore.RepositoriesUpdatesChannel)
+ if err != nil && !errors.Is(err, context.Canceled) {
+ logger.WithError(err).Error("notifications listener terminated")
+ }
+ }()
+
+ metricsCollectors = append(metricsCollectors, storagesCached, notificationsListener)
+ csg = storagesCached
+ logger.Info("reads distribution caching is enabled by configuration")
+ }
+ }
+
+ var errTracker tracker.ErrorTracker
+
+ if conf.Failover.Enabled {
+ thresholdsConfigured, err := conf.Failover.ErrorThresholdsConfigured()
+ if err != nil {
+ return err
+ }
+
+ if thresholdsConfigured {
+ errorWindowFunction, err := tracker.NewErrorWindowFunction(conf.Failover)
+ if err != nil {
+ return err
+ }
+
+ errTracker, err = tracker.NewErrors(ctx, errorWindowFunction, conf.Failover.ReadErrorThresholdCount, conf.Failover.WriteErrorThresholdCount)
+ if err != nil {
+ return err
+ }
+ }
+ }
+
+ transactionManager := transactions.NewManager(conf)
+ sidechannelRegistry := sidechannel.NewRegistry()
+
+ backchannelCfg := backchannel.DefaultConfiguration()
+ backchannelCfg.AcceptBacklog = int(conf.Yamux.AcceptBacklog)
+ backchannelCfg.MaximumStreamWindowSizeBytes = conf.Yamux.MaximumStreamWindowSizeBytes
+ clientHandshaker := backchannel.NewClientHandshaker(
+ logger,
+ praefect.NewBackchannelServerFactory(logger, transaction.NewServer(transactionManager), sidechannelRegistry),
+ backchannelCfg,
+ )
+
+ assignmentStore := praefect.NewDisabledAssignmentStore(conf.StorageNames())
+ var (
+ nodeManager nodes.Manager
+ healthChecker praefect.HealthChecker
+ nodeSet praefect.NodeSet
+ router praefect.Router
+ primaryGetter praefect.PrimaryGetter
+ )
+ if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
+ nodeSet, err = praefect.DialNodes(
+ ctx,
+ conf.VirtualStorages,
+ protoregistry.GitalyProtoPreregistered,
+ errTracker,
+ clientHandshaker,
+ sidechannelRegistry,
+ )
+ if err != nil {
+ return fmt.Errorf("dial nodes: %w", err)
+ }
+ defer nodeSet.Close()
+
+ healthManager := nodes.NewHealthManager(logger, db, nodes.GeneratePraefectName(conf, logger), nodeSet.HealthClients())
+ go func() {
+ if err := healthManager.Run(ctx, helper.NewTimerTicker(time.Second)); err != nil {
+ logger.WithError(err).Error("health manager exited")
+ }
+ }()
+
+ healthChecker = healthManager
+
+ // Wait for the first health check to complete so the Praefect doesn't start serving RPC
+ // before the router is ready with the health status of the nodes.
+ <-healthManager.Updated()
+
+ elector := nodes.NewPerRepositoryElector(db)
+
+ primaryGetter = elector
+ assignmentStore = datastore.NewAssignmentStore(db, conf.StorageNames())
+
+ router = praefect.NewPerRepositoryRouter(
+ nodeSet.Connections(),
+ elector,
+ healthManager,
+ praefect.NewLockedRandom(rand.New(rand.NewSource(time.Now().UnixNano()))),
+ csg,
+ assignmentStore,
+ rs,
+ conf.DefaultReplicationFactors(),
+ )
+
+ if conf.BackgroundVerification.VerificationInterval > 0 {
+ logger.WithField("config", conf.BackgroundVerification).Info("background verifier started")
+ verifier := praefect.NewMetadataVerifier(
+ logger,
+ db,
+ nodeSet.Connections(),
+ healthManager,
+ conf.BackgroundVerification.VerificationInterval.Duration(),
+ conf.BackgroundVerification.DeleteInvalidRecords,
+ )
+ promreg.MustRegister(verifier)
+
+ go func() {
+ if err := verifier.Run(ctx, helper.NewTimerTicker(2*time.Second)); err != nil {
+ logger.WithError(err).Error("metadata verifier finished")
+ }
+ }()
+
+ go func() {
+ if err := verifier.RunExpiredLeaseReleaser(ctx, helper.NewTimerTicker(10*time.Second)); err != nil {
+ logger.WithError(err).Error("expired verification lease releaser finished")
+ }
+ }()
+ } else {
+ logger.Info("background verifier is disabled")
+ }
+ } else {
+ if conf.Failover.Enabled {
+ logger.WithField("election_strategy", conf.Failover.ElectionStrategy).Warn(
+ "Deprecated election stategy in use, migrate to repository specific primary nodes following https://docs.gitlab.com/ee/administration/gitaly/praefect.html#migrate-to-repository-specific-primary-gitaly-nodes. The other election strategies are scheduled for removal in GitLab 14.0.")
+ }
+
+ nodeMgr, err := nodes.NewManager(logger, conf, db, csg, nodeLatencyHistogram, protoregistry.GitalyProtoPreregistered, errTracker, clientHandshaker, sidechannelRegistry)
+ if err != nil {
+ return err
+ }
+
+ healthChecker = praefect.HealthChecker(nodeMgr)
+ nodeSet = praefect.NodeSetFromNodeManager(nodeMgr)
+ router = praefect.NewNodeManagerRouter(nodeMgr, rs)
+ primaryGetter = nodeMgr
+ nodeManager = nodeMgr
+
+ nodeMgr.Start(conf.Failover.BootstrapInterval.Duration(), conf.Failover.MonitorInterval.Duration())
+ defer nodeMgr.Stop()
+ }
+
+ logger.Infof("election strategy: %q", conf.Failover.ElectionStrategy)
+ logger.Info("background started: gitaly nodes health monitoring")
+
+ var (
+ // top level server dependencies
+ coordinator = praefect.NewCoordinator(
+ queue,
+ rs,
+ router,
+ transactionManager,
+ conf,
+ protoregistry.GitalyProtoPreregistered,
+ )
+
+ repl = praefect.NewReplMgr(
+ logger,
+ conf.StorageNames(),
+ queue,
+ rs,
+ healthChecker,
+ nodeSet,
+ praefect.WithDelayMetric(delayMetric),
+ praefect.WithLatencyMetric(latencyMetric),
+ praefect.WithDequeueBatchSize(conf.Replication.BatchSize),
+ praefect.WithParallelStorageProcessingWorkers(conf.Replication.ParallelStorageProcessingWorkers),
+ )
+ srvFactory = praefect.NewServerFactory(
+ conf,
+ logger,
+ coordinator.StreamDirector,
+ nodeManager,
+ transactionManager,
+ queue,
+ rs,
+ assignmentStore,
+ protoregistry.GitalyProtoPreregistered,
+ nodeSet.Connections(),
+ primaryGetter,
+ service.ReadinessChecks(),
+ )
+ )
+ metricsCollectors = append(metricsCollectors, transactionManager, coordinator, repl)
+ if db != nil {
+ dbMetricCollectors := []prometheus.Collector{
+ datastore.NewRepositoryStoreCollector(logger, conf.VirtualStorageNames(), db, conf.Prometheus.ScrapeTimeout.Duration()),
+ datastore.NewQueueDepthCollector(logger, db, conf.Prometheus.ScrapeTimeout.Duration()),
+ }
+
+ if conf.BackgroundVerification.VerificationInterval > 0 {
+ dbMetricCollectors = append(dbMetricCollectors, datastore.NewVerificationQueueDepthCollector(
+ logger,
+ db,
+ conf.Prometheus.ScrapeTimeout.Duration(),
+ conf.BackgroundVerification.VerificationInterval.Duration(),
+ conf.StorageNames(),
+ ))
+ }
+
+ // Eventually, database-related metrics will always be exported via a separate
+ // endpoint such that it's possible to set a different scraping interval and thus to
+ // reduce database load. For now though, we register the metrics twice, once for the
+ // standard and once for the database-specific endpoint. This is done to ensure a
+ // transitory period where deployments can be moved to the new endpoint without
+ // causing breakage if they still use the old endpoint.
+ dbPromRegistry.MustRegister(dbMetricCollectors...)
+ if !conf.PrometheusExcludeDatabaseFromDefaultMetrics {
+ promreg.MustRegister(dbMetricCollectors...)
+ }
+ }
+ promreg.MustRegister(metricsCollectors...)
+
+ for _, cfg := range cfgs {
+ srv, err := srvFactory.Create(cfg.IsSecure())
+ if err != nil {
+ return fmt.Errorf("create gRPC server: %w", err)
+ }
+ defer srv.Stop()
+
+ b.RegisterStarter(starter.New(cfg, srv))
+ }
+
+ if conf.PrometheusListenAddr != "" {
+ logger.WithField("address", conf.PrometheusListenAddr).Info("Starting prometheus listener")
+
+ b.RegisterStarter(func(listen bootstrap.ListenFunc, _ chan<- error, _ *prometheus.CounterVec) error {
+ l, err := listen(starter.TCP, conf.PrometheusListenAddr)
+ if err != nil {
+ return err
+ }
+
+ serveMux := http.NewServeMux()
+ serveMux.Handle("/db_metrics", promhttp.HandlerFor(dbPromRegistry, promhttp.HandlerOpts{}))
+
+ go func() {
+ opts := []monitoring.Option{
+ monitoring.WithListener(l),
+ monitoring.WithServeMux(serveMux),
+ }
+
+ if buildInfo, ok := debug.ReadBuildInfo(); ok {
+ opts = append(opts, monitoring.WithGoBuildInformation(buildInfo))
+ }
+
+ if err := monitoring.Start(opts...); err != nil {
+ logger.WithError(err).Errorf("Unable to start prometheus listener: %v", conf.PrometheusListenAddr)
+ }
+ }()
+
+ return nil
+ })
+ }
+
+ if err := b.Start(); err != nil {
+ return fmt.Errorf("unable to start the bootstrap: %w", err)
+ }
+ for _, cfg := range cfgs {
+ logger.WithFields(logrus.Fields{"schema": cfg.Name, "address": cfg.Addr}).Info("listening")
+ }
+
+ go repl.ProcessBacklog(ctx, praefect.ExpBackoffFactory{Start: time.Second, Max: 5 * time.Second})
+
+ staleTicker := helper.NewTimerTicker(30 * time.Second)
+ defer staleTicker.Stop()
+
+ logger.Info("background started: processing of the replication events")
+ repl.ProcessStale(ctx, staleTicker, time.Minute)
+ logger.Info("background started: processing of the stale replication events")
+
+ if interval := conf.Reconciliation.SchedulingInterval.Duration(); interval > 0 {
+ if conf.MemoryQueueEnabled {
+ logger.Warn("Disabled automatic reconciliation as it is only implemented using SQL queue and in-memory queue is configured.")
+ } else {
+ r := reconciler.NewReconciler(
+ logger,
+ db,
+ healthChecker,
+ conf.StorageNames(),
+ conf.Reconciliation.HistogramBuckets,
+ )
+ promreg.MustRegister(r)
+ go func() {
+ if err := r.Run(ctx, helper.NewTimerTicker(interval)); err != nil {
+ logger.WithError(err).Error("reconciler finished execution")
+ }
+ }()
+ }
+ }
+
+ if interval := conf.RepositoriesCleanup.RunInterval.Duration(); interval > 0 {
+ if db != nil {
+ go func() {
+ storageSync := datastore.NewStorageCleanup(db)
+ cfg := repocleaner.Cfg{
+ RunInterval: conf.RepositoriesCleanup.RunInterval.Duration(),
+ LivenessInterval: 30 * time.Second,
+ RepositoriesInBatch: int(conf.RepositoriesCleanup.RepositoriesInBatch),
+ }
+ repoCleaner := repocleaner.NewRunner(cfg, logger, healthChecker, nodeSet.Connections(), storageSync, storageSync, repocleaner.NewLogWarnAction(logger))
+ if err := repoCleaner.Run(ctx, helper.NewTimerTicker(conf.RepositoriesCleanup.CheckInterval.Duration())); err != nil && !errors.Is(context.Canceled, err) {
+ logger.WithError(err).Error("repository cleaner finished execution")
+ } else {
+ logger.Info("repository cleaner finished execution")
+ }
+ }()
+ } else {
+ logger.Warn("Repository cleanup background task disabled as there is no database connection configured.")
+ }
+ } else {
+ logger.Warn(`Repository cleanup background task disabled as "repositories_cleanup.run_interval" is not set or 0.`)
+ }
+
+ gracefulStopTicker := helper.NewTimerTicker(conf.GracefulStopTimeout.Duration())
+ defer gracefulStopTicker.Stop()
+
+ return b.Wait(gracefulStopTicker, srvFactory.GracefulStop)
+}
+
+func getStarterConfigs(conf config.Config) ([]starter.Config, error) {
+ var cfgs []starter.Config
+ unique := map[string]struct{}{}
+ for schema, addr := range map[string]string{
+ starter.TCP: conf.ListenAddr,
+ starter.TLS: conf.TLSListenAddr,
+ starter.Unix: conf.SocketPath,
+ } {
+ if addr == "" {
+ continue
+ }
+
+ addrConf, err := starter.ParseEndpoint(addr)
+ if err != nil {
+ // address doesn't include schema
+ if !errors.Is(err, starter.ErrEmptySchema) {
+ return nil, err
+ }
+ addrConf = starter.Config{Name: schema, Addr: addr}
+ }
+ addrConf.HandoverOnUpgrade = true
+
+ if _, found := unique[addrConf.Addr]; found {
+ return nil, fmt.Errorf("same address can't be used for different schemas %q", addr)
+ }
+ unique[addrConf.Addr] = struct{}{}
+
+ cfgs = append(cfgs, addrConf)
+ }
+
+ if len(cfgs) == 0 {
+ return nil, errors.New("no listening addresses were provided, unable to start")
+ }
+
+ return cfgs, nil
+}
+
+func initDatabase(ctx context.Context, logger *logrus.Entry, conf config.Config) (*sql.DB, func(), error) {
+ openDBCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
+ defer cancel()
+ db, err := glsql.OpenDB(openDBCtx, conf.DB)
+ if err != nil {
+ logger.WithError(err).Error("SQL connection open failed")
+ return nil, nil, err
+ }
+
+ closedb := func() {
+ if err := db.Close(); err != nil {
+ logger.WithError(err).Error("SQL connection close failed")
+ }
+ }
+
+ if err := datastore.CheckPostgresVersion(db); err != nil {
+ closedb()
+ return nil, nil, err
+ }
+
+ return db, closedb, nil
+}
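
serveAction above still funnels the shared -config flag through mustProvideConfigFlag because, as the comments note, the flag cannot be marked required while the root action has to serve legacy invocations. Below is a hedged sketch of the eventual shape those comments describe, where each sub-command owns a Required flag that urfave/cli enforces on its own; all names here are illustrative.

```go
package main

import (
	"fmt"
	"os"

	"github.com/urfave/cli/v2"
)

func main() {
	app := &cli.App{
		Name: "flagdemo",
		Commands: []*cli.Command{
			{
				Name:  "serve",
				Usage: "launch the server daemon",
				Flags: []cli.Flag{
					&cli.StringFlag{
						Name:  "config",
						Usage: "load configuration from `FILE`",
						// With Required set, cli reports the missing flag and
						// prints usage without any mustProvideConfigFlag helper.
						Required: true,
					},
				},
				Action: func(ctx *cli.Context) error {
					fmt.Println("config:", ctx.String("config"))
					return nil
				},
			},
		},
	}
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(2)
	}
}
```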
diff --git a/internal/cli/praefect/subcmd_configuration.go b/internal/cli/praefect/subcmd_configuration.go
new file mode 100644
index 000000000..4f35ed6ef
--- /dev/null
+++ b/internal/cli/praefect/subcmd_configuration.go
@@ -0,0 +1,15 @@
+package praefect
+
+import "github.com/urfave/cli/v2"
+
+const configurationCmdName = "configuration"
+
+func newConfigurationCommand() *cli.Command {
+ return &cli.Command{
+ Name: configurationCmdName,
+ Usage: "manages configuration",
+ Subcommands: []*cli.Command{
+ newConfigurationValidateCommand(),
+ },
+ }
+}
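
newConfigurationCommand groups further commands under Subcommands, so the command-line path becomes `praefect configuration validate`. The following sketch shows how that nesting resolves; the extra "dump" sub-command is purely hypothetical and only marks where additional configuration tooling could be attached.

```go
package main

import (
	"fmt"
	"os"

	"github.com/urfave/cli/v2"
)

func main() {
	app := &cli.App{
		Name: "nestdemo",
		Commands: []*cli.Command{
			{
				Name:  "configuration",
				Usage: "manages configuration",
				Subcommands: []*cli.Command{
					{
						Name:  "validate",
						Usage: "validates configuration",
						Action: func(*cli.Context) error {
							fmt.Println("validate invoked")
							return nil
						},
					},
					{
						// Hypothetical example, not part of Gitaly.
						Name:  "dump",
						Usage: "prints the effective configuration",
						Action: func(*cli.Context) error {
							fmt.Println("dump invoked")
							return nil
						},
					},
				},
			},
		},
	}
	// e.g. `nestdemo configuration validate`
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
```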
diff --git a/internal/cli/praefect/subcmd_configuration_validate.go b/internal/cli/praefect/subcmd_configuration_validate.go
index e9ad741c3..2b3da6d07 100644
--- a/internal/cli/praefect/subcmd_configuration_validate.go
+++ b/internal/cli/praefect/subcmd_configuration_validate.go
@@ -3,21 +3,39 @@ package praefect
import (
"io"
- "github.com/sirupsen/logrus"
+ "github.com/urfave/cli/v2"
"gitlab.com/gitlab-org/gitaly/v15/cmd"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config"
)
-const (
- configurationCmdName = "configuration"
- validateCmdName = "validate"
- validationErrorCode = 2
-)
+const validationErrorCode = 2
+
+func newConfigurationValidateCommand() *cli.Command {
+ return &cli.Command{
+ Name: "validate",
+ Usage: "validates configuration",
+ Description: "The command accepts configuration in toml format on STDIN. It applies " +
+ "validation rules to the received configuration and returns all the found " +
+ "validation errors in JSON format back on STDOUT.",
+ Action: configurationValidateAction,
+ }
+}
+
+func configurationValidateAction(ctx *cli.Context) error {
+ if ctx.Args().Present() {
+ _ = cli.ShowSubcommandHelp(ctx)
+ return cli.Exit("invalid argument(s)", 1)
+ }
+
+ if code := validateConfiguration(ctx.App.Reader, ctx.App.Writer, ctx.App.ErrWriter); code != 0 {
+ return cli.Exit("", code)
+ }
+
+ return nil
+}
// validateConfiguration checks if provided configuration is valid.
func validateConfiguration(reader io.Reader, outWriter, errWriter io.Writer) int {
- logrus.SetLevel(logrus.ErrorLevel)
-
cfg, err := config.FromReader(reader)
if err != nil {
if cmd.WriteTomlReadError(err, outWriter, errWriter) {
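
configurationValidateAction streams TOML from ctx.App.Reader and writes its findings to ctx.App.Writer; these default to STDIN and STDOUT, so `praefect configuration validate < praefect.config.toml` works and tests can swap in buffers. A hedged toy sketch of that reader/writer wiring follows; the "count" command is not part of Gitaly.

```go
package main

import (
	"fmt"
	"io"
	"os"

	"github.com/urfave/cli/v2"
)

func main() {
	app := &cli.App{
		Name: "pipedemo",
		Commands: []*cli.Command{
			{
				Name:  "count",
				Usage: "counts bytes received on STDIN",
				Action: func(ctx *cli.Context) error {
					// App.Reader defaults to os.Stdin and App.Writer to
					// os.Stdout, so piping input works; tests can replace
					// both with in-memory buffers.
					n, err := io.Copy(io.Discard, ctx.App.Reader)
					if err != nil {
						return cli.Exit(err, 1)
					}
					fmt.Fprintf(ctx.App.Writer, "%d bytes\n", n)
					return nil
				},
			},
		},
	}
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
```

Running `echo x | pipedemo count` prints the byte count, mirroring how the validate command consumes piped configuration.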
diff --git a/internal/cli/praefect/subcmd_configuration_validate_test.go b/internal/cli/praefect/subcmd_configuration_validate_test.go
index 74152ff63..9cb1364b2 100644
--- a/internal/cli/praefect/subcmd_configuration_validate_test.go
+++ b/internal/cli/praefect/subcmd_configuration_validate_test.go
@@ -62,14 +62,22 @@ election_strategy = invalid`)
exitCode: 1,
args: []string{"arg1", "arg2"},
stdin: func(t *testing.T) io.Reader { return nil },
- stderr: `Usage of praefect:
- -config string
- Location for the config.toml
- -version
- Print version and exit
- subcommand (optional)
- One of accept-dataloss, check, configuration, dataloss, dial-nodes, list-storages, list-untracked-repositories, metadata, remove-repository, set-replication-factor, sql-migrate, sql-migrate-down, sql-migrate-status, sql-ping, track-repositories, track-repository, verify
+ stdout: `NAME:
+ praefect configuration validate - validates configuration
+
+USAGE:
+ praefect configuration validate command [command options] [arguments...]
+
+DESCRIPTION:
+ The command accepts configuration in toml format on STDIN. It applies validation rules to the received configuration and returns all the found validation errors in JSON format back on STDOUT.
+
+COMMANDS:
+ help, h Shows a list of commands or help for one command
+
+OPTIONS:
+ --help, -h show help
`,
+ stderr: "invalid argument(s)\n",
},
{
name: "validation failures",