diff options
author | James Fargher <jfargher@gitlab.com> | 2023-07-12 02:51:06 +0300 |
---|---|---|
committer | James Fargher <jfargher@gitlab.com> | 2023-07-12 02:51:06 +0300 |
commit | 510c9aa7917db90134845287a10ddfb280b2fe20 (patch) | |
tree | 4c4c944e9b08d304067f697ce268c7f899c474f0 | |
parent | 6e31101f69c153a19b71adf31037c0b681add84a (diff) | |
parent | b44bd8c40b777031a6f0bf01e2050b39a0540338 (diff) |
Merge branch 'qmnguyen0711/refactor-limithandler' into 'master'
Refactor limithandler package
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6038
Merged-by: James Fargher <jfargher@gitlab.com>
Approved-by: James Fargher <jfargher@gitlab.com>
Reviewed-by: James Fargher <jfargher@gitlab.com>
Reviewed-by: Justin Tobler <jtobler@gitlab.com>
Co-authored-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
21 files changed, 252 insertions, 531 deletions
diff --git a/cmd/gitaly-hooks/hooks_test.go b/cmd/gitaly-hooks/hooks_test.go index 9ceb43681..f0883e5bd 100644 --- a/cmd/gitaly-hooks/hooks_test.go +++ b/cmd/gitaly-hooks/hooks_test.go @@ -29,8 +29,8 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/gitlab" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/metadata" - "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/text" + "gitlab.com/gitlab-org/gitaly/v16/internal/limiter" gitalylog "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" @@ -660,8 +660,8 @@ func TestGitalyHooksPackObjects(t *testing.T) { } func TestGitalyServerReturnsError(t *testing.T) { - resourceExhaustedErr := structerr.NewResourceExhausted("%w", limithandler.ErrMaxQueueTime).WithDetail(&gitalypb.LimitError{ - ErrorMessage: limithandler.ErrMaxQueueTime.Error(), + resourceExhaustedErr := structerr.NewResourceExhausted("%w", limiter.ErrMaxQueueTime).WithDetail(&gitalypb.LimitError{ + ErrorMessage: limiter.ErrMaxQueueTime.Error(), RetryAfter: durationpb.New(0), }) @@ -765,8 +765,8 @@ func TestGitalyServerReturnsError_packObjects(t *testing.T) { }{ { name: "resource exhausted with LimitError detail", - err: structerr.NewResourceExhausted("%w", limithandler.ErrMaxQueueTime).WithDetail(&gitalypb.LimitError{ - ErrorMessage: limithandler.ErrMaxQueueTime.Error(), + err: structerr.NewResourceExhausted("%w", limiter.ErrMaxQueueTime).WithDetail(&gitalypb.LimitError{ + ErrorMessage: limiter.ErrMaxQueueTime.Error(), RetryAfter: durationpb.New(0), }), expectedStderr: ` diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index 8ba00536a..3fb245511 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -40,6 +40,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/env" + "gitlab.com/gitlab-org/gitaly/v16/internal/limiter" glog "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/streamcache" "gitlab.com/gitlab-org/gitaly/v16/internal/tempdir" @@ -269,7 +270,7 @@ func run(cfg config.Cfg) error { limithandler.WithRateLimiters(ctx), ) - packObjectsMonitor := limithandler.NewPackObjectsConcurrencyMonitor( + packObjectsMonitor := limiter.NewPackObjectsConcurrencyMonitor( cfg.Prometheus.GRPCLatencyBuckets, ) newTickerFunc := func() helper.Ticker { @@ -280,7 +281,7 @@ func run(cfg config.Cfg) error { return helper.NewTimerTicker(cfg.PackObjectsLimiting.MaxQueueWait.Duration()) } } - packObjectsLimiter := limithandler.NewConcurrencyLimiter( + packObjectsLimiter := limiter.NewConcurrencyLimiter( cfg.PackObjectsLimiting.MaxConcurrency, cfg.PackObjectsLimiting.MaxQueueLength, newTickerFunc, diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go index f9caf4e08..79392c46f 100644 --- a/internal/gitaly/server/server.go +++ b/internal/gitaly/server/server.go @@ -16,7 +16,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/cache" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/customfieldshandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/featureflag" - "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/panichandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/sentryhandler" @@ -111,7 +110,6 @@ func (s *GitalyServerFactory) New(secure bool, opts ...Option) (*grpc.Server, er grpcstats.FieldsProducer, featureflag.FieldsProducer, structerr.FieldsProducer, - limithandler.FieldsProducer, ), ) @@ -121,7 +119,6 @@ func (s *GitalyServerFactory) New(secure bool, opts ...Option) (*grpc.Server, er metadatahandler.StreamInterceptor, grpcprometheus.StreamServerInterceptor, customfieldshandler.StreamInterceptor, - limithandler.StatsStreamInterceptor, grpcmwlogrus.StreamServerInterceptor(s.logger, grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat), logMsgProducer, @@ -138,7 +135,6 @@ func (s *GitalyServerFactory) New(secure bool, opts ...Option) (*grpc.Server, er metadatahandler.UnaryInterceptor, grpcprometheus.UnaryServerInterceptor, customfieldshandler.UnaryInterceptor, - limithandler.StatsUnaryInterceptor, grpcmwlogrus.UnaryServerInterceptor(s.logger, grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat), logMsgProducer, diff --git a/internal/gitaly/service/dependencies.go b/internal/gitaly/service/dependencies.go index 4b1cb34f1..6daf570c1 100644 --- a/internal/gitaly/service/dependencies.go +++ b/internal/gitaly/service/dependencies.go @@ -17,6 +17,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitlab" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler" + "gitlab.com/gitlab-org/gitaly/v16/internal/limiter" "gitlab.com/gitlab-org/gitaly/v16/internal/streamcache" ) @@ -33,7 +34,7 @@ type Dependencies struct { CatfileCache catfile.Cache DiskCache cache.Cache PackObjectsCache streamcache.Cache - PackObjectsLimiter limithandler.Limiter + PackObjectsLimiter limiter.Limiter LimitHandler *limithandler.LimiterMiddleware Git2goExecutor *git2go.Executor UpdaterWithHooks *updateref.UpdaterWithHooks @@ -119,7 +120,7 @@ func (dc *Dependencies) GetHousekeepingManager() housekeeping.Manager { } // GetPackObjectsLimiter returns the pack-objects limiter. -func (dc *Dependencies) GetPackObjectsLimiter() limithandler.Limiter { +func (dc *Dependencies) GetPackObjectsLimiter() limiter.Limiter { return dc.PackObjectsLimiter } diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go index 312bb0ab0..4d4d1a500 100644 --- a/internal/gitaly/service/hook/pack_objects_test.go +++ b/internal/gitaly/service/hook/pack_objects_test.go @@ -22,8 +22,8 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" hookPkg "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" - "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" + "gitlab.com/gitlab-org/gitaly/v16/internal/limiter" "gitlab.com/gitlab-org/gitaly/v16/internal/streamcache" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" @@ -854,10 +854,10 @@ func TestPackObjects_concurrencyLimit(t *testing.T) { cfg := cfgWithCache(t, 0) ticker := helper.NewManualTicker() - monitor := limithandler.NewPackObjectsConcurrencyMonitor( + monitor := limiter.NewPackObjectsConcurrencyMonitor( cfg.Prometheus.GRPCLatencyBuckets, ) - limiter := limithandler.NewConcurrencyLimiter( + limiter := limiter.NewConcurrencyLimiter( 1, 0, func() helper.Ticker { return ticker }, diff --git a/internal/gitaly/service/hook/server.go b/internal/gitaly/service/hook/server.go index c8944f3ec..d99f66aa9 100644 --- a/internal/gitaly/service/hook/server.go +++ b/internal/gitaly/service/hook/server.go @@ -7,7 +7,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" gitalyhook "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" - "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler" + "gitlab.com/gitlab-org/gitaly/v16/internal/limiter" "gitlab.com/gitlab-org/gitaly/v16/internal/streamcache" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -18,7 +18,7 @@ type server struct { locator storage.Locator gitCmdFactory git.CommandFactory packObjectsCache streamcache.Cache - packObjectsLimiter limithandler.Limiter + packObjectsLimiter limiter.Limiter runPackObjectsFn func( context.Context, git.CommandFactory, @@ -36,7 +36,7 @@ func NewServer( locator storage.Locator, gitCmdFactory git.CommandFactory, packObjectsCache streamcache.Cache, - packObjectsLimiter limithandler.Limiter, + packObjectsLimiter limiter.Limiter, ) gitalypb.HookServiceServer { srv := &server{ manager: manager, diff --git a/internal/grpc/middleware/limithandler/middleware.go b/internal/grpc/middleware/limithandler/middleware.go index c532943a4..0c41a4672 100644 --- a/internal/grpc/middleware/limithandler/middleware.go +++ b/internal/grpc/middleware/limithandler/middleware.go @@ -2,10 +2,14 @@ package limithandler import ( "context" + "strings" + "time" grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" "github.com/prometheus/client_golang/prometheus" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v16/internal/helper" + "gitlab.com/gitlab-org/gitaly/v16/internal/limiter" "google.golang.org/grpc" ) @@ -28,17 +32,9 @@ func LimitConcurrencyByRepo(ctx context.Context) string { return "" } -// Limiter limits incoming requests -type Limiter interface { - Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error) -} - -// LimitedFunc represents a function that will be limited -type LimitedFunc func() (resp interface{}, err error) - // LimiterMiddleware contains rate limiter state type LimiterMiddleware struct { - methodLimiters map[string]Limiter + methodLimiters map[string]limiter.Limiter getLockKey GetLockKey requestsDroppedMetric *prometheus.CounterVec collect func(metrics chan<- prometheus.Metric) @@ -166,3 +162,123 @@ func (w *wrappedStream) RecvMsg(m interface{}) error { return err } } + +// WithConcurrencyLimiters sets up middleware to limit the concurrency of +// requests based on RPC and repository +func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) { + acquiringSecondsMetric := prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "gitaly", + Subsystem: "concurrency_limiting", + Name: "acquiring_seconds", + Help: "Histogram of time calls are rate limited (in seconds)", + Buckets: cfg.Prometheus.GRPCLatencyBuckets, + }, + []string{"system", "grpc_service", "grpc_method"}, + ) + inProgressMetric := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "gitaly", + Subsystem: "concurrency_limiting", + Name: "in_progress", + Help: "Gauge of number of concurrent in-progress calls", + }, + []string{"system", "grpc_service", "grpc_method"}, + ) + queuedMetric := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "gitaly", + Subsystem: "concurrency_limiting", + Name: "queued", + Help: "Gauge of number of queued calls", + }, + []string{"system", "grpc_service", "grpc_method"}, + ) + + middleware.collect = func(metrics chan<- prometheus.Metric) { + acquiringSecondsMetric.Collect(metrics) + inProgressMetric.Collect(metrics) + queuedMetric.Collect(metrics) + } + + result := make(map[string]limiter.Limiter) + for _, limit := range cfg.Concurrency { + limit := limit + + newTickerFunc := func() helper.Ticker { + return helper.NewManualTicker() + } + + if limit.MaxQueueWait > 0 { + newTickerFunc = func() helper.Ticker { + return helper.NewTimerTicker(limit.MaxQueueWait.Duration()) + } + } + + result[limit.RPC] = limiter.NewConcurrencyLimiter( + limit.MaxPerRepo, + limit.MaxQueueSize, + newTickerFunc, + limiter.NewPerRPCPromMonitor( + "gitaly", limit.RPC, + queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, + ), + ) + } + + // Set default for ReplicateRepository. + replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository" + if _, ok := result[replicateRepositoryFullMethod]; !ok { + result[replicateRepositoryFullMethod] = limiter.NewConcurrencyLimiter( + 1, + 0, + func() helper.Ticker { + return helper.NewManualTicker() + }, + limiter.NewPerRPCPromMonitor( + "gitaly", replicateRepositoryFullMethod, + queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, + ), + ) + } + + middleware.methodLimiters = result +} + +// WithRateLimiters sets up a middleware with limiters that limit requests +// based on its rate per second per RPC +func WithRateLimiters(ctx context.Context) SetupFunc { + return func(cfg config.Cfg, middleware *LimiterMiddleware) { + result := make(map[string]limiter.Limiter) + + for _, limitCfg := range cfg.RateLimiting { + if limitCfg.Burst > 0 && limitCfg.Interval > 0 { + serviceName, methodName := splitMethodName(limitCfg.RPC) + rateLimiter := limiter.NewRateLimiter( + limitCfg.Interval.Duration(), + limitCfg.Burst, + helper.NewTimerTicker(5*time.Minute), + middleware.requestsDroppedMetric.With(prometheus.Labels{ + "system": "gitaly", + "grpc_service": serviceName, + "grpc_method": methodName, + "reason": "rate", + }), + ) + result[limitCfg.RPC] = rateLimiter + go rateLimiter.PruneUnusedLimiters(ctx) + } + } + + middleware.methodLimiters = result + } +} + +func splitMethodName(fullMethodName string) (string, string) { + fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash + service, method, ok := strings.Cut(fullMethodName, "/") + if !ok { + return "unknown", "unknown" + } + return service, method +} diff --git a/internal/grpc/middleware/limithandler/middleware_test.go b/internal/grpc/middleware/limithandler/middleware_test.go index e644d9f49..65adad2be 100644 --- a/internal/grpc/middleware/limithandler/middleware_test.go +++ b/internal/grpc/middleware/limithandler/middleware_test.go @@ -15,6 +15,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/duration" + "gitlab.com/gitlab-org/gitaly/v16/internal/limiter" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" @@ -142,7 +143,7 @@ func TestUnaryLimitHandler_queueing(t *testing.T) { // Now we spawn a second RPC call. As the concurrency limit is satisfied we'll be // put into queue and will eventually return with an error. _, err := client.UnaryCall(ctx, &grpc_testing.SimpleRequest{}) - testhelper.RequireGrpcError(t, structerr.NewResourceExhausted("%w", limithandler.ErrMaxQueueTime).WithDetail( + testhelper.RequireGrpcError(t, structerr.NewResourceExhausted("%w", limiter.ErrMaxQueueTime).WithDetail( &gitalypb.LimitError{ ErrorMessage: "maximum time in concurrency queue reached", RetryAfter: durationpb.New(0), @@ -272,7 +273,7 @@ func TestStreamLimitHandler(t *testing.T) { maxConcurrency: 3, expectedRequestCount: 4, expectedResponseCount: 4, - expectedErr: structerr.NewResourceExhausted("%w", limithandler.ErrMaxQueueSize).WithDetail( + expectedErr: structerr.NewResourceExhausted("%w", limiter.ErrMaxQueueSize).WithDetail( &gitalypb.LimitError{ ErrorMessage: "maximum queue size reached", RetryAfter: durationpb.New(0), @@ -302,7 +303,7 @@ func TestStreamLimitHandler(t *testing.T) { maxConcurrency: 3, expectedRequestCount: 4, expectedResponseCount: 4, - expectedErr: structerr.NewResourceExhausted("%w", limithandler.ErrMaxQueueSize).WithDetail( + expectedErr: structerr.NewResourceExhausted("%w", limiter.ErrMaxQueueSize).WithDetail( &gitalypb.LimitError{ ErrorMessage: "maximum queue size reached", RetryAfter: durationpb.New(0), @@ -334,7 +335,7 @@ func TestStreamLimitHandler(t *testing.T) { maxConcurrency: 3, expectedRequestCount: 4, expectedResponseCount: 4, - expectedErr: structerr.NewResourceExhausted("%w", limithandler.ErrMaxQueueSize).WithDetail( + expectedErr: structerr.NewResourceExhausted("%w", limiter.ErrMaxQueueSize).WithDetail( &gitalypb.LimitError{ ErrorMessage: "maximum queue size reached", RetryAfter: durationpb.New(0), @@ -388,7 +389,7 @@ func TestStreamLimitHandler(t *testing.T) { // + 1 (queued stream) * (10 requests per stream) expectedRequestCount: 40, expectedResponseCount: 4, - expectedErr: structerr.NewResourceExhausted("%w", limithandler.ErrMaxQueueSize).WithDetail( + expectedErr: structerr.NewResourceExhausted("%w", limiter.ErrMaxQueueSize).WithDetail( &gitalypb.LimitError{ ErrorMessage: "maximum queue size reached", RetryAfter: durationpb.New(0), @@ -652,7 +653,7 @@ func TestConcurrencyLimitHandlerMetrics(t *testing.T) { limitErr, ok := details[0].(*gitalypb.LimitError) require.True(t, ok) - assert.Equal(t, limithandler.ErrMaxQueueSize.Error(), limitErr.ErrorMessage) + assert.Equal(t, limiter.ErrMaxQueueSize.Error(), limitErr.ErrorMessage) assert.Equal(t, durationpb.New(0), limitErr.RetryAfter) errs++ @@ -732,7 +733,7 @@ func TestRateLimitHandler(t *testing.T) { limitErr, ok := details[0].(*gitalypb.LimitError) require.True(t, ok) - assert.Equal(t, limithandler.ErrRateLimit.Error(), limitErr.ErrorMessage) + assert.Equal(t, limiter.ErrRateLimit.Error(), limitErr.ErrorMessage) assert.Equal(t, durationpb.New(0), limitErr.RetryAfter) } diff --git a/internal/grpc/middleware/limithandler/stats.go b/internal/grpc/middleware/limithandler/stats.go deleted file mode 100644 index 30926695a..000000000 --- a/internal/grpc/middleware/limithandler/stats.go +++ /dev/null @@ -1,125 +0,0 @@ -package limithandler - -import ( - "context" - "sync" - - grpcmw "github.com/grpc-ecosystem/go-grpc-middleware" - "github.com/sirupsen/logrus" - "google.golang.org/grpc" -) - -type limitStatsKey struct{} - -// LimitStats contains info about the concurrency limiter. -type LimitStats struct { - sync.Mutex - // limitingKey is the key used for limiting accounting - limitingKey string - // limitingType is the type of limiter - limitingType string - // concurrencyQueueLen is the combination of in-flight requests and in-queue requests. It tells - // how busy the queue of the same limiting key is - concurrencyQueueLength int - // concurrencyQueueMs milliseconds waiting in concurrency limit queue. - concurrencyQueueMs int64 - // concurrencyDropped stores the dropping reason of a request - concurrencyDropped string -} - -// InitLimitStats initializes context with a per-RPC stats struct. -func InitLimitStats(ctx context.Context) context.Context { - return context.WithValue(ctx, limitStatsKey{}, &LimitStats{}) -} - -// AddConcurrencyQueueMs adds queue time. -func (s *LimitStats) AddConcurrencyQueueMs(queueMs int64) { - s.Lock() - defer s.Unlock() - s.concurrencyQueueMs = queueMs -} - -// SetLimitingKey set limiting key. -func (s *LimitStats) SetLimitingKey(limitingType string, limitingKey string) { - s.Lock() - defer s.Unlock() - s.limitingKey = limitingKey - s.limitingType = limitingType -} - -// SetConcurrencyQueueLength set concurrency queue length. -func (s *LimitStats) SetConcurrencyQueueLength(queueLength int) { - s.Lock() - defer s.Unlock() - s.concurrencyQueueLength = queueLength -} - -// SetConcurrencyDroppedReason sets the reason why a call has been dropped from the queue. -func (s *LimitStats) SetConcurrencyDroppedReason(reason string) { - s.Lock() - defer s.Unlock() - s.concurrencyDropped = reason -} - -// Fields returns logging info. -func (s *LimitStats) Fields() logrus.Fields { - s.Lock() - defer s.Unlock() - - if s.limitingKey == "" { - return nil - } - logs := logrus.Fields{ - "limit.limiting_type": s.limitingType, - "limit.limiting_key": s.limitingKey, - "limit.concurrency_queue_ms": s.concurrencyQueueMs, - "limit.concurrency_queue_length": s.concurrencyQueueLength, - } - if s.concurrencyDropped != "" { - logs["limit.concurrency_dropped"] = s.concurrencyDropped - } - return logs -} - -// FieldsProducer extracts stats info from the context and returns it as a logging fields. -func FieldsProducer(ctx context.Context, _ error) logrus.Fields { - stats := limitStatsFromContext(ctx) - if stats != nil { - return stats.Fields() - } - return nil -} - -func limitStatsFromContext(ctx context.Context) *LimitStats { - v, ok := ctx.Value(limitStatsKey{}).(*LimitStats) - if !ok { - return nil - } - return v -} - -// stats interceptors are separate from middleware and serve one main purpose: -// initialize the stats object early, so that logrus can see it. -// it must be placed before the limithandler middleware. - -// StatsUnaryInterceptor returns a Unary Interceptor that initializes the context. -func StatsUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - ctx = InitLimitStats(ctx) - - res, err := handler(ctx, req) - - return res, err -} - -// StatsStreamInterceptor returns a Stream Interceptor. -func StatsStreamInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - ctx := stream.Context() - ctx = InitLimitStats(ctx) - - wrapped := grpcmw.WrapServerStream(stream) - wrapped.WrappedContext = ctx - - err := handler(srv, wrapped) - - return err -} diff --git a/internal/grpc/middleware/limithandler/stats_interceptor_test.go b/internal/grpc/middleware/limithandler/stats_interceptor_test.go deleted file mode 100644 index 010e487c8..000000000 --- a/internal/grpc/middleware/limithandler/stats_interceptor_test.go +++ /dev/null @@ -1,171 +0,0 @@ -package limithandler_test - -import ( - "context" - "io" - "net" - "testing" - - grpcmwlogrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" - "github.com/sirupsen/logrus" - "github.com/sirupsen/logrus/hooks/test" - "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/ref" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" - "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel" - "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler" - "gitlab.com/gitlab-org/gitaly/v16/internal/log" - "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" - "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" - "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/test/bufconn" -) - -func createNewServer(t *testing.T, cfg config.Cfg, logger *logrus.Logger) *grpc.Server { - t.Helper() - - logrusEntry := logrus.NewEntry(logger).WithField("test", t.Name()) - - concurrencyLimitHandler := limithandler.New( - cfg, - func(ctx context.Context) string { return "@hashed/1234" }, - limithandler.WithConcurrencyLimiters, - ) - - opts := []grpc.ServerOption{ - grpc.ChainStreamInterceptor( - limithandler.StatsStreamInterceptor, - grpcmwlogrus.StreamServerInterceptor(logrusEntry, - grpcmwlogrus.WithTimestampFormat(log.LogTimestampFormat), - grpcmwlogrus.WithMessageProducer(log.MessageProducer(grpcmwlogrus.DefaultMessageProducer, limithandler.FieldsProducer))), - concurrencyLimitHandler.StreamInterceptor(), - ), - grpc.ChainUnaryInterceptor( - limithandler.StatsUnaryInterceptor, - grpcmwlogrus.UnaryServerInterceptor(logrusEntry, - grpcmwlogrus.WithTimestampFormat(log.LogTimestampFormat), - grpcmwlogrus.WithMessageProducer(log.MessageProducer(grpcmwlogrus.DefaultMessageProducer, limithandler.FieldsProducer))), - concurrencyLimitHandler.UnaryInterceptor(), - ), - } - - server := grpc.NewServer(opts...) - - gitCommandFactory := gittest.NewCommandFactory(t, cfg) - catfileCache := catfile.NewCache(cfg) - t.Cleanup(catfileCache.Stop) - - gitalypb.RegisterRefServiceServer(server, ref.NewServer( - config.NewLocator(cfg), - gitCommandFactory, - transaction.NewManager(cfg, backchannel.NewRegistry()), - catfileCache, - )) - - return server -} - -func getBufDialer(listener *bufconn.Listener) func(context.Context, string) (net.Conn, error) { - return func(ctx context.Context, url string) (net.Conn, error) { - return listener.Dial() - } -} - -func TestInterceptor(t *testing.T) { - t.Parallel() - - ctx := testhelper.Context(t) - cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{ - Concurrency: []config.Concurrency{ - { - RPC: "/gitaly.RefService/RefExists", - MaxPerRepo: 1, - }, - { - RPC: "/gitaly.RefService/ListRefs", - MaxPerRepo: 1, - }, - }, - })) - - repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ - SkipCreationViaService: true, - }) - - logger, hook := test.NewNullLogger() - - s := createNewServer(t, cfg, logger) - defer s.Stop() - - bufferSize := 1024 * 1024 - listener := bufconn.Listen(bufferSize) - go testhelper.MustServe(t, s, listener) - - tests := []struct { - name string - performRPC func(t *testing.T, ctx context.Context, client gitalypb.RefServiceClient) - expectedLogData map[string]any - }{ - { - name: "Unary", - performRPC: func(t *testing.T, ctx context.Context, client gitalypb.RefServiceClient) { - req := &gitalypb.RefExistsRequest{Repository: repo, Ref: []byte("refs/foo")} - - _, err := client.RefExists(ctx, req) - require.NoError(t, err) - }, - expectedLogData: map[string]any{ - "limit.limiting_type": "per-rpc", - "limit.limiting_key": "@hashed/1234", - "limit.concurrency_queue_length": 0, - }, - }, - { - name: "Stream", - performRPC: func(t *testing.T, ctx context.Context, client gitalypb.RefServiceClient) { - req := &gitalypb.ListRefsRequest{Repository: repo, Patterns: [][]byte{[]byte("refs/heads/")}} - - stream, err := client.ListRefs(ctx, req) - require.NoError(t, err) - - for { - _, err := stream.Recv() - if err == io.EOF { - break - } - require.NoError(t, err) - } - }, - expectedLogData: map[string]any{ - "limit.limiting_type": "per-rpc", - "limit.limiting_key": "@hashed/1234", - "limit.concurrency_queue_length": 0, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - hook.Reset() - - conn, err := grpc.DialContext(ctx, "", grpc.WithContextDialer(getBufDialer(listener)), grpc.WithTransportCredentials(insecure.NewCredentials())) - require.NoError(t, err) - defer conn.Close() - - client := gitalypb.NewRefServiceClient(conn) - - tt.performRPC(t, ctx, client) - - logEntries := hook.AllEntries() - require.Len(t, logEntries, 1) - for expectedLogKey, expectedLogValue := range tt.expectedLogData { - require.Equal(t, expectedLogValue, logEntries[0].Data[expectedLogKey]) - } - require.GreaterOrEqual(t, logEntries[0].Data["limit.concurrency_queue_ms"], int64(0)) - }) - } -} diff --git a/internal/grpc/middleware/limithandler/stats_test.go b/internal/grpc/middleware/limithandler/stats_test.go deleted file mode 100644 index 53ccccfd5..000000000 --- a/internal/grpc/middleware/limithandler/stats_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package limithandler - -import ( - "testing" - - "github.com/sirupsen/logrus" - "github.com/stretchr/testify/assert" - "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" -) - -func TestLimitStats(t *testing.T) { - t.Parallel() - - ctx := testhelper.Context(t) - ctx = InitLimitStats(ctx) - - stats := limitStatsFromContext(ctx) - stats.SetLimitingKey("test-limiter", "hello-world") - stats.AddConcurrencyQueueMs(13) - stats.SetConcurrencyQueueLength(99) - stats.SetConcurrencyDroppedReason("max_time") - - assert.Equal(t, FieldsProducer(ctx, nil), logrus.Fields{ - "limit.limiting_type": "test-limiter", - "limit.limiting_key": "hello-world", - "limit.concurrency_queue_ms": int64(13), - "limit.concurrency_queue_length": 99, - "limit.concurrency_dropped": "max_time", - }) -} diff --git a/internal/grpc/middleware/limithandler/concurrency_limiter.go b/internal/limiter/concurrency_limiter.go index 52b4c293b..2807b856e 100644 --- a/internal/grpc/middleware/limithandler/concurrency_limiter.go +++ b/internal/limiter/concurrency_limiter.go @@ -1,4 +1,4 @@ -package limithandler +package limiter import ( "context" @@ -7,8 +7,6 @@ import ( "sync" "time" - "github.com/prometheus/client_golang/prometheus" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/tracing" @@ -173,7 +171,7 @@ func NewConcurrencyLimiter(maxConcurrencyLimit, maxQueueLength int, maxQueuedTic func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f LimitedFunc) (interface{}, error) { span, ctx := tracing.StartSpanIfHasParent( ctx, - "limithandler.ConcurrencyLimiter.Limit", + "limiter.ConcurrencyLimiter.Limit", tracing.Tags{"key": limitingKey}, ) defer span.Finish() @@ -270,85 +268,3 @@ func (c *ConcurrencyLimiter) countSemaphores() int { return len(c.limitsByKey) } - -// WithConcurrencyLimiters sets up middleware to limit the concurrency of -// requests based on RPC and repository -func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) { - acquiringSecondsMetric := prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "gitaly", - Subsystem: "concurrency_limiting", - Name: "acquiring_seconds", - Help: "Histogram of time calls are rate limited (in seconds)", - Buckets: cfg.Prometheus.GRPCLatencyBuckets, - }, - []string{"system", "grpc_service", "grpc_method"}, - ) - inProgressMetric := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "gitaly", - Subsystem: "concurrency_limiting", - Name: "in_progress", - Help: "Gauge of number of concurrent in-progress calls", - }, - []string{"system", "grpc_service", "grpc_method"}, - ) - queuedMetric := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "gitaly", - Subsystem: "concurrency_limiting", - Name: "queued", - Help: "Gauge of number of queued calls", - }, - []string{"system", "grpc_service", "grpc_method"}, - ) - - middleware.collect = func(metrics chan<- prometheus.Metric) { - acquiringSecondsMetric.Collect(metrics) - inProgressMetric.Collect(metrics) - queuedMetric.Collect(metrics) - } - - result := make(map[string]Limiter) - for _, limit := range cfg.Concurrency { - limit := limit - - newTickerFunc := func() helper.Ticker { - return helper.NewManualTicker() - } - - if limit.MaxQueueWait > 0 { - newTickerFunc = func() helper.Ticker { - return helper.NewTimerTicker(limit.MaxQueueWait.Duration()) - } - } - - result[limit.RPC] = NewConcurrencyLimiter( - limit.MaxPerRepo, - limit.MaxQueueSize, - newTickerFunc, - newPerRPCPromMonitor( - "gitaly", limit.RPC, - queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, - ), - ) - } - - // Set default for ReplicateRepository. - replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository" - if _, ok := result[replicateRepositoryFullMethod]; !ok { - result[replicateRepositoryFullMethod] = NewConcurrencyLimiter( - 1, - 0, - func() helper.Ticker { - return helper.NewManualTicker() - }, - newPerRPCPromMonitor( - "gitaly", replicateRepositoryFullMethod, - queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, - ), - ) - } - - middleware.methodLimiters = result -} diff --git a/internal/grpc/middleware/limithandler/concurrency_limiter_test.go b/internal/limiter/concurrency_limiter_test.go index 02adb7d04..f99ca7eab 100644 --- a/internal/grpc/middleware/limithandler/concurrency_limiter_test.go +++ b/internal/limiter/concurrency_limiter_test.go @@ -1,4 +1,4 @@ -package limithandler +package limiter import ( "context" diff --git a/internal/limiter/limiter.go b/internal/limiter/limiter.go new file mode 100644 index 000000000..74c643662 --- /dev/null +++ b/internal/limiter/limiter.go @@ -0,0 +1,11 @@ +package limiter + +import "context" + +// Limiter limits incoming requests +type Limiter interface { + Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error) +} + +// LimitedFunc represents a function that will be limited +type LimitedFunc func() (resp interface{}, err error) diff --git a/internal/grpc/middleware/limithandler/monitor.go b/internal/limiter/monitor.go index b251df6d4..b1e074d41 100644 --- a/internal/grpc/middleware/limithandler/monitor.go +++ b/internal/limiter/monitor.go @@ -1,4 +1,4 @@ -package limithandler +package limiter import ( "context" @@ -6,6 +6,7 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" ) // ConcurrencyMonitor allows the concurrency monitor to be observed. @@ -30,7 +31,7 @@ func NewNoopConcurrencyMonitor() ConcurrencyMonitor { return &noopConcurrencyMonitor{} } -// PromMonitor keeps track of prometheus metrics for limithandlers. +// PromMonitor keeps track of prometheus metrics for limiters. // It conforms to both the ConcurrencyMonitor, and prometheus.Collector // interfaces. type PromMonitor struct { @@ -45,9 +46,9 @@ type PromMonitor struct { acquiringSecondsHistogramVec *prometheus.HistogramVec } -// newPerRPCPromMonitor creates a new ConcurrencyMonitor that tracks limiter +// NewPerRPCPromMonitor creates a new ConcurrencyMonitor that tracks limiter // activity in Prometheus. -func newPerRPCPromMonitor( +func NewPerRPCPromMonitor( system, fullMethod string, queuedMetric, inProgressMetric *prometheus.GaugeVec, acquiringSecondsVec *prometheus.HistogramVec, @@ -71,9 +72,10 @@ func newPerRPCPromMonitor( // Queued is called when a request has been queued. func (p *PromMonitor) Queued(ctx context.Context, key string, queueLength int) { - if stats := limitStatsFromContext(ctx); stats != nil { - stats.SetLimitingKey(p.limitingType, key) - stats.SetConcurrencyQueueLength(queueLength) + if stats := log.CustomFieldsFromContext(ctx); stats != nil { + stats.RecordMetadata("limit.limiting_type", p.limitingType) + stats.RecordMetadata("limit.limiting_key", key) + stats.RecordMetadata("limit.concurrency_queue_length", queueLength) } p.queuedMetric.Inc() } @@ -88,8 +90,8 @@ func (p *PromMonitor) Enter(ctx context.Context, acquireTime time.Duration) { p.inProgressMetric.Inc() p.acquiringSecondsMetric.Observe(acquireTime.Seconds()) - if stats := limitStatsFromContext(ctx); stats != nil { - stats.AddConcurrencyQueueMs(acquireTime.Milliseconds()) + if stats := log.CustomFieldsFromContext(ctx); stats != nil { + stats.RecordMetadata("limit.concurrency_queue_ms", acquireTime.Milliseconds()) } } @@ -100,11 +102,12 @@ func (p *PromMonitor) Exit(ctx context.Context) { // Dropped is called when a request is dropped. func (p *PromMonitor) Dropped(ctx context.Context, key string, length int, acquireTime time.Duration, reason string) { - if stats := limitStatsFromContext(ctx); stats != nil { - stats.SetLimitingKey(p.limitingType, key) - stats.SetConcurrencyQueueLength(length) - stats.SetConcurrencyDroppedReason(reason) - stats.AddConcurrencyQueueMs(acquireTime.Milliseconds()) + if stats := log.CustomFieldsFromContext(ctx); stats != nil { + stats.RecordMetadata("limit.limiting_type", p.limitingType) + stats.RecordMetadata("limit.limiting_key", key) + stats.RecordMetadata("limit.concurrency_queue_length", length) + stats.RecordMetadata("limit.concurrency_dropped", reason) + stats.RecordMetadata("limit.concurrency_queue_ms", acquireTime.Milliseconds()) } p.requestsDroppedMetric.WithLabelValues(reason).Inc() } diff --git a/internal/grpc/middleware/limithandler/monitor_test.go b/internal/limiter/monitor_test.go index 3b5d6163f..59fbbe7c8 100644 --- a/internal/grpc/middleware/limithandler/monitor_test.go +++ b/internal/limiter/monitor_test.go @@ -1,4 +1,4 @@ -package limithandler +package limiter import ( "bytes" @@ -10,6 +10,7 @@ import ( "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" promconfig "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config/prometheus" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" ) @@ -51,7 +52,7 @@ func TestNewPerRPCPromMonitor(t *testing.T) { "reason", }, ) - return newPerRPCPromMonitor( + return NewPerRPCPromMonitor( system, fullMethod, queuedMetric, @@ -63,7 +64,7 @@ func TestNewPerRPCPromMonitor(t *testing.T) { t.Run("request is dequeued successfully", func(t *testing.T) { rpcMonitor := createNewMonitor() - ctx := InitLimitStats(testhelper.Context(t)) + ctx := log.InitContextCustomFields(testhelper.Context(t)) rpcMonitor.Queued(ctx, fullMethod, 5) rpcMonitor.Enter(ctx, time.Second) @@ -101,7 +102,7 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1 "acquiring_seconds", )) - stats := limitStatsFromContext(ctx) + stats := log.CustomFieldsFromContext(ctx) require.NotNil(t, stats) require.Equal(t, logrus.Fields{ "limit.limiting_type": TypePerRPC, @@ -132,7 +133,7 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0 t.Run("request is dropped after queueing", func(t *testing.T) { rpcMonitor := createNewMonitor() - ctx := InitLimitStats(testhelper.Context(t)) + ctx := log.InitContextCustomFields(testhelper.Context(t)) rpcMonitor.Queued(ctx, fullMethod, 5) rpcMonitor.Dropped(ctx, fullMethod, 5, time.Second, "load") @@ -173,7 +174,7 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1 "acquiring_seconds", )) - stats := limitStatsFromContext(ctx) + stats := log.CustomFieldsFromContext(ctx) require.NotNil(t, stats) require.Equal(t, logrus.Fields{ "limit.limiting_type": TypePerRPC, @@ -186,7 +187,7 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1 t.Run("request is dropped before queueing", func(t *testing.T) { rpcMonitor := createNewMonitor() - ctx := InitLimitStats(testhelper.Context(t)) + ctx := log.InitContextCustomFields(testhelper.Context(t)) rpcMonitor.Dropped(ctx, fullMethod, 5, time.Second, "load") expectedMetrics := `# HELP acquiring_seconds seconds to acquire @@ -225,7 +226,7 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0 "acquiring_seconds", )) - stats := limitStatsFromContext(ctx) + stats := log.CustomFieldsFromContext(ctx) require.NotNil(t, stats) require.Equal(t, logrus.Fields{ "limit.limiting_type": TypePerRPC, @@ -239,7 +240,7 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0 func TestNewPackObjectsConcurrencyMonitor(t *testing.T) { t.Run("request is dequeued successfully", func(t *testing.T) { - ctx := InitLimitStats(testhelper.Context(t)) + ctx := log.InitContextCustomFields(testhelper.Context(t)) packObjectsConcurrencyMonitor := NewPackObjectsConcurrencyMonitor( promconfig.DefaultConfig().GRPCLatencyBuckets, ) @@ -280,7 +281,7 @@ gitaly_pack_objects_queued 1 "gitaly_pack_objects_dropped_total", )) - stats := limitStatsFromContext(ctx) + stats := log.CustomFieldsFromContext(ctx) require.NotNil(t, stats) require.Equal(t, logrus.Fields{ "limit.limiting_type": TypePackObjects, @@ -310,7 +311,7 @@ gitaly_pack_objects_queued 0 }) t.Run("request is dropped after queueing", func(t *testing.T) { - ctx := InitLimitStats(testhelper.Context(t)) + ctx := log.InitContextCustomFields(testhelper.Context(t)) packObjectsConcurrencyMonitor := NewPackObjectsConcurrencyMonitor( promconfig.DefaultConfig().GRPCLatencyBuckets, ) @@ -354,7 +355,7 @@ gitaly_pack_objects_queued 1 "gitaly_pack_objects_dropped_total", )) - stats := limitStatsFromContext(ctx) + stats := log.CustomFieldsFromContext(ctx) require.NotNil(t, stats) require.Equal(t, logrus.Fields{ "limit.limiting_type": TypePackObjects, @@ -366,7 +367,7 @@ gitaly_pack_objects_queued 1 }) t.Run("request is dropped before queueing", func(t *testing.T) { - ctx := InitLimitStats(testhelper.Context(t)) + ctx := log.InitContextCustomFields(testhelper.Context(t)) packObjectsConcurrencyMonitor := NewPackObjectsConcurrencyMonitor( promconfig.DefaultConfig().GRPCLatencyBuckets, ) @@ -409,7 +410,7 @@ gitaly_pack_objects_queued 0 "gitaly_pack_objects_dropped_total", )) - stats := limitStatsFromContext(ctx) + stats := log.CustomFieldsFromContext(ctx) require.NotNil(t, stats) require.Equal(t, logrus.Fields{ "limit.limiting_type": TypePackObjects, diff --git a/internal/grpc/middleware/limithandler/rate_limiter.go b/internal/limiter/rate_limiter.go index 577e68b1d..73e0e5b6a 100644 --- a/internal/grpc/middleware/limithandler/rate_limiter.go +++ b/internal/limiter/rate_limiter.go @@ -1,4 +1,4 @@ -package limithandler +package limiter import ( "context" @@ -7,7 +7,6 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/tracing" @@ -35,7 +34,7 @@ var ErrRateLimit = errors.New("rate limit reached") func (r *RateLimiter) Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error) { span, _ := tracing.StartSpanIfHasParent( ctx, - "limithandler.RateLimiterLimit", + "limiter.RateLimiterLimit", tracing.Tags{"key": lockKey}, ) defer span.Finish() @@ -104,32 +103,3 @@ func NewRateLimiter( return r } - -// WithRateLimiters sets up a middleware with limiters that limit requests -// based on its rate per second per RPC -func WithRateLimiters(ctx context.Context) SetupFunc { - return func(cfg config.Cfg, middleware *LimiterMiddleware) { - result := make(map[string]Limiter) - - for _, limitCfg := range cfg.RateLimiting { - if limitCfg.Burst > 0 && limitCfg.Interval > 0 { - serviceName, methodName := splitMethodName(limitCfg.RPC) - rateLimiter := NewRateLimiter( - limitCfg.Interval.Duration(), - limitCfg.Burst, - helper.NewTimerTicker(5*time.Minute), - middleware.requestsDroppedMetric.With(prometheus.Labels{ - "system": "gitaly", - "grpc_service": serviceName, - "grpc_method": methodName, - "reason": "rate", - }), - ) - result[limitCfg.RPC] = rateLimiter - go rateLimiter.PruneUnusedLimiters(ctx) - } - } - - middleware.methodLimiters = result - } -} diff --git a/internal/grpc/middleware/limithandler/rate_limiter_test.go b/internal/limiter/rate_limiter_test.go index 5e88b8f6b..5a39a694c 100644 --- a/internal/grpc/middleware/limithandler/rate_limiter_test.go +++ b/internal/limiter/rate_limiter_test.go @@ -1,4 +1,4 @@ -package limithandler +package limiter import ( "testing" diff --git a/internal/log/customfields.go b/internal/log/customfields.go index 1622810f2..38874e15c 100644 --- a/internal/log/customfields.go +++ b/internal/log/customfields.go @@ -14,7 +14,7 @@ type requestCustomFieldsKey struct{} // the object out with CustomFieldsFromContext. type CustomFields struct { numericFields map[string]int - stringFields map[string]string + anyFields map[string]any sync.Mutex } @@ -45,11 +45,11 @@ func (fields *CustomFields) RecordMax(key string, value int) { } // RecordMetadata records a string metadata for the given key. -func (fields *CustomFields) RecordMetadata(key string, value string) { +func (fields *CustomFields) RecordMetadata(key string, value any) { fields.Lock() defer fields.Unlock() - fields.stringFields[key] = value + fields.anyFields[key] = value } // Fields returns all the fields as logrus.Fields @@ -61,7 +61,7 @@ func (fields *CustomFields) Fields() logrus.Fields { for k, v := range fields.numericFields { f[k] = v } - for k, v := range fields.stringFields { + for k, v := range fields.anyFields { f[k] = v } return f @@ -77,6 +77,6 @@ func CustomFieldsFromContext(ctx context.Context) *CustomFields { func InitContextCustomFields(ctx context.Context) context.Context { return context.WithValue(ctx, requestCustomFieldsKey{}, &CustomFields{ numericFields: make(map[string]int), - stringFields: make(map[string]string), + anyFields: make(map[string]any), }) } diff --git a/internal/log/customfields_test.go b/internal/log/customfields_test.go index 07577b17c..b2d89b707 100644 --- a/internal/log/customfields_test.go +++ b/internal/log/customfields_test.go @@ -1,6 +1,7 @@ package log_test import ( + "context" "testing" "github.com/sirupsen/logrus" @@ -73,17 +74,46 @@ func TestStatsFromContext_RecordMax(t *testing.T) { } func TestStatsFromContext_RecordMetadata(t *testing.T) { - ctx := testhelper.Context(t) - - ctx = log.InitContextCustomFields(ctx) - - customFields := log.CustomFieldsFromContext(ctx) - - customFields.RecordMetadata("foo", "bar") - require.NotNil(t, customFields) - require.Equal(t, customFields.Fields(), logrus.Fields{"foo": "bar"}) - - customFields.RecordMetadata("foo", "baz") // override the existing value - require.NotNil(t, customFields) - require.Equal(t, customFields.Fields(), logrus.Fields{"foo": "baz"}) + for _, tc := range []struct { + desc string + setup func(context.Context) + expectedFields logrus.Fields + }{ + { + desc: "record a string metadata", + setup: func(ctx context.Context) { + customFields := log.CustomFieldsFromContext(ctx) + customFields.RecordMetadata("foo", "bar") + }, + expectedFields: logrus.Fields{"foo": "bar"}, + }, + { + desc: "override metadata of the same key", + setup: func(ctx context.Context) { + customFields := log.CustomFieldsFromContext(ctx) + customFields.RecordMetadata("foo", "bar") + customFields.RecordMetadata("foo", "baz") // override the existing value + }, + expectedFields: logrus.Fields{"foo": "baz"}, + }, + { + desc: "record metadata with different types", + setup: func(ctx context.Context) { + customFields := log.CustomFieldsFromContext(ctx) + customFields.RecordMetadata("hello", 1234) + customFields.RecordMetadata("hi", []int{1, 2, 3, 4}) + }, + expectedFields: logrus.Fields{ + "hello": 1234, + "hi": []int{1, 2, 3, 4}, + }, + }, + } { + ctx := log.InitContextCustomFields(testhelper.Context(t)) + tc.setup(ctx) + + customFields := log.CustomFieldsFromContext(ctx) + require.NotNil(t, customFields) + require.Equal(t, customFields.Fields(), tc.expectedFields) + } } diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index 1b78c581d..1b62c903f 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -34,6 +34,7 @@ import ( internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" + "gitlab.com/gitlab-org/gitaly/v16/internal/limiter" praefectconfig "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v16/internal/streamcache" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" @@ -263,7 +264,7 @@ type gitalyServerDeps struct { catfileCache catfile.Cache diskCache cache.Cache packObjectsCache streamcache.Cache - packObjectsLimiter limithandler.Limiter + packObjectsLimiter limiter.Limiter limitHandler *limithandler.LimiterMiddleware git2goExecutor *git2go.Executor updaterWithHooks *updateref.UpdaterWithHooks @@ -323,11 +324,11 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * } if gsd.packObjectsLimiter == nil { - gsd.packObjectsLimiter = limithandler.NewConcurrencyLimiter( + gsd.packObjectsLimiter = limiter.NewConcurrencyLimiter( 0, 0, nil, - limithandler.NewNoopConcurrencyMonitor(), + limiter.NewNoopConcurrencyMonitor(), ) } @@ -462,7 +463,7 @@ func WithDiskCache(diskCache cache.Cache) GitalyServerOpt { // WithPackObjectsLimiter sets the PackObjectsLimiter that will be // used for gitaly services initialization. -func WithPackObjectsLimiter(limiter *limithandler.ConcurrencyLimiter) GitalyServerOpt { +func WithPackObjectsLimiter(limiter *limiter.ConcurrencyLimiter) GitalyServerOpt { return func(deps gitalyServerDeps) gitalyServerDeps { deps.packObjectsLimiter = limiter return deps |