diff options
author | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2021-09-17 11:28:12 +0300 |
---|---|---|
committer | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2021-09-30 13:58:52 +0300 |
commit | 7d681ebd6c531e048a9e5d41f524dafc02e76516 (patch) | |
tree | f285556273e5995e33434673748867ebe7e08cc3 /internal | |
parent | dc297bbcb3bf33b798b58b934259ce94335294c3 (diff) |
Implement PostUploadPackWithSidechannel using sidechannel protocol
Issue: https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1219
Changelog: added
Diffstat (limited to 'internal')
-rw-r--r-- | internal/gitaly/service/smarthttp/testhelper_test.go | 16 | ||||
-rw-r--r-- | internal/gitaly/service/smarthttp/upload_pack.go | 145 | ||||
-rw-r--r-- | internal/gitaly/service/smarthttp/upload_pack_test.go | 259 |
3 files changed, 309 insertions, 111 deletions
diff --git a/internal/gitaly/service/smarthttp/testhelper_test.go b/internal/gitaly/service/smarthttp/testhelper_test.go index bb6b2c9fe..3b4ef7cfc 100644 --- a/internal/gitaly/service/smarthttp/testhelper_test.go +++ b/internal/gitaly/service/smarthttp/testhelper_test.go @@ -53,6 +53,22 @@ func runSmartHTTPServer(t *testing.T, cfg config.Cfg, serverOpts ...ServerOpt) s return gitalyServer.Address() } +// TODO: remove this method and use runSmartHTTPServer after +// https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1218 +func runSmartHTTPServerWithoutPraefect(t *testing.T, cfg config.Cfg, serverOpts ...ServerOpt) string { + gitalyServer := testserver.StartGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + gitalypb.RegisterSmartHTTPServiceServer(srv, NewServer( + deps.GetCfg(), + deps.GetLocator(), + deps.GetGitCmdFactory(), + deps.GetDiskCache(), + serverOpts..., + )) + gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetCfg(), deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache())) + }, testserver.WithDisablePraefect()) + return gitalyServer.Address() +} + func newSmartHTTPClient(t *testing.T, serverSocketPath, token string) (gitalypb.SmartHTTPServiceClient, *grpc.ClientConn) { t.Helper() diff --git a/internal/gitaly/service/smarthttp/upload_pack.go b/internal/gitaly/service/smarthttp/upload_pack.go index e8e900d63..768e686e6 100644 --- a/internal/gitaly/service/smarthttp/upload_pack.go +++ b/internal/gitaly/service/smarthttp/upload_pack.go @@ -10,12 +10,20 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/command" "gitlab.com/gitlab-org/gitaly/v14/internal/git" "gitlab.com/gitlab-org/gitaly/v14/internal/git/stats" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/sidechannel" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v14/streamio" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) +type basicPostUploadPackRequest interface { + GetRepository() *gitalypb.Repository + GetGitConfigOptions() []string + GetGitProtocol() string +} + func (s *server) PostUploadPack(stream gitalypb.SmartHTTPService_PostUploadPackServer) error { ctx := stream.Context() @@ -24,81 +32,48 @@ func (s *server) PostUploadPack(stream gitalypb.SmartHTTPService_PostUploadPackS return err } - if err := validateUploadPackRequest(req); err != nil { - return err + if req.Data != nil { + return status.Errorf(codes.InvalidArgument, "non-empty Data") } - h := sha1.New() + repoPath, gitConfig, err := s.validateUploadPackRequest(ctx, req) + if err != nil { + return err + } - stdinReader := io.TeeReader(streamio.NewReader(func() ([]byte, error) { + stdin := streamio.NewReader(func() ([]byte, error) { resp, err := stream.Recv() return resp.GetData(), err - }), h) - - stdin, collector := s.runStatsCollector(stream.Context(), stdinReader) - defer collector.finish() - - var respBytes int64 - + }) stdout := streamio.NewWriter(func(p []byte) error { - respBytes += int64(len(p)) return stream.Send(&gitalypb.PostUploadPackResponse{Data: p}) }) - repoPath, err := s.locator.GetRepoPath(req.Repository) - if err != nil { - return err - } - - git.WarnIfTooManyBitmaps(ctx, s.locator, req.GetRepository().GetStorageName(), repoPath) + return s.runUploadPack(ctx, req, repoPath, gitConfig, stdin, stdout) +} - config, err := git.ConvertConfigOptions(req.GitConfigOptions) +func (s *server) PostUploadPackWithSidechannel(ctx context.Context, req *gitalypb.PostUploadPackWithSidechannelRequest) (*gitalypb.PostUploadPackWithSidechannelResponse, error) { + repoPath, gitConfig, err := s.validateUploadPackRequest(ctx, req) if err != nil { - return err + return nil, err } - commandOpts := []git.CmdOpt{ - git.WithStdin(stdin), - git.WithStdout(stdout), - git.WithGitProtocol(ctx, req), - git.WithConfig(config...), - git.WithPackObjectsHookEnv(ctx, req.Repository, s.cfg), - } - - cmd, err := s.gitCmdFactory.NewWithoutRepo(ctx, git.SubCmd{ - Name: "upload-pack", - Flags: []git.Option{git.Flag{Name: "--stateless-rpc"}}, - Args: []string{repoPath}, - }, commandOpts...) + conn, err := sidechannel.OpenSidechannel(ctx) if err != nil { - return status.Errorf(codes.Unavailable, "PostUploadPack: cmd: %v", err) + return nil, status.Errorf(codes.Internal, "open sidechannel: %v", err) } + defer conn.Close() - if err := cmd.Wait(); err != nil { - stats := collector.finish() - - if _, ok := command.ExitStatus(err); ok && stats.Deepen != "" { - // We have seen a 'deepen' message in the request. It is expected that - // git-upload-pack has a non-zero exit status: don't treat this as an - // error. - return nil - } - - return status.Errorf(codes.Unavailable, "PostUploadPack: %v", err) + if err := s.runUploadPack(ctx, req, repoPath, gitConfig, conn, conn); err != nil { + return nil, err } - ctxlogrus.Extract(ctx).WithField("request_sha", fmt.Sprintf("%x", h.Sum(nil))).WithField("response_bytes", respBytes).Info("request details") - - return nil -} - -func validateUploadPackRequest(req *gitalypb.PostUploadPackRequest) error { - if req.Data != nil { - return status.Errorf(codes.InvalidArgument, "PostUploadPack: non-empty Data") + if err := conn.Close(); err != nil { + return nil, status.Errorf(codes.Internal, "close sidechannel connection: %v", err) } - return nil + return &gitalypb.PostUploadPackWithSidechannelResponse{}, nil } type statsCollector struct { @@ -133,3 +108,65 @@ func (s *server) runStatsCollector(ctx context.Context, r io.Reader) (io.Reader, return io.TeeReader(r, pw), sc } + +func (s *server) validateUploadPackRequest(ctx context.Context, req basicPostUploadPackRequest) (string, []git.ConfigPair, error) { + repoPath, err := s.locator.GetRepoPath(req.GetRepository()) + if err != nil { + return "", nil, helper.ErrInvalidArgument(err) + } + + git.WarnIfTooManyBitmaps(ctx, s.locator, req.GetRepository().GetStorageName(), repoPath) + + config, err := git.ConvertConfigOptions(req.GetGitConfigOptions()) + if err != nil { + return "", nil, helper.ErrInvalidArgument(err) + } + + return repoPath, config, nil +} + +func (s *server) runUploadPack(ctx context.Context, req basicPostUploadPackRequest, repoPath string, gitConfig []git.ConfigPair, stdin io.Reader, stdout io.Writer) error { + h := sha1.New() + + stdin = io.TeeReader(stdin, h) + stdin, collector := s.runStatsCollector(ctx, stdin) + defer collector.finish() + + commandOpts := []git.CmdOpt{ + git.WithStdin(stdin), + git.WithGitProtocol(ctx, req), + git.WithConfig(gitConfig...), + git.WithPackObjectsHookEnv(ctx, req.GetRepository(), s.cfg), + } + + cmd, err := s.gitCmdFactory.NewWithoutRepo(ctx, git.SubCmd{ + Name: "upload-pack", + Flags: []git.Option{git.Flag{Name: "--stateless-rpc"}}, + Args: []string{repoPath}, + }, commandOpts...) + if err != nil { + return helper.ErrUnavailablef("cmd: %v", err) + } + + respBytes, err := io.Copy(stdout, cmd) + if err != nil { + return helper.ErrUnavailablef("Fail to transfer git data: %v", err) + } + + if err := cmd.Wait(); err != nil { + stats := collector.finish() + + if _, ok := command.ExitStatus(err); ok && stats.Deepen != "" { + // We have seen a 'deepen' message in the request. It is expected that + // git-upload-pack has a non-zero exit status: don't treat this as an + // error. + return nil + } + + return helper.ErrUnavailable(err) + } + + ctxlogrus.Extract(ctx).WithField("request_sha", fmt.Sprintf("%x", h.Sum(nil))).WithField("response_bytes", respBytes).Info("request details") + + return nil +} diff --git a/internal/gitaly/service/smarthttp/upload_pack_test.go b/internal/gitaly/service/smarthttp/upload_pack_test.go index 8785a2c0e..1d9a9012a 100644 --- a/internal/gitaly/service/smarthttp/upload_pack_test.go +++ b/internal/gitaly/service/smarthttp/upload_pack_test.go @@ -11,46 +11,69 @@ import ( "github.com/prometheus/client_golang/prometheus" promtest "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + gitalyauth "gitlab.com/gitlab-org/gitaly/v14/auth" + "gitlab.com/gitlab-org/gitaly/v14/internal/backchannel" "gitlab.com/gitlab-org/gitaly/v14/internal/git" "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/listenmux" + "gitlab.com/gitlab-org/gitaly/v14/internal/sidechannel" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v14/streamio" + "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" ) const ( clientCapabilities = `multi_ack_detailed no-done side-band-64k thin-pack include-tag ofs-delta deepen-since deepen-not filter agent=git/2.18.0` ) -func runTestWithAndWithoutConfigOptions(t *testing.T, tf func(t *testing.T, ctx context.Context, opts ...testcfg.Option), opts ...testcfg.Option) { +type ( + requestMaker func(ctx context.Context, t *testing.T, serverSocketPath, token string, in *gitalypb.PostUploadPackRequest, body io.Reader) (*bytes.Buffer, error) + serverRunner func(t *testing.T, cfg config.Cfg, serverOpts ...ServerOpt) string +) + +func runTestWithAndWithoutConfigOptions( + t *testing.T, + tf func(t *testing.T, ctx context.Context, makeRequest requestMaker, runServer serverRunner, opts ...testcfg.Option), + makeRequest requestMaker, + runServer serverRunner, + opts ...testcfg.Option, +) { ctx, cancel := testhelper.Context() defer cancel() - t.Run("no config options", func(t *testing.T) { tf(t, ctx) }) + t.Run("no config options", func(t *testing.T) { tf(t, ctx, makeRequest, runServer) }) if len(opts) > 0 { t.Run("with config options", func(t *testing.T) { - tf(t, ctx, opts...) + tf(t, ctx, makeRequest, runServer, opts...) }) } } func TestServer_PostUpload(t *testing.T) { - runTestWithAndWithoutConfigOptions(t, testServerPostUpload, testcfg.WithPackObjectsCacheEnabled()) + runTestWithAndWithoutConfigOptions(t, testServerPostUpload, makePostUploadPackRequest, runSmartHTTPServer, testcfg.WithPackObjectsCacheEnabled()) +} + +func TestServer_PostUploadWithChannel(t *testing.T) { + runTestWithAndWithoutConfigOptions(t, testServerPostUpload, makePostUploadPackWithSidechannelRequest, runSmartHTTPServerWithoutPraefect, testcfg.WithPackObjectsCacheEnabled()) } -func testServerPostUpload(t *testing.T, ctx context.Context, opts ...testcfg.Option) { +func testServerPostUpload(t *testing.T, ctx context.Context, makeRequest requestMaker, runServer serverRunner, opts ...testcfg.Option) { cfg, repo, repoPath := testcfg.BuildWithRepo(t, opts...) _, localRepoPath := gittest.CloneRepo(t, cfg, cfg.Storages[0]) testhelper.BuildGitalyHooks(t, cfg) negotiationMetrics := prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"feature"}) - serverSocketPath := runSmartHTTPServer(t, cfg, WithPackfileNegotiationMetrics(negotiationMetrics)) + serverSocketPath := runServer(t, cfg, WithPackfileNegotiationMetrics(negotiationMetrics)) oldCommit, err := git.NewObjectIDFromHex("1e292f8fedd741b75372e19097c76d327140c312") // refs/heads/master require.NoError(t, err) @@ -65,7 +88,7 @@ func testServerPostUpload(t *testing.T, ctx context.Context, opts ...testcfg.Opt gittest.WritePktlineFlush(t, requestBuffer) req := &gitalypb.PostUploadPackRequest{Repository: repo} - responseBuffer, err := makePostUploadPackRequest(ctx, t, serverSocketPath, cfg.Auth.Token, req, requestBuffer) + responseBuffer, err := makeRequest(ctx, t, serverSocketPath, cfg.Auth.Token, req, requestBuffer) require.NoError(t, err) pack, version, entries := extractPackDataFromResponse(t, responseBuffer) @@ -81,14 +104,18 @@ func testServerPostUpload(t *testing.T, ctx context.Context, opts ...testcfg.Opt } func TestServer_PostUploadPack_gitConfigOptions(t *testing.T) { - runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackGitConfigOptions, testcfg.WithPackObjectsCacheEnabled()) + runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackGitConfigOptions, makePostUploadPackRequest, runSmartHTTPServer, testcfg.WithPackObjectsCacheEnabled()) } -func testServerPostUploadPackGitConfigOptions(t *testing.T, ctx context.Context, opts ...testcfg.Option) { +func TestServer_PostUploadPackSidechannel_gitConfigOptions(t *testing.T) { + runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackGitConfigOptions, makePostUploadPackWithSidechannelRequest, runSmartHTTPServerWithoutPraefect, testcfg.WithPackObjectsCacheEnabled()) +} + +func testServerPostUploadPackGitConfigOptions(t *testing.T, ctx context.Context, makeRequest requestMaker, runServer serverRunner, opts ...testcfg.Option) { cfg, repo, repoPath := testcfg.BuildWithRepo(t, opts...) testhelper.BuildGitalyHooks(t, cfg) - serverSocketPath := runSmartHTTPServer(t, cfg) + serverSocketPath := runServer(t, cfg) want := "3dd08961455abf80ef9115f4afdc1c6f968b503c" // refs/heads/csv gittest.Exec(t, cfg, "-C", repoPath, "update-ref", "refs/hidden/csv", want) @@ -106,7 +133,7 @@ func testServerPostUploadPackGitConfigOptions(t *testing.T, ctx context.Context, t.Run("sanity check: ref exists and can be fetched", func(t *testing.T) { rpcRequest := &gitalypb.PostUploadPackRequest{Repository: repo} - response, err := makePostUploadPackRequest(ctx, t, serverSocketPath, cfg.Auth.Token, rpcRequest, bytes.NewReader(requestBody.Bytes())) + response, err := makeRequest(ctx, t, serverSocketPath, cfg.Auth.Token, rpcRequest, bytes.NewReader(requestBody.Bytes())) require.NoError(t, err) _, _, count := extractPackDataFromResponse(t, response) require.Equal(t, 5, count, "pack should have 5 entries") @@ -120,7 +147,7 @@ func testServerPostUploadPackGitConfigOptions(t *testing.T, ctx context.Context, "uploadpack.allowAnySHA1InWant=false", }, } - response, err := makePostUploadPackRequest(ctx, t, serverSocketPath, cfg.Auth.Token, rpcRequest, bytes.NewReader(requestBody.Bytes())) + response, err := makeRequest(ctx, t, serverSocketPath, cfg.Auth.Token, rpcRequest, bytes.NewReader(requestBody.Bytes())) testhelper.RequireGrpcError(t, err, codes.Unavailable) // The failure message proves that upload-pack failed because of @@ -131,13 +158,17 @@ func testServerPostUploadPackGitConfigOptions(t *testing.T, ctx context.Context, } func TestServer_PostUploadPack_gitProtocol(t *testing.T) { - runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackGitProtocol, testcfg.WithPackObjectsCacheEnabled()) + runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackGitProtocol, makePostUploadPackRequest, runSmartHTTPServer, testcfg.WithPackObjectsCacheEnabled()) +} + +func TestServer_PostUploadPackWithSidechannel_gitProtocol(t *testing.T) { + runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackGitProtocol, makePostUploadPackWithSidechannelRequest, runSmartHTTPServerWithoutPraefect, testcfg.WithPackObjectsCacheEnabled()) } -func testServerPostUploadPackGitProtocol(t *testing.T, ctx context.Context, opts ...testcfg.Option) { +func testServerPostUploadPackGitProtocol(t *testing.T, ctx context.Context, makeRequest requestMaker, runServer serverRunner, opts ...testcfg.Option) { cfg, repo, _ := testcfg.BuildWithRepo(t, opts...) readProto, cfg := gittest.EnableGitProtocolV2Support(t, cfg) - serverSocketPath := runSmartHTTPServer(t, cfg) + serverSocketPath := runServer(t, cfg) // command=ls-refs does not exist in protocol v0, so if this succeeds, we're talking v2 requestBody := &bytes.Buffer{} @@ -152,7 +183,7 @@ func testServerPostUploadPackGitProtocol(t *testing.T, ctx context.Context, opts GitProtocol: git.ProtocolV2, } - _, err := makePostUploadPackRequest(ctx, t, serverSocketPath, cfg.Auth.Token, rpcRequest, requestBody) + _, err := makeRequest(ctx, t, serverSocketPath, cfg.Auth.Token, rpcRequest, requestBody) require.NoError(t, err) envData := readProto() @@ -163,12 +194,16 @@ func testServerPostUploadPackGitProtocol(t *testing.T, ctx context.Context, opts // on 'deepen' requests even though the request is being handled just // fine from the client perspective. func TestServer_PostUploadPack_suppressDeepenExitError(t *testing.T) { - runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackSuppressDeepenExitError, testcfg.WithPackObjectsCacheEnabled()) + runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackSuppressDeepenExitError, makePostUploadPackRequest, runSmartHTTPServer, testcfg.WithPackObjectsCacheEnabled()) } -func testServerPostUploadPackSuppressDeepenExitError(t *testing.T, ctx context.Context, opts ...testcfg.Option) { +func TestServer_PostUploadPackWithSidechannel_suppressDeepenExitError(t *testing.T) { + runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackSuppressDeepenExitError, makePostUploadPackWithSidechannelRequest, runSmartHTTPServerWithoutPraefect, testcfg.WithPackObjectsCacheEnabled()) +} + +func testServerPostUploadPackSuppressDeepenExitError(t *testing.T, ctx context.Context, makeRequest requestMaker, runServer serverRunner, opts ...testcfg.Option) { cfg, repo, _ := testcfg.BuildWithRepo(t, opts...) - serverSocketPath := runSmartHTTPServer(t, cfg) + serverSocketPath := runServer(t, cfg) requestBody := &bytes.Buffer{} gittest.WritePktlineString(t, requestBody, fmt.Sprintf("want e63f41fe459e62e1228fcef60d7189127aeba95a %s\n", clientCapabilities)) @@ -176,7 +211,7 @@ func testServerPostUploadPackSuppressDeepenExitError(t *testing.T, ctx context.C gittest.WritePktlineFlush(t, requestBody) rpcRequest := &gitalypb.PostUploadPackRequest{Repository: repo} - response, err := makePostUploadPackRequest(ctx, t, serverSocketPath, cfg.Auth.Token, rpcRequest, requestBody) + response, err := makeRequest(ctx, t, serverSocketPath, cfg.Auth.Token, rpcRequest, requestBody) // This assertion is the main reason this test exists. assert.NoError(t, err) @@ -184,6 +219,20 @@ func testServerPostUploadPackSuppressDeepenExitError(t *testing.T, ctx context.C } func TestServer_PostUploadPack_usesPackObjectsHook(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + testServerPostUploadPackUsesPackObjectsHook(t, ctx, makePostUploadPackRequest, runSmartHTTPServer) +} + +func TestServer_PostUploadPackWithSidechannel_usesPackObjectsHook(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + testServerPostUploadPackUsesPackObjectsHook(t, ctx, makePostUploadPackWithSidechannelRequest, runSmartHTTPServerWithoutPraefect) +} + +func testServerPostUploadPackUsesPackObjectsHook(t *testing.T, ctx context.Context, makeRequest requestMaker, runServer serverRunner, opts ...testcfg.Option) { cfg, repo, repoPath := testcfg.BuildWithRepo(t, testcfg.WithPackObjectsCacheEnabled()) cfg.BinDir = testhelper.TempDir(t) @@ -199,7 +248,7 @@ func TestServer_PostUploadPack_usesPackObjectsHook(t *testing.T) { // transferred back. testhelper.WriteExecutable(t, filepath.Join(cfg.BinDir, "gitaly-hooks"), []byte(hookScript)) - serverSocketPath := runSmartHTTPServer(t, cfg) + serverSocketPath := runServer(t, cfg) oldHead := bytes.TrimSpace(gittest.Exec(t, cfg, "-C", repoPath, "rev-parse", "master~")) newHead := bytes.TrimSpace(gittest.Exec(t, cfg, "-C", repoPath, "rev-parse", "master")) @@ -210,10 +259,7 @@ func TestServer_PostUploadPack_usesPackObjectsHook(t *testing.T) { gittest.WritePktlineString(t, requestBuffer, fmt.Sprintf("have %s\n", oldHead)) gittest.WritePktlineFlush(t, requestBuffer) - ctx, cancel := testhelper.Context() - defer cancel() - - _, err := makePostUploadPackRequest(ctx, t, serverSocketPath, cfg.Auth.Token, &gitalypb.PostUploadPackRequest{ + _, err := makeRequest(ctx, t, serverSocketPath, cfg.Auth.Token, &gitalypb.PostUploadPackRequest{ Repository: repo, }, requestBuffer) require.NoError(t, err) @@ -223,12 +269,12 @@ func TestServer_PostUploadPack_usesPackObjectsHook(t *testing.T) { } func TestServer_PostUploadPack_validation(t *testing.T) { - runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackValidation, testcfg.WithPackObjectsCacheEnabled()) + runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackValidation, makePostUploadPackRequest, runSmartHTTPServer, testcfg.WithPackObjectsCacheEnabled()) } -func testServerPostUploadPackValidation(t *testing.T, ctx context.Context, opts ...testcfg.Option) { +func testServerPostUploadPackValidation(t *testing.T, ctx context.Context, makeRequest requestMaker, runServer serverRunner, opts ...testcfg.Option) { cfg := testcfg.Build(t, opts...) - serverSocketPath := runSmartHTTPServer(t, cfg) + serverSocketPath := runServer(t, cfg) rpcRequests := []*gitalypb.PostUploadPackRequest{ {Repository: &gitalypb.Repository{StorageName: "fake", RelativePath: "path"}}, // Repository doesn't exist @@ -238,39 +284,31 @@ func testServerPostUploadPackValidation(t *testing.T, ctx context.Context, opts for _, rpcRequest := range rpcRequests { t.Run(fmt.Sprintf("%v", rpcRequest), func(t *testing.T) { - _, err := makePostUploadPackRequest(ctx, t, serverSocketPath, cfg.Auth.Token, rpcRequest, bytes.NewBuffer(nil)) + _, err := makeRequest(ctx, t, serverSocketPath, cfg.Auth.Token, rpcRequest, bytes.NewBuffer(nil)) testhelper.RequireGrpcError(t, err, codes.InvalidArgument) }) } } -func makePostUploadPackRequest(ctx context.Context, t *testing.T, serverSocketPath, token string, in *gitalypb.PostUploadPackRequest, body io.Reader) (*bytes.Buffer, error) { - client, conn := newSmartHTTPClient(t, serverSocketPath, token) - defer conn.Close() +func TestServer_PostUploadPackSidechannel_validation(t *testing.T) { + runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackWithSideChannelValidation, makePostUploadPackWithSidechannelRequest, runSmartHTTPServerWithoutPraefect, testcfg.WithPackObjectsCacheEnabled()) +} - stream, err := client.PostUploadPack(ctx) - require.NoError(t, err) +func testServerPostUploadPackWithSideChannelValidation(t *testing.T, ctx context.Context, makeRequest requestMaker, runServer serverRunner, opts ...testcfg.Option) { + cfg := testcfg.Build(t, opts...) + serverSocketPath := runServer(t, cfg) - require.NoError(t, stream.Send(in)) + rpcRequests := []*gitalypb.PostUploadPackRequest{ + {Repository: &gitalypb.Repository{StorageName: "fake", RelativePath: "path"}}, // Repository doesn't exist + {Repository: nil}, // Repository is nil + } - if body != nil { - sw := streamio.NewWriter(func(p []byte) error { - return stream.Send(&gitalypb.PostUploadPackRequest{Data: p}) + for _, rpcRequest := range rpcRequests { + t.Run(fmt.Sprintf("%v", rpcRequest), func(t *testing.T) { + _, err := makeRequest(ctx, t, serverSocketPath, cfg.Auth.Token, rpcRequest, bytes.NewBuffer(nil)) + testhelper.RequireGrpcError(t, err, codes.InvalidArgument) }) - - _, err = io.Copy(sw, body) - require.NoError(t, err) - require.NoError(t, stream.CloseSend()) } - - responseBuffer := &bytes.Buffer{} - rr := streamio.NewReader(func() ([]byte, error) { - resp, err := stream.Recv() - return resp.GetData(), err - }) - _, err = io.Copy(responseBuffer, rr) - - return responseBuffer, err } // The response contains bunch of things; metadata, progress messages, and a pack file. We're only @@ -313,16 +351,20 @@ func extractPackDataFromResponse(t *testing.T, buf *bytes.Buffer) ([]byte, int, } func TestServer_PostUploadPack_partialClone(t *testing.T) { - runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackPartialClone, testcfg.WithPackObjectsCacheEnabled()) + runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackPartialClone, makePostUploadPackRequest, runSmartHTTPServer, testcfg.WithPackObjectsCacheEnabled()) } -func testServerPostUploadPackPartialClone(t *testing.T, ctx context.Context, opts ...testcfg.Option) { +func TestServer_PostUploadPackWithSidechannel_partialClone(t *testing.T) { + runTestWithAndWithoutConfigOptions(t, testServerPostUploadPackPartialClone, makePostUploadPackWithSidechannelRequest, runSmartHTTPServerWithoutPraefect, testcfg.WithPackObjectsCacheEnabled()) +} + +func testServerPostUploadPackPartialClone(t *testing.T, ctx context.Context, makeRequest requestMaker, runServer serverRunner, opts ...testcfg.Option) { cfg, repo, repoPath := testcfg.BuildWithRepo(t, opts...) _, localRepoPath := gittest.InitRepo(t, cfg, cfg.Storages[0]) testhelper.BuildGitalyHooks(t, cfg) negotiationMetrics := prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"feature"}) - serverSocketPath := runSmartHTTPServer(t, cfg, WithPackfileNegotiationMetrics(negotiationMetrics)) + serverSocketPath := runServer(t, cfg, WithPackfileNegotiationMetrics(negotiationMetrics)) oldCommit, err := git.NewObjectIDFromHex("1e292f8fedd741b75372e19097c76d327140c312") // refs/heads/master require.NoError(t, err) @@ -336,7 +378,7 @@ func testServerPostUploadPackPartialClone(t *testing.T, ctx context.Context, opt gittest.WritePktlineFlush(t, &requestBuffer) req := &gitalypb.PostUploadPackRequest{Repository: repo} - responseBuffer, err := makePostUploadPackRequest(ctx, t, serverSocketPath, cfg.Auth.Token, req, &requestBuffer) + responseBuffer, err := makeRequest(ctx, t, serverSocketPath, cfg.Auth.Token, req, &requestBuffer) require.NoError(t, err) pack, version, entries := extractPackDataFromResponse(t, responseBuffer) @@ -363,11 +405,22 @@ func TestServer_PostUploadPack_allowAnySHA1InWant(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() + testServerPostUploadPackAllowAnySHA1InWant(t, ctx, makePostUploadPackRequest, runSmartHTTPServer) +} + +func TestServer_PostUploadPackWithSidechannel_allowAnySHA1InWant(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + testServerPostUploadPackAllowAnySHA1InWant(t, ctx, makePostUploadPackWithSidechannelRequest, runSmartHTTPServerWithoutPraefect) +} + +func testServerPostUploadPackAllowAnySHA1InWant(t *testing.T, ctx context.Context, makeRequest requestMaker, runServer serverRunner, opts ...testcfg.Option) { cfg, repo, repoPath := testcfg.BuildWithRepo(t) _, localRepoPath := gittest.InitRepo(t, cfg, cfg.Storages[0]) testhelper.BuildGitalyHooks(t, cfg) - serverSocketPath := runSmartHTTPServer(t, cfg) + serverSocketPath := runServer(t, cfg) newCommit := gittest.WriteCommit(t, cfg, repoPath) var requestBuffer bytes.Buffer @@ -377,7 +430,7 @@ func TestServer_PostUploadPack_allowAnySHA1InWant(t *testing.T) { gittest.WritePktlineFlush(t, &requestBuffer) req := &gitalypb.PostUploadPackRequest{Repository: repo} - responseBuffer, err := makePostUploadPackRequest(ctx, t, serverSocketPath, cfg.Auth.Token, req, &requestBuffer) + responseBuffer, err := makeRequest(ctx, t, serverSocketPath, cfg.Auth.Token, req, &requestBuffer) require.NoError(t, err) pack, version, entries := extractPackDataFromResponse(t, responseBuffer) @@ -387,3 +440,95 @@ func TestServer_PostUploadPack_allowAnySHA1InWant(t *testing.T) { gittest.GitObjectMustExist(t, cfg.Git.BinPath, localRepoPath, newCommit.String()) } + +func makePostUploadPackRequest(ctx context.Context, t *testing.T, serverSocketPath, token string, in *gitalypb.PostUploadPackRequest, body io.Reader) (*bytes.Buffer, error) { + client, conn := newSmartHTTPClient(t, serverSocketPath, token) + defer conn.Close() + + stream, err := client.PostUploadPack(ctx) + require.NoError(t, err) + + require.NoError(t, stream.Send(in)) + + if body != nil { + sw := streamio.NewWriter(func(p []byte) error { + return stream.Send(&gitalypb.PostUploadPackRequest{Data: p}) + }) + + _, err = io.Copy(sw, body) + require.NoError(t, err) + require.NoError(t, stream.CloseSend()) + } + + responseBuffer := &bytes.Buffer{} + rr := streamio.NewReader(func() ([]byte, error) { + resp, err := stream.Recv() + return resp.GetData(), err + }) + _, err = io.Copy(responseBuffer, rr) + + return responseBuffer, err +} + +func dialSmartHTTPServerWithSidechannel(t *testing.T, serverSocketPath, token string, registry *sidechannel.Registry) *grpc.ClientConn { + logger := logrus.NewEntry(logrus.New()) + + factory := func() backchannel.Server { + lm := listenmux.New(insecure.NewCredentials()) + lm.Register(sidechannel.NewServerHandshaker(registry)) + return grpc.NewServer(grpc.Creds(lm)) + } + clientHandshaker := backchannel.NewClientHandshaker(logger, factory) + connOpts := []grpc.DialOption{ + grpc.WithTransportCredentials(clientHandshaker.ClientHandshake(insecure.NewCredentials())), + grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token)), + } + + conn, err := grpc.Dial(serverSocketPath, connOpts...) + require.NoError(t, err) + + return conn +} + +func makePostUploadPackWithSidechannelRequest(ctx context.Context, t *testing.T, serverSocketPath, token string, in *gitalypb.PostUploadPackRequest, body io.Reader) (*bytes.Buffer, error) { + t.Helper() + + registry := sidechannel.NewRegistry() + conn := dialSmartHTTPServerWithSidechannel(t, serverSocketPath, token, registry) + client := gitalypb.NewSmartHTTPServiceClient(conn) + defer testhelper.MustClose(t, conn) + + responseBuffer := &bytes.Buffer{} + ctxOut, waiter := sidechannel.RegisterSidechannel(ctx, registry, func(sideConn *sidechannel.ClientConn) error { + errC := make(chan error, 1) + go func() { + _, err := io.Copy(responseBuffer, sideConn) + errC <- err + }() + + if body != nil { + if _, err := io.Copy(sideConn, body); err != nil { + return err + } + } + + if err := sideConn.CloseWrite(); err != nil { + return err + } + + return <-errC + }) + defer testhelper.MustClose(t, waiter) + + rpcRequest := &gitalypb.PostUploadPackWithSidechannelRequest{ + Repository: in.GetRepository(), + GitConfigOptions: in.GetGitConfigOptions(), + GitProtocol: in.GetGitProtocol(), + } + _, err := client.PostUploadPackWithSidechannel(ctxOut, rpcRequest) + if err == nil { + require.NoError(t, waiter.Wait()) + } + + return responseBuffer, err +} |