Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-08-02 05:22:35 +0300
committerQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-08-17 11:53:28 +0300
commitb0cea14b85380178ef826a4aab2c7beb383d3e4e (patch)
tree711fb68480a77f3d778c62aade2a6eea4b583623
parent4aa92f92eba7535694d90194a4d6c1fff090c508 (diff)
limiter: Add context to limithandler.WithConcurrencyLimiters
This commit adds context to limithandler.WithConcurrencyLimiters. This addition makes it align with limithandler.WithRateLimiters's signature. This context is used to control the cancellation of some internal goroutines in some next commits.
-rw-r--r--internal/cli/gitaly/serve.go3
-rw-r--r--internal/gitaly/server/auth_test.go31
-rw-r--r--internal/gitaly/service/hook/pack_objects_test.go1
-rw-r--r--internal/grpc/middleware/limithandler/middleware.go144
-rw-r--r--internal/grpc/middleware/limithandler/middleware_test.go24
-rw-r--r--internal/limiter/concurrency_limiter.go6
-rw-r--r--internal/limiter/concurrency_limiter_test.go4
-rw-r--r--internal/testhelper/testserver/gitaly.go11
8 files changed, 119 insertions, 105 deletions
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index f8180cdd4..0b13c2a2a 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -274,7 +274,7 @@ func run(cfg config.Cfg, logger logrus.FieldLogger) error {
concurrencyLimitHandler := limithandler.New(
cfg,
limithandler.LimitConcurrencyByRepo,
- limithandler.WithConcurrencyLimiters,
+ limithandler.WithConcurrencyLimiters(ctx),
)
rateLimitHandler := limithandler.New(
@@ -295,6 +295,7 @@ func run(cfg config.Cfg, logger logrus.FieldLogger) error {
}
}
packObjectsLimiter := limiter.NewConcurrencyLimiter(
+ ctx,
cfg.PackObjectsLimiting.MaxConcurrency,
cfg.PackObjectsLimiting.MaxQueueLength,
newTickerFunc,
diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go
index 6333b5942..c845bd68f 100644
--- a/internal/gitaly/server/auth_test.go
+++ b/internal/gitaly/server/auth_test.go
@@ -1,7 +1,7 @@
package server
import (
- netctx "context"
+ "context"
"crypto/tls"
"crypto/x509"
"fmt"
@@ -44,7 +44,7 @@ func TestMain(m *testing.M) {
}
func TestSanity(t *testing.T) {
- serverSocketPath := runServer(t, testcfg.Build(t))
+ serverSocketPath := runServer(t, testhelper.Context(t), testcfg.Build(t))
conn, err := dial(serverSocketPath, []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())})
require.NoError(t, err)
@@ -55,7 +55,8 @@ func TestSanity(t *testing.T) {
func TestTLSSanity(t *testing.T) {
cfg := testcfg.Build(t)
- addr := runSecureServer(t, cfg)
+ ctx := testhelper.Context(t)
+ addr := runSecureServer(t, ctx, cfg)
certPool, err := x509.SystemCertPool()
require.NoError(t, err)
@@ -102,7 +103,7 @@ func TestAuthFailures(t *testing.T) {
Auth: auth.Config{Token: "quxbaz"},
}))
- serverSocketPath := runServer(t, cfg)
+ serverSocketPath := runServer(t, testhelper.Context(t), cfg)
connOpts := append(tc.opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := dial(serverSocketPath, connOpts)
require.NoError(t, err, tc.desc)
@@ -145,7 +146,7 @@ func TestAuthSuccess(t *testing.T) {
Auth: auth.Config{Token: tc.token, Transitioning: !tc.required},
}))
- serverSocketPath := runServer(t, cfg)
+ serverSocketPath := runServer(t, testhelper.Context(t), cfg)
connOpts := append(tc.opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := dial(serverSocketPath, connOpts)
require.NoError(t, err, tc.desc)
@@ -158,7 +159,7 @@ func TestAuthSuccess(t *testing.T) {
type brokenAuth struct{}
func (brokenAuth) RequireTransportSecurity() bool { return false }
-func (brokenAuth) GetRequestMetadata(netctx.Context, ...string) (map[string]string, error) {
+func (brokenAuth) GetRequestMetadata(context.Context, ...string) (map[string]string, error) {
return map[string]string{"authorization": "Bearer blablabla"}, nil
}
@@ -186,7 +187,7 @@ func newOperationClient(t *testing.T, token, serverSocketPath string) (gitalypb.
return gitalypb.NewOperationServiceClient(conn), conn
}
-func runServer(t *testing.T, cfg config.Cfg) string {
+func runServer(t *testing.T, ctx context.Context, cfg config.Cfg) string {
t.Helper()
registry := backchannel.NewRegistry()
@@ -201,7 +202,7 @@ func runServer(t *testing.T, cfg config.Cfg) string {
catfileCache := catfile.NewCache(cfg)
t.Cleanup(catfileCache.Stop)
diskCache := cache.New(cfg, locator)
- limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)
+ limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters(ctx))
updaterWithHooks := updateref.NewUpdaterWithHooks(cfg, locator, hookManager, gitCmdFactory, catfileCache)
srv, err := NewGitalyServerFactory(cfg, testhelper.NewDiscardingLogEntry(t), registry, diskCache, []*limithandler.LimiterMiddleware{limitHandler}).New(false)
@@ -228,7 +229,7 @@ func runServer(t *testing.T, cfg config.Cfg) string {
}
//go:generate openssl req -newkey rsa:4096 -new -nodes -x509 -days 3650 -out testdata/gitalycert.pem -keyout testdata/gitalykey.pem -subj "/C=US/ST=California/L=San Francisco/O=GitLab/OU=GitLab-Shell/CN=localhost" -addext "subjectAltName = IP:127.0.0.1, DNS:localhost"
-func runSecureServer(t *testing.T, cfg config.Cfg) string {
+func runSecureServer(t *testing.T, ctx context.Context, cfg config.Cfg) string {
t.Helper()
cfg.TLS = config.TLS{
@@ -244,7 +245,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string {
testhelper.NewDiscardingLogEntry(t),
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg)),
- []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)},
+ []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters(ctx))},
).New(true)
require.NoError(t, err)
@@ -259,12 +260,12 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string {
func TestUnaryNoAuth(t *testing.T) {
cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{Auth: auth.Config{Token: "testtoken"}}))
+ ctx := testhelper.Context(t)
- path := runServer(t, cfg)
+ path := runServer(t, ctx, cfg)
conn, err := grpc.Dial(path, grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer testhelper.MustClose(t, conn)
- ctx := testhelper.Context(t)
client := gitalypb.NewRepositoryServiceClient(conn)
_, err = client.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{
@@ -280,12 +281,12 @@ func TestUnaryNoAuth(t *testing.T) {
func TestStreamingNoAuth(t *testing.T) {
cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{Auth: auth.Config{Token: "testtoken"}}))
+ ctx := testhelper.Context(t)
- path := runServer(t, cfg)
+ path := runServer(t, ctx, cfg)
conn, err := dial(path, []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())})
require.NoError(t, err)
t.Cleanup(func() { conn.Close() })
- ctx := testhelper.Context(t)
client := gitalypb.NewRepositoryServiceClient(conn)
stream, err := client.GetInfoAttributes(ctx, &gitalypb.GetInfoAttributesRequest{
@@ -330,7 +331,7 @@ func TestAuthBeforeLimit(t *testing.T) {
t.Cleanup(cleanup)
cfg.Gitlab.URL = gitlabURL
- serverSocketPath := runServer(t, cfg)
+ serverSocketPath := runServer(t, ctx, cfg)
client, conn := newOperationClient(t, cfg.Auth.Token, serverSocketPath)
t.Cleanup(func() { conn.Close() })
diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go
index 4d4d1a500..35dffadbc 100644
--- a/internal/gitaly/service/hook/pack_objects_test.go
+++ b/internal/gitaly/service/hook/pack_objects_test.go
@@ -858,6 +858,7 @@ func TestPackObjects_concurrencyLimit(t *testing.T) {
cfg.Prometheus.GRPCLatencyBuckets,
)
limiter := limiter.NewConcurrencyLimiter(
+ ctx,
1,
0,
func() helper.Ticker { return ticker },
diff --git a/internal/grpc/middleware/limithandler/middleware.go b/internal/grpc/middleware/limithandler/middleware.go
index 0c41a4672..ebbe44662 100644
--- a/internal/grpc/middleware/limithandler/middleware.go
+++ b/internal/grpc/middleware/limithandler/middleware.go
@@ -165,84 +165,88 @@ func (w *wrappedStream) RecvMsg(m interface{}) error {
// WithConcurrencyLimiters sets up middleware to limit the concurrency of
// requests based on RPC and repository
-func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) {
- acquiringSecondsMetric := prometheus.NewHistogramVec(
- prometheus.HistogramOpts{
- Namespace: "gitaly",
- Subsystem: "concurrency_limiting",
- Name: "acquiring_seconds",
- Help: "Histogram of time calls are rate limited (in seconds)",
- Buckets: cfg.Prometheus.GRPCLatencyBuckets,
- },
- []string{"system", "grpc_service", "grpc_method"},
- )
- inProgressMetric := prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Namespace: "gitaly",
- Subsystem: "concurrency_limiting",
- Name: "in_progress",
- Help: "Gauge of number of concurrent in-progress calls",
- },
- []string{"system", "grpc_service", "grpc_method"},
- )
- queuedMetric := prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Namespace: "gitaly",
- Subsystem: "concurrency_limiting",
- Name: "queued",
- Help: "Gauge of number of queued calls",
- },
- []string{"system", "grpc_service", "grpc_method"},
- )
-
- middleware.collect = func(metrics chan<- prometheus.Metric) {
- acquiringSecondsMetric.Collect(metrics)
- inProgressMetric.Collect(metrics)
- queuedMetric.Collect(metrics)
- }
-
- result := make(map[string]limiter.Limiter)
- for _, limit := range cfg.Concurrency {
- limit := limit
+func WithConcurrencyLimiters(ctx context.Context) SetupFunc {
+ return func(cfg config.Cfg, middleware *LimiterMiddleware) {
+ acquiringSecondsMetric := prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Namespace: "gitaly",
+ Subsystem: "concurrency_limiting",
+ Name: "acquiring_seconds",
+ Help: "Histogram of time calls are rate limited (in seconds)",
+ Buckets: cfg.Prometheus.GRPCLatencyBuckets,
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ )
+ inProgressMetric := prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: "gitaly",
+ Subsystem: "concurrency_limiting",
+ Name: "in_progress",
+ Help: "Gauge of number of concurrent in-progress calls",
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ )
+ queuedMetric := prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: "gitaly",
+ Subsystem: "concurrency_limiting",
+ Name: "queued",
+ Help: "Gauge of number of queued calls",
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ )
- newTickerFunc := func() helper.Ticker {
- return helper.NewManualTicker()
+ middleware.collect = func(metrics chan<- prometheus.Metric) {
+ acquiringSecondsMetric.Collect(metrics)
+ inProgressMetric.Collect(metrics)
+ queuedMetric.Collect(metrics)
}
- if limit.MaxQueueWait > 0 {
- newTickerFunc = func() helper.Ticker {
- return helper.NewTimerTicker(limit.MaxQueueWait.Duration())
+ result := make(map[string]limiter.Limiter)
+ for _, limit := range cfg.Concurrency {
+ limit := limit
+
+ newTickerFunc := func() helper.Ticker {
+ return helper.NewManualTicker()
}
+
+ if limit.MaxQueueWait > 0 {
+ newTickerFunc = func() helper.Ticker {
+ return helper.NewTimerTicker(limit.MaxQueueWait.Duration())
+ }
+ }
+
+ result[limit.RPC] = limiter.NewConcurrencyLimiter(
+ ctx,
+ limit.MaxPerRepo,
+ limit.MaxQueueSize,
+ newTickerFunc,
+ limiter.NewPerRPCPromMonitor(
+ "gitaly", limit.RPC,
+ queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
+ ),
+ )
}
- result[limit.RPC] = limiter.NewConcurrencyLimiter(
- limit.MaxPerRepo,
- limit.MaxQueueSize,
- newTickerFunc,
- limiter.NewPerRPCPromMonitor(
- "gitaly", limit.RPC,
- queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
- ),
- )
- }
+ // Set default for ReplicateRepository.
+ replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository"
+ if _, ok := result[replicateRepositoryFullMethod]; !ok {
+ result[replicateRepositoryFullMethod] = limiter.NewConcurrencyLimiter(
+ ctx,
+ 1,
+ 0,
+ func() helper.Ticker {
+ return helper.NewManualTicker()
+ },
+ limiter.NewPerRPCPromMonitor(
+ "gitaly", replicateRepositoryFullMethod,
+ queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
+ ),
+ )
+ }
- // Set default for ReplicateRepository.
- replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository"
- if _, ok := result[replicateRepositoryFullMethod]; !ok {
- result[replicateRepositoryFullMethod] = limiter.NewConcurrencyLimiter(
- 1,
- 0,
- func() helper.Ticker {
- return helper.NewManualTicker()
- },
- limiter.NewPerRPCPromMonitor(
- "gitaly", replicateRepositoryFullMethod,
- queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
- ),
- )
+ middleware.methodLimiters = result
}
-
- middleware.methodLimiters = result
}
// WithRateLimiters sets up a middleware with limiters that limit requests
diff --git a/internal/grpc/middleware/limithandler/middleware_test.go b/internal/grpc/middleware/limithandler/middleware_test.go
index 65adad2be..40978be69 100644
--- a/internal/grpc/middleware/limithandler/middleware_test.go
+++ b/internal/grpc/middleware/limithandler/middleware_test.go
@@ -38,6 +38,7 @@ func fixedLockKey(ctx context.Context) string {
func TestUnaryLimitHandler(t *testing.T) {
t.Parallel()
+ ctx := testhelper.Context(t)
s := &queueTestServer{
server: server{
blockCh: make(chan struct{}),
@@ -51,14 +52,13 @@ func TestUnaryLimitHandler(t *testing.T) {
},
}
- lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx))
interceptor := lh.UnaryInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor))
defer srv.Stop()
client, conn := newClient(t, serverSocketPath)
defer conn.Close()
- ctx := testhelper.Context(t)
var wg sync.WaitGroup
defer wg.Wait()
@@ -114,7 +114,7 @@ func TestUnaryLimitHandler_queueing(t *testing.T) {
MaxQueueWait: duration.Duration(time.Millisecond),
},
},
- }, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ }, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx))
s := &queueTestServer{
server: server{
@@ -175,7 +175,7 @@ func TestUnaryLimitHandler_queueing(t *testing.T) {
MaxPerRepo: 1,
},
},
- }, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ }, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx))
s := &queueTestServer{
server: server{
@@ -427,6 +427,7 @@ func TestStreamLimitHandler(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
t.Parallel()
+ ctx := testhelper.Context(t)
s := &server{blockCh: make(chan struct{})}
maxQueueSize := 1
@@ -440,14 +441,13 @@ func TestStreamLimitHandler(t *testing.T) {
},
}
- lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx))
interceptor := lh.StreamInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor))
defer srv.Stop()
client, conn := newClient(t, serverSocketPath)
defer conn.Close()
- ctx := testhelper.Context(t)
totalCalls := 10
@@ -481,6 +481,8 @@ func TestStreamLimitHandler(t *testing.T) {
func TestStreamLimitHandler_error(t *testing.T) {
t.Parallel()
+ ctx := testhelper.Context(t)
+
s := &queueTestServer{reqArrivedCh: make(chan struct{})}
s.blockCh = make(chan struct{})
@@ -490,7 +492,7 @@ func TestStreamLimitHandler_error(t *testing.T) {
},
}
- lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx))
interceptor := lh.StreamInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor))
defer srv.Stop()
@@ -498,8 +500,6 @@ func TestStreamLimitHandler_error(t *testing.T) {
client, conn := newClient(t, serverSocketPath)
defer conn.Close()
- ctx := testhelper.Context(t)
-
respChan := make(chan *grpc_testing.StreamingOutputCallResponse)
go func() {
stream, err := client.FullDuplexCall(ctx)
@@ -600,6 +600,8 @@ func (q *queueTestServer) FullDuplexCall(stream grpc_testing.TestService_FullDup
}
func TestConcurrencyLimitHandlerMetrics(t *testing.T) {
+ ctx := testhelper.Context(t)
+
s := &queueTestServer{reqArrivedCh: make(chan struct{})}
s.blockCh = make(chan struct{})
@@ -610,7 +612,7 @@ func TestConcurrencyLimitHandlerMetrics(t *testing.T) {
},
}
- lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx))
interceptor := lh.UnaryInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor))
defer srv.Stop()
@@ -618,8 +620,6 @@ func TestConcurrencyLimitHandlerMetrics(t *testing.T) {
client, conn := newClient(t, serverSocketPath)
defer conn.Close()
- ctx := testhelper.Context(t)
-
respCh := make(chan *grpc_testing.SimpleResponse)
go func() {
resp, err := client.UnaryCall(ctx, &grpc_testing.SimpleRequest{})
diff --git a/internal/limiter/concurrency_limiter.go b/internal/limiter/concurrency_limiter.go
index 2807b856e..32b08b32e 100644
--- a/internal/limiter/concurrency_limiter.go
+++ b/internal/limiter/concurrency_limiter.go
@@ -122,6 +122,9 @@ func (sem *keyedConcurrencyLimiter) queueLength() int {
// ConcurrencyLimiter contains rate limiter state.
type ConcurrencyLimiter struct {
+ // ctx stores the context at initialization. This context is used as a stopping condition
+ // for some internal goroutines.
+ ctx context.Context
// maxConcurrencyLimit is the maximum number of concurrent calls to the limited function.
// This limit is per key.
maxConcurrencyLimit int64
@@ -145,12 +148,13 @@ type ConcurrencyLimiter struct {
}
// NewConcurrencyLimiter creates a new concurrency rate limiter.
-func NewConcurrencyLimiter(maxConcurrencyLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
+func NewConcurrencyLimiter(ctx context.Context, maxConcurrencyLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
if monitor == nil {
monitor = NewNoopConcurrencyMonitor()
}
return &ConcurrencyLimiter{
+ ctx: ctx,
maxConcurrencyLimit: int64(maxConcurrencyLimit),
maxQueueLength: int64(maxQueueLength),
maxQueuedTickerCreator: maxQueuedTickerCreator,
diff --git a/internal/limiter/concurrency_limiter_test.go b/internal/limiter/concurrency_limiter_test.go
index f99ca7eab..b650ae5f5 100644
--- a/internal/limiter/concurrency_limiter_test.go
+++ b/internal/limiter/concurrency_limiter_test.go
@@ -155,6 +155,7 @@ func TestLimiter(t *testing.T) {
gauge := &counter{}
limiter := NewConcurrencyLimiter(
+ ctx,
tt.maxConcurrency,
0,
nil,
@@ -248,7 +249,7 @@ func TestConcurrencyLimiter_queueLimit(t *testing.T) {
monitorCh := make(chan struct{})
monitor := &blockingQueueCounter{queuedCh: monitorCh}
ch := make(chan struct{})
- limiter := NewConcurrencyLimiter(1, queueLimit, nil, monitor)
+ limiter := NewConcurrencyLimiter(ctx, 1, queueLimit, nil, monitor)
// occupied with one live request that takes a long time to complete
go func() {
@@ -333,6 +334,7 @@ func TestLimitConcurrency_queueWaitTime(t *testing.T) {
monitor := &blockingDequeueCounter{dequeuedCh: dequeuedCh}
limiter := NewConcurrencyLimiter(
+ ctx,
1,
0,
func() helper.Ticker {
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go
index 51d29bcdc..cf76158b8 100644
--- a/internal/testhelper/testserver/gitaly.go
+++ b/internal/testhelper/testserver/gitaly.go
@@ -161,6 +161,8 @@ func waitHealthy(tb testing.TB, ctx context.Context, addr string, authToken stri
func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, deps *service.Dependencies), opts ...GitalyServerOpt) (*grpc.Server, string, bool) {
tb.Helper()
+ ctx := testhelper.Context(tb)
+
var gsd gitalyServerDeps
for _, opt := range opts {
gsd = opt(gsd)
@@ -173,7 +175,7 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d
server.WithStreamInterceptor(StructErrStreamInterceptor),
}
- deps := gsd.createDependencies(tb, cfg)
+ deps := gsd.createDependencies(tb, ctx, cfg)
tb.Cleanup(func() { gsd.conns.Close() })
serverFactory := server.NewGitalyServerFactory(
@@ -201,7 +203,6 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d
assert.NoError(tb, internalServer.Serve(internalListener), "failure to serve internal gRPC")
}()
- ctx := testhelper.Context(tb)
waitHealthy(tb, ctx, "unix://"+internalListener.Addr().String(), cfg.Auth.Token)
}
@@ -238,7 +239,6 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d
assert.NoError(tb, externalServer.Serve(listener), "failure to serve external gRPC")
}()
- ctx := testhelper.Context(tb)
waitHealthy(tb, ctx, addr, cfg.Auth.Token)
return externalServer, addr, gsd.disablePraefect
@@ -276,7 +276,7 @@ type gitalyServerDeps struct {
signingKey string
}
-func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) *service.Dependencies {
+func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, ctx context.Context, cfg config.Cfg) *service.Dependencies {
if gsd.logger == nil {
gsd.logger = testhelper.NewGitalyServerLogger(tb)
}
@@ -328,6 +328,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) *
if gsd.packObjectsLimiter == nil {
gsd.packObjectsLimiter = limiter.NewConcurrencyLimiter(
+ ctx,
0,
0,
nil,
@@ -336,7 +337,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) *
}
if gsd.limitHandler == nil {
- gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)
+ gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters(ctx))
}
if gsd.repositoryCounter == nil {