From 3529554f5b6bae1f479748e23f294ce5983c002c Mon Sep 17 00:00:00 2001
From: John Cai
Date: Thu, 7 Nov 2019 15:44:58 -0800
Subject: ReplicateRepository RPC

---
 .../unreleased/jc-replicate-repository-impl.yml    |   5 +
 internal/service/repository/replicate.go           | 269 +++++++++++++++++++++
 internal/service/repository/replicate_test.go      | 113 ++++++++++
 3 files changed, 387 insertions(+)
 create mode 100644 changelogs/unreleased/jc-replicate-repository-impl.yml
 create mode 100644 internal/service/repository/replicate.go
 create mode 100644 internal/service/repository/replicate_test.go

diff --git a/changelogs/unreleased/jc-replicate-repository-impl.yml b/changelogs/unreleased/jc-replicate-repository-impl.yml
new file mode 100644
index 000000000..cad99c69b
--- /dev/null
+++ b/changelogs/unreleased/jc-replicate-repository-impl.yml
@@ -0,0 +1,5 @@
+---
+title: ReplicateRepository RPC
+merge_request: 1605
+author:
+type: added
diff --git a/internal/service/repository/replicate.go b/internal/service/repository/replicate.go
new file mode 100644
index 000000000..29b4106a7
--- /dev/null
+++ b/internal/service/repository/replicate.go
@@ -0,0 +1,269 @@
+package repository
+
+import (
+	"context"
+	"io"
+	"os"
+	"path/filepath"
+
+	"golang.org/x/sync/errgroup"
+	"google.golang.org/grpc"
+
+	gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
+	"gitlab.com/gitlab-org/gitaly/internal/config"
+	"gitlab.com/gitlab-org/gitaly/internal/git/objectpool"
+	"gitlab.com/gitlab-org/gitaly/internal/helper"
+	"gitlab.com/gitlab-org/gitaly/internal/tempdir"
+	"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+	"gitlab.com/gitlab-org/gitaly/streamio"
+)
+
+// ReplicateRepository creates the target repository and then, concurrently,
+// fetches the source repository into it, copies the source's info/attributes
+// file and replicates the source's object pool link. Errors from any step are
+// returned wrapped by helper.ErrInternal.
+func (s *server) ReplicateRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) (*gitalypb.ReplicateRepositoryResponse, error) {
+	if _, err := s.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{
+		Repository: in.GetRepository(),
+	}); err != nil {
+		return nil, helper.ErrInternal(err)
+	}
+
+	g, ctx := errgroup.WithContext(ctx)
+	outgoingCtx := helper.IncomingToOutgoing(ctx)
+
+	for _, f := range []func(context.Context, *gitalypb.ReplicateRepositoryRequest) error{
+		syncRepository,
+		syncInfoAttributes,
+		s.syncObjectPool,
+	} {
+		f := f // rescoping f
+		g.Go(func() error { return f(outgoingCtx, in) })
+	}
+
+	if err := g.Wait(); err != nil {
+		return nil, helper.ErrInternal(err)
+	}
+
+	return &gitalypb.ReplicateRepositoryResponse{}, nil
+}
+
+// syncRepository asks the local gitaly server to fetch the source repository
+// into the target repository via FetchInternalRemote.
+func syncRepository(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error {
+	remoteClient, conn, err := newRemoteClient()
+	if err != nil {
+		return err
+	}
+	defer conn.Close()
+
+	if _, err = remoteClient.FetchInternalRemote(ctx, &gitalypb.FetchInternalRemoteRequest{
+		Repository:       in.GetRepository(),
+		RemoteRepository: in.GetSource(),
+	}); err != nil {
+		return helper.ErrInternal(err)
+	}
+
+	return nil
+}
+
+// syncInfoAttributes streams the info/attributes file of the source repository
+// into a temporary file and then renames it to info/attributes inside the
+// target repository.
+func syncInfoAttributes(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error {
+	repoClient, conn, err := newRepoClient(ctx, in.GetSource().GetStorageName())
+	if err != nil {
+		return err
+	}
+	defer conn.Close()
+
+	repoPath, err := helper.GetRepoPath(in.GetRepository())
+	if err != nil {
+		return helper.ErrInternal(err)
+	}
+
+	infoPath := filepath.Join(repoPath, "info")
+	attributesPath := filepath.Join(infoPath, "attributes")
+
+	if err := os.MkdirAll(infoPath, 0755); err != nil {
+		return helper.ErrInternal(err)
+	}
+
+	tmpDir, err := tempdir.New(ctx, in.GetRepository())
+	if err != nil {
+		return helper.ErrInternal(err)
+	}
+
+	attributesFile, err := os.Create(filepath.Join(tmpDir, "attributes"))
+	if err != nil {
+		return helper.ErrInternal(err)
+	}
+	// Guards the early returns below. After the explicit Close further down
+	// this deferred Close only reports "already closed", which is ignored.
+	defer attributesFile.Close()
+
+	stream, err := repoClient.GetInfoAttributes(ctx, &gitalypb.GetInfoAttributesRequest{
+		Repository: in.GetSource(),
+	})
+	if err != nil {
+		return helper.ErrInternal(err)
+	}
+
+	if _, err := io.Copy(attributesFile, streamio.NewReader(func() ([]byte, error) {
+		resp, err := stream.Recv()
+		return resp.GetAttributes(), err
+	})); err != nil {
+		return helper.ErrInternal(err)
+	}
+
+	// Close before the rename so that a failed flush is reported instead of
+	// publishing a possibly incomplete info/attributes file.
+	if err := attributesFile.Close(); err != nil {
+		return helper.ErrInternal(err)
+	}
+
+	if err := os.Chmod(attributesFile.Name(), attributesFileMode); err != nil {
+		return helper.ErrInternal(err)
+	}
+
+	if err = os.Rename(attributesFile.Name(), attributesPath); err != nil {
+		return helper.ErrInternal(err)
+	}
+
+	return nil
+}
+
+// copyObjectPoolProto returns a copy of objectPoolProto. Its Repository, when
+// set, is copied too, so changing the copy does not touch the original.
+func copyObjectPoolProto(objectPoolProto *gitalypb.ObjectPool) *gitalypb.ObjectPool {
+	objectPoolProtoCp := *objectPoolProto
+	if objectPoolProto.Repository != nil {
+		objectPoolRepoCp := *objectPoolProto.Repository
+		objectPoolProtoCp.Repository = &objectPoolRepoCp
+	}
+
+	return &objectPoolProtoCp
+}
+
+// syncObjectPool asks the source's gitaly for the object pool of the source
+// repository. When there is one, it derives the matching pool on the target
+// storage (same pool, different storage name), creates and fetches that pool
+// if it does not exist yet, and links the target repository to it. It does
+// nothing when the source repository has no object pool.
+func (s *server) syncObjectPool(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error {
+	objectPoolClient, conn, err := newObjectPoolClient(ctx, in.GetSource().GetStorageName())
+	if err != nil {
+		return err
+	}
+	defer conn.Close()
+
+	resp, err := objectPoolClient.GetObjectPool(ctx, &gitalypb.GetObjectPoolRequest{
+		Repository: in.GetSource(),
+	})
+	if err != nil {
+		return err
+	}
+
+	sourceObjectPoolProto := resp.GetObjectPool()
+	if sourceObjectPoolProto == nil {
+		return nil
+	}
+	if sourceObjectPoolProto.GetRepository() == nil {
+		return helper.ErrInternalf("object pool of source repository has no repository")
+	}
+
+	targetObjectPoolProto := copyObjectPoolProto(sourceObjectPoolProto)
+	targetObjectPoolProto.Repository.StorageName = in.GetRepository().GetStorageName()
+
+	targetObjectPool, err := objectpool.FromProto(targetObjectPoolProto)
+	if err != nil {
+		return err
+	}
+
+	if !targetObjectPool.Exists() {
+		if _, err := s.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{Repository: targetObjectPoolProto.GetRepository()}); err != nil {
+			return err
+		}
+
+		remoteClient, remoteConn, err := newRemoteClient()
+		if err != nil {
+			return err
+		}
+		defer remoteConn.Close()
+
+		poolRepository := targetObjectPoolProto.GetRepository()
+
+		if _, err := remoteClient.FetchInternalRemote(ctx, &gitalypb.FetchInternalRemoteRequest{
+			Repository:       poolRepository,
+			RemoteRepository: sourceObjectPoolProto.GetRepository(),
+		}); err != nil {
+			return err
+		}
+	}
+
+	return targetObjectPool.Link(ctx, in.GetRepository())
+}
+
+// newRemoteClient creates a new RemoteClient that talks to the same gitaly
+// server. The caller owns the returned connection and must close it.
+func newRemoteClient() (gitalypb.RemoteServiceClient, *grpc.ClientConn, error) {
+	connOpts := []grpc.DialOption{
+		grpc.WithInsecure(),
+		grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(config.Config.Auth.Token)),
+	}
+	conn, err := grpc.Dial(config.Config.SocketPath, connOpts...)
+	if err != nil {
+		return nil, nil, helper.ErrInternalf("could not dial source: %v", err)
+	}
+
+	return gitalypb.NewRemoteServiceClient(conn), conn, nil
+}
+
+// newRepoClient creates a new RepositoryClient that talks to the gitaly of the
+// source repository. The caller owns the returned connection and must close it.
+func newRepoClient(ctx context.Context, storageName string) (gitalypb.RepositoryServiceClient, *grpc.ClientConn, error) {
+	conn, err := newClientConnection(ctx, storageName)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	return gitalypb.NewRepositoryServiceClient(conn), conn, nil
+}
+
+// newObjectPoolClient creates a new ObjectPoolClient that talks to the gitaly of
+// the source repository. The caller owns the returned connection and must close it.
+func newObjectPoolClient(ctx context.Context, storageName string) (gitalypb.ObjectPoolServiceClient, *grpc.ClientConn, error) {
+	conn, err := newClientConnection(ctx, storageName)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	return gitalypb.NewObjectPoolServiceClient(conn), conn, nil
+}
+
+// newClientConnection dials the gitaly server registered for storageName in
+// the gitaly server info carried by ctx, authenticating with that entry's
+// token. It fails when ctx has no entry for storageName.
+func newClientConnection(ctx context.Context, storageName string) (*grpc.ClientConn, error) {
+	gitalyServersInfo, err := helper.ExtractGitalyServers(ctx)
+	if err != nil {
+		return nil, helper.ErrInternal(err)
+	}
+
+	sourceRepositoryStorageInfo, ok := gitalyServersInfo[storageName]
+	if !ok {
+		return nil, helper.ErrInternalf("gitaly server info for %s not found", storageName)
+	}
+
+	connOpts := []grpc.DialOption{
+		grpc.WithInsecure(),
+		grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(sourceRepositoryStorageInfo["token"])),
+	}
+
+	conn, err := grpc.Dial(sourceRepositoryStorageInfo["address"], connOpts...)
+	if err != nil {
+		return nil, helper.ErrInternalf("could not dial source: %v", err)
+	}
+
+	return conn, nil
+}
diff --git a/internal/service/repository/replicate_test.go b/internal/service/repository/replicate_test.go
new file mode 100644
index 000000000..e2686d7a8
--- /dev/null
+++ b/internal/service/repository/replicate_test.go
@@ -0,0 +1,113 @@
+package repository_test
+
+import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path"
+	"path/filepath"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
+	"gitlab.com/gitlab-org/gitaly/internal/config"
+	"gitlab.com/gitlab-org/gitaly/internal/git/objectpool"
+	"gitlab.com/gitlab-org/gitaly/internal/helper"
+	"gitlab.com/gitlab-org/gitaly/internal/testhelper"
+	"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+	"google.golang.org/grpc"
+)
+
+// TestReplicateRepository replicates a test repository that has an extra
+// branch, an info/attributes file and an object pool onto a second storage and
+// checks that the branch, the attributes and the small object directory arrive.
+func TestReplicateRepository(t *testing.T) {
+	tmpPath, cleanup := testhelper.TempDir(t, testhelper.GitlabTestStoragePath(), t.Name())
+	defer cleanup()
+
+	replicaPath := filepath.Join(tmpPath, "replica")
+	require.NoError(t, os.MkdirAll(replicaPath, 0755))
+
+	defer func(storages []config.Storage) {
+		config.Config.Storages = storages
+	}(config.Config.Storages)
+
+	config.Config.Storages = []config.Storage{
+		config.Storage{
+			Name: "default",
+			Path: testhelper.GitlabTestStoragePath(),
+		},
+		config.Storage{
+			Name: "replica",
+			Path: replicaPath,
+		},
+	}
+
+	server, serverSocketPath := runFullServer(t)
+	defer server.Stop()
+
+	defer func(socketPath string) {
+		config.Config.SocketPath = socketPath
+	}(config.Config.SocketPath)
+	config.Config.SocketPath = serverSocketPath
+
+	testRepo, testRepoPath, cleanupRepo := testhelper.NewTestRepo(t)
+	defer cleanupRepo()
+
+	_, newBranch := testhelper.CreateCommitOnNewBranch(t, testRepoPath)
+
+	// write info attributes
+	attrFilePath := path.Join(testRepoPath, "info", "attributes")
+	attrData := []byte("*.pbxproj binary\n")
+	require.NoError(t, ioutil.WriteFile(attrFilePath, attrData, 0644))
+
+	// create object pool
+	pool, err := objectpool.NewObjectPool(testRepo.GetStorageName(), testhelper.NewTestObjectPoolName(t))
+	require.NoError(t, err)
+
+	poolCtx, cancel := testhelper.Context()
+
+	require.NoError(t, pool.Create(poolCtx, testRepo))
+	require.NoError(t, pool.Link(poolCtx, testRepo))
+
+	cancel()
+
+	ctx, cancel := testhelper.Context()
+	defer cancel()
+
+	connOpts := []grpc.DialOption{
+		grpc.WithInsecure(),
+		grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(config.Config.Auth.Token)),
+	}
+
+	conn, err := grpc.Dial(serverSocketPath, connOpts...)
+	require.NoError(t, err)
+	defer conn.Close()
+
+	repoClient := gitalypb.NewRepositoryServiceClient(conn)
+
+	injectedCtx, err := helper.InjectGitalyServers(ctx, "default", serverSocketPath, config.Config.Auth.Token)
+	require.NoError(t, err)
+
+	targetRepo := *testRepo
+	targetRepo.StorageName = "replica"
+
+	_, err = repoClient.ReplicateRepository(injectedCtx, &gitalypb.ReplicateRepositoryRequest{
+		Repository: &targetRepo,
+		Source:     testRepo,
+	})
+	require.NoError(t, err)
+
+	targetRepoPath, err := helper.GetRepoPath(&targetRepo)
+	require.NoError(t, err)
+
+	testhelper.MustRunCommand(t, nil, "git", "-C", targetRepoPath, "gc")
+
+	require.True(t, getGitObjectDirSize(t, targetRepoPath) < 100, "expect a small object directory size")
+	testhelper.MustRunCommand(t, nil, "git", "-C", targetRepoPath, "show-ref", "--verify", fmt.Sprintf("refs/heads/%s", newBranch))
+
+	replicatedAttrFilePath := path.Join(targetRepoPath, "info", "attributes")
+	replicatedAttrData, err := ioutil.ReadFile(replicatedAttrFilePath)
+	require.NoError(t, err)
+	require.Equal(t, attrData, replicatedAttrData, "info/attributes files must match")
+}
-- 
cgit v1.2.3