Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-08-02 13:48:19 +0300
committerQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-08-17 11:56:42 +0300
commitae948744cd840979c495d8b54404191d23bc1f68 (patch)
tree90bf78067633702eef5862340a9dec905735831f
parentf15ee0cc570d1eed3578896a8ada80a78dd4daca (diff)
limiter: Add adaptive limit to the concurrency limiterqmnguyen0711/add-adaptive-limit
This commit adds the adaptive limit to limiter.ConcurrencyLimiter. It changes the signature of the initializer to receive an AdaptiveLimit object and converts the queues in the limiter to dynamic pools. At this point, as the calculator has not been integrated (yet), the input limit never changes. It always returns a persistent limit, which is a perfect replacement for the integer static limit. In the future, when the calculator kicks in, the dynamic pools in the concurrency limiter scale up and down accordingly when the current limit is changed by the calculator.
-rw-r--r--internal/cli/gitaly/serve.go7
-rw-r--r--internal/gitaly/service/hook/pack_objects_test.go2
-rw-r--r--internal/grpc/middleware/limithandler/middleware.go4
-rw-r--r--internal/limiter/concurrency_limiter.go95
-rw-r--r--internal/limiter/concurrency_limiter_test.go560
-rw-r--r--internal/testhelper/testserver/gitaly.go2
6 files changed, 625 insertions, 45 deletions
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index 0b13c2a2a..302fd880d 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -271,6 +271,11 @@ func run(cfg config.Cfg, logger logrus.FieldLogger) error {
return fmt.Errorf("disk cache walkers: %w", err)
}
+ // The pack-objects limit below is static at this stage. It's always equal to the initial limit, which uses
+ // MaxConcurrency config.
+ packObjectLimit := limiter.NewAdaptiveLimit("packObjects", limiter.AdaptiveSetting{
+ Initial: cfg.PackObjectsLimiting.MaxConcurrency,
+ })
concurrencyLimitHandler := limithandler.New(
cfg,
limithandler.LimitConcurrencyByRepo,
@@ -296,7 +301,7 @@ func run(cfg config.Cfg, logger logrus.FieldLogger) error {
}
packObjectsLimiter := limiter.NewConcurrencyLimiter(
ctx,
- cfg.PackObjectsLimiting.MaxConcurrency,
+ packObjectLimit,
cfg.PackObjectsLimiting.MaxQueueLength,
newTickerFunc,
packObjectsMonitor,
diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go
index 35dffadbc..62fa0dca8 100644
--- a/internal/gitaly/service/hook/pack_objects_test.go
+++ b/internal/gitaly/service/hook/pack_objects_test.go
@@ -859,7 +859,7 @@ func TestPackObjects_concurrencyLimit(t *testing.T) {
)
limiter := limiter.NewConcurrencyLimiter(
ctx,
- 1,
+ limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 1}),
0,
func() helper.Ticker { return ticker },
monitor,
diff --git a/internal/grpc/middleware/limithandler/middleware.go b/internal/grpc/middleware/limithandler/middleware.go
index ebbe44662..a6ed93f9a 100644
--- a/internal/grpc/middleware/limithandler/middleware.go
+++ b/internal/grpc/middleware/limithandler/middleware.go
@@ -218,7 +218,7 @@ func WithConcurrencyLimiters(ctx context.Context) SetupFunc {
result[limit.RPC] = limiter.NewConcurrencyLimiter(
ctx,
- limit.MaxPerRepo,
+ limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: limit.MaxPerRepo}),
limit.MaxQueueSize,
newTickerFunc,
limiter.NewPerRPCPromMonitor(
@@ -233,7 +233,7 @@ func WithConcurrencyLimiters(ctx context.Context) SetupFunc {
if _, ok := result[replicateRepositoryFullMethod]; !ok {
result[replicateRepositoryFullMethod] = limiter.NewConcurrencyLimiter(
ctx,
- 1,
+ limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 1}),
0,
func() helper.Ticker {
return helper.NewManualTicker()
diff --git a/internal/limiter/concurrency_limiter.go b/internal/limiter/concurrency_limiter.go
index 32b08b32e..b4284adeb 100644
--- a/internal/limiter/concurrency_limiter.go
+++ b/internal/limiter/concurrency_limiter.go
@@ -41,10 +41,10 @@ type keyedConcurrencyLimiter struct {
// concurrencyTokens is the channel of available concurrency tokens, where every token
// allows one concurrent call to the concurrency-limited function.
- concurrencyTokens chan struct{}
+ concurrencyTokens *resizableSemaphore
// queueTokens is the channel of available queue tokens, where every token allows one
// concurrent call to be admitted to the queue.
- queueTokens chan struct{}
+ queueTokens *resizableSemaphore
}
// acquire tries to acquire the semaphore. It may fail if the admission queue is full or if the max
@@ -55,34 +55,33 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey str
// callers may wait for the concurrency token at the same time. If there are no more
// queueing tokens then this indicates that the queue is full and we thus return an
// error immediately.
- select {
- case sem.queueTokens <- struct{}{}:
- // We have acquired a queueing token, so we need to release it if acquiring
- // the concurrency token fails. If we succeed to acquire the concurrency
- // token though then we retain the queueing token until the caller signals
- // that the concurrency-limited function has finished. As a consequence the
- // queue token is returned together with the concurrency token.
- //
- // A simpler model would be to just have `maxQueueLength` many queueing
- // tokens. But this would add concurrency-limiting when acquiring the queue
- // token itself, which is not what we want to do. Instead, we want to admit
- // as many callers into the queue as the queue length permits plus the
- // number of available concurrency tokens allows.
- defer func() {
- if returnedErr != nil {
- <-sem.queueTokens
- }
- }()
- default:
+ if !sem.queueTokens.TryAcquire() {
return ErrMaxQueueSize
}
+
+ // We have acquired a queueing token, so we need to release it if acquiring
+ // the concurrency token fails. If we successfully acquire the concurrency
+ // token though then we retain the queueing token until the caller signals
+ // that the concurrency-limited function has finished. As a consequence the
+ // queue token is returned together with the concurrency token.
+ //
+ // A simpler model would be to just have `maxQueueLength` many queueing
+ // tokens. But this would add concurrency-limiting when acquiring the queue
+ // token itself, which is not what we want to do. Instead, we want to admit
+ // as many callers into the queue as the queue length permits plus the
+ // number of available concurrency tokens allows.
+ defer func() {
+ if returnedErr != nil {
+ sem.queueTokens.Release()
+ }
+ }()
}
// We are queued now, so let's tell the monitor. Furthermore, even though we're still
// holding the queueing token when this function exits successfully we also tell the monitor
// that we have exited the queue. It is only an implementation detail anyway that we hold on
// to the token, so the monitor shouldn't care about that.
- sem.monitor.Queued(ctx, limitingKey, len(sem.queueTokens))
+ sem.monitor.Queued(ctx, limitingKey, sem.queueLength())
defer sem.monitor.Dequeued(ctx)
// Set up the ticker that keeps us from waiting indefinitely on the concurrency token.
@@ -98,7 +97,12 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey str
// Try to acquire the concurrency token now that we're in the queue.
select {
- case sem.concurrencyTokens <- struct{}{}:
+ case acquired := <-sem.concurrencyTokens.Acquire():
+ // When the semaphore returns false, the semaphore was stopped. It's likely due to the context is
+ // cancelled. Hence, we should return the error here.
+ if !acquired {
+ return sem.concurrencyTokens.Err()
+ }
return nil
case <-ticker.C():
return ErrMaxQueueTime
@@ -110,14 +114,17 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey str
// release releases the acquired tokens.
func (sem *keyedConcurrencyLimiter) release() {
if sem.queueTokens != nil {
- <-sem.queueTokens
+ sem.queueTokens.Release()
}
- <-sem.concurrencyTokens
+ sem.concurrencyTokens.Release()
}
// queueLength returns the length of token queue
func (sem *keyedConcurrencyLimiter) queueLength() int {
- return len(sem.queueTokens)
+ if sem.queueTokens == nil {
+ return 0
+ }
+ return int(sem.queueTokens.Current())
}
// ConcurrencyLimiter contains rate limiter state.
@@ -125,9 +132,9 @@ type ConcurrencyLimiter struct {
// ctx stores the context at initialization. This context is used as a stopping condition
// for some internal goroutines.
ctx context.Context
- // maxConcurrencyLimit is the maximum number of concurrent calls to the limited function.
- // This limit is per key.
- maxConcurrencyLimit int64
+ // limit is the adaptive maximum number of concurrent calls to the limited function. This limit is
+ // calculated adaptively from an outside calculator.
+ limit *AdaptiveLimit
// maxQueueLength is the maximum number of operations allowed to wait in a queued state.
// This limit is global and applies before the concurrency limit. Subsequent incoming
// operations will be rejected with an error immediately.
@@ -148,19 +155,31 @@ type ConcurrencyLimiter struct {
}
// NewConcurrencyLimiter creates a new concurrency rate limiter.
-func NewConcurrencyLimiter(ctx context.Context, maxConcurrencyLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
+func NewConcurrencyLimiter(ctx context.Context, limit *AdaptiveLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
if monitor == nil {
monitor = NewNoopConcurrencyMonitor()
}
- return &ConcurrencyLimiter{
+ limiter := &ConcurrencyLimiter{
ctx: ctx,
- maxConcurrencyLimit: int64(maxConcurrencyLimit),
+ limit: limit,
maxQueueLength: int64(maxQueueLength),
maxQueuedTickerCreator: maxQueuedTickerCreator,
monitor: monitor,
limitsByKey: make(map[string]*keyedConcurrencyLimiter),
}
+
+ // When the capacity of the limiter is updated we also need to update the size of both the queuing tokens as
+ // well as the concurrency tokens to match the new size.
+ limit.AfterUpdate(func(val int) {
+ for _, keyedLimiter := range limiter.limitsByKey {
+ if keyedLimiter.queueTokens != nil {
+ keyedLimiter.queueTokens.Resize(int64(val) + limiter.maxQueueLength)
+ }
+ keyedLimiter.concurrencyTokens.Resize(int64(val))
+ }
+ })
+ return limiter
}
// Limit will limit the concurrency of the limited function f. There are two distinct mechanisms
@@ -180,7 +199,7 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f Li
)
defer span.Finish()
- if c.maxConcurrencyLimit <= 0 {
+ if c.currentLimit() <= 0 {
return f()
}
@@ -226,15 +245,15 @@ func (c *ConcurrencyLimiter) getConcurrencyLimit(limitingKey string) *keyedConcu
// Set up the queue tokens in case a maximum queue length was requested. As the
// queue tokens are kept during the whole lifetime of the concurrency-limited
// function we add the concurrency tokens to the number of available token.
- var queueTokens chan struct{}
+ var queueTokens *resizableSemaphore
if c.maxQueueLength > 0 {
- queueTokens = make(chan struct{}, c.maxConcurrencyLimit+c.maxQueueLength)
+ queueTokens = NewResizableSemaphore(c.ctx, c.currentLimit()+c.maxQueueLength)
}
c.limitsByKey[limitingKey] = &keyedConcurrencyLimiter{
monitor: c.monitor,
maxQueuedTickerCreator: c.maxQueuedTickerCreator,
- concurrencyTokens: make(chan struct{}, c.maxConcurrencyLimit),
+ concurrencyTokens: NewResizableSemaphore(c.ctx, c.currentLimit()),
queueTokens: queueTokens,
}
}
@@ -272,3 +291,7 @@ func (c *ConcurrencyLimiter) countSemaphores() int {
return len(c.limitsByKey)
}
+
+func (c *ConcurrencyLimiter) currentLimit() int64 {
+ return int64(c.limit.Current())
+}
diff --git a/internal/limiter/concurrency_limiter_test.go b/internal/limiter/concurrency_limiter_test.go
index b650ae5f5..6ba48d243 100644
--- a/internal/limiter/concurrency_limiter_test.go
+++ b/internal/limiter/concurrency_limiter_test.go
@@ -3,6 +3,7 @@ package limiter
import (
"context"
"errors"
+ "fmt"
"strconv"
"sync"
"testing"
@@ -86,7 +87,7 @@ func (c *counter) Dropped(_ context.Context, _ string, _ int, _ time.Duration, r
}
}
-func TestLimiter(t *testing.T) {
+func TestLimiter_static(t *testing.T) {
t.Parallel()
tests := []struct {
@@ -156,7 +157,7 @@ func TestLimiter(t *testing.T) {
limiter := NewConcurrencyLimiter(
ctx,
- tt.maxConcurrency,
+ NewAdaptiveLimit("staticLimit", AdaptiveSetting{Initial: tt.maxConcurrency}),
0,
nil,
gauge,
@@ -230,6 +231,557 @@ func TestLimiter(t *testing.T) {
}
}
+func TestLimiter_dynamic(t *testing.T) {
+ t.Parallel()
+
+ t.Run("increase dynamic limit when there is no queuing request", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
+
+ // 5 requests acquired the tokens, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+
+ // Update the limit to 7
+ limit.Update(7)
+
+ // 2 more requests acquired the token. This proves the limit is expanded
+ release2, waitAfterRelease2 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 2)
+ require.Equal(t, 7, gauge.enter)
+
+ close(release1)
+ close(release2)
+ waitAfterRelease1()
+ waitAfterRelease2()
+ require.Equal(t, 7, gauge.exit)
+ })
+
+ t.Run("decrease dynamic limit when there is no queuing request", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
+
+ // 3 requests acquired the tokens, 2 slots left
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 3)
+ require.Equal(t, 3, gauge.enter)
+ require.Equal(t, 3, gauge.queued)
+
+ // Update the limit to 3
+ limit.Update(3)
+
+ // 2 requests are put in queue, meaning the limit shrinks down
+ waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 2)
+ require.Equal(t, 3, gauge.enter)
+ require.Equal(t, 5, gauge.queued)
+
+ // Release first 3 requests
+ close(release1)
+ waitAfterRelease1()
+
+ // Now the last 2 requests can acquire token
+ waitAcquired2()
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 5, gauge.queued)
+ require.Equal(t, 3, gauge.exit)
+
+ // Release the last patch
+ close(release2)
+ waitAfterRelease2()
+ require.Equal(t, 5, gauge.exit)
+ })
+
+ t.Run("increase dynamic limit more than the number of queuing requests", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
+
+ // 5 requests acquired the tokens, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+
+ // 2 requests waiting in the queue
+ waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 2)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 7, gauge.queued)
+
+ // Update the limit to 7
+ limit.Update(7)
+
+ // Wait for the other 2 requests acquired the token. This proves the limiter is expanded
+ waitAcquired2()
+ require.Equal(t, 7, gauge.enter)
+
+ close(release1)
+ close(release2)
+ waitAfterRelease1()
+ waitAfterRelease2()
+ require.Equal(t, 7, gauge.exit)
+ })
+
+ t.Run("increase dynamic limit less than the number of queuing requests", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
+
+ // 5 requests acquired the tokens, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+
+ // 2 requests waiting in the queue
+ waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 2)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 7, gauge.queued)
+
+ // 5 more requests waiting in the queue
+ waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 12, gauge.queued)
+
+ // Update the limit to 7.
+ limit.Update(7)
+
+ // Release first 5 requests, all requests should fit in the queue now.
+ close(release1)
+ waitAfterRelease1()
+ require.Equal(t, 5, gauge.exit)
+
+ waitAcquired2()
+ waitAcquired3()
+ require.Equal(t, 12, gauge.enter)
+
+ // Now release all requests
+ close(release2)
+ close(release3)
+ waitAfterRelease2()
+ waitAfterRelease3()
+ require.Equal(t, 12, gauge.exit)
+ })
+
+ t.Run("decrease dynamic limit less than the number of concurrent requests", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
+
+ // 5 requests acquired the tokens, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+
+ // Update the limit to 3
+ limit.Update(3)
+
+ // 3 requests are put in queue
+ waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 3)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 8, gauge.queued)
+
+ // Release the first 5 requests
+ close(release1)
+ waitAfterRelease1()
+ require.Equal(t, 5, gauge.exit)
+
+ // Now the last 3 requests acquire the tokens
+ waitAcquired2()
+ require.Equal(t, 8, gauge.enter)
+
+ // 1 more request is put in queue, meaning the limit shrinks down to 3.
+ waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 1)
+ require.Equal(t, 8, gauge.enter)
+ require.Equal(t, 9, gauge.queued)
+
+ // Release the second 3 requests
+ close(release2)
+ waitAfterRelease2()
+ require.Equal(t, 8, gauge.exit)
+
+ // The last request acquires the token
+ waitAcquired3()
+ require.Equal(t, 9, gauge.enter)
+
+ // Release the last request
+ close(release3)
+ waitAfterRelease3()
+ require.Equal(t, 9, gauge.exit)
+ })
+
+ t.Run("increase and decrease dynamic limit multiple times", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
+
+ // Update the limit to 7
+ limit.Update(7)
+
+ // 5 requests acquired the tokens
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+
+ // Update the limit to 3
+ limit.Update(3)
+
+ // 3 requests are put in queue
+ waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 3)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 8, gauge.queued)
+
+ // Update the limit to 10
+ limit.Update(10)
+
+ // All existing requests acquire the tokens
+ waitAcquired2()
+ require.Equal(t, 8, gauge.enter)
+
+ // 2 more requests
+ release3, waitAfterRelease3 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 2)
+ require.Equal(t, 10, gauge.enter)
+
+ // Update the limit to 1
+ limit.Update(1)
+
+ // Now release all of them
+ close(release1)
+ waitAfterRelease1()
+ require.Equal(t, 5, gauge.exit)
+
+ close(release2)
+ waitAfterRelease2()
+ require.Equal(t, 8, gauge.exit)
+
+ close(release3)
+ waitAfterRelease3()
+ require.Equal(t, 10, gauge.exit)
+ })
+
+ t.Run("increase the limit when the queue is full", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 1, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ // Mind the queue length here
+ limiter := NewConcurrencyLimiter(ctx, limit, 5, nil, gauge)
+
+ // 1 requests acquired the tokens, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 1)
+ require.Equal(t, 1, gauge.enter)
+ require.Equal(t, 1, gauge.queued)
+
+ // 5 requests queuing for the tokens, the queue is full now
+ waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 5)
+ require.Equal(t, 1, gauge.enter)
+ require.Equal(t, 6, gauge.queued)
+
+ // Limiter rejects new request
+ maximumQueueSizeReached(t, ctx, "1", limiter)
+
+ // Update the limit
+ limit.Update(6)
+ waitAcquired2()
+ require.Equal(t, 6, gauge.enter)
+
+ // 5 requests queuing for the tokens, the queue is full now
+ waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 5)
+ require.Equal(t, 6, gauge.enter)
+ require.Equal(t, 11, gauge.queued)
+
+ // Limiter rejects new request
+ maximumQueueSizeReached(t, ctx, "1", limiter)
+
+ // Clean up
+ close(release1)
+ close(release2)
+ waitAfterRelease1()
+ waitAfterRelease2()
+ require.Equal(t, 6, gauge.exit)
+
+ waitAcquired3()
+ require.Equal(t, 11, gauge.enter)
+ close(release3)
+ waitAfterRelease3()
+ })
+
+ t.Run("decrease the limit when the queue is full", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ // Mind the queue length here
+ limiter := NewConcurrencyLimiter(ctx, limit, 3, nil, gauge)
+
+ // 5 requests acquired the tokens, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 5, gauge.queued)
+
+ // 5 requests queuing for the tokens, the queue is full now
+ waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 3)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 8, gauge.queued)
+
+ // Limiter rejects new request
+ maximumQueueSizeReached(t, ctx, "1", limiter)
+
+ // Update the limit.
+ limit.Update(3)
+
+ // The queue is still full
+ maximumQueueSizeReached(t, ctx, "1", limiter)
+
+ // Release first 5 requests and let the last 3 requests in
+ close(release1)
+ waitAfterRelease1()
+ require.Equal(t, 5, gauge.exit)
+ waitAcquired2()
+ require.Equal(t, 8, gauge.enter)
+
+ // Another 5 requests in queue. The queue is still full, meaning the concurrency is 3 and the queue is still 5.
+ waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 3)
+ require.Equal(t, 8, gauge.enter)
+ require.Equal(t, 11, gauge.queued)
+ maximumQueueSizeReached(t, ctx, "1", limiter)
+
+ // Clean up
+ close(release2)
+ waitAfterRelease2()
+ require.Equal(t, 8, gauge.exit)
+ waitAcquired3()
+ require.Equal(t, 11, gauge.enter)
+ close(release3)
+ waitAfterRelease3()
+ require.Equal(t, 11, gauge.exit)
+ })
+
+ t.Run("dynamic limit works without queuing", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ // No queue, it means the limiter accepts unlimited requests
+ limiter := NewConcurrencyLimiter(ctx, limit, 0, nil, gauge)
+
+ // 5 requests acquired the tokens, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 5, gauge.queued)
+
+ // 5 more requests
+ waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 5)
+
+ // Update the limit.
+ limit.Update(10)
+
+ // All of them acquired the tokens
+ waitAcquired2()
+ require.Equal(t, 10, gauge.enter)
+
+ // Clean up
+ close(release1)
+ close(release2)
+ waitAfterRelease1()
+ waitAfterRelease2()
+ require.Equal(t, 10, gauge.exit)
+ })
+
+ t.Run("dynamic limit works with queue timer", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+
+ ticker := helper.NewManualTicker()
+ limiter := NewConcurrencyLimiter(ctx, limit, 0, func() helper.Ticker { return ticker }, gauge)
+
+ // 5 requests acquired the tokens, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 5, gauge.queued)
+
+ errors := make(chan error, 10)
+ // 5 requests in queue
+ spawnQueuedAndCollectErrors(ctx, "1", limiter, gauge, 5, errors)
+
+ // Decrease the limit
+ limit.Update(3)
+
+ // 5 more requests in queue
+ spawnQueuedAndCollectErrors(ctx, "1", limiter, gauge, 5, errors)
+
+ // Trigger timeout event
+ for i := 0; i < 10; i++ {
+ ticker.Tick()
+ require.EqualError(t, <-errors, "maximum time in concurrency queue reached")
+ }
+
+ // Other goroutines exit as normal
+ close(release1)
+ waitAfterRelease1()
+ require.Equal(t, 5, gauge.exit)
+ })
+
+ t.Run("dynamic limit works with context cancellation", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ ctx2, cancel := context.WithCancel(testhelper.Context(t))
+
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+
+ limiter := NewConcurrencyLimiter(ctx, limit, 0, nil, gauge)
+
+ // 5 requests acquired the tokens, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 5, gauge.queued)
+
+ errors := make(chan error, 10)
+ // 5 requests in queue
+ spawnQueuedAndCollectErrors(ctx2, "1", limiter, gauge, 5, errors)
+
+ // Decrease the limit
+ limit.Update(3)
+
+ // 5 more requests in queue
+ spawnQueuedAndCollectErrors(ctx2, "1", limiter, gauge, 5, errors)
+
+ // Trigger context cancellation
+ cancel()
+ for i := 0; i < 10; i++ {
+ require.EqualError(t, <-errors, "unexpected error when dequeueing request: context canceled")
+ }
+
+ // Other goroutines exit as normal
+ close(release1)
+ waitAfterRelease1()
+ require.Equal(t, 5, gauge.exit)
+ })
+
+ t.Run("dynamic limit works with multiple buckets", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+
+ limiter := NewConcurrencyLimiter(ctx, limit, 5, nil, gauge)
+
+ var releaseChans []chan struct{}
+ var waitAcquireFuncs, waitReleaseFuncs []func()
+
+ // 5 * 5 requests acquired tokens
+ for i := 1; i <= 5; i++ {
+ release, waitAfterRelease := spawnAndWaitAcquired(t, ctx, fmt.Sprintf("%d", i), limiter, gauge, 5)
+ releaseChans = append(releaseChans, release)
+ waitReleaseFuncs = append(waitReleaseFuncs, waitAfterRelease)
+ }
+ require.Equal(t, 25, gauge.enter)
+
+ // 1 + 2 + 3 + 4 + 5 requests are in queue
+ for i := 1; i <= 5; i++ {
+ waitAcquired, release, waitAfterRelease := spawnAndWaitQueued(t, ctx, fmt.Sprintf("%d", i), limiter, gauge, i)
+ waitAcquireFuncs = append(waitAcquireFuncs, waitAcquired)
+ releaseChans = append(releaseChans, release)
+ waitReleaseFuncs = append(waitReleaseFuncs, waitAfterRelease)
+ }
+ require.Equal(t, 25, gauge.enter)
+ require.Equal(t, 40, gauge.queued)
+
+ // Update limit, enough for all requests
+ limit.Update(10)
+
+ // All requests acquired tokens now
+ for _, wait := range waitAcquireFuncs {
+ wait()
+ }
+ require.Equal(t, 40, gauge.enter)
+
+ // Release all
+ for _, release := range releaseChans {
+ close(release)
+ }
+ for _, wait := range waitReleaseFuncs {
+ wait()
+ }
+ require.Equal(t, 40, gauge.exit)
+ })
+}
+
+// spawnAndWaitAcquired spawns N goroutines that wait for the limiter. They wait until all of them acquire the limiter
+// token before exiting. This function returns a channel to control token release and a function to wait until all
+// goroutines finish.
+func spawnAndWaitAcquired(t *testing.T, ctx context.Context, bucket string, limiter *ConcurrencyLimiter, gauge *blockingQueueCounter, n int) (chan struct{}, func()) {
+ var acquireWg, releaseWg sync.WaitGroup
+ release := make(chan struct{})
+
+ for i := 0; i < n; i++ {
+ acquireWg.Add(1)
+ releaseWg.Add(1)
+ go func() {
+ defer releaseWg.Done()
+ _, err := limiter.Limit(ctx, bucket, func() (resp interface{}, err error) {
+ acquireWg.Done()
+ <-release
+ return nil, nil
+ })
+ require.NoError(t, err)
+ }()
+ }
+ for i := 0; i < n; i++ {
+ <-gauge.queuedCh
+ gauge.queued++
+ }
+ acquireWg.Wait()
+
+ return release, releaseWg.Wait
+}
+
+// spawnAndWaitQueued spawns N goroutines that wait for the limiter. They wait until all of them are queued. This
+// function returns a function to wait for channel to acquired the token, a channel to control token release, and a
+// function to wait until all goroutines finish.
+func spawnAndWaitQueued(t *testing.T, ctx context.Context, bucket string, limiter *ConcurrencyLimiter, gauge *blockingQueueCounter, n int) (func(), chan struct{}, func()) {
+ var acquireWg, releaseWg sync.WaitGroup
+ release := make(chan struct{})
+
+ for i := 0; i < n; i++ {
+ acquireWg.Add(1)
+ releaseWg.Add(1)
+ go func() {
+ defer releaseWg.Done()
+ _, err := limiter.Limit(ctx, bucket, func() (resp interface{}, err error) {
+ acquireWg.Done()
+ <-release
+ return nil, nil
+ })
+ require.NoError(t, err)
+ }()
+ }
+ for i := 0; i < n; i++ {
+ <-gauge.queuedCh
+ gauge.queued++
+ }
+
+ return acquireWg.Wait, release, releaseWg.Wait
+}
+
+func spawnQueuedAndCollectErrors(ctx context.Context, bucket string, limiter *ConcurrencyLimiter, gauge *blockingQueueCounter, n int, errors chan error) {
+ for i := 0; i < n; i++ {
+ go func() {
+ _, err := limiter.Limit(ctx, bucket, func() (interface{}, error) {
+ return nil, fmt.Errorf("should not call")
+ })
+ errors <- err
+ }()
+ }
+ for i := 0; i < n; i++ {
+ <-gauge.queuedCh
+ gauge.queued++
+ }
+}
+
+func maximumQueueSizeReached(t *testing.T, ctx context.Context, bucket string, limiter *ConcurrencyLimiter) {
+ _, err := limiter.Limit(ctx, bucket, func() (interface{}, error) {
+ return nil, fmt.Errorf("should not call")
+ })
+ require.EqualError(t, err, "maximum queue size reached")
+}
+
type blockingQueueCounter struct {
counter
@@ -249,7 +801,7 @@ func TestConcurrencyLimiter_queueLimit(t *testing.T) {
monitorCh := make(chan struct{})
monitor := &blockingQueueCounter{queuedCh: monitorCh}
ch := make(chan struct{})
- limiter := NewConcurrencyLimiter(ctx, 1, queueLimit, nil, monitor)
+ limiter := NewConcurrencyLimiter(ctx, NewAdaptiveLimit("staticLimit", AdaptiveSetting{Initial: 1}), queueLimit, nil, monitor)
// occupied with one live request that takes a long time to complete
go func() {
@@ -335,7 +887,7 @@ func TestLimitConcurrency_queueWaitTime(t *testing.T) {
limiter := NewConcurrencyLimiter(
ctx,
- 1,
+ NewAdaptiveLimit("staticLimit", AdaptiveSetting{Initial: 1}),
0,
func() helper.Ticker {
return ticker
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go
index cf76158b8..6352fcfe0 100644
--- a/internal/testhelper/testserver/gitaly.go
+++ b/internal/testhelper/testserver/gitaly.go
@@ -329,7 +329,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, ctx context.Conte
if gsd.packObjectsLimiter == nil {
gsd.packObjectsLimiter = limiter.NewConcurrencyLimiter(
ctx,
- 0,
+ limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 0}),
0,
nil,
limiter.NewNoopConcurrencyMonitor(),