Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorToon Claes <toon@gitlab.com>2022-09-01 17:53:22 +0300
committerToon Claes <toon@gitlab.com>2022-09-01 17:53:22 +0300
commit60e5a0938e3496ef4b943de506b7500c48392d9b (patch)
tree571a2da1d778f64536102d785befa8ff859d009e
parentc3718f5f9a3285ad8d99b4e3383edc13b1546fda (diff)
parent69420b517a18582cfefeab4297c0ac2456b96de8 (diff)
Merge branch 'wc-add-many-repos' into 'master'
praefect: Add `track-repositories` command See merge request gitlab-org/gitaly!4845
-rw-r--r--cmd/praefect/subcmd.go1
-rw-r--r--cmd/praefect/subcmd_track_repositories.go214
-rw-r--r--cmd/praefect/subcmd_track_repositories_test.go357
-rw-r--r--cmd/praefect/subcmd_track_repository.go95
4 files changed, 632 insertions, 35 deletions
diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go
index 78b3bcca4..53df26eee 100644
--- a/cmd/praefect/subcmd.go
+++ b/cmd/praefect/subcmd.go
@@ -42,6 +42,7 @@ var subcommands = map[string]subcmd{
setReplicationFactorCmdName: newSetReplicatioFactorSubcommand(os.Stdout),
removeRepositoryCmdName: newRemoveRepository(logger, os.Stdout),
trackRepositoryCmdName: newTrackRepository(logger, os.Stdout),
+ trackRepositoriesCmdName: newTrackRepositories(logger, os.Stdout),
listUntrackedRepositoriesName: newListUntrackedRepositories(logger, os.Stdout),
checkCmdName: newCheckSubcommand(os.Stdout, service.AllChecks()...),
metadataCmdName: newMetadataSubcommand(os.Stdout),
diff --git a/cmd/praefect/subcmd_track_repositories.go b/cmd/praefect/subcmd_track_repositories.go
new file mode 100644
index 000000000..44bdb8c71
--- /dev/null
+++ b/cmd/praefect/subcmd_track_repositories.go
@@ -0,0 +1,214 @@
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "flag"
+ "fmt"
+ "io"
+ "os"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/labkit/correlation"
+)
+
+const (
+ trackRepositoriesCmdName = "track-repositories"
+ paramInputPath = "input-path"
+)
+
+type invalidRequest struct {
+ reqNum int
+ path string
+ errs []error
+}
+
+type dupPathError struct {
+ path string
+ reqNums []int
+}
+
+func (d *dupPathError) Error() string {
+ return fmt.Sprintf("duplicate entries for relative_path, item #: %v", d.reqNums)
+}
+
+type trackRepositories struct {
+ w io.Writer
+ logger logrus.FieldLogger
+ inputPath string
+ replicateImmediately bool
+}
+
+func newTrackRepositories(logger logrus.FieldLogger, w io.Writer) *trackRepositories {
+ return &trackRepositories{w: w, logger: logger}
+}
+
+func (cmd *trackRepositories) FlagSet() *flag.FlagSet {
+ fs := flag.NewFlagSet(trackRepositoryCmdName, flag.ExitOnError)
+ fs.BoolVar(&cmd.replicateImmediately, "replicate-immediately", false, "kick off a replication immediately")
+ fs.StringVar(&cmd.inputPath, paramInputPath, "", "path to file with details of repositories to track")
+ fs.Usage = func() {
+ printfErr("Description:\n" +
+ " This command allows bulk requests for repositories to be tracked by Praefect.\n" +
+ " The -input-path flag must be the path of a file containing the details of the repositories\n" +
+ " to track as a list of newline-delimited JSON objects. Each entry must contain the\n" +
+ " following keys:\n\n" +
+ " relative_path - The relative path of the repository on-disk.\n" +
+ " virtual_storage - The Praefect virtual storage name.\n" +
+ " authoritative_storage - Which storage to consider as the canonical copy of the repository.\n\n" +
+ " If -replicate-immediately is used, the command will attempt to replicate the repositories\n" +
+ " to the secondaries. Otherwise, replication jobs will be created and will be executed\n" +
+ " eventually by Praefect itself.\n")
+ fs.PrintDefaults()
+ }
+ return fs
+}
+
+func (cmd trackRepositories) Exec(flags *flag.FlagSet, cfg config.Config) error {
+ switch {
+ case flags.NArg() > 0:
+ return unexpectedPositionalArgsError{Command: flags.Name()}
+ }
+
+ ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID())
+ logger = cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx))
+
+ openDBCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
+ defer cancel()
+ db, err := glsql.OpenDB(openDBCtx, cfg.DB)
+ if err != nil {
+ return fmt.Errorf("connect to database: %w", err)
+ }
+ defer func() { _ = db.Close() }()
+ store := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames())
+
+ f, err := os.Open(cmd.inputPath)
+ if err != nil {
+ return fmt.Errorf("open input: %w", err)
+ }
+ defer f.Close()
+
+ d := json.NewDecoder(f)
+ d.DisallowUnknownFields()
+
+ fmt.Fprintf(cmd.w, "Validating repository information in %q\n", cmd.inputPath)
+
+ var requests []trackRepositoryRequest
+ var repoNum int
+ var repoErrs []invalidRequest
+ pathLines := make(map[string][]int)
+
+ // Read in and validate all requests from input file before executing. This prevents us from
+ // partially executing a file, which makes it difficult to tell which repos were actually
+ // tracked.
+ for d.More() {
+ repoNum++
+
+ request := trackRepositoryRequest{}
+ badReq := invalidRequest{reqNum: repoNum}
+
+ if err := d.Decode(&request); err != nil {
+ badReq.errs = append(badReq.errs, err)
+ repoErrs = append(repoErrs, badReq)
+
+ // Invalid request, nothing to validate.
+ continue
+ }
+
+ if request.RelativePath == "" {
+ badReq.errs = append(badReq.errs, requiredParameterError(paramRelativePath))
+ }
+ badReq.path = request.RelativePath
+
+ if request.VirtualStorage == "" {
+ badReq.errs = append(badReq.errs, requiredParameterError(paramVirtualStorage))
+ }
+ if request.AuthoritativeStorage == "" {
+ badReq.errs = append(badReq.errs, requiredParameterError(paramAuthoritativeStorage))
+ }
+ if len(badReq.errs) > 0 {
+ repoErrs = append(repoErrs, badReq)
+
+ // Incomplete request, no further validation possible.
+ continue
+ }
+
+ // Repo paths are globally unique, any attempt to add the same path multiple virtual storages
+ // is invalid and must be rejected.
+ prevLines, exists := pathLines[request.RelativePath]
+ if exists {
+ badReq.errs = append(badReq.errs, &dupPathError{path: request.RelativePath})
+ repoErrs = append(repoErrs, badReq)
+
+ prevLines = append(prevLines, repoNum)
+ pathLines[request.RelativePath] = prevLines
+
+ // We've already checked this path, no need to run further checks.
+ continue
+ }
+ pathLines[request.RelativePath] = []int{repoNum}
+
+ repoInDB, err := store.RepositoryExists(ctx, request.VirtualStorage, request.RelativePath)
+ if err != nil {
+ // Bail out if we're having trouble contacting the DB, nothing is going to work if this fails.
+ return fmt.Errorf("checking database: %w", err)
+ }
+ if repoInDB {
+ badReq.errs = append(badReq.errs, fmt.Errorf("repository is already tracked by Praefect"))
+ repoErrs = append(repoErrs, badReq)
+ // Repo already in Praefect DB, we can skip it.
+ continue
+ }
+
+ authoritativeRepoExists, err := request.authoritativeRepositoryExists(ctx, cfg, cmd.w, request.AuthoritativeStorage)
+ if err != nil {
+ badReq.errs = append(badReq.errs, fmt.Errorf("checking repository on disk: %w", err))
+ } else if !authoritativeRepoExists {
+ badReq.errs = append(badReq.errs, fmt.Errorf("not a valid git repository"))
+ }
+
+ if len(badReq.errs) > 0 {
+ repoErrs = append(repoErrs, badReq)
+ continue
+ }
+ requests = append(requests, request)
+ }
+
+ if len(repoErrs) > 0 {
+ printInvalidRequests(cmd.w, repoErrs, pathLines, cmd.inputPath)
+ return fmt.Errorf("invalid entries found, aborting")
+ }
+ if len(requests) == 0 {
+ return fmt.Errorf("no repository information found in %q", cmd.inputPath)
+ }
+
+ fmt.Fprintf(cmd.w, "All repository details are correctly formatted\n")
+ fmt.Fprintf(cmd.w, "Tracking %v repositories in Praefect DB...\n", repoNum)
+ for _, request := range requests {
+ if err := request.execRequest(ctx, db, cfg, cmd.w, logger, cmd.replicateImmediately); err != nil {
+ return fmt.Errorf("tracking repository %q: %w", request.RelativePath, err)
+ }
+ }
+
+ return nil
+}
+
+func printInvalidRequests(w io.Writer, repoErrs []invalidRequest, pathLines map[string][]int, inputPath string) {
+ fmt.Fprintf(w, "Found %v invalid request(s) in %q:\n", len(repoErrs), inputPath)
+
+ for _, l := range repoErrs {
+ fmt.Fprintf(w, " item #: %v, relative_path: %q\n", l.reqNum, l.path)
+ for _, err := range l.errs {
+ if dup, ok := err.(*dupPathError); ok {
+ // The complete set of duplicate reqNums won't be known until input is
+ // fully processed, fetch them now.
+ err = &dupPathError{path: dup.path, reqNums: pathLines[dup.path]}
+ }
+ fmt.Fprintf(w, " %v\n", err)
+ }
+ }
+}
diff --git a/cmd/praefect/subcmd_track_repositories_test.go b/cmd/praefect/subcmd_track_repositories_test.go
new file mode 100644
index 000000000..cd8e97a1b
--- /dev/null
+++ b/cmd/praefect/subcmd_track_repositories_test.go
@@ -0,0 +1,357 @@
+//go:build !gitaly_test_sha256
+
+package main
+
+import (
+ "bytes"
+ "encoding/json"
+ "flag"
+ "fmt"
+ "os"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v15/client"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/promtest"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testdb"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
+)
+
+func TestAddRepositories_FlagSet(t *testing.T) {
+ t.Parallel()
+ cmd := &trackRepositories{}
+ fs := cmd.FlagSet()
+ require.NoError(t, fs.Parse([]string{"--input-path", "/dev/stdin", "--replicate-immediately", "true"}))
+ require.Equal(t, "/dev/stdin", cmd.inputPath)
+ require.Equal(t, true, cmd.replicateImmediately)
+}
+
+func TestAddRepositories_Exec_invalidInput(t *testing.T) {
+ t.Parallel()
+ g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1"))
+ g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2"))
+
+ g1Srv := testserver.StartGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+ g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+ defer g2Srv.Shutdown()
+ defer g1Srv.Shutdown()
+ g1Addr := g1Srv.Address()
+
+ db := testdb.New(t)
+ dbConf := testdb.GetConfig(t, db.Name)
+
+ virtualStorageName := "praefect"
+ conf := config.Config{
+ AllowLegacyElectors: true,
+ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: virtualStorageName,
+ Nodes: []*config.Node{
+ {Storage: g1Cfg.Storages[0].Name, Address: g1Addr},
+ {Storage: g2Cfg.Storages[0].Name, Address: g2Srv.Address()},
+ },
+ DefaultReplicationFactor: 2,
+ },
+ },
+ DB: dbConf,
+ Failover: config.Failover{
+ Enabled: true,
+ ElectionStrategy: config.ElectionStrategyPerRepository,
+ },
+ }
+ rs := datastore.NewPostgresRepositoryStore(db, nil)
+ ctx := testhelper.Context(t)
+ inputFile := "input_file"
+
+ trackRepo := func(relativePath string) error {
+ repositoryID, err := rs.ReserveRepositoryID(ctx, virtualStorageName, relativePath)
+ if err != nil {
+ return err
+ }
+ return rs.CreateRepository(
+ ctx,
+ repositoryID,
+ virtualStorageName,
+ relativePath,
+ relativePath,
+ g1Cfg.Storages[0].Name,
+ nil,
+ nil,
+ false,
+ false,
+ )
+ }
+
+ invalidEntryErr := "invalid entries found, aborting"
+
+ testCases := []struct {
+ input string
+ desc string
+ expectedOutput string
+ expectedError string
+ trackedPath string
+ }{
+ {
+ input: "",
+ desc: "empty input",
+ expectedError: "no repository information found",
+ },
+ {
+ input: `{"foo":"bar"}`,
+ desc: "unexpected key in JSON",
+ expectedOutput: `json: unknown field "foo"`,
+ expectedError: invalidEntryErr,
+ },
+ {
+ input: `{"virtual_storage":"praefect","authoritative_storage":"gitaly-1"}`,
+ desc: "missing path",
+ expectedOutput: `"repository" is a required parameter`,
+ expectedError: invalidEntryErr,
+ },
+ {
+ input: `{"virtual_storage":"foo","relative_path":"bar","authoritative_storage":"gitaly-1"}`,
+ desc: "invalid virtual storage",
+ expectedOutput: `virtual storage "foo" not found`,
+ expectedError: invalidEntryErr,
+ },
+ {
+ input: `{"relative_path":"not_a_repo","virtual_storage":"praefect","authoritative_storage":"gitaly-1"}`,
+ desc: "repo does not exist",
+ expectedOutput: "not a valid git repository",
+ expectedError: invalidEntryErr,
+ },
+ {
+ input: `{"virtual_storage":"praefect","relative_path":"duplicate","authoritative_storage":"gitaly-1"}
+{"virtual_storage":"praefect","relative_path":"duplicate","authoritative_storage":"gitaly-1"}`,
+ desc: "duplicate path",
+ expectedOutput: "duplicate entries for relative_path",
+ expectedError: invalidEntryErr,
+ },
+ {
+ input: `{"relative_path":"already_tracked","virtual_storage":"praefect","authoritative_storage":"gitaly-1"}`,
+ desc: "repo is already tracked",
+ expectedOutput: "repository is already tracked by Praefect",
+ expectedError: invalidEntryErr,
+ trackedPath: "already_tracked",
+ },
+ }
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ nodeMgr, err := nodes.NewManager(
+ testhelper.NewDiscardingLogEntry(t),
+ conf,
+ db.DB,
+ nil,
+ promtest.NewMockHistogramVec(),
+ protoregistry.GitalyProtoPreregistered,
+ nil,
+ nil,
+ nil,
+ )
+ require.NoError(t, err)
+ nodeMgr.Start(0, time.Hour)
+ defer nodeMgr.Stop()
+
+ tempDir := testhelper.TempDir(t)
+ f, err := os.Create(filepath.Join(tempDir, inputFile))
+ require.NoError(t, err)
+ _, err = f.Write([]byte(tc.input))
+ require.NoError(t, err)
+ require.NoError(t, f.Close())
+
+ var stdout bytes.Buffer
+
+ if tc.trackedPath != "" {
+ require.NoError(t, trackRepo(tc.trackedPath))
+ }
+
+ addReposCmd := &trackRepositories{
+ inputPath: filepath.Join(tempDir, inputFile),
+ replicateImmediately: true,
+ logger: logger,
+ w: &stdout,
+ }
+ err = addReposCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)
+ require.Error(t, err)
+
+ if tc.expectedOutput != "" {
+ require.Contains(t, stdout.String(), tc.expectedOutput)
+ }
+ require.Contains(t, err.Error(), tc.expectedError)
+ })
+ }
+}
+
+func TestAddRepositories_Exec(t *testing.T) {
+ t.Parallel()
+ g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1"))
+ g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2"))
+ testcfg.BuildGitalyHooks(t, g2Cfg)
+ testcfg.BuildGitalySSH(t, g2Cfg)
+
+ g1Srv := testserver.StartGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+ g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+ defer g2Srv.Shutdown()
+ defer g1Srv.Shutdown()
+
+ g1Addr := g1Srv.Address()
+
+ db := testdb.New(t)
+ dbConf := testdb.GetConfig(t, db.Name)
+
+ virtualStorageName := "praefect"
+ conf := config.Config{
+ AllowLegacyElectors: true,
+ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: virtualStorageName,
+ Nodes: []*config.Node{
+ {Storage: g1Cfg.Storages[0].Name, Address: g1Addr},
+ {Storage: g2Cfg.Storages[0].Name, Address: g2Srv.Address()},
+ },
+ DefaultReplicationFactor: 2,
+ },
+ },
+ DB: dbConf,
+ Failover: config.Failover{
+ Enabled: true,
+ ElectionStrategy: config.ElectionStrategyPerRepository,
+ },
+ }
+
+ gitalyCC, err := client.Dial(g1Addr, nil)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, gitalyCC.Close()) }()
+ ctx := testhelper.Context(t)
+
+ gitaly1RepositoryClient := gitalypb.NewRepositoryServiceClient(gitalyCC)
+
+ createRepoThroughGitaly1 := func(relativePath string) error {
+ _, err := gitaly1RepositoryClient.CreateRepository(
+ ctx,
+ &gitalypb.CreateRepositoryRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: g1Cfg.Storages[0].Name,
+ RelativePath: relativePath,
+ },
+ })
+ return err
+ }
+
+ authoritativeStorage := g1Cfg.Storages[0].Name
+ logger := testhelper.NewDiscardingLogger(t)
+
+ t.Run("ok", func(t *testing.T) {
+ testCases := []struct {
+ relativePaths []string
+ desc string
+ replicateImmediately bool
+ expectedOutput string
+ }{
+ {
+ relativePaths: []string{"path/to/test/repo1", "path/to/test/repo2"},
+ desc: "immediate replication",
+ replicateImmediately: true,
+ expectedOutput: "Finished replicating repository to \"gitaly-2\".\n",
+ },
+ {
+ relativePaths: []string{"path/to/test/repo3", "path/to/test/repo4"},
+ desc: "no immediate replication",
+ replicateImmediately: false,
+ expectedOutput: "Added replication job to replicate repository to \"gitaly-2\".\n",
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ nodeMgr, err := nodes.NewManager(
+ testhelper.NewDiscardingLogEntry(t),
+ conf,
+ db.DB,
+ nil,
+ promtest.NewMockHistogramVec(),
+ protoregistry.GitalyProtoPreregistered,
+ nil,
+ nil,
+ nil,
+ )
+ require.NoError(t, err)
+ nodeMgr.Start(0, time.Hour)
+ defer nodeMgr.Stop()
+
+ tempDir := testhelper.TempDir(t)
+ input, err := os.Create(filepath.Join(tempDir, "input"))
+ require.NoError(t, err)
+
+ repoDS := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+ for _, path := range tc.relativePaths {
+ exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, path)
+ require.NoError(t, err)
+ require.False(t, exists)
+
+ // create the repo on Gitaly without Praefect knowing
+ require.NoError(t, createRepoThroughGitaly1(path))
+ require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, path))
+ require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, path))
+
+ // Write repo details to input file
+ repoEntry, err := json.Marshal(trackRepositoryRequest{RelativePath: path, VirtualStorage: virtualStorageName, AuthoritativeStorage: authoritativeStorage})
+ require.NoError(t, err)
+ fmt.Fprintf(input, string(repoEntry)+"\n")
+ }
+
+ var stdout bytes.Buffer
+
+ addRepoCmd := &trackRepositories{
+ inputPath: filepath.Join(tempDir, "input"),
+ replicateImmediately: tc.replicateImmediately,
+ logger: logger,
+ w: &stdout,
+ }
+
+ require.NoError(t, addRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+ assert.Contains(t, stdout.String(), tc.expectedOutput)
+
+ as := datastore.NewAssignmentStore(db, conf.StorageNames())
+
+ for _, path := range tc.relativePaths {
+ repositoryID, err := repoDS.GetRepositoryID(ctx, virtualStorageName, path)
+ require.NoError(t, err)
+
+ assignments, err := as.GetHostAssignments(ctx, virtualStorageName, repositoryID)
+ require.NoError(t, err)
+ require.Len(t, assignments, 2)
+ assert.Contains(t, assignments, g1Cfg.Storages[0].Name)
+ assert.Contains(t, assignments, g2Cfg.Storages[0].Name)
+
+ exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, path)
+ require.NoError(t, err)
+ assert.True(t, exists)
+
+ if !tc.replicateImmediately {
+ queue := datastore.NewPostgresReplicationEventQueue(db)
+ events, err := queue.Dequeue(ctx, virtualStorageName, g2Cfg.Storages[0].Name, 1)
+ require.NoError(t, err)
+ assert.Len(t, events, 1)
+ assert.Equal(t, path, events[0].Job.RelativePath)
+ } else {
+ require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, path))
+ }
+ }
+ })
+ }
+ })
+}
diff --git a/cmd/praefect/subcmd_track_repository.go b/cmd/praefect/subcmd_track_repository.go
index e54ff3ca1..8fc4fd0a6 100644
--- a/cmd/praefect/subcmd_track_repository.go
+++ b/cmd/praefect/subcmd_track_repository.go
@@ -36,6 +36,12 @@ type trackRepository struct {
replicateImmediately bool
}
+type trackRepositoryRequest struct {
+ RelativePath string `json:"relative_path"`
+ VirtualStorage string `json:"virtual_storage"`
+ AuthoritativeStorage string `json:"authoritative_storage"`
+}
+
var errAuthoritativeRepositoryNotExist = errors.New("authoritative repository does not exist")
func newTrackRepository(logger logrus.FieldLogger, w io.Writer) *trackRepository {
@@ -54,7 +60,7 @@ func (cmd *trackRepository) FlagSet() *flag.FlagSet {
" It checks if the repository exists on disk on the authoritative storage,\n" +
" and whether database records are absent from tracking the repository.\n" +
" If -replicate-immediately is used, the command will attempt to replicate the repository to the secondaries.\n" +
- " Otherwise, replication jobs will be created and will be excuted eventually by Praefect itself.\n")
+ " Otherwise, replication jobs will be created and will be executed eventually by Praefect itself.\n")
fs.PrintDefaults()
}
return fs
@@ -75,7 +81,6 @@ func (cmd trackRepository) Exec(flags *flag.FlagSet, cfg config.Config) error {
}
ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID())
- logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx))
openDBCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
@@ -85,16 +90,34 @@ func (cmd trackRepository) Exec(flags *flag.FlagSet, cfg config.Config) error {
}
defer func() { _ = db.Close() }()
- return cmd.exec(ctx, logger, db, cfg)
+ return cmd.exec(ctx, db, cfg)
}
const trackRepoErrorPrefix = "attempting to track repository in praefect database"
-func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, db *sql.DB, cfg config.Config) error {
+func (cmd *trackRepository) exec(ctx context.Context, db *sql.DB, cfg config.Config) error {
+ logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx))
+
+ req := trackRepositoryRequest{
+ RelativePath: cmd.relativePath,
+ AuthoritativeStorage: cmd.authoritativeStorage,
+ VirtualStorage: cmd.virtualStorage,
+ }
+
+ return req.execRequest(ctx, db, cfg, cmd.w, logger, cmd.replicateImmediately)
+}
+
+func (req *trackRepositoryRequest) execRequest(ctx context.Context,
+ db *sql.DB,
+ cfg config.Config,
+ w io.Writer,
+ logger logrus.FieldLogger,
+ replicateImmediately bool,
+) error {
logger.WithFields(logrus.Fields{
- "virtual_storage": cmd.virtualStorage,
- "relative_path": cmd.relativePath,
- "authoritative_storage": cmd.authoritativeStorage,
+ "virtual_storage": req.VirtualStorage,
+ "relative_path": req.RelativePath,
+ "authoritative_storage": req.AuthoritativeStorage,
}).Debug("track repository")
var primary string
@@ -102,12 +125,12 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger,
var variableReplicationFactorEnabled, savePrimary bool
if cfg.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
savePrimary = true
- primary = cmd.authoritativeStorage
+ primary = req.AuthoritativeStorage
for _, vs := range cfg.VirtualStorages {
- if vs.Name == cmd.virtualStorage {
+ if vs.Name == req.VirtualStorage {
for _, node := range vs.Nodes {
- if node.Storage == cmd.authoritativeStorage {
+ if node.Storage == req.AuthoritativeStorage {
continue
}
secondaries = append(secondaries, node.Storage)
@@ -116,7 +139,7 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger,
}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
- replicationFactor := cfg.DefaultReplicationFactors()[cmd.virtualStorage]
+ replicationFactor := cfg.DefaultReplicationFactors()[req.VirtualStorage]
if replicationFactor > 0 {
variableReplicationFactorEnabled = true
@@ -129,7 +152,7 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger,
}
} else {
savePrimary = false
- if err := db.QueryRowContext(ctx, `SELECT node_name FROM shard_primaries WHERE shard_name = $1 AND demoted = 'false'`, cmd.virtualStorage).Scan(&primary); err != nil {
+ if err := db.QueryRowContext(ctx, `SELECT node_name FROM shard_primaries WHERE shard_name = $1 AND demoted = 'false'`, req.VirtualStorage).Scan(&primary); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("%s: no primaries found", trackRepoErrorPrefix)
}
@@ -137,7 +160,7 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger,
}
}
- authoritativeRepoExists, err := cmd.authoritativeRepositoryExists(ctx, cfg, primary)
+ authoritativeRepoExists, err := req.authoritativeRepositoryExists(ctx, cfg, w, primary)
if err != nil {
return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
}
@@ -162,7 +185,7 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger,
store := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames())
queue := datastore.NewPostgresReplicationEventQueue(db)
replMgr := praefect.NewReplMgr(
- cmd.logger,
+ logger,
cfg.StorageNames(),
queue,
store,
@@ -170,9 +193,10 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger,
nodeSet,
)
- repositoryID, err := cmd.trackRepository(
+ repositoryID, err := req.trackRepository(
ctx,
store,
+ w,
primary,
secondaries,
savePrimary,
@@ -182,24 +206,24 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger,
return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
}
- fmt.Fprintln(cmd.w, "Finished adding new repository to be tracked in praefect database.")
+ fmt.Fprintln(w, "Finished adding new repository to be tracked in praefect database.")
correlationID := correlation.SafeRandomID()
- connections := nodeSet.Connections()[cmd.virtualStorage]
+ connections := nodeSet.Connections()[req.VirtualStorage]
for _, secondary := range secondaries {
event := datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
RepositoryID: repositoryID,
Change: datastore.UpdateRepo,
- RelativePath: cmd.relativePath,
- VirtualStorage: cmd.virtualStorage,
+ RelativePath: req.RelativePath,
+ VirtualStorage: req.VirtualStorage,
SourceNodeStorage: primary,
TargetNodeStorage: secondary,
},
Meta: datastore.Params{metadatahandler.CorrelationIDKey: correlationID},
}
- if cmd.replicateImmediately {
+ if replicateImmediately {
conn, ok := connections[secondary]
if !ok {
return fmt.Errorf("%s: connection for %q not found", trackRepoErrorPrefix, secondary)
@@ -209,31 +233,32 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger,
return fmt.Errorf("%s: processing replication event %w", trackRepoErrorPrefix, err)
}
- fmt.Fprintf(cmd.w, "Finished replicating repository to %q.\n", secondary)
+ fmt.Fprintf(w, "Finished replicating repository to %q.\n", secondary)
continue
}
if _, err := queue.Enqueue(ctx, event); err != nil {
return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
}
- fmt.Fprintf(cmd.w, "Added replication job to replicate repository to %q.\n", secondary)
+ fmt.Fprintf(w, "Added replication job to replicate repository to %q.\n", secondary)
}
return nil
}
-func (cmd *trackRepository) trackRepository(
+func (req *trackRepositoryRequest) trackRepository(
ctx context.Context,
ds *datastore.PostgresRepositoryStore,
+ w io.Writer,
primary string,
secondaries []string,
savePrimary bool,
variableReplicationFactorEnabled bool,
) (int64, error) {
- repositoryID, err := ds.ReserveRepositoryID(ctx, cmd.virtualStorage, cmd.relativePath)
+ repositoryID, err := ds.ReserveRepositoryID(ctx, req.VirtualStorage, req.RelativePath)
if err != nil {
if errors.Is(err, commonerr.ErrRepositoryAlreadyExists) {
- cmd.logger.Print("repository is already tracked in praefect database")
+ fmt.Fprintf(w, "repository is already tracked in praefect database")
return 0, nil
}
@@ -243,9 +268,9 @@ func (cmd *trackRepository) trackRepository(
if err := ds.CreateRepository(
ctx,
repositoryID,
- cmd.virtualStorage,
- cmd.relativePath,
- cmd.relativePath,
+ req.VirtualStorage,
+ req.RelativePath,
+ req.RelativePath,
primary,
nil,
secondaries,
@@ -275,28 +300,28 @@ func repositoryExists(ctx context.Context, repo *gitalypb.Repository, addr, toke
return res.GetExists(), nil
}
-func (cmd *trackRepository) authoritativeRepositoryExists(ctx context.Context, cfg config.Config, nodeName string) (bool, error) {
+func (req *trackRepositoryRequest) authoritativeRepositoryExists(ctx context.Context, cfg config.Config, w io.Writer, nodeName string) (bool, error) {
for _, vs := range cfg.VirtualStorages {
- if vs.Name != cmd.virtualStorage {
+ if vs.Name != req.VirtualStorage {
continue
}
for _, node := range vs.Nodes {
if node.Storage == nodeName {
- logger.Debugf("check if repository %q exists on gitaly %q at %q", cmd.relativePath, node.Storage, node.Address)
+ logger.Debugf("check if repository %q exists on gitaly %q at %q", req.RelativePath, node.Storage, node.Address)
repo := &gitalypb.Repository{
StorageName: node.Storage,
- RelativePath: cmd.relativePath,
+ RelativePath: req.RelativePath,
}
exists, err := repositoryExists(ctx, repo, node.Address, node.Token)
if err != nil {
- logger.WithError(err).Warnf("checking if repository exists %q, %q", node.Storage, cmd.relativePath)
+ fmt.Fprintf(w, "checking if repository exists %q, %q", node.Storage, req.RelativePath)
return false, nil
}
return exists, nil
}
}
- return false, fmt.Errorf("node %q not found", cmd.authoritativeStorage)
+ return false, fmt.Errorf("node %q not found", req.AuthoritativeStorage)
}
- return false, fmt.Errorf("virtual storage %q not found", cmd.virtualStorage)
+ return false, fmt.Errorf("virtual storage %q not found", req.VirtualStorage)
}