Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Fargher <jfargher@gitlab.com>2023-07-12 02:51:06 +0300
committerJames Fargher <jfargher@gitlab.com>2023-07-12 02:51:06 +0300
commit510c9aa7917db90134845287a10ddfb280b2fe20 (patch)
tree4c4c944e9b08d304067f697ce268c7f899c474f0
parent6e31101f69c153a19b71adf31037c0b681add84a (diff)
parentb44bd8c40b777031a6f0bf01e2050b39a0540338 (diff)
Merge branch 'qmnguyen0711/refactor-limithandler' into 'master'
Refactor limithandler package See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6038 Merged-by: James Fargher <jfargher@gitlab.com> Approved-by: James Fargher <jfargher@gitlab.com> Reviewed-by: James Fargher <jfargher@gitlab.com> Reviewed-by: Justin Tobler <jtobler@gitlab.com> Co-authored-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
-rw-r--r--cmd/gitaly-hooks/hooks_test.go10
-rw-r--r--internal/cli/gitaly/serve.go5
-rw-r--r--internal/gitaly/server/server.go4
-rw-r--r--internal/gitaly/service/dependencies.go5
-rw-r--r--internal/gitaly/service/hook/pack_objects_test.go6
-rw-r--r--internal/gitaly/service/hook/server.go6
-rw-r--r--internal/grpc/middleware/limithandler/middleware.go134
-rw-r--r--internal/grpc/middleware/limithandler/middleware_test.go15
-rw-r--r--internal/grpc/middleware/limithandler/stats.go125
-rw-r--r--internal/grpc/middleware/limithandler/stats_interceptor_test.go171
-rw-r--r--internal/grpc/middleware/limithandler/stats_test.go30
-rw-r--r--internal/limiter/concurrency_limiter.go (renamed from internal/grpc/middleware/limithandler/concurrency_limiter.go)88
-rw-r--r--internal/limiter/concurrency_limiter_test.go (renamed from internal/grpc/middleware/limithandler/concurrency_limiter_test.go)2
-rw-r--r--internal/limiter/limiter.go11
-rw-r--r--internal/limiter/monitor.go (renamed from internal/grpc/middleware/limithandler/monitor.go)31
-rw-r--r--internal/limiter/monitor_test.go (renamed from internal/grpc/middleware/limithandler/monitor_test.go)29
-rw-r--r--internal/limiter/rate_limiter.go (renamed from internal/grpc/middleware/limithandler/rate_limiter.go)34
-rw-r--r--internal/limiter/rate_limiter_test.go (renamed from internal/grpc/middleware/limithandler/rate_limiter_test.go)2
-rw-r--r--internal/log/customfields.go10
-rw-r--r--internal/log/customfields_test.go56
-rw-r--r--internal/testhelper/testserver/gitaly.go9
21 files changed, 252 insertions, 531 deletions
diff --git a/cmd/gitaly-hooks/hooks_test.go b/cmd/gitaly-hooks/hooks_test.go
index 9ceb43681..f0883e5bd 100644
--- a/cmd/gitaly-hooks/hooks_test.go
+++ b/cmd/gitaly-hooks/hooks_test.go
@@ -29,8 +29,8 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitlab"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/metadata"
- "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/text"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/limiter"
gitalylog "gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
@@ -660,8 +660,8 @@ func TestGitalyHooksPackObjects(t *testing.T) {
}
func TestGitalyServerReturnsError(t *testing.T) {
- resourceExhaustedErr := structerr.NewResourceExhausted("%w", limithandler.ErrMaxQueueTime).WithDetail(&gitalypb.LimitError{
- ErrorMessage: limithandler.ErrMaxQueueTime.Error(),
+ resourceExhaustedErr := structerr.NewResourceExhausted("%w", limiter.ErrMaxQueueTime).WithDetail(&gitalypb.LimitError{
+ ErrorMessage: limiter.ErrMaxQueueTime.Error(),
RetryAfter: durationpb.New(0),
})
@@ -765,8 +765,8 @@ func TestGitalyServerReturnsError_packObjects(t *testing.T) {
}{
{
name: "resource exhausted with LimitError detail",
- err: structerr.NewResourceExhausted("%w", limithandler.ErrMaxQueueTime).WithDetail(&gitalypb.LimitError{
- ErrorMessage: limithandler.ErrMaxQueueTime.Error(),
+ err: structerr.NewResourceExhausted("%w", limiter.ErrMaxQueueTime).WithDetail(&gitalypb.LimitError{
+ ErrorMessage: limiter.ErrMaxQueueTime.Error(),
RetryAfter: durationpb.New(0),
}),
expectedStderr: `
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index 8ba00536a..3fb245511 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -40,6 +40,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/env"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/limiter"
glog "gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/streamcache"
"gitlab.com/gitlab-org/gitaly/v16/internal/tempdir"
@@ -269,7 +270,7 @@ func run(cfg config.Cfg) error {
limithandler.WithRateLimiters(ctx),
)
- packObjectsMonitor := limithandler.NewPackObjectsConcurrencyMonitor(
+ packObjectsMonitor := limiter.NewPackObjectsConcurrencyMonitor(
cfg.Prometheus.GRPCLatencyBuckets,
)
newTickerFunc := func() helper.Ticker {
@@ -280,7 +281,7 @@ func run(cfg config.Cfg) error {
return helper.NewTimerTicker(cfg.PackObjectsLimiting.MaxQueueWait.Duration())
}
}
- packObjectsLimiter := limithandler.NewConcurrencyLimiter(
+ packObjectsLimiter := limiter.NewConcurrencyLimiter(
cfg.PackObjectsLimiting.MaxConcurrency,
cfg.PackObjectsLimiting.MaxQueueLength,
newTickerFunc,
diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go
index f9caf4e08..79392c46f 100644
--- a/internal/gitaly/server/server.go
+++ b/internal/gitaly/server/server.go
@@ -16,7 +16,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/cache"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/customfieldshandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/featureflag"
- "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/panichandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/sentryhandler"
@@ -111,7 +110,6 @@ func (s *GitalyServerFactory) New(secure bool, opts ...Option) (*grpc.Server, er
grpcstats.FieldsProducer,
featureflag.FieldsProducer,
structerr.FieldsProducer,
- limithandler.FieldsProducer,
),
)
@@ -121,7 +119,6 @@ func (s *GitalyServerFactory) New(secure bool, opts ...Option) (*grpc.Server, er
metadatahandler.StreamInterceptor,
grpcprometheus.StreamServerInterceptor,
customfieldshandler.StreamInterceptor,
- limithandler.StatsStreamInterceptor,
grpcmwlogrus.StreamServerInterceptor(s.logger,
grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat),
logMsgProducer,
@@ -138,7 +135,6 @@ func (s *GitalyServerFactory) New(secure bool, opts ...Option) (*grpc.Server, er
metadatahandler.UnaryInterceptor,
grpcprometheus.UnaryServerInterceptor,
customfieldshandler.UnaryInterceptor,
- limithandler.StatsUnaryInterceptor,
grpcmwlogrus.UnaryServerInterceptor(s.logger,
grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat),
logMsgProducer,
diff --git a/internal/gitaly/service/dependencies.go b/internal/gitaly/service/dependencies.go
index 4b1cb34f1..6daf570c1 100644
--- a/internal/gitaly/service/dependencies.go
+++ b/internal/gitaly/service/dependencies.go
@@ -17,6 +17,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/gitlab"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/limiter"
"gitlab.com/gitlab-org/gitaly/v16/internal/streamcache"
)
@@ -33,7 +34,7 @@ type Dependencies struct {
CatfileCache catfile.Cache
DiskCache cache.Cache
PackObjectsCache streamcache.Cache
- PackObjectsLimiter limithandler.Limiter
+ PackObjectsLimiter limiter.Limiter
LimitHandler *limithandler.LimiterMiddleware
Git2goExecutor *git2go.Executor
UpdaterWithHooks *updateref.UpdaterWithHooks
@@ -119,7 +120,7 @@ func (dc *Dependencies) GetHousekeepingManager() housekeeping.Manager {
}
// GetPackObjectsLimiter returns the pack-objects limiter.
-func (dc *Dependencies) GetPackObjectsLimiter() limithandler.Limiter {
+func (dc *Dependencies) GetPackObjectsLimiter() limiter.Limiter {
return dc.PackObjectsLimiter
}
diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go
index 312bb0ab0..4d4d1a500 100644
--- a/internal/gitaly/service/hook/pack_objects_test.go
+++ b/internal/gitaly/service/hook/pack_objects_test.go
@@ -22,8 +22,8 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
hookPkg "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
- "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/limiter"
"gitlab.com/gitlab-org/gitaly/v16/internal/streamcache"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
@@ -854,10 +854,10 @@ func TestPackObjects_concurrencyLimit(t *testing.T) {
cfg := cfgWithCache(t, 0)
ticker := helper.NewManualTicker()
- monitor := limithandler.NewPackObjectsConcurrencyMonitor(
+ monitor := limiter.NewPackObjectsConcurrencyMonitor(
cfg.Prometheus.GRPCLatencyBuckets,
)
- limiter := limithandler.NewConcurrencyLimiter(
+ limiter := limiter.NewConcurrencyLimiter(
1,
0,
func() helper.Ticker { return ticker },
diff --git a/internal/gitaly/service/hook/server.go b/internal/gitaly/service/hook/server.go
index c8944f3ec..d99f66aa9 100644
--- a/internal/gitaly/service/hook/server.go
+++ b/internal/gitaly/service/hook/server.go
@@ -7,7 +7,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
gitalyhook "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
- "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/limiter"
"gitlab.com/gitlab-org/gitaly/v16/internal/streamcache"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)
@@ -18,7 +18,7 @@ type server struct {
locator storage.Locator
gitCmdFactory git.CommandFactory
packObjectsCache streamcache.Cache
- packObjectsLimiter limithandler.Limiter
+ packObjectsLimiter limiter.Limiter
runPackObjectsFn func(
context.Context,
git.CommandFactory,
@@ -36,7 +36,7 @@ func NewServer(
locator storage.Locator,
gitCmdFactory git.CommandFactory,
packObjectsCache streamcache.Cache,
- packObjectsLimiter limithandler.Limiter,
+ packObjectsLimiter limiter.Limiter,
) gitalypb.HookServiceServer {
srv := &server{
manager: manager,
diff --git a/internal/grpc/middleware/limithandler/middleware.go b/internal/grpc/middleware/limithandler/middleware.go
index c532943a4..0c41a4672 100644
--- a/internal/grpc/middleware/limithandler/middleware.go
+++ b/internal/grpc/middleware/limithandler/middleware.go
@@ -2,10 +2,14 @@ package limithandler
import (
"context"
+ "strings"
+ "time"
grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/limiter"
"google.golang.org/grpc"
)
@@ -28,17 +32,9 @@ func LimitConcurrencyByRepo(ctx context.Context) string {
return ""
}
-// Limiter limits incoming requests
-type Limiter interface {
- Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error)
-}
-
-// LimitedFunc represents a function that will be limited
-type LimitedFunc func() (resp interface{}, err error)
-
// LimiterMiddleware contains rate limiter state
type LimiterMiddleware struct {
- methodLimiters map[string]Limiter
+ methodLimiters map[string]limiter.Limiter
getLockKey GetLockKey
requestsDroppedMetric *prometheus.CounterVec
collect func(metrics chan<- prometheus.Metric)
@@ -166,3 +162,123 @@ func (w *wrappedStream) RecvMsg(m interface{}) error {
return err
}
}
+
+// WithConcurrencyLimiters sets up middleware to limit the concurrency of
+// requests based on RPC and repository
+func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) {
+ acquiringSecondsMetric := prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Namespace: "gitaly",
+ Subsystem: "concurrency_limiting",
+ Name: "acquiring_seconds",
+ Help: "Histogram of time calls are rate limited (in seconds)",
+ Buckets: cfg.Prometheus.GRPCLatencyBuckets,
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ )
+ inProgressMetric := prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: "gitaly",
+ Subsystem: "concurrency_limiting",
+ Name: "in_progress",
+ Help: "Gauge of number of concurrent in-progress calls",
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ )
+ queuedMetric := prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: "gitaly",
+ Subsystem: "concurrency_limiting",
+ Name: "queued",
+ Help: "Gauge of number of queued calls",
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ )
+
+ middleware.collect = func(metrics chan<- prometheus.Metric) {
+ acquiringSecondsMetric.Collect(metrics)
+ inProgressMetric.Collect(metrics)
+ queuedMetric.Collect(metrics)
+ }
+
+ result := make(map[string]limiter.Limiter)
+ for _, limit := range cfg.Concurrency {
+ limit := limit
+
+ newTickerFunc := func() helper.Ticker {
+ return helper.NewManualTicker()
+ }
+
+ if limit.MaxQueueWait > 0 {
+ newTickerFunc = func() helper.Ticker {
+ return helper.NewTimerTicker(limit.MaxQueueWait.Duration())
+ }
+ }
+
+ result[limit.RPC] = limiter.NewConcurrencyLimiter(
+ limit.MaxPerRepo,
+ limit.MaxQueueSize,
+ newTickerFunc,
+ limiter.NewPerRPCPromMonitor(
+ "gitaly", limit.RPC,
+ queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
+ ),
+ )
+ }
+
+ // Set default for ReplicateRepository.
+ replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository"
+ if _, ok := result[replicateRepositoryFullMethod]; !ok {
+ result[replicateRepositoryFullMethod] = limiter.NewConcurrencyLimiter(
+ 1,
+ 0,
+ func() helper.Ticker {
+ return helper.NewManualTicker()
+ },
+ limiter.NewPerRPCPromMonitor(
+ "gitaly", replicateRepositoryFullMethod,
+ queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
+ ),
+ )
+ }
+
+ middleware.methodLimiters = result
+}
+
+// WithRateLimiters sets up a middleware with limiters that limit requests
+// based on its rate per second per RPC
+func WithRateLimiters(ctx context.Context) SetupFunc {
+ return func(cfg config.Cfg, middleware *LimiterMiddleware) {
+ result := make(map[string]limiter.Limiter)
+
+ for _, limitCfg := range cfg.RateLimiting {
+ if limitCfg.Burst > 0 && limitCfg.Interval > 0 {
+ serviceName, methodName := splitMethodName(limitCfg.RPC)
+ rateLimiter := limiter.NewRateLimiter(
+ limitCfg.Interval.Duration(),
+ limitCfg.Burst,
+ helper.NewTimerTicker(5*time.Minute),
+ middleware.requestsDroppedMetric.With(prometheus.Labels{
+ "system": "gitaly",
+ "grpc_service": serviceName,
+ "grpc_method": methodName,
+ "reason": "rate",
+ }),
+ )
+ result[limitCfg.RPC] = rateLimiter
+ go rateLimiter.PruneUnusedLimiters(ctx)
+ }
+ }
+
+ middleware.methodLimiters = result
+ }
+}
+
+func splitMethodName(fullMethodName string) (string, string) {
+ fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash
+ service, method, ok := strings.Cut(fullMethodName, "/")
+ if !ok {
+ return "unknown", "unknown"
+ }
+ return service, method
+}
diff --git a/internal/grpc/middleware/limithandler/middleware_test.go b/internal/grpc/middleware/limithandler/middleware_test.go
index e644d9f49..65adad2be 100644
--- a/internal/grpc/middleware/limithandler/middleware_test.go
+++ b/internal/grpc/middleware/limithandler/middleware_test.go
@@ -15,6 +15,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/duration"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/limiter"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
@@ -142,7 +143,7 @@ func TestUnaryLimitHandler_queueing(t *testing.T) {
// Now we spawn a second RPC call. As the concurrency limit is satisfied we'll be
// put into queue and will eventually return with an error.
_, err := client.UnaryCall(ctx, &grpc_testing.SimpleRequest{})
- testhelper.RequireGrpcError(t, structerr.NewResourceExhausted("%w", limithandler.ErrMaxQueueTime).WithDetail(
+ testhelper.RequireGrpcError(t, structerr.NewResourceExhausted("%w", limiter.ErrMaxQueueTime).WithDetail(
&gitalypb.LimitError{
ErrorMessage: "maximum time in concurrency queue reached",
RetryAfter: durationpb.New(0),
@@ -272,7 +273,7 @@ func TestStreamLimitHandler(t *testing.T) {
maxConcurrency: 3,
expectedRequestCount: 4,
expectedResponseCount: 4,
- expectedErr: structerr.NewResourceExhausted("%w", limithandler.ErrMaxQueueSize).WithDetail(
+ expectedErr: structerr.NewResourceExhausted("%w", limiter.ErrMaxQueueSize).WithDetail(
&gitalypb.LimitError{
ErrorMessage: "maximum queue size reached",
RetryAfter: durationpb.New(0),
@@ -302,7 +303,7 @@ func TestStreamLimitHandler(t *testing.T) {
maxConcurrency: 3,
expectedRequestCount: 4,
expectedResponseCount: 4,
- expectedErr: structerr.NewResourceExhausted("%w", limithandler.ErrMaxQueueSize).WithDetail(
+ expectedErr: structerr.NewResourceExhausted("%w", limiter.ErrMaxQueueSize).WithDetail(
&gitalypb.LimitError{
ErrorMessage: "maximum queue size reached",
RetryAfter: durationpb.New(0),
@@ -334,7 +335,7 @@ func TestStreamLimitHandler(t *testing.T) {
maxConcurrency: 3,
expectedRequestCount: 4,
expectedResponseCount: 4,
- expectedErr: structerr.NewResourceExhausted("%w", limithandler.ErrMaxQueueSize).WithDetail(
+ expectedErr: structerr.NewResourceExhausted("%w", limiter.ErrMaxQueueSize).WithDetail(
&gitalypb.LimitError{
ErrorMessage: "maximum queue size reached",
RetryAfter: durationpb.New(0),
@@ -388,7 +389,7 @@ func TestStreamLimitHandler(t *testing.T) {
// + 1 (queued stream) * (10 requests per stream)
expectedRequestCount: 40,
expectedResponseCount: 4,
- expectedErr: structerr.NewResourceExhausted("%w", limithandler.ErrMaxQueueSize).WithDetail(
+ expectedErr: structerr.NewResourceExhausted("%w", limiter.ErrMaxQueueSize).WithDetail(
&gitalypb.LimitError{
ErrorMessage: "maximum queue size reached",
RetryAfter: durationpb.New(0),
@@ -652,7 +653,7 @@ func TestConcurrencyLimitHandlerMetrics(t *testing.T) {
limitErr, ok := details[0].(*gitalypb.LimitError)
require.True(t, ok)
- assert.Equal(t, limithandler.ErrMaxQueueSize.Error(), limitErr.ErrorMessage)
+ assert.Equal(t, limiter.ErrMaxQueueSize.Error(), limitErr.ErrorMessage)
assert.Equal(t, durationpb.New(0), limitErr.RetryAfter)
errs++
@@ -732,7 +733,7 @@ func TestRateLimitHandler(t *testing.T) {
limitErr, ok := details[0].(*gitalypb.LimitError)
require.True(t, ok)
- assert.Equal(t, limithandler.ErrRateLimit.Error(), limitErr.ErrorMessage)
+ assert.Equal(t, limiter.ErrRateLimit.Error(), limitErr.ErrorMessage)
assert.Equal(t, durationpb.New(0), limitErr.RetryAfter)
}
diff --git a/internal/grpc/middleware/limithandler/stats.go b/internal/grpc/middleware/limithandler/stats.go
deleted file mode 100644
index 30926695a..000000000
--- a/internal/grpc/middleware/limithandler/stats.go
+++ /dev/null
@@ -1,125 +0,0 @@
-package limithandler
-
-import (
- "context"
- "sync"
-
- grpcmw "github.com/grpc-ecosystem/go-grpc-middleware"
- "github.com/sirupsen/logrus"
- "google.golang.org/grpc"
-)
-
-type limitStatsKey struct{}
-
-// LimitStats contains info about the concurrency limiter.
-type LimitStats struct {
- sync.Mutex
- // limitingKey is the key used for limiting accounting
- limitingKey string
- // limitingType is the type of limiter
- limitingType string
- // concurrencyQueueLen is the combination of in-flight requests and in-queue requests. It tells
- // how busy the queue of the same limiting key is
- concurrencyQueueLength int
- // concurrencyQueueMs milliseconds waiting in concurrency limit queue.
- concurrencyQueueMs int64
- // concurrencyDropped stores the dropping reason of a request
- concurrencyDropped string
-}
-
-// InitLimitStats initializes context with a per-RPC stats struct.
-func InitLimitStats(ctx context.Context) context.Context {
- return context.WithValue(ctx, limitStatsKey{}, &LimitStats{})
-}
-
-// AddConcurrencyQueueMs adds queue time.
-func (s *LimitStats) AddConcurrencyQueueMs(queueMs int64) {
- s.Lock()
- defer s.Unlock()
- s.concurrencyQueueMs = queueMs
-}
-
-// SetLimitingKey set limiting key.
-func (s *LimitStats) SetLimitingKey(limitingType string, limitingKey string) {
- s.Lock()
- defer s.Unlock()
- s.limitingKey = limitingKey
- s.limitingType = limitingType
-}
-
-// SetConcurrencyQueueLength set concurrency queue length.
-func (s *LimitStats) SetConcurrencyQueueLength(queueLength int) {
- s.Lock()
- defer s.Unlock()
- s.concurrencyQueueLength = queueLength
-}
-
-// SetConcurrencyDroppedReason sets the reason why a call has been dropped from the queue.
-func (s *LimitStats) SetConcurrencyDroppedReason(reason string) {
- s.Lock()
- defer s.Unlock()
- s.concurrencyDropped = reason
-}
-
-// Fields returns logging info.
-func (s *LimitStats) Fields() logrus.Fields {
- s.Lock()
- defer s.Unlock()
-
- if s.limitingKey == "" {
- return nil
- }
- logs := logrus.Fields{
- "limit.limiting_type": s.limitingType,
- "limit.limiting_key": s.limitingKey,
- "limit.concurrency_queue_ms": s.concurrencyQueueMs,
- "limit.concurrency_queue_length": s.concurrencyQueueLength,
- }
- if s.concurrencyDropped != "" {
- logs["limit.concurrency_dropped"] = s.concurrencyDropped
- }
- return logs
-}
-
-// FieldsProducer extracts stats info from the context and returns it as a logging fields.
-func FieldsProducer(ctx context.Context, _ error) logrus.Fields {
- stats := limitStatsFromContext(ctx)
- if stats != nil {
- return stats.Fields()
- }
- return nil
-}
-
-func limitStatsFromContext(ctx context.Context) *LimitStats {
- v, ok := ctx.Value(limitStatsKey{}).(*LimitStats)
- if !ok {
- return nil
- }
- return v
-}
-
-// stats interceptors are separate from middleware and serve one main purpose:
-// initialize the stats object early, so that logrus can see it.
-// it must be placed before the limithandler middleware.
-
-// StatsUnaryInterceptor returns a Unary Interceptor that initializes the context.
-func StatsUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
- ctx = InitLimitStats(ctx)
-
- res, err := handler(ctx, req)
-
- return res, err
-}
-
-// StatsStreamInterceptor returns a Stream Interceptor.
-func StatsStreamInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
- ctx := stream.Context()
- ctx = InitLimitStats(ctx)
-
- wrapped := grpcmw.WrapServerStream(stream)
- wrapped.WrappedContext = ctx
-
- err := handler(srv, wrapped)
-
- return err
-}
diff --git a/internal/grpc/middleware/limithandler/stats_interceptor_test.go b/internal/grpc/middleware/limithandler/stats_interceptor_test.go
deleted file mode 100644
index 010e487c8..000000000
--- a/internal/grpc/middleware/limithandler/stats_interceptor_test.go
+++ /dev/null
@@ -1,171 +0,0 @@
-package limithandler_test
-
-import (
- "context"
- "io"
- "net"
- "testing"
-
- grpcmwlogrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
- "github.com/sirupsen/logrus"
- "github.com/sirupsen/logrus/hooks/test"
- "github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile"
- "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
- "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
- "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/ref"
- "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
- "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel"
- "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler"
- "gitlab.com/gitlab-org/gitaly/v16/internal/log"
- "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
- "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
- "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials/insecure"
- "google.golang.org/grpc/test/bufconn"
-)
-
-func createNewServer(t *testing.T, cfg config.Cfg, logger *logrus.Logger) *grpc.Server {
- t.Helper()
-
- logrusEntry := logrus.NewEntry(logger).WithField("test", t.Name())
-
- concurrencyLimitHandler := limithandler.New(
- cfg,
- func(ctx context.Context) string { return "@hashed/1234" },
- limithandler.WithConcurrencyLimiters,
- )
-
- opts := []grpc.ServerOption{
- grpc.ChainStreamInterceptor(
- limithandler.StatsStreamInterceptor,
- grpcmwlogrus.StreamServerInterceptor(logrusEntry,
- grpcmwlogrus.WithTimestampFormat(log.LogTimestampFormat),
- grpcmwlogrus.WithMessageProducer(log.MessageProducer(grpcmwlogrus.DefaultMessageProducer, limithandler.FieldsProducer))),
- concurrencyLimitHandler.StreamInterceptor(),
- ),
- grpc.ChainUnaryInterceptor(
- limithandler.StatsUnaryInterceptor,
- grpcmwlogrus.UnaryServerInterceptor(logrusEntry,
- grpcmwlogrus.WithTimestampFormat(log.LogTimestampFormat),
- grpcmwlogrus.WithMessageProducer(log.MessageProducer(grpcmwlogrus.DefaultMessageProducer, limithandler.FieldsProducer))),
- concurrencyLimitHandler.UnaryInterceptor(),
- ),
- }
-
- server := grpc.NewServer(opts...)
-
- gitCommandFactory := gittest.NewCommandFactory(t, cfg)
- catfileCache := catfile.NewCache(cfg)
- t.Cleanup(catfileCache.Stop)
-
- gitalypb.RegisterRefServiceServer(server, ref.NewServer(
- config.NewLocator(cfg),
- gitCommandFactory,
- transaction.NewManager(cfg, backchannel.NewRegistry()),
- catfileCache,
- ))
-
- return server
-}
-
-func getBufDialer(listener *bufconn.Listener) func(context.Context, string) (net.Conn, error) {
- return func(ctx context.Context, url string) (net.Conn, error) {
- return listener.Dial()
- }
-}
-
-func TestInterceptor(t *testing.T) {
- t.Parallel()
-
- ctx := testhelper.Context(t)
- cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{
- Concurrency: []config.Concurrency{
- {
- RPC: "/gitaly.RefService/RefExists",
- MaxPerRepo: 1,
- },
- {
- RPC: "/gitaly.RefService/ListRefs",
- MaxPerRepo: 1,
- },
- },
- }))
-
- repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
- SkipCreationViaService: true,
- })
-
- logger, hook := test.NewNullLogger()
-
- s := createNewServer(t, cfg, logger)
- defer s.Stop()
-
- bufferSize := 1024 * 1024
- listener := bufconn.Listen(bufferSize)
- go testhelper.MustServe(t, s, listener)
-
- tests := []struct {
- name string
- performRPC func(t *testing.T, ctx context.Context, client gitalypb.RefServiceClient)
- expectedLogData map[string]any
- }{
- {
- name: "Unary",
- performRPC: func(t *testing.T, ctx context.Context, client gitalypb.RefServiceClient) {
- req := &gitalypb.RefExistsRequest{Repository: repo, Ref: []byte("refs/foo")}
-
- _, err := client.RefExists(ctx, req)
- require.NoError(t, err)
- },
- expectedLogData: map[string]any{
- "limit.limiting_type": "per-rpc",
- "limit.limiting_key": "@hashed/1234",
- "limit.concurrency_queue_length": 0,
- },
- },
- {
- name: "Stream",
- performRPC: func(t *testing.T, ctx context.Context, client gitalypb.RefServiceClient) {
- req := &gitalypb.ListRefsRequest{Repository: repo, Patterns: [][]byte{[]byte("refs/heads/")}}
-
- stream, err := client.ListRefs(ctx, req)
- require.NoError(t, err)
-
- for {
- _, err := stream.Recv()
- if err == io.EOF {
- break
- }
- require.NoError(t, err)
- }
- },
- expectedLogData: map[string]any{
- "limit.limiting_type": "per-rpc",
- "limit.limiting_key": "@hashed/1234",
- "limit.concurrency_queue_length": 0,
- },
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- hook.Reset()
-
- conn, err := grpc.DialContext(ctx, "", grpc.WithContextDialer(getBufDialer(listener)), grpc.WithTransportCredentials(insecure.NewCredentials()))
- require.NoError(t, err)
- defer conn.Close()
-
- client := gitalypb.NewRefServiceClient(conn)
-
- tt.performRPC(t, ctx, client)
-
- logEntries := hook.AllEntries()
- require.Len(t, logEntries, 1)
- for expectedLogKey, expectedLogValue := range tt.expectedLogData {
- require.Equal(t, expectedLogValue, logEntries[0].Data[expectedLogKey])
- }
- require.GreaterOrEqual(t, logEntries[0].Data["limit.concurrency_queue_ms"], int64(0))
- })
- }
-}
diff --git a/internal/grpc/middleware/limithandler/stats_test.go b/internal/grpc/middleware/limithandler/stats_test.go
deleted file mode 100644
index 53ccccfd5..000000000
--- a/internal/grpc/middleware/limithandler/stats_test.go
+++ /dev/null
@@ -1,30 +0,0 @@
-package limithandler
-
-import (
- "testing"
-
- "github.com/sirupsen/logrus"
- "github.com/stretchr/testify/assert"
- "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
-)
-
-func TestLimitStats(t *testing.T) {
- t.Parallel()
-
- ctx := testhelper.Context(t)
- ctx = InitLimitStats(ctx)
-
- stats := limitStatsFromContext(ctx)
- stats.SetLimitingKey("test-limiter", "hello-world")
- stats.AddConcurrencyQueueMs(13)
- stats.SetConcurrencyQueueLength(99)
- stats.SetConcurrencyDroppedReason("max_time")
-
- assert.Equal(t, FieldsProducer(ctx, nil), logrus.Fields{
- "limit.limiting_type": "test-limiter",
- "limit.limiting_key": "hello-world",
- "limit.concurrency_queue_ms": int64(13),
- "limit.concurrency_queue_length": 99,
- "limit.concurrency_dropped": "max_time",
- })
-}
diff --git a/internal/grpc/middleware/limithandler/concurrency_limiter.go b/internal/limiter/concurrency_limiter.go
index 52b4c293b..2807b856e 100644
--- a/internal/grpc/middleware/limithandler/concurrency_limiter.go
+++ b/internal/limiter/concurrency_limiter.go
@@ -1,4 +1,4 @@
-package limithandler
+package limiter
import (
"context"
@@ -7,8 +7,6 @@ import (
"sync"
"time"
- "github.com/prometheus/client_golang/prometheus"
- "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/internal/tracing"
@@ -173,7 +171,7 @@ func NewConcurrencyLimiter(maxConcurrencyLimit, maxQueueLength int, maxQueuedTic
func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f LimitedFunc) (interface{}, error) {
span, ctx := tracing.StartSpanIfHasParent(
ctx,
- "limithandler.ConcurrencyLimiter.Limit",
+ "limiter.ConcurrencyLimiter.Limit",
tracing.Tags{"key": limitingKey},
)
defer span.Finish()
@@ -270,85 +268,3 @@ func (c *ConcurrencyLimiter) countSemaphores() int {
return len(c.limitsByKey)
}
-
-// WithConcurrencyLimiters sets up middleware to limit the concurrency of
-// requests based on RPC and repository
-func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) {
- acquiringSecondsMetric := prometheus.NewHistogramVec(
- prometheus.HistogramOpts{
- Namespace: "gitaly",
- Subsystem: "concurrency_limiting",
- Name: "acquiring_seconds",
- Help: "Histogram of time calls are rate limited (in seconds)",
- Buckets: cfg.Prometheus.GRPCLatencyBuckets,
- },
- []string{"system", "grpc_service", "grpc_method"},
- )
- inProgressMetric := prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Namespace: "gitaly",
- Subsystem: "concurrency_limiting",
- Name: "in_progress",
- Help: "Gauge of number of concurrent in-progress calls",
- },
- []string{"system", "grpc_service", "grpc_method"},
- )
- queuedMetric := prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Namespace: "gitaly",
- Subsystem: "concurrency_limiting",
- Name: "queued",
- Help: "Gauge of number of queued calls",
- },
- []string{"system", "grpc_service", "grpc_method"},
- )
-
- middleware.collect = func(metrics chan<- prometheus.Metric) {
- acquiringSecondsMetric.Collect(metrics)
- inProgressMetric.Collect(metrics)
- queuedMetric.Collect(metrics)
- }
-
- result := make(map[string]Limiter)
- for _, limit := range cfg.Concurrency {
- limit := limit
-
- newTickerFunc := func() helper.Ticker {
- return helper.NewManualTicker()
- }
-
- if limit.MaxQueueWait > 0 {
- newTickerFunc = func() helper.Ticker {
- return helper.NewTimerTicker(limit.MaxQueueWait.Duration())
- }
- }
-
- result[limit.RPC] = NewConcurrencyLimiter(
- limit.MaxPerRepo,
- limit.MaxQueueSize,
- newTickerFunc,
- newPerRPCPromMonitor(
- "gitaly", limit.RPC,
- queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
- ),
- )
- }
-
- // Set default for ReplicateRepository.
- replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository"
- if _, ok := result[replicateRepositoryFullMethod]; !ok {
- result[replicateRepositoryFullMethod] = NewConcurrencyLimiter(
- 1,
- 0,
- func() helper.Ticker {
- return helper.NewManualTicker()
- },
- newPerRPCPromMonitor(
- "gitaly", replicateRepositoryFullMethod,
- queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
- ),
- )
- }
-
- middleware.methodLimiters = result
-}
diff --git a/internal/grpc/middleware/limithandler/concurrency_limiter_test.go b/internal/limiter/concurrency_limiter_test.go
index 02adb7d04..f99ca7eab 100644
--- a/internal/grpc/middleware/limithandler/concurrency_limiter_test.go
+++ b/internal/limiter/concurrency_limiter_test.go
@@ -1,4 +1,4 @@
-package limithandler
+package limiter
import (
"context"
diff --git a/internal/limiter/limiter.go b/internal/limiter/limiter.go
new file mode 100644
index 000000000..74c643662
--- /dev/null
+++ b/internal/limiter/limiter.go
@@ -0,0 +1,11 @@
+package limiter
+
+import "context"
+
+// Limiter limits incoming requests
+type Limiter interface {
+ Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error)
+}
+
+// LimitedFunc represents a function that will be limited
+type LimitedFunc func() (resp interface{}, err error)
diff --git a/internal/grpc/middleware/limithandler/monitor.go b/internal/limiter/monitor.go
index b251df6d4..b1e074d41 100644
--- a/internal/grpc/middleware/limithandler/monitor.go
+++ b/internal/limiter/monitor.go
@@ -1,4 +1,4 @@
-package limithandler
+package limiter
import (
"context"
@@ -6,6 +6,7 @@ import (
"time"
"github.com/prometheus/client_golang/prometheus"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/log"
)
// ConcurrencyMonitor allows the concurrency monitor to be observed.
@@ -30,7 +31,7 @@ func NewNoopConcurrencyMonitor() ConcurrencyMonitor {
return &noopConcurrencyMonitor{}
}
-// PromMonitor keeps track of prometheus metrics for limithandlers.
+// PromMonitor keeps track of prometheus metrics for limiters.
// It conforms to both the ConcurrencyMonitor, and prometheus.Collector
// interfaces.
type PromMonitor struct {
@@ -45,9 +46,9 @@ type PromMonitor struct {
acquiringSecondsHistogramVec *prometheus.HistogramVec
}
-// newPerRPCPromMonitor creates a new ConcurrencyMonitor that tracks limiter
+// NewPerRPCPromMonitor creates a new ConcurrencyMonitor that tracks limiter
// activity in Prometheus.
-func newPerRPCPromMonitor(
+func NewPerRPCPromMonitor(
system, fullMethod string,
queuedMetric, inProgressMetric *prometheus.GaugeVec,
acquiringSecondsVec *prometheus.HistogramVec,
@@ -71,9 +72,10 @@ func newPerRPCPromMonitor(
// Queued is called when a request has been queued.
func (p *PromMonitor) Queued(ctx context.Context, key string, queueLength int) {
- if stats := limitStatsFromContext(ctx); stats != nil {
- stats.SetLimitingKey(p.limitingType, key)
- stats.SetConcurrencyQueueLength(queueLength)
+ if stats := log.CustomFieldsFromContext(ctx); stats != nil {
+ stats.RecordMetadata("limit.limiting_type", p.limitingType)
+ stats.RecordMetadata("limit.limiting_key", key)
+ stats.RecordMetadata("limit.concurrency_queue_length", queueLength)
}
p.queuedMetric.Inc()
}
@@ -88,8 +90,8 @@ func (p *PromMonitor) Enter(ctx context.Context, acquireTime time.Duration) {
p.inProgressMetric.Inc()
p.acquiringSecondsMetric.Observe(acquireTime.Seconds())
- if stats := limitStatsFromContext(ctx); stats != nil {
- stats.AddConcurrencyQueueMs(acquireTime.Milliseconds())
+ if stats := log.CustomFieldsFromContext(ctx); stats != nil {
+ stats.RecordMetadata("limit.concurrency_queue_ms", acquireTime.Milliseconds())
}
}
@@ -100,11 +102,12 @@ func (p *PromMonitor) Exit(ctx context.Context) {
// Dropped is called when a request is dropped.
func (p *PromMonitor) Dropped(ctx context.Context, key string, length int, acquireTime time.Duration, reason string) {
- if stats := limitStatsFromContext(ctx); stats != nil {
- stats.SetLimitingKey(p.limitingType, key)
- stats.SetConcurrencyQueueLength(length)
- stats.SetConcurrencyDroppedReason(reason)
- stats.AddConcurrencyQueueMs(acquireTime.Milliseconds())
+ if stats := log.CustomFieldsFromContext(ctx); stats != nil {
+ stats.RecordMetadata("limit.limiting_type", p.limitingType)
+ stats.RecordMetadata("limit.limiting_key", key)
+ stats.RecordMetadata("limit.concurrency_queue_length", length)
+ stats.RecordMetadata("limit.concurrency_dropped", reason)
+ stats.RecordMetadata("limit.concurrency_queue_ms", acquireTime.Milliseconds())
}
p.requestsDroppedMetric.WithLabelValues(reason).Inc()
}
diff --git a/internal/grpc/middleware/limithandler/monitor_test.go b/internal/limiter/monitor_test.go
index 3b5d6163f..59fbbe7c8 100644
--- a/internal/grpc/middleware/limithandler/monitor_test.go
+++ b/internal/limiter/monitor_test.go
@@ -1,4 +1,4 @@
-package limithandler
+package limiter
import (
"bytes"
@@ -10,6 +10,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
promconfig "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config/prometheus"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
)
@@ -51,7 +52,7 @@ func TestNewPerRPCPromMonitor(t *testing.T) {
"reason",
},
)
- return newPerRPCPromMonitor(
+ return NewPerRPCPromMonitor(
system,
fullMethod,
queuedMetric,
@@ -63,7 +64,7 @@ func TestNewPerRPCPromMonitor(t *testing.T) {
t.Run("request is dequeued successfully", func(t *testing.T) {
rpcMonitor := createNewMonitor()
- ctx := InitLimitStats(testhelper.Context(t))
+ ctx := log.InitContextCustomFields(testhelper.Context(t))
rpcMonitor.Queued(ctx, fullMethod, 5)
rpcMonitor.Enter(ctx, time.Second)
@@ -101,7 +102,7 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1
"acquiring_seconds",
))
- stats := limitStatsFromContext(ctx)
+ stats := log.CustomFieldsFromContext(ctx)
require.NotNil(t, stats)
require.Equal(t, logrus.Fields{
"limit.limiting_type": TypePerRPC,
@@ -132,7 +133,7 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0
t.Run("request is dropped after queueing", func(t *testing.T) {
rpcMonitor := createNewMonitor()
- ctx := InitLimitStats(testhelper.Context(t))
+ ctx := log.InitContextCustomFields(testhelper.Context(t))
rpcMonitor.Queued(ctx, fullMethod, 5)
rpcMonitor.Dropped(ctx, fullMethod, 5, time.Second, "load")
@@ -173,7 +174,7 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1
"acquiring_seconds",
))
- stats := limitStatsFromContext(ctx)
+ stats := log.CustomFieldsFromContext(ctx)
require.NotNil(t, stats)
require.Equal(t, logrus.Fields{
"limit.limiting_type": TypePerRPC,
@@ -186,7 +187,7 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 1
t.Run("request is dropped before queueing", func(t *testing.T) {
rpcMonitor := createNewMonitor()
- ctx := InitLimitStats(testhelper.Context(t))
+ ctx := log.InitContextCustomFields(testhelper.Context(t))
rpcMonitor.Dropped(ctx, fullMethod, 5, time.Second, "load")
expectedMetrics := `# HELP acquiring_seconds seconds to acquire
@@ -225,7 +226,7 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0
"acquiring_seconds",
))
- stats := limitStatsFromContext(ctx)
+ stats := log.CustomFieldsFromContext(ctx)
require.NotNil(t, stats)
require.Equal(t, logrus.Fields{
"limit.limiting_type": TypePerRPC,
@@ -239,7 +240,7 @@ queued{grpc_method="unknown",grpc_service="unknown",system="gitaly"} 0
func TestNewPackObjectsConcurrencyMonitor(t *testing.T) {
t.Run("request is dequeued successfully", func(t *testing.T) {
- ctx := InitLimitStats(testhelper.Context(t))
+ ctx := log.InitContextCustomFields(testhelper.Context(t))
packObjectsConcurrencyMonitor := NewPackObjectsConcurrencyMonitor(
promconfig.DefaultConfig().GRPCLatencyBuckets,
)
@@ -280,7 +281,7 @@ gitaly_pack_objects_queued 1
"gitaly_pack_objects_dropped_total",
))
- stats := limitStatsFromContext(ctx)
+ stats := log.CustomFieldsFromContext(ctx)
require.NotNil(t, stats)
require.Equal(t, logrus.Fields{
"limit.limiting_type": TypePackObjects,
@@ -310,7 +311,7 @@ gitaly_pack_objects_queued 0
})
t.Run("request is dropped after queueing", func(t *testing.T) {
- ctx := InitLimitStats(testhelper.Context(t))
+ ctx := log.InitContextCustomFields(testhelper.Context(t))
packObjectsConcurrencyMonitor := NewPackObjectsConcurrencyMonitor(
promconfig.DefaultConfig().GRPCLatencyBuckets,
)
@@ -354,7 +355,7 @@ gitaly_pack_objects_queued 1
"gitaly_pack_objects_dropped_total",
))
- stats := limitStatsFromContext(ctx)
+ stats := log.CustomFieldsFromContext(ctx)
require.NotNil(t, stats)
require.Equal(t, logrus.Fields{
"limit.limiting_type": TypePackObjects,
@@ -366,7 +367,7 @@ gitaly_pack_objects_queued 1
})
t.Run("request is dropped before queueing", func(t *testing.T) {
- ctx := InitLimitStats(testhelper.Context(t))
+ ctx := log.InitContextCustomFields(testhelper.Context(t))
packObjectsConcurrencyMonitor := NewPackObjectsConcurrencyMonitor(
promconfig.DefaultConfig().GRPCLatencyBuckets,
)
@@ -409,7 +410,7 @@ gitaly_pack_objects_queued 0
"gitaly_pack_objects_dropped_total",
))
- stats := limitStatsFromContext(ctx)
+ stats := log.CustomFieldsFromContext(ctx)
require.NotNil(t, stats)
require.Equal(t, logrus.Fields{
"limit.limiting_type": TypePackObjects,
diff --git a/internal/grpc/middleware/limithandler/rate_limiter.go b/internal/limiter/rate_limiter.go
index 577e68b1d..73e0e5b6a 100644
--- a/internal/grpc/middleware/limithandler/rate_limiter.go
+++ b/internal/limiter/rate_limiter.go
@@ -1,4 +1,4 @@
-package limithandler
+package limiter
import (
"context"
@@ -7,7 +7,6 @@ import (
"time"
"github.com/prometheus/client_golang/prometheus"
- "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/internal/tracing"
@@ -35,7 +34,7 @@ var ErrRateLimit = errors.New("rate limit reached")
func (r *RateLimiter) Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error) {
span, _ := tracing.StartSpanIfHasParent(
ctx,
- "limithandler.RateLimiterLimit",
+ "limiter.RateLimiterLimit",
tracing.Tags{"key": lockKey},
)
defer span.Finish()
@@ -104,32 +103,3 @@ func NewRateLimiter(
return r
}
-
-// WithRateLimiters sets up a middleware with limiters that limit requests
-// based on its rate per second per RPC
-func WithRateLimiters(ctx context.Context) SetupFunc {
- return func(cfg config.Cfg, middleware *LimiterMiddleware) {
- result := make(map[string]Limiter)
-
- for _, limitCfg := range cfg.RateLimiting {
- if limitCfg.Burst > 0 && limitCfg.Interval > 0 {
- serviceName, methodName := splitMethodName(limitCfg.RPC)
- rateLimiter := NewRateLimiter(
- limitCfg.Interval.Duration(),
- limitCfg.Burst,
- helper.NewTimerTicker(5*time.Minute),
- middleware.requestsDroppedMetric.With(prometheus.Labels{
- "system": "gitaly",
- "grpc_service": serviceName,
- "grpc_method": methodName,
- "reason": "rate",
- }),
- )
- result[limitCfg.RPC] = rateLimiter
- go rateLimiter.PruneUnusedLimiters(ctx)
- }
- }
-
- middleware.methodLimiters = result
- }
-}
diff --git a/internal/grpc/middleware/limithandler/rate_limiter_test.go b/internal/limiter/rate_limiter_test.go
index 5e88b8f6b..5a39a694c 100644
--- a/internal/grpc/middleware/limithandler/rate_limiter_test.go
+++ b/internal/limiter/rate_limiter_test.go
@@ -1,4 +1,4 @@
-package limithandler
+package limiter
import (
"testing"
diff --git a/internal/log/customfields.go b/internal/log/customfields.go
index 1622810f2..38874e15c 100644
--- a/internal/log/customfields.go
+++ b/internal/log/customfields.go
@@ -14,7 +14,7 @@ type requestCustomFieldsKey struct{}
// the object out with CustomFieldsFromContext.
type CustomFields struct {
numericFields map[string]int
- stringFields map[string]string
+ anyFields map[string]any
sync.Mutex
}
@@ -45,11 +45,11 @@ func (fields *CustomFields) RecordMax(key string, value int) {
}
// RecordMetadata records a string metadata for the given key.
-func (fields *CustomFields) RecordMetadata(key string, value string) {
+func (fields *CustomFields) RecordMetadata(key string, value any) {
fields.Lock()
defer fields.Unlock()
- fields.stringFields[key] = value
+ fields.anyFields[key] = value
}
// Fields returns all the fields as logrus.Fields
@@ -61,7 +61,7 @@ func (fields *CustomFields) Fields() logrus.Fields {
for k, v := range fields.numericFields {
f[k] = v
}
- for k, v := range fields.stringFields {
+ for k, v := range fields.anyFields {
f[k] = v
}
return f
@@ -77,6 +77,6 @@ func CustomFieldsFromContext(ctx context.Context) *CustomFields {
func InitContextCustomFields(ctx context.Context) context.Context {
return context.WithValue(ctx, requestCustomFieldsKey{}, &CustomFields{
numericFields: make(map[string]int),
- stringFields: make(map[string]string),
+ anyFields: make(map[string]any),
})
}
diff --git a/internal/log/customfields_test.go b/internal/log/customfields_test.go
index 07577b17c..b2d89b707 100644
--- a/internal/log/customfields_test.go
+++ b/internal/log/customfields_test.go
@@ -1,6 +1,7 @@
package log_test
import (
+ "context"
"testing"
"github.com/sirupsen/logrus"
@@ -73,17 +74,46 @@ func TestStatsFromContext_RecordMax(t *testing.T) {
}
func TestStatsFromContext_RecordMetadata(t *testing.T) {
- ctx := testhelper.Context(t)
-
- ctx = log.InitContextCustomFields(ctx)
-
- customFields := log.CustomFieldsFromContext(ctx)
-
- customFields.RecordMetadata("foo", "bar")
- require.NotNil(t, customFields)
- require.Equal(t, customFields.Fields(), logrus.Fields{"foo": "bar"})
-
- customFields.RecordMetadata("foo", "baz") // override the existing value
- require.NotNil(t, customFields)
- require.Equal(t, customFields.Fields(), logrus.Fields{"foo": "baz"})
+ for _, tc := range []struct {
+ desc string
+ setup func(context.Context)
+ expectedFields logrus.Fields
+ }{
+ {
+ desc: "record a string metadata",
+ setup: func(ctx context.Context) {
+ customFields := log.CustomFieldsFromContext(ctx)
+ customFields.RecordMetadata("foo", "bar")
+ },
+ expectedFields: logrus.Fields{"foo": "bar"},
+ },
+ {
+ desc: "override metadata of the same key",
+ setup: func(ctx context.Context) {
+ customFields := log.CustomFieldsFromContext(ctx)
+ customFields.RecordMetadata("foo", "bar")
+ customFields.RecordMetadata("foo", "baz") // override the existing value
+ },
+ expectedFields: logrus.Fields{"foo": "baz"},
+ },
+ {
+ desc: "record metadata with different types",
+ setup: func(ctx context.Context) {
+ customFields := log.CustomFieldsFromContext(ctx)
+ customFields.RecordMetadata("hello", 1234)
+ customFields.RecordMetadata("hi", []int{1, 2, 3, 4})
+ },
+ expectedFields: logrus.Fields{
+ "hello": 1234,
+ "hi": []int{1, 2, 3, 4},
+ },
+ },
+ } {
+ ctx := log.InitContextCustomFields(testhelper.Context(t))
+ tc.setup(ctx)
+
+ customFields := log.CustomFieldsFromContext(ctx)
+ require.NotNil(t, customFields)
+ require.Equal(t, customFields.Fields(), tc.expectedFields)
+ }
}
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go
index 1b78c581d..1b62c903f 100644
--- a/internal/testhelper/testserver/gitaly.go
+++ b/internal/testhelper/testserver/gitaly.go
@@ -34,6 +34,7 @@ import (
internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/limiter"
praefectconfig "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/streamcache"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
@@ -263,7 +264,7 @@ type gitalyServerDeps struct {
catfileCache catfile.Cache
diskCache cache.Cache
packObjectsCache streamcache.Cache
- packObjectsLimiter limithandler.Limiter
+ packObjectsLimiter limiter.Limiter
limitHandler *limithandler.LimiterMiddleware
git2goExecutor *git2go.Executor
updaterWithHooks *updateref.UpdaterWithHooks
@@ -323,11 +324,11 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) *
}
if gsd.packObjectsLimiter == nil {
- gsd.packObjectsLimiter = limithandler.NewConcurrencyLimiter(
+ gsd.packObjectsLimiter = limiter.NewConcurrencyLimiter(
0,
0,
nil,
- limithandler.NewNoopConcurrencyMonitor(),
+ limiter.NewNoopConcurrencyMonitor(),
)
}
@@ -462,7 +463,7 @@ func WithDiskCache(diskCache cache.Cache) GitalyServerOpt {
// WithPackObjectsLimiter sets the PackObjectsLimiter that will be
// used for gitaly services initialization.
-func WithPackObjectsLimiter(limiter *limithandler.ConcurrencyLimiter) GitalyServerOpt {
+func WithPackObjectsLimiter(limiter *limiter.ConcurrencyLimiter) GitalyServerOpt {
return func(deps gitalyServerDeps) gitalyServerDeps {
deps.packObjectsLimiter = limiter
return deps