Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Liu <jliu@gitlab.com>2023-10-05 04:31:44 +0300
committerJames Liu <jliu@gitlab.com>2023-10-05 04:31:44 +0300
commit82988a601f0ae35d7c260bfe01d3304e642429db (patch)
tree52a6c52e9677013c68f629438b8e032085dc5612
parente7819007012a85f41eb6a3ca69df3e82bbd06a02 (diff)
parent6a0e4459a16dc0cd6d0007c08d44172b1ce360b7 (diff)
Merge branch 'pks-grpc-middleware-merge-metadatahandler-fieldextractors' into 'master'
grpc/middleware: Absorb functionality of the field extractor into the metadatahandler See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6440 Merged-by: James Liu <jliu@gitlab.com> Approved-by: James Liu <jliu@gitlab.com> Reviewed-by: karthik nayak <knayak@gitlab.com> Co-authored-by: Patrick Steinhardt <psteinhardt@gitlab.com>
-rw-r--r--internal/cli/praefect/subcmd_track_repository.go3
-rw-r--r--internal/gitaly/server/server.go14
-rw-r--r--internal/grpc/middleware/metadatahandler/metadatahandler.go259
-rw-r--r--internal/grpc/middleware/metadatahandler/metadatahandler_test.go300
-rw-r--r--internal/grpc/middleware/requestinfohandler/requestinfohandler.go334
-rw-r--r--internal/grpc/middleware/requestinfohandler/requestinfohandler_test.go626
-rw-r--r--internal/grpc/middleware/requestinfohandler/testhelper_test.go (renamed from internal/grpc/middleware/metadatahandler/testhelper_test.go)2
-rw-r--r--internal/helper/fieldextractors/fieldextractor.go120
-rw-r--r--internal/praefect/coordinator.go3
-rw-r--r--internal/praefect/coordinator_test.go9
-rw-r--r--internal/praefect/datastore/queue.go4
-rw-r--r--internal/praefect/reconciler/reconciler_test.go3
-rw-r--r--internal/praefect/replicator.go12
-rw-r--r--internal/praefect/replicator_test.go9
-rw-r--r--internal/praefect/server.go14
15 files changed, 987 insertions, 725 deletions
diff --git a/internal/cli/praefect/subcmd_track_repository.go b/internal/cli/praefect/subcmd_track_repository.go
index 207d89542..4b5dd1026 100644
--- a/internal/cli/praefect/subcmd_track_repository.go
+++ b/internal/cli/praefect/subcmd_track_repository.go
@@ -11,7 +11,6 @@ import (
"github.com/urfave/cli/v2"
glcli "gitlab.com/gitlab-org/gitaly/v16/internal/cli"
- "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect"
@@ -244,7 +243,7 @@ func (req *trackRepositoryRequest) execRequest(ctx context.Context,
SourceNodeStorage: primary,
TargetNodeStorage: secondary,
},
- Meta: datastore.Params{metadatahandler.CorrelationIDKey: correlationID},
+ Meta: datastore.Params{datastore.CorrelationIDKey: correlationID},
}
if replicateImmediately {
conn, ok := connections[secondary]
diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go
index 9ffb88c65..d8c1ab9e7 100644
--- a/internal/gitaly/server/server.go
+++ b/internal/gitaly/server/server.go
@@ -6,7 +6,6 @@ import (
"time"
grpcmwlogrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
- grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/server/auth"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel"
@@ -16,12 +15,11 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/cache"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/customfieldshandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/featureflag"
- "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/panichandler"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/requestinfohandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/sentryhandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/statushandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
- "gitlab.com/gitlab-org/gitaly/v16/internal/helper/fieldextractors"
gitalylog "gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc"
@@ -61,10 +59,6 @@ func (s *GitalyServerFactory) New(secure bool, opts ...Option) (*grpc.Server, er
opt(&cfg)
}
- ctxTagOpts := []grpcmwtags.Option{
- grpcmwtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor),
- }
-
transportCredentials := insecure.NewCredentials()
// If tls config is specified attempt to extract tls options and use it
// as a grpc.ServerOption
@@ -106,9 +100,8 @@ func (s *GitalyServerFactory) New(secure bool, opts ...Option) (*grpc.Server, er
)
streamServerInterceptors := []grpc.StreamServerInterceptor{
- grpcmwtags.StreamServerInterceptor(ctxTagOpts...),
grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler
- metadatahandler.StreamInterceptor,
+ requestinfohandler.StreamInterceptor,
grpcprometheus.StreamServerInterceptor,
customfieldshandler.StreamInterceptor,
s.logger.WithField("component", "gitaly.StreamServerInterceptor").StreamServerInterceptor(
@@ -122,9 +115,8 @@ func (s *GitalyServerFactory) New(secure bool, opts ...Option) (*grpc.Server, er
auth.StreamServerInterceptor(s.cfg.Auth),
}
unaryServerInterceptors := []grpc.UnaryServerInterceptor{
- grpcmwtags.UnaryServerInterceptor(ctxTagOpts...),
grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler
- metadatahandler.UnaryInterceptor,
+ requestinfohandler.UnaryInterceptor,
grpcprometheus.UnaryServerInterceptor,
customfieldshandler.UnaryInterceptor,
s.logger.WithField("component", "gitaly.UnaryServerInterceptor").UnaryServerInterceptor(
diff --git a/internal/grpc/middleware/metadatahandler/metadatahandler.go b/internal/grpc/middleware/metadatahandler/metadatahandler.go
deleted file mode 100644
index cbaadc56b..000000000
--- a/internal/grpc/middleware/metadatahandler/metadatahandler.go
+++ /dev/null
@@ -1,259 +0,0 @@
-package metadatahandler
-
-import (
- "context"
- "strings"
-
- grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
- grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
- "github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/promauto"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth"
- "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
- "gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
- "gitlab.com/gitlab-org/labkit/correlation"
- "google.golang.org/grpc"
- "google.golang.org/grpc/metadata"
-)
-
-var requests = promauto.NewCounterVec(
- prometheus.CounterOpts{
- Name: "gitaly_service_client_requests_total",
- Help: "Counter of client requests received by client, call_site, auth version, response code and deadline_type",
- },
- []string{
- "client_name",
- "grpc_service",
- "grpc_method",
- "call_site",
- "auth_version",
- "grpc_code",
- "deadline_type",
- "method_operation",
- "method_scope",
- },
-)
-
-type metadataTags struct {
- clientName string
- callSite string
- authVersion string
- deadlineType string
- methodOperation string
- methodScope string
-}
-
-// CallSiteKey is the key used in ctx_tags to store the client feature
-const CallSiteKey = "grpc.meta.call_site"
-
-// ClientNameKey is the key used in ctx_tags to store the client name
-const ClientNameKey = "grpc.meta.client_name"
-
-// AuthVersionKey is the key used in ctx_tags to store the auth version
-const AuthVersionKey = "grpc.meta.auth_version"
-
-// DeadlineTypeKey is the key used in ctx_tags to store the deadline type
-const DeadlineTypeKey = "grpc.meta.deadline_type"
-
-// MethodTypeKey is one of "unary", "client_stream", "server_stream", "bidi_stream"
-const MethodTypeKey = "grpc.meta.method_type"
-
-// MethodOperationKey is one of "mutator", "accessor" or "maintenance" and corresponds to the `MethodOptions`
-// extension.
-const MethodOperationKey = "grpc.meta.method_operation"
-
-// MethodScopeKey is one of "repository" or "storage" and corresponds to the `MethodOptions` extension.
-const MethodScopeKey = "grpc.meta.method_scope"
-
-// RemoteIPKey is the key used in ctx_tags to store the remote_ip
-const RemoteIPKey = "remote_ip"
-
-// UserIDKey is the key used in ctx_tags to store the user_id
-const UserIDKey = "user_id"
-
-// UsernameKey is the key used in ctx_tags to store the username
-const UsernameKey = "username"
-
-// CorrelationIDKey is the key used in ctx_tags to store the correlation ID
-const CorrelationIDKey = "correlation_id"
-
-// Unknown client and feature. Matches the prometheus grpc unknown value
-const unknownValue = "unknown"
-
-func getFromMD(md metadata.MD, header string) string {
- values := md[header]
- if len(values) != 1 {
- return ""
- }
-
- return values[0]
-}
-
-// addMetadataTags extracts metadata from the connection headers and add it to the
-// ctx_tags, if it is set. Returns values appropriate for use with prometheus labels,
-// using `unknown` if a value is not set
-func addMetadataTags(ctx context.Context, fullMethod, grpcMethodType string) metadataTags {
- metaTags := metadataTags{
- clientName: unknownValue,
- callSite: unknownValue,
- authVersion: unknownValue,
- deadlineType: unknownValue,
- methodOperation: unknownValue,
- methodScope: unknownValue,
- }
-
- md, ok := metadata.FromIncomingContext(ctx)
- if !ok {
- return metaTags
- }
-
- tags := grpcmwtags.Extract(ctx)
-
- if methodInfo, err := protoregistry.GitalyProtoPreregistered.LookupMethod(fullMethod); err == nil {
- var operation string
- switch methodInfo.Operation {
- case protoregistry.OpAccessor:
- operation = "accessor"
- case protoregistry.OpMutator:
- operation = "mutator"
- case protoregistry.OpMaintenance:
- operation = "maintenance"
- default:
- operation = unknownValue
- }
-
- metaTags.methodOperation = operation
- tags.Set(MethodOperationKey, operation)
-
- var scope string
- switch methodInfo.Scope {
- case protoregistry.ScopeRepository:
- scope = "repository"
- case protoregistry.ScopeStorage:
- scope = "storage"
- default:
- scope = unknownValue
- }
-
- metaTags.methodScope = scope
- tags.Set(MethodScopeKey, scope)
- }
-
- metadata := getFromMD(md, "call_site")
- if metadata != "" {
- metaTags.callSite = metadata
- tags.Set(CallSiteKey, metadata)
- }
-
- metadata = getFromMD(md, "deadline_type")
- _, deadlineSet := ctx.Deadline()
- if !deadlineSet {
- metaTags.deadlineType = "none"
- } else if metadata != "" {
- metaTags.deadlineType = metadata
- }
-
- clientName := correlation.ExtractClientNameFromContext(ctx)
- if clientName != "" {
- metaTags.clientName = clientName
- tags.Set(ClientNameKey, clientName)
- } else {
- metadata = getFromMD(md, "client_name")
- if metadata != "" {
- metaTags.clientName = metadata
- tags.Set(ClientNameKey, metadata)
- }
- }
-
- // Set the deadline and method types in the logs
- tags.Set(DeadlineTypeKey, metaTags.deadlineType)
- tags.Set(MethodTypeKey, grpcMethodType)
-
- authInfo, _ := gitalyauth.ExtractAuthInfo(ctx)
- if authInfo != nil {
- metaTags.authVersion = authInfo.Version
- tags.Set(AuthVersionKey, authInfo.Version)
- }
-
- metadata = getFromMD(md, "remote_ip")
- if metadata != "" {
- tags.Set(RemoteIPKey, metadata)
- }
-
- metadata = getFromMD(md, "user_id")
- if metadata != "" {
- tags.Set(UserIDKey, metadata)
- }
-
- metadata = getFromMD(md, "username")
- if metadata != "" {
- tags.Set(UsernameKey, metadata)
- }
-
- // This is a stop-gap approach to logging correlation_ids
- correlationID := correlation.ExtractFromContext(ctx)
- if correlationID != "" {
- tags.Set(CorrelationIDKey, correlationID)
- }
-
- return metaTags
-}
-
-func extractServiceAndMethodName(fullMethodName string) (string, string) {
- fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash
- service, method, ok := strings.Cut(fullMethodName, "/")
- if !ok {
- return unknownValue, unknownValue
- }
- return service, method
-}
-
-func streamRPCType(info *grpc.StreamServerInfo) string {
- if info.IsClientStream && !info.IsServerStream {
- return "client_stream"
- } else if !info.IsClientStream && info.IsServerStream {
- return "server_stream"
- }
- return "bidi_stream"
-}
-
-func reportWithPrometheusLabels(metaTags metadataTags, fullMethod string, err error) {
- grpcCode := structerr.GRPCCode(err)
- serviceName, methodName := extractServiceAndMethodName(fullMethod)
-
- requests.WithLabelValues(
- metaTags.clientName, // client_name
- serviceName, // grpc_service
- methodName, // grpc_method
- metaTags.callSite, // call_site
- metaTags.authVersion, // auth_version
- grpcCode.String(), // grpc_code
- metaTags.deadlineType, // deadline_type
- metaTags.methodOperation,
- metaTags.methodScope,
- ).Inc()
- grpcprometheus.WithConstLabels(prometheus.Labels{"deadline_type": metaTags.deadlineType})
-}
-
-// UnaryInterceptor returns a Unary Interceptor
-func UnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
- metaTags := addMetadataTags(ctx, info.FullMethod, "unary")
-
- res, err := handler(ctx, req)
-
- reportWithPrometheusLabels(metaTags, info.FullMethod, err)
-
- return res, err
-}
-
-// StreamInterceptor returns a Stream Interceptor
-func StreamInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
- ctx := stream.Context()
- metaTags := addMetadataTags(ctx, info.FullMethod, streamRPCType(info))
-
- err := handler(srv, stream)
-
- reportWithPrometheusLabels(metaTags, info.FullMethod, err)
-
- return err
-}
diff --git a/internal/grpc/middleware/metadatahandler/metadatahandler_test.go b/internal/grpc/middleware/metadatahandler/metadatahandler_test.go
deleted file mode 100644
index 460dd2f68..000000000
--- a/internal/grpc/middleware/metadatahandler/metadatahandler_test.go
+++ /dev/null
@@ -1,300 +0,0 @@
-package metadatahandler
-
-import (
- "context"
- "testing"
- "time"
-
- grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
- "github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
- "gitlab.com/gitlab-org/labkit/correlation"
- "google.golang.org/grpc/metadata"
-)
-
-const (
- correlationID = "CORRELATION_ID"
- clientName = "CLIENT_NAME"
-)
-
-func TestAddMetadataTags(t *testing.T) {
- t.Parallel()
-
- baseContext := testhelper.Context(t)
-
- for _, tc := range []struct {
- desc string
- fullMethod string
- metadata metadata.MD
- deadline bool
- expectedMetatags metadataTags
- }{
- {
- desc: "empty metadata",
- metadata: metadata.Pairs(),
- deadline: false,
- expectedMetatags: metadataTags{
- clientName: unknownValue,
- callSite: unknownValue,
- authVersion: unknownValue,
- deadlineType: "none",
- methodOperation: unknownValue,
- methodScope: unknownValue,
- },
- },
- {
- desc: "context containing metadata",
- metadata: metadata.Pairs("call_site", "testsite"),
- deadline: false,
- expectedMetatags: metadataTags{
- clientName: unknownValue,
- callSite: "testsite",
- authVersion: unknownValue,
- deadlineType: "none",
- methodOperation: unknownValue,
- methodScope: unknownValue,
- },
- },
- {
- desc: "context containing metadata and a deadline",
- metadata: metadata.Pairs("call_site", "testsite"),
- deadline: true,
- expectedMetatags: metadataTags{
- clientName: unknownValue,
- callSite: "testsite",
- authVersion: unknownValue,
- deadlineType: unknownValue,
- methodOperation: unknownValue,
- methodScope: unknownValue,
- },
- },
- {
- desc: "context containing metadata and a deadline type",
- metadata: metadata.Pairs("deadline_type", "regular"),
- deadline: true,
- expectedMetatags: metadataTags{
- clientName: unknownValue,
- callSite: unknownValue,
- authVersion: unknownValue,
- deadlineType: "regular",
- methodOperation: unknownValue,
- methodScope: unknownValue,
- },
- },
- {
- desc: "a context without deadline but with deadline type",
- metadata: metadata.Pairs("deadline_type", "regular"),
- deadline: false,
- expectedMetatags: metadataTags{
- clientName: unknownValue,
- callSite: unknownValue,
- authVersion: unknownValue,
- deadlineType: "none",
- methodOperation: unknownValue,
- methodScope: unknownValue,
- },
- },
- {
- desc: "with a context containing metadata",
- metadata: metadata.Pairs("deadline_type", "regular", "client_name", "rails"),
- deadline: true,
- expectedMetatags: metadataTags{
- clientName: "rails",
- callSite: unknownValue,
- authVersion: unknownValue,
- deadlineType: "regular",
- methodOperation: unknownValue,
- methodScope: unknownValue,
- },
- },
- {
- desc: "with unknown method",
- fullMethod: "/gitaly.RepositoryService/UnknownMethod",
- metadata: metadata.Pairs(),
- deadline: false,
- expectedMetatags: metadataTags{
- clientName: unknownValue,
- callSite: unknownValue,
- authVersion: unknownValue,
- deadlineType: "none",
- methodOperation: unknownValue,
- methodScope: unknownValue,
- },
- },
- {
- desc: "with repository-scoped accessor",
- fullMethod: "/gitaly.RepositoryService/ObjectFormat",
- metadata: metadata.Pairs(),
- deadline: false,
- expectedMetatags: metadataTags{
- clientName: unknownValue,
- callSite: unknownValue,
- authVersion: unknownValue,
- deadlineType: "none",
- methodOperation: "accessor",
- methodScope: "repository",
- },
- },
- {
- desc: "with repository-scoped mutator",
- fullMethod: "/gitaly.RepositoryService/CreateRepository",
- metadata: metadata.Pairs(),
- deadline: false,
- expectedMetatags: metadataTags{
- clientName: unknownValue,
- callSite: unknownValue,
- authVersion: unknownValue,
- deadlineType: "none",
- methodOperation: "mutator",
- methodScope: "repository",
- },
- },
- {
- desc: "with repository-scoped maintenance",
- fullMethod: "/gitaly.RepositoryService/OptimizeRepository",
- metadata: metadata.Pairs(),
- deadline: false,
- expectedMetatags: metadataTags{
- clientName: unknownValue,
- callSite: unknownValue,
- authVersion: unknownValue,
- deadlineType: "none",
- methodOperation: "maintenance",
- methodScope: "repository",
- },
- },
- {
- desc: "with repository-scoped maintenance",
- fullMethod: "/gitaly.RepositoryService/OptimizeRepository",
- metadata: metadata.Pairs(),
- deadline: false,
- expectedMetatags: metadataTags{
- clientName: unknownValue,
- callSite: unknownValue,
- authVersion: unknownValue,
- deadlineType: "none",
- methodOperation: "maintenance",
- methodScope: "repository",
- },
- },
- {
- desc: "with storage-scoped accessor",
- fullMethod: "/gitaly.RemoteService/FindRemoteRepository",
- metadata: metadata.Pairs(),
- deadline: false,
- expectedMetatags: metadataTags{
- clientName: unknownValue,
- callSite: unknownValue,
- authVersion: unknownValue,
- deadlineType: "none",
- methodOperation: "accessor",
- methodScope: "storage",
- },
- },
- } {
- tc := tc
-
- t.Run(tc.desc, func(t *testing.T) {
- t.Parallel()
-
- ctx := metadata.NewIncomingContext(baseContext, tc.metadata)
- if tc.deadline {
- var cancel func()
-
- ctx, cancel = context.WithDeadline(ctx, time.Now().Add(50*time.Millisecond))
- defer cancel()
- }
-
- require.Equal(t, tc.expectedMetatags, addMetadataTags(ctx, tc.fullMethod, "unary"))
- })
- }
-}
-
-func TestGRPCTags(t *testing.T) {
- t.Parallel()
-
- ctx := testhelper.Context(t)
- ctx = metadata.NewIncomingContext(
- correlation.ContextWithCorrelation(
- correlation.ContextWithClientName(
- ctx,
- clientName,
- ),
- correlationID,
- ),
- metadata.Pairs(),
- )
-
- interceptor := grpcmwtags.UnaryServerInterceptor()
-
- _, err := interceptor(ctx, nil, nil, func(ctx context.Context, _ interface{}) (interface{}, error) {
- metaTags := addMetadataTags(ctx, "/gitaly.RepositoryService/OptimizeRepository", "unary")
-
- require.Equal(t, metadataTags{
- clientName: clientName,
- callSite: "unknown",
- authVersion: "unknown",
- deadlineType: "none",
- methodOperation: "maintenance",
- methodScope: "repository",
- }, metaTags)
-
- require.Equal(t, map[string]interface{}{
- "correlation_id": correlationID,
- ClientNameKey: clientName,
- DeadlineTypeKey: "none",
- MethodTypeKey: "unary",
- MethodOperationKey: "maintenance",
- MethodScopeKey: "repository",
- }, grpcmwtags.Extract(ctx).Values())
-
- return nil, nil
- })
- require.NoError(t, err)
-}
-
-func TestExtractServiceAndMethodName(t *testing.T) {
- t.Parallel()
-
- for _, tc := range []struct {
- desc string
- fullMethodName string
- expectedService string
- expectedMethod string
- }{
- {
- desc: "blank",
- fullMethodName: "",
- expectedService: unknownValue,
- expectedMethod: unknownValue,
- },
- {
- desc: "normal",
- fullMethodName: "/gitaly.OperationService/method",
- expectedService: "gitaly.OperationService",
- expectedMethod: "method",
- },
- {
- desc: "malformed",
- fullMethodName: "//method",
- expectedService: "",
- expectedMethod: "method",
- },
- {
- desc: "malformed",
- fullMethodName: "/gitaly.OperationService/",
- expectedService: "gitaly.OperationService",
- expectedMethod: "",
- },
- } {
- tc := tc
-
- t.Run(tc.desc, func(t *testing.T) {
- t.Parallel()
-
- service, method := extractServiceAndMethodName(tc.fullMethodName)
- require.Equal(t, tc.expectedService, service)
- require.Equal(t, tc.expectedMethod, method)
- })
- }
-}
diff --git a/internal/grpc/middleware/requestinfohandler/requestinfohandler.go b/internal/grpc/middleware/requestinfohandler/requestinfohandler.go
new file mode 100644
index 000000000..a064f82ea
--- /dev/null
+++ b/internal/grpc/middleware/requestinfohandler/requestinfohandler.go
@@ -0,0 +1,334 @@
+package requestinfohandler
+
+import (
+ "context"
+ "strings"
+
+ grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
+ grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+ gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
+ "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/labkit/correlation"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+)
+
+var requests = promauto.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitaly_service_client_requests_total",
+ Help: "Counter of client requests received by client, call_site, auth version, response code and deadline_type",
+ },
+ []string{
+ "client_name",
+ "grpc_service",
+ "grpc_method",
+ "call_site",
+ "auth_version",
+ "grpc_code",
+ "deadline_type",
+ "method_operation",
+ "method_scope",
+ },
+)
+
+type requestInfo struct {
+ correlationID string
+ fullMethod string
+ methodType string
+ clientName string
+ remoteIP string
+ userID string
+ userName string
+ callSite string
+ authVersion string
+ deadlineType string
+ methodOperation string
+ methodScope string
+
+ repository *gitalypb.Repository
+ objectPool *gitalypb.ObjectPool
+ storageName string
+}
+
+// Unknown client and feature. Matches the prometheus grpc unknown value
+const unknownValue = "unknown"
+
+func getFromMD(md metadata.MD, header string) string {
+ values := md[header]
+ if len(values) != 1 {
+ return ""
+ }
+
+ return values[0]
+}
+
+// newRequestInfo extracts metadata from the connection headers and add it to the
+// ctx_tags, if it is set. Returns values appropriate for use with prometheus labels,
+// using `unknown` if a value is not set
+func newRequestInfo(ctx context.Context, fullMethod, grpcMethodType string) *requestInfo {
+ info := &requestInfo{
+ fullMethod: fullMethod,
+ methodType: grpcMethodType,
+ clientName: unknownValue,
+ callSite: unknownValue,
+ authVersion: unknownValue,
+ deadlineType: unknownValue,
+ methodOperation: unknownValue,
+ methodScope: unknownValue,
+ }
+
+ md, ok := metadata.FromIncomingContext(ctx)
+ if !ok {
+ return info
+ }
+
+ if methodInfo, err := protoregistry.GitalyProtoPreregistered.LookupMethod(fullMethod); err == nil {
+ var operation string
+ switch methodInfo.Operation {
+ case protoregistry.OpAccessor:
+ operation = "accessor"
+ case protoregistry.OpMutator:
+ operation = "mutator"
+ case protoregistry.OpMaintenance:
+ operation = "maintenance"
+ default:
+ operation = unknownValue
+ }
+
+ info.methodOperation = operation
+
+ var scope string
+ switch methodInfo.Scope {
+ case protoregistry.ScopeRepository:
+ scope = "repository"
+ case protoregistry.ScopeStorage:
+ scope = "storage"
+ default:
+ scope = unknownValue
+ }
+
+ info.methodScope = scope
+ }
+
+ if callSite := getFromMD(md, "call_site"); callSite != "" {
+ info.callSite = callSite
+ }
+
+ if _, deadlineSet := ctx.Deadline(); !deadlineSet {
+ info.deadlineType = "none"
+ } else if deadlineType := getFromMD(md, "deadline_type"); deadlineType != "" {
+ info.deadlineType = deadlineType
+ }
+
+ if clientName := correlation.ExtractClientNameFromContext(ctx); clientName != "" {
+ info.clientName = clientName
+ } else if clientName := getFromMD(md, "client_name"); clientName != "" {
+ info.clientName = clientName
+ }
+
+ if authInfo, _ := gitalyauth.ExtractAuthInfo(ctx); authInfo != nil {
+ info.authVersion = authInfo.Version
+ }
+
+ if remoteIP := getFromMD(md, "remote_ip"); remoteIP != "" {
+ info.remoteIP = remoteIP
+ }
+
+ if userID := getFromMD(md, "user_id"); userID != "" {
+ info.userID = userID
+ }
+
+ if userName := getFromMD(md, "username"); userName != "" {
+ info.userName = userName
+ }
+
+ // This is a stop-gap approach to logging correlation_ids
+ if correlationID := correlation.ExtractFromContext(ctx); correlationID != "" {
+ info.correlationID = correlationID
+ }
+
+ return info
+}
+
+func (i *requestInfo) extractRequestInfo(request any) {
+ type repoScopedRequest interface {
+ GetRepository() *gitalypb.Repository
+ }
+
+ type poolScopedRequest interface {
+ GetObjectPool() *gitalypb.ObjectPool
+ }
+
+ type storageScopedRequest interface {
+ GetStorageName() string
+ }
+
+ if repoScoped, ok := request.(repoScopedRequest); ok {
+ i.repository = repoScoped.GetRepository()
+ }
+
+ if poolScoped, ok := request.(poolScopedRequest); ok {
+ i.objectPool = poolScoped.GetObjectPool()
+ }
+
+ if storageScoped, ok := request.(storageScopedRequest); ok {
+ i.storageName = storageScoped.GetStorageName()
+ }
+}
+
+func (i *requestInfo) injectTags(tags grpcmwtags.Tags) {
+ for key, value := range map[string]string{
+ "grpc.meta.call_site": i.callSite,
+ "grpc.meta.client_name": i.clientName,
+ "grpc.meta.auth_version": i.authVersion,
+ "grpc.meta.deadline_type": i.deadlineType,
+ "grpc.meta.method_type": i.methodType,
+ "grpc.meta.method_operation": i.methodOperation,
+ "grpc.meta.method_scope": i.methodScope,
+ "grpc.request.fullMethod": i.fullMethod,
+ "grpc.request.StorageName": i.storageName,
+ "remote_ip": i.remoteIP,
+ "user_id": i.userID,
+ "username": i.userName,
+ "correlation_id": i.correlationID,
+ } {
+ if value == "" || value == unknownValue {
+ continue
+ }
+
+ tags.Set(key, value)
+ }
+
+ // We handle the repository-related fields separately such that all fields will be set unconditionally,
+ // regardless of whether they are empty or not. This is done to retain all fields even if their values
+ // are empty.
+ if repo := i.repository; repo != nil {
+ for key, value := range map[string]string{
+ "grpc.request.repoStorage": repo.GetStorageName(),
+ "grpc.request.repoPath": repo.GetRelativePath(),
+ "grpc.request.glRepository": repo.GetGlRepository(),
+ "grpc.request.glProjectPath": repo.GetGlProjectPath(),
+ } {
+ tags.Set(key, value)
+ }
+ }
+
+ // Same for the object pool repository.
+ if pool := i.objectPool.GetRepository(); pool != nil {
+ for key, value := range map[string]string{
+ "grpc.request.pool.storage": pool.GetStorageName(),
+ "grpc.request.pool.relativePath": pool.GetRelativePath(),
+ "grpc.request.pool.sourceProjectPath": pool.GetGlProjectPath(),
+ } {
+ tags.Set(key, value)
+ }
+ }
+}
+
+func (i *requestInfo) reportPrometheusMetrics(err error) {
+ grpcCode := structerr.GRPCCode(err)
+ serviceName, methodName := extractServiceAndMethodName(i.fullMethod)
+
+ requests.WithLabelValues(
+ i.clientName, // client_name
+ serviceName, // grpc_service
+ methodName, // grpc_method
+ i.callSite, // call_site
+ i.authVersion, // auth_version
+ grpcCode.String(), // grpc_code
+ i.deadlineType, // deadline_type
+ i.methodOperation,
+ i.methodScope,
+ ).Inc()
+ grpcprometheus.WithConstLabels(prometheus.Labels{"deadline_type": i.deadlineType})
+}
+
+func extractServiceAndMethodName(fullMethodName string) (string, string) {
+ fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash
+ service, method, ok := strings.Cut(fullMethodName, "/")
+ if !ok {
+ return unknownValue, unknownValue
+ }
+ return service, method
+}
+
+// UnaryInterceptor returns a Unary Interceptor
+func UnaryInterceptor(ctx context.Context, req interface{}, serverInfo *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+ tags := grpcmwtags.NewTags()
+ ctx = grpcmwtags.SetInContext(ctx, tags)
+
+ info := newRequestInfo(ctx, serverInfo.FullMethod, "unary")
+ info.extractRequestInfo(req)
+
+ info.injectTags(tags)
+ res, err := handler(ctx, req)
+ info.reportPrometheusMetrics(err)
+
+ return res, err
+}
+
+// StreamInterceptor returns a Stream Interceptor
+func StreamInterceptor(srv interface{}, stream grpc.ServerStream, serverInfo *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+ tags := grpcmwtags.NewTags()
+ ctx := grpcmwtags.SetInContext(stream.Context(), tags)
+
+ info := newRequestInfo(ctx, serverInfo.FullMethod, streamRPCType(serverInfo))
+
+ // Even though we don't yet have all information set up we already inject the tags here. This is done such that
+ // log messages will at least have the metadata set up correctly in case there is no first request.
+ info.injectTags(tags)
+ err := handler(srv, &wrappedServerStream{
+ ServerStream: stream,
+ ctx: ctx,
+ info: info,
+ tags: tags,
+ initial: true,
+ })
+ info.reportPrometheusMetrics(err)
+
+ return err
+}
+
+func streamRPCType(info *grpc.StreamServerInfo) string {
+ if info.IsClientStream && !info.IsServerStream {
+ return "client_stream"
+ } else if !info.IsClientStream && info.IsServerStream {
+ return "server_stream"
+ }
+ return "bidi_stream"
+}
+
+// wrappedServerStream wraps a grpc.ServerStream such that we can intercept and extract info from the first gRPC request
+// on that stream.
+type wrappedServerStream struct {
+ grpc.ServerStream
+ ctx context.Context
+ tags grpcmwtags.Tags
+ info *requestInfo
+ initial bool
+}
+
+// Context overrides the context of the ServerStream with our own context that has the tags set up.
+func (w *wrappedServerStream) Context() context.Context {
+ return w.ctx
+}
+
+// RecvMsg receives a message from the underlying server stream. The initial received message will be used to extract
+// request information and inject it into the context.
+func (w *wrappedServerStream) RecvMsg(req interface{}) error {
+ err := w.ServerStream.RecvMsg(req)
+
+ if w.initial {
+ w.initial = false
+
+ w.info.extractRequestInfo(req)
+ // Re-inject the tags a second time here.
+ w.info.injectTags(w.tags)
+ }
+
+ return err
+}
diff --git a/internal/grpc/middleware/requestinfohandler/requestinfohandler_test.go b/internal/grpc/middleware/requestinfohandler/requestinfohandler_test.go
new file mode 100644
index 000000000..0974ee6a1
--- /dev/null
+++ b/internal/grpc/middleware/requestinfohandler/requestinfohandler_test.go
@@ -0,0 +1,626 @@
+package requestinfohandler
+
+import (
+ "context"
+ "io"
+ "net"
+ "testing"
+ "time"
+
+ grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/labkit/correlation"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/test/bufconn"
+)
+
+const (
+ correlationID = "CORRELATION_ID"
+ clientName = "CLIENT_NAME"
+)
+
+func TestNewRequestInfo(t *testing.T) {
+ t.Parallel()
+
+ baseContext := testhelper.Context(t)
+
+ for _, tc := range []struct {
+ desc string
+ fullMethod string
+ metadata metadata.MD
+ deadline bool
+ expectedInfo *requestInfo
+ }{
+ {
+ desc: "empty metadata",
+ metadata: metadata.Pairs(),
+ deadline: false,
+ expectedInfo: &requestInfo{
+ methodType: "unary",
+ clientName: unknownValue,
+ callSite: unknownValue,
+ authVersion: unknownValue,
+ deadlineType: "none",
+ methodOperation: unknownValue,
+ methodScope: unknownValue,
+ },
+ },
+ {
+ desc: "context containing metadata",
+ metadata: metadata.Pairs("call_site", "testsite"),
+ deadline: false,
+ expectedInfo: &requestInfo{
+ methodType: "unary",
+ clientName: unknownValue,
+ callSite: "testsite",
+ authVersion: unknownValue,
+ deadlineType: "none",
+ methodOperation: unknownValue,
+ methodScope: unknownValue,
+ },
+ },
+ {
+ desc: "context containing metadata and a deadline",
+ metadata: metadata.Pairs("call_site", "testsite"),
+ deadline: true,
+ expectedInfo: &requestInfo{
+ methodType: "unary",
+ clientName: unknownValue,
+ callSite: "testsite",
+ authVersion: unknownValue,
+ deadlineType: unknownValue,
+ methodOperation: unknownValue,
+ methodScope: unknownValue,
+ },
+ },
+ {
+ desc: "context containing metadata and a deadline type",
+ metadata: metadata.Pairs("deadline_type", "regular"),
+ deadline: true,
+ expectedInfo: &requestInfo{
+ methodType: "unary",
+ clientName: unknownValue,
+ callSite: unknownValue,
+ authVersion: unknownValue,
+ deadlineType: "regular",
+ methodOperation: unknownValue,
+ methodScope: unknownValue,
+ },
+ },
+ {
+ desc: "a context without deadline but with deadline type",
+ metadata: metadata.Pairs("deadline_type", "regular"),
+ deadline: false,
+ expectedInfo: &requestInfo{
+ methodType: "unary",
+ clientName: unknownValue,
+ callSite: unknownValue,
+ authVersion: unknownValue,
+ deadlineType: "none",
+ methodOperation: unknownValue,
+ methodScope: unknownValue,
+ },
+ },
+ {
+ desc: "with a context containing metadata",
+ metadata: metadata.Pairs("deadline_type", "regular", "client_name", "rails"),
+ deadline: true,
+ expectedInfo: &requestInfo{
+ methodType: "unary",
+ clientName: "rails",
+ callSite: unknownValue,
+ authVersion: unknownValue,
+ deadlineType: "regular",
+ methodOperation: unknownValue,
+ methodScope: unknownValue,
+ },
+ },
+ {
+ desc: "with unknown method",
+ fullMethod: "/gitaly.RepositoryService/UnknownMethod",
+ metadata: metadata.Pairs(),
+ deadline: false,
+ expectedInfo: &requestInfo{
+ fullMethod: "/gitaly.RepositoryService/UnknownMethod",
+ methodType: "unary",
+ clientName: unknownValue,
+ callSite: unknownValue,
+ authVersion: unknownValue,
+ deadlineType: "none",
+ methodOperation: unknownValue,
+ methodScope: unknownValue,
+ },
+ },
+ {
+ desc: "with repository-scoped accessor",
+ fullMethod: "/gitaly.RepositoryService/ObjectFormat",
+ metadata: metadata.Pairs(),
+ deadline: false,
+ expectedInfo: &requestInfo{
+ fullMethod: "/gitaly.RepositoryService/ObjectFormat",
+ methodType: "unary",
+ clientName: unknownValue,
+ callSite: unknownValue,
+ authVersion: unknownValue,
+ deadlineType: "none",
+ methodOperation: "accessor",
+ methodScope: "repository",
+ },
+ },
+ {
+ desc: "with repository-scoped mutator",
+ fullMethod: "/gitaly.RepositoryService/CreateRepository",
+ metadata: metadata.Pairs(),
+ deadline: false,
+ expectedInfo: &requestInfo{
+ fullMethod: "/gitaly.RepositoryService/CreateRepository",
+ methodType: "unary",
+ clientName: unknownValue,
+ callSite: unknownValue,
+ authVersion: unknownValue,
+ deadlineType: "none",
+ methodOperation: "mutator",
+ methodScope: "repository",
+ },
+ },
+ {
+ desc: "with repository-scoped maintenance",
+ fullMethod: "/gitaly.RepositoryService/OptimizeRepository",
+ metadata: metadata.Pairs(),
+ deadline: false,
+ expectedInfo: &requestInfo{
+ fullMethod: "/gitaly.RepositoryService/OptimizeRepository",
+ methodType: "unary",
+ clientName: unknownValue,
+ callSite: unknownValue,
+ authVersion: unknownValue,
+ deadlineType: "none",
+ methodOperation: "maintenance",
+ methodScope: "repository",
+ },
+ },
+ {
+ desc: "with repository-scoped maintenance",
+ fullMethod: "/gitaly.RepositoryService/OptimizeRepository",
+ metadata: metadata.Pairs(),
+ deadline: false,
+ expectedInfo: &requestInfo{
+ fullMethod: "/gitaly.RepositoryService/OptimizeRepository",
+ methodType: "unary",
+ clientName: unknownValue,
+ callSite: unknownValue,
+ authVersion: unknownValue,
+ deadlineType: "none",
+ methodOperation: "maintenance",
+ methodScope: "repository",
+ },
+ },
+ {
+ desc: "with storage-scoped accessor",
+ fullMethod: "/gitaly.RemoteService/FindRemoteRepository",
+ metadata: metadata.Pairs(),
+ deadline: false,
+ expectedInfo: &requestInfo{
+ fullMethod: "/gitaly.RemoteService/FindRemoteRepository",
+ methodType: "unary",
+ clientName: unknownValue,
+ callSite: unknownValue,
+ authVersion: unknownValue,
+ deadlineType: "none",
+ methodOperation: "accessor",
+ methodScope: "storage",
+ },
+ },
+ } {
+ tc := tc
+
+ t.Run(tc.desc, func(t *testing.T) {
+ t.Parallel()
+
+ ctx := metadata.NewIncomingContext(baseContext, tc.metadata)
+ if tc.deadline {
+ var cancel func()
+
+ ctx, cancel = context.WithDeadline(ctx, time.Now().Add(50*time.Millisecond))
+ defer cancel()
+ }
+
+ require.Equal(t, tc.expectedInfo, newRequestInfo(ctx, tc.fullMethod, "unary"))
+ })
+ }
+}
+
+func TestGRPCTags(t *testing.T) {
+ t.Parallel()
+
+ ctx := testhelper.Context(t)
+ ctx = metadata.NewIncomingContext(
+ correlation.ContextWithCorrelation(
+ correlation.ContextWithClientName(
+ ctx,
+ clientName,
+ ),
+ correlationID,
+ ),
+ metadata.Pairs(),
+ )
+
+ interceptor := grpcmwtags.UnaryServerInterceptor()
+
+ _, err := interceptor(ctx, nil, nil, func(ctx context.Context, _ interface{}) (interface{}, error) {
+ info := newRequestInfo(ctx, "/gitaly.RepositoryService/OptimizeRepository", "unary")
+
+ tags := grpcmwtags.NewTags()
+ info.injectTags(tags)
+
+ require.Equal(t, &requestInfo{
+ correlationID: correlationID,
+ fullMethod: "/gitaly.RepositoryService/OptimizeRepository",
+ methodType: "unary",
+ clientName: clientName,
+ callSite: "unknown",
+ authVersion: "unknown",
+ deadlineType: "none",
+ methodOperation: "maintenance",
+ methodScope: "repository",
+ }, info)
+
+ require.Equal(t, map[string]interface{}{
+ "correlation_id": correlationID,
+ "grpc.meta.client_name": clientName,
+ "grpc.meta.deadline_type": "none",
+ "grpc.meta.method_type": "unary",
+ "grpc.meta.method_operation": "maintenance",
+ "grpc.meta.method_scope": "repository",
+ "grpc.request.fullMethod": "/gitaly.RepositoryService/OptimizeRepository",
+ }, tags.Values())
+
+ return nil, nil
+ })
+ require.NoError(t, err)
+}
+
+func TestExtractServiceAndMethodName(t *testing.T) {
+ t.Parallel()
+
+ for _, tc := range []struct {
+ desc string
+ fullMethodName string
+ expectedService string
+ expectedMethod string
+ }{
+ {
+ desc: "blank",
+ fullMethodName: "",
+ expectedService: unknownValue,
+ expectedMethod: unknownValue,
+ },
+ {
+ desc: "normal",
+ fullMethodName: "/gitaly.OperationService/method",
+ expectedService: "gitaly.OperationService",
+ expectedMethod: "method",
+ },
+ {
+ desc: "malformed",
+ fullMethodName: "//method",
+ expectedService: "",
+ expectedMethod: "method",
+ },
+ {
+ desc: "malformed",
+ fullMethodName: "/gitaly.OperationService/",
+ expectedService: "gitaly.OperationService",
+ expectedMethod: "",
+ },
+ } {
+ tc := tc
+
+ t.Run(tc.desc, func(t *testing.T) {
+ t.Parallel()
+
+ service, method := extractServiceAndMethodName(tc.fullMethodName)
+ require.Equal(t, tc.expectedService, service)
+ require.Equal(t, tc.expectedMethod, method)
+ })
+ }
+}
+
+func TestInterceptors(t *testing.T) {
+ t.Parallel()
+
+ ctx := testhelper.Context(t)
+
+ for _, tc := range []struct {
+ desc string
+ call func(*testing.T, mockClient)
+ expectedTags map[string]any
+ }{
+ {
+ desc: "unary repository-scoped call",
+ call: func(t *testing.T, client mockClient) {
+ _, err := client.RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: "storage",
+ RelativePath: "path",
+ GlProjectPath: "glProject",
+ GlRepository: "glRepository",
+ },
+ })
+
+ require.NoError(t, err)
+ },
+ expectedTags: map[string]any{
+ "grpc.meta.deadline_type": "none",
+ "grpc.meta.method_operation": "accessor",
+ "grpc.meta.method_scope": "repository",
+ "grpc.meta.method_type": "unary",
+ "grpc.request.fullMethod": "/gitaly.RepositoryService/RepositoryInfo",
+ "grpc.request.repoStorage": "storage",
+ "grpc.request.repoPath": "path",
+ "grpc.request.glProjectPath": "glProject",
+ "grpc.request.glRepository": "glRepository",
+ },
+ },
+ {
+ desc: "unary repository-scoped call with unset repository",
+ call: func(t *testing.T, client mockClient) {
+ _, err := client.RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{
+ Repository: nil,
+ })
+
+ require.NoError(t, err)
+ },
+ expectedTags: map[string]any{
+ "grpc.meta.deadline_type": "none",
+ "grpc.meta.method_operation": "accessor",
+ "grpc.meta.method_scope": "repository",
+ "grpc.meta.method_type": "unary",
+ "grpc.request.fullMethod": "/gitaly.RepositoryService/RepositoryInfo",
+ },
+ },
+ {
+ desc: "unary object-pool-scoped call",
+ call: func(t *testing.T, client mockClient) {
+ _, err := client.FetchIntoObjectPool(ctx, &gitalypb.FetchIntoObjectPoolRequest{
+ ObjectPool: &gitalypb.ObjectPool{
+ Repository: &gitalypb.Repository{
+ StorageName: "storage",
+ RelativePath: "path",
+ GlProjectPath: "glProject",
+ },
+ },
+ })
+
+ require.NoError(t, err)
+ },
+ expectedTags: map[string]any{
+ "grpc.meta.deadline_type": "none",
+ "grpc.meta.method_operation": "mutator",
+ "grpc.meta.method_scope": "repository",
+ "grpc.meta.method_type": "unary",
+ "grpc.request.fullMethod": "/gitaly.ObjectPoolService/FetchIntoObjectPool",
+ "grpc.request.pool.relativePath": "path",
+ "grpc.request.pool.storage": "storage",
+ "grpc.request.pool.sourceProjectPath": "glProject",
+ },
+ },
+ {
+ desc: "unary repository-scoped call with deadline",
+ call: func(t *testing.T, client mockClient) {
+ ctx, cancel := context.WithDeadline(ctx, time.Date(2100, time.January, 1, 12, 0, 0, 0, time.UTC))
+ defer cancel()
+
+ _, err := client.RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: "storage",
+ RelativePath: "path",
+ GlProjectPath: "glProject",
+ GlRepository: "glRepository",
+ },
+ })
+
+ require.NoError(t, err)
+ },
+ expectedTags: map[string]any{
+ // Note that there is no "deadline: none" field anymore. If we were
+ // to inject the deadline type then it would appear here.
+ "grpc.meta.method_operation": "accessor",
+ "grpc.meta.method_scope": "repository",
+ "grpc.meta.method_type": "unary",
+ "grpc.request.fullMethod": "/gitaly.RepositoryService/RepositoryInfo",
+ "grpc.request.repoStorage": "storage",
+ "grpc.request.repoPath": "path",
+ "grpc.request.glProjectPath": "glProject",
+ "grpc.request.glRepository": "glRepository",
+ },
+ },
+ {
+ desc: "unary repository-scoped call with additional metadata",
+ call: func(t *testing.T, client mockClient) {
+ ctx, cancel := context.WithDeadline(ctx, time.Date(2100, time.January, 1, 12, 0, 0, 0, time.UTC))
+ defer cancel()
+
+ ctx = metadata.NewOutgoingContext(ctx, metadata.MD{
+ "call_site": []string{"callSite"},
+ "deadline_type": []string{"deadlineType"},
+ "client_name": []string{"clientName"},
+ "remote_ip": []string{"remoteIP"},
+ "user_id": []string{"userID"},
+ "username": []string{"userName"},
+ correlation.FieldName: []string{"correlationID"},
+ })
+
+ _, err := client.RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: "storage",
+ RelativePath: "path",
+ GlProjectPath: "glProject",
+ GlRepository: "glRepository",
+ },
+ })
+
+ require.NoError(t, err)
+ },
+ expectedTags: map[string]any{
+ "grpc.meta.call_site": "callSite",
+ "grpc.meta.deadline_type": "deadlineType",
+ "grpc.meta.client_name": "clientName",
+ "grpc.meta.method_operation": "accessor",
+ "grpc.meta.method_scope": "repository",
+ "grpc.meta.method_type": "unary",
+ "grpc.request.fullMethod": "/gitaly.RepositoryService/RepositoryInfo",
+ "grpc.request.repoStorage": "storage",
+ "grpc.request.repoPath": "path",
+ "grpc.request.glProjectPath": "glProject",
+ "grpc.request.glRepository": "glRepository",
+ "remote_ip": "remoteIP",
+ "user_id": "userID",
+ "username": "userName",
+ },
+ },
+ {
+ desc: "streaming repository-scoped call",
+ call: func(t *testing.T, client mockClient) {
+ stream, err := client.CreateBundleFromRefList(ctx)
+ require.NoError(t, err)
+
+ require.NoError(t, stream.Send(&gitalypb.CreateBundleFromRefListRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: "storage",
+ RelativePath: "path",
+ GlProjectPath: "glProject",
+ GlRepository: "glRepository",
+ },
+ }))
+
+ _, err = stream.Recv()
+ require.NoError(t, err)
+ },
+ expectedTags: map[string]any{
+ "grpc.meta.deadline_type": "none",
+ "grpc.meta.method_operation": "accessor",
+ "grpc.meta.method_scope": "repository",
+ "grpc.meta.method_type": "bidi_stream",
+ "grpc.request.fullMethod": "/gitaly.RepositoryService/CreateBundleFromRefList",
+ "grpc.request.repoStorage": "storage",
+ "grpc.request.repoPath": "path",
+ "grpc.request.glProjectPath": "glProject",
+ "grpc.request.glRepository": "glRepository",
+ },
+ },
+ {
+ desc: "streaming repository-scoped call with missing initial request",
+ call: func(t *testing.T, client mockClient) {
+ stream, err := client.CreateBundleFromRefList(ctx)
+ require.NoError(t, err)
+ require.NoError(t, stream.CloseSend())
+
+ _, err = stream.Recv()
+ testhelper.RequireGrpcError(t, structerr.New("%w", io.EOF), err)
+ },
+ expectedTags: map[string]any{
+ "grpc.meta.deadline_type": "none",
+ "grpc.meta.method_operation": "accessor",
+ "grpc.meta.method_scope": "repository",
+ "grpc.meta.method_type": "bidi_stream",
+ "grpc.request.fullMethod": "/gitaly.RepositoryService/CreateBundleFromRefList",
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ server, client := setupServer(t, ctx)
+
+ tc.call(t, client)
+
+ if tc.expectedTags == nil {
+ require.Equal(t, nil, tc.expectedTags)
+ } else {
+ require.Equal(t, tc.expectedTags, server.tags.Values())
+ }
+ })
+ }
+}
+
+type mockServer struct {
+ gitalypb.RepositoryServiceServer
+ gitalypb.ObjectPoolServiceServer
+ tags grpcmwtags.Tags
+}
+
+type mockClient struct {
+ gitalypb.RepositoryServiceClient
+ gitalypb.ObjectPoolServiceClient
+}
+
+func setupServer(tb testing.TB, ctx context.Context) (*mockServer, mockClient) {
+ tb.Helper()
+
+ var mockServer mockServer
+
+ server := grpc.NewServer(
+ grpc.ChainUnaryInterceptor(
+ grpcmwtags.UnaryServerInterceptor(),
+ UnaryInterceptor,
+ // This interceptor and the equivalent interceptor for the streaming gRPC calls is responsible
+ // for recording the tags that the preceding interceptor has injected.
+ func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
+ mockServer.tags = grpcmwtags.Extract(ctx)
+ return handler(ctx, req)
+ },
+ ),
+ grpc.ChainStreamInterceptor(
+ grpcmwtags.StreamServerInterceptor(),
+ StreamInterceptor,
+ func(server any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+ mockServer.tags = grpcmwtags.Extract(stream.Context())
+ return handler(server, stream)
+ },
+ ),
+ )
+ tb.Cleanup(server.Stop)
+ gitalypb.RegisterRepositoryServiceServer(server, &mockServer)
+ gitalypb.RegisterObjectPoolServiceServer(server, &mockServer)
+
+ listener := bufconn.Listen(1)
+ go testhelper.MustServe(tb, server, listener)
+
+ conn, err := grpc.DialContext(ctx, listener.Addr().String(),
+ grpc.WithTransportCredentials(insecure.NewCredentials()),
+ grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
+ return listener.DialContext(ctx)
+ }),
+ )
+ require.NoError(tb, err)
+ tb.Cleanup(func() { testhelper.MustClose(tb, conn) })
+
+ return &mockServer, mockClient{
+ RepositoryServiceClient: gitalypb.NewRepositoryServiceClient(conn),
+ ObjectPoolServiceClient: gitalypb.NewObjectPoolServiceClient(conn),
+ }
+}
+
+func (s *mockServer) RepositoryInfo(ctx context.Context, _ *gitalypb.RepositoryInfoRequest) (*gitalypb.RepositoryInfoResponse, error) {
+ return &gitalypb.RepositoryInfoResponse{}, nil
+}
+
+func (s *mockServer) FetchIntoObjectPool(ctx context.Context, _ *gitalypb.FetchIntoObjectPoolRequest) (*gitalypb.FetchIntoObjectPoolResponse, error) {
+ return &gitalypb.FetchIntoObjectPoolResponse{}, nil
+}
+
+func (s *mockServer) CreateBundleFromRefList(stream gitalypb.RepositoryService_CreateBundleFromRefListServer) error {
+ if _, err := stream.Recv(); err != nil {
+ return err
+ }
+
+ if err := stream.Send(&gitalypb.CreateBundleFromRefListResponse{}); err != nil {
+ return err
+ }
+
+ return nil
+}
diff --git a/internal/grpc/middleware/metadatahandler/testhelper_test.go b/internal/grpc/middleware/requestinfohandler/testhelper_test.go
index ee8340678..464eca43d 100644
--- a/internal/grpc/middleware/metadatahandler/testhelper_test.go
+++ b/internal/grpc/middleware/requestinfohandler/testhelper_test.go
@@ -1,4 +1,4 @@
-package metadatahandler
+package requestinfohandler
import (
"testing"
diff --git a/internal/helper/fieldextractors/fieldextractor.go b/internal/helper/fieldextractors/fieldextractor.go
deleted file mode 100644
index a866dc3ea..000000000
--- a/internal/helper/fieldextractors/fieldextractor.go
+++ /dev/null
@@ -1,120 +0,0 @@
-package fieldextractors
-
-import (
- "strings"
-
- "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
-)
-
-type repositoryBasedRequest interface {
- GetRepository() *gitalypb.Repository
-}
-
-type namespaceBasedRequest interface {
- storageBasedRequest
- GetName() string
-}
-
-type storageBasedRequest interface {
- GetStorageName() string
-}
-
-func formatRepoRequest(repo *gitalypb.Repository) map[string]interface{} {
- if repo == nil {
- // Signals that the client did not send a repo through, which
- // will be useful for logging
- return map[string]interface{}{
- "repo": nil,
- }
- }
-
- return map[string]interface{}{
- "repoStorage": repo.StorageName,
- "repoPath": repo.RelativePath,
- "glRepository": repo.GlRepository,
- "glProjectPath": repo.GlProjectPath,
- }
-}
-
-func formatStorageRequest(storageReq storageBasedRequest) map[string]interface{} {
- return map[string]interface{}{
- "StorageName": storageReq.GetStorageName(),
- }
-}
-
-func formatNamespaceRequest(namespaceReq namespaceBasedRequest) map[string]interface{} {
- return map[string]interface{}{
- "StorageName": namespaceReq.GetStorageName(),
- "Name": namespaceReq.GetName(),
- }
-}
-
-func formatRenameNamespaceRequest(renameReq *gitalypb.RenameNamespaceRequest) map[string]interface{} {
- return map[string]interface{}{
- "StorageName": renameReq.GetStorageName(),
- "From": renameReq.GetFrom(),
- "To": renameReq.GetTo(),
- }
-}
-
-// FieldExtractor will extract the relevant fields from an incoming grpc request
-func FieldExtractor(fullMethod string, req interface{}) map[string]interface{} {
- if req == nil {
- return nil
- }
-
- var result map[string]interface{}
-
- switch req := req.(type) {
- case *gitalypb.RenameNamespaceRequest:
- result = formatRenameNamespaceRequest(req)
- case repositoryBasedRequest:
- result = formatRepoRequest(req.GetRepository())
- case namespaceBasedRequest:
- result = formatNamespaceRequest(req)
- case storageBasedRequest:
- result = formatStorageRequest(req)
- }
-
- if result == nil {
- result = make(map[string]interface{})
- }
-
- switch {
- case strings.HasPrefix(fullMethod, "/gitaly.ObjectPoolService/"):
- addObjectPool(req, result)
- }
-
- result["fullMethod"] = fullMethod
-
- return result
-}
-
-type objectPoolRequest interface {
- GetObjectPool() *gitalypb.ObjectPool
-}
-
-func addObjectPool(req interface{}, tags map[string]interface{}) {
- oReq, ok := req.(objectPoolRequest)
- if !ok {
- return
- }
-
- pool := oReq.GetObjectPool()
- if pool == nil {
- return
- }
-
- repo := pool.GetRepository()
- if repo == nil {
- return
- }
-
- for k, v := range map[string]string{
- "pool.storage": repo.StorageName,
- "pool.relativePath": repo.RelativePath,
- "pool.sourceProjectPath": repo.GlProjectPath,
- } {
- tags[k] = v
- }
-}
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 76d0b15f2..ac1d554f3 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -11,7 +11,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/featureflag"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/metadata"
- "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/proxy"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
@@ -1144,7 +1143,7 @@ func (c *Coordinator) newRequestFinalizer(
TargetNodeStorage: secondary,
Params: params,
},
- Meta: datastore.Params{metadatahandler.CorrelationIDKey: correlationID},
+ Meta: datastore.Params{datastore.CorrelationIDKey: correlationID},
}
g.Go(func() error {
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index e0eafe0c5..a8c873fd4 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -25,7 +25,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
gitaly_metadata "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/metadata"
- "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/proxy"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
@@ -224,7 +223,7 @@ func TestStreamDirectorMutator(t *testing.T) {
TargetNodeStorage: secondaryNode.Storage,
SourceNodeStorage: primaryNode.Storage,
},
- Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"},
+ Meta: datastore.Params{datastore.CorrelationIDKey: "my-correlation-id"},
},
}
},
@@ -276,7 +275,7 @@ func TestStreamDirectorMutator(t *testing.T) {
TargetNodeStorage: secondaryNode.Storage,
SourceNodeStorage: primaryNode.Storage,
},
- Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"},
+ Meta: datastore.Params{datastore.CorrelationIDKey: "my-correlation-id"},
},
}
},
@@ -1646,7 +1645,7 @@ func TestStreamDirector_repo_creation(t *testing.T) {
TargetNodeStorage: target,
SourceNodeStorage: primaryNode.Storage,
},
- Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"},
+ Meta: datastore.Params{datastore.CorrelationIDKey: "my-correlation-id"},
})
}
@@ -1763,7 +1762,7 @@ func TestAbsentCorrelationID(t *testing.T) {
require.NoError(t, err)
require.Len(t, jobs, 1)
- require.NotZero(t, jobs[0].Meta[metadatahandler.CorrelationIDKey],
+ require.NotZero(t, jobs[0].Meta[datastore.CorrelationIDKey],
"the coordinator should have generated a random ID")
}
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index 3dccd2da8..9865ea744 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -12,6 +12,10 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore/glsql"
)
+// CorrelationIDKey is the key that is used to store the correlation ID for a specific replication job as part of the
+// parameters.
+const CorrelationIDKey = "correlation_id"
+
// ReplicationEventExistsError is returned when trying to add an already existing
// replication event into the queue.
type ReplicationEventExistsError struct {
diff --git a/internal/praefect/reconciler/reconciler_test.go b/internal/praefect/reconciler/reconciler_test.go
index a7debac53..0a30858cb 100644
--- a/internal/praefect/reconciler/reconciler_test.go
+++ b/internal/praefect/reconciler/reconciler_test.go
@@ -10,7 +10,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
- "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
@@ -1140,7 +1139,7 @@ func TestReconciler(t *testing.T) {
var job datastore.ReplicationJob
var meta datastore.Params
require.NoError(t, rows.Scan(&job, &meta))
- require.NotEmpty(t, meta[metadatahandler.CorrelationIDKey])
+ require.NotEmpty(t, meta[datastore.CorrelationIDKey])
actualJobs = append(actualJobs, job)
}
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 2d283749e..42b3eebee 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -10,7 +10,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/repository"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
- "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
@@ -52,7 +51,7 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli
logWithVirtualStorage: event.Job.VirtualStorage,
logWithReplTarget: event.Job.TargetNodeStorage,
"replication_job_source": event.Job.SourceNodeStorage,
- logWithCorrID: correlation.ExtractFromContext(ctx),
+ correlation.FieldName: correlation.ExtractFromContext(ctx),
})
generation, err := dr.rs.GetReplicatedGeneration(ctx, event.Job.RepositoryID, event.Job.SourceNodeStorage, event.Job.TargetNodeStorage)
@@ -148,7 +147,7 @@ func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.Replica
return err
}
- dr.log.WithField(logWithCorrID, correlation.ExtractFromContext(ctx)).
+ dr.log.WithField(correlation.FieldName, correlation.ExtractFromContext(ctx)).
WithError(err).
Info("deleted repository did not have a store entry")
}
@@ -190,7 +189,7 @@ func (dr defaultReplicator) Rename(ctx context.Context, event datastore.Replicat
return err
}
- dr.log.WithField(logWithCorrID, correlation.ExtractFromContext(ctx)).
+ dr.log.WithField(correlation.FieldName, correlation.ExtractFromContext(ctx)).
WithError(err).
Info("replicated repository rename does not have a store entry")
}
@@ -299,7 +298,6 @@ func (r ReplMgr) Collect(ch chan<- prometheus.Metric) {
const (
logWithReplTarget = "replication_job_target"
- logWithCorrID = "correlation_id"
logWithVirtualStorage = "virtual_storage"
)
@@ -335,7 +333,7 @@ type (
func getCorrelationID(params datastore.Params) string {
correlationID := ""
- if val, found := params[metadatahandler.CorrelationIDKey]; found {
+ if val, found := params[datastore.CorrelationIDKey]; found {
correlationID, _ = val.(string)
}
return correlationID
@@ -538,7 +536,7 @@ func (r ReplMgr) handleNodeEvent(ctx context.Context, logger log.Logger, targetC
ctx = correlation.ContextWithCorrelation(ctx, cid)
// we want it to be queryable by common `json.correlation_id` filter
- logger = logger.WithField(logWithCorrID, cid)
+ logger = logger.WithField(correlation.FieldName, cid)
// we log all details about the event only once before start of the processing
logger.WithField("event", event).Info("replication job processing started")
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 02b2458c1..9b1e0651e 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -18,7 +18,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
gitalycfg "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup"
- "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
@@ -119,7 +118,7 @@ func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) {
},
State: datastore.JobStateReady,
Attempt: 3,
- Meta: datastore.Params{metadatahandler.CorrelationIDKey: "correlation-id"},
+ Meta: datastore.Params{datastore.CorrelationIDKey: "correlation-id"},
})
}
require.Len(t, events, 1)
@@ -174,7 +173,7 @@ func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) {
require.Equal(t,
[]interface{}{"replication job processing started", "virtual", "correlation-id"},
- []interface{}{logEntries[2].Message, logEntries[2].Data["virtual_storage"], logEntries[2].Data[logWithCorrID]},
+ []interface{}{logEntries[2].Message, logEntries[2].Data["virtual_storage"], logEntries[2].Data[correlation.FieldName]},
)
dequeuedEvent := logEntries[2].Data["event"].(datastore.ReplicationEvent)
@@ -183,7 +182,7 @@ func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) {
require.Equal(t,
[]interface{}{"replication job processing finished", "virtual", datastore.JobStateCompleted, "correlation-id"},
- []interface{}{logEntries[3].Message, logEntries[3].Data["virtual_storage"], logEntries[3].Data["new_state"], logEntries[3].Data[logWithCorrID]},
+ []interface{}{logEntries[3].Message, logEntries[3].Data["virtual_storage"], logEntries[3].Data["new_state"], logEntries[3].Data[correlation.FieldName]},
)
replicatedPath := filepath.Join(backupCfg.Storages[0].Path, testRepoProto.GetRelativePath())
@@ -254,7 +253,7 @@ func TestReplicatorDowngradeAttempt(t *testing.T) {
lastEntry := entries[0]
require.Equal(t, logrus.InfoLevel, lastEntry.Level)
require.Equal(t, returnedErr, lastEntry.Data["error"])
- require.Equal(t, "correlation-id", lastEntry.Data[logWithCorrID])
+ require.Equal(t, "correlation-id", lastEntry.Data[correlation.FieldName])
require.Equal(t, tc.expectedMessage, lastEntry.Message)
})
}
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 066f2bf78..167688b51 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -8,19 +8,17 @@ import (
"time"
grpcmwlogrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
- grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/server/auth"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/grpcstats"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/listenmux"
- "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/panichandler"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/requestinfohandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/sentryhandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/statushandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/proxy"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel"
- "gitlab.com/gitlab-org/gitaly/v16/internal/helper/fieldextractors"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
@@ -67,9 +65,8 @@ func NewBackchannelServerFactory(logger log.Logger, refSvc gitalypb.RefTransacti
func commonUnaryServerInterceptors(logger log.Logger, messageProducer grpcmwlogrus.MessageProducer) []grpc.UnaryServerInterceptor {
return []grpc.UnaryServerInterceptor{
- grpcmwtags.UnaryServerInterceptor(ctxtagsInterceptorOption()),
grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler
- metadatahandler.UnaryInterceptor,
+ requestinfohandler.UnaryInterceptor,
grpcprometheus.UnaryServerInterceptor,
logger.UnaryServerInterceptor(
grpcmwlogrus.WithTimestampFormat(log.LogTimestampFormat),
@@ -85,10 +82,6 @@ func commonUnaryServerInterceptors(logger log.Logger, messageProducer grpcmwlogr
}
}
-func ctxtagsInterceptorOption() grpcmwtags.Option {
- return grpcmwtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor)
-}
-
// ServerOption is an option that can be passed to `NewGRPCServer()`.
type ServerOption func(cfg *serverConfig)
@@ -136,10 +129,9 @@ func NewGRPCServer(
unaryInterceptors = append(unaryInterceptors, serverCfg.unaryInterceptors...)
streamInterceptors := []grpc.StreamServerInterceptor{
- grpcmwtags.StreamServerInterceptor(ctxtagsInterceptorOption()),
grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler
middleware.MethodTypeStreamInterceptor(deps.Registry, deps.Logger),
- metadatahandler.StreamInterceptor,
+ requestinfohandler.StreamInterceptor,
grpcprometheus.StreamServerInterceptor,
deps.Logger.WithField("component", "praefect.StreamServerInterceptor").StreamServerInterceptor(
grpcmwlogrus.WithTimestampFormat(log.LogTimestampFormat),