diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-01-05 13:29:47 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-01-09 11:51:48 +0300 |
commit | ee7c6acd586d74548138f5b7c8e89328621d2c71 (patch) | |
tree | 81b61a0e054f39cd07487696289f3ae792104f02 | |
parent | 22783620ad85d2aa4b5e6f628231d9bd8ca341d8 (diff) |
limithandler: Fix queueing mechanism in the concurrency limiter
The concurrency limiter has two different mechanisms:
1. Callers first get put into a queue that is global across all
limiting keys. This queue is depth- and time-limited, which means
that callers will get rejected if it is full and will be evicted
when they took too long to acquire the concurrency token.
2. The concurrency limit itself that will only allow a set number of
callers at the same time. This functionality is per limiting key.
While the intent is usually to concurrency-limit either by user or by
repository, this is sabotaged by the fact that the queue is key-agnostic
and thus global. So even if almost all calls would apply to a single
repository only, if the queue is enabled and full then it would become
impossible to invoke the function for any other repository now. As a
result the queue is rather useless as a concurrency-limiting primitive.
But there is another big design flaw: as callers need to be in the queue
to obtain the concurrency-limiting token, and the number of callers in
the queue is strictly limited, we essentially have a global concurrency
limit to obtain the concurrency-limiting tokens. Suppose you have two
calls to a function that has a maximum queue depth of 1 and two calls to
this function at the same time. Even if the concurrency limit would now
theoretically allow for both functions to run at the same time, there is
a race window where both callers might try to enter the queue at the
same point in time. If this race is lost, then one of both callers will
be rejected due to the queue being full while the other one is trying to
obtain the concurrency token. This issue in fact surfaces in our tests,
where the `TestStreamLimitHandler` test is frequently failing because
the race is often lost.
This second design flaw cannot easily be fixed while the queue remains
global: we need to remain in the queue when trying to acquire the
concurrency token, or otherwise it wouldn't really be limiting anything
at all.
Convert the queue to be per-key so that each resource identified by a
key essentially has its own queue. This fixes both issues:
- If the queue is full then we only limit access to the specific
resource for which it is full. This makes the queueing mechanism
useful again.
- We can now change the queueing logic to allow as many callers into
the queue as the concurrency limit would allow _plus_ the allowed
depth of the queue itself. This fixes the race described aboved.
Changelog: fixed
-rw-r--r-- | doc/backpressure.md | 8 | ||||
-rw-r--r-- | internal/middleware/limithandler/concurrency_limiter.go | 160 |
2 files changed, 90 insertions, 78 deletions
diff --git a/doc/backpressure.md b/doc/backpressure.md index 71ae4fef0..f5110f8d8 100644 --- a/doc/backpressure.md +++ b/doc/backpressure.md @@ -38,9 +38,9 @@ configuration can prevent an unbounded in-memory queue of requests: - `max_queue_wait` is the maximum amount of time a request can wait in the concurrency queue. When a request waits longer than this time, it returns an error to the client. -- `max_queue_size` is the maximum size the concurrency queue can grow for a given - RPC. If a concurrency queue is at its maximum, subsequent requests - return with an error. +- `max_queue_size` is the maximum size the concurrency queue can grow for a + given RPC. If a concurrency queue is at its maximum, subsequent requests + return with an error. The queue size is per repository. For example: @@ -75,7 +75,7 @@ In the above configuration, the `token bucket` has a capacity of 1 and gets refilled every minute. This means that Gitaly only accepts 1 `RepackFull` request per repository each minute. -Requests that come in after the `token bucket` is full (and before it is +Requests that come in after the `token bucket` is full (and before it is replenished) are rejected with an error. ## Errors diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go index 0267dc3c1..b080ee658 100644 --- a/internal/middleware/limithandler/concurrency_limiter.go +++ b/internal/middleware/limithandler/concurrency_limiter.go @@ -28,18 +28,59 @@ type QueueTickerCreator func() helper.Ticker // keyedConcurrencyLimiter is a concurrency limiter that applies to a specific keyed resource. type keyedConcurrencyLimiter struct { - // tokens is the channel of available tokens, where every token allows one concurrent call - // to the concurrency-limited function. - tokens chan struct{} refcount int + monitor ConcurrencyMonitor maxQueuedTickerCreator QueueTickerCreator + + // concurrencyTokens is the channel of available concurrency tokens, where every token + // allows one concurrent call to the concurrency-limited function. + concurrencyTokens chan struct{} + // queueTokens is the channel of available queue tokens, where every token allows one + // concurrent call to be admitted to the queue. + queueTokens chan struct{} } -// acquire tries to acquire the semaphore. It may fail if there is a max queue-time ticker that -// ticks before acquiring a token. -func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) error { - var ticker helper.Ticker +// acquire tries to acquire the semaphore. It may fail if the admission queue is full or if the max +// queue-time ticker ticks before acquiring a concurrency token. +func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) (returnedErr error) { + if sem.queueTokens != nil { + // Try to acquire the queueing token. The queueing token is used to control how many + // callers may wait for the concurrency token at the same time. If there are no more + // queueing tokens then this indicates that the queue is full and we thus return an + // error immediately. + select { + case sem.queueTokens <- struct{}{}: + // We have acquired a queueing token, so we need to release it if acquiring + // the concurrency token fails. If we succeed to acquire the concurrency + // token though then we retain the queueing token until the caller signals + // that the concurrency-limited function has finished. As a consequence the + // queue token is returned together with the concurrency token. + // + // A simpler model would be to just have `maxQueueLength` many queueing + // tokens. But this would add concurrency-limiting when acquiring the queue + // token itself, which is not what we want to do. Instead, we want to admit + // as many callers into the queue as the queue length permits plus the + // number of available concurrency tokens allows. + defer func() { + if returnedErr != nil { + <-sem.queueTokens + } + }() + default: + sem.monitor.Dropped(ctx, "max_size") + return ErrMaxQueueSize + } + } + + // We are queued now, so let's tell the monitor. Furthermore, even though we're still + // holding the queueing token when this function exits successfully we also tell the monitor + // that we have exited the queue. It is only an implemenation detail anyway that we hold on + // to the token, so the monitor shouldn't care about that. + sem.monitor.Queued(ctx) + defer sem.monitor.Dequeued(ctx) + // Set up the ticker that keeps us from waiting indefinitely on the concurrency token. + var ticker helper.Ticker if sem.maxQueuedTickerCreator != nil { ticker = sem.maxQueuedTickerCreator() } else { @@ -49,20 +90,27 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) error { defer ticker.Stop() ticker.Reset() + // Try to acquire the concurrency token now that we're in the queue. select { - case sem.tokens <- struct{}{}: + case sem.concurrencyTokens <- struct{}{}: return nil case <-ticker.C(): + sem.monitor.Dropped(ctx, "max_time") return ErrMaxQueueTime case <-ctx.Done(): return ctx.Err() } } -// release releases the semaphore. -func (sem *keyedConcurrencyLimiter) release() { <-sem.tokens } +// release releases the acquired tokens. +func (sem *keyedConcurrencyLimiter) release() { + if sem.queueTokens != nil { + <-sem.queueTokens + } + <-sem.concurrencyTokens +} -// ConcurrencyLimiter contains rate limiter state +// ConcurrencyLimiter contains rate limiter state. type ConcurrencyLimiter struct { // maxConcurrencyLimit is the maximum number of concurrent calls to the limited function. // This limit is per key. @@ -80,8 +128,6 @@ type ConcurrencyLimiter struct { monitor ConcurrencyMonitor m sync.RWMutex - // queued tracks the current number of operations waiting in the admission queue. - queued int64 // limitsByKey tracks all concurrency limits per key. Its per-key entries are lazily created // and will get evicted once there are no concurrency-limited calls for any such key // anymore. @@ -106,9 +152,9 @@ func NewConcurrencyLimiter(maxConcurrencyLimit, maxQueueLength int, maxQueuedTic // Limit will limit the concurrency of the limited function f. There are two distinct mechanisms // that limit execution of the function: // -// 1. First, every call will enter the queue. This queue is global across all keys and limits how -// many callers may try to acquire their per-key semaphore at the same time. If the queue is full -// the caller will be rejected. +// 1. First, every call will enter the per-key queue. This queue limits how many callers may try to +// acquire their per-key semaphore at the same time. If the queue is full the caller will be +// rejected. // 2. Second, when the caller has successfully entered the queue, they try to acquire their per-key // semaphore. If this takes longer than the maximum queueing limit then the caller will be // dequeued and gets an error. @@ -117,52 +163,32 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f Li return f() } - var decremented bool - - log := ctxlogrus.Extract(ctx).WithField("limiting_key", limitingKey) - if err := c.queueInc(ctx); err != nil { - if errors.Is(err, ErrMaxQueueSize) { - return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueSize).WithDetail( - &gitalypb.LimitError{ - ErrorMessage: err.Error(), - RetryAfter: durationpb.New(0), - }, - ) - } - - log.WithError(err).Error("unexpected error when queueing request") - return nil, err - } - defer c.queueDec(&decremented) - - start := time.Now() - c.monitor.Queued(ctx) - sem := c.getConcurrencyLimit(limitingKey) defer c.putConcurrencyLimit(limitingKey) - err := sem.acquire(ctx) - c.queueDec(&decremented) - - c.monitor.Dequeued(ctx) - if err != nil { - if errors.Is(err, ErrMaxQueueTime) { - c.monitor.Dropped(ctx, "max_time") + start := time.Now() + if err := sem.acquire(ctx); err != nil { + switch err { + case ErrMaxQueueSize: + return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueSize).WithDetail(&gitalypb.LimitError{ + ErrorMessage: err.Error(), + RetryAfter: durationpb.New(0), + }) + case ErrMaxQueueTime: return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueTime).WithDetail(&gitalypb.LimitError{ ErrorMessage: err.Error(), RetryAfter: durationpb.New(0), }) + default: + ctxlogrus.Extract(ctx).WithField("limiting_key", limitingKey).WithError(err).Error("unexpected error when dequeueing request") + return nil, err } - - log.WithError(err).Error("unexpected error when dequeueing request") - return nil, err } defer sem.release() c.monitor.Enter(ctx, time.Since(start)) defer c.monitor.Exit(ctx) - return f() } @@ -173,13 +199,24 @@ func (c *ConcurrencyLimiter) getConcurrencyLimit(limitingKey string) *keyedConcu defer c.m.Unlock() if c.limitsByKey[limitingKey] == nil { + // Set up the queue tokens in case a maximum queue length was requested. As the + // queue tokens are kept during the whole lifetime of the concurrency-limited + // function we add the concurrency tokens to the number of available token. + var queueTokens chan struct{} + if c.maxQueueLength > 0 { + queueTokens = make(chan struct{}, c.maxConcurrencyLimit+c.maxQueueLength) + } + c.limitsByKey[limitingKey] = &keyedConcurrencyLimiter{ - tokens: make(chan struct{}, c.maxConcurrencyLimit), + monitor: c.monitor, maxQueuedTickerCreator: c.maxQueuedTickerCreator, + concurrencyTokens: make(chan struct{}, c.maxConcurrencyLimit), + queueTokens: queueTokens, } } c.limitsByKey[limitingKey].refcount++ + return c.limitsByKey[limitingKey] } @@ -212,31 +249,6 @@ func (c *ConcurrencyLimiter) countSemaphores() int { return len(c.limitsByKey) } -func (c *ConcurrencyLimiter) queueInc(ctx context.Context) error { - c.m.Lock() - defer c.m.Unlock() - - if c.maxQueueLength > 0 && - c.queued >= c.maxQueueLength { - c.monitor.Dropped(ctx, "max_size") - return ErrMaxQueueSize - } - - c.queued++ - return nil -} - -func (c *ConcurrencyLimiter) queueDec(decremented *bool) { - if decremented == nil || *decremented { - return - } - *decremented = true - c.m.Lock() - defer c.m.Unlock() - - c.queued-- -} - // WithConcurrencyLimiters sets up middleware to limit the concurrency of // requests based on RPC and repository func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) { |