Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2022-07-28 20:46:51 +0300
committerJohn Cai <jcai@gitlab.com>2022-07-28 20:46:51 +0300
commit0fa08d953e0d5497fe5366836d0ed54b9ff557d8 (patch)
tree0cbaa7c48dbc03c3d0226a6007e46bb9ebbb764e
parent1ce91833ee8b048250cd6eb776ea7c6ab61f1366 (diff)
parentc5ca566139c5855201904f0b428ff18a732ae3fa (diff)
Merge branch 'pks-grpc-proxy-test-refactorings' into 'master'
proxy: Refactor tests to share less state and use `grpc_testing` test service See merge request gitlab-org/gitaly!4756
-rw-r--r--Makefile1
-rw-r--r--client/dial_test.go60
-rw-r--r--internal/praefect/grpc-proxy/proxy/codec_test.go2
-rw-r--r--internal/praefect/grpc-proxy/proxy/handler_ext_test.go517
-rw-r--r--internal/praefect/grpc-proxy/proxy/handler_test.go2
-rw-r--r--internal/praefect/grpc-proxy/proxy/helper_test.go120
-rw-r--r--internal/praefect/grpc-proxy/proxy/peeker_test.go169
-rw-r--r--internal/praefect/grpc-proxy/proxy/testhelper_test.go131
-rw-r--r--internal/praefect/grpc-proxy/testdata/test.pb.go306
-rw-r--r--internal/praefect/grpc-proxy/testdata/test.proto31
-rw-r--r--internal/praefect/grpc-proxy/testdata/test_grpc.pb.go309
11 files changed, 517 insertions, 1131 deletions
diff --git a/Makefile b/Makefile
index 089d606f0..b642960fa 100644
--- a/Makefile
+++ b/Makefile
@@ -498,7 +498,6 @@ proto: ${PROTOC} ${PROTOC_GEN_GO} ${PROTOC_GEN_GO_GRPC} ${PROTOC_GEN_GITALY_PROT
${PROTOC} ${SHARED_PROTOC_OPTS} -I ${SOURCE_DIR}/proto -I ${PROTOC_INSTALL_DIR}/include --go_out=${SOURCE_DIR}/proto/go/gitalypb --gitaly-protolist_out=proto_dir=${SOURCE_DIR}/proto,gitalypb_dir=${SOURCE_DIR}/proto/go/gitalypb:${SOURCE_DIR} --go-grpc_out=${SOURCE_DIR}/proto/go/gitalypb ${SOURCE_DIR}/proto/*.proto
${SOURCE_DIR}/_support/generate-proto-ruby
@ # this part is related to the generation of sources from testing proto files
- ${PROTOC} ${SHARED_PROTOC_OPTS} -I ${SOURCE_DIR}/internal --go_out=${SOURCE_DIR}/internal --go-grpc_out=${SOURCE_DIR}/internal ${SOURCE_DIR}/internal/praefect/grpc-proxy/testdata/test.proto
${PROTOC} ${SHARED_PROTOC_OPTS} -I ${SOURCE_DIR}/proto -I ${SOURCE_DIR}/internal -I ${PROTOC_INSTALL_DIR}/include --go_out=${SOURCE_DIR}/internal --go-grpc_out=${SOURCE_DIR}/internal \
${SOURCE_DIR}/internal/praefect/mock/mock.proto \
${SOURCE_DIR}/internal/middleware/cache/testdata/stream.proto \
diff --git a/client/dial_test.go b/client/dial_test.go
index e67b6bea9..92b4aa0be 100644
--- a/client/dial_test.go
+++ b/client/dial_test.go
@@ -19,7 +19,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-client-go"
gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
- proxytestdata "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/testdata"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
gitalyx509 "gitlab.com/gitlab-org/gitaly/v15/internal/x509"
"gitlab.com/gitlab-org/labkit/correlation"
@@ -31,6 +30,7 @@ import (
"google.golang.org/grpc/credentials/insecure"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
+ "google.golang.org/grpc/test/grpc_testing"
)
var proxyEnvironmentKeys = []string{"http_proxy", "https_proxy", "no_proxy"}
@@ -257,22 +257,22 @@ func TestDialSidechannel(t *testing.T) {
}
type testSvc struct {
- proxytestdata.UnimplementedTestServiceServer
- PingMethod func(context.Context, *proxytestdata.PingRequest) (*proxytestdata.PingResponse, error)
- PingStreamMethod func(stream proxytestdata.TestService_PingStreamServer) error
+ grpc_testing.UnimplementedTestServiceServer
+ unaryCall func(context.Context, *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error)
+ fullDuplexCall func(stream grpc_testing.TestService_FullDuplexCallServer) error
}
-func (ts *testSvc) Ping(ctx context.Context, r *proxytestdata.PingRequest) (*proxytestdata.PingResponse, error) {
- if ts.PingMethod != nil {
- return ts.PingMethod(ctx, r)
+func (ts *testSvc) UnaryCall(ctx context.Context, r *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) {
+ if ts.unaryCall != nil {
+ return ts.unaryCall(ctx, r)
}
- return &proxytestdata.PingResponse{}, nil
+ return &grpc_testing.SimpleResponse{}, nil
}
-func (ts *testSvc) PingStream(stream proxytestdata.TestService_PingStreamServer) error {
- if ts.PingStreamMethod != nil {
- return ts.PingStreamMethod(stream)
+func (ts *testSvc) FullDuplexCall(stream grpc_testing.TestService_FullDuplexCallServer) error {
+ if ts.fullDuplexCall != nil {
+ return ts.fullDuplexCall(stream)
}
return nil
@@ -287,13 +287,13 @@ func TestDial_Correlation(t *testing.T) {
grpcServer := grpc.NewServer(grpc.UnaryInterceptor(grpccorrelation.UnaryServerCorrelationInterceptor()))
svc := &testSvc{
- PingMethod: func(ctx context.Context, r *proxytestdata.PingRequest) (*proxytestdata.PingResponse, error) {
+ unaryCall: func(ctx context.Context, r *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) {
cid := correlation.ExtractFromContext(ctx)
assert.Equal(t, "correlation-id-1", cid)
- return &proxytestdata.PingResponse{}, nil
+ return &grpc_testing.SimpleResponse{}, nil
},
}
- proxytestdata.RegisterTestServiceServer(grpcServer, svc)
+ grpc_testing.RegisterTestServiceServer(grpcServer, svc)
go func() { assert.NoError(t, grpcServer.Serve(listener)) }()
@@ -304,10 +304,10 @@ func TestDial_Correlation(t *testing.T) {
require.NoError(t, err)
defer cc.Close()
- client := proxytestdata.NewTestServiceClient(cc)
+ client := grpc_testing.NewTestServiceClient(cc)
ctx = correlation.ContextWithCorrelation(ctx, "correlation-id-1")
- _, err = client.Ping(ctx, &proxytestdata.PingRequest{})
+ _, err = client.UnaryCall(ctx, &grpc_testing.SimpleRequest{})
require.NoError(t, err)
})
@@ -319,15 +319,15 @@ func TestDial_Correlation(t *testing.T) {
grpcServer := grpc.NewServer(grpc.StreamInterceptor(grpccorrelation.StreamServerCorrelationInterceptor()))
svc := &testSvc{
- PingStreamMethod: func(stream proxytestdata.TestService_PingStreamServer) error {
+ fullDuplexCall: func(stream grpc_testing.TestService_FullDuplexCallServer) error {
cid := correlation.ExtractFromContext(stream.Context())
assert.Equal(t, "correlation-id-1", cid)
_, err := stream.Recv()
assert.NoError(t, err)
- return stream.Send(&proxytestdata.PingResponse{})
+ return stream.Send(&grpc_testing.StreamingOutputCallResponse{})
},
}
- proxytestdata.RegisterTestServiceServer(grpcServer, svc)
+ grpc_testing.RegisterTestServiceServer(grpcServer, svc)
go func() { assert.NoError(t, grpcServer.Serve(listener)) }()
defer grpcServer.Stop()
@@ -337,13 +337,13 @@ func TestDial_Correlation(t *testing.T) {
require.NoError(t, err)
defer cc.Close()
- client := proxytestdata.NewTestServiceClient(cc)
+ client := grpc_testing.NewTestServiceClient(cc)
ctx = correlation.ContextWithCorrelation(ctx, "correlation-id-1")
- stream, err := client.PingStream(ctx)
+ stream, err := client.FullDuplexCall(ctx)
require.NoError(t, err)
- require.NoError(t, stream.Send(&proxytestdata.PingRequest{}))
+ require.NoError(t, stream.Send(&grpc_testing.StreamingOutputCallRequest{}))
require.NoError(t, stream.CloseSend())
_, err = stream.Recv()
@@ -367,13 +367,13 @@ func TestDial_Tracing(t *testing.T) {
grpc.StreamInterceptor(grpctracing.StreamServerTracingInterceptor()),
)
svc := &testSvc{
- PingMethod: func(ctx context.Context, r *proxytestdata.PingRequest) (*proxytestdata.PingResponse, error) {
+ unaryCall: func(ctx context.Context, r *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) {
span, _ := opentracing.StartSpanFromContext(ctx, "nested-span")
defer span.Finish()
span.LogKV("was", "called")
- return &proxytestdata.PingResponse{}, nil
+ return &grpc_testing.SimpleResponse{}, nil
},
- PingStreamMethod: func(stream proxytestdata.TestService_PingStreamServer) error {
+ fullDuplexCall: func(stream grpc_testing.TestService_FullDuplexCallServer) error {
// synchronize the client has returned from CloseSend as the client span finishing
// races with sending the stream close to the server
select {
@@ -388,7 +388,7 @@ func TestDial_Tracing(t *testing.T) {
return nil
},
}
- proxytestdata.RegisterTestServiceServer(grpcServer, svc)
+ grpc_testing.RegisterTestServiceServer(grpcServer, svc)
go func() { require.NoError(t, grpcServer.Serve(listener)) }()
defer grpcServer.Stop()
@@ -419,7 +419,7 @@ func TestDial_Tracing(t *testing.T) {
// We're now invoking the unary RPC with the span injected into
// the context. This should create a span that's nested into
// the "stream-check" span.
- _, err = proxytestdata.NewTestServiceClient(cc).Ping(ctx, &proxytestdata.PingRequest{})
+ _, err = grpc_testing.NewTestServiceClient(cc).UnaryCall(ctx, &grpc_testing.SimpleRequest{})
require.NoError(t, err)
span.Finish()
@@ -438,7 +438,7 @@ func TestDial_Tracing(t *testing.T) {
// This span is the RPC call to TestService/Ping. It
// inherits the "unary-check" we set up and thus has
// baggage.
- {baggage: "stub", operation: "/mwitkow.testproto.TestService/Ping"},
+ {baggage: "stub", operation: "/grpc.testing.TestService/UnaryCall"},
// And this finally is the outermost span which we
// manually set up before the RPC call.
{baggage: "stub", operation: "unary-check"},
@@ -474,7 +474,7 @@ func TestDial_Tracing(t *testing.T) {
// We're now invoking the streaming RPC with the span injected into the context.
// This should create a span that's nested into the "stream-check" span.
- stream, err := proxytestdata.NewTestServiceClient(cc).PingStream(ctx)
+ stream, err := grpc_testing.NewTestServiceClient(cc).FullDuplexCall(ctx)
require.NoError(t, err)
require.NoError(t, stream.CloseSend())
close(clientSendClosed)
@@ -494,7 +494,7 @@ func TestDial_Tracing(t *testing.T) {
operation string
}{
// This span is the RPC call to TestService/Ping.
- {baggage: "stub", operation: "/mwitkow.testproto.TestService/PingStream"},
+ {baggage: "stub", operation: "/grpc.testing.TestService/FullDuplexCall"},
// This is the second span we expect, which is the "nested-span" span which
// we've manually created inside of PingMethod. This is different than for
// unary RPCs: given that one can send multiple messages to the RPC, we may
diff --git a/internal/praefect/grpc-proxy/proxy/codec_test.go b/internal/praefect/grpc-proxy/proxy/codec_test.go
index 959d4203d..b05738f1e 100644
--- a/internal/praefect/grpc-proxy/proxy/codec_test.go
+++ b/internal/praefect/grpc-proxy/proxy/codec_test.go
@@ -9,6 +9,8 @@ import (
)
func TestCodec_ReadYourWrites(t *testing.T) {
+ t.Parallel()
+
framePtr := &frame{}
data := []byte{0xDE, 0xAD, 0xBE, 0xEF}
codec := rawCodec{}
diff --git a/internal/praefect/grpc-proxy/proxy/handler_ext_test.go b/internal/praefect/grpc-proxy/proxy/handler_ext_test.go
index 2610a6132..7e5234fcd 100644
--- a/internal/praefect/grpc-proxy/proxy/handler_ext_test.go
+++ b/internal/praefect/grpc-proxy/proxy/handler_ext_test.go
@@ -18,134 +18,99 @@ import (
"testing"
"github.com/getsentry/sentry-go"
- grpcmw "github.com/grpc-ecosystem/go-grpc-middleware"
- grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
- "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
- "github.com/stretchr/testify/suite"
"gitlab.com/gitlab-org/gitaly/v15/client"
- "gitlab.com/gitlab-org/gitaly/v15/internal/helper/fieldextractors"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/helper"
"gitlab.com/gitlab-org/gitaly/v15/internal/metadata"
- "gitlab.com/gitlab-org/gitaly/v15/internal/middleware/sentryhandler"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/proxy"
- pb "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/testdata"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
grpc_metadata "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
+ "google.golang.org/grpc/test/grpc_testing"
)
const (
- pingDefaultValue = "I like kittens."
- clientMdKey = "test-client-header"
- serverHeaderMdKey = "test-client-header"
- serverTrailerMdKey = "test-client-trailer"
-
rejectingMdKey = "test-reject-rpc-if-in-context"
-
- countListResponses = 20
)
-func TestMain(m *testing.M) {
- testhelper.Run(m)
+func TestHandler_carriesClientMetadata(t *testing.T) {
+ t.Parallel()
+ testHandlerCarriesClientMetadata(t)
}
-// asserting service is implemented on the server side and serves as a handler for stuff
-type assertingService struct {
- pb.UnimplementedTestServiceServer
- t *testing.T
-}
+func TestHandler_carriesClientMetadataStressTest(t *testing.T) {
+ t.Parallel()
-func (s *assertingService) PingEmpty(ctx context.Context, _ *pb.Empty) (*pb.PingResponse, error) {
- // Check that this call has client's metadata.
- md, ok := grpc_metadata.FromIncomingContext(ctx)
- assert.True(s.t, ok, "PingEmpty call must have metadata in context")
- _, ok = md[clientMdKey]
- assert.True(s.t, ok, "PingEmpty call must have clients's custom headers in metadata")
- return &pb.PingResponse{Value: pingDefaultValue, Counter: 42}, nil
+ for i := 0; i < 50; i++ {
+ testHandlerCarriesClientMetadata(t)
+ }
}
-func (s *assertingService) Ping(ctx context.Context, ping *pb.PingRequest) (*pb.PingResponse, error) {
- // Send user trailers and headers.
- require.NoError(s.t, grpc.SendHeader(ctx, grpc_metadata.Pairs(serverHeaderMdKey, "I like turtles.")))
- require.NoError(s.t, grpc.SetTrailer(ctx, grpc_metadata.Pairs(serverTrailerMdKey, "I like ending turtles.")))
- return &pb.PingResponse{Value: ping.Value, Counter: 42}, nil
-}
+func testHandlerCarriesClientMetadata(t *testing.T) {
+ ctx, client, backend := setupProxy(t)
-func (s *assertingService) PingError(ctx context.Context, ping *pb.PingRequest) (*pb.Empty, error) {
- return nil, status.Errorf(codes.ResourceExhausted, "Userspace error.")
-}
+ backend.unaryCall = func(ctx context.Context, request *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) {
+ metadata, ok := grpc_metadata.FromIncomingContext(ctx)
+ require.True(t, ok)
-func (s *assertingService) PingList(ping *pb.PingRequest, stream pb.TestService_PingListServer) error {
- // Send user trailers and headers.
- require.NoError(s.t, stream.SendHeader(grpc_metadata.Pairs(serverHeaderMdKey, "I like turtles.")))
- for i := 0; i < countListResponses; i++ {
- require.NoError(s.t, stream.Send(&pb.PingResponse{Value: ping.Value, Counter: int32(i)}))
- }
- stream.SetTrailer(grpc_metadata.Pairs(serverTrailerMdKey, "I like ending turtles."))
- return nil
-}
+ metadataValue, ok := metadata["injected_metadata"]
+ require.True(t, ok)
+ require.Equal(t, []string{"injected_value"}, metadataValue)
-func (s *assertingService) PingStream(stream pb.TestService_PingStreamServer) error {
- require.NoError(s.t, stream.SendHeader(grpc_metadata.Pairs(serverHeaderMdKey, "I like turtles.")))
- counter := int32(0)
- for {
- ping, err := stream.Recv()
- if err == io.EOF {
- break
- } else if err != nil {
- require.NoError(s.t, err, "can't fail reading stream")
- return err
- }
- pong := &pb.PingResponse{Value: ping.Value, Counter: counter}
- if err := stream.Send(pong); err != nil {
- require.NoError(s.t, err, "can't fail sending back a pong")
- }
- counter++
+ return &grpc_testing.SimpleResponse{Payload: request.GetPayload()}, nil
}
- stream.SetTrailer(grpc_metadata.Pairs(serverTrailerMdKey, "I like ending turtles."))
- return nil
-}
-// ProxyHappySuite tests the "happy" path of handling: that everything works in absence of connection issues.
-type ProxyHappySuite struct {
- suite.Suite
- ctx context.Context
- cancel context.CancelFunc
- server *grpc.Server
- proxy *grpc.Server
- connProxy2Server *grpc.ClientConn
- client pb.TestServiceClient
- connClient2Proxy *grpc.ClientConn
-}
+ ctx = grpc_metadata.NewOutgoingContext(ctx, grpc_metadata.Pairs("injected_metadata", "injected_value"))
-func (s *ProxyHappySuite) TestPingEmptyCarriesClientMetadata() {
- ctx := grpc_metadata.NewOutgoingContext(s.ctx, grpc_metadata.Pairs(clientMdKey, "true"))
- out, err := s.client.PingEmpty(ctx, &pb.Empty{})
- require.NoError(s.T(), err, "PingEmpty should succeed without errors")
- testhelper.ProtoEqual(s.T(), &pb.PingResponse{Value: pingDefaultValue, Counter: 42}, out)
+ response, err := client.UnaryCall(ctx, &grpc_testing.SimpleRequest{
+ Payload: &grpc_testing.Payload{Body: []byte("data")},
+ })
+ require.NoError(t, err, "PingEmpty should succeed without errors")
+ testhelper.ProtoEqual(t, &grpc_testing.SimpleResponse{
+ Payload: &grpc_testing.Payload{Body: []byte("data")},
+ }, response)
}
-func (s *ProxyHappySuite) TestPingEmpty_StressTest() {
- for i := 0; i < 50; i++ {
- s.TestPingEmptyCarriesClientMetadata()
+func TestHandler_carriesHeadersAndTrailers(t *testing.T) {
+ t.Parallel()
+
+ ctx, client, backend := setupProxy(t)
+
+ backend.unaryCall = func(ctx context.Context, request *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) {
+ require.NoError(t, grpc.SendHeader(ctx, grpc_metadata.Pairs("injected_header", "header_value")))
+ require.NoError(t, grpc.SetTrailer(ctx, grpc_metadata.Pairs("injected_trailer", "trailer_value")))
+ return &grpc_testing.SimpleResponse{Payload: request.GetPayload()}, nil
}
-}
-func (s *ProxyHappySuite) TestPingCarriesServerHeadersAndTrailers() {
- headerMd := make(grpc_metadata.MD)
- trailerMd := make(grpc_metadata.MD)
- // This is an awkward calling convention... but meh.
- out, err := s.client.Ping(s.ctx, &pb.PingRequest{Value: "foo"}, grpc.Header(&headerMd), grpc.Trailer(&trailerMd))
- require.NoError(s.T(), err, "Ping should succeed without errors")
- testhelper.ProtoEqual(s.T(), &pb.PingResponse{Value: "foo", Counter: 42}, out)
- assert.Contains(s.T(), headerMd, serverHeaderMdKey, "server response headers must contain server data")
- assert.Len(s.T(), trailerMd, 1, "server response trailers must contain server data")
+ var headerMetadata, trailerMetadata grpc_metadata.MD
+
+ response, err := client.UnaryCall(ctx, &grpc_testing.SimpleRequest{
+ Payload: &grpc_testing.Payload{Body: []byte("data")},
+ }, grpc.Header(&headerMetadata), grpc.Trailer(&trailerMetadata))
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, &grpc_testing.SimpleResponse{
+ Payload: &grpc_testing.Payload{Body: []byte("data")},
+ }, response)
+
+ require.Equal(t, grpc_metadata.Pairs(
+ "content-type", "application/grpc",
+ "injected_header", "header_value",
+ ), headerMetadata)
+ require.Equal(t, grpc_metadata.Pairs(
+ "injected_trailer", "trailer_value",
+ ), trailerMetadata)
}
-func (s *ProxyHappySuite) TestPingErrorPropagatesAppError() {
+func TestHandler_propagatesServerError(t *testing.T) {
+ ctx, client, backend := setupProxy(t)
+
+ backend.unaryCall = func(context.Context, *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) {
+ return nil, status.Errorf(codes.ResourceExhausted, "service error")
+ }
+
sentryTriggered := 0
sentrySrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
sentryTriggered++
@@ -154,79 +119,124 @@ func (s *ProxyHappySuite) TestPingErrorPropagatesAppError() {
// minimal required sentry client configuration
sentryURL, err := url.Parse(sentrySrv.URL)
- require.NoError(s.T(), err)
+ require.NoError(t, err)
sentryURL.User = url.UserPassword("stub", "stub")
sentryURL.Path = "/stub/1"
- require.NoError(s.T(), sentry.Init(sentry.ClientOptions{
+ require.NoError(t, sentry.Init(sentry.ClientOptions{
Dsn: sentryURL.String(),
Transport: sentry.NewHTTPSyncTransport(),
}))
+ // Verify that Sentry is configured correctyl to be triggered.
sentry.CaptureEvent(sentry.NewEvent())
- require.Equal(s.T(), 1, sentryTriggered, "sentry configured incorrectly")
+ require.Equal(t, 1, sentryTriggered)
+
+ _, err = client.UnaryCall(ctx, &grpc_testing.SimpleRequest{})
+ testhelper.RequireGrpcError(t, status.Errorf(codes.ResourceExhausted, "service error"), err)
+
+ // Sentry must not be triggered because errors from remote must be just propagated.
+ require.Equal(t, 1, sentryTriggered)
+}
+
+func TestHandler_directorErrorIsPropagated(t *testing.T) {
+ t.Parallel()
- _, err = s.client.PingError(s.ctx, &pb.PingRequest{Value: "foo"})
- require.Error(s.T(), err, "PingError should never succeed")
- assert.Equal(s.T(), codes.ResourceExhausted, status.Code(err))
- assert.Equal(s.T(), "Userspace error.", status.Convert(err).Message())
- require.Equal(s.T(), 1, sentryTriggered, "sentry must not be triggered because errors from remote must be just propagated")
+ // There is no need to set up the backend given that we should reject the call before we
+ // even hit the server.
+ ctx, client, _ := setupProxy(t)
+
+ // The proxy's director is set up so that it is rejecting requests when it sees the
+ // following gRPC metadata.
+ ctx = grpc_metadata.NewOutgoingContext(ctx, grpc_metadata.Pairs(rejectingMdKey, "true"))
+
+ _, err := client.UnaryCall(ctx, &grpc_testing.SimpleRequest{})
+ require.Error(t, err)
+ testhelper.RequireGrpcError(t, helper.ErrPermissionDeniedf("testing rejection"), err)
+}
+
+func TestHandler_fullDuplex(t *testing.T) {
+ t.Parallel()
+ testHandlerFullDuplex(t)
}
-func (s *ProxyHappySuite) TestDirectorErrorIsPropagated() {
- // See SetupSuite where the StreamDirector has a special case.
- ctx := grpc_metadata.NewOutgoingContext(s.ctx, grpc_metadata.Pairs(rejectingMdKey, "true"))
- _, err := s.client.Ping(ctx, &pb.PingRequest{Value: "foo"})
- require.Error(s.T(), err, "Director should reject this RPC")
- assert.Equal(s.T(), codes.PermissionDenied, status.Code(err))
- assert.Equal(s.T(), "testing rejection", status.Convert(err).Message())
+func TestHandler_fullDuplexStressTest(t *testing.T) {
+ t.Parallel()
+
+ for i := 0; i < 50; i++ {
+ testHandlerFullDuplex(t)
+ }
}
-func (s *ProxyHappySuite) TestPingStream_FullDuplexWorks() {
- stream, err := s.client.PingStream(s.ctx)
- require.NoError(s.T(), err, "PingStream request should be successful.")
+func testHandlerFullDuplex(t *testing.T) {
+ ctx, client, backend := setupProxy(t)
- for i := 0; i < countListResponses; i++ {
- ping := &pb.PingRequest{Value: fmt.Sprintf("foo:%d", i)}
- require.NoError(s.T(), stream.Send(ping), "sending to PingStream must not fail")
- resp, err := stream.Recv()
- if err == io.EOF {
- break
+ backend.fullDuplexCall = func(stream grpc_testing.TestService_FullDuplexCallServer) error {
+ require.NoError(t, stream.SendHeader(grpc_metadata.Pairs("custom_header", "header_value")))
+
+ for i := 0; ; i++ {
+ request, err := stream.Recv()
+ if err == io.EOF {
+ break
+ }
+ require.NoError(t, err)
+
+ require.NoError(t, stream.Send(&grpc_testing.StreamingOutputCallResponse{
+ Payload: &grpc_testing.Payload{
+ Body: []byte(fmt.Sprintf("%s: %d", request.GetPayload().GetBody(), i)),
+ },
+ }))
}
+
+ stream.SetTrailer(grpc_metadata.Pairs("custom_trailer", "trailer_value"))
+ return nil
+ }
+
+ stream, err := client.FullDuplexCall(ctx)
+ require.NoError(t, err)
+
+ for i := 0; i < 20; i++ {
+ require.NoError(t, stream.Send(&grpc_testing.StreamingOutputCallRequest{
+ Payload: &grpc_testing.Payload{
+ Body: []byte(fmt.Sprintf("foo:%d", i)),
+ },
+ }))
+
+ response, err := stream.Recv()
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, &grpc_testing.StreamingOutputCallResponse{
+ Payload: &grpc_testing.Payload{
+ Body: []byte(fmt.Sprintf("foo:%d: %d", i, i)),
+ },
+ }, response)
+
if i == 0 {
- // Check that the header arrives before all entries.
- headerMd, err := stream.Header()
- require.NoError(s.T(), err, "PingStream headers should not error.")
- assert.Contains(s.T(), headerMd, serverHeaderMdKey, "PingStream response headers user contain metadata")
+ headerMetadata, err := stream.Header()
+ require.NoError(t, err)
+ require.Equal(t, grpc_metadata.Pairs(
+ "content-type", "application/grpc",
+ "custom_header", "header_value",
+ ), headerMetadata)
}
- assert.EqualValues(s.T(), i, resp.Counter, "ping roundtrip must succeed with the correct id")
}
- require.NoError(s.T(), stream.CloseSend(), "no error on close send")
+
+ require.NoError(t, stream.CloseSend())
_, err = stream.Recv()
- require.Equal(s.T(), io.EOF, err, "stream should close with io.EOF, meaining OK")
- // Check that the trailer headers are here.
- trailerMd := stream.Trailer()
- assert.Len(s.T(), trailerMd, 1, "PingList trailer headers user contain metadata")
-}
+ require.Equal(t, io.EOF, err)
-func (s *ProxyHappySuite) TestPingStream_StressTest() {
- for i := 0; i < 50; i++ {
- s.TestPingStream_FullDuplexWorks()
- }
+ require.Equal(t, grpc_metadata.Pairs(
+ "custom_trailer", "trailer_value",
+ ), stream.Trailer())
}
-func (s *ProxyHappySuite) SetupSuite() {
- s.ctx, s.cancel = context.WithCancel(testhelper.Context(s.T()))
+func setupProxy(t *testing.T) (context.Context, grpc_testing.TestServiceClient, *interceptPinger) {
+ t.Helper()
+
+ ctx := testhelper.Context(t)
- listenerProxy, err := net.Listen("tcp", "127.0.0.1:0")
- require.NoError(s.T(), err, "must be able to allocate a port for listenerProxy")
- listenerServer, err := net.Listen("tcp", "127.0.0.1:0")
- require.NoError(s.T(), err, "must be able to allocate a port for listenerServer")
+ proxy2Server, backend := newBackendPinger(t, ctx)
- // Setup of the proxy's Director.
- s.connProxy2Server, err = grpc.Dial(listenerServer.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())))
- require.NoError(s.T(), err, "must not error on deferred client Dial")
- director := func(ctx context.Context, fullName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
+ director := func(ctx context.Context, _ string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
payload, err := peeker.Peek()
if err != nil {
return nil, err
@@ -240,66 +250,21 @@ func (s *ProxyHappySuite) SetupSuite() {
}
// Explicitly copy the metadata, otherwise the tests will fail.
- return proxy.NewStreamParameters(proxy.Destination{Ctx: metadata.IncomingToOutgoing(ctx), Conn: s.connProxy2Server, Msg: payload}, nil, nil, nil), nil
+ return proxy.NewStreamParameters(proxy.Destination{
+ Ctx: metadata.IncomingToOutgoing(ctx),
+ Conn: proxy2Server,
+ Msg: payload,
+ }, nil, nil, nil), nil
}
- // Setup backend server for test suite
- s.server = grpc.NewServer()
- pb.RegisterTestServiceServer(s.server, &assertingService{t: s.T()})
- go func() {
- s.server.Serve(listenerServer)
- }()
-
- // Setup grpc-proxy server for test suite
- s.proxy = grpc.NewServer(
- grpc.ForceServerCodec(proxy.NewCodec()),
- grpc.StreamInterceptor(
- grpcmw.ChainStreamServer(
- // context tags usage is required by sentryhandler.StreamLogHandler
- grpcmwtags.StreamServerInterceptor(grpcmwtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor)),
- // sentry middleware to capture errors
- sentryhandler.StreamLogHandler,
- ),
- ),
- grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
- )
- // Ping handler is handled as an explicit registration and not as a TransparentHandler.
- proxy.RegisterService(s.proxy, director, "mwitkow.testproto.TestService", "Ping")
- go func() {
- s.proxy.Serve(listenerProxy)
- }()
-
- // Setup client for test suite
- ctx := testhelper.Context(s.T())
-
- s.connClient2Proxy, err = grpc.DialContext(ctx, listenerProxy.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
- require.NoError(s.T(), err, "must not error on deferred client Dial")
- s.client = pb.NewTestServiceClient(s.connClient2Proxy)
-}
-
-func (s *ProxyHappySuite) TearDownSuite() {
- if s.cancel != nil {
- s.cancel()
- }
- if s.connClient2Proxy != nil {
- s.connClient2Proxy.Close()
- }
- if s.connProxy2Server != nil {
- s.connProxy2Server.Close()
- }
- if s.proxy != nil {
- s.proxy.Stop()
- }
- if s.server != nil {
- s.server.Stop()
- }
-}
+ client2Proxy := newProxy(t, ctx, director, "mwitkow.testproto.TestService", "Ping")
-func TestProxyHappySuite(t *testing.T) {
- suite.Run(t, &ProxyHappySuite{})
+ return ctx, grpc_testing.NewTestServiceClient(client2Proxy), backend
}
func TestProxyErrorPropagation(t *testing.T) {
+ t.Parallel()
+
errBackend := status.Error(codes.InvalidArgument, "backend error")
errDirector := status.Error(codes.FailedPrecondition, "director error")
errRequestFinalizer := status.Error(codes.Internal, "request finalizer error")
@@ -418,7 +383,7 @@ func TestProxyErrorPropagation(t *testing.T) {
require.NoError(t, proxyClientConn.Close())
}()
- resp, err := pb.NewTestServiceClient(proxyClientConn).Ping(ctx, &pb.PingRequest{})
+ resp, err := grpc_testing.NewTestServiceClient(proxyClientConn).UnaryCall(ctx, &grpc_testing.SimpleRequest{})
testhelper.RequireGrpcError(t, tc.returnedError, err)
require.Nil(t, resp)
})
@@ -426,76 +391,114 @@ func TestProxyErrorPropagation(t *testing.T) {
}
func TestRegisterStreamHandlers(t *testing.T) {
- directorCalledError := errors.New("director was called")
-
- server := grpc.NewServer(
- grpc.ForceServerCodec(proxy.NewCodec()),
- grpc.UnknownServiceHandler(proxy.TransparentHandler(func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
- return nil, directorCalledError
- })),
- )
-
- var pingStreamHandlerCalled, pingEmptyStreamHandlerCalled bool
-
- pingValue := "hello"
+ t.Parallel()
- pingStreamHandler := func(srv interface{}, stream grpc.ServerStream) error {
- pingStreamHandlerCalled = true
- var req pb.PingRequest
-
- if err := stream.RecvMsg(&req); err != nil {
- return err
- }
-
- require.Equal(t, pingValue, req.Value)
+ directorCalledError := errors.New("director was called")
- return stream.SendMsg(nil)
+ requestSent := &grpc_testing.SimpleRequest{
+ Payload: &grpc_testing.Payload{
+ Body: []byte("hello"),
+ },
}
- pingEmptyStreamHandler := func(srv interface{}, stream grpc.ServerStream) error {
- pingEmptyStreamHandlerCalled = true
- var req pb.Empty
-
- if err := stream.RecvMsg(&req); err != nil {
- return err
- }
-
- return stream.SendMsg(nil)
+ unaryCallStreamHandler := func(t *testing.T, srv interface{}, stream grpc.ServerStream) {
+ var request grpc_testing.SimpleRequest
+ require.NoError(t, stream.RecvMsg(&request))
+ testhelper.ProtoEqual(t, requestSent, &request)
+ require.NoError(t, stream.SendMsg(nil))
}
- streamers := map[string]grpc.StreamHandler{
- "Ping": pingStreamHandler,
- "PingEmpty": pingEmptyStreamHandler,
+ emptyCallStreamHandler := func(t *testing.T, srv interface{}, stream grpc.ServerStream) {
+ var request grpc_testing.Empty
+ require.NoError(t, stream.RecvMsg(&request))
+ require.NoError(t, stream.SendMsg(nil))
}
- proxy.RegisterStreamHandlers(server, "mwitkow.testproto.TestService", streamers)
-
- serverSocketPath := testhelper.GetTemporaryGitalySocketFileName(t)
-
- listener, err := net.Listen("unix", serverSocketPath)
- if err != nil {
- t.Fatal(err)
- }
+ for _, tc := range []struct {
+ desc string
+ registeredHandlers map[string]func(*testing.T, interface{}, grpc.ServerStream)
+ execute func(context.Context, *testing.T, grpc_testing.TestServiceClient)
+ expectedErr error
+ expectedCalls map[string]int
+ }{
+ {
+ desc: "single handler",
+ registeredHandlers: map[string]func(*testing.T, interface{}, grpc.ServerStream){
+ "UnaryCall": unaryCallStreamHandler,
+ },
+ execute: func(ctx context.Context, t *testing.T, client grpc_testing.TestServiceClient) {
+ _, err := client.UnaryCall(ctx, requestSent)
+ require.NoError(t, err)
+ },
+ expectedCalls: map[string]int{
+ "UnaryCall": 1,
+ },
+ },
+ {
+ desc: "multiple handlers picks the right one",
+ registeredHandlers: map[string]func(*testing.T, interface{}, grpc.ServerStream){
+ "UnaryCall": unaryCallStreamHandler,
+ "EmptyCall": emptyCallStreamHandler,
+ },
+ execute: func(ctx context.Context, t *testing.T, client grpc_testing.TestServiceClient) {
+ _, err := client.EmptyCall(ctx, &grpc_testing.Empty{})
+ require.NoError(t, err)
+ },
+ expectedCalls: map[string]int{
+ "EmptyCall": 1,
+ },
+ },
+ {
+ desc: "call to unregistered handler",
+ registeredHandlers: map[string]func(*testing.T, interface{}, grpc.ServerStream){
+ "EmptyCall": emptyCallStreamHandler,
+ },
+ execute: func(ctx context.Context, t *testing.T, client grpc_testing.TestServiceClient) {
+ _, err := client.UnaryCall(ctx, requestSent)
+ testhelper.RequireGrpcError(t, directorCalledError, err)
+ },
+ expectedCalls: map[string]int{},
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx := testhelper.Context(t)
- go server.Serve(listener)
- defer server.Stop()
+ server := grpc.NewServer(
+ grpc.ForceServerCodec(proxy.NewCodec()),
+ grpc.UnknownServiceHandler(proxy.TransparentHandler(
+ func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
+ return nil, directorCalledError
+ },
+ )),
+ )
- cc, err := client.Dial("unix://"+serverSocketPath, []grpc.DialOption{grpc.WithBlock()})
- require.NoError(t, err)
- defer cc.Close()
+ calls := map[string]int{}
+ registeredHandlers := map[string]grpc.StreamHandler{}
+ for name, handler := range tc.registeredHandlers {
+ name, handler := name, handler
+
+ // We wrap every handler so that we can easily count the number of
+ // times each of them has been invoked.
+ registeredHandlers[name] = func(srv interface{}, stream grpc.ServerStream) error {
+ calls[name]++
+ handler(t, srv, stream)
+ return nil
+ }
+ }
+ proxy.RegisterStreamHandlers(server, grpc_testing.TestService_ServiceDesc.ServiceName, registeredHandlers)
- testServiceClient := pb.NewTestServiceClient(cc)
- ctx := testhelper.Context(t)
+ listener := newListener(t)
+ go server.Serve(listener)
+ defer server.Stop()
- _, err = testServiceClient.Ping(ctx, &pb.PingRequest{Value: pingValue})
- require.NoError(t, err)
- require.True(t, pingStreamHandlerCalled)
+ conn, err := client.Dial("tcp://"+listener.Addr().String(), []grpc.DialOption{grpc.WithBlock()})
+ require.NoError(t, err)
+ defer conn.Close()
+ client := grpc_testing.NewTestServiceClient(conn)
- _, err = testServiceClient.PingEmpty(ctx, &pb.Empty{})
- require.NoError(t, err)
- require.True(t, pingEmptyStreamHandlerCalled)
+ tc.execute(ctx, t, client)
- // since PingError was never registered with its own streamer, it should get sent to the UnknownServiceHandler
- _, err = testServiceClient.PingError(ctx, &pb.PingRequest{})
- testhelper.RequireGrpcError(t, status.Error(codes.Unknown, directorCalledError.Error()), err)
+ require.Equal(t, tc.expectedCalls, calls)
+ })
+ }
}
diff --git a/internal/praefect/grpc-proxy/proxy/handler_test.go b/internal/praefect/grpc-proxy/proxy/handler_test.go
index 8810bd7fc..a4fafab58 100644
--- a/internal/praefect/grpc-proxy/proxy/handler_test.go
+++ b/internal/praefect/grpc-proxy/proxy/handler_test.go
@@ -10,6 +10,8 @@ import (
)
func TestFailDestinationWithError(t *testing.T) {
+ t.Parallel()
+
expectedErr := errors.New("some error")
t.Run("works with nil ErrHandlers", func(t *testing.T) {
diff --git a/internal/praefect/grpc-proxy/proxy/helper_test.go b/internal/praefect/grpc-proxy/proxy/helper_test.go
deleted file mode 100644
index d025c9c3a..000000000
--- a/internal/praefect/grpc-proxy/proxy/helper_test.go
+++ /dev/null
@@ -1,120 +0,0 @@
-//go:build !gitaly_test_sha256
-
-package proxy_test
-
-import (
- "context"
- "net"
- "testing"
-
- "github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/proxy"
- testservice "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/testdata"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials/insecure"
-)
-
-func newListener(tb testing.TB) net.Listener {
- listener, err := net.Listen("tcp", "127.0.0.1:0")
- require.NoError(tb, err, "must be able to allocate a port for listener")
-
- return listener
-}
-
-func newBackendPinger(tb testing.TB, ctx context.Context) (*grpc.ClientConn, *interceptPinger, func()) {
- ip := &interceptPinger{}
-
- done := make(chan struct{})
- srvr := grpc.NewServer()
- listener := newListener(tb)
-
- testservice.RegisterTestServiceServer(srvr, ip)
-
- go func() {
- defer close(done)
- srvr.Serve(listener)
- }()
-
- cc, err := grpc.DialContext(
- ctx,
- listener.Addr().String(),
- grpc.WithTransportCredentials(insecure.NewCredentials()),
- grpc.WithBlock(),
- grpc.WithDefaultCallOptions(
- grpc.ForceCodec(proxy.NewCodec()),
- ),
- )
- require.NoError(tb, err)
-
- cleanup := func() {
- srvr.GracefulStop()
- require.NoError(tb, cc.Close())
- <-done
- }
-
- return cc, ip, cleanup
-}
-
-func newProxy(tb testing.TB, ctx context.Context, director proxy.StreamDirector, svc, method string) (*grpc.ClientConn, func()) {
- proxySrvr := grpc.NewServer(
- grpc.ForceServerCodec(proxy.NewCodec()),
- grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
- )
-
- proxy.RegisterService(proxySrvr, director, svc, method)
-
- done := make(chan struct{})
- listener := newListener(tb)
-
- go func() {
- defer close(done)
- proxySrvr.Serve(listener)
- }()
-
- proxyCC, err := grpc.DialContext(
- ctx,
- listener.Addr().String(),
- grpc.WithTransportCredentials(insecure.NewCredentials()),
- grpc.WithBlock(),
- )
- require.NoError(tb, err)
-
- cleanup := func() {
- proxySrvr.GracefulStop()
- require.NoError(tb, proxyCC.Close())
- <-done
- }
-
- return proxyCC, cleanup
-}
-
-// interceptPinger allows an RPC to be intercepted with a custom
-// function defined in each unit test
-type interceptPinger struct {
- testservice.UnimplementedTestServiceServer
- pingStream func(testservice.TestService_PingStreamServer) error
- pingEmpty func(context.Context, *testservice.Empty) (*testservice.PingResponse, error)
- ping func(context.Context, *testservice.PingRequest) (*testservice.PingResponse, error)
- pingError func(context.Context, *testservice.PingRequest) (*testservice.Empty, error)
- pingList func(*testservice.PingRequest, testservice.TestService_PingListServer) error
-}
-
-func (ip *interceptPinger) PingStream(stream testservice.TestService_PingStreamServer) error {
- return ip.pingStream(stream)
-}
-
-func (ip *interceptPinger) PingEmpty(ctx context.Context, req *testservice.Empty) (*testservice.PingResponse, error) {
- return ip.pingEmpty(ctx, req)
-}
-
-func (ip *interceptPinger) Ping(ctx context.Context, req *testservice.PingRequest) (*testservice.PingResponse, error) {
- return ip.ping(ctx, req)
-}
-
-func (ip *interceptPinger) PingError(ctx context.Context, req *testservice.PingRequest) (*testservice.Empty, error) {
- return ip.pingError(ctx, req)
-}
-
-func (ip *interceptPinger) PingList(req *testservice.PingRequest, stream testservice.TestService_PingListServer) error {
- return ip.pingList(req, stream)
-}
diff --git a/internal/praefect/grpc-proxy/proxy/peeker_test.go b/internal/praefect/grpc-proxy/proxy/peeker_test.go
index 276f175d9..9b63653e8 100644
--- a/internal/praefect/grpc-proxy/proxy/peeker_test.go
+++ b/internal/praefect/grpc-proxy/proxy/peeker_test.go
@@ -7,133 +7,148 @@ import (
"io"
"testing"
- "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v15/internal/metadata"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/proxy"
- testservice "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/testdata"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
+ "google.golang.org/grpc/test/grpc_testing"
"google.golang.org/protobuf/proto"
)
-// TestStreamPeeking demonstrates that a director function is able to peek
-// into a stream. Further more, it demonstrates that peeking into a stream
-// will not disturb the stream sent from the proxy client to the backend.
+// TestStreamPeeking demonstrates that a director function is able to peek into a stream. Further
+// more, it demonstrates that peeking into a stream will not disturb the stream sent from the proxy
+// client to the backend.
func TestStreamPeeking(t *testing.T) {
+ t.Parallel()
+
ctx := testhelper.Context(t)
- backendCC, backendSrvr, cleanupPinger := newBackendPinger(t, ctx)
- defer cleanupPinger()
+ backendCC, backendSrvr := newBackendPinger(t, ctx)
- pingReqSent := &testservice.PingRequest{Value: "hi"}
+ requestSent := &grpc_testing.StreamingOutputCallRequest{
+ Payload: &grpc_testing.Payload{
+ Body: []byte("hi"),
+ },
+ }
+ responseSent := &grpc_testing.StreamingOutputCallResponse{
+ Payload: &grpc_testing.Payload{
+ Body: []byte("bye"),
+ },
+ }
- // director will peek into stream before routing traffic
- director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
- peekedMsg, err := peeker.Peek()
+ // We create a director that's peeking into the message in order to assert that the peeked
+ // message will still be seen by the client.
+ director := func(ctx context.Context, _ string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
+ peekedMessage, err := peeker.Peek()
require.NoError(t, err)
- peekedRequest := &testservice.PingRequest{}
- err = proto.Unmarshal(peekedMsg, peekedRequest)
- require.NoError(t, err)
- require.True(t, proto.Equal(pingReqSent, peekedRequest), "expected to be the same")
+ var peekedRequest grpc_testing.StreamingOutputCallRequest
+ require.NoError(t, proto.Unmarshal(peekedMessage, &peekedRequest))
+ testhelper.ProtoEqual(t, requestSent, &peekedRequest)
- return proxy.NewStreamParameters(proxy.Destination{Ctx: metadata.IncomingToOutgoing(ctx), Conn: backendCC, Msg: peekedMsg}, nil, nil, nil), nil
+ return proxy.NewStreamParameters(proxy.Destination{
+ Ctx: metadata.IncomingToOutgoing(ctx),
+ Conn: backendCC,
+ Msg: peekedMessage,
+ }, nil, nil, nil), nil
}
- pingResp := &testservice.PingResponse{
- Counter: 1,
- }
-
- // we expect the backend server to receive the peeked message
- backendSrvr.pingStream = func(stream testservice.TestService_PingStreamServer) error {
- pingReqReceived, err := stream.Recv()
- assert.NoError(t, err)
- assert.True(t, proto.Equal(pingReqSent, pingReqReceived), "expected to be the same")
+ // The backend is supposed to still receive the message as expected without any modification
+ // to it.
+ backendSrvr.fullDuplexCall = func(stream grpc_testing.TestService_FullDuplexCallServer) error {
+ requestReceived, err := stream.Recv()
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, requestSent, requestReceived)
- return stream.Send(pingResp)
+ return stream.Send(responseSent)
}
- proxyCC, cleanupProxy := newProxy(t, ctx, director, "mwitkow.testproto.TestService", "PingStream")
- defer cleanupProxy()
-
- proxyClient := testservice.NewTestServiceClient(proxyCC)
+ proxyConn := newProxy(t, ctx, director, "grpc_testing.TestService", "FullDuplexCall")
+ proxyClient := grpc_testing.NewTestServiceClient(proxyConn)
- proxyClientPingStream, err := proxyClient.PingStream(ctx)
+ // Send the request on the stream and close the writing side.
+ proxyStream, err := proxyClient.FullDuplexCall(ctx)
require.NoError(t, err)
- defer func() {
- require.NoError(t, proxyClientPingStream.CloseSend())
- }()
+ require.NoError(t, proxyStream.Send(requestSent))
+ require.NoError(t, proxyStream.CloseSend())
- require.NoError(t,
- proxyClientPingStream.Send(pingReqSent),
- )
-
- resp, err := proxyClientPingStream.Recv()
+ // And now verify that the response we've got in fact matches our expected response.
+ responseReceived, err := proxyStream.Recv()
require.NoError(t, err)
- require.True(t, proto.Equal(resp, pingResp), "expected to be the same")
+ testhelper.ProtoEqual(t, responseReceived, responseSent)
- _, err = proxyClientPingStream.Recv()
+ _, err = proxyStream.Recv()
require.Equal(t, io.EOF, err)
}
func TestStreamInjecting(t *testing.T) {
+ t.Parallel()
+
ctx := testhelper.Context(t)
- backendCC, backendSrvr, cleanupPinger := newBackendPinger(t, ctx)
- defer cleanupPinger()
+ backendCC, backendSrvr := newBackendPinger(t, ctx)
- pingReqSent := &testservice.PingRequest{Value: "hi"}
- newValue := "bye"
+ requestSent := &grpc_testing.StreamingOutputCallRequest{
+ Payload: &grpc_testing.Payload{
+ Body: []byte("hi"),
+ },
+ }
+ requestReplaced := &grpc_testing.StreamingOutputCallRequest{
+ Payload: &grpc_testing.Payload{
+ Body: []byte("replaced"),
+ },
+ }
+ responseSent := &grpc_testing.StreamingOutputCallResponse{
+ Payload: &grpc_testing.Payload{
+ Body: []byte("bye"),
+ },
+ }
- // director will peek into stream and change some frames
+ // We create a director that peeks the incoming request and in fact changes its values. This
+ // is to assert that the client receives the changed requests.
director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
- peekedMsg, err := peeker.Peek()
+ peekedMessage, err := peeker.Peek()
require.NoError(t, err)
- peekedRequest := &testservice.PingRequest{}
- require.NoError(t, proto.Unmarshal(peekedMsg, peekedRequest))
- require.Equal(t, "hi", peekedRequest.GetValue())
-
- peekedRequest.Value = newValue
+ // Assert that we get the expected original ping request.
+ var peekedRequest grpc_testing.StreamingOutputCallRequest
+ require.NoError(t, proto.Unmarshal(peekedMessage, &peekedRequest))
+ testhelper.ProtoEqual(t, requestSent, &peekedRequest)
- newPayload, err := proto.Marshal(peekedRequest)
+ // Replace the value of the peeked request and send along the changed request.
+ replacedMessage, err := proto.Marshal(requestReplaced)
require.NoError(t, err)
- return proxy.NewStreamParameters(proxy.Destination{Ctx: metadata.IncomingToOutgoing(ctx), Conn: backendCC, Msg: newPayload}, nil, nil, nil), nil
+ return proxy.NewStreamParameters(proxy.Destination{
+ Ctx: metadata.IncomingToOutgoing(ctx),
+ Conn: backendCC,
+ Msg: replacedMessage,
+ }, nil, nil, nil), nil
}
- pingResp := &testservice.PingResponse{
- Counter: 1,
- }
-
- // we expect the backend server to receive the modified message
- backendSrvr.pingStream = func(stream testservice.TestService_PingStreamServer) error {
- pingReqReceived, err := stream.Recv()
- assert.NoError(t, err)
- assert.Equal(t, newValue, pingReqReceived.GetValue())
+ // Upon receiving the request the backend server should only ever see the changed request.
+ backendSrvr.fullDuplexCall = func(stream grpc_testing.TestService_FullDuplexCallServer) error {
+ requestReceived, err := stream.Recv()
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, requestReplaced, requestReceived)
- return stream.Send(pingResp)
+ return stream.Send(responseSent)
}
- proxyCC, cleanupProxy := newProxy(t, ctx, director, "mwitkow.testproto.TestService", "PingStream")
- defer cleanupProxy()
+ proxyConn := newProxy(t, ctx, director, "grpc_testing.TestService", "FullDuplexCall")
+ proxyClient := grpc_testing.NewTestServiceClient(proxyConn)
- proxyClient := testservice.NewTestServiceClient(proxyCC)
-
- proxyClientPingStream, err := proxyClient.PingStream(ctx)
+ proxyStream, err := proxyClient.FullDuplexCall(ctx)
require.NoError(t, err)
defer func() {
- require.NoError(t, proxyClientPingStream.CloseSend())
+ require.NoError(t, proxyStream.CloseSend())
}()
+ require.NoError(t, proxyStream.Send(requestSent))
- require.NoError(t,
- proxyClientPingStream.Send(pingReqSent),
- )
-
- resp, err := proxyClientPingStream.Recv()
+ responseReceived, err := proxyStream.Recv()
require.NoError(t, err)
- require.True(t, proto.Equal(resp, pingResp), "expected to be the same")
+ testhelper.ProtoEqual(t, responseSent, responseReceived)
- _, err = proxyClientPingStream.Recv()
+ _, err = proxyStream.Recv()
require.Equal(t, io.EOF, err)
}
diff --git a/internal/praefect/grpc-proxy/proxy/testhelper_test.go b/internal/praefect/grpc-proxy/proxy/testhelper_test.go
new file mode 100644
index 000000000..bc7ffca5e
--- /dev/null
+++ b/internal/praefect/grpc-proxy/proxy/testhelper_test.go
@@ -0,0 +1,131 @@
+//go:build !gitaly_test_sha256
+
+package proxy_test
+
+import (
+ "context"
+ "net"
+ "testing"
+
+ grpcmw "github.com/grpc-ecosystem/go-grpc-middleware"
+ grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/helper/fieldextractors"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/middleware/sentryhandler"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/proxy"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/test/grpc_testing"
+)
+
+func TestMain(m *testing.M) {
+ testhelper.Run(m)
+}
+
+func newListener(tb testing.TB) net.Listener {
+ listener, err := net.Listen("tcp", "127.0.0.1:0")
+ require.NoError(tb, err, "must be able to allocate a port for listener")
+
+ return listener
+}
+
+func newBackendPinger(tb testing.TB, ctx context.Context) (*grpc.ClientConn, *interceptPinger) {
+ ip := &interceptPinger{}
+
+ srvr := grpc.NewServer()
+ listener := newListener(tb)
+
+ grpc_testing.RegisterTestServiceServer(srvr, ip)
+
+ done := make(chan struct{})
+ go func() {
+ defer close(done)
+ srvr.Serve(listener)
+ }()
+
+ cc, err := grpc.DialContext(
+ ctx,
+ listener.Addr().String(),
+ grpc.WithTransportCredentials(insecure.NewCredentials()),
+ grpc.WithBlock(),
+ grpc.WithDefaultCallOptions(
+ grpc.ForceCodec(proxy.NewCodec()),
+ ),
+ )
+ require.NoError(tb, err)
+
+ tb.Cleanup(func() {
+ srvr.GracefulStop()
+ require.NoError(tb, cc.Close())
+ <-done
+ })
+
+ return cc, ip
+}
+
+func newProxy(tb testing.TB, ctx context.Context, director proxy.StreamDirector, svc, method string) *grpc.ClientConn {
+ proxySrvr := grpc.NewServer(
+ grpc.ForceServerCodec(proxy.NewCodec()),
+ grpc.StreamInterceptor(
+ grpcmw.ChainStreamServer(
+ // context tags usage is required by sentryhandler.StreamLogHandler
+ grpcmwtags.StreamServerInterceptor(grpcmwtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor)),
+ // sentry middleware to capture errors
+ sentryhandler.StreamLogHandler,
+ ),
+ ),
+ grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
+ )
+ proxy.RegisterService(proxySrvr, director, svc, method)
+
+ done := make(chan struct{})
+ listener := newListener(tb)
+ go func() {
+ defer close(done)
+ proxySrvr.Serve(listener)
+ }()
+
+ proxyCC, err := grpc.DialContext(
+ ctx,
+ listener.Addr().String(),
+ grpc.WithTransportCredentials(insecure.NewCredentials()),
+ grpc.WithBlock(),
+ )
+ require.NoError(tb, err)
+
+ tb.Cleanup(func() {
+ proxySrvr.GracefulStop()
+ require.NoError(tb, proxyCC.Close())
+ <-done
+ })
+
+ return proxyCC
+}
+
+// interceptPinger allows an RPC to be intercepted with a custom
+// function defined in each unit test
+type interceptPinger struct {
+ grpc_testing.UnimplementedTestServiceServer
+
+ fullDuplexCall func(grpc_testing.TestService_FullDuplexCallServer) error
+ emptyCall func(context.Context, *grpc_testing.Empty) (*grpc_testing.Empty, error)
+ unaryCall func(context.Context, *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error)
+ streamingOutputCall func(*grpc_testing.StreamingOutputCallRequest, grpc_testing.TestService_StreamingOutputCallServer) error
+}
+
+func (ip *interceptPinger) FullDuplexCall(stream grpc_testing.TestService_FullDuplexCallServer) error {
+ return ip.fullDuplexCall(stream)
+}
+
+func (ip *interceptPinger) EmptyCall(ctx context.Context, req *grpc_testing.Empty) (*grpc_testing.Empty, error) {
+ return ip.emptyCall(ctx, req)
+}
+
+func (ip *interceptPinger) UnaryCall(ctx context.Context, req *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) {
+ return ip.unaryCall(ctx, req)
+}
+
+func (ip *interceptPinger) StreamingOutputCall(req *grpc_testing.StreamingOutputCallRequest, stream grpc_testing.TestService_StreamingOutputCallServer) error {
+ return ip.streamingOutputCall(req, stream)
+}
diff --git a/internal/praefect/grpc-proxy/testdata/test.pb.go b/internal/praefect/grpc-proxy/testdata/test.pb.go
deleted file mode 100644
index ec62c5466..000000000
--- a/internal/praefect/grpc-proxy/testdata/test.pb.go
+++ /dev/null
@@ -1,306 +0,0 @@
-// Code generated by protoc-gen-go. DO NOT EDIT.
-// versions:
-// protoc-gen-go v1.28.0
-// protoc v3.21.1
-// source: praefect/grpc-proxy/testdata/test.proto
-
-package testdata
-
-import (
- protoreflect "google.golang.org/protobuf/reflect/protoreflect"
- protoimpl "google.golang.org/protobuf/runtime/protoimpl"
- reflect "reflect"
- sync "sync"
-)
-
-const (
- // Verify that this generated code is sufficiently up-to-date.
- _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
- // Verify that runtime/protoimpl is sufficiently up-to-date.
- _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
-)
-
-type Empty struct {
- state protoimpl.MessageState
- sizeCache protoimpl.SizeCache
- unknownFields protoimpl.UnknownFields
-}
-
-func (x *Empty) Reset() {
- *x = Empty{}
- if protoimpl.UnsafeEnabled {
- mi := &file_praefect_grpc_proxy_testdata_test_proto_msgTypes[0]
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- ms.StoreMessageInfo(mi)
- }
-}
-
-func (x *Empty) String() string {
- return protoimpl.X.MessageStringOf(x)
-}
-
-func (*Empty) ProtoMessage() {}
-
-func (x *Empty) ProtoReflect() protoreflect.Message {
- mi := &file_praefect_grpc_proxy_testdata_test_proto_msgTypes[0]
- if protoimpl.UnsafeEnabled && x != nil {
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- if ms.LoadMessageInfo() == nil {
- ms.StoreMessageInfo(mi)
- }
- return ms
- }
- return mi.MessageOf(x)
-}
-
-// Deprecated: Use Empty.ProtoReflect.Descriptor instead.
-func (*Empty) Descriptor() ([]byte, []int) {
- return file_praefect_grpc_proxy_testdata_test_proto_rawDescGZIP(), []int{0}
-}
-
-type PingRequest struct {
- state protoimpl.MessageState
- sizeCache protoimpl.SizeCache
- unknownFields protoimpl.UnknownFields
-
- Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"`
-}
-
-func (x *PingRequest) Reset() {
- *x = PingRequest{}
- if protoimpl.UnsafeEnabled {
- mi := &file_praefect_grpc_proxy_testdata_test_proto_msgTypes[1]
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- ms.StoreMessageInfo(mi)
- }
-}
-
-func (x *PingRequest) String() string {
- return protoimpl.X.MessageStringOf(x)
-}
-
-func (*PingRequest) ProtoMessage() {}
-
-func (x *PingRequest) ProtoReflect() protoreflect.Message {
- mi := &file_praefect_grpc_proxy_testdata_test_proto_msgTypes[1]
- if protoimpl.UnsafeEnabled && x != nil {
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- if ms.LoadMessageInfo() == nil {
- ms.StoreMessageInfo(mi)
- }
- return ms
- }
- return mi.MessageOf(x)
-}
-
-// Deprecated: Use PingRequest.ProtoReflect.Descriptor instead.
-func (*PingRequest) Descriptor() ([]byte, []int) {
- return file_praefect_grpc_proxy_testdata_test_proto_rawDescGZIP(), []int{1}
-}
-
-func (x *PingRequest) GetValue() string {
- if x != nil {
- return x.Value
- }
- return ""
-}
-
-type PingResponse struct {
- state protoimpl.MessageState
- sizeCache protoimpl.SizeCache
- unknownFields protoimpl.UnknownFields
-
- Value string `protobuf:"bytes,1,opt,name=Value,proto3" json:"Value,omitempty"`
- Counter int32 `protobuf:"varint,2,opt,name=counter,proto3" json:"counter,omitempty"`
-}
-
-func (x *PingResponse) Reset() {
- *x = PingResponse{}
- if protoimpl.UnsafeEnabled {
- mi := &file_praefect_grpc_proxy_testdata_test_proto_msgTypes[2]
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- ms.StoreMessageInfo(mi)
- }
-}
-
-func (x *PingResponse) String() string {
- return protoimpl.X.MessageStringOf(x)
-}
-
-func (*PingResponse) ProtoMessage() {}
-
-func (x *PingResponse) ProtoReflect() protoreflect.Message {
- mi := &file_praefect_grpc_proxy_testdata_test_proto_msgTypes[2]
- if protoimpl.UnsafeEnabled && x != nil {
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- if ms.LoadMessageInfo() == nil {
- ms.StoreMessageInfo(mi)
- }
- return ms
- }
- return mi.MessageOf(x)
-}
-
-// Deprecated: Use PingResponse.ProtoReflect.Descriptor instead.
-func (*PingResponse) Descriptor() ([]byte, []int) {
- return file_praefect_grpc_proxy_testdata_test_proto_rawDescGZIP(), []int{2}
-}
-
-func (x *PingResponse) GetValue() string {
- if x != nil {
- return x.Value
- }
- return ""
-}
-
-func (x *PingResponse) GetCounter() int32 {
- if x != nil {
- return x.Counter
- }
- return 0
-}
-
-var File_praefect_grpc_proxy_testdata_test_proto protoreflect.FileDescriptor
-
-var file_praefect_grpc_proxy_testdata_test_proto_rawDesc = []byte{
- 0x0a, 0x27, 0x70, 0x72, 0x61, 0x65, 0x66, 0x65, 0x63, 0x74, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2d,
- 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2f, 0x74, 0x65, 0x73, 0x74, 0x64, 0x61, 0x74, 0x61, 0x2f, 0x74,
- 0x65, 0x73, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x11, 0x6d, 0x77, 0x69, 0x74, 0x6b,
- 0x6f, 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x07, 0x0a, 0x05,
- 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x23, 0x0a, 0x0b, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71,
- 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20,
- 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x3e, 0x0a, 0x0c, 0x50, 0x69,
- 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x56, 0x61,
- 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x56, 0x61, 0x6c, 0x75, 0x65,
- 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28,
- 0x05, 0x52, 0x07, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x65, 0x72, 0x32, 0x91, 0x03, 0x0a, 0x0b, 0x54,
- 0x65, 0x73, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x48, 0x0a, 0x09, 0x50, 0x69,
- 0x6e, 0x67, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x18, 0x2e, 0x6d, 0x77, 0x69, 0x74, 0x6b, 0x6f,
- 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74,
- 0x79, 0x1a, 0x1f, 0x2e, 0x6d, 0x77, 0x69, 0x74, 0x6b, 0x6f, 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74,
- 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
- 0x73, 0x65, 0x22, 0x00, 0x12, 0x49, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x1e, 0x2e, 0x6d,
- 0x77, 0x69, 0x74, 0x6b, 0x6f, 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f,
- 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6d,
- 0x77, 0x69, 0x74, 0x6b, 0x6f, 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f,
- 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12,
- 0x47, 0x0a, 0x09, 0x50, 0x69, 0x6e, 0x67, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x1e, 0x2e, 0x6d,
- 0x77, 0x69, 0x74, 0x6b, 0x6f, 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f,
- 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x6d,
- 0x77, 0x69, 0x74, 0x6b, 0x6f, 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f,
- 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x4f, 0x0a, 0x08, 0x50, 0x69, 0x6e, 0x67,
- 0x4c, 0x69, 0x73, 0x74, 0x12, 0x1e, 0x2e, 0x6d, 0x77, 0x69, 0x74, 0x6b, 0x6f, 0x77, 0x2e, 0x74,
- 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71,
- 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6d, 0x77, 0x69, 0x74, 0x6b, 0x6f, 0x77, 0x2e, 0x74,
- 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73,
- 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x53, 0x0a, 0x0a, 0x50, 0x69, 0x6e,
- 0x67, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x1e, 0x2e, 0x6d, 0x77, 0x69, 0x74, 0x6b, 0x6f,
- 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x69, 0x6e, 0x67,
- 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6d, 0x77, 0x69, 0x74, 0x6b, 0x6f,
- 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x69, 0x6e, 0x67,
- 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x48,
- 0x5a, 0x46, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74,
- 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76,
- 0x31, 0x35, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, 0x61, 0x65,
- 0x66, 0x65, 0x63, 0x74, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2d, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2f,
- 0x74, 0x65, 0x73, 0x74, 0x64, 0x61, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
-}
-
-var (
- file_praefect_grpc_proxy_testdata_test_proto_rawDescOnce sync.Once
- file_praefect_grpc_proxy_testdata_test_proto_rawDescData = file_praefect_grpc_proxy_testdata_test_proto_rawDesc
-)
-
-func file_praefect_grpc_proxy_testdata_test_proto_rawDescGZIP() []byte {
- file_praefect_grpc_proxy_testdata_test_proto_rawDescOnce.Do(func() {
- file_praefect_grpc_proxy_testdata_test_proto_rawDescData = protoimpl.X.CompressGZIP(file_praefect_grpc_proxy_testdata_test_proto_rawDescData)
- })
- return file_praefect_grpc_proxy_testdata_test_proto_rawDescData
-}
-
-var file_praefect_grpc_proxy_testdata_test_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
-var file_praefect_grpc_proxy_testdata_test_proto_goTypes = []interface{}{
- (*Empty)(nil), // 0: mwitkow.testproto.Empty
- (*PingRequest)(nil), // 1: mwitkow.testproto.PingRequest
- (*PingResponse)(nil), // 2: mwitkow.testproto.PingResponse
-}
-var file_praefect_grpc_proxy_testdata_test_proto_depIdxs = []int32{
- 0, // 0: mwitkow.testproto.TestService.PingEmpty:input_type -> mwitkow.testproto.Empty
- 1, // 1: mwitkow.testproto.TestService.Ping:input_type -> mwitkow.testproto.PingRequest
- 1, // 2: mwitkow.testproto.TestService.PingError:input_type -> mwitkow.testproto.PingRequest
- 1, // 3: mwitkow.testproto.TestService.PingList:input_type -> mwitkow.testproto.PingRequest
- 1, // 4: mwitkow.testproto.TestService.PingStream:input_type -> mwitkow.testproto.PingRequest
- 2, // 5: mwitkow.testproto.TestService.PingEmpty:output_type -> mwitkow.testproto.PingResponse
- 2, // 6: mwitkow.testproto.TestService.Ping:output_type -> mwitkow.testproto.PingResponse
- 0, // 7: mwitkow.testproto.TestService.PingError:output_type -> mwitkow.testproto.Empty
- 2, // 8: mwitkow.testproto.TestService.PingList:output_type -> mwitkow.testproto.PingResponse
- 2, // 9: mwitkow.testproto.TestService.PingStream:output_type -> mwitkow.testproto.PingResponse
- 5, // [5:10] is the sub-list for method output_type
- 0, // [0:5] is the sub-list for method input_type
- 0, // [0:0] is the sub-list for extension type_name
- 0, // [0:0] is the sub-list for extension extendee
- 0, // [0:0] is the sub-list for field type_name
-}
-
-func init() { file_praefect_grpc_proxy_testdata_test_proto_init() }
-func file_praefect_grpc_proxy_testdata_test_proto_init() {
- if File_praefect_grpc_proxy_testdata_test_proto != nil {
- return
- }
- if !protoimpl.UnsafeEnabled {
- file_praefect_grpc_proxy_testdata_test_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*Empty); i {
- case 0:
- return &v.state
- case 1:
- return &v.sizeCache
- case 2:
- return &v.unknownFields
- default:
- return nil
- }
- }
- file_praefect_grpc_proxy_testdata_test_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*PingRequest); i {
- case 0:
- return &v.state
- case 1:
- return &v.sizeCache
- case 2:
- return &v.unknownFields
- default:
- return nil
- }
- }
- file_praefect_grpc_proxy_testdata_test_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*PingResponse); i {
- case 0:
- return &v.state
- case 1:
- return &v.sizeCache
- case 2:
- return &v.unknownFields
- default:
- return nil
- }
- }
- }
- type x struct{}
- out := protoimpl.TypeBuilder{
- File: protoimpl.DescBuilder{
- GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
- RawDescriptor: file_praefect_grpc_proxy_testdata_test_proto_rawDesc,
- NumEnums: 0,
- NumMessages: 3,
- NumExtensions: 0,
- NumServices: 1,
- },
- GoTypes: file_praefect_grpc_proxy_testdata_test_proto_goTypes,
- DependencyIndexes: file_praefect_grpc_proxy_testdata_test_proto_depIdxs,
- MessageInfos: file_praefect_grpc_proxy_testdata_test_proto_msgTypes,
- }.Build()
- File_praefect_grpc_proxy_testdata_test_proto = out.File
- file_praefect_grpc_proxy_testdata_test_proto_rawDesc = nil
- file_praefect_grpc_proxy_testdata_test_proto_goTypes = nil
- file_praefect_grpc_proxy_testdata_test_proto_depIdxs = nil
-}
diff --git a/internal/praefect/grpc-proxy/testdata/test.proto b/internal/praefect/grpc-proxy/testdata/test.proto
deleted file mode 100644
index 227d3cbeb..000000000
--- a/internal/praefect/grpc-proxy/testdata/test.proto
+++ /dev/null
@@ -1,31 +0,0 @@
-syntax = "proto3";
-
-package mwitkow.testproto;
-
-option go_package = "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/testdata";
-
-message Empty {
-}
-
-message PingRequest {
- string value = 1;
-}
-
-message PingResponse {
- string Value = 1;
- int32 counter = 2;
-}
-
-service TestService {
- rpc PingEmpty(Empty) returns (PingResponse) {}
-
- rpc Ping(PingRequest) returns (PingResponse) {}
-
- rpc PingError(PingRequest) returns (Empty) {}
-
- rpc PingList(PingRequest) returns (stream PingResponse) {}
-
- rpc PingStream(stream PingRequest) returns (stream PingResponse) {}
-
-}
-
diff --git a/internal/praefect/grpc-proxy/testdata/test_grpc.pb.go b/internal/praefect/grpc-proxy/testdata/test_grpc.pb.go
deleted file mode 100644
index d1b660df2..000000000
--- a/internal/praefect/grpc-proxy/testdata/test_grpc.pb.go
+++ /dev/null
@@ -1,309 +0,0 @@
-// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
-// versions:
-// - protoc-gen-go-grpc v1.2.0
-// - protoc v3.21.1
-// source: praefect/grpc-proxy/testdata/test.proto
-
-package testdata
-
-import (
- context "context"
- grpc "google.golang.org/grpc"
- codes "google.golang.org/grpc/codes"
- status "google.golang.org/grpc/status"
-)
-
-// This is a compile-time assertion to ensure that this generated file
-// is compatible with the grpc package it is being compiled against.
-// Requires gRPC-Go v1.32.0 or later.
-const _ = grpc.SupportPackageIsVersion7
-
-// TestServiceClient is the client API for TestService service.
-//
-// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
-type TestServiceClient interface {
- PingEmpty(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*PingResponse, error)
- Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error)
- PingError(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*Empty, error)
- PingList(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (TestService_PingListClient, error)
- PingStream(ctx context.Context, opts ...grpc.CallOption) (TestService_PingStreamClient, error)
-}
-
-type testServiceClient struct {
- cc grpc.ClientConnInterface
-}
-
-func NewTestServiceClient(cc grpc.ClientConnInterface) TestServiceClient {
- return &testServiceClient{cc}
-}
-
-func (c *testServiceClient) PingEmpty(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*PingResponse, error) {
- out := new(PingResponse)
- err := c.cc.Invoke(ctx, "/mwitkow.testproto.TestService/PingEmpty", in, out, opts...)
- if err != nil {
- return nil, err
- }
- return out, nil
-}
-
-func (c *testServiceClient) Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error) {
- out := new(PingResponse)
- err := c.cc.Invoke(ctx, "/mwitkow.testproto.TestService/Ping", in, out, opts...)
- if err != nil {
- return nil, err
- }
- return out, nil
-}
-
-func (c *testServiceClient) PingError(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*Empty, error) {
- out := new(Empty)
- err := c.cc.Invoke(ctx, "/mwitkow.testproto.TestService/PingError", in, out, opts...)
- if err != nil {
- return nil, err
- }
- return out, nil
-}
-
-func (c *testServiceClient) PingList(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (TestService_PingListClient, error) {
- stream, err := c.cc.NewStream(ctx, &TestService_ServiceDesc.Streams[0], "/mwitkow.testproto.TestService/PingList", opts...)
- if err != nil {
- return nil, err
- }
- x := &testServicePingListClient{stream}
- if err := x.ClientStream.SendMsg(in); err != nil {
- return nil, err
- }
- if err := x.ClientStream.CloseSend(); err != nil {
- return nil, err
- }
- return x, nil
-}
-
-type TestService_PingListClient interface {
- Recv() (*PingResponse, error)
- grpc.ClientStream
-}
-
-type testServicePingListClient struct {
- grpc.ClientStream
-}
-
-func (x *testServicePingListClient) Recv() (*PingResponse, error) {
- m := new(PingResponse)
- if err := x.ClientStream.RecvMsg(m); err != nil {
- return nil, err
- }
- return m, nil
-}
-
-func (c *testServiceClient) PingStream(ctx context.Context, opts ...grpc.CallOption) (TestService_PingStreamClient, error) {
- stream, err := c.cc.NewStream(ctx, &TestService_ServiceDesc.Streams[1], "/mwitkow.testproto.TestService/PingStream", opts...)
- if err != nil {
- return nil, err
- }
- x := &testServicePingStreamClient{stream}
- return x, nil
-}
-
-type TestService_PingStreamClient interface {
- Send(*PingRequest) error
- Recv() (*PingResponse, error)
- grpc.ClientStream
-}
-
-type testServicePingStreamClient struct {
- grpc.ClientStream
-}
-
-func (x *testServicePingStreamClient) Send(m *PingRequest) error {
- return x.ClientStream.SendMsg(m)
-}
-
-func (x *testServicePingStreamClient) Recv() (*PingResponse, error) {
- m := new(PingResponse)
- if err := x.ClientStream.RecvMsg(m); err != nil {
- return nil, err
- }
- return m, nil
-}
-
-// TestServiceServer is the server API for TestService service.
-// All implementations must embed UnimplementedTestServiceServer
-// for forward compatibility
-type TestServiceServer interface {
- PingEmpty(context.Context, *Empty) (*PingResponse, error)
- Ping(context.Context, *PingRequest) (*PingResponse, error)
- PingError(context.Context, *PingRequest) (*Empty, error)
- PingList(*PingRequest, TestService_PingListServer) error
- PingStream(TestService_PingStreamServer) error
- mustEmbedUnimplementedTestServiceServer()
-}
-
-// UnimplementedTestServiceServer must be embedded to have forward compatible implementations.
-type UnimplementedTestServiceServer struct {
-}
-
-func (UnimplementedTestServiceServer) PingEmpty(context.Context, *Empty) (*PingResponse, error) {
- return nil, status.Errorf(codes.Unimplemented, "method PingEmpty not implemented")
-}
-func (UnimplementedTestServiceServer) Ping(context.Context, *PingRequest) (*PingResponse, error) {
- return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented")
-}
-func (UnimplementedTestServiceServer) PingError(context.Context, *PingRequest) (*Empty, error) {
- return nil, status.Errorf(codes.Unimplemented, "method PingError not implemented")
-}
-func (UnimplementedTestServiceServer) PingList(*PingRequest, TestService_PingListServer) error {
- return status.Errorf(codes.Unimplemented, "method PingList not implemented")
-}
-func (UnimplementedTestServiceServer) PingStream(TestService_PingStreamServer) error {
- return status.Errorf(codes.Unimplemented, "method PingStream not implemented")
-}
-func (UnimplementedTestServiceServer) mustEmbedUnimplementedTestServiceServer() {}
-
-// UnsafeTestServiceServer may be embedded to opt out of forward compatibility for this service.
-// Use of this interface is not recommended, as added methods to TestServiceServer will
-// result in compilation errors.
-type UnsafeTestServiceServer interface {
- mustEmbedUnimplementedTestServiceServer()
-}
-
-func RegisterTestServiceServer(s grpc.ServiceRegistrar, srv TestServiceServer) {
- s.RegisterService(&TestService_ServiceDesc, srv)
-}
-
-func _TestService_PingEmpty_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
- in := new(Empty)
- if err := dec(in); err != nil {
- return nil, err
- }
- if interceptor == nil {
- return srv.(TestServiceServer).PingEmpty(ctx, in)
- }
- info := &grpc.UnaryServerInfo{
- Server: srv,
- FullMethod: "/mwitkow.testproto.TestService/PingEmpty",
- }
- handler := func(ctx context.Context, req interface{}) (interface{}, error) {
- return srv.(TestServiceServer).PingEmpty(ctx, req.(*Empty))
- }
- return interceptor(ctx, in, info, handler)
-}
-
-func _TestService_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
- in := new(PingRequest)
- if err := dec(in); err != nil {
- return nil, err
- }
- if interceptor == nil {
- return srv.(TestServiceServer).Ping(ctx, in)
- }
- info := &grpc.UnaryServerInfo{
- Server: srv,
- FullMethod: "/mwitkow.testproto.TestService/Ping",
- }
- handler := func(ctx context.Context, req interface{}) (interface{}, error) {
- return srv.(TestServiceServer).Ping(ctx, req.(*PingRequest))
- }
- return interceptor(ctx, in, info, handler)
-}
-
-func _TestService_PingError_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
- in := new(PingRequest)
- if err := dec(in); err != nil {
- return nil, err
- }
- if interceptor == nil {
- return srv.(TestServiceServer).PingError(ctx, in)
- }
- info := &grpc.UnaryServerInfo{
- Server: srv,
- FullMethod: "/mwitkow.testproto.TestService/PingError",
- }
- handler := func(ctx context.Context, req interface{}) (interface{}, error) {
- return srv.(TestServiceServer).PingError(ctx, req.(*PingRequest))
- }
- return interceptor(ctx, in, info, handler)
-}
-
-func _TestService_PingList_Handler(srv interface{}, stream grpc.ServerStream) error {
- m := new(PingRequest)
- if err := stream.RecvMsg(m); err != nil {
- return err
- }
- return srv.(TestServiceServer).PingList(m, &testServicePingListServer{stream})
-}
-
-type TestService_PingListServer interface {
- Send(*PingResponse) error
- grpc.ServerStream
-}
-
-type testServicePingListServer struct {
- grpc.ServerStream
-}
-
-func (x *testServicePingListServer) Send(m *PingResponse) error {
- return x.ServerStream.SendMsg(m)
-}
-
-func _TestService_PingStream_Handler(srv interface{}, stream grpc.ServerStream) error {
- return srv.(TestServiceServer).PingStream(&testServicePingStreamServer{stream})
-}
-
-type TestService_PingStreamServer interface {
- Send(*PingResponse) error
- Recv() (*PingRequest, error)
- grpc.ServerStream
-}
-
-type testServicePingStreamServer struct {
- grpc.ServerStream
-}
-
-func (x *testServicePingStreamServer) Send(m *PingResponse) error {
- return x.ServerStream.SendMsg(m)
-}
-
-func (x *testServicePingStreamServer) Recv() (*PingRequest, error) {
- m := new(PingRequest)
- if err := x.ServerStream.RecvMsg(m); err != nil {
- return nil, err
- }
- return m, nil
-}
-
-// TestService_ServiceDesc is the grpc.ServiceDesc for TestService service.
-// It's only intended for direct use with grpc.RegisterService,
-// and not to be introspected or modified (even as a copy)
-var TestService_ServiceDesc = grpc.ServiceDesc{
- ServiceName: "mwitkow.testproto.TestService",
- HandlerType: (*TestServiceServer)(nil),
- Methods: []grpc.MethodDesc{
- {
- MethodName: "PingEmpty",
- Handler: _TestService_PingEmpty_Handler,
- },
- {
- MethodName: "Ping",
- Handler: _TestService_Ping_Handler,
- },
- {
- MethodName: "PingError",
- Handler: _TestService_PingError_Handler,
- },
- },
- Streams: []grpc.StreamDesc{
- {
- StreamName: "PingList",
- Handler: _TestService_PingList_Handler,
- ServerStreams: true,
- },
- {
- StreamName: "PingStream",
- Handler: _TestService_PingStream_Handler,
- ServerStreams: true,
- ClientStreams: true,
- },
- },
- Metadata: "praefect/grpc-proxy/testdata/test.proto",
-}