Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2019-11-08 02:44:58 +0300
committerJohn Cai <jcai@gitlab.com>2019-11-14 05:35:21 +0300
commit3529554f5b6bae1f479748e23f294ce5983c002c (patch)
tree60ab7dd0ec0fbeeafc15076bad0bc3039107662d
parent3a11abf729ab992ff1467b266ddd120725882116 (diff)
-rw-r--r--changelogs/unreleased/jc-replicate-repository-impl.yml5
-rw-r--r--internal/service/repository/replicate.go231
-rw-r--r--internal/service/repository/replicate_test.go106
3 files changed, 342 insertions, 0 deletions
diff --git a/changelogs/unreleased/jc-replicate-repository-impl.yml b/changelogs/unreleased/jc-replicate-repository-impl.yml
new file mode 100644
index 000000000..cad99c69b
--- /dev/null
+++ b/changelogs/unreleased/jc-replicate-repository-impl.yml
@@ -0,0 +1,5 @@
+---
+title: ReplicateRepository RPC
+merge_request: 1605
+author:
+type: added
diff --git a/internal/service/repository/replicate.go b/internal/service/repository/replicate.go
new file mode 100644
index 000000000..29b4106a7
--- /dev/null
+++ b/internal/service/repository/replicate.go
@@ -0,0 +1,231 @@
+package repository
+
+import (
+ "context"
+ "io"
+ "os"
+ "path/filepath"
+
+ "golang.org/x/sync/errgroup"
+ "google.golang.org/grpc"
+
+ gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
+ "gitlab.com/gitlab-org/gitaly/internal/config"
+ "gitlab.com/gitlab-org/gitaly/internal/git/objectpool"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/tempdir"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitaly/streamio"
+)
+
+func (s *server) ReplicateRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) (*gitalypb.ReplicateRepositoryResponse, error) {
+ if _, err := s.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{
+ Repository: in.GetRepository(),
+ }); err != nil {
+ return nil, helper.ErrInternal(err)
+ }
+
+ g, ctx := errgroup.WithContext(ctx)
+ outgoingCtx := helper.IncomingToOutgoing(ctx)
+
+ for _, f := range []func(context.Context, *gitalypb.ReplicateRepositoryRequest) error{
+ syncRepository,
+ syncInfoAttributes,
+ s.syncObjectPool,
+ } {
+ f := f // rescoping f
+ g.Go(func() error { return f(outgoingCtx, in) })
+ }
+
+ if err := g.Wait(); err != nil {
+ return nil, helper.ErrInternal(err)
+ }
+
+ return &gitalypb.ReplicateRepositoryResponse{}, nil
+}
+
+func syncRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error {
+ remoteClient, err := newRemoteClient()
+ if err != nil {
+ return err
+ }
+
+ if _, err = remoteClient.FetchInternalRemote(ctx, &gitalypb.FetchInternalRemoteRequest{
+ Repository: in.GetRepository(),
+ RemoteRepository: in.GetSource(),
+ }); err != nil {
+ return helper.ErrInternal(err)
+ }
+
+ return nil
+}
+
+func syncInfoAttributes(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error {
+ repoClient, err := newRepoClient(ctx, in.GetSource().GetStorageName())
+ if err != nil {
+ return err
+ }
+
+ repoPath, err := helper.GetRepoPath(in.GetRepository())
+ if err != nil {
+ return helper.ErrInternal(err)
+ }
+
+ infoPath := filepath.Join(repoPath, "info")
+ attributesPath := filepath.Join(infoPath, "attributes")
+
+ if err := os.MkdirAll(infoPath, 0755); err != nil {
+ return helper.ErrInternal(err)
+ }
+
+ tmpDir, err := tempdir.New(ctx, in.GetRepository())
+ if err != nil {
+ return helper.ErrInternal(err)
+ }
+
+ attributesFile, err := os.Create(filepath.Join(tmpDir, "attributes"))
+ if err != nil {
+ return helper.ErrInternal(err)
+ }
+ defer attributesFile.Close()
+
+ stream, err := repoClient.GetInfoAttributes(ctx, &gitalypb.GetInfoAttributesRequest{
+ Repository: in.GetSource(),
+ })
+ if err != nil {
+ return helper.ErrInternal(err)
+ }
+
+ if _, err := io.Copy(attributesFile, streamio.NewReader(func() ([]byte, error) {
+ resp, err := stream.Recv()
+ return resp.GetAttributes(), err
+ })); err != nil {
+ return helper.ErrInternal(err)
+ }
+
+ if err := os.Chmod(attributesFile.Name(), attributesFileMode); err != nil {
+ return helper.ErrInternal(err)
+ }
+
+ if err = os.Rename(attributesFile.Name(), attributesPath); err != nil {
+ return helper.ErrInternal(err)
+ }
+
+ return nil
+}
+
+func copyObjectPoolProto(objectPoolProto *gitalypb.ObjectPool) *gitalypb.ObjectPool {
+ objectPoolProtoCp := *objectPoolProto
+ objectPoolRepoCp := *objectPoolProto.Repository
+ objectPoolProtoCp.Repository = &objectPoolRepoCp
+
+ return &objectPoolProtoCp
+}
+
+func (s *server) syncObjectPool(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error {
+ objectPoolClient, err := newObjectPoolClient(ctx, in.GetSource().GetStorageName())
+ if err != nil {
+ return err
+ }
+
+ resp, err := objectPoolClient.GetObjectPool(ctx, &gitalypb.GetObjectPoolRequest{
+ Repository: in.GetSource(),
+ })
+ if err != nil {
+ return err
+ }
+
+ sourceObjectPoolProto := resp.GetObjectPool()
+ if sourceObjectPoolProto == nil {
+ return nil
+ }
+
+ targetObjectPoolProto := copyObjectPoolProto(sourceObjectPoolProto)
+ targetObjectPoolProto.Repository.StorageName = in.GetRepository().GetStorageName()
+
+ targetObjectPool, err := objectpool.FromProto(targetObjectPoolProto)
+ if err != nil {
+ return err
+ }
+
+ if !targetObjectPool.Exists() {
+ if _, err := s.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{Repository: targetObjectPoolProto.GetRepository()}); err != nil {
+ return err
+ }
+
+ remoteClient, err := newRemoteClient()
+
+ if err != nil {
+ return err
+ }
+
+ poolRepository := targetObjectPoolProto.GetRepository()
+
+ if _, err := remoteClient.FetchInternalRemote(ctx, &gitalypb.FetchInternalRemoteRequest{
+ Repository: poolRepository,
+ RemoteRepository: sourceObjectPoolProto.GetRepository(),
+ }); err != nil {
+ return err
+ }
+ }
+
+ return targetObjectPool.Link(ctx, in.GetRepository())
+}
+
+// newRemoteClient creates a new RemoteClient that talks to the same gitaly server
+func newRemoteClient() (gitalypb.RemoteServiceClient, error) {
+ connOpts := []grpc.DialOption{
+ grpc.WithInsecure(),
+ grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(config.Config.Auth.Token)),
+ }
+ conn, err := grpc.Dial(config.Config.SocketPath, connOpts...)
+ if err != nil {
+ return nil, helper.ErrInternalf("could not dial source: %v", err)
+ }
+
+ return gitalypb.NewRemoteServiceClient(conn), nil
+}
+
+// newRepoClient creates a new RepositoryClient that talks to the gitaly of the source repository
+func newRepoClient(ctx context.Context, storageName string) (gitalypb.RepositoryServiceClient, error) {
+ conn, err := newClientConnection(ctx, storageName)
+ if err != nil {
+ return nil, err
+ }
+
+ return gitalypb.NewRepositoryServiceClient(conn), nil
+}
+
+// newObjectPoolClient creates a new RepositoryClient that talks to the gitaly of the source repository
+func newObjectPoolClient(ctx context.Context, storageName string) (gitalypb.ObjectPoolServiceClient, error) {
+ conn, err := newClientConnection(ctx, storageName)
+ if err != nil {
+ return nil, err
+ }
+
+ return gitalypb.NewObjectPoolServiceClient(conn), nil
+}
+
+func newClientConnection(ctx context.Context, storageName string) (*grpc.ClientConn, error) {
+ gitalyServersInfo, err := helper.ExtractGitalyServers(ctx)
+ if err != nil {
+ return nil, helper.ErrInternal(err)
+ }
+
+ sourceRepositoryStorageInfo, ok := gitalyServersInfo[storageName]
+ if !ok {
+ return nil, helper.ErrInternalf("gitaly server info for %s not found", storageName)
+ }
+
+ connOpts := []grpc.DialOption{
+ grpc.WithInsecure(),
+ grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(sourceRepositoryStorageInfo["token"])),
+ }
+
+ conn, err := grpc.Dial(sourceRepositoryStorageInfo["address"], connOpts...)
+ if err != nil {
+ return nil, helper.ErrInternalf("could not dial source: %v", err)
+ }
+
+ return conn, nil
+}
diff --git a/internal/service/repository/replicate_test.go b/internal/service/repository/replicate_test.go
new file mode 100644
index 000000000..e2686d7a8
--- /dev/null
+++ b/internal/service/repository/replicate_test.go
@@ -0,0 +1,106 @@
+package repository_test
+
+import (
+ "fmt"
+ "io/ioutil"
+ "os"
+ "path"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
+ "gitlab.com/gitlab-org/gitaly/internal/config"
+ "gitlab.com/gitlab-org/gitaly/internal/git/objectpool"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "google.golang.org/grpc"
+)
+
+func TestReplicateRepository(t *testing.T) {
+ tmpPath, cleanup := testhelper.TempDir(t, testhelper.GitlabTestStoragePath(), t.Name())
+ defer cleanup()
+
+ replicaPath := filepath.Join(tmpPath, "replica")
+ require.NoError(t, os.MkdirAll(replicaPath, 0755))
+
+ defer func(storages []config.Storage) {
+ config.Config.Storages = storages
+ }(config.Config.Storages)
+
+ config.Config.Storages = []config.Storage{
+ config.Storage{
+ Name: "default",
+ Path: testhelper.GitlabTestStoragePath(),
+ },
+ config.Storage{
+ Name: "replica",
+ Path: replicaPath,
+ },
+ }
+
+ server, serverSocketPath := runFullServer(t)
+ defer server.Stop()
+
+ config.Config.SocketPath = serverSocketPath
+
+ testRepo, testRepoPath, cleanupRepo := testhelper.NewTestRepo(t)
+ defer cleanupRepo()
+
+ _, newBranch := testhelper.CreateCommitOnNewBranch(t, testRepoPath)
+
+ // write info attributes
+ attrFilePath := path.Join(testRepoPath, "info", "attributes")
+ attrData := []byte("*.pbxproj binary\n")
+ require.NoError(t, ioutil.WriteFile(attrFilePath, attrData, 0644))
+
+ // create object pool
+ pool, err := objectpool.NewObjectPool(testRepo.GetStorageName(), testhelper.NewTestObjectPoolName(t))
+ require.NoError(t, err)
+
+ poolCtx, cancel := testhelper.Context()
+
+ require.NoError(t, pool.Create(poolCtx, testRepo))
+ require.NoError(t, pool.Link(poolCtx, testRepo))
+
+ cancel()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ connOpts := []grpc.DialOption{
+ grpc.WithInsecure(),
+ grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(config.Config.Auth.Token)),
+ }
+
+ conn, err := grpc.Dial(serverSocketPath, connOpts...)
+ require.NoError(t, err)
+
+ repoClient := gitalypb.NewRepositoryServiceClient(conn)
+
+ injectedCtx, err := helper.InjectGitalyServers(ctx, "default", serverSocketPath, config.Config.Auth.Token)
+ require.NoError(t, err)
+
+ targetRepo := *testRepo
+ targetRepo.StorageName = "replica"
+
+ _, err = repoClient.ReplicateRepository(injectedCtx, &gitalypb.ReplicateRepositoryRequest{
+ Repository: &targetRepo,
+ Source: testRepo,
+ })
+ require.NoError(t, err)
+
+ targetRepoPath, err := helper.GetRepoPath(&targetRepo)
+ require.NoError(t, err)
+
+ testhelper.MustRunCommand(t, nil, "git", "-C", targetRepoPath, "gc")
+
+ require.True(t, getGitObjectDirSize(t, targetRepoPath) < 100, "expect a small object directory size")
+ testhelper.MustRunCommand(t, nil, "git", "-C", targetRepoPath, "show-ref", "--verify", fmt.Sprintf("refs/heads/%s", newBranch))
+
+ replicatedAttrFilePath := path.Join(targetRepoPath, "info", "attributes")
+ replicatedAttrData, err := ioutil.ReadFile(replicatedAttrFilePath)
+ require.NoError(t, err)
+ require.Equal(t, attrData, replicatedAttrData, "info/attributes files must match")
+}