diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-08-17 10:43:26 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-08-17 10:46:16 +0300 |
commit | d83327df2f9c35c0cc46268d068c673341d7a4dc (patch) | |
tree | fa992712f3186d662427727caa2f7fc8d6ba987b | |
parent | b3d63188f0d22232335bf973f3a236c388816942 (diff) |
Revert introduction of adaptive pack-objects limiter
In 023c2a7d0 (Merge branch 'qmnguyen0711/add-adaptive-limit-to-pack-objects-limit'
into 'master', 2023-08-09), we have introduced new code to add an
adaptive limit to pack-objects. This new limiter is seemingly creating a
memory leak that causes issues in production systems.
Revert this merge request.
Changelog: fixed
-rw-r--r-- | internal/cli/gitaly/serve.go | 10 | ||||
-rw-r--r-- | internal/gitaly/server/auth_test.go | 31 | ||||
-rw-r--r-- | internal/gitaly/service/hook/pack_objects_test.go | 3 | ||||
-rw-r--r-- | internal/grpc/middleware/limithandler/middleware.go | 144 | ||||
-rw-r--r-- | internal/grpc/middleware/limithandler/middleware_test.go | 24 | ||||
-rw-r--r-- | internal/limiter/adaptive_calculator.go | 2 | ||||
-rw-r--r-- | internal/limiter/adaptive_calculator_test.go | 10 | ||||
-rw-r--r-- | internal/limiter/adaptive_limit.go | 57 | ||||
-rw-r--r-- | internal/limiter/adaptive_limit_test.go | 124 | ||||
-rw-r--r-- | internal/limiter/concurrency_limiter.go | 99 | ||||
-rw-r--r-- | internal/limiter/concurrency_limiter_test.go | 562 | ||||
-rw-r--r-- | internal/limiter/resizable_semaphore.go | 157 | ||||
-rw-r--r-- | internal/limiter/resizable_semaphore_test.go | 262 | ||||
-rw-r--r-- | internal/testhelper/testserver/gitaly.go | 13 |
14 files changed, 161 insertions, 1337 deletions
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index 302fd880d..f8180cdd4 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -271,15 +271,10 @@ func run(cfg config.Cfg, logger logrus.FieldLogger) error { return fmt.Errorf("disk cache walkers: %w", err) } - // The pack-objects limit below is static at this stage. It's always equal to the initial limit, which uses - // MaxConcurrency config. - packObjectLimit := limiter.NewAdaptiveLimit("packObjects", limiter.AdaptiveSetting{ - Initial: cfg.PackObjectsLimiting.MaxConcurrency, - }) concurrencyLimitHandler := limithandler.New( cfg, limithandler.LimitConcurrencyByRepo, - limithandler.WithConcurrencyLimiters(ctx), + limithandler.WithConcurrencyLimiters, ) rateLimitHandler := limithandler.New( @@ -300,8 +295,7 @@ func run(cfg config.Cfg, logger logrus.FieldLogger) error { } } packObjectsLimiter := limiter.NewConcurrencyLimiter( - ctx, - packObjectLimit, + cfg.PackObjectsLimiting.MaxConcurrency, cfg.PackObjectsLimiting.MaxQueueLength, newTickerFunc, packObjectsMonitor, diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go index c845bd68f..6333b5942 100644 --- a/internal/gitaly/server/auth_test.go +++ b/internal/gitaly/server/auth_test.go @@ -1,7 +1,7 @@ package server import ( - "context" + netctx "context" "crypto/tls" "crypto/x509" "fmt" @@ -44,7 +44,7 @@ func TestMain(m *testing.M) { } func TestSanity(t *testing.T) { - serverSocketPath := runServer(t, testhelper.Context(t), testcfg.Build(t)) + serverSocketPath := runServer(t, testcfg.Build(t)) conn, err := dial(serverSocketPath, []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}) require.NoError(t, err) @@ -55,8 +55,7 @@ func TestSanity(t *testing.T) { func TestTLSSanity(t *testing.T) { cfg := testcfg.Build(t) - ctx := testhelper.Context(t) - addr := runSecureServer(t, ctx, cfg) + addr := runSecureServer(t, cfg) certPool, err := x509.SystemCertPool() require.NoError(t, err) @@ -103,7 +102,7 @@ func TestAuthFailures(t *testing.T) { Auth: auth.Config{Token: "quxbaz"}, })) - serverSocketPath := runServer(t, testhelper.Context(t), cfg) + serverSocketPath := runServer(t, cfg) connOpts := append(tc.opts, grpc.WithTransportCredentials(insecure.NewCredentials())) conn, err := dial(serverSocketPath, connOpts) require.NoError(t, err, tc.desc) @@ -146,7 +145,7 @@ func TestAuthSuccess(t *testing.T) { Auth: auth.Config{Token: tc.token, Transitioning: !tc.required}, })) - serverSocketPath := runServer(t, testhelper.Context(t), cfg) + serverSocketPath := runServer(t, cfg) connOpts := append(tc.opts, grpc.WithTransportCredentials(insecure.NewCredentials())) conn, err := dial(serverSocketPath, connOpts) require.NoError(t, err, tc.desc) @@ -159,7 +158,7 @@ func TestAuthSuccess(t *testing.T) { type brokenAuth struct{} func (brokenAuth) RequireTransportSecurity() bool { return false } -func (brokenAuth) GetRequestMetadata(context.Context, ...string) (map[string]string, error) { +func (brokenAuth) GetRequestMetadata(netctx.Context, ...string) (map[string]string, error) { return map[string]string{"authorization": "Bearer blablabla"}, nil } @@ -187,7 +186,7 @@ func newOperationClient(t *testing.T, token, serverSocketPath string) (gitalypb. return gitalypb.NewOperationServiceClient(conn), conn } -func runServer(t *testing.T, ctx context.Context, cfg config.Cfg) string { +func runServer(t *testing.T, cfg config.Cfg) string { t.Helper() registry := backchannel.NewRegistry() @@ -202,7 +201,7 @@ func runServer(t *testing.T, ctx context.Context, cfg config.Cfg) string { catfileCache := catfile.NewCache(cfg) t.Cleanup(catfileCache.Stop) diskCache := cache.New(cfg, locator) - limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters(ctx)) + limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters) updaterWithHooks := updateref.NewUpdaterWithHooks(cfg, locator, hookManager, gitCmdFactory, catfileCache) srv, err := NewGitalyServerFactory(cfg, testhelper.NewDiscardingLogEntry(t), registry, diskCache, []*limithandler.LimiterMiddleware{limitHandler}).New(false) @@ -229,7 +228,7 @@ func runServer(t *testing.T, ctx context.Context, cfg config.Cfg) string { } //go:generate openssl req -newkey rsa:4096 -new -nodes -x509 -days 3650 -out testdata/gitalycert.pem -keyout testdata/gitalykey.pem -subj "/C=US/ST=California/L=San Francisco/O=GitLab/OU=GitLab-Shell/CN=localhost" -addext "subjectAltName = IP:127.0.0.1, DNS:localhost" -func runSecureServer(t *testing.T, ctx context.Context, cfg config.Cfg) string { +func runSecureServer(t *testing.T, cfg config.Cfg) string { t.Helper() cfg.TLS = config.TLS{ @@ -245,7 +244,7 @@ func runSecureServer(t *testing.T, ctx context.Context, cfg config.Cfg) string { testhelper.NewDiscardingLogEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)), - []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters(ctx))}, + []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)}, ).New(true) require.NoError(t, err) @@ -260,12 +259,12 @@ func runSecureServer(t *testing.T, ctx context.Context, cfg config.Cfg) string { func TestUnaryNoAuth(t *testing.T) { cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{Auth: auth.Config{Token: "testtoken"}})) - ctx := testhelper.Context(t) - path := runServer(t, ctx, cfg) + path := runServer(t, cfg) conn, err := grpc.Dial(path, grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) defer testhelper.MustClose(t, conn) + ctx := testhelper.Context(t) client := gitalypb.NewRepositoryServiceClient(conn) _, err = client.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{ @@ -281,12 +280,12 @@ func TestUnaryNoAuth(t *testing.T) { func TestStreamingNoAuth(t *testing.T) { cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{Auth: auth.Config{Token: "testtoken"}})) - ctx := testhelper.Context(t) - path := runServer(t, ctx, cfg) + path := runServer(t, cfg) conn, err := dial(path, []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}) require.NoError(t, err) t.Cleanup(func() { conn.Close() }) + ctx := testhelper.Context(t) client := gitalypb.NewRepositoryServiceClient(conn) stream, err := client.GetInfoAttributes(ctx, &gitalypb.GetInfoAttributesRequest{ @@ -331,7 +330,7 @@ func TestAuthBeforeLimit(t *testing.T) { t.Cleanup(cleanup) cfg.Gitlab.URL = gitlabURL - serverSocketPath := runServer(t, ctx, cfg) + serverSocketPath := runServer(t, cfg) client, conn := newOperationClient(t, cfg.Auth.Token, serverSocketPath) t.Cleanup(func() { conn.Close() }) diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go index 62fa0dca8..4d4d1a500 100644 --- a/internal/gitaly/service/hook/pack_objects_test.go +++ b/internal/gitaly/service/hook/pack_objects_test.go @@ -858,8 +858,7 @@ func TestPackObjects_concurrencyLimit(t *testing.T) { cfg.Prometheus.GRPCLatencyBuckets, ) limiter := limiter.NewConcurrencyLimiter( - ctx, - limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 1}), + 1, 0, func() helper.Ticker { return ticker }, monitor, diff --git a/internal/grpc/middleware/limithandler/middleware.go b/internal/grpc/middleware/limithandler/middleware.go index a6ed93f9a..0c41a4672 100644 --- a/internal/grpc/middleware/limithandler/middleware.go +++ b/internal/grpc/middleware/limithandler/middleware.go @@ -165,88 +165,84 @@ func (w *wrappedStream) RecvMsg(m interface{}) error { // WithConcurrencyLimiters sets up middleware to limit the concurrency of // requests based on RPC and repository -func WithConcurrencyLimiters(ctx context.Context) SetupFunc { - return func(cfg config.Cfg, middleware *LimiterMiddleware) { - acquiringSecondsMetric := prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "gitaly", - Subsystem: "concurrency_limiting", - Name: "acquiring_seconds", - Help: "Histogram of time calls are rate limited (in seconds)", - Buckets: cfg.Prometheus.GRPCLatencyBuckets, - }, - []string{"system", "grpc_service", "grpc_method"}, - ) - inProgressMetric := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "gitaly", - Subsystem: "concurrency_limiting", - Name: "in_progress", - Help: "Gauge of number of concurrent in-progress calls", - }, - []string{"system", "grpc_service", "grpc_method"}, - ) - queuedMetric := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "gitaly", - Subsystem: "concurrency_limiting", - Name: "queued", - Help: "Gauge of number of queued calls", - }, - []string{"system", "grpc_service", "grpc_method"}, - ) - - middleware.collect = func(metrics chan<- prometheus.Metric) { - acquiringSecondsMetric.Collect(metrics) - inProgressMetric.Collect(metrics) - queuedMetric.Collect(metrics) - } +func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) { + acquiringSecondsMetric := prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "gitaly", + Subsystem: "concurrency_limiting", + Name: "acquiring_seconds", + Help: "Histogram of time calls are rate limited (in seconds)", + Buckets: cfg.Prometheus.GRPCLatencyBuckets, + }, + []string{"system", "grpc_service", "grpc_method"}, + ) + inProgressMetric := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "gitaly", + Subsystem: "concurrency_limiting", + Name: "in_progress", + Help: "Gauge of number of concurrent in-progress calls", + }, + []string{"system", "grpc_service", "grpc_method"}, + ) + queuedMetric := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "gitaly", + Subsystem: "concurrency_limiting", + Name: "queued", + Help: "Gauge of number of queued calls", + }, + []string{"system", "grpc_service", "grpc_method"}, + ) + + middleware.collect = func(metrics chan<- prometheus.Metric) { + acquiringSecondsMetric.Collect(metrics) + inProgressMetric.Collect(metrics) + queuedMetric.Collect(metrics) + } - result := make(map[string]limiter.Limiter) - for _, limit := range cfg.Concurrency { - limit := limit + result := make(map[string]limiter.Limiter) + for _, limit := range cfg.Concurrency { + limit := limit - newTickerFunc := func() helper.Ticker { - return helper.NewManualTicker() - } + newTickerFunc := func() helper.Ticker { + return helper.NewManualTicker() + } - if limit.MaxQueueWait > 0 { - newTickerFunc = func() helper.Ticker { - return helper.NewTimerTicker(limit.MaxQueueWait.Duration()) - } + if limit.MaxQueueWait > 0 { + newTickerFunc = func() helper.Ticker { + return helper.NewTimerTicker(limit.MaxQueueWait.Duration()) } - - result[limit.RPC] = limiter.NewConcurrencyLimiter( - ctx, - limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: limit.MaxPerRepo}), - limit.MaxQueueSize, - newTickerFunc, - limiter.NewPerRPCPromMonitor( - "gitaly", limit.RPC, - queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, - ), - ) } - // Set default for ReplicateRepository. - replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository" - if _, ok := result[replicateRepositoryFullMethod]; !ok { - result[replicateRepositoryFullMethod] = limiter.NewConcurrencyLimiter( - ctx, - limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 1}), - 0, - func() helper.Ticker { - return helper.NewManualTicker() - }, - limiter.NewPerRPCPromMonitor( - "gitaly", replicateRepositoryFullMethod, - queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, - ), - ) - } + result[limit.RPC] = limiter.NewConcurrencyLimiter( + limit.MaxPerRepo, + limit.MaxQueueSize, + newTickerFunc, + limiter.NewPerRPCPromMonitor( + "gitaly", limit.RPC, + queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, + ), + ) + } - middleware.methodLimiters = result + // Set default for ReplicateRepository. + replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository" + if _, ok := result[replicateRepositoryFullMethod]; !ok { + result[replicateRepositoryFullMethod] = limiter.NewConcurrencyLimiter( + 1, + 0, + func() helper.Ticker { + return helper.NewManualTicker() + }, + limiter.NewPerRPCPromMonitor( + "gitaly", replicateRepositoryFullMethod, + queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, + ), + ) } + + middleware.methodLimiters = result } // WithRateLimiters sets up a middleware with limiters that limit requests diff --git a/internal/grpc/middleware/limithandler/middleware_test.go b/internal/grpc/middleware/limithandler/middleware_test.go index 40978be69..65adad2be 100644 --- a/internal/grpc/middleware/limithandler/middleware_test.go +++ b/internal/grpc/middleware/limithandler/middleware_test.go @@ -38,7 +38,6 @@ func fixedLockKey(ctx context.Context) string { func TestUnaryLimitHandler(t *testing.T) { t.Parallel() - ctx := testhelper.Context(t) s := &queueTestServer{ server: server{ blockCh: make(chan struct{}), @@ -52,13 +51,14 @@ func TestUnaryLimitHandler(t *testing.T) { }, } - lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx)) + lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters) interceptor := lh.UnaryInterceptor() srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor)) defer srv.Stop() client, conn := newClient(t, serverSocketPath) defer conn.Close() + ctx := testhelper.Context(t) var wg sync.WaitGroup defer wg.Wait() @@ -114,7 +114,7 @@ func TestUnaryLimitHandler_queueing(t *testing.T) { MaxQueueWait: duration.Duration(time.Millisecond), }, }, - }, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx)) + }, fixedLockKey, limithandler.WithConcurrencyLimiters) s := &queueTestServer{ server: server{ @@ -175,7 +175,7 @@ func TestUnaryLimitHandler_queueing(t *testing.T) { MaxPerRepo: 1, }, }, - }, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx)) + }, fixedLockKey, limithandler.WithConcurrencyLimiters) s := &queueTestServer{ server: server{ @@ -427,7 +427,6 @@ func TestStreamLimitHandler(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { t.Parallel() - ctx := testhelper.Context(t) s := &server{blockCh: make(chan struct{})} maxQueueSize := 1 @@ -441,13 +440,14 @@ func TestStreamLimitHandler(t *testing.T) { }, } - lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx)) + lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters) interceptor := lh.StreamInterceptor() srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor)) defer srv.Stop() client, conn := newClient(t, serverSocketPath) defer conn.Close() + ctx := testhelper.Context(t) totalCalls := 10 @@ -481,8 +481,6 @@ func TestStreamLimitHandler(t *testing.T) { func TestStreamLimitHandler_error(t *testing.T) { t.Parallel() - ctx := testhelper.Context(t) - s := &queueTestServer{reqArrivedCh: make(chan struct{})} s.blockCh = make(chan struct{}) @@ -492,7 +490,7 @@ func TestStreamLimitHandler_error(t *testing.T) { }, } - lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx)) + lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters) interceptor := lh.StreamInterceptor() srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor)) defer srv.Stop() @@ -500,6 +498,8 @@ func TestStreamLimitHandler_error(t *testing.T) { client, conn := newClient(t, serverSocketPath) defer conn.Close() + ctx := testhelper.Context(t) + respChan := make(chan *grpc_testing.StreamingOutputCallResponse) go func() { stream, err := client.FullDuplexCall(ctx) @@ -600,8 +600,6 @@ func (q *queueTestServer) FullDuplexCall(stream grpc_testing.TestService_FullDup } func TestConcurrencyLimitHandlerMetrics(t *testing.T) { - ctx := testhelper.Context(t) - s := &queueTestServer{reqArrivedCh: make(chan struct{})} s.blockCh = make(chan struct{}) @@ -612,7 +610,7 @@ func TestConcurrencyLimitHandlerMetrics(t *testing.T) { }, } - lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx)) + lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters) interceptor := lh.UnaryInterceptor() srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor)) defer srv.Stop() @@ -620,6 +618,8 @@ func TestConcurrencyLimitHandlerMetrics(t *testing.T) { client, conn := newClient(t, serverSocketPath) defer conn.Close() + ctx := testhelper.Context(t) + respCh := make(chan *grpc_testing.SimpleResponse) go func() { resp, err := client.UnaryCall(ctx, &grpc_testing.SimpleRequest{}) diff --git a/internal/limiter/adaptive_calculator.go b/internal/limiter/adaptive_calculator.go index 793fc853c..15d7cf87b 100644 --- a/internal/limiter/adaptive_calculator.go +++ b/internal/limiter/adaptive_calculator.go @@ -262,7 +262,7 @@ func (c *AdaptiveCalculator) calibrateLimits(ctx context.Context) { }).Debugf("Additive increase") } else { // Multiplicative decrease - newLimit = int(math.Floor(float64(limit.Current()) * setting.BackoffFactor)) + newLimit = int(math.Floor(float64(limit.Current()) * setting.BackoffBackoff)) if newLimit < setting.Min { newLimit = setting.Min } diff --git a/internal/limiter/adaptive_calculator_test.go b/internal/limiter/adaptive_calculator_test.go index 5c2a5dae7..f4461d27d 100644 --- a/internal/limiter/adaptive_calculator_test.go +++ b/internal/limiter/adaptive_calculator_test.go @@ -659,14 +659,12 @@ func (l *testLimit) Update(val int) { l.currents = append(l.currents, val) } -func (*testLimit) AfterUpdate(_ AfterUpdateHook) {} - func (l *testLimit) Setting() AdaptiveSetting { return AdaptiveSetting{ - Initial: l.initial, - Max: l.max, - Min: l.min, - BackoffFactor: l.backoffBackoff, + Initial: l.initial, + Max: l.max, + Min: l.min, + BackoffBackoff: l.backoffBackoff, } } diff --git a/internal/limiter/adaptive_limit.go b/internal/limiter/adaptive_limit.go index 5af9f8ebd..2aefe5eb0 100644 --- a/internal/limiter/adaptive_limit.go +++ b/internal/limiter/adaptive_limit.go @@ -1,49 +1,30 @@ package limiter -import ( - "sync" -) +import "sync/atomic" // AdaptiveSetting is a struct that holds the configuration parameters for an adaptive limiter. type AdaptiveSetting struct { - Initial int - Max int - Min int - BackoffFactor float64 + Initial int + Max int + Min int + BackoffBackoff float64 } -// AfterUpdateHook is a function hook that is triggered when the current value changes. The callers need to register a hook to -// the AdaptiveLimiter implementation beforehand. They are required to handle errors inside the hook function. -type AfterUpdateHook func(newVal int) - // AdaptiveLimiter is an interface for managing and updating adaptive limits. // It exposes methods to get the name, current limit value, update the limit value, and access its settings. type AdaptiveLimiter interface { Name() string Current() int Update(val int) - AfterUpdate(AfterUpdateHook) Setting() AdaptiveSetting } // AdaptiveLimit is an implementation of the AdaptiveLimiter interface. It uses an atomic Int32 to represent the current // limit value, ensuring thread-safe updates. type AdaptiveLimit struct { - sync.RWMutex - - name string - current int - setting AdaptiveSetting - updateHooks []AfterUpdateHook -} - -// NewAdaptiveLimit initializes a new AdaptiveLimit object -func NewAdaptiveLimit(name string, setting AdaptiveSetting) *AdaptiveLimit { - return &AdaptiveLimit{ - name: name, - current: setting.Initial, - setting: setting, - } + name string + current atomic.Int32 + setting AdaptiveSetting } // Name returns the name of the adaptive limit @@ -53,30 +34,12 @@ func (l *AdaptiveLimit) Name() string { // Current returns the current limit. This function can be called without the need for synchronization. func (l *AdaptiveLimit) Current() int { - l.RLock() - defer l.RUnlock() - - return l.current + return int(l.current.Load()) } // Update adjusts current limit value. func (l *AdaptiveLimit) Update(val int) { - l.Lock() - defer l.Unlock() - - if val != l.current { - l.current = val - for _, hook := range l.updateHooks { - hook(val) - } - } -} - -// AfterUpdate registers a callback when the current limit is updated. Because all updates and hooks are synchronized, -// calling Current() inside the update hook in the same goroutine will cause deadlock. Hence, the update hook must -// use the new value passed as the argument instead. -func (l *AdaptiveLimit) AfterUpdate(hook AfterUpdateHook) { - l.updateHooks = append(l.updateHooks, hook) + l.current.Store(int32(val)) } // Setting returns the configuration parameters for an adaptive limiter. diff --git a/internal/limiter/adaptive_limit_test.go b/internal/limiter/adaptive_limit_test.go deleted file mode 100644 index f6c67f092..000000000 --- a/internal/limiter/adaptive_limit_test.go +++ /dev/null @@ -1,124 +0,0 @@ -package limiter - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestAdaptiveLimit_New(t *testing.T) { - t.Parallel() - - setting := AdaptiveSetting{ - Initial: 5, - Max: 10, - Min: 1, - BackoffFactor: 0.5, - } - - limit := NewAdaptiveLimit("testLimit", setting) - require.Equal(t, limit.Name(), "testLimit") - require.Equal(t, limit.Current(), 5) - require.Equal(t, limit.Setting(), setting) -} - -func TestAdaptiveLimit_Update(t *testing.T) { - t.Parallel() - - newLimit := func() *AdaptiveLimit { - return NewAdaptiveLimit("testLimit", AdaptiveSetting{ - Initial: 5, - Max: 10, - Min: 1, - BackoffFactor: 0.5, - }) - } - - t.Run("without update hooks", func(t *testing.T) { - limit := newLimit() - - limit.Update(1) - require.Equal(t, 1, limit.Current()) - - limit.Update(2) - require.Equal(t, 2, limit.Current()) - - limit.Update(3) - require.Equal(t, 3, limit.Current()) - }) - - t.Run("new values are different from old values", func(t *testing.T) { - limit := newLimit() - - vals := []int{} - limit.AfterUpdate(func(val int) { - vals = append(vals, val) - }) - - limit.Update(1) - require.Equal(t, 1, limit.Current()) - require.Equal(t, vals, []int{1}) - - limit.Update(2) - require.Equal(t, 2, limit.Current()) - require.Equal(t, vals, []int{1, 2}) - - limit.Update(3) - require.Equal(t, 3, limit.Current()) - require.Equal(t, vals, []int{1, 2, 3}) - }) - - t.Run("new values are the same as old values", func(t *testing.T) { - limit := newLimit() - - vals := []int{} - limit.AfterUpdate(func(val int) { - vals = append(vals, val) - }) - - limit.Update(1) - require.Equal(t, 1, limit.Current()) - require.Equal(t, vals, []int{1}) - - limit.Update(1) - require.Equal(t, 1, limit.Current()) - require.Equal(t, vals, []int{1}) - - limit.Update(2) - require.Equal(t, 2, limit.Current()) - require.Equal(t, vals, []int{1, 2}) - - limit.Update(2) - require.Equal(t, 2, limit.Current()) - require.Equal(t, vals, []int{1, 2}) - }) - - t.Run("multiple update hooks", func(t *testing.T) { - limit := newLimit() - - vals1 := []int{} - limit.AfterUpdate(func(val int) { - vals1 = append(vals1, val) - }) - - vals2 := []int{} - limit.AfterUpdate(func(val int) { - vals2 = append(vals2, val*2) - }) - - limit.Update(1) - require.Equal(t, 1, limit.Current()) - require.Equal(t, vals1, []int{1}) - require.Equal(t, vals2, []int{2}) - - limit.Update(2) - require.Equal(t, 2, limit.Current()) - require.Equal(t, vals1, []int{1, 2}) - require.Equal(t, vals2, []int{2, 4}) - - limit.Update(3) - require.Equal(t, 3, limit.Current()) - require.Equal(t, vals1, []int{1, 2, 3}) - require.Equal(t, vals2, []int{2, 4, 6}) - }) -} diff --git a/internal/limiter/concurrency_limiter.go b/internal/limiter/concurrency_limiter.go index b4284adeb..2807b856e 100644 --- a/internal/limiter/concurrency_limiter.go +++ b/internal/limiter/concurrency_limiter.go @@ -41,10 +41,10 @@ type keyedConcurrencyLimiter struct { // concurrencyTokens is the channel of available concurrency tokens, where every token // allows one concurrent call to the concurrency-limited function. - concurrencyTokens *resizableSemaphore + concurrencyTokens chan struct{} // queueTokens is the channel of available queue tokens, where every token allows one // concurrent call to be admitted to the queue. - queueTokens *resizableSemaphore + queueTokens chan struct{} } // acquire tries to acquire the semaphore. It may fail if the admission queue is full or if the max @@ -55,33 +55,34 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey str // callers may wait for the concurrency token at the same time. If there are no more // queueing tokens then this indicates that the queue is full and we thus return an // error immediately. - if !sem.queueTokens.TryAcquire() { + select { + case sem.queueTokens <- struct{}{}: + // We have acquired a queueing token, so we need to release it if acquiring + // the concurrency token fails. If we succeed to acquire the concurrency + // token though then we retain the queueing token until the caller signals + // that the concurrency-limited function has finished. As a consequence the + // queue token is returned together with the concurrency token. + // + // A simpler model would be to just have `maxQueueLength` many queueing + // tokens. But this would add concurrency-limiting when acquiring the queue + // token itself, which is not what we want to do. Instead, we want to admit + // as many callers into the queue as the queue length permits plus the + // number of available concurrency tokens allows. + defer func() { + if returnedErr != nil { + <-sem.queueTokens + } + }() + default: return ErrMaxQueueSize } - - // We have acquired a queueing token, so we need to release it if acquiring - // the concurrency token fails. If we successfully acquire the concurrency - // token though then we retain the queueing token until the caller signals - // that the concurrency-limited function has finished. As a consequence the - // queue token is returned together with the concurrency token. - // - // A simpler model would be to just have `maxQueueLength` many queueing - // tokens. But this would add concurrency-limiting when acquiring the queue - // token itself, which is not what we want to do. Instead, we want to admit - // as many callers into the queue as the queue length permits plus the - // number of available concurrency tokens allows. - defer func() { - if returnedErr != nil { - sem.queueTokens.Release() - } - }() } // We are queued now, so let's tell the monitor. Furthermore, even though we're still // holding the queueing token when this function exits successfully we also tell the monitor // that we have exited the queue. It is only an implementation detail anyway that we hold on // to the token, so the monitor shouldn't care about that. - sem.monitor.Queued(ctx, limitingKey, sem.queueLength()) + sem.monitor.Queued(ctx, limitingKey, len(sem.queueTokens)) defer sem.monitor.Dequeued(ctx) // Set up the ticker that keeps us from waiting indefinitely on the concurrency token. @@ -97,12 +98,7 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey str // Try to acquire the concurrency token now that we're in the queue. select { - case acquired := <-sem.concurrencyTokens.Acquire(): - // When the semaphore returns false, the semaphore was stopped. It's likely due to the context is - // cancelled. Hence, we should return the error here. - if !acquired { - return sem.concurrencyTokens.Err() - } + case sem.concurrencyTokens <- struct{}{}: return nil case <-ticker.C(): return ErrMaxQueueTime @@ -114,27 +110,21 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey str // release releases the acquired tokens. func (sem *keyedConcurrencyLimiter) release() { if sem.queueTokens != nil { - sem.queueTokens.Release() + <-sem.queueTokens } - sem.concurrencyTokens.Release() + <-sem.concurrencyTokens } // queueLength returns the length of token queue func (sem *keyedConcurrencyLimiter) queueLength() int { - if sem.queueTokens == nil { - return 0 - } - return int(sem.queueTokens.Current()) + return len(sem.queueTokens) } // ConcurrencyLimiter contains rate limiter state. type ConcurrencyLimiter struct { - // ctx stores the context at initialization. This context is used as a stopping condition - // for some internal goroutines. - ctx context.Context - // limit is the adaptive maximum number of concurrent calls to the limited function. This limit is - // calculated adaptively from an outside calculator. - limit *AdaptiveLimit + // maxConcurrencyLimit is the maximum number of concurrent calls to the limited function. + // This limit is per key. + maxConcurrencyLimit int64 // maxQueueLength is the maximum number of operations allowed to wait in a queued state. // This limit is global and applies before the concurrency limit. Subsequent incoming // operations will be rejected with an error immediately. @@ -155,31 +145,18 @@ type ConcurrencyLimiter struct { } // NewConcurrencyLimiter creates a new concurrency rate limiter. -func NewConcurrencyLimiter(ctx context.Context, limit *AdaptiveLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter { +func NewConcurrencyLimiter(maxConcurrencyLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter { if monitor == nil { monitor = NewNoopConcurrencyMonitor() } - limiter := &ConcurrencyLimiter{ - ctx: ctx, - limit: limit, + return &ConcurrencyLimiter{ + maxConcurrencyLimit: int64(maxConcurrencyLimit), maxQueueLength: int64(maxQueueLength), maxQueuedTickerCreator: maxQueuedTickerCreator, monitor: monitor, limitsByKey: make(map[string]*keyedConcurrencyLimiter), } - - // When the capacity of the limiter is updated we also need to update the size of both the queuing tokens as - // well as the concurrency tokens to match the new size. - limit.AfterUpdate(func(val int) { - for _, keyedLimiter := range limiter.limitsByKey { - if keyedLimiter.queueTokens != nil { - keyedLimiter.queueTokens.Resize(int64(val) + limiter.maxQueueLength) - } - keyedLimiter.concurrencyTokens.Resize(int64(val)) - } - }) - return limiter } // Limit will limit the concurrency of the limited function f. There are two distinct mechanisms @@ -199,7 +176,7 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f Li ) defer span.Finish() - if c.currentLimit() <= 0 { + if c.maxConcurrencyLimit <= 0 { return f() } @@ -245,15 +222,15 @@ func (c *ConcurrencyLimiter) getConcurrencyLimit(limitingKey string) *keyedConcu // Set up the queue tokens in case a maximum queue length was requested. As the // queue tokens are kept during the whole lifetime of the concurrency-limited // function we add the concurrency tokens to the number of available token. - var queueTokens *resizableSemaphore + var queueTokens chan struct{} if c.maxQueueLength > 0 { - queueTokens = NewResizableSemaphore(c.ctx, c.currentLimit()+c.maxQueueLength) + queueTokens = make(chan struct{}, c.maxConcurrencyLimit+c.maxQueueLength) } c.limitsByKey[limitingKey] = &keyedConcurrencyLimiter{ monitor: c.monitor, maxQueuedTickerCreator: c.maxQueuedTickerCreator, - concurrencyTokens: NewResizableSemaphore(c.ctx, c.currentLimit()), + concurrencyTokens: make(chan struct{}, c.maxConcurrencyLimit), queueTokens: queueTokens, } } @@ -291,7 +268,3 @@ func (c *ConcurrencyLimiter) countSemaphores() int { return len(c.limitsByKey) } - -func (c *ConcurrencyLimiter) currentLimit() int64 { - return int64(c.limit.Current()) -} diff --git a/internal/limiter/concurrency_limiter_test.go b/internal/limiter/concurrency_limiter_test.go index 6ba48d243..f99ca7eab 100644 --- a/internal/limiter/concurrency_limiter_test.go +++ b/internal/limiter/concurrency_limiter_test.go @@ -3,7 +3,6 @@ package limiter import ( "context" "errors" - "fmt" "strconv" "sync" "testing" @@ -87,7 +86,7 @@ func (c *counter) Dropped(_ context.Context, _ string, _ int, _ time.Duration, r } } -func TestLimiter_static(t *testing.T) { +func TestLimiter(t *testing.T) { t.Parallel() tests := []struct { @@ -156,8 +155,7 @@ func TestLimiter_static(t *testing.T) { gauge := &counter{} limiter := NewConcurrencyLimiter( - ctx, - NewAdaptiveLimit("staticLimit", AdaptiveSetting{Initial: tt.maxConcurrency}), + tt.maxConcurrency, 0, nil, gauge, @@ -231,557 +229,6 @@ func TestLimiter_static(t *testing.T) { } } -func TestLimiter_dynamic(t *testing.T) { - t.Parallel() - - t.Run("increase dynamic limit when there is no queuing request", func(t *testing.T) { - ctx := testhelper.Context(t) - limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1}) - gauge := &blockingQueueCounter{queuedCh: make(chan struct{})} - limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge) - - // 5 requests acquired the tokens, the limiter is full now - release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5) - require.Equal(t, 5, gauge.enter) - - // Update the limit to 7 - limit.Update(7) - - // 2 more requests acquired the token. This proves the limit is expanded - release2, waitAfterRelease2 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 2) - require.Equal(t, 7, gauge.enter) - - close(release1) - close(release2) - waitAfterRelease1() - waitAfterRelease2() - require.Equal(t, 7, gauge.exit) - }) - - t.Run("decrease dynamic limit when there is no queuing request", func(t *testing.T) { - ctx := testhelper.Context(t) - limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1}) - gauge := &blockingQueueCounter{queuedCh: make(chan struct{})} - limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge) - - // 3 requests acquired the tokens, 2 slots left - release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 3) - require.Equal(t, 3, gauge.enter) - require.Equal(t, 3, gauge.queued) - - // Update the limit to 3 - limit.Update(3) - - // 2 requests are put in queue, meaning the limit shrinks down - waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 2) - require.Equal(t, 3, gauge.enter) - require.Equal(t, 5, gauge.queued) - - // Release first 3 requests - close(release1) - waitAfterRelease1() - - // Now the last 2 requests can acquire token - waitAcquired2() - require.Equal(t, 5, gauge.enter) - require.Equal(t, 5, gauge.queued) - require.Equal(t, 3, gauge.exit) - - // Release the last patch - close(release2) - waitAfterRelease2() - require.Equal(t, 5, gauge.exit) - }) - - t.Run("increase dynamic limit more than the number of queuing requests", func(t *testing.T) { - ctx := testhelper.Context(t) - limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1}) - gauge := &blockingQueueCounter{queuedCh: make(chan struct{})} - limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge) - - // 5 requests acquired the tokens, the limiter is full now - release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5) - require.Equal(t, 5, gauge.enter) - - // 2 requests waiting in the queue - waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 2) - require.Equal(t, 5, gauge.enter) - require.Equal(t, 7, gauge.queued) - - // Update the limit to 7 - limit.Update(7) - - // Wait for the other 2 requests acquired the token. This proves the limiter is expanded - waitAcquired2() - require.Equal(t, 7, gauge.enter) - - close(release1) - close(release2) - waitAfterRelease1() - waitAfterRelease2() - require.Equal(t, 7, gauge.exit) - }) - - t.Run("increase dynamic limit less than the number of queuing requests", func(t *testing.T) { - ctx := testhelper.Context(t) - limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1}) - gauge := &blockingQueueCounter{queuedCh: make(chan struct{})} - limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge) - - // 5 requests acquired the tokens, the limiter is full now - release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5) - require.Equal(t, 5, gauge.enter) - - // 2 requests waiting in the queue - waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 2) - require.Equal(t, 5, gauge.enter) - require.Equal(t, 7, gauge.queued) - - // 5 more requests waiting in the queue - waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 5) - require.Equal(t, 5, gauge.enter) - require.Equal(t, 12, gauge.queued) - - // Update the limit to 7. - limit.Update(7) - - // Release first 5 requests, all requests should fit in the queue now. - close(release1) - waitAfterRelease1() - require.Equal(t, 5, gauge.exit) - - waitAcquired2() - waitAcquired3() - require.Equal(t, 12, gauge.enter) - - // Now release all requests - close(release2) - close(release3) - waitAfterRelease2() - waitAfterRelease3() - require.Equal(t, 12, gauge.exit) - }) - - t.Run("decrease dynamic limit less than the number of concurrent requests", func(t *testing.T) { - ctx := testhelper.Context(t) - limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1}) - gauge := &blockingQueueCounter{queuedCh: make(chan struct{})} - limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge) - - // 5 requests acquired the tokens, the limiter is full now - release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5) - require.Equal(t, 5, gauge.enter) - - // Update the limit to 3 - limit.Update(3) - - // 3 requests are put in queue - waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 3) - require.Equal(t, 5, gauge.enter) - require.Equal(t, 8, gauge.queued) - - // Release the first 5 requests - close(release1) - waitAfterRelease1() - require.Equal(t, 5, gauge.exit) - - // Now the last 3 requests acquire the tokens - waitAcquired2() - require.Equal(t, 8, gauge.enter) - - // 1 more request is put in queue, meaning the limit shrinks down to 3. - waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 1) - require.Equal(t, 8, gauge.enter) - require.Equal(t, 9, gauge.queued) - - // Release the second 3 requests - close(release2) - waitAfterRelease2() - require.Equal(t, 8, gauge.exit) - - // The last request acquires the token - waitAcquired3() - require.Equal(t, 9, gauge.enter) - - // Release the last request - close(release3) - waitAfterRelease3() - require.Equal(t, 9, gauge.exit) - }) - - t.Run("increase and decrease dynamic limit multiple times", func(t *testing.T) { - ctx := testhelper.Context(t) - limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1}) - gauge := &blockingQueueCounter{queuedCh: make(chan struct{})} - limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge) - - // Update the limit to 7 - limit.Update(7) - - // 5 requests acquired the tokens - release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5) - require.Equal(t, 5, gauge.enter) - - // Update the limit to 3 - limit.Update(3) - - // 3 requests are put in queue - waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 3) - require.Equal(t, 5, gauge.enter) - require.Equal(t, 8, gauge.queued) - - // Update the limit to 10 - limit.Update(10) - - // All existing requests acquire the tokens - waitAcquired2() - require.Equal(t, 8, gauge.enter) - - // 2 more requests - release3, waitAfterRelease3 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 2) - require.Equal(t, 10, gauge.enter) - - // Update the limit to 1 - limit.Update(1) - - // Now release all of them - close(release1) - waitAfterRelease1() - require.Equal(t, 5, gauge.exit) - - close(release2) - waitAfterRelease2() - require.Equal(t, 8, gauge.exit) - - close(release3) - waitAfterRelease3() - require.Equal(t, 10, gauge.exit) - }) - - t.Run("increase the limit when the queue is full", func(t *testing.T) { - ctx := testhelper.Context(t) - limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 1, Max: 10, Min: 1}) - gauge := &blockingQueueCounter{queuedCh: make(chan struct{})} - // Mind the queue length here - limiter := NewConcurrencyLimiter(ctx, limit, 5, nil, gauge) - - // 1 requests acquired the tokens, the limiter is full now - release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 1) - require.Equal(t, 1, gauge.enter) - require.Equal(t, 1, gauge.queued) - - // 5 requests queuing for the tokens, the queue is full now - waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 5) - require.Equal(t, 1, gauge.enter) - require.Equal(t, 6, gauge.queued) - - // Limiter rejects new request - maximumQueueSizeReached(t, ctx, "1", limiter) - - // Update the limit - limit.Update(6) - waitAcquired2() - require.Equal(t, 6, gauge.enter) - - // 5 requests queuing for the tokens, the queue is full now - waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 5) - require.Equal(t, 6, gauge.enter) - require.Equal(t, 11, gauge.queued) - - // Limiter rejects new request - maximumQueueSizeReached(t, ctx, "1", limiter) - - // Clean up - close(release1) - close(release2) - waitAfterRelease1() - waitAfterRelease2() - require.Equal(t, 6, gauge.exit) - - waitAcquired3() - require.Equal(t, 11, gauge.enter) - close(release3) - waitAfterRelease3() - }) - - t.Run("decrease the limit when the queue is full", func(t *testing.T) { - ctx := testhelper.Context(t) - limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1}) - gauge := &blockingQueueCounter{queuedCh: make(chan struct{})} - // Mind the queue length here - limiter := NewConcurrencyLimiter(ctx, limit, 3, nil, gauge) - - // 5 requests acquired the tokens, the limiter is full now - release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5) - require.Equal(t, 5, gauge.enter) - require.Equal(t, 5, gauge.queued) - - // 5 requests queuing for the tokens, the queue is full now - waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 3) - require.Equal(t, 5, gauge.enter) - require.Equal(t, 8, gauge.queued) - - // Limiter rejects new request - maximumQueueSizeReached(t, ctx, "1", limiter) - - // Update the limit. - limit.Update(3) - - // The queue is still full - maximumQueueSizeReached(t, ctx, "1", limiter) - - // Release first 5 requests and let the last 3 requests in - close(release1) - waitAfterRelease1() - require.Equal(t, 5, gauge.exit) - waitAcquired2() - require.Equal(t, 8, gauge.enter) - - // Another 5 requests in queue. The queue is still full, meaning the concurrency is 3 and the queue is still 5. - waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 3) - require.Equal(t, 8, gauge.enter) - require.Equal(t, 11, gauge.queued) - maximumQueueSizeReached(t, ctx, "1", limiter) - - // Clean up - close(release2) - waitAfterRelease2() - require.Equal(t, 8, gauge.exit) - waitAcquired3() - require.Equal(t, 11, gauge.enter) - close(release3) - waitAfterRelease3() - require.Equal(t, 11, gauge.exit) - }) - - t.Run("dynamic limit works without queuing", func(t *testing.T) { - ctx := testhelper.Context(t) - limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1}) - gauge := &blockingQueueCounter{queuedCh: make(chan struct{})} - // No queue, it means the limiter accepts unlimited requests - limiter := NewConcurrencyLimiter(ctx, limit, 0, nil, gauge) - - // 5 requests acquired the tokens, the limiter is full now - release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5) - require.Equal(t, 5, gauge.enter) - require.Equal(t, 5, gauge.queued) - - // 5 more requests - waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 5) - - // Update the limit. - limit.Update(10) - - // All of them acquired the tokens - waitAcquired2() - require.Equal(t, 10, gauge.enter) - - // Clean up - close(release1) - close(release2) - waitAfterRelease1() - waitAfterRelease2() - require.Equal(t, 10, gauge.exit) - }) - - t.Run("dynamic limit works with queue timer", func(t *testing.T) { - ctx := testhelper.Context(t) - limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1}) - gauge := &blockingQueueCounter{queuedCh: make(chan struct{})} - - ticker := helper.NewManualTicker() - limiter := NewConcurrencyLimiter(ctx, limit, 0, func() helper.Ticker { return ticker }, gauge) - - // 5 requests acquired the tokens, the limiter is full now - release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5) - require.Equal(t, 5, gauge.enter) - require.Equal(t, 5, gauge.queued) - - errors := make(chan error, 10) - // 5 requests in queue - spawnQueuedAndCollectErrors(ctx, "1", limiter, gauge, 5, errors) - - // Decrease the limit - limit.Update(3) - - // 5 more requests in queue - spawnQueuedAndCollectErrors(ctx, "1", limiter, gauge, 5, errors) - - // Trigger timeout event - for i := 0; i < 10; i++ { - ticker.Tick() - require.EqualError(t, <-errors, "maximum time in concurrency queue reached") - } - - // Other goroutines exit as normal - close(release1) - waitAfterRelease1() - require.Equal(t, 5, gauge.exit) - }) - - t.Run("dynamic limit works with context cancellation", func(t *testing.T) { - ctx := testhelper.Context(t) - ctx2, cancel := context.WithCancel(testhelper.Context(t)) - - limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1}) - gauge := &blockingQueueCounter{queuedCh: make(chan struct{})} - - limiter := NewConcurrencyLimiter(ctx, limit, 0, nil, gauge) - - // 5 requests acquired the tokens, the limiter is full now - release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5) - require.Equal(t, 5, gauge.enter) - require.Equal(t, 5, gauge.queued) - - errors := make(chan error, 10) - // 5 requests in queue - spawnQueuedAndCollectErrors(ctx2, "1", limiter, gauge, 5, errors) - - // Decrease the limit - limit.Update(3) - - // 5 more requests in queue - spawnQueuedAndCollectErrors(ctx2, "1", limiter, gauge, 5, errors) - - // Trigger context cancellation - cancel() - for i := 0; i < 10; i++ { - require.EqualError(t, <-errors, "unexpected error when dequeueing request: context canceled") - } - - // Other goroutines exit as normal - close(release1) - waitAfterRelease1() - require.Equal(t, 5, gauge.exit) - }) - - t.Run("dynamic limit works with multiple buckets", func(t *testing.T) { - ctx := testhelper.Context(t) - limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1}) - gauge := &blockingQueueCounter{queuedCh: make(chan struct{})} - - limiter := NewConcurrencyLimiter(ctx, limit, 5, nil, gauge) - - var releaseChans []chan struct{} - var waitAcquireFuncs, waitReleaseFuncs []func() - - // 5 * 5 requests acquired tokens - for i := 1; i <= 5; i++ { - release, waitAfterRelease := spawnAndWaitAcquired(t, ctx, fmt.Sprintf("%d", i), limiter, gauge, 5) - releaseChans = append(releaseChans, release) - waitReleaseFuncs = append(waitReleaseFuncs, waitAfterRelease) - } - require.Equal(t, 25, gauge.enter) - - // 1 + 2 + 3 + 4 + 5 requests are in queue - for i := 1; i <= 5; i++ { - waitAcquired, release, waitAfterRelease := spawnAndWaitQueued(t, ctx, fmt.Sprintf("%d", i), limiter, gauge, i) - waitAcquireFuncs = append(waitAcquireFuncs, waitAcquired) - releaseChans = append(releaseChans, release) - waitReleaseFuncs = append(waitReleaseFuncs, waitAfterRelease) - } - require.Equal(t, 25, gauge.enter) - require.Equal(t, 40, gauge.queued) - - // Update limit, enough for all requests - limit.Update(10) - - // All requests acquired tokens now - for _, wait := range waitAcquireFuncs { - wait() - } - require.Equal(t, 40, gauge.enter) - - // Release all - for _, release := range releaseChans { - close(release) - } - for _, wait := range waitReleaseFuncs { - wait() - } - require.Equal(t, 40, gauge.exit) - }) -} - -// spawnAndWaitAcquired spawns N goroutines that wait for the limiter. They wait until all of them acquire the limiter -// token before exiting. This function returns a channel to control token release and a function to wait until all -// goroutines finish. -func spawnAndWaitAcquired(t *testing.T, ctx context.Context, bucket string, limiter *ConcurrencyLimiter, gauge *blockingQueueCounter, n int) (chan struct{}, func()) { - var acquireWg, releaseWg sync.WaitGroup - release := make(chan struct{}) - - for i := 0; i < n; i++ { - acquireWg.Add(1) - releaseWg.Add(1) - go func() { - defer releaseWg.Done() - _, err := limiter.Limit(ctx, bucket, func() (resp interface{}, err error) { - acquireWg.Done() - <-release - return nil, nil - }) - require.NoError(t, err) - }() - } - for i := 0; i < n; i++ { - <-gauge.queuedCh - gauge.queued++ - } - acquireWg.Wait() - - return release, releaseWg.Wait -} - -// spawnAndWaitQueued spawns N goroutines that wait for the limiter. They wait until all of them are queued. This -// function returns a function to wait for channel to acquired the token, a channel to control token release, and a -// function to wait until all goroutines finish. -func spawnAndWaitQueued(t *testing.T, ctx context.Context, bucket string, limiter *ConcurrencyLimiter, gauge *blockingQueueCounter, n int) (func(), chan struct{}, func()) { - var acquireWg, releaseWg sync.WaitGroup - release := make(chan struct{}) - - for i := 0; i < n; i++ { - acquireWg.Add(1) - releaseWg.Add(1) - go func() { - defer releaseWg.Done() - _, err := limiter.Limit(ctx, bucket, func() (resp interface{}, err error) { - acquireWg.Done() - <-release - return nil, nil - }) - require.NoError(t, err) - }() - } - for i := 0; i < n; i++ { - <-gauge.queuedCh - gauge.queued++ - } - - return acquireWg.Wait, release, releaseWg.Wait -} - -func spawnQueuedAndCollectErrors(ctx context.Context, bucket string, limiter *ConcurrencyLimiter, gauge *blockingQueueCounter, n int, errors chan error) { - for i := 0; i < n; i++ { - go func() { - _, err := limiter.Limit(ctx, bucket, func() (interface{}, error) { - return nil, fmt.Errorf("should not call") - }) - errors <- err - }() - } - for i := 0; i < n; i++ { - <-gauge.queuedCh - gauge.queued++ - } -} - -func maximumQueueSizeReached(t *testing.T, ctx context.Context, bucket string, limiter *ConcurrencyLimiter) { - _, err := limiter.Limit(ctx, bucket, func() (interface{}, error) { - return nil, fmt.Errorf("should not call") - }) - require.EqualError(t, err, "maximum queue size reached") -} - type blockingQueueCounter struct { counter @@ -801,7 +248,7 @@ func TestConcurrencyLimiter_queueLimit(t *testing.T) { monitorCh := make(chan struct{}) monitor := &blockingQueueCounter{queuedCh: monitorCh} ch := make(chan struct{}) - limiter := NewConcurrencyLimiter(ctx, NewAdaptiveLimit("staticLimit", AdaptiveSetting{Initial: 1}), queueLimit, nil, monitor) + limiter := NewConcurrencyLimiter(1, queueLimit, nil, monitor) // occupied with one live request that takes a long time to complete go func() { @@ -886,8 +333,7 @@ func TestLimitConcurrency_queueWaitTime(t *testing.T) { monitor := &blockingDequeueCounter{dequeuedCh: dequeuedCh} limiter := NewConcurrencyLimiter( - ctx, - NewAdaptiveLimit("staticLimit", AdaptiveSetting{Initial: 1}), + 1, 0, func() helper.Ticker { return ticker diff --git a/internal/limiter/resizable_semaphore.go b/internal/limiter/resizable_semaphore.go deleted file mode 100644 index 19434afc3..000000000 --- a/internal/limiter/resizable_semaphore.go +++ /dev/null @@ -1,157 +0,0 @@ -package limiter - -import ( - "context" - "sync/atomic" -) - -// resizableSemaphore struct models provides a way to bound concurrent access to resources. It allows a certain number -// of concurrent access to the resources. When the concurrency reaches the semaphore size, the callers are blocked until -// a resource is available again. The size of the semaphore can be resized freely in an atomic manner. -// -// Internally, a separate goroutine manages the semaphore's functionality, providing synchronization across all channel -// operations. Callers acquire the semaphore by pulling a "token" through an `acquireCh` channel via `acquire()`, and -// return them to an `releaseCh` channel via `release()`. This goroutine ensures that tokens are properly distributed -// from those channels, and also manages the semaphore's current length and size. It processes resize requests and -// handles try requests and responses, thus ensuring a smooth operation. -// -// Note: This struct is not intended to serve as a general-purpose data structure but is specifically designed for -// flexible concurrency control with resizable capacity. -type resizableSemaphore struct { - // err stores the error of why the sempahore is stopped. Most of the case, it's due to context cancellation. - err atomic.Pointer[error] - // current represents the current concurrency access to the resources. - current atomic.Int64 - // size is the maximum capacity of the semaphore. It represents the maximum concurrency that the resources can - // be accessed of the time. - size atomic.Int64 - // releaseCh is a channel used to return tokens back to the semaphore. When a caller returns a token, it sends a signal to this channel. - releaseCh chan struct{} - // acquireCh is a channel used for callers to acquire a token. When a token is available, a signal is sent to this channel. - acquireCh chan bool - // tryAcquireCh is a channel used to signal a try request. A try request is a non-blocking request to acquire a token. - tryAcquireCh chan struct{} - // tryAcquireRespCh is a channel used to respond to a try request. If a token is available, a nil error is sent; otherwise, an error is sent. - tryAcquireRespCh chan bool - // resizeCh is a channel used to request a resize of the semaphore's capacity. The requested new capacity is sent to this channel. - resizeCh chan int64 - // stopCh is a channel that determines whether the semaphore is stopped. - stopCh chan struct{} -} - -// NewResizableSemaphore creates and starts a resizableSemaphore -func NewResizableSemaphore(ctx context.Context, capacity int64) *resizableSemaphore { - s := &resizableSemaphore{ - stopCh: make(chan struct{}), - releaseCh: make(chan struct{}), - acquireCh: make(chan bool), - tryAcquireCh: make(chan struct{}), - tryAcquireRespCh: make(chan bool), - resizeCh: make(chan int64), - } - s.size.Store(capacity) - go s.start(ctx) - - return s -} - -// start kicks off a goroutine that maintains the states of the semaphore. It acts as an event loop that listens to -// all modifications. -func (q *resizableSemaphore) start(ctx context.Context) { - for { - if len := q.current.Load(); len < q.size.Load() { - select { - case <-q.releaseCh: - q.current.Store(len - 1) - case q.acquireCh <- true: - q.current.Store(len + 1) - case <-q.tryAcquireCh: - q.current.Store(len + 1) - q.tryAcquireRespCh <- true - case newSize := <-q.resizeCh: - // If the new capacity is greater than the prior one, the capacity grows without - // the need to do anything. It allows more callers to acquire the token in the - // right next iteration. - // In contrast, when the new capacity is less than the prior one, the capacity - // shrinks and the length becomes bigger than the new capacity. That's perfectly - // fine. All existing callers continue, new callers can't acquire new token. The - // length will be naturally adjusted below the capacity overtime. - q.size.Store(newSize) - case <-ctx.Done(): - q.stop(ctx.Err()) - return - } - } else { - select { - case <-q.releaseCh: - q.current.Store(len - 1) - case <-q.tryAcquireCh: - q.tryAcquireRespCh <- false - case newSize := <-q.resizeCh: - // Simiarly to the above case, overriding the capacity is enough. - q.size.Store(newSize) - case <-ctx.Done(): - q.stop(ctx.Err()) - return - } - } - } -} - -func (q *resizableSemaphore) stop(err error) { - q.current.Store(0) - q.err.Store(&err) - - close(q.stopCh) - // The only exposed channel. After this channel is closed, caller receives `false` when trying to pull from this - // channel. Other state-modifying channels stay instact but TryAcquire(), Release(), and Resize() returns - // immediately. - close(q.acquireCh) -} - -// stores the error of why the sempahore is stopped. Most of the case, it's due to context cancellation. -func (q *resizableSemaphore) Err() error { - return *q.err.Load() -} - -// Acquire returns a channel that allows the caller to acquire the semaphore. The caller is blocked until there -// is an available resource. It is not safe to use with a `switch` statement having `default` statement. In such -// use cases, use TryAcquire() instead. -func (q *resizableSemaphore) Acquire() <-chan bool { - return q.acquireCh -} - -// TryAcquire acquires the semaphore without blocking. On success, returns true. On failure, returns false and leaves -// the semaphore unchanged. -func (q *resizableSemaphore) TryAcquire() bool { - select { - case q.tryAcquireCh <- struct{}{}: - return <-q.tryAcquireRespCh - case <-q.stopCh: - return false - } -} - -// Release releases the semaphore by pushing the token back. If the semaphore stops, this function returns immediately. -func (q *resizableSemaphore) Release() { - select { - case q.releaseCh <- struct{}{}: - case <-q.stopCh: - // No op, return immediately. - } -} - -// Current returns the amount of current concurrent access to the semaphore. -func (q *resizableSemaphore) Current() int64 { - return q.current.Load() -} - -// Resize modifies the size of the semaphore. If the semaphore stops, this function returns immediately. -func (q *resizableSemaphore) Resize(newSize int64) { - select { - case q.resizeCh <- newSize: - case <-q.stopCh: - // This update is redundant, but it's a nice gesture that the state of semaphore is up-to-date. - q.size.Store(newSize) - } -} diff --git a/internal/limiter/resizable_semaphore_test.go b/internal/limiter/resizable_semaphore_test.go deleted file mode 100644 index 00ac90097..000000000 --- a/internal/limiter/resizable_semaphore_test.go +++ /dev/null @@ -1,262 +0,0 @@ -package limiter - -import ( - "context" - "sync" - "testing" - - "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" -) - -func TestResizableSemaphore_New(t *testing.T) { - t.Parallel() - - semaphore := NewResizableSemaphore(testhelper.Context(t), 5) - require.Equal(t, int64(0), semaphore.Current()) -} - -func TestResizableSemaphore_Stopped(t *testing.T) { - t.Parallel() - - ctx, cancel := context.WithCancel(testhelper.Context(t)) - semaphore := NewResizableSemaphore(ctx, 5) - - // Try to acquire a token of the empty sempahore - require.True(t, semaphore.TryAcquire()) - semaphore.Release() - - // 5 goroutines acquired semaphore - beforeCallRelease := make(chan struct{}) - var acquireWg1, releaseWg1 sync.WaitGroup - for i := 0; i < 5; i++ { - acquireWg1.Add(1) - releaseWg1.Add(1) - go func() { - require.True(t, <-semaphore.Acquire()) - acquireWg1.Done() - - <-beforeCallRelease - - semaphore.Release() - releaseWg1.Done() - }() - } - acquireWg1.Wait() - - // Another 5 waits for sempahore - var acquireWg2 sync.WaitGroup - for i := 0; i < 5; i++ { - acquireWg2.Add(1) - go func() { - // This goroutine is block until the context is cancel, which returns false - require.False(t, <-semaphore.Acquire()) - acquireWg2.Done() - }() - } - - // Try to acquire a token of the full semaphore - require.False(t, semaphore.TryAcquire()) - - // Cancel the context - cancel() - acquireWg2.Wait() - - // The first 5 goroutines can call Release() even if the semaphore stopped - close(beforeCallRelease) - releaseWg1.Wait() - - // The last 5 goroutines exits immediately, Acquire() returns false - acquireWg2.Wait() - - require.False(t, semaphore.TryAcquire()) - require.Equal(t, int64(0), semaphore.Current()) - require.Equal(t, ctx.Err(), semaphore.Err()) -} - -func TestResizableSemaphore_Acquire(t *testing.T) { - t.Parallel() - - t.Run("acquire less than the capacity", func(t *testing.T) { - semaphore := NewResizableSemaphore(testhelper.Context(t), 5) - - waitBeforeRelease, waitRelease := acquireSemaphore(t, semaphore, 4) - require.NotNil(t, <-semaphore.Acquire()) - - close(waitBeforeRelease) - waitRelease() - }) - - t.Run("acquire more than the capacity", func(t *testing.T) { - semaphore := NewResizableSemaphore(testhelper.Context(t), 5) - - waitBeforeRelease, waitRelease := acquireSemaphore(t, semaphore, 5) - require.False(t, semaphore.TryAcquire()) - - close(waitBeforeRelease) - waitRelease() - }) - - t.Run("semaphore is full then available again", func(t *testing.T) { - semaphore := NewResizableSemaphore(testhelper.Context(t), 5) - - waitChan := make(chan bool) - _, _ = acquireSemaphore(t, semaphore, 5) - - go func() { - waitChan <- <-semaphore.Acquire() - }() - - // The semaphore is full now - require.False(t, semaphore.TryAcquire()) - - // Release one token - semaphore.Release() - // The waiting channel is unlocked - require.True(t, <-waitChan) - - // Release another token - semaphore.Release() - // Now TryAcquire can pull out a token - require.True(t, semaphore.TryAcquire()) - }) - - t.Run("the semaphore is resized up when empty", func(t *testing.T) { - semaphore := NewResizableSemaphore(testhelper.Context(t), 5) - semaphore.Resize(10) - - waitBeforeRelease, waitRelease := acquireSemaphore(t, semaphore, 9) - require.NotNil(t, <-semaphore.Acquire()) - - close(waitBeforeRelease) - waitRelease() - }) - - t.Run("the semaphore is resized up when not empty", func(t *testing.T) { - semaphore := NewResizableSemaphore(testhelper.Context(t), 7) - - waitBeforeRelease1, waitRelease1 := acquireSemaphore(t, semaphore, 5) - semaphore.Resize(15) - waitBeforeRelease2, waitRelease2 := acquireSemaphore(t, semaphore, 5) - - require.NotNil(t, <-semaphore.Acquire()) - - close(waitBeforeRelease1) - close(waitBeforeRelease2) - waitRelease1() - waitRelease2() - }) - - t.Run("the semaphore is resized up when full", func(t *testing.T) { - semaphore := NewResizableSemaphore(testhelper.Context(t), 5) - - waitBeforeRelease1, waitRelease1 := acquireSemaphore(t, semaphore, 5) - require.False(t, semaphore.TryAcquire()) - - semaphore.Resize(10) - - waitBeforeRelease2, waitRelease2 := acquireSemaphore(t, semaphore, 5) - require.False(t, semaphore.TryAcquire()) - - var wg sync.WaitGroup - for i := 0; i < 5; i++ { - wg.Add(1) - go func() { - <-semaphore.Acquire() - wg.Done() - semaphore.Release() - }() - } - - semaphore.Resize(15) - wg.Wait() - - close(waitBeforeRelease1) - close(waitBeforeRelease2) - waitRelease1() - waitRelease2() - }) - - t.Run("the semaphore is resized down when empty", func(t *testing.T) { - semaphore := NewResizableSemaphore(testhelper.Context(t), 10) - semaphore.Resize(5) - - waitBeforeRelease, waitRelease := acquireSemaphore(t, semaphore, 4) - require.NotNil(t, <-semaphore.Acquire()) - - close(waitBeforeRelease) - waitRelease() - }) - - t.Run("the semaphore is resized down when not empty", func(t *testing.T) { - semaphore := NewResizableSemaphore(testhelper.Context(t), 20) - - waitBeforeRelease1, waitRelease1 := acquireSemaphore(t, semaphore, 5) - semaphore.Resize(15) - waitBeforeRelease2, waitRelease2 := acquireSemaphore(t, semaphore, 5) - - require.NotNil(t, <-semaphore.Acquire()) - - close(waitBeforeRelease1) - close(waitBeforeRelease2) - waitRelease1() - waitRelease2() - }) - - t.Run("the semaphore is resized down lower than the current length", func(t *testing.T) { - semaphore := NewResizableSemaphore(testhelper.Context(t), 10) - - waitBeforeRelease1, waitRelease1 := acquireSemaphore(t, semaphore, 5) - - semaphore.Resize(3) - - require.False(t, semaphore.TryAcquire()) - close(waitBeforeRelease1) - waitRelease1() - - waitBeforeRelease2, waitRelease2 := acquireSemaphore(t, semaphore, 3) - require.False(t, semaphore.TryAcquire()) - - close(waitBeforeRelease2) - waitRelease2() - }) - - t.Run("the semaphore is resized down when full", func(t *testing.T) { - semaphore := NewResizableSemaphore(testhelper.Context(t), 10) - - waitBeforeRelease1, waitRelease1 := acquireSemaphore(t, semaphore, 10) - - semaphore.Resize(5) - - require.False(t, semaphore.TryAcquire()) - close(waitBeforeRelease1) - waitRelease1() - - waitBeforeRelease2, waitRelease2 := acquireSemaphore(t, semaphore, 5) - require.False(t, semaphore.TryAcquire()) - - close(waitBeforeRelease2) - waitRelease2() - }) -} - -func acquireSemaphore(t *testing.T, semaphore *resizableSemaphore, n int) (chan struct{}, func()) { - var acquireWg, releaseWg sync.WaitGroup - waitBeforeRelease := make(chan struct{}) - - for i := 0; i < n; i++ { - acquireWg.Add(1) - releaseWg.Add(1) - go func() { - require.True(t, <-semaphore.Acquire()) - acquireWg.Done() - - <-waitBeforeRelease - semaphore.Release() - releaseWg.Done() - }() - } - acquireWg.Wait() - - return waitBeforeRelease, releaseWg.Wait -} diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index 6352fcfe0..51d29bcdc 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -161,8 +161,6 @@ func waitHealthy(tb testing.TB, ctx context.Context, addr string, authToken stri func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, deps *service.Dependencies), opts ...GitalyServerOpt) (*grpc.Server, string, bool) { tb.Helper() - ctx := testhelper.Context(tb) - var gsd gitalyServerDeps for _, opt := range opts { gsd = opt(gsd) @@ -175,7 +173,7 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d server.WithStreamInterceptor(StructErrStreamInterceptor), } - deps := gsd.createDependencies(tb, ctx, cfg) + deps := gsd.createDependencies(tb, cfg) tb.Cleanup(func() { gsd.conns.Close() }) serverFactory := server.NewGitalyServerFactory( @@ -203,6 +201,7 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d assert.NoError(tb, internalServer.Serve(internalListener), "failure to serve internal gRPC") }() + ctx := testhelper.Context(tb) waitHealthy(tb, ctx, "unix://"+internalListener.Addr().String(), cfg.Auth.Token) } @@ -239,6 +238,7 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d assert.NoError(tb, externalServer.Serve(listener), "failure to serve external gRPC") }() + ctx := testhelper.Context(tb) waitHealthy(tb, ctx, addr, cfg.Auth.Token) return externalServer, addr, gsd.disablePraefect @@ -276,7 +276,7 @@ type gitalyServerDeps struct { signingKey string } -func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, ctx context.Context, cfg config.Cfg) *service.Dependencies { +func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) *service.Dependencies { if gsd.logger == nil { gsd.logger = testhelper.NewGitalyServerLogger(tb) } @@ -328,8 +328,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, ctx context.Conte if gsd.packObjectsLimiter == nil { gsd.packObjectsLimiter = limiter.NewConcurrencyLimiter( - ctx, - limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 0}), + 0, 0, nil, limiter.NewNoopConcurrencyMonitor(), @@ -337,7 +336,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, ctx context.Conte } if gsd.limitHandler == nil { - gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters(ctx)) + gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters) } if gsd.repositoryCounter == nil { |