Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWill Chandler <wchandler@gitlab.com>2023-11-10 17:35:10 +0300
committerWill Chandler <wchandler@gitlab.com>2023-11-10 17:35:10 +0300
commitccd0e9dea30d69a7746b0d7243dc94b6e699fe47 (patch)
tree6e4dec63f52702e0268a8e40f7879a78a72be13a
parentdd40a6fb11606e47078851e7cde4b4715b054211 (diff)
parentd33d508716f3f212a52a0266f81e9070513401eb (diff)
Merge branch 'jt-praefect-replicator-pool-disconnect' into 'master'
praefect: Support object pool disconnection in Praefect replicator See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6512 Merged-by: Will Chandler <wchandler@gitlab.com> Approved-by: Sami Hiltunen <shiltunen@gitlab.com> Approved-by: Will Chandler <wchandler@gitlab.com> Reviewed-by: Sami Hiltunen <shiltunen@gitlab.com> Co-authored-by: Justin Tobler <jtobler@gitlab.com>
-rw-r--r--internal/praefect/replicator.go15
-rw-r--r--internal/praefect/replicator_test.go175
2 files changed, 188 insertions, 2 deletions
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 42b3eebee..b19d17481 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -102,8 +102,19 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli
sourceObjectPool := resp.GetObjectPool()
- if sourceObjectPool != nil {
- targetObjectPoolClient := gitalypb.NewObjectPoolServiceClient(targetCC)
+ targetObjectPoolClient := gitalypb.NewObjectPoolServiceClient(targetCC)
+
+ if sourceObjectPool == nil {
+ // If the source repository is not linked to an object pool, the target repository
+ // should also not be linked to any object pool to ensure consistency.
+ if _, err := targetObjectPoolClient.DisconnectGitAlternates(ctx, &gitalypb.DisconnectGitAlternatesRequest{
+ Repository: targetRepository,
+ }); err != nil {
+ return err
+ }
+ } else {
+ // If the source repository is linked to an object pool, the target repository
+ // should link to the same object pool.
targetObjectPool := proto.Clone(sourceObjectPool).(*gitalypb.ObjectPool)
targetObjectPool.GetRepository().StorageName = targetRepository.GetStorageName()
if _, err := targetObjectPoolClient.LinkRepositoryToObjectPool(ctx, &gitalypb.LinkRepositoryToObjectPoolRequest{
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 9b1e0651e..369064ee6 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -18,6 +18,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
gitalycfg "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
@@ -200,6 +201,180 @@ gitaly_praefect_replication_jobs{change_type="update",gitaly_storage="backup",vi
`)))
}
+func TestDefaultReplicator_Replicate(t *testing.T) {
+ t.Parallel()
+ testhelper.NewFeatureSets(featureflag.ReplicateRepositoryObjectPool).Run(t, testDefaultReplicatorReplicate)
+}
+
+func testDefaultReplicatorReplicate(t *testing.T, ctx context.Context) {
+ t.Parallel()
+
+ newGitalyConn := func(t *testing.T, config gitalycfg.Cfg) *grpc.ClientConn {
+ t.Helper()
+
+ conn, err := grpc.Dial(
+ config.SocketPath,
+ grpc.WithTransportCredentials(insecure.NewCredentials()),
+ grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(config.Auth.Token)),
+ )
+ require.NoError(t, err)
+ t.Cleanup(func() { require.NoError(t, conn.Close()) })
+
+ return conn
+ }
+
+ const sourceStorage = "source"
+ const targetStorage = "target"
+
+ sourceCfg := testcfg.Build(t, testcfg.WithStorages(sourceStorage))
+ targetCfg := testcfg.Build(t, testcfg.WithStorages(targetStorage))
+
+ // Set up source and target Gitaly servers to simulate the Praefect replicator performing a
+ // replication job.
+ sourceAddr := testserver.RunGitalyServer(t, sourceCfg, setup.RegisterAll, testserver.WithDisablePraefect())
+ targetAddr := testserver.RunGitalyServer(t, targetCfg, setup.RegisterAll, testserver.WithDisablePraefect())
+
+ sourceCfg.SocketPath = sourceAddr
+ targetCfg.SocketPath = targetAddr
+
+ sourceConn := newGitalyConn(t, sourceCfg)
+ targetConn := newGitalyConn(t, targetCfg)
+
+ testcfg.BuildGitalySSH(t, targetCfg)
+
+ // For the repository to be replicated from the source Gitaly, the target Gitaly must know
+ // how to connect to the source. This is done by injecting Gitaly server information into
+ // the context.
+ ctx, err := storage.InjectGitalyServers(ctx, sourceStorage, sourceAddr, sourceCfg.Auth.Token)
+ require.NoError(t, err)
+
+ // Configure repository store operations to function as no-ops.
+ repoStore := datastore.MockRepositoryStore{
+ GetReplicatedGenerationFunc: func(context.Context, int64, string, string) (int, error) {
+ return 0, nil
+ },
+ SetGenerationFunc: func(context.Context, int64, string, string, int) error {
+ return nil
+ },
+ }
+
+ type setupData struct {
+ job datastore.ReplicationJob
+ expectedPool *gitalypb.ObjectPool
+ }
+
+ for _, tc := range []struct {
+ desc string
+ setup func(t *testing.T) setupData
+ }{
+ {
+ desc: "source does not have alternate link",
+ setup: func(t *testing.T) setupData {
+ // Create repository on source Gitaly that does not have a link to
+ // an object pool.
+ sourceRepo, _ := gittest.CreateRepository(t, ctx, sourceCfg)
+ return setupData{
+ job: datastore.ReplicationJob{
+ ReplicaPath: sourceRepo.RelativePath,
+ TargetNodeStorage: targetStorage,
+ SourceNodeStorage: sourceStorage,
+ },
+ expectedPool: nil,
+ }
+ },
+ },
+ {
+ desc: "source alternate link replicated to target",
+ setup: func(t *testing.T) setupData {
+ // Create repository on source Gitaly that is linked to object pool.
+ sourceRepo, _ := gittest.CreateRepository(t, ctx, sourceCfg)
+ sourcePool, _ := gittest.CreateObjectPool(t, ctx, sourceCfg, sourceRepo, gittest.CreateObjectPoolConfig{
+ LinkRepositoryToObjectPool: true,
+ })
+
+ // Create object pool on target Gitaly with the same relative path
+ // as the source object pool.
+ gittest.CreateRepository(t, ctx, targetCfg, gittest.CreateRepositoryConfig{
+ RelativePath: sourcePool.Repository.RelativePath,
+ })
+
+ return setupData{
+ job: datastore.ReplicationJob{
+ ReplicaPath: sourceRepo.RelativePath,
+ TargetNodeStorage: targetStorage,
+ SourceNodeStorage: sourceStorage,
+ },
+ expectedPool: &gitalypb.ObjectPool{
+ Repository: &gitalypb.Repository{
+ StorageName: targetStorage,
+ RelativePath: sourcePool.Repository.RelativePath,
+ },
+ },
+ }
+ },
+ },
+ {
+ desc: "target disconnected from alternate to match source",
+ setup: func(t *testing.T) setupData {
+ // Create repository on source Gitaly that does not have a link to
+ // an object pool.
+ sourceRepo, _ := gittest.CreateRepository(t, ctx, sourceCfg)
+
+ // Create repository on target Gitaly with the same relative path as
+ // the source repository and link it to an object pool.
+ targetRepo, _ := gittest.CreateRepository(t, ctx, targetCfg, gittest.CreateRepositoryConfig{
+ RelativePath: sourceRepo.RelativePath,
+ })
+ gittest.CreateObjectPool(t, ctx, targetCfg, targetRepo, gittest.CreateObjectPoolConfig{
+ LinkRepositoryToObjectPool: true,
+ })
+
+ return setupData{
+ job: datastore.ReplicationJob{
+ ReplicaPath: sourceRepo.RelativePath,
+ TargetNodeStorage: targetStorage,
+ SourceNodeStorage: sourceStorage,
+ },
+ expectedPool: nil,
+ }
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ testSetup := tc.setup(t)
+
+ replicator := &defaultReplicator{
+ rs: repoStore,
+ log: testhelper.SharedLogger(t),
+ }
+ err := replicator.Replicate(ctx, datastore.ReplicationEvent{Job: testSetup.job}, sourceConn, targetConn)
+ require.NoError(t, err)
+
+ targetRepo := &gitalypb.Repository{
+ StorageName: targetStorage,
+ RelativePath: testSetup.job.ReplicaPath,
+ }
+
+ // After the replicator has completed, the target storage should have the repository.
+ targetRepoClient := gitalypb.NewRepositoryServiceClient(targetConn)
+ existsResp, err := targetRepoClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{
+ Repository: targetRepo,
+ })
+ require.NoError(t, err)
+ require.True(t, existsResp.Exists)
+
+ // If the source repository has linked to an object pool, the target should
+ // also be linked to an object pool with the same relative path.
+ targetPoolClient := gitalypb.NewObjectPoolServiceClient(targetConn)
+ poolResp, err := targetPoolClient.GetObjectPool(ctx, &gitalypb.GetObjectPoolRequest{
+ Repository: targetRepo,
+ })
+ require.NoError(t, err)
+ require.Equal(t, testSetup.expectedPool, poolResp.GetObjectPool())
+ })
+ }
+}
+
func TestReplicatorDowngradeAttempt(t *testing.T) {
ctx := testhelper.Context(t)