Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Fargher <jfargher@gitlab.com>2021-08-30 03:22:52 +0300
committerJames Fargher <jfargher@gitlab.com>2021-09-23 02:55:44 +0300
commitbaf7b1dcc301c125086159ad65eadcabe71345e0 (patch)
treedb8691694be65d23e86490fb49a2b81adca14c78
parent29cdfa053025b777d2fed84774570e769f555f35 (diff)
Add FetchBundle RPC
FetchBundle copies the bundle stream to a tempfile that is then passed to `git-fetch`. Changelog: added
-rw-r--r--internal/gitaly/service/repository/fetch_bundle.go75
-rw-r--r--internal/gitaly/service/repository/fetch_bundle_test.go205
-rw-r--r--internal/praefect/coordinator.go1
-rw-r--r--internal/praefect/protoregistry/protoregistry_test.go1
4 files changed, 282 insertions, 0 deletions
diff --git a/internal/gitaly/service/repository/fetch_bundle.go b/internal/gitaly/service/repository/fetch_bundle.go
new file mode 100644
index 000000000..05204d064
--- /dev/null
+++ b/internal/gitaly/service/repository/fetch_bundle.go
@@ -0,0 +1,75 @@
+package repository
+
+import (
+ "io"
+ "os"
+ "path/filepath"
+
+ gitalyerrors "gitlab.com/gitlab-org/gitaly/v14/internal/errors"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/tempdir"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitaly/v14/streamio"
+)
+
+const (
+ mirrorRefSpec = "+refs/*:refs/*"
+)
+
+func (s *server) FetchBundle(stream gitalypb.RepositoryService_FetchBundleServer) error {
+ firstRequest, err := stream.Recv()
+ if err != nil {
+ return helper.ErrInternalf("first request: %v", err)
+ }
+
+ if firstRequest.GetRepository() == nil {
+ return helper.ErrInvalidArgument(gitalyerrors.ErrEmptyRepository)
+ }
+
+ firstRead := true
+ reader := streamio.NewReader(func() ([]byte, error) {
+ if firstRead {
+ firstRead = false
+ return firstRequest.GetData(), nil
+ }
+
+ request, err := stream.Recv()
+ return request.GetData(), err
+ })
+
+ ctx := stream.Context()
+ repo := s.localrepo(firstRequest.GetRepository())
+
+ tmpDir, err := tempdir.New(ctx, repo.GetStorageName(), s.locator)
+ if err != nil {
+ return helper.ErrInternal(err)
+ }
+
+ bundlePath := filepath.Join(tmpDir.Path(), "repo.bundle")
+ file, err := os.Create(bundlePath)
+ if err != nil {
+ return helper.ErrInternal(err)
+ }
+
+ _, err = io.Copy(file, reader)
+ if err != nil {
+ return helper.ErrInternalf("copy bundle: %w", err)
+ }
+
+ config := []git.ConfigPair{
+ {Key: "remote.inmemory.url", Value: bundlePath},
+ {Key: "remote.inmemory.fetch", Value: mirrorRefSpec},
+ }
+ opts := localrepo.FetchOpts{
+ Prune: true,
+ CommandOptions: []git.CmdOpt{git.WithConfigEnv(config...)},
+ }
+
+ if err := repo.FetchRemote(ctx, "inmemory", opts); err != nil {
+ return helper.ErrInternal(err)
+ }
+
+ return stream.SendAndClose(&gitalypb.FetchBundleResponse{})
+}
diff --git a/internal/gitaly/service/repository/fetch_bundle_test.go b/internal/gitaly/service/repository/fetch_bundle_test.go
new file mode 100644
index 000000000..a81889000
--- /dev/null
+++ b/internal/gitaly/service/repository/fetch_bundle_test.go
@@ -0,0 +1,205 @@
+package repository
+
+import (
+ "context"
+ "io"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ gitalyhook "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/hook"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/hook"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitaly/v14/streamio"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+)
+
+func TestServer_FetchBundle_success(t *testing.T) {
+ t.Parallel()
+ cfg, _, repoPath, client := setupRepositoryService(t)
+
+ tmp := testhelper.TempDir(t)
+ bundlePath := filepath.Join(tmp, "test.bundle")
+
+ gittest.Exec(t, cfg, "-C", repoPath, "bundle", "create", bundlePath, "--all")
+ expectedRefs := gittest.Exec(t, cfg, "-C", repoPath, "show-ref", "--head")
+
+ targetRepo, targetRepoPath := gittest.InitRepo(t, cfg, cfg.Storages[0])
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ stream, err := client.FetchBundle(ctx)
+ require.NoError(t, err)
+
+ request := &gitalypb.FetchBundleRequest{Repository: targetRepo}
+ writer := streamio.NewWriter(func(p []byte) error {
+ request.Data = p
+
+ if err := stream.Send(request); err != nil {
+ return err
+ }
+
+ request = &gitalypb.FetchBundleRequest{}
+
+ return nil
+ })
+
+ bundle, err := os.Open(bundlePath)
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, bundle)
+
+ _, err = io.Copy(writer, bundle)
+ require.NoError(t, err)
+
+ _, err = stream.CloseAndRecv()
+ require.NoError(t, err)
+
+ refs := gittest.Exec(t, cfg, "-C", targetRepoPath, "show-ref", "--head")
+ require.Equal(t, string(expectedRefs), string(refs))
+}
+
+func TestServer_FetchBundle_transaction(t *testing.T) {
+ t.Parallel()
+ cfg, repoProto, repoPath := testcfg.BuildWithRepo(t)
+ testhelper.BuildGitalyHooks(t, cfg)
+
+ hookManager := &mockHookManager{}
+ addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) {
+ gitalypb.RegisterRepositoryServiceServer(srv, NewServer(
+ deps.GetCfg(),
+ deps.GetRubyServer(),
+ deps.GetLocator(),
+ deps.GetTxManager(),
+ deps.GetGitCmdFactory(),
+ deps.GetCatfileCache(),
+ ))
+ gitalypb.RegisterHookServiceServer(srv, hook.NewServer(
+ deps.GetCfg(),
+ deps.GetHookManager(),
+ deps.GetGitCmdFactory(),
+ deps.GetPackObjectsCache(),
+ ))
+ }, testserver.WithHookManager(hookManager), testserver.WithDisablePraefect())
+
+ client := newRepositoryClient(t, cfg, addr)
+
+ tmp := testhelper.TempDir(t)
+ bundlePath := filepath.Join(tmp, "test.bundle")
+ gittest.BundleTestRepo(t, cfg, "gitlab-test.git", bundlePath)
+
+ _, stopGitServer := gittest.GitServer(t, cfg, repoPath, nil)
+ defer func() { require.NoError(t, stopGitServer()) }()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+ ctx, err := txinfo.InjectTransaction(ctx, 1, "node", true)
+ require.NoError(t, err)
+ ctx = metadata.IncomingToOutgoing(ctx)
+
+ require.Empty(t, hookManager.states)
+
+ stream, err := client.FetchBundle(ctx)
+ require.NoError(t, err)
+
+ request := &gitalypb.FetchBundleRequest{Repository: repoProto}
+ writer := streamio.NewWriter(func(p []byte) error {
+ request.Data = p
+
+ if err := stream.Send(request); err != nil {
+ return err
+ }
+
+ request = &gitalypb.FetchBundleRequest{}
+
+ return nil
+ })
+
+ bundle, err := os.Open(bundlePath)
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, bundle)
+
+ _, err = io.Copy(writer, bundle)
+ require.NoError(t, err)
+
+ _, err = stream.CloseAndRecv()
+ require.NoError(t, err)
+
+ require.Equal(t, []gitalyhook.ReferenceTransactionState{
+ gitalyhook.ReferenceTransactionPrepared,
+ gitalyhook.ReferenceTransactionCommitted,
+ }, hookManager.states)
+}
+
+func TestServer_FetchBundle_validation(t *testing.T) {
+ t.Parallel()
+
+ for _, tc := range []struct {
+ desc string
+ firstRequest *gitalypb.FetchBundleRequest
+ expectedStreamErr string
+ expectedStreamErrCode codes.Code
+ }{
+ {
+ desc: "no repo",
+ firstRequest: &gitalypb.FetchBundleRequest{
+ Repository: nil,
+ },
+ expectedStreamErr: "empty Repository",
+ expectedStreamErrCode: codes.InvalidArgument,
+ },
+ {
+ desc: "unknown repo",
+ firstRequest: &gitalypb.FetchBundleRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: "default",
+ RelativePath: "unknown",
+ },
+ },
+ expectedStreamErr: "not a git repository",
+ expectedStreamErrCode: codes.NotFound,
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ _, client := setupRepositoryServiceWithoutRepo(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ stream, err := client.FetchBundle(ctx)
+ require.NoError(t, err)
+
+ err = stream.Send(tc.firstRequest)
+ require.NoError(t, err)
+
+ _, err = stream.CloseAndRecv()
+ require.Error(t, err)
+ if tc.expectedStreamErr != "" {
+ require.Contains(t, err.Error(), tc.expectedStreamErr)
+ }
+ if tc.expectedStreamErrCode != 0 {
+ require.Equal(t, tc.expectedStreamErrCode, helper.GrpcCode(err))
+ }
+ })
+ }
+}
+
+type mockHookManager struct {
+ gitalyhook.Manager
+ states []gitalyhook.ReferenceTransactionState
+}
+
+func (m *mockHookManager) ReferenceTransactionHook(_ context.Context, state gitalyhook.ReferenceTransactionState, _ []string, _ io.Reader) error {
+ m.states = append(m.states, state)
+ return nil
+}
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index aea341dd0..938f47fe7 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -73,6 +73,7 @@ var transactionRPCs = map[string]transactionsCondition{
"/gitaly.RepositoryService/CreateRepositoryFromBundle": transactionsEnabled,
"/gitaly.RepositoryService/CreateRepositoryFromSnapshot": transactionsEnabled,
"/gitaly.RepositoryService/CreateRepositoryFromURL": transactionsEnabled,
+ "/gitaly.RepositoryService/FetchBundle": transactionsEnabled,
"/gitaly.RepositoryService/FetchRemote": transactionsEnabled,
"/gitaly.RepositoryService/FetchSourceBranch": transactionsEnabled,
"/gitaly.RepositoryService/RemoveRepository": transactionsEnabled,
diff --git a/internal/praefect/protoregistry/protoregistry_test.go b/internal/praefect/protoregistry/protoregistry_test.go
index 87ebfcd9d..515c13e3d 100644
--- a/internal/praefect/protoregistry/protoregistry_test.go
+++ b/internal/praefect/protoregistry/protoregistry_test.go
@@ -112,6 +112,7 @@ func TestNewProtoRegistry(t *testing.T) {
"RepositorySize": protoregistry.OpAccessor,
"ApplyGitattributes": protoregistry.OpMutator,
"FetchRemote": protoregistry.OpMutator,
+ "FetchBundle": protoregistry.OpMutator,
"CreateRepository": protoregistry.OpMutator,
"GetArchive": protoregistry.OpAccessor,
"HasLocalBranches": protoregistry.OpAccessor,