diff options
author | Will Chandler <wchandler@gitlab.com> | 2022-07-28 21:36:56 +0300 |
---|---|---|
committer | Will Chandler <wchandler@gitlab.com> | 2022-08-31 23:23:22 +0300 |
commit | f01d4c36519c3249c7fdfc9973acc618e6f4940f (patch) | |
tree | 3e807a55840417831f93efbc2814f580967d77d1 | |
parent | 927a008d4a5e08a6b1a70a42711d84af9b118057 (diff) |
praefect: Convert to trackRepositoryRequest
In preparation for a new Praefect subcommand that will track many
repositories in a single request, convert `track-repository` to use a
`trackRepositoryRequest` struct internally.
This will allow us to re-use this code when looping over multiple
requests.
-rw-r--r-- | cmd/praefect/subcmd_track_repository.go | 94 |
1 files changed, 60 insertions, 34 deletions
diff --git a/cmd/praefect/subcmd_track_repository.go b/cmd/praefect/subcmd_track_repository.go index fb377ad18..8fc4fd0a6 100644 --- a/cmd/praefect/subcmd_track_repository.go +++ b/cmd/praefect/subcmd_track_repository.go @@ -36,6 +36,12 @@ type trackRepository struct { replicateImmediately bool } +type trackRepositoryRequest struct { + RelativePath string `json:"relative_path"` + VirtualStorage string `json:"virtual_storage"` + AuthoritativeStorage string `json:"authoritative_storage"` +} + var errAuthoritativeRepositoryNotExist = errors.New("authoritative repository does not exist") func newTrackRepository(logger logrus.FieldLogger, w io.Writer) *trackRepository { @@ -75,7 +81,6 @@ func (cmd trackRepository) Exec(flags *flag.FlagSet, cfg config.Config) error { } ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID()) - logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx)) openDBCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() @@ -85,16 +90,34 @@ func (cmd trackRepository) Exec(flags *flag.FlagSet, cfg config.Config) error { } defer func() { _ = db.Close() }() - return cmd.exec(ctx, logger, db, cfg) + return cmd.exec(ctx, db, cfg) } const trackRepoErrorPrefix = "attempting to track repository in praefect database" -func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, db *sql.DB, cfg config.Config) error { +func (cmd *trackRepository) exec(ctx context.Context, db *sql.DB, cfg config.Config) error { + logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx)) + + req := trackRepositoryRequest{ + RelativePath: cmd.relativePath, + AuthoritativeStorage: cmd.authoritativeStorage, + VirtualStorage: cmd.virtualStorage, + } + + return req.execRequest(ctx, db, cfg, cmd.w, logger, cmd.replicateImmediately) +} + +func (req *trackRepositoryRequest) execRequest(ctx context.Context, + db *sql.DB, + cfg config.Config, + w io.Writer, + logger logrus.FieldLogger, + replicateImmediately bool, +) error { logger.WithFields(logrus.Fields{ - "virtual_storage": cmd.virtualStorage, - "relative_path": cmd.relativePath, - "authoritative_storage": cmd.authoritativeStorage, + "virtual_storage": req.VirtualStorage, + "relative_path": req.RelativePath, + "authoritative_storage": req.AuthoritativeStorage, }).Debug("track repository") var primary string @@ -102,12 +125,12 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, var variableReplicationFactorEnabled, savePrimary bool if cfg.Failover.ElectionStrategy == config.ElectionStrategyPerRepository { savePrimary = true - primary = cmd.authoritativeStorage + primary = req.AuthoritativeStorage for _, vs := range cfg.VirtualStorages { - if vs.Name == cmd.virtualStorage { + if vs.Name == req.VirtualStorage { for _, node := range vs.Nodes { - if node.Storage == cmd.authoritativeStorage { + if node.Storage == req.AuthoritativeStorage { continue } secondaries = append(secondaries, node.Storage) @@ -116,7 +139,7 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, } r := rand.New(rand.NewSource(time.Now().UnixNano())) - replicationFactor := cfg.DefaultReplicationFactors()[cmd.virtualStorage] + replicationFactor := cfg.DefaultReplicationFactors()[req.VirtualStorage] if replicationFactor > 0 { variableReplicationFactorEnabled = true @@ -129,7 +152,7 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, } } else { savePrimary = false - if err := db.QueryRowContext(ctx, `SELECT node_name FROM shard_primaries WHERE shard_name = $1 AND demoted = 'false'`, cmd.virtualStorage).Scan(&primary); err != nil { + if err := db.QueryRowContext(ctx, `SELECT node_name FROM shard_primaries WHERE shard_name = $1 AND demoted = 'false'`, req.VirtualStorage).Scan(&primary); err != nil { if errors.Is(err, sql.ErrNoRows) { return fmt.Errorf("%s: no primaries found", trackRepoErrorPrefix) } @@ -137,7 +160,7 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, } } - authoritativeRepoExists, err := cmd.authoritativeRepositoryExists(ctx, cfg, primary) + authoritativeRepoExists, err := req.authoritativeRepositoryExists(ctx, cfg, w, primary) if err != nil { return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err) } @@ -162,7 +185,7 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, store := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()) queue := datastore.NewPostgresReplicationEventQueue(db) replMgr := praefect.NewReplMgr( - cmd.logger, + logger, cfg.StorageNames(), queue, store, @@ -170,9 +193,10 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, nodeSet, ) - repositoryID, err := cmd.trackRepository( + repositoryID, err := req.trackRepository( ctx, store, + w, primary, secondaries, savePrimary, @@ -182,24 +206,24 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err) } - fmt.Fprintln(cmd.w, "Finished adding new repository to be tracked in praefect database.") + fmt.Fprintln(w, "Finished adding new repository to be tracked in praefect database.") correlationID := correlation.SafeRandomID() - connections := nodeSet.Connections()[cmd.virtualStorage] + connections := nodeSet.Connections()[req.VirtualStorage] for _, secondary := range secondaries { event := datastore.ReplicationEvent{ Job: datastore.ReplicationJob{ RepositoryID: repositoryID, Change: datastore.UpdateRepo, - RelativePath: cmd.relativePath, - VirtualStorage: cmd.virtualStorage, + RelativePath: req.RelativePath, + VirtualStorage: req.VirtualStorage, SourceNodeStorage: primary, TargetNodeStorage: secondary, }, Meta: datastore.Params{metadatahandler.CorrelationIDKey: correlationID}, } - if cmd.replicateImmediately { + if replicateImmediately { conn, ok := connections[secondary] if !ok { return fmt.Errorf("%s: connection for %q not found", trackRepoErrorPrefix, secondary) @@ -209,31 +233,32 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, return fmt.Errorf("%s: processing replication event %w", trackRepoErrorPrefix, err) } - fmt.Fprintf(cmd.w, "Finished replicating repository to %q.\n", secondary) + fmt.Fprintf(w, "Finished replicating repository to %q.\n", secondary) continue } if _, err := queue.Enqueue(ctx, event); err != nil { return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err) } - fmt.Fprintf(cmd.w, "Added replication job to replicate repository to %q.\n", secondary) + fmt.Fprintf(w, "Added replication job to replicate repository to %q.\n", secondary) } return nil } -func (cmd *trackRepository) trackRepository( +func (req *trackRepositoryRequest) trackRepository( ctx context.Context, ds *datastore.PostgresRepositoryStore, + w io.Writer, primary string, secondaries []string, savePrimary bool, variableReplicationFactorEnabled bool, ) (int64, error) { - repositoryID, err := ds.ReserveRepositoryID(ctx, cmd.virtualStorage, cmd.relativePath) + repositoryID, err := ds.ReserveRepositoryID(ctx, req.VirtualStorage, req.RelativePath) if err != nil { if errors.Is(err, commonerr.ErrRepositoryAlreadyExists) { - fmt.Fprintf(cmd.w, "repository is already tracked in praefect database") + fmt.Fprintf(w, "repository is already tracked in praefect database") return 0, nil } @@ -243,9 +268,9 @@ func (cmd *trackRepository) trackRepository( if err := ds.CreateRepository( ctx, repositoryID, - cmd.virtualStorage, - cmd.relativePath, - cmd.relativePath, + req.VirtualStorage, + req.RelativePath, + req.RelativePath, primary, nil, secondaries, @@ -275,27 +300,28 @@ func repositoryExists(ctx context.Context, repo *gitalypb.Repository, addr, toke return res.GetExists(), nil } -func (cmd *trackRepository) authoritativeRepositoryExists(ctx context.Context, cfg config.Config, nodeName string) (bool, error) { +func (req *trackRepositoryRequest) authoritativeRepositoryExists(ctx context.Context, cfg config.Config, w io.Writer, nodeName string) (bool, error) { for _, vs := range cfg.VirtualStorages { - if vs.Name != cmd.virtualStorage { + if vs.Name != req.VirtualStorage { continue } for _, node := range vs.Nodes { if node.Storage == nodeName { - logger.Debugf("check if repository %q exists on gitaly %q at %q", cmd.relativePath, node.Storage, node.Address) + logger.Debugf("check if repository %q exists on gitaly %q at %q", req.RelativePath, node.Storage, node.Address) repo := &gitalypb.Repository{ StorageName: node.Storage, - RelativePath: cmd.relativePath, + RelativePath: req.RelativePath, } exists, err := repositoryExists(ctx, repo, node.Address, node.Token) if err != nil { - return false, fmt.Errorf("checking if repository exists %q, %q", node.Storage, cmd.relativePath) + fmt.Fprintf(w, "checking if repository exists %q, %q", node.Storage, req.RelativePath) + return false, nil } return exists, nil } } - return false, fmt.Errorf("node %q not found", cmd.authoritativeStorage) + return false, fmt.Errorf("node %q not found", req.AuthoritativeStorage) } - return false, fmt.Errorf("virtual storage %q not found", cmd.virtualStorage) + return false, fmt.Errorf("virtual storage %q not found", req.VirtualStorage) } |