diff options
author | John Cai <jcai@gitlab.com> | 2021-09-27 21:56:48 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2021-10-12 17:57:52 +0300 |
commit | 190e2e505f022ee911ead4f02fb4d22888f3f532 (patch) | |
tree | c000628b28b0e23bb5d45392f4b8e97d5ebc1c6e | |
parent | c29860242695db7fc10aa47ff9c38d0c47293beb (diff) |
Add track-repository praefect subcommand (branch: jc-backport-track-repo-13-12-stable)
-rw-r--r-- | cmd/praefect/subcmd.go | 1 | ||||
-rw-r--r-- | cmd/praefect/subcmd_track_repository.go | 256 | ||||
-rw-r--r-- | cmd/praefect/subcmd_track_repository_test.go | 239 |
3 files changed, 496 insertions, 0 deletions
diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go
index f6ae54818..13602a25e 100644
--- a/cmd/praefect/subcmd.go
+++ b/cmd/praefect/subcmd.go
@@ -34,6 +34,7 @@ var (
 		"accept-dataloss":        &acceptDatalossSubcommand{},
 		"set-replication-factor": newSetReplicatioFactorSubcommand(os.Stdout),
 		removeRepositoryCmdName:  newRemoveRepository(logger),
+		trackRepositoryCmdName:   newTrackRepository(logger),
 	}
 )
 
