gitlab.com/gitlab-org/gitaly.git

author    Quang-Minh Nguyen <qmnguyen@gitlab.com>  2023-08-09 17:14:01 +0300
committer Quang-Minh Nguyen <qmnguyen@gitlab.com>  2023-08-09 17:14:01 +0300
commit    023c2a7d02fa92926cb3db390357149e5ac1151c (patch)
tree      30b3cccf89f7292639855ebe8ad302948e960e0e
parent    0e78015ff2052203845e049be8b3395bac782554 (diff)
parent    ea45aabc5baff7eb41f515eef6a44981e6da9945 (diff)

Merge branch 'qmnguyen0711/add-adaptive-limit-to-pack-objects-limit' into 'master'

limiter: Add adaptive limit to the concurrency limiter

See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6183

Merged-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Approved-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Reviewed-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Reviewed-by: John Cai <jcai@gitlab.com>
-rw-r--r--  internal/cli/gitaly/serve.go                              |  10
-rw-r--r--  internal/gitaly/server/auth_test.go                       |  32
-rw-r--r--  internal/gitaly/service/hook/pack_objects_test.go         |   3
-rw-r--r--  internal/grpc/middleware/limithandler/middleware.go       | 144
-rw-r--r--  internal/grpc/middleware/limithandler/middleware_test.go  |  24
-rw-r--r--  internal/limiter/adaptive_calculator.go                   |   2
-rw-r--r--  internal/limiter/adaptive_calculator_test.go              |  10
-rw-r--r--  internal/limiter/adaptive_limit.go                        |  57
-rw-r--r--  internal/limiter/adaptive_limit_test.go                   | 124
-rw-r--r--  internal/limiter/concurrency_limiter.go                   |  99
-rw-r--r--  internal/limiter/concurrency_limiter_test.go              | 562
-rw-r--r--  internal/limiter/resizable_semaphore.go                   | 157
-rw-r--r--  internal/limiter/resizable_semaphore_test.go              | 262
-rw-r--r--  internal/testhelper/testserver/gitaly.go                  |  13
14 files changed, 1338 insertions(+), 161 deletions(-)
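
Before the file-by-file diff: the heart of the change is that NewConcurrencyLimiter now takes a context and an *AdaptiveLimit instead of a plain integer, so the per-key token pools can be resized at runtime. A minimal sketch of the new call shape, based only on the signatures in this diff (the concrete numbers are made up, and the package is internal to the Gitaly module, so this is illustrative rather than importable from outside):

    package main

    import (
        "context"
        "fmt"

        "gitlab.com/gitlab-org/gitaly/v16/internal/limiter"
    )

    func main() {
        ctx := context.Background()

        // Static for now: only Initial matters until an AdaptiveCalculator
        // starts calling Update() with values between Min and Max.
        limit := limiter.NewAdaptiveLimit("packObjects", limiter.AdaptiveSetting{
            Initial: 20, Min: 5, Max: 40, BackoffFactor: 0.75,
        })

        // A nil ticker creator and nil monitor are both accepted, as the
        // tests in this diff show; the limiter falls back to a no-op monitor.
        lim := limiter.NewConcurrencyLimiter(ctx, limit, 100, nil, nil)

        resp, err := lim.Limit(ctx, "some-repo-key", func() (interface{}, error) {
            return "work done under the concurrency token", nil
        })
        fmt.Println(resp, err)
    }
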
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index c5a70688f..3c7d6bdaf 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -265,10 +265,15 @@ func run(cfg config.Cfg) error {
return fmt.Errorf("disk cache walkers: %w", err)
}
+ // The pack-objects limit below is static at this stage. It's always equal to the initial limit, which uses the
+ // MaxConcurrency config.
+ packObjectLimit := limiter.NewAdaptiveLimit("packObjects", limiter.AdaptiveSetting{
+ Initial: cfg.PackObjectsLimiting.MaxConcurrency,
+ })
concurrencyLimitHandler := limithandler.New(
cfg,
limithandler.LimitConcurrencyByRepo,
- limithandler.WithConcurrencyLimiters,
+ limithandler.WithConcurrencyLimiters(ctx),
)
rateLimitHandler := limithandler.New(
@@ -289,7 +294,8 @@ func run(cfg config.Cfg) error {
}
}
packObjectsLimiter := limiter.NewConcurrencyLimiter(
- cfg.PackObjectsLimiting.MaxConcurrency,
+ ctx,
+ packObjectLimit,
cfg.PackObjectsLimiting.MaxQueueLength,
newTickerFunc,
packObjectsMonitor,
diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go
index d275319f5..99d9f498a 100644
--- a/internal/gitaly/server/auth_test.go
+++ b/internal/gitaly/server/auth_test.go
@@ -1,7 +1,7 @@
package server
import (
- netctx "context"
+ "context"
"crypto/tls"
"crypto/x509"
"fmt"
@@ -44,7 +44,7 @@ func TestMain(m *testing.M) {
}
func TestSanity(t *testing.T) {
- serverSocketPath := runServer(t, testcfg.Build(t))
+ serverSocketPath := runServer(testhelper.Context(t), t, testcfg.Build(t))
conn, err := dial(serverSocketPath, []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())})
require.NoError(t, err)
@@ -55,7 +55,8 @@ func TestSanity(t *testing.T) {
func TestTLSSanity(t *testing.T) {
cfg := testcfg.Build(t)
- addr := runSecureServer(t, cfg)
+ ctx := testhelper.Context(t)
+ addr := runSecureServer(ctx, t, cfg)
certPool, err := x509.SystemCertPool()
require.NoError(t, err)
@@ -102,7 +103,7 @@ func TestAuthFailures(t *testing.T) {
Auth: auth.Config{Token: "quxbaz"},
}))
- serverSocketPath := runServer(t, cfg)
+ serverSocketPath := runServer(testhelper.Context(t), t, cfg)
connOpts := append(tc.opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := dial(serverSocketPath, connOpts)
require.NoError(t, err, tc.desc)
@@ -145,7 +146,7 @@ func TestAuthSuccess(t *testing.T) {
Auth: auth.Config{Token: tc.token, Transitioning: !tc.required},
}))
- serverSocketPath := runServer(t, cfg)
+ serverSocketPath := runServer(testhelper.Context(t), t, cfg)
connOpts := append(tc.opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := dial(serverSocketPath, connOpts)
require.NoError(t, err, tc.desc)
@@ -158,7 +159,7 @@ func TestAuthSuccess(t *testing.T) {
type brokenAuth struct{}
func (brokenAuth) RequireTransportSecurity() bool { return false }
-func (brokenAuth) GetRequestMetadata(netctx.Context, ...string) (map[string]string, error) {
+func (brokenAuth) GetRequestMetadata(context.Context, ...string) (map[string]string, error) {
return map[string]string{"authorization": "Bearer blablabla"}, nil
}
@@ -186,7 +187,7 @@ func newOperationClient(t *testing.T, token, serverSocketPath string) (gitalypb.
return gitalypb.NewOperationServiceClient(conn), conn
}
-func runServer(t *testing.T, cfg config.Cfg) string {
+func runServer(ctx context.Context, t *testing.T, cfg config.Cfg) string {
t.Helper()
registry := backchannel.NewRegistry()
@@ -201,7 +202,7 @@ func runServer(t *testing.T, cfg config.Cfg) string {
catfileCache := catfile.NewCache(cfg)
t.Cleanup(catfileCache.Stop)
diskCache := cache.New(cfg, locator)
- limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)
+ limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters(ctx))
updaterWithHooks := updateref.NewUpdaterWithHooks(cfg, locator, hookManager, gitCmdFactory, catfileCache)
srv, err := NewGitalyServerFactory(cfg, testhelper.NewDiscardingLogEntry(t), registry, diskCache, []*limithandler.LimiterMiddleware{limitHandler}).New(false)
@@ -228,7 +229,7 @@ func runServer(t *testing.T, cfg config.Cfg) string {
}
//go:generate openssl req -newkey rsa:4096 -new -nodes -x509 -days 3650 -out testdata/gitalycert.pem -keyout testdata/gitalykey.pem -subj "/C=US/ST=California/L=San Francisco/O=GitLab/OU=GitLab-Shell/CN=localhost" -addext "subjectAltName = IP:127.0.0.1, DNS:localhost"
-func runSecureServer(t *testing.T, cfg config.Cfg) string {
+func runSecureServer(ctx context.Context, t *testing.T, cfg config.Cfg) string {
t.Helper()
cfg.TLS = config.TLS{
@@ -244,7 +245,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string {
testhelper.NewDiscardingLogEntry(t),
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg)),
- []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)},
+ []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters(ctx))},
).New(true)
require.NoError(t, err)
@@ -259,11 +260,12 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string {
func TestUnaryNoAuth(t *testing.T) {
cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{Auth: auth.Config{Token: "testtoken"}}))
- path := runServer(t, cfg)
+ ctx := testhelper.Context(t)
+
+ path := runServer(ctx, t, cfg)
conn, err := grpc.Dial(path, grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer testhelper.MustClose(t, conn)
- ctx := testhelper.Context(t)
client := gitalypb.NewRepositoryServiceClient(conn)
_, err = client.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{
@@ -279,12 +281,12 @@ func TestUnaryNoAuth(t *testing.T) {
func TestStreamingNoAuth(t *testing.T) {
cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{Auth: auth.Config{Token: "testtoken"}}))
+ ctx := testhelper.Context(t)
- path := runServer(t, cfg)
+ path := runServer(ctx, t, cfg)
conn, err := dial(path, []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())})
require.NoError(t, err)
t.Cleanup(func() { conn.Close() })
- ctx := testhelper.Context(t)
client := gitalypb.NewRepositoryServiceClient(conn)
stream, err := client.GetInfoAttributes(ctx, &gitalypb.GetInfoAttributesRequest{
@@ -329,7 +331,7 @@ func TestAuthBeforeLimit(t *testing.T) {
t.Cleanup(cleanup)
cfg.Gitlab.URL = gitlabURL
- serverSocketPath := runServer(t, cfg)
+ serverSocketPath := runServer(ctx, t, cfg)
client, conn := newOperationClient(t, cfg.Auth.Token, serverSocketPath)
t.Cleanup(func() { conn.Close() })
diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go
index 4d4d1a500..62fa0dca8 100644
--- a/internal/gitaly/service/hook/pack_objects_test.go
+++ b/internal/gitaly/service/hook/pack_objects_test.go
@@ -858,7 +858,8 @@ func TestPackObjects_concurrencyLimit(t *testing.T) {
cfg.Prometheus.GRPCLatencyBuckets,
)
limiter := limiter.NewConcurrencyLimiter(
- 1,
+ ctx,
+ limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 1}),
0,
func() helper.Ticker { return ticker },
monitor,
diff --git a/internal/grpc/middleware/limithandler/middleware.go b/internal/grpc/middleware/limithandler/middleware.go
index 0c41a4672..a6ed93f9a 100644
--- a/internal/grpc/middleware/limithandler/middleware.go
+++ b/internal/grpc/middleware/limithandler/middleware.go
@@ -165,84 +165,88 @@ func (w *wrappedStream) RecvMsg(m interface{}) error {
// WithConcurrencyLimiters sets up middleware to limit the concurrency of
// requests based on RPC and repository
-func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) {
- acquiringSecondsMetric := prometheus.NewHistogramVec(
- prometheus.HistogramOpts{
- Namespace: "gitaly",
- Subsystem: "concurrency_limiting",
- Name: "acquiring_seconds",
- Help: "Histogram of time calls are rate limited (in seconds)",
- Buckets: cfg.Prometheus.GRPCLatencyBuckets,
- },
- []string{"system", "grpc_service", "grpc_method"},
- )
- inProgressMetric := prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Namespace: "gitaly",
- Subsystem: "concurrency_limiting",
- Name: "in_progress",
- Help: "Gauge of number of concurrent in-progress calls",
- },
- []string{"system", "grpc_service", "grpc_method"},
- )
- queuedMetric := prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Namespace: "gitaly",
- Subsystem: "concurrency_limiting",
- Name: "queued",
- Help: "Gauge of number of queued calls",
- },
- []string{"system", "grpc_service", "grpc_method"},
- )
-
- middleware.collect = func(metrics chan<- prometheus.Metric) {
- acquiringSecondsMetric.Collect(metrics)
- inProgressMetric.Collect(metrics)
- queuedMetric.Collect(metrics)
- }
-
- result := make(map[string]limiter.Limiter)
- for _, limit := range cfg.Concurrency {
- limit := limit
+func WithConcurrencyLimiters(ctx context.Context) SetupFunc {
+ return func(cfg config.Cfg, middleware *LimiterMiddleware) {
+ acquiringSecondsMetric := prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Namespace: "gitaly",
+ Subsystem: "concurrency_limiting",
+ Name: "acquiring_seconds",
+ Help: "Histogram of time calls are rate limited (in seconds)",
+ Buckets: cfg.Prometheus.GRPCLatencyBuckets,
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ )
+ inProgressMetric := prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: "gitaly",
+ Subsystem: "concurrency_limiting",
+ Name: "in_progress",
+ Help: "Gauge of number of concurrent in-progress calls",
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ )
+ queuedMetric := prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: "gitaly",
+ Subsystem: "concurrency_limiting",
+ Name: "queued",
+ Help: "Gauge of number of queued calls",
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ )
- newTickerFunc := func() helper.Ticker {
- return helper.NewManualTicker()
+ middleware.collect = func(metrics chan<- prometheus.Metric) {
+ acquiringSecondsMetric.Collect(metrics)
+ inProgressMetric.Collect(metrics)
+ queuedMetric.Collect(metrics)
}
- if limit.MaxQueueWait > 0 {
- newTickerFunc = func() helper.Ticker {
- return helper.NewTimerTicker(limit.MaxQueueWait.Duration())
+ result := make(map[string]limiter.Limiter)
+ for _, limit := range cfg.Concurrency {
+ limit := limit
+
+ newTickerFunc := func() helper.Ticker {
+ return helper.NewManualTicker()
}
+
+ if limit.MaxQueueWait > 0 {
+ newTickerFunc = func() helper.Ticker {
+ return helper.NewTimerTicker(limit.MaxQueueWait.Duration())
+ }
+ }
+
+ result[limit.RPC] = limiter.NewConcurrencyLimiter(
+ ctx,
+ limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: limit.MaxPerRepo}),
+ limit.MaxQueueSize,
+ newTickerFunc,
+ limiter.NewPerRPCPromMonitor(
+ "gitaly", limit.RPC,
+ queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
+ ),
+ )
}
- result[limit.RPC] = limiter.NewConcurrencyLimiter(
- limit.MaxPerRepo,
- limit.MaxQueueSize,
- newTickerFunc,
- limiter.NewPerRPCPromMonitor(
- "gitaly", limit.RPC,
- queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
- ),
- )
- }
+ // Set default for ReplicateRepository.
+ replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository"
+ if _, ok := result[replicateRepositoryFullMethod]; !ok {
+ result[replicateRepositoryFullMethod] = limiter.NewConcurrencyLimiter(
+ ctx,
+ limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 1}),
+ 0,
+ func() helper.Ticker {
+ return helper.NewManualTicker()
+ },
+ limiter.NewPerRPCPromMonitor(
+ "gitaly", replicateRepositoryFullMethod,
+ queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
+ ),
+ )
+ }
- // Set default for ReplicateRepository.
- replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository"
- if _, ok := result[replicateRepositoryFullMethod]; !ok {
- result[replicateRepositoryFullMethod] = limiter.NewConcurrencyLimiter(
- 1,
- 0,
- func() helper.Ticker {
- return helper.NewManualTicker()
- },
- limiter.NewPerRPCPromMonitor(
- "gitaly", replicateRepositoryFullMethod,
- queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
- ),
- )
+ middleware.methodLimiters = result
}
-
- middleware.methodLimiters = result
}
// WithRateLimiters sets up a middleware with limiters that limit requests
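
The structural change in this file is that WithConcurrencyLimiters turns from a setup callback into a factory returning one, so it can close over a lifecycle context for the limiters' internal goroutines. A self-contained toy of that factory shape (names simplified; the real SetupFunc per this diff is func(config.Cfg, *LimiterMiddleware)):

    package main

    import (
        "context"
        "fmt"
    )

    // setupFunc stands in for limithandler.SetupFunc.
    type setupFunc func(rpc string)

    // withConcurrencyLimiters mimics the new factory shape: instead of being
    // the setup callback itself, it captures ctx and returns the callback, so
    // every limiter built inside can tie its background goroutines to one
    // lifecycle.
    func withConcurrencyLimiters(ctx context.Context) setupFunc {
        return func(rpc string) {
            fmt.Printf("limiter for %s bound to ctx (alive: %v)\n", rpc, ctx.Err() == nil)
        }
    }

    func main() {
        setup := withConcurrencyLimiters(context.Background())
        setup("/gitaly.RepositoryService/ReplicateRepository")
    }
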
diff --git a/internal/grpc/middleware/limithandler/middleware_test.go b/internal/grpc/middleware/limithandler/middleware_test.go
index 65adad2be..40978be69 100644
--- a/internal/grpc/middleware/limithandler/middleware_test.go
+++ b/internal/grpc/middleware/limithandler/middleware_test.go
@@ -38,6 +38,7 @@ func fixedLockKey(ctx context.Context) string {
func TestUnaryLimitHandler(t *testing.T) {
t.Parallel()
+ ctx := testhelper.Context(t)
s := &queueTestServer{
server: server{
blockCh: make(chan struct{}),
@@ -51,14 +52,13 @@ func TestUnaryLimitHandler(t *testing.T) {
},
}
- lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx))
interceptor := lh.UnaryInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor))
defer srv.Stop()
client, conn := newClient(t, serverSocketPath)
defer conn.Close()
- ctx := testhelper.Context(t)
var wg sync.WaitGroup
defer wg.Wait()
@@ -114,7 +114,7 @@ func TestUnaryLimitHandler_queueing(t *testing.T) {
MaxQueueWait: duration.Duration(time.Millisecond),
},
},
- }, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ }, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx))
s := &queueTestServer{
server: server{
@@ -175,7 +175,7 @@ func TestUnaryLimitHandler_queueing(t *testing.T) {
MaxPerRepo: 1,
},
},
- }, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ }, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx))
s := &queueTestServer{
server: server{
@@ -427,6 +427,7 @@ func TestStreamLimitHandler(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
t.Parallel()
+ ctx := testhelper.Context(t)
s := &server{blockCh: make(chan struct{})}
maxQueueSize := 1
@@ -440,14 +441,13 @@ func TestStreamLimitHandler(t *testing.T) {
},
}
- lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx))
interceptor := lh.StreamInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor))
defer srv.Stop()
client, conn := newClient(t, serverSocketPath)
defer conn.Close()
- ctx := testhelper.Context(t)
totalCalls := 10
@@ -481,6 +481,8 @@ func TestStreamLimitHandler(t *testing.T) {
func TestStreamLimitHandler_error(t *testing.T) {
t.Parallel()
+ ctx := testhelper.Context(t)
+
s := &queueTestServer{reqArrivedCh: make(chan struct{})}
s.blockCh = make(chan struct{})
@@ -490,7 +492,7 @@ func TestStreamLimitHandler_error(t *testing.T) {
},
}
- lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx))
interceptor := lh.StreamInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor))
defer srv.Stop()
@@ -498,8 +500,6 @@ func TestStreamLimitHandler_error(t *testing.T) {
client, conn := newClient(t, serverSocketPath)
defer conn.Close()
- ctx := testhelper.Context(t)
-
respChan := make(chan *grpc_testing.StreamingOutputCallResponse)
go func() {
stream, err := client.FullDuplexCall(ctx)
@@ -600,6 +600,8 @@ func (q *queueTestServer) FullDuplexCall(stream grpc_testing.TestService_FullDup
}
func TestConcurrencyLimitHandlerMetrics(t *testing.T) {
+ ctx := testhelper.Context(t)
+
s := &queueTestServer{reqArrivedCh: make(chan struct{})}
s.blockCh = make(chan struct{})
@@ -610,7 +612,7 @@ func TestConcurrencyLimitHandlerMetrics(t *testing.T) {
},
}
- lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx))
interceptor := lh.UnaryInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor))
defer srv.Stop()
@@ -618,8 +620,6 @@ func TestConcurrencyLimitHandlerMetrics(t *testing.T) {
client, conn := newClient(t, serverSocketPath)
defer conn.Close()
- ctx := testhelper.Context(t)
-
respCh := make(chan *grpc_testing.SimpleResponse)
go func() {
resp, err := client.UnaryCall(ctx, &grpc_testing.SimpleRequest{})
diff --git a/internal/limiter/adaptive_calculator.go b/internal/limiter/adaptive_calculator.go
index 15d7cf87b..793fc853c 100644
--- a/internal/limiter/adaptive_calculator.go
+++ b/internal/limiter/adaptive_calculator.go
@@ -262,7 +262,7 @@ func (c *AdaptiveCalculator) calibrateLimits(ctx context.Context) {
}).Debugf("Additive increase")
} else {
// Multiplicative decrease
- newLimit = int(math.Floor(float64(limit.Current()) * setting.BackoffBackoff))
+ newLimit = int(math.Floor(float64(limit.Current()) * setting.BackoffFactor))
if newLimit < setting.Min {
newLimit = setting.Min
}
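
This hunk renames the multiplicative-decrease knob to BackoffFactor. The surrounding calibration loop is plain AIMD: additive increase while healthy, and on backoff newLimit = floor(current * BackoffFactor), clamped at Min. A standalone illustration of that arithmetic (the concrete factor and bounds are hypothetical):

    package main

    import (
        "fmt"
        "math"
    )

    // aimdDecrease applies the multiplicative-decrease step from the diff:
    // newLimit = floor(current * backoffFactor), clamped to min.
    func aimdDecrease(current, min int, backoffFactor float64) int {
        newLimit := int(math.Floor(float64(current) * backoffFactor))
        if newLimit < min {
            newLimit = min
        }
        return newLimit
    }

    func main() {
        // With a hypothetical factor of 0.75 and a Min of 2:
        fmt.Println(aimdDecrease(40, 2, 0.75)) // 30
        fmt.Println(aimdDecrease(3, 2, 0.75))  // 2 (clamped at Min)
    }
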
diff --git a/internal/limiter/adaptive_calculator_test.go b/internal/limiter/adaptive_calculator_test.go
index e2c2d95dc..d4965bdf4 100644
--- a/internal/limiter/adaptive_calculator_test.go
+++ b/internal/limiter/adaptive_calculator_test.go
@@ -657,12 +657,14 @@ func (l *testLimit) Update(val int) {
l.currents = append(l.currents, val)
}
+func (*testLimit) AfterUpdate(_ AfterUpdateHook) {}
+
func (l *testLimit) Setting() AdaptiveSetting {
return AdaptiveSetting{
- Initial: l.initial,
- Max: l.max,
- Min: l.min,
- BackoffBackoff: l.backoffBackoff,
+ Initial: l.initial,
+ Max: l.max,
+ Min: l.min,
+ BackoffFactor: l.backoffBackoff,
}
}
diff --git a/internal/limiter/adaptive_limit.go b/internal/limiter/adaptive_limit.go
index 2aefe5eb0..5af9f8ebd 100644
--- a/internal/limiter/adaptive_limit.go
+++ b/internal/limiter/adaptive_limit.go
@@ -1,30 +1,49 @@
package limiter
-import "sync/atomic"
+import (
+ "sync"
+)
// AdaptiveSetting is a struct that holds the configuration parameters for an adaptive limiter.
type AdaptiveSetting struct {
- Initial int
- Max int
- Min int
- BackoffBackoff float64
+ Initial int
+ Max int
+ Min int
+ BackoffFactor float64
}
+// AfterUpdateHook is a function hook that is triggered when the current limit value changes. Callers need to register
+// the hook with the AdaptiveLimiter implementation beforehand, and they are required to handle errors inside the hook function.
+type AfterUpdateHook func(newVal int)
+
// AdaptiveLimiter is an interface for managing and updating adaptive limits.
// It exposes methods to get the name, current limit value, update the limit value, and access its settings.
type AdaptiveLimiter interface {
Name() string
Current() int
Update(val int)
+ AfterUpdate(AfterUpdateHook)
Setting() AdaptiveSetting
}
// AdaptiveLimit is an implementation of the AdaptiveLimiter interface. It guards the current limit value with a
// mutex, ensuring thread-safe updates and synchronized update hooks.
type AdaptiveLimit struct {
- name string
- current atomic.Int32
- setting AdaptiveSetting
+ sync.RWMutex
+
+ name string
+ current int
+ setting AdaptiveSetting
+ updateHooks []AfterUpdateHook
+}
+
+// NewAdaptiveLimit initializes a new AdaptiveLimit object
+func NewAdaptiveLimit(name string, setting AdaptiveSetting) *AdaptiveLimit {
+ return &AdaptiveLimit{
+ name: name,
+ current: setting.Initial,
+ setting: setting,
+ }
}
// Name returns the name of the adaptive limit
@@ -34,12 +53,30 @@ func (l *AdaptiveLimit) Name() string {
// Current returns the current limit. This function can be called without the need for synchronization.
func (l *AdaptiveLimit) Current() int {
- return int(l.current.Load())
+ l.RLock()
+ defer l.RUnlock()
+
+ return l.current
}
// Update adjusts current limit value.
func (l *AdaptiveLimit) Update(val int) {
- l.current.Store(int32(val))
+ l.Lock()
+ defer l.Unlock()
+
+ if val != l.current {
+ l.current = val
+ for _, hook := range l.updateHooks {
+ hook(val)
+ }
+ }
+}
+
+// AfterUpdate registers a callback that is invoked when the current limit is updated. Because all updates and hooks
+// are synchronized, calling Current() inside the update hook from the same goroutine would deadlock. Hence, the update
+// hook must use the new value passed as the argument instead.
+func (l *AdaptiveLimit) AfterUpdate(hook AfterUpdateHook) {
+ l.updateHooks = append(l.updateHooks, hook)
}
// Setting returns the configuration parameters for an adaptive limiter.
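
Given the deadlock caveat documented on AfterUpdate above, a conforming hook must rely on the value it is handed rather than calling Current(). A small sketch (the logging is a made-up example use, and the package is internal to Gitaly, so treat this as illustrative):

    package main

    import (
        "log"

        "gitlab.com/gitlab-org/gitaly/v16/internal/limiter"
    )

    func main() {
        l := limiter.NewAdaptiveLimit("example", limiter.AdaptiveSetting{
            Initial: 5, Min: 1, Max: 10, BackoffFactor: 0.5,
        })

        l.AfterUpdate(func(newVal int) {
            // Use newVal directly. Calling l.Current() here would try to take
            // the read lock while Update() still holds the write lock, and
            // this goroutine would deadlock.
            log.Printf("limit resized to %d", newVal)
        })

        l.Update(8) // value changed, so the hook fires with newVal == 8
    }
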
diff --git a/internal/limiter/adaptive_limit_test.go b/internal/limiter/adaptive_limit_test.go
new file mode 100644
index 000000000..f6c67f092
--- /dev/null
+++ b/internal/limiter/adaptive_limit_test.go
@@ -0,0 +1,124 @@
+package limiter
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestAdaptiveLimit_New(t *testing.T) {
+ t.Parallel()
+
+ setting := AdaptiveSetting{
+ Initial: 5,
+ Max: 10,
+ Min: 1,
+ BackoffFactor: 0.5,
+ }
+
+ limit := NewAdaptiveLimit("testLimit", setting)
+ require.Equal(t, limit.Name(), "testLimit")
+ require.Equal(t, limit.Current(), 5)
+ require.Equal(t, limit.Setting(), setting)
+}
+
+func TestAdaptiveLimit_Update(t *testing.T) {
+ t.Parallel()
+
+ newLimit := func() *AdaptiveLimit {
+ return NewAdaptiveLimit("testLimit", AdaptiveSetting{
+ Initial: 5,
+ Max: 10,
+ Min: 1,
+ BackoffFactor: 0.5,
+ })
+ }
+
+ t.Run("without update hooks", func(t *testing.T) {
+ limit := newLimit()
+
+ limit.Update(1)
+ require.Equal(t, 1, limit.Current())
+
+ limit.Update(2)
+ require.Equal(t, 2, limit.Current())
+
+ limit.Update(3)
+ require.Equal(t, 3, limit.Current())
+ })
+
+ t.Run("new values are different from old values", func(t *testing.T) {
+ limit := newLimit()
+
+ vals := []int{}
+ limit.AfterUpdate(func(val int) {
+ vals = append(vals, val)
+ })
+
+ limit.Update(1)
+ require.Equal(t, 1, limit.Current())
+ require.Equal(t, vals, []int{1})
+
+ limit.Update(2)
+ require.Equal(t, 2, limit.Current())
+ require.Equal(t, vals, []int{1, 2})
+
+ limit.Update(3)
+ require.Equal(t, 3, limit.Current())
+ require.Equal(t, vals, []int{1, 2, 3})
+ })
+
+ t.Run("new values are the same as old values", func(t *testing.T) {
+ limit := newLimit()
+
+ vals := []int{}
+ limit.AfterUpdate(func(val int) {
+ vals = append(vals, val)
+ })
+
+ limit.Update(1)
+ require.Equal(t, 1, limit.Current())
+ require.Equal(t, vals, []int{1})
+
+ limit.Update(1)
+ require.Equal(t, 1, limit.Current())
+ require.Equal(t, vals, []int{1})
+
+ limit.Update(2)
+ require.Equal(t, 2, limit.Current())
+ require.Equal(t, vals, []int{1, 2})
+
+ limit.Update(2)
+ require.Equal(t, 2, limit.Current())
+ require.Equal(t, vals, []int{1, 2})
+ })
+
+ t.Run("multiple update hooks", func(t *testing.T) {
+ limit := newLimit()
+
+ vals1 := []int{}
+ limit.AfterUpdate(func(val int) {
+ vals1 = append(vals1, val)
+ })
+
+ vals2 := []int{}
+ limit.AfterUpdate(func(val int) {
+ vals2 = append(vals2, val*2)
+ })
+
+ limit.Update(1)
+ require.Equal(t, 1, limit.Current())
+ require.Equal(t, vals1, []int{1})
+ require.Equal(t, vals2, []int{2})
+
+ limit.Update(2)
+ require.Equal(t, 2, limit.Current())
+ require.Equal(t, vals1, []int{1, 2})
+ require.Equal(t, vals2, []int{2, 4})
+
+ limit.Update(3)
+ require.Equal(t, 3, limit.Current())
+ require.Equal(t, vals1, []int{1, 2, 3})
+ require.Equal(t, vals2, []int{2, 4, 6})
+ })
+}
diff --git a/internal/limiter/concurrency_limiter.go b/internal/limiter/concurrency_limiter.go
index 2807b856e..b4284adeb 100644
--- a/internal/limiter/concurrency_limiter.go
+++ b/internal/limiter/concurrency_limiter.go
@@ -41,10 +41,10 @@ type keyedConcurrencyLimiter struct {
// concurrencyTokens is the channel of available concurrency tokens, where every token
// allows one concurrent call to the concurrency-limited function.
- concurrencyTokens chan struct{}
+ concurrencyTokens *resizableSemaphore
// queueTokens is the channel of available queue tokens, where every token allows one
// concurrent call to be admitted to the queue.
- queueTokens chan struct{}
+ queueTokens *resizableSemaphore
}
// acquire tries to acquire the semaphore. It may fail if the admission queue is full or if the max
@@ -55,34 +55,33 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey str
// callers may wait for the concurrency token at the same time. If there are no more
// queueing tokens then this indicates that the queue is full and we thus return an
// error immediately.
- select {
- case sem.queueTokens <- struct{}{}:
- // We have acquired a queueing token, so we need to release it if acquiring
- // the concurrency token fails. If we succeed to acquire the concurrency
- // token though then we retain the queueing token until the caller signals
- // that the concurrency-limited function has finished. As a consequence the
- // queue token is returned together with the concurrency token.
- //
- // A simpler model would be to just have `maxQueueLength` many queueing
- // tokens. But this would add concurrency-limiting when acquiring the queue
- // token itself, which is not what we want to do. Instead, we want to admit
- // as many callers into the queue as the queue length permits plus the
- // number of available concurrency tokens allows.
- defer func() {
- if returnedErr != nil {
- <-sem.queueTokens
- }
- }()
- default:
+ if !sem.queueTokens.TryAcquire() {
return ErrMaxQueueSize
}
+
+ // We have acquired a queueing token, so we need to release it if acquiring
+ // the concurrency token fails. If we successfully acquire the concurrency
+ // token though then we retain the queueing token until the caller signals
+ // that the concurrency-limited function has finished. As a consequence the
+ // queue token is returned together with the concurrency token.
+ //
+ // A simpler model would be to just have `maxQueueLength` many queueing
+ // tokens. But this would add concurrency-limiting when acquiring the queue
+ // token itself, which is not what we want to do. Instead, we want to admit
+ // as many callers into the queue as the queue length permits plus the
+ // number of available concurrency tokens allows.
+ defer func() {
+ if returnedErr != nil {
+ sem.queueTokens.Release()
+ }
+ }()
}
// We are queued now, so let's tell the monitor. Furthermore, even though we're still
// holding the queueing token when this function exits successfully we also tell the monitor
// that we have exited the queue. It is only an implementation detail anyway that we hold on
// to the token, so the monitor shouldn't care about that.
- sem.monitor.Queued(ctx, limitingKey, len(sem.queueTokens))
+ sem.monitor.Queued(ctx, limitingKey, sem.queueLength())
defer sem.monitor.Dequeued(ctx)
// Set up the ticker that keeps us from waiting indefinitely on the concurrency token.
@@ -98,7 +97,12 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey str
// Try to acquire the concurrency token now that we're in the queue.
select {
- case sem.concurrencyTokens <- struct{}{}:
+ case acquired := <-sem.concurrencyTokens.Acquire():
+ // When the semaphore returns false, the semaphore was stopped, most likely because the context was
+ // cancelled. Hence, we return its error here.
+ if !acquired {
+ return sem.concurrencyTokens.Err()
+ }
return nil
case <-ticker.C():
return ErrMaxQueueTime
@@ -110,21 +114,27 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey str
// release releases the acquired tokens.
func (sem *keyedConcurrencyLimiter) release() {
if sem.queueTokens != nil {
- <-sem.queueTokens
+ sem.queueTokens.Release()
}
- <-sem.concurrencyTokens
+ sem.concurrencyTokens.Release()
}
// queueLength returns the length of token queue
func (sem *keyedConcurrencyLimiter) queueLength() int {
- return len(sem.queueTokens)
+ if sem.queueTokens == nil {
+ return 0
+ }
+ return int(sem.queueTokens.Current())
}
// ConcurrencyLimiter contains rate limiter state.
type ConcurrencyLimiter struct {
- // maxConcurrencyLimit is the maximum number of concurrent calls to the limited function.
- // This limit is per key.
- maxConcurrencyLimit int64
+ // ctx stores the context at initialization. This context is used as a stopping condition
+ // for some internal goroutines.
+ ctx context.Context
+ // limit is the adaptive maximum number of concurrent calls to the limited function. This limit is
+ // calculated adaptively from an outside calculator.
+ limit *AdaptiveLimit
// maxQueueLength is the maximum number of operations allowed to wait in a queued state.
// This limit is global and applies before the concurrency limit. Subsequent incoming
// operations will be rejected with an error immediately.
@@ -145,18 +155,31 @@ type ConcurrencyLimiter struct {
}
// NewConcurrencyLimiter creates a new concurrency rate limiter.
-func NewConcurrencyLimiter(maxConcurrencyLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
+func NewConcurrencyLimiter(ctx context.Context, limit *AdaptiveLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
if monitor == nil {
monitor = NewNoopConcurrencyMonitor()
}
- return &ConcurrencyLimiter{
- maxConcurrencyLimit: int64(maxConcurrencyLimit),
+ limiter := &ConcurrencyLimiter{
+ ctx: ctx,
+ limit: limit,
maxQueueLength: int64(maxQueueLength),
maxQueuedTickerCreator: maxQueuedTickerCreator,
monitor: monitor,
limitsByKey: make(map[string]*keyedConcurrencyLimiter),
}
+
+ // When the capacity of the limiter is updated we also need to update the size of both the queuing tokens as
+ // well as the concurrency tokens to match the new size.
+ limit.AfterUpdate(func(val int) {
+ for _, keyedLimiter := range limiter.limitsByKey {
+ if keyedLimiter.queueTokens != nil {
+ keyedLimiter.queueTokens.Resize(int64(val) + limiter.maxQueueLength)
+ }
+ keyedLimiter.concurrencyTokens.Resize(int64(val))
+ }
+ })
+ return limiter
}
// Limit will limit the concurrency of the limited function f. There are two distinct mechanisms
@@ -176,7 +199,7 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f Li
)
defer span.Finish()
- if c.maxConcurrencyLimit <= 0 {
+ if c.currentLimit() <= 0 {
return f()
}
@@ -222,15 +245,15 @@ func (c *ConcurrencyLimiter) getConcurrencyLimit(limitingKey string) *keyedConcu
// Set up the queue tokens in case a maximum queue length was requested. As the
// queue tokens are kept during the whole lifetime of the concurrency-limited
// function, we add the concurrency tokens to the number of available tokens.
- var queueTokens chan struct{}
+ var queueTokens *resizableSemaphore
if c.maxQueueLength > 0 {
- queueTokens = make(chan struct{}, c.maxConcurrencyLimit+c.maxQueueLength)
+ queueTokens = NewResizableSemaphore(c.ctx, c.currentLimit()+c.maxQueueLength)
}
c.limitsByKey[limitingKey] = &keyedConcurrencyLimiter{
monitor: c.monitor,
maxQueuedTickerCreator: c.maxQueuedTickerCreator,
- concurrencyTokens: make(chan struct{}, c.maxConcurrencyLimit),
+ concurrencyTokens: NewResizableSemaphore(c.ctx, c.currentLimit()),
queueTokens: queueTokens,
}
}
@@ -268,3 +291,7 @@ func (c *ConcurrencyLimiter) countSemaphores() int {
return len(c.limitsByKey)
}
+
+func (c *ConcurrencyLimiter) currentLimit() int64 {
+ return int64(c.limit.Current())
+}
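
Tying the pieces together: the constructor registers an AfterUpdate hook, so a single Update() call on the shared AdaptiveLimit resizes the semaphores of every live per-key limiter. A sketch of the observable effect, mirroring the dynamic tests added below (key name and numbers invented):

    package main

    import (
        "context"

        "gitlab.com/gitlab-org/gitaly/v16/internal/limiter"
    )

    func main() {
        ctx := context.Background()
        limit := limiter.NewAdaptiveLimit("dynamic", limiter.AdaptiveSetting{
            Initial: 5, Min: 1, Max: 10,
        })

        // With maxQueueLength 10, each per-key queue semaphore is sized at
        // the current limit plus 10; semaphores are created lazily per key
        // by calls to Limit().
        lim := limiter.NewConcurrencyLimiter(ctx, limit, 10, nil, nil)
        _, _ = lim.Limit(ctx, "repo-1", func() (interface{}, error) {
            return nil, nil
        })

        // One Update() fans out through the AfterUpdate hook registered by
        // the constructor: every per-key limiter live at this moment has its
        // concurrency tokens resized to 7 and its queue tokens to 7+10.
        limit.Update(7)
    }
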
diff --git a/internal/limiter/concurrency_limiter_test.go b/internal/limiter/concurrency_limiter_test.go
index f99ca7eab..67ca38e1e 100644
--- a/internal/limiter/concurrency_limiter_test.go
+++ b/internal/limiter/concurrency_limiter_test.go
@@ -3,6 +3,7 @@ package limiter
import (
"context"
"errors"
+ "fmt"
"strconv"
"sync"
"testing"
@@ -86,7 +87,7 @@ func (c *counter) Dropped(_ context.Context, _ string, _ int, _ time.Duration, r
}
}
-func TestLimiter(t *testing.T) {
+func TestLimiter_static(t *testing.T) {
t.Parallel()
tests := []struct {
@@ -155,7 +156,8 @@ func TestLimiter(t *testing.T) {
gauge := &counter{}
limiter := NewConcurrencyLimiter(
- tt.maxConcurrency,
+ ctx,
+ NewAdaptiveLimit("staticLimit", AdaptiveSetting{Initial: tt.maxConcurrency}),
0,
nil,
gauge,
@@ -229,6 +231,557 @@ func TestLimiter(t *testing.T) {
}
}
+func TestLimiter_dynamic(t *testing.T) {
+ t.Parallel()
+
+ t.Run("increase dynamic limit when there is no queuing request", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
+
+ // 5 requests acquired the tokens, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+
+ // Update the limit to 7
+ limit.Update(7)
+
+ // 2 more requests acquired the token. This proves the limit expanded
+ release2, waitAfterRelease2 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 2)
+ require.Equal(t, 7, gauge.enter)
+
+ close(release1)
+ close(release2)
+ waitAfterRelease1()
+ waitAfterRelease2()
+ require.Equal(t, 7, gauge.exit)
+ })
+
+ t.Run("decrease dynamic limit when there is no queuing request", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
+
+ // 3 requests acquired the tokens, 2 slots left
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 3)
+ require.Equal(t, 3, gauge.enter)
+ require.Equal(t, 3, gauge.queued)
+
+ // Update the limit to 3
+ limit.Update(3)
+
+ // 2 requests are put in queue, meaning the limit shrinks down
+ waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(ctx, t, "1", limiter, gauge, 2)
+ require.Equal(t, 3, gauge.enter)
+ require.Equal(t, 5, gauge.queued)
+
+ // Release first 3 requests
+ close(release1)
+ waitAfterRelease1()
+
+ // Now the last 2 requests can acquire token
+ waitAcquired2()
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 5, gauge.queued)
+ require.Equal(t, 3, gauge.exit)
+
+ // Release the last batch
+ close(release2)
+ waitAfterRelease2()
+ require.Equal(t, 5, gauge.exit)
+ })
+
+ t.Run("increase dynamic limit more than the number of queuing requests", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
+
+ // 5 requests acquired the tokens, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+
+ // 2 requests waiting in the queue
+ waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(ctx, t, "1", limiter, gauge, 2)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 7, gauge.queued)
+
+ // Update the limit to 7
+ limit.Update(7)
+
+ // Wait for the other 2 requests to acquire the token. This proves the limiter expanded
+ waitAcquired2()
+ require.Equal(t, 7, gauge.enter)
+
+ close(release1)
+ close(release2)
+ waitAfterRelease1()
+ waitAfterRelease2()
+ require.Equal(t, 7, gauge.exit)
+ })
+
+ t.Run("increase dynamic limit less than the number of queuing requests", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
+
+ // 5 requests acquired the tokens, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+
+ // 2 requests waiting in the queue
+ waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(ctx, t, "1", limiter, gauge, 2)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 7, gauge.queued)
+
+ // 5 more requests waiting in the queue
+ waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(ctx, t, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 12, gauge.queued)
+
+ // Update the limit to 7.
+ limit.Update(7)
+
+ // Release first 5 requests, all requests should fit in the queue now.
+ close(release1)
+ waitAfterRelease1()
+ require.Equal(t, 5, gauge.exit)
+
+ waitAcquired2()
+ waitAcquired3()
+ require.Equal(t, 12, gauge.enter)
+
+ // Now release all requests
+ close(release2)
+ close(release3)
+ waitAfterRelease2()
+ waitAfterRelease3()
+ require.Equal(t, 12, gauge.exit)
+ })
+
+ t.Run("decrease dynamic limit less than the number of concurrent requests", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
+
+ // 5 requests acquired the tokens, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+
+ // Update the limit to 3
+ limit.Update(3)
+
+ // 3 requests are put in queue
+ waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(ctx, t, "1", limiter, gauge, 3)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 8, gauge.queued)
+
+ // Release the first 5 requests
+ close(release1)
+ waitAfterRelease1()
+ require.Equal(t, 5, gauge.exit)
+
+ // Now the last 3 requests acquire the tokens
+ waitAcquired2()
+ require.Equal(t, 8, gauge.enter)
+
+ // 1 more request is put in queue, meaning the limit shrinks down to 3.
+ waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(ctx, t, "1", limiter, gauge, 1)
+ require.Equal(t, 8, gauge.enter)
+ require.Equal(t, 9, gauge.queued)
+
+ // Release the second 3 requests
+ close(release2)
+ waitAfterRelease2()
+ require.Equal(t, 8, gauge.exit)
+
+ // The last request acquires the token
+ waitAcquired3()
+ require.Equal(t, 9, gauge.enter)
+
+ // Release the last request
+ close(release3)
+ waitAfterRelease3()
+ require.Equal(t, 9, gauge.exit)
+ })
+
+ t.Run("increase and decrease dynamic limit multiple times", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
+
+ // Update the limit to 7
+ limit.Update(7)
+
+ // 5 requests acquired the tokens
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+
+ // Update the limit to 3
+ limit.Update(3)
+
+ // 3 requests are put in queue
+ waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(ctx, t, "1", limiter, gauge, 3)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 8, gauge.queued)
+
+ // Update the limit to 10
+ limit.Update(10)
+
+ // All existing requests acquire the tokens
+ waitAcquired2()
+ require.Equal(t, 8, gauge.enter)
+
+ // 2 more requests
+ release3, waitAfterRelease3 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 2)
+ require.Equal(t, 10, gauge.enter)
+
+ // Update the limit to 1
+ limit.Update(1)
+
+ // Now release all of them
+ close(release1)
+ waitAfterRelease1()
+ require.Equal(t, 5, gauge.exit)
+
+ close(release2)
+ waitAfterRelease2()
+ require.Equal(t, 8, gauge.exit)
+
+ close(release3)
+ waitAfterRelease3()
+ require.Equal(t, 10, gauge.exit)
+ })
+
+ t.Run("increase the limit when the queue is full", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 1, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ // Mind the queue length here
+ limiter := NewConcurrencyLimiter(ctx, limit, 5, nil, gauge)
+
+ // 1 request acquired the token, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 1)
+ require.Equal(t, 1, gauge.enter)
+ require.Equal(t, 1, gauge.queued)
+
+ // 5 requests queuing for the tokens, the queue is full now
+ waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(ctx, t, "1", limiter, gauge, 5)
+ require.Equal(t, 1, gauge.enter)
+ require.Equal(t, 6, gauge.queued)
+
+ // Limiter rejects new request
+ maximumQueueSizeReached(ctx, t, "1", limiter)
+
+ // Update the limit
+ limit.Update(6)
+ waitAcquired2()
+ require.Equal(t, 6, gauge.enter)
+
+ // 5 requests queuing for the tokens, the queue is full now
+ waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(ctx, t, "1", limiter, gauge, 5)
+ require.Equal(t, 6, gauge.enter)
+ require.Equal(t, 11, gauge.queued)
+
+ // Limiter rejects new request
+ maximumQueueSizeReached(ctx, t, "1", limiter)
+
+ // Clean up
+ close(release1)
+ close(release2)
+ waitAfterRelease1()
+ waitAfterRelease2()
+ require.Equal(t, 6, gauge.exit)
+
+ waitAcquired3()
+ require.Equal(t, 11, gauge.enter)
+ close(release3)
+ waitAfterRelease3()
+ })
+
+ t.Run("decrease the limit when the queue is full", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ // Mind the queue length here
+ limiter := NewConcurrencyLimiter(ctx, limit, 3, nil, gauge)
+
+ // 5 requests acquired the tokens, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 5, gauge.queued)
+
+ // 3 requests queuing for the tokens, the queue is full now
+ waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(ctx, t, "1", limiter, gauge, 3)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 8, gauge.queued)
+
+ // Limiter rejects new request
+ maximumQueueSizeReached(ctx, t, "1", limiter)
+
+ // Update the limit.
+ limit.Update(3)
+
+ // The queue is still full
+ maximumQueueSizeReached(ctx, t, "1", limiter)
+
+ // Release first 5 requests and let the last 3 requests in
+ close(release1)
+ waitAfterRelease1()
+ require.Equal(t, 5, gauge.exit)
+ waitAcquired2()
+ require.Equal(t, 8, gauge.enter)
+
+ // Another 3 requests in queue. The queue is full again, meaning the concurrency limit is 3 and the queue length is still 3.
+ waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(ctx, t, "1", limiter, gauge, 3)
+ require.Equal(t, 8, gauge.enter)
+ require.Equal(t, 11, gauge.queued)
+ maximumQueueSizeReached(ctx, t, "1", limiter)
+
+ // Clean up
+ close(release2)
+ waitAfterRelease2()
+ require.Equal(t, 8, gauge.exit)
+ waitAcquired3()
+ require.Equal(t, 11, gauge.enter)
+ close(release3)
+ waitAfterRelease3()
+ require.Equal(t, 11, gauge.exit)
+ })
+
+ t.Run("dynamic limit works without queuing", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ // No queue length limit, which means the limiter accepts unlimited queued requests
+ limiter := NewConcurrencyLimiter(ctx, limit, 0, nil, gauge)
+
+ // 5 requests acquired the tokens, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 5, gauge.queued)
+
+ // 5 more requests
+ waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(ctx, t, "1", limiter, gauge, 5)
+
+ // Update the limit.
+ limit.Update(10)
+
+ // All of them acquired the tokens
+ waitAcquired2()
+ require.Equal(t, 10, gauge.enter)
+
+ // Clean up
+ close(release1)
+ close(release2)
+ waitAfterRelease1()
+ waitAfterRelease2()
+ require.Equal(t, 10, gauge.exit)
+ })
+
+ t.Run("dynamic limit works with queue timer", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+
+ ticker := helper.NewManualTicker()
+ limiter := NewConcurrencyLimiter(ctx, limit, 0, func() helper.Ticker { return ticker }, gauge)
+
+ // 5 requests acquired the tokens, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 5, gauge.queued)
+
+ errors := make(chan error, 10)
+ // 5 requests in queue
+ spawnQueuedAndCollectErrors(ctx, "1", limiter, gauge, 5, errors)
+
+ // Decrease the limit
+ limit.Update(3)
+
+ // 5 more requests in queue
+ spawnQueuedAndCollectErrors(ctx, "1", limiter, gauge, 5, errors)
+
+ // Trigger timeout event
+ for i := 0; i < 10; i++ {
+ ticker.Tick()
+ require.EqualError(t, <-errors, "maximum time in concurrency queue reached")
+ }
+
+ // Other goroutines exit as normal
+ close(release1)
+ waitAfterRelease1()
+ require.Equal(t, 5, gauge.exit)
+ })
+
+ t.Run("dynamic limit works with context cancellation", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ ctx2, cancel := context.WithCancel(testhelper.Context(t))
+
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+
+ limiter := NewConcurrencyLimiter(ctx, limit, 0, nil, gauge)
+
+ // 5 requests acquired the tokens, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(ctx, t, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 5, gauge.queued)
+
+ errors := make(chan error, 10)
+ // 5 requests in queue
+ spawnQueuedAndCollectErrors(ctx2, "1", limiter, gauge, 5, errors)
+
+ // Decrease the limit
+ limit.Update(3)
+
+ // 5 more requests in queue
+ spawnQueuedAndCollectErrors(ctx2, "1", limiter, gauge, 5, errors)
+
+ // Trigger context cancellation
+ cancel()
+ for i := 0; i < 10; i++ {
+ require.EqualError(t, <-errors, "unexpected error when dequeueing request: context canceled")
+ }
+
+ // Other goroutines exit as normal
+ close(release1)
+ waitAfterRelease1()
+ require.Equal(t, 5, gauge.exit)
+ })
+
+ t.Run("dynamic limit works with multiple buckets", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+
+ limiter := NewConcurrencyLimiter(ctx, limit, 5, nil, gauge)
+
+ var releaseChans []chan struct{}
+ var waitAcquireFuncs, waitReleaseFuncs []func()
+
+ // 5 * 5 requests acquired tokens
+ for i := 1; i <= 5; i++ {
+ release, waitAfterRelease := spawnAndWaitAcquired(ctx, t, fmt.Sprintf("%d", i), limiter, gauge, 5)
+ releaseChans = append(releaseChans, release)
+ waitReleaseFuncs = append(waitReleaseFuncs, waitAfterRelease)
+ }
+ require.Equal(t, 25, gauge.enter)
+
+ // 1 + 2 + 3 + 4 + 5 requests are in queue
+ for i := 1; i <= 5; i++ {
+ waitAcquired, release, waitAfterRelease := spawnAndWaitQueued(ctx, t, fmt.Sprintf("%d", i), limiter, gauge, i)
+ waitAcquireFuncs = append(waitAcquireFuncs, waitAcquired)
+ releaseChans = append(releaseChans, release)
+ waitReleaseFuncs = append(waitReleaseFuncs, waitAfterRelease)
+ }
+ require.Equal(t, 25, gauge.enter)
+ require.Equal(t, 40, gauge.queued)
+
+ // Update limit, enough for all requests
+ limit.Update(10)
+
+ // All requests acquired tokens now
+ for _, wait := range waitAcquireFuncs {
+ wait()
+ }
+ require.Equal(t, 40, gauge.enter)
+
+ // Release all
+ for _, release := range releaseChans {
+ close(release)
+ }
+ for _, wait := range waitReleaseFuncs {
+ wait()
+ }
+ require.Equal(t, 40, gauge.exit)
+ })
+}
+
+// spawnAndWaitAcquired spawns N goroutines that wait for the limiter. They wait until all of them acquire the limiter
+// token before exiting. This function returns a channel to control token release and a function to wait until all
+// goroutines finish.
+func spawnAndWaitAcquired(ctx context.Context, t *testing.T, bucket string, limiter *ConcurrencyLimiter, gauge *blockingQueueCounter, n int) (chan struct{}, func()) {
+ var acquireWg, releaseWg sync.WaitGroup
+ release := make(chan struct{})
+
+ for i := 0; i < n; i++ {
+ acquireWg.Add(1)
+ releaseWg.Add(1)
+ go func() {
+ defer releaseWg.Done()
+ _, err := limiter.Limit(ctx, bucket, func() (resp interface{}, err error) {
+ acquireWg.Done()
+ <-release
+ return nil, nil
+ })
+ require.NoError(t, err)
+ }()
+ }
+ for i := 0; i < n; i++ {
+ <-gauge.queuedCh
+ gauge.queued++
+ }
+ acquireWg.Wait()
+
+ return release, releaseWg.Wait
+}
+
+// spawnAndWaitQueued spawns N goroutines that wait for the limiter. They wait until all of them are queued. This
+// function returns a function to wait until all goroutines acquire the token, a channel to control token release, and
+// a function to wait until all goroutines finish.
+func spawnAndWaitQueued(ctx context.Context, t *testing.T, bucket string, limiter *ConcurrencyLimiter, gauge *blockingQueueCounter, n int) (func(), chan struct{}, func()) {
+ var acquireWg, releaseWg sync.WaitGroup
+ release := make(chan struct{})
+
+ for i := 0; i < n; i++ {
+ acquireWg.Add(1)
+ releaseWg.Add(1)
+ go func() {
+ defer releaseWg.Done()
+ _, err := limiter.Limit(ctx, bucket, func() (resp interface{}, err error) {
+ acquireWg.Done()
+ <-release
+ return nil, nil
+ })
+ require.NoError(t, err)
+ }()
+ }
+ for i := 0; i < n; i++ {
+ <-gauge.queuedCh
+ gauge.queued++
+ }
+
+ return acquireWg.Wait, release, releaseWg.Wait
+}
+
+func spawnQueuedAndCollectErrors(ctx context.Context, bucket string, limiter *ConcurrencyLimiter, gauge *blockingQueueCounter, n int, errors chan error) {
+ for i := 0; i < n; i++ {
+ go func() {
+ _, err := limiter.Limit(ctx, bucket, func() (interface{}, error) {
+ return nil, fmt.Errorf("should not call")
+ })
+ errors <- err
+ }()
+ }
+ for i := 0; i < n; i++ {
+ <-gauge.queuedCh
+ gauge.queued++
+ }
+}
+
+func maximumQueueSizeReached(ctx context.Context, t *testing.T, bucket string, limiter *ConcurrencyLimiter) {
+ _, err := limiter.Limit(ctx, bucket, func() (interface{}, error) {
+ return nil, fmt.Errorf("should not call")
+ })
+ require.EqualError(t, err, "maximum queue size reached")
+}
+
type blockingQueueCounter struct {
counter
@@ -248,7 +801,7 @@ func TestConcurrencyLimiter_queueLimit(t *testing.T) {
monitorCh := make(chan struct{})
monitor := &blockingQueueCounter{queuedCh: monitorCh}
ch := make(chan struct{})
- limiter := NewConcurrencyLimiter(1, queueLimit, nil, monitor)
+ limiter := NewConcurrencyLimiter(ctx, NewAdaptiveLimit("staticLimit", AdaptiveSetting{Initial: 1}), queueLimit, nil, monitor)
// occupied with one live request that takes a long time to complete
go func() {
@@ -333,7 +886,8 @@ func TestLimitConcurrency_queueWaitTime(t *testing.T) {
monitor := &blockingDequeueCounter{dequeuedCh: dequeuedCh}
limiter := NewConcurrencyLimiter(
- 1,
+ ctx,
+ NewAdaptiveLimit("staticLimit", AdaptiveSetting{Initial: 1}),
0,
func() helper.Ticker {
return ticker
diff --git a/internal/limiter/resizable_semaphore.go b/internal/limiter/resizable_semaphore.go
new file mode 100644
index 000000000..19434afc3
--- /dev/null
+++ b/internal/limiter/resizable_semaphore.go
@@ -0,0 +1,157 @@
+package limiter
+
+import (
+ "context"
+ "sync/atomic"
+)
+
+// resizableSemaphore provides a way to bound concurrent access to resources. It allows up to a configured number of
+// concurrent accesses to the resources. When the concurrency reaches the semaphore size, callers are blocked until
+// a resource is available again. The size of the semaphore can be resized freely in an atomic manner.
+//
+// Internally, a separate goroutine manages the semaphore's functionality, providing synchronization across all channel
+// operations. Callers acquire the semaphore by pulling a "token" from the `acquireCh` channel via `Acquire()`, and
+// return it to the `releaseCh` channel via `Release()`. This goroutine ensures that tokens are properly distributed
+// from those channels, and also manages the semaphore's current length and size. It processes resize requests and
+// handles try requests and responses, thus ensuring smooth operation.
+//
+// Note: This struct is not intended to serve as a general-purpose data structure but is specifically designed for
+// flexible concurrency control with resizable capacity.
+type resizableSemaphore struct {
+ // err stores the reason why the semaphore was stopped. In most cases, it's due to context cancellation.
+ err atomic.Pointer[error]
+ // current represents the current concurrency access to the resources.
+ current atomic.Int64
+ // size is the maximum capacity of the semaphore. It represents the maximum number of concurrent accesses
+ // allowed at a time.
+ size atomic.Int64
+ // releaseCh is a channel used to return tokens back to the semaphore. When a caller returns a token, it sends a signal to this channel.
+ releaseCh chan struct{}
+ // acquireCh is a channel used for callers to acquire a token. When a token is available, a signal is sent to this channel.
+ acquireCh chan bool
+ // tryAcquireCh is a channel used to signal a try request. A try request is a non-blocking request to acquire a token.
+ tryAcquireCh chan struct{}
+ // tryAcquireRespCh is a channel used to respond to a try request. If a token is available, true is sent; otherwise, false.
+ tryAcquireRespCh chan bool
+ // resizeCh is a channel used to request a resize of the semaphore's capacity. The requested new capacity is sent to this channel.
+ resizeCh chan int64
+ // stopCh is a channel that determines whether the semaphore is stopped.
+ stopCh chan struct{}
+}
+
+// NewResizableSemaphore creates and starts a resizableSemaphore
+func NewResizableSemaphore(ctx context.Context, capacity int64) *resizableSemaphore {
+ s := &resizableSemaphore{
+ stopCh: make(chan struct{}),
+ releaseCh: make(chan struct{}),
+ acquireCh: make(chan bool),
+ tryAcquireCh: make(chan struct{}),
+ tryAcquireRespCh: make(chan bool),
+ resizeCh: make(chan int64),
+ }
+ s.size.Store(capacity)
+ go s.start(ctx)
+
+ return s
+}
+
+// start kicks off a goroutine that maintains the states of the semaphore. It acts as an event loop that listens to
+// all modifications.
+func (q *resizableSemaphore) start(ctx context.Context) {
+ for {
+ if len := q.current.Load(); len < q.size.Load() {
+ select {
+ case <-q.releaseCh:
+ q.current.Store(len - 1)
+ case q.acquireCh <- true:
+ q.current.Store(len + 1)
+ case <-q.tryAcquireCh:
+ q.current.Store(len + 1)
+ q.tryAcquireRespCh <- true
+ case newSize := <-q.resizeCh:
+ // If the new capacity is greater than the prior one, the capacity grows
+ // without the need to do anything; more callers can acquire the token in
+ // the very next iteration.
+ // In contrast, when the new capacity is less than the prior one, the
+ // capacity shrinks and the length may exceed the new capacity. That's
+ // perfectly fine: all existing callers continue, new callers can't acquire
+ // new tokens, and the length naturally falls below the capacity over time.
+ q.size.Store(newSize)
+ case <-ctx.Done():
+ q.stop(ctx.Err())
+ return
+ }
+ } else {
+ select {
+ case <-q.releaseCh:
+ q.current.Store(len - 1)
+ case <-q.tryAcquireCh:
+ q.tryAcquireRespCh <- false
+ case newSize := <-q.resizeCh:
+ // Simiarly to the above case, overriding the capacity is enough.
+ q.size.Store(newSize)
+ case <-ctx.Done():
+ q.stop(ctx.Err())
+ return
+ }
+ }
+ }
+}
+
+func (q *resizableSemaphore) stop(err error) {
+ q.current.Store(0)
+ q.err.Store(&err)
+
+ close(q.stopCh)
+	// acquireCh is the only exposed channel. After it is closed, callers receive `false` when pulling from it.
+	// The other state-modifying channels stay intact, but TryAcquire(), Release(), and Resize() return
+	// immediately.
+ close(q.acquireCh)
+}
+
+// Err returns the reason why the semaphore was stopped, typically context cancellation. It returns nil while the
+// semaphore is still running.
+func (q *resizableSemaphore) Err() error {
+	if err := q.err.Load(); err != nil {
+		return *err
+	}
+	return nil
+}
+
+// Acquire returns a channel that allows the caller to acquire the semaphore. The caller is blocked until a
+// resource is available. It is not safe to use in a `select` statement that has a `default` case; in such
+// cases, use TryAcquire() instead.
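+//
+// For illustration, the following hypothetical caller (not part of this package) is unreliable: the event-loop
+// goroutine may simply not be ready to hand out a token at that instant even when capacity is free, so the
+// `default` branch can fire spuriously:
+//
+//	select {
+//	case ok := <-s.Acquire():
+//		_ = ok // ok is false if the semaphore was stopped
+//	default:
+//		// may be taken even though the semaphore has spare capacity
+//	}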
+func (q *resizableSemaphore) Acquire() <-chan bool {
+ return q.acquireCh
+}
+
+// TryAcquire acquires the semaphore without blocking. On success, returns true. On failure, returns false and leaves
+// the semaphore unchanged.
+func (q *resizableSemaphore) TryAcquire() bool {
+ select {
+ case q.tryAcquireCh <- struct{}{}:
+ return <-q.tryAcquireRespCh
+ case <-q.stopCh:
+ return false
+ }
+}
+
+// Release releases the semaphore by pushing the token back. If the semaphore is stopped, this function returns immediately.
+func (q *resizableSemaphore) Release() {
+ select {
+ case q.releaseCh <- struct{}{}:
+ case <-q.stopCh:
+ // No op, return immediately.
+ }
+}
+
+// Current returns the number of callers currently holding the semaphore.
+func (q *resizableSemaphore) Current() int64 {
+ return q.current.Load()
+}
+
+// Resize modifies the capacity of the semaphore. If the semaphore is stopped, this function returns immediately.
+func (q *resizableSemaphore) Resize(newSize int64) {
+ select {
+ case q.resizeCh <- newSize:
+ case <-q.stopCh:
+		// The semaphore is already stopped, but storing the new size keeps its recorded state up-to-date.
+ q.size.Store(newSize)
+ }
+}
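For orientation, here is a minimal usage sketch of the API added above. It is illustrative only: it assumes the
caller lives in the same `limiter` package (the type is unexported), and the function and its wiring are
hypothetical, not part of this change.

	// doWork acquires a token around a unit of work and demonstrates a live resize.
	func doWork(ctx context.Context, s *resizableSemaphore) error {
		// Blocking acquisition; false means the semaphore was stopped (e.g. context canceled).
		if ok := <-s.Acquire(); !ok {
			return s.Err()
		}
		defer s.Release()

		// Non-blocking attempt, e.g. for best-effort background work.
		if s.TryAcquire() {
			defer s.Release()
		}

		// Capacity can be changed at any time; callers already holding tokens are never evicted.
		s.Resize(10)
		return nil
	}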
diff --git a/internal/limiter/resizable_semaphore_test.go b/internal/limiter/resizable_semaphore_test.go
new file mode 100644
index 000000000..00ac90097
--- /dev/null
+++ b/internal/limiter/resizable_semaphore_test.go
@@ -0,0 +1,262 @@
+package limiter
+
+import (
+ "context"
+ "sync"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+)
+
+func TestResizableSemaphore_New(t *testing.T) {
+ t.Parallel()
+
+ semaphore := NewResizableSemaphore(testhelper.Context(t), 5)
+ require.Equal(t, int64(0), semaphore.Current())
+}
+
+func TestResizableSemaphore_Stopped(t *testing.T) {
+ t.Parallel()
+
+ ctx, cancel := context.WithCancel(testhelper.Context(t))
+ semaphore := NewResizableSemaphore(ctx, 5)
+
+	// Try to acquire a token from the empty semaphore
+ require.True(t, semaphore.TryAcquire())
+ semaphore.Release()
+
+	// 5 goroutines acquire the semaphore
+ beforeCallRelease := make(chan struct{})
+ var acquireWg1, releaseWg1 sync.WaitGroup
+ for i := 0; i < 5; i++ {
+ acquireWg1.Add(1)
+ releaseWg1.Add(1)
+ go func() {
+ require.True(t, <-semaphore.Acquire())
+ acquireWg1.Done()
+
+ <-beforeCallRelease
+
+ semaphore.Release()
+ releaseWg1.Done()
+ }()
+ }
+ acquireWg1.Wait()
+
+	// Another 5 goroutines wait for the semaphore
+ var acquireWg2 sync.WaitGroup
+ for i := 0; i < 5; i++ {
+ acquireWg2.Add(1)
+ go func() {
+			// This goroutine is blocked until the context is canceled, at which point Acquire() yields false
+ require.False(t, <-semaphore.Acquire())
+ acquireWg2.Done()
+ }()
+ }
+
+	// Try to acquire a token from the full semaphore
+ require.False(t, semaphore.TryAcquire())
+
+	// Cancel the context, which stops the semaphore
+	cancel()
+
+	// The last 5 goroutines exit immediately: their Acquire() returns false
+	acquireWg2.Wait()
+
+	// The first 5 goroutines can still call Release() even though the semaphore is stopped
+	close(beforeCallRelease)
+	releaseWg1.Wait()
+
+ require.False(t, semaphore.TryAcquire())
+ require.Equal(t, int64(0), semaphore.Current())
+ require.Equal(t, ctx.Err(), semaphore.Err())
+}
+
+func TestResizableSemaphore_Acquire(t *testing.T) {
+ t.Parallel()
+
+ t.Run("acquire less than the capacity", func(t *testing.T) {
+ semaphore := NewResizableSemaphore(testhelper.Context(t), 5)
+
+ waitBeforeRelease, waitRelease := acquireSemaphore(t, semaphore, 4)
+		require.True(t, <-semaphore.Acquire())
+
+ close(waitBeforeRelease)
+ waitRelease()
+ })
+
+ t.Run("acquire more than the capacity", func(t *testing.T) {
+ semaphore := NewResizableSemaphore(testhelper.Context(t), 5)
+
+ waitBeforeRelease, waitRelease := acquireSemaphore(t, semaphore, 5)
+ require.False(t, semaphore.TryAcquire())
+
+ close(waitBeforeRelease)
+ waitRelease()
+ })
+
+ t.Run("semaphore is full then available again", func(t *testing.T) {
+ semaphore := NewResizableSemaphore(testhelper.Context(t), 5)
+
+ waitChan := make(chan bool)
+ _, _ = acquireSemaphore(t, semaphore, 5)
+
+ go func() {
+ waitChan <- <-semaphore.Acquire()
+ }()
+
+ // The semaphore is full now
+ require.False(t, semaphore.TryAcquire())
+
+ // Release one token
+ semaphore.Release()
+ // The waiting channel is unlocked
+ require.True(t, <-waitChan)
+
+ // Release another token
+ semaphore.Release()
+ // Now TryAcquire can pull out a token
+ require.True(t, semaphore.TryAcquire())
+ })
+
+ t.Run("the semaphore is resized up when empty", func(t *testing.T) {
+ semaphore := NewResizableSemaphore(testhelper.Context(t), 5)
+ semaphore.Resize(10)
+
+ waitBeforeRelease, waitRelease := acquireSemaphore(t, semaphore, 9)
+		require.True(t, <-semaphore.Acquire())
+
+ close(waitBeforeRelease)
+ waitRelease()
+ })
+
+ t.Run("the semaphore is resized up when not empty", func(t *testing.T) {
+ semaphore := NewResizableSemaphore(testhelper.Context(t), 7)
+
+ waitBeforeRelease1, waitRelease1 := acquireSemaphore(t, semaphore, 5)
+ semaphore.Resize(15)
+ waitBeforeRelease2, waitRelease2 := acquireSemaphore(t, semaphore, 5)
+
+		require.True(t, <-semaphore.Acquire())
+
+ close(waitBeforeRelease1)
+ close(waitBeforeRelease2)
+ waitRelease1()
+ waitRelease2()
+ })
+
+ t.Run("the semaphore is resized up when full", func(t *testing.T) {
+ semaphore := NewResizableSemaphore(testhelper.Context(t), 5)
+
+ waitBeforeRelease1, waitRelease1 := acquireSemaphore(t, semaphore, 5)
+ require.False(t, semaphore.TryAcquire())
+
+ semaphore.Resize(10)
+
+ waitBeforeRelease2, waitRelease2 := acquireSemaphore(t, semaphore, 5)
+ require.False(t, semaphore.TryAcquire())
+
+ var wg sync.WaitGroup
+ for i := 0; i < 5; i++ {
+ wg.Add(1)
+ go func() {
+ <-semaphore.Acquire()
+ wg.Done()
+ semaphore.Release()
+ }()
+ }
+
+ semaphore.Resize(15)
+ wg.Wait()
+
+ close(waitBeforeRelease1)
+ close(waitBeforeRelease2)
+ waitRelease1()
+ waitRelease2()
+ })
+
+ t.Run("the semaphore is resized down when empty", func(t *testing.T) {
+ semaphore := NewResizableSemaphore(testhelper.Context(t), 10)
+ semaphore.Resize(5)
+
+ waitBeforeRelease, waitRelease := acquireSemaphore(t, semaphore, 4)
+		require.True(t, <-semaphore.Acquire())
+
+ close(waitBeforeRelease)
+ waitRelease()
+ })
+
+ t.Run("the semaphore is resized down when not empty", func(t *testing.T) {
+ semaphore := NewResizableSemaphore(testhelper.Context(t), 20)
+
+ waitBeforeRelease1, waitRelease1 := acquireSemaphore(t, semaphore, 5)
+ semaphore.Resize(15)
+ waitBeforeRelease2, waitRelease2 := acquireSemaphore(t, semaphore, 5)
+
+		require.True(t, <-semaphore.Acquire())
+
+ close(waitBeforeRelease1)
+ close(waitBeforeRelease2)
+ waitRelease1()
+ waitRelease2()
+ })
+
+ t.Run("the semaphore is resized down lower than the current length", func(t *testing.T) {
+ semaphore := NewResizableSemaphore(testhelper.Context(t), 10)
+
+ waitBeforeRelease1, waitRelease1 := acquireSemaphore(t, semaphore, 5)
+
+ semaphore.Resize(3)
+
+ require.False(t, semaphore.TryAcquire())
+ close(waitBeforeRelease1)
+ waitRelease1()
+
+ waitBeforeRelease2, waitRelease2 := acquireSemaphore(t, semaphore, 3)
+ require.False(t, semaphore.TryAcquire())
+
+ close(waitBeforeRelease2)
+ waitRelease2()
+ })
+
+ t.Run("the semaphore is resized down when full", func(t *testing.T) {
+ semaphore := NewResizableSemaphore(testhelper.Context(t), 10)
+
+ waitBeforeRelease1, waitRelease1 := acquireSemaphore(t, semaphore, 10)
+
+ semaphore.Resize(5)
+
+ require.False(t, semaphore.TryAcquire())
+ close(waitBeforeRelease1)
+ waitRelease1()
+
+ waitBeforeRelease2, waitRelease2 := acquireSemaphore(t, semaphore, 5)
+ require.False(t, semaphore.TryAcquire())
+
+ close(waitBeforeRelease2)
+ waitRelease2()
+ })
+}
+
+func acquireSemaphore(t *testing.T, semaphore *resizableSemaphore, n int) (chan struct{}, func()) {
+ var acquireWg, releaseWg sync.WaitGroup
+ waitBeforeRelease := make(chan struct{})
+
+ for i := 0; i < n; i++ {
+ acquireWg.Add(1)
+ releaseWg.Add(1)
+ go func() {
+ require.True(t, <-semaphore.Acquire())
+ acquireWg.Done()
+
+ <-waitBeforeRelease
+ semaphore.Release()
+ releaseWg.Done()
+ }()
+ }
+ acquireWg.Wait()
+
+ return waitBeforeRelease, releaseWg.Wait
+}
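The tests above drive Resize() by hand; in production the limit is computed by the adaptive calculator (see
internal/limiter/adaptive_calculator.go in this merge request). As a rough, hypothetical illustration of that
shape — the limits channel and its producer are assumptions, only Resize() and the context handling come from
this change — a driver goroutine only needs to forward newly computed limits to the semaphore:

	// resizeDriver forwards externally computed limits to the semaphore until the
	// context is done.
	func resizeDriver(ctx context.Context, s *resizableSemaphore, limits <-chan int64) {
		for {
			select {
			case newLimit := <-limits:
				s.Resize(newLimit)
			case <-ctx.Done():
				return
			}
		}
	}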
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go
index 8e44b7839..47fb56f46 100644
--- a/internal/testhelper/testserver/gitaly.go
+++ b/internal/testhelper/testserver/gitaly.go
@@ -161,6 +161,8 @@ func waitHealthy(tb testing.TB, ctx context.Context, addr string, authToken stri
func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, deps *service.Dependencies), opts ...GitalyServerOpt) (*grpc.Server, string, bool) {
tb.Helper()
+ ctx := testhelper.Context(tb)
+
var gsd gitalyServerDeps
for _, opt := range opts {
gsd = opt(gsd)
@@ -173,7 +175,7 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d
server.WithStreamInterceptor(StructErrStreamInterceptor),
}
- deps := gsd.createDependencies(tb, cfg)
+ deps := gsd.createDependencies(ctx, tb, cfg)
tb.Cleanup(func() { gsd.conns.Close() })
serverFactory := server.NewGitalyServerFactory(
@@ -201,7 +203,6 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d
assert.NoError(tb, internalServer.Serve(internalListener), "failure to serve internal gRPC")
}()
- ctx := testhelper.Context(tb)
waitHealthy(tb, ctx, "unix://"+internalListener.Addr().String(), cfg.Auth.Token)
}
@@ -238,7 +239,6 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d
assert.NoError(tb, externalServer.Serve(listener), "failure to serve external gRPC")
}()
- ctx := testhelper.Context(tb)
waitHealthy(tb, ctx, addr, cfg.Auth.Token)
return externalServer, addr, gsd.disablePraefect
@@ -276,7 +276,7 @@ type gitalyServerDeps struct {
signingKey string
}
-func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) *service.Dependencies {
+func (gsd *gitalyServerDeps) createDependencies(ctx context.Context, tb testing.TB, cfg config.Cfg) *service.Dependencies {
if gsd.logger == nil {
gsd.logger = testhelper.NewGitalyServerLogger(tb)
}
@@ -328,7 +328,8 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) *
if gsd.packObjectsLimiter == nil {
gsd.packObjectsLimiter = limiter.NewConcurrencyLimiter(
- 0,
+ ctx,
+ limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 0}),
0,
nil,
limiter.NewNoopConcurrencyMonitor(),
@@ -336,7 +337,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) *
}
if gsd.limitHandler == nil {
- gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)
+ gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters(ctx))
}
if gsd.repositoryCounter == nil {