Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2020-01-11 00:30:41 +0300
committerJohn Cai <jcai@gitlab.com>2020-01-11 00:30:41 +0300
commit7ccb4a0c918b5544653af59e85775a912352868e (patch)
treea5f358cae42728190ba966ba117dfcdcff513ca3
parentb44242703d8e1975bd2b64f09e816ea8349e8314 (diff)
parentaf1768a04e0a9e060a640f8a75f9960a9cf08f91 (diff)
Merge branch 'jc-add-snapshot-repl' into 'master'
Add snapshot replication to ReplicateRepository Closes #2197 See merge request gitlab-org/gitaly!1717
-rw-r--r--changelogs/unreleased/jc-add-snapshot-repl.yml5
-rw-r--r--internal/praefect/replicator_test.go2
-rw-r--r--internal/service/repository/replicate.go96
-rw-r--r--internal/service/repository/replicate_test.go69
-rw-r--r--internal/testhelper/testhelper.go11
5 files changed, 173 insertions, 10 deletions
diff --git a/changelogs/unreleased/jc-add-snapshot-repl.yml b/changelogs/unreleased/jc-add-snapshot-repl.yml
new file mode 100644
index 000000000..720dbdc6e
--- /dev/null
+++ b/changelogs/unreleased/jc-add-snapshot-repl.yml
@@ -0,0 +1,5 @@
+---
+title: Add snapshot replication to ReplicateRepository
+merge_request: 1717
+author:
+type: changed
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index bb9914cc4..b90381d74 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -141,7 +141,7 @@ func TestProcessReplicationJob(t *testing.T) {
testhelper.MustRunCommand(t, nil, "git", "-C", replicatedPath, "cat-file", "-e", commitID)
testhelper.MustRunCommand(t, nil, "git", "-C", replicatedPath, "gc")
- require.Less(t, testhelper.GetGitObjectDirSize(t, replicatedPath), int64(100), "expect a small object directory")
+ require.Less(t, testhelper.GetGitPackfileDirSize(t, replicatedPath), int64(100), "expect a small pack directory")
require.Equal(t, 1, mockReplicationGauge.IncsCalled())
require.Equal(t, 1, mockReplicationGauge.DecsCalled())
diff --git a/internal/service/repository/replicate.go b/internal/service/repository/replicate.go
index 2f0069f0a..ff8bd422b 100644
--- a/internal/service/repository/replicate.go
+++ b/internal/service/repository/replicate.go
@@ -6,12 +6,16 @@ import (
"fmt"
"io"
"os"
+ "os/exec"
"path/filepath"
+ "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"gitlab.com/gitlab-org/gitaly/client"
+ "gitlab.com/gitlab-org/gitaly/internal/command"
"gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/safe"
+ "gitlab.com/gitlab-org/gitaly/internal/tempdir"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/streamio"
"golang.org/x/sync/errgroup"
@@ -22,19 +26,27 @@ func (s *server) ReplicateRepository(ctx context.Context, in *gitalypb.Replicate
return nil, helper.ErrInvalidArgument(err)
}
- if _, err := s.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{
- Repository: in.GetRepository(),
- }); err != nil {
+ syncFuncs := []func(context.Context, *gitalypb.ReplicateRepositoryRequest) error{
+ syncInfoAttributes,
+ }
+
+ repoPath, err := helper.GetPath(in.GetRepository())
+ if err != nil {
return nil, helper.ErrInternal(err)
}
+ if helper.IsGitDirectory(repoPath) {
+ syncFuncs = append(syncFuncs, syncRepository)
+ } else {
+ if err = s.create(ctx, in, repoPath); err != nil {
+ return nil, helper.ErrInternal(err)
+ }
+ }
+
g, ctx := errgroup.WithContext(ctx)
outgoingCtx := helper.IncomingToOutgoing(ctx)
- for _, f := range []func(context.Context, *gitalypb.ReplicateRepositoryRequest) error{
- syncRepository,
- syncInfoAttributes,
- } {
+ for _, f := range syncFuncs {
f := f // rescoping f
g.Go(func() error { return f(outgoingCtx, in) })
}
@@ -66,6 +78,76 @@ func validateReplicateRepository(in *gitalypb.ReplicateRepositoryRequest) error
return nil
}
+func (s *server) create(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest, repoPath string) error {
+ // if the directory exists, remove it
+ if _, err := os.Stat(repoPath); err == nil {
+ tempDir, err := tempdir.ForDeleteAllRepositories(in.GetRepository().GetStorageName())
+ if err != nil {
+ return err
+ }
+
+ if err = os.Rename(repoPath, filepath.Join(tempDir, filepath.Base(repoPath))); err != nil {
+ return fmt.Errorf("error deleting invalid repo: %v", err)
+ }
+
+ ctxlogrus.Extract(ctx).WithField("repo_path", repoPath).Warn("removed invalid repository")
+ }
+
+ if err := s.createFromSnapshot(ctx, in); err != nil {
+ return fmt.Errorf("could not create repository from snapshot: %v", err)
+ }
+
+ return nil
+}
+
+func (s *server) createFromSnapshot(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error {
+ tempRepo, tempPath, err := tempdir.NewAsRepository(ctx, in.GetRepository())
+ if err != nil {
+ return err
+ }
+
+ if _, err := s.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{
+ Repository: tempRepo,
+ }); err != nil {
+ return err
+ }
+
+ repoClient, err := newRepoClient(ctx, in.GetSource().GetStorageName())
+ if err != nil {
+ return err
+ }
+
+ stream, err := repoClient.GetSnapshot(ctx, &gitalypb.GetSnapshotRequest{Repository: in.GetSource()})
+ if err != nil {
+ return err
+ }
+
+ snapshotReader := streamio.NewReader(func() ([]byte, error) {
+ resp, err := stream.Recv()
+ return resp.GetData(), err
+ })
+
+ cmd, err := command.New(ctx, exec.Command("tar", "-C", tempPath, "-xvf", "-"), snapshotReader, nil, nil)
+ if err != nil {
+ return err
+ }
+
+ if err = cmd.Wait(); err != nil {
+ return err
+ }
+
+ targetPath, err := helper.GetPath(in.GetRepository())
+ if err != nil {
+ return err
+ }
+
+ if err := os.Rename(tempPath, targetPath); err != nil {
+ return err
+ }
+
+ return nil
+}
+
func syncRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error {
remoteClient, err := newRemoteClient()
if err != nil {
diff --git a/internal/service/repository/replicate_test.go b/internal/service/repository/replicate_test.go
index 547c872a9..2824f3f68 100644
--- a/internal/service/repository/replicate_test.go
+++ b/internal/service/repository/replicate_test.go
@@ -1,6 +1,7 @@
package repository_test
import (
+ "bytes"
"fmt"
"io/ioutil"
"os"
@@ -11,6 +12,7 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/helper/text"
"gitlab.com/gitlab-org/gitaly/internal/service/repository"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
@@ -46,6 +48,11 @@ func TestReplicateRepository(t *testing.T) {
testRepo, testRepoPath, cleanupRepo := testhelper.NewTestRepo(t)
defer cleanupRepo()
+ // create a loose object to ensure snapshot replication is used
+ blobData, err := text.RandomHex(10)
+ require.NoError(t, err)
+ blobID := text.ChompBytes(testhelper.MustRunCommand(t, bytes.NewBuffer([]byte(blobData)), "git", "-C", testRepoPath, "hash-object", "-w", "--stdin"))
+
config.Config.SocketPath = serverSocketPath
repoClient, conn := repository.NewRepositoryClient(t, serverSocketPath)
@@ -64,7 +71,7 @@ func TestReplicateRepository(t *testing.T) {
md := testhelper.GitalyServersMetadata(t, serverSocketPath)
injectedCtx := metadata.NewOutgoingContext(ctx, md)
- _, err := repoClient.ReplicateRepository(injectedCtx, &gitalypb.ReplicateRepositoryRequest{
+ _, err = repoClient.ReplicateRepository(injectedCtx, &gitalypb.ReplicateRepositoryRequest{
Repository: &targetRepo,
Source: testRepo,
})
@@ -91,6 +98,9 @@ func TestReplicateRepository(t *testing.T) {
testhelper.MustRunCommand(t, nil, "git", "-C", testRepoPath, "show-ref", "--hash", "--verify", fmt.Sprintf("refs/heads/%s", anotherNewBranch)),
testhelper.MustRunCommand(t, nil, "git", "-C", targetRepoPath, "show-ref", "--hash", "--verify", fmt.Sprintf("refs/heads/%s", anotherNewBranch)),
)
+
+ // if an unreachable object has been replicated, that means snapshot replication was used
+ testhelper.MustRunCommand(t, nil, "git", "-C", targetRepoPath, "cat-file", "-p", string(blobID))
}
func TestReplicateRepositoryInvalidArguments(t *testing.T) {
@@ -181,3 +191,60 @@ func TestReplicateRepositoryInvalidArguments(t *testing.T) {
})
}
}
+
+func TestReplicateRepository_BadRepository(t *testing.T) {
+ tmpPath, cleanup := testhelper.TempDir(t, t.Name())
+ defer cleanup()
+
+ replicaPath := filepath.Join(tmpPath, "replica")
+ require.NoError(t, os.MkdirAll(replicaPath, 0755))
+
+ defer func(storages []config.Storage) {
+ config.Config.Storages = storages
+ }(config.Config.Storages)
+
+ config.Config.Storages = []config.Storage{
+ config.Storage{
+ Name: "default",
+ Path: testhelper.GitlabTestStoragePath(),
+ },
+ config.Storage{
+ Name: "replica",
+ Path: replicaPath,
+ },
+ }
+
+ server, serverSocketPath := runFullServer(t)
+ defer server.Stop()
+
+ testRepo, _, cleanupRepo := testhelper.NewTestRepo(t)
+ defer cleanupRepo()
+
+ config.Config.SocketPath = serverSocketPath
+
+ repoClient, conn := repository.NewRepositoryClient(t, serverSocketPath)
+ defer conn.Close()
+
+ targetRepo := *testRepo
+ targetRepo.StorageName = "replica"
+
+ targetRepoPath, err := helper.GetPath(&targetRepo)
+ require.NoError(t, err)
+
+ require.NoError(t, os.MkdirAll(targetRepoPath, 0755))
+ testhelper.MustRunCommand(t, nil, "touch", filepath.Join(targetRepoPath, "invalid_git_repo"))
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ md := testhelper.GitalyServersMetadata(t, serverSocketPath)
+ injectedCtx := metadata.NewOutgoingContext(ctx, md)
+
+ _, err = repoClient.ReplicateRepository(injectedCtx, &gitalypb.ReplicateRepositoryRequest{
+ Repository: &targetRepo,
+ Source: testRepo,
+ })
+ require.NoError(t, err)
+
+ testhelper.MustRunCommand(t, nil, "git", "-C", targetRepoPath, "fsck")
+}
diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go
index 1aedb76d5..af2673d85 100644
--- a/internal/testhelper/testhelper.go
+++ b/internal/testhelper/testhelper.go
@@ -566,7 +566,16 @@ type Cleanup func()
// GetGitObjectDirSize gets the number of 1k blocks of a git object directory
func GetGitObjectDirSize(t *testing.T, repoPath string) int64 {
- cmd := exec.Command("du", "-s", "-k", filepath.Join(repoPath, "objects"))
+ return getGitDirSize(t, repoPath, "objects")
+}
+
+// GetGitPackfileDirSize gets the number of 1k blocks of a git object directory
+func GetGitPackfileDirSize(t *testing.T, repoPath string) int64 {
+ return getGitDirSize(t, repoPath, "objects", "pack")
+}
+
+func getGitDirSize(t *testing.T, repoPath string, subdirs ...string) int64 {
+ cmd := exec.Command("du", "-s", "-k", filepath.Join(append([]string{repoPath}, subdirs...)...))
output, err := cmd.Output()
require.NoError(t, err)
if len(output) < 2 {