diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-09-29 16:11:05 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-10-04 13:46:01 +0300 |
commit | 6a0e4459a16dc0cd6d0007c08d44172b1ce360b7 (patch) | |
tree | 644b6cc5a7c4bb800e748b049ec29fd132d8bb90 | |
parent | a59dc197402b0eb7d5cc53a7ea57c85daad3abe1 (diff) |
requestinfohandler: Absorb functionality of the field extractor
Merge the functionality that the field extractor provides into our
requestinfohandler logic. This unifies two related functionalities and
will eventually make it easier to move away from the tags mechanism that
is going away in go-grpc-middleware v2.
Add tests to make sure that the requestinfohandler interceptors work as
expected.
-rw-r--r-- | internal/gitaly/server/server.go | 8 | ||||
-rw-r--r-- | internal/grpc/middleware/requestinfohandler/requestinfohandler.go | 119 | ||||
-rw-r--r-- | internal/grpc/middleware/requestinfohandler/requestinfohandler_test.go | 336 | ||||
-rw-r--r-- | internal/helper/fieldextractors/fieldextractor.go | 96 | ||||
-rw-r--r-- | internal/praefect/server.go | 8 |
5 files changed, 430 insertions, 137 deletions
diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go index a828c3f03..d8c1ab9e7 100644 --- a/internal/gitaly/server/server.go +++ b/internal/gitaly/server/server.go @@ -6,7 +6,6 @@ import ( "time" grpcmwlogrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" - grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/server/auth" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel" @@ -21,7 +20,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/sentryhandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/statushandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" - "gitlab.com/gitlab-org/gitaly/v16/internal/helper/fieldextractors" gitalylog "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc" @@ -61,10 +59,6 @@ func (s *GitalyServerFactory) New(secure bool, opts ...Option) (*grpc.Server, er opt(&cfg) } - ctxTagOpts := []grpcmwtags.Option{ - grpcmwtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor), - } - transportCredentials := insecure.NewCredentials() // If tls config is specified attempt to extract tls options and use it // as a grpc.ServerOption @@ -106,7 +100,6 @@ func (s *GitalyServerFactory) New(secure bool, opts ...Option) (*grpc.Server, er ) streamServerInterceptors := []grpc.StreamServerInterceptor{ - grpcmwtags.StreamServerInterceptor(ctxTagOpts...), grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler requestinfohandler.StreamInterceptor, grpcprometheus.StreamServerInterceptor, @@ -122,7 +115,6 @@ func (s *GitalyServerFactory) New(secure bool, opts ...Option) (*grpc.Server, er auth.StreamServerInterceptor(s.cfg.Auth), } unaryServerInterceptors := []grpc.UnaryServerInterceptor{ - grpcmwtags.UnaryServerInterceptor(ctxTagOpts...), grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler requestinfohandler.UnaryInterceptor, grpcprometheus.UnaryServerInterceptor, diff --git a/internal/grpc/middleware/requestinfohandler/requestinfohandler.go b/internal/grpc/middleware/requestinfohandler/requestinfohandler.go index f4a1787bf..a064f82ea 100644 --- a/internal/grpc/middleware/requestinfohandler/requestinfohandler.go +++ b/internal/grpc/middleware/requestinfohandler/requestinfohandler.go @@ -11,6 +11,7 @@ import ( gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "gitlab.com/gitlab-org/labkit/correlation" "google.golang.org/grpc" "google.golang.org/grpc/metadata" @@ -47,6 +48,10 @@ type requestInfo struct { deadlineType string methodOperation string methodScope string + + repository *gitalypb.Repository + objectPool *gitalypb.ObjectPool + storageName string } // Unknown client and feature. Matches the prometheus grpc unknown value @@ -64,8 +69,8 @@ func getFromMD(md metadata.MD, header string) string { // newRequestInfo extracts metadata from the connection headers and add it to the // ctx_tags, if it is set. Returns values appropriate for use with prometheus labels, // using `unknown` if a value is not set -func newRequestInfo(ctx context.Context, fullMethod, grpcMethodType string) requestInfo { - info := requestInfo{ +func newRequestInfo(ctx context.Context, fullMethod, grpcMethodType string) *requestInfo { + info := &requestInfo{ fullMethod: fullMethod, methodType: grpcMethodType, clientName: unknownValue, @@ -149,9 +154,33 @@ func newRequestInfo(ctx context.Context, fullMethod, grpcMethodType string) requ return info } -func (i requestInfo) injectTags(ctx context.Context) { - tags := grpcmwtags.Extract(ctx) +func (i *requestInfo) extractRequestInfo(request any) { + type repoScopedRequest interface { + GetRepository() *gitalypb.Repository + } + + type poolScopedRequest interface { + GetObjectPool() *gitalypb.ObjectPool + } + + type storageScopedRequest interface { + GetStorageName() string + } + + if repoScoped, ok := request.(repoScopedRequest); ok { + i.repository = repoScoped.GetRepository() + } + + if poolScoped, ok := request.(poolScopedRequest); ok { + i.objectPool = poolScoped.GetObjectPool() + } + + if storageScoped, ok := request.(storageScopedRequest); ok { + i.storageName = storageScoped.GetStorageName() + } +} +func (i *requestInfo) injectTags(tags grpcmwtags.Tags) { for key, value := range map[string]string{ "grpc.meta.call_site": i.callSite, "grpc.meta.client_name": i.clientName, @@ -160,6 +189,8 @@ func (i requestInfo) injectTags(ctx context.Context) { "grpc.meta.method_type": i.methodType, "grpc.meta.method_operation": i.methodOperation, "grpc.meta.method_scope": i.methodScope, + "grpc.request.fullMethod": i.fullMethod, + "grpc.request.StorageName": i.storageName, "remote_ip": i.remoteIP, "user_id": i.userID, "username": i.userName, @@ -171,9 +202,34 @@ func (i requestInfo) injectTags(ctx context.Context) { tags.Set(key, value) } + + // We handle the repository-related fields separately such that all fields will be set unconditionally, + // regardless of whether they are empty or not. This is done to retain all fields even if their values + // are empty. + if repo := i.repository; repo != nil { + for key, value := range map[string]string{ + "grpc.request.repoStorage": repo.GetStorageName(), + "grpc.request.repoPath": repo.GetRelativePath(), + "grpc.request.glRepository": repo.GetGlRepository(), + "grpc.request.glProjectPath": repo.GetGlProjectPath(), + } { + tags.Set(key, value) + } + } + + // Same for the object pool repository. + if pool := i.objectPool.GetRepository(); pool != nil { + for key, value := range map[string]string{ + "grpc.request.pool.storage": pool.GetStorageName(), + "grpc.request.pool.relativePath": pool.GetRelativePath(), + "grpc.request.pool.sourceProjectPath": pool.GetGlProjectPath(), + } { + tags.Set(key, value) + } + } } -func (i requestInfo) reportPrometheusMetrics(err error) { +func (i *requestInfo) reportPrometheusMetrics(err error) { grpcCode := structerr.GRPCCode(err) serviceName, methodName := extractServiceAndMethodName(i.fullMethod) @@ -202,9 +258,13 @@ func extractServiceAndMethodName(fullMethodName string) (string, string) { // UnaryInterceptor returns a Unary Interceptor func UnaryInterceptor(ctx context.Context, req interface{}, serverInfo *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + tags := grpcmwtags.NewTags() + ctx = grpcmwtags.SetInContext(ctx, tags) + info := newRequestInfo(ctx, serverInfo.FullMethod, "unary") + info.extractRequestInfo(req) - info.injectTags(ctx) + info.injectTags(tags) res, err := handler(ctx, req) info.reportPrometheusMetrics(err) @@ -213,11 +273,21 @@ func UnaryInterceptor(ctx context.Context, req interface{}, serverInfo *grpc.Una // StreamInterceptor returns a Stream Interceptor func StreamInterceptor(srv interface{}, stream grpc.ServerStream, serverInfo *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - ctx := stream.Context() + tags := grpcmwtags.NewTags() + ctx := grpcmwtags.SetInContext(stream.Context(), tags) + info := newRequestInfo(ctx, serverInfo.FullMethod, streamRPCType(serverInfo)) - info.injectTags(ctx) - err := handler(srv, stream) + // Even though we don't yet have all information set up we already inject the tags here. This is done such that + // log messages will at least have the metadata set up correctly in case there is no first request. + info.injectTags(tags) + err := handler(srv, &wrappedServerStream{ + ServerStream: stream, + ctx: ctx, + info: info, + tags: tags, + initial: true, + }) info.reportPrometheusMetrics(err) return err @@ -231,3 +301,34 @@ func streamRPCType(info *grpc.StreamServerInfo) string { } return "bidi_stream" } + +// wrappedServerStream wraps a grpc.ServerStream such that we can intercept and extract info from the first gRPC request +// on that stream. +type wrappedServerStream struct { + grpc.ServerStream + ctx context.Context + tags grpcmwtags.Tags + info *requestInfo + initial bool +} + +// Context overrides the context of the ServerStream with our own context that has the tags set up. +func (w *wrappedServerStream) Context() context.Context { + return w.ctx +} + +// RecvMsg receives a message from the underlying server stream. The initial received message will be used to extract +// request information and inject it into the context. +func (w *wrappedServerStream) RecvMsg(req interface{}) error { + err := w.ServerStream.RecvMsg(req) + + if w.initial { + w.initial = false + + w.info.extractRequestInfo(req) + // Re-inject the tags a second time here. + w.info.injectTags(w.tags) + } + + return err +} diff --git a/internal/grpc/middleware/requestinfohandler/requestinfohandler_test.go b/internal/grpc/middleware/requestinfohandler/requestinfohandler_test.go index cdf7b08d1..0974ee6a1 100644 --- a/internal/grpc/middleware/requestinfohandler/requestinfohandler_test.go +++ b/internal/grpc/middleware/requestinfohandler/requestinfohandler_test.go @@ -2,14 +2,21 @@ package requestinfohandler import ( "context" + "io" + "net" "testing" "time" grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "gitlab.com/gitlab-org/labkit/correlation" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/test/bufconn" ) const ( @@ -27,13 +34,13 @@ func TestNewRequestInfo(t *testing.T) { fullMethod string metadata metadata.MD deadline bool - expectedInfo requestInfo + expectedInfo *requestInfo }{ { desc: "empty metadata", metadata: metadata.Pairs(), deadline: false, - expectedInfo: requestInfo{ + expectedInfo: &requestInfo{ methodType: "unary", clientName: unknownValue, callSite: unknownValue, @@ -47,7 +54,7 @@ func TestNewRequestInfo(t *testing.T) { desc: "context containing metadata", metadata: metadata.Pairs("call_site", "testsite"), deadline: false, - expectedInfo: requestInfo{ + expectedInfo: &requestInfo{ methodType: "unary", clientName: unknownValue, callSite: "testsite", @@ -61,7 +68,7 @@ func TestNewRequestInfo(t *testing.T) { desc: "context containing metadata and a deadline", metadata: metadata.Pairs("call_site", "testsite"), deadline: true, - expectedInfo: requestInfo{ + expectedInfo: &requestInfo{ methodType: "unary", clientName: unknownValue, callSite: "testsite", @@ -75,7 +82,7 @@ func TestNewRequestInfo(t *testing.T) { desc: "context containing metadata and a deadline type", metadata: metadata.Pairs("deadline_type", "regular"), deadline: true, - expectedInfo: requestInfo{ + expectedInfo: &requestInfo{ methodType: "unary", clientName: unknownValue, callSite: unknownValue, @@ -89,7 +96,7 @@ func TestNewRequestInfo(t *testing.T) { desc: "a context without deadline but with deadline type", metadata: metadata.Pairs("deadline_type", "regular"), deadline: false, - expectedInfo: requestInfo{ + expectedInfo: &requestInfo{ methodType: "unary", clientName: unknownValue, callSite: unknownValue, @@ -103,7 +110,7 @@ func TestNewRequestInfo(t *testing.T) { desc: "with a context containing metadata", metadata: metadata.Pairs("deadline_type", "regular", "client_name", "rails"), deadline: true, - expectedInfo: requestInfo{ + expectedInfo: &requestInfo{ methodType: "unary", clientName: "rails", callSite: unknownValue, @@ -118,7 +125,7 @@ func TestNewRequestInfo(t *testing.T) { fullMethod: "/gitaly.RepositoryService/UnknownMethod", metadata: metadata.Pairs(), deadline: false, - expectedInfo: requestInfo{ + expectedInfo: &requestInfo{ fullMethod: "/gitaly.RepositoryService/UnknownMethod", methodType: "unary", clientName: unknownValue, @@ -134,7 +141,7 @@ func TestNewRequestInfo(t *testing.T) { fullMethod: "/gitaly.RepositoryService/ObjectFormat", metadata: metadata.Pairs(), deadline: false, - expectedInfo: requestInfo{ + expectedInfo: &requestInfo{ fullMethod: "/gitaly.RepositoryService/ObjectFormat", methodType: "unary", clientName: unknownValue, @@ -150,7 +157,7 @@ func TestNewRequestInfo(t *testing.T) { fullMethod: "/gitaly.RepositoryService/CreateRepository", metadata: metadata.Pairs(), deadline: false, - expectedInfo: requestInfo{ + expectedInfo: &requestInfo{ fullMethod: "/gitaly.RepositoryService/CreateRepository", methodType: "unary", clientName: unknownValue, @@ -166,7 +173,7 @@ func TestNewRequestInfo(t *testing.T) { fullMethod: "/gitaly.RepositoryService/OptimizeRepository", metadata: metadata.Pairs(), deadline: false, - expectedInfo: requestInfo{ + expectedInfo: &requestInfo{ fullMethod: "/gitaly.RepositoryService/OptimizeRepository", methodType: "unary", clientName: unknownValue, @@ -182,7 +189,7 @@ func TestNewRequestInfo(t *testing.T) { fullMethod: "/gitaly.RepositoryService/OptimizeRepository", metadata: metadata.Pairs(), deadline: false, - expectedInfo: requestInfo{ + expectedInfo: &requestInfo{ fullMethod: "/gitaly.RepositoryService/OptimizeRepository", methodType: "unary", clientName: unknownValue, @@ -198,7 +205,7 @@ func TestNewRequestInfo(t *testing.T) { fullMethod: "/gitaly.RemoteService/FindRemoteRepository", metadata: metadata.Pairs(), deadline: false, - expectedInfo: requestInfo{ + expectedInfo: &requestInfo{ fullMethod: "/gitaly.RemoteService/FindRemoteRepository", methodType: "unary", clientName: unknownValue, @@ -247,9 +254,11 @@ func TestGRPCTags(t *testing.T) { _, err := interceptor(ctx, nil, nil, func(ctx context.Context, _ interface{}) (interface{}, error) { info := newRequestInfo(ctx, "/gitaly.RepositoryService/OptimizeRepository", "unary") - info.injectTags(ctx) - require.Equal(t, requestInfo{ + tags := grpcmwtags.NewTags() + info.injectTags(tags) + + require.Equal(t, &requestInfo{ correlationID: correlationID, fullMethod: "/gitaly.RepositoryService/OptimizeRepository", methodType: "unary", @@ -268,7 +277,8 @@ func TestGRPCTags(t *testing.T) { "grpc.meta.method_type": "unary", "grpc.meta.method_operation": "maintenance", "grpc.meta.method_scope": "repository", - }, grpcmwtags.Extract(ctx).Values()) + "grpc.request.fullMethod": "/gitaly.RepositoryService/OptimizeRepository", + }, tags.Values()) return nil, nil }) @@ -320,3 +330,297 @@ func TestExtractServiceAndMethodName(t *testing.T) { }) } } + +func TestInterceptors(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + + for _, tc := range []struct { + desc string + call func(*testing.T, mockClient) + expectedTags map[string]any + }{ + { + desc: "unary repository-scoped call", + call: func(t *testing.T, client mockClient) { + _, err := client.RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{ + Repository: &gitalypb.Repository{ + StorageName: "storage", + RelativePath: "path", + GlProjectPath: "glProject", + GlRepository: "glRepository", + }, + }) + + require.NoError(t, err) + }, + expectedTags: map[string]any{ + "grpc.meta.deadline_type": "none", + "grpc.meta.method_operation": "accessor", + "grpc.meta.method_scope": "repository", + "grpc.meta.method_type": "unary", + "grpc.request.fullMethod": "/gitaly.RepositoryService/RepositoryInfo", + "grpc.request.repoStorage": "storage", + "grpc.request.repoPath": "path", + "grpc.request.glProjectPath": "glProject", + "grpc.request.glRepository": "glRepository", + }, + }, + { + desc: "unary repository-scoped call with unset repository", + call: func(t *testing.T, client mockClient) { + _, err := client.RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{ + Repository: nil, + }) + + require.NoError(t, err) + }, + expectedTags: map[string]any{ + "grpc.meta.deadline_type": "none", + "grpc.meta.method_operation": "accessor", + "grpc.meta.method_scope": "repository", + "grpc.meta.method_type": "unary", + "grpc.request.fullMethod": "/gitaly.RepositoryService/RepositoryInfo", + }, + }, + { + desc: "unary object-pool-scoped call", + call: func(t *testing.T, client mockClient) { + _, err := client.FetchIntoObjectPool(ctx, &gitalypb.FetchIntoObjectPoolRequest{ + ObjectPool: &gitalypb.ObjectPool{ + Repository: &gitalypb.Repository{ + StorageName: "storage", + RelativePath: "path", + GlProjectPath: "glProject", + }, + }, + }) + + require.NoError(t, err) + }, + expectedTags: map[string]any{ + "grpc.meta.deadline_type": "none", + "grpc.meta.method_operation": "mutator", + "grpc.meta.method_scope": "repository", + "grpc.meta.method_type": "unary", + "grpc.request.fullMethod": "/gitaly.ObjectPoolService/FetchIntoObjectPool", + "grpc.request.pool.relativePath": "path", + "grpc.request.pool.storage": "storage", + "grpc.request.pool.sourceProjectPath": "glProject", + }, + }, + { + desc: "unary repository-scoped call with deadline", + call: func(t *testing.T, client mockClient) { + ctx, cancel := context.WithDeadline(ctx, time.Date(2100, time.January, 1, 12, 0, 0, 0, time.UTC)) + defer cancel() + + _, err := client.RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{ + Repository: &gitalypb.Repository{ + StorageName: "storage", + RelativePath: "path", + GlProjectPath: "glProject", + GlRepository: "glRepository", + }, + }) + + require.NoError(t, err) + }, + expectedTags: map[string]any{ + // Note that there is no "deadline: none" field anymore. If we were + // to inject the deadline type then it would appear here. + "grpc.meta.method_operation": "accessor", + "grpc.meta.method_scope": "repository", + "grpc.meta.method_type": "unary", + "grpc.request.fullMethod": "/gitaly.RepositoryService/RepositoryInfo", + "grpc.request.repoStorage": "storage", + "grpc.request.repoPath": "path", + "grpc.request.glProjectPath": "glProject", + "grpc.request.glRepository": "glRepository", + }, + }, + { + desc: "unary repository-scoped call with additional metadata", + call: func(t *testing.T, client mockClient) { + ctx, cancel := context.WithDeadline(ctx, time.Date(2100, time.January, 1, 12, 0, 0, 0, time.UTC)) + defer cancel() + + ctx = metadata.NewOutgoingContext(ctx, metadata.MD{ + "call_site": []string{"callSite"}, + "deadline_type": []string{"deadlineType"}, + "client_name": []string{"clientName"}, + "remote_ip": []string{"remoteIP"}, + "user_id": []string{"userID"}, + "username": []string{"userName"}, + correlation.FieldName: []string{"correlationID"}, + }) + + _, err := client.RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{ + Repository: &gitalypb.Repository{ + StorageName: "storage", + RelativePath: "path", + GlProjectPath: "glProject", + GlRepository: "glRepository", + }, + }) + + require.NoError(t, err) + }, + expectedTags: map[string]any{ + "grpc.meta.call_site": "callSite", + "grpc.meta.deadline_type": "deadlineType", + "grpc.meta.client_name": "clientName", + "grpc.meta.method_operation": "accessor", + "grpc.meta.method_scope": "repository", + "grpc.meta.method_type": "unary", + "grpc.request.fullMethod": "/gitaly.RepositoryService/RepositoryInfo", + "grpc.request.repoStorage": "storage", + "grpc.request.repoPath": "path", + "grpc.request.glProjectPath": "glProject", + "grpc.request.glRepository": "glRepository", + "remote_ip": "remoteIP", + "user_id": "userID", + "username": "userName", + }, + }, + { + desc: "streaming repository-scoped call", + call: func(t *testing.T, client mockClient) { + stream, err := client.CreateBundleFromRefList(ctx) + require.NoError(t, err) + + require.NoError(t, stream.Send(&gitalypb.CreateBundleFromRefListRequest{ + Repository: &gitalypb.Repository{ + StorageName: "storage", + RelativePath: "path", + GlProjectPath: "glProject", + GlRepository: "glRepository", + }, + })) + + _, err = stream.Recv() + require.NoError(t, err) + }, + expectedTags: map[string]any{ + "grpc.meta.deadline_type": "none", + "grpc.meta.method_operation": "accessor", + "grpc.meta.method_scope": "repository", + "grpc.meta.method_type": "bidi_stream", + "grpc.request.fullMethod": "/gitaly.RepositoryService/CreateBundleFromRefList", + "grpc.request.repoStorage": "storage", + "grpc.request.repoPath": "path", + "grpc.request.glProjectPath": "glProject", + "grpc.request.glRepository": "glRepository", + }, + }, + { + desc: "streaming repository-scoped call with missing initial request", + call: func(t *testing.T, client mockClient) { + stream, err := client.CreateBundleFromRefList(ctx) + require.NoError(t, err) + require.NoError(t, stream.CloseSend()) + + _, err = stream.Recv() + testhelper.RequireGrpcError(t, structerr.New("%w", io.EOF), err) + }, + expectedTags: map[string]any{ + "grpc.meta.deadline_type": "none", + "grpc.meta.method_operation": "accessor", + "grpc.meta.method_scope": "repository", + "grpc.meta.method_type": "bidi_stream", + "grpc.request.fullMethod": "/gitaly.RepositoryService/CreateBundleFromRefList", + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + server, client := setupServer(t, ctx) + + tc.call(t, client) + + if tc.expectedTags == nil { + require.Equal(t, nil, tc.expectedTags) + } else { + require.Equal(t, tc.expectedTags, server.tags.Values()) + } + }) + } +} + +type mockServer struct { + gitalypb.RepositoryServiceServer + gitalypb.ObjectPoolServiceServer + tags grpcmwtags.Tags +} + +type mockClient struct { + gitalypb.RepositoryServiceClient + gitalypb.ObjectPoolServiceClient +} + +func setupServer(tb testing.TB, ctx context.Context) (*mockServer, mockClient) { + tb.Helper() + + var mockServer mockServer + + server := grpc.NewServer( + grpc.ChainUnaryInterceptor( + grpcmwtags.UnaryServerInterceptor(), + UnaryInterceptor, + // This interceptor and the equivalent interceptor for the streaming gRPC calls is responsible + // for recording the tags that the preceding interceptor has injected. + func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + mockServer.tags = grpcmwtags.Extract(ctx) + return handler(ctx, req) + }, + ), + grpc.ChainStreamInterceptor( + grpcmwtags.StreamServerInterceptor(), + StreamInterceptor, + func(server any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + mockServer.tags = grpcmwtags.Extract(stream.Context()) + return handler(server, stream) + }, + ), + ) + tb.Cleanup(server.Stop) + gitalypb.RegisterRepositoryServiceServer(server, &mockServer) + gitalypb.RegisterObjectPoolServiceServer(server, &mockServer) + + listener := bufconn.Listen(1) + go testhelper.MustServe(tb, server, listener) + + conn, err := grpc.DialContext(ctx, listener.Addr().String(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) { + return listener.DialContext(ctx) + }), + ) + require.NoError(tb, err) + tb.Cleanup(func() { testhelper.MustClose(tb, conn) }) + + return &mockServer, mockClient{ + RepositoryServiceClient: gitalypb.NewRepositoryServiceClient(conn), + ObjectPoolServiceClient: gitalypb.NewObjectPoolServiceClient(conn), + } +} + +func (s *mockServer) RepositoryInfo(ctx context.Context, _ *gitalypb.RepositoryInfoRequest) (*gitalypb.RepositoryInfoResponse, error) { + return &gitalypb.RepositoryInfoResponse{}, nil +} + +func (s *mockServer) FetchIntoObjectPool(ctx context.Context, _ *gitalypb.FetchIntoObjectPoolRequest) (*gitalypb.FetchIntoObjectPoolResponse, error) { + return &gitalypb.FetchIntoObjectPoolResponse{}, nil +} + +func (s *mockServer) CreateBundleFromRefList(stream gitalypb.RepositoryService_CreateBundleFromRefListServer) error { + if _, err := stream.Recv(); err != nil { + return err + } + + if err := stream.Send(&gitalypb.CreateBundleFromRefListResponse{}); err != nil { + return err + } + + return nil +} diff --git a/internal/helper/fieldextractors/fieldextractor.go b/internal/helper/fieldextractors/fieldextractor.go deleted file mode 100644 index 2f4a1ec1a..000000000 --- a/internal/helper/fieldextractors/fieldextractor.go +++ /dev/null @@ -1,96 +0,0 @@ -package fieldextractors - -import ( - "strings" - - "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" -) - -type repositoryBasedRequest interface { - GetRepository() *gitalypb.Repository -} - -type storageBasedRequest interface { - GetStorageName() string -} - -func formatRepoRequest(repo *gitalypb.Repository) map[string]interface{} { - if repo == nil { - // Signals that the client did not send a repo through, which - // will be useful for logging - return map[string]interface{}{ - "repo": nil, - } - } - - return map[string]interface{}{ - "repoStorage": repo.StorageName, - "repoPath": repo.RelativePath, - "glRepository": repo.GlRepository, - "glProjectPath": repo.GlProjectPath, - } -} - -func formatStorageRequest(storageReq storageBasedRequest) map[string]interface{} { - return map[string]interface{}{ - "StorageName": storageReq.GetStorageName(), - } -} - -// FieldExtractor will extract the relevant fields from an incoming grpc request -func FieldExtractor(fullMethod string, req interface{}) map[string]interface{} { - if req == nil { - return nil - } - - var result map[string]interface{} - - switch req := req.(type) { - case repositoryBasedRequest: - result = formatRepoRequest(req.GetRepository()) - case storageBasedRequest: - result = formatStorageRequest(req) - } - - if result == nil { - result = make(map[string]interface{}) - } - - switch { - case strings.HasPrefix(fullMethod, "/gitaly.ObjectPoolService/"): - addObjectPool(req, result) - } - - result["fullMethod"] = fullMethod - - return result -} - -type objectPoolRequest interface { - GetObjectPool() *gitalypb.ObjectPool -} - -func addObjectPool(req interface{}, tags map[string]interface{}) { - oReq, ok := req.(objectPoolRequest) - if !ok { - return - } - - pool := oReq.GetObjectPool() - if pool == nil { - return - } - - repo := pool.GetRepository() - if repo == nil { - return - } - - for k, v := range map[string]string{ - "pool.storage": repo.StorageName, - "pool.relativePath": repo.RelativePath, - "pool.sourceProjectPath": repo.GlProjectPath, - } { - tags[k] = v - } -} diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 4044e8eab..167688b51 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -8,7 +8,6 @@ import ( "time" grpcmwlogrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" - grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/server/auth" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel" @@ -20,7 +19,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/statushandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/proxy" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel" - "gitlab.com/gitlab-org/gitaly/v16/internal/helper/fieldextractors" "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore" @@ -67,7 +65,6 @@ func NewBackchannelServerFactory(logger log.Logger, refSvc gitalypb.RefTransacti func commonUnaryServerInterceptors(logger log.Logger, messageProducer grpcmwlogrus.MessageProducer) []grpc.UnaryServerInterceptor { return []grpc.UnaryServerInterceptor{ - grpcmwtags.UnaryServerInterceptor(ctxtagsInterceptorOption()), grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler requestinfohandler.UnaryInterceptor, grpcprometheus.UnaryServerInterceptor, @@ -85,10 +82,6 @@ func commonUnaryServerInterceptors(logger log.Logger, messageProducer grpcmwlogr } } -func ctxtagsInterceptorOption() grpcmwtags.Option { - return grpcmwtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor) -} - // ServerOption is an option that can be passed to `NewGRPCServer()`. type ServerOption func(cfg *serverConfig) @@ -136,7 +129,6 @@ func NewGRPCServer( unaryInterceptors = append(unaryInterceptors, serverCfg.unaryInterceptors...) streamInterceptors := []grpc.StreamServerInterceptor{ - grpcmwtags.StreamServerInterceptor(ctxtagsInterceptorOption()), grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler middleware.MethodTypeStreamInterceptor(deps.Registry, deps.Logger), requestinfohandler.StreamInterceptor, |