Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2023-09-15 13:25:55 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2023-09-15 13:25:55 +0300
commit7f83c086aafe17cb36fd2f544e4cdb3564694280 (patch)
treec187f710d79a5203794bbf8e877e90e89249ce14
parent16cd7e7fa2bb9afdd7061ae6187f739dc2fe120f (diff)
parent52a14788903be8b13c1e7acd87b52ff986a3c4f8 (diff)
Merge branch 'pks-log-move-payload-bytes-test' into 'master'
log: Move PayloadBytes tests into grpcstats package See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6370 Merged-by: Patrick Steinhardt <psteinhardt@gitlab.com> Approved-by: karthik nayak <knayak@gitlab.com>
-rw-r--r--internal/grpc/grpcstats/stats_test.go165
-rw-r--r--internal/log/middleware_test.go150
2 files changed, 165 insertions, 150 deletions
diff --git a/internal/grpc/grpcstats/stats_test.go b/internal/grpc/grpcstats/stats_test.go
index 84d13930c..9b334bfb8 100644
--- a/internal/grpc/grpcstats/stats_test.go
+++ b/internal/grpc/grpcstats/stats_test.go
@@ -1,15 +1,173 @@
package grpcstats
import (
+ "context"
+ "io"
+ "net"
+ "os"
+ "sync"
"testing"
+ grpcmwlogrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
"github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/stats"
+ "google.golang.org/protobuf/proto"
)
+func newStubPayload() *grpc_testing.Payload {
+ return &grpc_testing.Payload{Body: []byte("stub")}
+}
+
+type testService struct {
+ grpc_testing.UnimplementedTestServiceServer
+}
+
+func (ts testService) UnaryCall(context.Context, *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) {
+ return &grpc_testing.SimpleResponse{Payload: newStubPayload()}, nil
+}
+
+func (ts testService) HalfDuplexCall(stream grpc_testing.TestService_HalfDuplexCallServer) error {
+ for {
+ if _, err := stream.Recv(); err != nil {
+ if err == io.EOF {
+ break
+ }
+ return err
+ }
+ }
+
+ resp := &grpc_testing.StreamingOutputCallResponse{Payload: newStubPayload()}
+ if err := stream.Send(proto.Clone(resp).(*grpc_testing.StreamingOutputCallResponse)); err != nil {
+ return err
+ }
+ return stream.Send(proto.Clone(resp).(*grpc_testing.StreamingOutputCallResponse))
+}
+
+func TestPayloadBytes(t *testing.T) {
+ t.Parallel()
+
+ ctx := testhelper.Context(t)
+ logger, hook := test.NewNullLogger()
+
+ opts := []grpc.ServerOption{
+ grpc.StatsHandler(log.PerRPCLogHandler{
+ Underlying: &PayloadBytes{},
+ FieldProducers: []log.FieldsProducer{FieldsProducer},
+ }),
+ grpc.ChainUnaryInterceptor(
+ grpcmwlogrus.UnaryServerInterceptor(
+ logrus.NewEntry(logger),
+ grpcmwlogrus.WithMessageProducer(
+ log.MessageProducer(
+ log.PropagationMessageProducer(grpcmwlogrus.DefaultMessageProducer),
+ FieldsProducer,
+ ),
+ ),
+ ),
+ log.UnaryLogDataCatcherServerInterceptor(),
+ ),
+ grpc.ChainStreamInterceptor(
+ grpcmwlogrus.StreamServerInterceptor(
+ logrus.NewEntry(logger),
+ grpcmwlogrus.WithMessageProducer(
+ log.MessageProducer(
+ log.PropagationMessageProducer(grpcmwlogrus.DefaultMessageProducer),
+ FieldsProducer,
+ ),
+ ),
+ ),
+ log.StreamLogDataCatcherServerInterceptor(),
+ ),
+ }
+
+ srv := grpc.NewServer(opts...)
+ grpc_testing.RegisterTestServiceServer(srv, testService{})
+ sock, err := os.CreateTemp("", "")
+ require.NoError(t, err)
+ require.NoError(t, sock.Close())
+ require.NoError(t, os.RemoveAll(sock.Name()))
+ t.Cleanup(func() { require.NoError(t, os.RemoveAll(sock.Name())) })
+
+ lis, err := net.Listen("unix", sock.Name())
+ require.NoError(t, err)
+
+ t.Cleanup(srv.GracefulStop)
+ go func() { assert.NoError(t, srv.Serve(lis)) }()
+
+ cc, err := client.Dial(ctx, "unix://"+sock.Name())
+ require.NoError(t, err)
+ t.Cleanup(func() { require.NoError(t, cc.Close()) })
+
+ testClient := grpc_testing.NewTestServiceClient(cc)
+ const invocations = 2
+ var wg sync.WaitGroup
+ for i := 0; i < invocations; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+
+ resp, err := testClient.UnaryCall(ctx, &grpc_testing.SimpleRequest{Payload: newStubPayload()})
+ if !assert.NoError(t, err) {
+ return
+ }
+ require.Equal(t, newStubPayload(), resp.Payload)
+
+ call, err := testClient.HalfDuplexCall(ctx)
+ if !assert.NoError(t, err) {
+ return
+ }
+
+ done := make(chan struct{})
+ go func() {
+ defer close(done)
+ for {
+ _, err := call.Recv()
+ if err == io.EOF {
+ return
+ }
+ assert.NoError(t, err)
+ }
+ }()
+ assert.NoError(t, call.Send(&grpc_testing.StreamingOutputCallRequest{Payload: newStubPayload()}))
+ assert.NoError(t, call.Send(&grpc_testing.StreamingOutputCallRequest{Payload: newStubPayload()}))
+ assert.NoError(t, call.CloseSend())
+ <-done
+ }()
+ }
+ wg.Wait()
+
+ srv.GracefulStop()
+
+ entries := hook.AllEntries()
+ require.Len(t, entries, 4)
+ var unary, stream int
+ for _, e := range entries {
+ if e.Message == "finished unary call with code OK" {
+ unary++
+ require.EqualValues(t, 8, e.Data["grpc.request.payload_bytes"])
+ require.EqualValues(t, 8, e.Data["grpc.response.payload_bytes"])
+ }
+ if e.Message == "finished streaming call with code OK" {
+ stream++
+ require.EqualValues(t, 16, e.Data["grpc.request.payload_bytes"])
+ require.EqualValues(t, 16, e.Data["grpc.response.payload_bytes"])
+ }
+ }
+ require.Equal(t, invocations, unary)
+ require.Equal(t, invocations, stream)
+}
+
func TestPayloadBytes_TagRPC(t *testing.T) {
+ t.Parallel()
+
ctx := testhelper.Context(t)
ctx = (&PayloadBytes{}).TagRPC(ctx, nil)
require.Equal(t,
@@ -19,9 +177,12 @@ func TestPayloadBytes_TagRPC(t *testing.T) {
}
func TestPayloadBytes_HandleRPC(t *testing.T) {
+ t.Parallel()
+
ctx := testhelper.Context(t)
handler := &PayloadBytes{}
ctx = handler.TagRPC(ctx, nil)
+
handler.HandleRPC(ctx, nil) // sanity check we don't fail anything
handler.HandleRPC(ctx, &stats.Begin{}) // sanity check we don't fail anything
handler.HandleRPC(ctx, &stats.InPayload{Length: 42})
@@ -47,6 +208,8 @@ func TestPayloadBytes_HandleRPC(t *testing.T) {
}
func TestPayloadBytesStats_Fields(t *testing.T) {
+ t.Parallel()
+
bytesStats := PayloadBytesStats{InPayloadBytes: 80, OutPayloadBytes: 90}
require.Equal(t, logrus.Fields{
"grpc.request.payload_bytes": int64(80),
@@ -55,6 +218,8 @@ func TestPayloadBytesStats_Fields(t *testing.T) {
}
func TestFieldsProducer(t *testing.T) {
+ t.Parallel()
+
ctx := testhelper.Context(t)
t.Run("ok", func(t *testing.T) {
diff --git a/internal/log/middleware_test.go b/internal/log/middleware_test.go
index 8e333997c..bfcc6b3c6 100644
--- a/internal/log/middleware_test.go
+++ b/internal/log/middleware_test.go
@@ -2,10 +2,6 @@ package log
import (
"context"
- "io"
- "net"
- "os"
- "sync"
"testing"
grpcmw "github.com/grpc-ecosystem/go-grpc-middleware"
@@ -15,157 +11,11 @@ import (
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
- "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/grpcstats"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
- "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/stats"
- "google.golang.org/protobuf/proto"
)
-func TestPayloadBytes(t *testing.T) {
- ctx := createContext()
-
- logger, hook := test.NewNullLogger()
-
- opts := []grpc.ServerOption{
- grpc.StatsHandler(PerRPCLogHandler{
- Underlying: &grpcstats.PayloadBytes{},
- FieldProducers: []FieldsProducer{grpcstats.FieldsProducer},
- }),
- grpc.ChainUnaryInterceptor(
- grpcmwlogrus.UnaryServerInterceptor(
- logrus.NewEntry(logger),
- grpcmwlogrus.WithMessageProducer(
- MessageProducer(
- PropagationMessageProducer(grpcmwlogrus.DefaultMessageProducer),
- grpcstats.FieldsProducer,
- ),
- ),
- ),
- UnaryLogDataCatcherServerInterceptor(),
- ),
- grpc.ChainStreamInterceptor(
- grpcmwlogrus.StreamServerInterceptor(
- logrus.NewEntry(logger),
- grpcmwlogrus.WithMessageProducer(
- MessageProducer(
- PropagationMessageProducer(grpcmwlogrus.DefaultMessageProducer),
- grpcstats.FieldsProducer,
- ),
- ),
- ),
- StreamLogDataCatcherServerInterceptor(),
- ),
- }
-
- srv := grpc.NewServer(opts...)
- grpc_testing.RegisterTestServiceServer(srv, testService{})
- sock, err := os.CreateTemp("", "")
- require.NoError(t, err)
- require.NoError(t, sock.Close())
- require.NoError(t, os.RemoveAll(sock.Name()))
- t.Cleanup(func() { require.NoError(t, os.RemoveAll(sock.Name())) })
-
- lis, err := net.Listen("unix", sock.Name())
- require.NoError(t, err)
-
- t.Cleanup(srv.GracefulStop)
- go func() { assert.NoError(t, srv.Serve(lis)) }()
-
- cc, err := client.Dial(ctx, "unix://"+sock.Name())
- require.NoError(t, err)
- t.Cleanup(func() { require.NoError(t, cc.Close()) })
-
- testClient := grpc_testing.NewTestServiceClient(cc)
- const invocations = 2
- var wg sync.WaitGroup
- for i := 0; i < invocations; i++ {
- wg.Add(1)
- go func() {
- defer wg.Done()
-
- resp, err := testClient.UnaryCall(ctx, &grpc_testing.SimpleRequest{Payload: newStubPayload()})
- if !assert.NoError(t, err) {
- return
- }
- require.Equal(t, newStubPayload(), resp.Payload)
-
- call, err := testClient.HalfDuplexCall(ctx)
- if !assert.NoError(t, err) {
- return
- }
-
- done := make(chan struct{})
- go func() {
- defer close(done)
- for {
- _, err := call.Recv()
- if err == io.EOF {
- return
- }
- assert.NoError(t, err)
- }
- }()
- assert.NoError(t, call.Send(&grpc_testing.StreamingOutputCallRequest{Payload: newStubPayload()}))
- assert.NoError(t, call.Send(&grpc_testing.StreamingOutputCallRequest{Payload: newStubPayload()}))
- assert.NoError(t, call.CloseSend())
- <-done
- }()
- }
- wg.Wait()
-
- srv.GracefulStop()
-
- entries := hook.AllEntries()
- require.Len(t, entries, 4)
- var unary, stream int
- for _, e := range entries {
- if e.Message == "finished unary call with code OK" {
- unary++
- require.EqualValues(t, 8, e.Data["grpc.request.payload_bytes"])
- require.EqualValues(t, 8, e.Data["grpc.response.payload_bytes"])
- }
- if e.Message == "finished streaming call with code OK" {
- stream++
- require.EqualValues(t, 16, e.Data["grpc.request.payload_bytes"])
- require.EqualValues(t, 16, e.Data["grpc.response.payload_bytes"])
- }
- }
- require.Equal(t, invocations, unary)
- require.Equal(t, invocations, stream)
-}
-
-func newStubPayload() *grpc_testing.Payload {
- return &grpc_testing.Payload{Body: []byte("stub")}
-}
-
-type testService struct {
- grpc_testing.UnimplementedTestServiceServer
-}
-
-func (ts testService) UnaryCall(context.Context, *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) {
- return &grpc_testing.SimpleResponse{Payload: newStubPayload()}, nil
-}
-
-func (ts testService) HalfDuplexCall(stream grpc_testing.TestService_HalfDuplexCallServer) error {
- for {
- if _, err := stream.Recv(); err != nil {
- if err == io.EOF {
- break
- }
- return err
- }
- }
-
- resp := &grpc_testing.StreamingOutputCallResponse{Payload: newStubPayload()}
- if err := stream.Send(proto.Clone(resp).(*grpc_testing.StreamingOutputCallResponse)); err != nil {
- return err
- }
- return stream.Send(proto.Clone(resp).(*grpc_testing.StreamingOutputCallResponse))
-}
-
func TestMessageProducer(t *testing.T) {
triggered := false