diff options
author | Toon Claes <toon@gitlab.com> | 2022-09-01 17:53:22 +0300 |
---|---|---|
committer | Toon Claes <toon@gitlab.com> | 2022-09-01 17:53:22 +0300 |
commit | 60e5a0938e3496ef4b943de506b7500c48392d9b (patch) | |
tree | 571a2da1d778f64536102d785befa8ff859d009e | |
parent | c3718f5f9a3285ad8d99b4e3383edc13b1546fda (diff) | |
parent | 69420b517a18582cfefeab4297c0ac2456b96de8 (diff) |
Merge branch 'wc-add-many-repos' into 'master'
praefect: Add `track-repositories` command
See merge request gitlab-org/gitaly!4845
-rw-r--r-- | cmd/praefect/subcmd.go | 1 | ||||
-rw-r--r-- | cmd/praefect/subcmd_track_repositories.go | 214 | ||||
-rw-r--r-- | cmd/praefect/subcmd_track_repositories_test.go | 357 | ||||
-rw-r--r-- | cmd/praefect/subcmd_track_repository.go | 95 |
4 files changed, 632 insertions, 35 deletions
diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go index 78b3bcca4..53df26eee 100644 --- a/cmd/praefect/subcmd.go +++ b/cmd/praefect/subcmd.go @@ -42,6 +42,7 @@ var subcommands = map[string]subcmd{ setReplicationFactorCmdName: newSetReplicatioFactorSubcommand(os.Stdout), removeRepositoryCmdName: newRemoveRepository(logger, os.Stdout), trackRepositoryCmdName: newTrackRepository(logger, os.Stdout), + trackRepositoriesCmdName: newTrackRepositories(logger, os.Stdout), listUntrackedRepositoriesName: newListUntrackedRepositories(logger, os.Stdout), checkCmdName: newCheckSubcommand(os.Stdout, service.AllChecks()...), metadataCmdName: newMetadataSubcommand(os.Stdout), diff --git a/cmd/praefect/subcmd_track_repositories.go b/cmd/praefect/subcmd_track_repositories.go new file mode 100644 index 000000000..44bdb8c71 --- /dev/null +++ b/cmd/praefect/subcmd_track_repositories.go @@ -0,0 +1,214 @@ +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "io" + "os" + "time" + + "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/labkit/correlation" +) + +const ( + trackRepositoriesCmdName = "track-repositories" + paramInputPath = "input-path" +) + +type invalidRequest struct { + reqNum int + path string + errs []error +} + +type dupPathError struct { + path string + reqNums []int +} + +func (d *dupPathError) Error() string { + return fmt.Sprintf("duplicate entries for relative_path, item #: %v", d.reqNums) +} + +type trackRepositories struct { + w io.Writer + logger logrus.FieldLogger + inputPath string + replicateImmediately bool +} + +func newTrackRepositories(logger logrus.FieldLogger, w io.Writer) *trackRepositories { + return &trackRepositories{w: w, logger: logger} +} + +func (cmd *trackRepositories) FlagSet() *flag.FlagSet { + fs := flag.NewFlagSet(trackRepositoryCmdName, flag.ExitOnError) + fs.BoolVar(&cmd.replicateImmediately, "replicate-immediately", false, "kick off a replication immediately") + fs.StringVar(&cmd.inputPath, paramInputPath, "", "path to file with details of repositories to track") + fs.Usage = func() { + printfErr("Description:\n" + + " This command allows bulk requests for repositories to be tracked by Praefect.\n" + + " The -input-path flag must be the path of a file containing the details of the repositories\n" + + " to track as a list of newline-delimited JSON objects. Each entry must contain the\n" + + " following keys:\n\n" + + " relative_path - The relative path of the repository on-disk.\n" + + " virtual_storage - The Praefect virtual storage name.\n" + + " authoritative_storage - Which storage to consider as the canonical copy of the repository.\n\n" + + " If -replicate-immediately is used, the command will attempt to replicate the repositories\n" + + " to the secondaries. Otherwise, replication jobs will be created and will be executed\n" + + " eventually by Praefect itself.\n") + fs.PrintDefaults() + } + return fs +} + +func (cmd trackRepositories) Exec(flags *flag.FlagSet, cfg config.Config) error { + switch { + case flags.NArg() > 0: + return unexpectedPositionalArgsError{Command: flags.Name()} + } + + ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID()) + logger = cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx)) + + openDBCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + db, err := glsql.OpenDB(openDBCtx, cfg.DB) + if err != nil { + return fmt.Errorf("connect to database: %w", err) + } + defer func() { _ = db.Close() }() + store := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()) + + f, err := os.Open(cmd.inputPath) + if err != nil { + return fmt.Errorf("open input: %w", err) + } + defer f.Close() + + d := json.NewDecoder(f) + d.DisallowUnknownFields() + + fmt.Fprintf(cmd.w, "Validating repository information in %q\n", cmd.inputPath) + + var requests []trackRepositoryRequest + var repoNum int + var repoErrs []invalidRequest + pathLines := make(map[string][]int) + + // Read in and validate all requests from input file before executing. This prevents us from + // partially executing a file, which makes it difficult to tell which repos were actually + // tracked. + for d.More() { + repoNum++ + + request := trackRepositoryRequest{} + badReq := invalidRequest{reqNum: repoNum} + + if err := d.Decode(&request); err != nil { + badReq.errs = append(badReq.errs, err) + repoErrs = append(repoErrs, badReq) + + // Invalid request, nothing to validate. + continue + } + + if request.RelativePath == "" { + badReq.errs = append(badReq.errs, requiredParameterError(paramRelativePath)) + } + badReq.path = request.RelativePath + + if request.VirtualStorage == "" { + badReq.errs = append(badReq.errs, requiredParameterError(paramVirtualStorage)) + } + if request.AuthoritativeStorage == "" { + badReq.errs = append(badReq.errs, requiredParameterError(paramAuthoritativeStorage)) + } + if len(badReq.errs) > 0 { + repoErrs = append(repoErrs, badReq) + + // Incomplete request, no further validation possible. + continue + } + + // Repo paths are globally unique, any attempt to add the same path multiple virtual storages + // is invalid and must be rejected. + prevLines, exists := pathLines[request.RelativePath] + if exists { + badReq.errs = append(badReq.errs, &dupPathError{path: request.RelativePath}) + repoErrs = append(repoErrs, badReq) + + prevLines = append(prevLines, repoNum) + pathLines[request.RelativePath] = prevLines + + // We've already checked this path, no need to run further checks. + continue + } + pathLines[request.RelativePath] = []int{repoNum} + + repoInDB, err := store.RepositoryExists(ctx, request.VirtualStorage, request.RelativePath) + if err != nil { + // Bail out if we're having trouble contacting the DB, nothing is going to work if this fails. + return fmt.Errorf("checking database: %w", err) + } + if repoInDB { + badReq.errs = append(badReq.errs, fmt.Errorf("repository is already tracked by Praefect")) + repoErrs = append(repoErrs, badReq) + // Repo already in Praefect DB, we can skip it. + continue + } + + authoritativeRepoExists, err := request.authoritativeRepositoryExists(ctx, cfg, cmd.w, request.AuthoritativeStorage) + if err != nil { + badReq.errs = append(badReq.errs, fmt.Errorf("checking repository on disk: %w", err)) + } else if !authoritativeRepoExists { + badReq.errs = append(badReq.errs, fmt.Errorf("not a valid git repository")) + } + + if len(badReq.errs) > 0 { + repoErrs = append(repoErrs, badReq) + continue + } + requests = append(requests, request) + } + + if len(repoErrs) > 0 { + printInvalidRequests(cmd.w, repoErrs, pathLines, cmd.inputPath) + return fmt.Errorf("invalid entries found, aborting") + } + if len(requests) == 0 { + return fmt.Errorf("no repository information found in %q", cmd.inputPath) + } + + fmt.Fprintf(cmd.w, "All repository details are correctly formatted\n") + fmt.Fprintf(cmd.w, "Tracking %v repositories in Praefect DB...\n", repoNum) + for _, request := range requests { + if err := request.execRequest(ctx, db, cfg, cmd.w, logger, cmd.replicateImmediately); err != nil { + return fmt.Errorf("tracking repository %q: %w", request.RelativePath, err) + } + } + + return nil +} + +func printInvalidRequests(w io.Writer, repoErrs []invalidRequest, pathLines map[string][]int, inputPath string) { + fmt.Fprintf(w, "Found %v invalid request(s) in %q:\n", len(repoErrs), inputPath) + + for _, l := range repoErrs { + fmt.Fprintf(w, " item #: %v, relative_path: %q\n", l.reqNum, l.path) + for _, err := range l.errs { + if dup, ok := err.(*dupPathError); ok { + // The complete set of duplicate reqNums won't be known until input is + // fully processed, fetch them now. + err = &dupPathError{path: dup.path, reqNums: pathLines[dup.path]} + } + fmt.Fprintf(w, " %v\n", err) + } + } +} diff --git a/cmd/praefect/subcmd_track_repositories_test.go b/cmd/praefect/subcmd_track_repositories_test.go new file mode 100644 index 000000000..cd8e97a1b --- /dev/null +++ b/cmd/praefect/subcmd_track_repositories_test.go @@ -0,0 +1,357 @@ +//go:build !gitaly_test_sha256 + +package main + +import ( + "bytes" + "encoding/json" + "flag" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/client" + "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/service/setup" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/promtest" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testdb" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" +) + +func TestAddRepositories_FlagSet(t *testing.T) { + t.Parallel() + cmd := &trackRepositories{} + fs := cmd.FlagSet() + require.NoError(t, fs.Parse([]string{"--input-path", "/dev/stdin", "--replicate-immediately", "true"})) + require.Equal(t, "/dev/stdin", cmd.inputPath) + require.Equal(t, true, cmd.replicateImmediately) +} + +func TestAddRepositories_Exec_invalidInput(t *testing.T) { + t.Parallel() + g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1")) + g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2")) + + g1Srv := testserver.StartGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) + g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) + defer g2Srv.Shutdown() + defer g1Srv.Shutdown() + g1Addr := g1Srv.Address() + + db := testdb.New(t) + dbConf := testdb.GetConfig(t, db.Name) + + virtualStorageName := "praefect" + conf := config.Config{ + AllowLegacyElectors: true, + SocketPath: testhelper.GetTemporaryGitalySocketFileName(t), + VirtualStorages: []*config.VirtualStorage{ + { + Name: virtualStorageName, + Nodes: []*config.Node{ + {Storage: g1Cfg.Storages[0].Name, Address: g1Addr}, + {Storage: g2Cfg.Storages[0].Name, Address: g2Srv.Address()}, + }, + DefaultReplicationFactor: 2, + }, + }, + DB: dbConf, + Failover: config.Failover{ + Enabled: true, + ElectionStrategy: config.ElectionStrategyPerRepository, + }, + } + rs := datastore.NewPostgresRepositoryStore(db, nil) + ctx := testhelper.Context(t) + inputFile := "input_file" + + trackRepo := func(relativePath string) error { + repositoryID, err := rs.ReserveRepositoryID(ctx, virtualStorageName, relativePath) + if err != nil { + return err + } + return rs.CreateRepository( + ctx, + repositoryID, + virtualStorageName, + relativePath, + relativePath, + g1Cfg.Storages[0].Name, + nil, + nil, + false, + false, + ) + } + + invalidEntryErr := "invalid entries found, aborting" + + testCases := []struct { + input string + desc string + expectedOutput string + expectedError string + trackedPath string + }{ + { + input: "", + desc: "empty input", + expectedError: "no repository information found", + }, + { + input: `{"foo":"bar"}`, + desc: "unexpected key in JSON", + expectedOutput: `json: unknown field "foo"`, + expectedError: invalidEntryErr, + }, + { + input: `{"virtual_storage":"praefect","authoritative_storage":"gitaly-1"}`, + desc: "missing path", + expectedOutput: `"repository" is a required parameter`, + expectedError: invalidEntryErr, + }, + { + input: `{"virtual_storage":"foo","relative_path":"bar","authoritative_storage":"gitaly-1"}`, + desc: "invalid virtual storage", + expectedOutput: `virtual storage "foo" not found`, + expectedError: invalidEntryErr, + }, + { + input: `{"relative_path":"not_a_repo","virtual_storage":"praefect","authoritative_storage":"gitaly-1"}`, + desc: "repo does not exist", + expectedOutput: "not a valid git repository", + expectedError: invalidEntryErr, + }, + { + input: `{"virtual_storage":"praefect","relative_path":"duplicate","authoritative_storage":"gitaly-1"} +{"virtual_storage":"praefect","relative_path":"duplicate","authoritative_storage":"gitaly-1"}`, + desc: "duplicate path", + expectedOutput: "duplicate entries for relative_path", + expectedError: invalidEntryErr, + }, + { + input: `{"relative_path":"already_tracked","virtual_storage":"praefect","authoritative_storage":"gitaly-1"}`, + desc: "repo is already tracked", + expectedOutput: "repository is already tracked by Praefect", + expectedError: invalidEntryErr, + trackedPath: "already_tracked", + }, + } + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + nodeMgr, err := nodes.NewManager( + testhelper.NewDiscardingLogEntry(t), + conf, + db.DB, + nil, + promtest.NewMockHistogramVec(), + protoregistry.GitalyProtoPreregistered, + nil, + nil, + nil, + ) + require.NoError(t, err) + nodeMgr.Start(0, time.Hour) + defer nodeMgr.Stop() + + tempDir := testhelper.TempDir(t) + f, err := os.Create(filepath.Join(tempDir, inputFile)) + require.NoError(t, err) + _, err = f.Write([]byte(tc.input)) + require.NoError(t, err) + require.NoError(t, f.Close()) + + var stdout bytes.Buffer + + if tc.trackedPath != "" { + require.NoError(t, trackRepo(tc.trackedPath)) + } + + addReposCmd := &trackRepositories{ + inputPath: filepath.Join(tempDir, inputFile), + replicateImmediately: true, + logger: logger, + w: &stdout, + } + err = addReposCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf) + require.Error(t, err) + + if tc.expectedOutput != "" { + require.Contains(t, stdout.String(), tc.expectedOutput) + } + require.Contains(t, err.Error(), tc.expectedError) + }) + } +} + +func TestAddRepositories_Exec(t *testing.T) { + t.Parallel() + g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1")) + g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2")) + testcfg.BuildGitalyHooks(t, g2Cfg) + testcfg.BuildGitalySSH(t, g2Cfg) + + g1Srv := testserver.StartGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) + g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) + defer g2Srv.Shutdown() + defer g1Srv.Shutdown() + + g1Addr := g1Srv.Address() + + db := testdb.New(t) + dbConf := testdb.GetConfig(t, db.Name) + + virtualStorageName := "praefect" + conf := config.Config{ + AllowLegacyElectors: true, + SocketPath: testhelper.GetTemporaryGitalySocketFileName(t), + VirtualStorages: []*config.VirtualStorage{ + { + Name: virtualStorageName, + Nodes: []*config.Node{ + {Storage: g1Cfg.Storages[0].Name, Address: g1Addr}, + {Storage: g2Cfg.Storages[0].Name, Address: g2Srv.Address()}, + }, + DefaultReplicationFactor: 2, + }, + }, + DB: dbConf, + Failover: config.Failover{ + Enabled: true, + ElectionStrategy: config.ElectionStrategyPerRepository, + }, + } + + gitalyCC, err := client.Dial(g1Addr, nil) + require.NoError(t, err) + defer func() { require.NoError(t, gitalyCC.Close()) }() + ctx := testhelper.Context(t) + + gitaly1RepositoryClient := gitalypb.NewRepositoryServiceClient(gitalyCC) + + createRepoThroughGitaly1 := func(relativePath string) error { + _, err := gitaly1RepositoryClient.CreateRepository( + ctx, + &gitalypb.CreateRepositoryRequest{ + Repository: &gitalypb.Repository{ + StorageName: g1Cfg.Storages[0].Name, + RelativePath: relativePath, + }, + }) + return err + } + + authoritativeStorage := g1Cfg.Storages[0].Name + logger := testhelper.NewDiscardingLogger(t) + + t.Run("ok", func(t *testing.T) { + testCases := []struct { + relativePaths []string + desc string + replicateImmediately bool + expectedOutput string + }{ + { + relativePaths: []string{"path/to/test/repo1", "path/to/test/repo2"}, + desc: "immediate replication", + replicateImmediately: true, + expectedOutput: "Finished replicating repository to \"gitaly-2\".\n", + }, + { + relativePaths: []string{"path/to/test/repo3", "path/to/test/repo4"}, + desc: "no immediate replication", + replicateImmediately: false, + expectedOutput: "Added replication job to replicate repository to \"gitaly-2\".\n", + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + nodeMgr, err := nodes.NewManager( + testhelper.NewDiscardingLogEntry(t), + conf, + db.DB, + nil, + promtest.NewMockHistogramVec(), + protoregistry.GitalyProtoPreregistered, + nil, + nil, + nil, + ) + require.NoError(t, err) + nodeMgr.Start(0, time.Hour) + defer nodeMgr.Stop() + + tempDir := testhelper.TempDir(t) + input, err := os.Create(filepath.Join(tempDir, "input")) + require.NoError(t, err) + + repoDS := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + for _, path := range tc.relativePaths { + exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, path) + require.NoError(t, err) + require.False(t, exists) + + // create the repo on Gitaly without Praefect knowing + require.NoError(t, createRepoThroughGitaly1(path)) + require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, path)) + require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, path)) + + // Write repo details to input file + repoEntry, err := json.Marshal(trackRepositoryRequest{RelativePath: path, VirtualStorage: virtualStorageName, AuthoritativeStorage: authoritativeStorage}) + require.NoError(t, err) + fmt.Fprintf(input, string(repoEntry)+"\n") + } + + var stdout bytes.Buffer + + addRepoCmd := &trackRepositories{ + inputPath: filepath.Join(tempDir, "input"), + replicateImmediately: tc.replicateImmediately, + logger: logger, + w: &stdout, + } + + require.NoError(t, addRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) + assert.Contains(t, stdout.String(), tc.expectedOutput) + + as := datastore.NewAssignmentStore(db, conf.StorageNames()) + + for _, path := range tc.relativePaths { + repositoryID, err := repoDS.GetRepositoryID(ctx, virtualStorageName, path) + require.NoError(t, err) + + assignments, err := as.GetHostAssignments(ctx, virtualStorageName, repositoryID) + require.NoError(t, err) + require.Len(t, assignments, 2) + assert.Contains(t, assignments, g1Cfg.Storages[0].Name) + assert.Contains(t, assignments, g2Cfg.Storages[0].Name) + + exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, path) + require.NoError(t, err) + assert.True(t, exists) + + if !tc.replicateImmediately { + queue := datastore.NewPostgresReplicationEventQueue(db) + events, err := queue.Dequeue(ctx, virtualStorageName, g2Cfg.Storages[0].Name, 1) + require.NoError(t, err) + assert.Len(t, events, 1) + assert.Equal(t, path, events[0].Job.RelativePath) + } else { + require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, path)) + } + } + }) + } + }) +} diff --git a/cmd/praefect/subcmd_track_repository.go b/cmd/praefect/subcmd_track_repository.go index e54ff3ca1..8fc4fd0a6 100644 --- a/cmd/praefect/subcmd_track_repository.go +++ b/cmd/praefect/subcmd_track_repository.go @@ -36,6 +36,12 @@ type trackRepository struct { replicateImmediately bool } +type trackRepositoryRequest struct { + RelativePath string `json:"relative_path"` + VirtualStorage string `json:"virtual_storage"` + AuthoritativeStorage string `json:"authoritative_storage"` +} + var errAuthoritativeRepositoryNotExist = errors.New("authoritative repository does not exist") func newTrackRepository(logger logrus.FieldLogger, w io.Writer) *trackRepository { @@ -54,7 +60,7 @@ func (cmd *trackRepository) FlagSet() *flag.FlagSet { " It checks if the repository exists on disk on the authoritative storage,\n" + " and whether database records are absent from tracking the repository.\n" + " If -replicate-immediately is used, the command will attempt to replicate the repository to the secondaries.\n" + - " Otherwise, replication jobs will be created and will be excuted eventually by Praefect itself.\n") + " Otherwise, replication jobs will be created and will be executed eventually by Praefect itself.\n") fs.PrintDefaults() } return fs @@ -75,7 +81,6 @@ func (cmd trackRepository) Exec(flags *flag.FlagSet, cfg config.Config) error { } ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID()) - logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx)) openDBCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() @@ -85,16 +90,34 @@ func (cmd trackRepository) Exec(flags *flag.FlagSet, cfg config.Config) error { } defer func() { _ = db.Close() }() - return cmd.exec(ctx, logger, db, cfg) + return cmd.exec(ctx, db, cfg) } const trackRepoErrorPrefix = "attempting to track repository in praefect database" -func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, db *sql.DB, cfg config.Config) error { +func (cmd *trackRepository) exec(ctx context.Context, db *sql.DB, cfg config.Config) error { + logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx)) + + req := trackRepositoryRequest{ + RelativePath: cmd.relativePath, + AuthoritativeStorage: cmd.authoritativeStorage, + VirtualStorage: cmd.virtualStorage, + } + + return req.execRequest(ctx, db, cfg, cmd.w, logger, cmd.replicateImmediately) +} + +func (req *trackRepositoryRequest) execRequest(ctx context.Context, + db *sql.DB, + cfg config.Config, + w io.Writer, + logger logrus.FieldLogger, + replicateImmediately bool, +) error { logger.WithFields(logrus.Fields{ - "virtual_storage": cmd.virtualStorage, - "relative_path": cmd.relativePath, - "authoritative_storage": cmd.authoritativeStorage, + "virtual_storage": req.VirtualStorage, + "relative_path": req.RelativePath, + "authoritative_storage": req.AuthoritativeStorage, }).Debug("track repository") var primary string @@ -102,12 +125,12 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, var variableReplicationFactorEnabled, savePrimary bool if cfg.Failover.ElectionStrategy == config.ElectionStrategyPerRepository { savePrimary = true - primary = cmd.authoritativeStorage + primary = req.AuthoritativeStorage for _, vs := range cfg.VirtualStorages { - if vs.Name == cmd.virtualStorage { + if vs.Name == req.VirtualStorage { for _, node := range vs.Nodes { - if node.Storage == cmd.authoritativeStorage { + if node.Storage == req.AuthoritativeStorage { continue } secondaries = append(secondaries, node.Storage) @@ -116,7 +139,7 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, } r := rand.New(rand.NewSource(time.Now().UnixNano())) - replicationFactor := cfg.DefaultReplicationFactors()[cmd.virtualStorage] + replicationFactor := cfg.DefaultReplicationFactors()[req.VirtualStorage] if replicationFactor > 0 { variableReplicationFactorEnabled = true @@ -129,7 +152,7 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, } } else { savePrimary = false - if err := db.QueryRowContext(ctx, `SELECT node_name FROM shard_primaries WHERE shard_name = $1 AND demoted = 'false'`, cmd.virtualStorage).Scan(&primary); err != nil { + if err := db.QueryRowContext(ctx, `SELECT node_name FROM shard_primaries WHERE shard_name = $1 AND demoted = 'false'`, req.VirtualStorage).Scan(&primary); err != nil { if errors.Is(err, sql.ErrNoRows) { return fmt.Errorf("%s: no primaries found", trackRepoErrorPrefix) } @@ -137,7 +160,7 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, } } - authoritativeRepoExists, err := cmd.authoritativeRepositoryExists(ctx, cfg, primary) + authoritativeRepoExists, err := req.authoritativeRepositoryExists(ctx, cfg, w, primary) if err != nil { return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err) } @@ -162,7 +185,7 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, store := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()) queue := datastore.NewPostgresReplicationEventQueue(db) replMgr := praefect.NewReplMgr( - cmd.logger, + logger, cfg.StorageNames(), queue, store, @@ -170,9 +193,10 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, nodeSet, ) - repositoryID, err := cmd.trackRepository( + repositoryID, err := req.trackRepository( ctx, store, + w, primary, secondaries, savePrimary, @@ -182,24 +206,24 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err) } - fmt.Fprintln(cmd.w, "Finished adding new repository to be tracked in praefect database.") + fmt.Fprintln(w, "Finished adding new repository to be tracked in praefect database.") correlationID := correlation.SafeRandomID() - connections := nodeSet.Connections()[cmd.virtualStorage] + connections := nodeSet.Connections()[req.VirtualStorage] for _, secondary := range secondaries { event := datastore.ReplicationEvent{ Job: datastore.ReplicationJob{ RepositoryID: repositoryID, Change: datastore.UpdateRepo, - RelativePath: cmd.relativePath, - VirtualStorage: cmd.virtualStorage, + RelativePath: req.RelativePath, + VirtualStorage: req.VirtualStorage, SourceNodeStorage: primary, TargetNodeStorage: secondary, }, Meta: datastore.Params{metadatahandler.CorrelationIDKey: correlationID}, } - if cmd.replicateImmediately { + if replicateImmediately { conn, ok := connections[secondary] if !ok { return fmt.Errorf("%s: connection for %q not found", trackRepoErrorPrefix, secondary) @@ -209,31 +233,32 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, return fmt.Errorf("%s: processing replication event %w", trackRepoErrorPrefix, err) } - fmt.Fprintf(cmd.w, "Finished replicating repository to %q.\n", secondary) + fmt.Fprintf(w, "Finished replicating repository to %q.\n", secondary) continue } if _, err := queue.Enqueue(ctx, event); err != nil { return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err) } - fmt.Fprintf(cmd.w, "Added replication job to replicate repository to %q.\n", secondary) + fmt.Fprintf(w, "Added replication job to replicate repository to %q.\n", secondary) } return nil } -func (cmd *trackRepository) trackRepository( +func (req *trackRepositoryRequest) trackRepository( ctx context.Context, ds *datastore.PostgresRepositoryStore, + w io.Writer, primary string, secondaries []string, savePrimary bool, variableReplicationFactorEnabled bool, ) (int64, error) { - repositoryID, err := ds.ReserveRepositoryID(ctx, cmd.virtualStorage, cmd.relativePath) + repositoryID, err := ds.ReserveRepositoryID(ctx, req.VirtualStorage, req.RelativePath) if err != nil { if errors.Is(err, commonerr.ErrRepositoryAlreadyExists) { - cmd.logger.Print("repository is already tracked in praefect database") + fmt.Fprintf(w, "repository is already tracked in praefect database") return 0, nil } @@ -243,9 +268,9 @@ func (cmd *trackRepository) trackRepository( if err := ds.CreateRepository( ctx, repositoryID, - cmd.virtualStorage, - cmd.relativePath, - cmd.relativePath, + req.VirtualStorage, + req.RelativePath, + req.RelativePath, primary, nil, secondaries, @@ -275,28 +300,28 @@ func repositoryExists(ctx context.Context, repo *gitalypb.Repository, addr, toke return res.GetExists(), nil } -func (cmd *trackRepository) authoritativeRepositoryExists(ctx context.Context, cfg config.Config, nodeName string) (bool, error) { +func (req *trackRepositoryRequest) authoritativeRepositoryExists(ctx context.Context, cfg config.Config, w io.Writer, nodeName string) (bool, error) { for _, vs := range cfg.VirtualStorages { - if vs.Name != cmd.virtualStorage { + if vs.Name != req.VirtualStorage { continue } for _, node := range vs.Nodes { if node.Storage == nodeName { - logger.Debugf("check if repository %q exists on gitaly %q at %q", cmd.relativePath, node.Storage, node.Address) + logger.Debugf("check if repository %q exists on gitaly %q at %q", req.RelativePath, node.Storage, node.Address) repo := &gitalypb.Repository{ StorageName: node.Storage, - RelativePath: cmd.relativePath, + RelativePath: req.RelativePath, } exists, err := repositoryExists(ctx, repo, node.Address, node.Token) if err != nil { - logger.WithError(err).Warnf("checking if repository exists %q, %q", node.Storage, cmd.relativePath) + fmt.Fprintf(w, "checking if repository exists %q, %q", node.Storage, req.RelativePath) return false, nil } return exists, nil } } - return false, fmt.Errorf("node %q not found", cmd.authoritativeStorage) + return false, fmt.Errorf("node %q not found", req.AuthoritativeStorage) } - return false, fmt.Errorf("virtual storage %q not found", cmd.virtualStorage) + return false, fmt.Errorf("virtual storage %q not found", req.VirtualStorage) } |