gitlab.com/gitlab-org/gitaly.git
author     Patrick Steinhardt <psteinhardt@gitlab.com>  2021-12-14 16:01:23 +0300
committer  Patrick Steinhardt <psteinhardt@gitlab.com>  2021-12-15 10:15:46 +0300
commit     be7617b20bbeb81c1036443c6b176574cc3d3b8f (patch)
tree       8b02eb47a96ddd42269e8424ca2927fbe75936b5
parent     bbc8ddda6b33ee1afa88b328a5cf53ce7f5d64f0 (diff)
limithandler: Refactor package to not use global state
The limithandler package uses global variables both to track Prometheus metrics and to handle its configuration. The latter especially is fragile: we need to set up the limithandler's state in various places by calling global functions. Fix this by making the `LimiterMiddleware` self-contained so that it hosts all of its configuration as well as the Prometheus metrics.
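
In practice this means callers now construct the middleware from the configuration and register it with Prometheus themselves. Below is a minimal sketch of the new wiring, adapted from the cmd/gitaly/main.go and middleware.go hunks in this diff; the setupLimitHandler helper is hypothetical and elides the rest of the server setup.

    package main

    import (
        "github.com/prometheus/client_golang/prometheus"

        "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
        "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler"
    )

    // setupLimitHandler is a hypothetical helper; in the actual change this
    // wiring lives inline in run() in cmd/gitaly/main.go.
    func setupLimitHandler(cfg config.Cfg) *limithandler.LimiterMiddleware {
        // New now takes the configuration directly: per-RPC concurrency
        // limits come from cfg.Concurrency and the histogram buckets from
        // cfg.Prometheus.GRPCLatencyBuckets, instead of being injected
        // through package-level setters.
        limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo)

        // The middleware implements prometheus.Collector (Describe/Collect),
        // so the caller registers it explicitly instead of relying on
        // promauto package-level registration.
        prometheus.MustRegister(limitHandler)

        return limitHandler
    }

    func main() {
        // With an empty Cfg the middleware is still constructed; only the
        // default ReplicateRepository limit is set up.
        _ = setupLimitHandler(config.Cfg{})
    }
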
-rw-r--r--  cmd/gitaly-ssh/auth_test.go                               2
-rw-r--r--  cmd/gitaly/main.go                                        3
-rw-r--r--  internal/gitaly/config/concurrency.go                    22
-rw-r--r--  internal/gitaly/config/prometheus/config.go               3
-rw-r--r--  internal/gitaly/server/auth_test.go                       4
-rw-r--r--  internal/gitaly/server/server_factory_test.go            10
-rw-r--r--  internal/gitaly/service/repository/create_fork_test.go    2
-rw-r--r--  internal/middleware/limithandler/metrics.go              45
-rw-r--r--  internal/middleware/limithandler/middleware.go           74
-rw-r--r--  internal/middleware/limithandler/middleware_test.go      20
-rw-r--r--  internal/middleware/limithandler/monitor.go              34
-rw-r--r--  internal/testhelper/testserver/gitaly.go                  2
12 files changed, 100 insertions(+), 121 deletions(-)
diff --git a/cmd/gitaly-ssh/auth_test.go b/cmd/gitaly-ssh/auth_test.go
index c15901765..3dd82bea1 100644
--- a/cmd/gitaly-ssh/auth_test.go
+++ b/cmd/gitaly-ssh/auth_test.go
@@ -155,7 +155,7 @@ func runServer(t *testing.T, secure bool, cfg config.Cfg, connectionType string,
t, gitlab.MockAllowed, gitlab.MockPreReceive, gitlab.MockPostReceive,
), cfg)
gitCmdFactory := git.NewExecCommandFactory(cfg)
- limitHandler := limithandler.New(limithandler.LimitConcurrencyByRepo)
+ limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo)
diskCache := cache.New(cfg, locator)
srv, err := server.New(secure, cfg, testhelper.NewDiscardingLogEntry(t), registry, diskCache, limitHandler)
require.NoError(t, err)
diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go
index 2699fa5de..8b0e0da6e 100644
--- a/cmd/gitaly/main.go
+++ b/cmd/gitaly/main.go
@@ -192,7 +192,8 @@ func run(cfg config.Cfg) error {
return fmt.Errorf("disk cache walkers: %w", err)
}
- limitHandler := limithandler.New(limithandler.LimitConcurrencyByRepo)
+ limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo)
+ prometheus.MustRegister(limitHandler)
gitalyServerFactory := server.NewGitalyServerFactory(cfg, glog.Default(), registry, diskCache, limitHandler)
defer gitalyServerFactory.Stop()
diff --git a/internal/gitaly/config/concurrency.go b/internal/gitaly/config/concurrency.go
deleted file mode 100644
index 1c54a0250..000000000
--- a/internal/gitaly/config/concurrency.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package config
-
-import (
- "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler"
-)
-
-// ConfigureConcurrencyLimits configures the per-repo, per RPC rate limits
-func ConfigureConcurrencyLimits(cfg Cfg) {
- maxConcurrencyPerRepoPerRPC := make(map[string]int)
-
- for _, v := range cfg.Concurrency {
- maxConcurrencyPerRepoPerRPC[v.RPC] = v.MaxPerRepo
- }
-
- // Set default for ReplicateRepository
- replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository"
- if _, ok := maxConcurrencyPerRepoPerRPC[replicateRepositoryFullMethod]; !ok {
- maxConcurrencyPerRepoPerRPC[replicateRepositoryFullMethod] = 1
- }
-
- limithandler.SetMaxRepoConcurrency(maxConcurrencyPerRepoPerRPC)
-}
diff --git a/internal/gitaly/config/prometheus/config.go b/internal/gitaly/config/prometheus/config.go
index 8d3c91fc4..4873eec1d 100644
--- a/internal/gitaly/config/prometheus/config.go
+++ b/internal/gitaly/config/prometheus/config.go
@@ -6,7 +6,6 @@ import (
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
- "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler"
)
// Config contains additional configuration data for prometheus
@@ -40,6 +39,4 @@ func (c *Config) Configure() {
grpcprometheus.EnableClientHandlingTimeHistogram(func(histogramOpts *prometheus.HistogramOpts) {
histogramOpts.Buckets = c.GRPCLatencyBuckets
})
-
- limithandler.EnableAcquireTimeHistogram(c.GRPCLatencyBuckets)
}
diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go
index 907f8f3af..44ffcf08c 100644
--- a/internal/gitaly/server/auth_test.go
+++ b/internal/gitaly/server/auth_test.go
@@ -201,7 +201,7 @@ func runServer(t *testing.T, cfg config.Cfg) string {
catfileCache := catfile.NewCache(cfg)
t.Cleanup(catfileCache.Stop)
diskCache := cache.New(cfg, locator)
- limitHandler := limithandler.New(limithandler.LimitConcurrencyByRepo)
+ limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo)
srv, err := New(false, cfg, testhelper.NewDiscardingLogEntry(t), registry, diskCache, limitHandler)
require.NoError(t, err)
@@ -243,7 +243,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string {
testhelper.NewDiscardingLogEntry(t),
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg)),
- limithandler.New(limithandler.LimitConcurrencyByRepo),
+ limithandler.New(cfg, limithandler.LimitConcurrencyByRepo),
)
require.NoError(t, err)
diff --git a/internal/gitaly/server/server_factory_test.go b/internal/gitaly/server/server_factory_test.go
index 7a16fe4b7..6e29b9cd3 100644
--- a/internal/gitaly/server/server_factory_test.go
+++ b/internal/gitaly/server/server_factory_test.go
@@ -94,7 +94,7 @@ func TestGitalyServerFactory(t *testing.T) {
testhelper.NewDiscardingLogEntry(t),
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg)),
- limithandler.New(limithandler.LimitConcurrencyByRepo),
+ limithandler.New(cfg, limithandler.LimitConcurrencyByRepo),
)
checkHealth(t, sf, starter.TCP, "localhost:0")
@@ -113,7 +113,7 @@ func TestGitalyServerFactory(t *testing.T) {
testhelper.NewDiscardingLogEntry(t),
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg)),
- limithandler.New(limithandler.LimitConcurrencyByRepo),
+ limithandler.New(cfg, limithandler.LimitConcurrencyByRepo),
)
t.Cleanup(sf.Stop)
@@ -127,7 +127,7 @@ func TestGitalyServerFactory(t *testing.T) {
testhelper.NewDiscardingLogEntry(t),
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg)),
- limithandler.New(limithandler.LimitConcurrencyByRepo),
+ limithandler.New(cfg, limithandler.LimitConcurrencyByRepo),
)
t.Cleanup(sf.Stop)
@@ -155,7 +155,7 @@ func TestGitalyServerFactory(t *testing.T) {
logger.WithContext(ctx),
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg)),
- limithandler.New(limithandler.LimitConcurrencyByRepo),
+ limithandler.New(cfg, limithandler.LimitConcurrencyByRepo),
)
checkHealth(t, sf, starter.TCP, "localhost:0")
@@ -190,7 +190,7 @@ func TestGitalyServerFactory_closeOrder(t *testing.T) {
testhelper.NewDiscardingLogEntry(t),
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg)),
- limithandler.New(limithandler.LimitConcurrencyByRepo),
+ limithandler.New(cfg, limithandler.LimitConcurrencyByRepo),
)
defer sf.Stop()
diff --git a/internal/gitaly/service/repository/create_fork_test.go b/internal/gitaly/service/repository/create_fork_test.go
index 28512be86..ec8721697 100644
--- a/internal/gitaly/service/repository/create_fork_test.go
+++ b/internal/gitaly/service/repository/create_fork_test.go
@@ -271,7 +271,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg, rubySrv *rubyserver.Server) s
registry := backchannel.NewRegistry()
locator := config.NewLocator(cfg)
cache := cache.New(cfg, locator)
- limitHandler := limithandler.New(limithandler.LimitConcurrencyByRepo)
+ limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo)
server, err := gserver.New(true, cfg, testhelper.NewDiscardingLogEntry(t), registry, cache, limitHandler)
require.NoError(t, err)
listener, addr := testhelper.GetLocalhostListener(t)
diff --git a/internal/middleware/limithandler/metrics.go b/internal/middleware/limithandler/metrics.go
deleted file mode 100644
index 77fd77137..000000000
--- a/internal/middleware/limithandler/metrics.go
+++ /dev/null
@@ -1,45 +0,0 @@
-package limithandler
-
-import (
- "github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/promauto"
-)
-
-var (
- histogramVec *prometheus.HistogramVec
- inprogressGaugeVec = promauto.NewGaugeVec(
- prometheus.GaugeOpts{
- Namespace: "gitaly",
- Subsystem: "rate_limiting",
- Name: "in_progress",
- Help: "Gauge of number of concurrent in-progress calls",
- },
- []string{"system", "grpc_service", "grpc_method"},
- )
-
- queuedGaugeVec = promauto.NewGaugeVec(
- prometheus.GaugeOpts{
- Namespace: "gitaly",
- Subsystem: "rate_limiting",
- Name: "queued",
- Help: "Gauge of number of queued calls",
- },
- []string{"system", "grpc_service", "grpc_method"},
- )
-)
-
-// EnableAcquireTimeHistogram enables histograms for acquisition times
-func EnableAcquireTimeHistogram(buckets []float64) {
- histogramOpts := prometheus.HistogramOpts{
- Namespace: "gitaly",
- Subsystem: "rate_limiting",
- Name: "acquiring_seconds",
- Help: "Histogram of time calls are rate limited (in seconds)",
- Buckets: buckets,
- }
-
- histogramVec = promauto.NewHistogramVec(
- histogramOpts,
- []string{"system", "grpc_service", "grpc_method"},
- )
-}
diff --git a/internal/middleware/limithandler/middleware.go b/internal/middleware/limithandler/middleware.go
index f5e14b30e..febc25576 100644
--- a/internal/middleware/limithandler/middleware.go
+++ b/internal/middleware/limithandler/middleware.go
@@ -5,11 +5,11 @@ import (
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
+ "github.com/prometheus/client_golang/prometheus"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
"google.golang.org/grpc"
)
-var maxConcurrencyPerRepoPerRPC map[string]int
-
// GetLockKey function defines the lock key of an RPC invocation based on its context
type GetLockKey func(context.Context) string
@@ -33,14 +33,60 @@ func LimitConcurrencyByRepo(ctx context.Context) string {
type LimiterMiddleware struct {
methodLimiters map[string]*ConcurrencyLimiter
getLockKey GetLockKey
+
+ acquiringSecondsMetric *prometheus.HistogramVec
+ inProgressMetric *prometheus.GaugeVec
+ queuedMetric *prometheus.GaugeVec
}
// New creates a new rate limiter
-func New(getLockKey GetLockKey) *LimiterMiddleware {
- return &LimiterMiddleware{
- methodLimiters: createLimiterConfig(),
- getLockKey: getLockKey,
+func New(cfg config.Cfg, getLockKey GetLockKey) *LimiterMiddleware {
+ middleware := &LimiterMiddleware{
+ getLockKey: getLockKey,
+
+ acquiringSecondsMetric: prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Namespace: "gitaly",
+ Subsystem: "rate_limiting",
+ Name: "acquiring_seconds",
+ Help: "Histogram of time calls are rate limited (in seconds)",
+ Buckets: cfg.Prometheus.GRPCLatencyBuckets,
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ ),
+ inProgressMetric: prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: "gitaly",
+ Subsystem: "rate_limiting",
+ Name: "in_progress",
+ Help: "Gauge of number of concurrent in-progress calls",
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ ),
+ queuedMetric: prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: "gitaly",
+ Subsystem: "rate_limiting",
+ Name: "queued",
+ Help: "Gauge of number of queued calls",
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ ),
}
+ middleware.methodLimiters = createLimiterConfig(middleware, cfg)
+ return middleware
+}
+
+// Describe is used to describe Prometheus metrics.
+func (c *LimiterMiddleware) Describe(descs chan<- *prometheus.Desc) {
+ prometheus.DescribeByCollect(c, descs)
+}
+
+// Collect is used to collect Prometheus metrics.
+func (c *LimiterMiddleware) Collect(metrics chan<- prometheus.Metric) {
+ c.acquiringSecondsMetric.Collect(metrics)
+ c.inProgressMetric.Collect(metrics)
+ c.queuedMetric.Collect(metrics)
}
// UnaryInterceptor returns a Unary Interceptor
@@ -71,21 +117,21 @@ func (c *LimiterMiddleware) StreamInterceptor() grpc.StreamServerInterceptor {
}
}
-func createLimiterConfig() map[string]*ConcurrencyLimiter {
+func createLimiterConfig(middleware *LimiterMiddleware, cfg config.Cfg) map[string]*ConcurrencyLimiter {
result := make(map[string]*ConcurrencyLimiter)
+ for _, limit := range cfg.Concurrency {
+ result[limit.RPC] = NewLimiter(limit.MaxPerRepo, newPromMonitor(middleware, "gitaly", limit.RPC))
+ }
- for fullMethodName, max := range maxConcurrencyPerRepoPerRPC {
- result[fullMethodName] = NewLimiter(max, NewPromMonitor("gitaly", fullMethodName))
+ // Set default for ReplicateRepository.
+ replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository"
+ if _, ok := result[replicateRepositoryFullMethod]; !ok {
+ result[replicateRepositoryFullMethod] = NewLimiter(1, newPromMonitor(middleware, "gitaly", replicateRepositoryFullMethod))
}
return result
}
-// SetMaxRepoConcurrency Configures the max concurrency per repo per RPC
-func SetMaxRepoConcurrency(config map[string]int) {
- maxConcurrencyPerRepoPerRPC = config
-}
-
type wrappedStream struct {
grpc.ServerStream
info *grpc.StreamServerInfo
diff --git a/internal/middleware/limithandler/middleware_test.go b/internal/middleware/limithandler/middleware_test.go
index 84df9f150..7f3c3149b 100644
--- a/internal/middleware/limithandler/middleware_test.go
+++ b/internal/middleware/limithandler/middleware_test.go
@@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler"
pb "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler/testdata"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
@@ -26,8 +27,13 @@ func fixedLockKey(ctx context.Context) string {
func TestUnaryLimitHandler(t *testing.T) {
s := &server{blockCh: make(chan struct{})}
- limithandler.SetMaxRepoConcurrency(map[string]int{"/test.limithandler.Test/Unary": 2})
- lh := limithandler.New(fixedLockKey)
+ cfg := config.Cfg{
+ Concurrency: []config.Concurrency{
+ {RPC: "/test.limithandler.Test/Unary", MaxPerRepo: 2},
+ },
+ }
+
+ lh := limithandler.New(cfg, fixedLockKey)
interceptor := lh.UnaryInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor))
defer srv.Stop()
@@ -172,11 +178,13 @@ func TestStreamLimitHandler(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
s := &server{blockCh: make(chan struct{})}
- limithandler.SetMaxRepoConcurrency(map[string]int{
- tc.fullname: tc.maxConcurrency,
- })
+ cfg := config.Cfg{
+ Concurrency: []config.Concurrency{
+ {RPC: tc.fullname, MaxPerRepo: tc.maxConcurrency},
+ },
+ }
- lh := limithandler.New(fixedLockKey)
+ lh := limithandler.New(cfg, fixedLockKey)
interceptor := lh.StreamInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor))
defer srv.Stop()
diff --git a/internal/middleware/limithandler/monitor.go b/internal/middleware/limithandler/monitor.go
index 8079eab37..40c3869dc 100644
--- a/internal/middleware/limithandler/monitor.go
+++ b/internal/middleware/limithandler/monitor.go
@@ -27,50 +27,44 @@ func (c *nullConcurrencyMonitor) Enter(ctx context.Context, acquireTime time.Dur
func (c *nullConcurrencyMonitor) Exit(ctx context.Context) {}
type promMonitor struct {
- queuedGauge prometheus.Gauge
- inprogressGauge prometheus.Gauge
- histogram prometheus.Observer
+ queuedMetric prometheus.Gauge
+ inProgressMetric prometheus.Gauge
+ acquiringSecondsMetric prometheus.Observer
}
-// NewPromMonitor creates a new ConcurrencyMonitor that tracks limiter
+// newPromMonitor creates a new ConcurrencyMonitor that tracks limiter
// activity in Prometheus.
-func NewPromMonitor(system string, fullMethod string) ConcurrencyMonitor {
+func newPromMonitor(lh *LimiterMiddleware, system string, fullMethod string) ConcurrencyMonitor {
serviceName, methodName := splitMethodName(fullMethod)
- queuedGauge := queuedGaugeVec.WithLabelValues(system, serviceName, methodName)
- inprogressGauge := inprogressGaugeVec.WithLabelValues(system, serviceName, methodName)
-
- var histogram prometheus.Observer
- if histogramVec != nil {
- histogram = histogramVec.WithLabelValues(system, serviceName, methodName)
+ return &promMonitor{
+ lh.queuedMetric.WithLabelValues(system, serviceName, methodName),
+ lh.inProgressMetric.WithLabelValues(system, serviceName, methodName),
+ lh.acquiringSecondsMetric.WithLabelValues(system, serviceName, methodName),
}
-
- return &promMonitor{queuedGauge, inprogressGauge, histogram}
}
func (c *promMonitor) Queued(ctx context.Context) {
- c.queuedGauge.Inc()
+ c.queuedMetric.Inc()
}
func (c *promMonitor) Dequeued(ctx context.Context) {
- c.queuedGauge.Dec()
+ c.queuedMetric.Dec()
}
func (c *promMonitor) Enter(ctx context.Context, acquireTime time.Duration) {
- c.inprogressGauge.Inc()
+ c.inProgressMetric.Inc()
if acquireTime > acquireDurationLogThreshold {
logger := ctxlogrus.Extract(ctx)
logger.WithField("acquire_ms", acquireTime.Seconds()*1000).Info("Rate limit acquire wait")
}
- if c.histogram != nil {
- c.histogram.Observe(acquireTime.Seconds())
- }
+ c.acquiringSecondsMetric.Observe(acquireTime.Seconds())
}
func (c *promMonitor) Exit(ctx context.Context) {
- c.inprogressGauge.Dec()
+ c.inProgressMetric.Dec()
}
func splitMethodName(fullMethodName string) (string, string) {
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go
index 8428515de..56845d84d 100644
--- a/internal/testhelper/testserver/gitaly.go
+++ b/internal/testhelper/testserver/gitaly.go
@@ -361,7 +361,7 @@ func (gsd *gitalyServerDeps) createDependencies(t testing.TB, cfg config.Cfg, ru
}
if gsd.limitHandler == nil {
- gsd.limitHandler = limithandler.New(limithandler.LimitConcurrencyByRepo)
+ gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo)
}
return &service.Dependencies{