diff options
author | John Cai <jcai@gitlab.com> | 2019-12-19 23:25:31 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2020-01-06 22:01:58 +0300 |
commit | 855b495d44838d7d448c866625d493f53b554eb2 (patch) | |
tree | 9a3f648d61e8f33aacc718ce1a35b5c6369e560e | |
parent | 2bd3f9d895fb61db0253f0e320c40b2dd60c09c5 (diff) |
praefect replicator links object pool if it exists
if the source repository is linked to an object pool, the praefet
replicator will try to link it in on the replica.
-rw-r--r-- | changelogs/unreleased/jc-sync-pools-in-replicator.yml | 5 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 30 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 60 | ||||
-rw-r--r-- | internal/service/repository/clone_from_pool_internal_test.go | 18 | ||||
-rw-r--r-- | internal/service/repository/clone_from_pool_test.go | 2 | ||||
-rw-r--r-- | internal/testhelper/testhelper.go | 17 |
6 files changed, 103 insertions, 29 deletions
diff --git a/changelogs/unreleased/jc-sync-pools-in-replicator.yml b/changelogs/unreleased/jc-sync-pools-in-replicator.yml new file mode 100644 index 000000000..2e56a868c --- /dev/null +++ b/changelogs/unreleased/jc-sync-pools-in-replicator.yml @@ -0,0 +1,5 @@ +--- +title: praefect replicator links object pool if it exists +merge_request: 1718 +author: +type: changed diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index f99e97018..03b76ffa9 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -41,16 +41,40 @@ func (dr defaultReplicator) Replicate(ctx context.Context, job datastore.ReplJob RelativePath: job.Repository.RelativePath, } - repositoryClient := gitalypb.NewRepositoryServiceClient(targetCC) + targetRepositoryClient := gitalypb.NewRepositoryServiceClient(targetCC) - if _, err := repositoryClient.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{ + if _, err := targetRepositoryClient.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{ Source: sourceRepository, Repository: targetRepository, }); err != nil { return fmt.Errorf("failed to create repository: %v", err) } - checksumsMatch, err := dr.confirmChecksums(ctx, gitalypb.NewRepositoryServiceClient(sourceCC), repositoryClient, sourceRepository, targetRepository) + // check if the repository has an object pool + sourceObjectPoolClient := gitalypb.NewObjectPoolServiceClient(sourceCC) + + resp, err := sourceObjectPoolClient.GetObjectPool(ctx, &gitalypb.GetObjectPoolRequest{ + Repository: sourceRepository, + }) + if err != nil { + return err + } + + sourceObjectPool := resp.GetObjectPool() + + if sourceObjectPool != nil { + targetObjectPoolClient := gitalypb.NewObjectPoolServiceClient(targetCC) + targetObjectPool := *sourceObjectPool + targetObjectPool.GetRepository().StorageName = targetRepository.GetStorageName() + if _, err := targetObjectPoolClient.LinkRepositoryToObjectPool(ctx, &gitalypb.LinkRepositoryToObjectPoolRequest{ + ObjectPool: &targetObjectPool, + Repository: targetRepository, + }); err != nil { + return err + } + } + + checksumsMatch, err := dr.confirmChecksums(ctx, gitalypb.NewRepositoryServiceClient(sourceCC), targetRepositoryClient, sourceRepository, targetRepository) if err != nil { return err } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index fcb8d64ac..bb9914cc4 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config" + "gitlab.com/gitlab-org/gitaly/internal/git/objectpool" gitaly_log "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" @@ -22,9 +23,10 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) -func TestProceessReplicationJob(t *testing.T) { +func TestProcessReplicationJob(t *testing.T) { srv, srvSocketPath := runFullGitalyServer(t) defer srv.Stop() @@ -77,8 +79,37 @@ func TestProceessReplicationJob(t *testing.T) { ds := datastore.NewInMemory(config) - ds.SetPrimary(testRepo.GetRelativePath(), "default") - ds.AddReplica(testRepo.GetRelativePath(), backupStorageName) + require.NoError(t, ds.SetPrimary(testRepo.GetRelativePath(), "default")) + require.NoError(t, ds.AddReplica(testRepo.GetRelativePath(), backupStorageName)) + + // create object pool on the source + objectPoolPath := testhelper.NewTestObjectPoolName(t) + pool, err := objectpool.NewObjectPool(testRepo.GetStorageName(), objectPoolPath) + require.NoError(t, err) + + poolCtx, cancel := testhelper.Context() + defer cancel() + + require.NoError(t, pool.Create(poolCtx, testRepo)) + require.NoError(t, pool.Link(poolCtx, testRepo)) + + // replicate object pool repository to target node + targetObjectPoolRepo := *pool.ToProto().GetRepository() + targetObjectPoolRepo.StorageName = "backup" + + ctx, cancel := testhelper.Context() + defer cancel() + + injectedCtx := metadata.NewOutgoingContext(ctx, testhelper.GitalyServersMetadata(t, srvSocketPath)) + + repoClient, con := newRepositoryClient(t, srvSocketPath) + defer con.Close() + + _, err = repoClient.ReplicateRepository(injectedCtx, &gitalypb.ReplicateRepositoryRequest{ + Repository: &targetObjectPoolRepo, + Source: pool.ToProto().GetRepository(), + }) + require.NoError(t, err) _, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), datastore.UpdateRepo) require.NoError(t, err) @@ -87,9 +118,6 @@ func TestProceessReplicationJob(t *testing.T) { require.NoError(t, err) require.Len(t, jobs, 1) - ctx, cancel := testhelper.Context() - defer cancel() - commitID := testhelper.CreateCommit(t, testRepoPath, "master", &testhelper.CreateCommitOpts{ Message: "a commit", }) @@ -98,8 +126,8 @@ func TestProceessReplicationJob(t *testing.T) { replicator.log = gitaly_log.Default() clientCC := conn.NewClientConnections() - clientCC.RegisterNode("default", srvSocketPath, gitaly_config.Config.Auth.Token) - clientCC.RegisterNode("backup", srvSocketPath, gitaly_config.Config.Auth.Token) + require.NoError(t, clientCC.RegisterNode("default", srvSocketPath, gitaly_config.Config.Auth.Token)) + require.NoError(t, clientCC.RegisterNode("backup", srvSocketPath, gitaly_config.Config.Auth.Token)) var mockReplicationGauge promtest.MockGauge var mockReplicationHistogram promtest.MockHistogram @@ -110,7 +138,10 @@ func TestProceessReplicationJob(t *testing.T) { replMgr.processReplJob(ctx, jobs[0]) replicatedPath := filepath.Join(backupDir, filepath.Base(testRepoPath)) + testhelper.MustRunCommand(t, nil, "git", "-C", replicatedPath, "cat-file", "-e", commitID) + testhelper.MustRunCommand(t, nil, "git", "-C", replicatedPath, "gc") + require.Less(t, testhelper.GetGitObjectDirSize(t, replicatedPath), int64(100), "expect a small object directory") require.Equal(t, 1, mockReplicationGauge.IncsCalled()) require.Equal(t, 1, mockReplicationGauge.DecsCalled()) @@ -171,6 +202,19 @@ func runFullGitalyServer(t *testing.T) (*grpc.Server, string) { return server, "unix://" + serverSocketPath } +func newRepositoryClient(t *testing.T, serverSocketPath string) (gitalypb.RepositoryServiceClient, *grpc.ClientConn) { + connOpts := []grpc.DialOption{ + grpc.WithInsecure(), + grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(testhelper.RepositoryAuthToken)), + } + conn, err := grpc.Dial(serverSocketPath, connOpts...) + if err != nil { + t.Fatal(err) + } + + return gitalypb.NewRepositoryServiceClient(conn), conn +} + var RubyServer = &rubyserver.Server{} func TestMain(m *testing.M) { diff --git a/internal/service/repository/clone_from_pool_internal_test.go b/internal/service/repository/clone_from_pool_internal_test.go index 4de108c08..7d4fbc649 100644 --- a/internal/service/repository/clone_from_pool_internal_test.go +++ b/internal/service/repository/clone_from_pool_internal_test.go @@ -4,8 +4,6 @@ import ( "fmt" "os" "path/filepath" - "strconv" - "strings" "testing" "github.com/stretchr/testify/assert" @@ -38,20 +36,6 @@ func getForkDestination(t *testing.T) (*gitalypb.Repository, string, func()) { return forkedRepo, forkRepoPath, func() { os.RemoveAll(forkRepoPath) } } -// getGitObjectDirSize gets the number of 1k blocks of a git object directory -func getGitObjectDirSize(t *testing.T, repoPath string) int64 { - output := testhelper.MustRunCommand(t, nil, "du", "-s", "-k", filepath.Join(repoPath, "objects")) - if len(output) < 2 { - t.Error("invalid output of du -s -k") - } - - outputSplit := strings.SplitN(string(output), "\t", 2) - blocks, err := strconv.ParseInt(outputSplit[0], 10, 64) - require.NoError(t, err) - - return blocks -} - func TestCloneFromPoolInternal(t *testing.T) { server, serverSocketPath := runFullServer(t) defer server.Stop() @@ -92,7 +76,7 @@ func TestCloneFromPoolInternal(t *testing.T) { _, err := client.CloneFromPoolInternal(ctx, req) require.NoError(t, err) - assert.True(t, getGitObjectDirSize(t, forkRepoPath) < 100) + assert.True(t, testhelper.GetGitObjectDirSize(t, forkRepoPath) < 100) isLinked, err := pool.LinkedToRepository(testRepo) require.NoError(t, err) diff --git a/internal/service/repository/clone_from_pool_test.go b/internal/service/repository/clone_from_pool_test.go index 759a5a437..86731b7af 100644 --- a/internal/service/repository/clone_from_pool_test.go +++ b/internal/service/repository/clone_from_pool_test.go @@ -66,7 +66,7 @@ func TestCloneFromPoolHTTP(t *testing.T) { require.NoError(t, err) require.True(t, isLinked, "repository is not linked to the pool repository") - assert.True(t, getGitObjectDirSize(t, forkRepoPath) < 100, "expect a small object directory size") + assert.True(t, testhelper.GetGitObjectDirSize(t, forkRepoPath) < 100, "expect a small object directory size") // feature is a branch known to exist in the source repository. By looking it up in the target // we establish that the target has branches, even though (as we saw above) it has no objects. diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go index c97dcfc3b..1aedb76d5 100644 --- a/internal/testhelper/testhelper.go +++ b/internal/testhelper/testhelper.go @@ -14,6 +14,7 @@ import ( "path" "path/filepath" "runtime" + "strconv" "strings" "syscall" "testing" @@ -562,3 +563,19 @@ func gitObjectExists(t testing.TB, repoPath, sha string, exists bool) { // Cleanup functions should be called in a defer statement // immediately after they are returned from a test helper type Cleanup func() + +// GetGitObjectDirSize gets the number of 1k blocks of a git object directory +func GetGitObjectDirSize(t *testing.T, repoPath string) int64 { + cmd := exec.Command("du", "-s", "-k", filepath.Join(repoPath, "objects")) + output, err := cmd.Output() + require.NoError(t, err) + if len(output) < 2 { + t.Error("invalid output of du -s -k") + } + + outputSplit := strings.SplitN(string(output), "\t", 2) + blocks, err := strconv.ParseInt(outputSplit[0], 10, 64) + require.NoError(t, err) + + return blocks +} |