diff --git a/cmd/praefect/subcmd_track_repository.go b/cmd/praefect/subcmd_track_repository.go
new file mode 100644
index 000000000..7af6d2b01
--- /dev/null
+++ b/cmd/praefect/subcmd_track_repository.go
@@ -0,0 +1,256 @@
+package main
+
+import (
+	"context"
+	"database/sql"
+	"errors"
+	"flag"
+	"fmt"
+	"math/rand"
+	"time"
+
+	"github.com/sirupsen/logrus"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
+	"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+	"gitlab.com/gitlab-org/labkit/correlation"
+	"google.golang.org/grpc/metadata"
+)
+
+const (
+	// trackRepositoryCmdName is the name the subcommand is registered under.
+	trackRepositoryCmdName = "track-repository"
+)
+
+// trackRepository implements the track-repository subcommand. Its string fields
+// are populated from the command line flags registered by FlagSet.
+type trackRepository struct {
+	logger               logrus.FieldLogger
+	virtualStorage       string
+	relativePath         string
+	authoritativeStorage string
+}
+
+// errAuthoritativeRepositoryNotExist is returned when the repository is not found
+// on the storage that is expected to hold it.
+var errAuthoritativeRepositoryNotExist = errors.New("authoritative repository does not exist")
+
+// newTrackRepository returns a track-repository subcommand that logs to logger.
+func newTrackRepository(logger logrus.FieldLogger) *trackRepository {
+	return &trackRepository{logger: logger}
+}
+
+// FlagSet returns the flag set of the subcommand. Parsing it stores the flag
+// values in cmd.
+func (cmd *trackRepository) FlagSet() *flag.FlagSet {
+	fs := flag.NewFlagSet(trackRepositoryCmdName, flag.ExitOnError)
+	fs.StringVar(&cmd.virtualStorage, paramVirtualStorage, "", "name of the repository's virtual storage")
+	fs.StringVar(&cmd.relativePath, paramRelativePath, "", "relative path to the repository")
+	fs.StringVar(&cmd.authoritativeStorage, paramAuthoritativeStorage, "", "storage with the repository to consider as authoritative")
+	fs.Usage = func() {
+		_, _ = printfErr("Description:\n" +
+			" This command adds a given repository to be tracked by Praefect.\n" +
+			" It checks if the repository exists on disk on the authoritative storage, " +
+			" and whether database records are absent from tracking the repository.")
+		fs.PrintDefaults()
+	}
+	return fs
+}
+
+// Exec validates the parameters, opens the database configured in cfg and tracks
+// the repository. The authoritative storage is only required when the
+// per-repository election strategy is configured.
+func (cmd trackRepository) Exec(flags *flag.FlagSet, cfg config.Config) error {
+	switch {
+	case flags.NArg() > 0:
+		return unexpectedPositionalArgsError{Command: flags.Name()}
+	case cmd.virtualStorage == "":
+		return requiredParameterError(paramVirtualStorage)
+	case cmd.relativePath == "":
+		return requiredParameterError(paramRelativePath)
+	case cmd.authoritativeStorage == "":
+		if cfg.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
+			return requiredParameterError(paramAuthoritativeStorage)
+		}
+	}
+
+	db, err := glsql.OpenDB(cfg.DB)
+	if err != nil {
+		return fmt.Errorf("connect to database: %w", err)
+	}
+	defer func() { _ = db.Close() }()
+
+	ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID())
+	logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx))
+
+	return cmd.exec(ctx, logger, db, cfg)
+}
+
+// trackRepoErrorPrefix prefixes the errors returned by exec.
+const trackRepoErrorPrefix = "attempting to track repository in praefect database"
+
+// exec determines the primary and the secondaries for the repository, verifies
+// that the repository exists on the primary and creates its database records.
+func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, db *sql.DB, cfg config.Config) error {
+	logger.WithFields(logrus.Fields{
+		"virtual_storage":       cmd.virtualStorage,
+		"relative_path":         cmd.relativePath,
+		"authoritative_storage": cmd.authoritativeStorage,
+	}).Debug("track repository")
+
+	var primary string
+	var secondaries []string
+	var variableReplicationFactorEnabled, savePrimary bool
+	if cfg.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
+		// With per-repository elections the authoritative storage becomes the
+		// primary and the other nodes of the virtual storage are candidates
+		// for being secondaries.
+		savePrimary = true
+		primary = cmd.authoritativeStorage
+
+		for _, vs := range cfg.VirtualStorages {
+			if vs.Name == cmd.virtualStorage {
+				for _, node := range vs.Nodes {
+					if node.Storage == cmd.authoritativeStorage {
+						continue
+					}
+					secondaries = append(secondaries, node.Storage)
+				}
+			}
+		}
+
+		r := rand.New(rand.NewSource(time.Now().UnixNano()))
+		replicationFactor := cfg.DefaultReplicationFactors()[cmd.virtualStorage]
+
+		if replicationFactor > 0 {
+			variableReplicationFactorEnabled = true
+			// Select random secondaries according to the default replication factor.
+			r.Shuffle(len(secondaries), func(i, j int) {
+				secondaries[i], secondaries[j] = secondaries[j], secondaries[i]
+			})
+
+			// Keep replicationFactor-1 secondaries (the primary presumably
+			// accounts for the remaining replica). Only truncate when more
+			// secondaries are available so slicing can never go out of range.
+			if wanted := replicationFactor - 1; wanted < len(secondaries) {
+				secondaries = secondaries[:wanted]
+			}
+		}
+	} else {
+		// Otherwise use the non-demoted primary recorded in shard_primaries.
+		savePrimary = false
+		if err := db.QueryRowContext(ctx, `SELECT node_name FROM shard_primaries WHERE shard_name = $1 AND demoted = 'false'`, cmd.virtualStorage).Scan(&primary); err != nil {
+			if errors.Is(err, sql.ErrNoRows) {
+				return fmt.Errorf("%s: no primaries found", trackRepoErrorPrefix)
+			}
+			return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
+		}
+	}
+
+	authoritativeRepoExists, err := cmd.authoritativeRepositoryExists(ctx, cfg, primary)
+	if err != nil {
+		return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
+	}
+
+	if !authoritativeRepoExists {
+		return fmt.Errorf("%s: %w", trackRepoErrorPrefix, errAuthoritativeRepositoryNotExist)
+	}
+
+	if err := cmd.trackRepository(
+		ctx,
+		datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()),
+		primary,
+		secondaries,
+		savePrimary,
+		variableReplicationFactorEnabled,
+	); err != nil {
+		return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
+	}
+
+	logger.Debug("finished adding new repository to be tracked in praefect database.")
+
+	return nil
+}
+
+// trackRepository creates the database records of the repository. A repository
+// that is already tracked is logged and not treated as an error.
+func (cmd *trackRepository) trackRepository(
+	ctx context.Context,
+	ds *datastore.PostgresRepositoryStore,
+	primary string,
+	secondaries []string,
+	savePrimary bool,
+	variableReplicationFactorEnabled bool,
+) error {
+	if err := ds.CreateRepository(
+		ctx,
+		cmd.virtualStorage,
+		cmd.relativePath,
+		primary,
+		nil,
+		secondaries,
+		savePrimary,
+		variableReplicationFactorEnabled,
+	); err != nil {
+		var repoExistsError *datastore.RepositoryExistsError
+		if errors.As(err, &repoExistsError) {
+			cmd.logger.Print("repository is already tracked in praefect database")
+			return nil
+		}
+
+		return fmt.Errorf("CreateRepository: %w", err)
+	}
+
+	return nil
+}
+
+// repositoryExists dials the Gitaly node at addr and asks it whether repo exists.
+func (cmd *trackRepository) repositoryExists(ctx context.Context, repo *gitalypb.Repository, addr, token string) (bool, error) {
+	conn, err := subCmdDial(addr, token)
+	if err != nil {
+		return false, fmt.Errorf("error dialing: %w", err)
+	}
+	defer func() { _ = conn.Close() }()
+
+	ctx = metadata.AppendToOutgoingContext(ctx, "client_name", trackRepositoryCmdName)
+	repositoryClient := gitalypb.NewRepositoryServiceClient(conn)
+	res, err := repositoryClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{Repository: repo})
+	if err != nil {
+		return false, err
+	}
+
+	return res.GetExists(), nil
+}
+
+// authoritativeRepositoryExists reports whether the repository exists on the
+// storage nodeName of the command's virtual storage. It returns an error when
+// the virtual storage or the node is not configured in cfg. A failed request to
+// the node is logged and reported as the repository not existing.
+func (cmd *trackRepository) authoritativeRepositoryExists(ctx context.Context, cfg config.Config, nodeName string) (bool, error) {
+	for _, vs := range cfg.VirtualStorages {
+		if vs.Name != cmd.virtualStorage {
+			continue
+		}
+
+		for _, node := range vs.Nodes {
+			if node.Storage == nodeName {
+				cmd.logger.Debugf("check if repository %q exists on gitaly %q at %q", cmd.relativePath, node.Storage, node.Address)
+				repo := &gitalypb.Repository{
+					StorageName:  node.Storage,
+					RelativePath: cmd.relativePath,
+				}
+				exists, err := cmd.repositoryExists(ctx, repo, node.Address, node.Token)
+				if err != nil {
+					cmd.logger.WithError(err).Warnf("checking if repository exists %q, %q", node.Storage, cmd.relativePath)
+					return false, nil
+				}
+				return exists, nil
+			}
+		}
+		// Report the node that was looked up: cmd.authoritativeStorage may be
+		// empty when the primary was taken from the database.
+		return false, fmt.Errorf("node %q not found", nodeName)
+	}
+	return false, fmt.Errorf("virtual storage %q not found", cmd.virtualStorage)
+}
diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go
new file mode 100644
index 000000000..f1d106d4f
--- /dev/null
+++ b/cmd/praefect/subcmd_track_repository_test.go
@@ -0,0 +1,239 @@
+//go:build postgres
+// +build postgres
+
+package main
+
+import (
+	"errors"
+	"flag"
+	"fmt"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"gitlab.com/gitlab-org/gitaly/client"
+	"gitlab.com/gitlab-org/gitaly/internal/gitaly/service/setup"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
+	"gitlab.com/gitlab-org/gitaly/internal/testhelper"
+	"gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest"
+	"gitlab.com/gitlab-org/gitaly/internal/testhelper/testcfg"
+	"gitlab.com/gitlab-org/gitaly/internal/testhelper/testserver"
+	"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+)
+
+// TestTrackRepository_FlagSet verifies that parsing the flags populates the command.
+func TestTrackRepository_FlagSet(t *testing.T) {
+	cmd := &trackRepository{}
+	fs := cmd.FlagSet()
+	require.NoError(t, fs.Parse([]string{"--virtual-storage", "vs", "--repository", "repo", "--authoritative-storage", "storage-0"}))
+	require.Equal(t, "vs", cmd.virtualStorage)
+	require.Equal(t, "repo", cmd.relativePath)
+	require.Equal(t, "storage-0", cmd.authoritativeStorage)
+}
+
+// TestTrackRepository_Exec_invalidArgs verifies the errors returned for invalid
+// parameters and for an unreachable database.
+func TestTrackRepository_Exec_invalidArgs(t *testing.T) {
+	t.Run("not all flag values processed", func(t *testing.T) {
+		cmd := trackRepository{}
+		flagSet := flag.NewFlagSet("cmd", flag.PanicOnError)
+		require.NoError(t, flagSet.Parse([]string{"stub"}))
+		err := cmd.Exec(flagSet, config.Config{})
+		require.EqualError(t, err, "cmd doesn't accept positional arguments")
+	})
+
+	t.Run("virtual-storage is not set", func(t *testing.T) {
+		cmd := trackRepository{}
+		err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{})
+		require.EqualError(t, err, `"virtual-storage" is a required parameter`)
+	})
+
+	t.Run("repository is not set", func(t *testing.T) {
+		cmd := trackRepository{virtualStorage: "stub"}
+		err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{})
+		require.EqualError(t, err, `"repository" is a required parameter`)
+	})
+
+	t.Run("authoritative-storage is not set", func(t *testing.T) {
+		cmd := trackRepository{virtualStorage: "stub", relativePath: "path/to/repo"}
+		err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}})
+		require.EqualError(t, err, `"authoritative-storage" is a required parameter`)
+	})
+
+	t.Run("db connection error", func(t *testing.T) {
+		cmd := trackRepository{virtualStorage: "stub", relativePath: "stub", authoritativeStorage: "storage-0"}
+		cfg := config.Config{DB: config.DB{Host: "stub", SSLMode: "disable"}}
+		err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), cfg)
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "connect to database: dial tcp: lookup stub")
+	})
+}
+
+// TestTrackRepository_Exec runs the command against two Gitaly servers and a
+// Praefect database for both the SQL and the per-repository election strategy.
+func TestTrackRepository_Exec(t *testing.T) {
+	g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1"))
+	g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2"))
+
+	g1Srv := testserver.StartGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+	g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+	defer g2Srv.Shutdown()
+	defer g1Srv.Shutdown()
+
+	g1Addr := g1Srv.Address()
+
+	db := getDB(t)
+	var database string
+	require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database))
+	dbConf := glsql.GetDBConfig(t, database)
+
+	virtualStorageName := "praefect"
+	conf := config.Config{
+		SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
+		VirtualStorages: []*config.VirtualStorage{
+			{
+				Name: virtualStorageName,
+				Nodes: []*config.Node{
+					{Storage: g1Cfg.Storages[0].Name, Address: g1Addr},
+					{Storage: g2Cfg.Storages[0].Name, Address: g2Srv.Address()},
+				},
+				DefaultReplicationFactor: 2,
+			},
+		},
+		DB: dbConf,
+	}
+
+	gitalyCC, err := client.Dial(g1Addr, nil)
+	require.NoError(t, err)
+	defer func() { require.NoError(t, gitalyCC.Close()) }()
+	ctx, cancel := testhelper.Context()
+	defer cancel()
+
+	gitaly1RepositoryClient := gitalypb.NewRepositoryServiceClient(gitalyCC)
+
+	createRepoThroughGitaly1 := func(relativePath string) error {
+		_, err := gitaly1RepositoryClient.CreateRepository(
+			ctx,
+			&gitalypb.CreateRepositoryRequest{
+				Repository: &gitalypb.Repository{
+					StorageName:  g1Cfg.Storages[0].Name,
+					RelativePath: relativePath,
+				},
+			})
+		return err
+	}
+
+	testCases := map[string]struct {
+		failoverConfig       config.Failover
+		authoritativeStorage string
+	}{
+		"sql election": {
+			failoverConfig: config.Failover{
+				Enabled:          true,
+				ElectionStrategy: config.ElectionStrategySQL,
+			},
+			authoritativeStorage: "",
+		},
+		"per repository election": {
+			failoverConfig: config.Failover{
+				Enabled:          true,
+				ElectionStrategy: config.ElectionStrategyPerRepository,
+			},
+			authoritativeStorage: g1Cfg.Storages[0].Name,
+		},
+	}
+
+	logger := testhelper.NewTestLogger(t)
+	for tn, tc := range testCases {
+		t.Run(tn, func(t *testing.T) {
+			addCmdConf := conf
+			addCmdConf.Failover = tc.failoverConfig
+
+			t.Run("ok", func(t *testing.T) {
+				nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), addCmdConf, db.DB, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
+				require.NoError(t, err)
+				nodeMgr.Start(0, time.Hour)
+
+				relativePath := fmt.Sprintf("path/to/test/repo_%s", tn)
+				repoDS := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+
+				require.NoError(t, createRepoThroughGitaly1(relativePath))
+
+				rmRepoCmd := &removeRepository{
+					logger:         logger,
+					virtualStorage: virtualStorageName,
+					relativePath:   relativePath,
+				}
+
+				require.NoError(t, rmRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+
+				// create the repo on Gitaly without Praefect knowing
+				require.NoError(t, createRepoThroughGitaly1(relativePath))
+				require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath))
+				require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath))
+
+				addRepoCmd := &trackRepository{
+					logger:               logger,
+					virtualStorage:       virtualStorageName,
+					relativePath:         relativePath,
+					authoritativeStorage: tc.authoritativeStorage,
+				}
+
+				require.NoError(t, addRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf))
+				as := datastore.NewAssignmentStore(db, conf.StorageNames())
+
+				assignments, err := as.GetHostAssignments(ctx, virtualStorageName, relativePath)
+				require.NoError(t, err)
+				require.Len(t, assignments, 2)
+				assert.Contains(t, assignments, g1Cfg.Storages[0].Name)
+				assert.Contains(t, assignments, g2Cfg.Storages[0].Name)
+
+				exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, relativePath)
+				require.NoError(t, err)
+				assert.True(t, exists)
+			})
+
+			t.Run("repository does not exist", func(t *testing.T) {
+				relativePath := fmt.Sprintf("path/to/test/repo_1_%s", tn)
+
+				cmd := &trackRepository{
+					logger:               testhelper.NewTestLogger(t),
+					virtualStorage:       "praefect",
+					relativePath:         relativePath,
+					authoritativeStorage: tc.authoritativeStorage,
+				}
+				// Exec wraps the sentinel error with additional context, so
+				// match it with errors.Is rather than by its exact message.
+				err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf)
+				require.Error(t, err)
+				assert.True(t, errors.Is(err, errAuthoritativeRepositoryNotExist), "unexpected error: %v", err)
+			})
+
+			t.Run("records already exist", func(t *testing.T) {
+				relativePath := fmt.Sprintf("path/to/test/repo_2_%s", tn)
+
+				require.NoError(t, createRepoThroughGitaly1(relativePath))
+				require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath))
+				require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath))
+
+				ds := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+				require.NoError(t, ds.CreateRepository(ctx, virtualStorageName, relativePath, g1Cfg.Storages[0].Name, nil, nil, true, true))
+
+				cmd := &trackRepository{
+					logger:               testhelper.NewTestLogger(t),
+					virtualStorage:       virtualStorageName,
+					relativePath:         relativePath,
+					authoritativeStorage: tc.authoritativeStorage,
+				}
+
+				assert.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf))
+			})
+		})
+	}
+}