diff options
author | John Cai <jcai@gitlab.com> | 2019-11-08 02:44:58 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2019-12-06 21:53:30 +0300 |
commit | fbc07e1f94f3de086319252faae5f3c18878adc6 (patch) | |
tree | 08e154df3a80952ced5d9cf31c510cdf7efad930 | |
parent | 5fae4483a6a832ce558c43f7d1867cd7062c7065 (diff) |
ReplicateRepository RPC
-rw-r--r-- | changelogs/unreleased/jc-replicate-repository-impl.yml | 5 | ||||
-rw-r--r-- | internal/service/repository/fetch_test.go | 10 | ||||
-rw-r--r-- | internal/service/repository/replicate.go | 78 | ||||
-rw-r--r-- | internal/service/repository/replicate_test.go | 175 | ||||
-rw-r--r-- | internal/service/repository/testhelper_test.go | 1 |
5 files changed, 266 insertions, 3 deletions
diff --git a/changelogs/unreleased/jc-replicate-repository-impl.yml b/changelogs/unreleased/jc-replicate-repository-impl.yml new file mode 100644 index 000000000..cad99c69b --- /dev/null +++ b/changelogs/unreleased/jc-replicate-repository-impl.yml @@ -0,0 +1,5 @@ +--- +title: ReplicateRepository RPC +merge_request: 1605 +author: +type: added diff --git a/internal/service/repository/fetch_test.go b/internal/service/repository/fetch_test.go index af3e2ff53..1939dd633 100644 --- a/internal/service/repository/fetch_test.go +++ b/internal/service/repository/fetch_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/config" gitLog "gitlab.com/gitlab-org/gitaly/internal/git/log" "gitlab.com/gitlab-org/gitaly/internal/helper" serverPkg "gitlab.com/gitlab-org/gitaly/internal/server" @@ -187,10 +188,13 @@ func runFullServer(t *testing.T) (*grpc.Server, string) { serverSocketPath := testhelper.GetTemporaryGitalySocketFileName() listener, err := net.Listen("unix", serverSocketPath) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + + //listen on internal socket + internalListener, err := net.Listen("unix", config.GitalyInternalSocketPath()) + require.NoError(t, err) + go server.Serve(internalListener) go server.Serve(listener) return server, "unix://" + serverSocketPath diff --git a/internal/service/repository/replicate.go b/internal/service/repository/replicate.go new file mode 100644 index 000000000..ede8a213f --- /dev/null +++ b/internal/service/repository/replicate.go @@ -0,0 +1,78 @@ +package repository + +import ( + "context" + "errors" + "fmt" + + "gitlab.com/gitlab-org/gitaly/client" + "gitlab.com/gitlab-org/gitaly/internal/config" + "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" +) + +func (s *server) ReplicateRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) (*gitalypb.ReplicateRepositoryResponse, error) { + if err := validateReplicateRepository(in); err != nil { + return nil, helper.ErrInvalidArgument(err) + } + + if _, err := s.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{ + Repository: in.GetRepository(), + }); err != nil { + return nil, helper.ErrInternal(err) + } + + outCtx := helper.IncomingToOutgoing(ctx) + + if err := syncRepository(outCtx, in); err != nil { + return nil, helper.ErrInternal(err) + } + + return &gitalypb.ReplicateRepositoryResponse{}, nil +} + +func validateReplicateRepository(in *gitalypb.ReplicateRepositoryRequest) error { + if in.GetRepository() == nil { + return errors.New("repository cannot be empty") + } + + if in.GetSource() == nil { + return errors.New("source repository cannot be empty") + } + + if in.GetRepository().GetRelativePath() != in.GetSource().GetRelativePath() { + return errors.New("both source and repository should have the same relative path") + } + + if in.GetRepository().GetStorageName() == in.GetSource().GetStorageName() { + return errors.New("repository and source have the same storage") + } + + return nil +} + +func syncRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error { + remoteClient, err := newRemoteClient() + if err != nil { + return err + } + + if _, err = remoteClient.FetchInternalRemote(ctx, &gitalypb.FetchInternalRemoteRequest{ + Repository: in.GetRepository(), + RemoteRepository: in.GetSource(), + }); err != nil { + return err + } + + return nil +} + +// newRemoteClient creates a new RemoteClient that talks to the same gitaly server +func newRemoteClient() (gitalypb.RemoteServiceClient, error) { + conn, err := client.Dial(fmt.Sprintf("unix:%s", config.GitalyInternalSocketPath()), nil) + if err != nil { + return nil, fmt.Errorf("could not dial source: %v", err) + } + + return gitalypb.NewRemoteServiceClient(conn), nil +} diff --git a/internal/service/repository/replicate_test.go b/internal/service/repository/replicate_test.go new file mode 100644 index 000000000..b48975a50 --- /dev/null +++ b/internal/service/repository/replicate_test.go @@ -0,0 +1,175 @@ +package repository_test + +import ( + "fmt" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/config" + "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/service/repository" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" +) + +func TestReplicateRepository(t *testing.T) { + tmpPath, cleanup := testhelper.TempDir(t, testhelper.GitlabTestStoragePath(), t.Name()) + defer cleanup() + + replicaPath := filepath.Join(tmpPath, "replica") + require.NoError(t, os.MkdirAll(replicaPath, 0755)) + + defer func(storages []config.Storage) { + config.Config.Storages = storages + }(config.Config.Storages) + + config.Config.Storages = []config.Storage{ + config.Storage{ + Name: "default", + Path: testhelper.GitlabTestStoragePath(), + }, + config.Storage{ + Name: "replica", + Path: replicaPath, + }, + } + + server, serverSocketPath := runFullServer(t) + defer server.Stop() + + testRepo, testRepoPath, cleanupRepo := testhelper.NewTestRepo(t) + defer cleanupRepo() + + config.Config.SocketPath = serverSocketPath + + repoClient, conn := repository.NewRepositoryClient(t, serverSocketPath) + defer conn.Close() + + targetRepo := *testRepo + targetRepo.StorageName = "replica" + + ctx, cancel := testhelper.Context() + defer cancel() + md := testhelper.GitalyServersMetadata(t, serverSocketPath) + injectedCtx := metadata.NewOutgoingContext(ctx, md) + + _, err := repoClient.ReplicateRepository(injectedCtx, &gitalypb.ReplicateRepositoryRequest{ + Repository: &targetRepo, + Source: testRepo, + }) + require.NoError(t, err) + + targetRepoPath, err := helper.GetRepoPath(&targetRepo) + require.NoError(t, err) + + testhelper.MustRunCommand(t, nil, "git", "-C", targetRepoPath, "fsck") + + sourceRefs := testhelper.GetRepositoryRefs(t, testRepoPath) + targetRefs := testhelper.GetRepositoryRefs(t, targetRepoPath) + require.Equal(t, sourceRefs, targetRefs) + + // create another branch + _, anotherNewBranch := testhelper.CreateCommitOnNewBranch(t, testRepoPath) + _, err = repoClient.ReplicateRepository(injectedCtx, &gitalypb.ReplicateRepositoryRequest{ + Repository: &targetRepo, + Source: testRepo, + }) + require.NoError(t, err) + require.Equal(t, + testhelper.MustRunCommand(t, nil, "git", "-C", testRepoPath, "show-ref", "--hash", "--verify", fmt.Sprintf("refs/heads/%s", anotherNewBranch)), + testhelper.MustRunCommand(t, nil, "git", "-C", targetRepoPath, "show-ref", "--hash", "--verify", fmt.Sprintf("refs/heads/%s", anotherNewBranch)), + ) +} + +func TestReplicateRepositoryInvalidArguments(t *testing.T) { + testCases := []struct { + description string + input *gitalypb.ReplicateRepositoryRequest + expectedError string + }{ + { + description: "everything ✅", + input: &gitalypb.ReplicateRepositoryRequest{ + Repository: &gitalypb.Repository{ + StorageName: "praefect-internal-0", + RelativePath: "/ab/cd/abcdef1234", + }, + Source: &gitalypb.Repository{ + StorageName: "praefect-internal-1", + RelativePath: "/ab/cd/abcdef1234", + }, + }, + expectedError: "", + }, + { + description: "empty repository", + input: &gitalypb.ReplicateRepositoryRequest{ + Repository: nil, + Source: &gitalypb.Repository{ + StorageName: "praefect-internal-1", + RelativePath: "/ab/cd/abcdef1234", + }, + }, + expectedError: "repository cannot be empty", + }, + { + description: "empty source", + input: &gitalypb.ReplicateRepositoryRequest{ + Repository: &gitalypb.Repository{ + StorageName: "praefect-internal-0", + RelativePath: "/ab/cd/abcdef1234", + }, + Source: nil, + }, + expectedError: "repository cannot be empty", + }, + { + description: "source and repository have different relative paths", + input: &gitalypb.ReplicateRepositoryRequest{ + Repository: &gitalypb.Repository{ + StorageName: "praefect-internal-0", + RelativePath: "/ab/cd/abcdef1234", + }, + Source: &gitalypb.Repository{ + StorageName: "praefect-internal-1", + RelativePath: "/ab/cd/abcdef4321", + }, + }, + expectedError: "both source and repository should have the same relative path", + }, + { + description: "source and repository have the same storage", + input: &gitalypb.ReplicateRepositoryRequest{ + Repository: &gitalypb.Repository{ + StorageName: "praefect-internal-0", + RelativePath: "/ab/cd/abcdef1234", + }, + Source: &gitalypb.Repository{ + StorageName: "praefect-internal-0", + RelativePath: "/ab/cd/abcdef1234", + }, + }, + expectedError: "repository and source have the same storage", + }, + } + + server, serverSocketPath := repository.RunRepoServer(t) + defer server.Stop() + + client, conn := repository.NewRepositoryClient(t, serverSocketPath) + defer conn.Close() + + ctx, cancel := testhelper.Context() + defer cancel() + + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + _, err := client.ReplicateRepository(ctx, tc.input) + testhelper.RequireGrpcError(t, err, codes.InvalidArgument) + }) + } +} diff --git a/internal/service/repository/testhelper_test.go b/internal/service/repository/testhelper_test.go index e9be51311..4c3600da8 100644 --- a/internal/service/repository/testhelper_test.go +++ b/internal/service/repository/testhelper_test.go @@ -42,6 +42,7 @@ func newRepositoryClient(t *testing.T, serverSocketPath string) (gitalypb.Reposi } var NewRepositoryClient = newRepositoryClient +var RunRepoServer = runRepoServer func runRepoServer(t *testing.T) (*grpc.Server, string) { streamInt := []grpc.StreamServerInterceptor{auth.StreamServerInterceptor(config.Config.Auth)} |