diff options
Diffstat (limited to 'cmd')
-rw-r--r-- | cmd/praefect/subcmd.go | 2 | ||||
-rw-r--r-- | cmd/praefect/subcmd_track_repository.go | 97 | ||||
-rw-r--r-- | cmd/praefect/subcmd_track_repository_test.go | 176 |
3 files changed, 192 insertions, 83 deletions
diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go index f6b63ee2c..b11d155bd 100644 --- a/cmd/praefect/subcmd.go +++ b/cmd/praefect/subcmd.go @@ -40,7 +40,7 @@ var subcommands = map[string]subcmd{ acceptDatalossCmdName: &acceptDatalossSubcommand{}, setReplicationFactorCmdName: newSetReplicatioFactorSubcommand(os.Stdout), removeRepositoryCmdName: newRemoveRepository(logger, os.Stdout), - trackRepositoryCmdName: newTrackRepository(logger), + trackRepositoryCmdName: newTrackRepository(logger, os.Stdout), listUntrackedRepositoriesName: newListUntrackedRepositories(logger, os.Stdout), checkCmdName: newCheckSubcommand( os.Stdout, diff --git a/cmd/praefect/subcmd_track_repository.go b/cmd/praefect/subcmd_track_repository.go index 810df39a8..ec4e3489a 100644 --- a/cmd/praefect/subcmd_track_repository.go +++ b/cmd/praefect/subcmd_track_repository.go @@ -6,14 +6,18 @@ import ( "errors" "flag" "fmt" + "io" "math/rand" "time" "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/metadatahandler" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/labkit/correlation" "google.golang.org/grpc/metadata" @@ -24,16 +28,18 @@ const ( ) type trackRepository struct { + w io.Writer logger logrus.FieldLogger virtualStorage string relativePath string authoritativeStorage string + replicateImmediately bool } var errAuthoritativeRepositoryNotExist = errors.New("authoritative repository does not exist") -func newTrackRepository(logger logrus.FieldLogger) *trackRepository { - return &trackRepository{logger: logger} +func newTrackRepository(logger logrus.FieldLogger, w io.Writer) *trackRepository { + return &trackRepository{w: w, logger: logger} } func (cmd *trackRepository) FlagSet() *flag.FlagSet { @@ -41,11 +47,14 @@ func (cmd *trackRepository) FlagSet() *flag.FlagSet { fs.StringVar(&cmd.virtualStorage, paramVirtualStorage, "", "name of the repository's virtual storage") fs.StringVar(&cmd.relativePath, paramRelativePath, "", "relative path to the repository") fs.StringVar(&cmd.authoritativeStorage, paramAuthoritativeStorage, "", "storage with the repository to consider as authoritative") + fs.BoolVar(&cmd.replicateImmediately, "replicate-immediately", false, "kick off a replication immediately") fs.Usage = func() { printfErr("Description:\n" + " This command adds a given repository to be tracked by Praefect.\n" + - " It checks if the repository exists on disk on the authoritative storage, " + - " and whether database records are absent from tracking the repository.") + " It checks if the repository exists on disk on the authoritative storage,\n" + + " and whether database records are absent from tracking the repository.\n" + + " If -replicate-immediately is used, the command will attempt to replicate the repository to the secondaries.\n" + + " Otherwise, replication jobs will be created and will be excuted eventually by Praefect itself.\n") fs.PrintDefaults() } return fs @@ -137,18 +146,79 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, return fmt.Errorf("%s: %w", trackRepoErrorPrefix, errAuthoritativeRepositoryNotExist) } - if err := cmd.trackRepository( + nodeSet, err := praefect.DialNodes( ctx, - datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()), + cfg.VirtualStorages, + protoregistry.GitalyProtoPreregistered, + nil, + nil, + nil, + ) + if err != nil { + return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err) + } + defer nodeSet.Close() + + store := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()) + queue := datastore.NewPostgresReplicationEventQueue(db) + replMgr := praefect.NewReplMgr( + cmd.logger, + cfg.StorageNames(), + queue, + store, + praefect.StaticHealthChecker(cfg.StorageNames()), + nodeSet, + ) + + repositoryID, err := cmd.trackRepository( + ctx, + store, + queue, primary, secondaries, savePrimary, variableReplicationFactorEnabled, - ); err != nil { + ) + if err != nil { return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err) } - logger.Debug("finished adding new repository to be tracked in praefect database.") + fmt.Fprintln(cmd.w, "Finished adding new repository to be tracked in praefect database.") + + correlationID := correlation.SafeRandomID() + connections := nodeSet.Connections()[cmd.virtualStorage] + + for _, secondary := range secondaries { + event := datastore.ReplicationEvent{ + Job: datastore.ReplicationJob{ + RepositoryID: repositoryID, + Change: datastore.UpdateRepo, + RelativePath: cmd.relativePath, + VirtualStorage: cmd.virtualStorage, + SourceNodeStorage: primary, + TargetNodeStorage: secondary, + }, + Meta: datastore.Params{metadatahandler.CorrelationIDKey: correlationID}, + } + if cmd.replicateImmediately { + conn, ok := connections[secondary] + if !ok { + return fmt.Errorf("%s: connection for %q not found", trackRepoErrorPrefix, secondary) + } + + if err := replMgr.ProcessReplicationEvent(ctx, event, conn); err != nil { + return fmt.Errorf("%s: processing replication event %w", trackRepoErrorPrefix, err) + } + + fmt.Fprintf(cmd.w, "Finished replicating repository to %q.\n", secondary) + continue + } + + if _, err := queue.Enqueue(ctx, event); err != nil { + return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err) + } + fmt.Fprintf(cmd.w, "Added replication job to replicate repository to %q.\n", secondary) + } return nil } @@ -156,19 +226,20 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, func (cmd *trackRepository) trackRepository( ctx context.Context, ds *datastore.PostgresRepositoryStore, + queue datastore.ReplicationEventQueue, primary string, secondaries []string, savePrimary bool, variableReplicationFactorEnabled bool, -) error { +) (int64, error) { repositoryID, err := ds.ReserveRepositoryID(ctx, cmd.virtualStorage, cmd.relativePath) if err != nil { if errors.Is(err, commonerr.ErrRepositoryAlreadyExists) { cmd.logger.Print("repository is already tracked in praefect database") - return nil + return 0, nil } - return fmt.Errorf("ReserveRepositoryID: %w", err) + return 0, fmt.Errorf("ReserveRepositoryID: %w", err) } if err := ds.CreateRepository( @@ -183,10 +254,10 @@ func (cmd *trackRepository) trackRepository( savePrimary, variableReplicationFactorEnabled, ); err != nil { - return fmt.Errorf("CreateRepository: %w", err) + return 0, fmt.Errorf("CreateRepository: %w", err) } - return nil + return repositoryID, nil } func repositoryExists(ctx context.Context, repo *gitalypb.Repository, addr, token string) (bool, error) { diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go index 0e2667af3..ed55c1ce7 100644 --- a/cmd/praefect/subcmd_track_repository_test.go +++ b/cmd/praefect/subcmd_track_repository_test.go @@ -1,8 +1,8 @@ package main import ( + "bytes" "flag" - "fmt" "io" "path/filepath" "testing" @@ -76,6 +76,8 @@ func TestAddRepository_Exec(t *testing.T) { t.Parallel() g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1")) g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2")) + testcfg.BuildGitalyHooks(t, g2Cfg) + testcfg.BuildGitalySSH(t, g2Cfg) g1Srv := testserver.StartGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) @@ -102,6 +104,10 @@ func TestAddRepository_Exec(t *testing.T) { }, }, DB: dbConf, + Failover: config.Failover{ + Enabled: true, + ElectionStrategy: config.ElectionStrategyPerRepository, + }, } gitalyCC, err := client.Dial(g1Addr, nil) @@ -126,67 +132,77 @@ func TestAddRepository_Exec(t *testing.T) { return err } - testCases := map[string]struct { - failoverConfig config.Failover - authoritativeStorage string - }{ - "sql election": { - failoverConfig: config.Failover{ - Enabled: true, - ElectionStrategy: config.ElectionStrategySQL, - }, - authoritativeStorage: "", - }, - "per repository election": { - failoverConfig: config.Failover{ - Enabled: true, - ElectionStrategy: config.ElectionStrategyPerRepository, - }, - authoritativeStorage: g1Cfg.Storages[0].Name, - }, - } - + authoritativeStorage := g1Cfg.Storages[0].Name logger := testhelper.NewDiscardingLogger(t) - for tn, tc := range testCases { - t.Run(tn, func(t *testing.T) { - addCmdConf := conf - addCmdConf.Failover = tc.failoverConfig - t.Run("ok", func(t *testing.T) { - nodeMgr, err := nodes.NewManager(testhelper.NewDiscardingLogEntry(t), addCmdConf, db.DB, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) + t.Run("ok", func(t *testing.T) { + testCases := []struct { + relativePath string + desc string + replicateImmediately bool + expectedOutput string + }{ + { + relativePath: "path/to/test/repo1", + desc: "force replication", + replicateImmediately: true, + expectedOutput: "Finished replicating repository to \"gitaly-2\".\n", + }, + { + relativePath: "path/to/test/repo2", + desc: "do not force replication", + replicateImmediately: false, + expectedOutput: "Added replication job to replicate repository to \"gitaly-2\".\n", + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + nodeMgr, err := nodes.NewManager( + testhelper.NewDiscardingLogEntry(t), + conf, + db.DB, + nil, + promtest.NewMockHistogramVec(), + protoregistry.GitalyProtoPreregistered, + nil, + nil, + nil, + ) require.NoError(t, err) nodeMgr.Start(0, time.Hour) defer nodeMgr.Stop() - relativePath := fmt.Sprintf("path/to/test/repo_%s", tn) repoDS := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) rmRepoCmd := &removeRepository{ logger: logger, virtualStorage: virtualStorageName, - relativePath: relativePath, + relativePath: tc.relativePath, w: io.Discard, apply: true, } - require.NoError(t, rmRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) // create the repo on Gitaly without Praefect knowing - require.NoError(t, createRepoThroughGitaly1(relativePath)) - require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath)) - require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath)) + require.NoError(t, createRepoThroughGitaly1(tc.relativePath)) + require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, tc.relativePath)) + require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, tc.relativePath)) + var stdout bytes.Buffer addRepoCmd := &trackRepository{ logger: logger, virtualStorage: virtualStorageName, - relativePath: relativePath, - authoritativeStorage: tc.authoritativeStorage, + relativePath: tc.relativePath, + authoritativeStorage: authoritativeStorage, + replicateImmediately: tc.replicateImmediately, + w: &stdout, } - require.NoError(t, addRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf)) + require.NoError(t, addRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) as := datastore.NewAssignmentStore(db, conf.StorageNames()) - repositoryID, err := repoDS.GetRepositoryID(ctx, virtualStorageName, relativePath) + repositoryID, err := repoDS.GetRepositoryID(ctx, virtualStorageName, tc.relativePath) require.NoError(t, err) assignments, err := as.GetHostAssignments(ctx, virtualStorageName, repositoryID) @@ -195,45 +211,67 @@ func TestAddRepository_Exec(t *testing.T) { assert.Contains(t, assignments, g1Cfg.Storages[0].Name) assert.Contains(t, assignments, g2Cfg.Storages[0].Name) - exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, relativePath) + exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, tc.relativePath) require.NoError(t, err) assert.True(t, exists) - }) - - t.Run("repository does not exist", func(t *testing.T) { - relativePath := fmt.Sprintf("path/to/test/repo_1_%s", tn) - - cmd := &trackRepository{ - logger: testhelper.NewDiscardingLogger(t), - virtualStorage: "praefect", - relativePath: relativePath, - authoritativeStorage: tc.authoritativeStorage, + assert.Contains(t, stdout.String(), tc.expectedOutput) + + if !tc.replicateImmediately { + queue := datastore.NewPostgresReplicationEventQueue(db) + events, err := queue.Dequeue(ctx, virtualStorageName, g2Cfg.Storages[0].Name, 1) + require.NoError(t, err) + assert.Len(t, events, 1) + assert.Equal(t, tc.relativePath, events[0].Job.RelativePath) } - - assert.ErrorIs(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf), errAuthoritativeRepositoryNotExist) }) + } + }) - t.Run("records already exist", func(t *testing.T) { - relativePath := fmt.Sprintf("path/to/test/repo_2_%s", tn) + t.Run("repository does not exist", func(t *testing.T) { + relativePath := "path/to/test/repo_1" - require.NoError(t, createRepoThroughGitaly1(relativePath)) - require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath)) - require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath)) + cmd := &trackRepository{ + w: &bytes.Buffer{}, + logger: testhelper.NewDiscardingLogger(t), + virtualStorage: "praefect", + relativePath: relativePath, + authoritativeStorage: authoritativeStorage, + } - ds := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - id, err := ds.ReserveRepositoryID(ctx, virtualStorageName, relativePath) - require.NoError(t, err) - require.NoError(t, ds.CreateRepository(ctx, id, virtualStorageName, relativePath, relativePath, g1Cfg.Storages[0].Name, nil, nil, true, true)) + assert.ErrorIs(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf), errAuthoritativeRepositoryNotExist) + }) - cmd := &trackRepository{ - logger: testhelper.NewDiscardingLogger(t), - virtualStorage: virtualStorageName, - relativePath: relativePath, - authoritativeStorage: tc.authoritativeStorage, - } + t.Run("records already exist", func(t *testing.T) { + relativePath := "path/to/test/repo_2" - assert.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf)) - }) - }) - } + require.NoError(t, createRepoThroughGitaly1(relativePath)) + require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath)) + require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath)) + + ds := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + id, err := ds.ReserveRepositoryID(ctx, virtualStorageName, relativePath) + require.NoError(t, err) + require.NoError(t, ds.CreateRepository( + ctx, + id, + virtualStorageName, + relativePath, + relativePath, + g1Cfg.Storages[0].Name, + nil, + nil, + true, + true, + )) + + cmd := &trackRepository{ + w: &bytes.Buffer{}, + logger: testhelper.NewDiscardingLogger(t), + virtualStorage: virtualStorageName, + relativePath: relativePath, + authoritativeStorage: authoritativeStorage, + } + + assert.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) + }) } |