diff options
-rw-r--r-- | cmd/praefect/subcmd_track_repository.go | 28 | ||||
-rw-r--r-- | cmd/praefect/subcmd_track_repository_test.go | 211 |
2 files changed, 135 insertions, 104 deletions
diff --git a/cmd/praefect/subcmd_track_repository.go b/cmd/praefect/subcmd_track_repository.go index 810df39a8..e55fe3bba 100644 --- a/cmd/praefect/subcmd_track_repository.go +++ b/cmd/praefect/subcmd_track_repository.go @@ -10,6 +10,7 @@ import ( "time" "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" @@ -137,9 +138,13 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, return fmt.Errorf("%s: %w", trackRepoErrorPrefix, errAuthoritativeRepositoryNotExist) } + store := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()) + queue := datastore.NewPostgresReplicationEventQueue(db) + if err := cmd.trackRepository( ctx, - datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()), + store, + queue, primary, secondaries, savePrimary, @@ -156,6 +161,7 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, func (cmd *trackRepository) trackRepository( ctx context.Context, ds *datastore.PostgresRepositoryStore, + queue datastore.ReplicationEventQueue, primary string, secondaries []string, savePrimary bool, @@ -186,6 +192,26 @@ func (cmd *trackRepository) trackRepository( return fmt.Errorf("CreateRepository: %w", err) } + correlationID := correlation.SafeRandomID() + for _, secondary := range secondaries { + event := datastore.ReplicationEvent{ + Job: datastore.ReplicationJob{ + RepositoryID: repositoryID, + Change: datastore.UpdateRepo, + RelativePath: cmd.relativePath, + VirtualStorage: cmd.virtualStorage, + SourceNodeStorage: primary, + TargetNodeStorage: secondary, + }, + Meta: datastore.Params{metadatahandler.CorrelationIDKey: correlationID}, + } + + _, err := queue.Enqueue(ctx, event) + if err != nil { + return err + } + } + return nil } diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go index 0e2667af3..be5377b4e 100644 --- a/cmd/praefect/subcmd_track_repository_test.go +++ b/cmd/praefect/subcmd_track_repository_test.go @@ -2,7 +2,6 @@ package main import ( "flag" - "fmt" "io" "path/filepath" "testing" @@ -102,6 +101,10 @@ func TestAddRepository_Exec(t *testing.T) { }, }, DB: dbConf, + Failover: config.Failover{ + Enabled: true, + ElectionStrategy: config.ElectionStrategyPerRepository, + }, } gitalyCC, err := client.Dial(g1Addr, nil) @@ -126,114 +129,116 @@ func TestAddRepository_Exec(t *testing.T) { return err } - testCases := map[string]struct { - failoverConfig config.Failover - authoritativeStorage string - }{ - "sql election": { - failoverConfig: config.Failover{ - Enabled: true, - ElectionStrategy: config.ElectionStrategySQL, - }, - authoritativeStorage: "", - }, - "per repository election": { - failoverConfig: config.Failover{ - Enabled: true, - ElectionStrategy: config.ElectionStrategyPerRepository, - }, - authoritativeStorage: g1Cfg.Storages[0].Name, - }, - } - + authoritativeStorage := g1Cfg.Storages[0].Name logger := testhelper.NewDiscardingLogger(t) - for tn, tc := range testCases { - t.Run(tn, func(t *testing.T) { - addCmdConf := conf - addCmdConf.Failover = tc.failoverConfig - - t.Run("ok", func(t *testing.T) { - nodeMgr, err := nodes.NewManager(testhelper.NewDiscardingLogEntry(t), addCmdConf, db.DB, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) - require.NoError(t, err) - nodeMgr.Start(0, time.Hour) - defer nodeMgr.Stop() - - relativePath := fmt.Sprintf("path/to/test/repo_%s", tn) - repoDS := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - - rmRepoCmd := &removeRepository{ - logger: logger, - virtualStorage: virtualStorageName, - relativePath: relativePath, - w: io.Discard, - apply: true, - } - - require.NoError(t, rmRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) - - // create the repo on Gitaly without Praefect knowing - require.NoError(t, createRepoThroughGitaly1(relativePath)) - require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath)) - require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath)) - - addRepoCmd := &trackRepository{ - logger: logger, - virtualStorage: virtualStorageName, - relativePath: relativePath, - authoritativeStorage: tc.authoritativeStorage, - } - - require.NoError(t, addRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf)) - as := datastore.NewAssignmentStore(db, conf.StorageNames()) - - repositoryID, err := repoDS.GetRepositoryID(ctx, virtualStorageName, relativePath) - require.NoError(t, err) - - assignments, err := as.GetHostAssignments(ctx, virtualStorageName, repositoryID) - require.NoError(t, err) - require.Len(t, assignments, 2) - assert.Contains(t, assignments, g1Cfg.Storages[0].Name) - assert.Contains(t, assignments, g2Cfg.Storages[0].Name) - - exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, relativePath) - require.NoError(t, err) - assert.True(t, exists) - }) - t.Run("repository does not exist", func(t *testing.T) { - relativePath := fmt.Sprintf("path/to/test/repo_1_%s", tn) - - cmd := &trackRepository{ - logger: testhelper.NewDiscardingLogger(t), - virtualStorage: "praefect", - relativePath: relativePath, - authoritativeStorage: tc.authoritativeStorage, - } + t.Run("ok", func(t *testing.T) { + nodeMgr, err := nodes.NewManager( + testhelper.NewDiscardingLogEntry(t), + conf, + db.DB, + nil, + promtest.NewMockHistogramVec(), + protoregistry.GitalyProtoPreregistered, + nil, + nil, + nil, + ) + require.NoError(t, err) + nodeMgr.Start(0, time.Hour) + defer nodeMgr.Stop() + + relativePath := "path/to/test/repo" + repoDS := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + + rmRepoCmd := &removeRepository{ + logger: logger, + virtualStorage: virtualStorageName, + relativePath: relativePath, + w: io.Discard, + apply: true, + } + + require.NoError(t, rmRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) + + // create the repo on Gitaly without Praefect knowing + require.NoError(t, createRepoThroughGitaly1(relativePath)) + require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath)) + require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath)) + + addRepoCmd := &trackRepository{ + logger: logger, + virtualStorage: virtualStorageName, + relativePath: relativePath, + authoritativeStorage: authoritativeStorage, + } + + require.NoError(t, addRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) + as := datastore.NewAssignmentStore(db, conf.StorageNames()) + + repositoryID, err := repoDS.GetRepositoryID(ctx, virtualStorageName, relativePath) + require.NoError(t, err) + + assignments, err := as.GetHostAssignments(ctx, virtualStorageName, repositoryID) + require.NoError(t, err) + require.Len(t, assignments, 2) + assert.Contains(t, assignments, g1Cfg.Storages[0].Name) + assert.Contains(t, assignments, g2Cfg.Storages[0].Name) + + exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, relativePath) + require.NoError(t, err) + assert.True(t, exists) + + queue := datastore.NewPostgresReplicationEventQueue(db) + events, err := queue.Dequeue(ctx, virtualStorageName, g2Cfg.Storages[0].Name, 1) + require.NoError(t, err) + assert.Len(t, events, 1) + assert.Equal(t, relativePath, events[0].Job.RelativePath) + }) - assert.ErrorIs(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf), errAuthoritativeRepositoryNotExist) - }) + t.Run("repository does not exist", func(t *testing.T) { + relativePath := "path/to/test/repo_1" - t.Run("records already exist", func(t *testing.T) { - relativePath := fmt.Sprintf("path/to/test/repo_2_%s", tn) + cmd := &trackRepository{ + logger: testhelper.NewDiscardingLogger(t), + virtualStorage: "praefect", + relativePath: relativePath, + authoritativeStorage: authoritativeStorage, + } - require.NoError(t, createRepoThroughGitaly1(relativePath)) - require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath)) - require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath)) + assert.ErrorIs(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf), errAuthoritativeRepositoryNotExist) + }) - ds := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - id, err := ds.ReserveRepositoryID(ctx, virtualStorageName, relativePath) - require.NoError(t, err) - require.NoError(t, ds.CreateRepository(ctx, id, virtualStorageName, relativePath, relativePath, g1Cfg.Storages[0].Name, nil, nil, true, true)) + t.Run("records already exist", func(t *testing.T) { + relativePath := "path/to/test/repo_2" - cmd := &trackRepository{ - logger: testhelper.NewDiscardingLogger(t), - virtualStorage: virtualStorageName, - relativePath: relativePath, - authoritativeStorage: tc.authoritativeStorage, - } + require.NoError(t, createRepoThroughGitaly1(relativePath)) + require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath)) + require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath)) - assert.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf)) - }) - }) - } + ds := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + id, err := ds.ReserveRepositoryID(ctx, virtualStorageName, relativePath) + require.NoError(t, err) + require.NoError(t, ds.CreateRepository( + ctx, + id, + virtualStorageName, + relativePath, + relativePath, + g1Cfg.Storages[0].Name, + nil, + nil, + true, + true, + )) + + cmd := &trackRepository{ + logger: testhelper.NewDiscardingLogger(t), + virtualStorage: virtualStorageName, + relativePath: relativePath, + authoritativeStorage: authoritativeStorage, + } + + assert.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) + }) } |