Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2023-08-17 10:43:26 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2023-08-17 10:46:16 +0300
commitd83327df2f9c35c0cc46268d068c673341d7a4dc (patch)
treefa992712f3186d662427727caa2f7fc8d6ba987b
parentb3d63188f0d22232335bf973f3a236c388816942 (diff)
Revert introduction of adaptive pack-objects limiter
In 023c2a7d0 (Merge branch 'qmnguyen0711/add-adaptive-limit-to-pack-objects-limit' into 'master', 2023-08-09), we have introduced new code to add an adaptive limit to pack-objects. This new limiter is seemingly creating a memory leak that causes issues in production systems. Revert this merge request. Changelog: fixed
-rw-r--r--internal/cli/gitaly/serve.go10
-rw-r--r--internal/gitaly/server/auth_test.go31
-rw-r--r--internal/gitaly/service/hook/pack_objects_test.go3
-rw-r--r--internal/grpc/middleware/limithandler/middleware.go144
-rw-r--r--internal/grpc/middleware/limithandler/middleware_test.go24
-rw-r--r--internal/limiter/adaptive_calculator.go2
-rw-r--r--internal/limiter/adaptive_calculator_test.go10
-rw-r--r--internal/limiter/adaptive_limit.go57
-rw-r--r--internal/limiter/adaptive_limit_test.go124
-rw-r--r--internal/limiter/concurrency_limiter.go99
-rw-r--r--internal/limiter/concurrency_limiter_test.go562
-rw-r--r--internal/limiter/resizable_semaphore.go157
-rw-r--r--internal/limiter/resizable_semaphore_test.go262
-rw-r--r--internal/testhelper/testserver/gitaly.go13
14 files changed, 161 insertions, 1337 deletions
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index 302fd880d..f8180cdd4 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -271,15 +271,10 @@ func run(cfg config.Cfg, logger logrus.FieldLogger) error {
return fmt.Errorf("disk cache walkers: %w", err)
}
- // The pack-objects limit below is static at this stage. It's always equal to the initial limit, which uses
- // MaxConcurrency config.
- packObjectLimit := limiter.NewAdaptiveLimit("packObjects", limiter.AdaptiveSetting{
- Initial: cfg.PackObjectsLimiting.MaxConcurrency,
- })
concurrencyLimitHandler := limithandler.New(
cfg,
limithandler.LimitConcurrencyByRepo,
- limithandler.WithConcurrencyLimiters(ctx),
+ limithandler.WithConcurrencyLimiters,
)
rateLimitHandler := limithandler.New(
@@ -300,8 +295,7 @@ func run(cfg config.Cfg, logger logrus.FieldLogger) error {
}
}
packObjectsLimiter := limiter.NewConcurrencyLimiter(
- ctx,
- packObjectLimit,
+ cfg.PackObjectsLimiting.MaxConcurrency,
cfg.PackObjectsLimiting.MaxQueueLength,
newTickerFunc,
packObjectsMonitor,
diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go
index c845bd68f..6333b5942 100644
--- a/internal/gitaly/server/auth_test.go
+++ b/internal/gitaly/server/auth_test.go
@@ -1,7 +1,7 @@
package server
import (
- "context"
+ netctx "context"
"crypto/tls"
"crypto/x509"
"fmt"
@@ -44,7 +44,7 @@ func TestMain(m *testing.M) {
}
func TestSanity(t *testing.T) {
- serverSocketPath := runServer(t, testhelper.Context(t), testcfg.Build(t))
+ serverSocketPath := runServer(t, testcfg.Build(t))
conn, err := dial(serverSocketPath, []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())})
require.NoError(t, err)
@@ -55,8 +55,7 @@ func TestSanity(t *testing.T) {
func TestTLSSanity(t *testing.T) {
cfg := testcfg.Build(t)
- ctx := testhelper.Context(t)
- addr := runSecureServer(t, ctx, cfg)
+ addr := runSecureServer(t, cfg)
certPool, err := x509.SystemCertPool()
require.NoError(t, err)
@@ -103,7 +102,7 @@ func TestAuthFailures(t *testing.T) {
Auth: auth.Config{Token: "quxbaz"},
}))
- serverSocketPath := runServer(t, testhelper.Context(t), cfg)
+ serverSocketPath := runServer(t, cfg)
connOpts := append(tc.opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := dial(serverSocketPath, connOpts)
require.NoError(t, err, tc.desc)
@@ -146,7 +145,7 @@ func TestAuthSuccess(t *testing.T) {
Auth: auth.Config{Token: tc.token, Transitioning: !tc.required},
}))
- serverSocketPath := runServer(t, testhelper.Context(t), cfg)
+ serverSocketPath := runServer(t, cfg)
connOpts := append(tc.opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := dial(serverSocketPath, connOpts)
require.NoError(t, err, tc.desc)
@@ -159,7 +158,7 @@ func TestAuthSuccess(t *testing.T) {
type brokenAuth struct{}
func (brokenAuth) RequireTransportSecurity() bool { return false }
-func (brokenAuth) GetRequestMetadata(context.Context, ...string) (map[string]string, error) {
+func (brokenAuth) GetRequestMetadata(netctx.Context, ...string) (map[string]string, error) {
return map[string]string{"authorization": "Bearer blablabla"}, nil
}
@@ -187,7 +186,7 @@ func newOperationClient(t *testing.T, token, serverSocketPath string) (gitalypb.
return gitalypb.NewOperationServiceClient(conn), conn
}
-func runServer(t *testing.T, ctx context.Context, cfg config.Cfg) string {
+func runServer(t *testing.T, cfg config.Cfg) string {
t.Helper()
registry := backchannel.NewRegistry()
@@ -202,7 +201,7 @@ func runServer(t *testing.T, ctx context.Context, cfg config.Cfg) string {
catfileCache := catfile.NewCache(cfg)
t.Cleanup(catfileCache.Stop)
diskCache := cache.New(cfg, locator)
- limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters(ctx))
+ limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)
updaterWithHooks := updateref.NewUpdaterWithHooks(cfg, locator, hookManager, gitCmdFactory, catfileCache)
srv, err := NewGitalyServerFactory(cfg, testhelper.NewDiscardingLogEntry(t), registry, diskCache, []*limithandler.LimiterMiddleware{limitHandler}).New(false)
@@ -229,7 +228,7 @@ func runServer(t *testing.T, ctx context.Context, cfg config.Cfg) string {
}
//go:generate openssl req -newkey rsa:4096 -new -nodes -x509 -days 3650 -out testdata/gitalycert.pem -keyout testdata/gitalykey.pem -subj "/C=US/ST=California/L=San Francisco/O=GitLab/OU=GitLab-Shell/CN=localhost" -addext "subjectAltName = IP:127.0.0.1, DNS:localhost"
-func runSecureServer(t *testing.T, ctx context.Context, cfg config.Cfg) string {
+func runSecureServer(t *testing.T, cfg config.Cfg) string {
t.Helper()
cfg.TLS = config.TLS{
@@ -245,7 +244,7 @@ func runSecureServer(t *testing.T, ctx context.Context, cfg config.Cfg) string {
testhelper.NewDiscardingLogEntry(t),
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg)),
- []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters(ctx))},
+ []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)},
).New(true)
require.NoError(t, err)
@@ -260,12 +259,12 @@ func runSecureServer(t *testing.T, ctx context.Context, cfg config.Cfg) string {
func TestUnaryNoAuth(t *testing.T) {
cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{Auth: auth.Config{Token: "testtoken"}}))
- ctx := testhelper.Context(t)
- path := runServer(t, ctx, cfg)
+ path := runServer(t, cfg)
conn, err := grpc.Dial(path, grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer testhelper.MustClose(t, conn)
+ ctx := testhelper.Context(t)
client := gitalypb.NewRepositoryServiceClient(conn)
_, err = client.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{
@@ -281,12 +280,12 @@ func TestUnaryNoAuth(t *testing.T) {
func TestStreamingNoAuth(t *testing.T) {
cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{Auth: auth.Config{Token: "testtoken"}}))
- ctx := testhelper.Context(t)
- path := runServer(t, ctx, cfg)
+ path := runServer(t, cfg)
conn, err := dial(path, []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())})
require.NoError(t, err)
t.Cleanup(func() { conn.Close() })
+ ctx := testhelper.Context(t)
client := gitalypb.NewRepositoryServiceClient(conn)
stream, err := client.GetInfoAttributes(ctx, &gitalypb.GetInfoAttributesRequest{
@@ -331,7 +330,7 @@ func TestAuthBeforeLimit(t *testing.T) {
t.Cleanup(cleanup)
cfg.Gitlab.URL = gitlabURL
- serverSocketPath := runServer(t, ctx, cfg)
+ serverSocketPath := runServer(t, cfg)
client, conn := newOperationClient(t, cfg.Auth.Token, serverSocketPath)
t.Cleanup(func() { conn.Close() })
diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go
index 62fa0dca8..4d4d1a500 100644
--- a/internal/gitaly/service/hook/pack_objects_test.go
+++ b/internal/gitaly/service/hook/pack_objects_test.go
@@ -858,8 +858,7 @@ func TestPackObjects_concurrencyLimit(t *testing.T) {
cfg.Prometheus.GRPCLatencyBuckets,
)
limiter := limiter.NewConcurrencyLimiter(
- ctx,
- limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 1}),
+ 1,
0,
func() helper.Ticker { return ticker },
monitor,
diff --git a/internal/grpc/middleware/limithandler/middleware.go b/internal/grpc/middleware/limithandler/middleware.go
index a6ed93f9a..0c41a4672 100644
--- a/internal/grpc/middleware/limithandler/middleware.go
+++ b/internal/grpc/middleware/limithandler/middleware.go
@@ -165,88 +165,84 @@ func (w *wrappedStream) RecvMsg(m interface{}) error {
// WithConcurrencyLimiters sets up middleware to limit the concurrency of
// requests based on RPC and repository
-func WithConcurrencyLimiters(ctx context.Context) SetupFunc {
- return func(cfg config.Cfg, middleware *LimiterMiddleware) {
- acquiringSecondsMetric := prometheus.NewHistogramVec(
- prometheus.HistogramOpts{
- Namespace: "gitaly",
- Subsystem: "concurrency_limiting",
- Name: "acquiring_seconds",
- Help: "Histogram of time calls are rate limited (in seconds)",
- Buckets: cfg.Prometheus.GRPCLatencyBuckets,
- },
- []string{"system", "grpc_service", "grpc_method"},
- )
- inProgressMetric := prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Namespace: "gitaly",
- Subsystem: "concurrency_limiting",
- Name: "in_progress",
- Help: "Gauge of number of concurrent in-progress calls",
- },
- []string{"system", "grpc_service", "grpc_method"},
- )
- queuedMetric := prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Namespace: "gitaly",
- Subsystem: "concurrency_limiting",
- Name: "queued",
- Help: "Gauge of number of queued calls",
- },
- []string{"system", "grpc_service", "grpc_method"},
- )
-
- middleware.collect = func(metrics chan<- prometheus.Metric) {
- acquiringSecondsMetric.Collect(metrics)
- inProgressMetric.Collect(metrics)
- queuedMetric.Collect(metrics)
- }
+func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) {
+ acquiringSecondsMetric := prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Namespace: "gitaly",
+ Subsystem: "concurrency_limiting",
+ Name: "acquiring_seconds",
+ Help: "Histogram of time calls are rate limited (in seconds)",
+ Buckets: cfg.Prometheus.GRPCLatencyBuckets,
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ )
+ inProgressMetric := prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: "gitaly",
+ Subsystem: "concurrency_limiting",
+ Name: "in_progress",
+ Help: "Gauge of number of concurrent in-progress calls",
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ )
+ queuedMetric := prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: "gitaly",
+ Subsystem: "concurrency_limiting",
+ Name: "queued",
+ Help: "Gauge of number of queued calls",
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ )
+
+ middleware.collect = func(metrics chan<- prometheus.Metric) {
+ acquiringSecondsMetric.Collect(metrics)
+ inProgressMetric.Collect(metrics)
+ queuedMetric.Collect(metrics)
+ }
- result := make(map[string]limiter.Limiter)
- for _, limit := range cfg.Concurrency {
- limit := limit
+ result := make(map[string]limiter.Limiter)
+ for _, limit := range cfg.Concurrency {
+ limit := limit
- newTickerFunc := func() helper.Ticker {
- return helper.NewManualTicker()
- }
+ newTickerFunc := func() helper.Ticker {
+ return helper.NewManualTicker()
+ }
- if limit.MaxQueueWait > 0 {
- newTickerFunc = func() helper.Ticker {
- return helper.NewTimerTicker(limit.MaxQueueWait.Duration())
- }
+ if limit.MaxQueueWait > 0 {
+ newTickerFunc = func() helper.Ticker {
+ return helper.NewTimerTicker(limit.MaxQueueWait.Duration())
}
-
- result[limit.RPC] = limiter.NewConcurrencyLimiter(
- ctx,
- limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: limit.MaxPerRepo}),
- limit.MaxQueueSize,
- newTickerFunc,
- limiter.NewPerRPCPromMonitor(
- "gitaly", limit.RPC,
- queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
- ),
- )
}
- // Set default for ReplicateRepository.
- replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository"
- if _, ok := result[replicateRepositoryFullMethod]; !ok {
- result[replicateRepositoryFullMethod] = limiter.NewConcurrencyLimiter(
- ctx,
- limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 1}),
- 0,
- func() helper.Ticker {
- return helper.NewManualTicker()
- },
- limiter.NewPerRPCPromMonitor(
- "gitaly", replicateRepositoryFullMethod,
- queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
- ),
- )
- }
+ result[limit.RPC] = limiter.NewConcurrencyLimiter(
+ limit.MaxPerRepo,
+ limit.MaxQueueSize,
+ newTickerFunc,
+ limiter.NewPerRPCPromMonitor(
+ "gitaly", limit.RPC,
+ queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
+ ),
+ )
+ }
- middleware.methodLimiters = result
+ // Set default for ReplicateRepository.
+ replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository"
+ if _, ok := result[replicateRepositoryFullMethod]; !ok {
+ result[replicateRepositoryFullMethod] = limiter.NewConcurrencyLimiter(
+ 1,
+ 0,
+ func() helper.Ticker {
+ return helper.NewManualTicker()
+ },
+ limiter.NewPerRPCPromMonitor(
+ "gitaly", replicateRepositoryFullMethod,
+ queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
+ ),
+ )
}
+
+ middleware.methodLimiters = result
}
// WithRateLimiters sets up a middleware with limiters that limit requests
diff --git a/internal/grpc/middleware/limithandler/middleware_test.go b/internal/grpc/middleware/limithandler/middleware_test.go
index 40978be69..65adad2be 100644
--- a/internal/grpc/middleware/limithandler/middleware_test.go
+++ b/internal/grpc/middleware/limithandler/middleware_test.go
@@ -38,7 +38,6 @@ func fixedLockKey(ctx context.Context) string {
func TestUnaryLimitHandler(t *testing.T) {
t.Parallel()
- ctx := testhelper.Context(t)
s := &queueTestServer{
server: server{
blockCh: make(chan struct{}),
@@ -52,13 +51,14 @@ func TestUnaryLimitHandler(t *testing.T) {
},
}
- lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx))
+ lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters)
interceptor := lh.UnaryInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor))
defer srv.Stop()
client, conn := newClient(t, serverSocketPath)
defer conn.Close()
+ ctx := testhelper.Context(t)
var wg sync.WaitGroup
defer wg.Wait()
@@ -114,7 +114,7 @@ func TestUnaryLimitHandler_queueing(t *testing.T) {
MaxQueueWait: duration.Duration(time.Millisecond),
},
},
- }, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx))
+ }, fixedLockKey, limithandler.WithConcurrencyLimiters)
s := &queueTestServer{
server: server{
@@ -175,7 +175,7 @@ func TestUnaryLimitHandler_queueing(t *testing.T) {
MaxPerRepo: 1,
},
},
- }, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx))
+ }, fixedLockKey, limithandler.WithConcurrencyLimiters)
s := &queueTestServer{
server: server{
@@ -427,7 +427,6 @@ func TestStreamLimitHandler(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
t.Parallel()
- ctx := testhelper.Context(t)
s := &server{blockCh: make(chan struct{})}
maxQueueSize := 1
@@ -441,13 +440,14 @@ func TestStreamLimitHandler(t *testing.T) {
},
}
- lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx))
+ lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters)
interceptor := lh.StreamInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor))
defer srv.Stop()
client, conn := newClient(t, serverSocketPath)
defer conn.Close()
+ ctx := testhelper.Context(t)
totalCalls := 10
@@ -481,8 +481,6 @@ func TestStreamLimitHandler(t *testing.T) {
func TestStreamLimitHandler_error(t *testing.T) {
t.Parallel()
- ctx := testhelper.Context(t)
-
s := &queueTestServer{reqArrivedCh: make(chan struct{})}
s.blockCh = make(chan struct{})
@@ -492,7 +490,7 @@ func TestStreamLimitHandler_error(t *testing.T) {
},
}
- lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx))
+ lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters)
interceptor := lh.StreamInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor))
defer srv.Stop()
@@ -500,6 +498,8 @@ func TestStreamLimitHandler_error(t *testing.T) {
client, conn := newClient(t, serverSocketPath)
defer conn.Close()
+ ctx := testhelper.Context(t)
+
respChan := make(chan *grpc_testing.StreamingOutputCallResponse)
go func() {
stream, err := client.FullDuplexCall(ctx)
@@ -600,8 +600,6 @@ func (q *queueTestServer) FullDuplexCall(stream grpc_testing.TestService_FullDup
}
func TestConcurrencyLimitHandlerMetrics(t *testing.T) {
- ctx := testhelper.Context(t)
-
s := &queueTestServer{reqArrivedCh: make(chan struct{})}
s.blockCh = make(chan struct{})
@@ -612,7 +610,7 @@ func TestConcurrencyLimitHandlerMetrics(t *testing.T) {
},
}
- lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters(ctx))
+ lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters)
interceptor := lh.UnaryInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor))
defer srv.Stop()
@@ -620,6 +618,8 @@ func TestConcurrencyLimitHandlerMetrics(t *testing.T) {
client, conn := newClient(t, serverSocketPath)
defer conn.Close()
+ ctx := testhelper.Context(t)
+
respCh := make(chan *grpc_testing.SimpleResponse)
go func() {
resp, err := client.UnaryCall(ctx, &grpc_testing.SimpleRequest{})
diff --git a/internal/limiter/adaptive_calculator.go b/internal/limiter/adaptive_calculator.go
index 793fc853c..15d7cf87b 100644
--- a/internal/limiter/adaptive_calculator.go
+++ b/internal/limiter/adaptive_calculator.go
@@ -262,7 +262,7 @@ func (c *AdaptiveCalculator) calibrateLimits(ctx context.Context) {
}).Debugf("Additive increase")
} else {
// Multiplicative decrease
- newLimit = int(math.Floor(float64(limit.Current()) * setting.BackoffFactor))
+ newLimit = int(math.Floor(float64(limit.Current()) * setting.BackoffBackoff))
if newLimit < setting.Min {
newLimit = setting.Min
}
diff --git a/internal/limiter/adaptive_calculator_test.go b/internal/limiter/adaptive_calculator_test.go
index 5c2a5dae7..f4461d27d 100644
--- a/internal/limiter/adaptive_calculator_test.go
+++ b/internal/limiter/adaptive_calculator_test.go
@@ -659,14 +659,12 @@ func (l *testLimit) Update(val int) {
l.currents = append(l.currents, val)
}
-func (*testLimit) AfterUpdate(_ AfterUpdateHook) {}
-
func (l *testLimit) Setting() AdaptiveSetting {
return AdaptiveSetting{
- Initial: l.initial,
- Max: l.max,
- Min: l.min,
- BackoffFactor: l.backoffBackoff,
+ Initial: l.initial,
+ Max: l.max,
+ Min: l.min,
+ BackoffBackoff: l.backoffBackoff,
}
}
diff --git a/internal/limiter/adaptive_limit.go b/internal/limiter/adaptive_limit.go
index 5af9f8ebd..2aefe5eb0 100644
--- a/internal/limiter/adaptive_limit.go
+++ b/internal/limiter/adaptive_limit.go
@@ -1,49 +1,30 @@
package limiter
-import (
- "sync"
-)
+import "sync/atomic"
// AdaptiveSetting is a struct that holds the configuration parameters for an adaptive limiter.
type AdaptiveSetting struct {
- Initial int
- Max int
- Min int
- BackoffFactor float64
+ Initial int
+ Max int
+ Min int
+ BackoffBackoff float64
}
-// AfterUpdateHook is a function hook that is triggered when the current value changes. The callers need to register a hook to
-// the AdaptiveLimiter implementation beforehand. They are required to handle errors inside the hook function.
-type AfterUpdateHook func(newVal int)
-
// AdaptiveLimiter is an interface for managing and updating adaptive limits.
// It exposes methods to get the name, current limit value, update the limit value, and access its settings.
type AdaptiveLimiter interface {
Name() string
Current() int
Update(val int)
- AfterUpdate(AfterUpdateHook)
Setting() AdaptiveSetting
}
// AdaptiveLimit is an implementation of the AdaptiveLimiter interface. It uses an atomic Int32 to represent the current
// limit value, ensuring thread-safe updates.
type AdaptiveLimit struct {
- sync.RWMutex
-
- name string
- current int
- setting AdaptiveSetting
- updateHooks []AfterUpdateHook
-}
-
-// NewAdaptiveLimit initializes a new AdaptiveLimit object
-func NewAdaptiveLimit(name string, setting AdaptiveSetting) *AdaptiveLimit {
- return &AdaptiveLimit{
- name: name,
- current: setting.Initial,
- setting: setting,
- }
+ name string
+ current atomic.Int32
+ setting AdaptiveSetting
}
// Name returns the name of the adaptive limit
@@ -53,30 +34,12 @@ func (l *AdaptiveLimit) Name() string {
// Current returns the current limit. This function can be called without the need for synchronization.
func (l *AdaptiveLimit) Current() int {
- l.RLock()
- defer l.RUnlock()
-
- return l.current
+ return int(l.current.Load())
}
// Update adjusts current limit value.
func (l *AdaptiveLimit) Update(val int) {
- l.Lock()
- defer l.Unlock()
-
- if val != l.current {
- l.current = val
- for _, hook := range l.updateHooks {
- hook(val)
- }
- }
-}
-
-// AfterUpdate registers a callback when the current limit is updated. Because all updates and hooks are synchronized,
-// calling Current() inside the update hook in the same goroutine will cause deadlock. Hence, the update hook must
-// use the new value passed as the argument instead.
-func (l *AdaptiveLimit) AfterUpdate(hook AfterUpdateHook) {
- l.updateHooks = append(l.updateHooks, hook)
+ l.current.Store(int32(val))
}
// Setting returns the configuration parameters for an adaptive limiter.
diff --git a/internal/limiter/adaptive_limit_test.go b/internal/limiter/adaptive_limit_test.go
deleted file mode 100644
index f6c67f092..000000000
--- a/internal/limiter/adaptive_limit_test.go
+++ /dev/null
@@ -1,124 +0,0 @@
-package limiter
-
-import (
- "testing"
-
- "github.com/stretchr/testify/require"
-)
-
-func TestAdaptiveLimit_New(t *testing.T) {
- t.Parallel()
-
- setting := AdaptiveSetting{
- Initial: 5,
- Max: 10,
- Min: 1,
- BackoffFactor: 0.5,
- }
-
- limit := NewAdaptiveLimit("testLimit", setting)
- require.Equal(t, limit.Name(), "testLimit")
- require.Equal(t, limit.Current(), 5)
- require.Equal(t, limit.Setting(), setting)
-}
-
-func TestAdaptiveLimit_Update(t *testing.T) {
- t.Parallel()
-
- newLimit := func() *AdaptiveLimit {
- return NewAdaptiveLimit("testLimit", AdaptiveSetting{
- Initial: 5,
- Max: 10,
- Min: 1,
- BackoffFactor: 0.5,
- })
- }
-
- t.Run("without update hooks", func(t *testing.T) {
- limit := newLimit()
-
- limit.Update(1)
- require.Equal(t, 1, limit.Current())
-
- limit.Update(2)
- require.Equal(t, 2, limit.Current())
-
- limit.Update(3)
- require.Equal(t, 3, limit.Current())
- })
-
- t.Run("new values are different from old values", func(t *testing.T) {
- limit := newLimit()
-
- vals := []int{}
- limit.AfterUpdate(func(val int) {
- vals = append(vals, val)
- })
-
- limit.Update(1)
- require.Equal(t, 1, limit.Current())
- require.Equal(t, vals, []int{1})
-
- limit.Update(2)
- require.Equal(t, 2, limit.Current())
- require.Equal(t, vals, []int{1, 2})
-
- limit.Update(3)
- require.Equal(t, 3, limit.Current())
- require.Equal(t, vals, []int{1, 2, 3})
- })
-
- t.Run("new values are the same as old values", func(t *testing.T) {
- limit := newLimit()
-
- vals := []int{}
- limit.AfterUpdate(func(val int) {
- vals = append(vals, val)
- })
-
- limit.Update(1)
- require.Equal(t, 1, limit.Current())
- require.Equal(t, vals, []int{1})
-
- limit.Update(1)
- require.Equal(t, 1, limit.Current())
- require.Equal(t, vals, []int{1})
-
- limit.Update(2)
- require.Equal(t, 2, limit.Current())
- require.Equal(t, vals, []int{1, 2})
-
- limit.Update(2)
- require.Equal(t, 2, limit.Current())
- require.Equal(t, vals, []int{1, 2})
- })
-
- t.Run("multiple update hooks", func(t *testing.T) {
- limit := newLimit()
-
- vals1 := []int{}
- limit.AfterUpdate(func(val int) {
- vals1 = append(vals1, val)
- })
-
- vals2 := []int{}
- limit.AfterUpdate(func(val int) {
- vals2 = append(vals2, val*2)
- })
-
- limit.Update(1)
- require.Equal(t, 1, limit.Current())
- require.Equal(t, vals1, []int{1})
- require.Equal(t, vals2, []int{2})
-
- limit.Update(2)
- require.Equal(t, 2, limit.Current())
- require.Equal(t, vals1, []int{1, 2})
- require.Equal(t, vals2, []int{2, 4})
-
- limit.Update(3)
- require.Equal(t, 3, limit.Current())
- require.Equal(t, vals1, []int{1, 2, 3})
- require.Equal(t, vals2, []int{2, 4, 6})
- })
-}
diff --git a/internal/limiter/concurrency_limiter.go b/internal/limiter/concurrency_limiter.go
index b4284adeb..2807b856e 100644
--- a/internal/limiter/concurrency_limiter.go
+++ b/internal/limiter/concurrency_limiter.go
@@ -41,10 +41,10 @@ type keyedConcurrencyLimiter struct {
// concurrencyTokens is the channel of available concurrency tokens, where every token
// allows one concurrent call to the concurrency-limited function.
- concurrencyTokens *resizableSemaphore
+ concurrencyTokens chan struct{}
// queueTokens is the channel of available queue tokens, where every token allows one
// concurrent call to be admitted to the queue.
- queueTokens *resizableSemaphore
+ queueTokens chan struct{}
}
// acquire tries to acquire the semaphore. It may fail if the admission queue is full or if the max
@@ -55,33 +55,34 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey str
// callers may wait for the concurrency token at the same time. If there are no more
// queueing tokens then this indicates that the queue is full and we thus return an
// error immediately.
- if !sem.queueTokens.TryAcquire() {
+ select {
+ case sem.queueTokens <- struct{}{}:
+ // We have acquired a queueing token, so we need to release it if acquiring
+ // the concurrency token fails. If we succeed to acquire the concurrency
+ // token though then we retain the queueing token until the caller signals
+ // that the concurrency-limited function has finished. As a consequence the
+ // queue token is returned together with the concurrency token.
+ //
+ // A simpler model would be to just have `maxQueueLength` many queueing
+ // tokens. But this would add concurrency-limiting when acquiring the queue
+ // token itself, which is not what we want to do. Instead, we want to admit
+ // as many callers into the queue as the queue length permits plus the
+ // number of available concurrency tokens allows.
+ defer func() {
+ if returnedErr != nil {
+ <-sem.queueTokens
+ }
+ }()
+ default:
return ErrMaxQueueSize
}
-
- // We have acquired a queueing token, so we need to release it if acquiring
- // the concurrency token fails. If we successfully acquire the concurrency
- // token though then we retain the queueing token until the caller signals
- // that the concurrency-limited function has finished. As a consequence the
- // queue token is returned together with the concurrency token.
- //
- // A simpler model would be to just have `maxQueueLength` many queueing
- // tokens. But this would add concurrency-limiting when acquiring the queue
- // token itself, which is not what we want to do. Instead, we want to admit
- // as many callers into the queue as the queue length permits plus the
- // number of available concurrency tokens allows.
- defer func() {
- if returnedErr != nil {
- sem.queueTokens.Release()
- }
- }()
}
// We are queued now, so let's tell the monitor. Furthermore, even though we're still
// holding the queueing token when this function exits successfully we also tell the monitor
// that we have exited the queue. It is only an implementation detail anyway that we hold on
// to the token, so the monitor shouldn't care about that.
- sem.monitor.Queued(ctx, limitingKey, sem.queueLength())
+ sem.monitor.Queued(ctx, limitingKey, len(sem.queueTokens))
defer sem.monitor.Dequeued(ctx)
// Set up the ticker that keeps us from waiting indefinitely on the concurrency token.
@@ -97,12 +98,7 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey str
// Try to acquire the concurrency token now that we're in the queue.
select {
- case acquired := <-sem.concurrencyTokens.Acquire():
- // When the semaphore returns false, the semaphore was stopped. It's likely due to the context is
- // cancelled. Hence, we should return the error here.
- if !acquired {
- return sem.concurrencyTokens.Err()
- }
+ case sem.concurrencyTokens <- struct{}{}:
return nil
case <-ticker.C():
return ErrMaxQueueTime
@@ -114,27 +110,21 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey str
// release releases the acquired tokens.
func (sem *keyedConcurrencyLimiter) release() {
if sem.queueTokens != nil {
- sem.queueTokens.Release()
+ <-sem.queueTokens
}
- sem.concurrencyTokens.Release()
+ <-sem.concurrencyTokens
}
// queueLength returns the length of token queue
func (sem *keyedConcurrencyLimiter) queueLength() int {
- if sem.queueTokens == nil {
- return 0
- }
- return int(sem.queueTokens.Current())
+ return len(sem.queueTokens)
}
// ConcurrencyLimiter contains rate limiter state.
type ConcurrencyLimiter struct {
- // ctx stores the context at initialization. This context is used as a stopping condition
- // for some internal goroutines.
- ctx context.Context
- // limit is the adaptive maximum number of concurrent calls to the limited function. This limit is
- // calculated adaptively from an outside calculator.
- limit *AdaptiveLimit
+ // maxConcurrencyLimit is the maximum number of concurrent calls to the limited function.
+ // This limit is per key.
+ maxConcurrencyLimit int64
// maxQueueLength is the maximum number of operations allowed to wait in a queued state.
// This limit is global and applies before the concurrency limit. Subsequent incoming
// operations will be rejected with an error immediately.
@@ -155,31 +145,18 @@ type ConcurrencyLimiter struct {
}
// NewConcurrencyLimiter creates a new concurrency rate limiter.
-func NewConcurrencyLimiter(ctx context.Context, limit *AdaptiveLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
+func NewConcurrencyLimiter(maxConcurrencyLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
if monitor == nil {
monitor = NewNoopConcurrencyMonitor()
}
- limiter := &ConcurrencyLimiter{
- ctx: ctx,
- limit: limit,
+ return &ConcurrencyLimiter{
+ maxConcurrencyLimit: int64(maxConcurrencyLimit),
maxQueueLength: int64(maxQueueLength),
maxQueuedTickerCreator: maxQueuedTickerCreator,
monitor: monitor,
limitsByKey: make(map[string]*keyedConcurrencyLimiter),
}
-
- // When the capacity of the limiter is updated we also need to update the size of both the queuing tokens as
- // well as the concurrency tokens to match the new size.
- limit.AfterUpdate(func(val int) {
- for _, keyedLimiter := range limiter.limitsByKey {
- if keyedLimiter.queueTokens != nil {
- keyedLimiter.queueTokens.Resize(int64(val) + limiter.maxQueueLength)
- }
- keyedLimiter.concurrencyTokens.Resize(int64(val))
- }
- })
- return limiter
}
// Limit will limit the concurrency of the limited function f. There are two distinct mechanisms
@@ -199,7 +176,7 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f Li
)
defer span.Finish()
- if c.currentLimit() <= 0 {
+ if c.maxConcurrencyLimit <= 0 {
return f()
}
@@ -245,15 +222,15 @@ func (c *ConcurrencyLimiter) getConcurrencyLimit(limitingKey string) *keyedConcu
// Set up the queue tokens in case a maximum queue length was requested. As the
// queue tokens are kept during the whole lifetime of the concurrency-limited
// function we add the concurrency tokens to the number of available token.
- var queueTokens *resizableSemaphore
+ var queueTokens chan struct{}
if c.maxQueueLength > 0 {
- queueTokens = NewResizableSemaphore(c.ctx, c.currentLimit()+c.maxQueueLength)
+ queueTokens = make(chan struct{}, c.maxConcurrencyLimit+c.maxQueueLength)
}
c.limitsByKey[limitingKey] = &keyedConcurrencyLimiter{
monitor: c.monitor,
maxQueuedTickerCreator: c.maxQueuedTickerCreator,
- concurrencyTokens: NewResizableSemaphore(c.ctx, c.currentLimit()),
+ concurrencyTokens: make(chan struct{}, c.maxConcurrencyLimit),
queueTokens: queueTokens,
}
}
@@ -291,7 +268,3 @@ func (c *ConcurrencyLimiter) countSemaphores() int {
return len(c.limitsByKey)
}
-
-func (c *ConcurrencyLimiter) currentLimit() int64 {
- return int64(c.limit.Current())
-}
diff --git a/internal/limiter/concurrency_limiter_test.go b/internal/limiter/concurrency_limiter_test.go
index 6ba48d243..f99ca7eab 100644
--- a/internal/limiter/concurrency_limiter_test.go
+++ b/internal/limiter/concurrency_limiter_test.go
@@ -3,7 +3,6 @@ package limiter
import (
"context"
"errors"
- "fmt"
"strconv"
"sync"
"testing"
@@ -87,7 +86,7 @@ func (c *counter) Dropped(_ context.Context, _ string, _ int, _ time.Duration, r
}
}
-func TestLimiter_static(t *testing.T) {
+func TestLimiter(t *testing.T) {
t.Parallel()
tests := []struct {
@@ -156,8 +155,7 @@ func TestLimiter_static(t *testing.T) {
gauge := &counter{}
limiter := NewConcurrencyLimiter(
- ctx,
- NewAdaptiveLimit("staticLimit", AdaptiveSetting{Initial: tt.maxConcurrency}),
+ tt.maxConcurrency,
0,
nil,
gauge,
@@ -231,557 +229,6 @@ func TestLimiter_static(t *testing.T) {
}
}
-func TestLimiter_dynamic(t *testing.T) {
- t.Parallel()
-
- t.Run("increase dynamic limit when there is no queuing request", func(t *testing.T) {
- ctx := testhelper.Context(t)
- limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
- gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
- limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
-
- // 5 requests acquired the tokens, the limiter is full now
- release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
- require.Equal(t, 5, gauge.enter)
-
- // Update the limit to 7
- limit.Update(7)
-
- // 2 more requests acquired the token. This proves the limit is expanded
- release2, waitAfterRelease2 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 2)
- require.Equal(t, 7, gauge.enter)
-
- close(release1)
- close(release2)
- waitAfterRelease1()
- waitAfterRelease2()
- require.Equal(t, 7, gauge.exit)
- })
-
- t.Run("decrease dynamic limit when there is no queuing request", func(t *testing.T) {
- ctx := testhelper.Context(t)
- limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
- gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
- limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
-
- // 3 requests acquired the tokens, 2 slots left
- release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 3)
- require.Equal(t, 3, gauge.enter)
- require.Equal(t, 3, gauge.queued)
-
- // Update the limit to 3
- limit.Update(3)
-
- // 2 requests are put in queue, meaning the limit shrinks down
- waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 2)
- require.Equal(t, 3, gauge.enter)
- require.Equal(t, 5, gauge.queued)
-
- // Release first 3 requests
- close(release1)
- waitAfterRelease1()
-
- // Now the last 2 requests can acquire token
- waitAcquired2()
- require.Equal(t, 5, gauge.enter)
- require.Equal(t, 5, gauge.queued)
- require.Equal(t, 3, gauge.exit)
-
- // Release the last patch
- close(release2)
- waitAfterRelease2()
- require.Equal(t, 5, gauge.exit)
- })
-
- t.Run("increase dynamic limit more than the number of queuing requests", func(t *testing.T) {
- ctx := testhelper.Context(t)
- limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
- gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
- limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
-
- // 5 requests acquired the tokens, the limiter is full now
- release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
- require.Equal(t, 5, gauge.enter)
-
- // 2 requests waiting in the queue
- waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 2)
- require.Equal(t, 5, gauge.enter)
- require.Equal(t, 7, gauge.queued)
-
- // Update the limit to 7
- limit.Update(7)
-
- // Wait for the other 2 requests acquired the token. This proves the limiter is expanded
- waitAcquired2()
- require.Equal(t, 7, gauge.enter)
-
- close(release1)
- close(release2)
- waitAfterRelease1()
- waitAfterRelease2()
- require.Equal(t, 7, gauge.exit)
- })
-
- t.Run("increase dynamic limit less than the number of queuing requests", func(t *testing.T) {
- ctx := testhelper.Context(t)
- limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
- gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
- limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
-
- // 5 requests acquired the tokens, the limiter is full now
- release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
- require.Equal(t, 5, gauge.enter)
-
- // 2 requests waiting in the queue
- waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 2)
- require.Equal(t, 5, gauge.enter)
- require.Equal(t, 7, gauge.queued)
-
- // 5 more requests waiting in the queue
- waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 5)
- require.Equal(t, 5, gauge.enter)
- require.Equal(t, 12, gauge.queued)
-
- // Update the limit to 7.
- limit.Update(7)
-
- // Release first 5 requests, all requests should fit in the queue now.
- close(release1)
- waitAfterRelease1()
- require.Equal(t, 5, gauge.exit)
-
- waitAcquired2()
- waitAcquired3()
- require.Equal(t, 12, gauge.enter)
-
- // Now release all requests
- close(release2)
- close(release3)
- waitAfterRelease2()
- waitAfterRelease3()
- require.Equal(t, 12, gauge.exit)
- })
-
- t.Run("decrease dynamic limit less than the number of concurrent requests", func(t *testing.T) {
- ctx := testhelper.Context(t)
- limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
- gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
- limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
-
- // 5 requests acquired the tokens, the limiter is full now
- release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
- require.Equal(t, 5, gauge.enter)
-
- // Update the limit to 3
- limit.Update(3)
-
- // 3 requests are put in queue
- waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 3)
- require.Equal(t, 5, gauge.enter)
- require.Equal(t, 8, gauge.queued)
-
- // Release the first 5 requests
- close(release1)
- waitAfterRelease1()
- require.Equal(t, 5, gauge.exit)
-
- // Now the last 3 requests acquire the tokens
- waitAcquired2()
- require.Equal(t, 8, gauge.enter)
-
- // 1 more request is put in queue, meaning the limit shrinks down to 3.
- waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 1)
- require.Equal(t, 8, gauge.enter)
- require.Equal(t, 9, gauge.queued)
-
- // Release the second 3 requests
- close(release2)
- waitAfterRelease2()
- require.Equal(t, 8, gauge.exit)
-
- // The last request acquires the token
- waitAcquired3()
- require.Equal(t, 9, gauge.enter)
-
- // Release the last request
- close(release3)
- waitAfterRelease3()
- require.Equal(t, 9, gauge.exit)
- })
-
- t.Run("increase and decrease dynamic limit multiple times", func(t *testing.T) {
- ctx := testhelper.Context(t)
- limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
- gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
- limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
-
- // Update the limit to 7
- limit.Update(7)
-
- // 5 requests acquired the tokens
- release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
- require.Equal(t, 5, gauge.enter)
-
- // Update the limit to 3
- limit.Update(3)
-
- // 3 requests are put in queue
- waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 3)
- require.Equal(t, 5, gauge.enter)
- require.Equal(t, 8, gauge.queued)
-
- // Update the limit to 10
- limit.Update(10)
-
- // All existing requests acquire the tokens
- waitAcquired2()
- require.Equal(t, 8, gauge.enter)
-
- // 2 more requests
- release3, waitAfterRelease3 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 2)
- require.Equal(t, 10, gauge.enter)
-
- // Update the limit to 1
- limit.Update(1)
-
- // Now release all of them
- close(release1)
- waitAfterRelease1()
- require.Equal(t, 5, gauge.exit)
-
- close(release2)
- waitAfterRelease2()
- require.Equal(t, 8, gauge.exit)
-
- close(release3)
- waitAfterRelease3()
- require.Equal(t, 10, gauge.exit)
- })
-
- t.Run("increase the limit when the queue is full", func(t *testing.T) {
- ctx := testhelper.Context(t)
- limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 1, Max: 10, Min: 1})
- gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
- // Mind the queue length here
- limiter := NewConcurrencyLimiter(ctx, limit, 5, nil, gauge)
-
- // 1 requests acquired the tokens, the limiter is full now
- release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 1)
- require.Equal(t, 1, gauge.enter)
- require.Equal(t, 1, gauge.queued)
-
- // 5 requests queuing for the tokens, the queue is full now
- waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 5)
- require.Equal(t, 1, gauge.enter)
- require.Equal(t, 6, gauge.queued)
-
- // Limiter rejects new request
- maximumQueueSizeReached(t, ctx, "1", limiter)
-
- // Update the limit
- limit.Update(6)
- waitAcquired2()
- require.Equal(t, 6, gauge.enter)
-
- // 5 requests queuing for the tokens, the queue is full now
- waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 5)
- require.Equal(t, 6, gauge.enter)
- require.Equal(t, 11, gauge.queued)
-
- // Limiter rejects new request
- maximumQueueSizeReached(t, ctx, "1", limiter)
-
- // Clean up
- close(release1)
- close(release2)
- waitAfterRelease1()
- waitAfterRelease2()
- require.Equal(t, 6, gauge.exit)
-
- waitAcquired3()
- require.Equal(t, 11, gauge.enter)
- close(release3)
- waitAfterRelease3()
- })
-
- t.Run("decrease the limit when the queue is full", func(t *testing.T) {
- ctx := testhelper.Context(t)
- limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
- gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
- // Mind the queue length here
- limiter := NewConcurrencyLimiter(ctx, limit, 3, nil, gauge)
-
- // 5 requests acquired the tokens, the limiter is full now
- release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
- require.Equal(t, 5, gauge.enter)
- require.Equal(t, 5, gauge.queued)
-
- // 5 requests queuing for the tokens, the queue is full now
- waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 3)
- require.Equal(t, 5, gauge.enter)
- require.Equal(t, 8, gauge.queued)
-
- // Limiter rejects new request
- maximumQueueSizeReached(t, ctx, "1", limiter)
-
- // Update the limit.
- limit.Update(3)
-
- // The queue is still full
- maximumQueueSizeReached(t, ctx, "1", limiter)
-
- // Release first 5 requests and let the last 3 requests in
- close(release1)
- waitAfterRelease1()
- require.Equal(t, 5, gauge.exit)
- waitAcquired2()
- require.Equal(t, 8, gauge.enter)
-
- // Another 5 requests in queue. The queue is still full, meaning the concurrency is 3 and the queue is still 5.
- waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 3)
- require.Equal(t, 8, gauge.enter)
- require.Equal(t, 11, gauge.queued)
- maximumQueueSizeReached(t, ctx, "1", limiter)
-
- // Clean up
- close(release2)
- waitAfterRelease2()
- require.Equal(t, 8, gauge.exit)
- waitAcquired3()
- require.Equal(t, 11, gauge.enter)
- close(release3)
- waitAfterRelease3()
- require.Equal(t, 11, gauge.exit)
- })
-
- t.Run("dynamic limit works without queuing", func(t *testing.T) {
- ctx := testhelper.Context(t)
- limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
- gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
- // No queue, it means the limiter accepts unlimited requests
- limiter := NewConcurrencyLimiter(ctx, limit, 0, nil, gauge)
-
- // 5 requests acquired the tokens, the limiter is full now
- release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
- require.Equal(t, 5, gauge.enter)
- require.Equal(t, 5, gauge.queued)
-
- // 5 more requests
- waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 5)
-
- // Update the limit.
- limit.Update(10)
-
- // All of them acquired the tokens
- waitAcquired2()
- require.Equal(t, 10, gauge.enter)
-
- // Clean up
- close(release1)
- close(release2)
- waitAfterRelease1()
- waitAfterRelease2()
- require.Equal(t, 10, gauge.exit)
- })
-
- t.Run("dynamic limit works with queue timer", func(t *testing.T) {
- ctx := testhelper.Context(t)
- limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
- gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
-
- ticker := helper.NewManualTicker()
- limiter := NewConcurrencyLimiter(ctx, limit, 0, func() helper.Ticker { return ticker }, gauge)
-
- // 5 requests acquired the tokens, the limiter is full now
- release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
- require.Equal(t, 5, gauge.enter)
- require.Equal(t, 5, gauge.queued)
-
- errors := make(chan error, 10)
- // 5 requests in queue
- spawnQueuedAndCollectErrors(ctx, "1", limiter, gauge, 5, errors)
-
- // Decrease the limit
- limit.Update(3)
-
- // 5 more requests in queue
- spawnQueuedAndCollectErrors(ctx, "1", limiter, gauge, 5, errors)
-
- // Trigger timeout event
- for i := 0; i < 10; i++ {
- ticker.Tick()
- require.EqualError(t, <-errors, "maximum time in concurrency queue reached")
- }
-
- // Other goroutines exit as normal
- close(release1)
- waitAfterRelease1()
- require.Equal(t, 5, gauge.exit)
- })
-
- t.Run("dynamic limit works with context cancellation", func(t *testing.T) {
- ctx := testhelper.Context(t)
- ctx2, cancel := context.WithCancel(testhelper.Context(t))
-
- limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
- gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
-
- limiter := NewConcurrencyLimiter(ctx, limit, 0, nil, gauge)
-
- // 5 requests acquired the tokens, the limiter is full now
- release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
- require.Equal(t, 5, gauge.enter)
- require.Equal(t, 5, gauge.queued)
-
- errors := make(chan error, 10)
- // 5 requests in queue
- spawnQueuedAndCollectErrors(ctx2, "1", limiter, gauge, 5, errors)
-
- // Decrease the limit
- limit.Update(3)
-
- // 5 more requests in queue
- spawnQueuedAndCollectErrors(ctx2, "1", limiter, gauge, 5, errors)
-
- // Trigger context cancellation
- cancel()
- for i := 0; i < 10; i++ {
- require.EqualError(t, <-errors, "unexpected error when dequeueing request: context canceled")
- }
-
- // Other goroutines exit as normal
- close(release1)
- waitAfterRelease1()
- require.Equal(t, 5, gauge.exit)
- })
-
- t.Run("dynamic limit works with multiple buckets", func(t *testing.T) {
- ctx := testhelper.Context(t)
- limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
- gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
-
- limiter := NewConcurrencyLimiter(ctx, limit, 5, nil, gauge)
-
- var releaseChans []chan struct{}
- var waitAcquireFuncs, waitReleaseFuncs []func()
-
- // 5 * 5 requests acquired tokens
- for i := 1; i <= 5; i++ {
- release, waitAfterRelease := spawnAndWaitAcquired(t, ctx, fmt.Sprintf("%d", i), limiter, gauge, 5)
- releaseChans = append(releaseChans, release)
- waitReleaseFuncs = append(waitReleaseFuncs, waitAfterRelease)
- }
- require.Equal(t, 25, gauge.enter)
-
- // 1 + 2 + 3 + 4 + 5 requests are in queue
- for i := 1; i <= 5; i++ {
- waitAcquired, release, waitAfterRelease := spawnAndWaitQueued(t, ctx, fmt.Sprintf("%d", i), limiter, gauge, i)
- waitAcquireFuncs = append(waitAcquireFuncs, waitAcquired)
- releaseChans = append(releaseChans, release)
- waitReleaseFuncs = append(waitReleaseFuncs, waitAfterRelease)
- }
- require.Equal(t, 25, gauge.enter)
- require.Equal(t, 40, gauge.queued)
-
- // Update limit, enough for all requests
- limit.Update(10)
-
- // All requests acquired tokens now
- for _, wait := range waitAcquireFuncs {
- wait()
- }
- require.Equal(t, 40, gauge.enter)
-
- // Release all
- for _, release := range releaseChans {
- close(release)
- }
- for _, wait := range waitReleaseFuncs {
- wait()
- }
- require.Equal(t, 40, gauge.exit)
- })
-}
-
-// spawnAndWaitAcquired spawns N goroutines that wait for the limiter. They wait until all of them acquire the limiter
-// token before exiting. This function returns a channel to control token release and a function to wait until all
-// goroutines finish.
-func spawnAndWaitAcquired(t *testing.T, ctx context.Context, bucket string, limiter *ConcurrencyLimiter, gauge *blockingQueueCounter, n int) (chan struct{}, func()) {
- var acquireWg, releaseWg sync.WaitGroup
- release := make(chan struct{})
-
- for i := 0; i < n; i++ {
- acquireWg.Add(1)
- releaseWg.Add(1)
- go func() {
- defer releaseWg.Done()
- _, err := limiter.Limit(ctx, bucket, func() (resp interface{}, err error) {
- acquireWg.Done()
- <-release
- return nil, nil
- })
- require.NoError(t, err)
- }()
- }
- for i := 0; i < n; i++ {
- <-gauge.queuedCh
- gauge.queued++
- }
- acquireWg.Wait()
-
- return release, releaseWg.Wait
-}
-
-// spawnAndWaitQueued spawns N goroutines that wait for the limiter. They wait until all of them are queued. This
-// function returns a function to wait for channel to acquired the token, a channel to control token release, and a
-// function to wait until all goroutines finish.
-func spawnAndWaitQueued(t *testing.T, ctx context.Context, bucket string, limiter *ConcurrencyLimiter, gauge *blockingQueueCounter, n int) (func(), chan struct{}, func()) {
- var acquireWg, releaseWg sync.WaitGroup
- release := make(chan struct{})
-
- for i := 0; i < n; i++ {
- acquireWg.Add(1)
- releaseWg.Add(1)
- go func() {
- defer releaseWg.Done()
- _, err := limiter.Limit(ctx, bucket, func() (resp interface{}, err error) {
- acquireWg.Done()
- <-release
- return nil, nil
- })
- require.NoError(t, err)
- }()
- }
- for i := 0; i < n; i++ {
- <-gauge.queuedCh
- gauge.queued++
- }
-
- return acquireWg.Wait, release, releaseWg.Wait
-}
-
-func spawnQueuedAndCollectErrors(ctx context.Context, bucket string, limiter *ConcurrencyLimiter, gauge *blockingQueueCounter, n int, errors chan error) {
- for i := 0; i < n; i++ {
- go func() {
- _, err := limiter.Limit(ctx, bucket, func() (interface{}, error) {
- return nil, fmt.Errorf("should not call")
- })
- errors <- err
- }()
- }
- for i := 0; i < n; i++ {
- <-gauge.queuedCh
- gauge.queued++
- }
-}
-
-func maximumQueueSizeReached(t *testing.T, ctx context.Context, bucket string, limiter *ConcurrencyLimiter) {
- _, err := limiter.Limit(ctx, bucket, func() (interface{}, error) {
- return nil, fmt.Errorf("should not call")
- })
- require.EqualError(t, err, "maximum queue size reached")
-}
-
type blockingQueueCounter struct {
counter
@@ -801,7 +248,7 @@ func TestConcurrencyLimiter_queueLimit(t *testing.T) {
monitorCh := make(chan struct{})
monitor := &blockingQueueCounter{queuedCh: monitorCh}
ch := make(chan struct{})
- limiter := NewConcurrencyLimiter(ctx, NewAdaptiveLimit("staticLimit", AdaptiveSetting{Initial: 1}), queueLimit, nil, monitor)
+ limiter := NewConcurrencyLimiter(1, queueLimit, nil, monitor)
// occupied with one live request that takes a long time to complete
go func() {
@@ -886,8 +333,7 @@ func TestLimitConcurrency_queueWaitTime(t *testing.T) {
monitor := &blockingDequeueCounter{dequeuedCh: dequeuedCh}
limiter := NewConcurrencyLimiter(
- ctx,
- NewAdaptiveLimit("staticLimit", AdaptiveSetting{Initial: 1}),
+ 1,
0,
func() helper.Ticker {
return ticker
diff --git a/internal/limiter/resizable_semaphore.go b/internal/limiter/resizable_semaphore.go
deleted file mode 100644
index 19434afc3..000000000
--- a/internal/limiter/resizable_semaphore.go
+++ /dev/null
@@ -1,157 +0,0 @@
-package limiter
-
-import (
- "context"
- "sync/atomic"
-)
-
-// resizableSemaphore struct models provides a way to bound concurrent access to resources. It allows a certain number
-// of concurrent access to the resources. When the concurrency reaches the semaphore size, the callers are blocked until
-// a resource is available again. The size of the semaphore can be resized freely in an atomic manner.
-//
-// Internally, a separate goroutine manages the semaphore's functionality, providing synchronization across all channel
-// operations. Callers acquire the semaphore by pulling a "token" through an `acquireCh` channel via `acquire()`, and
-// return them to an `releaseCh` channel via `release()`. This goroutine ensures that tokens are properly distributed
-// from those channels, and also manages the semaphore's current length and size. It processes resize requests and
-// handles try requests and responses, thus ensuring a smooth operation.
-//
-// Note: This struct is not intended to serve as a general-purpose data structure but is specifically designed for
-// flexible concurrency control with resizable capacity.
-type resizableSemaphore struct {
- // err stores the error of why the sempahore is stopped. Most of the case, it's due to context cancellation.
- err atomic.Pointer[error]
- // current represents the current concurrency access to the resources.
- current atomic.Int64
- // size is the maximum capacity of the semaphore. It represents the maximum concurrency that the resources can
- // be accessed of the time.
- size atomic.Int64
- // releaseCh is a channel used to return tokens back to the semaphore. When a caller returns a token, it sends a signal to this channel.
- releaseCh chan struct{}
- // acquireCh is a channel used for callers to acquire a token. When a token is available, a signal is sent to this channel.
- acquireCh chan bool
- // tryAcquireCh is a channel used to signal a try request. A try request is a non-blocking request to acquire a token.
- tryAcquireCh chan struct{}
- // tryAcquireRespCh is a channel used to respond to a try request. If a token is available, a nil error is sent; otherwise, an error is sent.
- tryAcquireRespCh chan bool
- // resizeCh is a channel used to request a resize of the semaphore's capacity. The requested new capacity is sent to this channel.
- resizeCh chan int64
- // stopCh is a channel that determines whether the semaphore is stopped.
- stopCh chan struct{}
-}
-
-// NewResizableSemaphore creates and starts a resizableSemaphore
-func NewResizableSemaphore(ctx context.Context, capacity int64) *resizableSemaphore {
- s := &resizableSemaphore{
- stopCh: make(chan struct{}),
- releaseCh: make(chan struct{}),
- acquireCh: make(chan bool),
- tryAcquireCh: make(chan struct{}),
- tryAcquireRespCh: make(chan bool),
- resizeCh: make(chan int64),
- }
- s.size.Store(capacity)
- go s.start(ctx)
-
- return s
-}
-
-// start kicks off a goroutine that maintains the states of the semaphore. It acts as an event loop that listens to
-// all modifications.
-func (q *resizableSemaphore) start(ctx context.Context) {
- for {
- if len := q.current.Load(); len < q.size.Load() {
- select {
- case <-q.releaseCh:
- q.current.Store(len - 1)
- case q.acquireCh <- true:
- q.current.Store(len + 1)
- case <-q.tryAcquireCh:
- q.current.Store(len + 1)
- q.tryAcquireRespCh <- true
- case newSize := <-q.resizeCh:
- // If the new capacity is greater than the prior one, the capacity grows without
- // the need to do anything. It allows more callers to acquire the token in the
- // right next iteration.
- // In contrast, when the new capacity is less than the prior one, the capacity
- // shrinks and the length becomes bigger than the new capacity. That's perfectly
- // fine. All existing callers continue, new callers can't acquire new token. The
- // length will be naturally adjusted below the capacity overtime.
- q.size.Store(newSize)
- case <-ctx.Done():
- q.stop(ctx.Err())
- return
- }
- } else {
- select {
- case <-q.releaseCh:
- q.current.Store(len - 1)
- case <-q.tryAcquireCh:
- q.tryAcquireRespCh <- false
- case newSize := <-q.resizeCh:
- // Simiarly to the above case, overriding the capacity is enough.
- q.size.Store(newSize)
- case <-ctx.Done():
- q.stop(ctx.Err())
- return
- }
- }
- }
-}
-
-func (q *resizableSemaphore) stop(err error) {
- q.current.Store(0)
- q.err.Store(&err)
-
- close(q.stopCh)
- // The only exposed channel. After this channel is closed, caller receives `false` when trying to pull from this
- // channel. Other state-modifying channels stay instact but TryAcquire(), Release(), and Resize() returns
- // immediately.
- close(q.acquireCh)
-}
-
-// stores the error of why the sempahore is stopped. Most of the case, it's due to context cancellation.
-func (q *resizableSemaphore) Err() error {
- return *q.err.Load()
-}
-
-// Acquire returns a channel that allows the caller to acquire the semaphore. The caller is blocked until there
-// is an available resource. It is not safe to use with a `switch` statement having `default` statement. In such
-// use cases, use TryAcquire() instead.
-func (q *resizableSemaphore) Acquire() <-chan bool {
- return q.acquireCh
-}
-
-// TryAcquire acquires the semaphore without blocking. On success, returns true. On failure, returns false and leaves
-// the semaphore unchanged.
-func (q *resizableSemaphore) TryAcquire() bool {
- select {
- case q.tryAcquireCh <- struct{}{}:
- return <-q.tryAcquireRespCh
- case <-q.stopCh:
- return false
- }
-}
-
-// Release releases the semaphore by pushing the token back. If the semaphore stops, this function returns immediately.
-func (q *resizableSemaphore) Release() {
- select {
- case q.releaseCh <- struct{}{}:
- case <-q.stopCh:
- // No op, return immediately.
- }
-}
-
-// Current returns the amount of current concurrent access to the semaphore.
-func (q *resizableSemaphore) Current() int64 {
- return q.current.Load()
-}
-
-// Resize modifies the size of the semaphore. If the semaphore stops, this function returns immediately.
-func (q *resizableSemaphore) Resize(newSize int64) {
- select {
- case q.resizeCh <- newSize:
- case <-q.stopCh:
- // This update is redundant, but it's a nice gesture that the state of semaphore is up-to-date.
- q.size.Store(newSize)
- }
-}
diff --git a/internal/limiter/resizable_semaphore_test.go b/internal/limiter/resizable_semaphore_test.go
deleted file mode 100644
index 00ac90097..000000000
--- a/internal/limiter/resizable_semaphore_test.go
+++ /dev/null
@@ -1,262 +0,0 @@
-package limiter
-
-import (
- "context"
- "sync"
- "testing"
-
- "github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
-)
-
-func TestResizableSemaphore_New(t *testing.T) {
- t.Parallel()
-
- semaphore := NewResizableSemaphore(testhelper.Context(t), 5)
- require.Equal(t, int64(0), semaphore.Current())
-}
-
-func TestResizableSemaphore_Stopped(t *testing.T) {
- t.Parallel()
-
- ctx, cancel := context.WithCancel(testhelper.Context(t))
- semaphore := NewResizableSemaphore(ctx, 5)
-
- // Try to acquire a token of the empty sempahore
- require.True(t, semaphore.TryAcquire())
- semaphore.Release()
-
- // 5 goroutines acquired semaphore
- beforeCallRelease := make(chan struct{})
- var acquireWg1, releaseWg1 sync.WaitGroup
- for i := 0; i < 5; i++ {
- acquireWg1.Add(1)
- releaseWg1.Add(1)
- go func() {
- require.True(t, <-semaphore.Acquire())
- acquireWg1.Done()
-
- <-beforeCallRelease
-
- semaphore.Release()
- releaseWg1.Done()
- }()
- }
- acquireWg1.Wait()
-
- // Another 5 waits for sempahore
- var acquireWg2 sync.WaitGroup
- for i := 0; i < 5; i++ {
- acquireWg2.Add(1)
- go func() {
- // This goroutine is block until the context is cancel, which returns false
- require.False(t, <-semaphore.Acquire())
- acquireWg2.Done()
- }()
- }
-
- // Try to acquire a token of the full semaphore
- require.False(t, semaphore.TryAcquire())
-
- // Cancel the context
- cancel()
- acquireWg2.Wait()
-
- // The first 5 goroutines can call Release() even if the semaphore stopped
- close(beforeCallRelease)
- releaseWg1.Wait()
-
- // The last 5 goroutines exits immediately, Acquire() returns false
- acquireWg2.Wait()
-
- require.False(t, semaphore.TryAcquire())
- require.Equal(t, int64(0), semaphore.Current())
- require.Equal(t, ctx.Err(), semaphore.Err())
-}
-
-func TestResizableSemaphore_Acquire(t *testing.T) {
- t.Parallel()
-
- t.Run("acquire less than the capacity", func(t *testing.T) {
- semaphore := NewResizableSemaphore(testhelper.Context(t), 5)
-
- waitBeforeRelease, waitRelease := acquireSemaphore(t, semaphore, 4)
- require.NotNil(t, <-semaphore.Acquire())
-
- close(waitBeforeRelease)
- waitRelease()
- })
-
- t.Run("acquire more than the capacity", func(t *testing.T) {
- semaphore := NewResizableSemaphore(testhelper.Context(t), 5)
-
- waitBeforeRelease, waitRelease := acquireSemaphore(t, semaphore, 5)
- require.False(t, semaphore.TryAcquire())
-
- close(waitBeforeRelease)
- waitRelease()
- })
-
- t.Run("semaphore is full then available again", func(t *testing.T) {
- semaphore := NewResizableSemaphore(testhelper.Context(t), 5)
-
- waitChan := make(chan bool)
- _, _ = acquireSemaphore(t, semaphore, 5)
-
- go func() {
- waitChan <- <-semaphore.Acquire()
- }()
-
- // The semaphore is full now
- require.False(t, semaphore.TryAcquire())
-
- // Release one token
- semaphore.Release()
- // The waiting channel is unlocked
- require.True(t, <-waitChan)
-
- // Release another token
- semaphore.Release()
- // Now TryAcquire can pull out a token
- require.True(t, semaphore.TryAcquire())
- })
-
- t.Run("the semaphore is resized up when empty", func(t *testing.T) {
- semaphore := NewResizableSemaphore(testhelper.Context(t), 5)
- semaphore.Resize(10)
-
- waitBeforeRelease, waitRelease := acquireSemaphore(t, semaphore, 9)
- require.NotNil(t, <-semaphore.Acquire())
-
- close(waitBeforeRelease)
- waitRelease()
- })
-
- t.Run("the semaphore is resized up when not empty", func(t *testing.T) {
- semaphore := NewResizableSemaphore(testhelper.Context(t), 7)
-
- waitBeforeRelease1, waitRelease1 := acquireSemaphore(t, semaphore, 5)
- semaphore.Resize(15)
- waitBeforeRelease2, waitRelease2 := acquireSemaphore(t, semaphore, 5)
-
- require.NotNil(t, <-semaphore.Acquire())
-
- close(waitBeforeRelease1)
- close(waitBeforeRelease2)
- waitRelease1()
- waitRelease2()
- })
-
- t.Run("the semaphore is resized up when full", func(t *testing.T) {
- semaphore := NewResizableSemaphore(testhelper.Context(t), 5)
-
- waitBeforeRelease1, waitRelease1 := acquireSemaphore(t, semaphore, 5)
- require.False(t, semaphore.TryAcquire())
-
- semaphore.Resize(10)
-
- waitBeforeRelease2, waitRelease2 := acquireSemaphore(t, semaphore, 5)
- require.False(t, semaphore.TryAcquire())
-
- var wg sync.WaitGroup
- for i := 0; i < 5; i++ {
- wg.Add(1)
- go func() {
- <-semaphore.Acquire()
- wg.Done()
- semaphore.Release()
- }()
- }
-
- semaphore.Resize(15)
- wg.Wait()
-
- close(waitBeforeRelease1)
- close(waitBeforeRelease2)
- waitRelease1()
- waitRelease2()
- })
-
- t.Run("the semaphore is resized down when empty", func(t *testing.T) {
- semaphore := NewResizableSemaphore(testhelper.Context(t), 10)
- semaphore.Resize(5)
-
- waitBeforeRelease, waitRelease := acquireSemaphore(t, semaphore, 4)
- require.NotNil(t, <-semaphore.Acquire())
-
- close(waitBeforeRelease)
- waitRelease()
- })
-
- t.Run("the semaphore is resized down when not empty", func(t *testing.T) {
- semaphore := NewResizableSemaphore(testhelper.Context(t), 20)
-
- waitBeforeRelease1, waitRelease1 := acquireSemaphore(t, semaphore, 5)
- semaphore.Resize(15)
- waitBeforeRelease2, waitRelease2 := acquireSemaphore(t, semaphore, 5)
-
- require.NotNil(t, <-semaphore.Acquire())
-
- close(waitBeforeRelease1)
- close(waitBeforeRelease2)
- waitRelease1()
- waitRelease2()
- })
-
- t.Run("the semaphore is resized down lower than the current length", func(t *testing.T) {
- semaphore := NewResizableSemaphore(testhelper.Context(t), 10)
-
- waitBeforeRelease1, waitRelease1 := acquireSemaphore(t, semaphore, 5)
-
- semaphore.Resize(3)
-
- require.False(t, semaphore.TryAcquire())
- close(waitBeforeRelease1)
- waitRelease1()
-
- waitBeforeRelease2, waitRelease2 := acquireSemaphore(t, semaphore, 3)
- require.False(t, semaphore.TryAcquire())
-
- close(waitBeforeRelease2)
- waitRelease2()
- })
-
- t.Run("the semaphore is resized down when full", func(t *testing.T) {
- semaphore := NewResizableSemaphore(testhelper.Context(t), 10)
-
- waitBeforeRelease1, waitRelease1 := acquireSemaphore(t, semaphore, 10)
-
- semaphore.Resize(5)
-
- require.False(t, semaphore.TryAcquire())
- close(waitBeforeRelease1)
- waitRelease1()
-
- waitBeforeRelease2, waitRelease2 := acquireSemaphore(t, semaphore, 5)
- require.False(t, semaphore.TryAcquire())
-
- close(waitBeforeRelease2)
- waitRelease2()
- })
-}
-
-func acquireSemaphore(t *testing.T, semaphore *resizableSemaphore, n int) (chan struct{}, func()) {
- var acquireWg, releaseWg sync.WaitGroup
- waitBeforeRelease := make(chan struct{})
-
- for i := 0; i < n; i++ {
- acquireWg.Add(1)
- releaseWg.Add(1)
- go func() {
- require.True(t, <-semaphore.Acquire())
- acquireWg.Done()
-
- <-waitBeforeRelease
- semaphore.Release()
- releaseWg.Done()
- }()
- }
- acquireWg.Wait()
-
- return waitBeforeRelease, releaseWg.Wait
-}
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go
index 6352fcfe0..51d29bcdc 100644
--- a/internal/testhelper/testserver/gitaly.go
+++ b/internal/testhelper/testserver/gitaly.go
@@ -161,8 +161,6 @@ func waitHealthy(tb testing.TB, ctx context.Context, addr string, authToken stri
func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, deps *service.Dependencies), opts ...GitalyServerOpt) (*grpc.Server, string, bool) {
tb.Helper()
- ctx := testhelper.Context(tb)
-
var gsd gitalyServerDeps
for _, opt := range opts {
gsd = opt(gsd)
@@ -175,7 +173,7 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d
server.WithStreamInterceptor(StructErrStreamInterceptor),
}
- deps := gsd.createDependencies(tb, ctx, cfg)
+ deps := gsd.createDependencies(tb, cfg)
tb.Cleanup(func() { gsd.conns.Close() })
serverFactory := server.NewGitalyServerFactory(
@@ -203,6 +201,7 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d
assert.NoError(tb, internalServer.Serve(internalListener), "failure to serve internal gRPC")
}()
+ ctx := testhelper.Context(tb)
waitHealthy(tb, ctx, "unix://"+internalListener.Addr().String(), cfg.Auth.Token)
}
@@ -239,6 +238,7 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d
assert.NoError(tb, externalServer.Serve(listener), "failure to serve external gRPC")
}()
+ ctx := testhelper.Context(tb)
waitHealthy(tb, ctx, addr, cfg.Auth.Token)
return externalServer, addr, gsd.disablePraefect
@@ -276,7 +276,7 @@ type gitalyServerDeps struct {
signingKey string
}
-func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, ctx context.Context, cfg config.Cfg) *service.Dependencies {
+func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) *service.Dependencies {
if gsd.logger == nil {
gsd.logger = testhelper.NewGitalyServerLogger(tb)
}
@@ -328,8 +328,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, ctx context.Conte
if gsd.packObjectsLimiter == nil {
gsd.packObjectsLimiter = limiter.NewConcurrencyLimiter(
- ctx,
- limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 0}),
+ 0,
0,
nil,
limiter.NewNoopConcurrencyMonitor(),
@@ -337,7 +336,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, ctx context.Conte
}
if gsd.limitHandler == nil {
- gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters(ctx))
+ gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)
}
if gsd.repositoryCounter == nil {