Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWill Chandler <wchandler@gitlab.com>2022-07-28 21:36:56 +0300
committerWill Chandler <wchandler@gitlab.com>2022-08-31 23:23:22 +0300
commitf01d4c36519c3249c7fdfc9973acc618e6f4940f (patch)
tree3e807a55840417831f93efbc2814f580967d77d1
parent927a008d4a5e08a6b1a70a42711d84af9b118057 (diff)
praefect: Convert to trackRepositoryRequest
In preparation for a new Praefect subcommand that will track many repositories in a single request, convert `track-repository` to use a `trackRepositoryRequest` struct internally. This will allow us to re-use this code when looping over multiple requests.
-rw-r--r--cmd/praefect/subcmd_track_repository.go94
1 files changed, 60 insertions, 34 deletions
diff --git a/cmd/praefect/subcmd_track_repository.go b/cmd/praefect/subcmd_track_repository.go
index fb377ad18..8fc4fd0a6 100644
--- a/cmd/praefect/subcmd_track_repository.go
+++ b/cmd/praefect/subcmd_track_repository.go
@@ -36,6 +36,12 @@ type trackRepository struct {
replicateImmediately bool
}
+type trackRepositoryRequest struct {
+ RelativePath string `json:"relative_path"`
+ VirtualStorage string `json:"virtual_storage"`
+ AuthoritativeStorage string `json:"authoritative_storage"`
+}
+
var errAuthoritativeRepositoryNotExist = errors.New("authoritative repository does not exist")
func newTrackRepository(logger logrus.FieldLogger, w io.Writer) *trackRepository {
@@ -75,7 +81,6 @@ func (cmd trackRepository) Exec(flags *flag.FlagSet, cfg config.Config) error {
}
ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID())
- logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx))
openDBCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
@@ -85,16 +90,34 @@ func (cmd trackRepository) Exec(flags *flag.FlagSet, cfg config.Config) error {
}
defer func() { _ = db.Close() }()
- return cmd.exec(ctx, logger, db, cfg)
+ return cmd.exec(ctx, db, cfg)
}
const trackRepoErrorPrefix = "attempting to track repository in praefect database"
-func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, db *sql.DB, cfg config.Config) error {
+func (cmd *trackRepository) exec(ctx context.Context, db *sql.DB, cfg config.Config) error {
+ logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx))
+
+ req := trackRepositoryRequest{
+ RelativePath: cmd.relativePath,
+ AuthoritativeStorage: cmd.authoritativeStorage,
+ VirtualStorage: cmd.virtualStorage,
+ }
+
+ return req.execRequest(ctx, db, cfg, cmd.w, logger, cmd.replicateImmediately)
+}
+
+func (req *trackRepositoryRequest) execRequest(ctx context.Context,
+ db *sql.DB,
+ cfg config.Config,
+ w io.Writer,
+ logger logrus.FieldLogger,
+ replicateImmediately bool,
+) error {
logger.WithFields(logrus.Fields{
- "virtual_storage": cmd.virtualStorage,
- "relative_path": cmd.relativePath,
- "authoritative_storage": cmd.authoritativeStorage,
+ "virtual_storage": req.VirtualStorage,
+ "relative_path": req.RelativePath,
+ "authoritative_storage": req.AuthoritativeStorage,
}).Debug("track repository")
var primary string
@@ -102,12 +125,12 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger,
var variableReplicationFactorEnabled, savePrimary bool
if cfg.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
savePrimary = true
- primary = cmd.authoritativeStorage
+ primary = req.AuthoritativeStorage
for _, vs := range cfg.VirtualStorages {
- if vs.Name == cmd.virtualStorage {
+ if vs.Name == req.VirtualStorage {
for _, node := range vs.Nodes {
- if node.Storage == cmd.authoritativeStorage {
+ if node.Storage == req.AuthoritativeStorage {
continue
}
secondaries = append(secondaries, node.Storage)
@@ -116,7 +139,7 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger,
}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
- replicationFactor := cfg.DefaultReplicationFactors()[cmd.virtualStorage]
+ replicationFactor := cfg.DefaultReplicationFactors()[req.VirtualStorage]
if replicationFactor > 0 {
variableReplicationFactorEnabled = true
@@ -129,7 +152,7 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger,
}
} else {
savePrimary = false
- if err := db.QueryRowContext(ctx, `SELECT node_name FROM shard_primaries WHERE shard_name = $1 AND demoted = 'false'`, cmd.virtualStorage).Scan(&primary); err != nil {
+ if err := db.QueryRowContext(ctx, `SELECT node_name FROM shard_primaries WHERE shard_name = $1 AND demoted = 'false'`, req.VirtualStorage).Scan(&primary); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("%s: no primaries found", trackRepoErrorPrefix)
}
@@ -137,7 +160,7 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger,
}
}
- authoritativeRepoExists, err := cmd.authoritativeRepositoryExists(ctx, cfg, primary)
+ authoritativeRepoExists, err := req.authoritativeRepositoryExists(ctx, cfg, w, primary)
if err != nil {
return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
}
@@ -162,7 +185,7 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger,
store := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames())
queue := datastore.NewPostgresReplicationEventQueue(db)
replMgr := praefect.NewReplMgr(
- cmd.logger,
+ logger,
cfg.StorageNames(),
queue,
store,
@@ -170,9 +193,10 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger,
nodeSet,
)
- repositoryID, err := cmd.trackRepository(
+ repositoryID, err := req.trackRepository(
ctx,
store,
+ w,
primary,
secondaries,
savePrimary,
@@ -182,24 +206,24 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger,
return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
}
- fmt.Fprintln(cmd.w, "Finished adding new repository to be tracked in praefect database.")
+ fmt.Fprintln(w, "Finished adding new repository to be tracked in praefect database.")
correlationID := correlation.SafeRandomID()
- connections := nodeSet.Connections()[cmd.virtualStorage]
+ connections := nodeSet.Connections()[req.VirtualStorage]
for _, secondary := range secondaries {
event := datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
RepositoryID: repositoryID,
Change: datastore.UpdateRepo,
- RelativePath: cmd.relativePath,
- VirtualStorage: cmd.virtualStorage,
+ RelativePath: req.RelativePath,
+ VirtualStorage: req.VirtualStorage,
SourceNodeStorage: primary,
TargetNodeStorage: secondary,
},
Meta: datastore.Params{metadatahandler.CorrelationIDKey: correlationID},
}
- if cmd.replicateImmediately {
+ if replicateImmediately {
conn, ok := connections[secondary]
if !ok {
return fmt.Errorf("%s: connection for %q not found", trackRepoErrorPrefix, secondary)
@@ -209,31 +233,32 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger,
return fmt.Errorf("%s: processing replication event %w", trackRepoErrorPrefix, err)
}
- fmt.Fprintf(cmd.w, "Finished replicating repository to %q.\n", secondary)
+ fmt.Fprintf(w, "Finished replicating repository to %q.\n", secondary)
continue
}
if _, err := queue.Enqueue(ctx, event); err != nil {
return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
}
- fmt.Fprintf(cmd.w, "Added replication job to replicate repository to %q.\n", secondary)
+ fmt.Fprintf(w, "Added replication job to replicate repository to %q.\n", secondary)
}
return nil
}
-func (cmd *trackRepository) trackRepository(
+func (req *trackRepositoryRequest) trackRepository(
ctx context.Context,
ds *datastore.PostgresRepositoryStore,
+ w io.Writer,
primary string,
secondaries []string,
savePrimary bool,
variableReplicationFactorEnabled bool,
) (int64, error) {
- repositoryID, err := ds.ReserveRepositoryID(ctx, cmd.virtualStorage, cmd.relativePath)
+ repositoryID, err := ds.ReserveRepositoryID(ctx, req.VirtualStorage, req.RelativePath)
if err != nil {
if errors.Is(err, commonerr.ErrRepositoryAlreadyExists) {
- fmt.Fprintf(cmd.w, "repository is already tracked in praefect database")
+ fmt.Fprintf(w, "repository is already tracked in praefect database")
return 0, nil
}
@@ -243,9 +268,9 @@ func (cmd *trackRepository) trackRepository(
if err := ds.CreateRepository(
ctx,
repositoryID,
- cmd.virtualStorage,
- cmd.relativePath,
- cmd.relativePath,
+ req.VirtualStorage,
+ req.RelativePath,
+ req.RelativePath,
primary,
nil,
secondaries,
@@ -275,27 +300,28 @@ func repositoryExists(ctx context.Context, repo *gitalypb.Repository, addr, toke
return res.GetExists(), nil
}
-func (cmd *trackRepository) authoritativeRepositoryExists(ctx context.Context, cfg config.Config, nodeName string) (bool, error) {
+func (req *trackRepositoryRequest) authoritativeRepositoryExists(ctx context.Context, cfg config.Config, w io.Writer, nodeName string) (bool, error) {
for _, vs := range cfg.VirtualStorages {
- if vs.Name != cmd.virtualStorage {
+ if vs.Name != req.VirtualStorage {
continue
}
for _, node := range vs.Nodes {
if node.Storage == nodeName {
- logger.Debugf("check if repository %q exists on gitaly %q at %q", cmd.relativePath, node.Storage, node.Address)
+ logger.Debugf("check if repository %q exists on gitaly %q at %q", req.RelativePath, node.Storage, node.Address)
repo := &gitalypb.Repository{
StorageName: node.Storage,
- RelativePath: cmd.relativePath,
+ RelativePath: req.RelativePath,
}
exists, err := repositoryExists(ctx, repo, node.Address, node.Token)
if err != nil {
- return false, fmt.Errorf("checking if repository exists %q, %q", node.Storage, cmd.relativePath)
+ fmt.Fprintf(w, "checking if repository exists %q, %q", node.Storage, req.RelativePath)
+ return false, nil
}
return exists, nil
}
}
- return false, fmt.Errorf("node %q not found", cmd.authoritativeStorage)
+ return false, fmt.Errorf("node %q not found", req.AuthoritativeStorage)
}
- return false, fmt.Errorf("virtual storage %q not found", cmd.virtualStorage)
+ return false, fmt.Errorf("virtual storage %q not found", req.VirtualStorage)
}