Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2022-03-21 21:20:06 +0300
committerJohn Cai <jcai@gitlab.com>2022-04-05 20:57:43 +0300
commit31259d9d793cce6f62a1b1f8e6a4649072742c00 (patch)
tree6e4e86bfc1edef0559017e3cd454d1005e3723dc
parentaad545f661c295bcff422424e76abe7c2fd85a10 (diff)
limithandler: Pave the way for a second limiter type
A future commit will add a new middleware that will limit based on the rate rather than concurrent calls. There is a good amount of logic currently used by the concurrency limiter that can be reused since a rate limiter is also operating on incoming requests based on RPC name. To make easier to add this new limiter type in the future, refactor the code by adding some abstractions easier to add another type of limiter.
-rw-r--r--cmd/gitaly-ssh/auth_test.go2
-rw-r--r--cmd/gitaly/main.go2
-rw-r--r--internal/gitaly/server/auth_test.go4
-rw-r--r--internal/gitaly/server/server_factory_test.go10
-rw-r--r--internal/gitaly/service/repository/create_fork_test.go2
-rw-r--r--internal/middleware/limithandler/concurrency_limiter.go86
-rw-r--r--internal/middleware/limithandler/concurrency_limiter_test.go8
-rw-r--r--internal/middleware/limithandler/middleware.go103
-rw-r--r--internal/middleware/limithandler/middleware_test.go8
-rw-r--r--internal/middleware/limithandler/monitor.go15
-rw-r--r--internal/testhelper/testserver/gitaly.go2
11 files changed, 133 insertions, 109 deletions
diff --git a/cmd/gitaly-ssh/auth_test.go b/cmd/gitaly-ssh/auth_test.go
index cc3b9fd29..5b8cb9ee8 100644
--- a/cmd/gitaly-ssh/auth_test.go
+++ b/cmd/gitaly-ssh/auth_test.go
@@ -150,7 +150,7 @@ func runServer(t *testing.T, secure bool, cfg config.Cfg, connectionType string,
hookManager := hook.NewManager(cfg, locator, gitCmdFactory, txManager, gitlab.NewMockClient(
t, gitlab.MockAllowed, gitlab.MockPreReceive, gitlab.MockPostReceive,
))
- limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo)
+ limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)
diskCache := cache.New(cfg, locator)
srv, err := server.New(secure, cfg, testhelper.NewDiscardingLogEntry(t), registry, diskCache, limitHandler)
require.NoError(t, err)
diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go
index a0e231cb1..bba3d4ca8 100644
--- a/cmd/gitaly/main.go
+++ b/cmd/gitaly/main.go
@@ -213,7 +213,7 @@ func run(cfg config.Cfg) error {
return fmt.Errorf("disk cache walkers: %w", err)
}
- limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo)
+ limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)
prometheus.MustRegister(limitHandler)
gitalyServerFactory := server.NewGitalyServerFactory(cfg, glog.Default(), registry, diskCache, limitHandler)
diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go
index 82be922fd..b54fca091 100644
--- a/internal/gitaly/server/auth_test.go
+++ b/internal/gitaly/server/auth_test.go
@@ -200,7 +200,7 @@ func runServer(t *testing.T, cfg config.Cfg) string {
catfileCache := catfile.NewCache(cfg)
t.Cleanup(catfileCache.Stop)
diskCache := cache.New(cfg, locator)
- limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo)
+ limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)
updaterWithHooks := updateref.NewUpdaterWithHooks(cfg, locator, hookManager, gitCmdFactory, catfileCache)
srv, err := New(false, cfg, testhelper.NewDiscardingLogEntry(t), registry, diskCache, limitHandler)
@@ -244,7 +244,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string {
testhelper.NewDiscardingLogEntry(t),
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg)),
- limithandler.New(cfg, limithandler.LimitConcurrencyByRepo),
+ limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters),
)
require.NoError(t, err)
diff --git a/internal/gitaly/server/server_factory_test.go b/internal/gitaly/server/server_factory_test.go
index c2afcd7c6..4d560250c 100644
--- a/internal/gitaly/server/server_factory_test.go
+++ b/internal/gitaly/server/server_factory_test.go
@@ -93,7 +93,7 @@ func TestGitalyServerFactory(t *testing.T) {
testhelper.NewDiscardingLogEntry(t),
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg)),
- limithandler.New(cfg, limithandler.LimitConcurrencyByRepo),
+ limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters),
)
checkHealth(t, sf, starter.TCP, "localhost:0")
@@ -112,7 +112,7 @@ func TestGitalyServerFactory(t *testing.T) {
testhelper.NewDiscardingLogEntry(t),
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg)),
- limithandler.New(cfg, limithandler.LimitConcurrencyByRepo),
+ limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters),
)
t.Cleanup(sf.Stop)
@@ -126,7 +126,7 @@ func TestGitalyServerFactory(t *testing.T) {
testhelper.NewDiscardingLogEntry(t),
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg)),
- limithandler.New(cfg, limithandler.LimitConcurrencyByRepo),
+ limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters),
)
t.Cleanup(sf.Stop)
@@ -156,7 +156,7 @@ func TestGitalyServerFactory(t *testing.T) {
logger.WithContext(ctx),
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg)),
- limithandler.New(cfg, limithandler.LimitConcurrencyByRepo),
+ limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters),
)
checkHealth(t, sf, starter.TCP, "localhost:0")
@@ -190,7 +190,7 @@ func TestGitalyServerFactory_closeOrder(t *testing.T) {
testhelper.NewDiscardingLogEntry(t),
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg)),
- limithandler.New(cfg, limithandler.LimitConcurrencyByRepo),
+ limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters),
)
defer sf.Stop()
diff --git a/internal/gitaly/service/repository/create_fork_test.go b/internal/gitaly/service/repository/create_fork_test.go
index b8ea85654..b3d0ceaa3 100644
--- a/internal/gitaly/service/repository/create_fork_test.go
+++ b/internal/gitaly/service/repository/create_fork_test.go
@@ -259,7 +259,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg, rubySrv *rubyserver.Server) s
registry := backchannel.NewRegistry()
locator := config.NewLocator(cfg)
cache := cache.New(cfg, locator)
- limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo)
+ limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)
server, err := gserver.New(true, cfg, testhelper.NewDiscardingLogEntry(t), registry, cache, limitHandler)
require.NoError(t, err)
listener, addr := testhelper.GetLocalhostListener(t)
diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go
index 9dbf87b75..92802634f 100644
--- a/internal/middleware/limithandler/concurrency_limiter.go
+++ b/internal/middleware/limithandler/concurrency_limiter.go
@@ -7,6 +7,8 @@ import (
"sync"
"time"
+ "github.com/prometheus/client_golang/prometheus"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
)
@@ -18,9 +20,6 @@ var ErrMaxQueueTime = errors.New("maximum time in concurrency queue reached")
// ErrMaxQueueSize indicates the concurrency queue has reached its maximum size
var ErrMaxQueueSize = errors.New("maximum queue size reached")
-// LimitedFunc represents a function that will be limited
-type LimitedFunc func() (resp interface{}, err error)
-
// QueueTickerCreator is a function that provides a ticker
type QueueTickerCreator func() helper.Ticker
@@ -176,8 +175,8 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f Limite
return f()
}
-// NewLimiter creates a new rate limiter
-func NewLimiter(perKeyLimit, globalLimit int, maxWaitTickerGetter QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
+// NewConcurrencyLimiter creates a new concurrency rate limiter
+func NewConcurrencyLimiter(perKeyLimit, globalLimit int, maxWaitTickerGetter QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
if monitor == nil {
monitor = &nullConcurrencyMonitor{}
}
@@ -190,3 +189,80 @@ func NewLimiter(perKeyLimit, globalLimit int, maxWaitTickerGetter QueueTickerCre
maxWaitTickerGetter: maxWaitTickerGetter,
}
}
+
+// WithConcurrencyLimiters sets up middleware to limit the concurrency of
+// requests based on RPC and repository
+func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) {
+ acquiringSecondsMetric := prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Namespace: "gitaly",
+ Subsystem: "rate_limiting",
+ Name: "acquiring_seconds",
+ Help: "Histogram of time calls are rate limited (in seconds)",
+ Buckets: cfg.Prometheus.GRPCLatencyBuckets,
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ )
+ inProgressMetric := prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: "gitaly",
+ Subsystem: "rate_limiting",
+ Name: "in_progress",
+ Help: "Gauge of number of concurrent in-progress calls",
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ )
+ queuedMetric := prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: "gitaly",
+ Subsystem: "rate_limiting",
+ Name: "queued",
+ Help: "Gauge of number of queued calls",
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ )
+
+ middleware.collect = func(metrics chan<- prometheus.Metric) {
+ acquiringSecondsMetric.Collect(metrics)
+ inProgressMetric.Collect(metrics)
+ queuedMetric.Collect(metrics)
+ }
+
+ result := make(map[string]Limiter)
+
+ newTickerFunc := func() helper.Ticker {
+ return helper.NewManualTicker()
+ }
+
+ for _, limit := range cfg.Concurrency {
+ if limit.MaxQueueWait > 0 {
+ limit := limit
+ newTickerFunc = func() helper.Ticker {
+ return helper.NewTimerTicker(limit.MaxQueueWait.Duration())
+ }
+ }
+
+ result[limit.RPC] = NewConcurrencyLimiter(
+ limit.MaxPerRepo,
+ limit.MaxQueueSize,
+ newTickerFunc,
+ newPromMonitor("gitaly", limit.RPC, queuedMetric, inProgressMetric,
+ acquiringSecondsMetric, middleware.requestsDroppedMetric),
+ )
+ }
+
+ // Set default for ReplicateRepository.
+ replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository"
+ if _, ok := result[replicateRepositoryFullMethod]; !ok {
+ result[replicateRepositoryFullMethod] = NewConcurrencyLimiter(
+ 1,
+ 0,
+ func() helper.Ticker {
+ return helper.NewManualTicker()
+ },
+ newPromMonitor("gitaly", replicateRepositoryFullMethod, queuedMetric,
+ inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric))
+ }
+
+ middleware.methodLimiters = result
+}
diff --git a/internal/middleware/limithandler/concurrency_limiter_test.go b/internal/middleware/limithandler/concurrency_limiter_test.go
index bbeda4d76..4a13a3cd4 100644
--- a/internal/middleware/limithandler/concurrency_limiter_test.go
+++ b/internal/middleware/limithandler/concurrency_limiter_test.go
@@ -150,7 +150,7 @@ func TestLimiter(t *testing.T) {
gauge := &counter{}
- limiter := NewLimiter(
+ limiter := NewConcurrencyLimiter(
tt.maxConcurrency,
0,
nil,
@@ -266,7 +266,7 @@ func TestConcurrencyLimiter_queueLimit(t *testing.T) {
monitorCh := make(chan struct{})
monitor := &blockingQueueCounter{queuedCh: monitorCh}
ch := make(chan struct{})
- limiter := NewLimiter(1, queueLimit, nil, monitor)
+ limiter := NewConcurrencyLimiter(1, queueLimit, nil, monitor)
// occupied with one live request that takes a long time to complete
go func() {
@@ -355,7 +355,7 @@ func TestLimitConcurrency_queueWaitTime(t *testing.T) {
dequeuedCh := make(chan struct{})
monitor := &blockingDequeueCounter{dequeuedCh: dequeuedCh}
- limiter := NewLimiter(
+ limiter := NewConcurrencyLimiter(
1,
0,
func() helper.Ticker {
@@ -409,7 +409,7 @@ func TestLimitConcurrency_queueWaitTime(t *testing.T) {
dequeuedCh := make(chan struct{})
monitor := &blockingDequeueCounter{dequeuedCh: dequeuedCh}
- limiter := NewLimiter(
+ limiter := NewConcurrencyLimiter(
1,
0,
func() helper.Ticker {
diff --git a/internal/middleware/limithandler/middleware.go b/internal/middleware/limithandler/middleware.go
index ac33ff4b1..0d4ff6bbc 100644
--- a/internal/middleware/limithandler/middleware.go
+++ b/internal/middleware/limithandler/middleware.go
@@ -7,7 +7,6 @@ import (
grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
- "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"google.golang.org/grpc"
)
@@ -30,50 +29,27 @@ func LimitConcurrencyByRepo(ctx context.Context) string {
return ""
}
+// Limiter limits incoming requests
+type Limiter interface {
+ Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error)
+}
+
+// LimitedFunc represents a function that will be limited
+type LimitedFunc func() (resp interface{}, err error)
+
// LimiterMiddleware contains rate limiter state
type LimiterMiddleware struct {
- methodLimiters map[string]*ConcurrencyLimiter
- getLockKey GetLockKey
-
- acquiringSecondsMetric *prometheus.HistogramVec
- inProgressMetric *prometheus.GaugeVec
- queuedMetric *prometheus.GaugeVec
- requestsDroppedMetric *prometheus.CounterVec
+ methodLimiters map[string]Limiter
+ getLockKey GetLockKey
+ requestsDroppedMetric *prometheus.CounterVec
+ collect func(metrics chan<- prometheus.Metric)
}
-// New creates a new rate limiter
-func New(cfg config.Cfg, getLockKey GetLockKey) *LimiterMiddleware {
+// New creates a new middleware that limits requests. SetupFunc sets up the
+// middlware with a specific kind of limiter.
+func New(cfg config.Cfg, getLockKey GetLockKey, setupMiddleware SetupFunc) *LimiterMiddleware {
middleware := &LimiterMiddleware{
getLockKey: getLockKey,
-
- acquiringSecondsMetric: prometheus.NewHistogramVec(
- prometheus.HistogramOpts{
- Namespace: "gitaly",
- Subsystem: "rate_limiting",
- Name: "acquiring_seconds",
- Help: "Histogram of time calls are rate limited (in seconds)",
- Buckets: cfg.Prometheus.GRPCLatencyBuckets,
- },
- []string{"system", "grpc_service", "grpc_method"},
- ),
- inProgressMetric: prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Namespace: "gitaly",
- Subsystem: "rate_limiting",
- Name: "in_progress",
- Help: "Gauge of number of concurrent in-progress calls",
- },
- []string{"system", "grpc_service", "grpc_method"},
- ),
- queuedMetric: prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Namespace: "gitaly",
- Subsystem: "rate_limiting",
- Name: "queued",
- Help: "Gauge of number of queued calls",
- },
- []string{"system", "grpc_service", "grpc_method"},
- ),
requestsDroppedMetric: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "gitaly_requests_dropped_total",
@@ -87,7 +63,9 @@ func New(cfg config.Cfg, getLockKey GetLockKey) *LimiterMiddleware {
},
),
}
- middleware.methodLimiters = createLimiterConfig(middleware, cfg)
+
+ setupMiddleware(cfg, middleware)
+
return middleware
}
@@ -98,10 +76,10 @@ func (c *LimiterMiddleware) Describe(descs chan<- *prometheus.Desc) {
// Collect is used to collect Prometheus metrics.
func (c *LimiterMiddleware) Collect(metrics chan<- prometheus.Metric) {
- c.acquiringSecondsMetric.Collect(metrics)
- c.inProgressMetric.Collect(metrics)
- c.queuedMetric.Collect(metrics)
c.requestsDroppedMetric.Collect(metrics)
+ if c.collect != nil {
+ c.collect(metrics)
+ }
}
// UnaryInterceptor returns a Unary Interceptor
@@ -132,43 +110,8 @@ func (c *LimiterMiddleware) StreamInterceptor() grpc.StreamServerInterceptor {
}
}
-func createLimiterConfig(middleware *LimiterMiddleware, cfg config.Cfg) map[string]*ConcurrencyLimiter {
- result := make(map[string]*ConcurrencyLimiter)
-
- newTickerFunc := func() helper.Ticker {
- return helper.NewManualTicker()
- }
-
- for _, limit := range cfg.Concurrency {
- if limit.MaxQueueWait > 0 {
- limit := limit
- newTickerFunc = func() helper.Ticker {
- return helper.NewTimerTicker(limit.MaxQueueWait.Duration())
- }
- }
-
- result[limit.RPC] = NewLimiter(
- limit.MaxPerRepo,
- limit.MaxQueueSize,
- newTickerFunc,
- newPromMonitor(middleware, "gitaly", limit.RPC),
- )
- }
-
- // Set default for ReplicateRepository.
- replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository"
- if _, ok := result[replicateRepositoryFullMethod]; !ok {
- result[replicateRepositoryFullMethod] = NewLimiter(
- 1,
- 0,
- func() helper.Ticker {
- return helper.NewManualTicker()
- },
- newPromMonitor(middleware, "gitaly", replicateRepositoryFullMethod))
- }
-
- return result
-}
+// SetupFunc set up a middleware to limiting requests
+type SetupFunc func(cfg config.Cfg, middleware *LimiterMiddleware)
type wrappedStream struct {
grpc.ServerStream
diff --git a/internal/middleware/limithandler/middleware_test.go b/internal/middleware/limithandler/middleware_test.go
index bd94228bb..610475a30 100644
--- a/internal/middleware/limithandler/middleware_test.go
+++ b/internal/middleware/limithandler/middleware_test.go
@@ -38,7 +38,7 @@ func TestUnaryLimitHandler(t *testing.T) {
},
}
- lh := limithandler.New(cfg, fixedLockKey)
+ lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters)
interceptor := lh.UnaryInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor))
defer srv.Stop()
@@ -191,7 +191,7 @@ func TestStreamLimitHandler(t *testing.T) {
},
}
- lh := limithandler.New(cfg, fixedLockKey)
+ lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters)
interceptor := lh.StreamInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor))
defer srv.Stop()
@@ -233,7 +233,7 @@ func (q *queueTestServer) Unary(ctx context.Context, in *pb.UnaryRequest) (*pb.U
return &pb.UnaryResponse{Ok: true}, nil
}
-func TestLimitHandlerMetrics(t *testing.T) {
+func TestConcurrencyLimitHandlerMetrics(t *testing.T) {
s := &queueTestServer{reqArrivedCh: make(chan struct{})}
s.blockCh = make(chan struct{})
@@ -244,7 +244,7 @@ func TestLimitHandlerMetrics(t *testing.T) {
},
}
- lh := limithandler.New(cfg, fixedLockKey)
+ lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters)
interceptor := lh.UnaryInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor))
defer srv.Stop()
diff --git a/internal/middleware/limithandler/monitor.go b/internal/middleware/limithandler/monitor.go
index f77014b9d..98dabf2a0 100644
--- a/internal/middleware/limithandler/monitor.go
+++ b/internal/middleware/limithandler/monitor.go
@@ -37,14 +37,19 @@ type promMonitor struct {
// newPromMonitor creates a new ConcurrencyMonitor that tracks limiter
// activity in Prometheus.
-func newPromMonitor(lh *LimiterMiddleware, system string, fullMethod string) ConcurrencyMonitor {
+func newPromMonitor(
+ system, fullMethod string,
+ queuedMetric, inProgressMetric *prometheus.GaugeVec,
+ acquiringSecondsMetric prometheus.ObserverVec,
+ requestsDroppedMetric *prometheus.CounterVec,
+) ConcurrencyMonitor {
serviceName, methodName := splitMethodName(fullMethod)
return &promMonitor{
- lh.queuedMetric.WithLabelValues(system, serviceName, methodName),
- lh.inProgressMetric.WithLabelValues(system, serviceName, methodName),
- lh.acquiringSecondsMetric.WithLabelValues(system, serviceName, methodName),
- lh.requestsDroppedMetric.MustCurryWith(prometheus.Labels{
+ queuedMetric.WithLabelValues(system, serviceName, methodName),
+ inProgressMetric.WithLabelValues(system, serviceName, methodName),
+ acquiringSecondsMetric.WithLabelValues(system, serviceName, methodName),
+ requestsDroppedMetric.MustCurryWith(prometheus.Labels{
"system": system,
"grpc_service": serviceName,
"grpc_method": methodName,
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go
index 9f60a4afa..d2f212045 100644
--- a/internal/testhelper/testserver/gitaly.go
+++ b/internal/testhelper/testserver/gitaly.go
@@ -302,7 +302,7 @@ func (gsd *gitalyServerDeps) createDependencies(t testing.TB, cfg config.Cfg, ru
}
if gsd.limitHandler == nil {
- gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo)
+ gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)
}
if gsd.git2goExecutor == nil {