gitlab.com/gitlab-org/gitaly.git

 internal/cli/gitaly/serve.go                        |   7
 internal/gitaly/service/hook/pack_objects_test.go   |   2
 internal/grpc/middleware/limithandler/middleware.go |   4
 internal/limiter/concurrency_limiter.go             |  95
 internal/limiter/concurrency_limiter_test.go        | 560
 internal/testhelper/testserver/gitaly.go            |   2
 6 files changed, 625 insertions(+), 45 deletions(-)
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index 0b13c2a2a..302fd880d 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -271,6 +271,11 @@ func run(cfg config.Cfg, logger logrus.FieldLogger) error {
return fmt.Errorf("disk cache walkers: %w", err)
}
+ // The pack-objects limit below is static at this stage. It always equals the initial limit, which is
+ // taken from the MaxConcurrency config.
+ packObjectLimit := limiter.NewAdaptiveLimit("packObjects", limiter.AdaptiveSetting{
+ Initial: cfg.PackObjectsLimiting.MaxConcurrency,
+ })
concurrencyLimitHandler := limithandler.New(
cfg,
limithandler.LimitConcurrencyByRepo,
@@ -296,7 +301,7 @@ func run(cfg config.Cfg, logger logrus.FieldLogger) error {
}
packObjectsLimiter := limiter.NewConcurrencyLimiter(
ctx,
- cfg.PackObjectsLimiting.MaxConcurrency,
+ packObjectLimit,
cfg.PackObjectsLimiting.MaxQueueLength,
newTickerFunc,
packObjectsMonitor,
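Note: as the comment in this hunk says, the limit stays static for now; a later change would presumably attach a calculator that adjusts it at runtime. The following is a hypothetical sketch only, not part of this commit: watchLimit, calculateLimit, the package name, and the 30-second cadence are all invented for illustration, and the import path is an assumption.

package sketch

import (
	"context"
	"time"

	"gitlab.com/gitlab-org/gitaly/v16/internal/limiter" // import path is an assumption
)

// watchLimit periodically recomputes the limit and pushes it into the
// AdaptiveLimit. Update triggers the AfterUpdate callback registered by
// NewConcurrencyLimiter (see concurrency_limiter.go below), which resizes
// both semaphores in place.
func watchLimit(ctx context.Context, limit *limiter.AdaptiveLimit, calculateLimit func() int) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			limit.Update(calculateLimit())
		}
	}
}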
diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go
index 35dffadbc..62fa0dca8 100644
--- a/internal/gitaly/service/hook/pack_objects_test.go
+++ b/internal/gitaly/service/hook/pack_objects_test.go
@@ -859,7 +859,7 @@ func TestPackObjects_concurrencyLimit(t *testing.T) {
)
limiter := limiter.NewConcurrencyLimiter(
ctx,
- 1,
+ limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 1}),
0,
func() helper.Ticker { return ticker },
monitor,
diff --git a/internal/grpc/middleware/limithandler/middleware.go b/internal/grpc/middleware/limithandler/middleware.go
index ebbe44662..a6ed93f9a 100644
--- a/internal/grpc/middleware/limithandler/middleware.go
+++ b/internal/grpc/middleware/limithandler/middleware.go
@@ -218,7 +218,7 @@ func WithConcurrencyLimiters(ctx context.Context) SetupFunc {
result[limit.RPC] = limiter.NewConcurrencyLimiter(
ctx,
- limit.MaxPerRepo,
+ limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: limit.MaxPerRepo}),
limit.MaxQueueSize,
newTickerFunc,
limiter.NewPerRPCPromMonitor(
@@ -233,7 +233,7 @@ func WithConcurrencyLimiters(ctx context.Context) SetupFunc {
if _, ok := result[replicateRepositoryFullMethod]; !ok {
result[replicateRepositoryFullMethod] = limiter.NewConcurrencyLimiter(
ctx,
- 1,
+ limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 1}),
0,
func() helper.Ticker {
return helper.NewManualTicker()
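The two hunks above show the mechanical part of this change: every call site that used to pass a plain integer now wraps it in an AdaptiveLimit whose Initial value is the old constant. Nothing updates these limits, so Current() keeps returning Initial and behaviour is unchanged. A minimal sketch of that equivalence, using only the API visible in this diff (the import path is an assumption):

package main

import (
	"fmt"

	"gitlab.com/gitlab-org/gitaly/v16/internal/limiter" // import path is an assumption
)

func main() {
	// With only Initial set, an AdaptiveLimit is effectively static:
	limit := limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 1})
	fmt.Println(limit.Current()) // 1, and it stays 1...
	limit.Update(3)              // ...until something explicitly updates it
	fmt.Println(limit.Current()) // 3
}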
diff --git a/internal/limiter/concurrency_limiter.go b/internal/limiter/concurrency_limiter.go
index 32b08b32e..b4284adeb 100644
--- a/internal/limiter/concurrency_limiter.go
+++ b/internal/limiter/concurrency_limiter.go
@@ -41,10 +41,10 @@ type keyedConcurrencyLimiter struct {
// concurrencyTokens is the channel of available concurrency tokens, where every token
// allows one concurrent call to the concurrency-limited function.
- concurrencyTokens chan struct{}
+ concurrencyTokens *resizableSemaphore
// queueTokens is the channel of available queue tokens, where every token allows one
// concurrent call to be admitted to the queue.
- queueTokens chan struct{}
+ queueTokens *resizableSemaphore
}
// acquire tries to acquire the semaphore. It may fail if the admission queue is full or if the max
@@ -55,34 +55,33 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey string) (returnedErr error) {
// callers may wait for the concurrency token at the same time. If there are no more
// queueing tokens then this indicates that the queue is full and we thus return an
// error immediately.
- select {
- case sem.queueTokens <- struct{}{}:
- // We have acquired a queueing token, so we need to release it if acquiring
- // the concurrency token fails. If we succeed to acquire the concurrency
- // token though then we retain the queueing token until the caller signals
- // that the concurrency-limited function has finished. As a consequence the
- // queue token is returned together with the concurrency token.
- //
- // A simpler model would be to just have `maxQueueLength` many queueing
- // tokens. But this would add concurrency-limiting when acquiring the queue
- // token itself, which is not what we want to do. Instead, we want to admit
- // as many callers into the queue as the queue length permits plus the
- // number of available concurrency tokens allows.
- defer func() {
- if returnedErr != nil {
- <-sem.queueTokens
- }
- }()
- default:
+ if !sem.queueTokens.TryAcquire() {
return ErrMaxQueueSize
}
+
+ // We have acquired a queueing token, so we need to release it if acquiring
+ // the concurrency token fails. If we successfully acquire the concurrency
+ // token though then we retain the queueing token until the caller signals
+ // that the concurrency-limited function has finished. As a consequence the
+ // queue token is returned together with the concurrency token.
+ //
+ // A simpler model would be to just have `maxQueueLength` many queueing
+ // tokens. But this would add concurrency-limiting when acquiring the queue
+ // token itself, which is not what we want to do. Instead, we want to admit
+ // as many callers into the queue as the queue length permits plus the
+ // number of available concurrency tokens allows.
+ defer func() {
+ if returnedErr != nil {
+ sem.queueTokens.Release()
+ }
+ }()
}
// We are queued now, so let's tell the monitor. Furthermore, even though we're still
// holding the queueing token when this function exits successfully we also tell the monitor
// that we have exited the queue. It is only an implementation detail anyway that we hold on
// to the token, so the monitor shouldn't care about that.
- sem.monitor.Queued(ctx, limitingKey, len(sem.queueTokens))
+ sem.monitor.Queued(ctx, limitingKey, sem.queueLength())
defer sem.monitor.Dequeued(ctx)
// Set up the ticker that keeps us from waiting indefinitely on the concurrency token.
@@ -98,7 +97,12 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey string) (returnedErr error) {
// Try to acquire the concurrency token now that we're in the queue.
select {
- case sem.concurrencyTokens <- struct{}{}:
+ case acquired := <-sem.concurrencyTokens.Acquire():
+ // When the semaphore returns false, it was stopped, most likely because the context was
+ // cancelled. Hence, we return its error here.
+ if !acquired {
+ return sem.concurrencyTokens.Err()
+ }
return nil
case <-ticker.C():
return ErrMaxQueueTime
@@ -110,14 +114,17 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey str
// release releases the acquired tokens.
func (sem *keyedConcurrencyLimiter) release() {
if sem.queueTokens != nil {
- <-sem.queueTokens
+ sem.queueTokens.Release()
}
- <-sem.concurrencyTokens
+ sem.concurrencyTokens.Release()
}
// queueLength returns the length of the token queue
func (sem *keyedConcurrencyLimiter) queueLength() int {
- return len(sem.queueTokens)
+ if sem.queueTokens == nil {
+ return 0
+ }
+ return int(sem.queueTokens.Current())
}
// ConcurrencyLimiter contains rate limiter state.
@@ -125,9 +132,9 @@ type ConcurrencyLimiter struct {
// ctx stores the context at initialization. This context is used as a stopping condition
// for some internal goroutines.
ctx context.Context
- // maxConcurrencyLimit is the maximum number of concurrent calls to the limited function.
- // This limit is per key.
- maxConcurrencyLimit int64
+ // limit is the adaptive maximum number of concurrent calls to the limited function. This limit is
+ // adjusted at runtime by an external calculator.
+ limit *AdaptiveLimit
// maxQueueLength is the maximum number of operations allowed to wait in a queued state.
// This limit is global and applies before the concurrency limit. Subsequent incoming
// operations will be rejected with an error immediately.
@@ -148,19 +155,31 @@ type ConcurrencyLimiter struct {
}
// NewConcurrencyLimiter creates a new concurrency rate limiter.
-func NewConcurrencyLimiter(ctx context.Context, maxConcurrencyLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
+func NewConcurrencyLimiter(ctx context.Context, limit *AdaptiveLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
if monitor == nil {
monitor = NewNoopConcurrencyMonitor()
}
- return &ConcurrencyLimiter{
+ limiter := &ConcurrencyLimiter{
ctx: ctx,
- maxConcurrencyLimit: int64(maxConcurrencyLimit),
+ limit: limit,
maxQueueLength: int64(maxQueueLength),
maxQueuedTickerCreator: maxQueuedTickerCreator,
monitor: monitor,
limitsByKey: make(map[string]*keyedConcurrencyLimiter),
}
+
+ // When the capacity of the limiter is updated we also need to resize both the queuing token and
+ // the concurrency token semaphores to match the new value.
+ limit.AfterUpdate(func(val int) {
+ for _, keyedLimiter := range limiter.limitsByKey {
+ if keyedLimiter.queueTokens != nil {
+ keyedLimiter.queueTokens.Resize(int64(val) + limiter.maxQueueLength)
+ }
+ keyedLimiter.concurrencyTokens.Resize(int64(val))
+ }
+ })
+ return limiter
}
// Limit will limit the concurrency of the limited function f. There are two distinct mechanisms
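The AfterUpdate callback above encodes the sizing rule of this change: the concurrency semaphore is sized to the limit itself, while the queue semaphore is sized to the limit plus maxQueueLength, because a caller keeps holding its queue token while it runs. A worked example with the values used by the tests below, assuming ctx and monitor are in scope:

limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
lim := NewConcurrencyLimiter(ctx, limit, 10, nil, monitor)
// Per key: concurrencyTokens capacity 5, queueTokens capacity 5 + 10 = 15.
limit.Update(7)
// Per key: concurrencyTokens resized to 7, queueTokens resized to 7 + 10 = 17.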
@@ -180,7 +199,7 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f LimitedFunc) (interface{}, error) {
)
defer span.Finish()
- if c.maxConcurrencyLimit <= 0 {
+ if c.currentLimit() <= 0 {
return f()
}
@@ -226,15 +245,15 @@ func (c *ConcurrencyLimiter) getConcurrencyLimit(limitingKey string) *keyedConcurrencyLimiter {
// Set up the queue tokens in case a maximum queue length was requested. As the
// queue tokens are kept during the whole lifetime of the concurrency-limited
// function we add the concurrency tokens to the number of available tokens.
- var queueTokens chan struct{}
+ var queueTokens *resizableSemaphore
if c.maxQueueLength > 0 {
- queueTokens = make(chan struct{}, c.maxConcurrencyLimit+c.maxQueueLength)
+ queueTokens = NewResizableSemaphore(c.ctx, c.currentLimit()+c.maxQueueLength)
}
c.limitsByKey[limitingKey] = &keyedConcurrencyLimiter{
monitor: c.monitor,
maxQueuedTickerCreator: c.maxQueuedTickerCreator,
- concurrencyTokens: make(chan struct{}, c.maxConcurrencyLimit),
+ concurrencyTokens: NewResizableSemaphore(c.ctx, c.currentLimit()),
queueTokens: queueTokens,
}
}
@@ -272,3 +291,7 @@ func (c *ConcurrencyLimiter) countSemaphores() int {
return len(c.limitsByKey)
}
+
+func (c *ConcurrencyLimiter) currentLimit() int64 {
+ return int64(c.limit.Current())
+}
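The diff above now depends on resizableSemaphore, whose implementation is not shown in this commit. The following is a minimal sketch reconstructed purely from the call sites above (NewResizableSemaphore, TryAcquire, Acquire, Release, Resize, Current, Err); the real type will differ, and handling of abandoned waiters (callers that give up after queueing, e.g. on ticker timeout) is omitted for brevity.

package limiter

import (
	"context"
	"sync"
)

// sketchSemaphore mimics the resizableSemaphore contract assumed by the
// call sites in this diff.
type sketchSemaphore struct {
	mu      sync.Mutex
	held    int64       // tokens currently handed out
	size    int64       // capacity; Resize may move it below held
	waiters []chan bool // blocked Acquire calls, FIFO
	err     error       // non-nil once the owning context is done
}

func newSketchSemaphore(ctx context.Context, size int64) *sketchSemaphore {
	s := &sketchSemaphore{size: size}
	go func() {
		// Stopping mirrors what acquire() above observes: pending and
		// future Acquire calls yield false and Err reports the cause.
		<-ctx.Done()
		s.mu.Lock()
		defer s.mu.Unlock()
		s.err = ctx.Err()
		for _, w := range s.waiters {
			w <- false
		}
		s.waiters = nil
	}()
	return s
}

// TryAcquire takes a token without blocking, failing if none is free.
func (s *sketchSemaphore) TryAcquire() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.err == nil && s.held < s.size {
		s.held++
		return true
	}
	return false
}

// Acquire returns a channel that yields true once a token is granted and
// false if the semaphore was stopped; acquire() above selects on it
// alongside the queue ticker and context cancellation.
func (s *sketchSemaphore) Acquire() <-chan bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	ch := make(chan bool, 1)
	switch {
	case s.err != nil:
		ch <- false
	case s.held < s.size:
		s.held++
		ch <- true
	default:
		s.waiters = append(s.waiters, ch) // granted later by Release/Resize
	}
	return ch
}

// Release returns a token and hands freed capacity to the next waiter.
func (s *sketchSemaphore) Release() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.held--
	s.wakeLocked()
}

// Resize never revokes held tokens: shrinking merely stops new grants until
// enough tokens are released, which is why the tests below see in-flight
// requests finish before a lower limit takes effect.
func (s *sketchSemaphore) Resize(size int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.size = size
	s.wakeLocked()
}

func (s *sketchSemaphore) wakeLocked() {
	for len(s.waiters) > 0 && s.held < s.size {
		s.held++
		s.waiters[0] <- true
		s.waiters = s.waiters[1:]
	}
}

// Current reports the number of held tokens; queueLength() above uses it.
func (s *sketchSemaphore) Current() int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.held
}

// Err reports why the semaphore stopped.
func (s *sketchSemaphore) Err() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.err
}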
diff --git a/internal/limiter/concurrency_limiter_test.go b/internal/limiter/concurrency_limiter_test.go
index b650ae5f5..6ba48d243 100644
--- a/internal/limiter/concurrency_limiter_test.go
+++ b/internal/limiter/concurrency_limiter_test.go
@@ -3,6 +3,7 @@ package limiter
import (
"context"
"errors"
+ "fmt"
"strconv"
"sync"
"testing"
@@ -86,7 +87,7 @@ func (c *counter) Dropped(_ context.Context, _ string, _ int, _ time.Duration, reason string) {
}
}
-func TestLimiter(t *testing.T) {
+func TestLimiter_static(t *testing.T) {
t.Parallel()
tests := []struct {
@@ -156,7 +157,7 @@ func TestLimiter(t *testing.T) {
limiter := NewConcurrencyLimiter(
ctx,
- tt.maxConcurrency,
+ NewAdaptiveLimit("staticLimit", AdaptiveSetting{Initial: tt.maxConcurrency}),
0,
nil,
gauge,
@@ -230,6 +231,557 @@ func TestLimiter(t *testing.T) {
}
}
+func TestLimiter_dynamic(t *testing.T) {
+ t.Parallel()
+
+ t.Run("increase dynamic limit when there is no queuing request", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
+
+ // 5 requests acquired the tokens, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+
+ // Update the limit to 7
+ limit.Update(7)
+
+ // 2 more requests acquire tokens, which proves the limit has expanded
+ release2, waitAfterRelease2 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 2)
+ require.Equal(t, 7, gauge.enter)
+
+ close(release1)
+ close(release2)
+ waitAfterRelease1()
+ waitAfterRelease2()
+ require.Equal(t, 7, gauge.exit)
+ })
+
+ t.Run("decrease dynamic limit when there is no queuing request", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
+
+ // 3 requests acquired the tokens, 2 slots left
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 3)
+ require.Equal(t, 3, gauge.enter)
+ require.Equal(t, 3, gauge.queued)
+
+ // Update the limit to 3
+ limit.Update(3)
+
+ // 2 requests are put in the queue, which proves the limit has shrunk
+ waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 2)
+ require.Equal(t, 3, gauge.enter)
+ require.Equal(t, 5, gauge.queued)
+
+ // Release first 3 requests
+ close(release1)
+ waitAfterRelease1()
+
+ // Now the last 2 requests can acquire tokens
+ waitAcquired2()
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 5, gauge.queued)
+ require.Equal(t, 3, gauge.exit)
+
+ // Release the last batch
+ close(release2)
+ waitAfterRelease2()
+ require.Equal(t, 5, gauge.exit)
+ })
+
+ t.Run("increase dynamic limit more than the number of queuing requests", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
+
+ // 5 requests acquired the tokens, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+
+ // 2 requests waiting in the queue
+ waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 2)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 7, gauge.queued)
+
+ // Update the limit to 7
+ limit.Update(7)
+
+ // Wait for the other 2 requests to acquire tokens, which proves the limiter has expanded
+ waitAcquired2()
+ require.Equal(t, 7, gauge.enter)
+
+ close(release1)
+ close(release2)
+ waitAfterRelease1()
+ waitAfterRelease2()
+ require.Equal(t, 7, gauge.exit)
+ })
+
+ t.Run("increase dynamic limit less than the number of queuing requests", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
+
+ // 5 requests acquired the tokens, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+
+ // 2 requests waiting in the queue
+ waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 2)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 7, gauge.queued)
+
+ // 5 more requests waiting in the queue
+ waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 12, gauge.queued)
+
+ // Update the limit to 7.
+ limit.Update(7)
+
+ // Release the first 5 requests; the remaining 7 now fit under the new limit.
+ close(release1)
+ waitAfterRelease1()
+ require.Equal(t, 5, gauge.exit)
+
+ waitAcquired2()
+ waitAcquired3()
+ require.Equal(t, 12, gauge.enter)
+
+ // Now release all requests
+ close(release2)
+ close(release3)
+ waitAfterRelease2()
+ waitAfterRelease3()
+ require.Equal(t, 12, gauge.exit)
+ })
+
+ t.Run("decrease dynamic limit less than the number of concurrent requests", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
+
+ // 5 requests acquired the tokens, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+
+ // Update the limit to 3
+ limit.Update(3)
+
+ // 3 requests are put in queue
+ waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 3)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 8, gauge.queued)
+
+ // Release the first 5 requests
+ close(release1)
+ waitAfterRelease1()
+ require.Equal(t, 5, gauge.exit)
+
+ // Now the last 3 requests acquire the tokens
+ waitAcquired2()
+ require.Equal(t, 8, gauge.enter)
+
+ // 1 more request is put in the queue, which proves the limit has shrunk to 3.
+ waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 1)
+ require.Equal(t, 8, gauge.enter)
+ require.Equal(t, 9, gauge.queued)
+
+ // Release the second 3 requests
+ close(release2)
+ waitAfterRelease2()
+ require.Equal(t, 8, gauge.exit)
+
+ // The last request acquires the token
+ waitAcquired3()
+ require.Equal(t, 9, gauge.enter)
+
+ // Release the last request
+ close(release3)
+ waitAfterRelease3()
+ require.Equal(t, 9, gauge.exit)
+ })
+
+ t.Run("increase and decrease dynamic limit multiple times", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
+
+ // Update the limit to 7
+ limit.Update(7)
+
+ // 5 requests acquired the tokens
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+
+ // Update the limit to 3
+ limit.Update(3)
+
+ // 3 requests are put in queue
+ waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 3)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 8, gauge.queued)
+
+ // Update the limit to 10
+ limit.Update(10)
+
+ // All existing requests acquire the tokens
+ waitAcquired2()
+ require.Equal(t, 8, gauge.enter)
+
+ // 2 more requests
+ release3, waitAfterRelease3 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 2)
+ require.Equal(t, 10, gauge.enter)
+
+ // Update the limit to 1
+ limit.Update(1)
+
+ // Now release all of them
+ close(release1)
+ waitAfterRelease1()
+ require.Equal(t, 5, gauge.exit)
+
+ close(release2)
+ waitAfterRelease2()
+ require.Equal(t, 8, gauge.exit)
+
+ close(release3)
+ waitAfterRelease3()
+ require.Equal(t, 10, gauge.exit)
+ })
+
+ t.Run("increase the limit when the queue is full", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 1, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ // Mind the queue length here
+ limiter := NewConcurrencyLimiter(ctx, limit, 5, nil, gauge)
+
+ // 1 request acquires the token, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 1)
+ require.Equal(t, 1, gauge.enter)
+ require.Equal(t, 1, gauge.queued)
+
+ // 5 requests queuing for the tokens, the queue is full now
+ waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 5)
+ require.Equal(t, 1, gauge.enter)
+ require.Equal(t, 6, gauge.queued)
+
+ // Limiter rejects new request
+ maximumQueueSizeReached(t, ctx, "1", limiter)
+
+ // Update the limit
+ limit.Update(6)
+ waitAcquired2()
+ require.Equal(t, 6, gauge.enter)
+
+ // 5 requests queuing for the tokens, the queue is full now
+ waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 5)
+ require.Equal(t, 6, gauge.enter)
+ require.Equal(t, 11, gauge.queued)
+
+ // Limiter rejects new request
+ maximumQueueSizeReached(t, ctx, "1", limiter)
+
+ // Clean up
+ close(release1)
+ close(release2)
+ waitAfterRelease1()
+ waitAfterRelease2()
+ require.Equal(t, 6, gauge.exit)
+
+ waitAcquired3()
+ require.Equal(t, 11, gauge.enter)
+ close(release3)
+ waitAfterRelease3()
+ })
+
+ t.Run("decrease the limit when the queue is full", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ // Mind the queue length here
+ limiter := NewConcurrencyLimiter(ctx, limit, 3, nil, gauge)
+
+ // 5 requests acquired the tokens, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 5, gauge.queued)
+
+ // 3 requests queue for tokens, the queue is full now
+ waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 3)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 8, gauge.queued)
+
+ // Limiter rejects new request
+ maximumQueueSizeReached(t, ctx, "1", limiter)
+
+ // Update the limit.
+ limit.Update(3)
+
+ // The queue is still full
+ maximumQueueSizeReached(t, ctx, "1", limiter)
+
+ // Release first 5 requests and let the last 3 requests in
+ close(release1)
+ waitAfterRelease1()
+ require.Equal(t, 5, gauge.exit)
+ waitAcquired2()
+ require.Equal(t, 8, gauge.enter)
+
+ // Another 3 requests are queued. The queue is full again, meaning the concurrency limit is 3 and the queue length is 3.
+ waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 3)
+ require.Equal(t, 8, gauge.enter)
+ require.Equal(t, 11, gauge.queued)
+ maximumQueueSizeReached(t, ctx, "1", limiter)
+
+ // Clean up
+ close(release2)
+ waitAfterRelease2()
+ require.Equal(t, 8, gauge.exit)
+ waitAcquired3()
+ require.Equal(t, 11, gauge.enter)
+ close(release3)
+ waitAfterRelease3()
+ require.Equal(t, 11, gauge.exit)
+ })
+
+ t.Run("dynamic limit works without queuing", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+ // A zero queue length means the limiter admits an unlimited number of queuing requests
+ limiter := NewConcurrencyLimiter(ctx, limit, 0, nil, gauge)
+
+ // 5 requests acquired the tokens, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 5, gauge.queued)
+
+ // 5 more requests
+ waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 5)
+
+ // Update the limit.
+ limit.Update(10)
+
+ // All of them acquired the tokens
+ waitAcquired2()
+ require.Equal(t, 10, gauge.enter)
+
+ // Clean up
+ close(release1)
+ close(release2)
+ waitAfterRelease1()
+ waitAfterRelease2()
+ require.Equal(t, 10, gauge.exit)
+ })
+
+ t.Run("dynamic limit works with queue timer", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+
+ ticker := helper.NewManualTicker()
+ limiter := NewConcurrencyLimiter(ctx, limit, 0, func() helper.Ticker { return ticker }, gauge)
+
+ // 5 requests acquired the tokens, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 5, gauge.queued)
+
+ errors := make(chan error, 10)
+ // 5 requests in queue
+ spawnQueuedAndCollectErrors(ctx, "1", limiter, gauge, 5, errors)
+
+ // Decrease the limit
+ limit.Update(3)
+
+ // 5 more requests in queue
+ spawnQueuedAndCollectErrors(ctx, "1", limiter, gauge, 5, errors)
+
+ // Trigger timeout event
+ for i := 0; i < 10; i++ {
+ ticker.Tick()
+ require.EqualError(t, <-errors, "maximum time in concurrency queue reached")
+ }
+
+ // Other goroutines exit as normal
+ close(release1)
+ waitAfterRelease1()
+ require.Equal(t, 5, gauge.exit)
+ })
+
+ t.Run("dynamic limit works with context cancellation", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ ctx2, cancel := context.WithCancel(testhelper.Context(t))
+
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+
+ limiter := NewConcurrencyLimiter(ctx, limit, 0, nil, gauge)
+
+ // 5 requests acquired the tokens, the limiter is full now
+ release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+ require.Equal(t, 5, gauge.enter)
+ require.Equal(t, 5, gauge.queued)
+
+ errors := make(chan error, 10)
+ // 5 requests in queue
+ spawnQueuedAndCollectErrors(ctx2, "1", limiter, gauge, 5, errors)
+
+ // Decrease the limit
+ limit.Update(3)
+
+ // 5 more requests in queue
+ spawnQueuedAndCollectErrors(ctx2, "1", limiter, gauge, 5, errors)
+
+ // Trigger context cancellation
+ cancel()
+ for i := 0; i < 10; i++ {
+ require.EqualError(t, <-errors, "unexpected error when dequeueing request: context canceled")
+ }
+
+ // Other goroutines exit as normal
+ close(release1)
+ waitAfterRelease1()
+ require.Equal(t, 5, gauge.exit)
+ })
+
+ t.Run("dynamic limit works with multiple buckets", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+ gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+
+ limiter := NewConcurrencyLimiter(ctx, limit, 5, nil, gauge)
+
+ var releaseChans []chan struct{}
+ var waitAcquireFuncs, waitReleaseFuncs []func()
+
+ // 5 * 5 requests acquired tokens
+ for i := 1; i <= 5; i++ {
+ release, waitAfterRelease := spawnAndWaitAcquired(t, ctx, fmt.Sprintf("%d", i), limiter, gauge, 5)
+ releaseChans = append(releaseChans, release)
+ waitReleaseFuncs = append(waitReleaseFuncs, waitAfterRelease)
+ }
+ require.Equal(t, 25, gauge.enter)
+
+ // 1 + 2 + 3 + 4 + 5 requests are in queue
+ for i := 1; i <= 5; i++ {
+ waitAcquired, release, waitAfterRelease := spawnAndWaitQueued(t, ctx, fmt.Sprintf("%d", i), limiter, gauge, i)
+ waitAcquireFuncs = append(waitAcquireFuncs, waitAcquired)
+ releaseChans = append(releaseChans, release)
+ waitReleaseFuncs = append(waitReleaseFuncs, waitAfterRelease)
+ }
+ require.Equal(t, 25, gauge.enter)
+ require.Equal(t, 40, gauge.queued)
+
+ // Update limit, enough for all requests
+ limit.Update(10)
+
+ // All requests acquired tokens now
+ for _, wait := range waitAcquireFuncs {
+ wait()
+ }
+ require.Equal(t, 40, gauge.enter)
+
+ // Release all
+ for _, release := range releaseChans {
+ close(release)
+ }
+ for _, wait := range waitReleaseFuncs {
+ wait()
+ }
+ require.Equal(t, 40, gauge.exit)
+ })
+}
+
+// spawnAndWaitAcquired spawns N goroutines that call the limiter and blocks until all of them have acquired a
+// token. It returns a channel to control token release and a function that waits until all
+// goroutines finish.
+func spawnAndWaitAcquired(t *testing.T, ctx context.Context, bucket string, limiter *ConcurrencyLimiter, gauge *blockingQueueCounter, n int) (chan struct{}, func()) {
+ var acquireWg, releaseWg sync.WaitGroup
+ release := make(chan struct{})
+
+ for i := 0; i < n; i++ {
+ acquireWg.Add(1)
+ releaseWg.Add(1)
+ go func() {
+ defer releaseWg.Done()
+ _, err := limiter.Limit(ctx, bucket, func() (resp interface{}, err error) {
+ acquireWg.Done()
+ <-release
+ return nil, nil
+ })
+ require.NoError(t, err)
+ }()
+ }
+ for i := 0; i < n; i++ {
+ <-gauge.queuedCh
+ gauge.queued++
+ }
+ acquireWg.Wait()
+
+ return release, releaseWg.Wait
+}
+
+// spawnAndWaitQueued spawns N goroutines that wait for the limiter. They wait until all of them are queued. This
+// function returns a function that waits until all goroutines acquire tokens, a channel to control token release, and
+// a function to wait until all goroutines finish.
+func spawnAndWaitQueued(t *testing.T, ctx context.Context, bucket string, limiter *ConcurrencyLimiter, gauge *blockingQueueCounter, n int) (func(), chan struct{}, func()) {
+ var acquireWg, releaseWg sync.WaitGroup
+ release := make(chan struct{})
+
+ for i := 0; i < n; i++ {
+ acquireWg.Add(1)
+ releaseWg.Add(1)
+ go func() {
+ defer releaseWg.Done()
+ _, err := limiter.Limit(ctx, bucket, func() (resp interface{}, err error) {
+ acquireWg.Done()
+ <-release
+ return nil, nil
+ })
+ require.NoError(t, err)
+ }()
+ }
+ for i := 0; i < n; i++ {
+ <-gauge.queuedCh
+ gauge.queued++
+ }
+
+ return acquireWg.Wait, release, releaseWg.Wait
+}
+
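+// spawnQueuedAndCollectErrors spawns N goroutines that are all expected to queue. Each goroutine forwards
+// whatever error it eventually receives from the limiter to the errors channel.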
+func spawnQueuedAndCollectErrors(ctx context.Context, bucket string, limiter *ConcurrencyLimiter, gauge *blockingQueueCounter, n int, errors chan error) {
+ for i := 0; i < n; i++ {
+ go func() {
+ _, err := limiter.Limit(ctx, bucket, func() (interface{}, error) {
+ return nil, fmt.Errorf("should not call")
+ })
+ errors <- err
+ }()
+ }
+ for i := 0; i < n; i++ {
+ <-gauge.queuedCh
+ gauge.queued++
+ }
+}
+
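+// maximumQueueSizeReached asserts that the limiter rejects a new request immediately because the queue is full.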
+func maximumQueueSizeReached(t *testing.T, ctx context.Context, bucket string, limiter *ConcurrencyLimiter) {
+ _, err := limiter.Limit(ctx, bucket, func() (interface{}, error) {
+ return nil, fmt.Errorf("should not call")
+ })
+ require.EqualError(t, err, "maximum queue size reached")
+}
+
type blockingQueueCounter struct {
counter
@@ -249,7 +801,7 @@ func TestConcurrencyLimiter_queueLimit(t *testing.T) {
monitorCh := make(chan struct{})
monitor := &blockingQueueCounter{queuedCh: monitorCh}
ch := make(chan struct{})
- limiter := NewConcurrencyLimiter(ctx, 1, queueLimit, nil, monitor)
+ limiter := NewConcurrencyLimiter(ctx, NewAdaptiveLimit("staticLimit", AdaptiveSetting{Initial: 1}), queueLimit, nil, monitor)
// occupied with one live request that takes a long time to complete
go func() {
@@ -335,7 +887,7 @@ func TestLimitConcurrency_queueWaitTime(t *testing.T) {
limiter := NewConcurrencyLimiter(
ctx,
- 1,
+ NewAdaptiveLimit("staticLimit", AdaptiveSetting{Initial: 1}),
0,
func() helper.Ticker {
return ticker
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go
index cf76158b8..6352fcfe0 100644
--- a/internal/testhelper/testserver/gitaly.go
+++ b/internal/testhelper/testserver/gitaly.go
@@ -329,7 +329,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, ctx context.Conte
if gsd.packObjectsLimiter == nil {
gsd.packObjectsLimiter = limiter.NewConcurrencyLimiter(
ctx,
- 0,
+ limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 0}),
0,
nil,
limiter.NewNoopConcurrencyMonitor(),
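
Taken together, the API surface exercised throughout this diff condenses to the following life cycle. This is a sketch for orientation only: doWork is a placeholder and the import path is an assumption.

package main

import (
	"context"

	"gitlab.com/gitlab-org/gitaly/v16/internal/limiter" // import path is an assumption
)

func doWork() (interface{}, error) { return nil, nil } // placeholder for the limited operation

func main() {
	ctx := context.Background()

	// Start with 5 concurrent calls per key; allow 10 queued on top of that.
	limit := limiter.NewAdaptiveLimit("example", limiter.AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
	lim := limiter.NewConcurrencyLimiter(ctx, limit, 10, nil, limiter.NewNoopConcurrencyMonitor())

	// Calls are keyed (in practice, by repository). Excess callers queue;
	// overflow beyond the queue fails fast with ErrMaxQueueSize.
	if _, err := lim.Limit(ctx, "repo-1", func() (interface{}, error) {
		return doWork()
	}); err != nil {
		panic(err)
	}

	// An external calculator may raise or lower the limit at any time; the
	// AfterUpdate callback resizes the underlying semaphores in place.
	limit.Update(7)
}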