Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2019-11-08 02:44:58 +0300
committerJohn Cai <jcai@gitlab.com>2019-12-06 21:53:30 +0300
commitfbc07e1f94f3de086319252faae5f3c18878adc6 (patch)
tree08e154df3a80952ced5d9cf31c510cdf7efad930
parent5fae4483a6a832ce558c43f7d1867cd7062c7065 (diff)
ReplicateRepository RPC
-rw-r--r--changelogs/unreleased/jc-replicate-repository-impl.yml5
-rw-r--r--internal/service/repository/fetch_test.go10
-rw-r--r--internal/service/repository/replicate.go78
-rw-r--r--internal/service/repository/replicate_test.go175
-rw-r--r--internal/service/repository/testhelper_test.go1
5 files changed, 266 insertions, 3 deletions
diff --git a/changelogs/unreleased/jc-replicate-repository-impl.yml b/changelogs/unreleased/jc-replicate-repository-impl.yml
new file mode 100644
index 000000000..cad99c69b
--- /dev/null
+++ b/changelogs/unreleased/jc-replicate-repository-impl.yml
@@ -0,0 +1,5 @@
+---
+title: ReplicateRepository RPC
+merge_request: 1605
+author:
+type: added
diff --git a/internal/service/repository/fetch_test.go b/internal/service/repository/fetch_test.go
index af3e2ff53..1939dd633 100644
--- a/internal/service/repository/fetch_test.go
+++ b/internal/service/repository/fetch_test.go
@@ -6,6 +6,7 @@ import (
"testing"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/config"
gitLog "gitlab.com/gitlab-org/gitaly/internal/git/log"
"gitlab.com/gitlab-org/gitaly/internal/helper"
serverPkg "gitlab.com/gitlab-org/gitaly/internal/server"
@@ -187,10 +188,13 @@ func runFullServer(t *testing.T) (*grpc.Server, string) {
serverSocketPath := testhelper.GetTemporaryGitalySocketFileName()
listener, err := net.Listen("unix", serverSocketPath)
- if err != nil {
- t.Fatal(err)
- }
+ require.NoError(t, err)
+
+ //listen on internal socket
+ internalListener, err := net.Listen("unix", config.GitalyInternalSocketPath())
+ require.NoError(t, err)
+ go server.Serve(internalListener)
go server.Serve(listener)
return server, "unix://" + serverSocketPath
diff --git a/internal/service/repository/replicate.go b/internal/service/repository/replicate.go
new file mode 100644
index 000000000..ede8a213f
--- /dev/null
+++ b/internal/service/repository/replicate.go
@@ -0,0 +1,78 @@
+package repository
+
+import (
+ "context"
+ "errors"
+ "fmt"
+
+ "gitlab.com/gitlab-org/gitaly/client"
+ "gitlab.com/gitlab-org/gitaly/internal/config"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+)
+
+func (s *server) ReplicateRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) (*gitalypb.ReplicateRepositoryResponse, error) {
+ if err := validateReplicateRepository(in); err != nil {
+ return nil, helper.ErrInvalidArgument(err)
+ }
+
+ if _, err := s.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{
+ Repository: in.GetRepository(),
+ }); err != nil {
+ return nil, helper.ErrInternal(err)
+ }
+
+ outCtx := helper.IncomingToOutgoing(ctx)
+
+ if err := syncRepository(outCtx, in); err != nil {
+ return nil, helper.ErrInternal(err)
+ }
+
+ return &gitalypb.ReplicateRepositoryResponse{}, nil
+}
+
+func validateReplicateRepository(in *gitalypb.ReplicateRepositoryRequest) error {
+ if in.GetRepository() == nil {
+ return errors.New("repository cannot be empty")
+ }
+
+ if in.GetSource() == nil {
+ return errors.New("source repository cannot be empty")
+ }
+
+ if in.GetRepository().GetRelativePath() != in.GetSource().GetRelativePath() {
+ return errors.New("both source and repository should have the same relative path")
+ }
+
+ if in.GetRepository().GetStorageName() == in.GetSource().GetStorageName() {
+ return errors.New("repository and source have the same storage")
+ }
+
+ return nil
+}
+
+func syncRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error {
+ remoteClient, err := newRemoteClient()
+ if err != nil {
+ return err
+ }
+
+ if _, err = remoteClient.FetchInternalRemote(ctx, &gitalypb.FetchInternalRemoteRequest{
+ Repository: in.GetRepository(),
+ RemoteRepository: in.GetSource(),
+ }); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// newRemoteClient creates a new RemoteClient that talks to the same gitaly server
+func newRemoteClient() (gitalypb.RemoteServiceClient, error) {
+ conn, err := client.Dial(fmt.Sprintf("unix:%s", config.GitalyInternalSocketPath()), nil)
+ if err != nil {
+ return nil, fmt.Errorf("could not dial source: %v", err)
+ }
+
+ return gitalypb.NewRemoteServiceClient(conn), nil
+}
diff --git a/internal/service/repository/replicate_test.go b/internal/service/repository/replicate_test.go
new file mode 100644
index 000000000..b48975a50
--- /dev/null
+++ b/internal/service/repository/replicate_test.go
@@ -0,0 +1,175 @@
+package repository_test
+
+import (
+ "fmt"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/config"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/service/repository"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
+)
+
+func TestReplicateRepository(t *testing.T) {
+ tmpPath, cleanup := testhelper.TempDir(t, testhelper.GitlabTestStoragePath(), t.Name())
+ defer cleanup()
+
+ replicaPath := filepath.Join(tmpPath, "replica")
+ require.NoError(t, os.MkdirAll(replicaPath, 0755))
+
+ defer func(storages []config.Storage) {
+ config.Config.Storages = storages
+ }(config.Config.Storages)
+
+ config.Config.Storages = []config.Storage{
+ config.Storage{
+ Name: "default",
+ Path: testhelper.GitlabTestStoragePath(),
+ },
+ config.Storage{
+ Name: "replica",
+ Path: replicaPath,
+ },
+ }
+
+ server, serverSocketPath := runFullServer(t)
+ defer server.Stop()
+
+ testRepo, testRepoPath, cleanupRepo := testhelper.NewTestRepo(t)
+ defer cleanupRepo()
+
+ config.Config.SocketPath = serverSocketPath
+
+ repoClient, conn := repository.NewRepositoryClient(t, serverSocketPath)
+ defer conn.Close()
+
+ targetRepo := *testRepo
+ targetRepo.StorageName = "replica"
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+ md := testhelper.GitalyServersMetadata(t, serverSocketPath)
+ injectedCtx := metadata.NewOutgoingContext(ctx, md)
+
+ _, err := repoClient.ReplicateRepository(injectedCtx, &gitalypb.ReplicateRepositoryRequest{
+ Repository: &targetRepo,
+ Source: testRepo,
+ })
+ require.NoError(t, err)
+
+ targetRepoPath, err := helper.GetRepoPath(&targetRepo)
+ require.NoError(t, err)
+
+ testhelper.MustRunCommand(t, nil, "git", "-C", targetRepoPath, "fsck")
+
+ sourceRefs := testhelper.GetRepositoryRefs(t, testRepoPath)
+ targetRefs := testhelper.GetRepositoryRefs(t, targetRepoPath)
+ require.Equal(t, sourceRefs, targetRefs)
+
+ // create another branch
+ _, anotherNewBranch := testhelper.CreateCommitOnNewBranch(t, testRepoPath)
+ _, err = repoClient.ReplicateRepository(injectedCtx, &gitalypb.ReplicateRepositoryRequest{
+ Repository: &targetRepo,
+ Source: testRepo,
+ })
+ require.NoError(t, err)
+ require.Equal(t,
+ testhelper.MustRunCommand(t, nil, "git", "-C", testRepoPath, "show-ref", "--hash", "--verify", fmt.Sprintf("refs/heads/%s", anotherNewBranch)),
+ testhelper.MustRunCommand(t, nil, "git", "-C", targetRepoPath, "show-ref", "--hash", "--verify", fmt.Sprintf("refs/heads/%s", anotherNewBranch)),
+ )
+}
+
+func TestReplicateRepositoryInvalidArguments(t *testing.T) {
+ testCases := []struct {
+ description string
+ input *gitalypb.ReplicateRepositoryRequest
+ expectedError string
+ }{
+ {
+ description: "everything ✅",
+ input: &gitalypb.ReplicateRepositoryRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: "praefect-internal-0",
+ RelativePath: "/ab/cd/abcdef1234",
+ },
+ Source: &gitalypb.Repository{
+ StorageName: "praefect-internal-1",
+ RelativePath: "/ab/cd/abcdef1234",
+ },
+ },
+ expectedError: "",
+ },
+ {
+ description: "empty repository",
+ input: &gitalypb.ReplicateRepositoryRequest{
+ Repository: nil,
+ Source: &gitalypb.Repository{
+ StorageName: "praefect-internal-1",
+ RelativePath: "/ab/cd/abcdef1234",
+ },
+ },
+ expectedError: "repository cannot be empty",
+ },
+ {
+ description: "empty source",
+ input: &gitalypb.ReplicateRepositoryRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: "praefect-internal-0",
+ RelativePath: "/ab/cd/abcdef1234",
+ },
+ Source: nil,
+ },
+ expectedError: "repository cannot be empty",
+ },
+ {
+ description: "source and repository have different relative paths",
+ input: &gitalypb.ReplicateRepositoryRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: "praefect-internal-0",
+ RelativePath: "/ab/cd/abcdef1234",
+ },
+ Source: &gitalypb.Repository{
+ StorageName: "praefect-internal-1",
+ RelativePath: "/ab/cd/abcdef4321",
+ },
+ },
+ expectedError: "both source and repository should have the same relative path",
+ },
+ {
+ description: "source and repository have the same storage",
+ input: &gitalypb.ReplicateRepositoryRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: "praefect-internal-0",
+ RelativePath: "/ab/cd/abcdef1234",
+ },
+ Source: &gitalypb.Repository{
+ StorageName: "praefect-internal-0",
+ RelativePath: "/ab/cd/abcdef1234",
+ },
+ },
+ expectedError: "repository and source have the same storage",
+ },
+ }
+
+ server, serverSocketPath := repository.RunRepoServer(t)
+ defer server.Stop()
+
+ client, conn := repository.NewRepositoryClient(t, serverSocketPath)
+ defer conn.Close()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ for _, tc := range testCases {
+ t.Run(tc.description, func(t *testing.T) {
+ _, err := client.ReplicateRepository(ctx, tc.input)
+ testhelper.RequireGrpcError(t, err, codes.InvalidArgument)
+ })
+ }
+}
diff --git a/internal/service/repository/testhelper_test.go b/internal/service/repository/testhelper_test.go
index e9be51311..4c3600da8 100644
--- a/internal/service/repository/testhelper_test.go
+++ b/internal/service/repository/testhelper_test.go
@@ -42,6 +42,7 @@ func newRepositoryClient(t *testing.T, serverSocketPath string) (gitalypb.Reposi
}
var NewRepositoryClient = newRepositoryClient
+var RunRepoServer = runRepoServer
func runRepoServer(t *testing.T) (*grpc.Server, string) {
streamInt := []grpc.StreamServerInterceptor{auth.StreamServerInterceptor(config.Config.Auth)}