diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-09-15 13:25:55 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-09-15 13:25:55 +0300 |
commit | 7f83c086aafe17cb36fd2f544e4cdb3564694280 (patch) | |
tree | c187f710d79a5203794bbf8e877e90e89249ce14 | |
parent | 16cd7e7fa2bb9afdd7061ae6187f739dc2fe120f (diff) | |
parent | 52a14788903be8b13c1e7acd87b52ff986a3c4f8 (diff) |
Merge branch 'pks-log-move-payload-bytes-test' into 'master'
log: Move PayloadBytes tests into grpcstats package
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6370
Merged-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Approved-by: karthik nayak <knayak@gitlab.com>
-rw-r--r-- | internal/grpc/grpcstats/stats_test.go | 165 | ||||
-rw-r--r-- | internal/log/middleware_test.go | 150 |
2 files changed, 165 insertions, 150 deletions
diff --git a/internal/grpc/grpcstats/stats_test.go b/internal/grpc/grpcstats/stats_test.go index 84d13930c..9b334bfb8 100644 --- a/internal/grpc/grpcstats/stats_test.go +++ b/internal/grpc/grpcstats/stats_test.go @@ -1,15 +1,173 @@ package grpcstats import ( + "context" + "io" + "net" + "os" + "sync" "testing" + grpcmwlogrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "google.golang.org/grpc" + "google.golang.org/grpc/interop/grpc_testing" "google.golang.org/grpc/stats" + "google.golang.org/protobuf/proto" ) +func newStubPayload() *grpc_testing.Payload { + return &grpc_testing.Payload{Body: []byte("stub")} +} + +type testService struct { + grpc_testing.UnimplementedTestServiceServer +} + +func (ts testService) UnaryCall(context.Context, *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) { + return &grpc_testing.SimpleResponse{Payload: newStubPayload()}, nil +} + +func (ts testService) HalfDuplexCall(stream grpc_testing.TestService_HalfDuplexCallServer) error { + for { + if _, err := stream.Recv(); err != nil { + if err == io.EOF { + break + } + return err + } + } + + resp := &grpc_testing.StreamingOutputCallResponse{Payload: newStubPayload()} + if err := stream.Send(proto.Clone(resp).(*grpc_testing.StreamingOutputCallResponse)); err != nil { + return err + } + return stream.Send(proto.Clone(resp).(*grpc_testing.StreamingOutputCallResponse)) +} + +func TestPayloadBytes(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + logger, hook := test.NewNullLogger() + + opts := []grpc.ServerOption{ + grpc.StatsHandler(log.PerRPCLogHandler{ + Underlying: &PayloadBytes{}, + FieldProducers: []log.FieldsProducer{FieldsProducer}, + }), + grpc.ChainUnaryInterceptor( + grpcmwlogrus.UnaryServerInterceptor( + logrus.NewEntry(logger), + grpcmwlogrus.WithMessageProducer( + log.MessageProducer( + log.PropagationMessageProducer(grpcmwlogrus.DefaultMessageProducer), + FieldsProducer, + ), + ), + ), + log.UnaryLogDataCatcherServerInterceptor(), + ), + grpc.ChainStreamInterceptor( + grpcmwlogrus.StreamServerInterceptor( + logrus.NewEntry(logger), + grpcmwlogrus.WithMessageProducer( + log.MessageProducer( + log.PropagationMessageProducer(grpcmwlogrus.DefaultMessageProducer), + FieldsProducer, + ), + ), + ), + log.StreamLogDataCatcherServerInterceptor(), + ), + } + + srv := grpc.NewServer(opts...) + grpc_testing.RegisterTestServiceServer(srv, testService{}) + sock, err := os.CreateTemp("", "") + require.NoError(t, err) + require.NoError(t, sock.Close()) + require.NoError(t, os.RemoveAll(sock.Name())) + t.Cleanup(func() { require.NoError(t, os.RemoveAll(sock.Name())) }) + + lis, err := net.Listen("unix", sock.Name()) + require.NoError(t, err) + + t.Cleanup(srv.GracefulStop) + go func() { assert.NoError(t, srv.Serve(lis)) }() + + cc, err := client.Dial(ctx, "unix://"+sock.Name()) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, cc.Close()) }) + + testClient := grpc_testing.NewTestServiceClient(cc) + const invocations = 2 + var wg sync.WaitGroup + for i := 0; i < invocations; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + resp, err := testClient.UnaryCall(ctx, &grpc_testing.SimpleRequest{Payload: newStubPayload()}) + if !assert.NoError(t, err) { + return + } + require.Equal(t, newStubPayload(), resp.Payload) + + call, err := testClient.HalfDuplexCall(ctx) + if !assert.NoError(t, err) { + return + } + + done := make(chan struct{}) + go func() { + defer close(done) + for { + _, err := call.Recv() + if err == io.EOF { + return + } + assert.NoError(t, err) + } + }() + assert.NoError(t, call.Send(&grpc_testing.StreamingOutputCallRequest{Payload: newStubPayload()})) + assert.NoError(t, call.Send(&grpc_testing.StreamingOutputCallRequest{Payload: newStubPayload()})) + assert.NoError(t, call.CloseSend()) + <-done + }() + } + wg.Wait() + + srv.GracefulStop() + + entries := hook.AllEntries() + require.Len(t, entries, 4) + var unary, stream int + for _, e := range entries { + if e.Message == "finished unary call with code OK" { + unary++ + require.EqualValues(t, 8, e.Data["grpc.request.payload_bytes"]) + require.EqualValues(t, 8, e.Data["grpc.response.payload_bytes"]) + } + if e.Message == "finished streaming call with code OK" { + stream++ + require.EqualValues(t, 16, e.Data["grpc.request.payload_bytes"]) + require.EqualValues(t, 16, e.Data["grpc.response.payload_bytes"]) + } + } + require.Equal(t, invocations, unary) + require.Equal(t, invocations, stream) +} + func TestPayloadBytes_TagRPC(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) ctx = (&PayloadBytes{}).TagRPC(ctx, nil) require.Equal(t, @@ -19,9 +177,12 @@ func TestPayloadBytes_TagRPC(t *testing.T) { } func TestPayloadBytes_HandleRPC(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) handler := &PayloadBytes{} ctx = handler.TagRPC(ctx, nil) + handler.HandleRPC(ctx, nil) // sanity check we don't fail anything handler.HandleRPC(ctx, &stats.Begin{}) // sanity check we don't fail anything handler.HandleRPC(ctx, &stats.InPayload{Length: 42}) @@ -47,6 +208,8 @@ func TestPayloadBytes_HandleRPC(t *testing.T) { } func TestPayloadBytesStats_Fields(t *testing.T) { + t.Parallel() + bytesStats := PayloadBytesStats{InPayloadBytes: 80, OutPayloadBytes: 90} require.Equal(t, logrus.Fields{ "grpc.request.payload_bytes": int64(80), @@ -55,6 +218,8 @@ func TestPayloadBytesStats_Fields(t *testing.T) { } func TestFieldsProducer(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) t.Run("ok", func(t *testing.T) { diff --git a/internal/log/middleware_test.go b/internal/log/middleware_test.go index 8e333997c..bfcc6b3c6 100644 --- a/internal/log/middleware_test.go +++ b/internal/log/middleware_test.go @@ -2,10 +2,6 @@ package log import ( "context" - "io" - "net" - "os" - "sync" "testing" grpcmw "github.com/grpc-ecosystem/go-grpc-middleware" @@ -15,157 +11,11 @@ import ( "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" - "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/grpcstats" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/interop/grpc_testing" "google.golang.org/grpc/stats" - "google.golang.org/protobuf/proto" ) -func TestPayloadBytes(t *testing.T) { - ctx := createContext() - - logger, hook := test.NewNullLogger() - - opts := []grpc.ServerOption{ - grpc.StatsHandler(PerRPCLogHandler{ - Underlying: &grpcstats.PayloadBytes{}, - FieldProducers: []FieldsProducer{grpcstats.FieldsProducer}, - }), - grpc.ChainUnaryInterceptor( - grpcmwlogrus.UnaryServerInterceptor( - logrus.NewEntry(logger), - grpcmwlogrus.WithMessageProducer( - MessageProducer( - PropagationMessageProducer(grpcmwlogrus.DefaultMessageProducer), - grpcstats.FieldsProducer, - ), - ), - ), - UnaryLogDataCatcherServerInterceptor(), - ), - grpc.ChainStreamInterceptor( - grpcmwlogrus.StreamServerInterceptor( - logrus.NewEntry(logger), - grpcmwlogrus.WithMessageProducer( - MessageProducer( - PropagationMessageProducer(grpcmwlogrus.DefaultMessageProducer), - grpcstats.FieldsProducer, - ), - ), - ), - StreamLogDataCatcherServerInterceptor(), - ), - } - - srv := grpc.NewServer(opts...) - grpc_testing.RegisterTestServiceServer(srv, testService{}) - sock, err := os.CreateTemp("", "") - require.NoError(t, err) - require.NoError(t, sock.Close()) - require.NoError(t, os.RemoveAll(sock.Name())) - t.Cleanup(func() { require.NoError(t, os.RemoveAll(sock.Name())) }) - - lis, err := net.Listen("unix", sock.Name()) - require.NoError(t, err) - - t.Cleanup(srv.GracefulStop) - go func() { assert.NoError(t, srv.Serve(lis)) }() - - cc, err := client.Dial(ctx, "unix://"+sock.Name()) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, cc.Close()) }) - - testClient := grpc_testing.NewTestServiceClient(cc) - const invocations = 2 - var wg sync.WaitGroup - for i := 0; i < invocations; i++ { - wg.Add(1) - go func() { - defer wg.Done() - - resp, err := testClient.UnaryCall(ctx, &grpc_testing.SimpleRequest{Payload: newStubPayload()}) - if !assert.NoError(t, err) { - return - } - require.Equal(t, newStubPayload(), resp.Payload) - - call, err := testClient.HalfDuplexCall(ctx) - if !assert.NoError(t, err) { - return - } - - done := make(chan struct{}) - go func() { - defer close(done) - for { - _, err := call.Recv() - if err == io.EOF { - return - } - assert.NoError(t, err) - } - }() - assert.NoError(t, call.Send(&grpc_testing.StreamingOutputCallRequest{Payload: newStubPayload()})) - assert.NoError(t, call.Send(&grpc_testing.StreamingOutputCallRequest{Payload: newStubPayload()})) - assert.NoError(t, call.CloseSend()) - <-done - }() - } - wg.Wait() - - srv.GracefulStop() - - entries := hook.AllEntries() - require.Len(t, entries, 4) - var unary, stream int - for _, e := range entries { - if e.Message == "finished unary call with code OK" { - unary++ - require.EqualValues(t, 8, e.Data["grpc.request.payload_bytes"]) - require.EqualValues(t, 8, e.Data["grpc.response.payload_bytes"]) - } - if e.Message == "finished streaming call with code OK" { - stream++ - require.EqualValues(t, 16, e.Data["grpc.request.payload_bytes"]) - require.EqualValues(t, 16, e.Data["grpc.response.payload_bytes"]) - } - } - require.Equal(t, invocations, unary) - require.Equal(t, invocations, stream) -} - -func newStubPayload() *grpc_testing.Payload { - return &grpc_testing.Payload{Body: []byte("stub")} -} - -type testService struct { - grpc_testing.UnimplementedTestServiceServer -} - -func (ts testService) UnaryCall(context.Context, *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) { - return &grpc_testing.SimpleResponse{Payload: newStubPayload()}, nil -} - -func (ts testService) HalfDuplexCall(stream grpc_testing.TestService_HalfDuplexCallServer) error { - for { - if _, err := stream.Recv(); err != nil { - if err == io.EOF { - break - } - return err - } - } - - resp := &grpc_testing.StreamingOutputCallResponse{Payload: newStubPayload()} - if err := stream.Send(proto.Clone(resp).(*grpc_testing.StreamingOutputCallResponse)); err != nil { - return err - } - return stream.Send(proto.Clone(resp).(*grpc_testing.StreamingOutputCallResponse)) -} - func TestMessageProducer(t *testing.T) { triggered := false |