Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'cmd/praefect/subcmd_track_repository.go')
-rw-r--r--cmd/praefect/subcmd_track_repository.go230
1 files changed, 0 insertions, 230 deletions
diff --git a/cmd/praefect/subcmd_track_repository.go b/cmd/praefect/subcmd_track_repository.go
deleted file mode 100644
index 0be14d778..000000000
--- a/cmd/praefect/subcmd_track_repository.go
+++ /dev/null
@@ -1,230 +0,0 @@
-package main
-
-import (
- "context"
- "database/sql"
- "errors"
- "flag"
- "fmt"
- "math/rand"
- "time"
-
- "github.com/sirupsen/logrus"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
- "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
- "gitlab.com/gitlab-org/labkit/correlation"
- "google.golang.org/grpc/metadata"
-)
-
-const (
- trackRepositoryCmdName = "track-repository"
-)
-
-type trackRepository struct {
- logger logrus.FieldLogger
- virtualStorage string
- relativePath string
- authoritativeStorage string
-}
-
-var errAuthoritativeRepositoryNotExist = errors.New("authoritative repository does not exist")
-
-func newTrackRepository(logger logrus.FieldLogger) *trackRepository {
- return &trackRepository{logger: logger}
-}
-
-func (cmd *trackRepository) FlagSet() *flag.FlagSet {
- fs := flag.NewFlagSet(trackRepositoryCmdName, flag.ExitOnError)
- fs.StringVar(&cmd.virtualStorage, paramVirtualStorage, "", "name of the repository's virtual storage")
- fs.StringVar(&cmd.relativePath, paramRelativePath, "", "relative path to the repository")
- fs.StringVar(&cmd.authoritativeStorage, paramAuthoritativeStorage, "", "storage with the repository to consider as authoritative")
- fs.Usage = func() {
- _, _ = printfErr("Description:\n" +
- " This command adds a given repository to be tracked by Praefect.\n" +
- " It checks if the repository exists on disk on the authoritative storage, " +
- " and whether database records are absent from tracking the repository.")
- fs.PrintDefaults()
- }
- return fs
-}
-
-func (cmd trackRepository) Exec(flags *flag.FlagSet, cfg config.Config) error {
- switch {
- case flags.NArg() > 0:
- return unexpectedPositionalArgsError{Command: flags.Name()}
- case cmd.virtualStorage == "":
- return requiredParameterError(paramVirtualStorage)
- case cmd.relativePath == "":
- return requiredParameterError(paramRelativePath)
- case cmd.authoritativeStorage == "":
- if cfg.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
- return requiredParameterError(paramAuthoritativeStorage)
- }
- }
-
- db, err := glsql.OpenDB(cfg.DB)
- if err != nil {
- return fmt.Errorf("connect to database: %w", err)
- }
- defer func() { _ = db.Close() }()
-
- ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID())
- logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx))
-
- return cmd.exec(ctx, logger, db, cfg)
-}
-
-const trackRepoErrorPrefix = "attempting to track repository in praefect database"
-
-func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, db *sql.DB, cfg config.Config) error {
- logger.WithFields(logrus.Fields{
- "virtual_storage": cmd.virtualStorage,
- "relative_path": cmd.relativePath,
- "authoritative_storage": cmd.authoritativeStorage,
- }).Debug("track repository")
-
- var primary string
- var secondaries []string
- var variableReplicationFactorEnabled, savePrimary bool
- if cfg.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
- savePrimary = true
- primary = cmd.authoritativeStorage
-
- for _, vs := range cfg.VirtualStorages {
- if vs.Name == cmd.virtualStorage {
- for _, node := range vs.Nodes {
- if node.Storage == cmd.authoritativeStorage {
- continue
- }
- secondaries = append(secondaries, node.Storage)
- }
- }
- }
-
- r := rand.New(rand.NewSource(time.Now().UnixNano()))
- replicationFactor := cfg.DefaultReplicationFactors()[cmd.virtualStorage]
-
- if replicationFactor > 0 {
- variableReplicationFactorEnabled = true
- // Select random secondaries according to the default replication factor.
- r.Shuffle(len(secondaries), func(i, j int) {
- secondaries[i], secondaries[j] = secondaries[j], secondaries[i]
- })
-
- secondaries = secondaries[:replicationFactor-1]
- }
- } else {
- savePrimary = false
- if err := db.QueryRowContext(ctx, `SELECT node_name FROM shard_primaries WHERE shard_name = $1 AND demoted = 'false'`, cmd.virtualStorage).Scan(&primary); err != nil {
- if errors.Is(err, sql.ErrNoRows) {
- return fmt.Errorf("%s: no primaries found", trackRepoErrorPrefix)
- }
- return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
- }
- }
-
- authoritativeRepoExists, err := cmd.authoritativeRepositoryExists(ctx, cfg, primary)
- if err != nil {
- return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
- }
-
- if !authoritativeRepoExists {
- return fmt.Errorf("%s: %w", trackRepoErrorPrefix, errAuthoritativeRepositoryNotExist)
- }
-
- if err := cmd.trackRepository(
- ctx,
- datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()),
- primary,
- secondaries,
- savePrimary,
- variableReplicationFactorEnabled,
- ); err != nil {
- return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
- }
-
- logger.Debug("finished adding new repository to be tracked in praefect database.")
-
- return nil
-}
-
-func (cmd *trackRepository) trackRepository(
- ctx context.Context,
- ds *datastore.PostgresRepositoryStore,
- primary string,
- secondaries []string,
- savePrimary bool,
- variableReplicationFactorEnabled bool,
-) error {
- repositoryID, err := ds.ReserveRepositoryID(ctx, cmd.virtualStorage, cmd.relativePath)
- if err != nil {
- if errors.Is(err, commonerr.ErrRepositoryAlreadyExists) {
- cmd.logger.Print("repository is already tracked in praefect database")
- return nil
- }
-
- return fmt.Errorf("ReserveRepositoryID: %w", err)
- }
-
- if err := ds.CreateRepository(
- ctx,
- repositoryID,
- cmd.virtualStorage,
- cmd.relativePath,
- primary,
- nil,
- secondaries,
- savePrimary,
- variableReplicationFactorEnabled,
- ); err != nil {
- return fmt.Errorf("CreateRepository: %w", err)
- }
-
- return nil
-}
-
-func (cmd *trackRepository) repositoryExists(ctx context.Context, repo *gitalypb.Repository, addr, token string) (bool, error) {
- conn, err := subCmdDial(addr, token)
- if err != nil {
- return false, fmt.Errorf("error dialing: %w", err)
- }
- defer func() { _ = conn.Close() }()
-
- ctx = metadata.AppendToOutgoingContext(ctx, "client_name", trackRepositoryCmdName)
- repositoryClient := gitalypb.NewRepositoryServiceClient(conn)
- res, err := repositoryClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{Repository: repo})
- if err != nil {
- return false, err
- }
-
- return res.GetExists(), nil
-}
-
-func (cmd *trackRepository) authoritativeRepositoryExists(ctx context.Context, cfg config.Config, nodeName string) (bool, error) {
- for _, vs := range cfg.VirtualStorages {
- if vs.Name != cmd.virtualStorage {
- continue
- }
-
- for _, node := range vs.Nodes {
- if node.Storage == nodeName {
- logger.Debugf("check if repository %q exists on gitaly %q at %q", cmd.relativePath, node.Storage, node.Address)
- repo := &gitalypb.Repository{
- StorageName: node.Storage,
- RelativePath: cmd.relativePath,
- }
- exists, err := cmd.repositoryExists(ctx, repo, node.Address, node.Token)
- if err != nil {
- logger.WithError(err).Warnf("checking if repository exists %q, %q", node.Storage, cmd.relativePath)
- return false, nil
- }
- return exists, nil
- }
- }
- return false, fmt.Errorf("node %q not found", cmd.authoritativeStorage)
- }
- return false, fmt.Errorf("virtual storage %q not found", cmd.virtualStorage)
-}