author     Quang-Minh Nguyen <qmnguyen@gitlab.com>    2023-08-09 17:14:01 +0300
committer  Quang-Minh Nguyen <qmnguyen@gitlab.com>    2023-08-09 17:14:01 +0300
commit     023c2a7d02fa92926cb3db390357149e5ac1151c
tree       30b3cccf89f7292639855ebe8ad302948e960e0e
parent     0e78015ff2052203845e049be8b3395bac782554
parent     ea45aabc5baff7eb41f515eef6a44981e6da9945
Merge branch 'qmnguyen0711/add-adaptive-limit-to-pack-objects-limit' into 'master'
limiter: Add adaptive limit to the concurrency limiter
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6183
Merged-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Approved-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Reviewed-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Reviewed-by: John Cai <jcai@gitlab.com>
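
The visible shape of this change is the new limiter constructor: callers now pass a context and an AdaptiveLimiter instead of a plain integer limit. The following is a minimal sketch of the new wiring, modelled on the serve.go hunk further down; the concrete values are illustrative, and the snippet assumes it lives inside the gitaly module, since internal/limiter is not importable from outside it.

package main

import (
	"context"
	"fmt"

	"gitlab.com/gitlab-org/gitaly/v16/internal/limiter"
)

func main() {
	ctx := context.Background()

	// The limit starts at Initial and can later be recalibrated by the adaptive
	// calculator; the value here is illustrative.
	limit := limiter.NewAdaptiveLimit("packObjects", limiter.AdaptiveSetting{Initial: 10})

	// Passing nil for the ticker creator and the monitor falls back to the
	// limiter's defaults, as the tests in this MR do.
	concurrencyLimiter := limiter.NewConcurrencyLimiter(ctx, limit, 100, nil, nil)

	resp, err := concurrencyLimiter.Limit(ctx, "repo-key", func() (interface{}, error) {
		return "done", nil
	})
	fmt.Println(resp, err)
}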
-rw-r--r--  internal/cli/gitaly/serve.go                              |  10
-rw-r--r--  internal/gitaly/server/auth_test.go                       |  32
-rw-r--r--  internal/gitaly/service/hook/pack_objects_test.go         |   3
-rw-r--r--  internal/grpc/middleware/limithandler/middleware.go       | 144
-rw-r--r--  internal/grpc/middleware/limithandler/middleware_test.go  |  24
-rw-r--r--  internal/limiter/adaptive_calculator.go                   |   2
-rw-r--r--  internal/limiter/adaptive_calculator_test.go              |  10
-rw-r--r--  internal/limiter/adaptive_limit.go                        |  57
-rw-r--r--  internal/limiter/adaptive_limit_test.go                   | 124
-rw-r--r--  internal/limiter/concurrency_limiter.go                   |  99
-rw-r--r--  internal/limiter/concurrency_limiter_test.go              | 562
-rw-r--r--  internal/limiter/resizable_semaphore.go                   | 157
-rw-r--r--  internal/limiter/resizable_semaphore_test.go              | 262
-rw-r--r--  internal/testhelper/testserver/gitaly.go                  |  13
14 files changed, 1338 insertions, 161 deletions
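
The core new primitive is the resizable semaphore added in internal/limiter/resizable_semaphore.go further down. The following condensed sketch of its caller-facing contract is an assumption-laden illustration rather than production usage: it presumes code inside the gitaly module (the semaphore type itself is unexported, but the constructor and its methods are usable through the returned value).

package main

import (
	"context"
	"fmt"

	"gitlab.com/gitlab-org/gitaly/v16/internal/limiter"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sem := limiter.NewResizableSemaphore(ctx, 5)

	// Non-blocking acquisition; false means the semaphore is full or stopped.
	if sem.TryAcquire() {
		fmt.Println("in-flight:", sem.Current())
		sem.Release()
	}

	// Blocking acquisition goes through a channel so callers can select on it
	// together with a ticker or context cancellation.
	if acquired := <-sem.Acquire(); acquired {
		sem.Release()
	} else {
		fmt.Println("semaphore stopped:", sem.Err())
	}

	// The capacity can be changed at any time without draining the semaphore;
	// shrinking below the current in-flight count only blocks new acquisitions.
	sem.Resize(10)
}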
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index c5a70688f..3c7d6bdaf 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -265,10 +265,15 @@ func run(cfg config.Cfg) error { return fmt.Errorf("disk cache walkers: %w", err) } + // The pack-objects limit below is static at this stage. It's always equal to the initial limit, which uses + // MaxConcurrency config. + packObjectLimit := limiter.NewAdaptiveLimit("packObjects", limiter.AdaptiveSetting{ + Initial: cfg.PackObjectsLimiting.MaxConcurrency, + }) concurrencyLimitHandler := limithandler.New( cfg, limithandler.LimitConcurrencyByRepo, - limithandler.WithConcurrencyLimiters, + limithandler.WithConcurrencyLimiters(ctx), ) rateLimitHandler := limithandler.New( @@ -289,7 +294,8 @@ func run(cfg config.Cfg) error { } } packObjectsLimiter := limiter.NewConcurrencyLimiter( - cfg.PackObjectsLimiting.MaxConcurrency, + ctx, + packObjectLimit, cfg.PackObjectsLimiting.MaxQueueLength, newTickerFunc, packObjectsMonitor, diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go index d275319f5..99d9f498a 100644 --- a/internal/gitaly/server/auth_test.go +++ b/internal/gitaly/server/auth_test.go @@ -1,7 +1,7 @@ package server import ( - netctx "context" + "context" "crypto/tls" "crypto/x509" "fmt" @@ -44,7 +44,7 @@ func TestMain(m *testing.M) { } func TestSanity(t *testing.T) { - serverSocketPath := runServer(t, testcfg.Build(t)) + serverSocketPath := runServer(testhelper.Context(t), t, testcfg.Build(t)) conn, err := dial(serverSocketPath, []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}) require.NoError(t, err) @@ -55,7 +55,8 @@ func TestSanity(t *testing.T) { func TestTLSSanity(t *testing.T) { cfg := testcfg.Build(t) - addr := runSecureServer(t, cfg) + ctx := testhelper.Context(t) + addr := runSecureServer(ctx, t, cfg) certPool, err := x509.SystemCertPool() require.NoError(t, err) @@ -102,7 +103,7 @@ func TestAuthFailures(t *testing.T) { Auth: auth.Config{Token: "quxbaz"}, })) - serverSocketPath := runServer(t, cfg) + serverSocketPath := runServer(testhelper.Context(t), t, cfg) connOpts := append(tc.opts, grpc.WithTransportCredentials(insecure.NewCredentials())) conn, err := dial(serverSocketPath, connOpts) require.NoError(t, err, tc.desc) @@ -145,7 +146,7 @@ func TestAuthSuccess(t *testing.T) { Auth: auth.Config{Token: tc.token, Transitioning: !tc.required}, })) - serverSocketPath := runServer(t, cfg) + serverSocketPath := runServer(testhelper.Context(t), t, cfg) connOpts := append(tc.opts, grpc.WithTransportCredentials(insecure.NewCredentials())) conn, err := dial(serverSocketPath, connOpts) require.NoError(t, err, tc.desc) @@ -158,7 +159,7 @@ func TestAuthSuccess(t *testing.T) { type brokenAuth struct{} func (brokenAuth) RequireTransportSecurity() bool { return false } -func (brokenAuth) GetRequestMetadata(netctx.Context, ...string) (map[string]string, error) { +func (brokenAuth) GetRequestMetadata(context.Context, ...string) (map[string]string, error) { return map[string]string{"authorization": "Bearer blablabla"}, nil } @@ -186,7 +187,7 @@ func newOperationClient(t *testing.T, token, serverSocketPath string) (gitalypb. 
return gitalypb.NewOperationServiceClient(conn), conn } -func runServer(t *testing.T, cfg config.Cfg) string { +func runServer(ctx context.Context, t *testing.T, cfg config.Cfg) string { t.Helper() registry := backchannel.NewRegistry() @@ -201,7 +202,7 @@ func runServer(t *testing.T, cfg config.Cfg) string { catfileCache := catfile.NewCache(cfg) t.Cleanup(catfileCache.Stop) diskCache := cache.New(cfg, locator) - limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters) + limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters(ctx)) updaterWithHooks := updateref.NewUpdaterWithHooks(cfg, locator, hookManager, gitCmdFactory, catfileCache) srv, err := NewGitalyServerFactory(cfg, testhelper.NewDiscardingLogEntry(t), registry, diskCache, []*limithandler.LimiterMiddleware{limitHandler}).New(false) @@ -228,7 +229,7 @@ func runServer(t *testing.T, cfg config.Cfg) string { } //go:generate openssl req -newkey rsa:4096 -new -nodes -x509 -days 3650 -out testdata/gitalycert.pem -keyout testdata/gitalykey.pem -subj "/C=US/ST=California/L=San Francisco/O=GitLab/OU=GitLab-Shell/CN=localhost" -addext "subjectAltName = IP:127.0.0.1, DNS:localhost" -func runSecureServer(t *testing.T, cfg config.Cfg) string { +func runSecureServer(ctx context.Context, t *testing.T, cfg config.Cfg) string { t.Helper() cfg.TLS = config.TLS{ @@ -244,7 +245,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string { testhelper.NewDiscardingLogEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)), - []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)}, + []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters(ctx))}, ).New(true) require.NoError(t, err) @@ -259,11 +260,12 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string { func TestUnaryNoAuth(t *testing.T) { cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{Auth: auth.Config{Token: "testtoken"}})) - path := runServer(t, cfg) + ctx := testhelper.Context(t) + + path := runServer(ctx, t, cfg) conn, err := grpc.Dial(path, grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) defer testhelper.MustClose(t, conn) - ctx := testhelper.Context(t) client := gitalypb.NewRepositoryServiceClient(conn) _, err = client.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{ @@ -279,12 +281,12 @@ func TestUnaryNoAuth(t *testing.T) { func TestStreamingNoAuth(t *testing.T) { cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{Auth: auth.Config{Token: "testtoken"}})) + ctx := testhelper.Context(t) - path := runServer(t, cfg) + path := runServer(ctx, t, cfg) conn, err := dial(path, []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}) require.NoError(t, err) t.Cleanup(func() { conn.Close() }) - ctx := testhelper.Context(t) client := gitalypb.NewRepositoryServiceClient(conn) stream, err := client.GetInfoAttributes(ctx, &gitalypb.GetInfoAttributesRequest{ @@ -329,7 +331,7 @@ func TestAuthBeforeLimit(t *testing.T) { t.Cleanup(cleanup) cfg.Gitlab.URL = gitlabURL - serverSocketPath := runServer(t, cfg) + serverSocketPath := runServer(ctx, t, cfg) client, conn := newOperationClient(t, cfg.Auth.Token, serverSocketPath) t.Cleanup(func() { conn.Close() }) diff --git a/internal/gitaly/service/hook/pack_objects_test.go 
b/internal/gitaly/service/hook/pack_objects_test.go index 4d4d1a500..62fa0dca8 100644 --- a/internal/gitaly/service/hook/pack_objects_test.go +++ b/internal/gitaly/service/hook/pack_objects_test.go @@ -858,7 +858,8 @@ func TestPackObjects_concurrencyLimit(t *testing.T) { cfg.Prometheus.GRPCLatencyBuckets, ) limiter := limiter.NewConcurrencyLimiter( - 1, + ctx, + limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 1}), 0, func() helper.Ticker { return ticker }, monitor, diff --git a/internal/grpc/middleware/limithandler/middleware.go b/internal/grpc/middleware/limithandler/middleware.go index 0c41a4672..a6ed93f9a 100644 --- a/internal/grpc/middleware/limithandler/middleware.go +++ b/internal/grpc/middleware/limithandler/middleware.go @@ -165,84 +165,88 @@ func (w *wrappedStream) RecvMsg(m interface{}) error { // WithConcurrencyLimiters sets up middleware to limit the concurrency of // requests based on RPC and repository -func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) { - acquiringSecondsMetric := prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "gitaly", - Subsystem: "concurrency_limiting", - Name: "acquiring_seconds", - Help: "Histogram of time calls are rate limited (in seconds)", - Buckets: cfg.Prometheus.GRPCLatencyBuckets, - }, - []string{"system", "grpc_service", "grpc_method"}, - ) - inProgressMetric := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "gitaly", - Subsystem: "concurrency_limiting", - Name: "in_progress", - Help: "Gauge of number of concurrent in-progress calls", - }, - []string{"system", "grpc_service", "grpc_method"}, - ) - queuedMetric := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "gitaly", - Subsystem: "concurrency_limiting", - Name: "queued", - Help: "Gauge of number of queued calls", - }, - []string{"system", "grpc_service", "grpc_method"}, - ) - - middleware.collect = func(metrics chan<- prometheus.Metric) { - acquiringSecondsMetric.Collect(metrics) - inProgressMetric.Collect(metrics) - queuedMetric.Collect(metrics) - } - - result := make(map[string]limiter.Limiter) - for _, limit := range cfg.Concurrency { - limit := limit +func WithConcurrencyLimiters(ctx context.Context) SetupFunc { + return func(cfg config.Cfg, middleware *LimiterMiddleware) { + acquiringSecondsMetric := prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "gitaly", + Subsystem: "concurrency_limiting", + Name: "acquiring_seconds", + Help: "Histogram of time calls are rate limited (in seconds)", + Buckets: cfg.Prometheus.GRPCLatencyBuckets, + }, + []string{"system", "grpc_service", "grpc_method"}, + ) + inProgressMetric := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "gitaly", + Subsystem: "concurrency_limiting", + Name: "in_progress", + Help: "Gauge of number of concurrent in-progress calls", + }, + []string{"system", "grpc_service", "grpc_method"}, + ) + queuedMetric := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "gitaly", + Subsystem: "concurrency_limiting", + Name: "queued", + Help: "Gauge of number of queued calls", + }, + []string{"system", "grpc_service", "grpc_method"}, + ) - newTickerFunc := func() helper.Ticker { - return helper.NewManualTicker() + middleware.collect = func(metrics chan<- prometheus.Metric) { + acquiringSecondsMetric.Collect(metrics) + inProgressMetric.Collect(metrics) + queuedMetric.Collect(metrics) } - if limit.MaxQueueWait > 0 { - newTickerFunc = func() helper.Ticker { - return 
helper.NewTimerTicker(limit.MaxQueueWait.Duration()) + result := make(map[string]limiter.Limiter) + for _, limit := range cfg.Concurrency { + limit := limit + + newTickerFunc := func() helper.Ticker { + return helper.NewManualTicker() } + + if limit.MaxQueueWait > 0 { + newTickerFunc = func() helper.Ticker { + return helper.NewTimerTicker(limit.MaxQueueWait.Duration()) + } + } + + result[limit.RPC] = limiter.NewConcurrencyLimiter( + ctx, + limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: limit.MaxPerRepo}), + limit.MaxQueueSize, + newTickerFunc, + limiter.NewPerRPCPromMonitor( + "gitaly", limit.RPC, + queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, + ), + ) } - result[limit.RPC] = limiter.NewConcurrencyLimiter( - limit.MaxPerRepo, - limit.MaxQueueSize, - newTickerFunc, - limiter.NewPerRPCPromMonitor( - "gitaly", limit.RPC, - queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, - ), - ) - } + // Set default for ReplicateRepository. + replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository" + if _, ok := result[replicateRepositoryFullMethod]; !ok { + result[replicateRepositoryFullMethod] = limiter.NewConcurrencyLimiter( + ctx, + limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 1}), + 0, + func() helper.Ticker { + return helper.NewManualTicker() + }, + limiter.NewPerRPCPromMonitor( + "gitaly", replicateRepositoryFullMethod, + queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, + ), + ) + } - // Set default for ReplicateRepository. - replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository" - if _, ok := result[replicateRepositoryFullMethod]; !ok { - result[replicateRepositoryFullMethod] = limiter.NewConcurrencyLimiter( - 1, - 0, - func() helper.Ticker { - return helper.NewManualTicker() - }, - limiter.NewPerRPCPromMonitor( - "gitaly", replicateRepositoryFullMethod, - queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, - ), - ) + middleware.methodLimiters = result } - - middleware.methodLimiters = result } // WithRateLimiters sets up a middleware with limiters that limit requests diff --git a/internal/grpc/middleware/limithandler/middleware_test.go b/internal/grpc/middleware/limithandler/middleware_test.go index 65adad2be..40978be69 100644 --- a/internal/grpc/middleware/limithandler/middleware_test.go +++ b/internal/grpc/middleware/limithandler/middleware_test.go @@ -38,6 +38,7 @@ func fixedLockKey(ctx context.Context) string { func TestUnaryLimitHandler(t *testing.T) { t.Parallel() + ctx := testhelper.Context(t) s := &queueTestServer{ server: server{ blockCh: make(chan struct{}), @@ -51,14 +52,13 @@ func TestUnaryLimitHandler(t *testing.T) { }, } - lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters) + lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx)) interceptor := lh.UnaryInterceptor() srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor)) defer srv.Stop() client, conn := newClient(t, serverSocketPath) defer conn.Close() - ctx := testhelper.Context(t) var wg sync.WaitGroup defer wg.Wait() @@ -114,7 +114,7 @@ func TestUnaryLimitHandler_queueing(t *testing.T) { MaxQueueWait: duration.Duration(time.Millisecond), }, }, - }, fixedLockKey, limithandler.WithConcurrencyLimiters) + }, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx)) s := &queueTestServer{ server: 
server{ @@ -175,7 +175,7 @@ func TestUnaryLimitHandler_queueing(t *testing.T) { MaxPerRepo: 1, }, }, - }, fixedLockKey, limithandler.WithConcurrencyLimiters) + }, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx)) s := &queueTestServer{ server: server{ @@ -427,6 +427,7 @@ func TestStreamLimitHandler(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { t.Parallel() + ctx := testhelper.Context(t) s := &server{blockCh: make(chan struct{})} maxQueueSize := 1 @@ -440,14 +441,13 @@ func TestStreamLimitHandler(t *testing.T) { }, } - lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters) + lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx)) interceptor := lh.StreamInterceptor() srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor)) defer srv.Stop() client, conn := newClient(t, serverSocketPath) defer conn.Close() - ctx := testhelper.Context(t) totalCalls := 10 @@ -481,6 +481,8 @@ func TestStreamLimitHandler(t *testing.T) { func TestStreamLimitHandler_error(t *testing.T) { t.Parallel() + ctx := testhelper.Context(t) + s := &queueTestServer{reqArrivedCh: make(chan struct{})} s.blockCh = make(chan struct{}) @@ -490,7 +492,7 @@ func TestStreamLimitHandler_error(t *testing.T) { }, } - lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters) + lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx)) interceptor := lh.StreamInterceptor() srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor)) defer srv.Stop() @@ -498,8 +500,6 @@ func TestStreamLimitHandler_error(t *testing.T) { client, conn := newClient(t, serverSocketPath) defer conn.Close() - ctx := testhelper.Context(t) - respChan := make(chan *grpc_testing.StreamingOutputCallResponse) go func() { stream, err := client.FullDuplexCall(ctx) @@ -600,6 +600,8 @@ func (q *queueTestServer) FullDuplexCall(stream grpc_testing.TestService_FullDup } func TestConcurrencyLimitHandlerMetrics(t *testing.T) { + ctx := testhelper.Context(t) + s := &queueTestServer{reqArrivedCh: make(chan struct{})} s.blockCh = make(chan struct{}) @@ -610,7 +612,7 @@ func TestConcurrencyLimitHandlerMetrics(t *testing.T) { }, } - lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters) + lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx)) interceptor := lh.UnaryInterceptor() srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor)) defer srv.Stop() @@ -618,8 +620,6 @@ func TestConcurrencyLimitHandlerMetrics(t *testing.T) { client, conn := newClient(t, serverSocketPath) defer conn.Close() - ctx := testhelper.Context(t) - respCh := make(chan *grpc_testing.SimpleResponse) go func() { resp, err := client.UnaryCall(ctx, &grpc_testing.SimpleRequest{}) diff --git a/internal/limiter/adaptive_calculator.go b/internal/limiter/adaptive_calculator.go index 15d7cf87b..793fc853c 100644 --- a/internal/limiter/adaptive_calculator.go +++ b/internal/limiter/adaptive_calculator.go @@ -262,7 +262,7 @@ func (c *AdaptiveCalculator) calibrateLimits(ctx context.Context) { }).Debugf("Additive increase") } else { // Multiplicative decrease - newLimit = int(math.Floor(float64(limit.Current()) * setting.BackoffBackoff)) + newLimit = int(math.Floor(float64(limit.Current()) * setting.BackoffFactor)) if newLimit < setting.Min { newLimit = setting.Min } diff --git a/internal/limiter/adaptive_calculator_test.go b/internal/limiter/adaptive_calculator_test.go index e2c2d95dc..d4965bdf4 
100644 --- a/internal/limiter/adaptive_calculator_test.go +++ b/internal/limiter/adaptive_calculator_test.go @@ -657,12 +657,14 @@ func (l *testLimit) Update(val int) { l.currents = append(l.currents, val) } +func (*testLimit) AfterUpdate(_ AfterUpdateHook) {} + func (l *testLimit) Setting() AdaptiveSetting { return AdaptiveSetting{ - Initial: l.initial, - Max: l.max, - Min: l.min, - BackoffBackoff: l.backoffBackoff, + Initial: l.initial, + Max: l.max, + Min: l.min, + BackoffFactor: l.backoffBackoff, } } diff --git a/internal/limiter/adaptive_limit.go b/internal/limiter/adaptive_limit.go index 2aefe5eb0..5af9f8ebd 100644 --- a/internal/limiter/adaptive_limit.go +++ b/internal/limiter/adaptive_limit.go @@ -1,30 +1,49 @@ package limiter -import "sync/atomic" +import ( + "sync" +) // AdaptiveSetting is a struct that holds the configuration parameters for an adaptive limiter. type AdaptiveSetting struct { - Initial int - Max int - Min int - BackoffBackoff float64 + Initial int + Max int + Min int + BackoffFactor float64 } +// AfterUpdateHook is a function hook that is triggered when the current value changes. The callers need to register a hook to +// the AdaptiveLimiter implementation beforehand. They are required to handle errors inside the hook function. +type AfterUpdateHook func(newVal int) + // AdaptiveLimiter is an interface for managing and updating adaptive limits. // It exposes methods to get the name, current limit value, update the limit value, and access its settings. type AdaptiveLimiter interface { Name() string Current() int Update(val int) + AfterUpdate(AfterUpdateHook) Setting() AdaptiveSetting } // AdaptiveLimit is an implementation of the AdaptiveLimiter interface. It uses an atomic Int32 to represent the current // limit value, ensuring thread-safe updates. type AdaptiveLimit struct { - name string - current atomic.Int32 - setting AdaptiveSetting + sync.RWMutex + + name string + current int + setting AdaptiveSetting + updateHooks []AfterUpdateHook +} + +// NewAdaptiveLimit initializes a new AdaptiveLimit object +func NewAdaptiveLimit(name string, setting AdaptiveSetting) *AdaptiveLimit { + return &AdaptiveLimit{ + name: name, + current: setting.Initial, + setting: setting, + } } // Name returns the name of the adaptive limit @@ -34,12 +53,30 @@ func (l *AdaptiveLimit) Name() string { // Current returns the current limit. This function can be called without the need for synchronization. func (l *AdaptiveLimit) Current() int { - return int(l.current.Load()) + l.RLock() + defer l.RUnlock() + + return l.current } // Update adjusts current limit value. func (l *AdaptiveLimit) Update(val int) { - l.current.Store(int32(val)) + l.Lock() + defer l.Unlock() + + if val != l.current { + l.current = val + for _, hook := range l.updateHooks { + hook(val) + } + } +} + +// AfterUpdate registers a callback when the current limit is updated. Because all updates and hooks are synchronized, +// calling Current() inside the update hook in the same goroutine will cause deadlock. Hence, the update hook must +// use the new value passed as the argument instead. +func (l *AdaptiveLimit) AfterUpdate(hook AfterUpdateHook) { + l.updateHooks = append(l.updateHooks, hook) } // Setting returns the configuration parameters for an adaptive limiter. 
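
To connect the two pieces, the AfterUpdate hook shown in the adaptive_limit.go hunk above is what lets the concurrency limiter resize its semaphores whenever the calculator changes the limit. A small sketch of the hook semantics follows, again assuming code inside the gitaly module; the name "example" and the setting values are illustrative.

package main

import (
	"fmt"

	"gitlab.com/gitlab-org/gitaly/v16/internal/limiter"
)

func main() {
	limit := limiter.NewAdaptiveLimit("example", limiter.AdaptiveSetting{
		Initial: 5, Min: 1, Max: 10, BackoffFactor: 0.5,
	})

	// Hooks run while the limit's lock is held, so they must use the value that
	// is passed in rather than calling Current(), which would deadlock.
	limit.AfterUpdate(func(newVal int) {
		fmt.Println("limit is now", newVal)
	})

	limit.Update(7) // prints "limit is now 7"
	limit.Update(7) // no hook call: the value did not change
	limit.Update(3) // prints "limit is now 3"
}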
diff --git a/internal/limiter/adaptive_limit_test.go b/internal/limiter/adaptive_limit_test.go new file mode 100644 index 000000000..f6c67f092 --- /dev/null +++ b/internal/limiter/adaptive_limit_test.go @@ -0,0 +1,124 @@ +package limiter + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestAdaptiveLimit_New(t *testing.T) { + t.Parallel() + + setting := AdaptiveSetting{ + Initial: 5, + Max: 10, + Min: 1, + BackoffFactor: 0.5, + } + + limit := NewAdaptiveLimit("testLimit", setting) + require.Equal(t, limit.Name(), "testLimit") + require.Equal(t, limit.Current(), 5) + require.Equal(t, limit.Setting(), setting) +} + +func TestAdaptiveLimit_Update(t *testing.T) { + t.Parallel() + + newLimit := func() *AdaptiveLimit { + return NewAdaptiveLimit("testLimit", AdaptiveSetting{ + Initial: 5, + Max: 10, + Min: 1, + BackoffFactor: 0.5, + }) + } + + t.Run("without update hooks", func(t *testing.T) { + limit := newLimit() + + limit.Update(1) + require.Equal(t, 1, limit.Current()) + + limit.Update(2) + require.Equal(t, 2, limit.Current()) + + limit.Update(3) + require.Equal(t, 3, limit.Current()) + }) + + t.Run("new values are different from old values", func(t *testing.T) { + limit := newLimit() + + vals := []int{} + limit.AfterUpdate(func(val int) { + vals = append(vals, val) + }) + + limit.Update(1) + require.Equal(t, 1, limit.Current()) + require.Equal(t, vals, []int{1}) + + limit.Update(2) + require.Equal(t, 2, limit.Current()) + require.Equal(t, vals, []int{1, 2}) + + limit.Update(3) + require.Equal(t, 3, limit.Current()) + require.Equal(t, vals, []int{1, 2, 3}) + }) + + t.Run("new values are the same as old values", func(t *testing.T) { + limit := newLimit() + + vals := []int{} + limit.AfterUpdate(func(val int) { + vals = append(vals, val) + }) + + limit.Update(1) + require.Equal(t, 1, limit.Current()) + require.Equal(t, vals, []int{1}) + + limit.Update(1) + require.Equal(t, 1, limit.Current()) + require.Equal(t, vals, []int{1}) + + limit.Update(2) + require.Equal(t, 2, limit.Current()) + require.Equal(t, vals, []int{1, 2}) + + limit.Update(2) + require.Equal(t, 2, limit.Current()) + require.Equal(t, vals, []int{1, 2}) + }) + + t.Run("multiple update hooks", func(t *testing.T) { + limit := newLimit() + + vals1 := []int{} + limit.AfterUpdate(func(val int) { + vals1 = append(vals1, val) + }) + + vals2 := []int{} + limit.AfterUpdate(func(val int) { + vals2 = append(vals2, val*2) + }) + + limit.Update(1) + require.Equal(t, 1, limit.Current()) + require.Equal(t, vals1, []int{1}) + require.Equal(t, vals2, []int{2}) + + limit.Update(2) + require.Equal(t, 2, limit.Current()) + require.Equal(t, vals1, []int{1, 2}) + require.Equal(t, vals2, []int{2, 4}) + + limit.Update(3) + require.Equal(t, 3, limit.Current()) + require.Equal(t, vals1, []int{1, 2, 3}) + require.Equal(t, vals2, []int{2, 4, 6}) + }) +} diff --git a/internal/limiter/concurrency_limiter.go b/internal/limiter/concurrency_limiter.go index 2807b856e..b4284adeb 100644 --- a/internal/limiter/concurrency_limiter.go +++ b/internal/limiter/concurrency_limiter.go @@ -41,10 +41,10 @@ type keyedConcurrencyLimiter struct { // concurrencyTokens is the channel of available concurrency tokens, where every token // allows one concurrent call to the concurrency-limited function. - concurrencyTokens chan struct{} + concurrencyTokens *resizableSemaphore // queueTokens is the channel of available queue tokens, where every token allows one // concurrent call to be admitted to the queue. 
- queueTokens chan struct{} + queueTokens *resizableSemaphore } // acquire tries to acquire the semaphore. It may fail if the admission queue is full or if the max @@ -55,34 +55,33 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey str // callers may wait for the concurrency token at the same time. If there are no more // queueing tokens then this indicates that the queue is full and we thus return an // error immediately. - select { - case sem.queueTokens <- struct{}{}: - // We have acquired a queueing token, so we need to release it if acquiring - // the concurrency token fails. If we succeed to acquire the concurrency - // token though then we retain the queueing token until the caller signals - // that the concurrency-limited function has finished. As a consequence the - // queue token is returned together with the concurrency token. - // - // A simpler model would be to just have `maxQueueLength` many queueing - // tokens. But this would add concurrency-limiting when acquiring the queue - // token itself, which is not what we want to do. Instead, we want to admit - // as many callers into the queue as the queue length permits plus the - // number of available concurrency tokens allows. - defer func() { - if returnedErr != nil { - <-sem.queueTokens - } - }() - default: + if !sem.queueTokens.TryAcquire() { return ErrMaxQueueSize } + + // We have acquired a queueing token, so we need to release it if acquiring + // the concurrency token fails. If we successfully acquire the concurrency + // token though then we retain the queueing token until the caller signals + // that the concurrency-limited function has finished. As a consequence the + // queue token is returned together with the concurrency token. + // + // A simpler model would be to just have `maxQueueLength` many queueing + // tokens. But this would add concurrency-limiting when acquiring the queue + // token itself, which is not what we want to do. Instead, we want to admit + // as many callers into the queue as the queue length permits plus the + // number of available concurrency tokens allows. + defer func() { + if returnedErr != nil { + sem.queueTokens.Release() + } + }() } // We are queued now, so let's tell the monitor. Furthermore, even though we're still // holding the queueing token when this function exits successfully we also tell the monitor // that we have exited the queue. It is only an implementation detail anyway that we hold on // to the token, so the monitor shouldn't care about that. - sem.monitor.Queued(ctx, limitingKey, len(sem.queueTokens)) + sem.monitor.Queued(ctx, limitingKey, sem.queueLength()) defer sem.monitor.Dequeued(ctx) // Set up the ticker that keeps us from waiting indefinitely on the concurrency token. @@ -98,7 +97,12 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey str // Try to acquire the concurrency token now that we're in the queue. select { - case sem.concurrencyTokens <- struct{}{}: + case acquired := <-sem.concurrencyTokens.Acquire(): + // When the semaphore returns false, the semaphore was stopped. It's likely due to the context is + // cancelled. Hence, we should return the error here. + if !acquired { + return sem.concurrencyTokens.Err() + } return nil case <-ticker.C(): return ErrMaxQueueTime @@ -110,21 +114,27 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey str // release releases the acquired tokens. 
func (sem *keyedConcurrencyLimiter) release() { if sem.queueTokens != nil { - <-sem.queueTokens + sem.queueTokens.Release() } - <-sem.concurrencyTokens + sem.concurrencyTokens.Release() } // queueLength returns the length of token queue func (sem *keyedConcurrencyLimiter) queueLength() int { - return len(sem.queueTokens) + if sem.queueTokens == nil { + return 0 + } + return int(sem.queueTokens.Current()) } // ConcurrencyLimiter contains rate limiter state. type ConcurrencyLimiter struct { - // maxConcurrencyLimit is the maximum number of concurrent calls to the limited function. - // This limit is per key. - maxConcurrencyLimit int64 + // ctx stores the context at initialization. This context is used as a stopping condition + // for some internal goroutines. + ctx context.Context + // limit is the adaptive maximum number of concurrent calls to the limited function. This limit is + // calculated adaptively from an outside calculator. + limit *AdaptiveLimit // maxQueueLength is the maximum number of operations allowed to wait in a queued state. // This limit is global and applies before the concurrency limit. Subsequent incoming // operations will be rejected with an error immediately. @@ -145,18 +155,31 @@ type ConcurrencyLimiter struct { } // NewConcurrencyLimiter creates a new concurrency rate limiter. -func NewConcurrencyLimiter(maxConcurrencyLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter { +func NewConcurrencyLimiter(ctx context.Context, limit *AdaptiveLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter { if monitor == nil { monitor = NewNoopConcurrencyMonitor() } - return &ConcurrencyLimiter{ - maxConcurrencyLimit: int64(maxConcurrencyLimit), + limiter := &ConcurrencyLimiter{ + ctx: ctx, + limit: limit, maxQueueLength: int64(maxQueueLength), maxQueuedTickerCreator: maxQueuedTickerCreator, monitor: monitor, limitsByKey: make(map[string]*keyedConcurrencyLimiter), } + + // When the capacity of the limiter is updated we also need to update the size of both the queuing tokens as + // well as the concurrency tokens to match the new size. + limit.AfterUpdate(func(val int) { + for _, keyedLimiter := range limiter.limitsByKey { + if keyedLimiter.queueTokens != nil { + keyedLimiter.queueTokens.Resize(int64(val) + limiter.maxQueueLength) + } + keyedLimiter.concurrencyTokens.Resize(int64(val)) + } + }) + return limiter } // Limit will limit the concurrency of the limited function f. There are two distinct mechanisms @@ -176,7 +199,7 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f Li ) defer span.Finish() - if c.maxConcurrencyLimit <= 0 { + if c.currentLimit() <= 0 { return f() } @@ -222,15 +245,15 @@ func (c *ConcurrencyLimiter) getConcurrencyLimit(limitingKey string) *keyedConcu // Set up the queue tokens in case a maximum queue length was requested. As the // queue tokens are kept during the whole lifetime of the concurrency-limited // function we add the concurrency tokens to the number of available token. 
- var queueTokens chan struct{} + var queueTokens *resizableSemaphore if c.maxQueueLength > 0 { - queueTokens = make(chan struct{}, c.maxConcurrencyLimit+c.maxQueueLength) + queueTokens = NewResizableSemaphore(c.ctx, c.currentLimit()+c.maxQueueLength) } c.limitsByKey[limitingKey] = &keyedConcurrencyLimiter{ monitor: c.monitor, maxQueuedTickerCreator: c.maxQueuedTickerCreator, - concurrencyTokens: make(chan struct{}, c.maxConcurrencyLimit), + concurrencyTokens: NewResizableSemaphore(c.ctx, c.currentLimit()), queueTokens: queueTokens, } } @@ -268,3 +291,7 @@ func (c *ConcurrencyLimiter) countSemaphores() int { return len(c.limitsByKey) } + +func (c *ConcurrencyLimiter) currentLimit() int64 { + return int64(c.limit.Current()) +} diff --git a/internal/limiter/concurrency_limiter_test.go b/internal/limiter/concurrency_limiter_test.go index f99ca7eab..67ca38e1e 100644 --- a/internal/limiter/concurrency_limiter_test.go +++ b/internal/limiter/concurrency_limiter_test.go @@ -3,6 +3,7 @@ package limiter import ( "context" "errors" + "fmt" "strconv" "sync" "testing" @@ -86,7 +87,7 @@ func (c *counter) Dropped(_ context.Context, _ string, _ int, _ time.Duration, r } } -func TestLimiter(t *testing.T) { +func TestLimiter_static(t *testing.T) { t.Parallel() tests := []struct { @@ -155,7 +156,8 @@ func TestLimiter(t *testing.T) { gauge := &counter{} limiter := NewConcurrencyLimiter( - tt.maxConcurrency, + ctx, + NewAdaptiveLimit("staticLimit", AdaptiveSetting{Initial: tt.maxConcurrency}), 0, nil, gauge, @@ -229,6 +231,557 @@ func TestLimiter(t *testing.T) { } } +func TestLimiter_dynamic(t *testing.T) { + t.Parallel() + + t.Run("increase dynamic limit when there is no queuing request", func(t *testing.T) { + ctx := testhelper.Context(t) + limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1}) + gauge := &blockingQueueCounter{queuedCh: make(chan struct{})} + limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge) + + // 5 requests acquired the tokens, the limiter is full now + release1, waitAfterRelease1 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 5) + require.Equal(t, 5, gauge.enter) + + // Update the limit to 7 + limit.Update(7) + + // 2 more requests acquired the token. 
This proves the limit is expanded + release2, waitAfterRelease2 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 2) + require.Equal(t, 7, gauge.enter) + + close(release1) + close(release2) + waitAfterRelease1() + waitAfterRelease2() + require.Equal(t, 7, gauge.exit) + }) + + t.Run("decrease dynamic limit when there is no queuing request", func(t *testing.T) { + ctx := testhelper.Context(t) + limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1}) + gauge := &blockingQueueCounter{queuedCh: make(chan struct{})} + limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge) + + // 3 requests acquired the tokens, 2 slots left + release1, waitAfterRelease1 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 3) + require.Equal(t, 3, gauge.enter) + require.Equal(t, 3, gauge.queued) + + // Update the limit to 3 + limit.Update(3) + + // 2 requests are put in queue, meaning the limit shrinks down + waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(ctx, t, "1", limiter, gauge, 2) + require.Equal(t, 3, gauge.enter) + require.Equal(t, 5, gauge.queued) + + // Release first 3 requests + close(release1) + waitAfterRelease1() + + // Now the last 2 requests can acquire token + waitAcquired2() + require.Equal(t, 5, gauge.enter) + require.Equal(t, 5, gauge.queued) + require.Equal(t, 3, gauge.exit) + + // Release the last patch + close(release2) + waitAfterRelease2() + require.Equal(t, 5, gauge.exit) + }) + + t.Run("increase dynamic limit more than the number of queuing requests", func(t *testing.T) { + ctx := testhelper.Context(t) + limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1}) + gauge := &blockingQueueCounter{queuedCh: make(chan struct{})} + limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge) + + // 5 requests acquired the tokens, the limiter is full now + release1, waitAfterRelease1 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 5) + require.Equal(t, 5, gauge.enter) + + // 2 requests waiting in the queue + waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(ctx, t, "1", limiter, gauge, 2) + require.Equal(t, 5, gauge.enter) + require.Equal(t, 7, gauge.queued) + + // Update the limit to 7 + limit.Update(7) + + // Wait for the other 2 requests acquired the token. This proves the limiter is expanded + waitAcquired2() + require.Equal(t, 7, gauge.enter) + + close(release1) + close(release2) + waitAfterRelease1() + waitAfterRelease2() + require.Equal(t, 7, gauge.exit) + }) + + t.Run("increase dynamic limit less than the number of queuing requests", func(t *testing.T) { + ctx := testhelper.Context(t) + limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1}) + gauge := &blockingQueueCounter{queuedCh: make(chan struct{})} + limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge) + + // 5 requests acquired the tokens, the limiter is full now + release1, waitAfterRelease1 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 5) + require.Equal(t, 5, gauge.enter) + + // 2 requests waiting in the queue + waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(ctx, t, "1", limiter, gauge, 2) + require.Equal(t, 5, gauge.enter) + require.Equal(t, 7, gauge.queued) + + // 5 more requests waiting in the queue + waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(ctx, t, "1", limiter, gauge, 5) + require.Equal(t, 5, gauge.enter) + require.Equal(t, 12, gauge.queued) + + // Update the limit to 7. 
+ limit.Update(7) + + // Release first 5 requests, all requests should fit in the queue now. + close(release1) + waitAfterRelease1() + require.Equal(t, 5, gauge.exit) + + waitAcquired2() + waitAcquired3() + require.Equal(t, 12, gauge.enter) + + // Now release all requests + close(release2) + close(release3) + waitAfterRelease2() + waitAfterRelease3() + require.Equal(t, 12, gauge.exit) + }) + + t.Run("decrease dynamic limit less than the number of concurrent requests", func(t *testing.T) { + ctx := testhelper.Context(t) + limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1}) + gauge := &blockingQueueCounter{queuedCh: make(chan struct{})} + limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge) + + // 5 requests acquired the tokens, the limiter is full now + release1, waitAfterRelease1 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 5) + require.Equal(t, 5, gauge.enter) + + // Update the limit to 3 + limit.Update(3) + + // 3 requests are put in queue + waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(ctx, t, "1", limiter, gauge, 3) + require.Equal(t, 5, gauge.enter) + require.Equal(t, 8, gauge.queued) + + // Release the first 5 requests + close(release1) + waitAfterRelease1() + require.Equal(t, 5, gauge.exit) + + // Now the last 3 requests acquire the tokens + waitAcquired2() + require.Equal(t, 8, gauge.enter) + + // 1 more request is put in queue, meaning the limit shrinks down to 3. + waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(ctx, t, "1", limiter, gauge, 1) + require.Equal(t, 8, gauge.enter) + require.Equal(t, 9, gauge.queued) + + // Release the second 3 requests + close(release2) + waitAfterRelease2() + require.Equal(t, 8, gauge.exit) + + // The last request acquires the token + waitAcquired3() + require.Equal(t, 9, gauge.enter) + + // Release the last request + close(release3) + waitAfterRelease3() + require.Equal(t, 9, gauge.exit) + }) + + t.Run("increase and decrease dynamic limit multiple times", func(t *testing.T) { + ctx := testhelper.Context(t) + limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1}) + gauge := &blockingQueueCounter{queuedCh: make(chan struct{})} + limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge) + + // Update the limit to 7 + limit.Update(7) + + // 5 requests acquired the tokens + release1, waitAfterRelease1 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 5) + require.Equal(t, 5, gauge.enter) + + // Update the limit to 3 + limit.Update(3) + + // 3 requests are put in queue + waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(ctx, t, "1", limiter, gauge, 3) + require.Equal(t, 5, gauge.enter) + require.Equal(t, 8, gauge.queued) + + // Update the limit to 10 + limit.Update(10) + + // All existing requests acquire the tokens + waitAcquired2() + require.Equal(t, 8, gauge.enter) + + // 2 more requests + release3, waitAfterRelease3 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 2) + require.Equal(t, 10, gauge.enter) + + // Update the limit to 1 + limit.Update(1) + + // Now release all of them + close(release1) + waitAfterRelease1() + require.Equal(t, 5, gauge.exit) + + close(release2) + waitAfterRelease2() + require.Equal(t, 8, gauge.exit) + + close(release3) + waitAfterRelease3() + require.Equal(t, 10, gauge.exit) + }) + + t.Run("increase the limit when the queue is full", func(t *testing.T) { + ctx := testhelper.Context(t) + limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 1, Max: 10, Min: 1}) + 
gauge := &blockingQueueCounter{queuedCh: make(chan struct{})} + // Mind the queue length here + limiter := NewConcurrencyLimiter(ctx, limit, 5, nil, gauge) + + // 1 requests acquired the tokens, the limiter is full now + release1, waitAfterRelease1 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 1) + require.Equal(t, 1, gauge.enter) + require.Equal(t, 1, gauge.queued) + + // 5 requests queuing for the tokens, the queue is full now + waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(ctx, t, "1", limiter, gauge, 5) + require.Equal(t, 1, gauge.enter) + require.Equal(t, 6, gauge.queued) + + // Limiter rejects new request + maximumQueueSizeReached(ctx, t, "1", limiter) + + // Update the limit + limit.Update(6) + waitAcquired2() + require.Equal(t, 6, gauge.enter) + + // 5 requests queuing for the tokens, the queue is full now + waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(ctx, t, "1", limiter, gauge, 5) + require.Equal(t, 6, gauge.enter) + require.Equal(t, 11, gauge.queued) + + // Limiter rejects new request + maximumQueueSizeReached(ctx, t, "1", limiter) + + // Clean up + close(release1) + close(release2) + waitAfterRelease1() + waitAfterRelease2() + require.Equal(t, 6, gauge.exit) + + waitAcquired3() + require.Equal(t, 11, gauge.enter) + close(release3) + waitAfterRelease3() + }) + + t.Run("decrease the limit when the queue is full", func(t *testing.T) { + ctx := testhelper.Context(t) + limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1}) + gauge := &blockingQueueCounter{queuedCh: make(chan struct{})} + // Mind the queue length here + limiter := NewConcurrencyLimiter(ctx, limit, 3, nil, gauge) + + // 5 requests acquired the tokens, the limiter is full now + release1, waitAfterRelease1 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 5) + require.Equal(t, 5, gauge.enter) + require.Equal(t, 5, gauge.queued) + + // 5 requests queuing for the tokens, the queue is full now + waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(ctx, t, "1", limiter, gauge, 3) + require.Equal(t, 5, gauge.enter) + require.Equal(t, 8, gauge.queued) + + // Limiter rejects new request + maximumQueueSizeReached(ctx, t, "1", limiter) + + // Update the limit. + limit.Update(3) + + // The queue is still full + maximumQueueSizeReached(ctx, t, "1", limiter) + + // Release first 5 requests and let the last 3 requests in + close(release1) + waitAfterRelease1() + require.Equal(t, 5, gauge.exit) + waitAcquired2() + require.Equal(t, 8, gauge.enter) + + // Another 5 requests in queue. The queue is still full, meaning the concurrency is 3 and the queue is still 5. 
+ waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(ctx, t, "1", limiter, gauge, 3) + require.Equal(t, 8, gauge.enter) + require.Equal(t, 11, gauge.queued) + maximumQueueSizeReached(ctx, t, "1", limiter) + + // Clean up + close(release2) + waitAfterRelease2() + require.Equal(t, 8, gauge.exit) + waitAcquired3() + require.Equal(t, 11, gauge.enter) + close(release3) + waitAfterRelease3() + require.Equal(t, 11, gauge.exit) + }) + + t.Run("dynamic limit works without queuing", func(t *testing.T) { + ctx := testhelper.Context(t) + limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1}) + gauge := &blockingQueueCounter{queuedCh: make(chan struct{})} + // No queue, it means the limiter accepts unlimited requests + limiter := NewConcurrencyLimiter(ctx, limit, 0, nil, gauge) + + // 5 requests acquired the tokens, the limiter is full now + release1, waitAfterRelease1 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 5) + require.Equal(t, 5, gauge.enter) + require.Equal(t, 5, gauge.queued) + + // 5 more requests + waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(ctx, t, "1", limiter, gauge, 5) + + // Update the limit. + limit.Update(10) + + // All of them acquired the tokens + waitAcquired2() + require.Equal(t, 10, gauge.enter) + + // Clean up + close(release1) + close(release2) + waitAfterRelease1() + waitAfterRelease2() + require.Equal(t, 10, gauge.exit) + }) + + t.Run("dynamic limit works with queue timer", func(t *testing.T) { + ctx := testhelper.Context(t) + limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1}) + gauge := &blockingQueueCounter{queuedCh: make(chan struct{})} + + ticker := helper.NewManualTicker() + limiter := NewConcurrencyLimiter(ctx, limit, 0, func() helper.Ticker { return ticker }, gauge) + + // 5 requests acquired the tokens, the limiter is full now + release1, waitAfterRelease1 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 5) + require.Equal(t, 5, gauge.enter) + require.Equal(t, 5, gauge.queued) + + errors := make(chan error, 10) + // 5 requests in queue + spawnQueuedAndCollectErrors(ctx, "1", limiter, gauge, 5, errors) + + // Decrease the limit + limit.Update(3) + + // 5 more requests in queue + spawnQueuedAndCollectErrors(ctx, "1", limiter, gauge, 5, errors) + + // Trigger timeout event + for i := 0; i < 10; i++ { + ticker.Tick() + require.EqualError(t, <-errors, "maximum time in concurrency queue reached") + } + + // Other goroutines exit as normal + close(release1) + waitAfterRelease1() + require.Equal(t, 5, gauge.exit) + }) + + t.Run("dynamic limit works with context cancellation", func(t *testing.T) { + ctx := testhelper.Context(t) + ctx2, cancel := context.WithCancel(testhelper.Context(t)) + + limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1}) + gauge := &blockingQueueCounter{queuedCh: make(chan struct{})} + + limiter := NewConcurrencyLimiter(ctx, limit, 0, nil, gauge) + + // 5 requests acquired the tokens, the limiter is full now + release1, waitAfterRelease1 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 5) + require.Equal(t, 5, gauge.enter) + require.Equal(t, 5, gauge.queued) + + errors := make(chan error, 10) + // 5 requests in queue + spawnQueuedAndCollectErrors(ctx2, "1", limiter, gauge, 5, errors) + + // Decrease the limit + limit.Update(3) + + // 5 more requests in queue + spawnQueuedAndCollectErrors(ctx2, "1", limiter, gauge, 5, errors) + + // Trigger context cancellation + cancel() + for i := 0; i < 10; i++ { + 
require.EqualError(t, <-errors, "unexpected error when dequeueing request: context canceled") + } + + // Other goroutines exit as normal + close(release1) + waitAfterRelease1() + require.Equal(t, 5, gauge.exit) + }) + + t.Run("dynamic limit works with multiple buckets", func(t *testing.T) { + ctx := testhelper.Context(t) + limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1}) + gauge := &blockingQueueCounter{queuedCh: make(chan struct{})} + + limiter := NewConcurrencyLimiter(ctx, limit, 5, nil, gauge) + + var releaseChans []chan struct{} + var waitAcquireFuncs, waitReleaseFuncs []func() + + // 5 * 5 requests acquired tokens + for i := 1; i <= 5; i++ { + release, waitAfterRelease := spawnAndWaitAcquired(ctx, t, fmt.Sprintf("%d", i), limiter, gauge, 5) + releaseChans = append(releaseChans, release) + waitReleaseFuncs = append(waitReleaseFuncs, waitAfterRelease) + } + require.Equal(t, 25, gauge.enter) + + // 1 + 2 + 3 + 4 + 5 requests are in queue + for i := 1; i <= 5; i++ { + waitAcquired, release, waitAfterRelease := spawnAndWaitQueued(ctx, t, fmt.Sprintf("%d", i), limiter, gauge, i) + waitAcquireFuncs = append(waitAcquireFuncs, waitAcquired) + releaseChans = append(releaseChans, release) + waitReleaseFuncs = append(waitReleaseFuncs, waitAfterRelease) + } + require.Equal(t, 25, gauge.enter) + require.Equal(t, 40, gauge.queued) + + // Update limit, enough for all requests + limit.Update(10) + + // All requests acquired tokens now + for _, wait := range waitAcquireFuncs { + wait() + } + require.Equal(t, 40, gauge.enter) + + // Release all + for _, release := range releaseChans { + close(release) + } + for _, wait := range waitReleaseFuncs { + wait() + } + require.Equal(t, 40, gauge.exit) + }) +} + +// spawnAndWaitAcquired spawns N goroutines that wait for the limiter. They wait until all of them acquire the limiter +// token before exiting. This function returns a channel to control token release and a function to wait until all +// goroutines finish. +func spawnAndWaitAcquired(ctx context.Context, t *testing.T, bucket string, limiter *ConcurrencyLimiter, gauge *blockingQueueCounter, n int) (chan struct{}, func()) { + var acquireWg, releaseWg sync.WaitGroup + release := make(chan struct{}) + + for i := 0; i < n; i++ { + acquireWg.Add(1) + releaseWg.Add(1) + go func() { + defer releaseWg.Done() + _, err := limiter.Limit(ctx, bucket, func() (resp interface{}, err error) { + acquireWg.Done() + <-release + return nil, nil + }) + require.NoError(t, err) + }() + } + for i := 0; i < n; i++ { + <-gauge.queuedCh + gauge.queued++ + } + acquireWg.Wait() + + return release, releaseWg.Wait +} + +// spawnAndWaitQueued spawns N goroutines that wait for the limiter. They wait until all of them are queued. This +// function returns a function to wait for channel to acquired the token, a channel to control token release, and a +// function to wait until all goroutines finish. 
+func spawnAndWaitQueued(ctx context.Context, t *testing.T, bucket string, limiter *ConcurrencyLimiter, gauge *blockingQueueCounter, n int) (func(), chan struct{}, func()) { + var acquireWg, releaseWg sync.WaitGroup + release := make(chan struct{}) + + for i := 0; i < n; i++ { + acquireWg.Add(1) + releaseWg.Add(1) + go func() { + defer releaseWg.Done() + _, err := limiter.Limit(ctx, bucket, func() (resp interface{}, err error) { + acquireWg.Done() + <-release + return nil, nil + }) + require.NoError(t, err) + }() + } + for i := 0; i < n; i++ { + <-gauge.queuedCh + gauge.queued++ + } + + return acquireWg.Wait, release, releaseWg.Wait +} + +func spawnQueuedAndCollectErrors(ctx context.Context, bucket string, limiter *ConcurrencyLimiter, gauge *blockingQueueCounter, n int, errors chan error) { + for i := 0; i < n; i++ { + go func() { + _, err := limiter.Limit(ctx, bucket, func() (interface{}, error) { + return nil, fmt.Errorf("should not call") + }) + errors <- err + }() + } + for i := 0; i < n; i++ { + <-gauge.queuedCh + gauge.queued++ + } +} + +func maximumQueueSizeReached(ctx context.Context, t *testing.T, bucket string, limiter *ConcurrencyLimiter) { + _, err := limiter.Limit(ctx, bucket, func() (interface{}, error) { + return nil, fmt.Errorf("should not call") + }) + require.EqualError(t, err, "maximum queue size reached") +} + type blockingQueueCounter struct { counter @@ -248,7 +801,7 @@ func TestConcurrencyLimiter_queueLimit(t *testing.T) { monitorCh := make(chan struct{}) monitor := &blockingQueueCounter{queuedCh: monitorCh} ch := make(chan struct{}) - limiter := NewConcurrencyLimiter(1, queueLimit, nil, monitor) + limiter := NewConcurrencyLimiter(ctx, NewAdaptiveLimit("staticLimit", AdaptiveSetting{Initial: 1}), queueLimit, nil, monitor) // occupied with one live request that takes a long time to complete go func() { @@ -333,7 +886,8 @@ func TestLimitConcurrency_queueWaitTime(t *testing.T) { monitor := &blockingDequeueCounter{dequeuedCh: dequeuedCh} limiter := NewConcurrencyLimiter( - 1, + ctx, + NewAdaptiveLimit("staticLimit", AdaptiveSetting{Initial: 1}), 0, func() helper.Ticker { return ticker diff --git a/internal/limiter/resizable_semaphore.go b/internal/limiter/resizable_semaphore.go new file mode 100644 index 000000000..19434afc3 --- /dev/null +++ b/internal/limiter/resizable_semaphore.go @@ -0,0 +1,157 @@ +package limiter + +import ( + "context" + "sync/atomic" +) + +// resizableSemaphore struct models provides a way to bound concurrent access to resources. It allows a certain number +// of concurrent access to the resources. When the concurrency reaches the semaphore size, the callers are blocked until +// a resource is available again. The size of the semaphore can be resized freely in an atomic manner. +// +// Internally, a separate goroutine manages the semaphore's functionality, providing synchronization across all channel +// operations. Callers acquire the semaphore by pulling a "token" through an `acquireCh` channel via `acquire()`, and +// return them to an `releaseCh` channel via `release()`. This goroutine ensures that tokens are properly distributed +// from those channels, and also manages the semaphore's current length and size. It processes resize requests and +// handles try requests and responses, thus ensuring a smooth operation. +// +// Note: This struct is not intended to serve as a general-purpose data structure but is specifically designed for +// flexible concurrency control with resizable capacity. 
+type resizableSemaphore struct { + // err stores the error of why the sempahore is stopped. Most of the case, it's due to context cancellation. + err atomic.Pointer[error] + // current represents the current concurrency access to the resources. + current atomic.Int64 + // size is the maximum capacity of the semaphore. It represents the maximum concurrency that the resources can + // be accessed of the time. + size atomic.Int64 + // releaseCh is a channel used to return tokens back to the semaphore. When a caller returns a token, it sends a signal to this channel. + releaseCh chan struct{} + // acquireCh is a channel used for callers to acquire a token. When a token is available, a signal is sent to this channel. + acquireCh chan bool + // tryAcquireCh is a channel used to signal a try request. A try request is a non-blocking request to acquire a token. + tryAcquireCh chan struct{} + // tryAcquireRespCh is a channel used to respond to a try request. If a token is available, a nil error is sent; otherwise, an error is sent. + tryAcquireRespCh chan bool + // resizeCh is a channel used to request a resize of the semaphore's capacity. The requested new capacity is sent to this channel. + resizeCh chan int64 + // stopCh is a channel that determines whether the semaphore is stopped. + stopCh chan struct{} +} + +// NewResizableSemaphore creates and starts a resizableSemaphore +func NewResizableSemaphore(ctx context.Context, capacity int64) *resizableSemaphore { + s := &resizableSemaphore{ + stopCh: make(chan struct{}), + releaseCh: make(chan struct{}), + acquireCh: make(chan bool), + tryAcquireCh: make(chan struct{}), + tryAcquireRespCh: make(chan bool), + resizeCh: make(chan int64), + } + s.size.Store(capacity) + go s.start(ctx) + + return s +} + +// start kicks off a goroutine that maintains the states of the semaphore. It acts as an event loop that listens to +// all modifications. +func (q *resizableSemaphore) start(ctx context.Context) { + for { + if len := q.current.Load(); len < q.size.Load() { + select { + case <-q.releaseCh: + q.current.Store(len - 1) + case q.acquireCh <- true: + q.current.Store(len + 1) + case <-q.tryAcquireCh: + q.current.Store(len + 1) + q.tryAcquireRespCh <- true + case newSize := <-q.resizeCh: + // If the new capacity is greater than the prior one, the capacity grows without + // the need to do anything. It allows more callers to acquire the token in the + // right next iteration. + // In contrast, when the new capacity is less than the prior one, the capacity + // shrinks and the length becomes bigger than the new capacity. That's perfectly + // fine. All existing callers continue, new callers can't acquire new token. The + // length will be naturally adjusted below the capacity overtime. + q.size.Store(newSize) + case <-ctx.Done(): + q.stop(ctx.Err()) + return + } + } else { + select { + case <-q.releaseCh: + q.current.Store(len - 1) + case <-q.tryAcquireCh: + q.tryAcquireRespCh <- false + case newSize := <-q.resizeCh: + // Simiarly to the above case, overriding the capacity is enough. + q.size.Store(newSize) + case <-ctx.Done(): + q.stop(ctx.Err()) + return + } + } + } +} + +func (q *resizableSemaphore) stop(err error) { + q.current.Store(0) + q.err.Store(&err) + + close(q.stopCh) + // The only exposed channel. After this channel is closed, caller receives `false` when trying to pull from this + // channel. Other state-modifying channels stay instact but TryAcquire(), Release(), and Resize() returns + // immediately. 
+	close(q.acquireCh)
+}
+
+// Err returns the error explaining why the semaphore was stopped. In most cases, it is due to context cancellation.
+func (q *resizableSemaphore) Err() error {
+	return *q.err.Load()
+}
+
+// Acquire returns a channel that allows the caller to acquire the semaphore. The caller is blocked until there
+// is an available resource. It is not safe to use in a `select` statement that has a `default` case. In such
+// use cases, use TryAcquire() instead.
+func (q *resizableSemaphore) Acquire() <-chan bool {
+	return q.acquireCh
+}
+
+// TryAcquire acquires the semaphore without blocking. On success, it returns true. On failure, it returns false and
+// leaves the semaphore unchanged.
+func (q *resizableSemaphore) TryAcquire() bool {
+	select {
+	case q.tryAcquireCh <- struct{}{}:
+		return <-q.tryAcquireRespCh
+	case <-q.stopCh:
+		return false
+	}
+}
+
+// Release releases the semaphore by pushing the token back. If the semaphore is stopped, this function returns
+// immediately.
+func (q *resizableSemaphore) Release() {
+	select {
+	case q.releaseCh <- struct{}{}:
+	case <-q.stopCh:
+		// No op, return immediately.
+	}
+}
+
+// Current returns the current number of concurrent accesses to the semaphore.
+func (q *resizableSemaphore) Current() int64 {
+	return q.current.Load()
+}
+
+// Resize modifies the size of the semaphore. If the semaphore is stopped, this function returns immediately.
+func (q *resizableSemaphore) Resize(newSize int64) {
+	select {
+	case q.resizeCh <- newSize:
+	case <-q.stopCh:
+		// This update is redundant, but it keeps the semaphore's recorded size up-to-date.
+		q.size.Store(newSize)
+	}
+}
diff --git a/internal/limiter/resizable_semaphore_test.go b/internal/limiter/resizable_semaphore_test.go
new file mode 100644
index 000000000..00ac90097
--- /dev/null
+++ b/internal/limiter/resizable_semaphore_test.go
@@ -0,0 +1,262 @@
+package limiter
+
+import (
+	"context"
+	"sync"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+)
+
+func TestResizableSemaphore_New(t *testing.T) {
+	t.Parallel()
+
+	semaphore := NewResizableSemaphore(testhelper.Context(t), 5)
+	require.Equal(t, int64(0), semaphore.Current())
+}
+
+func TestResizableSemaphore_Stopped(t *testing.T) {
+	t.Parallel()
+
+	ctx, cancel := context.WithCancel(testhelper.Context(t))
+	semaphore := NewResizableSemaphore(ctx, 5)
+
+	// Try to acquire a token from the empty semaphore
+	require.True(t, semaphore.TryAcquire())
+	semaphore.Release()
+
+	// 5 goroutines acquire the semaphore
+	beforeCallRelease := make(chan struct{})
+	var acquireWg1, releaseWg1 sync.WaitGroup
+	for i := 0; i < 5; i++ {
+		acquireWg1.Add(1)
+		releaseWg1.Add(1)
+		go func() {
+			require.True(t, <-semaphore.Acquire())
+			acquireWg1.Done()
+
+			<-beforeCallRelease
+
+			semaphore.Release()
+			releaseWg1.Done()
+		}()
+	}
+	acquireWg1.Wait()
+
+	// Another 5 wait for the semaphore
+	var acquireWg2 sync.WaitGroup
+	for i := 0; i < 5; i++ {
+		acquireWg2.Add(1)
+		go func() {
+			// This goroutine is blocked until the context is canceled, at which point Acquire() returns false
+			require.False(t, <-semaphore.Acquire())
+			acquireWg2.Done()
+		}()
+	}
+
+	// Try to acquire a token from the full semaphore
+	require.False(t, semaphore.TryAcquire())
+
+	// Cancel the context
+	cancel()
+	acquireWg2.Wait()
+
+	// The first 5 goroutines can call Release() even after the semaphore is stopped
+	close(beforeCallRelease)
+	releaseWg1.Wait()
+
+	// The last 5 goroutines exit immediately, Acquire() returns false
+	acquireWg2.Wait()
+
+	
require.False(t, semaphore.TryAcquire()) + require.Equal(t, int64(0), semaphore.Current()) + require.Equal(t, ctx.Err(), semaphore.Err()) +} + +func TestResizableSemaphore_Acquire(t *testing.T) { + t.Parallel() + + t.Run("acquire less than the capacity", func(t *testing.T) { + semaphore := NewResizableSemaphore(testhelper.Context(t), 5) + + waitBeforeRelease, waitRelease := acquireSemaphore(t, semaphore, 4) + require.NotNil(t, <-semaphore.Acquire()) + + close(waitBeforeRelease) + waitRelease() + }) + + t.Run("acquire more than the capacity", func(t *testing.T) { + semaphore := NewResizableSemaphore(testhelper.Context(t), 5) + + waitBeforeRelease, waitRelease := acquireSemaphore(t, semaphore, 5) + require.False(t, semaphore.TryAcquire()) + + close(waitBeforeRelease) + waitRelease() + }) + + t.Run("semaphore is full then available again", func(t *testing.T) { + semaphore := NewResizableSemaphore(testhelper.Context(t), 5) + + waitChan := make(chan bool) + _, _ = acquireSemaphore(t, semaphore, 5) + + go func() { + waitChan <- <-semaphore.Acquire() + }() + + // The semaphore is full now + require.False(t, semaphore.TryAcquire()) + + // Release one token + semaphore.Release() + // The waiting channel is unlocked + require.True(t, <-waitChan) + + // Release another token + semaphore.Release() + // Now TryAcquire can pull out a token + require.True(t, semaphore.TryAcquire()) + }) + + t.Run("the semaphore is resized up when empty", func(t *testing.T) { + semaphore := NewResizableSemaphore(testhelper.Context(t), 5) + semaphore.Resize(10) + + waitBeforeRelease, waitRelease := acquireSemaphore(t, semaphore, 9) + require.NotNil(t, <-semaphore.Acquire()) + + close(waitBeforeRelease) + waitRelease() + }) + + t.Run("the semaphore is resized up when not empty", func(t *testing.T) { + semaphore := NewResizableSemaphore(testhelper.Context(t), 7) + + waitBeforeRelease1, waitRelease1 := acquireSemaphore(t, semaphore, 5) + semaphore.Resize(15) + waitBeforeRelease2, waitRelease2 := acquireSemaphore(t, semaphore, 5) + + require.NotNil(t, <-semaphore.Acquire()) + + close(waitBeforeRelease1) + close(waitBeforeRelease2) + waitRelease1() + waitRelease2() + }) + + t.Run("the semaphore is resized up when full", func(t *testing.T) { + semaphore := NewResizableSemaphore(testhelper.Context(t), 5) + + waitBeforeRelease1, waitRelease1 := acquireSemaphore(t, semaphore, 5) + require.False(t, semaphore.TryAcquire()) + + semaphore.Resize(10) + + waitBeforeRelease2, waitRelease2 := acquireSemaphore(t, semaphore, 5) + require.False(t, semaphore.TryAcquire()) + + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + <-semaphore.Acquire() + wg.Done() + semaphore.Release() + }() + } + + semaphore.Resize(15) + wg.Wait() + + close(waitBeforeRelease1) + close(waitBeforeRelease2) + waitRelease1() + waitRelease2() + }) + + t.Run("the semaphore is resized down when empty", func(t *testing.T) { + semaphore := NewResizableSemaphore(testhelper.Context(t), 10) + semaphore.Resize(5) + + waitBeforeRelease, waitRelease := acquireSemaphore(t, semaphore, 4) + require.NotNil(t, <-semaphore.Acquire()) + + close(waitBeforeRelease) + waitRelease() + }) + + t.Run("the semaphore is resized down when not empty", func(t *testing.T) { + semaphore := NewResizableSemaphore(testhelper.Context(t), 20) + + waitBeforeRelease1, waitRelease1 := acquireSemaphore(t, semaphore, 5) + semaphore.Resize(15) + waitBeforeRelease2, waitRelease2 := acquireSemaphore(t, semaphore, 5) + + require.NotNil(t, <-semaphore.Acquire()) + + 
close(waitBeforeRelease1) + close(waitBeforeRelease2) + waitRelease1() + waitRelease2() + }) + + t.Run("the semaphore is resized down lower than the current length", func(t *testing.T) { + semaphore := NewResizableSemaphore(testhelper.Context(t), 10) + + waitBeforeRelease1, waitRelease1 := acquireSemaphore(t, semaphore, 5) + + semaphore.Resize(3) + + require.False(t, semaphore.TryAcquire()) + close(waitBeforeRelease1) + waitRelease1() + + waitBeforeRelease2, waitRelease2 := acquireSemaphore(t, semaphore, 3) + require.False(t, semaphore.TryAcquire()) + + close(waitBeforeRelease2) + waitRelease2() + }) + + t.Run("the semaphore is resized down when full", func(t *testing.T) { + semaphore := NewResizableSemaphore(testhelper.Context(t), 10) + + waitBeforeRelease1, waitRelease1 := acquireSemaphore(t, semaphore, 10) + + semaphore.Resize(5) + + require.False(t, semaphore.TryAcquire()) + close(waitBeforeRelease1) + waitRelease1() + + waitBeforeRelease2, waitRelease2 := acquireSemaphore(t, semaphore, 5) + require.False(t, semaphore.TryAcquire()) + + close(waitBeforeRelease2) + waitRelease2() + }) +} + +func acquireSemaphore(t *testing.T, semaphore *resizableSemaphore, n int) (chan struct{}, func()) { + var acquireWg, releaseWg sync.WaitGroup + waitBeforeRelease := make(chan struct{}) + + for i := 0; i < n; i++ { + acquireWg.Add(1) + releaseWg.Add(1) + go func() { + require.True(t, <-semaphore.Acquire()) + acquireWg.Done() + + <-waitBeforeRelease + semaphore.Release() + releaseWg.Done() + }() + } + acquireWg.Wait() + + return waitBeforeRelease, releaseWg.Wait +} diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index 8e44b7839..47fb56f46 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -161,6 +161,8 @@ func waitHealthy(tb testing.TB, ctx context.Context, addr string, authToken stri func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, deps *service.Dependencies), opts ...GitalyServerOpt) (*grpc.Server, string, bool) { tb.Helper() + ctx := testhelper.Context(tb) + var gsd gitalyServerDeps for _, opt := range opts { gsd = opt(gsd) @@ -173,7 +175,7 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d server.WithStreamInterceptor(StructErrStreamInterceptor), } - deps := gsd.createDependencies(tb, cfg) + deps := gsd.createDependencies(ctx, tb, cfg) tb.Cleanup(func() { gsd.conns.Close() }) serverFactory := server.NewGitalyServerFactory( @@ -201,7 +203,6 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d assert.NoError(tb, internalServer.Serve(internalListener), "failure to serve internal gRPC") }() - ctx := testhelper.Context(tb) waitHealthy(tb, ctx, "unix://"+internalListener.Addr().String(), cfg.Auth.Token) } @@ -238,7 +239,6 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d assert.NoError(tb, externalServer.Serve(listener), "failure to serve external gRPC") }() - ctx := testhelper.Context(tb) waitHealthy(tb, ctx, addr, cfg.Auth.Token) return externalServer, addr, gsd.disablePraefect @@ -276,7 +276,7 @@ type gitalyServerDeps struct { signingKey string } -func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) *service.Dependencies { +func (gsd *gitalyServerDeps) createDependencies(ctx context.Context, tb testing.TB, cfg config.Cfg) *service.Dependencies { if gsd.logger == nil { gsd.logger = testhelper.NewGitalyServerLogger(tb) } @@ -328,7 +328,8 @@ func (gsd 
*gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * if gsd.packObjectsLimiter == nil { gsd.packObjectsLimiter = limiter.NewConcurrencyLimiter( - 0, + ctx, + limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 0}), 0, nil, limiter.NewNoopConcurrencyMonitor(), @@ -336,7 +337,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * } if gsd.limitHandler == nil { - gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters) + gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters(ctx)) } if gsd.repositoryCounter == nil { |
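For readers skimming the diff, here is a minimal, self-contained sketch of the caller-side pattern the new semaphore exposes: acquire a token, do the bounded work, then return the token. This is not the Gitaly implementation; the fixedSemaphore type, its method names, and the capacity of 2 are illustrative assumptions only, and it deliberately uses a plain buffered channel rather than the event-loop design above.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// fixedSemaphore is a hypothetical, non-resizable stand-in for the resizableSemaphore added in this commit.
type fixedSemaphore struct {
	tokens chan struct{}
}

func newFixedSemaphore(capacity int) *fixedSemaphore {
	return &fixedSemaphore{tokens: make(chan struct{}, capacity)}
}

// acquire blocks until a token is free or the context is done.
func (s *fixedSemaphore) acquire(ctx context.Context) error {
	select {
	case s.tokens <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// release returns a previously acquired token.
func (s *fixedSemaphore) release() {
	<-s.tokens
}

func main() {
	sem := newFixedSemaphore(2)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := sem.acquire(ctx); err != nil {
				fmt.Printf("request %d rejected: %v\n", i, err)
				return
			}
			defer sem.release()
			fmt.Printf("request %d admitted\n", i)
			time.Sleep(50 * time.Millisecond)
		}()
	}
	wg.Wait()
}

A buffered channel like the one in this sketch cannot change its capacity after creation; that limitation appears to be why the merged code funnels acquire, release, try, and resize operations through a single goroutine, which can compare the current length against an adjustable size on every loop iteration.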