gitlab.com/gitlab-org/gitaly.git
author    Toon Claes <toon@gitlab.com>  2023-01-09 14:19:01 +0300
committer Toon Claes <toon@gitlab.com>  2023-01-09 14:19:01 +0300
commit    f426025daeb6b1f426d1f7f18ca69bb23ee5b7c7 (patch)
tree      81b61a0e054f39cd07487696289f3ae792104f02
parent    22783620ad85d2aa4b5e6f628231d9bd8ca341d8 (diff)
parent    ee7c6acd586d74548138f5b7c8e89328621d2c71 (diff)

Merge branch 'pks-limithandler-fix-queueing' into 'master'

limithandler: Fix queueing mechanism in the concurrency limiter

Closes #4411, #4697, and #4625

See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5236

Merged-by: Toon Claes <toon@gitlab.com>
Approved-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Approved-by: Steve Azzopardi <sazzopardi@gitlab.com>
Approved-by: Toon Claes <toon@gitlab.com>
Reviewed-by: Steve Azzopardi <sazzopardi@gitlab.com>
Reviewed-by: Toon Claes <toon@gitlab.com>
Reviewed-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Co-authored-by: Patrick Steinhardt <psteinhardt@gitlab.com>
-rw-r--r--  doc/backpressure.md                                        |   8
-rw-r--r--  internal/middleware/limithandler/concurrency_limiter.go   | 160
2 files changed, 90 insertions(+), 78 deletions(-)
diff --git a/doc/backpressure.md b/doc/backpressure.md
index 71ae4fef0..f5110f8d8 100644
--- a/doc/backpressure.md
+++ b/doc/backpressure.md
@@ -38,9 +38,9 @@ configuration can prevent an unbounded in-memory queue of requests:
- `max_queue_wait` is the maximum amount of time a request can wait in the
concurrency queue. When a request waits longer than this time, it returns
an error to the client.
-- `max_queue_size` is the maximum size the concurrency queue can grow for a given
- RPC. If a concurrency queue is at its maximum, subsequent requests
- return with an error.
+- `max_queue_size` is the maximum size the concurrency queue can grow for a
+ given RPC. If a concurrency queue is at its maximum, subsequent requests
+ return with an error. The queue size is per repository.
For example:
@@ -75,7 +75,7 @@ In the above configuration, the `token bucket` has a capacity of 1 and gets
refilled every minute. This means that Gitaly only accepts 1 `RepackFull`
request per repository each minute.
-Requests that come in after the `token bucket` is full (and before it is
+Requests that come in after the `token bucket` is full (and before it is
replenished) are rejected with an error.
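As a rough, self-contained sketch of the token-bucket behavior described above (hypothetical names; this is not the Gitaly implementation), a bucket with a fixed capacity and a periodic refill can be modeled with a buffered channel:

package main

import (
	"errors"
	"time"
)

var errRateLimited = errors.New("rate limit reached")

// tokenBucket is a hypothetical sketch: a bucket with fixed capacity that is
// topped up with one token per refill interval.
type tokenBucket struct {
	tokens chan struct{}
}

func newTokenBucket(capacity int, refill time.Duration) *tokenBucket {
	tb := &tokenBucket{tokens: make(chan struct{}, capacity)}
	for i := 0; i < capacity; i++ {
		tb.tokens <- struct{}{} // start full so the first request is admitted
	}
	go func() {
		for range time.Tick(refill) {
			select {
			case tb.tokens <- struct{}{}:
			default: // bucket already full; drop the refill
			}
		}
	}()
	return tb
}

// allow admits a request if a token is available and rejects it otherwise.
func (tb *tokenBucket) allow() error {
	select {
	case <-tb.tokens:
		return nil
	default:
		return errRateLimited
	}
}

With a capacity of 1 and a one-minute refill, a second `RepackFull` request arriving before the refill is rejected, matching the behavior described above.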
## Errors
diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go
index 0267dc3c1..b080ee658 100644
--- a/internal/middleware/limithandler/concurrency_limiter.go
+++ b/internal/middleware/limithandler/concurrency_limiter.go
@@ -28,18 +28,59 @@ type QueueTickerCreator func() helper.Ticker
// keyedConcurrencyLimiter is a concurrency limiter that applies to a specific keyed resource.
type keyedConcurrencyLimiter struct {
- // tokens is the channel of available tokens, where every token allows one concurrent call
- // to the concurrency-limited function.
- tokens chan struct{}
refcount int
+ monitor ConcurrencyMonitor
maxQueuedTickerCreator QueueTickerCreator
+
+ // concurrencyTokens is the channel of available concurrency tokens, where every token
+ // allows one concurrent call to the concurrency-limited function.
+ concurrencyTokens chan struct{}
+ // queueTokens is the channel of available queue tokens, where every token allows one
+ // concurrent call to be admitted to the queue.
+ queueTokens chan struct{}
}
-// acquire tries to acquire the semaphore. It may fail if there is a max queue-time ticker that
-// ticks before acquiring a token.
-func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) error {
- var ticker helper.Ticker
+// acquire tries to acquire the semaphore. It may fail if the admission queue is full or if the max
+// queue-time ticker ticks before acquiring a concurrency token.
+func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) (returnedErr error) {
+ if sem.queueTokens != nil {
+ // Try to acquire the queueing token. The queueing token is used to control how many
+ // callers may wait for the concurrency token at the same time. If there are no more
+ // queueing tokens then this indicates that the queue is full and we thus return an
+ // error immediately.
+ select {
+ case sem.queueTokens <- struct{}{}:
+ // We have acquired a queueing token, so we need to release it if acquiring
+ // the concurrency token fails. If we succeed to acquire the concurrency
+ // token though then we retain the queueing token until the caller signals
+ // that the concurrency-limited function has finished. As a consequence the
+ // queue token is returned together with the concurrency token.
+ //
+ // A simpler model would be to just have `maxQueueLength` many queueing
+ // tokens. But this would add concurrency-limiting when acquiring the queue
+ // token itself, which is not what we want to do. Instead, we want to admit
+ // as many callers into the queue as the queue length plus the number of
+ // available concurrency tokens permits.
+ defer func() {
+ if returnedErr != nil {
+ <-sem.queueTokens
+ }
+ }()
+ default:
+ sem.monitor.Dropped(ctx, "max_size")
+ return ErrMaxQueueSize
+ }
+ }
+
+ // We are queued now, so let's tell the monitor. Furthermore, even though we're still
+ // holding the queueing token when this function exits successfully we also tell the monitor
+ // that we have exited the queue. It is only an implementation detail anyway that we hold on
+ // to the token, so the monitor shouldn't care about that.
+ sem.monitor.Queued(ctx)
+ defer sem.monitor.Dequeued(ctx)
+ // Set up the ticker that keeps us from waiting indefinitely on the concurrency token.
+ var ticker helper.Ticker
if sem.maxQueuedTickerCreator != nil {
ticker = sem.maxQueuedTickerCreator()
} else {
@@ -49,20 +90,27 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) error {
defer ticker.Stop()
ticker.Reset()
+ // Try to acquire the concurrency token now that we're in the queue.
select {
- case sem.tokens <- struct{}{}:
+ case sem.concurrencyTokens <- struct{}{}:
return nil
case <-ticker.C():
+ sem.monitor.Dropped(ctx, "max_time")
return ErrMaxQueueTime
case <-ctx.Done():
return ctx.Err()
}
}
-// release releases the semaphore.
-func (sem *keyedConcurrencyLimiter) release() { <-sem.tokens }
+// release releases the acquired tokens.
+func (sem *keyedConcurrencyLimiter) release() {
+ if sem.queueTokens != nil {
+ <-sem.queueTokens
+ }
+ <-sem.concurrencyTokens
+}
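Stripped of the monitor, ticker, and context handling, the two-token scheme introduced in this hunk boils down to the following standalone sketch (hypothetical names, not the actual Gitaly code):

package main

import "errors"

var errQueueFull = errors.New("queue is full")

// boundedSemaphore sketches the pattern above: queueTokens caps how many
// callers may be admitted at all, concurrencyTokens caps how many may run.
type boundedSemaphore struct {
	concurrencyTokens chan struct{} // capacity: max concurrent calls
	queueTokens       chan struct{} // capacity: max concurrency + max queue length
}

func (s *boundedSemaphore) acquire() error {
	select {
	case s.queueTokens <- struct{}{}:
		// Admitted; the queue token is held until release() is called.
	default:
		return errQueueFull
	}
	// Block until a concurrency slot frees up. The real code additionally
	// races this receive against a max-queue-wait ticker and ctx.Done().
	s.concurrencyTokens <- struct{}{}
	return nil
}

func (s *boundedSemaphore) release() {
	<-s.queueTokens
	<-s.concurrencyTokens
}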
-// ConcurrencyLimiter contains rate limiter state
+// ConcurrencyLimiter contains rate limiter state.
type ConcurrencyLimiter struct {
// maxConcurrencyLimit is the maximum number of concurrent calls to the limited function.
// This limit is per key.
@@ -80,8 +128,6 @@ type ConcurrencyLimiter struct {
monitor ConcurrencyMonitor
m sync.RWMutex
- // queued tracks the current number of operations waiting in the admission queue.
- queued int64
// limitsByKey tracks all concurrency limits per key. Its per-key entries are lazily created
// and will get evicted once there are no concurrency-limited calls for any such key
// anymore.
@@ -106,9 +152,9 @@ func NewConcurrencyLimiter(maxConcurrencyLimit, maxQueueLength int, maxQueuedTic
// Limit will limit the concurrency of the limited function f. There are two distinct mechanisms
// that limit execution of the function:
//
-// 1. First, every call will enter the queue. This queue is global across all keys and limits how
-// many callers may try to acquire their per-key semaphore at the same time. If the queue is full
-// the caller will be rejected.
+// 1. First, every call will enter the per-key queue. This queue limits how many callers may try to
+// acquire their per-key semaphore at the same time. If the queue is full the caller will be
+// rejected.
// 2. Second, when the caller has successfully entered the queue, they try to acquire their per-key
// semaphore. If this takes longer than the maximum queueing limit then the caller will be
// dequeued and gets an error.
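A call site for the limiter might look roughly as follows; `limiter`, `repoPath`, and `performRepack` are hypothetical, and the `LimitedFunc` signature is inferred from `return f()` in this diff:

result, err := limiter.Limit(ctx, repoPath, func() (interface{}, error) {
	// Runs only after a queue token and a concurrency token for repoPath
	// have both been acquired.
	return performRepack(ctx)
})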
@@ -117,52 +163,32 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f Li
return f()
}
- var decremented bool
-
- log := ctxlogrus.Extract(ctx).WithField("limiting_key", limitingKey)
- if err := c.queueInc(ctx); err != nil {
- if errors.Is(err, ErrMaxQueueSize) {
- return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueSize).WithDetail(
- &gitalypb.LimitError{
- ErrorMessage: err.Error(),
- RetryAfter: durationpb.New(0),
- },
- )
- }
-
- log.WithError(err).Error("unexpected error when queueing request")
- return nil, err
- }
- defer c.queueDec(&decremented)
-
- start := time.Now()
- c.monitor.Queued(ctx)
-
sem := c.getConcurrencyLimit(limitingKey)
defer c.putConcurrencyLimit(limitingKey)
- err := sem.acquire(ctx)
- c.queueDec(&decremented)
-
- c.monitor.Dequeued(ctx)
- if err != nil {
- if errors.Is(err, ErrMaxQueueTime) {
- c.monitor.Dropped(ctx, "max_time")
+ start := time.Now()
+ if err := sem.acquire(ctx); err != nil {
+ switch err {
+ case ErrMaxQueueSize:
+ return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueSize).WithDetail(&gitalypb.LimitError{
+ ErrorMessage: err.Error(),
+ RetryAfter: durationpb.New(0),
+ })
+ case ErrMaxQueueTime:
return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueTime).WithDetail(&gitalypb.LimitError{
ErrorMessage: err.Error(),
RetryAfter: durationpb.New(0),
})
+ default:
+ ctxlogrus.Extract(ctx).WithField("limiting_key", limitingKey).WithError(err).Error("unexpected error when dequeueing request")
+ return nil, err
}
-
- log.WithError(err).Error("unexpected error when dequeueing request")
- return nil, err
}
defer sem.release()
c.monitor.Enter(ctx, time.Since(start))
defer c.monitor.Exit(ctx)
-
return f()
}
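On the client side, the `LimitError` detail attached via `WithDetail` above can be recovered from the gRPC status. A hedged sketch using the standard grpc-go status API (the `gitalypb` import path is assumed and version-dependent):

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" // assumed path
)

// inspectLimitError is a hypothetical helper that unwraps the structured
// detail from a ResourceExhausted error returned by the limiter.
func inspectLimitError(err error) error {
	st, ok := status.FromError(err)
	if !ok || st.Code() != codes.ResourceExhausted {
		return err
	}
	for _, detail := range st.Details() {
		if limitErr, ok := detail.(*gitalypb.LimitError); ok {
			return fmt.Errorf("rejected by limiter: %s (retry after %s)",
				limitErr.ErrorMessage, limitErr.RetryAfter.AsDuration())
		}
	}
	return err
}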
@@ -173,13 +199,24 @@ func (c *ConcurrencyLimiter) getConcurrencyLimit(limitingKey string) *keyedConcu
defer c.m.Unlock()
if c.limitsByKey[limitingKey] == nil {
+ // Set up the queue tokens in case a maximum queue length was requested. As the
+ // queue tokens are kept during the whole lifetime of the concurrency-limited
+ // function, we add the concurrency tokens to the number of available tokens.
+ var queueTokens chan struct{}
+ if c.maxQueueLength > 0 {
+ queueTokens = make(chan struct{}, c.maxConcurrencyLimit+c.maxQueueLength)
+ }
+
c.limitsByKey[limitingKey] = &keyedConcurrencyLimiter{
- tokens: make(chan struct{}, c.maxConcurrencyLimit),
+ monitor: c.monitor,
maxQueuedTickerCreator: c.maxQueuedTickerCreator,
+ concurrencyTokens: make(chan struct{}, c.maxConcurrencyLimit),
+ queueTokens: queueTokens,
}
}
c.limitsByKey[limitingKey].refcount++
+
return c.limitsByKey[limitingKey]
}
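The capacity arithmetic in this hunk can be checked with hypothetical numbers: with a `maxConcurrencyLimit` of 10 and a `maxQueueLength` of 5, `queueTokens` gets capacity 15, because the 10 running callers keep holding their queue tokens while 5 more wait; caller 16 is rejected:

package main

import "fmt"

func main() {
	const maxConcurrencyLimit, maxQueueLength = 10, 5 // hypothetical values

	queueTokens := make(chan struct{}, maxConcurrencyLimit+maxQueueLength)

	for i := 1; i <= 16; i++ {
		select {
		case queueTokens <- struct{}{}:
			// Callers 1-10 would run; callers 11-15 would wait in the queue.
		default:
			fmt.Printf("caller %d rejected: queue full\n", i) // caller 16
		}
	}
}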
@@ -212,31 +249,6 @@ func (c *ConcurrencyLimiter) countSemaphores() int {
return len(c.limitsByKey)
}
-func (c *ConcurrencyLimiter) queueInc(ctx context.Context) error {
- c.m.Lock()
- defer c.m.Unlock()
-
- if c.maxQueueLength > 0 &&
- c.queued >= c.maxQueueLength {
- c.monitor.Dropped(ctx, "max_size")
- return ErrMaxQueueSize
- }
-
- c.queued++
- return nil
-}
-
-func (c *ConcurrencyLimiter) queueDec(decremented *bool) {
- if decremented == nil || *decremented {
- return
- }
- *decremented = true
- c.m.Lock()
- defer c.m.Unlock()
-
- c.queued--
-}
-
// WithConcurrencyLimiters sets up middleware to limit the concurrency of
// requests based on RPC and repository
func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) {