diff options
-rw-r--r-- | internal/gitaly/server/server.go | 32 |
1 files changed, 32 insertions, 0 deletions
diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go index f7d6d4a93..8c905f608 100644 --- a/internal/gitaly/server/server.go +++ b/internal/gitaly/server/server.go @@ -3,13 +3,17 @@ package server import ( "context" "crypto/tls" + "errors" "fmt" "time" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" log "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/backchannel" diskcache "gitlab.com/gitlab-org/gitaly/internal/cache" @@ -33,6 +37,26 @@ import ( "google.golang.org/grpc/keepalive" ) +var requestConnectionTypeTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "gitaly_request_connection_type_total", + Help: "Number of requests received through each connection type", + }, + []string{"type"}, +) + +func observeConnectionType(ctx context.Context) { + connectionType := "muxed" + if _, err := backchannel.GetPeerID(ctx); errors.Is(err, backchannel.ErrNonMultiplexedConnection) { + connectionType = "normal" + } else if err != nil { + ctxlogrus.Extract(ctx).WithError(err).Error("failed observing connection type") + return + } + + requestConnectionTypeTotal.WithLabelValues(connectionType).Inc() +} + func concurrencyKeyFn(ctx context.Context) string { tags := grpc_ctxtags.Extract(ctx) ctxValue := tags.Values()["grpc.request.repoPath"] @@ -106,6 +130,10 @@ func New(secure bool, cfg config.Cfg, logrusEntry *log.Entry) (*grpc.Server, err lh.StreamInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued grpctracing.StreamServerTracingInterceptor(), cache.StreamInvalidator(diskcache.NewLeaseKeyer(storageLocator), protoregistry.GitalyProtoPreregistered), + grpc.StreamServerInterceptor(func(server interface{}, serverStream grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + observeConnectionType(serverStream.Context()) + return handler(server, serverStream) + }), // Panic handler should remain last so that application panics will be // converted to errors and logged panichandler.StreamPanicHandler, @@ -125,6 +153,10 @@ func New(secure bool, cfg config.Cfg, logrusEntry *log.Entry) (*grpc.Server, err lh.UnaryInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued grpctracing.UnaryServerTracingInterceptor(), cache.UnaryInvalidator(diskcache.NewLeaseKeyer(storageLocator), protoregistry.GitalyProtoPreregistered), + grpc.UnaryServerInterceptor(func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + observeConnectionType(ctx) + return handler(ctx, req) + }), // Panic handler should remain last so that application panics will be // converted to errors and logged panichandler.UnaryPanicHandler, |