Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2021-11-10 16:28:35 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2021-11-11 00:02:35 +0300
commitad63076210a0d46cc6933398392cf80e9d007b61 (patch)
treeabdd02790a67a345f97808538bd9dd8dab2b6b96
parentb0053d5886d379eaa94e36d94ba28926de5cabca (diff)
grpcstats: Extend log with payload size
This change finalize introduction of the payload size into the logging. The PerRPCLogHandler now responsible for writing log messages produced by the logrus interceptor with the additional data gathered by the PayloadBytes (payload sizes). The (Unary|Stream)LogDataCatcherServerInterceptor catches log data produced by the logrus interceptor and set it on the holder introduced in the previous commits. So it will be available for the PerRPCLogHandler once all interceptors are done. Closes: https://gitlab.com/gitlab-org/gitaly/-/issues/3867 Changelog: added
-rw-r--r--internal/gitaly/server/server.go21
-rw-r--r--internal/gitaly/server/server_factory_test.go25
-rw-r--r--internal/grpcstats/testhelper_test.go11
-rw-r--r--internal/log/log.go28
-rw-r--r--internal/log/log_test.go223
5 files changed, 306 insertions, 2 deletions
diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go
index ffdf55c10..9dcf07aa4 100644
--- a/internal/gitaly/server/server.go
+++ b/internal/gitaly/server/server.go
@@ -16,6 +16,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/client"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/server/auth"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/grpcstats"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper/fieldextractors"
"gitlab.com/gitlab-org/gitaly/v14/internal/listenmux"
gitalylog "gitlab.com/gitlab-org/gitaly/v14/internal/log"
@@ -103,7 +104,19 @@ func New(
[]grpc.DialOption{client.UnaryInterceptor()},
))
+ logMsgProducer := grpcmwlogrus.WithMessageProducer(
+ gitalylog.MessageProducer(
+ gitalylog.PropagationMessageProducer(grpcmwlogrus.DefaultMessageProducer),
+ commandstatshandler.FieldsProducer,
+ grpcstats.FieldsProducer,
+ ),
+ )
+
opts := []grpc.ServerOption{
+ grpc.StatsHandler(gitalylog.PerRPCLogHandler{
+ Underlying: &grpcstats.PayloadBytes{},
+ FieldProducers: []gitalylog.FieldsProducer{grpcstats.FieldsProducer},
+ }),
grpc.Creds(lm),
grpc.StreamInterceptor(grpcmw.ChainStreamServer(
grpcmwtags.StreamServerInterceptor(ctxTagOpts...),
@@ -113,7 +126,9 @@ func New(
commandstatshandler.StreamInterceptor,
grpcmwlogrus.StreamServerInterceptor(logrusEntry,
grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat),
- grpcmwlogrus.WithMessageProducer(commandstatshandler.CommandStatsMessageProducer)),
+ logMsgProducer,
+ ),
+ gitalylog.StreamLogDataCatcherServerInterceptor(),
sentryhandler.StreamLogHandler,
cancelhandler.Stream, // Should be below LogHandler
auth.StreamServerInterceptor(cfg.Auth),
@@ -132,7 +147,9 @@ func New(
commandstatshandler.UnaryInterceptor,
grpcmwlogrus.UnaryServerInterceptor(logrusEntry,
grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat),
- grpcmwlogrus.WithMessageProducer(commandstatshandler.CommandStatsMessageProducer)),
+ logMsgProducer,
+ ),
+ gitalylog.UnaryLogDataCatcherServerInterceptor(),
sentryhandler.UnaryLogHandler,
cancelhandler.Unary, // Should be below LogHandler
auth.UnaryServerInterceptor(cfg.Auth),
diff --git a/internal/gitaly/server/server_factory_test.go b/internal/gitaly/server/server_factory_test.go
index 369cf3775..a9ad14696 100644
--- a/internal/gitaly/server/server_factory_test.go
+++ b/internal/gitaly/server/server_factory_test.go
@@ -9,6 +9,8 @@ import (
"os"
"testing"
+ "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/client"
@@ -125,6 +127,29 @@ func TestGitalyServerFactory(t *testing.T) {
_, socketErr := socketHealthClient.Check(ctx, &healthpb.HealthCheckRequest{})
require.Equal(t, codes.Unavailable, status.Code(socketErr))
})
+
+ t.Run("logging check", func(t *testing.T) {
+ cfg := testcfg.Build(t)
+ logger, hook := test.NewNullLogger()
+ sf := NewGitalyServerFactory(cfg, logger.WithContext(ctx), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)))
+
+ checkHealth(t, sf, starter.TCP, "localhost:0")
+
+ var entry *logrus.Entry
+ for _, e := range hook.AllEntries() {
+ if e.Message == "finished unary call with code OK" {
+ entry = e
+ break
+ }
+ }
+ require.NotNil(t, entry)
+ reqSize, found := entry.Data["grpc.request.payload_bytes"]
+ assert.EqualValues(t, 0, reqSize)
+ require.True(t, found)
+ respSize, found := entry.Data["grpc.response.payload_bytes"]
+ assert.EqualValues(t, 2, respSize)
+ require.True(t, found)
+ })
}
func TestGitalyServerFactory_closeOrder(t *testing.T) {
diff --git a/internal/grpcstats/testhelper_test.go b/internal/grpcstats/testhelper_test.go
new file mode 100644
index 000000000..b41bdc29e
--- /dev/null
+++ b/internal/grpcstats/testhelper_test.go
@@ -0,0 +1,11 @@
+package grpcstats_test
+
+import (
+ "testing"
+
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+)
+
+func TestMain(m *testing.M) {
+ testhelper.Run(m)
+}
diff --git a/internal/log/log.go b/internal/log/log.go
index 3f610662f..5c4e5f6db 100644
--- a/internal/log/log.go
+++ b/internal/log/log.go
@@ -7,6 +7,7 @@ import (
grpcmwlogrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/sirupsen/logrus"
+ "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/stats"
)
@@ -234,7 +235,34 @@ func (lh PerRPCLogHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
// TagRPC propagates a special data holder into the context that is responsible to
// hold logging information produced by the logging interceptor.
+// The logging data should be caught by the UnaryLogDataCatcherServerInterceptor. It needs to
+// be included into the interceptor chain below logging interceptor.
func (lh PerRPCLogHandler) TagRPC(ctx context.Context, rti *stats.RPCTagInfo) context.Context {
ctx = context.WithValue(ctx, messageProducerHolderKey{}, new(messageProducerHolder))
return lh.Underlying.TagRPC(ctx, rti)
}
+
+// UnaryLogDataCatcherServerInterceptor catches logging data produced by the upper interceptors and
+// propagates it into the holder to pop up it to the HandleRPC method of the PerRPCLogHandler.
+func UnaryLogDataCatcherServerInterceptor() grpc.UnaryServerInterceptor {
+ return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+ mpp := messageProducerPropagationFrom(ctx)
+ if mpp != nil {
+ mpp.fields = ctxlogrus.Extract(ctx).Data
+ }
+ return handler(ctx, req)
+ }
+}
+
+// StreamLogDataCatcherServerInterceptor catches logging data produced by the upper interceptors and
+// propagates it into the holder to pop up it to the HandleRPC method of the PerRPCLogHandler.
+func StreamLogDataCatcherServerInterceptor() grpc.StreamServerInterceptor {
+ return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+ ctx := ss.Context()
+ mpp := messageProducerPropagationFrom(ctx)
+ if mpp != nil {
+ mpp.fields = ctxlogrus.Extract(ctx).Data
+ }
+ return handler(srv, ss)
+ }
+}
diff --git a/internal/log/log_test.go b/internal/log/log_test.go
index ab08e6f9a..fcc3d0573 100644
--- a/internal/log/log_test.go
+++ b/internal/log/log_test.go
@@ -3,17 +3,174 @@ package log
import (
"bytes"
"context"
+ "io"
+ "net"
+ "os"
+ "sync"
"testing"
"time"
+ grpcmw "github.com/grpc-ecosystem/go-grpc-middleware"
+ grpcmwlogrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/client"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/grpcstats"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testassert"
+ "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/stats"
+ "google.golang.org/grpc/test/grpc_testing"
+ "google.golang.org/protobuf/proto"
)
+func TestPayloadBytes(t *testing.T) {
+ ctx := context.Background()
+
+ logger, hook := test.NewNullLogger()
+
+ opts := []grpc.ServerOption{
+ grpc.StatsHandler(PerRPCLogHandler{
+ Underlying: &grpcstats.PayloadBytes{},
+ FieldProducers: []FieldsProducer{grpcstats.FieldsProducer},
+ }),
+ grpc.UnaryInterceptor(
+ grpcmw.ChainUnaryServer(
+ grpcmwlogrus.UnaryServerInterceptor(
+ logrus.NewEntry(logger),
+ grpcmwlogrus.WithMessageProducer(
+ MessageProducer(
+ PropagationMessageProducer(grpcmwlogrus.DefaultMessageProducer),
+ grpcstats.FieldsProducer,
+ ),
+ ),
+ ),
+ UnaryLogDataCatcherServerInterceptor(),
+ ),
+ ),
+ grpc.StreamInterceptor(
+ grpcmw.ChainStreamServer(
+ grpcmwlogrus.StreamServerInterceptor(
+ logrus.NewEntry(logger),
+ grpcmwlogrus.WithMessageProducer(
+ MessageProducer(
+ PropagationMessageProducer(grpcmwlogrus.DefaultMessageProducer),
+ grpcstats.FieldsProducer,
+ ),
+ ),
+ ),
+ StreamLogDataCatcherServerInterceptor(),
+ ),
+ ),
+ }
+
+ srv := grpc.NewServer(opts...)
+ grpc_testing.RegisterTestServiceServer(srv, testService{})
+ sock, err := os.CreateTemp("", "")
+ require.NoError(t, err)
+ require.NoError(t, sock.Close())
+ require.NoError(t, os.RemoveAll(sock.Name()))
+ t.Cleanup(func() { require.NoError(t, os.RemoveAll(sock.Name())) })
+
+ lis, err := net.Listen("unix", sock.Name())
+ require.NoError(t, err)
+
+ t.Cleanup(srv.GracefulStop)
+ go func() { assert.NoError(t, srv.Serve(lis)) }()
+
+ cc, err := client.DialContext(ctx, "unix://"+sock.Name(), nil)
+ require.NoError(t, err)
+ t.Cleanup(func() { require.NoError(t, cc.Close()) })
+
+ testClient := grpc_testing.NewTestServiceClient(cc)
+ const invocations = 2
+ var wg sync.WaitGroup
+ for i := 0; i < invocations; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+
+ resp, err := testClient.UnaryCall(ctx, &grpc_testing.SimpleRequest{Payload: newStubPayload()})
+ if !assert.NoError(t, err) {
+ return
+ }
+ testassert.ProtoEqual(t, newStubPayload(), resp.Payload)
+
+ call, err := testClient.HalfDuplexCall(ctx)
+ if !assert.NoError(t, err) {
+ return
+ }
+
+ done := make(chan struct{})
+ go func() {
+ defer close(done)
+ for {
+ _, err := call.Recv()
+ if err == io.EOF {
+ return
+ }
+ assert.NoError(t, err)
+ }
+ }()
+ assert.NoError(t, call.Send(&grpc_testing.StreamingOutputCallRequest{Payload: newStubPayload()}))
+ assert.NoError(t, call.Send(&grpc_testing.StreamingOutputCallRequest{Payload: newStubPayload()}))
+ assert.NoError(t, call.CloseSend())
+ <-done
+ }()
+ }
+ wg.Wait()
+
+ entries := hook.AllEntries()
+ require.Len(t, entries, 4)
+ var unary, stream int
+ for _, e := range entries {
+ if e.Message == "finished unary call with code OK" {
+ unary++
+ require.EqualValues(t, 8, e.Data["grpc.request.payload_bytes"])
+ require.EqualValues(t, 8, e.Data["grpc.response.payload_bytes"])
+ }
+ if e.Message == "finished streaming call with code OK" {
+ stream++
+ require.EqualValues(t, 16, e.Data["grpc.request.payload_bytes"])
+ require.EqualValues(t, 16, e.Data["grpc.response.payload_bytes"])
+ }
+ }
+ require.Equal(t, invocations, unary)
+ require.Equal(t, invocations, stream)
+}
+
+func newStubPayload() *grpc_testing.Payload {
+ return &grpc_testing.Payload{Body: []byte("stub")}
+}
+
+type testService struct {
+ grpc_testing.UnimplementedTestServiceServer
+}
+
+func (ts testService) UnaryCall(context.Context, *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) {
+ return &grpc_testing.SimpleResponse{Payload: newStubPayload()}, nil
+}
+
+func (ts testService) HalfDuplexCall(stream grpc_testing.TestService_HalfDuplexCallServer) error {
+ for {
+ if _, err := stream.Recv(); err != nil {
+ if err == io.EOF {
+ break
+ }
+ return err
+ }
+ }
+
+ resp := &grpc_testing.StreamingOutputCallResponse{Payload: newStubPayload()}
+ if err := stream.Send(proto.Clone(resp).(*grpc_testing.StreamingOutputCallResponse)); err != nil {
+ return err
+ }
+ return stream.Send(proto.Clone(resp).(*grpc_testing.StreamingOutputCallResponse))
+}
+
func TestConfigure(t *testing.T) {
for _, tc := range []struct {
desc string
@@ -215,3 +372,69 @@ func (m *mockStatHandler) TagConn(ctx context.Context, s *stats.ConnTagInfo) con
func (m *mockStatHandler) HandleConn(ctx context.Context, s stats.ConnStats) {
m.Calls["HandleConn"] = append(m.Calls["HandleConn"], s)
}
+
+func TestUnaryLogDataCatcherServerInterceptor(t *testing.T) {
+ handlerStub := func(context.Context, interface{}) (interface{}, error) {
+ return nil, nil
+ }
+
+ t.Run("propagates call", func(t *testing.T) {
+ interceptor := UnaryLogDataCatcherServerInterceptor()
+ resp, err := interceptor(context.Background(), nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) {
+ return 42, assert.AnError
+ })
+
+ assert.Equal(t, 42, resp)
+ assert.Equal(t, assert.AnError, err)
+ })
+
+ t.Run("no logger", func(t *testing.T) {
+ mpp := &messageProducerHolder{}
+ ctx := context.WithValue(context.Background(), messageProducerHolderKey{}, mpp)
+
+ interceptor := UnaryLogDataCatcherServerInterceptor()
+ _, _ = interceptor(ctx, nil, nil, handlerStub)
+ assert.Empty(t, mpp.fields)
+ })
+
+ t.Run("caught", func(t *testing.T) {
+ mpp := &messageProducerHolder{}
+ ctx := context.WithValue(context.Background(), messageProducerHolderKey{}, mpp)
+ ctx = ctxlogrus.ToContext(ctx, logrus.New().WithField("a", 1))
+ interceptor := UnaryLogDataCatcherServerInterceptor()
+ _, _ = interceptor(ctx, nil, nil, handlerStub)
+ assert.Equal(t, logrus.Fields{"a": 1}, mpp.fields)
+ })
+}
+
+func TestStreamLogDataCatcherServerInterceptor(t *testing.T) {
+ t.Run("propagates call", func(t *testing.T) {
+ interceptor := StreamLogDataCatcherServerInterceptor()
+ ss := &grpcmw.WrappedServerStream{WrappedContext: context.Background()}
+ err := interceptor(nil, ss, nil, func(interface{}, grpc.ServerStream) error {
+ return assert.AnError
+ })
+
+ assert.Equal(t, assert.AnError, err)
+ })
+
+ t.Run("no logger", func(t *testing.T) {
+ mpp := &messageProducerHolder{}
+ ctx := context.WithValue(context.Background(), messageProducerHolderKey{}, mpp)
+
+ interceptor := StreamLogDataCatcherServerInterceptor()
+ ss := &grpcmw.WrappedServerStream{WrappedContext: ctx}
+ _ = interceptor(nil, ss, nil, func(interface{}, grpc.ServerStream) error { return nil })
+ })
+
+ t.Run("caught", func(t *testing.T) {
+ mpp := &messageProducerHolder{}
+ ctx := context.WithValue(context.Background(), messageProducerHolderKey{}, mpp)
+ ctx = ctxlogrus.ToContext(ctx, logrus.New().WithField("a", 1))
+
+ interceptor := StreamLogDataCatcherServerInterceptor()
+ ss := &grpcmw.WrappedServerStream{WrappedContext: ctx}
+ _ = interceptor(nil, ss, nil, func(interface{}, grpc.ServerStream) error { return nil })
+ assert.Equal(t, logrus.Fields{"a": 1}, mpp.fields)
+ })
+}