Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2023-01-05 12:50:38 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2023-01-06 16:52:26 +0300
commit260cb9c9839bcc18c2f6325aa0cc9ef1bbd4eeee (patch)
tree36f0b441b8e6a4501ba285470e59bbf3e0c0ffb4
parent81d969aa2701d173aedf1e19011450c5b3553c0d (diff)
limithandler: Improve concurrency limiter documentation
Improve documentation of the concurrency limiting infrastructure so that it becomes easier to understand how it is supposed to work. Rename variables as needed to further improve the reading flow. No functional change is intended.
-rw-r--r--internal/middleware/limithandler/concurrency_limiter.go139
1 files changed, 83 insertions, 56 deletions
diff --git a/internal/middleware/limithandler/concurrency_limiter.go b/internal/middleware/limithandler/concurrency_limiter.go
index e1e842c9f..0267dc3c1 100644
--- a/internal/middleware/limithandler/concurrency_limiter.go
+++ b/internal/middleware/limithandler/concurrency_limiter.go
@@ -28,16 +28,20 @@ type QueueTickerCreator func() helper.Ticker
// keyedConcurrencyLimiter is a concurrency limiter that applies to a specific keyed resource.
type keyedConcurrencyLimiter struct {
- tokens chan struct{}
- count int
- newTicker QueueTickerCreator
+ // tokens is the channel of available tokens, where every token allows one concurrent call
+ // to the concurrency-limited function.
+ tokens chan struct{}
+ refcount int
+ maxQueuedTickerCreator QueueTickerCreator
}
+// acquire tries to acquire the semaphore. It may fail if there is a max queue-time ticker that
+// ticks before acquiring a token.
func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) error {
var ticker helper.Ticker
- if sem.newTicker != nil {
- ticker = sem.newTicker()
+ if sem.maxQueuedTickerCreator != nil {
+ ticker = sem.maxQueuedTickerCreator()
} else {
ticker = helper.Ticker(helper.NewManualTicker())
}
@@ -55,48 +59,67 @@ func (sem *keyedConcurrencyLimiter) acquire(ctx context.Context) error {
}
}
+// release releases the semaphore.
func (sem *keyedConcurrencyLimiter) release() { <-sem.tokens }
// ConcurrencyLimiter contains rate limiter state
type ConcurrencyLimiter struct {
- limitsByKey map[string]*keyedConcurrencyLimiter
- // maxPerKey is the maximum number of concurrent operations
- // per lockKey
- maxPerKey int64
- // queued tracks the current number of operations waiting to be picked up
+ // maxConcurrencyLimit is the maximum number of concurrent calls to the limited function.
+ // This limit is per key.
+ maxConcurrencyLimit int64
+ // maxQueueLength is the maximum number of operations allowed to wait in a queued state.
+ // This limit is global and applies before the concurrency limit. Subsequent incoming
+ // operations will be rejected with an error immediately.
+ maxQueueLength int64
+ // maxQueuedTickerCreator is a function that creates a ticker used to determine how long a
+ // call may be queued.
+ maxQueuedTickerCreator QueueTickerCreator
+
+ // monitor is a monitor that will get notified of the state of concurrency-limited RPC
+ // calls.
+ monitor ConcurrencyMonitor
+
+ m sync.RWMutex
+ // queued tracks the current number of operations waiting in the admission queue.
queued int64
- // queuedLimit is the maximum number of operations allowed to wait in a queued state.
- // subsequent incoming operations will fail with an error.
- queuedLimit int64
- monitor ConcurrencyMonitor
- mux sync.RWMutex
- maxWaitTickerGetter QueueTickerCreator
+ // limitsByKey tracks all concurrency limits per key. Its per-key entries are lazily created
+ // and will get evicted once there are no concurrency-limited calls for any such key
+ // anymore.
+ limitsByKey map[string]*keyedConcurrencyLimiter
}
-// NewConcurrencyLimiter creates a new concurrency rate limiter
-func NewConcurrencyLimiter(perKeyLimit, globalLimit int, maxWaitTickerGetter QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
+// NewConcurrencyLimiter creates a new concurrency rate limiter.
+func NewConcurrencyLimiter(maxConcurrencyLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter {
if monitor == nil {
monitor = NewNoopConcurrencyMonitor()
}
return &ConcurrencyLimiter{
- limitsByKey: make(map[string]*keyedConcurrencyLimiter),
- maxPerKey: int64(perKeyLimit),
- queuedLimit: int64(globalLimit),
- monitor: monitor,
- maxWaitTickerGetter: maxWaitTickerGetter,
+ maxConcurrencyLimit: int64(maxConcurrencyLimit),
+ maxQueueLength: int64(maxQueueLength),
+ maxQueuedTickerCreator: maxQueuedTickerCreator,
+ monitor: monitor,
+ limitsByKey: make(map[string]*keyedConcurrencyLimiter),
}
}
-// Limit will limit the concurrency of f
-func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error) {
- if c.maxPerKey <= 0 {
+// Limit will limit the concurrency of the limited function f. There are two distinct mechanisms
+// that limit execution of the function:
+//
+// 1. First, every call will enter the queue. This queue is global across all keys and limits how
+// many callers may try to acquire their per-key semaphore at the same time. If the queue is full
+// the caller will be rejected.
+// 2. Second, when the caller has successfully entered the queue, they try to acquire their per-key
+// semaphore. If this takes longer than the maximum queueing limit then the caller will be
+// dequeued and gets an error.
+func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f LimitedFunc) (interface{}, error) {
+ if c.maxConcurrencyLimit <= 0 {
return f()
}
var decremented bool
- log := ctxlogrus.Extract(ctx).WithField("limiting_key", lockKey)
+ log := ctxlogrus.Extract(ctx).WithField("limiting_key", limitingKey)
if err := c.queueInc(ctx); err != nil {
if errors.Is(err, ErrMaxQueueSize) {
return nil, structerr.NewResourceExhausted("%w", ErrMaxQueueSize).WithDetail(
@@ -115,8 +138,8 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f Limite
start := time.Now()
c.monitor.Queued(ctx)
- sem := c.getConcurrencyLimit(lockKey)
- defer c.putConcurrencyLimit(lockKey)
+ sem := c.getConcurrencyLimit(limitingKey)
+ defer c.putConcurrencyLimit(limitingKey)
err := sem.acquire(ctx)
c.queueDec(&decremented)
@@ -143,54 +166,58 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, lockKey string, f Limite
return f()
}
-// Lazy create a semaphore for the given key
-func (c *ConcurrencyLimiter) getConcurrencyLimit(lockKey string) *keyedConcurrencyLimiter {
- c.mux.Lock()
- defer c.mux.Unlock()
+// getConcurrencyLimit retrieves the concurrency limit for the given key. If no such limiter exists
+// it will be lazily constructed.
+func (c *ConcurrencyLimiter) getConcurrencyLimit(limitingKey string) *keyedConcurrencyLimiter {
+ c.m.Lock()
+ defer c.m.Unlock()
- if c.limitsByKey[lockKey] == nil {
- c.limitsByKey[lockKey] = &keyedConcurrencyLimiter{
- tokens: make(chan struct{}, c.maxPerKey),
- newTicker: c.maxWaitTickerGetter,
+ if c.limitsByKey[limitingKey] == nil {
+ c.limitsByKey[limitingKey] = &keyedConcurrencyLimiter{
+ tokens: make(chan struct{}, c.maxConcurrencyLimit),
+ maxQueuedTickerCreator: c.maxQueuedTickerCreator,
}
}
- c.limitsByKey[lockKey].count++
- return c.limitsByKey[lockKey]
+ c.limitsByKey[limitingKey].refcount++
+ return c.limitsByKey[limitingKey]
}
-func (c *ConcurrencyLimiter) putConcurrencyLimit(lockKey string) {
- c.mux.Lock()
- defer c.mux.Unlock()
+// putConcurrencyLimit drops the reference to the concurrency limit identified by the given key.
+// This must only ever be called after `getConcurrencyLimit()` for the same key. If the reference
+// count of the concurrency limit drops to zero then it will be destroyed.
+func (c *ConcurrencyLimiter) putConcurrencyLimit(limitingKey string) {
+ c.m.Lock()
+ defer c.m.Unlock()
- ref := c.limitsByKey[lockKey]
+ ref := c.limitsByKey[limitingKey]
if ref == nil {
panic("semaphore should be in the map")
}
- if ref.count <= 0 {
- panic(fmt.Sprintf("bad semaphore ref count %d", ref.count))
+ if ref.refcount <= 0 {
+ panic(fmt.Sprintf("bad semaphore ref refcount %d", ref.refcount))
}
- ref.count--
- if ref.count == 0 {
- delete(c.limitsByKey, lockKey)
+ ref.refcount--
+ if ref.refcount == 0 {
+ delete(c.limitsByKey, limitingKey)
}
}
func (c *ConcurrencyLimiter) countSemaphores() int {
- c.mux.RLock()
- defer c.mux.RUnlock()
+ c.m.RLock()
+ defer c.m.RUnlock()
return len(c.limitsByKey)
}
func (c *ConcurrencyLimiter) queueInc(ctx context.Context) error {
- c.mux.Lock()
- defer c.mux.Unlock()
+ c.m.Lock()
+ defer c.m.Unlock()
- if c.queuedLimit > 0 &&
- c.queued >= c.queuedLimit {
+ if c.maxQueueLength > 0 &&
+ c.queued >= c.maxQueueLength {
c.monitor.Dropped(ctx, "max_size")
return ErrMaxQueueSize
}
@@ -204,8 +231,8 @@ func (c *ConcurrencyLimiter) queueDec(decremented *bool) {
return
}
*decremented = true
- c.mux.Lock()
- defer c.mux.Unlock()
+ c.m.Lock()
+ defer c.m.Unlock()
c.queued--
}