Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'cmd/praefect')
-rw-r--r--cmd/praefect/subcmd_track_repository.go28
-rw-r--r--cmd/praefect/subcmd_track_repository_test.go211
2 files changed, 135 insertions, 104 deletions
diff --git a/cmd/praefect/subcmd_track_repository.go b/cmd/praefect/subcmd_track_repository.go
index 810df39a8..e55fe3bba 100644
--- a/cmd/praefect/subcmd_track_repository.go
+++ b/cmd/praefect/subcmd_track_repository.go
@@ -10,6 +10,7 @@ import (
"time"
"github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
@@ -137,9 +138,13 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger,
return fmt.Errorf("%s: %w", trackRepoErrorPrefix, errAuthoritativeRepositoryNotExist)
}
+ store := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames())
+ queue := datastore.NewPostgresReplicationEventQueue(db)
+
if err := cmd.trackRepository(
ctx,
- datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()),
+ store,
+ queue,
primary,
secondaries,
savePrimary,
@@ -156,6 +161,7 @@ func (cmd *trackRepository) exec(ctx context.Context, logger logrus.FieldLogger,
func (cmd *trackRepository) trackRepository(
ctx context.Context,
ds *datastore.PostgresRepositoryStore,
+ queue datastore.ReplicationEventQueue,
primary string,
secondaries []string,
savePrimary bool,
@@ -186,6 +192,26 @@ func (cmd *trackRepository) trackRepository(
return fmt.Errorf("CreateRepository: %w", err)
}
+ correlationID := correlation.SafeRandomID()
+ for _, secondary := range secondaries {
+ event := datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ RepositoryID: repositoryID,
+ Change: datastore.UpdateRepo,
+ RelativePath: cmd.relativePath,
+ VirtualStorage: cmd.virtualStorage,
+ SourceNodeStorage: primary,
+ TargetNodeStorage: secondary,
+ },
+ Meta: datastore.Params{metadatahandler.CorrelationIDKey: correlationID},
+ }
+
+ _, err := queue.Enqueue(ctx, event)
+ if err != nil {
+ return err
+ }
+ }
+
return nil
}
diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go
index 0e2667af3..be5377b4e 100644
--- a/cmd/praefect/subcmd_track_repository_test.go
+++ b/cmd/praefect/subcmd_track_repository_test.go
@@ -2,7 +2,6 @@ package main
import (
"flag"
- "fmt"
"io"
"path/filepath"
"testing"
@@ -102,6 +101,10 @@ func TestAddRepository_Exec(t *testing.T) {
},
},
DB: dbConf,
+ Failover: config.Failover{
+ Enabled: true,
+ ElectionStrategy: config.ElectionStrategyPerRepository,
+ },
}
gitalyCC, err := client.Dial(g1Addr, nil)
@@ -126,114 +129,116 @@ func TestAddRepository_Exec(t *testing.T) {
return err
}
- testCases := map[string]struct {
- failoverConfig config.Failover
- authoritativeStorage string
- }{
- "sql election": {
- failoverConfig: config.Failover{
- Enabled: true,
- ElectionStrategy: config.ElectionStrategySQL,
- },
- authoritativeStorage: "",
- },
- "per repository election": {
- failoverConfig: config.Failover{
- Enabled: true,
- ElectionStrategy: config.ElectionStrategyPerRepository,
- },
- authoritativeStorage: g1Cfg.Storages[0].Name,
- },
- }
-
+ authoritativeStorage := g1Cfg.Storages[0].Name
logger := testhelper.NewDiscardingLogger(t)
- for tn, tc := range testCases {
- t.Run(tn, func(t *testing.T) {
- addCmdConf := conf
- addCmdConf.Failover = tc.failoverConfig
-
- t.Run("ok", func(t *testing.T) {
- nodeMgr, err := nodes.NewManager(testhelper.NewDiscardingLogEntry(t), addCmdConf, db.DB, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
- require.NoError(t, err)
- nodeMgr.Start(0, time.Hour)
- defer nodeMgr.Stop()
-
- relativePath := fmt.Sprintf("path/to/test/repo_%s", tn)
- repoDS := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
-
- rmRepoCmd := &removeRepository{
- logger: logger,
- virtualStorage: virtualStorageName,
- relativePath: relativePath,
- w: io.Discard,
- apply: true,
- }
-
- require.NoError(t, rmRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
-
- // create the repo on Gitaly without Praefect knowing
- require.NoError(t, createRepoThroughGitaly1(relativePath))
- require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath))
- require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath))
-
- addRepoCmd := &trackRepository{
- logger: logger,
- virtualStorage: virtualStorageName,
- relativePath: relativePath,
- authoritativeStorage: tc.authoritativeStorage,
- }
-
- require.NoError(t, addRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf))
- as := datastore.NewAssignmentStore(db, conf.StorageNames())
-
- repositoryID, err := repoDS.GetRepositoryID(ctx, virtualStorageName, relativePath)
- require.NoError(t, err)
-
- assignments, err := as.GetHostAssignments(ctx, virtualStorageName, repositoryID)
- require.NoError(t, err)
- require.Len(t, assignments, 2)
- assert.Contains(t, assignments, g1Cfg.Storages[0].Name)
- assert.Contains(t, assignments, g2Cfg.Storages[0].Name)
-
- exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, relativePath)
- require.NoError(t, err)
- assert.True(t, exists)
- })
- t.Run("repository does not exist", func(t *testing.T) {
- relativePath := fmt.Sprintf("path/to/test/repo_1_%s", tn)
-
- cmd := &trackRepository{
- logger: testhelper.NewDiscardingLogger(t),
- virtualStorage: "praefect",
- relativePath: relativePath,
- authoritativeStorage: tc.authoritativeStorage,
- }
+ t.Run("ok", func(t *testing.T) {
+ nodeMgr, err := nodes.NewManager(
+ testhelper.NewDiscardingLogEntry(t),
+ conf,
+ db.DB,
+ nil,
+ promtest.NewMockHistogramVec(),
+ protoregistry.GitalyProtoPreregistered,
+ nil,
+ nil,
+ nil,
+ )
+ require.NoError(t, err)
+ nodeMgr.Start(0, time.Hour)
+ defer nodeMgr.Stop()
+
+ relativePath := "path/to/test/repo"
+ repoDS := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+
+ rmRepoCmd := &removeRepository{
+ logger: logger,
+ virtualStorage: virtualStorageName,
+ relativePath: relativePath,
+ w: io.Discard,
+ apply: true,
+ }
+
+ require.NoError(t, rmRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+
+ // create the repo on Gitaly without Praefect knowing
+ require.NoError(t, createRepoThroughGitaly1(relativePath))
+ require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath))
+ require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath))
+
+ addRepoCmd := &trackRepository{
+ logger: logger,
+ virtualStorage: virtualStorageName,
+ relativePath: relativePath,
+ authoritativeStorage: authoritativeStorage,
+ }
+
+ require.NoError(t, addRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+ as := datastore.NewAssignmentStore(db, conf.StorageNames())
+
+ repositoryID, err := repoDS.GetRepositoryID(ctx, virtualStorageName, relativePath)
+ require.NoError(t, err)
+
+ assignments, err := as.GetHostAssignments(ctx, virtualStorageName, repositoryID)
+ require.NoError(t, err)
+ require.Len(t, assignments, 2)
+ assert.Contains(t, assignments, g1Cfg.Storages[0].Name)
+ assert.Contains(t, assignments, g2Cfg.Storages[0].Name)
+
+ exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, relativePath)
+ require.NoError(t, err)
+ assert.True(t, exists)
+
+ queue := datastore.NewPostgresReplicationEventQueue(db)
+ events, err := queue.Dequeue(ctx, virtualStorageName, g2Cfg.Storages[0].Name, 1)
+ require.NoError(t, err)
+ assert.Len(t, events, 1)
+ assert.Equal(t, relativePath, events[0].Job.RelativePath)
+ })
- assert.ErrorIs(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf), errAuthoritativeRepositoryNotExist)
- })
+ t.Run("repository does not exist", func(t *testing.T) {
+ relativePath := "path/to/test/repo_1"
- t.Run("records already exist", func(t *testing.T) {
- relativePath := fmt.Sprintf("path/to/test/repo_2_%s", tn)
+ cmd := &trackRepository{
+ logger: testhelper.NewDiscardingLogger(t),
+ virtualStorage: "praefect",
+ relativePath: relativePath,
+ authoritativeStorage: authoritativeStorage,
+ }
- require.NoError(t, createRepoThroughGitaly1(relativePath))
- require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath))
- require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath))
+ assert.ErrorIs(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf), errAuthoritativeRepositoryNotExist)
+ })
- ds := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
- id, err := ds.ReserveRepositoryID(ctx, virtualStorageName, relativePath)
- require.NoError(t, err)
- require.NoError(t, ds.CreateRepository(ctx, id, virtualStorageName, relativePath, relativePath, g1Cfg.Storages[0].Name, nil, nil, true, true))
+ t.Run("records already exist", func(t *testing.T) {
+ relativePath := "path/to/test/repo_2"
- cmd := &trackRepository{
- logger: testhelper.NewDiscardingLogger(t),
- virtualStorage: virtualStorageName,
- relativePath: relativePath,
- authoritativeStorage: tc.authoritativeStorage,
- }
+ require.NoError(t, createRepoThroughGitaly1(relativePath))
+ require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, relativePath))
+ require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, relativePath))
- assert.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), addCmdConf))
- })
- })
- }
+ ds := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+ id, err := ds.ReserveRepositoryID(ctx, virtualStorageName, relativePath)
+ require.NoError(t, err)
+ require.NoError(t, ds.CreateRepository(
+ ctx,
+ id,
+ virtualStorageName,
+ relativePath,
+ relativePath,
+ g1Cfg.Storages[0].Name,
+ nil,
+ nil,
+ true,
+ true,
+ ))
+
+ cmd := &trackRepository{
+ logger: testhelper.NewDiscardingLogger(t),
+ virtualStorage: virtualStorageName,
+ relativePath: relativePath,
+ authoritativeStorage: authoritativeStorage,
+ }
+
+ assert.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+ })
}