diff options
author | John Cai <jcai@gitlab.com> | 2021-12-10 23:02:43 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2021-12-11 00:33:27 +0300 |
commit | 81a41a856adc5628a57e49503ca1a5498c915671 (patch) | |
tree | 47a1fbccbe07805e1fd0a85a41503c82460bf2ef /cmd/praefect | |
parent | 34fd7dec6f48c74db80db6e04b25bb1d7be92c92 (diff) |
cmd/praefect: create replication events
track-repository inserts database records for praefect to track the
repository, but doesn't insert any replication jobs. When a mutator RPC
comes through, it will trigger replication but we want to make sure that
no matter what the reositories are replicated to its secondaries.
Also refactors the tests to no longer test sql replication, as that is
no longer supported.
Changelog: fixed
Diffstat (limited to 'cmd/praefect')
-rw-r--r-- | cmd/praefect/subcmd_track_repository.go | 28 | ||||
-rw-r--r-- | cmd/praefect/subcmd_track_repository_test.go | 211 |
2 files changed, 135 insertions, 104 deletions
diff --git a/cmd/praefect/subcmd_track_repository.go b/cmd/praefect/subcmd_track_repository.go index 810df39a8..e55fe3bba 100644 --- a/cmd/praefect/subcmd_track_repository.go +++ b/cmd/praefect/subcmd_track_repository.go @@ -10,6 +10,7 @@ import ( "time" "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" @@ -137,9 +138,13 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, return fmt.Errorf("%s: %w", trackRepoErrorPrefix, errAuthoritativeRepositoryNotExist) } + store := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()) + queue := datastore.NewPostgresReplicationEventQueue(db) + if err := cmd.trackRepository( ctx, - datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()), + store, + queue, primary, secondaries, savePrimary, @@ -156,6 +161,7 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger, func (cmd *trackRepository) trackRepository( ctx context.Context, ds *datastore.PostgresRepositoryStore, + queue datastore.ReplicationEventQueue, primary string, secondaries []string, savePrimary bool, @@ -186,6 +192,26 @@ func (cmd *trackRepository) trackRepository( return fmt.Errorf("CreateRepository: %w", err) } + correlationID := correlation.SafeRandomID() + for _, secondary := range secondaries { + event := datastore.ReplicationEvent{ + Job: datastore.ReplicationJob{ + RepositoryID: repositoryID, + Change: datastore.UpdateRepo, + RelativePath: cmd.relativePath, + VirtualStorage: cmd.virtualStorage, + SourceNodeStorage: primary, + TargetNodeStorage: secondary, + }, + Meta: datastore.Params{metadatahandler.CorrelationIDKey: correlationID}, + } + + _, err := queue.Enqueue(ctx, event) + if err != nil { + return err + } + } + return nil } diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go index 0e2667af3..be5377b4e 100644 --- a/cmd/praefect/subcmd_track_repository_test.go +++ b/cmd/praefect/subcmd_track_repository_test.go @@ -2,7 +2,6 @@ package main import ( "flag" - "fmt" "io" "path/filepath" "testing" @@ -102,6 +101,10 @@ func TestAddRepository_Exec(t *testing.T) { }, }, DB: dbConf, + Failover: config.Failover{ + Enabled: true, + ElectionStrategy: config.ElectionStrategyPerRepository, + }, } gitalyCC, err := client.Dial(g1Addr, nil) @@ -126,114 +129,116 @@ func TestAddRepository_Exec(t *testing.T) { return err } - testCases := map[string]struct { - failoverConfig config.Failover - authoritativeStorage string - }{ - "sql election": { - failoverConfig: config.Failover{ - Enabled: true, - ElectionStrategy: config.ElectionStrategySQL, - }, - authoritativeStorage: "", - }, - "per repository election": { - failoverConfig: config.Failover{ - Enabled: true, - ElectionStrategy: config.ElectionStrategyPerRepository, - }, - authoritativeStorage: g1Cfg.Storages[0].Name, - }, - } - + authoritativeStorage := g1Cfg.Storages[0].Name logger := testhelper.NewDiscardingLogger(t) - for tn, tc := range testCases { - t.Run(tn, func(t *testing.T) { - addCmdConf := conf - addCmdConf.Failover = tc.failoverConfig - - t.Run("ok", func(t *testing.T) { - nodeMgr, err := nodes.NewManager(testhelper.NewDiscardingLogEntry(t), addCmdConf, db.DB, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) - require.NoError(t, err) - nodeMgr.Start(0, time.Hour) - defer nodeMgr.Stop() - - relativePath := fmt.Sprintf("path/to/test/repo_%s", tn) - repoDS := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - - rmRepoCmd := &removeRepository{ - logger: logger, - virtualStorage: virtualStorageName, - relativePath: relativePath, - w: io.Discard, - apply: true, - } - - require.NoError(t, rmRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) - - // create the repo on Gitaly without Praefect knowing - require.NoError(t, createRepoThroughGitaly1(relativePath)) - require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath)) - require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath)) - - addRepoCmd := &trackRepository{ - logger: logger, - virtualStorage: virtualStorageName, - relativePath: relativePath, - authoritativeStorage: tc.authoritativeStorage, - } - - require.NoError(t, addRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf)) - as := datastore.NewAssignmentStore(db, conf.StorageNames()) - - repositoryID, err := repoDS.GetRepositoryID(ctx, virtualStorageName, relativePath) - require.NoError(t, err) - - assignments, err := as.GetHostAssignments(ctx, virtualStorageName, repositoryID) - require.NoError(t, err) - require.Len(t, assignments, 2) - assert.Contains(t, assignments, g1Cfg.Storages[0].Name) - assert.Contains(t, assignments, g2Cfg.Storages[0].Name) - - exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, relativePath) - require.NoError(t, err) - assert.True(t, exists) - }) - t.Run("repository does not exist", func(t *testing.T) { - relativePath := fmt.Sprintf("path/to/test/repo_1_%s", tn) - - cmd := &trackRepository{ - logger: testhelper.NewDiscardingLogger(t), - virtualStorage: "praefect", - relativePath: relativePath, - authoritativeStorage: tc.authoritativeStorage, - } + t.Run("ok", func(t *testing.T) { + nodeMgr, err := nodes.NewManager( + testhelper.NewDiscardingLogEntry(t), + conf, + db.DB, + nil, + promtest.NewMockHistogramVec(), + protoregistry.GitalyProtoPreregistered, + nil, + nil, + nil, + ) + require.NoError(t, err) + nodeMgr.Start(0, time.Hour) + defer nodeMgr.Stop() + + relativePath := "path/to/test/repo" + repoDS := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + + rmRepoCmd := &removeRepository{ + logger: logger, + virtualStorage: virtualStorageName, + relativePath: relativePath, + w: io.Discard, + apply: true, + } + + require.NoError(t, rmRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) + + // create the repo on Gitaly without Praefect knowing + require.NoError(t, createRepoThroughGitaly1(relativePath)) + require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath)) + require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath)) + + addRepoCmd := &trackRepository{ + logger: logger, + virtualStorage: virtualStorageName, + relativePath: relativePath, + authoritativeStorage: authoritativeStorage, + } + + require.NoError(t, addRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) + as := datastore.NewAssignmentStore(db, conf.StorageNames()) + + repositoryID, err := repoDS.GetRepositoryID(ctx, virtualStorageName, relativePath) + require.NoError(t, err) + + assignments, err := as.GetHostAssignments(ctx, virtualStorageName, repositoryID) + require.NoError(t, err) + require.Len(t, assignments, 2) + assert.Contains(t, assignments, g1Cfg.Storages[0].Name) + assert.Contains(t, assignments, g2Cfg.Storages[0].Name) + + exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, relativePath) + require.NoError(t, err) + assert.True(t, exists) + + queue := datastore.NewPostgresReplicationEventQueue(db) + events, err := queue.Dequeue(ctx, virtualStorageName, g2Cfg.Storages[0].Name, 1) + require.NoError(t, err) + assert.Len(t, events, 1) + assert.Equal(t, relativePath, events[0].Job.RelativePath) + }) - assert.ErrorIs(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf), errAuthoritativeRepositoryNotExist) - }) + t.Run("repository does not exist", func(t *testing.T) { + relativePath := "path/to/test/repo_1" - t.Run("records already exist", func(t *testing.T) { - relativePath := fmt.Sprintf("path/to/test/repo_2_%s", tn) + cmd := &trackRepository{ + logger: testhelper.NewDiscardingLogger(t), + virtualStorage: "praefect", + relativePath: relativePath, + authoritativeStorage: authoritativeStorage, + } - require.NoError(t, createRepoThroughGitaly1(relativePath)) - require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath)) - require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath)) + assert.ErrorIs(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf), errAuthoritativeRepositoryNotExist) + }) - ds := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - id, err := ds.ReserveRepositoryID(ctx, virtualStorageName, relativePath) - require.NoError(t, err) - require.NoError(t, ds.CreateRepository(ctx, id, virtualStorageName, relativePath, relativePath, g1Cfg.Storages[0].Name, nil, nil, true, true)) + t.Run("records already exist", func(t *testing.T) { + relativePath := "path/to/test/repo_2" - cmd := &trackRepository{ - logger: testhelper.NewDiscardingLogger(t), - virtualStorage: virtualStorageName, - relativePath: relativePath, - authoritativeStorage: tc.authoritativeStorage, - } + require.NoError(t, createRepoThroughGitaly1(relativePath)) + require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath)) + require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath)) - assert.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf)) - }) - }) - } + ds := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + id, err := ds.ReserveRepositoryID(ctx, virtualStorageName, relativePath) + require.NoError(t, err) + require.NoError(t, ds.CreateRepository( + ctx, + id, + virtualStorageName, + relativePath, + relativePath, + g1Cfg.Storages[0].Name, + nil, + nil, + true, + true, + )) + + cmd := &trackRepository{ + logger: testhelper.NewDiscardingLogger(t), + virtualStorage: virtualStorageName, + relativePath: relativePath, + authoritativeStorage: authoritativeStorage, + } + + assert.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) + }) } |