gitlab.com/gitlab-org/gitaly.git
author    Patrick Steinhardt <psteinhardt@gitlab.com>  2023-01-05 13:29:47 +0300
committer Patrick Steinhardt <psteinhardt@gitlab.com>  2023-01-09 11:51:48 +0300
commit    ee7c6acd586d74548138f5b7c8e89328621d2c71 (patch)
tree      81b61a0e054f39cd07487696289f3ae792104f02
parent    22783620ad85d2aa4b5e6f628231d9bd8ca341d8 (diff)
limithandler: Fix queueing mechanism in the concurrency limiter
The concurrency limiter has two different mechanisms:

1. Callers first get put into a queue that is global across all limiting keys. This queue is depth- and time-limited, which means that callers will get rejected if it is full and will be evicted when they take too long to acquire the concurrency token.

2. The concurrency limit itself, which only allows a set number of callers at the same time. This functionality is per limiting key.

While the intent is usually to concurrency-limit either by user or by repository, this is sabotaged by the fact that the queue is key-agnostic and thus global. So even if almost all calls apply to a single repository only, a queue that is enabled and full makes it impossible to invoke the function for any other repository. As a result, the queue is rather useless as a concurrency-limiting primitive.

But there is another big design flaw: because callers need to be in the queue to obtain the concurrency-limiting token, and because the number of callers in the queue is strictly limited, we essentially have a global concurrency limit on obtaining the concurrency-limiting tokens. Suppose a function has a maximum queue depth of 1 and receives two calls at the same time. Even if the concurrency limit would theoretically allow both calls to run concurrently, there is a race window where both callers may try to enter the queue at the same point in time. If this race is lost, one of the two callers is rejected because the queue is full while the other one is still trying to obtain the concurrency token. This issue in fact surfaces in our tests, where `TestStreamLimitHandler` frequently fails because the race is often lost.

This second design flaw cannot easily be fixed while the queue remains global: we need to remain in the queue while trying to acquire the concurrency token, or otherwise it wouldn't really be limiting anything at all.

Convert the queue to be per-key so that each resource identified by a key essentially has its own queue. This fixes both issues:

- If the queue is full, then we only limit access to the specific resource for which it is full. This makes the queueing mechanism useful again.

- We can now change the queueing logic to admit as many callers into the queue as the concurrency limit plus the allowed depth of the queue itself permit. This fixes the race described above.

Changelog: fixed
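To make the new design concrete, here is a minimal, self-contained Go sketch of a per-key limiter using the queue-token-plus-concurrency-token scheme the patch implements. It is an illustration under stated assumptions, not the actual Gitaly code: all identifiers are made up for this example, and a plain `time.Timer` stands in for Gitaly's `helper.Ticker` abstraction.

```go
package main

import (
	"context"
	"errors"
	"time"
)

var (
	errMaxQueueSize = errors.New("maximum queue size reached")
	errMaxQueueTime = errors.New("maximum time in queue reached")
)

// keyedLimiter is an illustrative per-key limiter. Sending into a buffered
// channel acquires a token; receiving from it releases the token again.
type keyedLimiter struct {
	concurrencyTokens chan struct{} // capacity: maximum concurrency
	queueTokens       chan struct{} // capacity: maximum concurrency + queue depth
}

func newKeyedLimiter(maxConcurrency, maxQueueDepth int) *keyedLimiter {
	return &keyedLimiter{
		concurrencyTokens: make(chan struct{}, maxConcurrency),
		// Sizing the queue at maxConcurrency+maxQueueDepth is the crux of
		// the fix: callers keep their queue token while running, so the
		// queue itself never throttles below the concurrency limit.
		queueTokens: make(chan struct{}, maxConcurrency+maxQueueDepth),
	}
}

func (l *keyedLimiter) acquire(ctx context.Context, maxWait time.Duration) error {
	// Admission to the queue never blocks: if no queue token is free, the
	// queue for this key is full and the caller is rejected right away.
	select {
	case l.queueTokens <- struct{}{}:
	default:
		return errMaxQueueSize
	}

	// Wait for a concurrency token, bounded by maxWait. On failure we hand
	// the queue token back; on success we keep it until release().
	timer := time.NewTimer(maxWait)
	defer timer.Stop()
	select {
	case l.concurrencyTokens <- struct{}{}:
		return nil
	case <-timer.C:
		<-l.queueTokens
		return errMaxQueueTime
	case <-ctx.Done():
		<-l.queueTokens
		return ctx.Err()
	}
}

// release returns both tokens once the limited function has finished.
func (l *keyedLimiter) release() {
	<-l.queueTokens
	<-l.concurrencyTokens
}

func main() {
	limiter := newKeyedLimiter(2, 1)
	if err := limiter.acquire(context.Background(), time.Second); err != nil {
		return
	}
	defer limiter.release()
	// ... run the concurrency-limited work here ...
}
```

Because each key gets its own `keyedLimiter`, a full queue for one repository can no longer reject callers that target a different repository.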
-rw-r--r--  doc/backpressure.md                                      |   8
-rw-r--r--  internal/middleware/limithandler/concurrency_limiter.go  | 160
2 files changed, 90 insertions(+), 78 deletions(-)
diff --git a/doc/backpressure.md b/doc/backpressure.md
index 71ae4fef0..f5110f8d8 100644
--- a/doc/backpressure.md
+++ b/doc/backpressure.md
@@ -38,9 +38,9 @@ configuration can prevent an unbounded in-memory queue of requests:
- `max_queue_wait` is the maximum amount of time a request can wait in the
concurrency queue. When a request waits longer than this time, it returns
an error to the client.
-- `max_queue_size` is the maximum size the concurrency queue can grow for a given
- RPC. If a concurrency queue is at its maximum, subsequent requests
- return with an error.
+- `max_queue_size` is the maximum size the concurrency queue can grow for a
+ given RPC. If a concurrency queue is at its maximum, subsequent requests
+ return with an error. The queue size is per repository.
For example:
@@ -75,7 +75,7 @@ In the above configuration, the `token bucket` has a capacity of 1 and gets
refilled every minute. This means that Gitaly only accepts 1 `RepackFull`
request per repository each minute.
-Requests that come in after the `token bucket` is full (and before it is
+Requests that come in after the `token bucket` is full (and before it is
replenished) are rejected with an error.
## Errors
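As an aside, the two settings documented above are configured per RPC in Gitaly's `[[concurrency]]` stanzas. A hedged sketch of such a stanza, with an illustrative RPC name and values (treat the exact shape as an assumption based on the documentation this diff touches):

```toml
[[concurrency]]
rpc            = "/gitaly.RepositoryService/RepackFull"
max_per_repo   = 1
max_queue_wait = "1m"
max_queue_size = 10
```

With this patch applied, `max_queue_size` bounds the queue of each individual repository rather than one queue shared across all of them.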
diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go
index 0267dc3c1..b080ee658 100644
--- a/internal/middleware/limithandler/concurrency_limiter.go
+++ b/internal/middleware/limithandler/concurrency_limiter.go
@@ -28,18 +28,59 @@ type QueueTickerCreator func() helper.Ticker
// keyedConcurrencyLimiter is a concurrency limiter that applies to a specific keyed resource.
type keyedConcurrencyLimiter struct {
- // tokens is the channel of available tokens, where every token allows one concurrent call
- // to the concurrency-limited function.
- tokens chan struct{}
refcount int
+ monitor ConcurrencyMonitor
maxQueuedTickerCreator QueueTickerCreator
+
+ // concurrencyTokens is the channel of available concurrency tokens, where every token
+ // allows one concurrent call to the concurrency-limited function.
+ concurrencyTokens chan struct{}
+ // queueTokens is the channel of available queue tokens, where every token allows one
+ // concurrent call to be admitted to the queue.
+ queueTokens chan struct{}
}
-// acquire tries to acquire the semaphore. It may fail if there is a max queue-time ticker that
-// ticks before acquiring a token.
-func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) error {
- var ticker helper.Ticker
+// acquire tries to acquire the semaphore. It may fail if the admission queue is full or if the max
+// queue-time ticker ticks before acquiring a concurrency token.
+func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) (returnedErr error) {
+ if sem.queueTokens != nil {
+ // Try to acquire the queueing token. The queueing token is used to control how many
+ // callers may wait for the concurrency token at the same time. If there are no more
+ // queueing tokens then this indicates that the queue is full and we thus return an
+ // error immediately.
+ select {
+ case sem.queueTokens <- struct{}{}:
+ // We have acquired a queueing token, so we need to release it if acquiring
+ // the concurrency token fails. If we succeed in acquiring the concurrency
+ // token though, then we retain the queueing token until the caller signals
+ // that the concurrency-limited function has finished. As a consequence the
+ // queue token is returned together with the concurrency token.
+ //
+ // A simpler model would be to just have `maxQueueLength` many queueing
+ // tokens. But this would add concurrency-limiting when acquiring the queue
+ // token itself, which is not what we want to do. Instead, we want to admit
+ // as many callers into the queue as the queue length plus the number of
+ // available concurrency tokens permits.
+ defer func() {
+ if returnedErr != nil {
+ <-sem.queueTokens
+ }
+ }()
+ default:
+ sem.monitor.Dropped(ctx, "max_size")
+ return ErrMaxQueueSize
+ }
+ }
+
+ // We are queued now, so let's tell the monitor. Furthermore, even though we're still
+ // holding the queueing token when this function exits successfully we also tell the monitor
+ // that we have exited the queue. It is only an implementation detail anyway that we hold on
+ // to the token, so the monitor shouldn't care about that.
+ sem.monitor.Queued(ctx)
+ defer sem.monitor.Dequeued(ctx)
+ // Set up the ticker that keeps us from waiting indefinitely on the concurrency token.
+ var ticker helper.Ticker
if sem.maxQueuedTickerCreator != nil {
ticker = sem.maxQueuedTickerCreator()
} else {
@@ -49,20 +90,27 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) error {
defer ticker.Stop()
ticker.Reset()
+ // Try to acquire the concurrency token now that we're in the queue.
select {
- case sem.tokens <- struct{}{}:
+ case sem.concurrencyTokens <- struct{}{}:
return nil
case <-ticker.C():
+ sem.monitor.Dropped(ctx, "max_time")
return ErrMaxQueueTime
case <-ctx.Done():
return ctx.Err()
}
}
-// release releases the semaphore.
-func (sem *keyedConcurrencyLimiter) release() { <-sem.tokens }
+// release releases the acquired tokens.
+func (sem *keyedConcurrencyLimiter) release() {
+ if sem.queueTokens != nil {
+ <-sem.queueTokens
+ }
+ <-sem.concurrencyTokens
+}
-// ConcurrencyLimiter contains rate limiter state
+// ConcurrencyLimiter contains rate limiter state.
type ConcurrencyLimiter struct {
// maxConcurrencyLimit is the maximum number of concurrent calls to the limited function.
// This limit is per key.
@@ -80,8 +128,6 @@ type ConcurrencyLimiter struct {
monitor ConcurrencyMonitor
m sync.RWMutex
- // queued tracks the current number of operations waiting in the admission queue.
- queued int64
// limitsByKey tracks all concurrency limits per key. Its per-key entries are lazily created
// and will get evicted once there are no concurrency-limited calls for any such key
// anymore.
@@ -106,9 +152,9 @@ func NewConcurrencyLimiter(maxConcurrencyLimit, maxQueueLength int, maxQueuedTic
// Limit will limit the concurrency of the limited function f. There are two distinct mechanisms
// that limit execution of the function:
//
-// 1. First, every call will enter the queue. This queue is global across all keys and limits how
-// many callers may try to acquire their per-key semaphore at the same time. If the queue is full
-// the caller will be rejected.
+// 1. First, every call will enter the per-key queue. This queue limits how many callers may try to
+// acquire their per-key semaphore at the same time. If the queue is full the caller will be
+// rejected.
// 2. Second, when the caller has successfully entered the queue, they try to acquire their per-key
// semaphore. If this takes longer than the maximum queueing limit then the caller will be
// dequeued and gets an error.
@@ -117,52 +163,32 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f Li
return f()
}
- var decremented bool
-
- log := ctxlogrus.Extract(ctx).WithField("limiting_key", limitingKey)
- if err := c.queueInc(ctx); err != nil {
- if errors.Is(err, ErrMaxQueueSize) {
- return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueSize).WithDetail(
- &gitalypb.LimitError{
- ErrorMessage: err.Error(),
- RetryAfter: durationpb.New(0),
- },
- )
- }
-
- log.WithError(err).Error("unexpected error when queueing request")
- return nil, err
- }
- defer c.queueDec(&decremented)
-
- start := time.Now()
- c.monitor.Queued(ctx)
-
sem := c.getConcurrencyLimit(limitingKey)
defer c.putConcurrencyLimit(limitingKey)
- err := sem.acquire(ctx)
- c.queueDec(&decremented)
-
- c.monitor.Dequeued(ctx)
- if err != nil {
- if errors.Is(err, ErrMaxQueueTime) {
- c.monitor.Dropped(ctx, "max_time")
+ start := time.Now()
+ if err := sem.acquire(ctx); err != nil {
+ switch err {
+ case ErrMaxQueueSize:
+ return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueSize).WithDetail(&gitalypb.LimitError{
+ ErrorMessage: err.Error(),
+ RetryAfter: durationpb.New(0),
+ })
+ case ErrMaxQueueTime:
return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueTime).WithDetail(&gitalypb.LimitError{
ErrorMessage: err.Error(),
RetryAfter: durationpb.New(0),
})
+ default:
+ ctxlogrus.Extract(ctx).WithField("limiting_key", limitingKey).WithError(err).Error("unexpected error when dequeueing request")
+ return nil, err
}
-
- log.WithError(err).Error("unexpected error when dequeueing request")
- return nil, err
}
defer sem.release()
c.monitor.Enter(ctx, time.Since(start))
defer c.monitor.Exit(ctx)
-
return f()
}
@@ -173,13 +199,24 @@ func (c *ConcurrencyLimiter) getConcurrencyLimit(limitingKey string) *keyedConcu
defer c.m.Unlock()
if c.limitsByKey[limitingKey] == nil {
+ // Set up the queue tokens in case a maximum queue length was requested. As the
+ // queue tokens are kept during the whole lifetime of the concurrency-limited
+ // function we add the concurrency tokens to the number of available tokens.
+ var queueTokens chan struct{}
+ if c.maxQueueLength > 0 {
+ queueTokens = make(chan struct{}, c.maxConcurrencyLimit+c.maxQueueLength)
+ }
+
c.limitsByKey[limitingKey] = &keyedConcurrencyLimiter{
- tokens: make(chan struct{}, c.maxConcurrencyLimit),
+ monitor: c.monitor,
maxQueuedTickerCreator: c.maxQueuedTickerCreator,
+ concurrencyTokens: make(chan struct{}, c.maxConcurrencyLimit),
+ queueTokens: queueTokens,
}
}
c.limitsByKey[limitingKey].refcount++
+
return c.limitsByKey[limitingKey]
}
@@ -212,31 +249,6 @@ func (c *ConcurrencyLimiter) countSemaphores() int {
return len(c.limitsByKey)
}
-func (c *ConcurrencyLimiter) queueInc(ctx context.Context) error {
- c.m.Lock()
- defer c.m.Unlock()
-
- if c.maxQueueLength > 0 &&
- c.queued >= c.maxQueueLength {
- c.monitor.Dropped(ctx, "max_size")
- return ErrMaxQueueSize
- }
-
- c.queued++
- return nil
-}
-
-func (c *ConcurrencyLimiter) queueDec(decremented *bool) {
- if decremented == nil || *decremented {
- return
- }
- *decremented = true
- c.m.Lock()
- defer c.m.Unlock()
-
- c.queued--
-}
-
// WithConcurrencyLimiters sets up middleware to limit the concurrency of
// requests based on RPC and repository
func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) {
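To round things off, a hedged usage sketch of the calling convention, assuming `LimitedFunc` is the `func() (interface{}, error)` that `return f()` above suggests; `doRepack` and `repackLimited` are hypothetical names for this example:

```go
// doRepack is a hypothetical stand-in for the concurrency-limited work.
func doRepack(ctx context.Context, repoPath string) (interface{}, error) {
	return nil, nil
}

// repackLimited funnels work through the limiter, keyed by repository path.
// Since the queue is now per key, a full queue for one repository no longer
// rejects callers that operate on other repositories.
func repackLimited(ctx context.Context, limiter *ConcurrencyLimiter, repoPath string) error {
	_, err := limiter.Limit(ctx, repoPath, func() (interface{}, error) {
		return doRepack(ctx, repoPath)
	})
	return err
}
```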