diff options
author | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-08-02 05:22:35 +0300 |
---|---|---|
committer | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-08-17 11:53:28 +0300 |
commit | b0cea14b85380178ef826a4aab2c7beb383d3e4e (patch) | |
tree | 711fb68480a77f3d778c62aade2a6eea4b583623 | |
parent | 4aa92f92eba7535694d90194a4d6c1fff090c508 (diff) |
limiter: Add context to limithandler.WithConcurrencyLimiters
This commit adds context to limithandler.WithConcurrencyLimiters. This
addition makes it align with limithandler.WithRateLimiters's signature.
This context is used to control the cancellation of some internal
goroutines in some next commits.
-rw-r--r-- | internal/cli/gitaly/serve.go | 3 | ||||
-rw-r--r-- | internal/gitaly/server/auth_test.go | 31 | ||||
-rw-r--r-- | internal/gitaly/service/hook/pack_objects_test.go | 1 | ||||
-rw-r--r-- | internal/grpc/middleware/limithandler/middleware.go | 144 | ||||
-rw-r--r-- | internal/grpc/middleware/limithandler/middleware_test.go | 24 | ||||
-rw-r--r-- | internal/limiter/concurrency_limiter.go | 6 | ||||
-rw-r--r-- | internal/limiter/concurrency_limiter_test.go | 4 | ||||
-rw-r--r-- | internal/testhelper/testserver/gitaly.go | 11 |
8 files changed, 119 insertions, 105 deletions
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index f8180cdd4..0b13c2a2a 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -274,7 +274,7 @@ func run(cfg config.Cfg, logger logrus.FieldLogger) error { concurrencyLimitHandler := limithandler.New( cfg, limithandler.LimitConcurrencyByRepo, - limithandler.WithConcurrencyLimiters, + limithandler.WithConcurrencyLimiters(ctx), ) rateLimitHandler := limithandler.New( @@ -295,6 +295,7 @@ func run(cfg config.Cfg, logger logrus.FieldLogger) error { } } packObjectsLimiter := limiter.NewConcurrencyLimiter( + ctx, cfg.PackObjectsLimiting.MaxConcurrency, cfg.PackObjectsLimiting.MaxQueueLength, newTickerFunc, diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go index 6333b5942..c845bd68f 100644 --- a/internal/gitaly/server/auth_test.go +++ b/internal/gitaly/server/auth_test.go @@ -1,7 +1,7 @@ package server import ( - netctx "context" + "context" "crypto/tls" "crypto/x509" "fmt" @@ -44,7 +44,7 @@ func TestMain(m *testing.M) { } func TestSanity(t *testing.T) { - serverSocketPath := runServer(t, testcfg.Build(t)) + serverSocketPath := runServer(t, testhelper.Context(t), testcfg.Build(t)) conn, err := dial(serverSocketPath, []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}) require.NoError(t, err) @@ -55,7 +55,8 @@ func TestSanity(t *testing.T) { func TestTLSSanity(t *testing.T) { cfg := testcfg.Build(t) - addr := runSecureServer(t, cfg) + ctx := testhelper.Context(t) + addr := runSecureServer(t, ctx, cfg) certPool, err := x509.SystemCertPool() require.NoError(t, err) @@ -102,7 +103,7 @@ func TestAuthFailures(t *testing.T) { Auth: auth.Config{Token: "quxbaz"}, })) - serverSocketPath := runServer(t, cfg) + serverSocketPath := runServer(t, testhelper.Context(t), cfg) connOpts := append(tc.opts, grpc.WithTransportCredentials(insecure.NewCredentials())) conn, err := dial(serverSocketPath, connOpts) require.NoError(t, err, tc.desc) @@ -145,7 +146,7 @@ func TestAuthSuccess(t *testing.T) { Auth: auth.Config{Token: tc.token, Transitioning: !tc.required}, })) - serverSocketPath := runServer(t, cfg) + serverSocketPath := runServer(t, testhelper.Context(t), cfg) connOpts := append(tc.opts, grpc.WithTransportCredentials(insecure.NewCredentials())) conn, err := dial(serverSocketPath, connOpts) require.NoError(t, err, tc.desc) @@ -158,7 +159,7 @@ func TestAuthSuccess(t *testing.T) { type brokenAuth struct{} func (brokenAuth) RequireTransportSecurity() bool { return false } -func (brokenAuth) GetRequestMetadata(netctx.Context, ...string) (map[string]string, error) { +func (brokenAuth) GetRequestMetadata(context.Context, ...string) (map[string]string, error) { return map[string]string{"authorization": "Bearer blablabla"}, nil } @@ -186,7 +187,7 @@ func newOperationClient(t *testing.T, token, serverSocketPath string) (gitalypb. return gitalypb.NewOperationServiceClient(conn), conn } -func runServer(t *testing.T, cfg config.Cfg) string { +func runServer(t *testing.T, ctx context.Context, cfg config.Cfg) string { t.Helper() registry := backchannel.NewRegistry() @@ -201,7 +202,7 @@ func runServer(t *testing.T, cfg config.Cfg) string { catfileCache := catfile.NewCache(cfg) t.Cleanup(catfileCache.Stop) diskCache := cache.New(cfg, locator) - limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters) + limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters(ctx)) updaterWithHooks := updateref.NewUpdaterWithHooks(cfg, locator, hookManager, gitCmdFactory, catfileCache) srv, err := NewGitalyServerFactory(cfg, testhelper.NewDiscardingLogEntry(t), registry, diskCache, []*limithandler.LimiterMiddleware{limitHandler}).New(false) @@ -228,7 +229,7 @@ func runServer(t *testing.T, cfg config.Cfg) string { } //go:generate openssl req -newkey rsa:4096 -new -nodes -x509 -days 3650 -out testdata/gitalycert.pem -keyout testdata/gitalykey.pem -subj "/C=US/ST=California/L=San Francisco/O=GitLab/OU=GitLab-Shell/CN=localhost" -addext "subjectAltName = IP:127.0.0.1, DNS:localhost" -func runSecureServer(t *testing.T, cfg config.Cfg) string { +func runSecureServer(t *testing.T, ctx context.Context, cfg config.Cfg) string { t.Helper() cfg.TLS = config.TLS{ @@ -244,7 +245,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string { testhelper.NewDiscardingLogEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)), - []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)}, + []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters(ctx))}, ).New(true) require.NoError(t, err) @@ -259,12 +260,12 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string { func TestUnaryNoAuth(t *testing.T) { cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{Auth: auth.Config{Token: "testtoken"}})) + ctx := testhelper.Context(t) - path := runServer(t, cfg) + path := runServer(t, ctx, cfg) conn, err := grpc.Dial(path, grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) defer testhelper.MustClose(t, conn) - ctx := testhelper.Context(t) client := gitalypb.NewRepositoryServiceClient(conn) _, err = client.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{ @@ -280,12 +281,12 @@ func TestUnaryNoAuth(t *testing.T) { func TestStreamingNoAuth(t *testing.T) { cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{Auth: auth.Config{Token: "testtoken"}})) + ctx := testhelper.Context(t) - path := runServer(t, cfg) + path := runServer(t, ctx, cfg) conn, err := dial(path, []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}) require.NoError(t, err) t.Cleanup(func() { conn.Close() }) - ctx := testhelper.Context(t) client := gitalypb.NewRepositoryServiceClient(conn) stream, err := client.GetInfoAttributes(ctx, &gitalypb.GetInfoAttributesRequest{ @@ -330,7 +331,7 @@ func TestAuthBeforeLimit(t *testing.T) { t.Cleanup(cleanup) cfg.Gitlab.URL = gitlabURL - serverSocketPath := runServer(t, cfg) + serverSocketPath := runServer(t, ctx, cfg) client, conn := newOperationClient(t, cfg.Auth.Token, serverSocketPath) t.Cleanup(func() { conn.Close() }) diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go index 4d4d1a500..35dffadbc 100644 --- a/internal/gitaly/service/hook/pack_objects_test.go +++ b/internal/gitaly/service/hook/pack_objects_test.go @@ -858,6 +858,7 @@ func TestPackObjects_concurrencyLimit(t *testing.T) { cfg.Prometheus.GRPCLatencyBuckets, ) limiter := limiter.NewConcurrencyLimiter( + ctx, 1, 0, func() helper.Ticker { return ticker }, diff --git a/internal/grpc/middleware/limithandler/middleware.go b/internal/grpc/middleware/limithandler/middleware.go index 0c41a4672..ebbe44662 100644 --- a/internal/grpc/middleware/limithandler/middleware.go +++ b/internal/grpc/middleware/limithandler/middleware.go @@ -165,84 +165,88 @@ func (w *wrappedStream) RecvMsg(m interface{}) error { // WithConcurrencyLimiters sets up middleware to limit the concurrency of // requests based on RPC and repository -func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) { - acquiringSecondsMetric := prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "gitaly", - Subsystem: "concurrency_limiting", - Name: "acquiring_seconds", - Help: "Histogram of time calls are rate limited (in seconds)", - Buckets: cfg.Prometheus.GRPCLatencyBuckets, - }, - []string{"system", "grpc_service", "grpc_method"}, - ) - inProgressMetric := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "gitaly", - Subsystem: "concurrency_limiting", - Name: "in_progress", - Help: "Gauge of number of concurrent in-progress calls", - }, - []string{"system", "grpc_service", "grpc_method"}, - ) - queuedMetric := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "gitaly", - Subsystem: "concurrency_limiting", - Name: "queued", - Help: "Gauge of number of queued calls", - }, - []string{"system", "grpc_service", "grpc_method"}, - ) - - middleware.collect = func(metrics chan<- prometheus.Metric) { - acquiringSecondsMetric.Collect(metrics) - inProgressMetric.Collect(metrics) - queuedMetric.Collect(metrics) - } - - result := make(map[string]limiter.Limiter) - for _, limit := range cfg.Concurrency { - limit := limit +func WithConcurrencyLimiters(ctx context.Context) SetupFunc { + return func(cfg config.Cfg, middleware *LimiterMiddleware) { + acquiringSecondsMetric := prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "gitaly", + Subsystem: "concurrency_limiting", + Name: "acquiring_seconds", + Help: "Histogram of time calls are rate limited (in seconds)", + Buckets: cfg.Prometheus.GRPCLatencyBuckets, + }, + []string{"system", "grpc_service", "grpc_method"}, + ) + inProgressMetric := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "gitaly", + Subsystem: "concurrency_limiting", + Name: "in_progress", + Help: "Gauge of number of concurrent in-progress calls", + }, + []string{"system", "grpc_service", "grpc_method"}, + ) + queuedMetric := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "gitaly", + Subsystem: "concurrency_limiting", + Name: "queued", + Help: "Gauge of number of queued calls", + }, + []string{"system", "grpc_service", "grpc_method"}, + ) - newTickerFunc := func() helper.Ticker { - return helper.NewManualTicker() + middleware.collect = func(metrics chan<- prometheus.Metric) { + acquiringSecondsMetric.Collect(metrics) + inProgressMetric.Collect(metrics) + queuedMetric.Collect(metrics) } - if limit.MaxQueueWait > 0 { - newTickerFunc = func() helper.Ticker { - return helper.NewTimerTicker(limit.MaxQueueWait.Duration()) + result := make(map[string]limiter.Limiter) + for _, limit := range cfg.Concurrency { + limit := limit + + newTickerFunc := func() helper.Ticker { + return helper.NewManualTicker() } + + if limit.MaxQueueWait > 0 { + newTickerFunc = func() helper.Ticker { + return helper.NewTimerTicker(limit.MaxQueueWait.Duration()) + } + } + + result[limit.RPC] = limiter.NewConcurrencyLimiter( + ctx, + limit.MaxPerRepo, + limit.MaxQueueSize, + newTickerFunc, + limiter.NewPerRPCPromMonitor( + "gitaly", limit.RPC, + queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, + ), + ) } - result[limit.RPC] = limiter.NewConcurrencyLimiter( - limit.MaxPerRepo, - limit.MaxQueueSize, - newTickerFunc, - limiter.NewPerRPCPromMonitor( - "gitaly", limit.RPC, - queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, - ), - ) - } + // Set default for ReplicateRepository. + replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository" + if _, ok := result[replicateRepositoryFullMethod]; !ok { + result[replicateRepositoryFullMethod] = limiter.NewConcurrencyLimiter( + ctx, + 1, + 0, + func() helper.Ticker { + return helper.NewManualTicker() + }, + limiter.NewPerRPCPromMonitor( + "gitaly", replicateRepositoryFullMethod, + queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, + ), + ) + } - // Set default for ReplicateRepository. - replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository" - if _, ok := result[replicateRepositoryFullMethod]; !ok { - result[replicateRepositoryFullMethod] = limiter.NewConcurrencyLimiter( - 1, - 0, - func() helper.Ticker { - return helper.NewManualTicker() - }, - limiter.NewPerRPCPromMonitor( - "gitaly", replicateRepositoryFullMethod, - queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, - ), - ) + middleware.methodLimiters = result } - - middleware.methodLimiters = result } // WithRateLimiters sets up a middleware with limiters that limit requests diff --git a/internal/grpc/middleware/limithandler/middleware_test.go b/internal/grpc/middleware/limithandler/middleware_test.go index 65adad2be..40978be69 100644 --- a/internal/grpc/middleware/limithandler/middleware_test.go +++ b/internal/grpc/middleware/limithandler/middleware_test.go @@ -38,6 +38,7 @@ func fixedLockKey(ctx context.Context) string { func TestUnaryLimitHandler(t *testing.T) { t.Parallel() + ctx := testhelper.Context(t) s := &queueTestServer{ server: server{ blockCh: make(chan struct{}), @@ -51,14 +52,13 @@ func TestUnaryLimitHandler(t *testing.T) { }, } - lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters) + lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx)) interceptor := lh.UnaryInterceptor() srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor)) defer srv.Stop() client, conn := newClient(t, serverSocketPath) defer conn.Close() - ctx := testhelper.Context(t) var wg sync.WaitGroup defer wg.Wait() @@ -114,7 +114,7 @@ func TestUnaryLimitHandler_queueing(t *testing.T) { MaxQueueWait: duration.Duration(time.Millisecond), }, }, - }, fixedLockKey, limithandler.WithConcurrencyLimiters) + }, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx)) s := &queueTestServer{ server: server{ @@ -175,7 +175,7 @@ func TestUnaryLimitHandler_queueing(t *testing.T) { MaxPerRepo: 1, }, }, - }, fixedLockKey, limithandler.WithConcurrencyLimiters) + }, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx)) s := &queueTestServer{ server: server{ @@ -427,6 +427,7 @@ func TestStreamLimitHandler(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { t.Parallel() + ctx := testhelper.Context(t) s := &server{blockCh: make(chan struct{})} maxQueueSize := 1 @@ -440,14 +441,13 @@ func TestStreamLimitHandler(t *testing.T) { }, } - lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters) + lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx)) interceptor := lh.StreamInterceptor() srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor)) defer srv.Stop() client, conn := newClient(t, serverSocketPath) defer conn.Close() - ctx := testhelper.Context(t) totalCalls := 10 @@ -481,6 +481,8 @@ func TestStreamLimitHandler(t *testing.T) { func TestStreamLimitHandler_error(t *testing.T) { t.Parallel() + ctx := testhelper.Context(t) + s := &queueTestServer{reqArrivedCh: make(chan struct{})} s.blockCh = make(chan struct{}) @@ -490,7 +492,7 @@ func TestStreamLimitHandler_error(t *testing.T) { }, } - lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters) + lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx)) interceptor := lh.StreamInterceptor() srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor)) defer srv.Stop() @@ -498,8 +500,6 @@ func TestStreamLimitHandler_error(t *testing.T) { client, conn := newClient(t, serverSocketPath) defer conn.Close() - ctx := testhelper.Context(t) - respChan := make(chan *grpc_testing.StreamingOutputCallResponse) go func() { stream, err := client.FullDuplexCall(ctx) @@ -600,6 +600,8 @@ func (q *queueTestServer) FullDuplexCall(stream grpc_testing.TestService_FullDup } func TestConcurrencyLimitHandlerMetrics(t *testing.T) { + ctx := testhelper.Context(t) + s := &queueTestServer{reqArrivedCh: make(chan struct{})} s.blockCh = make(chan struct{}) @@ -610,7 +612,7 @@ func TestConcurrencyLimitHandlerMetrics(t *testing.T) { }, } - lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters) + lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx)) interceptor := lh.UnaryInterceptor() srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor)) defer srv.Stop() @@ -618,8 +620,6 @@ func TestConcurrencyLimitHandlerMetrics(t *testing.T) { client, conn := newClient(t, serverSocketPath) defer conn.Close() - ctx := testhelper.Context(t) - respCh := make(chan *grpc_testing.SimpleResponse) go func() { resp, err := client.UnaryCall(ctx, &grpc_testing.SimpleRequest{}) diff --git a/internal/limiter/concurrency_limiter.go b/internal/limiter/concurrency_limiter.go index 2807b856e..32b08b32e 100644 --- a/internal/limiter/concurrency_limiter.go +++ b/internal/limiter/concurrency_limiter.go @@ -122,6 +122,9 @@ func (sem *keyedConcurrencyLimiter) queueLength() int { // ConcurrencyLimiter contains rate limiter state. type ConcurrencyLimiter struct { + // ctx stores the context at initialization. This context is used as a stopping condition + // for some internal goroutines. + ctx context.Context // maxConcurrencyLimit is the maximum number of concurrent calls to the limited function. // This limit is per key. maxConcurrencyLimit int64 @@ -145,12 +148,13 @@ type ConcurrencyLimiter struct { } // NewConcurrencyLimiter creates a new concurrency rate limiter. -func NewConcurrencyLimiter(maxConcurrencyLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter { +func NewConcurrencyLimiter(ctx context.Context, maxConcurrencyLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter { if monitor == nil { monitor = NewNoopConcurrencyMonitor() } return &ConcurrencyLimiter{ + ctx: ctx, maxConcurrencyLimit: int64(maxConcurrencyLimit), maxQueueLength: int64(maxQueueLength), maxQueuedTickerCreator: maxQueuedTickerCreator, diff --git a/internal/limiter/concurrency_limiter_test.go b/internal/limiter/concurrency_limiter_test.go index f99ca7eab..b650ae5f5 100644 --- a/internal/limiter/concurrency_limiter_test.go +++ b/internal/limiter/concurrency_limiter_test.go @@ -155,6 +155,7 @@ func TestLimiter(t *testing.T) { gauge := &counter{} limiter := NewConcurrencyLimiter( + ctx, tt.maxConcurrency, 0, nil, @@ -248,7 +249,7 @@ func TestConcurrencyLimiter_queueLimit(t *testing.T) { monitorCh := make(chan struct{}) monitor := &blockingQueueCounter{queuedCh: monitorCh} ch := make(chan struct{}) - limiter := NewConcurrencyLimiter(1, queueLimit, nil, monitor) + limiter := NewConcurrencyLimiter(ctx, 1, queueLimit, nil, monitor) // occupied with one live request that takes a long time to complete go func() { @@ -333,6 +334,7 @@ func TestLimitConcurrency_queueWaitTime(t *testing.T) { monitor := &blockingDequeueCounter{dequeuedCh: dequeuedCh} limiter := NewConcurrencyLimiter( + ctx, 1, 0, func() helper.Ticker { diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index 51d29bcdc..cf76158b8 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -161,6 +161,8 @@ func waitHealthy(tb testing.TB, ctx context.Context, addr string, authToken stri func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, deps *service.Dependencies), opts ...GitalyServerOpt) (*grpc.Server, string, bool) { tb.Helper() + ctx := testhelper.Context(tb) + var gsd gitalyServerDeps for _, opt := range opts { gsd = opt(gsd) @@ -173,7 +175,7 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d server.WithStreamInterceptor(StructErrStreamInterceptor), } - deps := gsd.createDependencies(tb, cfg) + deps := gsd.createDependencies(tb, ctx, cfg) tb.Cleanup(func() { gsd.conns.Close() }) serverFactory := server.NewGitalyServerFactory( @@ -201,7 +203,6 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d assert.NoError(tb, internalServer.Serve(internalListener), "failure to serve internal gRPC") }() - ctx := testhelper.Context(tb) waitHealthy(tb, ctx, "unix://"+internalListener.Addr().String(), cfg.Auth.Token) } @@ -238,7 +239,6 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d assert.NoError(tb, externalServer.Serve(listener), "failure to serve external gRPC") }() - ctx := testhelper.Context(tb) waitHealthy(tb, ctx, addr, cfg.Auth.Token) return externalServer, addr, gsd.disablePraefect @@ -276,7 +276,7 @@ type gitalyServerDeps struct { signingKey string } -func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) *service.Dependencies { +func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, ctx context.Context, cfg config.Cfg) *service.Dependencies { if gsd.logger == nil { gsd.logger = testhelper.NewGitalyServerLogger(tb) } @@ -328,6 +328,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * if gsd.packObjectsLimiter == nil { gsd.packObjectsLimiter = limiter.NewConcurrencyLimiter( + ctx, 0, 0, nil, @@ -336,7 +337,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * } if gsd.limitHandler == nil { - gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters) + gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters(ctx)) } if gsd.repositoryCounter == nil { |