author    | Toon Claes <toon@gitlab.com> | 2023-01-09 14:19:01 +0300
committer | Toon Claes <toon@gitlab.com> | 2023-01-09 14:19:01 +0300
commit    | f426025daeb6b1f426d1f7f18ca69bb23ee5b7c7 (patch)
tree      | 81b61a0e054f39cd07487696289f3ae792104f02
parent    | 22783620ad85d2aa4b5e6f628231d9bd8ca341d8 (diff)
parent    | ee7c6acd586d74548138f5b7c8e89328621d2c71 (diff)
Merge branch 'pks-limithandler-fix-queueing' into 'master'
limithandler: Fix queueing mechanism in the concurrency limiter
Closes #4411, #4697, and #4625
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5236
Merged-by: Toon Claes <toon@gitlab.com>
Approved-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Approved-by: Steve Azzopardi <sazzopardi@gitlab.com>
Approved-by: Toon Claes <toon@gitlab.com>
Reviewed-by: Steve Azzopardi <sazzopardi@gitlab.com>
Reviewed-by: Toon Claes <toon@gitlab.com>
Reviewed-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Co-authored-by: Patrick Steinhardt <psteinhardt@gitlab.com>
-rw-r--r-- | doc/backpressure.md                                     |   8
-rw-r--r-- | internal/middleware/limithandler/concurrency_limiter.go | 160
2 files changed, 90 insertions, 78 deletions
diff --git a/doc/backpressure.md b/doc/backpressure.md
index 71ae4fef0..f5110f8d8 100644
--- a/doc/backpressure.md
+++ b/doc/backpressure.md
@@ -38,9 +38,9 @@ configuration can prevent an unbounded in-memory queue of requests:
 - `max_queue_wait` is the maximum amount of time a request can wait in the
   concurrency queue. When a request waits longer than this time, it returns
   an error to the client.
-- `max_queue_size` is the maximum size the concurrency queue can grow for a given
-  RPC. If a concurrency queue is at its maximum, subsequent requests
-  return with an error.
+- `max_queue_size` is the maximum size the concurrency queue can grow for a
+  given RPC. If a concurrency queue is at its maximum, subsequent requests
+  return with an error. The queue size is per repository.
 
 For example:
 
@@ -75,7 +75,7 @@ In the above configuration, the `token bucket` has a capacity of 1 and gets
 refilled every minute. This means that Gitaly only accepts 1 `RepackFull`
 request per repository each minute.
 
-Requests that come in after the `token bucket` is full (and before it is
+Requests that come in after the `token bucket` is full (and before it is
 replenished) are rejected with an error.
 
 ## Errors
diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go
index 0267dc3c1..b080ee658 100644
--- a/internal/middleware/limithandler/concurrency_limiter.go
+++ b/internal/middleware/limithandler/concurrency_limiter.go
@@ -28,18 +28,59 @@ type QueueTickerCreator func() helper.Ticker
 
 // keyedConcurrencyLimiter is a concurrency limiter that applies to a specific keyed resource.
 type keyedConcurrencyLimiter struct {
-	// tokens is the channel of available tokens, where every token allows one concurrent call
-	// to the concurrency-limited function.
-	tokens chan struct{}
 	refcount               int
+	monitor                ConcurrencyMonitor
 	maxQueuedTickerCreator QueueTickerCreator
+
+	// concurrencyTokens is the channel of available concurrency tokens, where every token
+	// allows one concurrent call to the concurrency-limited function.
+	concurrencyTokens chan struct{}
+	// queueTokens is the channel of available queue tokens, where every token allows one
+	// concurrent call to be admitted to the queue.
+	queueTokens chan struct{}
 }
 
-// acquire tries to acquire the semaphore. It may fail if there is a max queue-time ticker that
-// ticks before acquiring a token.
-func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) error {
-	var ticker helper.Ticker
+// acquire tries to acquire the semaphore. It may fail if the admission queue is full or if the max
+// queue-time ticker ticks before acquiring a concurrency token.
+func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) (returnedErr error) {
+	if sem.queueTokens != nil {
+		// Try to acquire the queueing token. The queueing token is used to control how many
+		// callers may wait for the concurrency token at the same time. If there are no more
+		// queueing tokens then this indicates that the queue is full and we thus return an
+		// error immediately.
+		select {
+		case sem.queueTokens <- struct{}{}:
+			// We have acquired a queueing token, so we need to release it if acquiring
+			// the concurrency token fails. If we succeed to acquire the concurrency
+			// token though then we retain the queueing token until the caller signals
+			// that the concurrency-limited function has finished. As a consequence the
+			// queue token is returned together with the concurrency token.
+			//
+			// A simpler model would be to just have `maxQueueLength` many queueing
+			// tokens. But this would add concurrency-limiting when acquiring the queue
+			// token itself, which is not what we want to do. Instead, we want to admit
+			// as many callers into the queue as the queue length permits plus the
+			// number of available concurrency tokens allows.
+			defer func() {
+				if returnedErr != nil {
+					<-sem.queueTokens
+				}
+			}()
+		default:
+			sem.monitor.Dropped(ctx, "max_size")
+			return ErrMaxQueueSize
+		}
+	}
+
+	// We are queued now, so let's tell the monitor. Furthermore, even though we're still
+	// holding the queueing token when this function exits successfully we also tell the monitor
+	// that we have exited the queue. It is only an implemenation detail anyway that we hold on
+	// to the token, so the monitor shouldn't care about that.
+	sem.monitor.Queued(ctx)
+	defer sem.monitor.Dequeued(ctx)
+
+	// Set up the ticker that keeps us from waiting indefinitely on the concurrency token.
+	var ticker helper.Ticker
 	if sem.maxQueuedTickerCreator != nil {
 		ticker = sem.maxQueuedTickerCreator()
 	} else {
@@ -49,20 +90,27 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) error {
 	defer ticker.Stop()
 	ticker.Reset()
 
+	// Try to acquire the concurrency token now that we're in the queue.
 	select {
-	case sem.tokens <- struct{}{}:
+	case sem.concurrencyTokens <- struct{}{}:
 		return nil
 	case <-ticker.C():
+		sem.monitor.Dropped(ctx, "max_time")
 		return ErrMaxQueueTime
 	case <-ctx.Done():
 		return ctx.Err()
 	}
 }
 
-// release releases the semaphore.
-func (sem *keyedConcurrencyLimiter) release() { <-sem.tokens }
+// release releases the acquired tokens.
+func (sem *keyedConcurrencyLimiter) release() {
+	if sem.queueTokens != nil {
+		<-sem.queueTokens
+	}
+	<-sem.concurrencyTokens
+}
 
-// ConcurrencyLimiter contains rate limiter state
+// ConcurrencyLimiter contains rate limiter state.
 type ConcurrencyLimiter struct {
 	// maxConcurrencyLimit is the maximum number of concurrent calls to the limited function.
 	// This limit is per key.
@@ -80,8 +128,6 @@ type ConcurrencyLimiter struct {
 	monitor ConcurrencyMonitor
 
 	m sync.RWMutex
-	// queued tracks the current number of operations waiting in the admission queue.
-	queued int64
 	// limitsByKey tracks all concurrency limits per key. Its per-key entries are lazily created
 	// and will get evicted once there are no concurrency-limited calls for any such key
 	// anymore.
@@ -106,9 +152,9 @@ func NewConcurrencyLimiter(maxConcurrencyLimit, maxQueueLength int, maxQueuedTic
 // Limit will limit the concurrency of the limited function f. There are two distinct mechanisms
 // that limit execution of the function:
 //
-// 1. First, every call will enter the queue. This queue is global across all keys and limits how
-// many callers may try to acquire their per-key semaphore at the same time. If the queue is full
-// the caller will be rejected.
+// 1. First, every call will enter the per-key queue. This queue limits how many callers may try to
+// acquire their per-key semaphore at the same time. If the queue is full the caller will be
+// rejected.
 // 2. Second, when the caller has successfully entered the queue, they try to acquire their per-key
 // semaphore. If this takes longer than the maximum queueing limit then the caller will be
 // dequeued and gets an error.
@@ -117,52 +163,32 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f Li
 		return f()
 	}
 
-	var decremented bool
-
-	log := ctxlogrus.Extract(ctx).WithField("limiting_key", limitingKey)
-	if err := c.queueInc(ctx); err != nil {
-		if errors.Is(err, ErrMaxQueueSize) {
-			return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueSize).WithDetail(
-				&gitalypb.LimitError{
-					ErrorMessage: err.Error(),
-					RetryAfter:   durationpb.New(0),
-				},
-			)
-		}
-
-		log.WithError(err).Error("unexpected error when queueing request")
-		return nil, err
-	}
-	defer c.queueDec(&decremented)
-
-	start := time.Now()
-	c.monitor.Queued(ctx)
-
 	sem := c.getConcurrencyLimit(limitingKey)
 	defer c.putConcurrencyLimit(limitingKey)
 
-	err := sem.acquire(ctx)
-	c.queueDec(&decremented)
-
-	c.monitor.Dequeued(ctx)
-	if err != nil {
-		if errors.Is(err, ErrMaxQueueTime) {
-			c.monitor.Dropped(ctx, "max_time")
+	start := time.Now()
+	if err := sem.acquire(ctx); err != nil {
+		switch err {
+		case ErrMaxQueueSize:
+			return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueSize).WithDetail(&gitalypb.LimitError{
+				ErrorMessage: err.Error(),
+				RetryAfter:   durationpb.New(0),
+			})
+		case ErrMaxQueueTime:
 			return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueTime).WithDetail(&gitalypb.LimitError{
 				ErrorMessage: err.Error(),
 				RetryAfter:   durationpb.New(0),
 			})
+		default:
+			ctxlogrus.Extract(ctx).WithField("limiting_key", limitingKey).WithError(err).Error("unexpected error when dequeueing request")
+			return nil, err
 		}
-
-		log.WithError(err).Error("unexpected error when dequeueing request")
-		return nil, err
 	}
 	defer sem.release()
 
 	c.monitor.Enter(ctx, time.Since(start))
 	defer c.monitor.Exit(ctx)
-
 	return f()
 }
 
@@ -173,13 +199,24 @@ func (c *ConcurrencyLimiter) getConcurrencyLimit(limitingKey string) *keyedConcu
 	defer c.m.Unlock()
 
 	if c.limitsByKey[limitingKey] == nil {
+		// Set up the queue tokens in case a maximum queue length was requested. As the
+		// queue tokens are kept during the whole lifetime of the concurrency-limited
+		// function we add the concurrency tokens to the number of available token.
+		var queueTokens chan struct{}
+		if c.maxQueueLength > 0 {
+			queueTokens = make(chan struct{}, c.maxConcurrencyLimit+c.maxQueueLength)
+		}
+
 		c.limitsByKey[limitingKey] = &keyedConcurrencyLimiter{
-			tokens:                 make(chan struct{}, c.maxConcurrencyLimit),
+			monitor:                c.monitor,
 			maxQueuedTickerCreator: c.maxQueuedTickerCreator,
+			concurrencyTokens:      make(chan struct{}, c.maxConcurrencyLimit),
+			queueTokens:            queueTokens,
 		}
 	}
 
 	c.limitsByKey[limitingKey].refcount++
+
 	return c.limitsByKey[limitingKey]
 }
 
@@ -212,31 +249,6 @@ func (c *ConcurrencyLimiter) countSemaphores() int {
 	return len(c.limitsByKey)
 }
 
-func (c *ConcurrencyLimiter) queueInc(ctx context.Context) error {
-	c.m.Lock()
-	defer c.m.Unlock()
-
-	if c.maxQueueLength > 0 &&
-		c.queued >= c.maxQueueLength {
-		c.monitor.Dropped(ctx, "max_size")
-		return ErrMaxQueueSize
-	}
-
-	c.queued++
-	return nil
-}
-
-func (c *ConcurrencyLimiter) queueDec(decremented *bool) {
-	if decremented == nil || *decremented {
-		return
-	}
-	*decremented = true
-	c.m.Lock()
-	defer c.m.Unlock()
-
-	c.queued--
-}
-
 // WithConcurrencyLimiters sets up middleware to limit the concurrency of
 // requests based on RPC and repository
 func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) {
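The heart of the change is the two-token scheme in `acquire` and `release`: a non-blocking send on `queueTokens` bounds how many callers may wait per key, a send on `concurrencyTokens` bounds how many may run, and the queue token is only returned once the limited call has finished. Below is a minimal, self-contained sketch of that pattern, not the Gitaly implementation itself: the names `keyedLimiter` and `newKeyedLimiter` are invented for illustration, a plain `time.After` timeout stands in for `helper.Ticker`, and the `ConcurrencyMonitor` callbacks are left out.

```go
// A standalone sketch of the queue-token/concurrency-token scheme. Names are
// made up for this illustration; this is not the production code.
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

var (
	errMaxQueueSize = errors.New("maximum queue size reached")
	errMaxQueueTime = errors.New("maximum time in concurrency queue reached")
)

// keyedLimiter bounds both the number of concurrent calls and the number of
// callers allowed to wait for a free concurrency slot.
type keyedLimiter struct {
	// concurrencyTokens holds one token per allowed concurrent call.
	concurrencyTokens chan struct{}
	// queueTokens holds one token per admitted caller. It is sized
	// maxConcurrency+maxQueueLength because a caller keeps its queue token
	// for the whole duration of the limited call, not just while waiting.
	queueTokens  chan struct{}
	maxQueueWait time.Duration
}

func newKeyedLimiter(maxConcurrency, maxQueueLength int, maxQueueWait time.Duration) *keyedLimiter {
	return &keyedLimiter{
		concurrencyTokens: make(chan struct{}, maxConcurrency),
		queueTokens:       make(chan struct{}, maxConcurrency+maxQueueLength),
		maxQueueWait:      maxQueueWait,
	}
}

// acquire admits the caller to the queue (or rejects it immediately when the
// queue is full) and then waits up to maxQueueWait for a concurrency slot.
func (l *keyedLimiter) acquire(ctx context.Context) (returnedErr error) {
	select {
	case l.queueTokens <- struct{}{}:
		// Give the queue token back if we fail to get a concurrency token.
		defer func() {
			if returnedErr != nil {
				<-l.queueTokens
			}
		}()
	default:
		return errMaxQueueSize
	}

	select {
	case l.concurrencyTokens <- struct{}{}:
		return nil
	case <-time.After(l.maxQueueWait):
		return errMaxQueueTime
	case <-ctx.Done():
		return ctx.Err()
	}
}

// release returns both tokens once the limited call has finished.
func (l *keyedLimiter) release() {
	<-l.queueTokens
	<-l.concurrencyTokens
}

func main() {
	// One concurrent call, a queue of one: of three parallel callers, one runs
	// immediately, one waits for the slot, and one is rejected with errMaxQueueSize.
	lim := newKeyedLimiter(1, 1, 100*time.Millisecond)

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if err := lim.acquire(context.Background()); err != nil {
				fmt.Printf("caller %d rejected: %v\n", i, err)
				return
			}
			defer lim.release()
			time.Sleep(50 * time.Millisecond) // simulated limited work
			fmt.Printf("caller %d done\n", i)
		}(i)
	}
	wg.Wait()
}
```

Sizing `queueTokens` at `maxConcurrency+maxQueueLength` mirrors the comment added in `getConcurrencyLimit`: because a caller holds its queue token until `release`, the channel needs headroom for the callers that are already executing in addition to those that are actually waiting.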
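For orientation, the `max_queue_wait` and `max_queue_size` options touched in the `doc/backpressure.md` hunk are per-RPC concurrency settings in Gitaly's configuration. The sketch below shows what such a configuration might look like; the values are purely illustrative, and the `[[concurrency]]` section layout together with the `rpc` and `max_per_repo` keys are assumptions here, not something this diff establishes.

```toml
# Illustrative values only. The section layout and the rpc/max_per_repo keys are
# assumed; max_queue_wait and max_queue_size are the options described in
# doc/backpressure.md.
[[concurrency]]
rpc            = "/gitaly.RepositoryService/RepackFull"
max_per_repo   = 1
max_queue_size = 10
max_queue_wait = "1m"
```

Read together with the code change, such settings would mean at most one `RepackFull` call runs per repository, at most ten further callers wait in that repository's queue, and a waiting caller is dropped with a `ResourceExhausted` error after one minute.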