diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2021-11-10 16:28:35 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2021-11-11 00:02:35 +0300 |
commit | ad63076210a0d46cc6933398392cf80e9d007b61 (patch) | |
tree | abdd02790a67a345f97808538bd9dd8dab2b6b96 | |
parent | b0053d5886d379eaa94e36d94ba28926de5cabca (diff) |
grpcstats: Extend log with payload size
This change finalize introduction of the payload
size into the logging. The PerRPCLogHandler now
responsible for writing log messages produced by
the logrus interceptor with the additional data
gathered by the PayloadBytes (payload sizes).
The (Unary|Stream)LogDataCatcherServerInterceptor
catches log data produced by the logrus interceptor
and set it on the holder introduced in the previous
commits. So it will be available for the PerRPCLogHandler
once all interceptors are done.
Closes: https://gitlab.com/gitlab-org/gitaly/-/issues/3867
Changelog: added
-rw-r--r-- | internal/gitaly/server/server.go | 21 | ||||
-rw-r--r-- | internal/gitaly/server/server_factory_test.go | 25 | ||||
-rw-r--r-- | internal/grpcstats/testhelper_test.go | 11 | ||||
-rw-r--r-- | internal/log/log.go | 28 | ||||
-rw-r--r-- | internal/log/log_test.go | 223 |
5 files changed, 306 insertions, 2 deletions
diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go index ffdf55c10..9dcf07aa4 100644 --- a/internal/gitaly/server/server.go +++ b/internal/gitaly/server/server.go @@ -16,6 +16,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/client" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/server/auth" + "gitlab.com/gitlab-org/gitaly/v14/internal/grpcstats" "gitlab.com/gitlab-org/gitaly/v14/internal/helper/fieldextractors" "gitlab.com/gitlab-org/gitaly/v14/internal/listenmux" gitalylog "gitlab.com/gitlab-org/gitaly/v14/internal/log" @@ -103,7 +104,19 @@ func New( []grpc.DialOption{client.UnaryInterceptor()}, )) + logMsgProducer := grpcmwlogrus.WithMessageProducer( + gitalylog.MessageProducer( + gitalylog.PropagationMessageProducer(grpcmwlogrus.DefaultMessageProducer), + commandstatshandler.FieldsProducer, + grpcstats.FieldsProducer, + ), + ) + opts := []grpc.ServerOption{ + grpc.StatsHandler(gitalylog.PerRPCLogHandler{ + Underlying: &grpcstats.PayloadBytes{}, + FieldProducers: []gitalylog.FieldsProducer{grpcstats.FieldsProducer}, + }), grpc.Creds(lm), grpc.StreamInterceptor(grpcmw.ChainStreamServer( grpcmwtags.StreamServerInterceptor(ctxTagOpts...), @@ -113,7 +126,9 @@ func New( commandstatshandler.StreamInterceptor, grpcmwlogrus.StreamServerInterceptor(logrusEntry, grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat), - grpcmwlogrus.WithMessageProducer(commandstatshandler.CommandStatsMessageProducer)), + logMsgProducer, + ), + gitalylog.StreamLogDataCatcherServerInterceptor(), sentryhandler.StreamLogHandler, cancelhandler.Stream, // Should be below LogHandler auth.StreamServerInterceptor(cfg.Auth), @@ -132,7 +147,9 @@ func New( commandstatshandler.UnaryInterceptor, grpcmwlogrus.UnaryServerInterceptor(logrusEntry, grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat), - grpcmwlogrus.WithMessageProducer(commandstatshandler.CommandStatsMessageProducer)), + logMsgProducer, + ), + gitalylog.UnaryLogDataCatcherServerInterceptor(), sentryhandler.UnaryLogHandler, cancelhandler.Unary, // Should be below LogHandler auth.UnaryServerInterceptor(cfg.Auth), diff --git a/internal/gitaly/server/server_factory_test.go b/internal/gitaly/server/server_factory_test.go index 369cf3775..a9ad14696 100644 --- a/internal/gitaly/server/server_factory_test.go +++ b/internal/gitaly/server/server_factory_test.go @@ -9,6 +9,8 @@ import ( "os" "testing" + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/client" @@ -125,6 +127,29 @@ func TestGitalyServerFactory(t *testing.T) { _, socketErr := socketHealthClient.Check(ctx, &healthpb.HealthCheckRequest{}) require.Equal(t, codes.Unavailable, status.Code(socketErr)) }) + + t.Run("logging check", func(t *testing.T) { + cfg := testcfg.Build(t) + logger, hook := test.NewNullLogger() + sf := NewGitalyServerFactory(cfg, logger.WithContext(ctx), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) + + checkHealth(t, sf, starter.TCP, "localhost:0") + + var entry *logrus.Entry + for _, e := range hook.AllEntries() { + if e.Message == "finished unary call with code OK" { + entry = e + break + } + } + require.NotNil(t, entry) + reqSize, found := entry.Data["grpc.request.payload_bytes"] + assert.EqualValues(t, 0, reqSize) + require.True(t, found) + respSize, found := entry.Data["grpc.response.payload_bytes"] + assert.EqualValues(t, 2, respSize) + require.True(t, found) + }) } func TestGitalyServerFactory_closeOrder(t *testing.T) { diff --git a/internal/grpcstats/testhelper_test.go b/internal/grpcstats/testhelper_test.go new file mode 100644 index 000000000..b41bdc29e --- /dev/null +++ b/internal/grpcstats/testhelper_test.go @@ -0,0 +1,11 @@ +package grpcstats_test + +import ( + "testing" + + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" +) + +func TestMain(m *testing.M) { + testhelper.Run(m) +} diff --git a/internal/log/log.go b/internal/log/log.go index 3f610662f..5c4e5f6db 100644 --- a/internal/log/log.go +++ b/internal/log/log.go @@ -7,6 +7,7 @@ import ( grpcmwlogrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "github.com/sirupsen/logrus" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/stats" ) @@ -234,7 +235,34 @@ func (lh PerRPCLogHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { // TagRPC propagates a special data holder into the context that is responsible to // hold logging information produced by the logging interceptor. +// The logging data should be caught by the UnaryLogDataCatcherServerInterceptor. It needs to +// be included into the interceptor chain below logging interceptor. func (lh PerRPCLogHandler) TagRPC(ctx context.Context, rti *stats.RPCTagInfo) context.Context { ctx = context.WithValue(ctx, messageProducerHolderKey{}, new(messageProducerHolder)) return lh.Underlying.TagRPC(ctx, rti) } + +// UnaryLogDataCatcherServerInterceptor catches logging data produced by the upper interceptors and +// propagates it into the holder to pop up it to the HandleRPC method of the PerRPCLogHandler. +func UnaryLogDataCatcherServerInterceptor() grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + mpp := messageProducerPropagationFrom(ctx) + if mpp != nil { + mpp.fields = ctxlogrus.Extract(ctx).Data + } + return handler(ctx, req) + } +} + +// StreamLogDataCatcherServerInterceptor catches logging data produced by the upper interceptors and +// propagates it into the holder to pop up it to the HandleRPC method of the PerRPCLogHandler. +func StreamLogDataCatcherServerInterceptor() grpc.StreamServerInterceptor { + return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + ctx := ss.Context() + mpp := messageProducerPropagationFrom(ctx) + if mpp != nil { + mpp.fields = ctxlogrus.Extract(ctx).Data + } + return handler(srv, ss) + } +} diff --git a/internal/log/log_test.go b/internal/log/log_test.go index ab08e6f9a..fcc3d0573 100644 --- a/internal/log/log_test.go +++ b/internal/log/log_test.go @@ -3,17 +3,174 @@ package log import ( "bytes" "context" + "io" + "net" + "os" + "sync" "testing" "time" + grpcmw "github.com/grpc-ecosystem/go-grpc-middleware" + grpcmwlogrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/client" + "gitlab.com/gitlab-org/gitaly/v14/internal/grpcstats" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testassert" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/stats" + "google.golang.org/grpc/test/grpc_testing" + "google.golang.org/protobuf/proto" ) +func TestPayloadBytes(t *testing.T) { + ctx := context.Background() + + logger, hook := test.NewNullLogger() + + opts := []grpc.ServerOption{ + grpc.StatsHandler(PerRPCLogHandler{ + Underlying: &grpcstats.PayloadBytes{}, + FieldProducers: []FieldsProducer{grpcstats.FieldsProducer}, + }), + grpc.UnaryInterceptor( + grpcmw.ChainUnaryServer( + grpcmwlogrus.UnaryServerInterceptor( + logrus.NewEntry(logger), + grpcmwlogrus.WithMessageProducer( + MessageProducer( + PropagationMessageProducer(grpcmwlogrus.DefaultMessageProducer), + grpcstats.FieldsProducer, + ), + ), + ), + UnaryLogDataCatcherServerInterceptor(), + ), + ), + grpc.StreamInterceptor( + grpcmw.ChainStreamServer( + grpcmwlogrus.StreamServerInterceptor( + logrus.NewEntry(logger), + grpcmwlogrus.WithMessageProducer( + MessageProducer( + PropagationMessageProducer(grpcmwlogrus.DefaultMessageProducer), + grpcstats.FieldsProducer, + ), + ), + ), + StreamLogDataCatcherServerInterceptor(), + ), + ), + } + + srv := grpc.NewServer(opts...) + grpc_testing.RegisterTestServiceServer(srv, testService{}) + sock, err := os.CreateTemp("", "") + require.NoError(t, err) + require.NoError(t, sock.Close()) + require.NoError(t, os.RemoveAll(sock.Name())) + t.Cleanup(func() { require.NoError(t, os.RemoveAll(sock.Name())) }) + + lis, err := net.Listen("unix", sock.Name()) + require.NoError(t, err) + + t.Cleanup(srv.GracefulStop) + go func() { assert.NoError(t, srv.Serve(lis)) }() + + cc, err := client.DialContext(ctx, "unix://"+sock.Name(), nil) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, cc.Close()) }) + + testClient := grpc_testing.NewTestServiceClient(cc) + const invocations = 2 + var wg sync.WaitGroup + for i := 0; i < invocations; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + resp, err := testClient.UnaryCall(ctx, &grpc_testing.SimpleRequest{Payload: newStubPayload()}) + if !assert.NoError(t, err) { + return + } + testassert.ProtoEqual(t, newStubPayload(), resp.Payload) + + call, err := testClient.HalfDuplexCall(ctx) + if !assert.NoError(t, err) { + return + } + + done := make(chan struct{}) + go func() { + defer close(done) + for { + _, err := call.Recv() + if err == io.EOF { + return + } + assert.NoError(t, err) + } + }() + assert.NoError(t, call.Send(&grpc_testing.StreamingOutputCallRequest{Payload: newStubPayload()})) + assert.NoError(t, call.Send(&grpc_testing.StreamingOutputCallRequest{Payload: newStubPayload()})) + assert.NoError(t, call.CloseSend()) + <-done + }() + } + wg.Wait() + + entries := hook.AllEntries() + require.Len(t, entries, 4) + var unary, stream int + for _, e := range entries { + if e.Message == "finished unary call with code OK" { + unary++ + require.EqualValues(t, 8, e.Data["grpc.request.payload_bytes"]) + require.EqualValues(t, 8, e.Data["grpc.response.payload_bytes"]) + } + if e.Message == "finished streaming call with code OK" { + stream++ + require.EqualValues(t, 16, e.Data["grpc.request.payload_bytes"]) + require.EqualValues(t, 16, e.Data["grpc.response.payload_bytes"]) + } + } + require.Equal(t, invocations, unary) + require.Equal(t, invocations, stream) +} + +func newStubPayload() *grpc_testing.Payload { + return &grpc_testing.Payload{Body: []byte("stub")} +} + +type testService struct { + grpc_testing.UnimplementedTestServiceServer +} + +func (ts testService) UnaryCall(context.Context, *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) { + return &grpc_testing.SimpleResponse{Payload: newStubPayload()}, nil +} + +func (ts testService) HalfDuplexCall(stream grpc_testing.TestService_HalfDuplexCallServer) error { + for { + if _, err := stream.Recv(); err != nil { + if err == io.EOF { + break + } + return err + } + } + + resp := &grpc_testing.StreamingOutputCallResponse{Payload: newStubPayload()} + if err := stream.Send(proto.Clone(resp).(*grpc_testing.StreamingOutputCallResponse)); err != nil { + return err + } + return stream.Send(proto.Clone(resp).(*grpc_testing.StreamingOutputCallResponse)) +} + func TestConfigure(t *testing.T) { for _, tc := range []struct { desc string @@ -215,3 +372,69 @@ func (m *mockStatHandler) TagConn(ctx context.Context, s *stats.ConnTagInfo) con func (m *mockStatHandler) HandleConn(ctx context.Context, s stats.ConnStats) { m.Calls["HandleConn"] = append(m.Calls["HandleConn"], s) } + +func TestUnaryLogDataCatcherServerInterceptor(t *testing.T) { + handlerStub := func(context.Context, interface{}) (interface{}, error) { + return nil, nil + } + + t.Run("propagates call", func(t *testing.T) { + interceptor := UnaryLogDataCatcherServerInterceptor() + resp, err := interceptor(context.Background(), nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) { + return 42, assert.AnError + }) + + assert.Equal(t, 42, resp) + assert.Equal(t, assert.AnError, err) + }) + + t.Run("no logger", func(t *testing.T) { + mpp := &messageProducerHolder{} + ctx := context.WithValue(context.Background(), messageProducerHolderKey{}, mpp) + + interceptor := UnaryLogDataCatcherServerInterceptor() + _, _ = interceptor(ctx, nil, nil, handlerStub) + assert.Empty(t, mpp.fields) + }) + + t.Run("caught", func(t *testing.T) { + mpp := &messageProducerHolder{} + ctx := context.WithValue(context.Background(), messageProducerHolderKey{}, mpp) + ctx = ctxlogrus.ToContext(ctx, logrus.New().WithField("a", 1)) + interceptor := UnaryLogDataCatcherServerInterceptor() + _, _ = interceptor(ctx, nil, nil, handlerStub) + assert.Equal(t, logrus.Fields{"a": 1}, mpp.fields) + }) +} + +func TestStreamLogDataCatcherServerInterceptor(t *testing.T) { + t.Run("propagates call", func(t *testing.T) { + interceptor := StreamLogDataCatcherServerInterceptor() + ss := &grpcmw.WrappedServerStream{WrappedContext: context.Background()} + err := interceptor(nil, ss, nil, func(interface{}, grpc.ServerStream) error { + return assert.AnError + }) + + assert.Equal(t, assert.AnError, err) + }) + + t.Run("no logger", func(t *testing.T) { + mpp := &messageProducerHolder{} + ctx := context.WithValue(context.Background(), messageProducerHolderKey{}, mpp) + + interceptor := StreamLogDataCatcherServerInterceptor() + ss := &grpcmw.WrappedServerStream{WrappedContext: ctx} + _ = interceptor(nil, ss, nil, func(interface{}, grpc.ServerStream) error { return nil }) + }) + + t.Run("caught", func(t *testing.T) { + mpp := &messageProducerHolder{} + ctx := context.WithValue(context.Background(), messageProducerHolderKey{}, mpp) + ctx = ctxlogrus.ToContext(ctx, logrus.New().WithField("a", 1)) + + interceptor := StreamLogDataCatcherServerInterceptor() + ss := &grpcmw.WrappedServerStream{WrappedContext: ctx} + _ = interceptor(nil, ss, nil, func(interface{}, grpc.ServerStream) error { return nil }) + assert.Equal(t, logrus.Fields{"a": 1}, mpp.fields) + }) +} |