gitlab.com/gitlab-org/gitaly.git
author     John Cai <jcai@gitlab.com>  2021-11-01 18:14:47 +0300
committer  John Cai <jcai@gitlab.com>  2021-11-01 18:14:47 +0300
commit     0595ee8948842829e01fe10d6b7a8d27fc66dec4 (patch)
tree       fab37c034c4cc6f036e2123d78d78633f967b3d4
parent     feb673d3328239cf669770406cfd0db188d347f9 (diff)
parent     b5724b09d133865ab808348fba7cd23c9e0a5ec4 (diff)

Merge branch 'ps-praefect-cmd-port-14-0' into '14-0-stable'

Backport praefect sub-commands to 14.0

See merge request gitlab-org/gitaly!4018
-rw-r--r--  cmd/gitaly/main.go                                          4
-rw-r--r--  cmd/praefect/main.go                                       27
-rw-r--r--  cmd/praefect/subcmd.go                                     27
-rw-r--r--  cmd/praefect/subcmd_accept_dataloss.go                      2
-rw-r--r--  cmd/praefect/subcmd_dataloss.go                             2
-rw-r--r--  cmd/praefect/subcmd_list_untracked_repositories.go        172
-rw-r--r--  cmd/praefect/subcmd_list_untracked_repositories_test.go   143
-rw-r--r--  cmd/praefect/subcmd_pingnodes.go                            2
-rw-r--r--  cmd/praefect/subcmd_reconcile.go                            4
-rw-r--r--  cmd/praefect/subcmd_remove_repository.go                  221
-rw-r--r--  cmd/praefect/subcmd_remove_repository_test.go             328
-rw-r--r--  cmd/praefect/subcmd_set_replication_factor.go               2
-rw-r--r--  cmd/praefect/subcmd_track_repository.go                   224
-rw-r--r--  cmd/praefect/subcmd_track_repository_test.go              230
-rw-r--r--  internal/bootstrap/bootstrap.go                            72
-rw-r--r--  internal/bootstrap/bootstrap_test.go                       54
-rw-r--r--  internal/praefect/datastore/storage_cleanup.go             85
-rw-r--r--  internal/praefect/datastore/storage_cleanup_test.go        92
-rw-r--r--  internal/praefect/metrics/prometheus.go                    12
-rw-r--r--  internal/praefect/repocleaner/init_test.go                 21
-rw-r--r--  internal/praefect/repocleaner/repository.go                77
-rw-r--r--  internal/praefect/repocleaner/repository_test.go          114
22 files changed, 1848 insertions(+), 67 deletions(-)
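
The three new sub-commands are registered in praefect's CLI dispatch (see cmd/praefect/subcmd.go below). A rough invocation sketch; the config path and repository names are placeholders, and only the sub-command and flag names are taken from this diff:

    praefect -config /etc/gitlab/praefect.toml list-untracked-repositories -delimiter ','
    praefect -config /etc/gitlab/praefect.toml track-repository -virtual-storage praefect -repository relative/path/repo.git -authoritative-storage gitaly-1
    praefect -config /etc/gitlab/praefect.toml remove-repository -virtual-storage praefect -repository relative/path/repo.git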
diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go
index dd4243003..111ac7d71 100644
--- a/cmd/gitaly/main.go
+++ b/cmd/gitaly/main.go
@@ -188,8 +188,6 @@ func run(cfg config.Cfg) error {
return fmt.Errorf("linguist instance creation: %w", err)
}
- b.StopAction = gitalyServerFactory.GracefulStop
-
rubySrv := rubyserver.New(cfg)
if err := rubySrv.Start(); err != nil {
return fmt.Errorf("initialize gitaly-ruby: %v", err)
@@ -291,5 +289,5 @@ func run(cfg config.Cfg) error {
}
}()
- return b.Wait(cfg.GracefulRestartTimeout.Duration())
+ return b.Wait(cfg.GracefulRestartTimeout.Duration(), gitalyServerFactory.GracefulStop)
}
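
The gitaly half of the bootstrap change is small: the graceful-stop callback moves from a mutable field on the bootstrapper to an explicit argument of Wait. A before/after sketch built only from the lines above:

    // Before: callback stored as state on the bootstrapper.
    b.StopAction = gitalyServerFactory.GracefulStop
    return b.Wait(cfg.GracefulRestartTimeout.Duration())

    // After: callback supplied at the one place it is needed.
    return b.Wait(cfg.GracefulRestartTimeout.Duration(), gitalyServerFactory.GracefulStop)

Passing the callback explicitly is what lets the new bootstrap.Listener interface (see internal/bootstrap/bootstrap.go below) be satisfied by a test double.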
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index fde28f0ef..4314b7732 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -167,7 +167,12 @@ func main() {
logger.Fatalf("%s", err)
}
- if err := run(starterConfigs, conf); err != nil {
+ b, err := bootstrap.New()
+ if err != nil {
+ logger.Fatalf("unable to create a bootstrap: %v", err)
+ }
+
+ if err := run(starterConfigs, conf, b, prometheus.DefaultRegisterer); err != nil {
logger.Fatalf("%v", err)
}
}
@@ -210,18 +215,18 @@ func configure(conf config.Config) {
sentry.ConfigureSentry(version.GetVersion(), conf.Sentry)
}
-func run(cfgs []starter.Config, conf config.Config) error {
- nodeLatencyHistogram, err := metrics.RegisterNodeLatency(conf.Prometheus)
+func run(cfgs []starter.Config, conf config.Config, b bootstrap.Listener, promreg prometheus.Registerer) error {
+ nodeLatencyHistogram, err := metrics.RegisterNodeLatency(conf.Prometheus, promreg)
if err != nil {
return err
}
- delayMetric, err := metrics.RegisterReplicationDelay(conf.Prometheus)
+ delayMetric, err := metrics.RegisterReplicationDelay(conf.Prometheus, promreg)
if err != nil {
return err
}
- latencyMetric, err := metrics.RegisterReplicationLatency(conf.Prometheus)
+ latencyMetric, err := metrics.RegisterReplicationLatency(conf.Prometheus, promreg)
if err != nil {
return err
}
@@ -408,7 +413,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
)
metricsCollectors = append(metricsCollectors, transactionManager, coordinator, repl)
if db != nil {
- prometheus.MustRegister(
+ promreg.MustRegister(
datastore.NewRepositoryStoreCollector(
logger,
conf.VirtualStorageNames(),
@@ -417,14 +422,8 @@ func run(cfgs []starter.Config, conf config.Config) error {
),
)
}
- prometheus.MustRegister(metricsCollectors...)
-
- b, err := bootstrap.New()
- if err != nil {
- return fmt.Errorf("unable to create a bootstrap: %v", err)
- }
+ promreg.MustRegister(metricsCollectors...)
- b.StopAction = srvFactory.GracefulStop
for _, cfg := range cfgs {
srv, err := srvFactory.Create(cfg.IsSecure())
if err != nil {
@@ -507,7 +506,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
}
}
- return b.Wait(conf.GracefulStopTimeout.Duration())
+ return b.Wait(conf.GracefulStopTimeout.Duration(), srvFactory.GracefulStop)
}
func getStarterConfigs(conf config.Config) ([]starter.Config, error) {
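
run gains two injection points, the bootstrap.Listener and the prometheus.Registerer, so tests can drive the full server loop without global state. Both wirings appear verbatim in this diff; side by side:

    // Production (main): real bootstrapper, global registerer.
    if err := run(starterConfigs, conf, b, prometheus.DefaultRegisterer); err != nil {
        logger.Fatalf("%v", err)
    }

    // Tests: Noop bootstrapper and an isolated registry, which avoids
    // duplicate-registration panics when run is called more than once per process.
    assert.NoError(t, run(starterConfigs, conf, bootstrap.NewNoop(), prometheus.NewRegistry()))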
diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go
index 70ddff3c1..08a11c21d 100644
--- a/cmd/praefect/subcmd.go
+++ b/cmd/praefect/subcmd.go
@@ -22,17 +22,22 @@ type subcmd interface {
Exec(flags *flag.FlagSet, config config.Config) error
}
+const defaultDialTimeout = 30 * time.Second
+
var (
subcommands = map[string]subcmd{
- "sql-ping": &sqlPingSubcommand{},
- "sql-migrate": &sqlMigrateSubcommand{},
- "dial-nodes": &dialNodesSubcommand{},
- "reconcile": &reconcileSubcommand{},
- "sql-migrate-down": &sqlMigrateDownSubcommand{},
- "sql-migrate-status": &sqlMigrateStatusSubcommand{},
- "dataloss": newDatalossSubcommand(),
- "accept-dataloss": &acceptDatalossSubcommand{},
- "set-replication-factor": newSetReplicatioFactorSubcommand(os.Stdout),
+ "sql-ping": &sqlPingSubcommand{},
+ "sql-migrate": &sqlMigrateSubcommand{},
+ "dial-nodes": &dialNodesSubcommand{},
+ "reconcile": &reconcileSubcommand{},
+ "sql-migrate-down": &sqlMigrateDownSubcommand{},
+ "sql-migrate-status": &sqlMigrateStatusSubcommand{},
+ "dataloss": newDatalossSubcommand(),
+ "accept-dataloss": &acceptDatalossSubcommand{},
+ "set-replication-factor": newSetReplicatioFactorSubcommand(os.Stdout),
+ removeRepositoryCmdName: newRemoveRepository(logger),
+ trackRepositoryCmdName: newTrackRepository(logger),
+ listUntrackedRepositoriesName: newListUntrackedRepositories(logger, os.Stdout),
}
)
@@ -137,8 +142,8 @@ func printfErr(format string, a ...interface{}) (int, error) {
return fmt.Fprintf(os.Stderr, format, a...)
}
-func subCmdDial(addr, token string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
- ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
+func subCmdDial(ctx context.Context, addr, token string, timeout time.Duration, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
+ ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
opts = append(opts,
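
subCmdDial now takes the caller's context and an explicit timeout; the timeout is applied inside the function via context.WithTimeout, so call sites stay one line. The shape used by every sub-command in this diff:

    conn, err := subCmdDial(context.Background(), nodeAddr, cfg.Auth.Token, defaultDialTimeout)
    if err != nil {
        return fmt.Errorf("error dialing: %w", err)
    }
    defer func() { _ = conn.Close() }()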
diff --git a/cmd/praefect/subcmd_accept_dataloss.go b/cmd/praefect/subcmd_accept_dataloss.go
index 0c4ea015a..e037df424 100644
--- a/cmd/praefect/subcmd_accept_dataloss.go
+++ b/cmd/praefect/subcmd_accept_dataloss.go
@@ -51,7 +51,7 @@ func (cmd *acceptDatalossSubcommand) Exec(flags *flag.FlagSet, cfg config.Config
return err
}
- conn, err := subCmdDial(nodeAddr, cfg.Auth.Token)
+ conn, err := subCmdDial(context.Background(), nodeAddr, cfg.Auth.Token, defaultDialTimeout)
if err != nil {
return fmt.Errorf("error dialing: %w", err)
}
diff --git a/cmd/praefect/subcmd_dataloss.go b/cmd/praefect/subcmd_dataloss.go
index 037edc886..89a1d595f 100644
--- a/cmd/praefect/subcmd_dataloss.go
+++ b/cmd/praefect/subcmd_dataloss.go
@@ -67,7 +67,7 @@ func (cmd *datalossSubcommand) Exec(flags *flag.FlagSet, cfg config.Config) erro
return err
}
- conn, err := subCmdDial(nodeAddr, cfg.Auth.Token)
+ conn, err := subCmdDial(context.Background(), nodeAddr, cfg.Auth.Token, defaultDialTimeout)
if err != nil {
return fmt.Errorf("error dialing: %v", err)
}
diff --git a/cmd/praefect/subcmd_list_untracked_repositories.go b/cmd/praefect/subcmd_list_untracked_repositories.go
new file mode 100644
index 000000000..bff6140a0
--- /dev/null
+++ b/cmd/praefect/subcmd_list_untracked_repositories.go
@@ -0,0 +1,172 @@
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "flag"
+ "fmt"
+ "io"
+
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/repocleaner"
+ "gitlab.com/gitlab-org/labkit/correlation"
+ "google.golang.org/grpc/metadata"
+)
+
+const (
+ listUntrackedRepositoriesName = "list-untracked-repositories"
+)
+
+var errNoConnectionToGitalies = errors.New("no connection established to gitaly nodes")
+
+type listUntrackedRepositories struct {
+ logger logrus.FieldLogger
+ delimiter string
+ out io.Writer
+}
+
+func newListUntrackedRepositories(logger logrus.FieldLogger, out io.Writer) *listUntrackedRepositories {
+ return &listUntrackedRepositories{logger: logger, out: out}
+}
+
+func (cmd *listUntrackedRepositories) FlagSet() *flag.FlagSet {
+ fs := flag.NewFlagSet(listUntrackedRepositoriesName, flag.ExitOnError)
+ fs.StringVar(&cmd.delimiter, "delimiter", "\n", "string used as a delimiter in output")
+ fs.Usage = func() {
+ _, _ = printfErr("Description:\n" +
+ " This command checks if all repositories on all gitaly nodes tracked by praefect.\n" +
+ " If repository is found on the disk, but it is not known to praefect the location of\n" +
+ " that repository will be written into stdout stream in JSON format.\n")
+ fs.PrintDefaults()
+ _, _ = printfErr("NOTE:\n" +
+ " All errors and log messages directed to the stderr stream.\n" +
+ " The output is produced as the new data appears, it doesn't wait\n" +
+ " for the completion of the processing to produce the result.\n")
+ }
+ return fs
+}
+
+func (cmd listUntrackedRepositories) Exec(flags *flag.FlagSet, cfg config.Config) error {
+ if flags.NArg() > 0 {
+ return unexpectedPositionalArgsError{Command: flags.Name()}
+ }
+
+ ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID())
+ ctx = metadata.AppendToOutgoingContext(ctx, "client_name", listUntrackedRepositoriesName)
+
+ logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx))
+ logger.Debugf("starting %s command", cmd.FlagSet().Name())
+
+ logger.Debug("dialing to gitaly nodes...")
+ nodeSet, err := dialGitalyStorages(cfg)
+ if err != nil {
+ return fmt.Errorf("dial nodes: %w", err)
+ }
+ defer nodeSet.Close()
+ logger.Debug("connected to gitaly nodes")
+
+ logger.Debug("connecting to praefect database...")
+ db, err := glsql.OpenDB(cfg.DB)
+ if err != nil {
+ return fmt.Errorf("connect to database: %w", err)
+ }
+ defer func() { _ = db.Close() }()
+ logger.Debug("connected to praefect database")
+
+ walker := repocleaner.NewWalker(nodeSet.Connections(), 16)
+ reporter := reportUntrackedRepositories{
+ ctx: ctx,
+ checker: datastore.NewStorageCleanup(db),
+ delimiter: cmd.delimiter,
+ out: cmd.out,
+ }
+ for _, vs := range cfg.VirtualStorages {
+ for _, node := range vs.Nodes {
+ logger.Debugf("check %q/%q storage repositories", vs.Name, node.Storage)
+ if err := walker.ExecOnRepositories(ctx, vs.Name, node.Storage, reporter.Report); err != nil {
+ return fmt.Errorf("exec on %q/%q: %w", vs.Name, node.Storage, err)
+ }
+ }
+ }
+ logger.Debug("completed")
+ return nil
+}
+
+func dialGitalyStorages(cfg config.Config) (praefect.NodeSet, error) {
+ nodeSet := praefect.NodeSet{}
+ for _, vs := range cfg.VirtualStorages {
+ for _, node := range vs.Nodes {
+ conn, err := subCmdDial(context.Background(), node.Address, node.Token, defaultDialTimeout)
+ if err != nil {
+ return nil, fmt.Errorf("dial with %q gitaly at %q", node.Storage, node.Address)
+ }
+ if _, found := nodeSet[vs.Name]; !found {
+ nodeSet[vs.Name] = map[string]praefect.Node{}
+ }
+ nodeSet[vs.Name][node.Storage] = praefect.Node{
+ Storage: node.Storage,
+ Address: node.Address,
+ Token: node.Token,
+ Connection: conn,
+ }
+ }
+ }
+ if len(nodeSet.Connections()) == 0 {
+ return nil, errNoConnectionToGitalies
+ }
+ return nodeSet, nil
+}
+
+type reportUntrackedRepositories struct {
+ ctx context.Context
+ checker *datastore.StorageCleanup
+ out io.Writer
+ delimiter string
+}
+
+// Report method accepts a list of repositories, checks if they exist in the praefect database
+// and writes JSON serialized location of each untracked repository using the configured delimiter
+// and writer.
+func (r *reportUntrackedRepositories) Report(paths []datastore.RepositoryClusterPath) error {
+ if len(paths) == 0 {
+ return nil
+ }
+
+ replicaRelPaths := make([]string, len(paths))
+ for i, path := range paths {
+ replicaRelPaths[i] = path.RelativePath
+ }
+
+ missing, err := r.checker.DoesntExist(r.ctx, paths[0].VirtualStorage, paths[0].Storage, replicaRelPaths)
+ if err != nil {
+ return fmt.Errorf("existence check: %w", err)
+ }
+
+ for _, repoClusterPath := range missing {
+ d, err := serializeRepositoryClusterPath(repoClusterPath)
+ if err != nil {
+ return fmt.Errorf("serialize %+v: %w", repoClusterPath, err)
+ }
+ if _, err := r.out.Write(d); err != nil {
+ return fmt.Errorf("write serialized data to output: %w", err)
+ }
+ if _, err := r.out.Write([]byte(r.delimiter)); err != nil {
+ return fmt.Errorf("write serialized data to output: %w", err)
+ }
+ }
+
+ return nil
+}
+
+func serializeRepositoryClusterPath(path datastore.RepositoryClusterPath) ([]byte, error) {
+ return json.Marshal(map[string]string{
+ "virtual_storage": path.VirtualStorage,
+ "storage": path.Storage,
+ "relative_path": path.RelativePath,
+ })
+}
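
Since serializeRepositoryClusterPath marshals a map (encoding/json emits map keys in sorted order), each untracked repository becomes one JSON object followed by the configured delimiter. A hypothetical output line with a made-up relative path:

    {"relative_path":"@hashed/aa/bb/aabbccdd.git","storage":"gitaly-1","virtual_storage":"praefect"}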
diff --git a/cmd/praefect/subcmd_list_untracked_repositories_test.go b/cmd/praefect/subcmd_list_untracked_repositories_test.go
new file mode 100644
index 000000000..95cd2e072
--- /dev/null
+++ b/cmd/praefect/subcmd_list_untracked_repositories_test.go
@@ -0,0 +1,143 @@
+// +build postgres
+
+package main
+
+import (
+ "bytes"
+ "context"
+ "flag"
+ "fmt"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/client"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+)
+
+func TestListUntrackedRepositories_FlagSet(t *testing.T) {
+ cmd := &listUntrackedRepositories{}
+ for _, tc := range []struct {
+ desc string
+ args []string
+ exp []interface{}
+ }{
+ {
+ desc: "custom value",
+ args: []string{"--delimiter", ","},
+ exp: []interface{}{","},
+ },
+ {
+ desc: "default value",
+ args: nil,
+ exp: []interface{}{"\n"},
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ fs := cmd.FlagSet()
+ require.NoError(t, fs.Parse(tc.args))
+ require.ElementsMatch(t, tc.exp, []interface{}{cmd.delimiter})
+ })
+ }
+}
+
+func TestListUntrackedRepositories_Exec(t *testing.T) {
+ g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1"))
+ g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2"))
+
+ // Repositories not managed by praefect.
+ repo1, _, _ := gittest.InitBareRepoAt(t, g1Cfg, g1Cfg.Storages[0])
+ repo2, _, _ := gittest.InitBareRepoAt(t, g1Cfg, g1Cfg.Storages[0])
+ repo3, _, _ := gittest.InitBareRepoAt(t, g2Cfg, g2Cfg.Storages[0])
+
+ g1Addr := testserver.RunGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+ g2Addr := testserver.RunGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+
+ conf := config.Config{
+ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "praefect",
+ Nodes: []*config.Node{
+ {Storage: g1Cfg.Storages[0].Name, Address: g1Addr},
+ {Storage: g2Cfg.Storages[0].Name, Address: g2Addr},
+ },
+ },
+ },
+ DB: glsql.GetDBConfig(t, "cmd_praefect"),
+ }
+
+ db := getDB(t)
+ // Mark virtual storages as imported to prevent the background importer job from running.
+ db.MustExec(
+ t,
+ `INSERT INTO virtual_storages(virtual_storage, repositories_imported) VALUES ($1, true)`,
+ conf.VirtualStorages[0].Name,
+ )
+
+ starterConfigs, err := getStarterConfigs(conf)
+ require.NoError(t, err)
+ stopped := make(chan struct{})
+ bootstrapper := bootstrap.NewNoop()
+ go func() {
+ defer close(stopped)
+ assert.NoError(t, run(starterConfigs, conf, bootstrapper, prometheus.NewRegistry()))
+ }()
+
+ cc, err := client.Dial("unix://"+conf.SocketPath, nil)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, cc.Close()) }()
+ repoClient := gitalypb.NewRepositoryServiceClient(cc)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ praefectStorage := conf.VirtualStorages[0].Name
+
+ // Repository managed by praefect, exists on gitaly-1 and gitaly-2.
+ createRepo(t, ctx, repoClient, praefectStorage, "path/to/test/repo")
+ out := &bytes.Buffer{}
+ cmd := newListUntrackedRepositories(testhelper.NewTestLogger(t), out)
+ require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+
+ exp := []string{
+ fmt.Sprintf(`{"relative_path":%q,"storage":"gitaly-1","virtual_storage":"praefect"}`, repo1.RelativePath),
+ fmt.Sprintf(`{"relative_path":%q,"storage":"gitaly-1","virtual_storage":"praefect"}`, repo2.RelativePath),
+ fmt.Sprintf(`{"relative_path":%q,"storage":"gitaly-2","virtual_storage":"praefect"}`, repo3.RelativePath),
+ "", // an empty extra element required as each line ends with "delimiter" and strings.Split returns all parts
+ }
+ require.ElementsMatch(t, exp, strings.Split(out.String(), "\n"))
+
+ bootstrapper.Terminate()
+ <-stopped
+}
+
+func createRepo(t *testing.T, ctx context.Context, repoClient gitalypb.RepositoryServiceClient, storageName, relativePath string) *gitalypb.Repository {
+ t.Helper()
+ repo := &gitalypb.Repository{
+ StorageName: storageName,
+ RelativePath: relativePath,
+ }
+ for i := 0; true; i++ {
+ _, err := repoClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{Repository: repo})
+ if err != nil {
+ require.Regexp(t, "(no healthy nodes)|(no such file or directory)|(connection refused)", err.Error())
+ require.Less(t, i, 100, "praefect didn't start serving in time")
+ time.Sleep(50 * time.Millisecond)
+ } else {
+ break
+ }
+ }
+ return repo
+}
diff --git a/cmd/praefect/subcmd_pingnodes.go b/cmd/praefect/subcmd_pingnodes.go
index 0b0873484..4bdf0a76e 100644
--- a/cmd/praefect/subcmd_pingnodes.go
+++ b/cmd/praefect/subcmd_pingnodes.go
@@ -88,7 +88,7 @@ func (s *dialNodesSubcommand) Exec(flags *flag.FlagSet, conf config.Config) erro
}
func (npr *nodePing) dial() (*grpc.ClientConn, error) {
- return subCmdDial(npr.address, npr.token)
+ return subCmdDial(context.Background(), npr.address, npr.token, defaultDialTimeout)
}
func (npr *nodePing) healthCheck(cc *grpc.ClientConn) (grpc_health_v1.HealthCheckResponse_ServingStatus, error) {
diff --git a/cmd/praefect/subcmd_reconcile.go b/cmd/praefect/subcmd_reconcile.go
index 47917501f..8e21789b8 100644
--- a/cmd/praefect/subcmd_reconcile.go
+++ b/cmd/praefect/subcmd_reconcile.go
@@ -61,6 +61,8 @@ func getNodeAddress(cfg config.Config) (string, error) {
return "unix:" + cfg.SocketPath, nil
case cfg.ListenAddr != "":
return "tcp://" + cfg.ListenAddr, nil
+ case cfg.TLSListenAddr != "":
+ return "tls://" + cfg.TLSListenAddr, nil
default:
return "", errors.New("no Praefect address configured")
}
@@ -76,7 +78,7 @@ func (nr nodeReconciler) reconcile() error {
return err
}
- cc, err := subCmdDial(nodeAddr, nr.conf.Auth.Token)
+ cc, err := subCmdDial(context.Background(), nodeAddr, nr.conf.Auth.Token, defaultDialTimeout)
if err != nil {
return err
}
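
With the new TLSListenAddr case, getNodeAddress covers all three listener kinds (unix socket, TCP, TLS). An illustrative call, values being placeholders:

    addr, err := getNodeAddress(config.Config{TLSListenAddr: "localhost:2306"})
    // addr == "tls://localhost:2306", err == nil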
diff --git a/cmd/praefect/subcmd_remove_repository.go b/cmd/praefect/subcmd_remove_repository.go
new file mode 100644
index 000000000..426f51e1a
--- /dev/null
+++ b/cmd/praefect/subcmd_remove_repository.go
@@ -0,0 +1,221 @@
+package main
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+ "flag"
+ "fmt"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/labkit/correlation"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/status"
+)
+
+const (
+ removeRepositoryCmdName = "remove-repository"
+)
+
+type removeRepository struct {
+ logger logrus.FieldLogger
+ virtualStorage string
+ relativePath string
+ timeout time.Duration
+}
+
+func newRemoveRepository(logger logrus.FieldLogger) *removeRepository {
+ return &removeRepository{logger: logger, timeout: defaultDialTimeout}
+}
+
+func (cmd *removeRepository) FlagSet() *flag.FlagSet {
+ fs := flag.NewFlagSet(removeRepositoryCmdName, flag.ExitOnError)
+ fs.StringVar(&cmd.virtualStorage, paramVirtualStorage, "", "name of the repository's virtual storage")
+ fs.StringVar(&cmd.relativePath, paramRelativePath, "", "relative path to the repository")
+ fs.Usage = func() {
+ _, _ = printfErr("Description:\n" +
+ " This command removes all state associated with a given repository from the Gitaly Cluster.\n" +
+ " This includes both on-disk repositories on all relevant Gitaly nodes as well as any potential\n" +
+ " database state as tracked by Praefect.\n")
+ fs.PrintDefaults()
+ _, _ = printfErr("NOTE:\n" +
+ " It may happen that parts of the repository continue to exist after this command, either because\n" +
+ " of an error that happened during deletion or because of in-flight RPC calls targeting the repository.\n" +
+ " It is safe and recommended to re-run this command in such a case.\n")
+ }
+ return fs
+}
+
+func (cmd removeRepository) Exec(flags *flag.FlagSet, cfg config.Config) error {
+ switch {
+ case flags.NArg() > 0:
+ return unexpectedPositionalArgsError{Command: flags.Name()}
+ case cmd.virtualStorage == "":
+ return requiredParameterError(paramVirtualStorage)
+ case cmd.relativePath == "":
+ return requiredParameterError(paramRelativePath)
+ }
+
+ db, err := glsql.OpenDB(cfg.DB)
+ if err != nil {
+ return fmt.Errorf("connect to database: %w", err)
+ }
+ defer func() { _ = db.Close() }()
+
+ ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID())
+ logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx))
+
+ return cmd.exec(ctx, logger, db, cfg)
+}
+
+func (cmd *removeRepository) exec(ctx context.Context, logger logrus.FieldLogger, db *sql.DB, cfg config.Config) error {
+ // Remove repository explicitly from all storages and clean up database info.
+ // This prevents creation of new replication events.
+ logger.WithFields(logrus.Fields{
+ "virtual_storage": cmd.virtualStorage,
+ "relative_path": cmd.relativePath,
+ }).Debug("remove repository")
+
+ addr, err := getNodeAddress(cfg)
+ if err != nil {
+ return fmt.Errorf("get praefect address from config: %w", err)
+ }
+
+ logger.Debugf("remove repository info from praefect database %q", addr)
+ removed, err := cmd.removeRepositoryFromDatabase(ctx, db)
+ if err != nil {
+ return fmt.Errorf("remove repository info from praefect database: %w", err)
+ }
+ if !removed {
+ logger.Warn("praefect database has no info about the repository")
+ }
+ logger.Debug("removal of the repository info from praefect database completed")
+
+ logger.Debug("remove replication events")
+ ticker := helper.NewTimerTicker(time.Second)
+ defer ticker.Stop()
+ if err := cmd.removeReplicationEvents(ctx, logger, db, ticker); err != nil {
+ return fmt.Errorf("remove scheduled replication events: %w", err)
+ }
+ logger.Debug("replication events removal completed")
+ // We should try to remove the repository from each of the gitaly nodes.
+ logger.Debug("remove repository directly by each gitaly node")
+ cmd.removeRepositoryForEachGitaly(ctx, cfg, logger)
+ logger.Debug("direct repository removal by each gitaly node completed")
+
+ return nil
+}
+
+func (cmd *removeRepository) removeRepositoryFromDatabase(ctx context.Context, db *sql.DB) (bool, error) {
+ var removed bool
+ if err := db.QueryRowContext(
+ ctx,
+ `WITH remove_storages_info AS (
+ DELETE FROM storage_repositories
+ WHERE virtual_storage = $1 AND relative_path = $2
+ )
+ DELETE FROM repositories
+ WHERE virtual_storage = $1 AND relative_path = $2
+ RETURNING TRUE`,
+ cmd.virtualStorage,
+ cmd.relativePath,
+ ).Scan(&removed); err != nil {
+ if errors.Is(err, sql.ErrNoRows) {
+ return false, nil
+ }
+ return false, fmt.Errorf("query row: %w", err)
+ }
+ return removed, nil
+}
+
+func (cmd *removeRepository) removeRepository(ctx context.Context, repo *gitalypb.Repository, addr, token string) (bool, error) {
+ conn, err := subCmdDial(ctx, addr, token, cmd.timeout)
+ if err != nil {
+ return false, fmt.Errorf("error dialing: %w", err)
+ }
+ defer func() { _ = conn.Close() }()
+
+ ctx = metadata.AppendToOutgoingContext(ctx, "client_name", removeRepositoryCmdName)
+ repositoryClient := gitalypb.NewRepositoryServiceClient(conn)
+ if _, err := repositoryClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{Repository: repo}); err != nil {
+ s, ok := status.FromError(err)
+ if !ok {
+ return false, fmt.Errorf("RemoveRepository: %w", err)
+ }
+ if !strings.Contains(s.Message(), fmt.Sprintf("get primary: repository %q/%q not found", cmd.virtualStorage, cmd.relativePath)) {
+ return false, fmt.Errorf("RemoveRepository: %w", err)
+ }
+ return false, nil
+ }
+ return true, nil
+}
+
+func (cmd *removeRepository) removeReplicationEvents(ctx context.Context, logger logrus.FieldLogger, db *sql.DB, ticker helper.Ticker) error {
+ // Wait for the completion of the repository replication jobs.
+ // As some of them could be repository creation jobs, we need to remove the newly created
+ // repositories after replication finishes.
+ start := time.Now()
+ for found := true; found; {
+ ticker.Reset()
+ <-ticker.C()
+ if int(time.Since(start).Seconds())%5 == 0 {
+ logger.Debug("awaiting for the repository in_progress replication jobs to complete...")
+ }
+ row := db.QueryRowContext(
+ ctx,
+ `WITH remove_replication_jobs AS (
+ DELETE FROM replication_queue
+ WHERE job->>'virtual_storage' = $1
+ AND job->>'relative_path' = $2
+ -- Do not remove ongoing replication events as we need to wait
+ -- for their completion.
+ AND state != 'in_progress'
+ )
+ SELECT EXISTS(
+ SELECT
+ FROM replication_queue
+ WHERE job->>'virtual_storage' = $1
+ AND job->>'relative_path' = $2
+ AND state = 'in_progress')`,
+ cmd.virtualStorage,
+ cmd.relativePath,
+ )
+ if err := row.Scan(&found); err != nil {
+ return fmt.Errorf("scan in progress jobs: %w", err)
+ }
+ }
+ return nil
+}
+
+func (cmd *removeRepository) removeRepositoryForEachGitaly(ctx context.Context, cfg config.Config, logger logrus.FieldLogger) {
+ for _, vs := range cfg.VirtualStorages {
+ if vs.Name == cmd.virtualStorage {
+ var wg sync.WaitGroup
+ for i := 0; i < len(vs.Nodes); i++ {
+ wg.Add(1)
+ go func(node *config.Node) {
+ defer wg.Done()
+ logger.Debugf("remove repository with gitaly %q at %q", node.Storage, node.Address)
+ repo := &gitalypb.Repository{
+ StorageName: node.Storage,
+ RelativePath: cmd.relativePath,
+ }
+ _, err := cmd.removeRepository(ctx, repo, node.Address, node.Token)
+ if err != nil {
+ logger.WithError(err).Warnf("repository removal failed for gitaly %q", node.Storage)
+ }
+ logger.Debugf("repository removal call to gitaly %q completed", node.Storage)
+ }(vs.Nodes[i])
+ }
+ wg.Wait()
+ break
+ }
+ }
+}
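
Driving the sub-command programmatically mirrors the tests that follow; a sketch with placeholder names:

    cmd := &removeRepository{
        logger:         logger,
        virtualStorage: "praefect",
        relativePath:   "path/to/repo.git",
        timeout:        defaultDialTimeout,
    }
    if err := cmd.Exec(flag.NewFlagSet("", flag.ExitOnError), conf); err != nil {
        logger.Fatalf("%v", err)
    }

Note the deliberate ordering in exec: database state is removed first so no new replication jobs can be scheduled, pending replication events are then drained, and only afterwards is each gitaly node asked to delete its on-disk copy, where failures merely log a warning because re-running the command is safe.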
diff --git a/cmd/praefect/subcmd_remove_repository_test.go b/cmd/praefect/subcmd_remove_repository_test.go
new file mode 100644
index 000000000..eb5d0ed2a
--- /dev/null
+++ b/cmd/praefect/subcmd_remove_repository_test.go
@@ -0,0 +1,328 @@
+// +build postgres
+
+package main
+
+import (
+ "flag"
+ "path/filepath"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/client"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+)
+
+func TestRemoveRepository_FlagSet(t *testing.T) {
+ cmd := &removeRepository{}
+ fs := cmd.FlagSet()
+ require.NoError(t, fs.Parse([]string{"--virtual-storage", "vs", "--repository", "repo"}))
+ require.Equal(t, "vs", cmd.virtualStorage)
+ require.Equal(t, "repo", cmd.relativePath)
+}
+
+func TestRemoveRepository_Exec_invalidArgs(t *testing.T) {
+ t.Run("not all flag values processed", func(t *testing.T) {
+ cmd := removeRepository{}
+ flagSet := flag.NewFlagSet("cmd", flag.PanicOnError)
+ require.NoError(t, flagSet.Parse([]string{"stub"}))
+ err := cmd.Exec(flagSet, config.Config{})
+ require.EqualError(t, err, "cmd doesn't accept positional arguments")
+ })
+
+ t.Run("virtual-storage is not set", func(t *testing.T) {
+ cmd := removeRepository{}
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{})
+ require.EqualError(t, err, `"virtual-storage" is a required parameter`)
+ })
+
+ t.Run("repository is not set", func(t *testing.T) {
+ cmd := removeRepository{virtualStorage: "stub"}
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{})
+ require.EqualError(t, err, `"repository" is a required parameter`)
+ })
+
+ t.Run("db connection error", func(t *testing.T) {
+ cmd := removeRepository{virtualStorage: "stub", relativePath: "stub"}
+ cfg := config.Config{DB: config.DB{Host: "stub", SSLMode: "disable"}}
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), cfg)
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "connect to database: dial tcp: lookup stub")
+ })
+
+ t.Run("praefect address is not set in config ", func(t *testing.T) {
+ cmd := removeRepository{virtualStorage: "stub", relativePath: "stub", logger: testhelper.NewTestLogger(t)}
+ dbConf := glsql.GetDBConfig(t, "cmd_praefect")
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{DB: dbConf})
+ require.EqualError(t, err, "get praefect address from config: no Praefect address configured")
+ })
+}
+
+func TestRemoveRepository_Exec(t *testing.T) {
+ g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1"))
+ g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2"))
+
+ g1Addr := testserver.RunGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+ g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+
+ db := getDB(t)
+ dbConf := glsql.GetDBConfig(t, "cmd_praefect")
+
+ conf := config.Config{
+ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "praefect",
+ Nodes: []*config.Node{
+ {Storage: g1Cfg.Storages[0].Name, Address: g1Addr},
+ {Storage: g2Cfg.Storages[0].Name, Address: g2Srv.Address()},
+ },
+ },
+ },
+ DB: dbConf,
+ Failover: config.Failover{
+ Enabled: true,
+ ElectionStrategy: config.ElectionStrategyPerRepository,
+ },
+ }
+
+ starterConfigs, err := getStarterConfigs(conf)
+ require.NoError(t, err)
+ stopped := make(chan struct{})
+ bootstrapper := bootstrap.NewNoop()
+ go func() {
+ defer close(stopped)
+ assert.NoError(t, run(starterConfigs, conf, bootstrapper, prometheus.NewRegistry()))
+ }()
+
+ cc, err := client.Dial("unix://"+conf.SocketPath, nil)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, cc.Close()) }()
+ repoClient := gitalypb.NewRepositoryServiceClient(cc)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ praefectStorage := conf.VirtualStorages[0].Name
+
+ t.Run("ok", func(t *testing.T) {
+ repo := createRepo(t, ctx, repoClient, praefectStorage, t.Name())
+ cmd := &removeRepository{
+ logger: testhelper.NewTestLogger(t),
+ virtualStorage: repo.StorageName,
+ relativePath: repo.RelativePath,
+ timeout: defaultDialTimeout,
+ }
+ require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+
+ require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath))
+ require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath))
+
+ var repositoryRowExists bool
+ require.NoError(t, db.QueryRow(
+ `SELECT EXISTS(SELECT FROM repositories WHERE virtual_storage = $1 AND relative_path = $2)`,
+ cmd.virtualStorage, cmd.relativePath,
+ ).Scan(&repositoryRowExists))
+ require.False(t, repositoryRowExists)
+ })
+
+ t.Run("no info about repository on praefect", func(t *testing.T) {
+ repo := createRepo(t, ctx, repoClient, praefectStorage, t.Name())
+ repoStore := datastore.NewPostgresRepositoryStore(db.DB, nil)
+ require.NoError(t, repoStore.DeleteRepository(
+ ctx, repo.StorageName, repo.RelativePath, g1Cfg.Storages[0].Name,
+ ))
+ require.NoError(t, repoStore.DeleteRepository(
+ ctx, repo.StorageName, repo.RelativePath, g2Cfg.Storages[0].Name,
+ ))
+
+ logger := testhelper.NewTestLogger(t)
+ loggerHook := test.NewLocal(logger)
+ cmd := &removeRepository{
+ logger: logrus.NewEntry(logger),
+ virtualStorage: praefectStorage,
+ relativePath: repo.RelativePath,
+ timeout: defaultDialTimeout,
+ }
+ require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+ var found bool
+ for _, entry := range loggerHook.AllEntries() {
+ if strings.Contains(entry.Message, "praefect database has no info about the repository") {
+ found = true
+ }
+ }
+ require.True(t, found, "no expected message in the log")
+
+ require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath))
+ require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath))
+
+ requireNoDatabaseInfo(t, db, cmd)
+ })
+
+ t.Run("one of gitalies is out of service", func(t *testing.T) {
+ repo := createRepo(t, ctx, repoClient, praefectStorage, t.Name())
+ g2Srv.Shutdown()
+
+ logger := testhelper.NewTestLogger(t)
+ loggerHook := test.NewLocal(logger)
+ cmd := &removeRepository{
+ logger: logrus.NewEntry(logger),
+ virtualStorage: praefectStorage,
+ relativePath: repo.RelativePath,
+ timeout: time.Second,
+ }
+
+ for {
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)
+ if err == nil {
+ break
+ }
+ regexp := "(transport: Error while dialing dial unix /" + strings.TrimPrefix(g2Srv.Address(), "unix:/") + ")|(primary gitaly is not healthy)"
+ require.Regexp(t, regexp, err.Error())
+ time.Sleep(time.Second)
+ }
+
+ var found bool
+ for _, entry := range loggerHook.AllEntries() {
+ if strings.Contains(entry.Message, `repository removal failed for gitaly "gitaly-2"`) {
+ found = true
+ break
+ }
+ }
+ require.True(t, found, "no expected message in the log")
+
+ require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath))
+ require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath))
+
+ requireNoDatabaseInfo(t, db, cmd)
+ })
+
+ bootstrapper.Terminate()
+ <-stopped
+}
+
+func requireNoDatabaseInfo(t *testing.T, db glsql.DB, cmd *removeRepository) {
+ t.Helper()
+ var repositoryRowExists bool
+ require.NoError(t, db.QueryRow(
+ `SELECT EXISTS(SELECT FROM repositories WHERE virtual_storage = $1 AND relative_path = $2)`,
+ cmd.virtualStorage, cmd.relativePath,
+ ).Scan(&repositoryRowExists))
+ require.False(t, repositoryRowExists)
+ var storageRowExists bool
+ require.NoError(t, db.QueryRow(
+ `SELECT EXISTS(SELECT FROM storage_repositories WHERE virtual_storage = $1 AND relative_path = $2)`,
+ cmd.virtualStorage, cmd.relativePath,
+ ).Scan(&storageRowExists))
+ require.False(t, storageRowExists)
+}
+
+func TestRemoveRepository_removeReplicationEvents(t *testing.T) {
+ const (
+ virtualStorage = "praefect"
+ relativePath = "relative_path/to/repo.git"
+ )
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ db := getDB(t)
+
+ queue := datastore.NewPostgresReplicationEventQueue(db)
+
+ // Set replication event in_progress.
+ inProgressEvent, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ Change: datastore.CreateRepo,
+ VirtualStorage: virtualStorage,
+ TargetNodeStorage: "gitaly-2",
+ RelativePath: relativePath,
+ },
+ })
+ require.NoError(t, err)
+ inProgress1, err := queue.Dequeue(ctx, virtualStorage, "gitaly-2", 10)
+ require.NoError(t, err)
+ require.Len(t, inProgress1, 1)
+
+ // New event - events in the 'ready' state should be removed.
+ _, err = queue.Enqueue(ctx, datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ Change: datastore.UpdateRepo,
+ VirtualStorage: virtualStorage,
+ TargetNodeStorage: "gitaly-3",
+ SourceNodeStorage: "gitaly-1",
+ RelativePath: relativePath,
+ },
+ })
+ require.NoError(t, err)
+
+ // Failed event - should be removed as well.
+ failedEvent, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ Change: datastore.UpdateRepo,
+ VirtualStorage: virtualStorage,
+ TargetNodeStorage: "gitaly-4",
+ SourceNodeStorage: "gitaly-0",
+ RelativePath: relativePath,
+ },
+ })
+ require.NoError(t, err)
+ inProgress2, err := queue.Dequeue(ctx, virtualStorage, "gitaly-4", 10)
+ require.NoError(t, err)
+ require.Len(t, inProgress2, 1)
+ // Acknowledge with failed status, so it will remain in the database for the next processing
+ // attempt or until it is deleted by the 'removeReplicationEvents' method.
+ acks2, err := queue.Acknowledge(ctx, datastore.JobStateFailed, []uint64{inProgress2[0].ID})
+ require.NoError(t, err)
+ require.Equal(t, []uint64{inProgress2[0].ID}, acks2)
+
+ ticker := helper.NewTimerTicker(time.Millisecond)
+ defer ticker.Stop()
+
+ errChan := make(chan error, 1)
+ go func() {
+ cmd := &removeRepository{virtualStorage: virtualStorage, relativePath: relativePath}
+ errChan <- cmd.removeReplicationEvents(ctx, testhelper.NewTestLogger(t), db.DB, ticker)
+ }()
+ go func() {
+ // We acknowledge in_progress job, so it unblocks the waiting loop.
+ acks, err := queue.Acknowledge(ctx, datastore.JobStateCompleted, []uint64{inProgressEvent.ID})
+ if assert.NoError(t, err) {
+ assert.Equal(t, []uint64{inProgress1[0].ID}, acks)
+ }
+ }()
+
+ for checkChan, exists := errChan, true; exists; {
+ select {
+ case err := <-checkChan:
+ require.NoError(t, err)
+ close(errChan)
+ checkChan = nil
+ default:
+ }
+ // Wait until job removed
+ row := db.QueryRow(`SELECT EXISTS(SELECT FROM replication_queue WHERE id = $1)`, failedEvent.ID)
+ require.NoError(t, row.Scan(&exists))
+ }
+ // Once there are no in_progress jobs anymore the method returns.
+ require.NoError(t, <-errChan)
+
+ var notExists bool
+ row := db.QueryRow(`SELECT NOT EXISTS(SELECT FROM replication_queue)`)
+ require.NoError(t, row.Scan(&notExists))
+ require.True(t, notExists)
+}
diff --git a/cmd/praefect/subcmd_set_replication_factor.go b/cmd/praefect/subcmd_set_replication_factor.go
index ff2cc4562..60c29203e 100644
--- a/cmd/praefect/subcmd_set_replication_factor.go
+++ b/cmd/praefect/subcmd_set_replication_factor.go
@@ -48,7 +48,7 @@ func (cmd *setReplicationFactorSubcommand) Exec(flags *flag.FlagSet, cfg config.
return err
}
- conn, err := subCmdDial(nodeAddr, cfg.Auth.Token)
+ conn, err := subCmdDial(context.Background(), nodeAddr, cfg.Auth.Token, defaultDialTimeout)
if err != nil {
return fmt.Errorf("error dialing: %w", err)
}
diff --git a/cmd/praefect/subcmd_track_repository.go b/cmd/praefect/subcmd_track_repository.go
new file mode 100644
index 000000000..169bfcf16
--- /dev/null
+++ b/cmd/praefect/subcmd_track_repository.go
@@ -0,0 +1,224 @@
+package main
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+ "flag"
+ "fmt"
+ "math/rand"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/labkit/correlation"
+ "google.golang.org/grpc/metadata"
+)
+
+const (
+ trackRepositoryCmdName = "track-repository"
+)
+
+type trackRepository struct {
+ logger logrus.FieldLogger
+ virtualStorage string
+ relativePath string
+ authoritativeStorage string
+}
+
+var errAuthoritativeRepositoryNotExist = errors.New("authoritative repository does not exist")
+
+func newTrackRepository(logger logrus.FieldLogger) *trackRepository {
+ return &trackRepository{logger: logger}
+}
+
+func (cmd *trackRepository) FlagSet() *flag.FlagSet {
+ fs := flag.NewFlagSet(trackRepositoryCmdName, flag.ExitOnError)
+ fs.StringVar(&cmd.virtualStorage, paramVirtualStorage, "", "name of the repository's virtual storage")
+ fs.StringVar(&cmd.relativePath, paramRelativePath, "", "relative path to the repository")
+ fs.StringVar(&cmd.authoritativeStorage, paramAuthoritativeStorage, "", "storage with the repository to consider as authoritative")
+ fs.Usage = func() {
+ _, _ = printfErr("Description:\n" +
+ " This command adds a given repository to be tracked by Praefect.\n" +
+ " It checks if the repository exists on disk on the authoritative storage, " +
+ " and whether database records are absent from tracking the repository.")
+ fs.PrintDefaults()
+ }
+ return fs
+}
+
+func (cmd trackRepository) Exec(flags *flag.FlagSet, cfg config.Config) error {
+ switch {
+ case flags.NArg() > 0:
+ return unexpectedPositionalArgsError{Command: flags.Name()}
+ case cmd.virtualStorage == "":
+ return requiredParameterError(paramVirtualStorage)
+ case cmd.relativePath == "":
+ return requiredParameterError(paramRelativePath)
+ case cmd.authoritativeStorage == "":
+ if cfg.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
+ return requiredParameterError(paramAuthoritativeStorage)
+ }
+ }
+
+ db, err := glsql.OpenDB(cfg.DB)
+ if err != nil {
+ return fmt.Errorf("connect to database: %w", err)
+ }
+ defer func() { _ = db.Close() }()
+
+ ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID())
+ logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx))
+
+ return cmd.exec(ctx, logger, db, cfg)
+}
+
+const trackRepoErrorPrefix = "attempting to track repository in praefect database"
+
+func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, db *sql.DB, cfg config.Config) error {
+ logger.WithFields(logrus.Fields{
+ "virtual_storage": cmd.virtualStorage,
+ "relative_path": cmd.relativePath,
+ "authoritative_storage": cmd.authoritativeStorage,
+ }).Debug("track repository")
+
+ var primary string
+ var secondaries []string
+ var variableReplicationFactorEnabled, savePrimary bool
+ if cfg.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
+ savePrimary = true
+ primary = cmd.authoritativeStorage
+
+ for _, vs := range cfg.VirtualStorages {
+ if vs.Name == cmd.virtualStorage {
+ for _, node := range vs.Nodes {
+ if node.Storage == cmd.authoritativeStorage {
+ continue
+ }
+ secondaries = append(secondaries, node.Storage)
+ }
+ }
+ }
+
+ r := rand.New(rand.NewSource(time.Now().UnixNano()))
+ replicationFactor := cfg.DefaultReplicationFactors()[cmd.virtualStorage]
+
+ if replicationFactor > 0 {
+ variableReplicationFactorEnabled = true
+ // Select random secondaries according to the default replication factor.
+ r.Shuffle(len(secondaries), func(i, j int) {
+ secondaries[i], secondaries[j] = secondaries[j], secondaries[i]
+ })
+
+ secondaries = secondaries[:replicationFactor-1]
+ }
+ } else {
+ savePrimary = false
+ if err := db.QueryRowContext(ctx, `SELECT node_name FROM shard_primaries WHERE shard_name = $1 AND demoted = 'false'`, cmd.virtualStorage).Scan(&primary); err != nil {
+ if errors.Is(err, sql.ErrNoRows) {
+ return fmt.Errorf("%s: no primaries found", trackRepoErrorPrefix)
+ }
+ return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
+ }
+ }
+
+ authoritativeRepoExists, err := cmd.authoritativeRepositoryExists(ctx, cfg, primary)
+ if err != nil {
+ return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
+ }
+
+ if !authoritativeRepoExists {
+ return fmt.Errorf("%s: %w", trackRepoErrorPrefix, errAuthoritativeRepositoryNotExist)
+ }
+
+ if err := cmd.trackRepository(
+ ctx,
+ datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()),
+ primary,
+ secondaries,
+ savePrimary,
+ variableReplicationFactorEnabled,
+ ); err != nil {
+ return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
+ }
+
+ logger.Debug("finished adding new repository to be tracked in praefect database.")
+
+ return nil
+}
+
+func (cmd *trackRepository) trackRepository(
+ ctx context.Context,
+ ds *datastore.PostgresRepositoryStore,
+ primary string,
+ secondaries []string,
+ savePrimary bool,
+ variableReplicationFactorEnabled bool,
+) error {
+ if err := ds.CreateRepository(
+ ctx,
+ cmd.virtualStorage,
+ cmd.relativePath,
+ primary,
+ nil,
+ secondaries,
+ savePrimary,
+ variableReplicationFactorEnabled,
+ ); err != nil {
+ var repoExistsError datastore.RepositoryExistsError
+ if errors.As(err, &repoExistsError) {
+ cmd.logger.Print("repository is already tracked in praefect database")
+ return nil
+ }
+
+ return fmt.Errorf("CreateRepository: %w", err)
+ }
+
+ return nil
+}
+
+func (cmd *trackRepository) repositoryExists(ctx context.Context, repo *gitalypb.Repository, addr, token string) (bool, error) {
+ conn, err := subCmdDial(ctx, addr, token, defaultDialTimeout)
+ if err != nil {
+ return false, fmt.Errorf("error dialing: %w", err)
+ }
+ defer func() { _ = conn.Close() }()
+
+ ctx = metadata.AppendToOutgoingContext(ctx, "client_name", trackRepositoryCmdName)
+ repositoryClient := gitalypb.NewRepositoryServiceClient(conn)
+ res, err := repositoryClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{Repository: repo})
+ if err != nil {
+ return false, err
+ }
+
+ return res.GetExists(), nil
+}
+
+func (cmd *trackRepository) authoritativeRepositoryExists(ctx context.Context, cfg config.Config, nodeName string) (bool, error) {
+ for _, vs := range cfg.VirtualStorages {
+ if vs.Name != cmd.virtualStorage {
+ continue
+ }
+
+ for _, node := range vs.Nodes {
+ if node.Storage == nodeName {
+ logger.Debugf("check if repository %q exists on gitaly %q at %q", cmd.relativePath, node.Storage, node.Address)
+ repo := &gitalypb.Repository{
+ StorageName: node.Storage,
+ RelativePath: cmd.relativePath,
+ }
+ exists, err := cmd.repositoryExists(ctx, repo, node.Address, node.Token)
+ if err != nil {
+ logger.WithError(err).Warnf("checking if repository exists %q, %q", node.Storage, cmd.relativePath)
+ return false, nil
+ }
+ return exists, nil
+ }
+ }
+ return false, fmt.Errorf("node %q not found", cmd.authoritativeStorage)
+ }
+ return false, fmt.Errorf("virtual storage %q not found", cmd.virtualStorage)
+}
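
As with remove-repository, the struct can be driven directly; a sketch with placeholder values, matching how the tests below call it:

    add := &trackRepository{
        logger:               logger,
        virtualStorage:       "praefect",
        relativePath:         "path/to/repo.git",
        authoritativeStorage: "gitaly-1", // required only for per-repository elections
    }
    if err := add.Exec(flag.NewFlagSet("", flag.ExitOnError), conf); err != nil {
        logger.Fatalf("%v", err)
    }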
diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go
new file mode 100644
index 000000000..4dca21993
--- /dev/null
+++ b/cmd/praefect/subcmd_track_repository_test.go
@@ -0,0 +1,230 @@
+//go:build postgres
+// +build postgres
+
+package main
+
+import (
+ "errors"
+ "flag"
+ "fmt"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/client"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/promtest"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+)
+
+func TestTrackRepository_FlagSet(t *testing.T) {
+ cmd := &trackRepository{}
+ fs := cmd.FlagSet()
+ require.NoError(t, fs.Parse([]string{"--virtual-storage", "vs", "--repository", "repo", "--authoritative-storage", "storage-0"}))
+ require.Equal(t, "vs", cmd.virtualStorage)
+ require.Equal(t, "repo", cmd.relativePath)
+ require.Equal(t, "storage-0", cmd.authoritativeStorage)
+}
+
+func TestTrackRepository_Exec_invalidArgs(t *testing.T) {
+ t.Run("not all flag values processed", func(t *testing.T) {
+ cmd := trackRepository{}
+ flagSet := flag.NewFlagSet("cmd", flag.PanicOnError)
+ require.NoError(t, flagSet.Parse([]string{"stub"}))
+ err := cmd.Exec(flagSet, config.Config{})
+ require.EqualError(t, err, "cmd doesn't accept positional arguments")
+ })
+
+ t.Run("virtual-storage is not set", func(t *testing.T) {
+ cmd := trackRepository{}
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{})
+ require.EqualError(t, err, `"virtual-storage" is a required parameter`)
+ })
+
+ t.Run("repository is not set", func(t *testing.T) {
+ cmd := trackRepository{virtualStorage: "stub"}
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{})
+ require.EqualError(t, err, `"repository" is a required parameter`)
+ })
+
+ t.Run("authoritative-storage is not set", func(t *testing.T) {
+ cmd := trackRepository{virtualStorage: "stub", relativePath: "path/to/repo"}
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}})
+ require.EqualError(t, err, `"authoritative-storage" is a required parameter`)
+ })
+
+ t.Run("db connection error", func(t *testing.T) {
+ cmd := trackRepository{virtualStorage: "stub", relativePath: "stub", authoritativeStorage: "storage-0"}
+ cfg := config.Config{DB: config.DB{Host: "stub", SSLMode: "disable"}}
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), cfg)
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "connect to database: dial tcp: lookup stub")
+ })
+}
+
+func TestTrackRepository_Exec(t *testing.T) {
+ g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1"))
+ g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2"))
+
+ g1Srv := testserver.StartGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+ g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+ defer g2Srv.Shutdown()
+ defer g1Srv.Shutdown()
+
+ g1Addr := g1Srv.Address()
+
+ db := getDB(t)
+ dbConf := glsql.GetDBConfig(t, "cmd_praefect")
+
+ virtualStorageName := "praefect"
+ conf := config.Config{
+ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: virtualStorageName,
+ Nodes: []*config.Node{
+ {Storage: g1Cfg.Storages[0].Name, Address: g1Addr},
+ {Storage: g2Cfg.Storages[0].Name, Address: g2Srv.Address()},
+ },
+ DefaultReplicationFactor: 2,
+ },
+ },
+ DB: dbConf,
+ }
+
+ gitalyCC, err := client.Dial(g1Addr, nil)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, gitalyCC.Close()) }()
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ gitaly1RepositoryClient := gitalypb.NewRepositoryServiceClient(gitalyCC)
+
+ createRepoThroughGitaly1 := func(relativePath string) error {
+ _, err := gitaly1RepositoryClient.CreateRepository(
+ ctx,
+ &gitalypb.CreateRepositoryRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: g1Cfg.Storages[0].Name,
+ RelativePath: relativePath,
+ },
+ })
+ return err
+ }
+
+ testCases := map[string]struct {
+ failoverConfig config.Failover
+ authoritativeStorage string
+ }{
+ "sql election": {
+ failoverConfig: config.Failover{
+ Enabled: true,
+ ElectionStrategy: config.ElectionStrategySQL,
+ },
+ authoritativeStorage: "",
+ },
+ "per repository election": {
+ failoverConfig: config.Failover{
+ Enabled: true,
+ ElectionStrategy: config.ElectionStrategyPerRepository,
+ },
+ authoritativeStorage: g1Cfg.Storages[0].Name,
+ },
+ }
+
+ logger := testhelper.NewTestLogger(t)
+ for tn, tc := range testCases {
+ t.Run(tn, func(t *testing.T) {
+ addCmdConf := conf
+ addCmdConf.Failover = tc.failoverConfig
+
+ t.Run("ok", func(t *testing.T) {
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), addCmdConf, db.DB, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
+ require.NoError(t, err)
+ nodeMgr.Start(0, time.Hour)
+
+ relativePath := fmt.Sprintf("path/to/test/repo_%s", tn)
+ repoDS := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+
+ require.NoError(t, createRepoThroughGitaly1(relativePath))
+
+ rmRepoCmd := &removeRepository{
+ logger: logger,
+ virtualStorage: virtualStorageName,
+ relativePath: relativePath,
+ }
+
+ require.NoError(t, rmRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+
+ // create the repo on Gitaly without Praefect knowing
+ require.NoError(t, createRepoThroughGitaly1(relativePath))
+ require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath))
+ require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath))
+
+ addRepoCmd := &trackRepository{
+ logger: logger,
+ virtualStorage: virtualStorageName,
+ relativePath: relativePath,
+ authoritativeStorage: tc.authoritativeStorage,
+ }
+
+ require.NoError(t, addRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf))
+ as := datastore.NewAssignmentStore(db, conf.StorageNames())
+
+ assignments, err := as.GetHostAssignments(ctx, virtualStorageName, relativePath)
+ require.NoError(t, err)
+ require.Len(t, assignments, 2)
+ assert.Contains(t, assignments, g1Cfg.Storages[0].Name)
+ assert.Contains(t, assignments, g2Cfg.Storages[0].Name)
+
+ exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, relativePath)
+ require.NoError(t, err)
+ assert.True(t, exists)
+ })
+
+ t.Run("repository does not exist", func(t *testing.T) {
+ relativePath := fmt.Sprintf("path/to/test/repo_1_%s", tn)
+
+ cmd := &trackRepository{
+ logger: testhelper.NewTestLogger(t),
+ virtualStorage: "praefect",
+ relativePath: relativePath,
+ authoritativeStorage: tc.authoritativeStorage,
+ }
+
+ assert.True(t, errors.Is(cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf), errAuthoritativeRepositoryNotExist))
+ })
+
+ t.Run("records already exist", func(t *testing.T) {
+ relativePath := fmt.Sprintf("path/to/test/repo_2_%s", tn)
+
+ require.NoError(t, createRepoThroughGitaly1(relativePath))
+ require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath))
+ require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath))
+
+ ds := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+ require.NoError(t, err)
+ require.NoError(t, ds.CreateRepository(ctx, virtualStorageName, relativePath, g1Cfg.Storages[0].Name, nil, nil, true, true))
+
+ cmd := &trackRepository{
+ logger: testhelper.NewTestLogger(t),
+ virtualStorage: virtualStorageName,
+ relativePath: relativePath,
+ authoritativeStorage: tc.authoritativeStorage,
+ }
+
+ assert.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf))
+ })
+ })
+ }
+}
diff --git a/internal/bootstrap/bootstrap.go b/internal/bootstrap/bootstrap.go
index ef414184d..2d741e45b 100644
--- a/internal/bootstrap/bootstrap.go
+++ b/internal/bootstrap/bootstrap.go
@@ -22,11 +22,18 @@ const (
socketReusePortWarning = "Unable to set SO_REUSEPORT: zero downtime upgrades will not work"
)
+// Listener is an interface of the bootstrap manager.
+type Listener interface {
+ // RegisterStarter adds starter to the pool.
+ RegisterStarter(starter Starter)
+ // Start starts all registered starters to accept connections.
+ Start() error
+ // Wait terminates all registered starters.
+ Wait(gracefulTimeout time.Duration, stopAction func()) error
+}
+
// Bootstrap handles graceful upgrades
type Bootstrap struct {
- // StopAction will be invoked during a graceful stop. It must wait until the shutdown is completed
- StopAction func()
-
upgrader upgrader
listenFunc ListenFunc
errChan chan error
@@ -151,7 +158,8 @@ func (b *Bootstrap) Start() error {
// Wait will signal process readiness to the parent and then wait for an exit condition.
// SIGTERM, SIGINT and a runtime error will trigger an immediate shutdown;
// in case of an upgrade there will be a grace period to complete the ongoing requests
-func (b *Bootstrap) Wait(gracefulTimeout time.Duration) error {
+// stopAction will be invoked during a graceful stop. It must wait until the shutdown is completed.
+func (b *Bootstrap) Wait(gracefulTimeout time.Duration, stopAction func()) error {
signals := []os.Signal{syscall.SIGTERM, syscall.SIGINT}
immediateShutdown := make(chan os.Signal, len(signals))
signal.Notify(immediateShutdown, signals...)
@@ -167,7 +175,7 @@ func (b *Bootstrap) Wait(gracefulTimeout time.Duration) error {
// the new process signaled its readiness and we started a graceful stop
// however no further upgrades can be started until this process is running
// we set a grace period and then we force a termination.
- waitError := b.waitGracePeriod(gracefulTimeout, immediateShutdown)
+ waitError := b.waitGracePeriod(gracefulTimeout, immediateShutdown, stopAction)
err = fmt.Errorf("graceful upgrade: %v", waitError)
case s := <-immediateShutdown:
@@ -178,13 +186,13 @@ func (b *Bootstrap) Wait(gracefulTimeout time.Duration) error {
return err
}
-func (b *Bootstrap) waitGracePeriod(gracefulTimeout time.Duration, kill <-chan os.Signal) error {
+func (b *Bootstrap) waitGracePeriod(gracefulTimeout time.Duration, kill <-chan os.Signal, stopAction func()) error {
log.WithField("graceful_timeout", gracefulTimeout).Warn("starting grace period")
allServersDone := make(chan struct{})
go func() {
- if b.StopAction != nil {
- b.StopAction()
+ if stopAction != nil {
+ stopAction()
}
close(allServersDone)
}()
@@ -208,3 +216,51 @@ func (b *Bootstrap) listen(network, path string) (net.Listener, error) {
return b.listenFunc(network, path)
}
+
+// Noop is a bootstrapper that does no additional configuration.
+type Noop struct {
+ starters []Starter
+ shutdown chan struct{}
+ errChan chan error
+}
+
+// NewNoop returns an initialized instance of *Noop.
+func NewNoop() *Noop {
+ return &Noop{shutdown: make(chan struct{})}
+}
+
+// RegisterStarter adds a starter to the pool.
+func (n *Noop) RegisterStarter(starter Starter) {
+ n.starters = append(n.starters, starter)
+}
+
+// Start starts all registered starters to accept connections.
+func (n *Noop) Start() error {
+ n.errChan = make(chan error, len(n.starters))
+
+ for _, start := range n.starters {
+ if err := start(net.Listen, n.errChan); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Wait blocks until Terminate is called or a starter fails; stopAction, if set, is invoked on termination.
+func (n *Noop) Wait(_ time.Duration, stopAction func()) error {
+ select {
+ case <-n.shutdown:
+ if stopAction != nil {
+ stopAction()
+ }
+ case err := <-n.errChan:
+ return err
+ }
+
+ return nil
+}
+
+// Terminate unblocks the Wait method and executes the stopAction callback passed into it.
+func (n *Noop) Terminate() {
+ close(n.shutdown)
+}
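Note: the Noop bootstrapper gives callers the same Listener surface without socket inheritance or zero-downtime upgrades: starters are collected, Start launches them, and Wait blocks until Terminate is called or a starter reports an error. A minimal sketch of driving it (runWithNoop and the starter body are illustrative; RegisterStarter, Start, Wait, and Terminate are the methods added above):

package main

import (
	"log"
	"time"

	"gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap"
)

func runWithNoop() error {
	b := bootstrap.NewNoop()

	// A starter receives the listen function plus a channel for reporting
	// asynchronous listener failures.
	b.RegisterStarter(func(listen bootstrap.ListenFunc, errCh chan<- error) error {
		l, err := listen("tcp", "localhost:0")
		if err != nil {
			return err
		}
		go func() {
			for {
				conn, err := l.Accept()
				if err != nil {
					errCh <- err // surfaces as the return value of Wait
					return
				}
				conn.Close()
			}
		}()
		return nil
	})

	if err := b.Start(); err != nil {
		return err
	}

	// In production code this would be wired to a shutdown hook.
	go func() {
		time.Sleep(time.Second)
		b.Terminate()
	}()

	// The graceful timeout is ignored by Noop; stopAction runs on Terminate.
	return b.Wait(time.Minute, func() { log.Print("stopping servers") })
}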
diff --git a/internal/bootstrap/bootstrap_test.go b/internal/bootstrap/bootstrap_test.go
index 6af4bade1..d0a1562ca 100644
--- a/internal/bootstrap/bootstrap_test.go
+++ b/internal/bootstrap/bootstrap_test.go
@@ -104,10 +104,13 @@ func waitWithTimeout(t *testing.T, waitCh <-chan error, timeout time.Duration) e
}
func TestImmediateTerminationOnSocketError(t *testing.T) {
- b, server := makeBootstrap(t)
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ b, server, stopAction := makeBootstrap(t, ctx)
waitCh := make(chan error)
- go func() { waitCh <- b.Wait(2 * time.Second) }()
+ go func() { waitCh <- b.Wait(2*time.Second, stopAction) }()
require.NoError(t, server.listeners["tcp"].Close(), "Closing first listener")
@@ -119,12 +122,15 @@ func TestImmediateTerminationOnSocketError(t *testing.T) {
func TestImmediateTerminationOnSignal(t *testing.T) {
for _, sig := range []syscall.Signal{syscall.SIGTERM, syscall.SIGINT} {
t.Run(sig.String(), func(t *testing.T) {
- b, server := makeBootstrap(t)
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ b, server, stopAction := makeBootstrap(t, ctx)
done := server.slowRequest(3 * time.Minute)
waitCh := make(chan error)
- go func() { waitCh <- b.Wait(2 * time.Second) }()
+ go func() { waitCh <- b.Wait(2*time.Second, stopAction) }()
// make sure we are inside b.Wait() or we'll kill the test suite
time.Sleep(100 * time.Millisecond)
@@ -146,9 +152,12 @@ func TestImmediateTerminationOnSignal(t *testing.T) {
}
func TestGracefulTerminationStuck(t *testing.T) {
- b, server := makeBootstrap(t)
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ b, server, stopAction := makeBootstrap(t, ctx)
- err := testGracefulUpdate(t, server, b, 3*time.Second, 2*time.Second, nil)
+ err := testGracefulUpdate(t, server, b, 3*time.Second, 2*time.Second, nil, stopAction)
require.Contains(t, err.Error(), "grace period expired")
}
@@ -158,22 +167,27 @@ func TestGracefulTerminationWithSignals(t *testing.T) {
for _, sig := range []syscall.Signal{syscall.SIGTERM, syscall.SIGINT} {
t.Run(sig.String(), func(t *testing.T) {
- b, server := makeBootstrap(t)
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+ b, server, stopAction := makeBootstrap(t, ctx)
err := testGracefulUpdate(t, server, b, 1*time.Second, 2*time.Second, func() {
require.NoError(t, self.Signal(sig))
- })
+ }, stopAction)
require.Contains(t, err.Error(), "force shutdown")
})
}
}
func TestGracefulTerminationServerErrors(t *testing.T) {
- b, server := makeBootstrap(t)
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ b, server, _ := makeBootstrap(t, ctx)
done := make(chan error, 1)
// This is a simulation of receiving a listener error during waitGracePeriod
- b.StopAction = func() {
+ stopAction := func() {
// we close the unix listener in order to test that the shutdown will not fail, but it keeps waiting for the TCP request
require.NoError(t, server.listeners["unix"].Close())
@@ -185,19 +199,21 @@ func TestGracefulTerminationServerErrors(t *testing.T) {
require.NoError(t, server.server.Shutdown(context.Background()))
}
- err := testGracefulUpdate(t, server, b, 3*time.Second, 2*time.Second, nil)
+ err := testGracefulUpdate(t, server, b, 3*time.Second, 2*time.Second, nil, stopAction)
require.Contains(t, err.Error(), "grace period expired")
require.NoError(t, <-done)
}
func TestGracefulTermination(t *testing.T) {
- b, server := makeBootstrap(t)
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+ b, server, _ := makeBootstrap(t, ctx)
// Using server.Close, we bypass the graceful shutdown, faking a completed shutdown
- b.StopAction = func() { server.server.Close() }
+ stopAction := func() { server.server.Close() }
- err := testGracefulUpdate(t, server, b, 1*time.Second, 2*time.Second, nil)
+ err := testGracefulUpdate(t, server, b, 1*time.Second, 2*time.Second, nil, stopAction)
require.Contains(t, err.Error(), "completed")
}
@@ -219,9 +235,9 @@ func TestPortReuse(t *testing.T) {
require.NoError(t, l.Close())
}
-func testGracefulUpdate(t *testing.T, server *testServer, b *Bootstrap, waitTimeout, gracefulWait time.Duration, duringGracePeriodCallback func()) error {
+func testGracefulUpdate(t *testing.T, server *testServer, b *Bootstrap, waitTimeout, gracefulWait time.Duration, duringGracePeriodCallback func(), stopAction func()) error {
waitCh := make(chan error)
- go func() { waitCh <- b.Wait(gracefulWait) }()
+ go func() { waitCh <- b.Wait(gracefulWait, stopAction) }()
// Start a slow request to keep the old server from shutting down immediately.
req := server.slowRequest(2 * gracefulWait)
@@ -251,7 +267,7 @@ func testGracefulUpdate(t *testing.T, server *testServer, b *Bootstrap, waitTime
return waitErr
}
-func makeBootstrap(t *testing.T) (*Bootstrap, *testServer) {
+func makeBootstrap(t *testing.T, ctx context.Context) (*Bootstrap, *testServer, func()) {
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(200)
@@ -271,8 +287,6 @@ func makeBootstrap(t *testing.T) (*Bootstrap, *testServer) {
b, err := _new(u, net.Listen, false)
require.NoError(t, err)
- b.StopAction = func() { require.NoError(t, s.Shutdown(context.Background())) }
-
listeners := make(map[string]net.Listener)
start := func(network, address string) Starter {
return func(listen ListenFunc, errors chan<- error) error {
@@ -312,7 +326,7 @@ func makeBootstrap(t *testing.T) (*Bootstrap, *testServer) {
server: &s,
listeners: listeners,
url: url,
- }
+ }, func() { require.NoError(t, s.Shutdown(context.Background())) }
}
func testAllListeners(t *testing.T, listeners map[string]net.Listener) {
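Note: TestGracefulTerminationServerErrors and TestGracefulTermination above illustrate the contract of the new stopAction parameter: it must block until shutdown has actually completed, because Wait races it against the grace period. A hedged sketch of a typical stopAction for an *http.Server (waitWithGracefulStop and the 30-second timeout are illustrative):

package main

import (
	"context"
	"log"
	"net/http"
	"time"

	"gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap"
)

func waitWithGracefulStop(b *bootstrap.Bootstrap, srv *http.Server) error {
	stopAction := func() {
		// Shutdown blocks until in-flight requests drain; if it takes
		// longer than the grace period, Wait forces a termination.
		if err := srv.Shutdown(context.Background()); err != nil {
			log.Printf("shutdown: %v", err)
		}
	}
	return b.Wait(30*time.Second, stopAction)
}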
diff --git a/internal/praefect/datastore/storage_cleanup.go b/internal/praefect/datastore/storage_cleanup.go
new file mode 100644
index 000000000..455ddef2b
--- /dev/null
+++ b/internal/praefect/datastore/storage_cleanup.go
@@ -0,0 +1,85 @@
+package datastore
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+
+ "github.com/lib/pq"
+)
+
+// RepositoryClusterPath identifies the location of a repository in the cluster.
+type RepositoryClusterPath struct {
+ ClusterPath
+ // RelativePath is the relative path to the repository on disk.
+ RelativePath string
+}
+
+// NewRepositoryClusterPath initializes and returns RepositoryClusterPath.
+func NewRepositoryClusterPath(virtualStorage, storage, relativePath string) RepositoryClusterPath {
+ return RepositoryClusterPath{
+ ClusterPath: ClusterPath{
+ VirtualStorage: virtualStorage,
+ Storage: storage,
+ },
+ RelativePath: relativePath,
+ }
+}
+
+// ClusterPath represents the path to a storage within the cluster.
+type ClusterPath struct {
+ // VirtualStorage is the name of the virtual storage.
+ VirtualStorage string
+ // Storage is the name of the gitaly storage.
+ Storage string
+}
+
+// NewStorageCleanup initializes and returns a new instance of StorageCleanup.
+func NewStorageCleanup(db *sql.DB) *StorageCleanup {
+ return &StorageCleanup{db: db}
+}
+
+// StorageCleanup provides methods on the database for the repository cleanup operation.
+type StorageCleanup struct {
+ db *sql.DB
+}
+
+// DoesntExist returns a RepositoryClusterPath for each repository that doesn't exist in the database,
+// which it determines by querying the repositories and storage_repositories tables.
+func (ss *StorageCleanup) DoesntExist(ctx context.Context, virtualStorage, storage string, relativePath []string) ([]RepositoryClusterPath, error) {
+ if len(relativePath) == 0 {
+ return nil, nil
+ }
+
+ rows, err := ss.db.QueryContext(
+ ctx,
+ `SELECT $1 AS virtual_storage, $2 AS storage, UNNEST($3::TEXT[]) AS relative_path
+ EXCEPT (
+ SELECT virtual_storage, storage, relative_path
+ FROM repositories
+ JOIN storage_repositories USING (virtual_storage, relative_path)
+ WHERE virtual_storage = $1 AND storage = $2 AND relative_path = ANY($3)
+ )`,
+ virtualStorage, storage, pq.StringArray(relativePath),
+ )
+ if err != nil {
+ return nil, fmt.Errorf("query: %w", err)
+ }
+ defer func() { _ = rows.Close() }()
+
+ var res []RepositoryClusterPath
+ for rows.Next() {
+ var curr RepositoryClusterPath
+ if err := rows.Scan(&curr.VirtualStorage, &curr.Storage, &curr.RelativePath); err != nil {
+ return nil, fmt.Errorf("scan: %w", err)
+ }
+ res = append(res, curr)
+ }
+ if err := rows.Err(); err != nil {
+ return nil, fmt.Errorf("loop: %w", err)
+ }
+ if err := rows.Close(); err != nil {
+ return nil, fmt.Errorf("close: %w", err)
+ }
+ return res, nil
+}
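Note: a hedged sketch of how a caller might consume DoesntExist, assuming an already-opened *sql.DB and a list of relative paths gathered from disk (reportUntracked, the storage names, and the paths are placeholders; NewStorageCleanup and DoesntExist are defined above):

package main

import (
	"context"
	"database/sql"
	"fmt"

	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
)

func reportUntracked(ctx context.Context, db *sql.DB, paths []string) error {
	cleanup := datastore.NewStorageCleanup(db)

	// Everything returned exists on disk but has no matching row in the
	// repositories/storage_repositories tables.
	untracked, err := cleanup.DoesntExist(ctx, "praefect", "gitaly-1", paths)
	if err != nil {
		return fmt.Errorf("check untracked: %w", err)
	}

	for _, repo := range untracked {
		fmt.Printf("untracked repository: %s/%s/%s\n",
			repo.VirtualStorage, repo.Storage, repo.RelativePath)
	}
	return nil
}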
diff --git a/internal/praefect/datastore/storage_cleanup_test.go b/internal/praefect/datastore/storage_cleanup_test.go
new file mode 100644
index 000000000..9d60c7bae
--- /dev/null
+++ b/internal/praefect/datastore/storage_cleanup_test.go
@@ -0,0 +1,92 @@
+// +build postgres
+
+package datastore
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+)
+
+func TestStorageCleanup_Exists(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ db := getDB(t)
+
+ repoStore := NewPostgresRepositoryStore(db.DB, nil)
+ require.NoError(t, repoStore.CreateRepository(ctx, "vs", "p/1", "g1", []string{"g2", "g3"}, nil, false, false))
+ require.NoError(t, repoStore.CreateRepository(ctx, "vs", "p/2", "g1", []string{"g2", "g3"}, nil, false, false))
+ storageCleanup := NewStorageCleanup(db.DB)
+
+ for _, tc := range []struct {
+ desc string
+ virtualStorage string
+ storage string
+ relativeReplicaPaths []string
+ out []RepositoryClusterPath
+ }{
+ {
+ desc: "multiple don't exist",
+ virtualStorage: "vs",
+ storage: "g1",
+ relativeReplicaPaths: []string{"p/1", "p/2", "path/x", "path/y"},
+ out: []RepositoryClusterPath{
+ NewRepositoryClusterPath("vs", "g1", "path/x"),
+ NewRepositoryClusterPath("vs", "g1", "path/y"),
+ },
+ },
+ {
+ desc: "duplicates",
+ virtualStorage: "vs",
+ storage: "g1",
+ relativeReplicaPaths: []string{"p/1", "path/x", "path/x"},
+ out: []RepositoryClusterPath{
+ NewRepositoryClusterPath("vs", "g1", "path/x"),
+ },
+ },
+ {
+ desc: "all exist",
+ virtualStorage: "vs",
+ storage: "g1",
+ relativeReplicaPaths: []string{"p/1", "p/2"},
+ out: nil,
+ },
+ {
+ desc: "all don't exist",
+ virtualStorage: "vs",
+ storage: "g1",
+ relativeReplicaPaths: []string{"path/x", "path/y", "path/z"},
+ out: []RepositoryClusterPath{
+ NewRepositoryClusterPath("vs", "g1", "path/x"),
+ NewRepositoryClusterPath("vs", "g1", "path/y"),
+ NewRepositoryClusterPath("vs", "g1", "path/z"),
+ },
+ },
+ {
+ desc: "doesn't exist because of storage",
+ virtualStorage: "vs",
+ storage: "stub",
+ relativeReplicaPaths: []string{"path/x"},
+ out: []RepositoryClusterPath{
+ NewRepositoryClusterPath("vs", "stub", "path/x"),
+ },
+ },
+ {
+ desc: "doesn't exist because of virtual storage",
+ virtualStorage: "stub",
+ storage: "g1",
+ relativeReplicaPaths: []string{"path/x"},
+ out: []RepositoryClusterPath{
+ NewRepositoryClusterPath("stub", "g1", "path/x"),
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ res, err := storageCleanup.DoesntExist(ctx, tc.virtualStorage, tc.storage, tc.relativeReplicaPaths)
+ require.NoError(t, err)
+ require.ElementsMatch(t, tc.out, res)
+ })
+ }
+}
diff --git a/internal/praefect/metrics/prometheus.go b/internal/praefect/metrics/prometheus.go
index 64e4eee52..8e98bd84b 100644
--- a/internal/praefect/metrics/prometheus.go
+++ b/internal/praefect/metrics/prometheus.go
@@ -9,7 +9,7 @@ import (
// RegisterReplicationDelay creates and registers a prometheus histogram
// to observe replication delay times
-func RegisterReplicationDelay(conf promconfig.Config) (metrics.HistogramVec, error) {
+func RegisterReplicationDelay(conf promconfig.Config, registerer prometheus.Registerer) (metrics.HistogramVec, error) {
replicationDelay := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "gitaly",
@@ -20,12 +20,12 @@ func RegisterReplicationDelay(conf promconfig.Config) (metrics.HistogramVec, err
[]string{"type"},
)
- return replicationDelay, prometheus.Register(replicationDelay)
+ return replicationDelay, registerer.Register(replicationDelay)
}
// RegisterReplicationLatency creates and registers a prometheus histogram
// to observe replication latency times
-func RegisterReplicationLatency(conf promconfig.Config) (metrics.HistogramVec, error) {
+func RegisterReplicationLatency(conf promconfig.Config, registerer prometheus.Registerer) (metrics.HistogramVec, error) {
replicationLatency := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "gitaly",
@@ -36,12 +36,12 @@ func RegisterReplicationLatency(conf promconfig.Config) (metrics.HistogramVec, e
[]string{"type"},
)
- return replicationLatency, prometheus.Register(replicationLatency)
+ return replicationLatency, registerer.Register(replicationLatency)
}
// RegisterNodeLatency creates and registers a prometheus histogram to
// observe internal node latency
-func RegisterNodeLatency(conf promconfig.Config) (metrics.HistogramVec, error) {
+func RegisterNodeLatency(conf promconfig.Config, registerer prometheus.Registerer) (metrics.HistogramVec, error) {
nodeLatency := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "gitaly",
@@ -51,7 +51,7 @@ func RegisterNodeLatency(conf promconfig.Config) (metrics.HistogramVec, error) {
}, []string{"gitaly_storage"},
)
- return nodeLatency, prometheus.Register(nodeLatency)
+ return nodeLatency, registerer.Register(nodeLatency)
}
var MethodTypeCounter = promauto.NewCounterVec(
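Note: the point of threading a prometheus.Registerer through these constructors is isolation: callers, and tests in particular, can register against a private registry instead of the process-global default and so avoid duplicate-registration errors when the code runs more than once per process. A minimal sketch under that assumption (registerIsolated is illustrative, and the promconfig import path is an assumption):

package main

import (
	"log"

	"github.com/prometheus/client_golang/prometheus"

	// the import path of the prometheus config package is an assumption
	promconfig "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config/prometheus"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/metrics"
)

func registerIsolated(cfg promconfig.Config) {
	// A private registry keeps repeated registrations from colliding
	// with prometheus.DefaultRegisterer.
	reg := prometheus.NewRegistry()

	if _, err := metrics.RegisterNodeLatency(cfg, reg); err != nil {
		log.Fatalf("register node latency: %v", err)
	}
	if _, err := metrics.RegisterReplicationDelay(cfg, reg); err != nil {
		log.Fatalf("register replication delay: %v", err)
	}
}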
diff --git a/internal/praefect/repocleaner/init_test.go b/internal/praefect/repocleaner/init_test.go
new file mode 100644
index 000000000..0b99e6d32
--- /dev/null
+++ b/internal/praefect/repocleaner/init_test.go
@@ -0,0 +1,21 @@
+package repocleaner
+
+import (
+ "os"
+ "testing"
+
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+)
+
+func TestMain(m *testing.M) {
+ os.Exit(testMain(m))
+}
+
+func testMain(m *testing.M) int {
+ defer testhelper.MustHaveNoChildProcess()
+
+ cleanup := testhelper.Configure()
+ defer cleanup()
+
+ return m.Run()
+}
diff --git a/internal/praefect/repocleaner/repository.go b/internal/praefect/repocleaner/repository.go
new file mode 100644
index 000000000..86ea69bd1
--- /dev/null
+++ b/internal/praefect/repocleaner/repository.go
@@ -0,0 +1,77 @@
+package repocleaner
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+)
+
+// Walker allows iterating over the repositories of a gitaly storage.
+type Walker struct {
+ conns praefect.Connections
+ batchSize int
+}
+
+// NewWalker returns a new *Walker instance.
+func NewWalker(conns praefect.Connections, batchSize int) *Walker {
+ return &Walker{conns: conns, batchSize: batchSize}
+}
+
+// ExecOnRepositories runs through all the repositories on a Gitaly storage and executes the provided action.
+// The processing is done in batches to reduce the cost of operations.
+func (wr *Walker) ExecOnRepositories(ctx context.Context, virtualStorage, storage string, action func([]datastore.RepositoryClusterPath) error) error {
+ gclient, err := wr.getInternalGitalyClient(virtualStorage, storage)
+ if err != nil {
+ return fmt.Errorf("setup gitaly client: %w", err)
+ }
+
+ resp, err := gclient.WalkRepos(ctx, &gitalypb.WalkReposRequest{StorageName: storage})
+ if err != nil {
+ return fmt.Errorf("unable to walk repos: %w", err)
+ }
+
+ batch := make([]datastore.RepositoryClusterPath, 0, wr.batchSize)
+ for {
+ res, err := resp.Recv()
+ if err != nil {
+ if !errors.Is(err, io.EOF) {
+ return fmt.Errorf("failure on walking repos: %w", err)
+ }
+ break
+ }
+
+ batch = append(batch, datastore.RepositoryClusterPath{
+ ClusterPath: datastore.ClusterPath{
+ VirtualStorage: virtualStorage,
+ Storage: storage,
+ },
+ RelativePath: res.RelativePath,
+ })
+
+ if len(batch) == cap(batch) {
+ if err := action(batch); err != nil {
+ return err
+ }
+ batch = batch[:0]
+ }
+ }
+ if len(batch) > 0 {
+ if err := action(batch); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (wr *Walker) getInternalGitalyClient(virtualStorage, storage string) (gitalypb.InternalGitalyClient, error) {
+ conn, found := wr.conns[virtualStorage][storage]
+ if !found {
+ return nil, fmt.Errorf("no connection to the gitaly node %q/%q", virtualStorage, storage)
+ }
+ return gitalypb.NewInternalGitalyClient(conn), nil
+}
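Note: a hedged sketch of consuming Walker.ExecOnRepositories, assuming a praefect.Connections map that has already been dialed, much as the test below does via praefect.DialNodes (walkStorage, the storage names, and the batch size are illustrative):

package main

import (
	"context"
	"log"

	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/repocleaner"
)

func walkStorage(ctx context.Context, conns praefect.Connections) error {
	// Batches of up to 25 repositories keep each action invocation cheap.
	walker := repocleaner.NewWalker(conns, 25)

	return walker.ExecOnRepositories(ctx, "praefect", "gitaly-1", func(batch []datastore.RepositoryClusterPath) error {
		for _, repo := range batch {
			log.Printf("found %s on %s/%s", repo.RelativePath, repo.VirtualStorage, repo.Storage)
		}
		// Returning a non-nil error here aborts the walk, as the
		// "terminates on error" test case demonstrates.
		return nil
	})
}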
diff --git a/internal/praefect/repocleaner/repository_test.go b/internal/praefect/repocleaner/repository_test.go
new file mode 100644
index 000000000..6cb3727d4
--- /dev/null
+++ b/internal/praefect/repocleaner/repository_test.go
@@ -0,0 +1,114 @@
+package repocleaner
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/backchannel"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/transaction"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
+)
+
+func TestWalker_ExecOnRepositories(t *testing.T) {
+ const (
+ repo1RelPath = "repo-1.git"
+ repo2RelPath = "repo-2.git"
+ repo3RelPath = "repo-3.git"
+
+ storage1 = "gitaly-1"
+
+ virtualStorage = "praefect"
+ )
+
+ gCfg := testcfg.Build(t, testcfg.WithStorages(storage1))
+ gAddr := testserver.RunGitalyServer(t, gCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+
+ conf := config.Config{
+ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: virtualStorage,
+ Nodes: []*config.Node{
+ {Storage: gCfg.Storages[0].Name, Address: gAddr},
+ },
+ },
+ },
+ }
+
+ gittest.CloneRepoAtStorage(t, gCfg, gCfg.Storages[0], repo1RelPath)
+ gittest.CloneRepoAtStorage(t, gCfg, gCfg.Storages[0], repo2RelPath)
+ gittest.CloneRepoAtStorage(t, gCfg, gCfg.Storages[0], repo3RelPath)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ entry := testhelper.NewTestLogger(t).WithContext(ctx)
+ clientHandshaker := backchannel.NewClientHandshaker(entry, praefect.NewBackchannelServerFactory(entry, transaction.NewServer(nil)))
+ nodeSet, err := praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, clientHandshaker)
+ require.NoError(t, err)
+ defer nodeSet.Close()
+
+ for _, tc := range []struct {
+ desc string
+ batchSize int
+ exp [][]datastore.RepositoryClusterPath
+ expErr error
+ }{
+ {
+ desc: "multiple batches",
+ batchSize: 2,
+ exp: [][]datastore.RepositoryClusterPath{
+ {
+ {ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo1RelPath},
+ {ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo2RelPath},
+ },
+ {
+ {ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo3RelPath},
+ },
+ },
+ },
+ {
+ desc: "single batch",
+ batchSize: 10,
+ exp: [][]datastore.RepositoryClusterPath{
+ {
+ {ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo1RelPath},
+ {ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo2RelPath},
+ {ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo3RelPath},
+ },
+ },
+ },
+ {
+ desc: "terminates on error",
+ batchSize: 1,
+ exp: [][]datastore.RepositoryClusterPath{
+ {
+ {ClusterPath: datastore.ClusterPath{VirtualStorage: virtualStorage, Storage: storage1}, RelativePath: repo1RelPath},
+ },
+ },
+ expErr: assert.AnError,
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ walker := NewWalker(nodeSet.Connections(), tc.batchSize)
+ var iteration int
+ err = walker.ExecOnRepositories(ctx, conf.VirtualStorages[0].Name, storage1, func(paths []datastore.RepositoryClusterPath) error {
+ require.Less(t, iteration, len(tc.exp))
+ expected := tc.exp[iteration]
+ iteration++
+ assert.ElementsMatch(t, paths, expected)
+ return tc.expErr
+ })
+ require.Equal(t, tc.expErr, err)
+ })
+ }
+}