diff options
author | John Cai <jcai@gitlab.com> | 2020-01-11 00:30:41 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2020-01-11 00:30:41 +0300 |
commit | 7ccb4a0c918b5544653af59e85775a912352868e (patch) | |
tree | a5f358cae42728190ba966ba117dfcdcff513ca3 | |
parent | b44242703d8e1975bd2b64f09e816ea8349e8314 (diff) | |
parent | af1768a04e0a9e060a640f8a75f9960a9cf08f91 (diff) |
Merge branch 'jc-add-snapshot-repl' into 'master'
Add snapshot replication to ReplicateRepository
Closes #2197
See merge request gitlab-org/gitaly!1717
-rw-r--r-- | changelogs/unreleased/jc-add-snapshot-repl.yml | 5 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 2 | ||||
-rw-r--r-- | internal/service/repository/replicate.go | 96 | ||||
-rw-r--r-- | internal/service/repository/replicate_test.go | 69 | ||||
-rw-r--r-- | internal/testhelper/testhelper.go | 11 |
5 files changed, 173 insertions, 10 deletions
diff --git a/changelogs/unreleased/jc-add-snapshot-repl.yml b/changelogs/unreleased/jc-add-snapshot-repl.yml new file mode 100644 index 000000000..720dbdc6e --- /dev/null +++ b/changelogs/unreleased/jc-add-snapshot-repl.yml @@ -0,0 +1,5 @@ +--- +title: Add snapshot replication to ReplicateRepository +merge_request: 1717 +author: +type: changed diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index bb9914cc4..b90381d74 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -141,7 +141,7 @@ func TestProcessReplicationJob(t *testing.T) { testhelper.MustRunCommand(t, nil, "git", "-C", replicatedPath, "cat-file", "-e", commitID) testhelper.MustRunCommand(t, nil, "git", "-C", replicatedPath, "gc") - require.Less(t, testhelper.GetGitObjectDirSize(t, replicatedPath), int64(100), "expect a small object directory") + require.Less(t, testhelper.GetGitPackfileDirSize(t, replicatedPath), int64(100), "expect a small pack directory") require.Equal(t, 1, mockReplicationGauge.IncsCalled()) require.Equal(t, 1, mockReplicationGauge.DecsCalled()) diff --git a/internal/service/repository/replicate.go b/internal/service/repository/replicate.go index 2f0069f0a..ff8bd422b 100644 --- a/internal/service/repository/replicate.go +++ b/internal/service/repository/replicate.go @@ -6,12 +6,16 @@ import ( "fmt" "io" "os" + "os/exec" "path/filepath" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "gitlab.com/gitlab-org/gitaly/client" + "gitlab.com/gitlab-org/gitaly/internal/command" "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/safe" + "gitlab.com/gitlab-org/gitaly/internal/tempdir" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/streamio" "golang.org/x/sync/errgroup" @@ -22,19 +26,27 @@ func (s *server) ReplicateRepository(ctx context.Context, in *gitalypb.Replicate return nil, helper.ErrInvalidArgument(err) } - if _, err := s.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{ - Repository: in.GetRepository(), - }); err != nil { + syncFuncs := []func(context.Context, *gitalypb.ReplicateRepositoryRequest) error{ + syncInfoAttributes, + } + + repoPath, err := helper.GetPath(in.GetRepository()) + if err != nil { return nil, helper.ErrInternal(err) } + if helper.IsGitDirectory(repoPath) { + syncFuncs = append(syncFuncs, syncRepository) + } else { + if err = s.create(ctx, in, repoPath); err != nil { + return nil, helper.ErrInternal(err) + } + } + g, ctx := errgroup.WithContext(ctx) outgoingCtx := helper.IncomingToOutgoing(ctx) - for _, f := range []func(context.Context, *gitalypb.ReplicateRepositoryRequest) error{ - syncRepository, - syncInfoAttributes, - } { + for _, f := range syncFuncs { f := f // rescoping f g.Go(func() error { return f(outgoingCtx, in) }) } @@ -66,6 +78,76 @@ func validateReplicateRepository(in *gitalypb.ReplicateRepositoryRequest) error return nil } +func (s *server) create(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest, repoPath string) error { + // if the directory exists, remove it + if _, err := os.Stat(repoPath); err == nil { + tempDir, err := tempdir.ForDeleteAllRepositories(in.GetRepository().GetStorageName()) + if err != nil { + return err + } + + if err = os.Rename(repoPath, filepath.Join(tempDir, filepath.Base(repoPath))); err != nil { + return fmt.Errorf("error deleting invalid repo: %v", err) + } + + ctxlogrus.Extract(ctx).WithField("repo_path", repoPath).Warn("removed invalid repository") + } + + if err := s.createFromSnapshot(ctx, in); err != nil { + return fmt.Errorf("could not create repository from snapshot: %v", err) + } + + return nil +} + +func (s *server) createFromSnapshot(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error { + tempRepo, tempPath, err := tempdir.NewAsRepository(ctx, in.GetRepository()) + if err != nil { + return err + } + + if _, err := s.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{ + Repository: tempRepo, + }); err != nil { + return err + } + + repoClient, err := newRepoClient(ctx, in.GetSource().GetStorageName()) + if err != nil { + return err + } + + stream, err := repoClient.GetSnapshot(ctx, &gitalypb.GetSnapshotRequest{Repository: in.GetSource()}) + if err != nil { + return err + } + + snapshotReader := streamio.NewReader(func() ([]byte, error) { + resp, err := stream.Recv() + return resp.GetData(), err + }) + + cmd, err := command.New(ctx, exec.Command("tar", "-C", tempPath, "-xvf", "-"), snapshotReader, nil, nil) + if err != nil { + return err + } + + if err = cmd.Wait(); err != nil { + return err + } + + targetPath, err := helper.GetPath(in.GetRepository()) + if err != nil { + return err + } + + if err := os.Rename(tempPath, targetPath); err != nil { + return err + } + + return nil +} + func syncRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error { remoteClient, err := newRemoteClient() if err != nil { diff --git a/internal/service/repository/replicate_test.go b/internal/service/repository/replicate_test.go index 547c872a9..2824f3f68 100644 --- a/internal/service/repository/replicate_test.go +++ b/internal/service/repository/replicate_test.go @@ -1,6 +1,7 @@ package repository_test import ( + "bytes" "fmt" "io/ioutil" "os" @@ -11,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/helper/text" "gitlab.com/gitlab-org/gitaly/internal/service/repository" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" @@ -46,6 +48,11 @@ func TestReplicateRepository(t *testing.T) { testRepo, testRepoPath, cleanupRepo := testhelper.NewTestRepo(t) defer cleanupRepo() + // create a loose object to ensure snapshot replication is used + blobData, err := text.RandomHex(10) + require.NoError(t, err) + blobID := text.ChompBytes(testhelper.MustRunCommand(t, bytes.NewBuffer([]byte(blobData)), "git", "-C", testRepoPath, "hash-object", "-w", "--stdin")) + config.Config.SocketPath = serverSocketPath repoClient, conn := repository.NewRepositoryClient(t, serverSocketPath) @@ -64,7 +71,7 @@ func TestReplicateRepository(t *testing.T) { md := testhelper.GitalyServersMetadata(t, serverSocketPath) injectedCtx := metadata.NewOutgoingContext(ctx, md) - _, err := repoClient.ReplicateRepository(injectedCtx, &gitalypb.ReplicateRepositoryRequest{ + _, err = repoClient.ReplicateRepository(injectedCtx, &gitalypb.ReplicateRepositoryRequest{ Repository: &targetRepo, Source: testRepo, }) @@ -91,6 +98,9 @@ func TestReplicateRepository(t *testing.T) { testhelper.MustRunCommand(t, nil, "git", "-C", testRepoPath, "show-ref", "--hash", "--verify", fmt.Sprintf("refs/heads/%s", anotherNewBranch)), testhelper.MustRunCommand(t, nil, "git", "-C", targetRepoPath, "show-ref", "--hash", "--verify", fmt.Sprintf("refs/heads/%s", anotherNewBranch)), ) + + // if an unreachable object has been replicated, that means snapshot replication was used + testhelper.MustRunCommand(t, nil, "git", "-C", targetRepoPath, "cat-file", "-p", string(blobID)) } func TestReplicateRepositoryInvalidArguments(t *testing.T) { @@ -181,3 +191,60 @@ func TestReplicateRepositoryInvalidArguments(t *testing.T) { }) } } + +func TestReplicateRepository_BadRepository(t *testing.T) { + tmpPath, cleanup := testhelper.TempDir(t, t.Name()) + defer cleanup() + + replicaPath := filepath.Join(tmpPath, "replica") + require.NoError(t, os.MkdirAll(replicaPath, 0755)) + + defer func(storages []config.Storage) { + config.Config.Storages = storages + }(config.Config.Storages) + + config.Config.Storages = []config.Storage{ + config.Storage{ + Name: "default", + Path: testhelper.GitlabTestStoragePath(), + }, + config.Storage{ + Name: "replica", + Path: replicaPath, + }, + } + + server, serverSocketPath := runFullServer(t) + defer server.Stop() + + testRepo, _, cleanupRepo := testhelper.NewTestRepo(t) + defer cleanupRepo() + + config.Config.SocketPath = serverSocketPath + + repoClient, conn := repository.NewRepositoryClient(t, serverSocketPath) + defer conn.Close() + + targetRepo := *testRepo + targetRepo.StorageName = "replica" + + targetRepoPath, err := helper.GetPath(&targetRepo) + require.NoError(t, err) + + require.NoError(t, os.MkdirAll(targetRepoPath, 0755)) + testhelper.MustRunCommand(t, nil, "touch", filepath.Join(targetRepoPath, "invalid_git_repo")) + + ctx, cancel := testhelper.Context() + defer cancel() + + md := testhelper.GitalyServersMetadata(t, serverSocketPath) + injectedCtx := metadata.NewOutgoingContext(ctx, md) + + _, err = repoClient.ReplicateRepository(injectedCtx, &gitalypb.ReplicateRepositoryRequest{ + Repository: &targetRepo, + Source: testRepo, + }) + require.NoError(t, err) + + testhelper.MustRunCommand(t, nil, "git", "-C", targetRepoPath, "fsck") +} diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go index 1aedb76d5..af2673d85 100644 --- a/internal/testhelper/testhelper.go +++ b/internal/testhelper/testhelper.go @@ -566,7 +566,16 @@ type Cleanup func() // GetGitObjectDirSize gets the number of 1k blocks of a git object directory func GetGitObjectDirSize(t *testing.T, repoPath string) int64 { - cmd := exec.Command("du", "-s", "-k", filepath.Join(repoPath, "objects")) + return getGitDirSize(t, repoPath, "objects") +} + +// GetGitPackfileDirSize gets the number of 1k blocks of a git object directory +func GetGitPackfileDirSize(t *testing.T, repoPath string) int64 { + return getGitDirSize(t, repoPath, "objects", "pack") +} + +func getGitDirSize(t *testing.T, repoPath string, subdirs ...string) int64 { + cmd := exec.Command("du", "-s", "-k", filepath.Join(append([]string{repoPath}, subdirs...)...)) output, err := cmd.Output() require.NoError(t, err) if len(output) < 2 { |