From 81a41a856adc5628a57e49503ca1a5498c915671 Mon Sep 17 00:00:00 2001 From: John Cai Date: Fri, 10 Dec 2021 15:02:43 -0500 Subject: cmd/praefect: create replication events track-repository inserts database records for praefect to track the repository, but doesn't insert any replication jobs. When a mutator RPC comes through, it will trigger replication but we want to make sure that no matter what the reositories are replicated to its secondaries. Also refactors the tests to no longer test sql replication, as that is no longer supported. Changelog: fixed --- cmd/praefect/subcmd_track_repository.go | 28 +++- cmd/praefect/subcmd_track_repository_test.go | 211 ++++++++++++++------------- 2 files changed, 135 insertions(+), 104 deletions(-) (limited to 'cmd/praefect') diff --git a/cmd/praefect/subcmd_track_repository.go b/cmd/praefect/subcmd_track_repository.go index 810df39a8..e55fe3bba 100644 --- a/cmd/praefect/subcmd_track_repository.go +++ b/cmd/praefect/subcmd_track_repository.go @@ -10,6 +10,7 @@ import ( "time" "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" @@ -137,9 +138,13 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, return fmt.Errorf("%s: %w", trackRepoErrorPrefix, errAuthoritativeRepositoryNotExist) } + store := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()) + queue := datastore.NewPostgresReplicationEventQueue(db) + if err := cmd.trackRepository( ctx, - datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()), + store, + queue, primary, secondaries, savePrimary, @@ -156,6 +161,7 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, func (cmd *trackRepository) trackRepository( ctx context.Context, ds *datastore.PostgresRepositoryStore, + queue datastore.ReplicationEventQueue, primary string, secondaries []string, savePrimary bool, @@ -186,6 +192,26 @@ func (cmd *trackRepository) trackRepository( return fmt.Errorf("CreateRepository: %w", err) } + correlationID := correlation.SafeRandomID() + for _, secondary := range secondaries { + event := datastore.ReplicationEvent{ + Job: datastore.ReplicationJob{ + RepositoryID: repositoryID, + Change: datastore.UpdateRepo, + RelativePath: cmd.relativePath, + VirtualStorage: cmd.virtualStorage, + SourceNodeStorage: primary, + TargetNodeStorage: secondary, + }, + Meta: datastore.Params{metadatahandler.CorrelationIDKey: correlationID}, + } + + _, err := queue.Enqueue(ctx, event) + if err != nil { + return err + } + } + return nil } diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go index 0e2667af3..be5377b4e 100644 --- a/cmd/praefect/subcmd_track_repository_test.go +++ b/cmd/praefect/subcmd_track_repository_test.go @@ -2,7 +2,6 @@ package main import ( "flag" - "fmt" "io" "path/filepath" "testing" @@ -102,6 +101,10 @@ func TestAddRepository_Exec(t *testing.T) { }, }, DB: dbConf, + Failover: config.Failover{ + Enabled: true, + ElectionStrategy: config.ElectionStrategyPerRepository, + }, } gitalyCC, err := client.Dial(g1Addr, nil) @@ -126,114 +129,116 @@ func TestAddRepository_Exec(t *testing.T) { return err } - testCases := map[string]struct { - failoverConfig config.Failover - authoritativeStorage string - }{ - "sql election": { - failoverConfig: config.Failover{ - Enabled: true, - ElectionStrategy: config.ElectionStrategySQL, - }, - authoritativeStorage: "", - }, - "per repository election": { - failoverConfig: config.Failover{ - Enabled: true, - ElectionStrategy: config.ElectionStrategyPerRepository, - }, - authoritativeStorage: g1Cfg.Storages[0].Name, - }, - } - + authoritativeStorage := g1Cfg.Storages[0].Name logger := testhelper.NewDiscardingLogger(t) - for tn, tc := range testCases { - t.Run(tn, func(t *testing.T) { - addCmdConf := conf - addCmdConf.Failover = tc.failoverConfig - - t.Run("ok", func(t *testing.T) { - nodeMgr, err := nodes.NewManager(testhelper.NewDiscardingLogEntry(t), addCmdConf, db.DB, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) - require.NoError(t, err) - nodeMgr.Start(0, time.Hour) - defer nodeMgr.Stop() - - relativePath := fmt.Sprintf("path/to/test/repo_%s", tn) - repoDS := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - - rmRepoCmd := &removeRepository{ - logger: logger, - virtualStorage: virtualStorageName, - relativePath: relativePath, - w: io.Discard, - apply: true, - } - - require.NoError(t, rmRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) - - // create the repo on Gitaly without Praefect knowing - require.NoError(t, createRepoThroughGitaly1(relativePath)) - require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath)) - require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath)) - - addRepoCmd := &trackRepository{ - logger: logger, - virtualStorage: virtualStorageName, - relativePath: relativePath, - authoritativeStorage: tc.authoritativeStorage, - } - - require.NoError(t, addRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf)) - as := datastore.NewAssignmentStore(db, conf.StorageNames()) - - repositoryID, err := repoDS.GetRepositoryID(ctx, virtualStorageName, relativePath) - require.NoError(t, err) - - assignments, err := as.GetHostAssignments(ctx, virtualStorageName, repositoryID) - require.NoError(t, err) - require.Len(t, assignments, 2) - assert.Contains(t, assignments, g1Cfg.Storages[0].Name) - assert.Contains(t, assignments, g2Cfg.Storages[0].Name) - - exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, relativePath) - require.NoError(t, err) - assert.True(t, exists) - }) - t.Run("repository does not exist", func(t *testing.T) { - relativePath := fmt.Sprintf("path/to/test/repo_1_%s", tn) - - cmd := &trackRepository{ - logger: testhelper.NewDiscardingLogger(t), - virtualStorage: "praefect", - relativePath: relativePath, - authoritativeStorage: tc.authoritativeStorage, - } + t.Run("ok", func(t *testing.T) { + nodeMgr, err := nodes.NewManager( + testhelper.NewDiscardingLogEntry(t), + conf, + db.DB, + nil, + promtest.NewMockHistogramVec(), + protoregistry.GitalyProtoPreregistered, + nil, + nil, + nil, + ) + require.NoError(t, err) + nodeMgr.Start(0, time.Hour) + defer nodeMgr.Stop() + + relativePath := "path/to/test/repo" + repoDS := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + + rmRepoCmd := &removeRepository{ + logger: logger, + virtualStorage: virtualStorageName, + relativePath: relativePath, + w: io.Discard, + apply: true, + } + + require.NoError(t, rmRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) + + // create the repo on Gitaly without Praefect knowing + require.NoError(t, createRepoThroughGitaly1(relativePath)) + require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath)) + require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath)) + + addRepoCmd := &trackRepository{ + logger: logger, + virtualStorage: virtualStorageName, + relativePath: relativePath, + authoritativeStorage: authoritativeStorage, + } + + require.NoError(t, addRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) + as := datastore.NewAssignmentStore(db, conf.StorageNames()) + + repositoryID, err := repoDS.GetRepositoryID(ctx, virtualStorageName, relativePath) + require.NoError(t, err) + + assignments, err := as.GetHostAssignments(ctx, virtualStorageName, repositoryID) + require.NoError(t, err) + require.Len(t, assignments, 2) + assert.Contains(t, assignments, g1Cfg.Storages[0].Name) + assert.Contains(t, assignments, g2Cfg.Storages[0].Name) + + exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, relativePath) + require.NoError(t, err) + assert.True(t, exists) + + queue := datastore.NewPostgresReplicationEventQueue(db) + events, err := queue.Dequeue(ctx, virtualStorageName, g2Cfg.Storages[0].Name, 1) + require.NoError(t, err) + assert.Len(t, events, 1) + assert.Equal(t, relativePath, events[0].Job.RelativePath) + }) - assert.ErrorIs(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf), errAuthoritativeRepositoryNotExist) - }) + t.Run("repository does not exist", func(t *testing.T) { + relativePath := "path/to/test/repo_1" - t.Run("records already exist", func(t *testing.T) { - relativePath := fmt.Sprintf("path/to/test/repo_2_%s", tn) + cmd := &trackRepository{ + logger: testhelper.NewDiscardingLogger(t), + virtualStorage: "praefect", + relativePath: relativePath, + authoritativeStorage: authoritativeStorage, + } - require.NoError(t, createRepoThroughGitaly1(relativePath)) - require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath)) - require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath)) + assert.ErrorIs(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf), errAuthoritativeRepositoryNotExist) + }) - ds := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - id, err := ds.ReserveRepositoryID(ctx, virtualStorageName, relativePath) - require.NoError(t, err) - require.NoError(t, ds.CreateRepository(ctx, id, virtualStorageName, relativePath, relativePath, g1Cfg.Storages[0].Name, nil, nil, true, true)) + t.Run("records already exist", func(t *testing.T) { + relativePath := "path/to/test/repo_2" - cmd := &trackRepository{ - logger: testhelper.NewDiscardingLogger(t), - virtualStorage: virtualStorageName, - relativePath: relativePath, - authoritativeStorage: tc.authoritativeStorage, - } + require.NoError(t, createRepoThroughGitaly1(relativePath)) + require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath)) + require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath)) - assert.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf)) - }) - }) - } + ds := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + id, err := ds.ReserveRepositoryID(ctx, virtualStorageName, relativePath) + require.NoError(t, err) + require.NoError(t, ds.CreateRepository( + ctx, + id, + virtualStorageName, + relativePath, + relativePath, + g1Cfg.Storages[0].Name, + nil, + nil, + true, + true, + )) + + cmd := &trackRepository{ + logger: testhelper.NewDiscardingLogger(t), + virtualStorage: virtualStorageName, + relativePath: relativePath, + authoritativeStorage: authoritativeStorage, + } + + assert.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) + }) } -- cgit v1.2.3