diff options
author | John Cai <jcai@gitlab.com> | 2022-07-28 20:46:51 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2022-07-28 20:46:51 +0300 |
commit | 0fa08d953e0d5497fe5366836d0ed54b9ff557d8 (patch) | |
tree | 0cbaa7c48dbc03c3d0226a6007e46bb9ebbb764e | |
parent | 1ce91833ee8b048250cd6eb776ea7c6ab61f1366 (diff) | |
parent | c5ca566139c5855201904f0b428ff18a732ae3fa (diff) |
Merge branch 'pks-grpc-proxy-test-refactorings' into 'master'
proxy: Refactor tests to share less state and use `grpc_testing` test service
See merge request gitlab-org/gitaly!4756
-rw-r--r-- | Makefile | 1 | ||||
-rw-r--r-- | client/dial_test.go | 60 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/codec_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/handler_ext_test.go | 517 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/handler_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/helper_test.go | 120 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/peeker_test.go | 169 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/testhelper_test.go | 131 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/testdata/test.pb.go | 306 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/testdata/test.proto | 31 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/testdata/test_grpc.pb.go | 309 |
11 files changed, 517 insertions, 1131 deletions
@@ -498,7 +498,6 @@ proto: ${PROTOC} ${PROTOC_GEN_GO} ${PROTOC_GEN_GO_GRPC} ${PROTOC_GEN_GITALY_PROT ${PROTOC} ${SHARED_PROTOC_OPTS} -I ${SOURCE_DIR}/proto -I ${PROTOC_INSTALL_DIR}/include --go_out=${SOURCE_DIR}/proto/go/gitalypb --gitaly-protolist_out=proto_dir=${SOURCE_DIR}/proto,gitalypb_dir=${SOURCE_DIR}/proto/go/gitalypb:${SOURCE_DIR} --go-grpc_out=${SOURCE_DIR}/proto/go/gitalypb ${SOURCE_DIR}/proto/*.proto ${SOURCE_DIR}/_support/generate-proto-ruby @ # this part is related to the generation of sources from testing proto files - ${PROTOC} ${SHARED_PROTOC_OPTS} -I ${SOURCE_DIR}/internal --go_out=${SOURCE_DIR}/internal --go-grpc_out=${SOURCE_DIR}/internal ${SOURCE_DIR}/internal/praefect/grpc-proxy/testdata/test.proto ${PROTOC} ${SHARED_PROTOC_OPTS} -I ${SOURCE_DIR}/proto -I ${SOURCE_DIR}/internal -I ${PROTOC_INSTALL_DIR}/include --go_out=${SOURCE_DIR}/internal --go-grpc_out=${SOURCE_DIR}/internal \ ${SOURCE_DIR}/internal/praefect/mock/mock.proto \ ${SOURCE_DIR}/internal/middleware/cache/testdata/stream.proto \ diff --git a/client/dial_test.go b/client/dial_test.go index e67b6bea9..92b4aa0be 100644 --- a/client/dial_test.go +++ b/client/dial_test.go @@ -19,7 +19,6 @@ import ( "github.com/stretchr/testify/require" "github.com/uber/jaeger-client-go" gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth" - proxytestdata "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/testdata" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" gitalyx509 "gitlab.com/gitlab-org/gitaly/v15/internal/x509" "gitlab.com/gitlab-org/labkit/correlation" @@ -31,6 +30,7 @@ import ( "google.golang.org/grpc/credentials/insecure" healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/status" + "google.golang.org/grpc/test/grpc_testing" ) var proxyEnvironmentKeys = []string{"http_proxy", "https_proxy", "no_proxy"} @@ -257,22 +257,22 @@ func TestDialSidechannel(t *testing.T) { } type testSvc struct { - proxytestdata.UnimplementedTestServiceServer - PingMethod func(context.Context, *proxytestdata.PingRequest) (*proxytestdata.PingResponse, error) - PingStreamMethod func(stream proxytestdata.TestService_PingStreamServer) error + grpc_testing.UnimplementedTestServiceServer + unaryCall func(context.Context, *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) + fullDuplexCall func(stream grpc_testing.TestService_FullDuplexCallServer) error } -func (ts *testSvc) Ping(ctx context.Context, r *proxytestdata.PingRequest) (*proxytestdata.PingResponse, error) { - if ts.PingMethod != nil { - return ts.PingMethod(ctx, r) +func (ts *testSvc) UnaryCall(ctx context.Context, r *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) { + if ts.unaryCall != nil { + return ts.unaryCall(ctx, r) } - return &proxytestdata.PingResponse{}, nil + return &grpc_testing.SimpleResponse{}, nil } -func (ts *testSvc) PingStream(stream proxytestdata.TestService_PingStreamServer) error { - if ts.PingStreamMethod != nil { - return ts.PingStreamMethod(stream) +func (ts *testSvc) FullDuplexCall(stream grpc_testing.TestService_FullDuplexCallServer) error { + if ts.fullDuplexCall != nil { + return ts.fullDuplexCall(stream) } return nil @@ -287,13 +287,13 @@ func TestDial_Correlation(t *testing.T) { grpcServer := grpc.NewServer(grpc.UnaryInterceptor(grpccorrelation.UnaryServerCorrelationInterceptor())) svc := &testSvc{ - PingMethod: func(ctx context.Context, r *proxytestdata.PingRequest) (*proxytestdata.PingResponse, error) { + unaryCall: func(ctx context.Context, r *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) { cid := correlation.ExtractFromContext(ctx) assert.Equal(t, "correlation-id-1", cid) - return &proxytestdata.PingResponse{}, nil + return &grpc_testing.SimpleResponse{}, nil }, } - proxytestdata.RegisterTestServiceServer(grpcServer, svc) + grpc_testing.RegisterTestServiceServer(grpcServer, svc) go func() { assert.NoError(t, grpcServer.Serve(listener)) }() @@ -304,10 +304,10 @@ func TestDial_Correlation(t *testing.T) { require.NoError(t, err) defer cc.Close() - client := proxytestdata.NewTestServiceClient(cc) + client := grpc_testing.NewTestServiceClient(cc) ctx = correlation.ContextWithCorrelation(ctx, "correlation-id-1") - _, err = client.Ping(ctx, &proxytestdata.PingRequest{}) + _, err = client.UnaryCall(ctx, &grpc_testing.SimpleRequest{}) require.NoError(t, err) }) @@ -319,15 +319,15 @@ func TestDial_Correlation(t *testing.T) { grpcServer := grpc.NewServer(grpc.StreamInterceptor(grpccorrelation.StreamServerCorrelationInterceptor())) svc := &testSvc{ - PingStreamMethod: func(stream proxytestdata.TestService_PingStreamServer) error { + fullDuplexCall: func(stream grpc_testing.TestService_FullDuplexCallServer) error { cid := correlation.ExtractFromContext(stream.Context()) assert.Equal(t, "correlation-id-1", cid) _, err := stream.Recv() assert.NoError(t, err) - return stream.Send(&proxytestdata.PingResponse{}) + return stream.Send(&grpc_testing.StreamingOutputCallResponse{}) }, } - proxytestdata.RegisterTestServiceServer(grpcServer, svc) + grpc_testing.RegisterTestServiceServer(grpcServer, svc) go func() { assert.NoError(t, grpcServer.Serve(listener)) }() defer grpcServer.Stop() @@ -337,13 +337,13 @@ func TestDial_Correlation(t *testing.T) { require.NoError(t, err) defer cc.Close() - client := proxytestdata.NewTestServiceClient(cc) + client := grpc_testing.NewTestServiceClient(cc) ctx = correlation.ContextWithCorrelation(ctx, "correlation-id-1") - stream, err := client.PingStream(ctx) + stream, err := client.FullDuplexCall(ctx) require.NoError(t, err) - require.NoError(t, stream.Send(&proxytestdata.PingRequest{})) + require.NoError(t, stream.Send(&grpc_testing.StreamingOutputCallRequest{})) require.NoError(t, stream.CloseSend()) _, err = stream.Recv() @@ -367,13 +367,13 @@ func TestDial_Tracing(t *testing.T) { grpc.StreamInterceptor(grpctracing.StreamServerTracingInterceptor()), ) svc := &testSvc{ - PingMethod: func(ctx context.Context, r *proxytestdata.PingRequest) (*proxytestdata.PingResponse, error) { + unaryCall: func(ctx context.Context, r *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) { span, _ := opentracing.StartSpanFromContext(ctx, "nested-span") defer span.Finish() span.LogKV("was", "called") - return &proxytestdata.PingResponse{}, nil + return &grpc_testing.SimpleResponse{}, nil }, - PingStreamMethod: func(stream proxytestdata.TestService_PingStreamServer) error { + fullDuplexCall: func(stream grpc_testing.TestService_FullDuplexCallServer) error { // synchronize the client has returned from CloseSend as the client span finishing // races with sending the stream close to the server select { @@ -388,7 +388,7 @@ func TestDial_Tracing(t *testing.T) { return nil }, } - proxytestdata.RegisterTestServiceServer(grpcServer, svc) + grpc_testing.RegisterTestServiceServer(grpcServer, svc) go func() { require.NoError(t, grpcServer.Serve(listener)) }() defer grpcServer.Stop() @@ -419,7 +419,7 @@ func TestDial_Tracing(t *testing.T) { // We're now invoking the unary RPC with the span injected into // the context. This should create a span that's nested into // the "stream-check" span. - _, err = proxytestdata.NewTestServiceClient(cc).Ping(ctx, &proxytestdata.PingRequest{}) + _, err = grpc_testing.NewTestServiceClient(cc).UnaryCall(ctx, &grpc_testing.SimpleRequest{}) require.NoError(t, err) span.Finish() @@ -438,7 +438,7 @@ func TestDial_Tracing(t *testing.T) { // This span is the RPC call to TestService/Ping. It // inherits the "unary-check" we set up and thus has // baggage. - {baggage: "stub", operation: "/mwitkow.testproto.TestService/Ping"}, + {baggage: "stub", operation: "/grpc.testing.TestService/UnaryCall"}, // And this finally is the outermost span which we // manually set up before the RPC call. {baggage: "stub", operation: "unary-check"}, @@ -474,7 +474,7 @@ func TestDial_Tracing(t *testing.T) { // We're now invoking the streaming RPC with the span injected into the context. // This should create a span that's nested into the "stream-check" span. - stream, err := proxytestdata.NewTestServiceClient(cc).PingStream(ctx) + stream, err := grpc_testing.NewTestServiceClient(cc).FullDuplexCall(ctx) require.NoError(t, err) require.NoError(t, stream.CloseSend()) close(clientSendClosed) @@ -494,7 +494,7 @@ func TestDial_Tracing(t *testing.T) { operation string }{ // This span is the RPC call to TestService/Ping. - {baggage: "stub", operation: "/mwitkow.testproto.TestService/PingStream"}, + {baggage: "stub", operation: "/grpc.testing.TestService/FullDuplexCall"}, // This is the second span we expect, which is the "nested-span" span which // we've manually created inside of PingMethod. This is different than for // unary RPCs: given that one can send multiple messages to the RPC, we may diff --git a/internal/praefect/grpc-proxy/proxy/codec_test.go b/internal/praefect/grpc-proxy/proxy/codec_test.go index 959d4203d..b05738f1e 100644 --- a/internal/praefect/grpc-proxy/proxy/codec_test.go +++ b/internal/praefect/grpc-proxy/proxy/codec_test.go @@ -9,6 +9,8 @@ import ( ) func TestCodec_ReadYourWrites(t *testing.T) { + t.Parallel() + framePtr := &frame{} data := []byte{0xDE, 0xAD, 0xBE, 0xEF} codec := rawCodec{} diff --git a/internal/praefect/grpc-proxy/proxy/handler_ext_test.go b/internal/praefect/grpc-proxy/proxy/handler_ext_test.go index 2610a6132..7e5234fcd 100644 --- a/internal/praefect/grpc-proxy/proxy/handler_ext_test.go +++ b/internal/praefect/grpc-proxy/proxy/handler_ext_test.go @@ -18,134 +18,99 @@ import ( "testing" "github.com/getsentry/sentry-go" - grpcmw "github.com/grpc-ecosystem/go-grpc-middleware" - grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" "gitlab.com/gitlab-org/gitaly/v15/client" - "gitlab.com/gitlab-org/gitaly/v15/internal/helper/fieldextractors" + "gitlab.com/gitlab-org/gitaly/v15/internal/helper" "gitlab.com/gitlab-org/gitaly/v15/internal/metadata" - "gitlab.com/gitlab-org/gitaly/v15/internal/middleware/sentryhandler" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/proxy" - pb "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/testdata" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" grpc_metadata "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "google.golang.org/grpc/test/grpc_testing" ) const ( - pingDefaultValue = "I like kittens." - clientMdKey = "test-client-header" - serverHeaderMdKey = "test-client-header" - serverTrailerMdKey = "test-client-trailer" - rejectingMdKey = "test-reject-rpc-if-in-context" - - countListResponses = 20 ) -func TestMain(m *testing.M) { - testhelper.Run(m) +func TestHandler_carriesClientMetadata(t *testing.T) { + t.Parallel() + testHandlerCarriesClientMetadata(t) } -// asserting service is implemented on the server side and serves as a handler for stuff -type assertingService struct { - pb.UnimplementedTestServiceServer - t *testing.T -} +func TestHandler_carriesClientMetadataStressTest(t *testing.T) { + t.Parallel() -func (s *assertingService) PingEmpty(ctx context.Context, _ *pb.Empty) (*pb.PingResponse, error) { - // Check that this call has client's metadata. - md, ok := grpc_metadata.FromIncomingContext(ctx) - assert.True(s.t, ok, "PingEmpty call must have metadata in context") - _, ok = md[clientMdKey] - assert.True(s.t, ok, "PingEmpty call must have clients's custom headers in metadata") - return &pb.PingResponse{Value: pingDefaultValue, Counter: 42}, nil + for i := 0; i < 50; i++ { + testHandlerCarriesClientMetadata(t) + } } -func (s *assertingService) Ping(ctx context.Context, ping *pb.PingRequest) (*pb.PingResponse, error) { - // Send user trailers and headers. - require.NoError(s.t, grpc.SendHeader(ctx, grpc_metadata.Pairs(serverHeaderMdKey, "I like turtles."))) - require.NoError(s.t, grpc.SetTrailer(ctx, grpc_metadata.Pairs(serverTrailerMdKey, "I like ending turtles."))) - return &pb.PingResponse{Value: ping.Value, Counter: 42}, nil -} +func testHandlerCarriesClientMetadata(t *testing.T) { + ctx, client, backend := setupProxy(t) -func (s *assertingService) PingError(ctx context.Context, ping *pb.PingRequest) (*pb.Empty, error) { - return nil, status.Errorf(codes.ResourceExhausted, "Userspace error.") -} + backend.unaryCall = func(ctx context.Context, request *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) { + metadata, ok := grpc_metadata.FromIncomingContext(ctx) + require.True(t, ok) -func (s *assertingService) PingList(ping *pb.PingRequest, stream pb.TestService_PingListServer) error { - // Send user trailers and headers. - require.NoError(s.t, stream.SendHeader(grpc_metadata.Pairs(serverHeaderMdKey, "I like turtles."))) - for i := 0; i < countListResponses; i++ { - require.NoError(s.t, stream.Send(&pb.PingResponse{Value: ping.Value, Counter: int32(i)})) - } - stream.SetTrailer(grpc_metadata.Pairs(serverTrailerMdKey, "I like ending turtles.")) - return nil -} + metadataValue, ok := metadata["injected_metadata"] + require.True(t, ok) + require.Equal(t, []string{"injected_value"}, metadataValue) -func (s *assertingService) PingStream(stream pb.TestService_PingStreamServer) error { - require.NoError(s.t, stream.SendHeader(grpc_metadata.Pairs(serverHeaderMdKey, "I like turtles."))) - counter := int32(0) - for { - ping, err := stream.Recv() - if err == io.EOF { - break - } else if err != nil { - require.NoError(s.t, err, "can't fail reading stream") - return err - } - pong := &pb.PingResponse{Value: ping.Value, Counter: counter} - if err := stream.Send(pong); err != nil { - require.NoError(s.t, err, "can't fail sending back a pong") - } - counter++ + return &grpc_testing.SimpleResponse{Payload: request.GetPayload()}, nil } - stream.SetTrailer(grpc_metadata.Pairs(serverTrailerMdKey, "I like ending turtles.")) - return nil -} -// ProxyHappySuite tests the "happy" path of handling: that everything works in absence of connection issues. -type ProxyHappySuite struct { - suite.Suite - ctx context.Context - cancel context.CancelFunc - server *grpc.Server - proxy *grpc.Server - connProxy2Server *grpc.ClientConn - client pb.TestServiceClient - connClient2Proxy *grpc.ClientConn -} + ctx = grpc_metadata.NewOutgoingContext(ctx, grpc_metadata.Pairs("injected_metadata", "injected_value")) -func (s *ProxyHappySuite) TestPingEmptyCarriesClientMetadata() { - ctx := grpc_metadata.NewOutgoingContext(s.ctx, grpc_metadata.Pairs(clientMdKey, "true")) - out, err := s.client.PingEmpty(ctx, &pb.Empty{}) - require.NoError(s.T(), err, "PingEmpty should succeed without errors") - testhelper.ProtoEqual(s.T(), &pb.PingResponse{Value: pingDefaultValue, Counter: 42}, out) + response, err := client.UnaryCall(ctx, &grpc_testing.SimpleRequest{ + Payload: &grpc_testing.Payload{Body: []byte("data")}, + }) + require.NoError(t, err, "PingEmpty should succeed without errors") + testhelper.ProtoEqual(t, &grpc_testing.SimpleResponse{ + Payload: &grpc_testing.Payload{Body: []byte("data")}, + }, response) } -func (s *ProxyHappySuite) TestPingEmpty_StressTest() { - for i := 0; i < 50; i++ { - s.TestPingEmptyCarriesClientMetadata() +func TestHandler_carriesHeadersAndTrailers(t *testing.T) { + t.Parallel() + + ctx, client, backend := setupProxy(t) + + backend.unaryCall = func(ctx context.Context, request *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) { + require.NoError(t, grpc.SendHeader(ctx, grpc_metadata.Pairs("injected_header", "header_value"))) + require.NoError(t, grpc.SetTrailer(ctx, grpc_metadata.Pairs("injected_trailer", "trailer_value"))) + return &grpc_testing.SimpleResponse{Payload: request.GetPayload()}, nil } -} -func (s *ProxyHappySuite) TestPingCarriesServerHeadersAndTrailers() { - headerMd := make(grpc_metadata.MD) - trailerMd := make(grpc_metadata.MD) - // This is an awkward calling convention... but meh. - out, err := s.client.Ping(s.ctx, &pb.PingRequest{Value: "foo"}, grpc.Header(&headerMd), grpc.Trailer(&trailerMd)) - require.NoError(s.T(), err, "Ping should succeed without errors") - testhelper.ProtoEqual(s.T(), &pb.PingResponse{Value: "foo", Counter: 42}, out) - assert.Contains(s.T(), headerMd, serverHeaderMdKey, "server response headers must contain server data") - assert.Len(s.T(), trailerMd, 1, "server response trailers must contain server data") + var headerMetadata, trailerMetadata grpc_metadata.MD + + response, err := client.UnaryCall(ctx, &grpc_testing.SimpleRequest{ + Payload: &grpc_testing.Payload{Body: []byte("data")}, + }, grpc.Header(&headerMetadata), grpc.Trailer(&trailerMetadata)) + require.NoError(t, err) + testhelper.ProtoEqual(t, &grpc_testing.SimpleResponse{ + Payload: &grpc_testing.Payload{Body: []byte("data")}, + }, response) + + require.Equal(t, grpc_metadata.Pairs( + "content-type", "application/grpc", + "injected_header", "header_value", + ), headerMetadata) + require.Equal(t, grpc_metadata.Pairs( + "injected_trailer", "trailer_value", + ), trailerMetadata) } -func (s *ProxyHappySuite) TestPingErrorPropagatesAppError() { +func TestHandler_propagatesServerError(t *testing.T) { + ctx, client, backend := setupProxy(t) + + backend.unaryCall = func(context.Context, *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) { + return nil, status.Errorf(codes.ResourceExhausted, "service error") + } + sentryTriggered := 0 sentrySrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { sentryTriggered++ @@ -154,79 +119,124 @@ func (s *ProxyHappySuite) TestPingErrorPropagatesAppError() { // minimal required sentry client configuration sentryURL, err := url.Parse(sentrySrv.URL) - require.NoError(s.T(), err) + require.NoError(t, err) sentryURL.User = url.UserPassword("stub", "stub") sentryURL.Path = "/stub/1" - require.NoError(s.T(), sentry.Init(sentry.ClientOptions{ + require.NoError(t, sentry.Init(sentry.ClientOptions{ Dsn: sentryURL.String(), Transport: sentry.NewHTTPSyncTransport(), })) + // Verify that Sentry is configured correctyl to be triggered. sentry.CaptureEvent(sentry.NewEvent()) - require.Equal(s.T(), 1, sentryTriggered, "sentry configured incorrectly") + require.Equal(t, 1, sentryTriggered) + + _, err = client.UnaryCall(ctx, &grpc_testing.SimpleRequest{}) + testhelper.RequireGrpcError(t, status.Errorf(codes.ResourceExhausted, "service error"), err) + + // Sentry must not be triggered because errors from remote must be just propagated. + require.Equal(t, 1, sentryTriggered) +} + +func TestHandler_directorErrorIsPropagated(t *testing.T) { + t.Parallel() - _, err = s.client.PingError(s.ctx, &pb.PingRequest{Value: "foo"}) - require.Error(s.T(), err, "PingError should never succeed") - assert.Equal(s.T(), codes.ResourceExhausted, status.Code(err)) - assert.Equal(s.T(), "Userspace error.", status.Convert(err).Message()) - require.Equal(s.T(), 1, sentryTriggered, "sentry must not be triggered because errors from remote must be just propagated") + // There is no need to set up the backend given that we should reject the call before we + // even hit the server. + ctx, client, _ := setupProxy(t) + + // The proxy's director is set up so that it is rejecting requests when it sees the + // following gRPC metadata. + ctx = grpc_metadata.NewOutgoingContext(ctx, grpc_metadata.Pairs(rejectingMdKey, "true")) + + _, err := client.UnaryCall(ctx, &grpc_testing.SimpleRequest{}) + require.Error(t, err) + testhelper.RequireGrpcError(t, helper.ErrPermissionDeniedf("testing rejection"), err) +} + +func TestHandler_fullDuplex(t *testing.T) { + t.Parallel() + testHandlerFullDuplex(t) } -func (s *ProxyHappySuite) TestDirectorErrorIsPropagated() { - // See SetupSuite where the StreamDirector has a special case. - ctx := grpc_metadata.NewOutgoingContext(s.ctx, grpc_metadata.Pairs(rejectingMdKey, "true")) - _, err := s.client.Ping(ctx, &pb.PingRequest{Value: "foo"}) - require.Error(s.T(), err, "Director should reject this RPC") - assert.Equal(s.T(), codes.PermissionDenied, status.Code(err)) - assert.Equal(s.T(), "testing rejection", status.Convert(err).Message()) +func TestHandler_fullDuplexStressTest(t *testing.T) { + t.Parallel() + + for i := 0; i < 50; i++ { + testHandlerFullDuplex(t) + } } -func (s *ProxyHappySuite) TestPingStream_FullDuplexWorks() { - stream, err := s.client.PingStream(s.ctx) - require.NoError(s.T(), err, "PingStream request should be successful.") +func testHandlerFullDuplex(t *testing.T) { + ctx, client, backend := setupProxy(t) - for i := 0; i < countListResponses; i++ { - ping := &pb.PingRequest{Value: fmt.Sprintf("foo:%d", i)} - require.NoError(s.T(), stream.Send(ping), "sending to PingStream must not fail") - resp, err := stream.Recv() - if err == io.EOF { - break + backend.fullDuplexCall = func(stream grpc_testing.TestService_FullDuplexCallServer) error { + require.NoError(t, stream.SendHeader(grpc_metadata.Pairs("custom_header", "header_value"))) + + for i := 0; ; i++ { + request, err := stream.Recv() + if err == io.EOF { + break + } + require.NoError(t, err) + + require.NoError(t, stream.Send(&grpc_testing.StreamingOutputCallResponse{ + Payload: &grpc_testing.Payload{ + Body: []byte(fmt.Sprintf("%s: %d", request.GetPayload().GetBody(), i)), + }, + })) } + + stream.SetTrailer(grpc_metadata.Pairs("custom_trailer", "trailer_value")) + return nil + } + + stream, err := client.FullDuplexCall(ctx) + require.NoError(t, err) + + for i := 0; i < 20; i++ { + require.NoError(t, stream.Send(&grpc_testing.StreamingOutputCallRequest{ + Payload: &grpc_testing.Payload{ + Body: []byte(fmt.Sprintf("foo:%d", i)), + }, + })) + + response, err := stream.Recv() + require.NoError(t, err) + testhelper.ProtoEqual(t, &grpc_testing.StreamingOutputCallResponse{ + Payload: &grpc_testing.Payload{ + Body: []byte(fmt.Sprintf("foo:%d: %d", i, i)), + }, + }, response) + if i == 0 { - // Check that the header arrives before all entries. - headerMd, err := stream.Header() - require.NoError(s.T(), err, "PingStream headers should not error.") - assert.Contains(s.T(), headerMd, serverHeaderMdKey, "PingStream response headers user contain metadata") + headerMetadata, err := stream.Header() + require.NoError(t, err) + require.Equal(t, grpc_metadata.Pairs( + "content-type", "application/grpc", + "custom_header", "header_value", + ), headerMetadata) } - assert.EqualValues(s.T(), i, resp.Counter, "ping roundtrip must succeed with the correct id") } - require.NoError(s.T(), stream.CloseSend(), "no error on close send") + + require.NoError(t, stream.CloseSend()) _, err = stream.Recv() - require.Equal(s.T(), io.EOF, err, "stream should close with io.EOF, meaining OK") - // Check that the trailer headers are here. - trailerMd := stream.Trailer() - assert.Len(s.T(), trailerMd, 1, "PingList trailer headers user contain metadata") -} + require.Equal(t, io.EOF, err) -func (s *ProxyHappySuite) TestPingStream_StressTest() { - for i := 0; i < 50; i++ { - s.TestPingStream_FullDuplexWorks() - } + require.Equal(t, grpc_metadata.Pairs( + "custom_trailer", "trailer_value", + ), stream.Trailer()) } -func (s *ProxyHappySuite) SetupSuite() { - s.ctx, s.cancel = context.WithCancel(testhelper.Context(s.T())) +func setupProxy(t *testing.T) (context.Context, grpc_testing.TestServiceClient, *interceptPinger) { + t.Helper() + + ctx := testhelper.Context(t) - listenerProxy, err := net.Listen("tcp", "127.0.0.1:0") - require.NoError(s.T(), err, "must be able to allocate a port for listenerProxy") - listenerServer, err := net.Listen("tcp", "127.0.0.1:0") - require.NoError(s.T(), err, "must be able to allocate a port for listenerServer") + proxy2Server, backend := newBackendPinger(t, ctx) - // Setup of the proxy's Director. - s.connProxy2Server, err = grpc.Dial(listenerServer.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec()))) - require.NoError(s.T(), err, "must not error on deferred client Dial") - director := func(ctx context.Context, fullName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { + director := func(ctx context.Context, _ string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { payload, err := peeker.Peek() if err != nil { return nil, err @@ -240,66 +250,21 @@ func (s *ProxyHappySuite) SetupSuite() { } // Explicitly copy the metadata, otherwise the tests will fail. - return proxy.NewStreamParameters(proxy.Destination{Ctx: metadata.IncomingToOutgoing(ctx), Conn: s.connProxy2Server, Msg: payload}, nil, nil, nil), nil + return proxy.NewStreamParameters(proxy.Destination{ + Ctx: metadata.IncomingToOutgoing(ctx), + Conn: proxy2Server, + Msg: payload, + }, nil, nil, nil), nil } - // Setup backend server for test suite - s.server = grpc.NewServer() - pb.RegisterTestServiceServer(s.server, &assertingService{t: s.T()}) - go func() { - s.server.Serve(listenerServer) - }() - - // Setup grpc-proxy server for test suite - s.proxy = grpc.NewServer( - grpc.ForceServerCodec(proxy.NewCodec()), - grpc.StreamInterceptor( - grpcmw.ChainStreamServer( - // context tags usage is required by sentryhandler.StreamLogHandler - grpcmwtags.StreamServerInterceptor(grpcmwtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor)), - // sentry middleware to capture errors - sentryhandler.StreamLogHandler, - ), - ), - grpc.UnknownServiceHandler(proxy.TransparentHandler(director)), - ) - // Ping handler is handled as an explicit registration and not as a TransparentHandler. - proxy.RegisterService(s.proxy, director, "mwitkow.testproto.TestService", "Ping") - go func() { - s.proxy.Serve(listenerProxy) - }() - - // Setup client for test suite - ctx := testhelper.Context(s.T()) - - s.connClient2Proxy, err = grpc.DialContext(ctx, listenerProxy.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) - require.NoError(s.T(), err, "must not error on deferred client Dial") - s.client = pb.NewTestServiceClient(s.connClient2Proxy) -} - -func (s *ProxyHappySuite) TearDownSuite() { - if s.cancel != nil { - s.cancel() - } - if s.connClient2Proxy != nil { - s.connClient2Proxy.Close() - } - if s.connProxy2Server != nil { - s.connProxy2Server.Close() - } - if s.proxy != nil { - s.proxy.Stop() - } - if s.server != nil { - s.server.Stop() - } -} + client2Proxy := newProxy(t, ctx, director, "mwitkow.testproto.TestService", "Ping") -func TestProxyHappySuite(t *testing.T) { - suite.Run(t, &ProxyHappySuite{}) + return ctx, grpc_testing.NewTestServiceClient(client2Proxy), backend } func TestProxyErrorPropagation(t *testing.T) { + t.Parallel() + errBackend := status.Error(codes.InvalidArgument, "backend error") errDirector := status.Error(codes.FailedPrecondition, "director error") errRequestFinalizer := status.Error(codes.Internal, "request finalizer error") @@ -418,7 +383,7 @@ func TestProxyErrorPropagation(t *testing.T) { require.NoError(t, proxyClientConn.Close()) }() - resp, err := pb.NewTestServiceClient(proxyClientConn).Ping(ctx, &pb.PingRequest{}) + resp, err := grpc_testing.NewTestServiceClient(proxyClientConn).UnaryCall(ctx, &grpc_testing.SimpleRequest{}) testhelper.RequireGrpcError(t, tc.returnedError, err) require.Nil(t, resp) }) @@ -426,76 +391,114 @@ func TestProxyErrorPropagation(t *testing.T) { } func TestRegisterStreamHandlers(t *testing.T) { - directorCalledError := errors.New("director was called") - - server := grpc.NewServer( - grpc.ForceServerCodec(proxy.NewCodec()), - grpc.UnknownServiceHandler(proxy.TransparentHandler(func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { - return nil, directorCalledError - })), - ) - - var pingStreamHandlerCalled, pingEmptyStreamHandlerCalled bool - - pingValue := "hello" + t.Parallel() - pingStreamHandler := func(srv interface{}, stream grpc.ServerStream) error { - pingStreamHandlerCalled = true - var req pb.PingRequest - - if err := stream.RecvMsg(&req); err != nil { - return err - } - - require.Equal(t, pingValue, req.Value) + directorCalledError := errors.New("director was called") - return stream.SendMsg(nil) + requestSent := &grpc_testing.SimpleRequest{ + Payload: &grpc_testing.Payload{ + Body: []byte("hello"), + }, } - pingEmptyStreamHandler := func(srv interface{}, stream grpc.ServerStream) error { - pingEmptyStreamHandlerCalled = true - var req pb.Empty - - if err := stream.RecvMsg(&req); err != nil { - return err - } - - return stream.SendMsg(nil) + unaryCallStreamHandler := func(t *testing.T, srv interface{}, stream grpc.ServerStream) { + var request grpc_testing.SimpleRequest + require.NoError(t, stream.RecvMsg(&request)) + testhelper.ProtoEqual(t, requestSent, &request) + require.NoError(t, stream.SendMsg(nil)) } - streamers := map[string]grpc.StreamHandler{ - "Ping": pingStreamHandler, - "PingEmpty": pingEmptyStreamHandler, + emptyCallStreamHandler := func(t *testing.T, srv interface{}, stream grpc.ServerStream) { + var request grpc_testing.Empty + require.NoError(t, stream.RecvMsg(&request)) + require.NoError(t, stream.SendMsg(nil)) } - proxy.RegisterStreamHandlers(server, "mwitkow.testproto.TestService", streamers) - - serverSocketPath := testhelper.GetTemporaryGitalySocketFileName(t) - - listener, err := net.Listen("unix", serverSocketPath) - if err != nil { - t.Fatal(err) - } + for _, tc := range []struct { + desc string + registeredHandlers map[string]func(*testing.T, interface{}, grpc.ServerStream) + execute func(context.Context, *testing.T, grpc_testing.TestServiceClient) + expectedErr error + expectedCalls map[string]int + }{ + { + desc: "single handler", + registeredHandlers: map[string]func(*testing.T, interface{}, grpc.ServerStream){ + "UnaryCall": unaryCallStreamHandler, + }, + execute: func(ctx context.Context, t *testing.T, client grpc_testing.TestServiceClient) { + _, err := client.UnaryCall(ctx, requestSent) + require.NoError(t, err) + }, + expectedCalls: map[string]int{ + "UnaryCall": 1, + }, + }, + { + desc: "multiple handlers picks the right one", + registeredHandlers: map[string]func(*testing.T, interface{}, grpc.ServerStream){ + "UnaryCall": unaryCallStreamHandler, + "EmptyCall": emptyCallStreamHandler, + }, + execute: func(ctx context.Context, t *testing.T, client grpc_testing.TestServiceClient) { + _, err := client.EmptyCall(ctx, &grpc_testing.Empty{}) + require.NoError(t, err) + }, + expectedCalls: map[string]int{ + "EmptyCall": 1, + }, + }, + { + desc: "call to unregistered handler", + registeredHandlers: map[string]func(*testing.T, interface{}, grpc.ServerStream){ + "EmptyCall": emptyCallStreamHandler, + }, + execute: func(ctx context.Context, t *testing.T, client grpc_testing.TestServiceClient) { + _, err := client.UnaryCall(ctx, requestSent) + testhelper.RequireGrpcError(t, directorCalledError, err) + }, + expectedCalls: map[string]int{}, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx := testhelper.Context(t) - go server.Serve(listener) - defer server.Stop() + server := grpc.NewServer( + grpc.ForceServerCodec(proxy.NewCodec()), + grpc.UnknownServiceHandler(proxy.TransparentHandler( + func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { + return nil, directorCalledError + }, + )), + ) - cc, err := client.Dial("unix://"+serverSocketPath, []grpc.DialOption{grpc.WithBlock()}) - require.NoError(t, err) - defer cc.Close() + calls := map[string]int{} + registeredHandlers := map[string]grpc.StreamHandler{} + for name, handler := range tc.registeredHandlers { + name, handler := name, handler + + // We wrap every handler so that we can easily count the number of + // times each of them has been invoked. + registeredHandlers[name] = func(srv interface{}, stream grpc.ServerStream) error { + calls[name]++ + handler(t, srv, stream) + return nil + } + } + proxy.RegisterStreamHandlers(server, grpc_testing.TestService_ServiceDesc.ServiceName, registeredHandlers) - testServiceClient := pb.NewTestServiceClient(cc) - ctx := testhelper.Context(t) + listener := newListener(t) + go server.Serve(listener) + defer server.Stop() - _, err = testServiceClient.Ping(ctx, &pb.PingRequest{Value: pingValue}) - require.NoError(t, err) - require.True(t, pingStreamHandlerCalled) + conn, err := client.Dial("tcp://"+listener.Addr().String(), []grpc.DialOption{grpc.WithBlock()}) + require.NoError(t, err) + defer conn.Close() + client := grpc_testing.NewTestServiceClient(conn) - _, err = testServiceClient.PingEmpty(ctx, &pb.Empty{}) - require.NoError(t, err) - require.True(t, pingEmptyStreamHandlerCalled) + tc.execute(ctx, t, client) - // since PingError was never registered with its own streamer, it should get sent to the UnknownServiceHandler - _, err = testServiceClient.PingError(ctx, &pb.PingRequest{}) - testhelper.RequireGrpcError(t, status.Error(codes.Unknown, directorCalledError.Error()), err) + require.Equal(t, tc.expectedCalls, calls) + }) + } } diff --git a/internal/praefect/grpc-proxy/proxy/handler_test.go b/internal/praefect/grpc-proxy/proxy/handler_test.go index 8810bd7fc..a4fafab58 100644 --- a/internal/praefect/grpc-proxy/proxy/handler_test.go +++ b/internal/praefect/grpc-proxy/proxy/handler_test.go @@ -10,6 +10,8 @@ import ( ) func TestFailDestinationWithError(t *testing.T) { + t.Parallel() + expectedErr := errors.New("some error") t.Run("works with nil ErrHandlers", func(t *testing.T) { diff --git a/internal/praefect/grpc-proxy/proxy/helper_test.go b/internal/praefect/grpc-proxy/proxy/helper_test.go deleted file mode 100644 index d025c9c3a..000000000 --- a/internal/praefect/grpc-proxy/proxy/helper_test.go +++ /dev/null @@ -1,120 +0,0 @@ -//go:build !gitaly_test_sha256 - -package proxy_test - -import ( - "context" - "net" - "testing" - - "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/proxy" - testservice "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/testdata" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" -) - -func newListener(tb testing.TB) net.Listener { - listener, err := net.Listen("tcp", "127.0.0.1:0") - require.NoError(tb, err, "must be able to allocate a port for listener") - - return listener -} - -func newBackendPinger(tb testing.TB, ctx context.Context) (*grpc.ClientConn, *interceptPinger, func()) { - ip := &interceptPinger{} - - done := make(chan struct{}) - srvr := grpc.NewServer() - listener := newListener(tb) - - testservice.RegisterTestServiceServer(srvr, ip) - - go func() { - defer close(done) - srvr.Serve(listener) - }() - - cc, err := grpc.DialContext( - ctx, - listener.Addr().String(), - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock(), - grpc.WithDefaultCallOptions( - grpc.ForceCodec(proxy.NewCodec()), - ), - ) - require.NoError(tb, err) - - cleanup := func() { - srvr.GracefulStop() - require.NoError(tb, cc.Close()) - <-done - } - - return cc, ip, cleanup -} - -func newProxy(tb testing.TB, ctx context.Context, director proxy.StreamDirector, svc, method string) (*grpc.ClientConn, func()) { - proxySrvr := grpc.NewServer( - grpc.ForceServerCodec(proxy.NewCodec()), - grpc.UnknownServiceHandler(proxy.TransparentHandler(director)), - ) - - proxy.RegisterService(proxySrvr, director, svc, method) - - done := make(chan struct{}) - listener := newListener(tb) - - go func() { - defer close(done) - proxySrvr.Serve(listener) - }() - - proxyCC, err := grpc.DialContext( - ctx, - listener.Addr().String(), - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock(), - ) - require.NoError(tb, err) - - cleanup := func() { - proxySrvr.GracefulStop() - require.NoError(tb, proxyCC.Close()) - <-done - } - - return proxyCC, cleanup -} - -// interceptPinger allows an RPC to be intercepted with a custom -// function defined in each unit test -type interceptPinger struct { - testservice.UnimplementedTestServiceServer - pingStream func(testservice.TestService_PingStreamServer) error - pingEmpty func(context.Context, *testservice.Empty) (*testservice.PingResponse, error) - ping func(context.Context, *testservice.PingRequest) (*testservice.PingResponse, error) - pingError func(context.Context, *testservice.PingRequest) (*testservice.Empty, error) - pingList func(*testservice.PingRequest, testservice.TestService_PingListServer) error -} - -func (ip *interceptPinger) PingStream(stream testservice.TestService_PingStreamServer) error { - return ip.pingStream(stream) -} - -func (ip *interceptPinger) PingEmpty(ctx context.Context, req *testservice.Empty) (*testservice.PingResponse, error) { - return ip.pingEmpty(ctx, req) -} - -func (ip *interceptPinger) Ping(ctx context.Context, req *testservice.PingRequest) (*testservice.PingResponse, error) { - return ip.ping(ctx, req) -} - -func (ip *interceptPinger) PingError(ctx context.Context, req *testservice.PingRequest) (*testservice.Empty, error) { - return ip.pingError(ctx, req) -} - -func (ip *interceptPinger) PingList(req *testservice.PingRequest, stream testservice.TestService_PingListServer) error { - return ip.pingList(req, stream) -} diff --git a/internal/praefect/grpc-proxy/proxy/peeker_test.go b/internal/praefect/grpc-proxy/proxy/peeker_test.go index 276f175d9..9b63653e8 100644 --- a/internal/praefect/grpc-proxy/proxy/peeker_test.go +++ b/internal/praefect/grpc-proxy/proxy/peeker_test.go @@ -7,133 +7,148 @@ import ( "io" "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v15/internal/metadata" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/proxy" - testservice "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/testdata" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" + "google.golang.org/grpc/test/grpc_testing" "google.golang.org/protobuf/proto" ) -// TestStreamPeeking demonstrates that a director function is able to peek -// into a stream. Further more, it demonstrates that peeking into a stream -// will not disturb the stream sent from the proxy client to the backend. +// TestStreamPeeking demonstrates that a director function is able to peek into a stream. Further +// more, it demonstrates that peeking into a stream will not disturb the stream sent from the proxy +// client to the backend. func TestStreamPeeking(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) - backendCC, backendSrvr, cleanupPinger := newBackendPinger(t, ctx) - defer cleanupPinger() + backendCC, backendSrvr := newBackendPinger(t, ctx) - pingReqSent := &testservice.PingRequest{Value: "hi"} + requestSent := &grpc_testing.StreamingOutputCallRequest{ + Payload: &grpc_testing.Payload{ + Body: []byte("hi"), + }, + } + responseSent := &grpc_testing.StreamingOutputCallResponse{ + Payload: &grpc_testing.Payload{ + Body: []byte("bye"), + }, + } - // director will peek into stream before routing traffic - director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { - peekedMsg, err := peeker.Peek() + // We create a director that's peeking into the message in order to assert that the peeked + // message will still be seen by the client. + director := func(ctx context.Context, _ string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { + peekedMessage, err := peeker.Peek() require.NoError(t, err) - peekedRequest := &testservice.PingRequest{} - err = proto.Unmarshal(peekedMsg, peekedRequest) - require.NoError(t, err) - require.True(t, proto.Equal(pingReqSent, peekedRequest), "expected to be the same") + var peekedRequest grpc_testing.StreamingOutputCallRequest + require.NoError(t, proto.Unmarshal(peekedMessage, &peekedRequest)) + testhelper.ProtoEqual(t, requestSent, &peekedRequest) - return proxy.NewStreamParameters(proxy.Destination{Ctx: metadata.IncomingToOutgoing(ctx), Conn: backendCC, Msg: peekedMsg}, nil, nil, nil), nil + return proxy.NewStreamParameters(proxy.Destination{ + Ctx: metadata.IncomingToOutgoing(ctx), + Conn: backendCC, + Msg: peekedMessage, + }, nil, nil, nil), nil } - pingResp := &testservice.PingResponse{ - Counter: 1, - } - - // we expect the backend server to receive the peeked message - backendSrvr.pingStream = func(stream testservice.TestService_PingStreamServer) error { - pingReqReceived, err := stream.Recv() - assert.NoError(t, err) - assert.True(t, proto.Equal(pingReqSent, pingReqReceived), "expected to be the same") + // The backend is supposed to still receive the message as expected without any modification + // to it. + backendSrvr.fullDuplexCall = func(stream grpc_testing.TestService_FullDuplexCallServer) error { + requestReceived, err := stream.Recv() + require.NoError(t, err) + testhelper.ProtoEqual(t, requestSent, requestReceived) - return stream.Send(pingResp) + return stream.Send(responseSent) } - proxyCC, cleanupProxy := newProxy(t, ctx, director, "mwitkow.testproto.TestService", "PingStream") - defer cleanupProxy() - - proxyClient := testservice.NewTestServiceClient(proxyCC) + proxyConn := newProxy(t, ctx, director, "grpc_testing.TestService", "FullDuplexCall") + proxyClient := grpc_testing.NewTestServiceClient(proxyConn) - proxyClientPingStream, err := proxyClient.PingStream(ctx) + // Send the request on the stream and close the writing side. + proxyStream, err := proxyClient.FullDuplexCall(ctx) require.NoError(t, err) - defer func() { - require.NoError(t, proxyClientPingStream.CloseSend()) - }() + require.NoError(t, proxyStream.Send(requestSent)) + require.NoError(t, proxyStream.CloseSend()) - require.NoError(t, - proxyClientPingStream.Send(pingReqSent), - ) - - resp, err := proxyClientPingStream.Recv() + // And now verify that the response we've got in fact matches our expected response. + responseReceived, err := proxyStream.Recv() require.NoError(t, err) - require.True(t, proto.Equal(resp, pingResp), "expected to be the same") + testhelper.ProtoEqual(t, responseReceived, responseSent) - _, err = proxyClientPingStream.Recv() + _, err = proxyStream.Recv() require.Equal(t, io.EOF, err) } func TestStreamInjecting(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) - backendCC, backendSrvr, cleanupPinger := newBackendPinger(t, ctx) - defer cleanupPinger() + backendCC, backendSrvr := newBackendPinger(t, ctx) - pingReqSent := &testservice.PingRequest{Value: "hi"} - newValue := "bye" + requestSent := &grpc_testing.StreamingOutputCallRequest{ + Payload: &grpc_testing.Payload{ + Body: []byte("hi"), + }, + } + requestReplaced := &grpc_testing.StreamingOutputCallRequest{ + Payload: &grpc_testing.Payload{ + Body: []byte("replaced"), + }, + } + responseSent := &grpc_testing.StreamingOutputCallResponse{ + Payload: &grpc_testing.Payload{ + Body: []byte("bye"), + }, + } - // director will peek into stream and change some frames + // We create a director that peeks the incoming request and in fact changes its values. This + // is to assert that the client receives the changed requests. director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { - peekedMsg, err := peeker.Peek() + peekedMessage, err := peeker.Peek() require.NoError(t, err) - peekedRequest := &testservice.PingRequest{} - require.NoError(t, proto.Unmarshal(peekedMsg, peekedRequest)) - require.Equal(t, "hi", peekedRequest.GetValue()) - - peekedRequest.Value = newValue + // Assert that we get the expected original ping request. + var peekedRequest grpc_testing.StreamingOutputCallRequest + require.NoError(t, proto.Unmarshal(peekedMessage, &peekedRequest)) + testhelper.ProtoEqual(t, requestSent, &peekedRequest) - newPayload, err := proto.Marshal(peekedRequest) + // Replace the value of the peeked request and send along the changed request. + replacedMessage, err := proto.Marshal(requestReplaced) require.NoError(t, err) - return proxy.NewStreamParameters(proxy.Destination{Ctx: metadata.IncomingToOutgoing(ctx), Conn: backendCC, Msg: newPayload}, nil, nil, nil), nil + return proxy.NewStreamParameters(proxy.Destination{ + Ctx: metadata.IncomingToOutgoing(ctx), + Conn: backendCC, + Msg: replacedMessage, + }, nil, nil, nil), nil } - pingResp := &testservice.PingResponse{ - Counter: 1, - } - - // we expect the backend server to receive the modified message - backendSrvr.pingStream = func(stream testservice.TestService_PingStreamServer) error { - pingReqReceived, err := stream.Recv() - assert.NoError(t, err) - assert.Equal(t, newValue, pingReqReceived.GetValue()) + // Upon receiving the request the backend server should only ever see the changed request. + backendSrvr.fullDuplexCall = func(stream grpc_testing.TestService_FullDuplexCallServer) error { + requestReceived, err := stream.Recv() + require.NoError(t, err) + testhelper.ProtoEqual(t, requestReplaced, requestReceived) - return stream.Send(pingResp) + return stream.Send(responseSent) } - proxyCC, cleanupProxy := newProxy(t, ctx, director, "mwitkow.testproto.TestService", "PingStream") - defer cleanupProxy() + proxyConn := newProxy(t, ctx, director, "grpc_testing.TestService", "FullDuplexCall") + proxyClient := grpc_testing.NewTestServiceClient(proxyConn) - proxyClient := testservice.NewTestServiceClient(proxyCC) - - proxyClientPingStream, err := proxyClient.PingStream(ctx) + proxyStream, err := proxyClient.FullDuplexCall(ctx) require.NoError(t, err) defer func() { - require.NoError(t, proxyClientPingStream.CloseSend()) + require.NoError(t, proxyStream.CloseSend()) }() + require.NoError(t, proxyStream.Send(requestSent)) - require.NoError(t, - proxyClientPingStream.Send(pingReqSent), - ) - - resp, err := proxyClientPingStream.Recv() + responseReceived, err := proxyStream.Recv() require.NoError(t, err) - require.True(t, proto.Equal(resp, pingResp), "expected to be the same") + testhelper.ProtoEqual(t, responseSent, responseReceived) - _, err = proxyClientPingStream.Recv() + _, err = proxyStream.Recv() require.Equal(t, io.EOF, err) } diff --git a/internal/praefect/grpc-proxy/proxy/testhelper_test.go b/internal/praefect/grpc-proxy/proxy/testhelper_test.go new file mode 100644 index 000000000..bc7ffca5e --- /dev/null +++ b/internal/praefect/grpc-proxy/proxy/testhelper_test.go @@ -0,0 +1,131 @@ +//go:build !gitaly_test_sha256 + +package proxy_test + +import ( + "context" + "net" + "testing" + + grpcmw "github.com/grpc-ecosystem/go-grpc-middleware" + grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/internal/helper/fieldextractors" + "gitlab.com/gitlab-org/gitaly/v15/internal/middleware/sentryhandler" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/proxy" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/test/grpc_testing" +) + +func TestMain(m *testing.M) { + testhelper.Run(m) +} + +func newListener(tb testing.TB) net.Listener { + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(tb, err, "must be able to allocate a port for listener") + + return listener +} + +func newBackendPinger(tb testing.TB, ctx context.Context) (*grpc.ClientConn, *interceptPinger) { + ip := &interceptPinger{} + + srvr := grpc.NewServer() + listener := newListener(tb) + + grpc_testing.RegisterTestServiceServer(srvr, ip) + + done := make(chan struct{}) + go func() { + defer close(done) + srvr.Serve(listener) + }() + + cc, err := grpc.DialContext( + ctx, + listener.Addr().String(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + grpc.WithDefaultCallOptions( + grpc.ForceCodec(proxy.NewCodec()), + ), + ) + require.NoError(tb, err) + + tb.Cleanup(func() { + srvr.GracefulStop() + require.NoError(tb, cc.Close()) + <-done + }) + + return cc, ip +} + +func newProxy(tb testing.TB, ctx context.Context, director proxy.StreamDirector, svc, method string) *grpc.ClientConn { + proxySrvr := grpc.NewServer( + grpc.ForceServerCodec(proxy.NewCodec()), + grpc.StreamInterceptor( + grpcmw.ChainStreamServer( + // context tags usage is required by sentryhandler.StreamLogHandler + grpcmwtags.StreamServerInterceptor(grpcmwtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor)), + // sentry middleware to capture errors + sentryhandler.StreamLogHandler, + ), + ), + grpc.UnknownServiceHandler(proxy.TransparentHandler(director)), + ) + proxy.RegisterService(proxySrvr, director, svc, method) + + done := make(chan struct{}) + listener := newListener(tb) + go func() { + defer close(done) + proxySrvr.Serve(listener) + }() + + proxyCC, err := grpc.DialContext( + ctx, + listener.Addr().String(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + ) + require.NoError(tb, err) + + tb.Cleanup(func() { + proxySrvr.GracefulStop() + require.NoError(tb, proxyCC.Close()) + <-done + }) + + return proxyCC +} + +// interceptPinger allows an RPC to be intercepted with a custom +// function defined in each unit test +type interceptPinger struct { + grpc_testing.UnimplementedTestServiceServer + + fullDuplexCall func(grpc_testing.TestService_FullDuplexCallServer) error + emptyCall func(context.Context, *grpc_testing.Empty) (*grpc_testing.Empty, error) + unaryCall func(context.Context, *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) + streamingOutputCall func(*grpc_testing.StreamingOutputCallRequest, grpc_testing.TestService_StreamingOutputCallServer) error +} + +func (ip *interceptPinger) FullDuplexCall(stream grpc_testing.TestService_FullDuplexCallServer) error { + return ip.fullDuplexCall(stream) +} + +func (ip *interceptPinger) EmptyCall(ctx context.Context, req *grpc_testing.Empty) (*grpc_testing.Empty, error) { + return ip.emptyCall(ctx, req) +} + +func (ip *interceptPinger) UnaryCall(ctx context.Context, req *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) { + return ip.unaryCall(ctx, req) +} + +func (ip *interceptPinger) StreamingOutputCall(req *grpc_testing.StreamingOutputCallRequest, stream grpc_testing.TestService_StreamingOutputCallServer) error { + return ip.streamingOutputCall(req, stream) +} diff --git a/internal/praefect/grpc-proxy/testdata/test.pb.go b/internal/praefect/grpc-proxy/testdata/test.pb.go deleted file mode 100644 index ec62c5466..000000000 --- a/internal/praefect/grpc-proxy/testdata/test.pb.go +++ /dev/null @@ -1,306 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.28.0 -// protoc v3.21.1 -// source: praefect/grpc-proxy/testdata/test.proto - -package testdata - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type Empty struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields -} - -func (x *Empty) Reset() { - *x = Empty{} - if protoimpl.UnsafeEnabled { - mi := &file_praefect_grpc_proxy_testdata_test_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Empty) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Empty) ProtoMessage() {} - -func (x *Empty) ProtoReflect() protoreflect.Message { - mi := &file_praefect_grpc_proxy_testdata_test_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Empty.ProtoReflect.Descriptor instead. -func (*Empty) Descriptor() ([]byte, []int) { - return file_praefect_grpc_proxy_testdata_test_proto_rawDescGZIP(), []int{0} -} - -type PingRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"` -} - -func (x *PingRequest) Reset() { - *x = PingRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_praefect_grpc_proxy_testdata_test_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *PingRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*PingRequest) ProtoMessage() {} - -func (x *PingRequest) ProtoReflect() protoreflect.Message { - mi := &file_praefect_grpc_proxy_testdata_test_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use PingRequest.ProtoReflect.Descriptor instead. -func (*PingRequest) Descriptor() ([]byte, []int) { - return file_praefect_grpc_proxy_testdata_test_proto_rawDescGZIP(), []int{1} -} - -func (x *PingRequest) GetValue() string { - if x != nil { - return x.Value - } - return "" -} - -type PingResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Value string `protobuf:"bytes,1,opt,name=Value,proto3" json:"Value,omitempty"` - Counter int32 `protobuf:"varint,2,opt,name=counter,proto3" json:"counter,omitempty"` -} - -func (x *PingResponse) Reset() { - *x = PingResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_praefect_grpc_proxy_testdata_test_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *PingResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*PingResponse) ProtoMessage() {} - -func (x *PingResponse) ProtoReflect() protoreflect.Message { - mi := &file_praefect_grpc_proxy_testdata_test_proto_msgTypes[2] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use PingResponse.ProtoReflect.Descriptor instead. -func (*PingResponse) Descriptor() ([]byte, []int) { - return file_praefect_grpc_proxy_testdata_test_proto_rawDescGZIP(), []int{2} -} - -func (x *PingResponse) GetValue() string { - if x != nil { - return x.Value - } - return "" -} - -func (x *PingResponse) GetCounter() int32 { - if x != nil { - return x.Counter - } - return 0 -} - -var File_praefect_grpc_proxy_testdata_test_proto protoreflect.FileDescriptor - -var file_praefect_grpc_proxy_testdata_test_proto_rawDesc = []byte{ - 0x0a, 0x27, 0x70, 0x72, 0x61, 0x65, 0x66, 0x65, 0x63, 0x74, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2d, - 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2f, 0x74, 0x65, 0x73, 0x74, 0x64, 0x61, 0x74, 0x61, 0x2f, 0x74, - 0x65, 0x73, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x11, 0x6d, 0x77, 0x69, 0x74, 0x6b, - 0x6f, 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x07, 0x0a, 0x05, - 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x23, 0x0a, 0x0b, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x3e, 0x0a, 0x0c, 0x50, 0x69, - 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x56, 0x61, - 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x56, 0x61, 0x6c, 0x75, 0x65, - 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x05, 0x52, 0x07, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x65, 0x72, 0x32, 0x91, 0x03, 0x0a, 0x0b, 0x54, - 0x65, 0x73, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x48, 0x0a, 0x09, 0x50, 0x69, - 0x6e, 0x67, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x18, 0x2e, 0x6d, 0x77, 0x69, 0x74, 0x6b, 0x6f, - 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, - 0x79, 0x1a, 0x1f, 0x2e, 0x6d, 0x77, 0x69, 0x74, 0x6b, 0x6f, 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x00, 0x12, 0x49, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x1e, 0x2e, 0x6d, - 0x77, 0x69, 0x74, 0x6b, 0x6f, 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6d, - 0x77, 0x69, 0x74, 0x6b, 0x6f, 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, - 0x47, 0x0a, 0x09, 0x50, 0x69, 0x6e, 0x67, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x1e, 0x2e, 0x6d, - 0x77, 0x69, 0x74, 0x6b, 0x6f, 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x6d, - 0x77, 0x69, 0x74, 0x6b, 0x6f, 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x4f, 0x0a, 0x08, 0x50, 0x69, 0x6e, 0x67, - 0x4c, 0x69, 0x73, 0x74, 0x12, 0x1e, 0x2e, 0x6d, 0x77, 0x69, 0x74, 0x6b, 0x6f, 0x77, 0x2e, 0x74, - 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6d, 0x77, 0x69, 0x74, 0x6b, 0x6f, 0x77, 0x2e, 0x74, - 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x53, 0x0a, 0x0a, 0x50, 0x69, 0x6e, - 0x67, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x1e, 0x2e, 0x6d, 0x77, 0x69, 0x74, 0x6b, 0x6f, - 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x69, 0x6e, 0x67, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6d, 0x77, 0x69, 0x74, 0x6b, 0x6f, - 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x69, 0x6e, 0x67, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x48, - 0x5a, 0x46, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, - 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, - 0x31, 0x35, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, 0x61, 0x65, - 0x66, 0x65, 0x63, 0x74, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2d, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2f, - 0x74, 0x65, 0x73, 0x74, 0x64, 0x61, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_praefect_grpc_proxy_testdata_test_proto_rawDescOnce sync.Once - file_praefect_grpc_proxy_testdata_test_proto_rawDescData = file_praefect_grpc_proxy_testdata_test_proto_rawDesc -) - -func file_praefect_grpc_proxy_testdata_test_proto_rawDescGZIP() []byte { - file_praefect_grpc_proxy_testdata_test_proto_rawDescOnce.Do(func() { - file_praefect_grpc_proxy_testdata_test_proto_rawDescData = protoimpl.X.CompressGZIP(file_praefect_grpc_proxy_testdata_test_proto_rawDescData) - }) - return file_praefect_grpc_proxy_testdata_test_proto_rawDescData -} - -var file_praefect_grpc_proxy_testdata_test_proto_msgTypes = make([]protoimpl.MessageInfo, 3) -var file_praefect_grpc_proxy_testdata_test_proto_goTypes = []interface{}{ - (*Empty)(nil), // 0: mwitkow.testproto.Empty - (*PingRequest)(nil), // 1: mwitkow.testproto.PingRequest - (*PingResponse)(nil), // 2: mwitkow.testproto.PingResponse -} -var file_praefect_grpc_proxy_testdata_test_proto_depIdxs = []int32{ - 0, // 0: mwitkow.testproto.TestService.PingEmpty:input_type -> mwitkow.testproto.Empty - 1, // 1: mwitkow.testproto.TestService.Ping:input_type -> mwitkow.testproto.PingRequest - 1, // 2: mwitkow.testproto.TestService.PingError:input_type -> mwitkow.testproto.PingRequest - 1, // 3: mwitkow.testproto.TestService.PingList:input_type -> mwitkow.testproto.PingRequest - 1, // 4: mwitkow.testproto.TestService.PingStream:input_type -> mwitkow.testproto.PingRequest - 2, // 5: mwitkow.testproto.TestService.PingEmpty:output_type -> mwitkow.testproto.PingResponse - 2, // 6: mwitkow.testproto.TestService.Ping:output_type -> mwitkow.testproto.PingResponse - 0, // 7: mwitkow.testproto.TestService.PingError:output_type -> mwitkow.testproto.Empty - 2, // 8: mwitkow.testproto.TestService.PingList:output_type -> mwitkow.testproto.PingResponse - 2, // 9: mwitkow.testproto.TestService.PingStream:output_type -> mwitkow.testproto.PingResponse - 5, // [5:10] is the sub-list for method output_type - 0, // [0:5] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name -} - -func init() { file_praefect_grpc_proxy_testdata_test_proto_init() } -func file_praefect_grpc_proxy_testdata_test_proto_init() { - if File_praefect_grpc_proxy_testdata_test_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_praefect_grpc_proxy_testdata_test_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Empty); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_praefect_grpc_proxy_testdata_test_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PingRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_praefect_grpc_proxy_testdata_test_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PingResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_praefect_grpc_proxy_testdata_test_proto_rawDesc, - NumEnums: 0, - NumMessages: 3, - NumExtensions: 0, - NumServices: 1, - }, - GoTypes: file_praefect_grpc_proxy_testdata_test_proto_goTypes, - DependencyIndexes: file_praefect_grpc_proxy_testdata_test_proto_depIdxs, - MessageInfos: file_praefect_grpc_proxy_testdata_test_proto_msgTypes, - }.Build() - File_praefect_grpc_proxy_testdata_test_proto = out.File - file_praefect_grpc_proxy_testdata_test_proto_rawDesc = nil - file_praefect_grpc_proxy_testdata_test_proto_goTypes = nil - file_praefect_grpc_proxy_testdata_test_proto_depIdxs = nil -} diff --git a/internal/praefect/grpc-proxy/testdata/test.proto b/internal/praefect/grpc-proxy/testdata/test.proto deleted file mode 100644 index 227d3cbeb..000000000 --- a/internal/praefect/grpc-proxy/testdata/test.proto +++ /dev/null @@ -1,31 +0,0 @@ -syntax = "proto3"; - -package mwitkow.testproto; - -option go_package = "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/testdata"; - -message Empty { -} - -message PingRequest { - string value = 1; -} - -message PingResponse { - string Value = 1; - int32 counter = 2; -} - -service TestService { - rpc PingEmpty(Empty) returns (PingResponse) {} - - rpc Ping(PingRequest) returns (PingResponse) {} - - rpc PingError(PingRequest) returns (Empty) {} - - rpc PingList(PingRequest) returns (stream PingResponse) {} - - rpc PingStream(stream PingRequest) returns (stream PingResponse) {} - -} - diff --git a/internal/praefect/grpc-proxy/testdata/test_grpc.pb.go b/internal/praefect/grpc-proxy/testdata/test_grpc.pb.go deleted file mode 100644 index d1b660df2..000000000 --- a/internal/praefect/grpc-proxy/testdata/test_grpc.pb.go +++ /dev/null @@ -1,309 +0,0 @@ -// Code generated by protoc-gen-go-grpc. DO NOT EDIT. -// versions: -// - protoc-gen-go-grpc v1.2.0 -// - protoc v3.21.1 -// source: praefect/grpc-proxy/testdata/test.proto - -package testdata - -import ( - context "context" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" -) - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.32.0 or later. -const _ = grpc.SupportPackageIsVersion7 - -// TestServiceClient is the client API for TestService service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. -type TestServiceClient interface { - PingEmpty(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*PingResponse, error) - Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error) - PingError(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*Empty, error) - PingList(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (TestService_PingListClient, error) - PingStream(ctx context.Context, opts ...grpc.CallOption) (TestService_PingStreamClient, error) -} - -type testServiceClient struct { - cc grpc.ClientConnInterface -} - -func NewTestServiceClient(cc grpc.ClientConnInterface) TestServiceClient { - return &testServiceClient{cc} -} - -func (c *testServiceClient) PingEmpty(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*PingResponse, error) { - out := new(PingResponse) - err := c.cc.Invoke(ctx, "/mwitkow.testproto.TestService/PingEmpty", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *testServiceClient) Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error) { - out := new(PingResponse) - err := c.cc.Invoke(ctx, "/mwitkow.testproto.TestService/Ping", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *testServiceClient) PingError(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*Empty, error) { - out := new(Empty) - err := c.cc.Invoke(ctx, "/mwitkow.testproto.TestService/PingError", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *testServiceClient) PingList(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (TestService_PingListClient, error) { - stream, err := c.cc.NewStream(ctx, &TestService_ServiceDesc.Streams[0], "/mwitkow.testproto.TestService/PingList", opts...) - if err != nil { - return nil, err - } - x := &testServicePingListClient{stream} - if err := x.ClientStream.SendMsg(in); err != nil { - return nil, err - } - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - return x, nil -} - -type TestService_PingListClient interface { - Recv() (*PingResponse, error) - grpc.ClientStream -} - -type testServicePingListClient struct { - grpc.ClientStream -} - -func (x *testServicePingListClient) Recv() (*PingResponse, error) { - m := new(PingResponse) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -func (c *testServiceClient) PingStream(ctx context.Context, opts ...grpc.CallOption) (TestService_PingStreamClient, error) { - stream, err := c.cc.NewStream(ctx, &TestService_ServiceDesc.Streams[1], "/mwitkow.testproto.TestService/PingStream", opts...) - if err != nil { - return nil, err - } - x := &testServicePingStreamClient{stream} - return x, nil -} - -type TestService_PingStreamClient interface { - Send(*PingRequest) error - Recv() (*PingResponse, error) - grpc.ClientStream -} - -type testServicePingStreamClient struct { - grpc.ClientStream -} - -func (x *testServicePingStreamClient) Send(m *PingRequest) error { - return x.ClientStream.SendMsg(m) -} - -func (x *testServicePingStreamClient) Recv() (*PingResponse, error) { - m := new(PingResponse) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -// TestServiceServer is the server API for TestService service. -// All implementations must embed UnimplementedTestServiceServer -// for forward compatibility -type TestServiceServer interface { - PingEmpty(context.Context, *Empty) (*PingResponse, error) - Ping(context.Context, *PingRequest) (*PingResponse, error) - PingError(context.Context, *PingRequest) (*Empty, error) - PingList(*PingRequest, TestService_PingListServer) error - PingStream(TestService_PingStreamServer) error - mustEmbedUnimplementedTestServiceServer() -} - -// UnimplementedTestServiceServer must be embedded to have forward compatible implementations. -type UnimplementedTestServiceServer struct { -} - -func (UnimplementedTestServiceServer) PingEmpty(context.Context, *Empty) (*PingResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method PingEmpty not implemented") -} -func (UnimplementedTestServiceServer) Ping(context.Context, *PingRequest) (*PingResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented") -} -func (UnimplementedTestServiceServer) PingError(context.Context, *PingRequest) (*Empty, error) { - return nil, status.Errorf(codes.Unimplemented, "method PingError not implemented") -} -func (UnimplementedTestServiceServer) PingList(*PingRequest, TestService_PingListServer) error { - return status.Errorf(codes.Unimplemented, "method PingList not implemented") -} -func (UnimplementedTestServiceServer) PingStream(TestService_PingStreamServer) error { - return status.Errorf(codes.Unimplemented, "method PingStream not implemented") -} -func (UnimplementedTestServiceServer) mustEmbedUnimplementedTestServiceServer() {} - -// UnsafeTestServiceServer may be embedded to opt out of forward compatibility for this service. -// Use of this interface is not recommended, as added methods to TestServiceServer will -// result in compilation errors. -type UnsafeTestServiceServer interface { - mustEmbedUnimplementedTestServiceServer() -} - -func RegisterTestServiceServer(s grpc.ServiceRegistrar, srv TestServiceServer) { - s.RegisterService(&TestService_ServiceDesc, srv) -} - -func _TestService_PingEmpty_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(Empty) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(TestServiceServer).PingEmpty(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/mwitkow.testproto.TestService/PingEmpty", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(TestServiceServer).PingEmpty(ctx, req.(*Empty)) - } - return interceptor(ctx, in, info, handler) -} - -func _TestService_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(PingRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(TestServiceServer).Ping(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/mwitkow.testproto.TestService/Ping", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(TestServiceServer).Ping(ctx, req.(*PingRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _TestService_PingError_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(PingRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(TestServiceServer).PingError(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/mwitkow.testproto.TestService/PingError", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(TestServiceServer).PingError(ctx, req.(*PingRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _TestService_PingList_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(PingRequest) - if err := stream.RecvMsg(m); err != nil { - return err - } - return srv.(TestServiceServer).PingList(m, &testServicePingListServer{stream}) -} - -type TestService_PingListServer interface { - Send(*PingResponse) error - grpc.ServerStream -} - -type testServicePingListServer struct { - grpc.ServerStream -} - -func (x *testServicePingListServer) Send(m *PingResponse) error { - return x.ServerStream.SendMsg(m) -} - -func _TestService_PingStream_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(TestServiceServer).PingStream(&testServicePingStreamServer{stream}) -} - -type TestService_PingStreamServer interface { - Send(*PingResponse) error - Recv() (*PingRequest, error) - grpc.ServerStream -} - -type testServicePingStreamServer struct { - grpc.ServerStream -} - -func (x *testServicePingStreamServer) Send(m *PingResponse) error { - return x.ServerStream.SendMsg(m) -} - -func (x *testServicePingStreamServer) Recv() (*PingRequest, error) { - m := new(PingRequest) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -// TestService_ServiceDesc is the grpc.ServiceDesc for TestService service. -// It's only intended for direct use with grpc.RegisterService, -// and not to be introspected or modified (even as a copy) -var TestService_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "mwitkow.testproto.TestService", - HandlerType: (*TestServiceServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "PingEmpty", - Handler: _TestService_PingEmpty_Handler, - }, - { - MethodName: "Ping", - Handler: _TestService_Ping_Handler, - }, - { - MethodName: "PingError", - Handler: _TestService_PingError_Handler, - }, - }, - Streams: []grpc.StreamDesc{ - { - StreamName: "PingList", - Handler: _TestService_PingList_Handler, - ServerStreams: true, - }, - { - StreamName: "PingStream", - Handler: _TestService_PingStream_Handler, - ServerStreams: true, - ClientStreams: true, - }, - }, - Metadata: "praefect/grpc-proxy/testdata/test.proto", -} |