Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2023-09-29 16:11:05 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2023-10-04 13:46:01 +0300
commit6a0e4459a16dc0cd6d0007c08d44172b1ce360b7 (patch)
tree644b6cc5a7c4bb800e748b049ec29fd132d8bb90
parenta59dc197402b0eb7d5cc53a7ea57c85daad3abe1 (diff)
requestinfohandler: Absorb functionality of the field extractor
Merge the functionality that the field extractor provides into our requestinfohandler logic. This unifies two related functionalities and will eventually make it easier to move away from the tags mechanism that is going away in go-grpc-middleware v2. Add tests to make sure that the requestinfohandler interceptors work as expected.
-rw-r--r--internal/gitaly/server/server.go8
-rw-r--r--internal/grpc/middleware/requestinfohandler/requestinfohandler.go119
-rw-r--r--internal/grpc/middleware/requestinfohandler/requestinfohandler_test.go336
-rw-r--r--internal/helper/fieldextractors/fieldextractor.go96
-rw-r--r--internal/praefect/server.go8
5 files changed, 430 insertions, 137 deletions
diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go
index a828c3f03..d8c1ab9e7 100644
--- a/internal/gitaly/server/server.go
+++ b/internal/gitaly/server/server.go
@@ -6,7 +6,6 @@ import (
"time"
grpcmwlogrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
- grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/server/auth"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel"
@@ -21,7 +20,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/sentryhandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/statushandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
- "gitlab.com/gitlab-org/gitaly/v16/internal/helper/fieldextractors"
gitalylog "gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc"
@@ -61,10 +59,6 @@ func (s *GitalyServerFactory) New(secure bool, opts ...Option) (*grpc.Server, er
opt(&cfg)
}
- ctxTagOpts := []grpcmwtags.Option{
- grpcmwtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor),
- }
-
transportCredentials := insecure.NewCredentials()
// If tls config is specified attempt to extract tls options and use it
// as a grpc.ServerOption
@@ -106,7 +100,6 @@ func (s *GitalyServerFactory) New(secure bool, opts ...Option) (*grpc.Server, er
)
streamServerInterceptors := []grpc.StreamServerInterceptor{
- grpcmwtags.StreamServerInterceptor(ctxTagOpts...),
grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler
requestinfohandler.StreamInterceptor,
grpcprometheus.StreamServerInterceptor,
@@ -122,7 +115,6 @@ func (s *GitalyServerFactory) New(secure bool, opts ...Option) (*grpc.Server, er
auth.StreamServerInterceptor(s.cfg.Auth),
}
unaryServerInterceptors := []grpc.UnaryServerInterceptor{
- grpcmwtags.UnaryServerInterceptor(ctxTagOpts...),
grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler
requestinfohandler.UnaryInterceptor,
grpcprometheus.UnaryServerInterceptor,
diff --git a/internal/grpc/middleware/requestinfohandler/requestinfohandler.go b/internal/grpc/middleware/requestinfohandler/requestinfohandler.go
index f4a1787bf..a064f82ea 100644
--- a/internal/grpc/middleware/requestinfohandler/requestinfohandler.go
+++ b/internal/grpc/middleware/requestinfohandler/requestinfohandler.go
@@ -11,6 +11,7 @@ import (
gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
+ "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
"gitlab.com/gitlab-org/labkit/correlation"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
@@ -47,6 +48,10 @@ type requestInfo struct {
deadlineType string
methodOperation string
methodScope string
+
+ repository *gitalypb.Repository
+ objectPool *gitalypb.ObjectPool
+ storageName string
}
// Unknown client and feature. Matches the prometheus grpc unknown value
@@ -64,8 +69,8 @@ func getFromMD(md metadata.MD, header string) string {
// newRequestInfo extracts metadata from the connection headers and add it to the
// ctx_tags, if it is set. Returns values appropriate for use with prometheus labels,
// using `unknown` if a value is not set
-func newRequestInfo(ctx context.Context, fullMethod, grpcMethodType string) requestInfo {
- info := requestInfo{
+func newRequestInfo(ctx context.Context, fullMethod, grpcMethodType string) *requestInfo {
+ info := &requestInfo{
fullMethod: fullMethod,
methodType: grpcMethodType,
clientName: unknownValue,
@@ -149,9 +154,33 @@ func newRequestInfo(ctx context.Context, fullMethod, grpcMethodType string) requ
return info
}
-func (i requestInfo) injectTags(ctx context.Context) {
- tags := grpcmwtags.Extract(ctx)
+func (i *requestInfo) extractRequestInfo(request any) {
+ type repoScopedRequest interface {
+ GetRepository() *gitalypb.Repository
+ }
+
+ type poolScopedRequest interface {
+ GetObjectPool() *gitalypb.ObjectPool
+ }
+
+ type storageScopedRequest interface {
+ GetStorageName() string
+ }
+
+ if repoScoped, ok := request.(repoScopedRequest); ok {
+ i.repository = repoScoped.GetRepository()
+ }
+
+ if poolScoped, ok := request.(poolScopedRequest); ok {
+ i.objectPool = poolScoped.GetObjectPool()
+ }
+
+ if storageScoped, ok := request.(storageScopedRequest); ok {
+ i.storageName = storageScoped.GetStorageName()
+ }
+}
+func (i *requestInfo) injectTags(tags grpcmwtags.Tags) {
for key, value := range map[string]string{
"grpc.meta.call_site": i.callSite,
"grpc.meta.client_name": i.clientName,
@@ -160,6 +189,8 @@ func (i requestInfo) injectTags(ctx context.Context) {
"grpc.meta.method_type": i.methodType,
"grpc.meta.method_operation": i.methodOperation,
"grpc.meta.method_scope": i.methodScope,
+ "grpc.request.fullMethod": i.fullMethod,
+ "grpc.request.StorageName": i.storageName,
"remote_ip": i.remoteIP,
"user_id": i.userID,
"username": i.userName,
@@ -171,9 +202,34 @@ func (i requestInfo) injectTags(ctx context.Context) {
tags.Set(key, value)
}
+
+ // We handle the repository-related fields separately such that all fields will be set unconditionally,
+ // regardless of whether they are empty or not. This is done to retain all fields even if their values
+ // are empty.
+ if repo := i.repository; repo != nil {
+ for key, value := range map[string]string{
+ "grpc.request.repoStorage": repo.GetStorageName(),
+ "grpc.request.repoPath": repo.GetRelativePath(),
+ "grpc.request.glRepository": repo.GetGlRepository(),
+ "grpc.request.glProjectPath": repo.GetGlProjectPath(),
+ } {
+ tags.Set(key, value)
+ }
+ }
+
+ // Same for the object pool repository.
+ if pool := i.objectPool.GetRepository(); pool != nil {
+ for key, value := range map[string]string{
+ "grpc.request.pool.storage": pool.GetStorageName(),
+ "grpc.request.pool.relativePath": pool.GetRelativePath(),
+ "grpc.request.pool.sourceProjectPath": pool.GetGlProjectPath(),
+ } {
+ tags.Set(key, value)
+ }
+ }
}
-func (i requestInfo) reportPrometheusMetrics(err error) {
+func (i *requestInfo) reportPrometheusMetrics(err error) {
grpcCode := structerr.GRPCCode(err)
serviceName, methodName := extractServiceAndMethodName(i.fullMethod)
@@ -202,9 +258,13 @@ func extractServiceAndMethodName(fullMethodName string) (string, string) {
// UnaryInterceptor returns a Unary Interceptor
func UnaryInterceptor(ctx context.Context, req interface{}, serverInfo *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+ tags := grpcmwtags.NewTags()
+ ctx = grpcmwtags.SetInContext(ctx, tags)
+
info := newRequestInfo(ctx, serverInfo.FullMethod, "unary")
+ info.extractRequestInfo(req)
- info.injectTags(ctx)
+ info.injectTags(tags)
res, err := handler(ctx, req)
info.reportPrometheusMetrics(err)
@@ -213,11 +273,21 @@ func UnaryInterceptor(ctx context.Context, req interface{}, serverInfo *grpc.Una
// StreamInterceptor returns a Stream Interceptor
func StreamInterceptor(srv interface{}, stream grpc.ServerStream, serverInfo *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
- ctx := stream.Context()
+ tags := grpcmwtags.NewTags()
+ ctx := grpcmwtags.SetInContext(stream.Context(), tags)
+
info := newRequestInfo(ctx, serverInfo.FullMethod, streamRPCType(serverInfo))
- info.injectTags(ctx)
- err := handler(srv, stream)
+ // Even though we don't yet have all information set up we already inject the tags here. This is done such that
+ // log messages will at least have the metadata set up correctly in case there is no first request.
+ info.injectTags(tags)
+ err := handler(srv, &wrappedServerStream{
+ ServerStream: stream,
+ ctx: ctx,
+ info: info,
+ tags: tags,
+ initial: true,
+ })
info.reportPrometheusMetrics(err)
return err
@@ -231,3 +301,34 @@ func streamRPCType(info *grpc.StreamServerInfo) string {
}
return "bidi_stream"
}
+
+// wrappedServerStream wraps a grpc.ServerStream such that we can intercept and extract info from the first gRPC request
+// on that stream.
+type wrappedServerStream struct {
+ grpc.ServerStream
+ ctx context.Context
+ tags grpcmwtags.Tags
+ info *requestInfo
+ initial bool
+}
+
+// Context overrides the context of the ServerStream with our own context that has the tags set up.
+func (w *wrappedServerStream) Context() context.Context {
+ return w.ctx
+}
+
+// RecvMsg receives a message from the underlying server stream. The initial received message will be used to extract
+// request information and inject it into the context.
+func (w *wrappedServerStream) RecvMsg(req interface{}) error {
+ err := w.ServerStream.RecvMsg(req)
+
+ if w.initial {
+ w.initial = false
+
+ w.info.extractRequestInfo(req)
+ // Re-inject the tags a second time here.
+ w.info.injectTags(w.tags)
+ }
+
+ return err
+}
diff --git a/internal/grpc/middleware/requestinfohandler/requestinfohandler_test.go b/internal/grpc/middleware/requestinfohandler/requestinfohandler_test.go
index cdf7b08d1..0974ee6a1 100644
--- a/internal/grpc/middleware/requestinfohandler/requestinfohandler_test.go
+++ b/internal/grpc/middleware/requestinfohandler/requestinfohandler_test.go
@@ -2,14 +2,21 @@ package requestinfohandler
import (
"context"
+ "io"
+ "net"
"testing"
"time"
grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
"gitlab.com/gitlab-org/labkit/correlation"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/test/bufconn"
)
const (
@@ -27,13 +34,13 @@ func TestNewRequestInfo(t *testing.T) {
fullMethod string
metadata metadata.MD
deadline bool
- expectedInfo requestInfo
+ expectedInfo *requestInfo
}{
{
desc: "empty metadata",
metadata: metadata.Pairs(),
deadline: false,
- expectedInfo: requestInfo{
+ expectedInfo: &requestInfo{
methodType: "unary",
clientName: unknownValue,
callSite: unknownValue,
@@ -47,7 +54,7 @@ func TestNewRequestInfo(t *testing.T) {
desc: "context containing metadata",
metadata: metadata.Pairs("call_site", "testsite"),
deadline: false,
- expectedInfo: requestInfo{
+ expectedInfo: &requestInfo{
methodType: "unary",
clientName: unknownValue,
callSite: "testsite",
@@ -61,7 +68,7 @@ func TestNewRequestInfo(t *testing.T) {
desc: "context containing metadata and a deadline",
metadata: metadata.Pairs("call_site", "testsite"),
deadline: true,
- expectedInfo: requestInfo{
+ expectedInfo: &requestInfo{
methodType: "unary",
clientName: unknownValue,
callSite: "testsite",
@@ -75,7 +82,7 @@ func TestNewRequestInfo(t *testing.T) {
desc: "context containing metadata and a deadline type",
metadata: metadata.Pairs("deadline_type", "regular"),
deadline: true,
- expectedInfo: requestInfo{
+ expectedInfo: &requestInfo{
methodType: "unary",
clientName: unknownValue,
callSite: unknownValue,
@@ -89,7 +96,7 @@ func TestNewRequestInfo(t *testing.T) {
desc: "a context without deadline but with deadline type",
metadata: metadata.Pairs("deadline_type", "regular"),
deadline: false,
- expectedInfo: requestInfo{
+ expectedInfo: &requestInfo{
methodType: "unary",
clientName: unknownValue,
callSite: unknownValue,
@@ -103,7 +110,7 @@ func TestNewRequestInfo(t *testing.T) {
desc: "with a context containing metadata",
metadata: metadata.Pairs("deadline_type", "regular", "client_name", "rails"),
deadline: true,
- expectedInfo: requestInfo{
+ expectedInfo: &requestInfo{
methodType: "unary",
clientName: "rails",
callSite: unknownValue,
@@ -118,7 +125,7 @@ func TestNewRequestInfo(t *testing.T) {
fullMethod: "/gitaly.RepositoryService/UnknownMethod",
metadata: metadata.Pairs(),
deadline: false,
- expectedInfo: requestInfo{
+ expectedInfo: &requestInfo{
fullMethod: "/gitaly.RepositoryService/UnknownMethod",
methodType: "unary",
clientName: unknownValue,
@@ -134,7 +141,7 @@ func TestNewRequestInfo(t *testing.T) {
fullMethod: "/gitaly.RepositoryService/ObjectFormat",
metadata: metadata.Pairs(),
deadline: false,
- expectedInfo: requestInfo{
+ expectedInfo: &requestInfo{
fullMethod: "/gitaly.RepositoryService/ObjectFormat",
methodType: "unary",
clientName: unknownValue,
@@ -150,7 +157,7 @@ func TestNewRequestInfo(t *testing.T) {
fullMethod: "/gitaly.RepositoryService/CreateRepository",
metadata: metadata.Pairs(),
deadline: false,
- expectedInfo: requestInfo{
+ expectedInfo: &requestInfo{
fullMethod: "/gitaly.RepositoryService/CreateRepository",
methodType: "unary",
clientName: unknownValue,
@@ -166,7 +173,7 @@ func TestNewRequestInfo(t *testing.T) {
fullMethod: "/gitaly.RepositoryService/OptimizeRepository",
metadata: metadata.Pairs(),
deadline: false,
- expectedInfo: requestInfo{
+ expectedInfo: &requestInfo{
fullMethod: "/gitaly.RepositoryService/OptimizeRepository",
methodType: "unary",
clientName: unknownValue,
@@ -182,7 +189,7 @@ func TestNewRequestInfo(t *testing.T) {
fullMethod: "/gitaly.RepositoryService/OptimizeRepository",
metadata: metadata.Pairs(),
deadline: false,
- expectedInfo: requestInfo{
+ expectedInfo: &requestInfo{
fullMethod: "/gitaly.RepositoryService/OptimizeRepository",
methodType: "unary",
clientName: unknownValue,
@@ -198,7 +205,7 @@ func TestNewRequestInfo(t *testing.T) {
fullMethod: "/gitaly.RemoteService/FindRemoteRepository",
metadata: metadata.Pairs(),
deadline: false,
- expectedInfo: requestInfo{
+ expectedInfo: &requestInfo{
fullMethod: "/gitaly.RemoteService/FindRemoteRepository",
methodType: "unary",
clientName: unknownValue,
@@ -247,9 +254,11 @@ func TestGRPCTags(t *testing.T) {
_, err := interceptor(ctx, nil, nil, func(ctx context.Context, _ interface{}) (interface{}, error) {
info := newRequestInfo(ctx, "/gitaly.RepositoryService/OptimizeRepository", "unary")
- info.injectTags(ctx)
- require.Equal(t, requestInfo{
+ tags := grpcmwtags.NewTags()
+ info.injectTags(tags)
+
+ require.Equal(t, &requestInfo{
correlationID: correlationID,
fullMethod: "/gitaly.RepositoryService/OptimizeRepository",
methodType: "unary",
@@ -268,7 +277,8 @@ func TestGRPCTags(t *testing.T) {
"grpc.meta.method_type": "unary",
"grpc.meta.method_operation": "maintenance",
"grpc.meta.method_scope": "repository",
- }, grpcmwtags.Extract(ctx).Values())
+ "grpc.request.fullMethod": "/gitaly.RepositoryService/OptimizeRepository",
+ }, tags.Values())
return nil, nil
})
@@ -320,3 +330,297 @@ func TestExtractServiceAndMethodName(t *testing.T) {
})
}
}
+
+func TestInterceptors(t *testing.T) {
+ t.Parallel()
+
+ ctx := testhelper.Context(t)
+
+ for _, tc := range []struct {
+ desc string
+ call func(*testing.T, mockClient)
+ expectedTags map[string]any
+ }{
+ {
+ desc: "unary repository-scoped call",
+ call: func(t *testing.T, client mockClient) {
+ _, err := client.RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: "storage",
+ RelativePath: "path",
+ GlProjectPath: "glProject",
+ GlRepository: "glRepository",
+ },
+ })
+
+ require.NoError(t, err)
+ },
+ expectedTags: map[string]any{
+ "grpc.meta.deadline_type": "none",
+ "grpc.meta.method_operation": "accessor",
+ "grpc.meta.method_scope": "repository",
+ "grpc.meta.method_type": "unary",
+ "grpc.request.fullMethod": "/gitaly.RepositoryService/RepositoryInfo",
+ "grpc.request.repoStorage": "storage",
+ "grpc.request.repoPath": "path",
+ "grpc.request.glProjectPath": "glProject",
+ "grpc.request.glRepository": "glRepository",
+ },
+ },
+ {
+ desc: "unary repository-scoped call with unset repository",
+ call: func(t *testing.T, client mockClient) {
+ _, err := client.RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{
+ Repository: nil,
+ })
+
+ require.NoError(t, err)
+ },
+ expectedTags: map[string]any{
+ "grpc.meta.deadline_type": "none",
+ "grpc.meta.method_operation": "accessor",
+ "grpc.meta.method_scope": "repository",
+ "grpc.meta.method_type": "unary",
+ "grpc.request.fullMethod": "/gitaly.RepositoryService/RepositoryInfo",
+ },
+ },
+ {
+ desc: "unary object-pool-scoped call",
+ call: func(t *testing.T, client mockClient) {
+ _, err := client.FetchIntoObjectPool(ctx, &gitalypb.FetchIntoObjectPoolRequest{
+ ObjectPool: &gitalypb.ObjectPool{
+ Repository: &gitalypb.Repository{
+ StorageName: "storage",
+ RelativePath: "path",
+ GlProjectPath: "glProject",
+ },
+ },
+ })
+
+ require.NoError(t, err)
+ },
+ expectedTags: map[string]any{
+ "grpc.meta.deadline_type": "none",
+ "grpc.meta.method_operation": "mutator",
+ "grpc.meta.method_scope": "repository",
+ "grpc.meta.method_type": "unary",
+ "grpc.request.fullMethod": "/gitaly.ObjectPoolService/FetchIntoObjectPool",
+ "grpc.request.pool.relativePath": "path",
+ "grpc.request.pool.storage": "storage",
+ "grpc.request.pool.sourceProjectPath": "glProject",
+ },
+ },
+ {
+ desc: "unary repository-scoped call with deadline",
+ call: func(t *testing.T, client mockClient) {
+ ctx, cancel := context.WithDeadline(ctx, time.Date(2100, time.January, 1, 12, 0, 0, 0, time.UTC))
+ defer cancel()
+
+ _, err := client.RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: "storage",
+ RelativePath: "path",
+ GlProjectPath: "glProject",
+ GlRepository: "glRepository",
+ },
+ })
+
+ require.NoError(t, err)
+ },
+ expectedTags: map[string]any{
+ // Note that there is no "deadline: none" field anymore. If we were
+ // to inject the deadline type then it would appear here.
+ "grpc.meta.method_operation": "accessor",
+ "grpc.meta.method_scope": "repository",
+ "grpc.meta.method_type": "unary",
+ "grpc.request.fullMethod": "/gitaly.RepositoryService/RepositoryInfo",
+ "grpc.request.repoStorage": "storage",
+ "grpc.request.repoPath": "path",
+ "grpc.request.glProjectPath": "glProject",
+ "grpc.request.glRepository": "glRepository",
+ },
+ },
+ {
+ desc: "unary repository-scoped call with additional metadata",
+ call: func(t *testing.T, client mockClient) {
+ ctx, cancel := context.WithDeadline(ctx, time.Date(2100, time.January, 1, 12, 0, 0, 0, time.UTC))
+ defer cancel()
+
+ ctx = metadata.NewOutgoingContext(ctx, metadata.MD{
+ "call_site": []string{"callSite"},
+ "deadline_type": []string{"deadlineType"},
+ "client_name": []string{"clientName"},
+ "remote_ip": []string{"remoteIP"},
+ "user_id": []string{"userID"},
+ "username": []string{"userName"},
+ correlation.FieldName: []string{"correlationID"},
+ })
+
+ _, err := client.RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: "storage",
+ RelativePath: "path",
+ GlProjectPath: "glProject",
+ GlRepository: "glRepository",
+ },
+ })
+
+ require.NoError(t, err)
+ },
+ expectedTags: map[string]any{
+ "grpc.meta.call_site": "callSite",
+ "grpc.meta.deadline_type": "deadlineType",
+ "grpc.meta.client_name": "clientName",
+ "grpc.meta.method_operation": "accessor",
+ "grpc.meta.method_scope": "repository",
+ "grpc.meta.method_type": "unary",
+ "grpc.request.fullMethod": "/gitaly.RepositoryService/RepositoryInfo",
+ "grpc.request.repoStorage": "storage",
+ "grpc.request.repoPath": "path",
+ "grpc.request.glProjectPath": "glProject",
+ "grpc.request.glRepository": "glRepository",
+ "remote_ip": "remoteIP",
+ "user_id": "userID",
+ "username": "userName",
+ },
+ },
+ {
+ desc: "streaming repository-scoped call",
+ call: func(t *testing.T, client mockClient) {
+ stream, err := client.CreateBundleFromRefList(ctx)
+ require.NoError(t, err)
+
+ require.NoError(t, stream.Send(&gitalypb.CreateBundleFromRefListRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: "storage",
+ RelativePath: "path",
+ GlProjectPath: "glProject",
+ GlRepository: "glRepository",
+ },
+ }))
+
+ _, err = stream.Recv()
+ require.NoError(t, err)
+ },
+ expectedTags: map[string]any{
+ "grpc.meta.deadline_type": "none",
+ "grpc.meta.method_operation": "accessor",
+ "grpc.meta.method_scope": "repository",
+ "grpc.meta.method_type": "bidi_stream",
+ "grpc.request.fullMethod": "/gitaly.RepositoryService/CreateBundleFromRefList",
+ "grpc.request.repoStorage": "storage",
+ "grpc.request.repoPath": "path",
+ "grpc.request.glProjectPath": "glProject",
+ "grpc.request.glRepository": "glRepository",
+ },
+ },
+ {
+ desc: "streaming repository-scoped call with missing initial request",
+ call: func(t *testing.T, client mockClient) {
+ stream, err := client.CreateBundleFromRefList(ctx)
+ require.NoError(t, err)
+ require.NoError(t, stream.CloseSend())
+
+ _, err = stream.Recv()
+ testhelper.RequireGrpcError(t, structerr.New("%w", io.EOF), err)
+ },
+ expectedTags: map[string]any{
+ "grpc.meta.deadline_type": "none",
+ "grpc.meta.method_operation": "accessor",
+ "grpc.meta.method_scope": "repository",
+ "grpc.meta.method_type": "bidi_stream",
+ "grpc.request.fullMethod": "/gitaly.RepositoryService/CreateBundleFromRefList",
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ server, client := setupServer(t, ctx)
+
+ tc.call(t, client)
+
+ if tc.expectedTags == nil {
+ require.Equal(t, nil, tc.expectedTags)
+ } else {
+ require.Equal(t, tc.expectedTags, server.tags.Values())
+ }
+ })
+ }
+}
+
+type mockServer struct {
+ gitalypb.RepositoryServiceServer
+ gitalypb.ObjectPoolServiceServer
+ tags grpcmwtags.Tags
+}
+
+type mockClient struct {
+ gitalypb.RepositoryServiceClient
+ gitalypb.ObjectPoolServiceClient
+}
+
+func setupServer(tb testing.TB, ctx context.Context) (*mockServer, mockClient) {
+ tb.Helper()
+
+ var mockServer mockServer
+
+ server := grpc.NewServer(
+ grpc.ChainUnaryInterceptor(
+ grpcmwtags.UnaryServerInterceptor(),
+ UnaryInterceptor,
+ // This interceptor and the equivalent interceptor for the streaming gRPC calls is responsible
+ // for recording the tags that the preceding interceptor has injected.
+ func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
+ mockServer.tags = grpcmwtags.Extract(ctx)
+ return handler(ctx, req)
+ },
+ ),
+ grpc.ChainStreamInterceptor(
+ grpcmwtags.StreamServerInterceptor(),
+ StreamInterceptor,
+ func(server any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+ mockServer.tags = grpcmwtags.Extract(stream.Context())
+ return handler(server, stream)
+ },
+ ),
+ )
+ tb.Cleanup(server.Stop)
+ gitalypb.RegisterRepositoryServiceServer(server, &mockServer)
+ gitalypb.RegisterObjectPoolServiceServer(server, &mockServer)
+
+ listener := bufconn.Listen(1)
+ go testhelper.MustServe(tb, server, listener)
+
+ conn, err := grpc.DialContext(ctx, listener.Addr().String(),
+ grpc.WithTransportCredentials(insecure.NewCredentials()),
+ grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
+ return listener.DialContext(ctx)
+ }),
+ )
+ require.NoError(tb, err)
+ tb.Cleanup(func() { testhelper.MustClose(tb, conn) })
+
+ return &mockServer, mockClient{
+ RepositoryServiceClient: gitalypb.NewRepositoryServiceClient(conn),
+ ObjectPoolServiceClient: gitalypb.NewObjectPoolServiceClient(conn),
+ }
+}
+
+func (s *mockServer) RepositoryInfo(ctx context.Context, _ *gitalypb.RepositoryInfoRequest) (*gitalypb.RepositoryInfoResponse, error) {
+ return &gitalypb.RepositoryInfoResponse{}, nil
+}
+
+func (s *mockServer) FetchIntoObjectPool(ctx context.Context, _ *gitalypb.FetchIntoObjectPoolRequest) (*gitalypb.FetchIntoObjectPoolResponse, error) {
+ return &gitalypb.FetchIntoObjectPoolResponse{}, nil
+}
+
+func (s *mockServer) CreateBundleFromRefList(stream gitalypb.RepositoryService_CreateBundleFromRefListServer) error {
+ if _, err := stream.Recv(); err != nil {
+ return err
+ }
+
+ if err := stream.Send(&gitalypb.CreateBundleFromRefListResponse{}); err != nil {
+ return err
+ }
+
+ return nil
+}
diff --git a/internal/helper/fieldextractors/fieldextractor.go b/internal/helper/fieldextractors/fieldextractor.go
deleted file mode 100644
index 2f4a1ec1a..000000000
--- a/internal/helper/fieldextractors/fieldextractor.go
+++ /dev/null
@@ -1,96 +0,0 @@
-package fieldextractors
-
-import (
- "strings"
-
- "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
-)
-
-type repositoryBasedRequest interface {
- GetRepository() *gitalypb.Repository
-}
-
-type storageBasedRequest interface {
- GetStorageName() string
-}
-
-func formatRepoRequest(repo *gitalypb.Repository) map[string]interface{} {
- if repo == nil {
- // Signals that the client did not send a repo through, which
- // will be useful for logging
- return map[string]interface{}{
- "repo": nil,
- }
- }
-
- return map[string]interface{}{
- "repoStorage": repo.StorageName,
- "repoPath": repo.RelativePath,
- "glRepository": repo.GlRepository,
- "glProjectPath": repo.GlProjectPath,
- }
-}
-
-func formatStorageRequest(storageReq storageBasedRequest) map[string]interface{} {
- return map[string]interface{}{
- "StorageName": storageReq.GetStorageName(),
- }
-}
-
-// FieldExtractor will extract the relevant fields from an incoming grpc request
-func FieldExtractor(fullMethod string, req interface{}) map[string]interface{} {
- if req == nil {
- return nil
- }
-
- var result map[string]interface{}
-
- switch req := req.(type) {
- case repositoryBasedRequest:
- result = formatRepoRequest(req.GetRepository())
- case storageBasedRequest:
- result = formatStorageRequest(req)
- }
-
- if result == nil {
- result = make(map[string]interface{})
- }
-
- switch {
- case strings.HasPrefix(fullMethod, "/gitaly.ObjectPoolService/"):
- addObjectPool(req, result)
- }
-
- result["fullMethod"] = fullMethod
-
- return result
-}
-
-type objectPoolRequest interface {
- GetObjectPool() *gitalypb.ObjectPool
-}
-
-func addObjectPool(req interface{}, tags map[string]interface{}) {
- oReq, ok := req.(objectPoolRequest)
- if !ok {
- return
- }
-
- pool := oReq.GetObjectPool()
- if pool == nil {
- return
- }
-
- repo := pool.GetRepository()
- if repo == nil {
- return
- }
-
- for k, v := range map[string]string{
- "pool.storage": repo.StorageName,
- "pool.relativePath": repo.RelativePath,
- "pool.sourceProjectPath": repo.GlProjectPath,
- } {
- tags[k] = v
- }
-}
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 4044e8eab..167688b51 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -8,7 +8,6 @@ import (
"time"
grpcmwlogrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
- grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/server/auth"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel"
@@ -20,7 +19,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/statushandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/proxy"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel"
- "gitlab.com/gitlab-org/gitaly/v16/internal/helper/fieldextractors"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
@@ -67,7 +65,6 @@ func NewBackchannelServerFactory(logger log.Logger, refSvc gitalypb.RefTransacti
func commonUnaryServerInterceptors(logger log.Logger, messageProducer grpcmwlogrus.MessageProducer) []grpc.UnaryServerInterceptor {
return []grpc.UnaryServerInterceptor{
- grpcmwtags.UnaryServerInterceptor(ctxtagsInterceptorOption()),
grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler
requestinfohandler.UnaryInterceptor,
grpcprometheus.UnaryServerInterceptor,
@@ -85,10 +82,6 @@ func commonUnaryServerInterceptors(logger log.Logger, messageProducer grpcmwlogr
}
}
-func ctxtagsInterceptorOption() grpcmwtags.Option {
- return grpcmwtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor)
-}
-
// ServerOption is an option that can be passed to `NewGRPCServer()`.
type ServerOption func(cfg *serverConfig)
@@ -136,7 +129,6 @@ func NewGRPCServer(
unaryInterceptors = append(unaryInterceptors, serverCfg.unaryInterceptors...)
streamInterceptors := []grpc.StreamServerInterceptor{
- grpcmwtags.StreamServerInterceptor(ctxtagsInterceptorOption()),
grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler
middleware.MethodTypeStreamInterceptor(deps.Registry, deps.Logger),
requestinfohandler.StreamInterceptor,