Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2021-12-10 23:02:43 +0300
committerJohn Cai <jcai@gitlab.com>2021-12-11 00:33:27 +0300
commit81a41a856adc5628a57e49503ca1a5498c915671 (patch)
tree47a1fbccbe07805e1fd0a85a41503c82460bf2ef /cmd/praefect
parent34fd7dec6f48c74db80db6e04b25bb1d7be92c92 (diff)
cmd/praefect: create replication events
track-repository inserts database records for praefect to track the repository, but doesn't insert any replication jobs. When a mutator RPC comes through, it will trigger replication but we want to make sure that no matter what the reositories are replicated to its secondaries. Also refactors the tests to no longer test sql replication, as that is no longer supported. Changelog: fixed
Diffstat (limited to 'cmd/praefect')
-rw-r--r--cmd/praefect/subcmd_track_repository.go28
-rw-r--r--cmd/praefect/subcmd_track_repository_test.go211
2 files changed, 135 insertions, 104 deletions
diff --git a/cmd/praefect/subcmd_track_repository.go b/cmd/praefect/subcmd_track_repository.go
index 810df39a8..e55fe3bba 100644
--- a/cmd/praefect/subcmd_track_repository.go
+++ b/cmd/praefect/subcmd_track_repository.go
@@ -10,6 +10,7 @@ import (
"time"
"github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
@@ -137,9 +138,13 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger,
return fmt.Errorf("%s: %w", trackRepoErrorPrefix, errAuthoritativeRepositoryNotExist)
}
+ store := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames())
+ queue := datastore.NewPostgresReplicationEventQueue(db)
+
if err := cmd.trackRepository(
ctx,
- datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()),
+ store,
+ queue,
primary,
secondaries,
savePrimary,
@@ -156,6 +161,7 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger,
func (cmd *trackRepository) trackRepository(
ctx context.Context,
ds *datastore.PostgresRepositoryStore,
+ queue datastore.ReplicationEventQueue,
primary string,
secondaries []string,
savePrimary bool,
@@ -186,6 +192,26 @@ func (cmd *trackRepository) trackRepository(
return fmt.Errorf("CreateRepository: %w", err)
}
+ correlationID := correlation.SafeRandomID()
+ for _, secondary := range secondaries {
+ event := datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ RepositoryID: repositoryID,
+ Change: datastore.UpdateRepo,
+ RelativePath: cmd.relativePath,
+ VirtualStorage: cmd.virtualStorage,
+ SourceNodeStorage: primary,
+ TargetNodeStorage: secondary,
+ },
+ Meta: datastore.Params{metadatahandler.CorrelationIDKey: correlationID},
+ }
+
+ _, err := queue.Enqueue(ctx, event)
+ if err != nil {
+ return err
+ }
+ }
+
return nil
}
diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go
index 0e2667af3..be5377b4e 100644
--- a/cmd/praefect/subcmd_track_repository_test.go
+++ b/cmd/praefect/subcmd_track_repository_test.go
@@ -2,7 +2,6 @@ package main
import (
"flag"
- "fmt"
"io"
"path/filepath"
"testing"
@@ -102,6 +101,10 @@ func TestAddRepository_Exec(t *testing.T) {
},
},
DB: dbConf,
+ Failover: config.Failover{
+ Enabled: true,
+ ElectionStrategy: config.ElectionStrategyPerRepository,
+ },
}
gitalyCC, err := client.Dial(g1Addr, nil)
@@ -126,114 +129,116 @@ func TestAddRepository_Exec(t *testing.T) {
return err
}
- testCases := map[string]struct {
- failoverConfig config.Failover
- authoritativeStorage string
- }{
- "sql election": {
- failoverConfig: config.Failover{
- Enabled: true,
- ElectionStrategy: config.ElectionStrategySQL,
- },
- authoritativeStorage: "",
- },
- "per repository election": {
- failoverConfig: config.Failover{
- Enabled: true,
- ElectionStrategy: config.ElectionStrategyPerRepository,
- },
- authoritativeStorage: g1Cfg.Storages[0].Name,
- },
- }
-
+ authoritativeStorage := g1Cfg.Storages[0].Name
logger := testhelper.NewDiscardingLogger(t)
- for tn, tc := range testCases {
- t.Run(tn, func(t *testing.T) {
- addCmdConf := conf
- addCmdConf.Failover = tc.failoverConfig
-
- t.Run("ok", func(t *testing.T) {
- nodeMgr, err := nodes.NewManager(testhelper.NewDiscardingLogEntry(t), addCmdConf, db.DB, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
- require.NoError(t, err)
- nodeMgr.Start(0, time.Hour)
- defer nodeMgr.Stop()
-
- relativePath := fmt.Sprintf("path/to/test/repo_%s", tn)
- repoDS := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
-
- rmRepoCmd := &removeRepository{
- logger: logger,
- virtualStorage: virtualStorageName,
- relativePath: relativePath,
- w: io.Discard,
- apply: true,
- }
-
- require.NoError(t, rmRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
-
- // create the repo on Gitaly without Praefect knowing
- require.NoError(t, createRepoThroughGitaly1(relativePath))
- require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath))
- require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath))
-
- addRepoCmd := &trackRepository{
- logger: logger,
- virtualStorage: virtualStorageName,
- relativePath: relativePath,
- authoritativeStorage: tc.authoritativeStorage,
- }
-
- require.NoError(t, addRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf))
- as := datastore.NewAssignmentStore(db, conf.StorageNames())
-
- repositoryID, err := repoDS.GetRepositoryID(ctx, virtualStorageName, relativePath)
- require.NoError(t, err)
-
- assignments, err := as.GetHostAssignments(ctx, virtualStorageName, repositoryID)
- require.NoError(t, err)
- require.Len(t, assignments, 2)
- assert.Contains(t, assignments, g1Cfg.Storages[0].Name)
- assert.Contains(t, assignments, g2Cfg.Storages[0].Name)
-
- exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, relativePath)
- require.NoError(t, err)
- assert.True(t, exists)
- })
- t.Run("repository does not exist", func(t *testing.T) {
- relativePath := fmt.Sprintf("path/to/test/repo_1_%s", tn)
-
- cmd := &trackRepository{
- logger: testhelper.NewDiscardingLogger(t),
- virtualStorage: "praefect",
- relativePath: relativePath,
- authoritativeStorage: tc.authoritativeStorage,
- }
+ t.Run("ok", func(t *testing.T) {
+ nodeMgr, err := nodes.NewManager(
+ testhelper.NewDiscardingLogEntry(t),
+ conf,
+ db.DB,
+ nil,
+ promtest.NewMockHistogramVec(),
+ protoregistry.GitalyProtoPreregistered,
+ nil,
+ nil,
+ nil,
+ )
+ require.NoError(t, err)
+ nodeMgr.Start(0, time.Hour)
+ defer nodeMgr.Stop()
+
+ relativePath := "path/to/test/repo"
+ repoDS := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+
+ rmRepoCmd := &removeRepository{
+ logger: logger,
+ virtualStorage: virtualStorageName,
+ relativePath: relativePath,
+ w: io.Discard,
+ apply: true,
+ }
+
+ require.NoError(t, rmRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+
+ // create the repo on Gitaly without Praefect knowing
+ require.NoError(t, createRepoThroughGitaly1(relativePath))
+ require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath))
+ require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath))
+
+ addRepoCmd := &trackRepository{
+ logger: logger,
+ virtualStorage: virtualStorageName,
+ relativePath: relativePath,
+ authoritativeStorage: authoritativeStorage,
+ }
+
+ require.NoError(t, addRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+ as := datastore.NewAssignmentStore(db, conf.StorageNames())
+
+ repositoryID, err := repoDS.GetRepositoryID(ctx, virtualStorageName, relativePath)
+ require.NoError(t, err)
+
+ assignments, err := as.GetHostAssignments(ctx, virtualStorageName, repositoryID)
+ require.NoError(t, err)
+ require.Len(t, assignments, 2)
+ assert.Contains(t, assignments, g1Cfg.Storages[0].Name)
+ assert.Contains(t, assignments, g2Cfg.Storages[0].Name)
+
+ exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, relativePath)
+ require.NoError(t, err)
+ assert.True(t, exists)
+
+ queue := datastore.NewPostgresReplicationEventQueue(db)
+ events, err := queue.Dequeue(ctx, virtualStorageName, g2Cfg.Storages[0].Name, 1)
+ require.NoError(t, err)
+ assert.Len(t, events, 1)
+ assert.Equal(t, relativePath, events[0].Job.RelativePath)
+ })
- assert.ErrorIs(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf), errAuthoritativeRepositoryNotExist)
- })
+ t.Run("repository does not exist", func(t *testing.T) {
+ relativePath := "path/to/test/repo_1"
- t.Run("records already exist", func(t *testing.T) {
- relativePath := fmt.Sprintf("path/to/test/repo_2_%s", tn)
+ cmd := &trackRepository{
+ logger: testhelper.NewDiscardingLogger(t),
+ virtualStorage: "praefect",
+ relativePath: relativePath,
+ authoritativeStorage: authoritativeStorage,
+ }
- require.NoError(t, createRepoThroughGitaly1(relativePath))
- require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath))
- require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath))
+ assert.ErrorIs(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf), errAuthoritativeRepositoryNotExist)
+ })
- ds := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
- id, err := ds.ReserveRepositoryID(ctx, virtualStorageName, relativePath)
- require.NoError(t, err)
- require.NoError(t, ds.CreateRepository(ctx, id, virtualStorageName, relativePath, relativePath, g1Cfg.Storages[0].Name, nil, nil, true, true))
+ t.Run("records already exist", func(t *testing.T) {
+ relativePath := "path/to/test/repo_2"
- cmd := &trackRepository{
- logger: testhelper.NewDiscardingLogger(t),
- virtualStorage: virtualStorageName,
- relativePath: relativePath,
- authoritativeStorage: tc.authoritativeStorage,
- }
+ require.NoError(t, createRepoThroughGitaly1(relativePath))
+ require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath))
+ require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath))
- assert.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf))
- })
- })
- }
+ ds := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+ id, err := ds.ReserveRepositoryID(ctx, virtualStorageName, relativePath)
+ require.NoError(t, err)
+ require.NoError(t, ds.CreateRepository(
+ ctx,
+ id,
+ virtualStorageName,
+ relativePath,
+ relativePath,
+ g1Cfg.Storages[0].Name,
+ nil,
+ nil,
+ true,
+ true,
+ ))
+
+ cmd := &trackRepository{
+ logger: testhelper.NewDiscardingLogger(t),
+ virtualStorage: virtualStorageName,
+ relativePath: relativePath,
+ authoritativeStorage: authoritativeStorage,
+ }
+
+ assert.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+ })
}