diff options
author | James Liu <jliu@gitlab.com> | 2023-10-05 04:31:44 +0300 |
---|---|---|
committer | James Liu <jliu@gitlab.com> | 2023-10-05 04:31:44 +0300 |
commit | 82988a601f0ae35d7c260bfe01d3304e642429db (patch) | |
tree | 52a6c52e9677013c68f629438b8e032085dc5612 | |
parent | e7819007012a85f41eb6a3ca69df3e82bbd06a02 (diff) | |
parent | 6a0e4459a16dc0cd6d0007c08d44172b1ce360b7 (diff) |
Merge branch 'pks-grpc-middleware-merge-metadatahandler-fieldextractors' into 'master'
grpc/middleware: Absorb functionality of the field extractor into the metadatahandler
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6440
Merged-by: James Liu <jliu@gitlab.com>
Approved-by: James Liu <jliu@gitlab.com>
Reviewed-by: karthik nayak <knayak@gitlab.com>
Co-authored-by: Patrick Steinhardt <psteinhardt@gitlab.com>
-rw-r--r-- | internal/cli/praefect/subcmd_track_repository.go | 3 | ||||
-rw-r--r-- | internal/gitaly/server/server.go | 14 | ||||
-rw-r--r-- | internal/grpc/middleware/metadatahandler/metadatahandler.go | 259 | ||||
-rw-r--r-- | internal/grpc/middleware/metadatahandler/metadatahandler_test.go | 300 | ||||
-rw-r--r-- | internal/grpc/middleware/requestinfohandler/requestinfohandler.go | 334 | ||||
-rw-r--r-- | internal/grpc/middleware/requestinfohandler/requestinfohandler_test.go | 626 | ||||
-rw-r--r-- | internal/grpc/middleware/requestinfohandler/testhelper_test.go (renamed from internal/grpc/middleware/metadatahandler/testhelper_test.go) | 2 | ||||
-rw-r--r-- | internal/helper/fieldextractors/fieldextractor.go | 120 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 3 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 9 | ||||
-rw-r--r-- | internal/praefect/datastore/queue.go | 4 | ||||
-rw-r--r-- | internal/praefect/reconciler/reconciler_test.go | 3 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 12 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 9 | ||||
-rw-r--r-- | internal/praefect/server.go | 14 |
15 files changed, 987 insertions, 725 deletions
diff --git a/internal/cli/praefect/subcmd_track_repository.go b/internal/cli/praefect/subcmd_track_repository.go index 207d89542..4b5dd1026 100644 --- a/internal/cli/praefect/subcmd_track_repository.go +++ b/internal/cli/praefect/subcmd_track_repository.go @@ -11,7 +11,6 @@ import ( "github.com/urfave/cli/v2" glcli "gitlab.com/gitlab-org/gitaly/v16/internal/cli" - "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect" @@ -244,7 +243,7 @@ func (req *trackRepositoryRequest) execRequest(ctx context.Context, SourceNodeStorage: primary, TargetNodeStorage: secondary, }, - Meta: datastore.Params{metadatahandler.CorrelationIDKey: correlationID}, + Meta: datastore.Params{datastore.CorrelationIDKey: correlationID}, } if replicateImmediately { conn, ok := connections[secondary] diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go index 9ffb88c65..d8c1ab9e7 100644 --- a/internal/gitaly/server/server.go +++ b/internal/gitaly/server/server.go @@ -6,7 +6,6 @@ import ( "time" grpcmwlogrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" - grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/server/auth" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel" @@ -16,12 +15,11 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/cache" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/customfieldshandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/featureflag" - "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/panichandler" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/requestinfohandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/sentryhandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/statushandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" - "gitlab.com/gitlab-org/gitaly/v16/internal/helper/fieldextractors" gitalylog "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc" @@ -61,10 +59,6 @@ func (s *GitalyServerFactory) New(secure bool, opts ...Option) (*grpc.Server, er opt(&cfg) } - ctxTagOpts := []grpcmwtags.Option{ - grpcmwtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor), - } - transportCredentials := insecure.NewCredentials() // If tls config is specified attempt to extract tls options and use it // as a grpc.ServerOption @@ -106,9 +100,8 @@ func (s *GitalyServerFactory) New(secure bool, opts ...Option) (*grpc.Server, er ) streamServerInterceptors := []grpc.StreamServerInterceptor{ - grpcmwtags.StreamServerInterceptor(ctxTagOpts...), grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler - metadatahandler.StreamInterceptor, + requestinfohandler.StreamInterceptor, grpcprometheus.StreamServerInterceptor, customfieldshandler.StreamInterceptor, s.logger.WithField("component", "gitaly.StreamServerInterceptor").StreamServerInterceptor( @@ -122,9 +115,8 @@ func (s *GitalyServerFactory) New(secure bool, opts ...Option) (*grpc.Server, er auth.StreamServerInterceptor(s.cfg.Auth), } unaryServerInterceptors := []grpc.UnaryServerInterceptor{ - grpcmwtags.UnaryServerInterceptor(ctxTagOpts...), grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler - metadatahandler.UnaryInterceptor, + requestinfohandler.UnaryInterceptor, grpcprometheus.UnaryServerInterceptor, customfieldshandler.UnaryInterceptor, s.logger.WithField("component", "gitaly.UnaryServerInterceptor").UnaryServerInterceptor( diff --git a/internal/grpc/middleware/metadatahandler/metadatahandler.go b/internal/grpc/middleware/metadatahandler/metadatahandler.go deleted file mode 100644 index cbaadc56b..000000000 --- a/internal/grpc/middleware/metadatahandler/metadatahandler.go +++ /dev/null @@ -1,259 +0,0 @@ -package metadatahandler - -import ( - "context" - "strings" - - grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" - grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth" - "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" - "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" - "gitlab.com/gitlab-org/labkit/correlation" - "google.golang.org/grpc" - "google.golang.org/grpc/metadata" -) - -var requests = promauto.NewCounterVec( - prometheus.CounterOpts{ - Name: "gitaly_service_client_requests_total", - Help: "Counter of client requests received by client, call_site, auth version, response code and deadline_type", - }, - []string{ - "client_name", - "grpc_service", - "grpc_method", - "call_site", - "auth_version", - "grpc_code", - "deadline_type", - "method_operation", - "method_scope", - }, -) - -type metadataTags struct { - clientName string - callSite string - authVersion string - deadlineType string - methodOperation string - methodScope string -} - -// CallSiteKey is the key used in ctx_tags to store the client feature -const CallSiteKey = "grpc.meta.call_site" - -// ClientNameKey is the key used in ctx_tags to store the client name -const ClientNameKey = "grpc.meta.client_name" - -// AuthVersionKey is the key used in ctx_tags to store the auth version -const AuthVersionKey = "grpc.meta.auth_version" - -// DeadlineTypeKey is the key used in ctx_tags to store the deadline type -const DeadlineTypeKey = "grpc.meta.deadline_type" - -// MethodTypeKey is one of "unary", "client_stream", "server_stream", "bidi_stream" -const MethodTypeKey = "grpc.meta.method_type" - -// MethodOperationKey is one of "mutator", "accessor" or "maintenance" and corresponds to the `MethodOptions` -// extension. -const MethodOperationKey = "grpc.meta.method_operation" - -// MethodScopeKey is one of "repository" or "storage" and corresponds to the `MethodOptions` extension. -const MethodScopeKey = "grpc.meta.method_scope" - -// RemoteIPKey is the key used in ctx_tags to store the remote_ip -const RemoteIPKey = "remote_ip" - -// UserIDKey is the key used in ctx_tags to store the user_id -const UserIDKey = "user_id" - -// UsernameKey is the key used in ctx_tags to store the username -const UsernameKey = "username" - -// CorrelationIDKey is the key used in ctx_tags to store the correlation ID -const CorrelationIDKey = "correlation_id" - -// Unknown client and feature. Matches the prometheus grpc unknown value -const unknownValue = "unknown" - -func getFromMD(md metadata.MD, header string) string { - values := md[header] - if len(values) != 1 { - return "" - } - - return values[0] -} - -// addMetadataTags extracts metadata from the connection headers and add it to the -// ctx_tags, if it is set. Returns values appropriate for use with prometheus labels, -// using `unknown` if a value is not set -func addMetadataTags(ctx context.Context, fullMethod, grpcMethodType string) metadataTags { - metaTags := metadataTags{ - clientName: unknownValue, - callSite: unknownValue, - authVersion: unknownValue, - deadlineType: unknownValue, - methodOperation: unknownValue, - methodScope: unknownValue, - } - - md, ok := metadata.FromIncomingContext(ctx) - if !ok { - return metaTags - } - - tags := grpcmwtags.Extract(ctx) - - if methodInfo, err := protoregistry.GitalyProtoPreregistered.LookupMethod(fullMethod); err == nil { - var operation string - switch methodInfo.Operation { - case protoregistry.OpAccessor: - operation = "accessor" - case protoregistry.OpMutator: - operation = "mutator" - case protoregistry.OpMaintenance: - operation = "maintenance" - default: - operation = unknownValue - } - - metaTags.methodOperation = operation - tags.Set(MethodOperationKey, operation) - - var scope string - switch methodInfo.Scope { - case protoregistry.ScopeRepository: - scope = "repository" - case protoregistry.ScopeStorage: - scope = "storage" - default: - scope = unknownValue - } - - metaTags.methodScope = scope - tags.Set(MethodScopeKey, scope) - } - - metadata := getFromMD(md, "call_site") - if metadata != "" { - metaTags.callSite = metadata - tags.Set(CallSiteKey, metadata) - } - - metadata = getFromMD(md, "deadline_type") - _, deadlineSet := ctx.Deadline() - if !deadlineSet { - metaTags.deadlineType = "none" - } else if metadata != "" { - metaTags.deadlineType = metadata - } - - clientName := correlation.ExtractClientNameFromContext(ctx) - if clientName != "" { - metaTags.clientName = clientName - tags.Set(ClientNameKey, clientName) - } else { - metadata = getFromMD(md, "client_name") - if metadata != "" { - metaTags.clientName = metadata - tags.Set(ClientNameKey, metadata) - } - } - - // Set the deadline and method types in the logs - tags.Set(DeadlineTypeKey, metaTags.deadlineType) - tags.Set(MethodTypeKey, grpcMethodType) - - authInfo, _ := gitalyauth.ExtractAuthInfo(ctx) - if authInfo != nil { - metaTags.authVersion = authInfo.Version - tags.Set(AuthVersionKey, authInfo.Version) - } - - metadata = getFromMD(md, "remote_ip") - if metadata != "" { - tags.Set(RemoteIPKey, metadata) - } - - metadata = getFromMD(md, "user_id") - if metadata != "" { - tags.Set(UserIDKey, metadata) - } - - metadata = getFromMD(md, "username") - if metadata != "" { - tags.Set(UsernameKey, metadata) - } - - // This is a stop-gap approach to logging correlation_ids - correlationID := correlation.ExtractFromContext(ctx) - if correlationID != "" { - tags.Set(CorrelationIDKey, correlationID) - } - - return metaTags -} - -func extractServiceAndMethodName(fullMethodName string) (string, string) { - fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash - service, method, ok := strings.Cut(fullMethodName, "/") - if !ok { - return unknownValue, unknownValue - } - return service, method -} - -func streamRPCType(info *grpc.StreamServerInfo) string { - if info.IsClientStream && !info.IsServerStream { - return "client_stream" - } else if !info.IsClientStream && info.IsServerStream { - return "server_stream" - } - return "bidi_stream" -} - -func reportWithPrometheusLabels(metaTags metadataTags, fullMethod string, err error) { - grpcCode := structerr.GRPCCode(err) - serviceName, methodName := extractServiceAndMethodName(fullMethod) - - requests.WithLabelValues( - metaTags.clientName, // client_name - serviceName, // grpc_service - methodName, // grpc_method - metaTags.callSite, // call_site - metaTags.authVersion, // auth_version - grpcCode.String(), // grpc_code - metaTags.deadlineType, // deadline_type - metaTags.methodOperation, - metaTags.methodScope, - ).Inc() - grpcprometheus.WithConstLabels(prometheus.Labels{"deadline_type": metaTags.deadlineType}) -} - -// UnaryInterceptor returns a Unary Interceptor -func UnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - metaTags := addMetadataTags(ctx, info.FullMethod, "unary") - - res, err := handler(ctx, req) - - reportWithPrometheusLabels(metaTags, info.FullMethod, err) - - return res, err -} - -// StreamInterceptor returns a Stream Interceptor -func StreamInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - ctx := stream.Context() - metaTags := addMetadataTags(ctx, info.FullMethod, streamRPCType(info)) - - err := handler(srv, stream) - - reportWithPrometheusLabels(metaTags, info.FullMethod, err) - - return err -} diff --git a/internal/grpc/middleware/metadatahandler/metadatahandler_test.go b/internal/grpc/middleware/metadatahandler/metadatahandler_test.go deleted file mode 100644 index 460dd2f68..000000000 --- a/internal/grpc/middleware/metadatahandler/metadatahandler_test.go +++ /dev/null @@ -1,300 +0,0 @@ -package metadatahandler - -import ( - "context" - "testing" - "time" - - grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" - "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" - "gitlab.com/gitlab-org/labkit/correlation" - "google.golang.org/grpc/metadata" -) - -const ( - correlationID = "CORRELATION_ID" - clientName = "CLIENT_NAME" -) - -func TestAddMetadataTags(t *testing.T) { - t.Parallel() - - baseContext := testhelper.Context(t) - - for _, tc := range []struct { - desc string - fullMethod string - metadata metadata.MD - deadline bool - expectedMetatags metadataTags - }{ - { - desc: "empty metadata", - metadata: metadata.Pairs(), - deadline: false, - expectedMetatags: metadataTags{ - clientName: unknownValue, - callSite: unknownValue, - authVersion: unknownValue, - deadlineType: "none", - methodOperation: unknownValue, - methodScope: unknownValue, - }, - }, - { - desc: "context containing metadata", - metadata: metadata.Pairs("call_site", "testsite"), - deadline: false, - expectedMetatags: metadataTags{ - clientName: unknownValue, - callSite: "testsite", - authVersion: unknownValue, - deadlineType: "none", - methodOperation: unknownValue, - methodScope: unknownValue, - }, - }, - { - desc: "context containing metadata and a deadline", - metadata: metadata.Pairs("call_site", "testsite"), - deadline: true, - expectedMetatags: metadataTags{ - clientName: unknownValue, - callSite: "testsite", - authVersion: unknownValue, - deadlineType: unknownValue, - methodOperation: unknownValue, - methodScope: unknownValue, - }, - }, - { - desc: "context containing metadata and a deadline type", - metadata: metadata.Pairs("deadline_type", "regular"), - deadline: true, - expectedMetatags: metadataTags{ - clientName: unknownValue, - callSite: unknownValue, - authVersion: unknownValue, - deadlineType: "regular", - methodOperation: unknownValue, - methodScope: unknownValue, - }, - }, - { - desc: "a context without deadline but with deadline type", - metadata: metadata.Pairs("deadline_type", "regular"), - deadline: false, - expectedMetatags: metadataTags{ - clientName: unknownValue, - callSite: unknownValue, - authVersion: unknownValue, - deadlineType: "none", - methodOperation: unknownValue, - methodScope: unknownValue, - }, - }, - { - desc: "with a context containing metadata", - metadata: metadata.Pairs("deadline_type", "regular", "client_name", "rails"), - deadline: true, - expectedMetatags: metadataTags{ - clientName: "rails", - callSite: unknownValue, - authVersion: unknownValue, - deadlineType: "regular", - methodOperation: unknownValue, - methodScope: unknownValue, - }, - }, - { - desc: "with unknown method", - fullMethod: "/gitaly.RepositoryService/UnknownMethod", - metadata: metadata.Pairs(), - deadline: false, - expectedMetatags: metadataTags{ - clientName: unknownValue, - callSite: unknownValue, - authVersion: unknownValue, - deadlineType: "none", - methodOperation: unknownValue, - methodScope: unknownValue, - }, - }, - { - desc: "with repository-scoped accessor", - fullMethod: "/gitaly.RepositoryService/ObjectFormat", - metadata: metadata.Pairs(), - deadline: false, - expectedMetatags: metadataTags{ - clientName: unknownValue, - callSite: unknownValue, - authVersion: unknownValue, - deadlineType: "none", - methodOperation: "accessor", - methodScope: "repository", - }, - }, - { - desc: "with repository-scoped mutator", - fullMethod: "/gitaly.RepositoryService/CreateRepository", - metadata: metadata.Pairs(), - deadline: false, - expectedMetatags: metadataTags{ - clientName: unknownValue, - callSite: unknownValue, - authVersion: unknownValue, - deadlineType: "none", - methodOperation: "mutator", - methodScope: "repository", - }, - }, - { - desc: "with repository-scoped maintenance", - fullMethod: "/gitaly.RepositoryService/OptimizeRepository", - metadata: metadata.Pairs(), - deadline: false, - expectedMetatags: metadataTags{ - clientName: unknownValue, - callSite: unknownValue, - authVersion: unknownValue, - deadlineType: "none", - methodOperation: "maintenance", - methodScope: "repository", - }, - }, - { - desc: "with repository-scoped maintenance", - fullMethod: "/gitaly.RepositoryService/OptimizeRepository", - metadata: metadata.Pairs(), - deadline: false, - expectedMetatags: metadataTags{ - clientName: unknownValue, - callSite: unknownValue, - authVersion: unknownValue, - deadlineType: "none", - methodOperation: "maintenance", - methodScope: "repository", - }, - }, - { - desc: "with storage-scoped accessor", - fullMethod: "/gitaly.RemoteService/FindRemoteRepository", - metadata: metadata.Pairs(), - deadline: false, - expectedMetatags: metadataTags{ - clientName: unknownValue, - callSite: unknownValue, - authVersion: unknownValue, - deadlineType: "none", - methodOperation: "accessor", - methodScope: "storage", - }, - }, - } { - tc := tc - - t.Run(tc.desc, func(t *testing.T) { - t.Parallel() - - ctx := metadata.NewIncomingContext(baseContext, tc.metadata) - if tc.deadline { - var cancel func() - - ctx, cancel = context.WithDeadline(ctx, time.Now().Add(50*time.Millisecond)) - defer cancel() - } - - require.Equal(t, tc.expectedMetatags, addMetadataTags(ctx, tc.fullMethod, "unary")) - }) - } -} - -func TestGRPCTags(t *testing.T) { - t.Parallel() - - ctx := testhelper.Context(t) - ctx = metadata.NewIncomingContext( - correlation.ContextWithCorrelation( - correlation.ContextWithClientName( - ctx, - clientName, - ), - correlationID, - ), - metadata.Pairs(), - ) - - interceptor := grpcmwtags.UnaryServerInterceptor() - - _, err := interceptor(ctx, nil, nil, func(ctx context.Context, _ interface{}) (interface{}, error) { - metaTags := addMetadataTags(ctx, "/gitaly.RepositoryService/OptimizeRepository", "unary") - - require.Equal(t, metadataTags{ - clientName: clientName, - callSite: "unknown", - authVersion: "unknown", - deadlineType: "none", - methodOperation: "maintenance", - methodScope: "repository", - }, metaTags) - - require.Equal(t, map[string]interface{}{ - "correlation_id": correlationID, - ClientNameKey: clientName, - DeadlineTypeKey: "none", - MethodTypeKey: "unary", - MethodOperationKey: "maintenance", - MethodScopeKey: "repository", - }, grpcmwtags.Extract(ctx).Values()) - - return nil, nil - }) - require.NoError(t, err) -} - -func TestExtractServiceAndMethodName(t *testing.T) { - t.Parallel() - - for _, tc := range []struct { - desc string - fullMethodName string - expectedService string - expectedMethod string - }{ - { - desc: "blank", - fullMethodName: "", - expectedService: unknownValue, - expectedMethod: unknownValue, - }, - { - desc: "normal", - fullMethodName: "/gitaly.OperationService/method", - expectedService: "gitaly.OperationService", - expectedMethod: "method", - }, - { - desc: "malformed", - fullMethodName: "//method", - expectedService: "", - expectedMethod: "method", - }, - { - desc: "malformed", - fullMethodName: "/gitaly.OperationService/", - expectedService: "gitaly.OperationService", - expectedMethod: "", - }, - } { - tc := tc - - t.Run(tc.desc, func(t *testing.T) { - t.Parallel() - - service, method := extractServiceAndMethodName(tc.fullMethodName) - require.Equal(t, tc.expectedService, service) - require.Equal(t, tc.expectedMethod, method) - }) - } -} diff --git a/internal/grpc/middleware/requestinfohandler/requestinfohandler.go b/internal/grpc/middleware/requestinfohandler/requestinfohandler.go new file mode 100644 index 000000000..a064f82ea --- /dev/null +++ b/internal/grpc/middleware/requestinfohandler/requestinfohandler.go @@ -0,0 +1,334 @@ +package requestinfohandler + +import ( + "context" + "strings" + + grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" + grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "gitlab.com/gitlab-org/labkit/correlation" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +var requests = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "gitaly_service_client_requests_total", + Help: "Counter of client requests received by client, call_site, auth version, response code and deadline_type", + }, + []string{ + "client_name", + "grpc_service", + "grpc_method", + "call_site", + "auth_version", + "grpc_code", + "deadline_type", + "method_operation", + "method_scope", + }, +) + +type requestInfo struct { + correlationID string + fullMethod string + methodType string + clientName string + remoteIP string + userID string + userName string + callSite string + authVersion string + deadlineType string + methodOperation string + methodScope string + + repository *gitalypb.Repository + objectPool *gitalypb.ObjectPool + storageName string +} + +// Unknown client and feature. Matches the prometheus grpc unknown value +const unknownValue = "unknown" + +func getFromMD(md metadata.MD, header string) string { + values := md[header] + if len(values) != 1 { + return "" + } + + return values[0] +} + +// newRequestInfo extracts metadata from the connection headers and add it to the +// ctx_tags, if it is set. Returns values appropriate for use with prometheus labels, +// using `unknown` if a value is not set +func newRequestInfo(ctx context.Context, fullMethod, grpcMethodType string) *requestInfo { + info := &requestInfo{ + fullMethod: fullMethod, + methodType: grpcMethodType, + clientName: unknownValue, + callSite: unknownValue, + authVersion: unknownValue, + deadlineType: unknownValue, + methodOperation: unknownValue, + methodScope: unknownValue, + } + + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return info + } + + if methodInfo, err := protoregistry.GitalyProtoPreregistered.LookupMethod(fullMethod); err == nil { + var operation string + switch methodInfo.Operation { + case protoregistry.OpAccessor: + operation = "accessor" + case protoregistry.OpMutator: + operation = "mutator" + case protoregistry.OpMaintenance: + operation = "maintenance" + default: + operation = unknownValue + } + + info.methodOperation = operation + + var scope string + switch methodInfo.Scope { + case protoregistry.ScopeRepository: + scope = "repository" + case protoregistry.ScopeStorage: + scope = "storage" + default: + scope = unknownValue + } + + info.methodScope = scope + } + + if callSite := getFromMD(md, "call_site"); callSite != "" { + info.callSite = callSite + } + + if _, deadlineSet := ctx.Deadline(); !deadlineSet { + info.deadlineType = "none" + } else if deadlineType := getFromMD(md, "deadline_type"); deadlineType != "" { + info.deadlineType = deadlineType + } + + if clientName := correlation.ExtractClientNameFromContext(ctx); clientName != "" { + info.clientName = clientName + } else if clientName := getFromMD(md, "client_name"); clientName != "" { + info.clientName = clientName + } + + if authInfo, _ := gitalyauth.ExtractAuthInfo(ctx); authInfo != nil { + info.authVersion = authInfo.Version + } + + if remoteIP := getFromMD(md, "remote_ip"); remoteIP != "" { + info.remoteIP = remoteIP + } + + if userID := getFromMD(md, "user_id"); userID != "" { + info.userID = userID + } + + if userName := getFromMD(md, "username"); userName != "" { + info.userName = userName + } + + // This is a stop-gap approach to logging correlation_ids + if correlationID := correlation.ExtractFromContext(ctx); correlationID != "" { + info.correlationID = correlationID + } + + return info +} + +func (i *requestInfo) extractRequestInfo(request any) { + type repoScopedRequest interface { + GetRepository() *gitalypb.Repository + } + + type poolScopedRequest interface { + GetObjectPool() *gitalypb.ObjectPool + } + + type storageScopedRequest interface { + GetStorageName() string + } + + if repoScoped, ok := request.(repoScopedRequest); ok { + i.repository = repoScoped.GetRepository() + } + + if poolScoped, ok := request.(poolScopedRequest); ok { + i.objectPool = poolScoped.GetObjectPool() + } + + if storageScoped, ok := request.(storageScopedRequest); ok { + i.storageName = storageScoped.GetStorageName() + } +} + +func (i *requestInfo) injectTags(tags grpcmwtags.Tags) { + for key, value := range map[string]string{ + "grpc.meta.call_site": i.callSite, + "grpc.meta.client_name": i.clientName, + "grpc.meta.auth_version": i.authVersion, + "grpc.meta.deadline_type": i.deadlineType, + "grpc.meta.method_type": i.methodType, + "grpc.meta.method_operation": i.methodOperation, + "grpc.meta.method_scope": i.methodScope, + "grpc.request.fullMethod": i.fullMethod, + "grpc.request.StorageName": i.storageName, + "remote_ip": i.remoteIP, + "user_id": i.userID, + "username": i.userName, + "correlation_id": i.correlationID, + } { + if value == "" || value == unknownValue { + continue + } + + tags.Set(key, value) + } + + // We handle the repository-related fields separately such that all fields will be set unconditionally, + // regardless of whether they are empty or not. This is done to retain all fields even if their values + // are empty. + if repo := i.repository; repo != nil { + for key, value := range map[string]string{ + "grpc.request.repoStorage": repo.GetStorageName(), + "grpc.request.repoPath": repo.GetRelativePath(), + "grpc.request.glRepository": repo.GetGlRepository(), + "grpc.request.glProjectPath": repo.GetGlProjectPath(), + } { + tags.Set(key, value) + } + } + + // Same for the object pool repository. + if pool := i.objectPool.GetRepository(); pool != nil { + for key, value := range map[string]string{ + "grpc.request.pool.storage": pool.GetStorageName(), + "grpc.request.pool.relativePath": pool.GetRelativePath(), + "grpc.request.pool.sourceProjectPath": pool.GetGlProjectPath(), + } { + tags.Set(key, value) + } + } +} + +func (i *requestInfo) reportPrometheusMetrics(err error) { + grpcCode := structerr.GRPCCode(err) + serviceName, methodName := extractServiceAndMethodName(i.fullMethod) + + requests.WithLabelValues( + i.clientName, // client_name + serviceName, // grpc_service + methodName, // grpc_method + i.callSite, // call_site + i.authVersion, // auth_version + grpcCode.String(), // grpc_code + i.deadlineType, // deadline_type + i.methodOperation, + i.methodScope, + ).Inc() + grpcprometheus.WithConstLabels(prometheus.Labels{"deadline_type": i.deadlineType}) +} + +func extractServiceAndMethodName(fullMethodName string) (string, string) { + fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash + service, method, ok := strings.Cut(fullMethodName, "/") + if !ok { + return unknownValue, unknownValue + } + return service, method +} + +// UnaryInterceptor returns a Unary Interceptor +func UnaryInterceptor(ctx context.Context, req interface{}, serverInfo *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + tags := grpcmwtags.NewTags() + ctx = grpcmwtags.SetInContext(ctx, tags) + + info := newRequestInfo(ctx, serverInfo.FullMethod, "unary") + info.extractRequestInfo(req) + + info.injectTags(tags) + res, err := handler(ctx, req) + info.reportPrometheusMetrics(err) + + return res, err +} + +// StreamInterceptor returns a Stream Interceptor +func StreamInterceptor(srv interface{}, stream grpc.ServerStream, serverInfo *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + tags := grpcmwtags.NewTags() + ctx := grpcmwtags.SetInContext(stream.Context(), tags) + + info := newRequestInfo(ctx, serverInfo.FullMethod, streamRPCType(serverInfo)) + + // Even though we don't yet have all information set up we already inject the tags here. This is done such that + // log messages will at least have the metadata set up correctly in case there is no first request. + info.injectTags(tags) + err := handler(srv, &wrappedServerStream{ + ServerStream: stream, + ctx: ctx, + info: info, + tags: tags, + initial: true, + }) + info.reportPrometheusMetrics(err) + + return err +} + +func streamRPCType(info *grpc.StreamServerInfo) string { + if info.IsClientStream && !info.IsServerStream { + return "client_stream" + } else if !info.IsClientStream && info.IsServerStream { + return "server_stream" + } + return "bidi_stream" +} + +// wrappedServerStream wraps a grpc.ServerStream such that we can intercept and extract info from the first gRPC request +// on that stream. +type wrappedServerStream struct { + grpc.ServerStream + ctx context.Context + tags grpcmwtags.Tags + info *requestInfo + initial bool +} + +// Context overrides the context of the ServerStream with our own context that has the tags set up. +func (w *wrappedServerStream) Context() context.Context { + return w.ctx +} + +// RecvMsg receives a message from the underlying server stream. The initial received message will be used to extract +// request information and inject it into the context. +func (w *wrappedServerStream) RecvMsg(req interface{}) error { + err := w.ServerStream.RecvMsg(req) + + if w.initial { + w.initial = false + + w.info.extractRequestInfo(req) + // Re-inject the tags a second time here. + w.info.injectTags(w.tags) + } + + return err +} diff --git a/internal/grpc/middleware/requestinfohandler/requestinfohandler_test.go b/internal/grpc/middleware/requestinfohandler/requestinfohandler_test.go new file mode 100644 index 000000000..0974ee6a1 --- /dev/null +++ b/internal/grpc/middleware/requestinfohandler/requestinfohandler_test.go @@ -0,0 +1,626 @@ +package requestinfohandler + +import ( + "context" + "io" + "net" + "testing" + "time" + + grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "gitlab.com/gitlab-org/labkit/correlation" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/test/bufconn" +) + +const ( + correlationID = "CORRELATION_ID" + clientName = "CLIENT_NAME" +) + +func TestNewRequestInfo(t *testing.T) { + t.Parallel() + + baseContext := testhelper.Context(t) + + for _, tc := range []struct { + desc string + fullMethod string + metadata metadata.MD + deadline bool + expectedInfo *requestInfo + }{ + { + desc: "empty metadata", + metadata: metadata.Pairs(), + deadline: false, + expectedInfo: &requestInfo{ + methodType: "unary", + clientName: unknownValue, + callSite: unknownValue, + authVersion: unknownValue, + deadlineType: "none", + methodOperation: unknownValue, + methodScope: unknownValue, + }, + }, + { + desc: "context containing metadata", + metadata: metadata.Pairs("call_site", "testsite"), + deadline: false, + expectedInfo: &requestInfo{ + methodType: "unary", + clientName: unknownValue, + callSite: "testsite", + authVersion: unknownValue, + deadlineType: "none", + methodOperation: unknownValue, + methodScope: unknownValue, + }, + }, + { + desc: "context containing metadata and a deadline", + metadata: metadata.Pairs("call_site", "testsite"), + deadline: true, + expectedInfo: &requestInfo{ + methodType: "unary", + clientName: unknownValue, + callSite: "testsite", + authVersion: unknownValue, + deadlineType: unknownValue, + methodOperation: unknownValue, + methodScope: unknownValue, + }, + }, + { + desc: "context containing metadata and a deadline type", + metadata: metadata.Pairs("deadline_type", "regular"), + deadline: true, + expectedInfo: &requestInfo{ + methodType: "unary", + clientName: unknownValue, + callSite: unknownValue, + authVersion: unknownValue, + deadlineType: "regular", + methodOperation: unknownValue, + methodScope: unknownValue, + }, + }, + { + desc: "a context without deadline but with deadline type", + metadata: metadata.Pairs("deadline_type", "regular"), + deadline: false, + expectedInfo: &requestInfo{ + methodType: "unary", + clientName: unknownValue, + callSite: unknownValue, + authVersion: unknownValue, + deadlineType: "none", + methodOperation: unknownValue, + methodScope: unknownValue, + }, + }, + { + desc: "with a context containing metadata", + metadata: metadata.Pairs("deadline_type", "regular", "client_name", "rails"), + deadline: true, + expectedInfo: &requestInfo{ + methodType: "unary", + clientName: "rails", + callSite: unknownValue, + authVersion: unknownValue, + deadlineType: "regular", + methodOperation: unknownValue, + methodScope: unknownValue, + }, + }, + { + desc: "with unknown method", + fullMethod: "/gitaly.RepositoryService/UnknownMethod", + metadata: metadata.Pairs(), + deadline: false, + expectedInfo: &requestInfo{ + fullMethod: "/gitaly.RepositoryService/UnknownMethod", + methodType: "unary", + clientName: unknownValue, + callSite: unknownValue, + authVersion: unknownValue, + deadlineType: "none", + methodOperation: unknownValue, + methodScope: unknownValue, + }, + }, + { + desc: "with repository-scoped accessor", + fullMethod: "/gitaly.RepositoryService/ObjectFormat", + metadata: metadata.Pairs(), + deadline: false, + expectedInfo: &requestInfo{ + fullMethod: "/gitaly.RepositoryService/ObjectFormat", + methodType: "unary", + clientName: unknownValue, + callSite: unknownValue, + authVersion: unknownValue, + deadlineType: "none", + methodOperation: "accessor", + methodScope: "repository", + }, + }, + { + desc: "with repository-scoped mutator", + fullMethod: "/gitaly.RepositoryService/CreateRepository", + metadata: metadata.Pairs(), + deadline: false, + expectedInfo: &requestInfo{ + fullMethod: "/gitaly.RepositoryService/CreateRepository", + methodType: "unary", + clientName: unknownValue, + callSite: unknownValue, + authVersion: unknownValue, + deadlineType: "none", + methodOperation: "mutator", + methodScope: "repository", + }, + }, + { + desc: "with repository-scoped maintenance", + fullMethod: "/gitaly.RepositoryService/OptimizeRepository", + metadata: metadata.Pairs(), + deadline: false, + expectedInfo: &requestInfo{ + fullMethod: "/gitaly.RepositoryService/OptimizeRepository", + methodType: "unary", + clientName: unknownValue, + callSite: unknownValue, + authVersion: unknownValue, + deadlineType: "none", + methodOperation: "maintenance", + methodScope: "repository", + }, + }, + { + desc: "with repository-scoped maintenance", + fullMethod: "/gitaly.RepositoryService/OptimizeRepository", + metadata: metadata.Pairs(), + deadline: false, + expectedInfo: &requestInfo{ + fullMethod: "/gitaly.RepositoryService/OptimizeRepository", + methodType: "unary", + clientName: unknownValue, + callSite: unknownValue, + authVersion: unknownValue, + deadlineType: "none", + methodOperation: "maintenance", + methodScope: "repository", + }, + }, + { + desc: "with storage-scoped accessor", + fullMethod: "/gitaly.RemoteService/FindRemoteRepository", + metadata: metadata.Pairs(), + deadline: false, + expectedInfo: &requestInfo{ + fullMethod: "/gitaly.RemoteService/FindRemoteRepository", + methodType: "unary", + clientName: unknownValue, + callSite: unknownValue, + authVersion: unknownValue, + deadlineType: "none", + methodOperation: "accessor", + methodScope: "storage", + }, + }, + } { + tc := tc + + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + ctx := metadata.NewIncomingContext(baseContext, tc.metadata) + if tc.deadline { + var cancel func() + + ctx, cancel = context.WithDeadline(ctx, time.Now().Add(50*time.Millisecond)) + defer cancel() + } + + require.Equal(t, tc.expectedInfo, newRequestInfo(ctx, tc.fullMethod, "unary")) + }) + } +} + +func TestGRPCTags(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + ctx = metadata.NewIncomingContext( + correlation.ContextWithCorrelation( + correlation.ContextWithClientName( + ctx, + clientName, + ), + correlationID, + ), + metadata.Pairs(), + ) + + interceptor := grpcmwtags.UnaryServerInterceptor() + + _, err := interceptor(ctx, nil, nil, func(ctx context.Context, _ interface{}) (interface{}, error) { + info := newRequestInfo(ctx, "/gitaly.RepositoryService/OptimizeRepository", "unary") + + tags := grpcmwtags.NewTags() + info.injectTags(tags) + + require.Equal(t, &requestInfo{ + correlationID: correlationID, + fullMethod: "/gitaly.RepositoryService/OptimizeRepository", + methodType: "unary", + clientName: clientName, + callSite: "unknown", + authVersion: "unknown", + deadlineType: "none", + methodOperation: "maintenance", + methodScope: "repository", + }, info) + + require.Equal(t, map[string]interface{}{ + "correlation_id": correlationID, + "grpc.meta.client_name": clientName, + "grpc.meta.deadline_type": "none", + "grpc.meta.method_type": "unary", + "grpc.meta.method_operation": "maintenance", + "grpc.meta.method_scope": "repository", + "grpc.request.fullMethod": "/gitaly.RepositoryService/OptimizeRepository", + }, tags.Values()) + + return nil, nil + }) + require.NoError(t, err) +} + +func TestExtractServiceAndMethodName(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + desc string + fullMethodName string + expectedService string + expectedMethod string + }{ + { + desc: "blank", + fullMethodName: "", + expectedService: unknownValue, + expectedMethod: unknownValue, + }, + { + desc: "normal", + fullMethodName: "/gitaly.OperationService/method", + expectedService: "gitaly.OperationService", + expectedMethod: "method", + }, + { + desc: "malformed", + fullMethodName: "//method", + expectedService: "", + expectedMethod: "method", + }, + { + desc: "malformed", + fullMethodName: "/gitaly.OperationService/", + expectedService: "gitaly.OperationService", + expectedMethod: "", + }, + } { + tc := tc + + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + service, method := extractServiceAndMethodName(tc.fullMethodName) + require.Equal(t, tc.expectedService, service) + require.Equal(t, tc.expectedMethod, method) + }) + } +} + +func TestInterceptors(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + + for _, tc := range []struct { + desc string + call func(*testing.T, mockClient) + expectedTags map[string]any + }{ + { + desc: "unary repository-scoped call", + call: func(t *testing.T, client mockClient) { + _, err := client.RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{ + Repository: &gitalypb.Repository{ + StorageName: "storage", + RelativePath: "path", + GlProjectPath: "glProject", + GlRepository: "glRepository", + }, + }) + + require.NoError(t, err) + }, + expectedTags: map[string]any{ + "grpc.meta.deadline_type": "none", + "grpc.meta.method_operation": "accessor", + "grpc.meta.method_scope": "repository", + "grpc.meta.method_type": "unary", + "grpc.request.fullMethod": "/gitaly.RepositoryService/RepositoryInfo", + "grpc.request.repoStorage": "storage", + "grpc.request.repoPath": "path", + "grpc.request.glProjectPath": "glProject", + "grpc.request.glRepository": "glRepository", + }, + }, + { + desc: "unary repository-scoped call with unset repository", + call: func(t *testing.T, client mockClient) { + _, err := client.RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{ + Repository: nil, + }) + + require.NoError(t, err) + }, + expectedTags: map[string]any{ + "grpc.meta.deadline_type": "none", + "grpc.meta.method_operation": "accessor", + "grpc.meta.method_scope": "repository", + "grpc.meta.method_type": "unary", + "grpc.request.fullMethod": "/gitaly.RepositoryService/RepositoryInfo", + }, + }, + { + desc: "unary object-pool-scoped call", + call: func(t *testing.T, client mockClient) { + _, err := client.FetchIntoObjectPool(ctx, &gitalypb.FetchIntoObjectPoolRequest{ + ObjectPool: &gitalypb.ObjectPool{ + Repository: &gitalypb.Repository{ + StorageName: "storage", + RelativePath: "path", + GlProjectPath: "glProject", + }, + }, + }) + + require.NoError(t, err) + }, + expectedTags: map[string]any{ + "grpc.meta.deadline_type": "none", + "grpc.meta.method_operation": "mutator", + "grpc.meta.method_scope": "repository", + "grpc.meta.method_type": "unary", + "grpc.request.fullMethod": "/gitaly.ObjectPoolService/FetchIntoObjectPool", + "grpc.request.pool.relativePath": "path", + "grpc.request.pool.storage": "storage", + "grpc.request.pool.sourceProjectPath": "glProject", + }, + }, + { + desc: "unary repository-scoped call with deadline", + call: func(t *testing.T, client mockClient) { + ctx, cancel := context.WithDeadline(ctx, time.Date(2100, time.January, 1, 12, 0, 0, 0, time.UTC)) + defer cancel() + + _, err := client.RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{ + Repository: &gitalypb.Repository{ + StorageName: "storage", + RelativePath: "path", + GlProjectPath: "glProject", + GlRepository: "glRepository", + }, + }) + + require.NoError(t, err) + }, + expectedTags: map[string]any{ + // Note that there is no "deadline: none" field anymore. If we were + // to inject the deadline type then it would appear here. + "grpc.meta.method_operation": "accessor", + "grpc.meta.method_scope": "repository", + "grpc.meta.method_type": "unary", + "grpc.request.fullMethod": "/gitaly.RepositoryService/RepositoryInfo", + "grpc.request.repoStorage": "storage", + "grpc.request.repoPath": "path", + "grpc.request.glProjectPath": "glProject", + "grpc.request.glRepository": "glRepository", + }, + }, + { + desc: "unary repository-scoped call with additional metadata", + call: func(t *testing.T, client mockClient) { + ctx, cancel := context.WithDeadline(ctx, time.Date(2100, time.January, 1, 12, 0, 0, 0, time.UTC)) + defer cancel() + + ctx = metadata.NewOutgoingContext(ctx, metadata.MD{ + "call_site": []string{"callSite"}, + "deadline_type": []string{"deadlineType"}, + "client_name": []string{"clientName"}, + "remote_ip": []string{"remoteIP"}, + "user_id": []string{"userID"}, + "username": []string{"userName"}, + correlation.FieldName: []string{"correlationID"}, + }) + + _, err := client.RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{ + Repository: &gitalypb.Repository{ + StorageName: "storage", + RelativePath: "path", + GlProjectPath: "glProject", + GlRepository: "glRepository", + }, + }) + + require.NoError(t, err) + }, + expectedTags: map[string]any{ + "grpc.meta.call_site": "callSite", + "grpc.meta.deadline_type": "deadlineType", + "grpc.meta.client_name": "clientName", + "grpc.meta.method_operation": "accessor", + "grpc.meta.method_scope": "repository", + "grpc.meta.method_type": "unary", + "grpc.request.fullMethod": "/gitaly.RepositoryService/RepositoryInfo", + "grpc.request.repoStorage": "storage", + "grpc.request.repoPath": "path", + "grpc.request.glProjectPath": "glProject", + "grpc.request.glRepository": "glRepository", + "remote_ip": "remoteIP", + "user_id": "userID", + "username": "userName", + }, + }, + { + desc: "streaming repository-scoped call", + call: func(t *testing.T, client mockClient) { + stream, err := client.CreateBundleFromRefList(ctx) + require.NoError(t, err) + + require.NoError(t, stream.Send(&gitalypb.CreateBundleFromRefListRequest{ + Repository: &gitalypb.Repository{ + StorageName: "storage", + RelativePath: "path", + GlProjectPath: "glProject", + GlRepository: "glRepository", + }, + })) + + _, err = stream.Recv() + require.NoError(t, err) + }, + expectedTags: map[string]any{ + "grpc.meta.deadline_type": "none", + "grpc.meta.method_operation": "accessor", + "grpc.meta.method_scope": "repository", + "grpc.meta.method_type": "bidi_stream", + "grpc.request.fullMethod": "/gitaly.RepositoryService/CreateBundleFromRefList", + "grpc.request.repoStorage": "storage", + "grpc.request.repoPath": "path", + "grpc.request.glProjectPath": "glProject", + "grpc.request.glRepository": "glRepository", + }, + }, + { + desc: "streaming repository-scoped call with missing initial request", + call: func(t *testing.T, client mockClient) { + stream, err := client.CreateBundleFromRefList(ctx) + require.NoError(t, err) + require.NoError(t, stream.CloseSend()) + + _, err = stream.Recv() + testhelper.RequireGrpcError(t, structerr.New("%w", io.EOF), err) + }, + expectedTags: map[string]any{ + "grpc.meta.deadline_type": "none", + "grpc.meta.method_operation": "accessor", + "grpc.meta.method_scope": "repository", + "grpc.meta.method_type": "bidi_stream", + "grpc.request.fullMethod": "/gitaly.RepositoryService/CreateBundleFromRefList", + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + server, client := setupServer(t, ctx) + + tc.call(t, client) + + if tc.expectedTags == nil { + require.Equal(t, nil, tc.expectedTags) + } else { + require.Equal(t, tc.expectedTags, server.tags.Values()) + } + }) + } +} + +type mockServer struct { + gitalypb.RepositoryServiceServer + gitalypb.ObjectPoolServiceServer + tags grpcmwtags.Tags +} + +type mockClient struct { + gitalypb.RepositoryServiceClient + gitalypb.ObjectPoolServiceClient +} + +func setupServer(tb testing.TB, ctx context.Context) (*mockServer, mockClient) { + tb.Helper() + + var mockServer mockServer + + server := grpc.NewServer( + grpc.ChainUnaryInterceptor( + grpcmwtags.UnaryServerInterceptor(), + UnaryInterceptor, + // This interceptor and the equivalent interceptor for the streaming gRPC calls is responsible + // for recording the tags that the preceding interceptor has injected. + func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + mockServer.tags = grpcmwtags.Extract(ctx) + return handler(ctx, req) + }, + ), + grpc.ChainStreamInterceptor( + grpcmwtags.StreamServerInterceptor(), + StreamInterceptor, + func(server any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + mockServer.tags = grpcmwtags.Extract(stream.Context()) + return handler(server, stream) + }, + ), + ) + tb.Cleanup(server.Stop) + gitalypb.RegisterRepositoryServiceServer(server, &mockServer) + gitalypb.RegisterObjectPoolServiceServer(server, &mockServer) + + listener := bufconn.Listen(1) + go testhelper.MustServe(tb, server, listener) + + conn, err := grpc.DialContext(ctx, listener.Addr().String(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) { + return listener.DialContext(ctx) + }), + ) + require.NoError(tb, err) + tb.Cleanup(func() { testhelper.MustClose(tb, conn) }) + + return &mockServer, mockClient{ + RepositoryServiceClient: gitalypb.NewRepositoryServiceClient(conn), + ObjectPoolServiceClient: gitalypb.NewObjectPoolServiceClient(conn), + } +} + +func (s *mockServer) RepositoryInfo(ctx context.Context, _ *gitalypb.RepositoryInfoRequest) (*gitalypb.RepositoryInfoResponse, error) { + return &gitalypb.RepositoryInfoResponse{}, nil +} + +func (s *mockServer) FetchIntoObjectPool(ctx context.Context, _ *gitalypb.FetchIntoObjectPoolRequest) (*gitalypb.FetchIntoObjectPoolResponse, error) { + return &gitalypb.FetchIntoObjectPoolResponse{}, nil +} + +func (s *mockServer) CreateBundleFromRefList(stream gitalypb.RepositoryService_CreateBundleFromRefListServer) error { + if _, err := stream.Recv(); err != nil { + return err + } + + if err := stream.Send(&gitalypb.CreateBundleFromRefListResponse{}); err != nil { + return err + } + + return nil +} diff --git a/internal/grpc/middleware/metadatahandler/testhelper_test.go b/internal/grpc/middleware/requestinfohandler/testhelper_test.go index ee8340678..464eca43d 100644 --- a/internal/grpc/middleware/metadatahandler/testhelper_test.go +++ b/internal/grpc/middleware/requestinfohandler/testhelper_test.go @@ -1,4 +1,4 @@ -package metadatahandler +package requestinfohandler import ( "testing" diff --git a/internal/helper/fieldextractors/fieldextractor.go b/internal/helper/fieldextractors/fieldextractor.go deleted file mode 100644 index a866dc3ea..000000000 --- a/internal/helper/fieldextractors/fieldextractor.go +++ /dev/null @@ -1,120 +0,0 @@ -package fieldextractors - -import ( - "strings" - - "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" -) - -type repositoryBasedRequest interface { - GetRepository() *gitalypb.Repository -} - -type namespaceBasedRequest interface { - storageBasedRequest - GetName() string -} - -type storageBasedRequest interface { - GetStorageName() string -} - -func formatRepoRequest(repo *gitalypb.Repository) map[string]interface{} { - if repo == nil { - // Signals that the client did not send a repo through, which - // will be useful for logging - return map[string]interface{}{ - "repo": nil, - } - } - - return map[string]interface{}{ - "repoStorage": repo.StorageName, - "repoPath": repo.RelativePath, - "glRepository": repo.GlRepository, - "glProjectPath": repo.GlProjectPath, - } -} - -func formatStorageRequest(storageReq storageBasedRequest) map[string]interface{} { - return map[string]interface{}{ - "StorageName": storageReq.GetStorageName(), - } -} - -func formatNamespaceRequest(namespaceReq namespaceBasedRequest) map[string]interface{} { - return map[string]interface{}{ - "StorageName": namespaceReq.GetStorageName(), - "Name": namespaceReq.GetName(), - } -} - -func formatRenameNamespaceRequest(renameReq *gitalypb.RenameNamespaceRequest) map[string]interface{} { - return map[string]interface{}{ - "StorageName": renameReq.GetStorageName(), - "From": renameReq.GetFrom(), - "To": renameReq.GetTo(), - } -} - -// FieldExtractor will extract the relevant fields from an incoming grpc request -func FieldExtractor(fullMethod string, req interface{}) map[string]interface{} { - if req == nil { - return nil - } - - var result map[string]interface{} - - switch req := req.(type) { - case *gitalypb.RenameNamespaceRequest: - result = formatRenameNamespaceRequest(req) - case repositoryBasedRequest: - result = formatRepoRequest(req.GetRepository()) - case namespaceBasedRequest: - result = formatNamespaceRequest(req) - case storageBasedRequest: - result = formatStorageRequest(req) - } - - if result == nil { - result = make(map[string]interface{}) - } - - switch { - case strings.HasPrefix(fullMethod, "/gitaly.ObjectPoolService/"): - addObjectPool(req, result) - } - - result["fullMethod"] = fullMethod - - return result -} - -type objectPoolRequest interface { - GetObjectPool() *gitalypb.ObjectPool -} - -func addObjectPool(req interface{}, tags map[string]interface{}) { - oReq, ok := req.(objectPoolRequest) - if !ok { - return - } - - pool := oReq.GetObjectPool() - if pool == nil { - return - } - - repo := pool.GetRepository() - if repo == nil { - return - } - - for k, v := range map[string]string{ - "pool.storage": repo.StorageName, - "pool.relativePath": repo.RelativePath, - "pool.sourceProjectPath": repo.GlProjectPath, - } { - tags[k] = v - } -} diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 76d0b15f2..ac1d554f3 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -11,7 +11,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/metadata" - "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/proxy" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" @@ -1144,7 +1143,7 @@ func (c *Coordinator) newRequestFinalizer( TargetNodeStorage: secondary, Params: params, }, - Meta: datastore.Params{metadatahandler.CorrelationIDKey: correlationID}, + Meta: datastore.Params{datastore.CorrelationIDKey: correlationID}, } g.Go(func() error { diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index e0eafe0c5..a8c873fd4 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -25,7 +25,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" gitaly_metadata "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/metadata" - "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/proxy" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config" @@ -224,7 +223,7 @@ func TestStreamDirectorMutator(t *testing.T) { TargetNodeStorage: secondaryNode.Storage, SourceNodeStorage: primaryNode.Storage, }, - Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"}, + Meta: datastore.Params{datastore.CorrelationIDKey: "my-correlation-id"}, }, } }, @@ -276,7 +275,7 @@ func TestStreamDirectorMutator(t *testing.T) { TargetNodeStorage: secondaryNode.Storage, SourceNodeStorage: primaryNode.Storage, }, - Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"}, + Meta: datastore.Params{datastore.CorrelationIDKey: "my-correlation-id"}, }, } }, @@ -1646,7 +1645,7 @@ func TestStreamDirector_repo_creation(t *testing.T) { TargetNodeStorage: target, SourceNodeStorage: primaryNode.Storage, }, - Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"}, + Meta: datastore.Params{datastore.CorrelationIDKey: "my-correlation-id"}, }) } @@ -1763,7 +1762,7 @@ func TestAbsentCorrelationID(t *testing.T) { require.NoError(t, err) require.Len(t, jobs, 1) - require.NotZero(t, jobs[0].Meta[metadatahandler.CorrelationIDKey], + require.NotZero(t, jobs[0].Meta[datastore.CorrelationIDKey], "the coordinator should have generated a random ID") } diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index 3dccd2da8..9865ea744 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -12,6 +12,10 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore/glsql" ) +// CorrelationIDKey is the key that is used to store the correlation ID for a specific replication job as part of the +// parameters. +const CorrelationIDKey = "correlation_id" + // ReplicationEventExistsError is returned when trying to add an already existing // replication event into the queue. type ReplicationEventExistsError struct { diff --git a/internal/praefect/reconciler/reconciler_test.go b/internal/praefect/reconciler/reconciler_test.go index a7debac53..0a30858cb 100644 --- a/internal/praefect/reconciler/reconciler_test.go +++ b/internal/praefect/reconciler/reconciler_test.go @@ -10,7 +10,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" - "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore" @@ -1140,7 +1139,7 @@ func TestReconciler(t *testing.T) { var job datastore.ReplicationJob var meta datastore.Params require.NoError(t, rows.Scan(&job, &meta)) - require.NotEmpty(t, meta[metadatahandler.CorrelationIDKey]) + require.NotEmpty(t, meta[datastore.CorrelationIDKey]) actualJobs = append(actualJobs, job) } diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 2d283749e..42b3eebee 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -10,7 +10,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/repository" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" - "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config" @@ -52,7 +51,7 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli logWithVirtualStorage: event.Job.VirtualStorage, logWithReplTarget: event.Job.TargetNodeStorage, "replication_job_source": event.Job.SourceNodeStorage, - logWithCorrID: correlation.ExtractFromContext(ctx), + correlation.FieldName: correlation.ExtractFromContext(ctx), }) generation, err := dr.rs.GetReplicatedGeneration(ctx, event.Job.RepositoryID, event.Job.SourceNodeStorage, event.Job.TargetNodeStorage) @@ -148,7 +147,7 @@ func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.Replica return err } - dr.log.WithField(logWithCorrID, correlation.ExtractFromContext(ctx)). + dr.log.WithField(correlation.FieldName, correlation.ExtractFromContext(ctx)). WithError(err). Info("deleted repository did not have a store entry") } @@ -190,7 +189,7 @@ func (dr defaultReplicator) Rename(ctx context.Context, event datastore.Replicat return err } - dr.log.WithField(logWithCorrID, correlation.ExtractFromContext(ctx)). + dr.log.WithField(correlation.FieldName, correlation.ExtractFromContext(ctx)). WithError(err). Info("replicated repository rename does not have a store entry") } @@ -299,7 +298,6 @@ func (r ReplMgr) Collect(ch chan<- prometheus.Metric) { const ( logWithReplTarget = "replication_job_target" - logWithCorrID = "correlation_id" logWithVirtualStorage = "virtual_storage" ) @@ -335,7 +333,7 @@ type ( func getCorrelationID(params datastore.Params) string { correlationID := "" - if val, found := params[metadatahandler.CorrelationIDKey]; found { + if val, found := params[datastore.CorrelationIDKey]; found { correlationID, _ = val.(string) } return correlationID @@ -538,7 +536,7 @@ func (r ReplMgr) handleNodeEvent(ctx context.Context, logger log.Logger, targetC ctx = correlation.ContextWithCorrelation(ctx, cid) // we want it to be queryable by common `json.correlation_id` filter - logger = logger.WithField(logWithCorrID, cid) + logger = logger.WithField(correlation.FieldName, cid) // we log all details about the event only once before start of the processing logger.WithField("event", event).Info("replication job processing started") diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 02b2458c1..9b1e0651e 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -18,7 +18,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" gitalycfg "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup" - "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/log" @@ -119,7 +118,7 @@ func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) { }, State: datastore.JobStateReady, Attempt: 3, - Meta: datastore.Params{metadatahandler.CorrelationIDKey: "correlation-id"}, + Meta: datastore.Params{datastore.CorrelationIDKey: "correlation-id"}, }) } require.Len(t, events, 1) @@ -174,7 +173,7 @@ func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) { require.Equal(t, []interface{}{"replication job processing started", "virtual", "correlation-id"}, - []interface{}{logEntries[2].Message, logEntries[2].Data["virtual_storage"], logEntries[2].Data[logWithCorrID]}, + []interface{}{logEntries[2].Message, logEntries[2].Data["virtual_storage"], logEntries[2].Data[correlation.FieldName]}, ) dequeuedEvent := logEntries[2].Data["event"].(datastore.ReplicationEvent) @@ -183,7 +182,7 @@ func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) { require.Equal(t, []interface{}{"replication job processing finished", "virtual", datastore.JobStateCompleted, "correlation-id"}, - []interface{}{logEntries[3].Message, logEntries[3].Data["virtual_storage"], logEntries[3].Data["new_state"], logEntries[3].Data[logWithCorrID]}, + []interface{}{logEntries[3].Message, logEntries[3].Data["virtual_storage"], logEntries[3].Data["new_state"], logEntries[3].Data[correlation.FieldName]}, ) replicatedPath := filepath.Join(backupCfg.Storages[0].Path, testRepoProto.GetRelativePath()) @@ -254,7 +253,7 @@ func TestReplicatorDowngradeAttempt(t *testing.T) { lastEntry := entries[0] require.Equal(t, logrus.InfoLevel, lastEntry.Level) require.Equal(t, returnedErr, lastEntry.Data["error"]) - require.Equal(t, "correlation-id", lastEntry.Data[logWithCorrID]) + require.Equal(t, "correlation-id", lastEntry.Data[correlation.FieldName]) require.Equal(t, tc.expectedMessage, lastEntry.Message) }) } diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 066f2bf78..167688b51 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -8,19 +8,17 @@ import ( "time" grpcmwlogrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" - grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/server/auth" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/grpcstats" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/listenmux" - "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/panichandler" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/requestinfohandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/sentryhandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/statushandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/proxy" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel" - "gitlab.com/gitlab-org/gitaly/v16/internal/helper/fieldextractors" "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore" @@ -67,9 +65,8 @@ func NewBackchannelServerFactory(logger log.Logger, refSvc gitalypb.RefTransacti func commonUnaryServerInterceptors(logger log.Logger, messageProducer grpcmwlogrus.MessageProducer) []grpc.UnaryServerInterceptor { return []grpc.UnaryServerInterceptor{ - grpcmwtags.UnaryServerInterceptor(ctxtagsInterceptorOption()), grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler - metadatahandler.UnaryInterceptor, + requestinfohandler.UnaryInterceptor, grpcprometheus.UnaryServerInterceptor, logger.UnaryServerInterceptor( grpcmwlogrus.WithTimestampFormat(log.LogTimestampFormat), @@ -85,10 +82,6 @@ func commonUnaryServerInterceptors(logger log.Logger, messageProducer grpcmwlogr } } -func ctxtagsInterceptorOption() grpcmwtags.Option { - return grpcmwtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor) -} - // ServerOption is an option that can be passed to `NewGRPCServer()`. type ServerOption func(cfg *serverConfig) @@ -136,10 +129,9 @@ func NewGRPCServer( unaryInterceptors = append(unaryInterceptors, serverCfg.unaryInterceptors...) streamInterceptors := []grpc.StreamServerInterceptor{ - grpcmwtags.StreamServerInterceptor(ctxtagsInterceptorOption()), grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler middleware.MethodTypeStreamInterceptor(deps.Registry, deps.Logger), - metadatahandler.StreamInterceptor, + requestinfohandler.StreamInterceptor, grpcprometheus.StreamServerInterceptor, deps.Logger.WithField("component", "praefect.StreamServerInterceptor").StreamServerInterceptor( grpcmwlogrus.WithTimestampFormat(log.LogTimestampFormat), |