Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2021-09-27 21:56:48 +0300
committerJohn Cai <jcai@gitlab.com>2021-10-12 17:57:52 +0300
commit190e2e505f022ee911ead4f02fb4d22888f3f532 (patch)
treec000628b28b0e23bb5d45392f4b8e97d5ebc1c6e
parentc29860242695db7fc10aa47ff9c38d0c47293beb (diff)
Add track-repository praefect subcommandjc-backport-track-repo-13-12-stable
-rw-r--r--cmd/praefect/subcmd.go1
-rw-r--r--cmd/praefect/subcmd_track_repository.go224
-rw-r--r--cmd/praefect/subcmd_track_repository_test.go230
3 files changed, 455 insertions, 0 deletions
diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go
index f6ae54818..13602a25e 100644
--- a/cmd/praefect/subcmd.go
+++ b/cmd/praefect/subcmd.go
@@ -34,6 +34,7 @@ var (
"accept-dataloss": &acceptDatalossSubcommand{},
"set-replication-factor": newSetReplicatioFactorSubcommand(os.Stdout),
removeRepositoryCmdName: newRemoveRepository(logger),
+ trackRepositoryCmdName: newTrackRepository(logger),
}
)
diff --git a/cmd/praefect/subcmd_track_repository.go b/cmd/praefect/subcmd_track_repository.go
new file mode 100644
index 000000000..7af6d2b01
--- /dev/null
+++ b/cmd/praefect/subcmd_track_repository.go
@@ -0,0 +1,224 @@
+package main
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+ "flag"
+ "fmt"
+ "math/rand"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/labkit/correlation"
+ "google.golang.org/grpc/metadata"
+)
+
+const (
+ trackRepositoryCmdName = "track-repository"
+)
+
+type trackRepository struct {
+ logger logrus.FieldLogger
+ virtualStorage string
+ relativePath string
+ authoritativeStorage string
+}
+
+var errAuthoritativeRepositoryNotExist = errors.New("authoritative repository does not exist")
+
+func newTrackRepository(logger logrus.FieldLogger) *trackRepository {
+ return &trackRepository{logger: logger}
+}
+
+func (cmd *trackRepository) FlagSet() *flag.FlagSet {
+ fs := flag.NewFlagSet(trackRepositoryCmdName, flag.ExitOnError)
+ fs.StringVar(&cmd.virtualStorage, paramVirtualStorage, "", "name of the repository's virtual storage")
+ fs.StringVar(&cmd.relativePath, paramRelativePath, "", "relative path to the repository")
+ fs.StringVar(&cmd.authoritativeStorage, paramAuthoritativeStorage, "", "storage with the repository to consider as authoritative")
+ fs.Usage = func() {
+ _, _ = printfErr("Description:\n" +
+ " This command adds a given repository to be tracked by Praefect.\n" +
+ " It checks if the repository exists on disk on the authoritative storage, " +
+ " and whether database records are absent from tracking the repository.")
+ fs.PrintDefaults()
+ }
+ return fs
+}
+
+func (cmd trackRepository) Exec(flags *flag.FlagSet, cfg config.Config) error {
+ switch {
+ case flags.NArg() > 0:
+ return unexpectedPositionalArgsError{Command: flags.Name()}
+ case cmd.virtualStorage == "":
+ return requiredParameterError(paramVirtualStorage)
+ case cmd.relativePath == "":
+ return requiredParameterError(paramRelativePath)
+ case cmd.authoritativeStorage == "":
+ if cfg.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
+ return requiredParameterError(paramAuthoritativeStorage)
+ }
+ }
+
+ db, err := glsql.OpenDB(cfg.DB)
+ if err != nil {
+ return fmt.Errorf("connect to database: %w", err)
+ }
+ defer func() { _ = db.Close() }()
+
+ ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID())
+ logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx))
+
+ return cmd.exec(ctx, logger, db, cfg)
+}
+
+const trackRepoErrorPrefix = "attempting to track repository in praefect database"
+
+func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, db *sql.DB, cfg config.Config) error {
+ logger.WithFields(logrus.Fields{
+ "virtual_storage": cmd.virtualStorage,
+ "relative_path": cmd.relativePath,
+ "authoritative_storage": cmd.authoritativeStorage,
+ }).Debug("track repository")
+
+ var primary string
+ var secondaries []string
+ var variableReplicationFactorEnabled, savePrimary bool
+ if cfg.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
+ savePrimary = true
+ primary = cmd.authoritativeStorage
+
+ for _, vs := range cfg.VirtualStorages {
+ if vs.Name == cmd.virtualStorage {
+ for _, node := range vs.Nodes {
+ if node.Storage == cmd.authoritativeStorage {
+ continue
+ }
+ secondaries = append(secondaries, node.Storage)
+ }
+ }
+ }
+
+ r := rand.New(rand.NewSource(time.Now().UnixNano()))
+ replicationFactor := cfg.DefaultReplicationFactors()[cmd.virtualStorage]
+
+ if replicationFactor > 0 {
+ variableReplicationFactorEnabled = true
+ // Select random secondaries according to the default replication factor.
+ r.Shuffle(len(secondaries), func(i, j int) {
+ secondaries[i], secondaries[j] = secondaries[j], secondaries[i]
+ })
+
+ secondaries = secondaries[:replicationFactor-1]
+ }
+ } else {
+ savePrimary = false
+ if err := db.QueryRowContext(ctx, `SELECT node_name FROM shard_primaries WHERE shard_name = $1 AND demoted = 'false'`, cmd.virtualStorage).Scan(&primary); err != nil {
+ if errors.Is(err, sql.ErrNoRows) {
+ return fmt.Errorf("%s: no primaries found", trackRepoErrorPrefix)
+ }
+ return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
+ }
+ }
+
+ authoritativeRepoExists, err := cmd.authoritativeRepositoryExists(ctx, cfg, primary)
+ if err != nil {
+ return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
+ }
+
+ if !authoritativeRepoExists {
+ return fmt.Errorf("%s: %w", trackRepoErrorPrefix, errAuthoritativeRepositoryNotExist)
+ }
+
+ if err := cmd.trackRepository(
+ ctx,
+ datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()),
+ primary,
+ secondaries,
+ savePrimary,
+ variableReplicationFactorEnabled,
+ ); err != nil {
+ return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
+ }
+
+ logger.Debug("finished adding new repository to be tracked in praefect database.")
+
+ return nil
+}
+
+func (cmd *trackRepository) trackRepository(
+ ctx context.Context,
+ ds *datastore.PostgresRepositoryStore,
+ primary string,
+ secondaries []string,
+ savePrimary bool,
+ variableReplicationFactorEnabled bool,
+) error {
+ if err := ds.CreateRepository(
+ ctx,
+ cmd.virtualStorage,
+ cmd.relativePath,
+ primary,
+ nil,
+ secondaries,
+ savePrimary,
+ variableReplicationFactorEnabled,
+ ); err != nil {
+ var repoExistsError *datastore.RepositoryExistsError
+ if errors.As(err, &repoExistsError) {
+ cmd.logger.Print("repository is already tracked in praefect database")
+ return nil
+ }
+
+ return fmt.Errorf("CreateRepository: %w", err)
+ }
+
+ return nil
+}
+
+func (cmd *trackRepository) repositoryExists(ctx context.Context, repo *gitalypb.Repository, addr, token string) (bool, error) {
+ conn, err := subCmdDial(addr, token)
+ if err != nil {
+ return false, fmt.Errorf("error dialing: %w", err)
+ }
+ defer func() { _ = conn.Close() }()
+
+ ctx = metadata.AppendToOutgoingContext(ctx, "client_name", trackRepositoryCmdName)
+ repositoryClient := gitalypb.NewRepositoryServiceClient(conn)
+ res, err := repositoryClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{Repository: repo})
+ if err != nil {
+ return false, err
+ }
+
+ return res.GetExists(), nil
+}
+
+func (cmd *trackRepository) authoritativeRepositoryExists(ctx context.Context, cfg config.Config, nodeName string) (bool, error) {
+ for _, vs := range cfg.VirtualStorages {
+ if vs.Name != cmd.virtualStorage {
+ continue
+ }
+
+ for _, node := range vs.Nodes {
+ if node.Storage == nodeName {
+ logger.Debugf("check if repository %q exists on gitaly %q at %q", cmd.relativePath, node.Storage, node.Address)
+ repo := &gitalypb.Repository{
+ StorageName: node.Storage,
+ RelativePath: cmd.relativePath,
+ }
+ exists, err := cmd.repositoryExists(ctx, repo, node.Address, node.Token)
+ if err != nil {
+ logger.WithError(err).Warnf("checking if repository exists %q, %q", node.Storage, cmd.relativePath)
+ return false, nil
+ }
+ return exists, nil
+ }
+ }
+ return false, fmt.Errorf("node %q not found", cmd.authoritativeStorage)
+ }
+ return false, fmt.Errorf("virtual storage %q not found", cmd.virtualStorage)
+}
diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go
new file mode 100644
index 000000000..f1d106d4f
--- /dev/null
+++ b/cmd/praefect/subcmd_track_repository_test.go
@@ -0,0 +1,230 @@
+//go:build postgres
+// +build postgres
+
+package main
+
+import (
+ "flag"
+ "fmt"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/client"
+ "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+)
+
+func TestTrackRepository_FlagSet(t *testing.T) {
+ cmd := &trackRepository{}
+ fs := cmd.FlagSet()
+ require.NoError(t, fs.Parse([]string{"--virtual-storage", "vs", "--repository", "repo", "--authoritative-storage", "storage-0"}))
+ require.Equal(t, "vs", cmd.virtualStorage)
+ require.Equal(t, "repo", cmd.relativePath)
+ require.Equal(t, "storage-0", cmd.authoritativeStorage)
+}
+
+func TestTrackRepository_Exec_invalidArgs(t *testing.T) {
+ t.Run("not all flag values processed", func(t *testing.T) {
+ cmd := trackRepository{}
+ flagSet := flag.NewFlagSet("cmd", flag.PanicOnError)
+ require.NoError(t, flagSet.Parse([]string{"stub"}))
+ err := cmd.Exec(flagSet, config.Config{})
+ require.EqualError(t, err, "cmd doesn't accept positional arguments")
+ })
+
+ t.Run("virtual-storage is not set", func(t *testing.T) {
+ cmd := trackRepository{}
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{})
+ require.EqualError(t, err, `"virtual-storage" is a required parameter`)
+ })
+
+ t.Run("repository is not set", func(t *testing.T) {
+ cmd := trackRepository{virtualStorage: "stub"}
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{})
+ require.EqualError(t, err, `"repository" is a required parameter`)
+ })
+
+ t.Run("authoritative-storage is not set", func(t *testing.T) {
+ cmd := trackRepository{virtualStorage: "stub", relativePath: "path/to/repo"}
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}})
+ require.EqualError(t, err, `"authoritative-storage" is a required parameter`)
+ })
+
+ t.Run("db connection error", func(t *testing.T) {
+ cmd := trackRepository{virtualStorage: "stub", relativePath: "stub", authoritativeStorage: "storage-0"}
+ cfg := config.Config{DB: config.DB{Host: "stub", SSLMode: "disable"}}
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), cfg)
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "connect to database: dial tcp: lookup stub")
+ })
+}
+
+func TestTrackRepository_Exec(t *testing.T) {
+ g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1"))
+ g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2"))
+
+ g1Srv := testserver.StartGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+ g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+ defer g2Srv.Shutdown()
+ defer g1Srv.Shutdown()
+
+ g1Addr := g1Srv.Address()
+
+ db := getDB(t)
+ var database string
+ require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database))
+ dbConf := glsql.GetDBConfig(t, database)
+
+ virtualStorageName := "praefect"
+ conf := config.Config{
+ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: virtualStorageName,
+ Nodes: []*config.Node{
+ {Storage: g1Cfg.Storages[0].Name, Address: g1Addr},
+ {Storage: g2Cfg.Storages[0].Name, Address: g2Srv.Address()},
+ },
+ DefaultReplicationFactor: 2,
+ },
+ },
+ DB: dbConf,
+ }
+
+ gitalyCC, err := client.Dial(g1Addr, nil)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, gitalyCC.Close()) }()
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ gitaly1RepositoryClient := gitalypb.NewRepositoryServiceClient(gitalyCC)
+
+ createRepoThroughGitaly1 := func(relativePath string) error {
+ _, err := gitaly1RepositoryClient.CreateRepository(
+ ctx,
+ &gitalypb.CreateRepositoryRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: g1Cfg.Storages[0].Name,
+ RelativePath: relativePath,
+ },
+ })
+ return err
+ }
+
+ testCases := map[string]struct {
+ failoverConfig config.Failover
+ authoritativeStorage string
+ }{
+ "sql election": {
+ failoverConfig: config.Failover{
+ Enabled: true,
+ ElectionStrategy: config.ElectionStrategySQL,
+ },
+ authoritativeStorage: "",
+ },
+ "per repository election": {
+ failoverConfig: config.Failover{
+ Enabled: true,
+ ElectionStrategy: config.ElectionStrategyPerRepository,
+ },
+ authoritativeStorage: g1Cfg.Storages[0].Name,
+ },
+ }
+
+ logger := testhelper.NewTestLogger(t)
+ for tn, tc := range testCases {
+ t.Run(tn, func(t *testing.T) {
+ addCmdConf := conf
+ addCmdConf.Failover = tc.failoverConfig
+
+ t.Run("ok", func(t *testing.T) {
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), addCmdConf, db.DB, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
+ require.NoError(t, err)
+ nodeMgr.Start(0, time.Hour)
+
+ relativePath := fmt.Sprintf("path/to/test/repo_%s", tn)
+ repoDS := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+
+ require.NoError(t, createRepoThroughGitaly1(relativePath))
+
+ rmRepoCmd := &removeRepository{
+ logger: logger,
+ virtualStorage: virtualStorageName,
+ relativePath: relativePath,
+ }
+
+ require.NoError(t, rmRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+
+ // create the repo on Gitaly without Praefect knowing
+ require.NoError(t, createRepoThroughGitaly1(relativePath))
+ require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath))
+ require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath))
+
+ addRepoCmd := &trackRepository{
+ logger: logger,
+ virtualStorage: virtualStorageName,
+ relativePath: relativePath,
+ authoritativeStorage: tc.authoritativeStorage,
+ }
+
+ require.NoError(t, addRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf))
+ as := datastore.NewAssignmentStore(db, conf.StorageNames())
+
+ assignments, err := as.GetHostAssignments(ctx, virtualStorageName, relativePath)
+ require.NoError(t, err)
+ require.Len(t, assignments, 2)
+ assert.Contains(t, assignments, g1Cfg.Storages[0].Name)
+ assert.Contains(t, assignments, g2Cfg.Storages[0].Name)
+
+ exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, relativePath)
+ require.NoError(t, err)
+ assert.True(t, exists)
+ })
+
+ t.Run("repository does not exist", func(t *testing.T) {
+ relativePath := fmt.Sprintf("path/to/test/repo_1_%s", tn)
+
+ cmd := &trackRepository{
+ logger: testhelper.NewTestLogger(t),
+ virtualStorage: "praefect",
+ relativePath: relativePath,
+ authoritativeStorage: tc.authoritativeStorage,
+ }
+ assert.EqualError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf), errAuthoritativeRepositoryNotExist.Error())
+ })
+
+ t.Run("records already exist", func(t *testing.T) {
+ relativePath := fmt.Sprintf("path/to/test/repo_2_%s", tn)
+
+ require.NoError(t, createRepoThroughGitaly1(relativePath))
+ require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath))
+ require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath))
+
+ ds := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+ require.NoError(t, err)
+ require.NoError(t, ds.CreateRepository(ctx, virtualStorageName, relativePath, g1Cfg.Storages[0].Name, nil, nil, true, true))
+
+ cmd := &trackRepository{
+ logger: testhelper.NewTestLogger(t),
+ virtualStorage: virtualStorageName,
+ relativePath: relativePath,
+ authoritativeStorage: tc.authoritativeStorage,
+ }
+
+ assert.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf))
+ })
+ })
+ }
+}