Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorToon Claes <toon@gitlab.com>2021-09-30 18:06:15 +0300
committerToon Claes <toon@gitlab.com>2021-09-30 18:06:15 +0300
commit1ec2f7682f9aad976a323881c4437228f4e7c201 (patch)
treeaab863f7db31c4cd62f2a9f8c9126cf87721ec96 /internal
parenteeca095e5680831ad140c2338d0f1c7d8b06a1ba (diff)
parent7d681ebd6c531e048a9e5d41f524dafc02e76516 (diff)
Merge branch 'qmnguyen0711/1219-create-postuploadpackwithsidechannel-postuploadpack-replacement-using-sidechannel' into 'master'
Create PostUploadPackWithSidechannel See merge request gitlab-org/gitaly!3883
Diffstat (limited to 'internal')
-rw-r--r--internal/gitaly/service/smarthttp/testhelper_test.go16
-rw-r--r--internal/gitaly/service/smarthttp/upload_pack.go145
-rw-r--r--internal/gitaly/service/smarthttp/upload_pack_test.go259
3 files changed, 309 insertions, 111 deletions
diff --git a/internal/gitaly/service/smarthttp/testhelper_test.go b/internal/gitaly/service/smarthttp/testhelper_test.go
index bb6b2c9fe..3b4ef7cfc 100644
--- a/internal/gitaly/service/smarthttp/testhelper_test.go
+++ b/internal/gitaly/service/smarthttp/testhelper_test.go
@@ -53,6 +53,22 @@ func runSmartHTTPServer(t *testing.T, cfg config.Cfg, serverOpts ...ServerOpt) s
return gitalyServer.Address()
}
+// TODO: remove this method and use runSmartHTTPServer after
+// https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1218
+func runSmartHTTPServerWithoutPraefect(t *testing.T, cfg config.Cfg, serverOpts ...ServerOpt) string {
+ gitalyServer := testserver.StartGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) {
+ gitalypb.RegisterSmartHTTPServiceServer(srv, NewServer(
+ deps.GetCfg(),
+ deps.GetLocator(),
+ deps.GetGitCmdFactory(),
+ deps.GetDiskCache(),
+ serverOpts...,
+ ))
+ gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetCfg(), deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache()))
+ }, testserver.WithDisablePraefect())
+ return gitalyServer.Address()
+}
+
func newSmartHTTPClient(t *testing.T, serverSocketPath, token string) (gitalypb.SmartHTTPServiceClient, *grpc.ClientConn) {
t.Helper()
diff --git a/internal/gitaly/service/smarthttp/upload_pack.go b/internal/gitaly/service/smarthttp/upload_pack.go
index e8e900d63..768e686e6 100644
--- a/internal/gitaly/service/smarthttp/upload_pack.go
+++ b/internal/gitaly/service/smarthttp/upload_pack.go
@@ -10,12 +10,20 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/command"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/stats"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/sidechannel"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/v14/streamio"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
+type basicPostUploadPackRequest interface {
+ GetRepository() *gitalypb.Repository
+ GetGitConfigOptions() []string
+ GetGitProtocol() string
+}
+
func (s *server) PostUploadPack(stream gitalypb.SmartHTTPService_PostUploadPackServer) error {
ctx := stream.Context()
@@ -24,81 +32,48 @@ func (s *server) PostUploadPack(stream gitalypb.SmartHTTPService_PostUploadPackS
return err
}
- if err := validateUploadPackRequest(req); err != nil {
- return err
+ if req.Data != nil {
+ return status.Errorf(codes.InvalidArgument, "non-empty Data")
}
- h := sha1.New()
+ repoPath, gitConfig, err := s.validateUploadPackRequest(ctx, req)
+ if err != nil {
+ return err
+ }
- stdinReader := io.TeeReader(streamio.NewReader(func() ([]byte, error) {
+ stdin := streamio.NewReader(func() ([]byte, error) {
resp, err := stream.Recv()
return resp.GetData(), err
- }), h)
-
- stdin, collector := s.runStatsCollector(stream.Context(), stdinReader)
- defer collector.finish()
-
- var respBytes int64
-
+ })
stdout := streamio.NewWriter(func(p []byte) error {
- respBytes += int64(len(p))
return stream.Send(&gitalypb.PostUploadPackResponse{Data: p})
})
- repoPath, err := s.locator.GetRepoPath(req.Repository)
- if err != nil {
- return err
- }
-
- git.WarnIfTooManyBitmaps(ctx, s.locator, req.GetRepository().GetStorageName(), repoPath)
+ return s.runUploadPack(ctx, req, repoPath, gitConfig, stdin, stdout)
+}
- config, err := git.ConvertConfigOptions(req.GitConfigOptions)
+func (s *server) PostUploadPackWithSidechannel(ctx context.Context, req *gitalypb.PostUploadPackWithSidechannelRequest) (*gitalypb.PostUploadPackWithSidechannelResponse, error) {
+ repoPath, gitConfig, err := s.validateUploadPackRequest(ctx, req)
if err != nil {
- return err
+ return nil, err
}
- commandOpts := []git.CmdOpt{
- git.WithStdin(stdin),
- git.WithStdout(stdout),
- git.WithGitProtocol(ctx, req),
- git.WithConfig(config...),
- git.WithPackObjectsHookEnv(ctx, req.Repository, s.cfg),
- }
-
- cmd, err := s.gitCmdFactory.NewWithoutRepo(ctx, git.SubCmd{
- Name: "upload-pack",
- Flags: []git.Option{git.Flag{Name: "--stateless-rpc"}},
- Args: []string{repoPath},
- }, commandOpts...)
+ conn, err := sidechannel.OpenSidechannel(ctx)
if err != nil {
- return status.Errorf(codes.Unavailable, "PostUploadPack: cmd: %v", err)
+ return nil, status.Errorf(codes.Internal, "open sidechannel: %v", err)
}
+ defer conn.Close()
- if err := cmd.Wait(); err != nil {
- stats := collector.finish()
-
- if _, ok := command.ExitStatus(err); ok && stats.Deepen != "" {
- // We have seen a 'deepen' message in the request. It is expected that
- // git-upload-pack has a non-zero exit status: don't treat this as an
- // error.
- return nil
- }
-
- return status.Errorf(codes.Unavailable, "PostUploadPack: %v", err)
+ if err := s.runUploadPack(ctx, req, repoPath, gitConfig, conn, conn); err != nil {
+ return nil, err
}
- ctxlogrus.Extract(ctx).WithField("request_sha", fmt.Sprintf("%x", h.Sum(nil))).WithField("response_bytes", respBytes).Info("request details")
-
- return nil
-}
-
-func validateUploadPackRequest(req *gitalypb.PostUploadPackRequest) error {
- if req.Data != nil {
- return status.Errorf(codes.InvalidArgument, "PostUploadPack: non-empty Data")
+ if err := conn.Close(); err != nil {
+ return nil, status.Errorf(codes.Internal, "close sidechannel connection: %v", err)
}
- return nil
+ return &gitalypb.PostUploadPackWithSidechannelResponse{}, nil
}
type statsCollector struct {
@@ -133,3 +108,65 @@ func (s *server) runStatsCollector(ctx context.Context, r io.Reader) (io.Reader,
return io.TeeReader(r, pw), sc
}
+
+func (s *server) validateUploadPackRequest(ctx context.Context, req basicPostUploadPackRequest) (string, []git.ConfigPair, error) {
+ repoPath, err := s.locator.GetRepoPath(req.GetRepository())
+ if err != nil {
+ return "", nil, helper.ErrInvalidArgument(err)
+ }
+
+ git.WarnIfTooManyBitmaps(ctx, s.locator, req.GetRepository().GetStorageName(), repoPath)
+
+ config, err := git.ConvertConfigOptions(req.GetGitConfigOptions())
+ if err != nil {
+ return "", nil, helper.ErrInvalidArgument(err)
+ }
+
+ return repoPath, config, nil
+}
+
+func (s *server) runUploadPack(ctx context.Context, req basicPostUploadPackRequest, repoPath string, gitConfig []git.ConfigPair, stdin io.Reader, stdout io.Writer) error {
+ h := sha1.New()
+
+ stdin = io.TeeReader(stdin, h)
+ stdin, collector := s.runStatsCollector(ctx, stdin)
+ defer collector.finish()
+
+ commandOpts := []git.CmdOpt{
+ git.WithStdin(stdin),
+ git.WithGitProtocol(ctx, req),
+ git.WithConfig(gitConfig...),
+ git.WithPackObjectsHookEnv(ctx, req.GetRepository(), s.cfg),
+ }
+
+ cmd, err := s.gitCmdFactory.NewWithoutRepo(ctx, git.SubCmd{
+ Name: "upload-pack",
+ Flags: []git.Option{git.Flag{Name: "--stateless-rpc"}},
+ Args: []string{repoPath},
+ }, commandOpts...)
+ if err != nil {
+ return helper.ErrUnavailablef("cmd: %v", err)
+ }
+
+ respBytes, err := io.Copy(stdout, cmd)
+ if err != nil {
+ return helper.ErrUnavailablef("Fail to transfer git data: %v", err)
+ }
+
+ if err := cmd.Wait(); err != nil {
+ stats := collector.finish()
+
+ if _, ok := command.ExitStatus(err); ok && stats.Deepen != "" {
+ // We have seen a 'deepen' message in the request. It is expected that
+ // git-upload-pack has a non-zero exit status: don't treat this as an
+ // error.
+ return nil
+ }
+
+ return helper.ErrUnavailable(err)
+ }
+
+ ctxlogrus.Extract(ctx).WithField("request_sha", fmt.Sprintf("%x", h.Sum(nil))).WithField("response_bytes", respBytes).Info("request details")
+
+ return nil
+}
diff --git a/internal/gitaly/service/smarthttp/upload_pack_test.go b/internal/gitaly/service/smarthttp/upload_pack_test.go
index 8785a2c0e..1d9a9012a 100644
--- a/internal/gitaly/service/smarthttp/upload_pack_test.go
+++ b/internal/gitaly/service/smarthttp/upload_pack_test.go
@@ -11,46 +11,69 @@ import (
"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
+ "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ gitalyauth "gitlab.com/gitlab-org/gitaly/v14/auth"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/backchannel"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/listenmux"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/sidechannel"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/v14/streamio"
+ "google.golang.org/grpc"
"google.golang.org/grpc/codes"
+ "google.golang.org/grpc/credentials/insecure"
)
const (
clientCapabilities = `multi_ack_detailed no-done side-band-64k thin-pack include-tag ofs-delta deepen-since deepen-not filter agent=git/2.18.0`
)
-func runTestWithAndWithoutConfigOptions(t *testing.T, tf func(t *testing.T, ctx context.Context, opts ...testcfg.Option), opts ...testcfg.Option) {
+type (
+ requestMaker func(ctx context.Context, t *testing.T, serverSocketPath, token string, in *gitalypb.PostUploadPackRequest, body io.Reader) (*bytes.Buffer, error)
+ serverRunner func(t *testing.T, cfg config.Cfg, serverOpts ...ServerOpt) string
+)
+
+func runTestWithAndWithoutConfigOptions(
+ t *testing.T,
+ tf func(t *testing.T, ctx context.Context, makeRequest requestMaker, runServer serverRunner, opts ...testcfg.Option),
+ makeRequest requestMaker,
+ runServer serverRunner,
+ opts ...testcfg.Option,
+) {
ctx, cancel := testhelper.Context()
defer cancel()
- t.Run("no config options", func(t *testing.T) { tf(t, ctx) })
+ t.Run("no config options", func(t *testing.T) { tf(t, ctx, makeRequest, runServer) })
if len(opts) > 0 {
t.Run("with config options", func(t *testing.T) {
- tf(t, ctx, opts...)
+ tf(t, ctx, makeRequest, runServer, opts...)
})
}
}
func TestServer_PostUpload(t *testing.T) {
- runTestWithAndWithoutConfigOptions(t, testServerPostUpload, testcfg.WithPackObjectsCacheEnabled())
+ runTestWithAndWithoutConfigOptions(t, testServerPostUpload, makePostUploadPackRequest, runSmartHTTPServer, testcfg.WithPackObjectsCacheEnabled())
+}
+
+func TestServer_PostUploadWithChannel(t *testing.T) {
+ runTestWithAndWithoutConfigOptions(t, testServerPostUpload, makePostUploadPackWithSidechannelRequest, runSmartHTTPServerWithoutPraefect, testcfg.WithPackObjectsCacheEnabled())
}
-func testServerPostUpload(t *testing.T, ctx context.Context, opts ...testcfg.Option) {
+func testServerPostUpload(t *testing.T, ctx context.Context, makeRequest requestMaker, runServer serverRunner, opts ...testcfg.Option) {
cfg, repo, repoPath := testcfg.BuildWithRepo(t, opts...)
_, localRepoPath := gittest.CloneRepo(t, cfg, cfg.Storages[0])
testhelper.BuildGitalyHooks(t, cfg)
negotiationMetrics := prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"feature"})
- serverSocketPath := runSmartHTTPServer(t, cfg, WithPackfileNegotiationMetrics(negotiationMetrics))
+ serverSocketPath := runServer(t, cfg, WithPackfileNegotiationMetrics(negotiationMetrics))
oldCommit, err := git.NewObjectIDFromHex("1e292f8fedd741b75372e19097c76d327140c312") // refs/heads/master
require.NoError(t, err)
@@ -65,7 +88,7 @@ func testServerPostUpload(t *testing.T, ctx context.Context, opts ...testcfg.Opt
gittest.WritePktlineFlush(t, requestBuffer)
req := &gitalypb.PostUploadPackRequest{Repository: repo}
- responseBuffer, err := makePostUploadPackRequest(ctx, t, serverSocketPath, cfg.Auth.Token, req, requestBuffer)
+ responseBuffer, err := makeRequest(ctx, t, serverSocketPath, cfg.Auth.Token, req, requestBuffer)
require.NoError(t, err)
pack, version, entries := extractPackDataFromResponse(t, responseBuffer)
@@ -81,14 +104,18 @@ func testServerPostUpload(t *testing.T, ctx context.Context, opts ...testcfg.Opt
}
func TestServer_PostUploadPack_gitConfigOptions(t *testing.T) {
- runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackGitConfigOptions, testcfg.WithPackObjectsCacheEnabled())
+ runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackGitConfigOptions, makePostUploadPackRequest, runSmartHTTPServer, testcfg.WithPackObjectsCacheEnabled())
}
-func testServerPostUploadPackGitConfigOptions(t *testing.T, ctx context.Context, opts ...testcfg.Option) {
+func TestServer_PostUploadPackSidechannel_gitConfigOptions(t *testing.T) {
+ runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackGitConfigOptions, makePostUploadPackWithSidechannelRequest, runSmartHTTPServerWithoutPraefect, testcfg.WithPackObjectsCacheEnabled())
+}
+
+func testServerPostUploadPackGitConfigOptions(t *testing.T, ctx context.Context, makeRequest requestMaker, runServer serverRunner, opts ...testcfg.Option) {
cfg, repo, repoPath := testcfg.BuildWithRepo(t, opts...)
testhelper.BuildGitalyHooks(t, cfg)
- serverSocketPath := runSmartHTTPServer(t, cfg)
+ serverSocketPath := runServer(t, cfg)
want := "3dd08961455abf80ef9115f4afdc1c6f968b503c" // refs/heads/csv
gittest.Exec(t, cfg, "-C", repoPath, "update-ref", "refs/hidden/csv", want)
@@ -106,7 +133,7 @@ func testServerPostUploadPackGitConfigOptions(t *testing.T, ctx context.Context,
t.Run("sanity check: ref exists and can be fetched", func(t *testing.T) {
rpcRequest := &gitalypb.PostUploadPackRequest{Repository: repo}
- response, err := makePostUploadPackRequest(ctx, t, serverSocketPath, cfg.Auth.Token, rpcRequest, bytes.NewReader(requestBody.Bytes()))
+ response, err := makeRequest(ctx, t, serverSocketPath, cfg.Auth.Token, rpcRequest, bytes.NewReader(requestBody.Bytes()))
require.NoError(t, err)
_, _, count := extractPackDataFromResponse(t, response)
require.Equal(t, 5, count, "pack should have 5 entries")
@@ -120,7 +147,7 @@ func testServerPostUploadPackGitConfigOptions(t *testing.T, ctx context.Context,
"uploadpack.allowAnySHA1InWant=false",
},
}
- response, err := makePostUploadPackRequest(ctx, t, serverSocketPath, cfg.Auth.Token, rpcRequest, bytes.NewReader(requestBody.Bytes()))
+ response, err := makeRequest(ctx, t, serverSocketPath, cfg.Auth.Token, rpcRequest, bytes.NewReader(requestBody.Bytes()))
testhelper.RequireGrpcError(t, err, codes.Unavailable)
// The failure message proves that upload-pack failed because of
@@ -131,13 +158,17 @@ func testServerPostUploadPackGitConfigOptions(t *testing.T, ctx context.Context,
}
func TestServer_PostUploadPack_gitProtocol(t *testing.T) {
- runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackGitProtocol, testcfg.WithPackObjectsCacheEnabled())
+ runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackGitProtocol, makePostUploadPackRequest, runSmartHTTPServer, testcfg.WithPackObjectsCacheEnabled())
+}
+
+func TestServer_PostUploadPackWithSidechannel_gitProtocol(t *testing.T) {
+ runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackGitProtocol, makePostUploadPackWithSidechannelRequest, runSmartHTTPServerWithoutPraefect, testcfg.WithPackObjectsCacheEnabled())
}
-func testServerPostUploadPackGitProtocol(t *testing.T, ctx context.Context, opts ...testcfg.Option) {
+func testServerPostUploadPackGitProtocol(t *testing.T, ctx context.Context, makeRequest requestMaker, runServer serverRunner, opts ...testcfg.Option) {
cfg, repo, _ := testcfg.BuildWithRepo(t, opts...)
readProto, cfg := gittest.EnableGitProtocolV2Support(t, cfg)
- serverSocketPath := runSmartHTTPServer(t, cfg)
+ serverSocketPath := runServer(t, cfg)
// command=ls-refs does not exist in protocol v0, so if this succeeds, we're talking v2
requestBody := &bytes.Buffer{}
@@ -152,7 +183,7 @@ func testServerPostUploadPackGitProtocol(t *testing.T, ctx context.Context, opts
GitProtocol: git.ProtocolV2,
}
- _, err := makePostUploadPackRequest(ctx, t, serverSocketPath, cfg.Auth.Token, rpcRequest, requestBody)
+ _, err := makeRequest(ctx, t, serverSocketPath, cfg.Auth.Token, rpcRequest, requestBody)
require.NoError(t, err)
envData := readProto()
@@ -163,12 +194,16 @@ func testServerPostUploadPackGitProtocol(t *testing.T, ctx context.Context, opts
// on 'deepen' requests even though the request is being handled just
// fine from the client perspective.
func TestServer_PostUploadPack_suppressDeepenExitError(t *testing.T) {
- runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackSuppressDeepenExitError, testcfg.WithPackObjectsCacheEnabled())
+ runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackSuppressDeepenExitError, makePostUploadPackRequest, runSmartHTTPServer, testcfg.WithPackObjectsCacheEnabled())
}
-func testServerPostUploadPackSuppressDeepenExitError(t *testing.T, ctx context.Context, opts ...testcfg.Option) {
+func TestServer_PostUploadPackWithSidechannel_suppressDeepenExitError(t *testing.T) {
+ runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackSuppressDeepenExitError, makePostUploadPackWithSidechannelRequest, runSmartHTTPServerWithoutPraefect, testcfg.WithPackObjectsCacheEnabled())
+}
+
+func testServerPostUploadPackSuppressDeepenExitError(t *testing.T, ctx context.Context, makeRequest requestMaker, runServer serverRunner, opts ...testcfg.Option) {
cfg, repo, _ := testcfg.BuildWithRepo(t, opts...)
- serverSocketPath := runSmartHTTPServer(t, cfg)
+ serverSocketPath := runServer(t, cfg)
requestBody := &bytes.Buffer{}
gittest.WritePktlineString(t, requestBody, fmt.Sprintf("want e63f41fe459e62e1228fcef60d7189127aeba95a %s\n", clientCapabilities))
@@ -176,7 +211,7 @@ func testServerPostUploadPackSuppressDeepenExitError(t *testing.T, ctx context.C
gittest.WritePktlineFlush(t, requestBody)
rpcRequest := &gitalypb.PostUploadPackRequest{Repository: repo}
- response, err := makePostUploadPackRequest(ctx, t, serverSocketPath, cfg.Auth.Token, rpcRequest, requestBody)
+ response, err := makeRequest(ctx, t, serverSocketPath, cfg.Auth.Token, rpcRequest, requestBody)
// This assertion is the main reason this test exists.
assert.NoError(t, err)
@@ -184,6 +219,20 @@ func testServerPostUploadPackSuppressDeepenExitError(t *testing.T, ctx context.C
}
func TestServer_PostUploadPack_usesPackObjectsHook(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ testServerPostUploadPackUsesPackObjectsHook(t, ctx, makePostUploadPackRequest, runSmartHTTPServer)
+}
+
+func TestServer_PostUploadPackWithSidechannel_usesPackObjectsHook(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ testServerPostUploadPackUsesPackObjectsHook(t, ctx, makePostUploadPackWithSidechannelRequest, runSmartHTTPServerWithoutPraefect)
+}
+
+func testServerPostUploadPackUsesPackObjectsHook(t *testing.T, ctx context.Context, makeRequest requestMaker, runServer serverRunner, opts ...testcfg.Option) {
cfg, repo, repoPath := testcfg.BuildWithRepo(t, testcfg.WithPackObjectsCacheEnabled())
cfg.BinDir = testhelper.TempDir(t)
@@ -199,7 +248,7 @@ func TestServer_PostUploadPack_usesPackObjectsHook(t *testing.T) {
// transferred back.
testhelper.WriteExecutable(t, filepath.Join(cfg.BinDir, "gitaly-hooks"), []byte(hookScript))
- serverSocketPath := runSmartHTTPServer(t, cfg)
+ serverSocketPath := runServer(t, cfg)
oldHead := bytes.TrimSpace(gittest.Exec(t, cfg, "-C", repoPath, "rev-parse", "master~"))
newHead := bytes.TrimSpace(gittest.Exec(t, cfg, "-C", repoPath, "rev-parse", "master"))
@@ -210,10 +259,7 @@ func TestServer_PostUploadPack_usesPackObjectsHook(t *testing.T) {
gittest.WritePktlineString(t, requestBuffer, fmt.Sprintf("have %s\n", oldHead))
gittest.WritePktlineFlush(t, requestBuffer)
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- _, err := makePostUploadPackRequest(ctx, t, serverSocketPath, cfg.Auth.Token, &gitalypb.PostUploadPackRequest{
+ _, err := makeRequest(ctx, t, serverSocketPath, cfg.Auth.Token, &gitalypb.PostUploadPackRequest{
Repository: repo,
}, requestBuffer)
require.NoError(t, err)
@@ -223,12 +269,12 @@ func TestServer_PostUploadPack_usesPackObjectsHook(t *testing.T) {
}
func TestServer_PostUploadPack_validation(t *testing.T) {
- runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackValidation, testcfg.WithPackObjectsCacheEnabled())
+ runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackValidation, makePostUploadPackRequest, runSmartHTTPServer, testcfg.WithPackObjectsCacheEnabled())
}
-func testServerPostUploadPackValidation(t *testing.T, ctx context.Context, opts ...testcfg.Option) {
+func testServerPostUploadPackValidation(t *testing.T, ctx context.Context, makeRequest requestMaker, runServer serverRunner, opts ...testcfg.Option) {
cfg := testcfg.Build(t, opts...)
- serverSocketPath := runSmartHTTPServer(t, cfg)
+ serverSocketPath := runServer(t, cfg)
rpcRequests := []*gitalypb.PostUploadPackRequest{
{Repository: &gitalypb.Repository{StorageName: "fake", RelativePath: "path"}}, // Repository doesn't exist
@@ -238,39 +284,31 @@ func testServerPostUploadPackValidation(t *testing.T, ctx context.Context, opts
for _, rpcRequest := range rpcRequests {
t.Run(fmt.Sprintf("%v", rpcRequest), func(t *testing.T) {
- _, err := makePostUploadPackRequest(ctx, t, serverSocketPath, cfg.Auth.Token, rpcRequest, bytes.NewBuffer(nil))
+ _, err := makeRequest(ctx, t, serverSocketPath, cfg.Auth.Token, rpcRequest, bytes.NewBuffer(nil))
testhelper.RequireGrpcError(t, err, codes.InvalidArgument)
})
}
}
-func makePostUploadPackRequest(ctx context.Context, t *testing.T, serverSocketPath, token string, in *gitalypb.PostUploadPackRequest, body io.Reader) (*bytes.Buffer, error) {
- client, conn := newSmartHTTPClient(t, serverSocketPath, token)
- defer conn.Close()
+func TestServer_PostUploadPackSidechannel_validation(t *testing.T) {
+ runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackWithSideChannelValidation, makePostUploadPackWithSidechannelRequest, runSmartHTTPServerWithoutPraefect, testcfg.WithPackObjectsCacheEnabled())
+}
- stream, err := client.PostUploadPack(ctx)
- require.NoError(t, err)
+func testServerPostUploadPackWithSideChannelValidation(t *testing.T, ctx context.Context, makeRequest requestMaker, runServer serverRunner, opts ...testcfg.Option) {
+ cfg := testcfg.Build(t, opts...)
+ serverSocketPath := runServer(t, cfg)
- require.NoError(t, stream.Send(in))
+ rpcRequests := []*gitalypb.PostUploadPackRequest{
+ {Repository: &gitalypb.Repository{StorageName: "fake", RelativePath: "path"}}, // Repository doesn't exist
+ {Repository: nil}, // Repository is nil
+ }
- if body != nil {
- sw := streamio.NewWriter(func(p []byte) error {
- return stream.Send(&gitalypb.PostUploadPackRequest{Data: p})
+ for _, rpcRequest := range rpcRequests {
+ t.Run(fmt.Sprintf("%v", rpcRequest), func(t *testing.T) {
+ _, err := makeRequest(ctx, t, serverSocketPath, cfg.Auth.Token, rpcRequest, bytes.NewBuffer(nil))
+ testhelper.RequireGrpcError(t, err, codes.InvalidArgument)
})
-
- _, err = io.Copy(sw, body)
- require.NoError(t, err)
- require.NoError(t, stream.CloseSend())
}
-
- responseBuffer := &bytes.Buffer{}
- rr := streamio.NewReader(func() ([]byte, error) {
- resp, err := stream.Recv()
- return resp.GetData(), err
- })
- _, err = io.Copy(responseBuffer, rr)
-
- return responseBuffer, err
}
// The response contains bunch of things; metadata, progress messages, and a pack file. We're only
@@ -313,16 +351,20 @@ func extractPackDataFromResponse(t *testing.T, buf *bytes.Buffer) ([]byte, int,
}
func TestServer_PostUploadPack_partialClone(t *testing.T) {
- runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackPartialClone, testcfg.WithPackObjectsCacheEnabled())
+ runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackPartialClone, makePostUploadPackRequest, runSmartHTTPServer, testcfg.WithPackObjectsCacheEnabled())
}
-func testServerPostUploadPackPartialClone(t *testing.T, ctx context.Context, opts ...testcfg.Option) {
+func TestServer_PostUploadPackWithSidechannel_partialClone(t *testing.T) {
+ runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackPartialClone, makePostUploadPackWithSidechannelRequest, runSmartHTTPServerWithoutPraefect, testcfg.WithPackObjectsCacheEnabled())
+}
+
+func testServerPostUploadPackPartialClone(t *testing.T, ctx context.Context, makeRequest requestMaker, runServer serverRunner, opts ...testcfg.Option) {
cfg, repo, repoPath := testcfg.BuildWithRepo(t, opts...)
_, localRepoPath := gittest.InitRepo(t, cfg, cfg.Storages[0])
testhelper.BuildGitalyHooks(t, cfg)
negotiationMetrics := prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"feature"})
- serverSocketPath := runSmartHTTPServer(t, cfg, WithPackfileNegotiationMetrics(negotiationMetrics))
+ serverSocketPath := runServer(t, cfg, WithPackfileNegotiationMetrics(negotiationMetrics))
oldCommit, err := git.NewObjectIDFromHex("1e292f8fedd741b75372e19097c76d327140c312") // refs/heads/master
require.NoError(t, err)
@@ -336,7 +378,7 @@ func testServerPostUploadPackPartialClone(t *testing.T, ctx context.Context, opt
gittest.WritePktlineFlush(t, &requestBuffer)
req := &gitalypb.PostUploadPackRequest{Repository: repo}
- responseBuffer, err := makePostUploadPackRequest(ctx, t, serverSocketPath, cfg.Auth.Token, req, &requestBuffer)
+ responseBuffer, err := makeRequest(ctx, t, serverSocketPath, cfg.Auth.Token, req, &requestBuffer)
require.NoError(t, err)
pack, version, entries := extractPackDataFromResponse(t, responseBuffer)
@@ -363,11 +405,22 @@ func TestServer_PostUploadPack_allowAnySHA1InWant(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
+ testServerPostUploadPackAllowAnySHA1InWant(t, ctx, makePostUploadPackRequest, runSmartHTTPServer)
+}
+
+func TestServer_PostUploadPackWithSidechannel_allowAnySHA1InWant(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ testServerPostUploadPackAllowAnySHA1InWant(t, ctx, makePostUploadPackWithSidechannelRequest, runSmartHTTPServerWithoutPraefect)
+}
+
+func testServerPostUploadPackAllowAnySHA1InWant(t *testing.T, ctx context.Context, makeRequest requestMaker, runServer serverRunner, opts ...testcfg.Option) {
cfg, repo, repoPath := testcfg.BuildWithRepo(t)
_, localRepoPath := gittest.InitRepo(t, cfg, cfg.Storages[0])
testhelper.BuildGitalyHooks(t, cfg)
- serverSocketPath := runSmartHTTPServer(t, cfg)
+ serverSocketPath := runServer(t, cfg)
newCommit := gittest.WriteCommit(t, cfg, repoPath)
var requestBuffer bytes.Buffer
@@ -377,7 +430,7 @@ func TestServer_PostUploadPack_allowAnySHA1InWant(t *testing.T) {
gittest.WritePktlineFlush(t, &requestBuffer)
req := &gitalypb.PostUploadPackRequest{Repository: repo}
- responseBuffer, err := makePostUploadPackRequest(ctx, t, serverSocketPath, cfg.Auth.Token, req, &requestBuffer)
+ responseBuffer, err := makeRequest(ctx, t, serverSocketPath, cfg.Auth.Token, req, &requestBuffer)
require.NoError(t, err)
pack, version, entries := extractPackDataFromResponse(t, responseBuffer)
@@ -387,3 +440,95 @@ func TestServer_PostUploadPack_allowAnySHA1InWant(t *testing.T) {
gittest.GitObjectMustExist(t, cfg.Git.BinPath, localRepoPath, newCommit.String())
}
+
+func makePostUploadPackRequest(ctx context.Context, t *testing.T, serverSocketPath, token string, in *gitalypb.PostUploadPackRequest, body io.Reader) (*bytes.Buffer, error) {
+ client, conn := newSmartHTTPClient(t, serverSocketPath, token)
+ defer conn.Close()
+
+ stream, err := client.PostUploadPack(ctx)
+ require.NoError(t, err)
+
+ require.NoError(t, stream.Send(in))
+
+ if body != nil {
+ sw := streamio.NewWriter(func(p []byte) error {
+ return stream.Send(&gitalypb.PostUploadPackRequest{Data: p})
+ })
+
+ _, err = io.Copy(sw, body)
+ require.NoError(t, err)
+ require.NoError(t, stream.CloseSend())
+ }
+
+ responseBuffer := &bytes.Buffer{}
+ rr := streamio.NewReader(func() ([]byte, error) {
+ resp, err := stream.Recv()
+ return resp.GetData(), err
+ })
+ _, err = io.Copy(responseBuffer, rr)
+
+ return responseBuffer, err
+}
+
+func dialSmartHTTPServerWithSidechannel(t *testing.T, serverSocketPath, token string, registry *sidechannel.Registry) *grpc.ClientConn {
+ logger := logrus.NewEntry(logrus.New())
+
+ factory := func() backchannel.Server {
+ lm := listenmux.New(insecure.NewCredentials())
+ lm.Register(sidechannel.NewServerHandshaker(registry))
+ return grpc.NewServer(grpc.Creds(lm))
+ }
+ clientHandshaker := backchannel.NewClientHandshaker(logger, factory)
+ connOpts := []grpc.DialOption{
+ grpc.WithTransportCredentials(clientHandshaker.ClientHandshake(insecure.NewCredentials())),
+ grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token)),
+ }
+
+ conn, err := grpc.Dial(serverSocketPath, connOpts...)
+ require.NoError(t, err)
+
+ return conn
+}
+
+func makePostUploadPackWithSidechannelRequest(ctx context.Context, t *testing.T, serverSocketPath, token string, in *gitalypb.PostUploadPackRequest, body io.Reader) (*bytes.Buffer, error) {
+ t.Helper()
+
+ registry := sidechannel.NewRegistry()
+ conn := dialSmartHTTPServerWithSidechannel(t, serverSocketPath, token, registry)
+ client := gitalypb.NewSmartHTTPServiceClient(conn)
+ defer testhelper.MustClose(t, conn)
+
+ responseBuffer := &bytes.Buffer{}
+ ctxOut, waiter := sidechannel.RegisterSidechannel(ctx, registry, func(sideConn *sidechannel.ClientConn) error {
+ errC := make(chan error, 1)
+ go func() {
+ _, err := io.Copy(responseBuffer, sideConn)
+ errC <- err
+ }()
+
+ if body != nil {
+ if _, err := io.Copy(sideConn, body); err != nil {
+ return err
+ }
+ }
+
+ if err := sideConn.CloseWrite(); err != nil {
+ return err
+ }
+
+ return <-errC
+ })
+ defer testhelper.MustClose(t, waiter)
+
+ rpcRequest := &gitalypb.PostUploadPackWithSidechannelRequest{
+ Repository: in.GetRepository(),
+ GitConfigOptions: in.GetGitConfigOptions(),
+ GitProtocol: in.GetGitProtocol(),
+ }
+ _, err := client.PostUploadPackWithSidechannel(ctxOut, rpcRequest)
+ if err == nil {
+ require.NoError(t, waiter.Wait())
+ }
+
+ return responseBuffer, err
+}