diff options
author | Will Chandler <wchandler@gitlab.com> | 2023-11-10 17:35:10 +0300 |
---|---|---|
committer | Will Chandler <wchandler@gitlab.com> | 2023-11-10 17:35:10 +0300 |
commit | ccd0e9dea30d69a7746b0d7243dc94b6e699fe47 (patch) | |
tree | 6e4dec63f52702e0268a8e40f7879a78a72be13a | |
parent | dd40a6fb11606e47078851e7cde4b4715b054211 (diff) | |
parent | d33d508716f3f212a52a0266f81e9070513401eb (diff) |
Merge branch 'jt-praefect-replicator-pool-disconnect' into 'master'
praefect: Support object pool disconnection in Praefect replicator
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6512
Merged-by: Will Chandler <wchandler@gitlab.com>
Approved-by: Sami Hiltunen <shiltunen@gitlab.com>
Approved-by: Will Chandler <wchandler@gitlab.com>
Reviewed-by: Sami Hiltunen <shiltunen@gitlab.com>
Co-authored-by: Justin Tobler <jtobler@gitlab.com>
-rw-r--r-- | internal/praefect/replicator.go | 15 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 175 |
2 files changed, 188 insertions, 2 deletions
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 42b3eebee..b19d17481 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -102,8 +102,19 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli sourceObjectPool := resp.GetObjectPool() - if sourceObjectPool != nil { - targetObjectPoolClient := gitalypb.NewObjectPoolServiceClient(targetCC) + targetObjectPoolClient := gitalypb.NewObjectPoolServiceClient(targetCC) + + if sourceObjectPool == nil { + // If the source repository is not linked to an object pool, the target repository + // should also not be linked to any object pool to ensure consistency. + if _, err := targetObjectPoolClient.DisconnectGitAlternates(ctx, &gitalypb.DisconnectGitAlternatesRequest{ + Repository: targetRepository, + }); err != nil { + return err + } + } else { + // If the source repository is linked to an object pool, the target repository + // should link to the same object pool. targetObjectPool := proto.Clone(sourceObjectPool).(*gitalypb.ObjectPool) targetObjectPool.GetRepository().StorageName = targetRepository.GetStorageName() if _, err := targetObjectPoolClient.LinkRepositoryToObjectPool(ctx, &gitalypb.LinkRepositoryToObjectPoolRequest{ diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 9b1e0651e..369064ee6 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -18,6 +18,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" gitalycfg "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/log" @@ -200,6 +201,180 @@ gitaly_praefect_replication_jobs{change_type="update",gitaly_storage="backup",vi `))) } +func TestDefaultReplicator_Replicate(t *testing.T) { + t.Parallel() + testhelper.NewFeatureSets(featureflag.ReplicateRepositoryObjectPool).Run(t, testDefaultReplicatorReplicate) +} + +func testDefaultReplicatorReplicate(t *testing.T, ctx context.Context) { + t.Parallel() + + newGitalyConn := func(t *testing.T, config gitalycfg.Cfg) *grpc.ClientConn { + t.Helper() + + conn, err := grpc.Dial( + config.SocketPath, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(config.Auth.Token)), + ) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, conn.Close()) }) + + return conn + } + + const sourceStorage = "source" + const targetStorage = "target" + + sourceCfg := testcfg.Build(t, testcfg.WithStorages(sourceStorage)) + targetCfg := testcfg.Build(t, testcfg.WithStorages(targetStorage)) + + // Set up source and target Gitaly servers to simulate the Praefect replicator performing a + // replication job. + sourceAddr := testserver.RunGitalyServer(t, sourceCfg, setup.RegisterAll, testserver.WithDisablePraefect()) + targetAddr := testserver.RunGitalyServer(t, targetCfg, setup.RegisterAll, testserver.WithDisablePraefect()) + + sourceCfg.SocketPath = sourceAddr + targetCfg.SocketPath = targetAddr + + sourceConn := newGitalyConn(t, sourceCfg) + targetConn := newGitalyConn(t, targetCfg) + + testcfg.BuildGitalySSH(t, targetCfg) + + // For the repository to be replicated from the source Gitaly, the target Gitaly must know + // how to connect to the source. This is done by injecting Gitaly server information into + // the context. + ctx, err := storage.InjectGitalyServers(ctx, sourceStorage, sourceAddr, sourceCfg.Auth.Token) + require.NoError(t, err) + + // Configure repository store operations to function as no-ops. + repoStore := datastore.MockRepositoryStore{ + GetReplicatedGenerationFunc: func(context.Context, int64, string, string) (int, error) { + return 0, nil + }, + SetGenerationFunc: func(context.Context, int64, string, string, int) error { + return nil + }, + } + + type setupData struct { + job datastore.ReplicationJob + expectedPool *gitalypb.ObjectPool + } + + for _, tc := range []struct { + desc string + setup func(t *testing.T) setupData + }{ + { + desc: "source does not have alternate link", + setup: func(t *testing.T) setupData { + // Create repository on source Gitaly that does not have a link to + // an object pool. + sourceRepo, _ := gittest.CreateRepository(t, ctx, sourceCfg) + return setupData{ + job: datastore.ReplicationJob{ + ReplicaPath: sourceRepo.RelativePath, + TargetNodeStorage: targetStorage, + SourceNodeStorage: sourceStorage, + }, + expectedPool: nil, + } + }, + }, + { + desc: "source alternate link replicated to target", + setup: func(t *testing.T) setupData { + // Create repository on source Gitaly that is linked to object pool. + sourceRepo, _ := gittest.CreateRepository(t, ctx, sourceCfg) + sourcePool, _ := gittest.CreateObjectPool(t, ctx, sourceCfg, sourceRepo, gittest.CreateObjectPoolConfig{ + LinkRepositoryToObjectPool: true, + }) + + // Create object pool on target Gitaly with the same relative path + // as the source object pool. + gittest.CreateRepository(t, ctx, targetCfg, gittest.CreateRepositoryConfig{ + RelativePath: sourcePool.Repository.RelativePath, + }) + + return setupData{ + job: datastore.ReplicationJob{ + ReplicaPath: sourceRepo.RelativePath, + TargetNodeStorage: targetStorage, + SourceNodeStorage: sourceStorage, + }, + expectedPool: &gitalypb.ObjectPool{ + Repository: &gitalypb.Repository{ + StorageName: targetStorage, + RelativePath: sourcePool.Repository.RelativePath, + }, + }, + } + }, + }, + { + desc: "target disconnected from alternate to match source", + setup: func(t *testing.T) setupData { + // Create repository on source Gitaly that does not have a link to + // an object pool. + sourceRepo, _ := gittest.CreateRepository(t, ctx, sourceCfg) + + // Create repository on target Gitaly with the same relative path as + // the source repository and link it to an object pool. + targetRepo, _ := gittest.CreateRepository(t, ctx, targetCfg, gittest.CreateRepositoryConfig{ + RelativePath: sourceRepo.RelativePath, + }) + gittest.CreateObjectPool(t, ctx, targetCfg, targetRepo, gittest.CreateObjectPoolConfig{ + LinkRepositoryToObjectPool: true, + }) + + return setupData{ + job: datastore.ReplicationJob{ + ReplicaPath: sourceRepo.RelativePath, + TargetNodeStorage: targetStorage, + SourceNodeStorage: sourceStorage, + }, + expectedPool: nil, + } + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + testSetup := tc.setup(t) + + replicator := &defaultReplicator{ + rs: repoStore, + log: testhelper.SharedLogger(t), + } + err := replicator.Replicate(ctx, datastore.ReplicationEvent{Job: testSetup.job}, sourceConn, targetConn) + require.NoError(t, err) + + targetRepo := &gitalypb.Repository{ + StorageName: targetStorage, + RelativePath: testSetup.job.ReplicaPath, + } + + // After the replicator has completed, the target storage should have the repository. + targetRepoClient := gitalypb.NewRepositoryServiceClient(targetConn) + existsResp, err := targetRepoClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{ + Repository: targetRepo, + }) + require.NoError(t, err) + require.True(t, existsResp.Exists) + + // If the source repository has linked to an object pool, the target should + // also be linked to an object pool with the same relative path. + targetPoolClient := gitalypb.NewObjectPoolServiceClient(targetConn) + poolResp, err := targetPoolClient.GetObjectPool(ctx, &gitalypb.GetObjectPoolRequest{ + Repository: targetRepo, + }) + require.NoError(t, err) + require.Equal(t, testSetup.expectedPool, poolResp.GetObjectPool()) + }) + } +} + func TestReplicatorDowngradeAttempt(t *testing.T) { ctx := testhelper.Context(t) |