diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-12-14 16:01:23 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-12-15 10:15:46 +0300 |
commit | be7617b20bbeb81c1036443c6b176574cc3d3b8f (patch) | |
tree | 8b02eb47a96ddd42269e8424ca2927fbe75936b5 | |
parent | bbc8ddda6b33ee1afa88b328a5cf53ce7f5d64f0 (diff) |
limithandler: Refactor package to not use global state
The limithandler package uses global variables to both track Prometheus
metrics and to handle its configuration. Especially the latter is really
fragile, where we need to set up state of the limithandler in various
places by calling global functions.
Fix this by making the `LimiterMiddleware` self-contained, where it
hosts all configuration as well as the Prometheus metrics.
-rw-r--r-- | cmd/gitaly-ssh/auth_test.go | 2 | ||||
-rw-r--r-- | cmd/gitaly/main.go | 3 | ||||
-rw-r--r-- | internal/gitaly/config/concurrency.go | 22 | ||||
-rw-r--r-- | internal/gitaly/config/prometheus/config.go | 3 | ||||
-rw-r--r-- | internal/gitaly/server/auth_test.go | 4 | ||||
-rw-r--r-- | internal/gitaly/server/server_factory_test.go | 10 | ||||
-rw-r--r-- | internal/gitaly/service/repository/create_fork_test.go | 2 | ||||
-rw-r--r-- | internal/middleware/limithandler/metrics.go | 45 | ||||
-rw-r--r-- | internal/middleware/limithandler/middleware.go | 74 | ||||
-rw-r--r-- | internal/middleware/limithandler/middleware_test.go | 20 | ||||
-rw-r--r-- | internal/middleware/limithandler/monitor.go | 34 | ||||
-rw-r--r-- | internal/testhelper/testserver/gitaly.go | 2 |
12 files changed, 100 insertions, 121 deletions
diff --git a/cmd/gitaly-ssh/auth_test.go b/cmd/gitaly-ssh/auth_test.go index c15901765..3dd82bea1 100644 --- a/cmd/gitaly-ssh/auth_test.go +++ b/cmd/gitaly-ssh/auth_test.go @@ -155,7 +155,7 @@ func runServer(t *testing.T, secure bool, cfg config.Cfg, connectionType string, t, gitlab.MockAllowed, gitlab.MockPreReceive, gitlab.MockPostReceive, ), cfg) gitCmdFactory := git.NewExecCommandFactory(cfg) - limitHandler := limithandler.New(limithandler.LimitConcurrencyByRepo) + limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo) diskCache := cache.New(cfg, locator) srv, err := server.New(secure, cfg, testhelper.NewDiscardingLogEntry(t), registry, diskCache, limitHandler) require.NoError(t, err) diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go index 2699fa5de..8b0e0da6e 100644 --- a/cmd/gitaly/main.go +++ b/cmd/gitaly/main.go @@ -192,7 +192,8 @@ func run(cfg config.Cfg) error { return fmt.Errorf("disk cache walkers: %w", err) } - limitHandler := limithandler.New(limithandler.LimitConcurrencyByRepo) + limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo) + prometheus.MustRegister(limitHandler) gitalyServerFactory := server.NewGitalyServerFactory(cfg, glog.Default(), registry, diskCache, limitHandler) defer gitalyServerFactory.Stop() diff --git a/internal/gitaly/config/concurrency.go b/internal/gitaly/config/concurrency.go deleted file mode 100644 index 1c54a0250..000000000 --- a/internal/gitaly/config/concurrency.go +++ /dev/null @@ -1,22 +0,0 @@ -package config - -import ( - "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler" -) - -// ConfigureConcurrencyLimits configures the per-repo, per RPC rate limits -func ConfigureConcurrencyLimits(cfg Cfg) { - maxConcurrencyPerRepoPerRPC := make(map[string]int) - - for _, v := range cfg.Concurrency { - maxConcurrencyPerRepoPerRPC[v.RPC] = v.MaxPerRepo - } - - // Set default for ReplicateRepository - replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository" - if _, ok := maxConcurrencyPerRepoPerRPC[replicateRepositoryFullMethod]; !ok { - maxConcurrencyPerRepoPerRPC[replicateRepositoryFullMethod] = 1 - } - - limithandler.SetMaxRepoConcurrency(maxConcurrencyPerRepoPerRPC) -} diff --git a/internal/gitaly/config/prometheus/config.go b/internal/gitaly/config/prometheus/config.go index 8d3c91fc4..4873eec1d 100644 --- a/internal/gitaly/config/prometheus/config.go +++ b/internal/gitaly/config/prometheus/config.go @@ -6,7 +6,6 @@ import ( grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" - "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler" ) // Config contains additional configuration data for prometheus @@ -40,6 +39,4 @@ func (c *Config) Configure() { grpcprometheus.EnableClientHandlingTimeHistogram(func(histogramOpts *prometheus.HistogramOpts) { histogramOpts.Buckets = c.GRPCLatencyBuckets }) - - limithandler.EnableAcquireTimeHistogram(c.GRPCLatencyBuckets) } diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go index 907f8f3af..44ffcf08c 100644 --- a/internal/gitaly/server/auth_test.go +++ b/internal/gitaly/server/auth_test.go @@ -201,7 +201,7 @@ func runServer(t *testing.T, cfg config.Cfg) string { catfileCache := catfile.NewCache(cfg) t.Cleanup(catfileCache.Stop) diskCache := cache.New(cfg, locator) - limitHandler := limithandler.New(limithandler.LimitConcurrencyByRepo) + limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo) srv, err := New(false, cfg, testhelper.NewDiscardingLogEntry(t), registry, diskCache, limitHandler) require.NoError(t, err) @@ -243,7 +243,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string { testhelper.NewDiscardingLogEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)), - limithandler.New(limithandler.LimitConcurrencyByRepo), + limithandler.New(cfg, limithandler.LimitConcurrencyByRepo), ) require.NoError(t, err) diff --git a/internal/gitaly/server/server_factory_test.go b/internal/gitaly/server/server_factory_test.go index 7a16fe4b7..6e29b9cd3 100644 --- a/internal/gitaly/server/server_factory_test.go +++ b/internal/gitaly/server/server_factory_test.go @@ -94,7 +94,7 @@ func TestGitalyServerFactory(t *testing.T) { testhelper.NewDiscardingLogEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)), - limithandler.New(limithandler.LimitConcurrencyByRepo), + limithandler.New(cfg, limithandler.LimitConcurrencyByRepo), ) checkHealth(t, sf, starter.TCP, "localhost:0") @@ -113,7 +113,7 @@ func TestGitalyServerFactory(t *testing.T) { testhelper.NewDiscardingLogEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)), - limithandler.New(limithandler.LimitConcurrencyByRepo), + limithandler.New(cfg, limithandler.LimitConcurrencyByRepo), ) t.Cleanup(sf.Stop) @@ -127,7 +127,7 @@ func TestGitalyServerFactory(t *testing.T) { testhelper.NewDiscardingLogEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)), - limithandler.New(limithandler.LimitConcurrencyByRepo), + limithandler.New(cfg, limithandler.LimitConcurrencyByRepo), ) t.Cleanup(sf.Stop) @@ -155,7 +155,7 @@ func TestGitalyServerFactory(t *testing.T) { logger.WithContext(ctx), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)), - limithandler.New(limithandler.LimitConcurrencyByRepo), + limithandler.New(cfg, limithandler.LimitConcurrencyByRepo), ) checkHealth(t, sf, starter.TCP, "localhost:0") @@ -190,7 +190,7 @@ func TestGitalyServerFactory_closeOrder(t *testing.T) { testhelper.NewDiscardingLogEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)), - limithandler.New(limithandler.LimitConcurrencyByRepo), + limithandler.New(cfg, limithandler.LimitConcurrencyByRepo), ) defer sf.Stop() diff --git a/internal/gitaly/service/repository/create_fork_test.go b/internal/gitaly/service/repository/create_fork_test.go index 28512be86..ec8721697 100644 --- a/internal/gitaly/service/repository/create_fork_test.go +++ b/internal/gitaly/service/repository/create_fork_test.go @@ -271,7 +271,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg, rubySrv *rubyserver.Server) s registry := backchannel.NewRegistry() locator := config.NewLocator(cfg) cache := cache.New(cfg, locator) - limitHandler := limithandler.New(limithandler.LimitConcurrencyByRepo) + limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo) server, err := gserver.New(true, cfg, testhelper.NewDiscardingLogEntry(t), registry, cache, limitHandler) require.NoError(t, err) listener, addr := testhelper.GetLocalhostListener(t) diff --git a/internal/middleware/limithandler/metrics.go b/internal/middleware/limithandler/metrics.go deleted file mode 100644 index 77fd77137..000000000 --- a/internal/middleware/limithandler/metrics.go +++ /dev/null @@ -1,45 +0,0 @@ -package limithandler - -import ( - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" -) - -var ( - histogramVec *prometheus.HistogramVec - inprogressGaugeVec = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "gitaly", - Subsystem: "rate_limiting", - Name: "in_progress", - Help: "Gauge of number of concurrent in-progress calls", - }, - []string{"system", "grpc_service", "grpc_method"}, - ) - - queuedGaugeVec = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "gitaly", - Subsystem: "rate_limiting", - Name: "queued", - Help: "Gauge of number of queued calls", - }, - []string{"system", "grpc_service", "grpc_method"}, - ) -) - -// EnableAcquireTimeHistogram enables histograms for acquisition times -func EnableAcquireTimeHistogram(buckets []float64) { - histogramOpts := prometheus.HistogramOpts{ - Namespace: "gitaly", - Subsystem: "rate_limiting", - Name: "acquiring_seconds", - Help: "Histogram of time calls are rate limited (in seconds)", - Buckets: buckets, - } - - histogramVec = promauto.NewHistogramVec( - histogramOpts, - []string{"system", "grpc_service", "grpc_method"}, - ) -} diff --git a/internal/middleware/limithandler/middleware.go b/internal/middleware/limithandler/middleware.go index f5e14b30e..febc25576 100644 --- a/internal/middleware/limithandler/middleware.go +++ b/internal/middleware/limithandler/middleware.go @@ -5,11 +5,11 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" + "github.com/prometheus/client_golang/prometheus" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "google.golang.org/grpc" ) -var maxConcurrencyPerRepoPerRPC map[string]int - // GetLockKey function defines the lock key of an RPC invocation based on its context type GetLockKey func(context.Context) string @@ -33,14 +33,60 @@ func LimitConcurrencyByRepo(ctx context.Context) string { type LimiterMiddleware struct { methodLimiters map[string]*ConcurrencyLimiter getLockKey GetLockKey + + acquiringSecondsMetric *prometheus.HistogramVec + inProgressMetric *prometheus.GaugeVec + queuedMetric *prometheus.GaugeVec } // New creates a new rate limiter -func New(getLockKey GetLockKey) *LimiterMiddleware { - return &LimiterMiddleware{ - methodLimiters: createLimiterConfig(), - getLockKey: getLockKey, +func New(cfg config.Cfg, getLockKey GetLockKey) *LimiterMiddleware { + middleware := &LimiterMiddleware{ + getLockKey: getLockKey, + + acquiringSecondsMetric: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "gitaly", + Subsystem: "rate_limiting", + Name: "acquiring_seconds", + Help: "Histogram of time calls are rate limited (in seconds)", + Buckets: cfg.Prometheus.GRPCLatencyBuckets, + }, + []string{"system", "grpc_service", "grpc_method"}, + ), + inProgressMetric: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "gitaly", + Subsystem: "rate_limiting", + Name: "in_progress", + Help: "Gauge of number of concurrent in-progress calls", + }, + []string{"system", "grpc_service", "grpc_method"}, + ), + queuedMetric: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "gitaly", + Subsystem: "rate_limiting", + Name: "queued", + Help: "Gauge of number of queued calls", + }, + []string{"system", "grpc_service", "grpc_method"}, + ), } + middleware.methodLimiters = createLimiterConfig(middleware, cfg) + return middleware +} + +// Describe is used to describe Prometheus metrics. +func (c *LimiterMiddleware) Describe(descs chan<- *prometheus.Desc) { + prometheus.DescribeByCollect(c, descs) +} + +// Collect is used to collect Prometheus metrics. +func (c *LimiterMiddleware) Collect(metrics chan<- prometheus.Metric) { + c.acquiringSecondsMetric.Collect(metrics) + c.inProgressMetric.Collect(metrics) + c.queuedMetric.Collect(metrics) } // UnaryInterceptor returns a Unary Interceptor @@ -71,21 +117,21 @@ func (c *LimiterMiddleware) StreamInterceptor() grpc.StreamServerInterceptor { } } -func createLimiterConfig() map[string]*ConcurrencyLimiter { +func createLimiterConfig(middleware *LimiterMiddleware, cfg config.Cfg) map[string]*ConcurrencyLimiter { result := make(map[string]*ConcurrencyLimiter) + for _, limit := range cfg.Concurrency { + result[limit.RPC] = NewLimiter(limit.MaxPerRepo, newPromMonitor(middleware, "gitaly", limit.RPC)) + } - for fullMethodName, max := range maxConcurrencyPerRepoPerRPC { - result[fullMethodName] = NewLimiter(max, NewPromMonitor("gitaly", fullMethodName)) + // Set default for ReplicateRepository. + replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository" + if _, ok := result[replicateRepositoryFullMethod]; !ok { + result[replicateRepositoryFullMethod] = NewLimiter(1, newPromMonitor(middleware, "gitaly", replicateRepositoryFullMethod)) } return result } -// SetMaxRepoConcurrency Configures the max concurrency per repo per RPC -func SetMaxRepoConcurrency(config map[string]int) { - maxConcurrencyPerRepoPerRPC = config -} - type wrappedStream struct { grpc.ServerStream info *grpc.StreamServerInfo diff --git a/internal/middleware/limithandler/middleware_test.go b/internal/middleware/limithandler/middleware_test.go index 84df9f150..7f3c3149b 100644 --- a/internal/middleware/limithandler/middleware_test.go +++ b/internal/middleware/limithandler/middleware_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler" pb "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler/testdata" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" @@ -26,8 +27,13 @@ func fixedLockKey(ctx context.Context) string { func TestUnaryLimitHandler(t *testing.T) { s := &server{blockCh: make(chan struct{})} - limithandler.SetMaxRepoConcurrency(map[string]int{"/test.limithandler.Test/Unary": 2}) - lh := limithandler.New(fixedLockKey) + cfg := config.Cfg{ + Concurrency: []config.Concurrency{ + {RPC: "/test.limithandler.Test/Unary", MaxPerRepo: 2}, + }, + } + + lh := limithandler.New(cfg, fixedLockKey) interceptor := lh.UnaryInterceptor() srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor)) defer srv.Stop() @@ -172,11 +178,13 @@ func TestStreamLimitHandler(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { s := &server{blockCh: make(chan struct{})} - limithandler.SetMaxRepoConcurrency(map[string]int{ - tc.fullname: tc.maxConcurrency, - }) + cfg := config.Cfg{ + Concurrency: []config.Concurrency{ + {RPC: tc.fullname, MaxPerRepo: tc.maxConcurrency}, + }, + } - lh := limithandler.New(fixedLockKey) + lh := limithandler.New(cfg, fixedLockKey) interceptor := lh.StreamInterceptor() srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor)) defer srv.Stop() diff --git a/internal/middleware/limithandler/monitor.go b/internal/middleware/limithandler/monitor.go index 8079eab37..40c3869dc 100644 --- a/internal/middleware/limithandler/monitor.go +++ b/internal/middleware/limithandler/monitor.go @@ -27,50 +27,44 @@ func (c *nullConcurrencyMonitor) Enter(ctx context.Context, acquireTime time.Dur func (c *nullConcurrencyMonitor) Exit(ctx context.Context) {} type promMonitor struct { - queuedGauge prometheus.Gauge - inprogressGauge prometheus.Gauge - histogram prometheus.Observer + queuedMetric prometheus.Gauge + inProgressMetric prometheus.Gauge + acquiringSecondsMetric prometheus.Observer } -// NewPromMonitor creates a new ConcurrencyMonitor that tracks limiter +// newPromMonitor creates a new ConcurrencyMonitor that tracks limiter // activity in Prometheus. -func NewPromMonitor(system string, fullMethod string) ConcurrencyMonitor { +func newPromMonitor(lh *LimiterMiddleware, system string, fullMethod string) ConcurrencyMonitor { serviceName, methodName := splitMethodName(fullMethod) - queuedGauge := queuedGaugeVec.WithLabelValues(system, serviceName, methodName) - inprogressGauge := inprogressGaugeVec.WithLabelValues(system, serviceName, methodName) - - var histogram prometheus.Observer - if histogramVec != nil { - histogram = histogramVec.WithLabelValues(system, serviceName, methodName) + return &promMonitor{ + lh.queuedMetric.WithLabelValues(system, serviceName, methodName), + lh.inProgressMetric.WithLabelValues(system, serviceName, methodName), + lh.acquiringSecondsMetric.WithLabelValues(system, serviceName, methodName), } - - return &promMonitor{queuedGauge, inprogressGauge, histogram} } func (c *promMonitor) Queued(ctx context.Context) { - c.queuedGauge.Inc() + c.queuedMetric.Inc() } func (c *promMonitor) Dequeued(ctx context.Context) { - c.queuedGauge.Dec() + c.queuedMetric.Dec() } func (c *promMonitor) Enter(ctx context.Context, acquireTime time.Duration) { - c.inprogressGauge.Inc() + c.inProgressMetric.Inc() if acquireTime > acquireDurationLogThreshold { logger := ctxlogrus.Extract(ctx) logger.WithField("acquire_ms", acquireTime.Seconds()*1000).Info("Rate limit acquire wait") } - if c.histogram != nil { - c.histogram.Observe(acquireTime.Seconds()) - } + c.acquiringSecondsMetric.Observe(acquireTime.Seconds()) } func (c *promMonitor) Exit(ctx context.Context) { - c.inprogressGauge.Dec() + c.inProgressMetric.Dec() } func splitMethodName(fullMethodName string) (string, string) { diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index 8428515de..56845d84d 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -361,7 +361,7 @@ func (gsd *gitalyServerDeps) createDependencies(t testing.TB, cfg config.Cfg, ru } if gsd.limitHandler == nil { - gsd.limitHandler = limithandler.New(limithandler.LimitConcurrencyByRepo) + gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo) } return &service.Dependencies{ |