-rw-r--r--   internal/cli/gitaly/serve.go                          |   7
-rw-r--r--   internal/gitaly/service/hook/pack_objects_test.go     |   2
-rw-r--r--   internal/grpc/middleware/limithandler/middleware.go   |   4
-rw-r--r--   internal/limiter/concurrency_limiter.go               |  95
-rw-r--r--   internal/limiter/concurrency_limiter_test.go          | 560
-rw-r--r--   internal/testhelper/testserver/gitaly.go              |   2
6 files changed, 625 insertions(+), 45 deletions(-)
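In short, call sites stop passing a plain integer to NewConcurrencyLimiter and construct a named AdaptiveLimit instead. A condensed sketch of the new wiring under the signatures shown in this diff; the package name, helper function, and import path are illustrative only (these are internal packages):

package setup

import (
	"context"

	"gitlab.com/gitlab-org/gitaly/v16/internal/limiter"
)

// buildPackObjectsLimiter is a hypothetical helper mirroring the serve.go change.
func buildPackObjectsLimiter(ctx context.Context, maxConcurrency, maxQueueLength int) *limiter.ConcurrencyLimiter {
	// No calculator ever calls Update yet, so the limit stays at its Initial
	// value and the old static MaxConcurrency behaviour is preserved.
	limit := limiter.NewAdaptiveLimit("packObjects", limiter.AdaptiveSetting{Initial: maxConcurrency})

	return limiter.NewConcurrencyLimiter(
		ctx,
		limit,
		maxQueueLength,
		nil, // no queue-wait ticker, as in the test call sites below
		limiter.NewNoopConcurrencyMonitor(),
	)
}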
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index 0b13c2a2a..302fd880d 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -271,6 +271,11 @@ func run(cfg config.Cfg, logger logrus.FieldLogger) error {
 		return fmt.Errorf("disk cache walkers: %w", err)
 	}
 
+	// The pack-objects limit below is static at this stage. It's always equal to the initial limit, which
+	// uses the MaxConcurrency config.
+	packObjectLimit := limiter.NewAdaptiveLimit("packObjects", limiter.AdaptiveSetting{
+		Initial: cfg.PackObjectsLimiting.MaxConcurrency,
+	})
 	concurrencyLimitHandler := limithandler.New(
 		cfg,
 		limithandler.LimitConcurrencyByRepo,
@@ -296,7 +301,7 @@ func run(cfg config.Cfg, logger logrus.FieldLogger) error {
 	}
 	packObjectsLimiter := limiter.NewConcurrencyLimiter(
 		ctx,
-		cfg.PackObjectsLimiting.MaxConcurrency,
+		packObjectLimit,
 		cfg.PackObjectsLimiting.MaxQueueLength,
 		newTickerFunc,
 		packObjectsMonitor,
diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go
index 35dffadbc..62fa0dca8 100644
--- a/internal/gitaly/service/hook/pack_objects_test.go
+++ b/internal/gitaly/service/hook/pack_objects_test.go
@@ -859,7 +859,7 @@ func TestPackObjects_concurrencyLimit(t *testing.T) {
 	)
 	limiter := limiter.NewConcurrencyLimiter(
 		ctx,
-		1,
+		limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 1}),
 		0,
 		func() helper.Ticker { return ticker },
 		monitor,
diff --git a/internal/grpc/middleware/limithandler/middleware.go b/internal/grpc/middleware/limithandler/middleware.go
index ebbe44662..a6ed93f9a 100644
--- a/internal/grpc/middleware/limithandler/middleware.go
+++ b/internal/grpc/middleware/limithandler/middleware.go
@@ -218,7 +218,7 @@ func WithConcurrencyLimiters(ctx context.Context) SetupFunc {
 
 		result[limit.RPC] = limiter.NewConcurrencyLimiter(
 			ctx,
-			limit.MaxPerRepo,
+			limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: limit.MaxPerRepo}),
 			limit.MaxQueueSize,
 			newTickerFunc,
 			limiter.NewPerRPCPromMonitor(
@@ -233,7 +233,7 @@ func WithConcurrencyLimiters(ctx context.Context) SetupFunc {
 	if _, ok := result[replicateRepositoryFullMethod]; !ok {
 		result[replicateRepositoryFullMethod] = limiter.NewConcurrencyLimiter(
 			ctx,
-			1,
+			limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 1}),
 			0,
 			func() helper.Ticker {
 				return helper.NewManualTicker()
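The AdaptiveLimit type itself is consumed but not defined in this diff. The following is a minimal sketch inferred purely from its call sites here (NewAdaptiveLimit, Current, Update, AfterUpdate); the real type lives in internal/limiter and may carry more state:

package limiter

import "sync"

// AdaptiveSetting mirrors the fields used in this diff. Min and Max only
// matter once an external calculator starts adjusting the limit.
type AdaptiveSetting struct {
	Initial, Min, Max int
}

// AdaptiveLimit is a named, mutable concurrency limit. This sketch is not the
// package's actual implementation.
type AdaptiveLimit struct {
	mu      sync.Mutex
	name    string
	current int
	setting AdaptiveSetting
	hooks   []func(int)
}

func NewAdaptiveLimit(name string, setting AdaptiveSetting) *AdaptiveLimit {
	return &AdaptiveLimit{name: name, current: setting.Initial, setting: setting}
}

// Current returns the limit as of the last Update call.
func (l *AdaptiveLimit) Current() int {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.current
}

// Update stores a new limit and runs all registered hooks, which is how the
// ConcurrencyLimiter below learns that it must resize its semaphores.
func (l *AdaptiveLimit) Update(val int) {
	l.mu.Lock()
	l.current = val
	hooks := l.hooks
	l.mu.Unlock()

	for _, hook := range hooks {
		hook(val)
	}
}

// AfterUpdate registers a callback that is invoked on every Update.
func (l *AdaptiveLimit) AfterUpdate(hook func(int)) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.hooks = append(l.hooks, hook)
}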
diff --git a/internal/limiter/concurrency_limiter.go b/internal/limiter/concurrency_limiter.go
index 32b08b32e..b4284adeb 100644
--- a/internal/limiter/concurrency_limiter.go
+++ b/internal/limiter/concurrency_limiter.go
@@ -41,10 +41,10 @@ type keyedConcurrencyLimiter struct {
 	// concurrencyTokens is the channel of available concurrency tokens, where every token
 	// allows one concurrent call to the concurrency-limited function.
-	concurrencyTokens chan struct{}
+	concurrencyTokens *resizableSemaphore
 	// queueTokens is the channel of available queue tokens, where every token allows one
 	// concurrent call to be admitted to the queue.
-	queueTokens chan struct{}
+	queueTokens *resizableSemaphore
 }
 
 // acquire tries to acquire the semaphore. It may fail if the admission queue is full or if the max
@@ -55,34 +55,33 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey str
 		// callers may wait for the concurrency token at the same time. If there are no more
 		// queueing tokens then this indicates that the queue is full and we thus return an
 		// error immediately.
-		select {
-		case sem.queueTokens <- struct{}{}:
-			// We have acquired a queueing token, so we need to release it if acquiring
-			// the concurrency token fails. If we succeed to acquire the concurrency
-			// token though then we retain the queueing token until the caller signals
-			// that the concurrency-limited function has finished. As a consequence the
-			// queue token is returned together with the concurrency token.
-			//
-			// A simpler model would be to just have `maxQueueLength` many queueing
-			// tokens. But this would add concurrency-limiting when acquiring the queue
-			// token itself, which is not what we want to do. Instead, we want to admit
-			// as many callers into the queue as the queue length permits plus the
-			// number of available concurrency tokens allows.
-			defer func() {
-				if returnedErr != nil {
-					<-sem.queueTokens
-				}
-			}()
-		default:
+		if !sem.queueTokens.TryAcquire() {
 			return ErrMaxQueueSize
 		}
+
+		// We have acquired a queueing token, so we need to release it if acquiring
+		// the concurrency token fails. If we successfully acquire the concurrency
+		// token though then we retain the queueing token until the caller signals
+		// that the concurrency-limited function has finished. As a consequence the
+		// queue token is returned together with the concurrency token.
+		//
+		// A simpler model would be to just have `maxQueueLength` many queueing
+		// tokens. But this would add concurrency-limiting when acquiring the queue
+		// token itself, which is not what we want to do. Instead, we want to admit
+		// as many callers into the queue as the queue length permits plus the
+		// number of available concurrency tokens allows.
+		defer func() {
+			if returnedErr != nil {
+				sem.queueTokens.Release()
+			}
+		}()
 	}
 
 	// We are queued now, so let's tell the monitor. Furthermore, even though we're still
 	// holding the queueing token when this function exits successfully we also tell the monitor
 	// that we have exited the queue. It is only an implementation detail anyway that we hold on
 	// to the token, so the monitor shouldn't care about that.
-	sem.monitor.Queued(ctx, limitingKey, len(sem.queueTokens))
+	sem.monitor.Queued(ctx, limitingKey, sem.queueLength())
 	defer sem.monitor.Dequeued(ctx)
 
 	// Set up the ticker that keeps us from waiting indefinitely on the concurrency token.
@@ -98,7 +97,12 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey str
 
 	// Try to acquire the concurrency token now that we're in the queue.
 	select {
-	case sem.concurrencyTokens <- struct{}{}:
+	case acquired := <-sem.concurrencyTokens.Acquire():
+		// When the semaphore returns false, the semaphore was stopped, most likely because the
+		// context was canceled. Hence, we return its error here.
+		if !acquired {
+			return sem.concurrencyTokens.Err()
+		}
 		return nil
 	case <-ticker.C():
 		return ErrMaxQueueTime
@@ -110,14 +114,17 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context, limitingKey str
 
 // release releases the acquired tokens.
 func (sem *keyedConcurrencyLimiter) release() {
 	if sem.queueTokens != nil {
-		<-sem.queueTokens
+		sem.queueTokens.Release()
 	}
-	<-sem.concurrencyTokens
+	sem.concurrencyTokens.Release()
 }
 
 // queueLength returns the length of token queue
 func (sem *keyedConcurrencyLimiter) queueLength() int {
-	return len(sem.queueTokens)
+	if sem.queueTokens == nil {
+		return 0
+	}
+	return int(sem.queueTokens.Current())
 }
 
 // ConcurrencyLimiter contains rate limiter state.
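The resizableSemaphore is likewise not part of this diff. Below is a rough sketch of the surface that acquire(), release() and queueLength() rely on, assuming a mutex-plus-waiter-list design; the actual implementation may differ, and in particular real code must also reclaim tokens from callers that stop waiting on Acquire() because the ticker or context fired first:

package limiter

import (
	"context"
	"sync"
)

// resizableSemaphoreSketch is an illustrative stand-in for resizableSemaphore.
type resizableSemaphoreSketch struct {
	mu      sync.Mutex
	ctx     context.Context
	size    int64       // capacity, changed at runtime via Resize
	held    int64       // tokens currently handed out
	waiters []chan bool // blocked Acquire calls, oldest first
}

func newResizableSemaphoreSketch(ctx context.Context, size int64) *resizableSemaphoreSketch {
	s := &resizableSemaphoreSketch{ctx: ctx, size: size}
	go func() {
		// Once the context is done, fail all pending waiters so that
		// acquire() can surface ctx.Err() to its callers via Err().
		<-ctx.Done()
		s.mu.Lock()
		defer s.mu.Unlock()
		for _, w := range s.waiters {
			w <- false
		}
		s.waiters = nil
	}()
	return s
}

// TryAcquire takes a token without blocking and reports whether it succeeded.
func (s *resizableSemaphoreSketch) TryAcquire() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.held < s.size {
		s.held++
		return true
	}
	return false
}

// Acquire returns a channel yielding true once a token was acquired, or false
// if the semaphore was stopped.
func (s *resizableSemaphoreSketch) Acquire() <-chan bool {
	ch := make(chan bool, 1)
	s.mu.Lock()
	defer s.mu.Unlock()
	switch {
	case s.ctx.Err() != nil:
		ch <- false
	case s.held < s.size:
		s.held++
		ch <- true
	default:
		s.waiters = append(s.waiters, ch)
	}
	return ch
}

// Release returns a token, handing it straight to the oldest waiter as long as
// the current capacity still allows that.
func (s *resizableSemaphoreSketch) Release() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.waiters) > 0 && s.held <= s.size {
		w := s.waiters[0]
		s.waiters = s.waiters[1:]
		w <- true
		return
	}
	s.held--
}

// Resize adjusts the capacity. Growing may admit queued waiters immediately;
// shrinking below the number of held tokens only takes effect as tokens drain.
func (s *resizableSemaphoreSketch) Resize(size int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.size = size
	for len(s.waiters) > 0 && s.held < s.size {
		w := s.waiters[0]
		s.waiters = s.waiters[1:]
		s.held++
		w <- true
	}
}

// Current reports how many tokens are handed out, which queueLength() above
// uses to tell the monitor how busy a bucket is.
func (s *resizableSemaphoreSketch) Current() int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.held
}

func (s *resizableSemaphoreSketch) Err() error { return s.ctx.Err() }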
@@ -125,9 +132,9 @@ type ConcurrencyLimiter struct {
 	// ctx stores the context at initialization. This context is used as a stopping condition
 	// for some internal goroutines.
 	ctx context.Context
-	// maxConcurrencyLimit is the maximum number of concurrent calls to the limited function.
-	// This limit is per key.
-	maxConcurrencyLimit int64
+	// limit is the adaptive maximum number of concurrent calls to the limited function. This limit is
+	// re-calculated at runtime by an outside calculator.
+	limit *AdaptiveLimit
 	// maxQueueLength is the maximum number of operations allowed to wait in a queued state.
 	// This limit is global and applies before the concurrency limit. Subsequent incoming
 	// operations will be rejected with an error immediately.
@@ -148,19 +155,31 @@ type ConcurrencyLimiter struct {
 }
 
 // NewConcurrencyLimiter creates a new concurrency rate limiter.
-func NewConcurrencyLimiter(ctx context.Context, maxConcurrencyLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
+func NewConcurrencyLimiter(ctx context.Context, limit *AdaptiveLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
 	if monitor == nil {
 		monitor = NewNoopConcurrencyMonitor()
 	}
 
-	return &ConcurrencyLimiter{
+	limiter := &ConcurrencyLimiter{
 		ctx:                    ctx,
-		maxConcurrencyLimit:    int64(maxConcurrencyLimit),
+		limit:                  limit,
 		maxQueueLength:         int64(maxQueueLength),
 		maxQueuedTickerCreator: maxQueuedTickerCreator,
 		monitor:                monitor,
 		limitsByKey:            make(map[string]*keyedConcurrencyLimiter),
 	}
+
+	// When the capacity of the limiter is updated we also need to update the size of both the queuing tokens as
+	// well as the concurrency tokens to match the new size.
+	limit.AfterUpdate(func(val int) {
+		for _, keyedLimiter := range limiter.limitsByKey {
+			if keyedLimiter.queueTokens != nil {
+				keyedLimiter.queueTokens.Resize(int64(val) + limiter.maxQueueLength)
+			}
+			keyedLimiter.concurrencyTokens.Resize(int64(val))
+		}
+	})
+	return limiter
 }
 
 // Limit will limit the concurrency of the limited function f. There are two distinct mechanisms
@@ -180,7 +199,7 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f Li
 	)
 	defer span.Finish()
 
-	if c.maxConcurrencyLimit <= 0 {
+	if c.currentLimit() <= 0 {
 		return f()
 	}
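The hook registered above keeps both semaphores in step with the limit: after Update(val), each bucket's concurrency capacity becomes val and its queue capacity val + maxQueueLength. A hypothetical usage sketch, assuming only the signatures from this diff (exampleResize and the bucket name are invented for illustration):

package limiter

import "context"

func exampleResize(ctx context.Context) {
	// With Initial: 5 and a queue length of 10, every bucket starts with a
	// concurrency capacity of 5 and a queue capacity of 5+10 = 15.
	limit := NewAdaptiveLimit("example", AdaptiveSetting{Initial: 5, Min: 1, Max: 10})
	lim := NewConcurrencyLimiter(ctx, limit, 10, nil, nil) // nil monitor falls back to the no-op monitor

	// The AfterUpdate hook resizes every existing bucket: concurrency
	// capacity becomes 7, queue capacity becomes 7+10 = 17.
	limit.Update(7)

	// Calls are then admitted against the new limits.
	_, _ = lim.Limit(ctx, "some-bucket", func() (interface{}, error) { return nil, nil })
}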
@@ -226,15 +245,15 @@ func (c *ConcurrencyLimiter) getConcurrencyLimit(limitingKey string) *keyedConcu
 		// Set up the queue tokens in case a maximum queue length was requested. As the
 		// queue tokens are kept during the whole lifetime of the concurrency-limited
 		// function we add the concurrency tokens to the number of available token.
-		var queueTokens chan struct{}
+		var queueTokens *resizableSemaphore
 		if c.maxQueueLength > 0 {
-			queueTokens = make(chan struct{}, c.maxConcurrencyLimit+c.maxQueueLength)
+			queueTokens = NewResizableSemaphore(c.ctx, c.currentLimit()+c.maxQueueLength)
 		}
 
 		c.limitsByKey[limitingKey] = &keyedConcurrencyLimiter{
 			monitor:                c.monitor,
 			maxQueuedTickerCreator: c.maxQueuedTickerCreator,
-			concurrencyTokens:      make(chan struct{}, c.maxConcurrencyLimit),
+			concurrencyTokens:      NewResizableSemaphore(c.ctx, c.currentLimit()),
 			queueTokens:            queueTokens,
 		}
 	}
@@ -272,3 +291,7 @@ func (c *ConcurrencyLimiter) countSemaphores() int {
 
 	return len(c.limitsByKey)
 }
+
+func (c *ConcurrencyLimiter) currentLimit() int64 {
+	return int64(c.limit.Current())
+}
diff --git a/internal/limiter/concurrency_limiter_test.go b/internal/limiter/concurrency_limiter_test.go
index b650ae5f5..6ba48d243 100644
--- a/internal/limiter/concurrency_limiter_test.go
+++ b/internal/limiter/concurrency_limiter_test.go
@@ -3,6 +3,7 @@ package limiter
 
 import (
 	"context"
 	"errors"
+	"fmt"
 	"strconv"
 	"sync"
 	"testing"
@@ -86,7 +87,7 @@ func (c *counter) Dropped(_ context.Context, _ string, _ int, _ time.Duration, r
 	}
 }
 
-func TestLimiter(t *testing.T) {
+func TestLimiter_static(t *testing.T) {
 	t.Parallel()
 
 	tests := []struct {
@@ -156,7 +157,7 @@ func TestLimiter(t *testing.T) {
 
 			limiter := NewConcurrencyLimiter(
 				ctx,
-				tt.maxConcurrency,
+				NewAdaptiveLimit("staticLimit", AdaptiveSetting{Initial: tt.maxConcurrency}),
 				0,
 				nil,
 				gauge,
@@ -230,6 +231,557 @@ func TestLimiter(t *testing.T) {
 	}
 }
 
+func TestLimiter_dynamic(t *testing.T) {
+	t.Parallel()
+
+	t.Run("increase dynamic limit when there is no queuing request", func(t *testing.T) {
+		ctx := testhelper.Context(t)
+		limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+		gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+		limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
+
+		// 5 requests acquired the tokens, the limiter is full now
+		release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+		require.Equal(t, 5, gauge.enter)
+
+		// Update the limit to 7
+		limit.Update(7)
+
+		// 2 more requests acquire tokens. This proves the limit was expanded
+		release2, waitAfterRelease2 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 2)
+		require.Equal(t, 7, gauge.enter)
+
+		close(release1)
+		close(release2)
+		waitAfterRelease1()
+		waitAfterRelease2()
+		require.Equal(t, 7, gauge.exit)
+	})
+
+	t.Run("decrease dynamic limit when there is no queuing request", func(t *testing.T) {
+		ctx := testhelper.Context(t)
+		limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+		gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+		limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
+
+		// 3 requests acquired the tokens, 2 slots left
+		release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 3)
+		require.Equal(t, 3, gauge.enter)
+		require.Equal(t, 3, gauge.queued)
+
+		// Update the limit to 3
+		limit.Update(3)
+
+		// 2 requests are put in queue, proving that the limit shrank
+		waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 2)
+		require.Equal(t, 3, gauge.enter)
+		require.Equal(t, 5, gauge.queued)
+
+		// Release first 3 requests
+		close(release1)
+		waitAfterRelease1()
+
+		// Now the last 2 requests can acquire tokens
+		waitAcquired2()
+		require.Equal(t, 5, gauge.enter)
+		require.Equal(t, 5, gauge.queued)
+		require.Equal(t, 3, gauge.exit)
+
+		// Release the last batch
+		close(release2)
+		waitAfterRelease2()
+		require.Equal(t, 5, gauge.exit)
+	})
+
+	t.Run("increase dynamic limit more than the number of queuing requests", func(t *testing.T) {
+		ctx := testhelper.Context(t)
+		limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+		gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+		limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
+
+		// 5 requests acquired the tokens, the limiter is full now
+		release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+		require.Equal(t, 5, gauge.enter)
+
+		// 2 requests waiting in the queue
+		waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 2)
+		require.Equal(t, 5, gauge.enter)
+		require.Equal(t, 7, gauge.queued)
+
+		// Update the limit to 7
+		limit.Update(7)
+
+		// Wait for the other 2 requests to acquire tokens. This proves the limiter expanded
+		waitAcquired2()
+		require.Equal(t, 7, gauge.enter)
+
+		close(release1)
+		close(release2)
+		waitAfterRelease1()
+		waitAfterRelease2()
+		require.Equal(t, 7, gauge.exit)
+	})
+
+	t.Run("increase dynamic limit less than the number of queuing requests", func(t *testing.T) {
+		ctx := testhelper.Context(t)
+		limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+		gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+		limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
+
+		// 5 requests acquired the tokens, the limiter is full now
+		release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+		require.Equal(t, 5, gauge.enter)
+
+		// 2 requests waiting in the queue
+		waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 2)
+		require.Equal(t, 5, gauge.enter)
+		require.Equal(t, 7, gauge.queued)
+
+		// 5 more requests waiting in the queue
+		waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 5)
+		require.Equal(t, 5, gauge.enter)
+		require.Equal(t, 12, gauge.queued)
+
+		// Update the limit to 7.
+		limit.Update(7)
+
+		// Release the first 5 requests; the remaining requests all fit within the new limit now.
+		close(release1)
+		waitAfterRelease1()
+		require.Equal(t, 5, gauge.exit)
+
+		waitAcquired2()
+		waitAcquired3()
+		require.Equal(t, 12, gauge.enter)
+
+		// Now release all requests
+		close(release2)
+		close(release3)
+		waitAfterRelease2()
+		waitAfterRelease3()
+		require.Equal(t, 12, gauge.exit)
+	})
+
+	t.Run("decrease dynamic limit less than the number of concurrent requests", func(t *testing.T) {
+		ctx := testhelper.Context(t)
+		limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+		gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+		limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
+
+		// 5 requests acquired the tokens, the limiter is full now
+		release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+		require.Equal(t, 5, gauge.enter)
+
+		// Update the limit to 3
+		limit.Update(3)
+
+		// 3 requests are put in queue
+		waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 3)
+		require.Equal(t, 5, gauge.enter)
+		require.Equal(t, 8, gauge.queued)
+
+		// Release the first 5 requests
+		close(release1)
+		waitAfterRelease1()
+		require.Equal(t, 5, gauge.exit)
+
+		// Now the last 3 requests acquire the tokens
+		waitAcquired2()
+		require.Equal(t, 8, gauge.enter)
+
+		// 1 more request is put in queue, proving that the limit shrank to 3
+		waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 1)
+		require.Equal(t, 8, gauge.enter)
+		require.Equal(t, 9, gauge.queued)
+
+		// Release the second batch of 3 requests
+		close(release2)
+		waitAfterRelease2()
+		require.Equal(t, 8, gauge.exit)
+
+		// The last request acquires the token
+		waitAcquired3()
+		require.Equal(t, 9, gauge.enter)
+
+		// Release the last request
+		close(release3)
+		waitAfterRelease3()
+		require.Equal(t, 9, gauge.exit)
+	})
+
+	t.Run("increase and decrease dynamic limit multiple times", func(t *testing.T) {
+		ctx := testhelper.Context(t)
+		limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+		gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+		limiter := NewConcurrencyLimiter(ctx, limit, 10, nil, gauge)
+
+		// Update the limit to 7
+		limit.Update(7)
+
+		// 5 requests acquired the tokens
+		release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+		require.Equal(t, 5, gauge.enter)
+
+		// Update the limit to 3
+		limit.Update(3)
+
+		// 3 requests are put in queue
+		waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 3)
+		require.Equal(t, 5, gauge.enter)
+		require.Equal(t, 8, gauge.queued)
+
+		// Update the limit to 10
+		limit.Update(10)
+
+		// All existing requests acquire the tokens
+		waitAcquired2()
+		require.Equal(t, 8, gauge.enter)
+
+		// 2 more requests
+		release3, waitAfterRelease3 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 2)
+		require.Equal(t, 10, gauge.enter)
+
+		// Update the limit to 1
+		limit.Update(1)
+
+		// Now release all of them
+		close(release1)
+		waitAfterRelease1()
+		require.Equal(t, 5, gauge.exit)
+
+		close(release2)
+		waitAfterRelease2()
+		require.Equal(t, 8, gauge.exit)
+
+		close(release3)
+		waitAfterRelease3()
+		require.Equal(t, 10, gauge.exit)
+	})
+
+	t.Run("increase the limit when the queue is full", func(t *testing.T) {
+		ctx := testhelper.Context(t)
+		limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 1, Max: 10, Min: 1})
+		gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+		// Mind the queue length here
+		limiter := NewConcurrencyLimiter(ctx, limit, 5, nil, gauge)
+
+		// 1 request acquired the token, the limiter is full now
+		release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 1)
+		require.Equal(t, 1, gauge.enter)
+		require.Equal(t, 1, gauge.queued)
+
+		// 5 requests queuing for the tokens, the queue is full now
+		waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 5)
+		require.Equal(t, 1, gauge.enter)
+		require.Equal(t, 6, gauge.queued)
+
+		// Limiter rejects new request
+		maximumQueueSizeReached(t, ctx, "1", limiter)
+
+		// Update the limit
+		limit.Update(6)
+		waitAcquired2()
+		require.Equal(t, 6, gauge.enter)
+
+		// 5 requests queuing for the tokens, the queue is full now
+		waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 5)
+		require.Equal(t, 6, gauge.enter)
+		require.Equal(t, 11, gauge.queued)
+
+		// Limiter rejects new request
+		maximumQueueSizeReached(t, ctx, "1", limiter)
+
+		// Clean up
+		close(release1)
+		close(release2)
+		waitAfterRelease1()
+		waitAfterRelease2()
+		require.Equal(t, 6, gauge.exit)
+
+		waitAcquired3()
+		require.Equal(t, 11, gauge.enter)
+		close(release3)
+		waitAfterRelease3()
+	})
+
+	t.Run("decrease the limit when the queue is full", func(t *testing.T) {
+		ctx := testhelper.Context(t)
+		limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+		gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+		// Mind the queue length here
+		limiter := NewConcurrencyLimiter(ctx, limit, 3, nil, gauge)
+
+		// 5 requests acquired the tokens, the limiter is full now
+		release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+		require.Equal(t, 5, gauge.enter)
+		require.Equal(t, 5, gauge.queued)
+
+		// 3 requests queuing for the tokens, the queue is full now
+		waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 3)
+		require.Equal(t, 5, gauge.enter)
+		require.Equal(t, 8, gauge.queued)
+
+		// Limiter rejects new request
+		maximumQueueSizeReached(t, ctx, "1", limiter)
+
+		// Update the limit.
+		limit.Update(3)
+
+		// The queue is still full
+		maximumQueueSizeReached(t, ctx, "1", limiter)
+
+		// Release first 5 requests and let the last 3 requests in
+		close(release1)
+		waitAfterRelease1()
+		require.Equal(t, 5, gauge.exit)
+		waitAcquired2()
+		require.Equal(t, 8, gauge.enter)
+
+		// Another 3 requests in queue. The queue is full again, because the concurrency limit is now 3 and the queue length is still 3.
+		waitAcquired3, release3, waitAfterRelease3 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 3)
+		require.Equal(t, 8, gauge.enter)
+		require.Equal(t, 11, gauge.queued)
+		maximumQueueSizeReached(t, ctx, "1", limiter)
+
+		// Clean up
+		close(release2)
+		waitAfterRelease2()
+		require.Equal(t, 8, gauge.exit)
+		waitAcquired3()
+		require.Equal(t, 11, gauge.enter)
+		close(release3)
+		waitAfterRelease3()
+		require.Equal(t, 11, gauge.exit)
+	})
+
+	t.Run("dynamic limit works without queuing", func(t *testing.T) {
+		ctx := testhelper.Context(t)
+		limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+		gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+		// No queue length, meaning the limiter admits an unlimited number of waiting requests
+		limiter := NewConcurrencyLimiter(ctx, limit, 0, nil, gauge)
+
+		// 5 requests acquired the tokens, the limiter is full now
+		release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+		require.Equal(t, 5, gauge.enter)
+		require.Equal(t, 5, gauge.queued)
+
+		// 5 more requests
+		waitAcquired2, release2, waitAfterRelease2 := spawnAndWaitQueued(t, ctx, "1", limiter, gauge, 5)
+
+		// Update the limit.
+		limit.Update(10)
+
+		// All of them acquired the tokens
+		waitAcquired2()
+		require.Equal(t, 10, gauge.enter)
+
+		// Clean up
+		close(release1)
+		close(release2)
+		waitAfterRelease1()
+		waitAfterRelease2()
+		require.Equal(t, 10, gauge.exit)
+	})
+
+	t.Run("dynamic limit works with queue timer", func(t *testing.T) {
+		ctx := testhelper.Context(t)
+		limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+		gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+
+		ticker := helper.NewManualTicker()
+		limiter := NewConcurrencyLimiter(ctx, limit, 0, func() helper.Ticker { return ticker }, gauge)
+
+		// 5 requests acquired the tokens, the limiter is full now
+		release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+		require.Equal(t, 5, gauge.enter)
+		require.Equal(t, 5, gauge.queued)
+
+		errors := make(chan error, 10)
+		// 5 requests in queue
+		spawnQueuedAndCollectErrors(ctx, "1", limiter, gauge, 5, errors)
+
+		// Decrease the limit
+		limit.Update(3)
+
+		// 5 more requests in queue
+		spawnQueuedAndCollectErrors(ctx, "1", limiter, gauge, 5, errors)
+
+		// Trigger timeout event
+		for i := 0; i < 10; i++ {
+			ticker.Tick()
+			require.EqualError(t, <-errors, "maximum time in concurrency queue reached")
+		}
+
+		// Other goroutines exit as normal
+		close(release1)
+		waitAfterRelease1()
+		require.Equal(t, 5, gauge.exit)
+	})
+
+	t.Run("dynamic limit works with context cancellation", func(t *testing.T) {
+		ctx := testhelper.Context(t)
+		ctx2, cancel := context.WithCancel(testhelper.Context(t))
+
+		limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+		gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+
+		limiter := NewConcurrencyLimiter(ctx, limit, 0, nil, gauge)
+
+		// 5 requests acquired the tokens, the limiter is full now
+		release1, waitAfterRelease1 := spawnAndWaitAcquired(t, ctx, "1", limiter, gauge, 5)
+		require.Equal(t, 5, gauge.enter)
+		require.Equal(t, 5, gauge.queued)
+
+		errors := make(chan error, 10)
+		// 5 requests in queue
+		spawnQueuedAndCollectErrors(ctx2, "1", limiter, gauge, 5, errors)
+
+		// Decrease the limit
+		limit.Update(3)
+
+		// 5 more requests in queue
+		spawnQueuedAndCollectErrors(ctx2, "1", limiter, gauge, 5, errors)
+
+		// Trigger context cancellation
+		cancel()
+		for i := 0; i < 10; i++ {
+			require.EqualError(t, <-errors, "unexpected error when dequeueing request: context canceled")
+		}
+
+		// Other goroutines exit as normal
+		close(release1)
+		waitAfterRelease1()
+		require.Equal(t, 5, gauge.exit)
+	})
+
+	t.Run("dynamic limit works with multiple buckets", func(t *testing.T) {
+		ctx := testhelper.Context(t)
+		limit := NewAdaptiveLimit("dynamicLimit", AdaptiveSetting{Initial: 5, Max: 10, Min: 1})
+		gauge := &blockingQueueCounter{queuedCh: make(chan struct{})}
+
+		limiter := NewConcurrencyLimiter(ctx, limit, 5, nil, gauge)
+
+		var releaseChans []chan struct{}
+		var waitAcquireFuncs, waitReleaseFuncs []func()
+
+		// 5 * 5 requests acquired tokens
+		for i := 1; i <= 5; i++ {
+			release, waitAfterRelease := spawnAndWaitAcquired(t, ctx, fmt.Sprintf("%d", i), limiter, gauge, 5)
+			releaseChans = append(releaseChans, release)
+			waitReleaseFuncs = append(waitReleaseFuncs, waitAfterRelease)
+		}
+		require.Equal(t, 25, gauge.enter)
+
+		// 1 + 2 + 3 + 4 + 5 requests are in queue
+		for i := 1; i <= 5; i++ {
+			waitAcquired, release, waitAfterRelease := spawnAndWaitQueued(t, ctx, fmt.Sprintf("%d", i), limiter, gauge, i)
+			waitAcquireFuncs = append(waitAcquireFuncs, waitAcquired)
+			releaseChans = append(releaseChans, release)
+			waitReleaseFuncs = append(waitReleaseFuncs, waitAfterRelease)
+		}
+		require.Equal(t, 25, gauge.enter)
+		require.Equal(t, 40, gauge.queued)
+
+		// Update limit, enough for all requests
+		limit.Update(10)
+
+		// All requests acquired tokens now
+		for _, wait := range waitAcquireFuncs {
+			wait()
+		}
+		require.Equal(t, 40, gauge.enter)
+
+		// Release all
+		for _, release := range releaseChans {
+			close(release)
+		}
+		for _, wait := range waitReleaseFuncs {
+			wait()
+		}
+		require.Equal(t, 40, gauge.exit)
+	})
+}
+
+// spawnAndWaitAcquired spawns N goroutines that wait for the limiter. They wait until all of them acquire the limiter
+// token before exiting. This function returns a channel to control token release and a function to wait until all
+// goroutines finish.
+func spawnAndWaitAcquired(t *testing.T, ctx context.Context, bucket string, limiter *ConcurrencyLimiter, gauge *blockingQueueCounter, n int) (chan struct{}, func()) {
+	var acquireWg, releaseWg sync.WaitGroup
+	release := make(chan struct{})
+
+	for i := 0; i < n; i++ {
+		acquireWg.Add(1)
+		releaseWg.Add(1)
+		go func() {
+			defer releaseWg.Done()
+			_, err := limiter.Limit(ctx, bucket, func() (resp interface{}, err error) {
+				acquireWg.Done()
+				<-release
+				return nil, nil
+			})
+			require.NoError(t, err)
+		}()
+	}
+	for i := 0; i < n; i++ {
+		<-gauge.queuedCh
+		gauge.queued++
+	}
+	acquireWg.Wait()
+
+	return release, releaseWg.Wait
+}
+
+// spawnAndWaitQueued spawns N goroutines that wait for the limiter. They wait until all of them are queued. This
+// function returns a function that waits until all goroutines have acquired tokens, a channel to control token
+// release, and a function to wait until all goroutines finish.
+func spawnAndWaitQueued(t *testing.T, ctx context.Context, bucket string, limiter *ConcurrencyLimiter, gauge *blockingQueueCounter, n int) (func(), chan struct{}, func()) {
+	var acquireWg, releaseWg sync.WaitGroup
+	release := make(chan struct{})
+
+	for i := 0; i < n; i++ {
+		acquireWg.Add(1)
+		releaseWg.Add(1)
+		go func() {
+			defer releaseWg.Done()
+			_, err := limiter.Limit(ctx, bucket, func() (resp interface{}, err error) {
+				acquireWg.Done()
+				<-release
+				return nil, nil
+			})
+			require.NoError(t, err)
+		}()
+	}
+	for i := 0; i < n; i++ {
+		<-gauge.queuedCh
+		gauge.queued++
+	}
+
+	return acquireWg.Wait, release, releaseWg.Wait
+}
+
+func spawnQueuedAndCollectErrors(ctx context.Context, bucket string, limiter *ConcurrencyLimiter, gauge *blockingQueueCounter, n int, errors chan error) {
+	for i := 0; i < n; i++ {
+		go func() {
+			_, err := limiter.Limit(ctx, bucket, func() (interface{}, error) {
+				return nil, fmt.Errorf("should not call")
+			})
+			errors <- err
+		}()
+	}
+	for i := 0; i < n; i++ {
+		<-gauge.queuedCh
+		gauge.queued++
+	}
+}
+
+func maximumQueueSizeReached(t *testing.T, ctx context.Context, bucket string, limiter *ConcurrencyLimiter) {
+	_, err := limiter.Limit(ctx, bucket, func() (interface{}, error) {
+		return nil, fmt.Errorf("should not call")
+	})
+	require.EqualError(t, err, "maximum queue size reached")
+}
+
 type blockingQueueCounter struct {
 	counter
 
@@ -249,7 +801,7 @@ func TestConcurrencyLimiter_queueLimit(t *testing.T) {
 	monitorCh := make(chan struct{})
 	monitor := &blockingQueueCounter{queuedCh: monitorCh}
 	ch := make(chan struct{})
-	limiter := NewConcurrencyLimiter(ctx, 1, queueLimit, nil, monitor)
+	limiter := NewConcurrencyLimiter(ctx, NewAdaptiveLimit("staticLimit", AdaptiveSetting{Initial: 1}), queueLimit, nil, monitor)
 
 	// occupied with one live request that takes a long time to complete
 	go func() {
@@ -335,7 +887,7 @@ func TestLimitConcurrency_queueWaitTime(t *testing.T) {
 
 	limiter := NewConcurrencyLimiter(
 		ctx,
-		1,
+		NewAdaptiveLimit("staticLimit", AdaptiveSetting{Initial: 1}),
 		0,
 		func() helper.Ticker {
 			return ticker
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go
index cf76158b8..6352fcfe0 100644
--- a/internal/testhelper/testserver/gitaly.go
+++ b/internal/testhelper/testserver/gitaly.go
@@ -329,7 +329,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, ctx context.Conte
 	if gsd.packObjectsLimiter == nil {
 		gsd.packObjectsLimiter = limiter.NewConcurrencyLimiter(
 			ctx,
-			0,
+			limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 0}),
 			0,
 			nil,
 			limiter.NewNoopConcurrencyMonitor(),